Add OpenTelemetry tracing and metrics support
This commit is contained in:
parent
db287c4d31
commit
0d81fac3fc
19 changed files with 2199 additions and 90 deletions
|
@ -35,6 +35,7 @@ import (
|
|||
"github.com/traefik/traefik/v2/pkg/tracing/haystack"
|
||||
"github.com/traefik/traefik/v2/pkg/tracing/instana"
|
||||
"github.com/traefik/traefik/v2/pkg/tracing/jaeger"
|
||||
"github.com/traefik/traefik/v2/pkg/tracing/opentelemetry"
|
||||
"github.com/traefik/traefik/v2/pkg/tracing/zipkin"
|
||||
"github.com/traefik/traefik/v2/pkg/types"
|
||||
)
|
||||
|
@ -169,14 +170,15 @@ func (a *LifeCycle) SetDefaults() {
|
|||
|
||||
// Tracing holds the tracing configuration.
|
||||
type Tracing struct {
|
||||
ServiceName string `description:"Set the name for this service." json:"serviceName,omitempty" toml:"serviceName,omitempty" yaml:"serviceName,omitempty" export:"true"`
|
||||
SpanNameLimit int `description:"Set the maximum character limit for Span names (default 0 = no limit)." json:"spanNameLimit,omitempty" toml:"spanNameLimit,omitempty" yaml:"spanNameLimit,omitempty" export:"true"`
|
||||
Jaeger *jaeger.Config `description:"Settings for Jaeger." json:"jaeger,omitempty" toml:"jaeger,omitempty" yaml:"jaeger,omitempty" export:"true" label:"allowEmpty" file:"allowEmpty"`
|
||||
Zipkin *zipkin.Config `description:"Settings for Zipkin." json:"zipkin,omitempty" toml:"zipkin,omitempty" yaml:"zipkin,omitempty" export:"true" label:"allowEmpty" file:"allowEmpty"`
|
||||
Datadog *datadog.Config `description:"Settings for Datadog." json:"datadog,omitempty" toml:"datadog,omitempty" yaml:"datadog,omitempty" export:"true" label:"allowEmpty" file:"allowEmpty"`
|
||||
Instana *instana.Config `description:"Settings for Instana." json:"instana,omitempty" toml:"instana,omitempty" yaml:"instana,omitempty" export:"true" label:"allowEmpty" file:"allowEmpty"`
|
||||
Haystack *haystack.Config `description:"Settings for Haystack." json:"haystack,omitempty" toml:"haystack,omitempty" yaml:"haystack,omitempty" export:"true" label:"allowEmpty" file:"allowEmpty"`
|
||||
Elastic *elastic.Config `description:"Settings for Elastic." json:"elastic,omitempty" toml:"elastic,omitempty" yaml:"elastic,omitempty" export:"true" label:"allowEmpty" file:"allowEmpty"`
|
||||
ServiceName string `description:"Set the name for this service." json:"serviceName,omitempty" toml:"serviceName,omitempty" yaml:"serviceName,omitempty" export:"true"`
|
||||
SpanNameLimit int `description:"Set the maximum character limit for Span names (default 0 = no limit)." json:"spanNameLimit,omitempty" toml:"spanNameLimit,omitempty" yaml:"spanNameLimit,omitempty" export:"true"`
|
||||
Jaeger *jaeger.Config `description:"Settings for Jaeger." json:"jaeger,omitempty" toml:"jaeger,omitempty" yaml:"jaeger,omitempty" export:"true" label:"allowEmpty" file:"allowEmpty"`
|
||||
Zipkin *zipkin.Config `description:"Settings for Zipkin." json:"zipkin,omitempty" toml:"zipkin,omitempty" yaml:"zipkin,omitempty" export:"true" label:"allowEmpty" file:"allowEmpty"`
|
||||
Datadog *datadog.Config `description:"Settings for Datadog." json:"datadog,omitempty" toml:"datadog,omitempty" yaml:"datadog,omitempty" export:"true" label:"allowEmpty" file:"allowEmpty"`
|
||||
Instana *instana.Config `description:"Settings for Instana." json:"instana,omitempty" toml:"instana,omitempty" yaml:"instana,omitempty" export:"true" label:"allowEmpty" file:"allowEmpty"`
|
||||
Haystack *haystack.Config `description:"Settings for Haystack." json:"haystack,omitempty" toml:"haystack,omitempty" yaml:"haystack,omitempty" export:"true" label:"allowEmpty" file:"allowEmpty"`
|
||||
Elastic *elastic.Config `description:"Settings for Elastic." json:"elastic,omitempty" toml:"elastic,omitempty" yaml:"elastic,omitempty" export:"true" label:"allowEmpty" file:"allowEmpty"`
|
||||
OpenTelemetry *opentelemetry.Config `description:"Settings for OpenTelemetry." json:"openTelemetry,omitempty" toml:"openTelemetry,omitempty" yaml:"openTelemetry,omitempty" export:"true" label:"allowEmpty" file:"allowEmpty"`
|
||||
}
|
||||
|
||||
// SetDefaults sets the default values.
|
||||
|
|
422
pkg/metrics/opentelemetry.go
Normal file
422
pkg/metrics/opentelemetry.go
Normal file
|
@ -0,0 +1,422 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-kit/kit/metrics"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/traefik/traefik/v2/pkg/types"
|
||||
"github.com/traefik/traefik/v2/pkg/version"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
"go.opentelemetry.io/otel/metric/global"
|
||||
"go.opentelemetry.io/otel/metric/instrument"
|
||||
"go.opentelemetry.io/otel/metric/instrument/asyncfloat64"
|
||||
"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
|
||||
"go.opentelemetry.io/otel/metric/unit"
|
||||
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
|
||||
"go.opentelemetry.io/otel/sdk/metric/aggregation"
|
||||
"go.opentelemetry.io/otel/sdk/metric/view"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/encoding/gzip"
|
||||
)
|
||||
|
||||
var (
|
||||
openTelemetryMeterProvider *sdkmetric.MeterProvider
|
||||
openTelemetryGaugeCollector *gaugeCollector
|
||||
)
|
||||
|
||||
// RegisterOpenTelemetry registers all OpenTelemetry metrics.
|
||||
func RegisterOpenTelemetry(ctx context.Context, config *types.OpenTelemetry) Registry {
|
||||
if openTelemetryMeterProvider == nil {
|
||||
var err error
|
||||
if openTelemetryMeterProvider, err = newOpenTelemetryMeterProvider(ctx, config); err != nil {
|
||||
log.Ctx(ctx).Err(err).Msg("Unable to create OpenTelemetry meter provider")
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
if openTelemetryGaugeCollector == nil {
|
||||
openTelemetryGaugeCollector = newOpenTelemetryGaugeCollector()
|
||||
}
|
||||
|
||||
meter := global.Meter("github.com/traefik/traefik",
|
||||
metric.WithInstrumentationVersion(version.Version))
|
||||
|
||||
reg := &standardRegistry{
|
||||
epEnabled: config.AddEntryPointsLabels,
|
||||
routerEnabled: config.AddRoutersLabels,
|
||||
svcEnabled: config.AddServicesLabels,
|
||||
configReloadsCounter: newOTLPCounterFrom(meter, configReloadsTotalName, "Config reloads"),
|
||||
configReloadsFailureCounter: newOTLPCounterFrom(meter, configReloadsFailuresTotalName, "Config reload failures"),
|
||||
lastConfigReloadSuccessGauge: newOTLPGaugeFrom(meter, configLastReloadSuccessName, "Last config reload success", unit.Milliseconds),
|
||||
lastConfigReloadFailureGauge: newOTLPGaugeFrom(meter, configLastReloadFailureName, "Last config reload failure", unit.Milliseconds),
|
||||
tlsCertsNotAfterTimestampGauge: newOTLPGaugeFrom(meter, tlsCertsNotAfterTimestamp, "Certificate expiration timestamp", unit.Milliseconds),
|
||||
}
|
||||
|
||||
if config.AddEntryPointsLabels {
|
||||
reg.entryPointReqsCounter = newOTLPCounterFrom(meter, entryPointReqsTotalName,
|
||||
"How many HTTP requests processed on an entrypoint, partitioned by status code, protocol, and method.")
|
||||
reg.entryPointReqsTLSCounter = newOTLPCounterFrom(meter, entryPointReqsTLSTotalName,
|
||||
"How many HTTP requests with TLS processed on an entrypoint, partitioned by TLS Version and TLS cipher Used.")
|
||||
reg.entryPointReqDurationHistogram, _ = NewHistogramWithScale(newOTLPHistogramFrom(meter, entryPointReqDurationName,
|
||||
"How long it took to process the request on an entrypoint, partitioned by status code, protocol, and method.",
|
||||
unit.Milliseconds), time.Second)
|
||||
reg.entryPointOpenConnsGauge = newOTLPGaugeFrom(meter, entryPointOpenConnsName,
|
||||
"How many open connections exist on an entrypoint, partitioned by method and protocol.",
|
||||
unit.Dimensionless)
|
||||
}
|
||||
|
||||
if config.AddRoutersLabels {
|
||||
reg.routerReqsCounter = newOTLPCounterFrom(meter, routerReqsTotalName,
|
||||
"How many HTTP requests are processed on a router, partitioned by service, status code, protocol, and method.")
|
||||
reg.routerReqsTLSCounter = newOTLPCounterFrom(meter, routerReqsTLSTotalName,
|
||||
"How many HTTP requests with TLS are processed on a router, partitioned by service, TLS Version, and TLS cipher Used.")
|
||||
reg.routerReqDurationHistogram, _ = NewHistogramWithScale(newOTLPHistogramFrom(meter, routerReqDurationName,
|
||||
"How long it took to process the request on a router, partitioned by service, status code, protocol, and method.",
|
||||
unit.Milliseconds), time.Second)
|
||||
reg.routerOpenConnsGauge = newOTLPGaugeFrom(meter, routerOpenConnsName,
|
||||
"How many open connections exist on a router, partitioned by service, method, and protocol.",
|
||||
unit.Dimensionless)
|
||||
}
|
||||
|
||||
if config.AddServicesLabels {
|
||||
reg.serviceReqsCounter = newOTLPCounterFrom(meter, serviceReqsTotalName,
|
||||
"How many HTTP requests processed on a service, partitioned by status code, protocol, and method.")
|
||||
reg.serviceReqsTLSCounter = newOTLPCounterFrom(meter, serviceReqsTLSTotalName,
|
||||
"How many HTTP requests with TLS processed on a service, partitioned by TLS version and TLS cipher.")
|
||||
reg.serviceReqDurationHistogram, _ = NewHistogramWithScale(newOTLPHistogramFrom(meter, serviceReqDurationName,
|
||||
"How long it took to process the request on a service, partitioned by status code, protocol, and method.",
|
||||
unit.Milliseconds), time.Second)
|
||||
reg.serviceOpenConnsGauge = newOTLPGaugeFrom(meter, serviceOpenConnsName,
|
||||
"How many open connections exist on a service, partitioned by method and protocol.",
|
||||
unit.Dimensionless)
|
||||
reg.serviceRetriesCounter = newOTLPCounterFrom(meter, serviceRetriesTotalName,
|
||||
"How many request retries happened on a service.")
|
||||
reg.serviceServerUpGauge = newOTLPGaugeFrom(meter, serviceServerUpName,
|
||||
"service server is up, described by gauge value of 0 or 1.",
|
||||
unit.Dimensionless)
|
||||
}
|
||||
|
||||
return reg
|
||||
}
|
||||
|
||||
// StopOpenTelemetry stops and resets Open-Telemetry client.
|
||||
func StopOpenTelemetry() {
|
||||
if openTelemetryMeterProvider == nil {
|
||||
return
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
if err := openTelemetryMeterProvider.Shutdown(ctx); err != nil {
|
||||
log.Err(err).Msg("Unable to shutdown OpenTelemetry meter provider")
|
||||
}
|
||||
|
||||
openTelemetryMeterProvider = nil
|
||||
}
|
||||
|
||||
// newOpenTelemetryMeterProvider creates a new controller.Controller.
|
||||
func newOpenTelemetryMeterProvider(ctx context.Context, config *types.OpenTelemetry) (*sdkmetric.MeterProvider, error) {
|
||||
var (
|
||||
exporter sdkmetric.Exporter
|
||||
err error
|
||||
)
|
||||
if config.GRPC != nil {
|
||||
exporter, err = newGRPCExporter(ctx, config)
|
||||
} else {
|
||||
exporter, err = newHTTPExporter(ctx, config)
|
||||
}
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("creating exporter: %w", err)
|
||||
}
|
||||
|
||||
opts := []sdkmetric.PeriodicReaderOption{
|
||||
sdkmetric.WithInterval(time.Duration(config.PushInterval)),
|
||||
}
|
||||
|
||||
// View to customize histogram buckets and rename a single histogram instrument.
|
||||
customBucketsView, err := view.New(
|
||||
// Match* to match instruments
|
||||
view.MatchInstrumentName("traefik_*_request_duration_seconds"),
|
||||
|
||||
view.WithSetAggregation(aggregation.ExplicitBucketHistogram{
|
||||
Boundaries: config.ExplicitBoundaries,
|
||||
}),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("creating histogram view: %w", err)
|
||||
}
|
||||
|
||||
meterProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(
|
||||
sdkmetric.NewPeriodicReader(exporter, opts...),
|
||||
customBucketsView,
|
||||
))
|
||||
|
||||
global.SetMeterProvider(meterProvider)
|
||||
|
||||
return meterProvider, nil
|
||||
}
|
||||
|
||||
func newHTTPExporter(ctx context.Context, config *types.OpenTelemetry) (sdkmetric.Exporter, error) {
|
||||
host, port, err := net.SplitHostPort(config.Address)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid collector address %q: %w", config.Address, err)
|
||||
}
|
||||
|
||||
opts := []otlpmetrichttp.Option{
|
||||
otlpmetrichttp.WithEndpoint(fmt.Sprintf("%s:%s", host, port)),
|
||||
otlpmetrichttp.WithHeaders(config.Headers),
|
||||
otlpmetrichttp.WithCompression(otlpmetrichttp.GzipCompression),
|
||||
}
|
||||
|
||||
if config.Insecure {
|
||||
opts = append(opts, otlpmetrichttp.WithInsecure())
|
||||
}
|
||||
|
||||
if config.Path != "" {
|
||||
opts = append(opts, otlpmetrichttp.WithURLPath(config.Path))
|
||||
}
|
||||
|
||||
if config.TLS != nil {
|
||||
tlsConfig, err := config.TLS.CreateTLSConfig(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("creating TLS client config: %w", err)
|
||||
}
|
||||
|
||||
opts = append(opts, otlpmetrichttp.WithTLSClientConfig(tlsConfig))
|
||||
}
|
||||
|
||||
return otlpmetrichttp.New(ctx, opts...)
|
||||
}
|
||||
|
||||
func newGRPCExporter(ctx context.Context, config *types.OpenTelemetry) (sdkmetric.Exporter, error) {
|
||||
host, port, err := net.SplitHostPort(config.Address)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid collector address %q: %w", config.Address, err)
|
||||
}
|
||||
|
||||
opts := []otlpmetricgrpc.Option{
|
||||
otlpmetricgrpc.WithEndpoint(fmt.Sprintf("%s:%s", host, port)),
|
||||
otlpmetricgrpc.WithHeaders(config.Headers),
|
||||
otlpmetricgrpc.WithCompressor(gzip.Name),
|
||||
}
|
||||
|
||||
if config.Insecure {
|
||||
opts = append(opts, otlpmetricgrpc.WithInsecure())
|
||||
}
|
||||
|
||||
if config.TLS != nil {
|
||||
tlsConfig, err := config.TLS.CreateTLSConfig(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("creating TLS client config: %w", err)
|
||||
}
|
||||
|
||||
opts = append(opts, otlpmetricgrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig)))
|
||||
}
|
||||
|
||||
return otlpmetricgrpc.New(ctx, opts...)
|
||||
}
|
||||
|
||||
func newOTLPCounterFrom(meter metric.Meter, name, desc string) *otelCounter {
|
||||
c, _ := meter.SyncFloat64().Counter(name,
|
||||
instrument.WithDescription(desc),
|
||||
instrument.WithUnit(unit.Dimensionless),
|
||||
)
|
||||
|
||||
return &otelCounter{
|
||||
ip: c,
|
||||
}
|
||||
}
|
||||
|
||||
type otelCounter struct {
|
||||
labelNamesValues otelLabelNamesValues
|
||||
ip syncfloat64.Counter
|
||||
}
|
||||
|
||||
func (c *otelCounter) With(labelValues ...string) metrics.Counter {
|
||||
return &otelCounter{
|
||||
labelNamesValues: c.labelNamesValues.With(labelValues...),
|
||||
ip: c.ip,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *otelCounter) Add(delta float64) {
|
||||
c.ip.Add(context.Background(), delta, c.labelNamesValues.ToLabels()...)
|
||||
}
|
||||
|
||||
type gaugeValue struct {
|
||||
attributes otelLabelNamesValues
|
||||
value float64
|
||||
}
|
||||
|
||||
type gaugeCollector struct {
|
||||
mu sync.Mutex
|
||||
values map[string]map[string]gaugeValue
|
||||
}
|
||||
|
||||
func newOpenTelemetryGaugeCollector() *gaugeCollector {
|
||||
return &gaugeCollector{
|
||||
values: make(map[string]map[string]gaugeValue),
|
||||
}
|
||||
}
|
||||
|
||||
func (c *gaugeCollector) add(name string, delta float64, attributes otelLabelNamesValues) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
str := strings.Join(attributes, "")
|
||||
|
||||
if _, exists := c.values[name]; !exists {
|
||||
c.values[name] = map[string]gaugeValue{
|
||||
str: {
|
||||
attributes: attributes,
|
||||
value: delta,
|
||||
},
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
v, exists := c.values[name][str]
|
||||
if !exists {
|
||||
c.values[name][str] = gaugeValue{
|
||||
attributes: attributes,
|
||||
value: delta,
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
c.values[name][str] = gaugeValue{
|
||||
attributes: attributes,
|
||||
value: v.value + delta,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *gaugeCollector) set(name string, value float64, attributes otelLabelNamesValues) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if _, exists := c.values[name]; !exists {
|
||||
c.values[name] = make(map[string]gaugeValue)
|
||||
}
|
||||
|
||||
c.values[name][strings.Join(attributes, "")] = gaugeValue{
|
||||
attributes: attributes,
|
||||
value: value,
|
||||
}
|
||||
}
|
||||
|
||||
func newOTLPGaugeFrom(meter metric.Meter, name, desc string, u unit.Unit) *otelGauge {
|
||||
openTelemetryGaugeCollector.values[name] = make(map[string]gaugeValue)
|
||||
|
||||
c, _ := meter.AsyncFloat64().Gauge(name,
|
||||
instrument.WithDescription(desc),
|
||||
instrument.WithUnit(u),
|
||||
)
|
||||
|
||||
err := meter.RegisterCallback([]instrument.Asynchronous{c}, func(ctx context.Context) {
|
||||
openTelemetryGaugeCollector.mu.Lock()
|
||||
defer openTelemetryGaugeCollector.mu.Unlock()
|
||||
|
||||
values, exists := openTelemetryGaugeCollector.values[name]
|
||||
if !exists {
|
||||
return
|
||||
}
|
||||
|
||||
for _, value := range values {
|
||||
c.Observe(ctx, value.value, value.attributes.ToLabels()...)
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
log.Err(err).Msg("Unable to register OpenTelemetry meter callback")
|
||||
}
|
||||
|
||||
return &otelGauge{
|
||||
ip: c,
|
||||
name: name,
|
||||
}
|
||||
}
|
||||
|
||||
type otelGauge struct {
|
||||
labelNamesValues otelLabelNamesValues
|
||||
ip asyncfloat64.Gauge
|
||||
name string
|
||||
}
|
||||
|
||||
func (g *otelGauge) With(labelValues ...string) metrics.Gauge {
|
||||
return &otelGauge{
|
||||
labelNamesValues: g.labelNamesValues.With(labelValues...),
|
||||
ip: g.ip,
|
||||
name: g.name,
|
||||
}
|
||||
}
|
||||
|
||||
func (g *otelGauge) Add(delta float64) {
|
||||
openTelemetryGaugeCollector.add(g.name, delta, g.labelNamesValues)
|
||||
}
|
||||
|
||||
func (g *otelGauge) Set(value float64) {
|
||||
openTelemetryGaugeCollector.set(g.name, value, g.labelNamesValues)
|
||||
}
|
||||
|
||||
func newOTLPHistogramFrom(meter metric.Meter, name, desc string, u unit.Unit) *otelHistogram {
|
||||
c, _ := meter.SyncFloat64().Histogram(name,
|
||||
instrument.WithDescription(desc),
|
||||
instrument.WithUnit(u),
|
||||
)
|
||||
|
||||
return &otelHistogram{
|
||||
ip: c,
|
||||
}
|
||||
}
|
||||
|
||||
type otelHistogram struct {
|
||||
labelNamesValues otelLabelNamesValues
|
||||
ip syncfloat64.Histogram
|
||||
}
|
||||
|
||||
func (h *otelHistogram) With(labelValues ...string) metrics.Histogram {
|
||||
return &otelHistogram{
|
||||
labelNamesValues: h.labelNamesValues.With(labelValues...),
|
||||
ip: h.ip,
|
||||
}
|
||||
}
|
||||
|
||||
func (h *otelHistogram) Observe(incr float64) {
|
||||
h.ip.Record(context.Background(), incr, h.labelNamesValues.ToLabels()...)
|
||||
}
|
||||
|
||||
// otelLabelNamesValues is the equivalent of prometheus' labelNamesValues
// but adapted to OpenTelemetry.
// otelLabelNamesValues is a defined slice type holding alternating label names
// and values; it provides validation on its With method.
// Metrics may include it as a member to help them satisfy With semantics and
// save some code duplication.
type otelLabelNamesValues []string

// With validates the input, and returns a new aggregate otelLabelNamesValues.
//
// labelValues is appended after the receiver's entries. When labelValues has
// an odd length, "unknown" is appended as the value of its last name.
//
// The result never shares writable storage with the receiver or with the
// labelValues argument: it is built in a freshly allocated slice. Appending
// directly to the receiver would let two With calls on the same base overwrite
// each other's labels whenever the base has spare capacity, and padding the
// argument in place could write into the caller's slice. When labelValues is
// empty the receiver itself is returned, which is safe because later With
// calls copy it.
func (lvs otelLabelNamesValues) With(labelValues ...string) otelLabelNamesValues {
	if len(labelValues) == 0 {
		return lvs
	}

	// +1 leaves room for the "unknown" padding without a second allocation.
	merged := make(otelLabelNamesValues, 0, len(lvs)+len(labelValues)+1)
	merged = append(merged, lvs...)
	merged = append(merged, labelValues...)

	if len(labelValues)%2 != 0 {
		merged = append(merged, "unknown")
	}

	return merged
}
|
||||
|
||||
// ToLabels is a convenience method to convert a otelLabelNamesValues
|
||||
// to the native attribute.KeyValue.
|
||||
func (lvs otelLabelNamesValues) ToLabels() []attribute.KeyValue {
|
||||
labels := make([]attribute.KeyValue, len(lvs)/2)
|
||||
for i := 0; i < len(labels); i++ {
|
||||
labels[i] = attribute.String(lvs[2*i], lvs[2*i+1])
|
||||
}
|
||||
return labels
|
||||
}
|
446
pkg/metrics/opentelemetry_test.go
Normal file
446
pkg/metrics/opentelemetry_test.go
Normal file
|
@ -0,0 +1,446 @@
|
|||
package metrics
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
ptypes "github.com/traefik/paerser/types"
|
||||
"github.com/traefik/traefik/v2/pkg/types"
|
||||
"go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
)
|
||||
|
||||
func TestOpenTelemetry_labels(t *testing.T) {
|
||||
tests := []struct {
|
||||
desc string
|
||||
values otelLabelNamesValues
|
||||
with []string
|
||||
expect []attribute.KeyValue
|
||||
}{
|
||||
{
|
||||
desc: "with no starting value",
|
||||
values: otelLabelNamesValues{},
|
||||
expect: []attribute.KeyValue{},
|
||||
},
|
||||
{
|
||||
desc: "with one starting value",
|
||||
values: otelLabelNamesValues{"foo"},
|
||||
expect: []attribute.KeyValue{},
|
||||
},
|
||||
{
|
||||
desc: "with two starting value",
|
||||
values: otelLabelNamesValues{"foo", "bar"},
|
||||
expect: []attribute.KeyValue{attribute.String("foo", "bar")},
|
||||
},
|
||||
{
|
||||
desc: "with no starting value, and with one other value",
|
||||
values: otelLabelNamesValues{},
|
||||
with: []string{"baz"},
|
||||
expect: []attribute.KeyValue{attribute.String("baz", "unknown")},
|
||||
},
|
||||
{
|
||||
desc: "with no starting value, and with two other value",
|
||||
values: otelLabelNamesValues{},
|
||||
with: []string{"baz", "buz"},
|
||||
expect: []attribute.KeyValue{attribute.String("baz", "buz")},
|
||||
},
|
||||
{
|
||||
desc: "with one starting value, and with one other value",
|
||||
values: otelLabelNamesValues{"foo"},
|
||||
with: []string{"baz"},
|
||||
expect: []attribute.KeyValue{attribute.String("foo", "baz")},
|
||||
},
|
||||
{
|
||||
desc: "with one starting value, and with two other value",
|
||||
values: otelLabelNamesValues{"foo"},
|
||||
with: []string{"baz", "buz"},
|
||||
expect: []attribute.KeyValue{attribute.String("foo", "baz")},
|
||||
},
|
||||
{
|
||||
desc: "with two starting value, and with one other value",
|
||||
values: otelLabelNamesValues{"foo", "bar"},
|
||||
with: []string{"baz"},
|
||||
expect: []attribute.KeyValue{
|
||||
attribute.String("foo", "bar"),
|
||||
attribute.String("baz", "unknown"),
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "with two starting value, and with two other value",
|
||||
values: otelLabelNamesValues{"foo", "bar"},
|
||||
with: []string{"baz", "buz"},
|
||||
expect: []attribute.KeyValue{
|
||||
attribute.String("foo", "bar"),
|
||||
attribute.String("baz", "buz"),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
test := test
|
||||
|
||||
t.Run(test.desc, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
assert.Equal(t, test.expect, test.values.With(test.with...).ToLabels())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenTelemetry_GaugeCollectorAdd(t *testing.T) {
|
||||
tests := []struct {
|
||||
desc string
|
||||
gc *gaugeCollector
|
||||
delta float64
|
||||
name string
|
||||
attributes otelLabelNamesValues
|
||||
expect map[string]map[string]gaugeValue
|
||||
}{
|
||||
{
|
||||
desc: "empty collector",
|
||||
gc: newOpenTelemetryGaugeCollector(),
|
||||
delta: 1,
|
||||
name: "foo",
|
||||
expect: map[string]map[string]gaugeValue{
|
||||
"foo": {"": {value: 1}},
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "initialized collector",
|
||||
gc: &gaugeCollector{
|
||||
values: map[string]map[string]gaugeValue{
|
||||
"foo": {"": {value: 1}},
|
||||
},
|
||||
},
|
||||
delta: 1,
|
||||
name: "foo",
|
||||
expect: map[string]map[string]gaugeValue{
|
||||
"foo": {"": {value: 2}},
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "initialized collector, values with label (only the last one counts)",
|
||||
gc: &gaugeCollector{
|
||||
values: map[string]map[string]gaugeValue{
|
||||
"foo": {
|
||||
"bar": {
|
||||
attributes: otelLabelNamesValues{"bar"},
|
||||
value: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
delta: 1,
|
||||
name: "foo",
|
||||
expect: map[string]map[string]gaugeValue{
|
||||
"foo": {
|
||||
"": {
|
||||
value: 1,
|
||||
},
|
||||
"bar": {
|
||||
attributes: otelLabelNamesValues{"bar"},
|
||||
value: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "initialized collector, values with label on set",
|
||||
gc: &gaugeCollector{
|
||||
values: map[string]map[string]gaugeValue{
|
||||
"foo": {"bar": {value: 1}},
|
||||
},
|
||||
},
|
||||
delta: 1,
|
||||
name: "foo",
|
||||
attributes: otelLabelNamesValues{"baz"},
|
||||
expect: map[string]map[string]gaugeValue{
|
||||
"foo": {
|
||||
"bar": {
|
||||
value: 1,
|
||||
},
|
||||
"baz": {
|
||||
value: 1,
|
||||
attributes: otelLabelNamesValues{"baz"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
test := test
|
||||
|
||||
t.Run(test.desc, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
test.gc.add(test.name, test.delta, test.attributes)
|
||||
|
||||
assert.Equal(t, test.expect, test.gc.values)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenTelemetry_GaugeCollectorSet(t *testing.T) {
|
||||
tests := []struct {
|
||||
desc string
|
||||
gc *gaugeCollector
|
||||
value float64
|
||||
name string
|
||||
attributes otelLabelNamesValues
|
||||
expect map[string]map[string]gaugeValue
|
||||
}{
|
||||
{
|
||||
desc: "empty collector",
|
||||
gc: newOpenTelemetryGaugeCollector(),
|
||||
value: 1,
|
||||
name: "foo",
|
||||
expect: map[string]map[string]gaugeValue{
|
||||
"foo": {"": {value: 1}},
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "initialized collector",
|
||||
gc: &gaugeCollector{
|
||||
values: map[string]map[string]gaugeValue{
|
||||
"foo": {"": {value: 1}},
|
||||
},
|
||||
},
|
||||
value: 1,
|
||||
name: "foo",
|
||||
expect: map[string]map[string]gaugeValue{
|
||||
"foo": {"": {value: 1}},
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "initialized collector, values with label",
|
||||
gc: &gaugeCollector{
|
||||
values: map[string]map[string]gaugeValue{
|
||||
"foo": {
|
||||
"bar": {
|
||||
attributes: otelLabelNamesValues{"bar"},
|
||||
value: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
value: 1,
|
||||
name: "foo",
|
||||
expect: map[string]map[string]gaugeValue{
|
||||
"foo": {
|
||||
"": {
|
||||
value: 1,
|
||||
},
|
||||
"bar": {
|
||||
attributes: otelLabelNamesValues{"bar"},
|
||||
value: 1,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "initialized collector, values with label on set",
|
||||
gc: &gaugeCollector{
|
||||
values: map[string]map[string]gaugeValue{
|
||||
"foo": {"": {value: 1}},
|
||||
},
|
||||
},
|
||||
value: 1,
|
||||
name: "foo",
|
||||
attributes: otelLabelNamesValues{"bar"},
|
||||
expect: map[string]map[string]gaugeValue{
|
||||
"foo": {
|
||||
"": {
|
||||
value: 1,
|
||||
},
|
||||
"bar": {
|
||||
value: 1,
|
||||
attributes: otelLabelNamesValues{"bar"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
test := test
|
||||
|
||||
t.Run(test.desc, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
test.gc.set(test.name, test.value, test.attributes)
|
||||
|
||||
assert.Equal(t, test.expect, test.gc.values)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestOpenTelemetry(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
c := make(chan *string)
|
||||
defer close(c)
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
gzr, err := gzip.NewReader(r.Body)
|
||||
require.NoError(t, err)
|
||||
|
||||
body, err := io.ReadAll(gzr)
|
||||
require.NoError(t, err)
|
||||
|
||||
req := pmetricotlp.NewExportRequest()
|
||||
err = req.UnmarshalProto(body)
|
||||
require.NoError(t, err)
|
||||
|
||||
marshalledReq, err := json.Marshal(req)
|
||||
require.NoError(t, err)
|
||||
|
||||
bodyStr := string(marshalledReq)
|
||||
c <- &bodyStr
|
||||
|
||||
_, err = fmt.Fprintln(w, "ok")
|
||||
require.NoError(t, err)
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
sURL, err := url.Parse(ts.URL)
|
||||
require.NoError(t, err)
|
||||
|
||||
var cfg types.OpenTelemetry
|
||||
(&cfg).SetDefaults()
|
||||
cfg.AddRoutersLabels = true
|
||||
cfg.Address = sURL.Host
|
||||
cfg.Insecure = true
|
||||
cfg.PushInterval = ptypes.Duration(10 * time.Millisecond)
|
||||
|
||||
registry := RegisterOpenTelemetry(context.Background(), &cfg)
|
||||
require.NotNil(t, registry)
|
||||
|
||||
if !registry.IsEpEnabled() || !registry.IsRouterEnabled() || !registry.IsSvcEnabled() {
|
||||
t.Fatalf("registry should return true for IsEnabled(), IsRouterEnabled() and IsSvcEnabled()")
|
||||
}
|
||||
|
||||
// TODO: the len of startUnixNano is no supposed to be 20, it should be 19
|
||||
expectedServer := []string{
|
||||
`({"name":"traefik_config_reloads_total","description":"Config reloads","unit":"1","sum":{"dataPoints":\[{"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
|
||||
`({"name":"traefik_config_reloads_failure_total","description":"Config reload failures","unit":"1","sum":{"dataPoints":\[{"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
|
||||
`({"name":"traefik_config_last_reload_success","description":"Last config reload success","unit":"ms","gauge":{"dataPoints":\[{"startTimeUnixNano":"[\d]{20}","timeUnixNano":"[\d]{19}","asDouble":1}\]}})`,
|
||||
`({"name":"traefik_config_last_reload_failure","description":"Last config reload failure","unit":"ms","gauge":{"dataPoints":\[{"startTimeUnixNano":"[\d]{20}","timeUnixNano":"[\d]{19}","asDouble":1}\]}})`,
|
||||
}
|
||||
|
||||
registry.ConfigReloadsCounter().Add(1)
|
||||
registry.ConfigReloadsFailureCounter().Add(1)
|
||||
registry.LastConfigReloadSuccessGauge().Set(1)
|
||||
registry.LastConfigReloadFailureGauge().Set(1)
|
||||
msgServer := <-c
|
||||
|
||||
assertMessage(t, *msgServer, expectedServer)
|
||||
|
||||
expectedTLS := []string{
|
||||
`({"name":"traefik_tls_certs_not_after","description":"Certificate expiration timestamp","unit":"ms","gauge":{"dataPoints":\[{"attributes":\[{"key":"key","value":{"stringValue":"value"}}\],"startTimeUnixNano":"[\d]{20}","timeUnixNano":"[\d]{19}","asDouble":1}\]}})`,
|
||||
}
|
||||
|
||||
registry.TLSCertsNotAfterTimestampGauge().With("key", "value").Set(1)
|
||||
msgTLS := <-c
|
||||
|
||||
assertMessage(t, *msgTLS, expectedTLS)
|
||||
|
||||
expectedEntrypoint := []string{
|
||||
`({"name":"traefik_entrypoint_requests_total","description":"How many HTTP requests processed on an entrypoint, partitioned by status code, protocol, and method.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"code","value":{"stringValue":"200"}},{"key":"entrypoint","value":{"stringValue":"test1"}},{"key":"method","value":{"stringValue":"GET"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
|
||||
`({"name":"traefik_entrypoint_requests_tls_total","description":"How many HTTP requests with TLS processed on an entrypoint, partitioned by TLS Version and TLS cipher Used.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"entrypoint","value":{"stringValue":"test2"}},{"key":"tls_cipher","value":{"stringValue":"bar"}},{"key":"tls_version","value":{"stringValue":"foo"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
|
||||
`({"name":"traefik_entrypoint_request_duration_seconds","description":"How long it took to process the request on an entrypoint, partitioned by status code, protocol, and method.","unit":"ms","histogram":{"dataPoints":\[{"attributes":\[{"key":"entrypoint","value":{"stringValue":"test3"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","count":"1","sum":10000,"bucketCounts":\["0","0","0","0","0","0","0","0","0","0","0","1"\],"explicitBounds":\[0.005,0.01,0.025,0.05,0.1,0.25,0.5,1,2.5,5,10\],"min":10000,"max":10000}\],"aggregationTemporality":2}})`,
|
||||
`({"name":"traefik_entrypoint_open_connections","description":"How many open connections exist on an entrypoint, partitioned by method and protocol.","unit":"1","gauge":{"dataPoints":\[{"attributes":\[{"key":"entrypoint","value":{"stringValue":"test4"}}\],"startTimeUnixNano":"[\d]{20}","timeUnixNano":"[\d]{19}","asDouble":1}\]}})`,
|
||||
}
|
||||
|
||||
registry.EntryPointReqsCounter().With("entrypoint", "test1", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
|
||||
registry.EntryPointReqsTLSCounter().With("entrypoint", "test2", "tls_version", "foo", "tls_cipher", "bar").Add(1)
|
||||
registry.EntryPointReqDurationHistogram().With("entrypoint", "test3").Observe(10000)
|
||||
registry.EntryPointOpenConnsGauge().With("entrypoint", "test4").Set(1)
|
||||
msgEntrypoint := <-c
|
||||
|
||||
assertMessage(t, *msgEntrypoint, expectedEntrypoint)
|
||||
|
||||
expectedRouter := []string{
|
||||
`({"name":"traefik_router_requests_total","description":"How many HTTP requests are processed on a router, partitioned by service, status code, protocol, and method.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"code","value":{"stringValue":"(?:200|404)"}},{"key":"method","value":{"stringValue":"GET"}},{"key":"router","value":{"stringValue":"RouterReqsCounter"}},{"key":"service","value":{"stringValue":"test"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1},{"attributes":\[{"key":"code","value":{"stringValue":"(?:200|404)"}},{"key":"method","value":{"stringValue":"GET"}},{"key":"router","value":{"stringValue":"RouterReqsCounter"}},{"key":"service","value":{"stringValue":"test"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
|
||||
`({"name":"traefik_router_requests_tls_total","description":"How many HTTP requests with TLS are processed on a router, partitioned by service, TLS Version, and TLS cipher Used.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"router","value":{"stringValue":"demo"}},{"key":"service","value":{"stringValue":"test"}},{"key":"tls_cipher","value":{"stringValue":"bar"}},{"key":"tls_version","value":{"stringValue":"foo"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
|
||||
`({"name":"traefik_router_request_duration_seconds","description":"How long it took to process the request on a router, partitioned by service, status code, protocol, and method.","unit":"ms","histogram":{"dataPoints":\[{"attributes":\[{"key":"code","value":{"stringValue":"200"}},{"key":"router","value":{"stringValue":"demo"}},{"key":"service","value":{"stringValue":"test"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","count":"1","sum":10000,"bucketCounts":\["0","0","0","0","0","0","0","0","0","0","0","1"\],"explicitBounds":\[0.005,0.01,0.025,0.05,0.1,0.25,0.5,1,2.5,5,10\],"min":10000,"max":10000}\],"aggregationTemporality":2}})`,
|
||||
`({"name":"traefik_router_open_connections","description":"How many open connections exist on a router, partitioned by service, method, and protocol.","unit":"1","gauge":{"dataPoints":\[{"attributes":\[{"key":"router","value":{"stringValue":"demo"}},{"key":"service","value":{"stringValue":"test"}}\],"startTimeUnixNano":"[\d]{20}","timeUnixNano":"[\d]{19}","asDouble":1}\]}})`,
|
||||
}
|
||||
|
||||
registry.RouterReqsCounter().With("router", "RouterReqsCounter", "service", "test", "code", strconv.Itoa(http.StatusNotFound), "method", http.MethodGet).Add(1)
|
||||
registry.RouterReqsCounter().With("router", "RouterReqsCounter", "service", "test", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
|
||||
registry.RouterReqsTLSCounter().With("router", "demo", "service", "test", "tls_version", "foo", "tls_cipher", "bar").Add(1)
|
||||
registry.RouterReqDurationHistogram().With("router", "demo", "service", "test", "code", strconv.Itoa(http.StatusOK)).Observe(10000)
|
||||
registry.RouterOpenConnsGauge().With("router", "demo", "service", "test").Set(1)
|
||||
msgRouter := <-c
|
||||
|
||||
assertMessage(t, *msgRouter, expectedRouter)
|
||||
|
||||
expectedService := []string{
|
||||
`({"name":"traefik_service_requests_total","description":"How many HTTP requests processed on a service, partitioned by status code, protocol, and method.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"code","value":{"stringValue":"(?:200|404)"}},{"key":"method","value":{"stringValue":"GET"}},{"key":"service","value":{"stringValue":"ServiceReqsCounter"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1},{"attributes":\[{"key":"code","value":{"stringValue":"(?:200|404)"}},{"key":"method","value":{"stringValue":"GET"}},{"key":"service","value":{"stringValue":"ServiceReqsCounter"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
|
||||
`({"name":"traefik_service_requests_tls_total","description":"How many HTTP requests with TLS processed on a service, partitioned by TLS version and TLS cipher.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"service","value":{"stringValue":"test"}},{"key":"tls_cipher","value":{"stringValue":"bar"}},{"key":"tls_version","value":{"stringValue":"foo"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
|
||||
`({"name":"traefik_service_request_duration_seconds","description":"How long it took to process the request on a service, partitioned by status code, protocol, and method.","unit":"ms","histogram":{"dataPoints":\[{"attributes":\[{"key":"code","value":{"stringValue":"200"}},{"key":"service","value":{"stringValue":"test"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","count":"1","sum":10000,"bucketCounts":\["0","0","0","0","0","0","0","0","0","0","0","1"\],"explicitBounds":\[0.005,0.01,0.025,0.05,0.1,0.25,0.5,1,2.5,5,10\],"min":10000,"max":10000}\],"aggregationTemporality":2}})`,
|
||||
`({"name":"traefik_service_server_up","description":"service server is up, described by gauge value of 0 or 1.","unit":"1","gauge":{"dataPoints":\[{"attributes":\[{"key":"service","value":{"stringValue":"test"}},{"key":"url","value":{"stringValue":"http://127.0.0.1"}}\],"startTimeUnixNano":"[\d]{20}","timeUnixNano":"[\d]{19}","asDouble":1}\]}})`,
|
||||
}
|
||||
|
||||
registry.ServiceReqsCounter().With("service", "ServiceReqsCounter", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet).Add(1)
|
||||
registry.ServiceReqsCounter().With("service", "ServiceReqsCounter", "code", strconv.Itoa(http.StatusNotFound), "method", http.MethodGet).Add(1)
|
||||
registry.ServiceReqsTLSCounter().With("service", "test", "tls_version", "foo", "tls_cipher", "bar").Add(1)
|
||||
registry.ServiceReqDurationHistogram().With("service", "test", "code", strconv.Itoa(http.StatusOK)).Observe(10000)
|
||||
registry.ServiceServerUpGauge().With("service", "test", "url", "http://127.0.0.1").Set(1)
|
||||
msgService := <-c
|
||||
|
||||
assertMessage(t, *msgService, expectedService)
|
||||
|
||||
expectedServiceRetries := []string{
|
||||
`({"attributes":\[{"key":"service","value":{"stringValue":"foobar"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1})`,
|
||||
`({"attributes":\[{"key":"service","value":{"stringValue":"test"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":2})`,
|
||||
}
|
||||
|
||||
registry.ServiceRetriesCounter().With("service", "test").Add(1)
|
||||
registry.ServiceRetriesCounter().With("service", "test").Add(1)
|
||||
registry.ServiceRetriesCounter().With("service", "foobar").Add(1)
|
||||
msgServiceRetries := <-c
|
||||
|
||||
assertMessage(t, *msgServiceRetries, expectedServiceRetries)
|
||||
|
||||
expectedServiceOpenConns := []string{
|
||||
`({"attributes":\[{"key":"service","value":{"stringValue":"test"}}\],"startTimeUnixNano":"[\d]{20}","timeUnixNano":"[\d]{19}","asDouble":3})`,
|
||||
`({"attributes":\[{"key":"service","value":{"stringValue":"foobar"}}\],"startTimeUnixNano":"[\d]{20}","timeUnixNano":"[\d]{19}","asDouble":1})`,
|
||||
}
|
||||
|
||||
registry.ServiceOpenConnsGauge().With("service", "test").Set(1)
|
||||
registry.ServiceOpenConnsGauge().With("service", "test").Add(1)
|
||||
registry.ServiceOpenConnsGauge().With("service", "test").Add(1)
|
||||
registry.ServiceOpenConnsGauge().With("service", "foobar").Add(1)
|
||||
msgServiceOpenConns := <-c
|
||||
|
||||
assertMessage(t, *msgServiceOpenConns, expectedServiceOpenConns)
|
||||
|
||||
expectedEntryPointReqDuration := []string{
|
||||
`({"attributes":\[{"key":"entrypoint","value":{"stringValue":"myEntrypoint"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","count":"2","sum":30000,"bucketCounts":\["0","0","0","0","0","0","0","0","0","0","0","2"\],"explicitBounds":\[0.005,0.01,0.025,0.05,0.1,0.25,0.5,1,2.5,5,10\],"min":10000,"max":20000})`,
|
||||
}
|
||||
|
||||
registry.EntryPointReqDurationHistogram().With("entrypoint", "myEntrypoint").Observe(10000)
|
||||
registry.EntryPointReqDurationHistogram().With("entrypoint", "myEntrypoint").Observe(20000)
|
||||
msgEntryPointReqDurationHistogram := <-c
|
||||
|
||||
assertMessage(t, *msgEntryPointReqDurationHistogram, expectedEntryPointReqDuration)
|
||||
|
||||
// We need to unlock the HTTP Server for the last export call when stopping
|
||||
// OpenTelemetry.
|
||||
go func() {
|
||||
<-c
|
||||
}()
|
||||
StopOpenTelemetry()
|
||||
}
|
|
@ -113,4 +113,5 @@ func stopMetricsClients() {
|
|||
metrics.StopStatsd()
|
||||
metrics.StopInfluxDB()
|
||||
metrics.StopInfluxDB2()
|
||||
metrics.StopOpenTelemetry()
|
||||
}
|
||||
|
|
138
pkg/tracing/opentelemetry/opentelemetry.go
Normal file
138
pkg/tracing/opentelemetry/opentelemetry.go
Normal file
|
@ -0,0 +1,138 @@
|
|||
package opentelemetry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
|
||||
"github.com/opentracing/opentracing-go"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/traefik/traefik/v2/pkg/types"
|
||||
"github.com/traefik/traefik/v2/pkg/version"
|
||||
"go.opentelemetry.io/otel"
|
||||
oteltracer "go.opentelemetry.io/otel/bridge/opentracing"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
|
||||
"go.opentelemetry.io/otel/propagation"
|
||||
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/encoding/gzip"
|
||||
)
|
||||
|
||||
// Config provides configuration settings for the open-telemetry tracer.
|
||||
type Config struct {
|
||||
// NOTE: as no gRPC option is implemented yet, the type is empty and is used as a boolean for upward compatibility purposes.
|
||||
GRPC *struct{} `description:"gRPC specific configuration for the OpenTelemetry collector." json:"grpc,omitempty" toml:"grpc,omitempty" yaml:"grpc,omitempty" export:"true" label:"allowEmpty" file:"allowEmpty"`
|
||||
|
||||
Address string `description:"Sets the address (host:port) of the collector endpoint." json:"address,omitempty" toml:"address,omitempty" yaml:"address,omitempty"`
|
||||
Path string `description:"Sets the URL path of the collector endpoint." json:"path,omitempty" toml:"path,omitempty" yaml:"path,omitempty" export:"true"`
|
||||
Insecure bool `description:"Disables client transport security for the exporter." json:"insecure,omitempty" toml:"insecure,omitempty" yaml:"insecure,omitempty" export:"true"`
|
||||
Headers map[string]string `description:"Defines additional headers to be sent with the payloads." json:"headers,omitempty" toml:"headers,omitempty" yaml:"headers,omitempty" export:"true"`
|
||||
TLS *types.ClientTLS `description:"Defines client transport security parameters." json:"tls,omitempty" toml:"tls,omitempty" yaml:"tls,omitempty" export:"true"`
|
||||
}
|
||||
|
||||
// SetDefaults sets the default values.
|
||||
func (c *Config) SetDefaults() {
|
||||
c.Address = "localhost:4318"
|
||||
}
|
||||
|
||||
// Setup sets up the tracer.
|
||||
func (c *Config) Setup(componentName string) (opentracing.Tracer, io.Closer, error) {
|
||||
var (
|
||||
err error
|
||||
exporter *otlptrace.Exporter
|
||||
)
|
||||
if c.GRPC != nil {
|
||||
exporter, err = c.setupGRPCExporter()
|
||||
} else {
|
||||
exporter, err = c.setupHTTPExporter()
|
||||
}
|
||||
if err != nil {
|
||||
return nil, nil, fmt.Errorf("setting up exporter: %w", err)
|
||||
}
|
||||
|
||||
bt := oteltracer.NewBridgeTracer()
|
||||
bt.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
|
||||
|
||||
bt.SetOpenTelemetryTracer(otel.Tracer(componentName, trace.WithInstrumentationVersion(version.Version)))
|
||||
opentracing.SetGlobalTracer(bt)
|
||||
|
||||
tracerProvider := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exporter))
|
||||
otel.SetTracerProvider(tracerProvider)
|
||||
|
||||
log.Debug().Msg("OpenTelemetry tracer configured")
|
||||
|
||||
return bt, tpCloser{provider: tracerProvider}, err
|
||||
}
|
||||
|
||||
func (c *Config) setupHTTPExporter() (*otlptrace.Exporter, error) {
|
||||
host, port, err := net.SplitHostPort(c.Address)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid collector address %q: %w", c.Address, err)
|
||||
}
|
||||
|
||||
opts := []otlptracehttp.Option{
|
||||
otlptracehttp.WithEndpoint(fmt.Sprintf("%s:%s", host, port)),
|
||||
otlptracehttp.WithHeaders(c.Headers),
|
||||
otlptracehttp.WithCompression(otlptracehttp.GzipCompression),
|
||||
}
|
||||
|
||||
if c.Insecure {
|
||||
opts = append(opts, otlptracehttp.WithInsecure())
|
||||
}
|
||||
|
||||
if c.Path != "" {
|
||||
opts = append(opts, otlptracehttp.WithURLPath(c.Path))
|
||||
}
|
||||
|
||||
if c.TLS != nil {
|
||||
tlsConfig, err := c.TLS.CreateTLSConfig(context.Background())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("creating TLS client config: %w", err)
|
||||
}
|
||||
|
||||
opts = append(opts, otlptracehttp.WithTLSClientConfig(tlsConfig))
|
||||
}
|
||||
|
||||
return otlptrace.New(context.Background(), otlptracehttp.NewClient(opts...))
|
||||
}
|
||||
|
||||
func (c *Config) setupGRPCExporter() (*otlptrace.Exporter, error) {
|
||||
host, port, err := net.SplitHostPort(c.Address)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("invalid collector address %q: %w", c.Address, err)
|
||||
}
|
||||
|
||||
opts := []otlptracegrpc.Option{
|
||||
otlptracegrpc.WithEndpoint(fmt.Sprintf("%s:%s", host, port)),
|
||||
otlptracegrpc.WithHeaders(c.Headers),
|
||||
otlptracegrpc.WithCompressor(gzip.Name),
|
||||
}
|
||||
|
||||
if c.Insecure {
|
||||
opts = append(opts, otlptracegrpc.WithInsecure())
|
||||
}
|
||||
|
||||
if c.TLS != nil {
|
||||
tlsConfig, err := c.TLS.CreateTLSConfig(context.Background())
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("creating TLS client config: %w", err)
|
||||
}
|
||||
|
||||
opts = append(opts, otlptracegrpc.WithTLSCredentials(credentials.NewTLS(tlsConfig)))
|
||||
}
|
||||
|
||||
return otlptrace.New(context.Background(), otlptracegrpc.NewClient(opts...))
|
||||
}
|
||||
|
||||
// tpCloser converts a TraceProvider into an io.Closer.
|
||||
type tpCloser struct {
|
||||
provider *sdktrace.TracerProvider
|
||||
}
|
||||
|
||||
func (t tpCloser) Close() error {
|
||||
return t.provider.Shutdown(context.Background())
|
||||
}
|
67
pkg/tracing/opentelemetry/opentelemetry_test.go
Normal file
67
pkg/tracing/opentelemetry/opentelemetry_test.go
Normal file
|
@ -0,0 +1,67 @@
|
|||
package opentelemetry
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
mtracing "github.com/traefik/traefik/v2/pkg/middlewares/tracing"
|
||||
"github.com/traefik/traefik/v2/pkg/tracing"
|
||||
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
|
||||
)
|
||||
|
||||
func TestTraceContextPropagation(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
gzr, err := gzip.NewReader(r.Body)
|
||||
require.NoError(t, err)
|
||||
|
||||
body, err := io.ReadAll(gzr)
|
||||
require.NoError(t, err)
|
||||
|
||||
req := ptraceotlp.NewExportRequest()
|
||||
err = req.UnmarshalProto(body)
|
||||
require.NoError(t, err)
|
||||
|
||||
marshalledReq, err := json.Marshal(req)
|
||||
require.NoError(t, err)
|
||||
|
||||
bodyStr := string(marshalledReq)
|
||||
assert.Regexp(t, `("traceId":"00000000000000000000000000000001")`, bodyStr)
|
||||
assert.Regexp(t, `("parentSpanId":"0000000000000001")`, bodyStr)
|
||||
assert.Regexp(t, `("traceState":"foo=bar")`, bodyStr)
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
cfg := Config{
|
||||
Address: strings.TrimPrefix(ts.URL, "http://"),
|
||||
Insecure: true,
|
||||
}
|
||||
|
||||
newTracing, err := tracing.NewTracing("", 0, &cfg)
|
||||
require.NoError(t, err)
|
||||
defer newTracing.Close()
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "http://www.test.com", nil)
|
||||
req.Header.Set("traceparent", "00-00000000000000000000000000000001-0000000000000001-00")
|
||||
req.Header.Set("tracestate", "foo=bar")
|
||||
rw := httptest.NewRecorder()
|
||||
|
||||
var forwarded bool
|
||||
next := http.HandlerFunc(func(http.ResponseWriter, *http.Request) {
|
||||
forwarded = true
|
||||
})
|
||||
|
||||
handler := mtracing.NewEntryPoint(context.Background(), newTracing, "test", next)
|
||||
handler.ServeHTTP(rw, req)
|
||||
|
||||
require.True(t, forwarded)
|
||||
}
|
|
@ -72,8 +72,8 @@ func TestComputeHash(t *testing.T) {
|
|||
}{
|
||||
{
|
||||
desc: "hashing",
|
||||
text: "some very long pice of text",
|
||||
expected: "0258ea1c",
|
||||
text: "some very long piece of text",
|
||||
expected: "0c6e798b",
|
||||
},
|
||||
{
|
||||
desc: "short text less than limit 10",
|
||||
|
@ -109,7 +109,7 @@ func TestTruncateString(t *testing.T) {
|
|||
},
|
||||
{
|
||||
desc: "basic truncate with limit 10",
|
||||
text: "some very long pice of text",
|
||||
text: "some very long piece of text",
|
||||
limit: 10,
|
||||
expected: "some ve...",
|
||||
},
|
||||
|
|
|
@ -10,11 +10,12 @@ import (
|
|||
|
||||
// Metrics provides options to expose and send Traefik metrics to different third party monitoring systems.
|
||||
type Metrics struct {
|
||||
Prometheus *Prometheus `description:"Prometheus metrics exporter type." json:"prometheus,omitempty" toml:"prometheus,omitempty" yaml:"prometheus,omitempty" label:"allowEmpty" file:"allowEmpty" export:"true"`
|
||||
Datadog *Datadog `description:"Datadog metrics exporter type." json:"datadog,omitempty" toml:"datadog,omitempty" yaml:"datadog,omitempty" label:"allowEmpty" file:"allowEmpty" export:"true"`
|
||||
StatsD *Statsd `description:"StatsD metrics exporter type." json:"statsD,omitempty" toml:"statsD,omitempty" yaml:"statsD,omitempty" label:"allowEmpty" file:"allowEmpty" export:"true"`
|
||||
InfluxDB *InfluxDB `description:"InfluxDB metrics exporter type." json:"influxDB,omitempty" toml:"influxDB,omitempty" yaml:"influxDB,omitempty" label:"allowEmpty" file:"allowEmpty" export:"true"`
|
||||
InfluxDB2 *InfluxDB2 `description:"InfluxDB v2 metrics exporter type." json:"influxDB2,omitempty" toml:"influxDB2,omitempty" yaml:"influxDB2,omitempty" label:"allowEmpty" file:"allowEmpty" export:"true"`
|
||||
Prometheus *Prometheus `description:"Prometheus metrics exporter type." json:"prometheus,omitempty" toml:"prometheus,omitempty" yaml:"prometheus,omitempty" label:"allowEmpty" file:"allowEmpty" export:"true"`
|
||||
Datadog *Datadog `description:"Datadog metrics exporter type." json:"datadog,omitempty" toml:"datadog,omitempty" yaml:"datadog,omitempty" label:"allowEmpty" file:"allowEmpty" export:"true"`
|
||||
StatsD *Statsd `description:"StatsD metrics exporter type." json:"statsD,omitempty" toml:"statsD,omitempty" yaml:"statsD,omitempty" label:"allowEmpty" file:"allowEmpty" export:"true"`
|
||||
InfluxDB *InfluxDB `description:"InfluxDB metrics exporter type." json:"influxDB,omitempty" toml:"influxDB,omitempty" yaml:"influxDB,omitempty" label:"allowEmpty" file:"allowEmpty" export:"true"`
|
||||
InfluxDB2 *InfluxDB2 `description:"InfluxDB v2 metrics exporter type." json:"influxDB2,omitempty" toml:"influxDB2,omitempty" yaml:"influxDB2,omitempty" label:"allowEmpty" file:"allowEmpty" export:"true"`
|
||||
OpenTelemetry *OpenTelemetry `description:"OpenTelemetry metrics exporter type." json:"openTelemetry,omitempty" toml:"openTelemetry,omitempty" yaml:"openTelemetry,omitempty" label:"allowEmpty" file:"allowEmpty" export:"true"`
|
||||
}
|
||||
|
||||
// Prometheus can contain specific configuration used by the Prometheus Metrics exporter.
|
||||
|
@ -127,6 +128,32 @@ func (i *InfluxDB2) SetDefaults() {
|
|||
i.AddServicesLabels = true
|
||||
}
|
||||
|
||||
// OpenTelemetry contains specific configuration used by the OpenTelemetry Metrics exporter.
|
||||
type OpenTelemetry struct {
|
||||
// NOTE: as no gRPC option is implemented yet, the type is empty and is used as a boolean for upward compatibility purposes.
|
||||
GRPC *struct{} `description:"gRPC specific configuration for the OpenTelemetry collector." json:"grpc,omitempty" toml:"grpc,omitempty" yaml:"grpc,omitempty" export:"true" label:"allowEmpty" file:"allowEmpty"`
|
||||
|
||||
Address string `description:"Address (host:port) of the collector endpoint." json:"address,omitempty" toml:"address,omitempty" yaml:"address,omitempty"`
|
||||
AddEntryPointsLabels bool `description:"Enable metrics on entry points." json:"addEntryPointsLabels,omitempty" toml:"addEntryPointsLabels,omitempty" yaml:"addEntryPointsLabels,omitempty" export:"true"`
|
||||
AddRoutersLabels bool `description:"Enable metrics on routers." json:"addRoutersLabels,omitempty" toml:"addRoutersLabels,omitempty" yaml:"addRoutersLabels,omitempty" export:"true"`
|
||||
AddServicesLabels bool `description:"Enable metrics on services." json:"addServicesLabels,omitempty" toml:"addServicesLabels,omitempty" yaml:"addServicesLabels,omitempty" export:"true"`
|
||||
ExplicitBoundaries []float64 `description:"Boundaries for latency metrics." json:"explicitBoundaries,omitempty" toml:"explicitBoundaries,omitempty" yaml:"explicitBoundaries,omitempty" export:"true"`
|
||||
Headers map[string]string `description:"Headers sent with payload." json:"headers,omitempty" toml:"headers,omitempty" yaml:"headers,omitempty" export:"true"`
|
||||
Insecure bool `description:"Disables client transport security for the exporter." json:"insecure,omitempty" toml:"insecure,omitempty" yaml:"insecure,omitempty" export:"true"`
|
||||
Path string `description:"Set the URL path of the collector endpoint." json:"path,omitempty" toml:"path,omitempty" yaml:"path,omitempty" export:"true"`
|
||||
PushInterval types.Duration `description:"Period between calls to collect a checkpoint." json:"pushInterval,omitempty" toml:"pushInterval,omitempty" yaml:"pushInterval,omitempty" export:"true"`
|
||||
TLS *ClientTLS `description:"Enable TLS support." json:"tls,omitempty" toml:"tls,omitempty" yaml:"tls,omitempty" export:"true"`
|
||||
}
|
||||
|
||||
// SetDefaults sets the default values.
|
||||
func (o *OpenTelemetry) SetDefaults() {
|
||||
o.Address = "localhost:4318"
|
||||
o.AddEntryPointsLabels = true
|
||||
o.AddServicesLabels = true
|
||||
o.ExplicitBoundaries = []float64{.005, .01, .025, .05, .1, .25, .5, 1, 2.5, 5, 10}
|
||||
o.PushInterval = types.Duration(10 * time.Second)
|
||||
}
|
||||
|
||||
// Statistics provides options for monitoring request and response stats.
|
||||
type Statistics struct {
|
||||
RecentErrors int `description:"Number of recent errors logged." json:"recentErrors,omitempty" toml:"recentErrors,omitempty" yaml:"recentErrors,omitempty" export:"true"`
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue