1
0
Fork 0

Manage observability at entrypoint and router level

Co-authored-by: Kevin Pollet <pollet.kevin@gmail.com>
This commit is contained in:
Romain 2024-12-12 09:52:07 +01:00 committed by GitHub
parent 9588e51146
commit b1934231ca
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
58 changed files with 1216 additions and 303 deletions

View file

@ -2,21 +2,15 @@ package observability
import (
"context"
"fmt"
"net/http"
"strconv"
"strings"
"time"
"github.com/containous/alice"
"github.com/rs/zerolog/log"
"github.com/traefik/traefik/v3/pkg/metrics"
"github.com/traefik/traefik/v3/pkg/middlewares"
"github.com/traefik/traefik/v3/pkg/middlewares/accesslog"
"github.com/traefik/traefik/v3/pkg/tracing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
)
@ -28,24 +22,19 @@ const (
type entryPointTracing struct {
tracer *tracing.Tracer
entryPoint string
next http.Handler
semConvMetricRegistry *metrics.SemConvMetricsRegistry
entryPoint string
next http.Handler
}
// WrapEntryPointHandler Wraps tracing to alice.Constructor.
func WrapEntryPointHandler(ctx context.Context, tracer *tracing.Tracer, semConvMetricRegistry *metrics.SemConvMetricsRegistry, entryPointName string) alice.Constructor {
// EntryPointHandler Wraps tracing to alice.Constructor.
func EntryPointHandler(ctx context.Context, tracer *tracing.Tracer, entryPointName string) alice.Constructor {
return func(next http.Handler) (http.Handler, error) {
if tracer == nil {
tracer = tracing.NewTracer(noop.Tracer{}, nil, nil, nil)
}
return newEntryPoint(ctx, tracer, semConvMetricRegistry, entryPointName, next), nil
return newEntryPoint(ctx, tracer, entryPointName, next), nil
}
}
// newEntryPoint creates a new tracing middleware for incoming requests.
func newEntryPoint(ctx context.Context, tracer *tracing.Tracer, semConvMetricRegistry *metrics.SemConvMetricsRegistry, entryPointName string, next http.Handler) http.Handler {
func newEntryPoint(ctx context.Context, tracer *tracing.Tracer, entryPointName string, next http.Handler) http.Handler {
middlewares.GetLogger(ctx, "tracing", entryPointTypeName).Debug().Msg("Creating middleware")
if tracer == nil {
@ -53,10 +42,9 @@ func newEntryPoint(ctx context.Context, tracer *tracing.Tracer, semConvMetricReg
}
return &entryPointTracing{
entryPoint: entryPointName,
tracer: tracer,
semConvMetricRegistry: semConvMetricRegistry,
next: next,
entryPoint: entryPointName,
tracer: tracer,
next: next,
}
}
@ -88,23 +76,4 @@ func (e *entryPointTracing) ServeHTTP(rw http.ResponseWriter, req *http.Request)
end := time.Now()
span.End(trace.WithTimestamp(end))
if e.semConvMetricRegistry != nil && e.semConvMetricRegistry.HTTPServerRequestDuration() != nil {
var attrs []attribute.KeyValue
if recorder.Status() < 100 || recorder.Status() >= 600 {
attrs = append(attrs, attribute.Key("error.type").String(fmt.Sprintf("Invalid HTTP status code ; %d", recorder.Status())))
} else if recorder.Status() >= 400 {
attrs = append(attrs, attribute.Key("error.type").String(strconv.Itoa(recorder.Status())))
}
attrs = append(attrs, semconv.HTTPRequestMethodKey.String(req.Method))
attrs = append(attrs, semconv.HTTPResponseStatusCode(recorder.Status()))
attrs = append(attrs, semconv.NetworkProtocolName(strings.ToLower(req.Proto)))
attrs = append(attrs, semconv.NetworkProtocolVersion(Proto(req.Proto)))
attrs = append(attrs, semconv.ServerAddress(req.Host))
attrs = append(attrs, semconv.URLScheme(req.Header.Get("X-Forwarded-Proto")))
e.semConvMetricRegistry.HTTPServerRequestDuration().Record(req.Context(), end.Sub(start).Seconds(), metric.WithAttributes(attrs...))
}
}

View file

@ -5,19 +5,11 @@ import (
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
ptypes "github.com/traefik/paerser/types"
"github.com/traefik/traefik/v3/pkg/metrics"
"github.com/traefik/traefik/v3/pkg/middlewares/accesslog"
"github.com/traefik/traefik/v3/pkg/tracing"
"github.com/traefik/traefik/v3/pkg/types"
"go.opentelemetry.io/otel/attribute"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
)
func TestEntryPointMiddleware_tracing(t *testing.T) {
@ -77,7 +69,7 @@ func TestEntryPointMiddleware_tracing(t *testing.T) {
tracer := &mockTracer{}
handler := newEntryPoint(context.Background(), tracing.NewTracer(tracer, []string{"X-Foo"}, []string{"X-Bar"}, []string{"q"}), nil, test.entryPoint, next)
handler := newEntryPoint(context.Background(), tracing.NewTracer(tracer, []string{"X-Foo"}, []string{"X-Bar"}, []string{"q"}), test.entryPoint, next)
handler.ServeHTTP(rw, req)
for _, span := range tracer.spans {
@ -88,101 +80,6 @@ func TestEntryPointMiddleware_tracing(t *testing.T) {
}
}
func TestEntryPointMiddleware_metrics(t *testing.T) {
tests := []struct {
desc string
statusCode int
wantAttributes attribute.Set
}{
{
desc: "not found status",
statusCode: http.StatusNotFound,
wantAttributes: attribute.NewSet(
attribute.Key("error.type").String("404"),
attribute.Key("http.request.method").String("GET"),
attribute.Key("http.response.status_code").Int(404),
attribute.Key("network.protocol.name").String("http/1.1"),
attribute.Key("network.protocol.version").String("1.1"),
attribute.Key("server.address").String("www.test.com"),
attribute.Key("url.scheme").String("http"),
),
},
{
desc: "created status",
statusCode: http.StatusCreated,
wantAttributes: attribute.NewSet(
attribute.Key("http.request.method").String("GET"),
attribute.Key("http.response.status_code").Int(201),
attribute.Key("network.protocol.name").String("http/1.1"),
attribute.Key("network.protocol.version").String("1.1"),
attribute.Key("server.address").String("www.test.com"),
attribute.Key("url.scheme").String("http"),
),
},
}
for _, test := range tests {
t.Run(test.desc, func(t *testing.T) {
t.Parallel()
var cfg types.OTLP
(&cfg).SetDefaults()
cfg.AddRoutersLabels = true
cfg.PushInterval = ptypes.Duration(10 * time.Millisecond)
rdr := sdkmetric.NewManualReader()
meterProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr))
// force the meter provider with manual reader to collect metrics for the test.
metrics.SetMeterProvider(meterProvider)
semConvMetricRegistry, err := metrics.NewSemConvMetricRegistry(context.Background(), &cfg)
require.NoError(t, err)
require.NotNil(t, semConvMetricRegistry)
req := httptest.NewRequest(http.MethodGet, "http://www.test.com/search?q=Opentelemetry", nil)
rw := httptest.NewRecorder()
req.RemoteAddr = "10.0.0.1:1234"
req.Header.Set("User-Agent", "entrypoint-test")
req.Header.Set("X-Forwarded-Proto", "http")
next := http.HandlerFunc(func(rw http.ResponseWriter, _ *http.Request) {
rw.WriteHeader(test.statusCode)
})
handler := newEntryPoint(context.Background(), nil, semConvMetricRegistry, "test", next)
handler.ServeHTTP(rw, req)
got := metricdata.ResourceMetrics{}
err = rdr.Collect(context.Background(), &got)
require.NoError(t, err)
require.Len(t, got.ScopeMetrics, 1)
expected := metricdata.Metrics{
Name: "http.server.request.duration",
Description: "Duration of HTTP server requests.",
Unit: "s",
Data: metricdata.Histogram[float64]{
DataPoints: []metricdata.HistogramDataPoint[float64]{
{
Attributes: test.wantAttributes,
Count: 1,
Bounds: []float64{0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10},
BucketCounts: []uint64{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
Min: metricdata.NewExtrema[float64](1),
Max: metricdata.NewExtrema[float64](1),
Sum: 1,
},
},
Temporality: metricdata.CumulativeTemporality,
},
}
metricdatatest.AssertEqual[metricdata.Metrics](t, expected, got.ScopeMetrics[0].Metrics[0], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
})
}
}
func TestEntryPointMiddleware_tracingInfoIntoLog(t *testing.T) {
req := httptest.NewRequest(http.MethodGet, "http://www.test.com/", http.NoBody)
req = req.WithContext(
@ -197,7 +94,7 @@ func TestEntryPointMiddleware_tracingInfoIntoLog(t *testing.T) {
tracer := &mockTracer{}
handler := newEntryPoint(context.Background(), tracing.NewTracer(tracer, []string{}, []string{}, []string{}), nil, "test", next)
handler := newEntryPoint(context.Background(), tracing.NewTracer(tracer, []string{}, []string{}, []string{}), "test", next)
handler.ServeHTTP(httptest.NewRecorder(), req)
expectedSpanCtx := tracer.spans[0].SpanContext()

View file

@ -8,6 +8,11 @@ import (
"go.opentelemetry.io/otel/trace"
)
type contextKey int
// DisableMetricsKey is a context key used to disable the metrics.
const DisableMetricsKey contextKey = iota
// SetStatusErrorf flags the span as in error and log an event.
func SetStatusErrorf(ctx context.Context, format string, args ...interface{}) {
if span := trace.SpanFromContext(ctx); span != nil {

View file

@ -0,0 +1,81 @@
package observability
import (
"context"
"fmt"
"net/http"
"strconv"
"strings"
"time"
"github.com/containous/alice"
"github.com/rs/zerolog/log"
"github.com/traefik/traefik/v3/pkg/logs"
"github.com/traefik/traefik/v3/pkg/metrics"
"github.com/traefik/traefik/v3/pkg/middlewares"
"github.com/traefik/traefik/v3/pkg/middlewares/capture"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
)
const (
semConvServerMetricsTypeName = "SemConvServerMetrics"
)
type semConvServerMetrics struct {
next http.Handler
semConvMetricRegistry *metrics.SemConvMetricsRegistry
}
// SemConvServerMetricsHandler return the alice.Constructor for semantic conventions servers metrics.
func SemConvServerMetricsHandler(ctx context.Context, semConvMetricRegistry *metrics.SemConvMetricsRegistry) alice.Constructor {
return func(next http.Handler) (http.Handler, error) {
return newServerMetricsSemConv(ctx, semConvMetricRegistry, next), nil
}
}
// newServerMetricsSemConv creates a new semConv server metrics middleware for incoming requests.
func newServerMetricsSemConv(ctx context.Context, semConvMetricRegistry *metrics.SemConvMetricsRegistry, next http.Handler) http.Handler {
middlewares.GetLogger(ctx, "tracing", semConvServerMetricsTypeName).Debug().Msg("Creating middleware")
return &semConvServerMetrics{
semConvMetricRegistry: semConvMetricRegistry,
next: next,
}
}
func (e *semConvServerMetrics) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
if e.semConvMetricRegistry == nil || e.semConvMetricRegistry.HTTPServerRequestDuration() == nil {
e.next.ServeHTTP(rw, req)
return
}
start := time.Now()
e.next.ServeHTTP(rw, req)
end := time.Now()
ctx := req.Context()
capt, err := capture.FromContext(ctx)
if err != nil {
log.Ctx(ctx).Error().Err(err).Str(logs.MiddlewareType, semConvServerMetricsTypeName).Msg("Could not get Capture")
return
}
var attrs []attribute.KeyValue
if capt.StatusCode() < 100 || capt.StatusCode() >= 600 {
attrs = append(attrs, attribute.Key("error.type").String(fmt.Sprintf("Invalid HTTP status code ; %d", capt.StatusCode())))
} else if capt.StatusCode() >= 400 {
attrs = append(attrs, attribute.Key("error.type").String(strconv.Itoa(capt.StatusCode())))
}
attrs = append(attrs, semconv.HTTPRequestMethodKey.String(req.Method))
attrs = append(attrs, semconv.HTTPResponseStatusCode(capt.StatusCode()))
attrs = append(attrs, semconv.NetworkProtocolName(strings.ToLower(req.Proto)))
attrs = append(attrs, semconv.NetworkProtocolVersion(Proto(req.Proto)))
attrs = append(attrs, semconv.ServerAddress(req.Host))
attrs = append(attrs, semconv.URLScheme(req.Header.Get("X-Forwarded-Proto")))
e.semConvMetricRegistry.HTTPServerRequestDuration().Record(req.Context(), end.Sub(start).Seconds(), metric.WithAttributes(attrs...))
}

View file

@ -0,0 +1,118 @@
package observability
import (
"context"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/stretchr/testify/require"
ptypes "github.com/traefik/paerser/types"
"github.com/traefik/traefik/v3/pkg/metrics"
"github.com/traefik/traefik/v3/pkg/middlewares/capture"
"github.com/traefik/traefik/v3/pkg/types"
"go.opentelemetry.io/otel/attribute"
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
)
func TestSemConvServerMetrics(t *testing.T) {
tests := []struct {
desc string
statusCode int
wantAttributes attribute.Set
}{
{
desc: "not found status",
statusCode: http.StatusNotFound,
wantAttributes: attribute.NewSet(
attribute.Key("error.type").String("404"),
attribute.Key("http.request.method").String("GET"),
attribute.Key("http.response.status_code").Int(404),
attribute.Key("network.protocol.name").String("http/1.1"),
attribute.Key("network.protocol.version").String("1.1"),
attribute.Key("server.address").String("www.test.com"),
attribute.Key("url.scheme").String("http"),
),
},
{
desc: "created status",
statusCode: http.StatusCreated,
wantAttributes: attribute.NewSet(
attribute.Key("http.request.method").String("GET"),
attribute.Key("http.response.status_code").Int(201),
attribute.Key("network.protocol.name").String("http/1.1"),
attribute.Key("network.protocol.version").String("1.1"),
attribute.Key("server.address").String("www.test.com"),
attribute.Key("url.scheme").String("http"),
),
},
}
for _, test := range tests {
t.Run(test.desc, func(t *testing.T) {
t.Parallel()
var cfg types.OTLP
(&cfg).SetDefaults()
cfg.AddRoutersLabels = true
cfg.PushInterval = ptypes.Duration(10 * time.Millisecond)
rdr := sdkmetric.NewManualReader()
meterProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr))
// force the meter provider with manual reader to collect metrics for the test.
metrics.SetMeterProvider(meterProvider)
semConvMetricRegistry, err := metrics.NewSemConvMetricRegistry(context.Background(), &cfg)
require.NoError(t, err)
require.NotNil(t, semConvMetricRegistry)
req := httptest.NewRequest(http.MethodGet, "http://www.test.com/search?q=Opentelemetry", nil)
rw := httptest.NewRecorder()
req.RemoteAddr = "10.0.0.1:1234"
req.Header.Set("User-Agent", "entrypoint-test")
req.Header.Set("X-Forwarded-Proto", "http")
next := http.HandlerFunc(func(rw http.ResponseWriter, _ *http.Request) {
rw.WriteHeader(test.statusCode)
})
handler := newServerMetricsSemConv(context.Background(), semConvMetricRegistry, next)
handler, err = capture.Wrap(handler)
require.NoError(t, err)
handler.ServeHTTP(rw, req)
got := metricdata.ResourceMetrics{}
err = rdr.Collect(context.Background(), &got)
require.NoError(t, err)
require.Len(t, got.ScopeMetrics, 1)
expected := metricdata.Metrics{
Name: "http.server.request.duration",
Description: "Duration of HTTP server requests.",
Unit: "s",
Data: metricdata.Histogram[float64]{
DataPoints: []metricdata.HistogramDataPoint[float64]{
{
Attributes: test.wantAttributes,
Count: 1,
Bounds: []float64{0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10},
BucketCounts: []uint64{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
Min: metricdata.NewExtrema[float64](1),
Max: metricdata.NewExtrema[float64](1),
Sum: 1,
},
},
Temporality: metricdata.CumulativeTemporality,
},
}
metricdatatest.AssertEqual[metricdata.Metrics](t, expected, got.ScopeMetrics[0].Metrics[0], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
})
}
}