1
0
Fork 0

fix: otel not working without USER

This commit is contained in:
Michael 2025-10-03 14:48:04 +02:00 committed by GitHub
parent ad566ee9ef
commit c5ed376d5f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
127 changed files with 347 additions and 305 deletions

View file

@ -0,0 +1,174 @@
package metrics
import (
"context"
"net"
"strings"
"time"
"github.com/go-kit/kit/metrics/dogstatsd"
"github.com/go-kit/kit/util/conn"
gokitlog "github.com/go-kit/log"
"github.com/rs/zerolog/log"
"github.com/traefik/traefik/v3/pkg/observability/logs"
otypes "github.com/traefik/traefik/v3/pkg/observability/types"
"github.com/traefik/traefik/v3/pkg/safe"
)
const (
unixAddressPrefix = "unix://"
unixAddressDatagramPrefix = "unixgram://"
unixAddressStreamPrefix = "unixstream://"
)
var (
datadogClient *dogstatsd.Dogstatsd
datadogLoopCancelFunc context.CancelFunc
)
// Metric names consistent with https://github.com/DataDog/integrations-extras/pull/64
const (
ddConfigReloadsName = "config.reload.total"
ddLastConfigReloadSuccessName = "config.reload.lastSuccessTimestamp"
ddOpenConnsName = "open.connections"
ddTLSCertsNotAfterTimestampName = "tls.certs.notAfterTimestamp"
ddEntryPointReqsName = "entrypoint.request.total"
ddEntryPointReqsTLSName = "entrypoint.request.tls.total"
ddEntryPointReqDurationName = "entrypoint.request.duration"
ddEntryPointReqsBytesName = "entrypoint.requests.bytes.total"
ddEntryPointRespsBytesName = "entrypoint.responses.bytes.total"
ddRouterReqsName = "router.request.total"
ddRouterReqsTLSName = "router.request.tls.total"
ddRouterReqsDurationName = "router.request.duration"
ddRouterReqsBytesName = "router.requests.bytes.total"
ddRouterRespsBytesName = "router.responses.bytes.total"
ddServiceReqsName = "service.request.total"
ddServiceReqsTLSName = "service.request.tls.total"
ddServiceReqsDurationName = "service.request.duration"
ddServiceRetriesName = "service.retries.total"
ddServiceServerUpName = "service.server.up"
ddServiceReqsBytesName = "service.requests.bytes.total"
ddServiceRespsBytesName = "service.responses.bytes.total"
)
// RegisterDatadog registers the metrics pusher if this didn't happen yet and creates a datadog Registry instance.
func RegisterDatadog(ctx context.Context, config *otypes.Datadog) Registry {
// Ensures there is only one DataDog client sending metrics at any given time.
StopDatadog()
// just to be sure there is a prefix defined
if config.Prefix == "" {
config.Prefix = defaultMetricsPrefix
}
datadogLogger := logs.NewGoKitWrapper(log.Logger.With().Str(logs.MetricsProviderName, "datadog").Logger())
datadogClient = dogstatsd.New(config.Prefix+".", datadogLogger)
initDatadogClient(ctx, config, datadogLogger)
registry := &standardRegistry{
configReloadsCounter: datadogClient.NewCounter(ddConfigReloadsName, 1.0),
lastConfigReloadSuccessGauge: datadogClient.NewGauge(ddLastConfigReloadSuccessName),
openConnectionsGauge: datadogClient.NewGauge(ddOpenConnsName),
tlsCertsNotAfterTimestampGauge: datadogClient.NewGauge(ddTLSCertsNotAfterTimestampName),
}
if config.AddEntryPointsLabels {
registry.epEnabled = config.AddEntryPointsLabels
registry.entryPointReqsCounter = NewCounterWithNoopHeaders(datadogClient.NewCounter(ddEntryPointReqsName, 1.0))
registry.entryPointReqsTLSCounter = datadogClient.NewCounter(ddEntryPointReqsTLSName, 1.0)
registry.entryPointReqDurationHistogram, _ = NewHistogramWithScale(datadogClient.NewHistogram(ddEntryPointReqDurationName, 1.0), time.Second)
registry.entryPointReqsBytesCounter = datadogClient.NewCounter(ddEntryPointReqsBytesName, 1.0)
registry.entryPointRespsBytesCounter = datadogClient.NewCounter(ddEntryPointRespsBytesName, 1.0)
}
if config.AddRoutersLabels {
registry.routerEnabled = config.AddRoutersLabels
registry.routerReqsCounter = NewCounterWithNoopHeaders(datadogClient.NewCounter(ddRouterReqsName, 1.0))
registry.routerReqsTLSCounter = datadogClient.NewCounter(ddRouterReqsTLSName, 1.0)
registry.routerReqDurationHistogram, _ = NewHistogramWithScale(datadogClient.NewHistogram(ddRouterReqsDurationName, 1.0), time.Second)
registry.routerReqsBytesCounter = datadogClient.NewCounter(ddRouterReqsBytesName, 1.0)
registry.routerRespsBytesCounter = datadogClient.NewCounter(ddRouterRespsBytesName, 1.0)
}
if config.AddServicesLabels {
registry.svcEnabled = config.AddServicesLabels
registry.serviceReqsCounter = NewCounterWithNoopHeaders(datadogClient.NewCounter(ddServiceReqsName, 1.0))
registry.serviceReqsTLSCounter = datadogClient.NewCounter(ddServiceReqsTLSName, 1.0)
registry.serviceReqDurationHistogram, _ = NewHistogramWithScale(datadogClient.NewHistogram(ddServiceReqsDurationName, 1.0), time.Second)
registry.serviceRetriesCounter = datadogClient.NewCounter(ddServiceRetriesName, 1.0)
registry.serviceServerUpGauge = datadogClient.NewGauge(ddServiceServerUpName)
registry.serviceReqsBytesCounter = datadogClient.NewCounter(ddServiceReqsBytesName, 1.0)
registry.serviceRespsBytesCounter = datadogClient.NewCounter(ddServiceRespsBytesName, 1.0)
}
return registry
}
func initDatadogClient(ctx context.Context, config *otypes.Datadog, logger gokitlog.LoggerFunc) {
network, address := parseDatadogAddress(config.Address)
ctx, datadogLoopCancelFunc = context.WithCancel(ctx)
safe.Go(func() {
ticker := time.NewTicker(time.Duration(config.PushInterval))
defer ticker.Stop()
dialer := func(network, address string) (net.Conn, error) {
switch network {
case "unix":
// To mimic the Datadog client when the network is unix we will try to guess the UDS type.
newConn, err := net.Dial("unixgram", address)
if err != nil && strings.Contains(err.Error(), "protocol wrong type for socket") {
return net.Dial("unix", address)
}
return newConn, err
case "unixgram":
return net.Dial("unixgram", address)
case "unixstream":
return net.Dial("unix", address)
default:
return net.Dial(network, address)
}
}
datadogClient.WriteLoop(ctx, ticker.C, conn.NewManager(dialer, network, address, time.After, logger))
})
}
// StopDatadog stops the Datadog metrics pusher.
func StopDatadog() {
if datadogLoopCancelFunc != nil {
datadogLoopCancelFunc()
datadogLoopCancelFunc = nil
}
}
func parseDatadogAddress(address string) (string, string) {
network := "udp"
var addr string
switch {
case strings.HasPrefix(address, unixAddressPrefix):
network = "unix"
addr = address[len(unixAddressPrefix):]
case strings.HasPrefix(address, unixAddressDatagramPrefix):
network = "unixgram"
addr = address[len(unixAddressDatagramPrefix):]
case strings.HasPrefix(address, unixAddressStreamPrefix):
network = "unixstream"
addr = address[len(unixAddressStreamPrefix):]
case address != "":
addr = address
default:
addr = "localhost:8125"
}
return network, addr
}

View file

@ -0,0 +1,155 @@
package metrics
import (
"net/http"
"strconv"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stvp/go-udp-testing"
ptypes "github.com/traefik/paerser/types"
otypes "github.com/traefik/traefik/v3/pkg/observability/types"
)
func TestDatadog(t *testing.T) {
t.Cleanup(StopDatadog)
udp.SetAddr(":18125")
// This is needed to make sure that UDP Listener listens for data a bit longer, otherwise it will quit after a millisecond
udp.Timeout = 5 * time.Second
datadogRegistry := RegisterDatadog(t.Context(), &otypes.Datadog{Address: ":18125", PushInterval: ptypes.Duration(time.Second), AddEntryPointsLabels: true, AddRoutersLabels: true, AddServicesLabels: true})
if !datadogRegistry.IsEpEnabled() || !datadogRegistry.IsRouterEnabled() || !datadogRegistry.IsSvcEnabled() {
t.Errorf("DatadogRegistry should return true for IsEnabled(), IsRouterEnabled() and IsSvcEnabled()")
}
testDatadogRegistry(t, defaultMetricsPrefix, datadogRegistry)
}
func TestDatadogWithPrefix(t *testing.T) {
t.Cleanup(StopDatadog)
udp.SetAddr(":18125")
// This is needed to make sure that UDP Listener listens for data a bit longer, otherwise it will quit after a millisecond
udp.Timeout = 5 * time.Second
datadogRegistry := RegisterDatadog(t.Context(), &otypes.Datadog{Prefix: "testPrefix", Address: ":18125", PushInterval: ptypes.Duration(time.Second), AddEntryPointsLabels: true, AddRoutersLabels: true, AddServicesLabels: true})
testDatadogRegistry(t, "testPrefix", datadogRegistry)
}
func TestDatadog_parseDatadogAddress(t *testing.T) {
tests := []struct {
desc string
address string
expNetwork string
expAddress string
}{
{
desc: "empty address",
expNetwork: "udp",
expAddress: "localhost:8125",
},
{
desc: "udp address",
address: "127.0.0.1:8080",
expNetwork: "udp",
expAddress: "127.0.0.1:8080",
},
{
desc: "unix address",
address: "unix:///path/to/datadog.socket",
expNetwork: "unix",
expAddress: "/path/to/datadog.socket",
},
{
desc: "unixgram address",
address: "unixgram:///path/to/datadog.socket",
expNetwork: "unixgram",
expAddress: "/path/to/datadog.socket",
},
{
desc: "unixstream address",
address: "unixstream:///path/to/datadog.socket",
expNetwork: "unixstream",
expAddress: "/path/to/datadog.socket",
},
}
for _, test := range tests {
t.Run(test.desc, func(t *testing.T) {
t.Parallel()
gotNetwork, gotAddress := parseDatadogAddress(test.address)
assert.Equal(t, test.expNetwork, gotNetwork)
assert.Equal(t, test.expAddress, gotAddress)
})
}
}
func testDatadogRegistry(t *testing.T, metricsPrefix string, datadogRegistry Registry) {
t.Helper()
expected := []string{
metricsPrefix + ".config.reload.total:1.000000|c\n",
metricsPrefix + ".config.reload.lastSuccessTimestamp:1.000000|g\n",
metricsPrefix + ".open.connections:1.000000|g|#entrypoint:test,protocol:TCP\n",
metricsPrefix + ".tls.certs.notAfterTimestamp:1.000000|g|#key:value\n",
metricsPrefix + ".entrypoint.request.total:1.000000|c|#entrypoint:test\n",
metricsPrefix + ".entrypoint.request.tls.total:1.000000|c|#entrypoint:test,tls_version:foo,tls_cipher:bar\n",
metricsPrefix + ".entrypoint.request.duration:10000.000000|h|#entrypoint:test\n",
metricsPrefix + ".entrypoint.requests.bytes.total:1.000000|c|#entrypoint:test\n",
metricsPrefix + ".entrypoint.responses.bytes.total:1.000000|c|#entrypoint:test\n",
metricsPrefix + ".router.request.total:1.000000|c|#router:demo,service:test,code:404,method:GET\n",
metricsPrefix + ".router.request.total:1.000000|c|#router:demo,service:test,code:200,method:GET\n",
metricsPrefix + ".router.request.tls.total:1.000000|c|#router:demo,service:test,tls_version:foo,tls_cipher:bar\n",
metricsPrefix + ".router.request.duration:10000.000000|h|#router:demo,service:test,code:200\n",
metricsPrefix + ".router.requests.bytes.total:1.000000|c|#router:demo,service:test,code:200,method:GET\n",
metricsPrefix + ".router.responses.bytes.total:1.000000|c|#router:demo,service:test,code:200,method:GET\n",
metricsPrefix + ".service.request.total:1.000000|c|#service:test,code:404,method:GET\n",
metricsPrefix + ".service.request.total:1.000000|c|#service:test,code:200,method:GET\n",
metricsPrefix + ".service.request.tls.total:1.000000|c|#service:test,tls_version:foo,tls_cipher:bar\n",
metricsPrefix + ".service.request.duration:10000.000000|h|#service:test,code:200\n",
metricsPrefix + ".service.retries.total:2.000000|c|#service:test\n",
metricsPrefix + ".service.request.duration:10000.000000|h|#service:test,code:200\n",
metricsPrefix + ".service.server.up:1.000000|g|#service:test,url:http://127.0.0.1,one:two\n",
metricsPrefix + ".service.requests.bytes.total:1.000000|c|#service:test,code:200,method:GET\n",
metricsPrefix + ".service.responses.bytes.total:1.000000|c|#service:test,code:200,method:GET\n",
}
udp.ShouldReceiveAll(t, expected, func() {
datadogRegistry.ConfigReloadsCounter().Add(1)
datadogRegistry.LastConfigReloadSuccessGauge().Add(1)
datadogRegistry.OpenConnectionsGauge().With("entrypoint", "test", "protocol", "TCP").Add(1)
datadogRegistry.TLSCertsNotAfterTimestampGauge().With("key", "value").Set(1)
datadogRegistry.EntryPointReqsCounter().With(nil, "entrypoint", "test").Add(1)
datadogRegistry.EntryPointReqsTLSCounter().With("entrypoint", "test", "tls_version", "foo", "tls_cipher", "bar").Add(1)
datadogRegistry.EntryPointReqDurationHistogram().With("entrypoint", "test").Observe(10000)
datadogRegistry.EntryPointReqsBytesCounter().With("entrypoint", "test").Add(1)
datadogRegistry.EntryPointRespsBytesCounter().With("entrypoint", "test").Add(1)
datadogRegistry.RouterReqsCounter().With(nil, "router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
datadogRegistry.RouterReqsCounter().With(nil, "router", "demo", "service", "test", "code", strconv.Itoa(http.StatusNotFound), "method", http.MethodGet).Add(1)
datadogRegistry.RouterReqsTLSCounter().With("router", "demo", "service", "test", "tls_version", "foo", "tls_cipher", "bar").Add(1)
datadogRegistry.RouterReqDurationHistogram().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK)).Observe(10000)
datadogRegistry.RouterReqsBytesCounter().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
datadogRegistry.RouterRespsBytesCounter().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
datadogRegistry.ServiceReqsCounter().With(nil, "service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
datadogRegistry.ServiceReqsCounter().With(nil, "service", "test", "code", strconv.Itoa(http.StatusNotFound), "method", http.MethodGet).Add(1)
datadogRegistry.ServiceReqsTLSCounter().With("service", "test", "tls_version", "foo", "tls_cipher", "bar").Add(1)
datadogRegistry.ServiceReqDurationHistogram().With("service", "test", "code", strconv.Itoa(http.StatusOK)).Observe(10000)
datadogRegistry.ServiceRetriesCounter().With("service", "test").Add(1)
datadogRegistry.ServiceRetriesCounter().With("service", "test").Add(1)
datadogRegistry.ServiceServerUpGauge().With("service", "test", "url", "http://127.0.0.1", "one", "two").Set(1)
datadogRegistry.ServiceReqsBytesCounter().With("service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
datadogRegistry.ServiceRespsBytesCounter().With("service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
})
}

View file

@ -0,0 +1,57 @@
package metrics
import (
"net/http"
"github.com/go-kit/kit/metrics"
)
// CounterWithHeaders represents a counter that can use http.Header values as label values.
type CounterWithHeaders interface {
Add(delta float64)
With(headers http.Header, labelValues ...string) CounterWithHeaders
}
// MultiCounterWithHeaders collects multiple individual CounterWithHeaders and treats them as a unit.
type MultiCounterWithHeaders []CounterWithHeaders
// NewMultiCounterWithHeaders returns a multi-counter, wrapping the passed CounterWithHeaders.
func NewMultiCounterWithHeaders(c ...CounterWithHeaders) MultiCounterWithHeaders {
return c
}
// Add adds the given delta value to the counter value.
func (c MultiCounterWithHeaders) Add(delta float64) {
for _, counter := range c {
counter.Add(delta)
}
}
// With creates a new counter by appending the given label values and http.Header as labels and returns it.
func (c MultiCounterWithHeaders) With(headers http.Header, labelValues ...string) CounterWithHeaders {
next := make(MultiCounterWithHeaders, len(c))
for i := range c {
next[i] = c[i].With(headers, labelValues...)
}
return next
}
// NewCounterWithNoopHeaders returns a CounterWithNoopHeaders.
func NewCounterWithNoopHeaders(counter metrics.Counter) CounterWithNoopHeaders {
return CounterWithNoopHeaders{counter: counter}
}
// CounterWithNoopHeaders is a counter that satisfies CounterWithHeaders but ignores the given http.Header.
type CounterWithNoopHeaders struct {
counter metrics.Counter
}
// Add adds the given delta value to the counter value.
func (c CounterWithNoopHeaders) Add(delta float64) {
c.counter.Add(delta)
}
// With creates a new counter by appending the given label values and returns it.
func (c CounterWithNoopHeaders) With(_ http.Header, labelValues ...string) CounterWithHeaders {
return NewCounterWithNoopHeaders(c.counter.With(labelValues...))
}

View file

@ -0,0 +1,174 @@
package metrics
import (
"context"
"errors"
"time"
"github.com/go-kit/kit/metrics/influx"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
influxdb2api "github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/api/write"
influxdb2log "github.com/influxdata/influxdb-client-go/v2/log"
influxdb "github.com/influxdata/influxdb1-client/v2"
"github.com/rs/zerolog/log"
"github.com/traefik/traefik/v3/pkg/observability/logs"
otypes "github.com/traefik/traefik/v3/pkg/observability/types"
"github.com/traefik/traefik/v3/pkg/safe"
)
var (
influxDB2Ticker *time.Ticker
influxDB2Store *influx.Influx
influxDB2Client influxdb2.Client
)
const (
influxDBConfigReloadsName = "traefik.config.reload.total"
influxDBLastConfigReloadSuccessName = "traefik.config.reload.lastSuccessTimestamp"
influxDBOpenConnsName = "traefik.open.connections"
influxDBTLSCertsNotAfterTimestampName = "traefik.tls.certs.notAfterTimestamp"
influxDBEntryPointReqsName = "traefik.entrypoint.requests.total"
influxDBEntryPointReqsTLSName = "traefik.entrypoint.requests.tls.total"
influxDBEntryPointReqDurationName = "traefik.entrypoint.request.duration"
influxDBEntryPointReqsBytesName = "traefik.entrypoint.requests.bytes.total"
influxDBEntryPointRespsBytesName = "traefik.entrypoint.responses.bytes.total"
influxDBRouterReqsName = "traefik.router.requests.total"
influxDBRouterReqsTLSName = "traefik.router.requests.tls.total"
influxDBRouterReqsDurationName = "traefik.router.request.duration"
influxDBRouterReqsBytesName = "traefik.router.requests.bytes.total"
influxDBRouterRespsBytesName = "traefik.router.responses.bytes.total"
influxDBServiceReqsName = "traefik.service.requests.total"
influxDBServiceReqsTLSName = "traefik.service.requests.tls.total"
influxDBServiceReqsDurationName = "traefik.service.request.duration"
influxDBServiceRetriesTotalName = "traefik.service.retries.total"
influxDBServiceServerUpName = "traefik.service.server.up"
influxDBServiceReqsBytesName = "traefik.service.requests.bytes.total"
influxDBServiceRespsBytesName = "traefik.service.responses.bytes.total"
)
// RegisterInfluxDB2 creates metrics exporter for InfluxDB2.
func RegisterInfluxDB2(ctx context.Context, config *otypes.InfluxDB2) Registry {
logger := log.Ctx(ctx)
if influxDB2Client == nil {
var err error
if influxDB2Client, err = newInfluxDB2Client(config); err != nil {
logger.Error().Err(err).Send()
return nil
}
}
if influxDB2Store == nil {
influxDB2Store = influx.New(
config.AdditionalLabels,
influxdb.BatchPointsConfig{},
logs.NewGoKitWrapper(*logger),
)
influxDB2Ticker = time.NewTicker(time.Duration(config.PushInterval))
safe.Go(func() {
wc := influxDB2Client.WriteAPIBlocking(config.Org, config.Bucket)
influxDB2Store.WriteLoop(ctx, influxDB2Ticker.C, influxDB2Writer{wc: wc})
})
}
registry := &standardRegistry{
configReloadsCounter: influxDB2Store.NewCounter(influxDBConfigReloadsName),
lastConfigReloadSuccessGauge: influxDB2Store.NewGauge(influxDBLastConfigReloadSuccessName),
openConnectionsGauge: influxDB2Store.NewGauge(influxDBOpenConnsName),
tlsCertsNotAfterTimestampGauge: influxDB2Store.NewGauge(influxDBTLSCertsNotAfterTimestampName),
}
if config.AddEntryPointsLabels {
registry.epEnabled = config.AddEntryPointsLabels
registry.entryPointReqsCounter = NewCounterWithNoopHeaders(influxDB2Store.NewCounter(influxDBEntryPointReqsName))
registry.entryPointReqsTLSCounter = influxDB2Store.NewCounter(influxDBEntryPointReqsTLSName)
registry.entryPointReqDurationHistogram, _ = NewHistogramWithScale(influxDB2Store.NewHistogram(influxDBEntryPointReqDurationName), time.Second)
registry.entryPointReqsBytesCounter = influxDB2Store.NewCounter(influxDBEntryPointReqsBytesName)
registry.entryPointRespsBytesCounter = influxDB2Store.NewCounter(influxDBEntryPointRespsBytesName)
}
if config.AddRoutersLabels {
registry.routerEnabled = config.AddRoutersLabels
registry.routerReqsCounter = NewCounterWithNoopHeaders(influxDB2Store.NewCounter(influxDBRouterReqsName))
registry.routerReqsTLSCounter = influxDB2Store.NewCounter(influxDBRouterReqsTLSName)
registry.routerReqDurationHistogram, _ = NewHistogramWithScale(influxDB2Store.NewHistogram(influxDBRouterReqsDurationName), time.Second)
registry.routerReqsBytesCounter = influxDB2Store.NewCounter(influxDBRouterReqsBytesName)
registry.routerRespsBytesCounter = influxDB2Store.NewCounter(influxDBRouterRespsBytesName)
}
if config.AddServicesLabels {
registry.svcEnabled = config.AddServicesLabels
registry.serviceReqsCounter = NewCounterWithNoopHeaders(influxDB2Store.NewCounter(influxDBServiceReqsName))
registry.serviceReqsTLSCounter = influxDB2Store.NewCounter(influxDBServiceReqsTLSName)
registry.serviceReqDurationHistogram, _ = NewHistogramWithScale(influxDB2Store.NewHistogram(influxDBServiceReqsDurationName), time.Second)
registry.serviceRetriesCounter = influxDB2Store.NewCounter(influxDBServiceRetriesTotalName)
registry.serviceServerUpGauge = influxDB2Store.NewGauge(influxDBServiceServerUpName)
registry.serviceReqsBytesCounter = influxDB2Store.NewCounter(influxDBServiceReqsBytesName)
registry.serviceRespsBytesCounter = influxDB2Store.NewCounter(influxDBServiceRespsBytesName)
}
return registry
}
// StopInfluxDB2 stops and resets InfluxDB2 client, ticker and store.
func StopInfluxDB2() {
if influxDB2Client != nil {
influxDB2Client.Close()
}
influxDB2Client = nil
if influxDB2Ticker != nil {
influxDB2Ticker.Stop()
}
influxDB2Ticker = nil
influxDB2Store = nil
}
// newInfluxDB2Client creates an influxdb2.Client.
func newInfluxDB2Client(config *otypes.InfluxDB2) (influxdb2.Client, error) {
if config.Token == "" || config.Org == "" || config.Bucket == "" {
return nil, errors.New("token, org or bucket property is missing")
}
// Disable InfluxDB2 logs.
// See https://github.com/influxdata/influxdb-client-go/blob/v2.7.0/options.go#L128
influxdb2log.Log = nil
return influxdb2.NewClient(config.Address, config.Token), nil
}
type influxDB2Writer struct {
wc influxdb2api.WriteAPIBlocking
}
func (w influxDB2Writer) Write(bp influxdb.BatchPoints) error {
logger := log.With().Str(logs.MetricsProviderName, "influxdb2").Logger()
wps := make([]*write.Point, 0, len(bp.Points()))
for _, p := range bp.Points() {
fields, err := p.Fields()
if err != nil {
logger.Error().Err(err).Msgf("Error while getting %s point fields", p.Name())
continue
}
wps = append(wps, influxdb2.NewPoint(
p.Name(),
p.Tags(),
fields,
p.Time(),
))
}
ctx := logger.WithContext(context.Background())
return w.wc.WritePoint(ctx, wps...)
}

View file

@ -0,0 +1,140 @@
package metrics
import (
"fmt"
"io"
"net/http"
"net/http/httptest"
"strconv"
"testing"
"time"
"github.com/stretchr/testify/require"
ptypes "github.com/traefik/paerser/types"
otypes "github.com/traefik/traefik/v3/pkg/observability/types"
)
func TestInfluxDB2(t *testing.T) {
c := make(chan *string)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
require.NoError(t, err)
bodyStr := string(body)
c <- &bodyStr
_, _ = fmt.Fprintln(w, "ok")
}))
influxDB2Registry := RegisterInfluxDB2(t.Context(),
&otypes.InfluxDB2{
Address: ts.URL,
Token: "test-token",
PushInterval: ptypes.Duration(10 * time.Millisecond),
Org: "test-org",
Bucket: "test-bucket",
AddEntryPointsLabels: true,
AddRoutersLabels: true,
AddServicesLabels: true,
})
t.Cleanup(func() {
StopInfluxDB2()
ts.Close()
})
if !influxDB2Registry.IsEpEnabled() || !influxDB2Registry.IsRouterEnabled() || !influxDB2Registry.IsSvcEnabled() {
t.Fatalf("InfluxDB2Registry should return true for IsEnabled(), IsRouterEnabled() and IsSvcEnabled()")
}
expectedServer := []string{
`(traefik\.config\.reload\.total count=1) [\d]{19}`,
`(traefik\.config\.reload\.lastSuccessTimestamp value=1) [\d]{19}`,
`(traefik\.open\.connections,entrypoint=test,protocol=TCP value=1) [\d]{19}`,
}
influxDB2Registry.ConfigReloadsCounter().Add(1)
influxDB2Registry.LastConfigReloadSuccessGauge().Set(1)
influxDB2Registry.OpenConnectionsGauge().With("entrypoint", "test", "protocol", "TCP").Set(1)
msgServer := <-c
assertMessage(t, *msgServer, expectedServer)
expectedTLS := []string{
`(traefik\.tls\.certs\.notAfterTimestamp,key=value value=1) [\d]{19}`,
}
influxDB2Registry.TLSCertsNotAfterTimestampGauge().With("key", "value").Set(1)
msgTLS := <-c
assertMessage(t, *msgTLS, expectedTLS)
expectedEntrypoint := []string{
`(traefik\.entrypoint\.requests\.total,code=200,entrypoint=test,method=GET count=1) [\d]{19}`,
`(traefik\.entrypoint\.requests\.tls\.total,entrypoint=test,tls_cipher=bar,tls_version=foo count=1) [\d]{19}`,
`(traefik\.entrypoint\.request\.duration(?:,code=[\d]{3})?,entrypoint=test p50=10000,p90=10000,p95=10000,p99=10000) [\d]{19}`,
`(traefik\.entrypoint\.requests\.bytes\.total,code=200,entrypoint=test,method=GET count=1) [\d]{19}`,
`(traefik\.entrypoint\.responses\.bytes\.total,code=200,entrypoint=test,method=GET count=1) [\d]{19}`,
}
influxDB2Registry.EntryPointReqsCounter().With(nil, "entrypoint", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
influxDB2Registry.EntryPointReqsTLSCounter().With("entrypoint", "test", "tls_version", "foo", "tls_cipher", "bar").Add(1)
influxDB2Registry.EntryPointReqDurationHistogram().With("entrypoint", "test").Observe(10000)
influxDB2Registry.EntryPointReqsBytesCounter().With("entrypoint", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
influxDB2Registry.EntryPointRespsBytesCounter().With("entrypoint", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
msgEntrypoint := <-c
assertMessage(t, *msgEntrypoint, expectedEntrypoint)
expectedRouter := []string{
`(traefik\.router\.requests\.total,code=200,method=GET,router=demo,service=test count=1) [\d]{19}`,
`(traefik\.router\.requests\.total,code=404,method=GET,router=demo,service=test count=1) [\d]{19}`,
`(traefik\.router\.requests\.tls\.total,router=demo,service=test,tls_cipher=bar,tls_version=foo count=1) [\d]{19}`,
`(traefik\.router\.request\.duration,code=200,router=demo,service=test p50=10000,p90=10000,p95=10000,p99=10000) [\d]{19}`,
`(traefik\.router\.requests\.bytes\.total,code=200,method=GET,router=demo,service=test count=1) [\d]{19}`,
`(traefik\.router\.responses\.bytes\.total,code=200,method=GET,router=demo,service=test count=1) [\d]{19}`,
}
influxDB2Registry.RouterReqsCounter().With(nil, "router", "demo", "service", "test", "code", strconv.Itoa(http.StatusNotFound), "method", http.MethodGet).Add(1)
influxDB2Registry.RouterReqsCounter().With(nil, "router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
influxDB2Registry.RouterReqsTLSCounter().With("router", "demo", "service", "test", "tls_version", "foo", "tls_cipher", "bar").Add(1)
influxDB2Registry.RouterReqDurationHistogram().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK)).Observe(10000)
influxDB2Registry.RouterReqsBytesCounter().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
influxDB2Registry.RouterRespsBytesCounter().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
msgRouter := <-c
assertMessage(t, *msgRouter, expectedRouter)
expectedService := []string{
`(traefik\.service\.requests\.total,code=200,method=GET,service=test count=1) [\d]{19}`,
`(traefik\.service\.requests\.total,code=404,method=GET,service=test count=1) [\d]{19}`,
`(traefik\.service\.requests\.tls\.total,service=test,tls_cipher=bar,tls_version=foo count=1) [\d]{19}`,
`(traefik\.service\.request\.duration,code=200,service=test p50=10000,p90=10000,p95=10000,p99=10000) [\d]{19}`,
`(traefik\.service\.server\.up,service=test,url=http://127.0.0.1 value=1) [\d]{19}`,
`(traefik\.service\.requests\.bytes\.total,code=200,method=GET,service=test count=1) [\d]{19}`,
`(traefik\.service\.responses\.bytes\.total,code=200,method=GET,service=test count=1) [\d]{19}`,
}
influxDB2Registry.ServiceReqsCounter().With(nil, "service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
influxDB2Registry.ServiceReqsCounter().With(nil, "service", "test", "code", strconv.Itoa(http.StatusNotFound), "method", http.MethodGet).Add(1)
influxDB2Registry.ServiceReqsTLSCounter().With("service", "test", "tls_version", "foo", "tls_cipher", "bar").Add(1)
influxDB2Registry.ServiceReqDurationHistogram().With("service", "test", "code", strconv.Itoa(http.StatusOK)).Observe(10000)
influxDB2Registry.ServiceServerUpGauge().With("service", "test", "url", "http://127.0.0.1").Set(1)
influxDB2Registry.ServiceReqsBytesCounter().With("service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
influxDB2Registry.ServiceRespsBytesCounter().With("service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
msgService := <-c
assertMessage(t, *msgService, expectedService)
expectedServiceRetries := []string{
`(traefik\.service\.retries\.total,service=test count=2) [\d]{19}`,
`(traefik\.service\.retries\.total,service=foobar count=1) [\d]{19}`,
}
influxDB2Registry.ServiceRetriesCounter().With("service", "test").Add(1)
influxDB2Registry.ServiceRetriesCounter().With("service", "test").Add(1)
influxDB2Registry.ServiceRetriesCounter().With("service", "foobar").Add(1)
msgServiceRetries := <-c
assertMessage(t, *msgServiceRetries, expectedServiceRetries)
}

View file

@ -0,0 +1,381 @@
package metrics
import (
"errors"
"time"
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/multi"
)
const defaultMetricsPrefix = "traefik"
// Registry has to implemented by any system that wants to monitor and expose metrics.
type Registry interface {
// IsEpEnabled shows whether metrics instrumentation is enabled on entry points.
IsEpEnabled() bool
// IsRouterEnabled shows whether metrics instrumentation is enabled on routers.
IsRouterEnabled() bool
// IsSvcEnabled shows whether metrics instrumentation is enabled on services.
IsSvcEnabled() bool
// server metrics
ConfigReloadsCounter() metrics.Counter
LastConfigReloadSuccessGauge() metrics.Gauge
OpenConnectionsGauge() metrics.Gauge
// TLS
TLSCertsNotAfterTimestampGauge() metrics.Gauge
// entry point metrics
EntryPointReqsCounter() CounterWithHeaders
EntryPointReqsTLSCounter() metrics.Counter
EntryPointReqDurationHistogram() ScalableHistogram
EntryPointReqsBytesCounter() metrics.Counter
EntryPointRespsBytesCounter() metrics.Counter
// router metrics
RouterReqsCounter() CounterWithHeaders
RouterReqsTLSCounter() metrics.Counter
RouterReqDurationHistogram() ScalableHistogram
RouterReqsBytesCounter() metrics.Counter
RouterRespsBytesCounter() metrics.Counter
// service metrics
ServiceReqsCounter() CounterWithHeaders
ServiceReqsTLSCounter() metrics.Counter
ServiceReqDurationHistogram() ScalableHistogram
ServiceRetriesCounter() metrics.Counter
ServiceServerUpGauge() metrics.Gauge
ServiceReqsBytesCounter() metrics.Counter
ServiceRespsBytesCounter() metrics.Counter
}
// NewVoidRegistry is a noop implementation of metrics.Registry.
// It is used to avoid nil checking in components that do metric collections.
func NewVoidRegistry() Registry {
return NewMultiRegistry([]Registry{})
}
// NewMultiRegistry is an implementation of metrics.Registry that wraps multiple registries.
// It handles the case when a registry hasn't registered some metric and returns nil.
// This allows for feature disparity between the different metric implementations.
func NewMultiRegistry(registries []Registry) Registry {
var configReloadsCounter []metrics.Counter
var lastConfigReloadSuccessGauge []metrics.Gauge
var openConnectionsGauge []metrics.Gauge
var tlsCertsNotAfterTimestampGauge []metrics.Gauge
var entryPointReqsCounter []CounterWithHeaders
var entryPointReqsTLSCounter []metrics.Counter
var entryPointReqDurationHistogram []ScalableHistogram
var entryPointReqsBytesCounter []metrics.Counter
var entryPointRespsBytesCounter []metrics.Counter
var routerReqsCounter []CounterWithHeaders
var routerReqsTLSCounter []metrics.Counter
var routerReqDurationHistogram []ScalableHistogram
var routerReqsBytesCounter []metrics.Counter
var routerRespsBytesCounter []metrics.Counter
var serviceReqsCounter []CounterWithHeaders
var serviceReqsTLSCounter []metrics.Counter
var serviceReqDurationHistogram []ScalableHistogram
var serviceRetriesCounter []metrics.Counter
var serviceServerUpGauge []metrics.Gauge
var serviceReqsBytesCounter []metrics.Counter
var serviceRespsBytesCounter []metrics.Counter
for _, r := range registries {
if r.ConfigReloadsCounter() != nil {
configReloadsCounter = append(configReloadsCounter, r.ConfigReloadsCounter())
}
if r.LastConfigReloadSuccessGauge() != nil {
lastConfigReloadSuccessGauge = append(lastConfigReloadSuccessGauge, r.LastConfigReloadSuccessGauge())
}
if r.OpenConnectionsGauge() != nil {
openConnectionsGauge = append(openConnectionsGauge, r.OpenConnectionsGauge())
}
if r.TLSCertsNotAfterTimestampGauge() != nil {
tlsCertsNotAfterTimestampGauge = append(tlsCertsNotAfterTimestampGauge, r.TLSCertsNotAfterTimestampGauge())
}
if r.EntryPointReqsCounter() != nil {
entryPointReqsCounter = append(entryPointReqsCounter, r.EntryPointReqsCounter())
}
if r.EntryPointReqsTLSCounter() != nil {
entryPointReqsTLSCounter = append(entryPointReqsTLSCounter, r.EntryPointReqsTLSCounter())
}
if r.EntryPointReqDurationHistogram() != nil {
entryPointReqDurationHistogram = append(entryPointReqDurationHistogram, r.EntryPointReqDurationHistogram())
}
if r.EntryPointReqsBytesCounter() != nil {
entryPointReqsBytesCounter = append(entryPointReqsBytesCounter, r.EntryPointReqsBytesCounter())
}
if r.EntryPointRespsBytesCounter() != nil {
entryPointRespsBytesCounter = append(entryPointRespsBytesCounter, r.EntryPointRespsBytesCounter())
}
if r.RouterReqsCounter() != nil {
routerReqsCounter = append(routerReqsCounter, r.RouterReqsCounter())
}
if r.RouterReqsTLSCounter() != nil {
routerReqsTLSCounter = append(routerReqsTLSCounter, r.RouterReqsTLSCounter())
}
if r.RouterReqDurationHistogram() != nil {
routerReqDurationHistogram = append(routerReqDurationHistogram, r.RouterReqDurationHistogram())
}
if r.RouterReqsBytesCounter() != nil {
routerReqsBytesCounter = append(routerReqsBytesCounter, r.RouterReqsBytesCounter())
}
if r.RouterRespsBytesCounter() != nil {
routerRespsBytesCounter = append(routerRespsBytesCounter, r.RouterRespsBytesCounter())
}
if r.ServiceReqsCounter() != nil {
serviceReqsCounter = append(serviceReqsCounter, r.ServiceReqsCounter())
}
if r.ServiceReqsTLSCounter() != nil {
serviceReqsTLSCounter = append(serviceReqsTLSCounter, r.ServiceReqsTLSCounter())
}
if r.ServiceReqDurationHistogram() != nil {
serviceReqDurationHistogram = append(serviceReqDurationHistogram, r.ServiceReqDurationHistogram())
}
if r.ServiceRetriesCounter() != nil {
serviceRetriesCounter = append(serviceRetriesCounter, r.ServiceRetriesCounter())
}
if r.ServiceServerUpGauge() != nil {
serviceServerUpGauge = append(serviceServerUpGauge, r.ServiceServerUpGauge())
}
if r.ServiceReqsBytesCounter() != nil {
serviceReqsBytesCounter = append(serviceReqsBytesCounter, r.ServiceReqsBytesCounter())
}
if r.ServiceRespsBytesCounter() != nil {
serviceRespsBytesCounter = append(serviceRespsBytesCounter, r.ServiceRespsBytesCounter())
}
}
return &standardRegistry{
epEnabled: len(entryPointReqsCounter) > 0 || len(entryPointReqDurationHistogram) > 0,
svcEnabled: len(serviceReqsCounter) > 0 || len(serviceReqDurationHistogram) > 0 || len(serviceRetriesCounter) > 0 || len(serviceServerUpGauge) > 0,
routerEnabled: len(routerReqsCounter) > 0 || len(routerReqDurationHistogram) > 0,
configReloadsCounter: multi.NewCounter(configReloadsCounter...),
lastConfigReloadSuccessGauge: multi.NewGauge(lastConfigReloadSuccessGauge...),
openConnectionsGauge: multi.NewGauge(openConnectionsGauge...),
tlsCertsNotAfterTimestampGauge: multi.NewGauge(tlsCertsNotAfterTimestampGauge...),
entryPointReqsCounter: NewMultiCounterWithHeaders(entryPointReqsCounter...),
entryPointReqsTLSCounter: multi.NewCounter(entryPointReqsTLSCounter...),
entryPointReqDurationHistogram: MultiHistogram(entryPointReqDurationHistogram),
entryPointReqsBytesCounter: multi.NewCounter(entryPointReqsBytesCounter...),
entryPointRespsBytesCounter: multi.NewCounter(entryPointRespsBytesCounter...),
routerReqsCounter: NewMultiCounterWithHeaders(routerReqsCounter...),
routerReqsTLSCounter: multi.NewCounter(routerReqsTLSCounter...),
routerReqDurationHistogram: MultiHistogram(routerReqDurationHistogram),
routerReqsBytesCounter: multi.NewCounter(routerReqsBytesCounter...),
routerRespsBytesCounter: multi.NewCounter(routerRespsBytesCounter...),
serviceReqsCounter: NewMultiCounterWithHeaders(serviceReqsCounter...),
serviceReqsTLSCounter: multi.NewCounter(serviceReqsTLSCounter...),
serviceReqDurationHistogram: MultiHistogram(serviceReqDurationHistogram),
serviceRetriesCounter: multi.NewCounter(serviceRetriesCounter...),
serviceServerUpGauge: multi.NewGauge(serviceServerUpGauge...),
serviceReqsBytesCounter: multi.NewCounter(serviceReqsBytesCounter...),
serviceRespsBytesCounter: multi.NewCounter(serviceRespsBytesCounter...),
}
}
type standardRegistry struct {
epEnabled bool
routerEnabled bool
svcEnabled bool
configReloadsCounter metrics.Counter
lastConfigReloadSuccessGauge metrics.Gauge
openConnectionsGauge metrics.Gauge
tlsCertsNotAfterTimestampGauge metrics.Gauge
entryPointReqsCounter CounterWithHeaders
entryPointReqsTLSCounter metrics.Counter
entryPointReqDurationHistogram ScalableHistogram
entryPointReqsBytesCounter metrics.Counter
entryPointRespsBytesCounter metrics.Counter
routerReqsCounter CounterWithHeaders
routerReqsTLSCounter metrics.Counter
routerReqDurationHistogram ScalableHistogram
routerReqsBytesCounter metrics.Counter
routerRespsBytesCounter metrics.Counter
serviceReqsCounter CounterWithHeaders
serviceReqsTLSCounter metrics.Counter
serviceReqDurationHistogram ScalableHistogram
serviceRetriesCounter metrics.Counter
serviceServerUpGauge metrics.Gauge
serviceReqsBytesCounter metrics.Counter
serviceRespsBytesCounter metrics.Counter
}
func (r *standardRegistry) IsEpEnabled() bool {
return r.epEnabled
}
func (r *standardRegistry) IsRouterEnabled() bool {
return r.routerEnabled
}
func (r *standardRegistry) IsSvcEnabled() bool {
return r.svcEnabled
}
func (r *standardRegistry) ConfigReloadsCounter() metrics.Counter {
return r.configReloadsCounter
}
func (r *standardRegistry) LastConfigReloadSuccessGauge() metrics.Gauge {
return r.lastConfigReloadSuccessGauge
}
func (r *standardRegistry) OpenConnectionsGauge() metrics.Gauge {
return r.openConnectionsGauge
}
func (r *standardRegistry) TLSCertsNotAfterTimestampGauge() metrics.Gauge {
return r.tlsCertsNotAfterTimestampGauge
}
func (r *standardRegistry) EntryPointReqsCounter() CounterWithHeaders {
return r.entryPointReqsCounter
}
func (r *standardRegistry) EntryPointReqsTLSCounter() metrics.Counter {
return r.entryPointReqsTLSCounter
}
func (r *standardRegistry) EntryPointReqDurationHistogram() ScalableHistogram {
return r.entryPointReqDurationHistogram
}
func (r *standardRegistry) EntryPointReqsBytesCounter() metrics.Counter {
return r.entryPointReqsBytesCounter
}
func (r *standardRegistry) EntryPointRespsBytesCounter() metrics.Counter {
return r.entryPointRespsBytesCounter
}
func (r *standardRegistry) RouterReqsCounter() CounterWithHeaders {
return r.routerReqsCounter
}
func (r *standardRegistry) RouterReqsTLSCounter() metrics.Counter {
return r.routerReqsTLSCounter
}
func (r *standardRegistry) RouterReqDurationHistogram() ScalableHistogram {
return r.routerReqDurationHistogram
}
func (r *standardRegistry) RouterReqsBytesCounter() metrics.Counter {
return r.routerReqsBytesCounter
}
func (r *standardRegistry) RouterRespsBytesCounter() metrics.Counter {
return r.routerRespsBytesCounter
}
func (r *standardRegistry) ServiceReqsCounter() CounterWithHeaders {
return r.serviceReqsCounter
}
func (r *standardRegistry) ServiceReqsTLSCounter() metrics.Counter {
return r.serviceReqsTLSCounter
}
func (r *standardRegistry) ServiceReqDurationHistogram() ScalableHistogram {
return r.serviceReqDurationHistogram
}
func (r *standardRegistry) ServiceRetriesCounter() metrics.Counter {
return r.serviceRetriesCounter
}
func (r *standardRegistry) ServiceServerUpGauge() metrics.Gauge {
return r.serviceServerUpGauge
}
func (r *standardRegistry) ServiceReqsBytesCounter() metrics.Counter {
return r.serviceReqsBytesCounter
}
func (r *standardRegistry) ServiceRespsBytesCounter() metrics.Counter {
return r.serviceRespsBytesCounter
}
// ScalableHistogram is a Histogram with a predefined time unit,
// used when producing observations without explicitly setting the observed value.
type ScalableHistogram interface {
With(labelValues ...string) ScalableHistogram
Observe(v float64)
ObserveFromStart(start time.Time)
}
// HistogramWithScale is a histogram that will convert its observed value to the specified unit.
type HistogramWithScale struct {
histogram metrics.Histogram
unit time.Duration
}
// With implements ScalableHistogram.
func (s *HistogramWithScale) With(labelValues ...string) ScalableHistogram {
h, _ := NewHistogramWithScale(s.histogram.With(labelValues...), s.unit)
return h
}
// ObserveFromStart implements ScalableHistogram.
func (s *HistogramWithScale) ObserveFromStart(start time.Time) {
if s.unit <= 0 {
return
}
d := float64(time.Since(start).Nanoseconds()) / float64(s.unit)
if d < 0 {
d = 0
}
s.histogram.Observe(d)
}
// Observe implements ScalableHistogram.
func (s *HistogramWithScale) Observe(v float64) {
s.histogram.Observe(v)
}
// NewHistogramWithScale returns a ScalableHistogram. It returns an error if the given unit is <= 0.
func NewHistogramWithScale(histogram metrics.Histogram, unit time.Duration) (ScalableHistogram, error) {
if unit <= 0 {
return nil, errors.New("invalid time unit")
}
return &HistogramWithScale{
histogram: histogram,
unit: unit,
}, nil
}
// MultiHistogram collects multiple individual histograms and treats them as a unit.
type MultiHistogram []ScalableHistogram
// ObserveFromStart implements ScalableHistogram.
func (h MultiHistogram) ObserveFromStart(start time.Time) {
for _, histogram := range h {
histogram.ObserveFromStart(start)
}
}
// Observe implements ScalableHistogram.
func (h MultiHistogram) Observe(v float64) {
for _, histogram := range h {
histogram.Observe(v)
}
}
// With implements ScalableHistogram.
func (h MultiHistogram) With(labelValues ...string) ScalableHistogram {
next := make(MultiHistogram, len(h))
for i := range h {
next[i] = h[i].With(labelValues...)
}
return next
}

View file

@ -0,0 +1,121 @@
package metrics
import (
"bytes"
"net/http"
"strings"
"testing"
"time"
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/generic"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestScalableHistogram(t *testing.T) {
h := generic.NewHistogram("test", 1)
sh, err := NewHistogramWithScale(h, time.Millisecond)
require.NoError(t, err)
ticker := time.NewTicker(500 * time.Millisecond)
<-ticker.C
start := time.Now()
<-ticker.C
sh.ObserveFromStart(start)
var b bytes.Buffer
h.Print(&b)
extractedDurationString := strings.Split(strings.Split(b.String(), "\n")[1], " ")
measuredDuration, err := time.ParseDuration(extractedDurationString[0] + "ms")
assert.NoError(t, err)
assert.InDelta(t, 500*time.Millisecond, measuredDuration, float64(15*time.Millisecond))
}
func TestNewMultiRegistry(t *testing.T) {
registries := []Registry{newCollectingRetryMetrics(), newCollectingRetryMetrics()}
registry := NewMultiRegistry(registries)
registry.ServiceReqsCounter().With(nil, "key", "requests").Add(1)
registry.ServiceReqDurationHistogram().With("key", "durations").Observe(float64(2))
registry.ServiceRetriesCounter().With("key", "retries").Add(3)
for _, collectingRegistry := range registries {
cReqsCounter := collectingRegistry.ServiceReqsCounter().(*counterWithHeadersMock)
cReqDurationHistogram := collectingRegistry.ServiceReqDurationHistogram().(*histogramMock)
cRetriesCounter := collectingRegistry.ServiceRetriesCounter().(*counterMock)
wantCounterValue := float64(1)
if cReqsCounter.counterValue != wantCounterValue {
t.Errorf("Got value %f for ReqsCounter, want %f", cReqsCounter.counterValue, wantCounterValue)
}
wantHistogramValue := float64(2)
if cReqDurationHistogram.lastHistogramValue != wantHistogramValue {
t.Errorf("Got last observation %f for ReqDurationHistogram, want %f", cReqDurationHistogram.lastHistogramValue, wantHistogramValue)
}
wantCounterValue = float64(3)
if cRetriesCounter.counterValue != wantCounterValue {
t.Errorf("Got value %f for RetriesCounter, want %f", cRetriesCounter.counterValue, wantCounterValue)
}
assert.Equal(t, []string{"key", "requests"}, cReqsCounter.lastLabelValues)
assert.Equal(t, []string{"key", "durations"}, cReqDurationHistogram.lastLabelValues)
assert.Equal(t, []string{"key", "retries"}, cRetriesCounter.lastLabelValues)
}
}
func newCollectingRetryMetrics() Registry {
return &standardRegistry{
serviceReqsCounter: &counterWithHeadersMock{},
serviceReqDurationHistogram: &histogramMock{},
serviceRetriesCounter: &counterMock{},
}
}
type counterMock struct {
counterValue float64
lastLabelValues []string
}
func (c *counterMock) With(labelValues ...string) metrics.Counter {
c.lastLabelValues = labelValues
return c
}
func (c *counterMock) Add(delta float64) {
c.counterValue += delta
}
type counterWithHeadersMock struct {
counterValue float64
lastLabelValues []string
}
func (c *counterWithHeadersMock) With(_ http.Header, labelValues ...string) CounterWithHeaders {
c.lastLabelValues = labelValues
return c
}
func (c *counterWithHeadersMock) Add(delta float64) {
c.counterValue += delta
}
type histogramMock struct {
lastHistogramValue float64
lastLabelValues []string
}
func (c *histogramMock) With(labelValues ...string) ScalableHistogram {
c.lastLabelValues = labelValues
return c
}
func (c *histogramMock) Start() {}
func (c *histogramMock) ObserveFromStart(t time.Time) {}
func (c *histogramMock) Observe(v float64) {
c.lastHistogramValue = v
}

View file

@ -0,0 +1,513 @@
package metrics
import (
"context"
"fmt"
"net"
"net/url"
"strings"
"sync"
"time"
"github.com/go-kit/kit/metrics"
"github.com/rs/zerolog/log"
"github.com/traefik/traefik/v3/pkg/observability"
otypes "github.com/traefik/traefik/v3/pkg/observability/types"
"github.com/traefik/traefik/v3/pkg/types"
"github.com/traefik/traefik/v3/pkg/version"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
"go.opentelemetry.io/otel/metric"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
"go.opentelemetry.io/otel/semconv/v1.37.0/httpconv"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/encoding/gzip"
)
var (
openTelemetryMeterProvider *sdkmetric.MeterProvider
openTelemetryGaugeCollector *gaugeCollector
)
// SetMeterProvider sets the meter provider for the tests.
func SetMeterProvider(meterProvider *sdkmetric.MeterProvider) {
openTelemetryMeterProvider = meterProvider
otel.SetMeterProvider(meterProvider)
}
// SemConvMetricsRegistry holds stables semantic conventions metric instruments.
type SemConvMetricsRegistry struct {
// server metrics
httpServerRequestDuration httpconv.ServerRequestDuration
// client metrics
httpClientRequestDuration httpconv.ClientRequestDuration
}
// NewSemConvMetricRegistry registers all stables semantic conventions metrics.
func NewSemConvMetricRegistry(ctx context.Context, config *otypes.OTLP) (*SemConvMetricsRegistry, error) {
if err := observability.EnsureUserEnvVar(); err != nil {
return nil, err
}
if openTelemetryMeterProvider == nil {
var err error
if openTelemetryMeterProvider, err = newOpenTelemetryMeterProvider(ctx, config); err != nil {
log.Ctx(ctx).Err(err).Msg("Unable to create OpenTelemetry meter provider")
return nil, nil
}
}
meter := otel.Meter("github.com/traefik/traefik",
metric.WithInstrumentationVersion(version.Version))
httpServerRequestDuration, err := httpconv.NewServerRequestDuration(meter,
metric.WithExplicitBucketBoundaries(config.ExplicitBoundaries...))
if err != nil {
return nil, fmt.Errorf("can't build httpServerRequestDuration histogram: %w", err)
}
httpClientRequestDuration, err := httpconv.NewClientRequestDuration(meter,
metric.WithExplicitBucketBoundaries(config.ExplicitBoundaries...))
if err != nil {
return nil, fmt.Errorf("can't build httpClientRequestDuration histogram: %w", err)
}
return &SemConvMetricsRegistry{
httpServerRequestDuration: httpServerRequestDuration,
httpClientRequestDuration: httpClientRequestDuration,
}, nil
}
// HTTPServerRequestDuration returns the HTTP server request duration histogram.
func (s *SemConvMetricsRegistry) HTTPServerRequestDuration() httpconv.ServerRequestDuration {
if s == nil {
return httpconv.ServerRequestDuration{}
}
return s.httpServerRequestDuration
}
// HTTPClientRequestDuration returns the HTTP client request duration histogram.
func (s *SemConvMetricsRegistry) HTTPClientRequestDuration() httpconv.ClientRequestDuration {
if s == nil {
return httpconv.ClientRequestDuration{}
}
return s.httpClientRequestDuration
}
// RegisterOpenTelemetry registers all OpenTelemetry metrics.
func RegisterOpenTelemetry(ctx context.Context, config *otypes.OTLP) Registry {
if openTelemetryMeterProvider == nil {
var err error
if openTelemetryMeterProvider, err = newOpenTelemetryMeterProvider(ctx, config); err != nil {
log.Ctx(ctx).Err(err).Msg("Unable to create OpenTelemetry meter provider")
return nil
}
}
if openTelemetryGaugeCollector == nil {
openTelemetryGaugeCollector = newOpenTelemetryGaugeCollector()
}
meter := otel.Meter("github.com/traefik/traefik",
metric.WithInstrumentationVersion(version.Version))
reg := &standardRegistry{
epEnabled: config.AddEntryPointsLabels,
routerEnabled: config.AddRoutersLabels,
svcEnabled: config.AddServicesLabels,
configReloadsCounter: newOTLPCounterFrom(meter, configReloadsTotalName, "Config reloads"),
lastConfigReloadSuccessGauge: newOTLPGaugeFrom(meter, configLastReloadSuccessName, "Last config reload success", "ms"),
openConnectionsGauge: newOTLPGaugeFrom(meter, openConnectionsName, "How many open connections exist, by entryPoint and protocol", "1"),
tlsCertsNotAfterTimestampGauge: newOTLPGaugeFrom(meter, tlsCertsNotAfterTimestampName, "Certificate expiration timestamp", "ms"),
}
if config.AddEntryPointsLabels {
reg.entryPointReqsCounter = NewCounterWithNoopHeaders(newOTLPCounterFrom(meter, entryPointReqsTotalName,
"How many HTTP requests processed on an entrypoint, partitioned by status code, protocol, and method."))
reg.entryPointReqsTLSCounter = newOTLPCounterFrom(meter, entryPointReqsTLSTotalName,
"How many HTTP requests with TLS processed on an entrypoint, partitioned by TLS Version and TLS cipher Used.")
reg.entryPointReqDurationHistogram, _ = NewHistogramWithScale(newOTLPHistogramFrom(meter, entryPointReqDurationName,
"How long it took to process the request on an entrypoint, partitioned by status code, protocol, and method.",
"s"), time.Second)
reg.entryPointReqsBytesCounter = newOTLPCounterFrom(meter, entryPointReqsBytesTotalName,
"The total size of requests in bytes handled by an entrypoint, partitioned by status code, protocol, and method.")
reg.entryPointRespsBytesCounter = newOTLPCounterFrom(meter, entryPointRespsBytesTotalName,
"The total size of responses in bytes handled by an entrypoint, partitioned by status code, protocol, and method.")
}
if config.AddRoutersLabels {
reg.routerReqsCounter = NewCounterWithNoopHeaders(newOTLPCounterFrom(meter, routerReqsTotalName,
"How many HTTP requests are processed on a router, partitioned by service, status code, protocol, and method."))
reg.routerReqsTLSCounter = newOTLPCounterFrom(meter, routerReqsTLSTotalName,
"How many HTTP requests with TLS are processed on a router, partitioned by service, TLS Version, and TLS cipher Used.")
reg.routerReqDurationHistogram, _ = NewHistogramWithScale(newOTLPHistogramFrom(meter, routerReqDurationName,
"How long it took to process the request on a router, partitioned by service, status code, protocol, and method.",
"s"), time.Second)
reg.routerReqsBytesCounter = newOTLPCounterFrom(meter, routerReqsBytesTotalName,
"The total size of requests in bytes handled by a router, partitioned by status code, protocol, and method.")
reg.routerRespsBytesCounter = newOTLPCounterFrom(meter, routerRespsBytesTotalName,
"The total size of responses in bytes handled by a router, partitioned by status code, protocol, and method.")
}
if config.AddServicesLabels {
reg.serviceReqsCounter = NewCounterWithNoopHeaders(newOTLPCounterFrom(meter, serviceReqsTotalName,
"How many HTTP requests processed on a service, partitioned by status code, protocol, and method."))
reg.serviceReqsTLSCounter = newOTLPCounterFrom(meter, serviceReqsTLSTotalName,
"How many HTTP requests with TLS processed on a service, partitioned by TLS version and TLS cipher.")
reg.serviceReqDurationHistogram, _ = NewHistogramWithScale(newOTLPHistogramFrom(meter, serviceReqDurationName,
"How long it took to process the request on a service, partitioned by status code, protocol, and method.",
"s"), time.Second)
reg.serviceRetriesCounter = newOTLPCounterFrom(meter, serviceRetriesTotalName,
"How many request retries happened on a service.")
reg.serviceServerUpGauge = newOTLPGaugeFrom(meter, serviceServerUpName,
"service server is up, described by gauge value of 0 or 1.",
"1")
reg.serviceReqsBytesCounter = newOTLPCounterFrom(meter, serviceReqsBytesTotalName,
"The total size of requests in bytes received by a service, partitioned by status code, protocol, and method.")
reg.serviceRespsBytesCounter = newOTLPCounterFrom(meter, serviceRespsBytesTotalName,
"The total size of responses in bytes returned by a service, partitioned by status code, protocol, and method.")
}
return reg
}
// StopOpenTelemetry stops and resets Open-Telemetry client.
func StopOpenTelemetry() {
if openTelemetryMeterProvider == nil {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := openTelemetryMeterProvider.Shutdown(ctx); err != nil {
log.Err(err).Msg("Unable to shutdown OpenTelemetry meter provider")
}
openTelemetryMeterProvider = nil
}
// newOpenTelemetryMeterProvider creates a new controller.Controller.
func newOpenTelemetryMeterProvider(ctx context.Context, config *otypes.OTLP) (*sdkmetric.MeterProvider, error) {
var (
exporter sdkmetric.Exporter
err error
)
if config.GRPC != nil {
exporter, err = newGRPCExporter(ctx, config.GRPC)
} else {
exporter, err = newHTTPExporter(ctx, config.HTTP)
}
if err != nil {
return nil, fmt.Errorf("creating exporter: %w", err)
}
var resAttrs []attribute.KeyValue
for k, v := range config.ResourceAttributes {
resAttrs = append(resAttrs, attribute.String(k, v))
}
res, err := resource.New(ctx,
resource.WithContainer(),
resource.WithHost(),
resource.WithOS(),
resource.WithProcess(),
resource.WithTelemetrySDK(),
resource.WithDetectors(types.K8sAttributesDetector{}),
// The following order allows the user to override the service name and version,
// as well as any other attributes set by the above detectors.
resource.WithAttributes(
semconv.ServiceName(config.ServiceName),
semconv.ServiceVersion(version.Version),
),
resource.WithAttributes(resAttrs...),
// Use the environment variables to allow overriding above resource attributes.
resource.WithFromEnv(),
)
if err != nil {
return nil, fmt.Errorf("building resource: %w", err)
}
opts := []sdkmetric.PeriodicReaderOption{
sdkmetric.WithInterval(time.Duration(config.PushInterval)),
}
meterProvider := sdkmetric.NewMeterProvider(
sdkmetric.WithResource(res),
sdkmetric.WithReader(sdkmetric.NewPeriodicReader(exporter, opts...)),
// View to customize histogram buckets and rename a single histogram instrument.
sdkmetric.WithView(sdkmetric.NewView(
sdkmetric.Instrument{Name: "traefik_*_request_duration_seconds"},
sdkmetric.Stream{Aggregation: sdkmetric.AggregationExplicitBucketHistogram{
Boundaries: config.ExplicitBoundaries,
}},
)),
)
otel.SetMeterProvider(meterProvider)
return meterProvider, nil
}
func newHTTPExporter(ctx context.Context, config *otypes.OTelHTTP) (sdkmetric.Exporter, error) {
endpoint, err := url.Parse(config.Endpoint)
if err != nil {
return nil, fmt.Errorf("invalid collector endpoint %q: %w", config.Endpoint, err)
}
opts := []otlpmetrichttp.Option{
otlpmetrichttp.WithEndpoint(endpoint.Host),
otlpmetrichttp.WithHeaders(config.Headers),
otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression),
}
if endpoint.Scheme == "http" {
opts = append(opts, otlpmetrichttp.WithInsecure())
}
if endpoint.Path != "" {
opts = append(opts, otlpmetrichttp.WithURLPath(endpoint.Path))
}
if config.TLS != nil {
tlsConfig, err := config.TLS.CreateTLSConfig(ctx)
if err != nil {
return nil, fmt.Errorf("creating TLS client config: %w", err)
}
opts = append(opts, otlpmetrichttp.WithTLSClientConfig(tlsConfig))
}
return otlpmetrichttp.New(ctx, opts...)
}
func newGRPCExporter(ctx context.Context, config *otypes.OTelGRPC) (sdkmetric.Exporter, error) {
host, port, err := net.SplitHostPort(config.Endpoint)
if err != nil {
return nil, fmt.Errorf("invalid collector endpoint %q: %w", config.Endpoint, err)
}
opts := []otlpmetricgrpc.Option{
otlpmetricgrpc.WithEndpoint(fmt.Sprintf("%s:%s", host, port)),
otlpmetricgrpc.WithHeaders(config.Headers),
otlpmetricgrpc.WithCompressor(gzip.Name),
}
if config.Insecure {
opts = append(opts, otlpmetricgrpc.WithInsecure())
}
if config.TLS != nil {
tlsConfig, err := config.TLS.CreateTLSConfig(ctx)
if err != nil {
return nil, fmt.Errorf("creating TLS client config: %w", err)
}
opts = append(opts, otlpmetricgrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig)))
}
return otlpmetricgrpc.New(ctx, opts...)
}
func newOTLPCounterFrom(meter metric.Meter, name, desc string) *otelCounter {
c, _ := meter.Float64Counter(name,
metric.WithDescription(desc),
metric.WithUnit("1"),
)
return &otelCounter{
ip: c,
}
}
type otelCounter struct {
labelNamesValues otelLabelNamesValues
ip metric.Float64Counter
}
func (c *otelCounter) With(labelValues ...string) metrics.Counter {
return &otelCounter{
labelNamesValues: c.labelNamesValues.With(labelValues...),
ip: c.ip,
}
}
func (c *otelCounter) Add(delta float64) {
c.ip.Add(context.Background(), delta, metric.WithAttributes(c.labelNamesValues.ToLabels()...))
}
type gaugeValue struct {
attributes otelLabelNamesValues
value float64
}
type gaugeCollector struct {
mu sync.Mutex
values map[string]map[string]gaugeValue
}
func newOpenTelemetryGaugeCollector() *gaugeCollector {
return &gaugeCollector{
values: make(map[string]map[string]gaugeValue),
}
}
func (c *gaugeCollector) add(name string, delta float64, attributes otelLabelNamesValues) {
c.mu.Lock()
defer c.mu.Unlock()
str := strings.Join(attributes, "")
if _, exists := c.values[name]; !exists {
c.values[name] = map[string]gaugeValue{
str: {
attributes: attributes,
value: delta,
},
}
return
}
v, exists := c.values[name][str]
if !exists {
c.values[name][str] = gaugeValue{
attributes: attributes,
value: delta,
}
return
}
c.values[name][str] = gaugeValue{
attributes: attributes,
value: v.value + delta,
}
}
func (c *gaugeCollector) set(name string, value float64, attributes otelLabelNamesValues) {
c.mu.Lock()
defer c.mu.Unlock()
if _, exists := c.values[name]; !exists {
c.values[name] = make(map[string]gaugeValue)
}
c.values[name][strings.Join(attributes, "")] = gaugeValue{
attributes: attributes,
value: value,
}
}
func newOTLPGaugeFrom(meter metric.Meter, name, desc string, unit string) *otelGauge {
openTelemetryGaugeCollector.values[name] = make(map[string]gaugeValue)
c, _ := meter.Float64ObservableGauge(name,
metric.WithDescription(desc),
metric.WithUnit(unit),
)
_, err := meter.RegisterCallback(func(ctx context.Context, observer metric.Observer) error {
openTelemetryGaugeCollector.mu.Lock()
defer openTelemetryGaugeCollector.mu.Unlock()
values, exists := openTelemetryGaugeCollector.values[name]
if !exists {
return nil
}
for _, value := range values {
observer.ObserveFloat64(c, value.value, metric.WithAttributes(value.attributes.ToLabels()...))
}
return nil
}, c)
if err != nil {
log.Err(err).Msg("Unable to register OpenTelemetry meter callback")
}
return &otelGauge{
ip: c,
name: name,
}
}
type otelGauge struct {
labelNamesValues otelLabelNamesValues
ip metric.Float64ObservableGauge
name string
}
func (g *otelGauge) With(labelValues ...string) metrics.Gauge {
return &otelGauge{
labelNamesValues: g.labelNamesValues.With(labelValues...),
ip: g.ip,
name: g.name,
}
}
func (g *otelGauge) Add(delta float64) {
openTelemetryGaugeCollector.add(g.name, delta, g.labelNamesValues)
}
func (g *otelGauge) Set(value float64) {
openTelemetryGaugeCollector.set(g.name, value, g.labelNamesValues)
}
func newOTLPHistogramFrom(meter metric.Meter, name, desc string, unit string) *otelHistogram {
c, _ := meter.Float64Histogram(name,
metric.WithDescription(desc),
metric.WithUnit(unit),
)
return &otelHistogram{
ip: c,
}
}
type otelHistogram struct {
labelNamesValues otelLabelNamesValues
ip metric.Float64Histogram
}
func (h *otelHistogram) With(labelValues ...string) metrics.Histogram {
return &otelHistogram{
labelNamesValues: h.labelNamesValues.With(labelValues...),
ip: h.ip,
}
}
func (h *otelHistogram) Observe(incr float64) {
h.ip.Record(context.Background(), incr, metric.WithAttributes(h.labelNamesValues.ToLabels()...))
}
// otelLabelNamesValues is the equivalent of prometheus' labelNamesValues
// but adapted to OpenTelemetry.
// otelLabelNamesValues is a type alias that provides validation on its With
// method.
// Metrics may include it as a member to help them satisfy With semantics and
// save some code duplication.
type otelLabelNamesValues []string
// With validates the input, and returns a new aggregate otelLabelNamesValues.
func (lvs otelLabelNamesValues) With(labelValues ...string) otelLabelNamesValues {
if len(labelValues)%2 != 0 {
labelValues = append(labelValues, "unknown")
}
return append(lvs, labelValues...)
}
// ToLabels is a convenience method to convert a otelLabelNamesValues
// to the native attribute.KeyValue.
func (lvs otelLabelNamesValues) ToLabels() []attribute.KeyValue {
labels := make([]attribute.KeyValue, len(lvs)/2)
for i := 0; i < len(labels); i++ {
labels[i] = attribute.String(lvs[2*i], lvs[2*i+1])
}
return labels
}

View file

@ -0,0 +1,492 @@
package metrics
import (
"compress/gzip"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httptest"
"regexp"
"strconv"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
ptypes "github.com/traefik/paerser/types"
otypes "github.com/traefik/traefik/v3/pkg/observability/types"
"github.com/traefik/traefik/v3/pkg/version"
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
"go.opentelemetry.io/otel/attribute"
)
func TestOpenTelemetry_labels(t *testing.T) {
tests := []struct {
desc string
values otelLabelNamesValues
with []string
expect []attribute.KeyValue
}{
{
desc: "with no starting value",
values: otelLabelNamesValues{},
expect: []attribute.KeyValue{},
},
{
desc: "with one starting value",
values: otelLabelNamesValues{"foo"},
expect: []attribute.KeyValue{},
},
{
desc: "with two starting value",
values: otelLabelNamesValues{"foo", "bar"},
expect: []attribute.KeyValue{attribute.String("foo", "bar")},
},
{
desc: "with no starting value, and with one other value",
values: otelLabelNamesValues{},
with: []string{"baz"},
expect: []attribute.KeyValue{attribute.String("baz", "unknown")},
},
{
desc: "with no starting value, and with two other value",
values: otelLabelNamesValues{},
with: []string{"baz", "buz"},
expect: []attribute.KeyValue{attribute.String("baz", "buz")},
},
{
desc: "with one starting value, and with one other value",
values: otelLabelNamesValues{"foo"},
with: []string{"baz"},
expect: []attribute.KeyValue{attribute.String("foo", "baz")},
},
{
desc: "with one starting value, and with two other value",
values: otelLabelNamesValues{"foo"},
with: []string{"baz", "buz"},
expect: []attribute.KeyValue{attribute.String("foo", "baz")},
},
{
desc: "with two starting value, and with one other value",
values: otelLabelNamesValues{"foo", "bar"},
with: []string{"baz"},
expect: []attribute.KeyValue{
attribute.String("foo", "bar"),
attribute.String("baz", "unknown"),
},
},
{
desc: "with two starting value, and with two other value",
values: otelLabelNamesValues{"foo", "bar"},
with: []string{"baz", "buz"},
expect: []attribute.KeyValue{
attribute.String("foo", "bar"),
attribute.String("baz", "buz"),
},
},
}
for _, test := range tests {
t.Run(test.desc, func(t *testing.T) {
t.Parallel()
assert.Equal(t, test.expect, test.values.With(test.with...).ToLabels())
})
}
}
func TestOpenTelemetry_GaugeCollectorAdd(t *testing.T) {
tests := []struct {
desc string
gc *gaugeCollector
delta float64
name string
attributes otelLabelNamesValues
expect map[string]map[string]gaugeValue
}{
{
desc: "empty collector",
gc: newOpenTelemetryGaugeCollector(),
delta: 1,
name: "foo",
expect: map[string]map[string]gaugeValue{
"foo": {"": {value: 1}},
},
},
{
desc: "initialized collector",
gc: &gaugeCollector{
values: map[string]map[string]gaugeValue{
"foo": {"": {value: 1}},
},
},
delta: 1,
name: "foo",
expect: map[string]map[string]gaugeValue{
"foo": {"": {value: 2}},
},
},
{
desc: "initialized collector, values with label (only the last one counts)",
gc: &gaugeCollector{
values: map[string]map[string]gaugeValue{
"foo": {
"bar": {
attributes: otelLabelNamesValues{"bar"},
value: 1,
},
},
},
},
delta: 1,
name: "foo",
expect: map[string]map[string]gaugeValue{
"foo": {
"": {
value: 1,
},
"bar": {
attributes: otelLabelNamesValues{"bar"},
value: 1,
},
},
},
},
{
desc: "initialized collector, values with label on set",
gc: &gaugeCollector{
values: map[string]map[string]gaugeValue{
"foo": {"bar": {value: 1}},
},
},
delta: 1,
name: "foo",
attributes: otelLabelNamesValues{"baz"},
expect: map[string]map[string]gaugeValue{
"foo": {
"bar": {
value: 1,
},
"baz": {
value: 1,
attributes: otelLabelNamesValues{"baz"},
},
},
},
},
}
for _, test := range tests {
t.Run(test.desc, func(t *testing.T) {
t.Parallel()
test.gc.add(test.name, test.delta, test.attributes)
assert.Equal(t, test.expect, test.gc.values)
})
}
}
func TestOpenTelemetry_GaugeCollectorSet(t *testing.T) {
tests := []struct {
desc string
gc *gaugeCollector
value float64
name string
attributes otelLabelNamesValues
expect map[string]map[string]gaugeValue
}{
{
desc: "empty collector",
gc: newOpenTelemetryGaugeCollector(),
value: 1,
name: "foo",
expect: map[string]map[string]gaugeValue{
"foo": {"": {value: 1}},
},
},
{
desc: "initialized collector",
gc: &gaugeCollector{
values: map[string]map[string]gaugeValue{
"foo": {"": {value: 1}},
},
},
value: 1,
name: "foo",
expect: map[string]map[string]gaugeValue{
"foo": {"": {value: 1}},
},
},
{
desc: "initialized collector, values with label",
gc: &gaugeCollector{
values: map[string]map[string]gaugeValue{
"foo": {
"bar": {
attributes: otelLabelNamesValues{"bar"},
value: 1,
},
},
},
},
value: 1,
name: "foo",
expect: map[string]map[string]gaugeValue{
"foo": {
"": {
value: 1,
},
"bar": {
attributes: otelLabelNamesValues{"bar"},
value: 1,
},
},
},
},
{
desc: "initialized collector, values with label on set",
gc: &gaugeCollector{
values: map[string]map[string]gaugeValue{
"foo": {"": {value: 1}},
},
},
value: 1,
name: "foo",
attributes: otelLabelNamesValues{"bar"},
expect: map[string]map[string]gaugeValue{
"foo": {
"": {
value: 1,
},
"bar": {
value: 1,
attributes: otelLabelNamesValues{"bar"},
},
},
},
},
}
for _, test := range tests {
t.Run(test.desc, func(t *testing.T) {
t.Parallel()
test.gc.set(test.name, test.value, test.attributes)
assert.Equal(t, test.expect, test.gc.values)
})
}
}
func TestOpenTelemetry(t *testing.T) {
tests := []struct {
desc string
serviceName string
}{
{
desc: "default",
},
{
desc: "custom-service-name",
serviceName: "custom-service-name",
},
}
for _, test := range tests {
t.Run(test.desc, func(t *testing.T) {
c := make(chan *string, 5)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
gzr, err := gzip.NewReader(r.Body)
require.NoError(t, err)
body, err := io.ReadAll(gzr)
require.NoError(t, err)
req := pmetricotlp.NewExportRequest()
err = req.UnmarshalProto(body)
require.NoError(t, err)
marshalledReq, err := json.Marshal(req)
require.NoError(t, err)
bodyStr := string(marshalledReq)
c <- &bodyStr
w.WriteHeader(http.StatusOK)
}))
t.Cleanup(func() {
StopOpenTelemetry()
ts.Close()
})
var cfg otypes.OTLP
(&cfg).SetDefaults()
cfg.AddRoutersLabels = true
cfg.HTTP = &otypes.OTelHTTP{
Endpoint: ts.URL,
}
cfg.PushInterval = ptypes.Duration(10 * time.Millisecond)
wantServiceName := "traefik"
if test.serviceName != "" {
cfg.ServiceName = test.serviceName
wantServiceName = test.serviceName
}
registry := RegisterOpenTelemetry(t.Context(), &cfg)
require.NotNil(t, registry)
if !registry.IsEpEnabled() || !registry.IsRouterEnabled() || !registry.IsSvcEnabled() {
t.Fatalf("registry should return true for IsEnabled(), IsRouterEnabled() and IsSvcEnabled()")
}
expected := []string{
`({"key":"service.name","value":{"stringValue":"` + wantServiceName + `"}})`,
`({"key":"service.version","value":{"stringValue":"` + version.Version + `"}})`,
}
tryAssertMessage(t, c, expected)
// TODO: the len of startUnixNano is no supposed to be 20, it should be 19
expectedConfig := []string{
`({"name":"traefik_config_reloads_total","description":"Config reloads","unit":"1","sum":{"dataPoints":\[{"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
`({"name":"traefik_config_last_reload_success","description":"Last config reload success","unit":"ms","gauge":{"dataPoints":\[{"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\]}})`,
`({"name":"traefik_open_connections","description":"How many open connections exist, by entryPoint and protocol","unit":"1","gauge":{"dataPoints":\[{"attributes":\[{"key":"entrypoint","value":{"stringValue":"test"}},{"key":"protocol","value":{"stringValue":"TCP"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\]}})`,
}
registry.ConfigReloadsCounter().Add(1)
registry.LastConfigReloadSuccessGauge().Set(1)
registry.OpenConnectionsGauge().With("entrypoint", "test", "protocol", "TCP").Set(1)
tryAssertMessage(t, c, expectedConfig)
expectedTLSCerts := []string{
`({"name":"traefik_tls_certs_not_after","description":"Certificate expiration timestamp","unit":"ms","gauge":{"dataPoints":\[{"attributes":\[{"key":"key","value":{"stringValue":"value"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\]}})`,
}
registry.TLSCertsNotAfterTimestampGauge().With("key", "value").Set(1)
tryAssertMessage(t, c, expectedTLSCerts)
expectedEntryPoints := []string{
`({"name":"traefik_entrypoint_requests_total","description":"How many HTTP requests processed on an entrypoint, partitioned by status code, protocol, and method.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"code","value":{"stringValue":"200"}},{"key":"entrypoint","value":{"stringValue":"test1"}},{"key":"method","value":{"stringValue":"GET"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
`({"name":"traefik_entrypoint_requests_tls_total","description":"How many HTTP requests with TLS processed on an entrypoint, partitioned by TLS Version and TLS cipher Used.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"entrypoint","value":{"stringValue":"test2"}},{"key":"tls_cipher","value":{"stringValue":"bar"}},{"key":"tls_version","value":{"stringValue":"foo"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
`({"name":"traefik_entrypoint_request_duration_seconds","description":"How long it took to process the request on an entrypoint, partitioned by status code, protocol, and method.","unit":"s","histogram":{"dataPoints":\[{"attributes":\[{"key":"entrypoint","value":{"stringValue":"test3"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","count":"1","sum":10000,"bucketCounts":\["0","0","0","0","0","0","0","0","0","0","0","0","0","0","1"\],"explicitBounds":\[0.005,0.01,0.025,0.05,0.075,0.1,0.25,0.5,0.75,1,2.5,5,7.5,10\],"min":10000,"max":10000}\],"aggregationTemporality":2}})`,
`({"name":"traefik_entrypoint_requests_bytes_total","description":"The total size of requests in bytes handled by an entrypoint, partitioned by status code, protocol, and method.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"code","value":{"stringValue":"200"}},{"key":"entrypoint","value":{"stringValue":"test1"}},{"key":"method","value":{"stringValue":"GET"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
`({"name":"traefik_entrypoint_responses_bytes_total","description":"The total size of responses in bytes handled by an entrypoint, partitioned by status code, protocol, and method.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"code","value":{"stringValue":"200"}},{"key":"entrypoint","value":{"stringValue":"test1"}},{"key":"method","value":{"stringValue":"GET"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
}
registry.EntryPointReqsCounter().With(nil, "entrypoint", "test1", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
registry.EntryPointReqsTLSCounter().With("entrypoint", "test2", "tls_version", "foo", "tls_cipher", "bar").Add(1)
registry.EntryPointReqDurationHistogram().With("entrypoint", "test3").Observe(10000)
registry.EntryPointReqsBytesCounter().With("entrypoint", "test1", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
registry.EntryPointRespsBytesCounter().With("entrypoint", "test1", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
tryAssertMessage(t, c, expectedEntryPoints)
expectedRouters := []string{
`({"name":"traefik_router_requests_total","description":"How many HTTP requests are processed on a router, partitioned by service, status code, protocol, and method.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"code","value":{"stringValue":"(?:200|404)"}},{"key":"method","value":{"stringValue":"GET"}},{"key":"router","value":{"stringValue":"RouterReqsCounter"}},{"key":"service","value":{"stringValue":"test"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1},{"attributes":\[{"key":"code","value":{"stringValue":"(?:200|404)"}},{"key":"method","value":{"stringValue":"GET"}},{"key":"router","value":{"stringValue":"RouterReqsCounter"}},{"key":"service","value":{"stringValue":"test"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
`({"name":"traefik_router_requests_tls_total","description":"How many HTTP requests with TLS are processed on a router, partitioned by service, TLS Version, and TLS cipher Used.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"router","value":{"stringValue":"demo"}},{"key":"service","value":{"stringValue":"test"}},{"key":"tls_cipher","value":{"stringValue":"bar"}},{"key":"tls_version","value":{"stringValue":"foo"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
`({"name":"traefik_router_request_duration_seconds","description":"How long it took to process the request on a router, partitioned by service, status code, protocol, and method.","unit":"s","histogram":{"dataPoints":\[{"attributes":\[{"key":"code","value":{"stringValue":"200"}},{"key":"router","value":{"stringValue":"demo"}},{"key":"service","value":{"stringValue":"test"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","count":"1","sum":10000,"bucketCounts":\["0","0","0","0","0","0","0","0","0","0","0","0","0","0","1"\],"explicitBounds":\[0.005,0.01,0.025,0.05,0.075,0.1,0.25,0.5,0.75,1,2.5,5,7.5,10\],"min":10000,"max":10000}\],"aggregationTemporality":2}})`,
`({"name":"traefik_router_requests_bytes_total","description":"The total size of requests in bytes handled by a router, partitioned by status code, protocol, and method.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"code","value":{"stringValue":"404"}},{"key":"method","value":{"stringValue":"GET"}},{"key":"router","value":{"stringValue":"RouterReqsCounter"}},{"key":"service","value":{"stringValue":"test"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
`({"name":"traefik_router_responses_bytes_total","description":"The total size of responses in bytes handled by a router, partitioned by status code, protocol, and method.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"code","value":{"stringValue":"404"}},{"key":"method","value":{"stringValue":"GET"}},{"key":"router","value":{"stringValue":"RouterReqsCounter"}},{"key":"service","value":{"stringValue":"test"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
}
registry.RouterReqsCounter().With(nil, "router", "RouterReqsCounter", "service", "test", "code", strconv.Itoa(http.StatusNotFound), "method", http.MethodGet).Add(1)
registry.RouterReqsCounter().With(nil, "router", "RouterReqsCounter", "service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
registry.RouterReqsTLSCounter().With("router", "demo", "service", "test", "tls_version", "foo", "tls_cipher", "bar").Add(1)
registry.RouterReqDurationHistogram().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK)).Observe(10000)
registry.RouterReqsBytesCounter().With("router", "RouterReqsCounter", "service", "test", "code", strconv.Itoa(http.StatusNotFound), "method", http.MethodGet).Add(1)
registry.RouterRespsBytesCounter().With("router", "RouterReqsCounter", "service", "test", "code", strconv.Itoa(http.StatusNotFound), "method", http.MethodGet).Add(1)
tryAssertMessage(t, c, expectedRouters)
expectedServices := []string{
`({"name":"traefik_service_requests_total","description":"How many HTTP requests processed on a service, partitioned by status code, protocol, and method.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"code","value":{"stringValue":"(?:200|404)"}},{"key":"method","value":{"stringValue":"GET"}},{"key":"service","value":{"stringValue":"ServiceReqsCounter"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1},{"attributes":\[{"key":"code","value":{"stringValue":"(?:200|404)"}},{"key":"method","value":{"stringValue":"GET"}},{"key":"service","value":{"stringValue":"ServiceReqsCounter"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
`({"name":"traefik_service_requests_tls_total","description":"How many HTTP requests with TLS processed on a service, partitioned by TLS version and TLS cipher.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"service","value":{"stringValue":"test"}},{"key":"tls_cipher","value":{"stringValue":"bar"}},{"key":"tls_version","value":{"stringValue":"foo"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
`({"name":"traefik_service_request_duration_seconds","description":"How long it took to process the request on a service, partitioned by status code, protocol, and method.","unit":"s","histogram":{"dataPoints":\[{"attributes":\[{"key":"code","value":{"stringValue":"200"}},{"key":"service","value":{"stringValue":"test"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","count":"1","sum":10000,"bucketCounts":\["0","0","0","0","0","0","0","0","0","0","0","0","0","0","1"\],"explicitBounds":\[0.005,0.01,0.025,0.05,0.075,0.1,0.25,0.5,0.75,1,2.5,5,7.5,10\],"min":10000,"max":10000}\],"aggregationTemporality":2}})`,
`({"name":"traefik_service_server_up","description":"service server is up, described by gauge value of 0 or 1.","unit":"1","gauge":{"dataPoints":\[{"attributes":\[{"key":"service","value":{"stringValue":"test"}},{"key":"url","value":{"stringValue":"http://127.0.0.1"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\]}})`,
`({"name":"traefik_service_requests_bytes_total","description":"The total size of requests in bytes received by a service, partitioned by status code, protocol, and method.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"code","value":{"stringValue":"404"}},{"key":"method","value":{"stringValue":"GET"}},{"key":"service","value":{"stringValue":"ServiceReqsCounter"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
`({"name":"traefik_service_responses_bytes_total","description":"The total size of responses in bytes returned by a service, partitioned by status code, protocol, and method.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"code","value":{"stringValue":"404"}},{"key":"method","value":{"stringValue":"GET"}},{"key":"service","value":{"stringValue":"ServiceReqsCounter"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
}
registry.ServiceReqsCounter().With(nil, "service", "ServiceReqsCounter", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
registry.ServiceReqsCounter().With(nil, "service", "ServiceReqsCounter", "code", strconv.Itoa(http.StatusNotFound), "method", http.MethodGet).Add(1)
registry.ServiceReqsTLSCounter().With("service", "test", "tls_version", "foo", "tls_cipher", "bar").Add(1)
registry.ServiceReqDurationHistogram().With("service", "test", "code", strconv.Itoa(http.StatusOK)).Observe(10000)
registry.ServiceServerUpGauge().With("service", "test", "url", "http://127.0.0.1").Set(1)
registry.ServiceReqsBytesCounter().With("service", "ServiceReqsCounter", "code", strconv.Itoa(http.StatusNotFound), "method", http.MethodGet).Add(1)
registry.ServiceRespsBytesCounter().With("service", "ServiceReqsCounter", "code", strconv.Itoa(http.StatusNotFound), "method", http.MethodGet).Add(1)
tryAssertMessage(t, c, expectedServices)
expectedServicesRetries := []string{
`({"attributes":\[{"key":"service","value":{"stringValue":"foobar"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1})`,
`({"attributes":\[{"key":"service","value":{"stringValue":"test"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":2})`,
}
registry.ServiceRetriesCounter().With("service", "test").Add(1)
registry.ServiceRetriesCounter().With("service", "test").Add(1)
registry.ServiceRetriesCounter().With("service", "foobar").Add(1)
tryAssertMessage(t, c, expectedServicesRetries)
// We cannot rely on the previous expected pattern,
// because this pattern was for matching only one dataPoint in the histogram,
// and as soon as the EntryPointReqDurationHistogram.Observe is called,
// it adds a new dataPoint to the histogram.
expectedEntryPointReqDuration := []string{
`({"attributes":\[{"key":"entrypoint","value":{"stringValue":"myEntrypoint"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","count":"2","sum":30000,"bucketCounts":\["0","0","0","0","0","0","0","0","0","0","0","0","0","0","2"\],"explicitBounds":\[0.005,0.01,0.025,0.05,0.075,0.1,0.25,0.5,0.75,1,2.5,5,7.5,10\],"min":10000,"max":20000})`,
}
registry.EntryPointReqDurationHistogram().With("entrypoint", "myEntrypoint").Observe(10000)
registry.EntryPointReqDurationHistogram().With("entrypoint", "myEntrypoint").Observe(20000)
tryAssertMessage(t, c, expectedEntryPointReqDuration)
})
}
}
func assertMessage(t *testing.T, msg string, expected []string) {
t.Helper()
errs := verifyMessage(msg, expected)
for _, err := range errs {
t.Error(err)
}
}
func tryAssertMessage(t *testing.T, c chan *string, expected []string) {
t.Helper()
var errs []error
timeout := time.After(1 * time.Second)
for {
select {
case <-timeout:
for _, err := range errs {
t.Error(err)
}
case msg := <-c:
errs = verifyMessage(*msg, expected)
if len(errs) == 0 {
return
}
}
}
}
func verifyMessage(msg string, expected []string) []error {
var errs []error
for _, pattern := range expected {
re := regexp.MustCompile(pattern)
match := re.FindStringSubmatch(msg)
if len(match) != 2 {
errs = append(errs, fmt.Errorf("got %q %v, want %q", msg, match, pattern))
}
}
return errs
}

View file

@ -0,0 +1,672 @@
package metrics
import (
"context"
"errors"
"net/http"
"sync"
"time"
"github.com/go-kit/kit/metrics"
stdprometheus "github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/collectors"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/rs/zerolog/log"
"github.com/traefik/traefik/v3/pkg/config/dynamic"
otypes "github.com/traefik/traefik/v3/pkg/observability/types"
)
const (
// MetricNamePrefix prefix of all metric names.
MetricNamePrefix = "traefik_"
// server meta information.
metricConfigPrefix = MetricNamePrefix + "config_"
configReloadsTotalName = metricConfigPrefix + "reloads_total"
configLastReloadSuccessName = metricConfigPrefix + "last_reload_success"
openConnectionsName = MetricNamePrefix + "open_connections"
// TLS.
metricsTLSPrefix = MetricNamePrefix + "tls_"
tlsCertsNotAfterTimestampName = metricsTLSPrefix + "certs_not_after"
// entry point.
metricEntryPointPrefix = MetricNamePrefix + "entrypoint_"
entryPointReqsTotalName = metricEntryPointPrefix + "requests_total"
entryPointReqsTLSTotalName = metricEntryPointPrefix + "requests_tls_total"
entryPointReqDurationName = metricEntryPointPrefix + "request_duration_seconds"
entryPointReqsBytesTotalName = metricEntryPointPrefix + "requests_bytes_total"
entryPointRespsBytesTotalName = metricEntryPointPrefix + "responses_bytes_total"
// router level.
metricRouterPrefix = MetricNamePrefix + "router_"
routerReqsTotalName = metricRouterPrefix + "requests_total"
routerReqsTLSTotalName = metricRouterPrefix + "requests_tls_total"
routerReqDurationName = metricRouterPrefix + "request_duration_seconds"
routerReqsBytesTotalName = metricRouterPrefix + "requests_bytes_total"
routerRespsBytesTotalName = metricRouterPrefix + "responses_bytes_total"
// service level.
metricServicePrefix = MetricNamePrefix + "service_"
serviceReqsTotalName = metricServicePrefix + "requests_total"
serviceReqsTLSTotalName = metricServicePrefix + "requests_tls_total"
serviceReqDurationName = metricServicePrefix + "request_duration_seconds"
serviceRetriesTotalName = metricServicePrefix + "retries_total"
serviceServerUpName = metricServicePrefix + "server_up"
serviceReqsBytesTotalName = metricServicePrefix + "requests_bytes_total"
serviceRespsBytesTotalName = metricServicePrefix + "responses_bytes_total"
)
// promState holds all metric state internally and acts as the only Collector we register for Prometheus.
//
// This enables control to remove metrics that belong to outdated configuration.
// As an example why this is required, consider Traefik learns about a new service.
// It populates the 'traefik_server_service_up' metric for it with a value of 1 (alive).
// When the service is undeployed now the metric is still there in the client library
// and will be returned on the metrics endpoint until Traefik would be restarted.
//
// To solve this problem promState keeps track of Traefik's dynamic configuration.
// Metrics that "belong" to a dynamic configuration part like services or entryPoints
// are removed after they were scraped at least once when the corresponding object
// doesn't exist anymore.
var promState = newPrometheusState()
var promRegistry = stdprometheus.NewRegistry()
// PrometheusHandler exposes Prometheus routes.
func PrometheusHandler() http.Handler {
return promhttp.HandlerFor(promRegistry, promhttp.HandlerOpts{})
}
// RegisterPrometheus registers all Prometheus metrics.
// It must be called only once and failing to register the metrics will lead to a panic.
func RegisterPrometheus(ctx context.Context, config *otypes.Prometheus) Registry {
standardRegistry := initStandardRegistry(config)
if err := promRegistry.Register(collectors.NewProcessCollector(collectors.ProcessCollectorOpts{})); err != nil {
var arErr stdprometheus.AlreadyRegisteredError
if !errors.As(err, &arErr) {
log.Ctx(ctx).Warn().Msg("ProcessCollector is already registered")
}
}
if err := promRegistry.Register(collectors.NewGoCollector()); err != nil {
var arErr stdprometheus.AlreadyRegisteredError
if !errors.As(err, &arErr) {
log.Ctx(ctx).Warn().Msg("GoCollector is already registered")
}
}
if !registerPromState(ctx) {
return nil
}
return standardRegistry
}
func initStandardRegistry(config *otypes.Prometheus) Registry {
buckets := []float64{0.1, 0.3, 1.2, 5.0}
if config.Buckets != nil {
buckets = config.Buckets
}
configReloads := newCounterFrom(stdprometheus.CounterOpts{
Name: configReloadsTotalName,
Help: "Config reloads",
}, []string{})
lastConfigReloadSuccess := newGaugeFrom(stdprometheus.GaugeOpts{
Name: configLastReloadSuccessName,
Help: "Last config reload success",
}, []string{})
tlsCertsNotAfterTimestamp := newGaugeFrom(stdprometheus.GaugeOpts{
Name: tlsCertsNotAfterTimestampName,
Help: "Certificate expiration timestamp",
}, []string{"cn", "serial", "sans"})
openConnections := newGaugeFrom(stdprometheus.GaugeOpts{
Name: openConnectionsName,
Help: "How many open connections exist, by entryPoint and protocol",
}, []string{"entrypoint", "protocol"})
promState.vectors = []vector{
configReloads.cv,
lastConfigReloadSuccess.gv,
tlsCertsNotAfterTimestamp.gv,
openConnections.gv,
}
reg := &standardRegistry{
epEnabled: config.AddEntryPointsLabels,
routerEnabled: config.AddRoutersLabels,
svcEnabled: config.AddServicesLabels,
configReloadsCounter: configReloads,
lastConfigReloadSuccessGauge: lastConfigReloadSuccess,
tlsCertsNotAfterTimestampGauge: tlsCertsNotAfterTimestamp,
openConnectionsGauge: openConnections,
}
if config.AddEntryPointsLabels {
entryPointReqs := newCounterWithHeadersFrom(stdprometheus.CounterOpts{
Name: entryPointReqsTotalName,
Help: "How many HTTP requests processed on an entrypoint, partitioned by status code, protocol, and method.",
}, config.HeaderLabels, []string{"code", "method", "protocol", "entrypoint"})
entryPointReqsTLS := newCounterFrom(stdprometheus.CounterOpts{
Name: entryPointReqsTLSTotalName,
Help: "How many HTTP requests with TLS processed on an entrypoint, partitioned by TLS Version and TLS cipher Used.",
}, []string{"tls_version", "tls_cipher", "entrypoint"})
entryPointReqDurations := newHistogramFrom(stdprometheus.HistogramOpts{
Name: entryPointReqDurationName,
Help: "How long it took to process the request on an entrypoint, partitioned by status code, protocol, and method.",
Buckets: buckets,
}, []string{"code", "method", "protocol", "entrypoint"})
entryPointReqsBytesTotal := newCounterFrom(stdprometheus.CounterOpts{
Name: entryPointReqsBytesTotalName,
Help: "The total size of requests in bytes handled by an entrypoint, partitioned by status code, protocol, and method.",
}, []string{"code", "method", "protocol", "entrypoint"})
entryPointRespsBytesTotal := newCounterFrom(stdprometheus.CounterOpts{
Name: entryPointRespsBytesTotalName,
Help: "The total size of responses in bytes handled by an entrypoint, partitioned by status code, protocol, and method.",
}, []string{"code", "method", "protocol", "entrypoint"})
promState.vectors = append(promState.vectors,
entryPointReqs.cv,
entryPointReqsTLS.cv,
entryPointReqDurations.hv,
entryPointReqsBytesTotal.cv,
entryPointRespsBytesTotal.cv,
)
reg.entryPointReqsCounter = entryPointReqs
reg.entryPointReqsTLSCounter = entryPointReqsTLS
reg.entryPointReqDurationHistogram, _ = NewHistogramWithScale(entryPointReqDurations, time.Second)
reg.entryPointReqsBytesCounter = entryPointReqsBytesTotal
reg.entryPointRespsBytesCounter = entryPointRespsBytesTotal
}
if config.AddRoutersLabels {
routerReqs := newCounterWithHeadersFrom(stdprometheus.CounterOpts{
Name: routerReqsTotalName,
Help: "How many HTTP requests are processed on a router, partitioned by service, status code, protocol, and method.",
}, config.HeaderLabels, []string{"code", "method", "protocol", "router", "service"})
routerReqsTLS := newCounterFrom(stdprometheus.CounterOpts{
Name: routerReqsTLSTotalName,
Help: "How many HTTP requests with TLS are processed on a router, partitioned by service, TLS Version, and TLS cipher Used.",
}, []string{"tls_version", "tls_cipher", "router", "service"})
routerReqDurations := newHistogramFrom(stdprometheus.HistogramOpts{
Name: routerReqDurationName,
Help: "How long it took to process the request on a router, partitioned by service, status code, protocol, and method.",
Buckets: buckets,
}, []string{"code", "method", "protocol", "router", "service"})
routerReqsBytesTotal := newCounterFrom(stdprometheus.CounterOpts{
Name: routerReqsBytesTotalName,
Help: "The total size of requests in bytes handled by a router, partitioned by service, status code, protocol, and method.",
}, []string{"code", "method", "protocol", "router", "service"})
routerRespsBytesTotal := newCounterFrom(stdprometheus.CounterOpts{
Name: routerRespsBytesTotalName,
Help: "The total size of responses in bytes handled by a router, partitioned by service, status code, protocol, and method.",
}, []string{"code", "method", "protocol", "router", "service"})
promState.vectors = append(promState.vectors,
routerReqs.cv,
routerReqsTLS.cv,
routerReqDurations.hv,
routerReqsBytesTotal.cv,
routerRespsBytesTotal.cv,
)
reg.routerReqsCounter = routerReqs
reg.routerReqsTLSCounter = routerReqsTLS
reg.routerReqDurationHistogram, _ = NewHistogramWithScale(routerReqDurations, time.Second)
reg.routerReqsBytesCounter = routerReqsBytesTotal
reg.routerRespsBytesCounter = routerRespsBytesTotal
}
if config.AddServicesLabels {
serviceReqs := newCounterWithHeadersFrom(stdprometheus.CounterOpts{
Name: serviceReqsTotalName,
Help: "How many HTTP requests processed on a service, partitioned by status code, protocol, and method.",
}, config.HeaderLabels, []string{"code", "method", "protocol", "service"})
serviceReqsTLS := newCounterFrom(stdprometheus.CounterOpts{
Name: serviceReqsTLSTotalName,
Help: "How many HTTP requests with TLS processed on a service, partitioned by TLS version and TLS cipher.",
}, []string{"tls_version", "tls_cipher", "service"})
serviceReqDurations := newHistogramFrom(stdprometheus.HistogramOpts{
Name: serviceReqDurationName,
Help: "How long it took to process the request on a service, partitioned by status code, protocol, and method.",
Buckets: buckets,
}, []string{"code", "method", "protocol", "service"})
serviceRetries := newCounterFrom(stdprometheus.CounterOpts{
Name: serviceRetriesTotalName,
Help: "How many request retries happened on a service.",
}, []string{"service"})
serviceServerUp := newGaugeFrom(stdprometheus.GaugeOpts{
Name: serviceServerUpName,
Help: "service server is up, described by gauge value of 0 or 1.",
}, []string{"service", "url"})
serviceReqsBytesTotal := newCounterFrom(stdprometheus.CounterOpts{
Name: serviceReqsBytesTotalName,
Help: "The total size of requests in bytes received by a service, partitioned by status code, protocol, and method.",
}, []string{"code", "method", "protocol", "service"})
serviceRespsBytesTotal := newCounterFrom(stdprometheus.CounterOpts{
Name: serviceRespsBytesTotalName,
Help: "The total size of responses in bytes returned by a service, partitioned by status code, protocol, and method.",
}, []string{"code", "method", "protocol", "service"})
promState.vectors = append(promState.vectors,
serviceReqs.cv,
serviceReqsTLS.cv,
serviceReqDurations.hv,
serviceRetries.cv,
serviceServerUp.gv,
serviceReqsBytesTotal.cv,
serviceRespsBytesTotal.cv,
)
reg.serviceReqsCounter = serviceReqs
reg.serviceReqsTLSCounter = serviceReqsTLS
reg.serviceReqDurationHistogram, _ = NewHistogramWithScale(serviceReqDurations, time.Second)
reg.serviceRetriesCounter = serviceRetries
reg.serviceServerUpGauge = serviceServerUp
reg.serviceReqsBytesCounter = serviceReqsBytesTotal
reg.serviceRespsBytesCounter = serviceRespsBytesTotal
}
return reg
}
func registerPromState(ctx context.Context) bool {
err := promRegistry.Register(promState)
if err == nil {
return true
}
logger := log.Ctx(ctx)
var arErr stdprometheus.AlreadyRegisteredError
if errors.As(err, &arErr) {
logger.Debug().Msg("Prometheus collector already registered.")
return true
}
logger.Error().Err(err).Msg("Unable to register Traefik to Prometheus")
return false
}
// OnConfigurationUpdate receives the current configuration from Traefik.
// It then converts the configuration to the optimized package internal format
// and sets it to the promState.
func OnConfigurationUpdate(conf dynamic.Configuration, entryPoints []string) {
dynCfg := newDynamicConfig()
for _, value := range entryPoints {
dynCfg.entryPoints[value] = true
}
if conf.HTTP == nil {
promState.SetDynamicConfig(dynCfg)
return
}
for name := range conf.HTTP.Routers {
dynCfg.routers[name] = true
}
for serviceName, service := range conf.HTTP.Services {
dynCfg.services[serviceName] = make(map[string]bool)
if service.LoadBalancer != nil {
for _, server := range service.LoadBalancer.Servers {
dynCfg.services[serviceName][server.URL] = true
}
}
}
promState.SetDynamicConfig(dynCfg)
}
func newPrometheusState() *prometheusState {
return &prometheusState{
dynamicConfig: newDynamicConfig(),
deletedURLs: make(map[string][]string),
}
}
type vector interface {
stdprometheus.Collector
DeletePartialMatch(labels stdprometheus.Labels) int
}
type prometheusState struct {
vectors []vector
mtx sync.Mutex
dynamicConfig *dynamicConfig
deletedEP []string
deletedRouters []string
deletedServices []string
deletedURLs map[string][]string
}
func (ps *prometheusState) SetDynamicConfig(dynamicConfig *dynamicConfig) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
for ep := range ps.dynamicConfig.entryPoints {
if _, ok := dynamicConfig.entryPoints[ep]; !ok {
ps.deletedEP = append(ps.deletedEP, ep)
}
}
for router := range ps.dynamicConfig.routers {
if _, ok := dynamicConfig.routers[router]; !ok {
ps.deletedRouters = append(ps.deletedRouters, router)
}
}
for service, serV := range ps.dynamicConfig.services {
actualService, ok := dynamicConfig.services[service]
if !ok {
ps.deletedServices = append(ps.deletedServices, service)
}
for url := range serV {
if _, ok := actualService[url]; !ok {
ps.deletedURLs[service] = append(ps.deletedURLs[service], url)
}
}
}
ps.dynamicConfig = dynamicConfig
}
// Describe implements prometheus.Collector and simply calls
// the registered describer functions.
func (ps *prometheusState) Describe(ch chan<- *stdprometheus.Desc) {
for _, v := range ps.vectors {
v.Describe(ch)
}
}
// Collect implements prometheus.Collector. It calls the Collect
// method of all metrics it received on the collectors channel.
// It's also responsible to remove metrics that belong to an outdated configuration.
// The removal happens only after their Collect method was called to ensure that
// also those metrics will be exported on the current scrape.
func (ps *prometheusState) Collect(ch chan<- stdprometheus.Metric) {
for _, v := range ps.vectors {
v.Collect(ch)
}
ps.mtx.Lock()
defer ps.mtx.Unlock()
for _, ep := range ps.deletedEP {
if !ps.dynamicConfig.hasEntryPoint(ep) {
ps.DeletePartialMatch(map[string]string{"entrypoint": ep})
}
}
for _, router := range ps.deletedRouters {
if !ps.dynamicConfig.hasRouter(router) {
ps.DeletePartialMatch(map[string]string{"router": router})
}
}
for _, service := range ps.deletedServices {
if !ps.dynamicConfig.hasService(service) {
ps.DeletePartialMatch(map[string]string{"service": service})
}
}
for service, urls := range ps.deletedURLs {
for _, url := range urls {
if !ps.dynamicConfig.hasServerURL(service, url) {
ps.DeletePartialMatch(map[string]string{"service": service, "url": url})
}
}
}
ps.deletedEP = nil
ps.deletedRouters = nil
ps.deletedServices = nil
ps.deletedURLs = make(map[string][]string)
}
// DeletePartialMatch deletes all metrics where the variable labels contain all of those passed in as labels.
// The order of the labels does not matter.
// It returns the number of metrics deleted.
func (ps *prometheusState) DeletePartialMatch(labels stdprometheus.Labels) int {
var count int
for _, elem := range ps.vectors {
count += elem.DeletePartialMatch(labels)
}
return count
}
func newDynamicConfig() *dynamicConfig {
return &dynamicConfig{
entryPoints: make(map[string]bool),
routers: make(map[string]bool),
services: make(map[string]map[string]bool),
}
}
// dynamicConfig holds the current configuration for entryPoints, services,
// and server URLs in an optimized way to check for existence. This provides
// a performant way to check whether the collected metrics belong to the
// current configuration or to an outdated one.
type dynamicConfig struct {
entryPoints map[string]bool
routers map[string]bool
services map[string]map[string]bool
}
func (d *dynamicConfig) hasEntryPoint(entrypointName string) bool {
_, ok := d.entryPoints[entrypointName]
return ok
}
func (d *dynamicConfig) hasService(serviceName string) bool {
_, ok := d.services[serviceName]
return ok
}
func (d *dynamicConfig) hasRouter(routerName string) bool {
_, ok := d.routers[routerName]
return ok
}
func (d *dynamicConfig) hasServerURL(serviceName, serverURL string) bool {
if service, hasService := d.services[serviceName]; hasService {
_, ok := service[serverURL]
return ok
}
return false
}
func newCounterWithHeadersFrom(opts stdprometheus.CounterOpts, headers map[string]string, labelNames []string) *counterWithHeaders {
var headerLabels []string
for k := range headers {
headerLabels = append(headerLabels, k)
}
cv := stdprometheus.NewCounterVec(opts, append(labelNames, headerLabels...))
c := &counterWithHeaders{
name: opts.Name,
headers: headers,
cv: cv,
}
if len(labelNames) == 0 && len(headerLabels) == 0 {
c.collector = cv.WithLabelValues()
c.Add(0)
}
return c
}
type counterWithHeaders struct {
name string
cv *stdprometheus.CounterVec
labelNamesValues labelNamesValues
headers map[string]string
collector stdprometheus.Counter
}
func (c *counterWithHeaders) With(headers http.Header, labelValues ...string) CounterWithHeaders {
for headerLabel, headerKey := range c.headers {
labelValues = append(labelValues, headerLabel, headers.Get(headerKey))
}
lnv := c.labelNamesValues.With(labelValues...)
return &counterWithHeaders{
name: c.name,
headers: c.headers,
cv: c.cv,
labelNamesValues: lnv,
collector: c.cv.With(lnv.ToLabels()),
}
}
func (c *counterWithHeaders) Add(delta float64) {
c.collector.Add(delta)
}
func (c *counterWithHeaders) Describe(ch chan<- *stdprometheus.Desc) {
c.cv.Describe(ch)
}
func newCounterFrom(opts stdprometheus.CounterOpts, labelNames []string) *counter {
cv := stdprometheus.NewCounterVec(opts, labelNames)
c := &counter{
name: opts.Name,
cv: cv,
}
if len(labelNames) == 0 {
c.collector = cv.WithLabelValues()
c.Add(0)
}
return c
}
type counter struct {
name string
cv *stdprometheus.CounterVec
labelNamesValues labelNamesValues
collector stdprometheus.Counter
}
func (c *counter) With(labelValues ...string) metrics.Counter {
lnv := c.labelNamesValues.With(labelValues...)
return &counter{
name: c.name,
cv: c.cv,
labelNamesValues: lnv,
collector: c.cv.With(lnv.ToLabels()),
}
}
func (c *counter) Add(delta float64) {
c.collector.Add(delta)
}
func (c *counter) Describe(ch chan<- *stdprometheus.Desc) {
c.cv.Describe(ch)
}
func newGaugeFrom(opts stdprometheus.GaugeOpts, labelNames []string) *gauge {
gv := stdprometheus.NewGaugeVec(opts, labelNames)
g := &gauge{
name: opts.Name,
gv: gv,
}
if len(labelNames) == 0 {
g.collector = gv.WithLabelValues()
g.Set(0)
}
return g
}
type gauge struct {
name string
gv *stdprometheus.GaugeVec
labelNamesValues labelNamesValues
collector stdprometheus.Gauge
}
func (g *gauge) With(labelValues ...string) metrics.Gauge {
lnv := g.labelNamesValues.With(labelValues...)
return &gauge{
name: g.name,
gv: g.gv,
labelNamesValues: lnv,
collector: g.gv.With(lnv.ToLabels()),
}
}
func (g *gauge) Add(delta float64) {
g.collector.Add(delta)
}
func (g *gauge) Set(value float64) {
g.collector.Set(value)
}
func (g *gauge) Describe(ch chan<- *stdprometheus.Desc) {
g.gv.Describe(ch)
}
func newHistogramFrom(opts stdprometheus.HistogramOpts, labelNames []string) *histogram {
hv := stdprometheus.NewHistogramVec(opts, labelNames)
return &histogram{
name: opts.Name,
hv: hv,
}
}
type histogram struct {
name string
hv *stdprometheus.HistogramVec
labelNamesValues labelNamesValues
collector stdprometheus.Observer
}
func (h *histogram) With(labelValues ...string) metrics.Histogram {
lnv := h.labelNamesValues.With(labelValues...)
return &histogram{
name: h.name,
hv: h.hv,
labelNamesValues: lnv,
collector: h.hv.With(lnv.ToLabels()),
}
}
func (h *histogram) Observe(value float64) {
h.collector.Observe(value)
}
func (h *histogram) Describe(ch chan<- *stdprometheus.Desc) {
h.hv.Describe(ch)
}
// labelNamesValues is a type alias that provides validation on its With method.
// Metrics may include it as a member to help them satisfy With semantics and
// save some code duplication.
type labelNamesValues []string
// With validates the input, and returns a new aggregate labelNamesValues.
func (lvs labelNamesValues) With(labelValues ...string) labelNamesValues {
if len(labelValues)%2 != 0 {
labelValues = append(labelValues, "unknown")
}
labels := make([]string, len(lvs)+len(labelValues))
n := copy(labels, lvs)
copy(labels[n:], labelValues)
return labels
}
// ToLabels is a convenience method to convert a labelNamesValues
// to the native prometheus.Labels.
func (lvs labelNamesValues) ToLabels() stdprometheus.Labels {
labels := make(map[string]string, len(lvs)/2)
for i := 0; i < len(lvs); i += 2 {
labels[lvs[i]] = lvs[i+1]
}
return labels
}

View file

@ -0,0 +1,740 @@
package metrics
import (
"fmt"
"net/http"
"strconv"
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/traefik/traefik/v3/pkg/config/dynamic"
otypes "github.com/traefik/traefik/v3/pkg/observability/types"
th "github.com/traefik/traefik/v3/pkg/testhelpers"
)
func TestRegisterPromState(t *testing.T) {
t.Cleanup(promState.reset)
testCases := []struct {
desc string
prometheusSlice []*otypes.Prometheus
initPromState bool
unregisterPromState bool
expectedNbRegistries int
}{
{
desc: "Register once",
prometheusSlice: []*otypes.Prometheus{{}},
initPromState: true,
unregisterPromState: false,
expectedNbRegistries: 1,
},
{
desc: "Register once with no promState init",
prometheusSlice: []*otypes.Prometheus{{}},
initPromState: false,
unregisterPromState: false,
expectedNbRegistries: 1,
},
{
desc: "Register twice",
prometheusSlice: []*otypes.Prometheus{{}, {}},
initPromState: true,
unregisterPromState: false,
expectedNbRegistries: 2,
},
{
desc: "Register twice with no promstate init",
prometheusSlice: []*otypes.Prometheus{{}, {}},
initPromState: false,
unregisterPromState: false,
expectedNbRegistries: 2,
},
{
desc: "Register twice with unregister",
prometheusSlice: []*otypes.Prometheus{{}, {}},
initPromState: true,
unregisterPromState: true,
expectedNbRegistries: 2,
},
}
for _, test := range testCases {
t.Run(test.desc, func(t *testing.T) {
actualNbRegistries := 0
for _, prom := range test.prometheusSlice {
if test.initPromState {
initStandardRegistry(prom)
}
if registerPromState(t.Context()) {
actualNbRegistries++
}
if test.unregisterPromState {
promRegistry.Unregister(promState)
}
promState.reset()
}
promRegistry.Unregister(promState)
assert.Equal(t, test.expectedNbRegistries, actualNbRegistries)
})
}
}
func TestPrometheus(t *testing.T) {
promState = newPrometheusState()
promRegistry = prometheus.NewRegistry()
t.Cleanup(promState.reset)
prometheusRegistry := RegisterPrometheus(t.Context(), &otypes.Prometheus{
AddEntryPointsLabels: true,
AddRoutersLabels: true,
AddServicesLabels: true,
HeaderLabels: map[string]string{"useragent": "User-Agent"},
})
defer promRegistry.Unregister(promState)
if !prometheusRegistry.IsEpEnabled() || !prometheusRegistry.IsRouterEnabled() || !prometheusRegistry.IsSvcEnabled() {
t.Errorf("PrometheusRegistry should return true for IsEnabled(), IsRouterEnabled() and IsSvcEnabled()")
}
prometheusRegistry.ConfigReloadsCounter().Add(1)
prometheusRegistry.LastConfigReloadSuccessGauge().Set(float64(time.Now().Unix()))
prometheusRegistry.
OpenConnectionsGauge().
With("entrypoint", "test", "protocol", "TCP").
Set(1)
prometheusRegistry.
TLSCertsNotAfterTimestampGauge().
With("cn", "value", "serial", "value", "sans", "value").
Set(float64(time.Now().Unix()))
prometheusRegistry.
EntryPointReqsCounter().
With(map[string][]string{"User-Agent": {"foobar"}}, "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http", "entrypoint", "http").
Add(1)
prometheusRegistry.
EntryPointReqDurationHistogram().
With("code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http", "entrypoint", "http").
Observe(1)
prometheusRegistry.
EntryPointRespsBytesCounter().
With("code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http", "entrypoint", "http").
Add(1)
prometheusRegistry.
EntryPointReqsBytesCounter().
With("code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http", "entrypoint", "http").
Add(1)
prometheusRegistry.
RouterReqsCounter().
With(nil, "router", "demo", "service", "service1", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http").
Add(1)
prometheusRegistry.
RouterReqsTLSCounter().
With("router", "demo", "service", "service1", "tls_version", "foo", "tls_cipher", "bar").
Add(1)
prometheusRegistry.
RouterReqDurationHistogram().
With("router", "demo", "service", "service1", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http").
Observe(10000)
prometheusRegistry.
RouterRespsBytesCounter().
With("router", "demo", "service", "service1", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http").
Add(1)
prometheusRegistry.
RouterReqsBytesCounter().
With("router", "demo", "service", "service1", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http").
Add(1)
prometheusRegistry.
ServiceReqsCounter().
With(map[string][]string{"User-Agent": {"foobar"}}, "service", "service1", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http").
Add(1)
prometheusRegistry.
ServiceReqsTLSCounter().
With("service", "service1", "tls_version", "foo", "tls_cipher", "bar").
Add(1)
prometheusRegistry.
ServiceReqDurationHistogram().
With("service", "service1", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http").
Observe(10000)
prometheusRegistry.
ServiceRetriesCounter().
With("service", "service1").
Add(1)
prometheusRegistry.
ServiceServerUpGauge().
With("service", "service1", "url", "http://127.0.0.10:80").
Set(1)
prometheusRegistry.
ServiceRespsBytesCounter().
With("service", "service1", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http").
Add(1)
prometheusRegistry.
ServiceReqsBytesCounter().
With("service", "service1", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http").
Add(1)
delayForTrackingCompletion()
metricsFamilies := mustScrape()
testCases := []struct {
name string
labels map[string]string
assert func(*dto.MetricFamily)
}{
{
name: configReloadsTotalName,
assert: buildCounterAssert(t, configReloadsTotalName, 1),
},
{
name: configLastReloadSuccessName,
assert: buildTimestampAssert(t, configLastReloadSuccessName),
},
{
name: openConnectionsName,
labels: map[string]string{
"protocol": "TCP",
"entrypoint": "test",
},
assert: buildGaugeAssert(t, openConnectionsName, 1),
},
{
name: tlsCertsNotAfterTimestampName,
labels: map[string]string{
"cn": "value",
"serial": "value",
"sans": "value",
},
assert: buildTimestampAssert(t, tlsCertsNotAfterTimestampName),
},
{
name: entryPointReqsTotalName,
labels: map[string]string{
"code": "200",
"method": http.MethodGet,
"protocol": "http",
"entrypoint": "http",
"useragent": "foobar",
},
assert: buildCounterAssert(t, entryPointReqsTotalName, 1),
},
{
name: entryPointReqDurationName,
labels: map[string]string{
"code": "200",
"method": http.MethodGet,
"protocol": "http",
"entrypoint": "http",
},
assert: buildHistogramAssert(t, entryPointReqDurationName, 1),
},
{
name: entryPointReqsBytesTotalName,
labels: map[string]string{
"code": "200",
"method": http.MethodGet,
"protocol": "http",
"entrypoint": "http",
},
assert: buildCounterAssert(t, entryPointReqsBytesTotalName, 1),
},
{
name: entryPointRespsBytesTotalName,
labels: map[string]string{
"code": "200",
"method": http.MethodGet,
"protocol": "http",
"entrypoint": "http",
},
assert: buildCounterAssert(t, entryPointRespsBytesTotalName, 1),
},
{
name: routerReqsTotalName,
labels: map[string]string{
"code": "200",
"method": http.MethodGet,
"protocol": "http",
"service": "service1",
"router": "demo",
"useragent": "",
},
assert: buildCounterAssert(t, routerReqsTotalName, 1),
},
{
name: routerReqsTLSTotalName,
labels: map[string]string{
"service": "service1",
"router": "demo",
"tls_version": "foo",
"tls_cipher": "bar",
},
assert: buildCounterAssert(t, routerReqsTLSTotalName, 1),
},
{
name: routerReqDurationName,
labels: map[string]string{
"code": "200",
"method": http.MethodGet,
"protocol": "http",
"service": "service1",
"router": "demo",
},
assert: buildHistogramAssert(t, routerReqDurationName, 1),
},
{
name: routerReqsBytesTotalName,
labels: map[string]string{
"code": "200",
"method": http.MethodGet,
"protocol": "http",
"service": "service1",
"router": "demo",
},
assert: buildCounterAssert(t, routerReqsBytesTotalName, 1),
},
{
name: routerRespsBytesTotalName,
labels: map[string]string{
"code": "200",
"method": http.MethodGet,
"protocol": "http",
"service": "service1",
"router": "demo",
},
assert: buildCounterAssert(t, routerRespsBytesTotalName, 1),
},
{
name: serviceReqsTotalName,
labels: map[string]string{
"code": "200",
"method": http.MethodGet,
"protocol": "http",
"service": "service1",
"useragent": "foobar",
},
assert: buildCounterAssert(t, serviceReqsTotalName, 1),
},
{
name: serviceReqsTLSTotalName,
labels: map[string]string{
"service": "service1",
"tls_version": "foo",
"tls_cipher": "bar",
},
assert: buildCounterAssert(t, serviceReqsTLSTotalName, 1),
},
{
name: serviceReqDurationName,
labels: map[string]string{
"code": "200",
"method": http.MethodGet,
"protocol": "http",
"service": "service1",
},
assert: buildHistogramAssert(t, serviceReqDurationName, 1),
},
{
name: serviceRetriesTotalName,
labels: map[string]string{
"service": "service1",
},
assert: buildGreaterThanCounterAssert(t, serviceRetriesTotalName, 1),
},
{
name: serviceServerUpName,
labels: map[string]string{
"service": "service1",
"url": "http://127.0.0.10:80",
},
assert: buildGaugeAssert(t, serviceServerUpName, 1),
},
{
name: serviceReqsBytesTotalName,
labels: map[string]string{
"code": "200",
"method": http.MethodGet,
"protocol": "http",
"service": "service1",
},
assert: buildCounterAssert(t, serviceReqsBytesTotalName, 1),
},
{
name: serviceRespsBytesTotalName,
labels: map[string]string{
"code": "200",
"method": http.MethodGet,
"protocol": "http",
"service": "service1",
},
assert: buildCounterAssert(t, serviceRespsBytesTotalName, 1),
},
}
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
family := findMetricFamily(test.name, metricsFamilies)
if family == nil {
t.Errorf("gathered metrics do not contain %q", test.name)
return
}
for _, label := range family.GetMetric()[0].GetLabel() {
val, ok := test.labels[label.GetName()]
if !ok {
t.Errorf("%q metric contains unexpected label %q", test.name, label.GetName())
} else if val != label.GetValue() {
t.Errorf("label %q in metric %q has wrong value %q, expected %q", label.GetName(), test.name, label.GetValue(), val)
}
}
test.assert(family)
})
}
}
func TestPrometheusMetricRemoval(t *testing.T) {
promState = newPrometheusState()
promRegistry = prometheus.NewRegistry()
t.Cleanup(promState.reset)
prometheusRegistry := RegisterPrometheus(t.Context(), &otypes.Prometheus{AddEntryPointsLabels: true, AddServicesLabels: true, AddRoutersLabels: true})
defer promRegistry.Unregister(promState)
conf1 := dynamic.Configuration{
HTTP: th.BuildConfiguration(
th.WithRouters(
th.WithRouter("foo@providerName", th.WithServiceName("bar")),
th.WithRouter("router2", th.WithServiceName("bar@providerName")),
),
th.WithLoadBalancerServices(
th.WithService("bar@providerName", th.WithServers(
th.WithServer("http://localhost:9000"),
th.WithServer("http://localhost:9999"),
th.WithServer("http://localhost:9998"),
)),
th.WithService("service1", th.WithServers(th.WithServer("http://localhost:9000"))),
),
),
}
conf2 := dynamic.Configuration{
HTTP: th.BuildConfiguration(
th.WithRouters(
th.WithRouter("foo@providerName", th.WithServiceName("bar")),
),
th.WithLoadBalancerServices(
th.WithService("bar@providerName", th.WithServers(th.WithServer("http://localhost:9000"))),
),
),
}
OnConfigurationUpdate(conf1, []string{"entrypoint1", "entrypoint2"})
OnConfigurationUpdate(conf2, []string{"entrypoint1"})
// Register some metrics manually that are not part of the active configuration.
// Those metrics should be part of the /metrics output on the first scrape but
// should be removed after that scrape.
prometheusRegistry.
EntryPointReqsCounter().
With(nil, "entrypoint", "entrypoint2", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http").
Add(1)
prometheusRegistry.
RouterReqsCounter().
With(nil, "router", "router2", "service", "bar@providerName", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http").
Add(1)
prometheusRegistry.
ServiceReqsCounter().
With(nil, "service", "service1", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http").
Add(1)
prometheusRegistry.
ServiceServerUpGauge().
With("service", "bar@providerName", "url", "http://localhost:9999").
Set(1)
prometheusRegistry.
ServiceServerUpGauge().
With("service", "bar@providerName", "url", "http://localhost:9998").
Set(1)
assertMetricsExist(t, mustScrape(), entryPointReqsTotalName, routerReqsTotalName, serviceReqsTotalName, serviceServerUpName)
assertMetricsAbsent(t, mustScrape(), entryPointReqsTotalName, routerReqsTotalName, serviceReqsTotalName, serviceServerUpName)
// To verify that metrics belonging to active configurations are not removed
// here the counter examples.
prometheusRegistry.
EntryPointReqsCounter().
With(nil, "entrypoint", "entrypoint1", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http").
Add(1)
prometheusRegistry.
RouterReqsCounter().
With(nil, "router", "foo@providerName", "service", "bar", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http").
Add(1)
prometheusRegistry.
ServiceReqsCounter().
With(nil, "service", "bar@providerName", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http").
Add(1)
prometheusRegistry.
ServiceServerUpGauge().
With("service", "bar@providerName", "url", "http://localhost:9000").
Set(1)
delayForTrackingCompletion()
assertMetricsExist(t, mustScrape(), entryPointReqsTotalName, serviceReqsTotalName, serviceServerUpName, routerReqsTotalName)
assertMetricsExist(t, mustScrape(), entryPointReqsTotalName, serviceReqsTotalName, serviceServerUpName, routerReqsTotalName)
}
func TestPrometheusMetricRemoveEndpointForRecoveredService(t *testing.T) {
promState = newPrometheusState()
promRegistry = prometheus.NewRegistry()
t.Cleanup(promState.reset)
prometheusRegistry := RegisterPrometheus(t.Context(), &otypes.Prometheus{AddServicesLabels: true})
defer promRegistry.Unregister(promState)
conf1 := dynamic.Configuration{
HTTP: th.BuildConfiguration(
th.WithLoadBalancerServices(
th.WithService("service1", th.WithServers(th.WithServer("http://localhost:9000"))),
),
),
}
conf2 := dynamic.Configuration{
HTTP: th.BuildConfiguration(),
}
conf3 := dynamic.Configuration{
HTTP: th.BuildConfiguration(
th.WithLoadBalancerServices(
th.WithService("service1", th.WithServers(th.WithServer("http://localhost:9001"))),
),
),
}
OnConfigurationUpdate(conf1, []string{})
OnConfigurationUpdate(conf2, []string{})
OnConfigurationUpdate(conf3, []string{})
prometheusRegistry.
ServiceServerUpGauge().
With("service", "service1", "url", "http://localhost:9000").
Add(1)
assertMetricsExist(t, mustScrape(), serviceServerUpName)
assertMetricsAbsent(t, mustScrape(), serviceServerUpName)
}
func TestPrometheusRemovedMetricsReset(t *testing.T) {
t.Cleanup(promState.reset)
prometheusRegistry := RegisterPrometheus(t.Context(), &otypes.Prometheus{AddEntryPointsLabels: true, AddServicesLabels: true})
defer promRegistry.Unregister(promState)
conf1 := dynamic.Configuration{
HTTP: th.BuildConfiguration(
th.WithLoadBalancerServices(th.WithService("service",
th.WithServers(th.WithServer("http://localhost:9000"))),
),
),
}
OnConfigurationUpdate(conf1, []string{"entrypoint1", "entrypoint2"})
OnConfigurationUpdate(dynamic.Configuration{}, nil)
labelNamesValues := []string{
"service", "service",
"code", strconv.Itoa(http.StatusOK),
"method", http.MethodGet,
"protocol", "http",
}
prometheusRegistry.
ServiceReqsCounter().
With(nil, labelNamesValues...).
Add(3)
delayForTrackingCompletion()
metricsFamilies := mustScrape()
assertCounterValue(t, 3, findMetricFamily(serviceReqsTotalName, metricsFamilies), labelNamesValues...)
// There is no dynamic configuration and so this metric will be deleted
// after the first scrape.
assertMetricsAbsent(t, mustScrape(), serviceReqsTotalName)
prometheusRegistry.
ServiceReqsCounter().
With(nil, labelNamesValues...).
Add(1)
delayForTrackingCompletion()
metricsFamilies = mustScrape()
assertCounterValue(t, 1, findMetricFamily(serviceReqsTotalName, metricsFamilies), labelNamesValues...)
}
// reset is a utility method for unit testing.
// It should be called after each test run that changes promState internally
// in order to avoid dependencies between unit tests.
func (ps *prometheusState) reset() {
ps.dynamicConfig = newDynamicConfig()
ps.vectors = nil
ps.deletedEP = nil
ps.deletedRouters = nil
ps.deletedServices = nil
ps.deletedURLs = make(map[string][]string)
}
// Tracking and gathering the metrics happens concurrently.
// In practice this is no problem, because in case a tracked metric would miss the current scrape,
// it would just be there in the next one.
// That we can test reliably the tracking of all metrics here,
// we sleep for a short amount of time,
// to make sure the metric will be present in the next scrape.
func delayForTrackingCompletion() {
time.Sleep(250 * time.Millisecond)
}
func mustScrape() []*dto.MetricFamily {
families, err := promRegistry.Gather()
if err != nil {
panic(fmt.Sprintf("could not gather metrics families: %s", err))
}
return families
}
func assertMetricsExist(t *testing.T, families []*dto.MetricFamily, metricNames ...string) {
t.Helper()
for _, metricName := range metricNames {
if findMetricFamily(metricName, families) == nil {
t.Errorf("gathered metrics should contain %q", metricName)
}
}
}
func assertMetricsAbsent(t *testing.T, families []*dto.MetricFamily, metricNames ...string) {
t.Helper()
for _, metricName := range metricNames {
if findMetricFamily(metricName, families) != nil {
t.Errorf("gathered metrics should not contain %q", metricName)
}
}
}
func findMetricFamily(name string, families []*dto.MetricFamily) *dto.MetricFamily {
for _, family := range families {
if family.GetName() == name {
return family
}
}
return nil
}
func findMetricByLabelNamesValues(family *dto.MetricFamily, labelNamesValues ...string) *dto.Metric {
if family == nil {
return nil
}
for _, metric := range family.GetMetric() {
if hasMetricAllLabelPairs(metric, labelNamesValues...) {
return metric
}
}
return nil
}
func hasMetricAllLabelPairs(metric *dto.Metric, labelNamesValues ...string) bool {
for i := 0; i < len(labelNamesValues); i += 2 {
name, val := labelNamesValues[i], labelNamesValues[i+1]
if !hasMetricLabelPair(metric, name, val) {
return false
}
}
return true
}
func hasMetricLabelPair(metric *dto.Metric, labelName, labelValue string) bool {
for _, labelPair := range metric.GetLabel() {
if labelPair.GetName() == labelName && labelPair.GetValue() == labelValue {
return true
}
}
return false
}
func assertCounterValue(t *testing.T, want float64, family *dto.MetricFamily, labelNamesValues ...string) {
t.Helper()
metric := findMetricByLabelNamesValues(family, labelNamesValues...)
if metric == nil {
t.Error("metric must not be nil")
return
}
if metric.GetCounter() == nil {
t.Errorf("metric %s must be a counter", family.GetName())
return
}
if cv := metric.GetCounter().GetValue(); cv != want {
t.Errorf("metric %s has value %v, want %v", family.GetName(), cv, want)
}
}
func buildCounterAssert(t *testing.T, metricName string, expectedValue int) func(family *dto.MetricFamily) {
t.Helper()
return func(family *dto.MetricFamily) {
if cv := int(family.GetMetric()[0].GetCounter().GetValue()); cv != expectedValue {
t.Errorf("metric %s has value %d, want %d", metricName, cv, expectedValue)
}
}
}
func buildGreaterThanCounterAssert(t *testing.T, metricName string, expectedMinValue int) func(family *dto.MetricFamily) {
t.Helper()
return func(family *dto.MetricFamily) {
if cv := int(family.GetMetric()[0].GetCounter().GetValue()); cv < expectedMinValue {
t.Errorf("metric %s has value %d, want at least %d", metricName, cv, expectedMinValue)
}
}
}
func buildHistogramAssert(t *testing.T, metricName string, expectedSampleCount int) func(family *dto.MetricFamily) {
t.Helper()
return func(family *dto.MetricFamily) {
if sc := int(family.GetMetric()[0].GetHistogram().GetSampleCount()); sc != expectedSampleCount {
t.Errorf("metric %s has sample count value %d, want %d", metricName, sc, expectedSampleCount)
}
}
}
func buildGaugeAssert(t *testing.T, metricName string, expectedValue int) func(family *dto.MetricFamily) {
t.Helper()
return func(family *dto.MetricFamily) {
if gv := int(family.GetMetric()[0].GetGauge().GetValue()); gv != expectedValue {
t.Errorf("metric %s has value %d, want %d", metricName, gv, expectedValue)
}
}
}
func buildTimestampAssert(t *testing.T, metricName string) func(family *dto.MetricFamily) {
t.Helper()
return func(family *dto.MetricFamily) {
if ts := time.Unix(int64(family.GetMetric()[0].GetGauge().GetValue()), 0); time.Since(ts) > time.Minute {
t.Errorf("metric %s has wrong timestamp %v", metricName, ts)
}
}
}

View file

@ -0,0 +1,121 @@
package metrics
import (
"context"
"time"
"github.com/go-kit/kit/metrics/statsd"
"github.com/rs/zerolog/log"
"github.com/traefik/traefik/v3/pkg/observability/logs"
otypes "github.com/traefik/traefik/v3/pkg/observability/types"
"github.com/traefik/traefik/v3/pkg/safe"
)
var (
statsdClient *statsd.Statsd
statsdTicker *time.Ticker
)
const (
statsdConfigReloadsName = "config.reload.total"
statsdLastConfigReloadSuccessName = "config.reload.lastSuccessTimestamp"
statsdOpenConnectionsName = "open.connections"
statsdTLSCertsNotAfterTimestampName = "tls.certs.notAfterTimestamp"
statsdEntryPointReqsName = "entrypoint.request.total"
statsdEntryPointReqsTLSName = "entrypoint.request.tls.total"
statsdEntryPointReqDurationName = "entrypoint.request.duration"
statsdEntryPointReqsBytesName = "entrypoint.requests.bytes.total"
statsdEntryPointRespsBytesName = "entrypoint.responses.bytes.total"
statsdRouterReqsName = "router.request.total"
statsdRouterReqsTLSName = "router.request.tls.total"
statsdRouterReqsDurationName = "router.request.duration"
statsdRouterReqsBytesName = "router.requests.bytes.total"
statsdRouterRespsBytesName = "router.responses.bytes.total"
statsdServiceReqsName = "service.request.total"
statsdServiceReqsTLSName = "service.request.tls.total"
statsdServiceReqsDurationName = "service.request.duration"
statsdServiceRetriesTotalName = "service.retries.total"
statsdServiceServerUpName = "service.server.up"
statsdServiceReqsBytesName = "service.requests.bytes.total"
statsdServiceRespsBytesName = "service.responses.bytes.total"
)
// RegisterStatsd registers the metrics pusher if this didn't happen yet and creates a statsd Registry instance.
func RegisterStatsd(ctx context.Context, config *otypes.Statsd) Registry {
// just to be sure there is a prefix defined
if config.Prefix == "" {
config.Prefix = defaultMetricsPrefix
}
statsdClient = statsd.New(config.Prefix+".", logs.NewGoKitWrapper(*log.Ctx(ctx)))
if statsdTicker == nil {
statsdTicker = initStatsdTicker(ctx, config)
}
registry := &standardRegistry{
configReloadsCounter: statsdClient.NewCounter(statsdConfigReloadsName, 1.0),
lastConfigReloadSuccessGauge: statsdClient.NewGauge(statsdLastConfigReloadSuccessName),
tlsCertsNotAfterTimestampGauge: statsdClient.NewGauge(statsdTLSCertsNotAfterTimestampName),
openConnectionsGauge: statsdClient.NewGauge(statsdOpenConnectionsName),
}
if config.AddEntryPointsLabels {
registry.epEnabled = config.AddEntryPointsLabels
registry.entryPointReqsCounter = NewCounterWithNoopHeaders(statsdClient.NewCounter(statsdEntryPointReqsName, 1.0))
registry.entryPointReqsTLSCounter = statsdClient.NewCounter(statsdEntryPointReqsTLSName, 1.0)
registry.entryPointReqDurationHistogram, _ = NewHistogramWithScale(statsdClient.NewTiming(statsdEntryPointReqDurationName, 1.0), time.Millisecond)
registry.entryPointReqsBytesCounter = statsdClient.NewCounter(statsdEntryPointReqsBytesName, 1.0)
registry.entryPointRespsBytesCounter = statsdClient.NewCounter(statsdEntryPointRespsBytesName, 1.0)
}
if config.AddRoutersLabels {
registry.routerEnabled = config.AddRoutersLabels
registry.routerReqsCounter = NewCounterWithNoopHeaders(statsdClient.NewCounter(statsdRouterReqsName, 1.0))
registry.routerReqsTLSCounter = statsdClient.NewCounter(statsdRouterReqsTLSName, 1.0)
registry.routerReqDurationHistogram, _ = NewHistogramWithScale(statsdClient.NewTiming(statsdRouterReqsDurationName, 1.0), time.Millisecond)
registry.routerReqsBytesCounter = statsdClient.NewCounter(statsdRouterReqsBytesName, 1.0)
registry.routerRespsBytesCounter = statsdClient.NewCounter(statsdRouterRespsBytesName, 1.0)
}
if config.AddServicesLabels {
registry.svcEnabled = config.AddServicesLabels
registry.serviceReqsCounter = NewCounterWithNoopHeaders(statsdClient.NewCounter(statsdServiceReqsName, 1.0))
registry.serviceReqsTLSCounter = statsdClient.NewCounter(statsdServiceReqsTLSName, 1.0)
registry.serviceReqDurationHistogram, _ = NewHistogramWithScale(statsdClient.NewTiming(statsdServiceReqsDurationName, 1.0), time.Millisecond)
registry.serviceRetriesCounter = statsdClient.NewCounter(statsdServiceRetriesTotalName, 1.0)
registry.serviceServerUpGauge = statsdClient.NewGauge(statsdServiceServerUpName)
registry.serviceReqsBytesCounter = statsdClient.NewCounter(statsdServiceReqsBytesName, 1.0)
registry.serviceRespsBytesCounter = statsdClient.NewCounter(statsdServiceRespsBytesName, 1.0)
}
return registry
}
// initStatsdTicker initializes metrics pusher and creates a statsdClient if not created already.
func initStatsdTicker(ctx context.Context, config *otypes.Statsd) *time.Ticker {
address := config.Address
if len(address) == 0 {
address = "localhost:8125"
}
report := time.NewTicker(time.Duration(config.PushInterval))
safe.Go(func() {
statsdClient.SendLoop(ctx, report.C, "udp", address)
})
return report
}
// StopStatsd stops internal statsdTicker which controls the pushing of metrics to StatsD Agent and resets it to `nil`.
func StopStatsd() {
if statsdTicker != nil {
statsdTicker.Stop()
}
statsdTicker = nil
}

View file

@ -0,0 +1,107 @@
package metrics
import (
"net/http"
"strconv"
"testing"
"time"
"github.com/stvp/go-udp-testing"
ptypes "github.com/traefik/paerser/types"
otypes "github.com/traefik/traefik/v3/pkg/observability/types"
)
func TestStatsD(t *testing.T) {
t.Cleanup(func() {
StopStatsd()
})
udp.SetAddr(":18125")
// This is needed to make sure that UDP Listener listens for data a bit longer, otherwise it will quit after a millisecond
udp.Timeout = 5 * time.Second
statsdRegistry := RegisterStatsd(t.Context(), &otypes.Statsd{Address: ":18125", PushInterval: ptypes.Duration(time.Second), AddEntryPointsLabels: true, AddRoutersLabels: true, AddServicesLabels: true})
testRegistry(t, defaultMetricsPrefix, statsdRegistry)
}
func TestStatsDWithPrefix(t *testing.T) {
t.Cleanup(func() {
StopStatsd()
})
udp.SetAddr(":18125")
// This is needed to make sure that UDP Listener listens for data a bit longer, otherwise it will quit after a millisecond
udp.Timeout = 5 * time.Second
statsdRegistry := RegisterStatsd(t.Context(), &otypes.Statsd{Address: ":18125", PushInterval: ptypes.Duration(time.Second), AddEntryPointsLabels: true, AddRoutersLabels: true, AddServicesLabels: true, Prefix: "testPrefix"})
testRegistry(t, "testPrefix", statsdRegistry)
}
func testRegistry(t *testing.T, metricsPrefix string, registry Registry) {
t.Helper()
if !registry.IsEpEnabled() || !registry.IsRouterEnabled() || !registry.IsSvcEnabled() {
t.Errorf("Statsd registry should return true for IsEnabled(), IsRouterEnabled() and IsSvcEnabled()")
}
expected := []string{
metricsPrefix + ".config.reload.total:1.000000|c\n",
metricsPrefix + ".config.reload.lastSuccessTimestamp:1.000000|g\n",
metricsPrefix + ".open.connections:1.000000|g\n",
metricsPrefix + ".tls.certs.notAfterTimestamp:1.000000|g\n",
metricsPrefix + ".entrypoint.request.total:1.000000|c\n",
metricsPrefix + ".entrypoint.request.tls.total:1.000000|c\n",
metricsPrefix + ".entrypoint.request.duration:10000.000000|ms",
metricsPrefix + ".entrypoint.requests.bytes.total:1.000000|c\n",
metricsPrefix + ".entrypoint.responses.bytes.total:1.000000|c\n",
metricsPrefix + ".router.request.total:2.000000|c\n",
metricsPrefix + ".router.request.tls.total:1.000000|c\n",
metricsPrefix + ".router.request.duration:10000.000000|ms",
metricsPrefix + ".router.requests.bytes.total:1.000000|c\n",
metricsPrefix + ".router.responses.bytes.total:1.000000|c\n",
metricsPrefix + ".service.request.total:2.000000|c\n",
metricsPrefix + ".service.request.tls.total:1.000000|c\n",
metricsPrefix + ".service.request.duration:10000.000000|ms",
metricsPrefix + ".service.retries.total:2.000000|c\n",
metricsPrefix + ".service.server.up:1.000000|g\n",
metricsPrefix + ".service.requests.bytes.total:1.000000|c\n",
metricsPrefix + ".service.responses.bytes.total:1.000000|c\n",
}
udp.ShouldReceiveAll(t, expected, func() {
registry.ConfigReloadsCounter().Add(1)
registry.LastConfigReloadSuccessGauge().Set(1)
registry.OpenConnectionsGauge().With("entrypoint", "test", "protocol", "TCP").Set(1)
registry.TLSCertsNotAfterTimestampGauge().With("key", "value").Set(1)
registry.EntryPointReqsCounter().With(nil, "entrypoint", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
registry.EntryPointReqsTLSCounter().With("entrypoint", "test", "tls_version", "foo", "tls_cipher", "bar").Add(1)
registry.EntryPointReqDurationHistogram().With("entrypoint", "test").Observe(10000)
registry.EntryPointReqsBytesCounter().With("entrypoint", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
registry.EntryPointRespsBytesCounter().With("entrypoint", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
registry.RouterReqsCounter().With(nil, "router", "demo", "service", "test", "code", strconv.Itoa(http.StatusNotFound), "method", http.MethodGet).Add(1)
registry.RouterReqsCounter().With(nil, "router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
registry.RouterReqsTLSCounter().With("router", "demo", "service", "test", "tls_version", "foo", "tls_cipher", "bar").Add(1)
registry.RouterReqDurationHistogram().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK)).Observe(10000)
registry.RouterReqsBytesCounter().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
registry.RouterRespsBytesCounter().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
registry.ServiceReqsCounter().With(nil, "service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
registry.ServiceReqsCounter().With(nil, "service", "test", "code", strconv.Itoa(http.StatusNotFound), "method", http.MethodGet).Add(1)
registry.ServiceReqsTLSCounter().With("service", "test", "tls_version", "foo", "tls_cipher", "bar").Add(1)
registry.ServiceReqDurationHistogram().With("service", "test", "code", strconv.Itoa(http.StatusOK)).Observe(10000)
registry.ServiceRetriesCounter().With("service", "test").Add(1)
registry.ServiceRetriesCounter().With("service", "test").Add(1)
registry.ServiceServerUpGauge().With("service:test", "url", "http://127.0.0.1").Set(1)
registry.ServiceReqsBytesCounter().With("service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
registry.ServiceRespsBytesCounter().With("service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
})
}