1
0
Fork 0

Support InfluxDB v2 metrics backend

This commit is contained in:
Dmitry Sharshakov 2022-02-09 17:32:12 +03:00 committed by GitHub
parent 5780dc2b15
commit ca55dfe1c6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 681 additions and 49 deletions

View file

@ -98,7 +98,7 @@ func RegisterInfluxDB(ctx context.Context, config *types.InfluxDB) Registry {
return registry
}
// initInfluxDBTicker creates a influxDBClient.
// initInfluxDBClient creates an influxDBClient.
func initInfluxDBClient(ctx context.Context, config *types.InfluxDB) *influx.Influx {
logger := log.FromContext(ctx)

144
pkg/metrics/influxdb2.go Normal file
View file

@ -0,0 +1,144 @@
package metrics
import (
"context"
"errors"
"time"
kitlog "github.com/go-kit/kit/log"
"github.com/go-kit/kit/metrics/influx"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2api "github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/api/write"
influxdb2log "github.com/influxdata/influxdb-client-go/v2/log"
influxdb "github.com/influxdata/influxdb1-client/v2"
"github.com/traefik/traefik/v2/pkg/log"
"github.com/traefik/traefik/v2/pkg/safe"
"github.com/traefik/traefik/v2/pkg/types"
)
// Package-level state of the InfluxDB v2 exporter. It is populated by
// RegisterInfluxDB2 and reset to nil by StopInfluxDB2.
var (
// influxDB2Ticker provides the channel that drives the store's write loop.
influxDB2Ticker *time.Ticker
// influxDB2Store is the go-kit store from which counters, gauges and
// histograms are created.
influxDB2Store *influx.Influx
// influxDB2Client is the InfluxDB v2 client from which the blocking write
// API is obtained.
influxDB2Client influxdb2.Client
)
// RegisterInfluxDB2 creates metrics exporter for InfluxDB2.
//
// The client, the store and the push ticker are package-level values: they
// are created by the first call and reused by later calls until StopInfluxDB2
// resets them. When the client cannot be created, the error is logged and nil
// is returned.
//
// Configuration reload and TLS certificate metrics are always registered.
// Entry point, router and service metrics are only registered when the
// matching AddEntryPointsLabels, AddRoutersLabels or AddServicesLabels option
// is enabled.
func RegisterInfluxDB2(ctx context.Context, config *types.InfluxDB2) Registry {
	if influxDB2Client == nil {
		var err error
		if influxDB2Client, err = newInfluxDB2Client(config); err != nil {
			log.FromContext(ctx).Error(err)
			return nil
		}
	}

	if influxDB2Store == nil {
		influxDB2Store = influx.New(
			config.AdditionalLabels,
			influxdb.BatchPointsConfig{},
			kitlog.LoggerFunc(func(kv ...interface{}) error {
				log.FromContext(ctx).Error(kv)
				return nil
			}),
		)

		influxDB2Ticker = time.NewTicker(time.Duration(config.PushInterval))

		// Resolve everything the write loop needs before starting the
		// goroutine: StopInfluxDB2 sets the package-level variables to nil,
		// so reading them from inside the goroutine would race with it and
		// could dereference nil if it ran first.
		store := influxDB2Store
		ticks := influxDB2Ticker.C
		writer := influxDB2Writer{wc: influxDB2Client.WriteAPIBlocking(config.Org, config.Bucket)}

		safe.Go(func() {
			store.WriteLoop(ctx, ticks, writer)
		})
	}

	registry := &standardRegistry{
		configReloadsCounter:           influxDB2Store.NewCounter(influxDBConfigReloadsName),
		configReloadsFailureCounter:    influxDB2Store.NewCounter(influxDBConfigReloadsFailureName),
		lastConfigReloadSuccessGauge:   influxDB2Store.NewGauge(influxDBLastConfigReloadSuccessName),
		lastConfigReloadFailureGauge:   influxDB2Store.NewGauge(influxDBLastConfigReloadFailureName),
		tlsCertsNotAfterTimestampGauge: influxDB2Store.NewGauge(influxDBTLSCertsNotAfterTimestampName),
	}

	if config.AddEntryPointsLabels {
		registry.epEnabled = config.AddEntryPointsLabels
		registry.entryPointReqsCounter = influxDB2Store.NewCounter(influxDBEntryPointReqsName)
		registry.entryPointReqsTLSCounter = influxDB2Store.NewCounter(influxDBEntryPointReqsTLSName)
		registry.entryPointReqDurationHistogram, _ = NewHistogramWithScale(influxDB2Store.NewHistogram(influxDBEntryPointReqDurationName), time.Second)
		registry.entryPointOpenConnsGauge = influxDB2Store.NewGauge(influxDBEntryPointOpenConnsName)
	}

	if config.AddRoutersLabels {
		registry.routerEnabled = config.AddRoutersLabels
		registry.routerReqsCounter = influxDB2Store.NewCounter(influxDBRouterReqsName)
		registry.routerReqsTLSCounter = influxDB2Store.NewCounter(influxDBRouterReqsTLSName)
		registry.routerReqDurationHistogram, _ = NewHistogramWithScale(influxDB2Store.NewHistogram(influxDBRouterReqsDurationName), time.Second)
		registry.routerOpenConnsGauge = influxDB2Store.NewGauge(influxDBORouterOpenConnsName)
	}

	if config.AddServicesLabels {
		registry.svcEnabled = config.AddServicesLabels
		registry.serviceReqsCounter = influxDB2Store.NewCounter(influxDBServiceReqsName)
		registry.serviceReqsTLSCounter = influxDB2Store.NewCounter(influxDBServiceReqsTLSName)
		registry.serviceReqDurationHistogram, _ = NewHistogramWithScale(influxDB2Store.NewHistogram(influxDBServiceReqsDurationName), time.Second)
		registry.serviceRetriesCounter = influxDB2Store.NewCounter(influxDBServiceRetriesTotalName)
		registry.serviceOpenConnsGauge = influxDB2Store.NewGauge(influxDBServiceOpenConnsName)
		registry.serviceServerUpGauge = influxDB2Store.NewGauge(influxDBServiceServerUpName)
	}

	return registry
}
// StopInfluxDB2 stops and resets InfluxDB2 client, ticker and store.
//
// The client is closed and the ticker is stopped when they exist; all three
// package-level values are then set back to nil so that the next call to
// RegisterInfluxDB2 creates them again.
func StopInfluxDB2() {
	if client := influxDB2Client; client != nil {
		client.Close()
	}

	if ticker := influxDB2Ticker; ticker != nil {
		ticker.Stop()
	}

	influxDB2Client, influxDB2Ticker, influxDB2Store = nil, nil, nil
}
// newInfluxDB2Client creates an influxdb2.Client.
//
// It returns an error when the token, the org or the bucket of config is
// empty. As a side effect it sets the package-level logger of the InfluxDB v2
// client library to nil.
func newInfluxDB2Client(config *types.InfluxDB2) (influxdb2.Client, error) {
	for _, required := range []string{config.Token, config.Org, config.Bucket} {
		if required == "" {
			return nil, errors.New("token, org or bucket property is missing")
		}
	}

	// Disable InfluxDB2 logs.
	// See https://github.com/influxdata/influxdb-client-go/blob/v2.7.0/options.go#L128
	influxdb2log.Log = nil

	client := influxdb2.NewClient(config.Address, config.Token)

	return client, nil
}
// influxDB2Writer receives batches of InfluxDB v1 client points through its
// Write method and sends them to an InfluxDB v2 blocking write API.
type influxDB2Writer struct {
// wc is the blocking write API the converted points are written to.
wc influxdb2api.WriteAPIBlocking
}
// Write converts every point of bp into an InfluxDB v2 point and sends them
// all through the blocking write API in a single call.
//
// A point whose fields cannot be read is logged and skipped. When no point is
// left to send, Write returns nil without calling the write API, so that an
// empty batch is never submitted. Otherwise it returns the error reported by
// the write API, if any.
func (w influxDB2Writer) Write(bp influxdb.BatchPoints) error {
	ctx := log.With(context.Background(), log.Str(log.MetricsProviderName, "influxdb2"))
	logger := log.FromContext(ctx)

	// Read the batch once: the same slice sizes the output and is iterated.
	points := bp.Points()

	wps := make([]*write.Point, 0, len(points))
	for _, p := range points {
		fields, err := p.Fields()
		if err != nil {
			logger.Errorf("Error while getting %s point fields: %s", p.Name(), err)
			continue
		}

		wps = append(wps, influxdb2.NewPoint(
			p.Name(),
			p.Tags(),
			fields,
			p.Time(),
		))
	}

	if len(wps) == 0 {
		return nil
	}

	return w.wc.WritePoint(ctx, wps...)
}

View file

@ -0,0 +1,145 @@
package metrics
import (
"context"
"fmt"
"io"
"net/http"
"net/http/httptest"
"strconv"
"testing"
"time"
"github.com/stretchr/testify/require"
ptypes "github.com/traefik/paerser/types"
"github.com/traefik/traefik/v2/pkg/types"
)
func TestInfluxDB2(t *testing.T) {
c := make(chan *string)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
require.NoError(t, err)
bodyStr := string(body)
c <- &bodyStr
_, _ = fmt.Fprintln(w, "ok")
}))
defer ts.Close()
influxDB2Registry := RegisterInfluxDB2(context.Background(),
&types.InfluxDB2{
Address: ts.URL,
Token: "test-token",
PushInterval: ptypes.Duration(10 * time.Millisecond),
Org: "test-org",
Bucket: "test-bucket",
AddEntryPointsLabels: true,
AddRoutersLabels: true,
AddServicesLabels: true,
})
defer StopInfluxDB2()
if !influxDB2Registry.IsEpEnabled() || !influxDB2Registry.IsRouterEnabled() || !influxDB2Registry.IsSvcEnabled() {
t.Fatalf("InfluxDB2Registry should return true for IsEnabled(), IsRouterEnabled() and IsSvcEnabled()")
}
expectedServer := []string{
`(traefik\.config\.reload\.total count=1) [\d]{19}`,
`(traefik\.config\.reload\.total\.failure count=1) [\d]{19}`,
`(traefik\.config\.reload\.lastSuccessTimestamp value=1) [\d]{19}`,
`(traefik\.config\.reload\.lastFailureTimestamp value=1) [\d]{19}`,
}
influxDB2Registry.ConfigReloadsCounter().Add(1)
influxDB2Registry.ConfigReloadsFailureCounter().Add(1)
influxDB2Registry.LastConfigReloadSuccessGauge().Set(1)
influxDB2Registry.LastConfigReloadFailureGauge().Set(1)
msgServer := <-c
assertMessage(t, *msgServer, expectedServer)
expectedTLS := []string{
`(traefik\.tls\.certs\.notAfterTimestamp,key=value value=1) [\d]{19}`,
}
influxDB2Registry.TLSCertsNotAfterTimestampGauge().With("key", "value").Set(1)
msgTLS := <-c
assertMessage(t, *msgTLS, expectedTLS)
expectedEntrypoint := []string{
`(traefik\.entrypoint\.requests\.total,code=200,entrypoint=test,method=GET count=1) [\d]{19}`,
`(traefik\.entrypoint\.requests\.tls\.total,entrypoint=test,tls_cipher=bar,tls_version=foo count=1) [\d]{19}`,
`(traefik\.entrypoint\.request\.duration(?:,code=[\d]{3})?,entrypoint=test p50=10000,p90=10000,p95=10000,p99=10000) [\d]{19}`,
`(traefik\.entrypoint\.connections\.open,entrypoint=test value=1) [\d]{19}`,
}
influxDB2Registry.EntryPointReqsCounter().With("entrypoint", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
influxDB2Registry.EntryPointReqsTLSCounter().With("entrypoint", "test", "tls_version", "foo", "tls_cipher", "bar").Add(1)
influxDB2Registry.EntryPointReqDurationHistogram().With("entrypoint", "test").Observe(10000)
influxDB2Registry.EntryPointOpenConnsGauge().With("entrypoint", "test").Set(1)
msgEntrypoint := <-c
assertMessage(t, *msgEntrypoint, expectedEntrypoint)
expectedRouter := []string{
`(traefik\.router\.requests\.total,code=200,method=GET,router=demo,service=test count=1) [\d]{19}`,
`(traefik\.router\.requests\.total,code=404,method=GET,router=demo,service=test count=1) [\d]{19}`,
`(traefik\.router\.requests\.tls\.total,router=demo,service=test,tls_cipher=bar,tls_version=foo count=1) [\d]{19}`,
`(traefik\.router\.request\.duration,code=200,router=demo,service=test p50=10000,p90=10000,p95=10000,p99=10000) [\d]{19}`,
`(traefik\.router\.connections\.open,router=demo,service=test value=1) [\d]{19}`,
}
influxDB2Registry.RouterReqsCounter().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusNotFound), "method", http.MethodGet).Add(1)
influxDB2Registry.RouterReqsCounter().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
influxDB2Registry.RouterReqsTLSCounter().With("router", "demo", "service", "test", "tls_version", "foo", "tls_cipher", "bar").Add(1)
influxDB2Registry.RouterReqDurationHistogram().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK)).Observe(10000)
influxDB2Registry.RouterOpenConnsGauge().With("router", "demo", "service", "test").Set(1)
msgRouter := <-c
assertMessage(t, *msgRouter, expectedRouter)
expectedService := []string{
`(traefik\.service\.requests\.total,code=200,method=GET,service=test count=1) [\d]{19}`,
`(traefik\.service\.requests\.total,code=404,method=GET,service=test count=1) [\d]{19}`,
`(traefik\.service\.requests\.tls\.total,service=test,tls_cipher=bar,tls_version=foo count=1) [\d]{19}`,
`(traefik\.service\.request\.duration,code=200,service=test p50=10000,p90=10000,p95=10000,p99=10000) [\d]{19}`,
`(traefik\.service\.server\.up,service=test,url=http://127.0.0.1 value=1) [\d]{19}`,
}
influxDB2Registry.ServiceReqsCounter().With("service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
influxDB2Registry.ServiceReqsCounter().With("service", "test", "code", strconv.Itoa(http.StatusNotFound), "method", http.MethodGet).Add(1)
influxDB2Registry.ServiceReqsTLSCounter().With("service", "test", "tls_version", "foo", "tls_cipher", "bar").Add(1)
influxDB2Registry.ServiceReqDurationHistogram().With("service", "test", "code", strconv.Itoa(http.StatusOK)).Observe(10000)
influxDB2Registry.ServiceServerUpGauge().With("service", "test", "url", "http://127.0.0.1").Set(1)
msgService := <-c
assertMessage(t, *msgService, expectedService)
expectedServiceRetries := []string{
`(traefik\.service\.retries\.total,service=test count=2) [\d]{19}`,
`(traefik\.service\.retries\.total,service=foobar count=1) [\d]{19}`,
}
influxDB2Registry.ServiceRetriesCounter().With("service", "test").Add(1)
influxDB2Registry.ServiceRetriesCounter().With("service", "test").Add(1)
influxDB2Registry.ServiceRetriesCounter().With("service", "foobar").Add(1)
msgServiceRetries := <-c
assertMessage(t, *msgServiceRetries, expectedServiceRetries)
expectedServiceOpenConns := []string{
`(traefik\.service\.connections\.open,service=test value=2) [\d]{19}`,
`(traefik\.service\.connections\.open,service=foobar value=1) [\d]{19}`,
}
influxDB2Registry.ServiceOpenConnsGauge().With("service", "test").Add(1)
influxDB2Registry.ServiceOpenConnsGauge().With("service", "test").Add(1)
influxDB2Registry.ServiceOpenConnsGauge().With("service", "foobar").Add(1)
msgServiceOpenConns := <-c
assertMessage(t, *msgServiceOpenConns, expectedServiceOpenConns)
}

