Semconv OTLP stable HTTP metrics
This commit is contained in:
parent
709ff6fb09
commit
6c9687f410
44 changed files with 803 additions and 432 deletions
105
pkg/server/service/observability_roundtripper.go
Normal file
105
pkg/server/service/observability_roundtripper.go
Normal file
|
@ -0,0 +1,105 @@
|
|||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/traefik/traefik/v3/pkg/metrics"
|
||||
"github.com/traefik/traefik/v3/pkg/middlewares/observability"
|
||||
"github.com/traefik/traefik/v3/pkg/tracing"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/metric"
|
||||
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
type wrapper struct {
|
||||
semConvMetricRegistry *metrics.SemConvMetricsRegistry
|
||||
rt http.RoundTripper
|
||||
}
|
||||
|
||||
func (t *wrapper) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
start := time.Now()
|
||||
var span trace.Span
|
||||
var tracingCtx context.Context
|
||||
var tracer *tracing.Tracer
|
||||
if tracer = tracing.TracerFromContext(req.Context()); tracer != nil {
|
||||
tracingCtx, span = tracer.Start(req.Context(), "ReverseProxy", trace.WithSpanKind(trace.SpanKindClient))
|
||||
defer span.End()
|
||||
|
||||
req = req.WithContext(tracingCtx)
|
||||
|
||||
tracer.CaptureClientRequest(span, req)
|
||||
tracing.InjectContextIntoCarrier(req)
|
||||
}
|
||||
|
||||
var statusCode int
|
||||
var headers http.Header
|
||||
response, err := t.rt.RoundTrip(req)
|
||||
if err != nil {
|
||||
statusCode = computeStatusCode(err)
|
||||
}
|
||||
if response != nil {
|
||||
statusCode = response.StatusCode
|
||||
headers = response.Header
|
||||
}
|
||||
|
||||
if tracer != nil {
|
||||
tracer.CaptureResponse(span, headers, statusCode, trace.SpanKindClient)
|
||||
}
|
||||
|
||||
end := time.Now()
|
||||
|
||||
// Ending the span as soon as the response is handled because we want to use the same end time for the trace and the metric.
|
||||
// If any errors happen earlier, this span will be close by the defer instruction.
|
||||
if span != nil {
|
||||
span.End(trace.WithTimestamp(end))
|
||||
}
|
||||
|
||||
if t.semConvMetricRegistry != nil && t.semConvMetricRegistry.HTTPClientRequestDuration() != nil {
|
||||
var attrs []attribute.KeyValue
|
||||
|
||||
if statusCode < 100 || statusCode >= 600 {
|
||||
attrs = append(attrs, attribute.Key("error.type").String(fmt.Sprintf("Invalid HTTP status code %d", statusCode)))
|
||||
} else if statusCode >= 400 {
|
||||
attrs = append(attrs, attribute.Key("error.type").String(strconv.Itoa(statusCode)))
|
||||
}
|
||||
|
||||
attrs = append(attrs, semconv.HTTPRequestMethodKey.String(req.Method))
|
||||
attrs = append(attrs, semconv.HTTPResponseStatusCode(statusCode))
|
||||
attrs = append(attrs, semconv.NetworkProtocolName(strings.ToLower(req.Proto)))
|
||||
attrs = append(attrs, semconv.NetworkProtocolVersion(observability.Proto(req.Proto)))
|
||||
attrs = append(attrs, semconv.ServerAddress(req.URL.Host))
|
||||
|
||||
_, port, err := net.SplitHostPort(req.URL.Host)
|
||||
if err != nil {
|
||||
switch req.URL.Scheme {
|
||||
case "http":
|
||||
attrs = append(attrs, semconv.ServerPort(80))
|
||||
case "https":
|
||||
attrs = append(attrs, semconv.ServerPort(443))
|
||||
}
|
||||
} else {
|
||||
intPort, _ := strconv.Atoi(port)
|
||||
attrs = append(attrs, semconv.ServerPort(intPort))
|
||||
}
|
||||
|
||||
attrs = append(attrs, semconv.URLScheme(req.Header.Get("X-Forwarded-Proto")))
|
||||
|
||||
t.semConvMetricRegistry.HTTPClientRequestDuration().Record(req.Context(), end.Sub(start).Seconds(), metric.WithAttributes(attrs...))
|
||||
}
|
||||
|
||||
return response, err
|
||||
}
|
||||
|
||||
func newObservabilityRoundTripper(semConvMetricRegistry *metrics.SemConvMetricsRegistry, rt http.RoundTripper) http.RoundTripper {
|
||||
return &wrapper{
|
||||
semConvMetricRegistry: semConvMetricRegistry,
|
||||
rt: rt,
|
||||
}
|
||||
}
|
123
pkg/server/service/observability_roundtripper_test.go
Normal file
123
pkg/server/service/observability_roundtripper_test.go
Normal file
|
@ -0,0 +1,123 @@
|
|||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
ptypes "github.com/traefik/paerser/types"
|
||||
"github.com/traefik/traefik/v3/pkg/metrics"
|
||||
"github.com/traefik/traefik/v3/pkg/types"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
sdkmetric "go.opentelemetry.io/otel/sdk/metric"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata"
|
||||
"go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest"
|
||||
)
|
||||
|
||||
func TestObservabilityRoundTripper_metrics(t *testing.T) {
|
||||
tests := []struct {
|
||||
desc string
|
||||
serverURL string
|
||||
statusCode int
|
||||
wantAttributes attribute.Set
|
||||
}{
|
||||
{
|
||||
desc: "not found status",
|
||||
serverURL: "http://www.test.com",
|
||||
statusCode: http.StatusNotFound,
|
||||
wantAttributes: attribute.NewSet(
|
||||
attribute.Key("error.type").String("404"),
|
||||
attribute.Key("http.request.method").String("GET"),
|
||||
attribute.Key("http.response.status_code").Int(404),
|
||||
attribute.Key("network.protocol.name").String("http/1.1"),
|
||||
attribute.Key("network.protocol.version").String("1.1"),
|
||||
attribute.Key("server.address").String("www.test.com"),
|
||||
attribute.Key("server.port").Int(80),
|
||||
attribute.Key("url.scheme").String("http"),
|
||||
),
|
||||
},
|
||||
{
|
||||
desc: "created status",
|
||||
serverURL: "https://www.test.com",
|
||||
statusCode: http.StatusCreated,
|
||||
wantAttributes: attribute.NewSet(
|
||||
attribute.Key("http.request.method").String("GET"),
|
||||
attribute.Key("http.response.status_code").Int(201),
|
||||
attribute.Key("network.protocol.name").String("http/1.1"),
|
||||
attribute.Key("network.protocol.version").String("1.1"),
|
||||
attribute.Key("server.address").String("www.test.com"),
|
||||
attribute.Key("server.port").Int(443),
|
||||
attribute.Key("url.scheme").String("http"),
|
||||
),
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
test := test
|
||||
t.Run(test.desc, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
var cfg types.OTLP
|
||||
(&cfg).SetDefaults()
|
||||
cfg.AddRoutersLabels = true
|
||||
cfg.PushInterval = ptypes.Duration(10 * time.Millisecond)
|
||||
rdr := sdkmetric.NewManualReader()
|
||||
|
||||
meterProvider := sdkmetric.NewMeterProvider(sdkmetric.WithReader(rdr))
|
||||
// force the meter provider with manual reader to collect metrics for the test.
|
||||
metrics.SetMeterProvider(meterProvider)
|
||||
|
||||
semConvMetricRegistry, err := metrics.NewSemConvMetricRegistry(context.Background(), &cfg)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, semConvMetricRegistry)
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, test.serverURL+"/search?q=Opentelemetry", nil)
|
||||
req.RemoteAddr = "10.0.0.1:1234"
|
||||
req.Header.Set("User-Agent", "rt-test")
|
||||
req.Header.Set("X-Forwarded-Proto", "http")
|
||||
|
||||
ort := newObservabilityRoundTripper(semConvMetricRegistry, mockRoundTripper{statusCode: test.statusCode})
|
||||
_, err = ort.RoundTrip(req)
|
||||
require.NoError(t, err)
|
||||
|
||||
got := metricdata.ResourceMetrics{}
|
||||
err = rdr.Collect(context.Background(), &got)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Len(t, got.ScopeMetrics, 1)
|
||||
|
||||
expected := metricdata.Metrics{
|
||||
Name: "http.client.request.duration",
|
||||
Description: "Duration of HTTP client requests.",
|
||||
Unit: "s",
|
||||
Data: metricdata.Histogram[float64]{
|
||||
DataPoints: []metricdata.HistogramDataPoint[float64]{
|
||||
{
|
||||
Attributes: test.wantAttributes,
|
||||
Count: 1,
|
||||
Bounds: []float64{0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1, 2.5, 5, 7.5, 10},
|
||||
BucketCounts: []uint64{0x1, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0},
|
||||
Min: metricdata.NewExtrema[float64](1),
|
||||
Max: metricdata.NewExtrema[float64](1),
|
||||
Sum: 1,
|
||||
},
|
||||
},
|
||||
Temporality: metricdata.CumulativeTemporality,
|
||||
},
|
||||
}
|
||||
|
||||
metricdatatest.AssertEqual[metricdata.Metrics](t, expected, got.ScopeMetrics[0].Metrics[0], metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type mockRoundTripper struct {
|
||||
statusCode int
|
||||
}
|
||||
|
||||
func (m mockRoundTripper) RoundTrip(request *http.Request) (*http.Response, error) {
|
||||
return &http.Response{StatusCode: m.statusCode}, nil
|
||||
}
|
|
@ -22,13 +22,9 @@ const StatusClientClosedRequest = 499
|
|||
const StatusClientClosedRequestText = "Client Closed Request"
|
||||
|
||||
func buildSingleHostProxy(target *url.URL, passHostHeader bool, flushInterval time.Duration, roundTripper http.RoundTripper, bufferPool httputil.BufferPool) http.Handler {
|
||||
// Wrapping the roundTripper with the Tracing roundTripper,
|
||||
// to handle the reverseProxy client span creation.
|
||||
tracingRoundTripper := newTracingRoundTripper(roundTripper)
|
||||
|
||||
return &httputil.ReverseProxy{
|
||||
Director: directorBuilder(target, passHostHeader),
|
||||
Transport: tracingRoundTripper,
|
||||
Transport: roundTripper,
|
||||
FlushInterval: flushInterval,
|
||||
BufferPool: bufferPool,
|
||||
ErrorHandler: errorHandler,
|
||||
|
|
|
@ -22,7 +22,7 @@ import (
|
|||
"github.com/traefik/traefik/v3/pkg/logs"
|
||||
"github.com/traefik/traefik/v3/pkg/middlewares/accesslog"
|
||||
metricsMiddle "github.com/traefik/traefik/v3/pkg/middlewares/metrics"
|
||||
tracingMiddle "github.com/traefik/traefik/v3/pkg/middlewares/tracing"
|
||||
"github.com/traefik/traefik/v3/pkg/middlewares/observability"
|
||||
"github.com/traefik/traefik/v3/pkg/safe"
|
||||
"github.com/traefik/traefik/v3/pkg/server/cookie"
|
||||
"github.com/traefik/traefik/v3/pkg/server/middleware"
|
||||
|
@ -300,30 +300,38 @@ func (m *Manager) getLoadBalancerServiceHandler(ctx context.Context, serviceName
|
|||
logger.Debug().Str(logs.ServerName, proxyName).Stringer("target", target).
|
||||
Msg("Creating server")
|
||||
|
||||
qualifiedSvcName := provider.GetQualifiedName(ctx, serviceName)
|
||||
|
||||
if m.observabilityMgr.ShouldAddTracing(qualifiedSvcName) || m.observabilityMgr.ShouldAddMetrics(qualifiedSvcName) {
|
||||
// Wrapping the roundTripper with the Tracing roundTripper,
|
||||
// to handle the reverseProxy client span creation.
|
||||
roundTripper = newObservabilityRoundTripper(m.observabilityMgr.SemConvMetricsRegistry(), roundTripper)
|
||||
}
|
||||
|
||||
proxy := buildSingleHostProxy(target, passHostHeader, time.Duration(flushInterval), roundTripper, m.bufferPool)
|
||||
|
||||
// Prevents from enabling observability for internal resources.
|
||||
|
||||
if m.observabilityMgr.ShouldAddAccessLogs(provider.GetQualifiedName(ctx, serviceName)) {
|
||||
if m.observabilityMgr.ShouldAddAccessLogs(qualifiedSvcName) {
|
||||
proxy = accesslog.NewFieldHandler(proxy, accesslog.ServiceURL, target.String(), nil)
|
||||
proxy = accesslog.NewFieldHandler(proxy, accesslog.ServiceAddr, target.Host, nil)
|
||||
proxy = accesslog.NewFieldHandler(proxy, accesslog.ServiceName, serviceName, accesslog.AddServiceFields)
|
||||
}
|
||||
|
||||
if m.observabilityMgr.MetricsRegistry() != nil && m.observabilityMgr.MetricsRegistry().IsSvcEnabled() &&
|
||||
m.observabilityMgr.ShouldAddMetrics(provider.GetQualifiedName(ctx, serviceName)) {
|
||||
m.observabilityMgr.ShouldAddMetrics(qualifiedSvcName) {
|
||||
metricsHandler := metricsMiddle.WrapServiceHandler(ctx, m.observabilityMgr.MetricsRegistry(), serviceName)
|
||||
|
||||
proxy, err = alice.New().
|
||||
Append(tracingMiddle.WrapMiddleware(ctx, metricsHandler)).
|
||||
Append(observability.WrapMiddleware(ctx, metricsHandler)).
|
||||
Then(proxy)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error wrapping metrics handler: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
if m.observabilityMgr.ShouldAddTracing(provider.GetQualifiedName(ctx, serviceName)) {
|
||||
proxy = tracingMiddle.NewService(ctx, serviceName, proxy)
|
||||
if m.observabilityMgr.ShouldAddTracing(qualifiedSvcName) {
|
||||
proxy = observability.NewService(ctx, serviceName, proxy)
|
||||
}
|
||||
|
||||
lb.Add(proxyName, proxy, server.Weight)
|
||||
|
|
|
@ -1,44 +0,0 @@
|
|||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"github.com/traefik/traefik/v3/pkg/tracing"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
type wrapper struct {
|
||||
rt http.RoundTripper
|
||||
}
|
||||
|
||||
func (t *wrapper) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
var span trace.Span
|
||||
var tracer *tracing.Tracer
|
||||
if tracer = tracing.TracerFromContext(req.Context()); tracer != nil {
|
||||
var tracingCtx context.Context
|
||||
tracingCtx, span = tracer.Start(req.Context(), "ReverseProxy", trace.WithSpanKind(trace.SpanKindClient))
|
||||
defer span.End()
|
||||
|
||||
req = req.WithContext(tracingCtx)
|
||||
|
||||
tracer.CaptureClientRequest(span, req)
|
||||
tracing.InjectContextIntoCarrier(req)
|
||||
}
|
||||
|
||||
response, err := t.rt.RoundTrip(req)
|
||||
if err != nil {
|
||||
statusCode := computeStatusCode(err)
|
||||
tracer.CaptureResponse(span, nil, statusCode, trace.SpanKindClient)
|
||||
|
||||
return response, err
|
||||
}
|
||||
|
||||
tracer.CaptureResponse(span, response.Header, response.StatusCode, trace.SpanKindClient)
|
||||
|
||||
return response, nil
|
||||
}
|
||||
|
||||
func newTracingRoundTripper(rt http.RoundTripper) http.RoundTripper {
|
||||
return &wrapper{rt: rt}
|
||||
}
|
|
@ -1,138 +0,0 @@
|
|||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/traefik/traefik/v3/pkg/tracing"
|
||||
"go.opentelemetry.io/otel/attribute"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"go.opentelemetry.io/otel/trace/embedded"
|
||||
)
|
||||
|
||||
func TestTracingRoundTripper(t *testing.T) {
|
||||
type expected struct {
|
||||
name string
|
||||
attributes []attribute.KeyValue
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
desc string
|
||||
expected []expected
|
||||
}{
|
||||
{
|
||||
desc: "basic test",
|
||||
expected: []expected{
|
||||
{
|
||||
name: "initial",
|
||||
attributes: []attribute.KeyValue{
|
||||
attribute.String("span.kind", "unspecified"),
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "ReverseProxy",
|
||||
attributes: []attribute.KeyValue{
|
||||
attribute.String("span.kind", "client"),
|
||||
attribute.String("http.request.method", "GET"),
|
||||
attribute.String("network.protocol.version", "1.1"),
|
||||
attribute.String("url.full", "http://www.test.com/search?q=Opentelemetry"),
|
||||
attribute.String("url.scheme", "http"),
|
||||
attribute.String("user_agent.original", "reverse-test"),
|
||||
attribute.String("network.peer.address", ""),
|
||||
attribute.String("server.address", "www.test.com"),
|
||||
attribute.String("network.peer.port", "80"),
|
||||
attribute.Int64("server.port", int64(80)),
|
||||
attribute.StringSlice("http.request.header.x-foo", []string{"foo", "bar"}),
|
||||
attribute.Int64("http.response.status_code", int64(404)),
|
||||
attribute.StringSlice("http.response.header.x-bar", []string{"foo", "bar"}),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range testCases {
|
||||
t.Run(test.desc, func(t *testing.T) {
|
||||
req := httptest.NewRequest(http.MethodGet, "http://www.test.com/search?q=Opentelemetry", nil)
|
||||
req.RemoteAddr = "10.0.0.1:1234"
|
||||
req.Header.Set("User-Agent", "reverse-test")
|
||||
req.Header.Set("X-Forwarded-Proto", "http")
|
||||
req.Header.Set("X-Foo", "foo")
|
||||
req.Header.Add("X-Foo", "bar")
|
||||
|
||||
mockTracer := &mockTracer{}
|
||||
tracer := tracing.NewTracer(mockTracer, []string{"X-Foo"}, []string{"X-Bar"})
|
||||
initialCtx, initialSpan := tracer.Start(req.Context(), "initial")
|
||||
defer initialSpan.End()
|
||||
req = req.WithContext(initialCtx)
|
||||
|
||||
tracingRoundTripper := newTracingRoundTripper(roundTripperFn(func(req *http.Request) (*http.Response, error) {
|
||||
return &http.Response{
|
||||
Header: map[string][]string{
|
||||
"X-Bar": {"foo", "bar"},
|
||||
},
|
||||
StatusCode: http.StatusNotFound,
|
||||
}, nil
|
||||
}))
|
||||
|
||||
_, err := tracingRoundTripper.RoundTrip(req)
|
||||
require.NoError(t, err)
|
||||
|
||||
for i, span := range mockTracer.spans {
|
||||
assert.Equal(t, test.expected[i].name, span.name)
|
||||
assert.Equal(t, test.expected[i].attributes, span.attributes)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
type mockTracer struct {
|
||||
embedded.Tracer
|
||||
|
||||
spans []*mockSpan
|
||||
}
|
||||
|
||||
var _ trace.Tracer = &mockTracer{}
|
||||
|
||||
func (t *mockTracer) Start(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
|
||||
config := trace.NewSpanStartConfig(opts...)
|
||||
span := &mockSpan{}
|
||||
span.SetName(name)
|
||||
span.SetAttributes(attribute.String("span.kind", config.SpanKind().String()))
|
||||
span.SetAttributes(config.Attributes()...)
|
||||
t.spans = append(t.spans, span)
|
||||
return trace.ContextWithSpan(ctx, span), span
|
||||
}
|
||||
|
||||
// mockSpan is an implementation of Span that preforms no operations.
|
||||
type mockSpan struct {
|
||||
embedded.Span
|
||||
|
||||
name string
|
||||
attributes []attribute.KeyValue
|
||||
}
|
||||
|
||||
var _ trace.Span = &mockSpan{}
|
||||
|
||||
func (*mockSpan) SpanContext() trace.SpanContext {
|
||||
return trace.NewSpanContext(trace.SpanContextConfig{TraceID: trace.TraceID{1}, SpanID: trace.SpanID{1}})
|
||||
}
|
||||
func (*mockSpan) IsRecording() bool { return false }
|
||||
func (s *mockSpan) SetStatus(_ codes.Code, _ string) {}
|
||||
func (s *mockSpan) SetAttributes(kv ...attribute.KeyValue) {
|
||||
s.attributes = append(s.attributes, kv...)
|
||||
}
|
||||
func (s *mockSpan) End(...trace.SpanEndOption) {}
|
||||
func (s *mockSpan) RecordError(_ error, _ ...trace.EventOption) {}
|
||||
func (s *mockSpan) AddEvent(_ string, _ ...trace.EventOption) {}
|
||||
|
||||
func (s *mockSpan) SetName(name string) { s.name = name }
|
||||
|
||||
func (s *mockSpan) TracerProvider() trace.TracerProvider {
|
||||
return nil
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue