From 67a0b4b4b1176f2eec1eca615a3ebe1af41cdff9 Mon Sep 17 00:00:00 2001 From: Drew Kerrigan Date: Tue, 29 May 2018 16:58:03 -0400 Subject: [PATCH] Metrics: Add support for InfluxDB Database / RetentionPolicy and HTTP client --- cmd/configuration.go | 2 + docs/configuration/backends/web.md | 21 ++++++ docs/configuration/metrics.md | 22 ++++++ metrics/influxdb.go | 117 ++++++++++++++++++++++++++--- metrics/influxdb_test.go | 60 +++++++++++++++ types/types.go | 7 +- 6 files changed, 217 insertions(+), 12 deletions(-) diff --git a/cmd/configuration.go b/cmd/configuration.go index 2480187bd..3c986ada4 100644 --- a/cmd/configuration.go +++ b/cmd/configuration.go @@ -78,6 +78,7 @@ func NewTraefikDefaultPointersConfiguration() *TraefikConfiguration { }, InfluxDB: &types.InfluxDB{ Address: "localhost:8089", + Protocol: "udp", PushInterval: "10s", }, } @@ -262,6 +263,7 @@ func NewTraefikDefaultPointersConfiguration() *TraefikConfiguration { }, InfluxDB: &types.InfluxDB{ Address: "localhost:8089", + Protocol: "udp", PushInterval: "10s", }, } diff --git a/docs/configuration/backends/web.md b/docs/configuration/backends/web.md index 0347bddf9..489fa9363 100644 --- a/docs/configuration/backends/web.md +++ b/docs/configuration/backends/web.md @@ -185,6 +185,13 @@ pushinterval = "10s" # address = "localhost:8089" +# InfluxDB's address protocol (udp or http) +# +# Required +# Default: "udp" +# +protocol = "udp" + # InfluxDB push interval # # Optional @@ -192,6 +199,20 @@ address = "localhost:8089" # pushinterval = "10s" +# InfluxDB database used when protocol is http +# +# Optional +# Default: "" +# +database = "" + +# InfluxDB retention policy used when protocol is http +# +# Optional +# Default: "" +# +retentionpolicy = "" + # ... ``` diff --git a/docs/configuration/metrics.md b/docs/configuration/metrics.md index b305d1355..34fe943be 100644 --- a/docs/configuration/metrics.md +++ b/docs/configuration/metrics.md @@ -80,6 +80,7 @@ # ... ``` + ### InfluxDB ```toml @@ -96,6 +97,13 @@ # address = "localhost:8089" + # InfluxDB's address protocol (udp or http) + # + # Required + # Default: "udp" + # + protocol = "udp" + # InfluxDB push interval # # Optional @@ -103,6 +111,20 @@ # pushinterval = "10s" + # InfluxDB database used when protocol is http + # + # Optional + # Default: "" + # + database = "" + + # InfluxDB retention policy used when protocol is http + # + # Optional + # Default: "" + # + retentionpolicy = "" + # ... ``` diff --git a/metrics/influxdb.go b/metrics/influxdb.go index f3c862a57..6f777a986 100644 --- a/metrics/influxdb.go +++ b/metrics/influxdb.go @@ -2,6 +2,9 @@ package metrics import ( "bytes" + "fmt" + "net/url" + "regexp" "time" "github.com/containous/traefik/log" @@ -12,10 +15,7 @@ import ( influxdb "github.com/influxdata/influxdb/client/v2" ) -var influxDBClient = influx.New(map[string]string{}, influxdb.BatchPointsConfig{}, kitlog.LoggerFunc(func(keyvals ...interface{}) error { - log.Info(keyvals) - return nil -})) +var influxDBClient *influx.Influx type influxDBWriter struct { buf bytes.Buffer @@ -41,6 +41,9 @@ const ( // RegisterInfluxDB registers the metrics pusher if this didn't happen yet and creates a InfluxDB Registry instance. func RegisterInfluxDB(config *types.InfluxDB) Registry { + if influxDBClient == nil { + influxDBClient = initInfluxDBClient(config) + } if influxDBTicker == nil { influxDBTicker = initInfluxDBTicker(config) } @@ -62,7 +65,48 @@ func RegisterInfluxDB(config *types.InfluxDB) Registry { } } -// initInfluxDBTicker initializes metrics pusher and creates a influxDBClient if not created already +// initInfluxDBTicker creates a influxDBClient +func initInfluxDBClient(config *types.InfluxDB) *influx.Influx { + // TODO deprecated: move this switch into configuration.SetEffectiveConfiguration when web provider will be removed. + switch config.Protocol { + case "udp": + if len(config.Database) > 0 || len(config.RetentionPolicy) > 0 { + log.Warn("Database and RetentionPolicy are only used when protocol is http.") + config.Database = "" + config.RetentionPolicy = "" + } + case "http": + if u, err := url.Parse(config.Address); err == nil { + if u.Scheme != "http" && u.Scheme != "https" { + log.Warnf("InfluxDB address %s should specify a scheme of http or https, defaulting to http.", config.Address) + config.Address = "http://" + config.Address + } + } else { + log.Errorf("Unable to parse influxdb address: %v, defaulting to udp.", err) + config.Protocol = "udp" + config.Database = "" + config.RetentionPolicy = "" + } + default: + log.Warnf("Unsupported protocol: %s, defaulting to udp.", config.Protocol) + config.Protocol = "udp" + config.Database = "" + config.RetentionPolicy = "" + } + + return influx.New( + map[string]string{}, + influxdb.BatchPointsConfig{ + Database: config.Database, + RetentionPolicy: config.RetentionPolicy, + }, + kitlog.LoggerFunc(func(keyvals ...interface{}) error { + log.Info(keyvals) + return nil + })) +} + +// initInfluxDBTicker initializes metrics pusher func initInfluxDBTicker(config *types.InfluxDB) *time.Ticker { pushInterval, err := time.ParseDuration(config.PushInterval) if err != nil { @@ -88,16 +132,69 @@ func StopInfluxDB() { influxDBTicker = nil } +// Write creates a http or udp client and attempts to write BatchPoints. +// If a "database not found" error is encountered, a CREATE DATABASE +// query is attempted when using protocol http. func (w *influxDBWriter) Write(bp influxdb.BatchPoints) error { - c, err := influxdb.NewUDPClient(influxdb.UDPConfig{ - Addr: w.config.Address, - }) - + c, err := w.initWriteClient() if err != nil { return err } defer c.Close() - return c.Write(bp) + if writeErr := c.Write(bp); writeErr != nil { + log.Errorf("Error writing to influx: %s", writeErr.Error()) + if handleErr := w.handleWriteError(c, writeErr); handleErr != nil { + return handleErr + } + // Retry write after successful handling of writeErr + return c.Write(bp) + } + return nil +} + +func (w *influxDBWriter) initWriteClient() (c influxdb.Client, err error) { + if w.config.Protocol == "http" { + c, err = influxdb.NewHTTPClient(influxdb.HTTPConfig{ + Addr: w.config.Address, + }) + } else { + c, err = influxdb.NewUDPClient(influxdb.UDPConfig{ + Addr: w.config.Address, + }) + } + return +} + +func (w *influxDBWriter) handleWriteError(c influxdb.Client, writeErr error) error { + if w.config.Protocol != "http" { + return writeErr + } + + match, matchErr := regexp.MatchString("database not found", writeErr.Error()) + + if matchErr != nil || !match { + return writeErr + } + + qStr := fmt.Sprintf("CREATE DATABASE \"%s\"", w.config.Database) + if w.config.RetentionPolicy != "" { + qStr = fmt.Sprintf("%s WITH NAME \"%s\"", qStr, w.config.RetentionPolicy) + } + + log.Debugf("Influx database does not exist, attempting to create with query: %s", qStr) + + q := influxdb.NewQuery(qStr, "", "") + response, queryErr := c.Query(q) + if queryErr == nil && response.Error() != nil { + queryErr = response.Error() + } + if queryErr != nil { + log.Errorf("Error creating InfluxDB database: %s", queryErr) + return queryErr + } + + log.Debugf("Successfully created influx database: %s", w.config.Database) + return nil } diff --git a/metrics/influxdb_test.go b/metrics/influxdb_test.go index 493d54da3..7604416cb 100644 --- a/metrics/influxdb_test.go +++ b/metrics/influxdb_test.go @@ -1,7 +1,10 @@ package metrics import ( + "fmt" + "io/ioutil" "net/http" + "net/http/httptest" "regexp" "strconv" "testing" @@ -62,6 +65,63 @@ func TestInfluxDB(t *testing.T) { assertMessage(t, msgEntrypoint, expectedEntrypoint) } +func TestInfluxDBHTTP(t *testing.T) { + c := make(chan *string) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + body, err := ioutil.ReadAll(r.Body) + if err != nil { + http.Error(w, "can't read body "+err.Error(), http.StatusBadRequest) + return + } + bodyStr := string(body) + c <- &bodyStr + fmt.Fprintln(w, "ok") + })) + defer ts.Close() + + influxDBRegistry := RegisterInfluxDB(&types.InfluxDB{Address: ts.URL, Protocol: "http", PushInterval: "1s", Database: "test", RetentionPolicy: "autogen"}) + defer StopInfluxDB() + + if !influxDBRegistry.IsEnabled() { + t.Fatalf("InfluxDB registry must be enabled") + } + + expectedBackend := []string{ + `(traefik\.backend\.requests\.total,backend=test,code=200,method=GET count=1) [\d]{19}`, + `(traefik\.backend\.requests\.total,backend=test,code=404,method=GET count=1) [\d]{19}`, + `(traefik\.backend\.request\.duration,backend=test,code=200 p50=10000,p90=10000,p95=10000,p99=10000) [\d]{19}`, + `(traefik\.backend\.retries\.total(?:,code=[\d]{3},method=GET)?,backend=test count=2) [\d]{19}`, + `(traefik\.config\.reload\.total(?:[a-z=0-9A-Z,]+)? count=1) [\d]{19}`, + `(traefik\.config\.reload\.total\.failure(?:[a-z=0-9A-Z,]+)? count=1) [\d]{19}`, + `(traefik\.backend\.server\.up,backend=test(?:[a-z=0-9A-Z,]+)?,url=http://127.0.0.1 value=1) [\d]{19}`, + } + + influxDBRegistry.BackendReqsCounter().With("backend", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1) + influxDBRegistry.BackendReqsCounter().With("backend", "test", "code", strconv.Itoa(http.StatusNotFound), "method", http.MethodGet).Add(1) + influxDBRegistry.BackendRetriesCounter().With("backend", "test").Add(1) + influxDBRegistry.BackendRetriesCounter().With("backend", "test").Add(1) + influxDBRegistry.BackendReqDurationHistogram().With("backend", "test", "code", strconv.Itoa(http.StatusOK)).Observe(10000) + influxDBRegistry.ConfigReloadsCounter().Add(1) + influxDBRegistry.ConfigReloadsFailureCounter().Add(1) + influxDBRegistry.BackendServerUpGauge().With("backend", "test", "url", "http://127.0.0.1").Set(1) + msgBackend := <-c + + assertMessage(t, *msgBackend, expectedBackend) + + expectedEntrypoint := []string{ + `(traefik\.entrypoint\.requests\.total,entrypoint=test(?:[a-z=0-9A-Z,:/.]+)? count=1) [\d]{19}`, + `(traefik\.entrypoint\.request\.duration(?:,code=[\d]{3})?,entrypoint=test(?:[a-z=0-9A-Z,:/.]+)? p50=10000,p90=10000,p95=10000,p99=10000) [\d]{19}`, + `(traefik\.entrypoint\.connections\.open,entrypoint=test value=1) [\d]{19}`, + } + + influxDBRegistry.EntrypointReqsCounter().With("entrypoint", "test").Add(1) + influxDBRegistry.EntrypointReqDurationHistogram().With("entrypoint", "test").Observe(10000) + influxDBRegistry.EntrypointOpenConnsGauge().With("entrypoint", "test").Set(1) + msgEntrypoint := <-c + + assertMessage(t, *msgEntrypoint, expectedEntrypoint) +} + func assertMessage(t *testing.T, msg string, patterns []string) { t.Helper() for _, pattern := range patterns { diff --git a/types/types.go b/types/types.go index cc4a981b3..38031f1a1 100644 --- a/types/types.go +++ b/types/types.go @@ -443,8 +443,11 @@ type Statsd struct { // InfluxDB contains address and metrics pushing interval configuration type InfluxDB struct { - Address string `description:"InfluxDB address"` - PushInterval string `description:"InfluxDB push interval"` + Address string `description:"InfluxDB address"` + Protocol string `description:"InfluxDB address protocol (udp or http)"` + PushInterval string `description:"InfluxDB push interval" export:"true"` + Database string `description:"InfluxDB database used when protocol is http" export:"true"` + RetentionPolicy string `description:"InfluxDB retention policy used when protocol is http" export:"true"` } // Buckets holds Prometheus Buckets