View file

@ -11,6 +11,7 @@ import (
"testing"
"time"
"github.com/stretchr/testify/require"
"github.com/stvp/go-udp-testing"
ptypes "github.com/traefik/paerser/types"
"github.com/traefik/traefik/v2/pkg/types"
@ -125,10 +126,8 @@ func TestInfluxDBHTTP(t *testing.T) {
c := make(chan *string)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "can't read body "+err.Error(), http.StatusBadRequest)
return
}
require.NoError(t, err)
bodyStr := string(body)
c <- &bodyStr
_, _ = fmt.Fprintln(w, "ok")
@ -140,7 +139,7 @@ func TestInfluxDBHTTP(t *testing.T) {
&types.InfluxDB{
Address: ts.URL,
Protocol: "http",
PushInterval: ptypes.Duration(time.Second),
PushInterval: ptypes.Duration(10 * time.Millisecond),
Database: "test",
RetentionPolicy: "autogen",
AddEntryPointsLabels: true,

View file

@ -111,4 +111,5 @@ func stopMetricsClients() {
metrics.StopDatadog()
metrics.StopStatsd()
metrics.StopInfluxDB()
metrics.StopInfluxDB2()
}

View file

@ -14,6 +14,7 @@ type Metrics struct {
Datadog *Datadog `description:"Datadog metrics exporter type." json:"datadog,omitempty" toml:"datadog,omitempty" yaml:"datadog,omitempty" label:"allowEmpty" file:"allowEmpty" export:"true"`
StatsD *Statsd `description:"StatsD metrics exporter type." json:"statsD,omitempty" toml:"statsD,omitempty" yaml:"statsD,omitempty" label:"allowEmpty" file:"allowEmpty" export:"true"`
InfluxDB *InfluxDB `description:"InfluxDB metrics exporter type." json:"influxDB,omitempty" toml:"influxDB,omitempty" yaml:"influxDB,omitempty" label:"allowEmpty" file:"allowEmpty" export:"true"`
InfluxDB2 *InfluxDB2 `description:"InfluxDB v2 metrics exporter type." json:"influxDB2,omitempty" toml:"influxDB2,omitempty" yaml:"influxDB2,omitempty" label:"allowEmpty" file:"allowEmpty" export:"true"`
}
// Prometheus can contain specific configuration used by the Prometheus Metrics exporter.
@ -105,6 +106,27 @@ func (i *InfluxDB) SetDefaults() {
i.AddServicesLabels = true
}
// InfluxDB2 contains address, token and metrics pushing interval configuration
// for the InfluxDB v2 metrics exporter, together with the target org and
// bucket, the switches enabling entry point, router and service metrics, and
// the additional labels added to all metrics.
type InfluxDB2 struct {
	Address              string            `description:"InfluxDB v2 address." json:"address,omitempty" toml:"address,omitempty" yaml:"address,omitempty"`
	Token                string            `description:"InfluxDB v2 access token." json:"token,omitempty" toml:"token,omitempty" yaml:"token,omitempty" loggable:"false"`
	PushInterval         types.Duration    `description:"InfluxDB v2 push interval." json:"pushInterval,omitempty" toml:"pushInterval,omitempty" yaml:"pushInterval,omitempty" export:"true"`
	Org                  string            `description:"InfluxDB v2 org ID." json:"org,omitempty" toml:"org,omitempty" yaml:"org,omitempty" export:"true"`
	Bucket               string            `description:"InfluxDB v2 bucket ID." json:"bucket,omitempty" toml:"bucket,omitempty" yaml:"bucket,omitempty" export:"true"`
	AddEntryPointsLabels bool              `description:"Enable metrics on entry points." json:"addEntryPointsLabels,omitempty" toml:"addEntryPointsLabels,omitempty" yaml:"addEntryPointsLabels,omitempty" export:"true"`
	AddRoutersLabels     bool              `description:"Enable metrics on routers." json:"addRoutersLabels,omitempty" toml:"addRoutersLabels,omitempty" yaml:"addRoutersLabels,omitempty" export:"true"`
	AddServicesLabels    bool              `description:"Enable metrics on services." json:"addServicesLabels,omitempty" toml:"addServicesLabels,omitempty" yaml:"addServicesLabels,omitempty" export:"true"`
	AdditionalLabels     map[string]string `description:"Additional labels (influxdb tags) on all metrics" json:"additionalLabels,omitempty" toml:"additionalLabels,omitempty" yaml:"additionalLabels,omitempty" export:"true"`
}
// SetDefaults sets the default values: a local InfluxDB address, a push
// interval of ten seconds, and entry point and service metrics enabled.
// The other fields, including AddRoutersLabels, are left untouched.
func (i *InfluxDB2) SetDefaults() {
	const defaultPushInterval = 10 * time.Second

	i.AddEntryPointsLabels, i.AddServicesLabels = true, true
	i.PushInterval = types.Duration(defaultPushInterval)
	i.Address = "http://localhost:8086"
}
// Statistics provides options for monitoring request and response stats.
type Statistics struct {
RecentErrors int `description:"Number of recent errors logged." json:"recentErrors,omitempty" toml:"recentErrors,omitempty" yaml:"recentErrors,omitempty" export:"true"`