Migrate to opentelemetry
This commit is contained in:
parent
45bb00be04
commit
4ddef9830b
89 changed files with 2113 additions and 3898 deletions
|
@ -8,20 +8,20 @@ import (
|
|||
"github.com/traefik/traefik/v3/pkg/metrics"
|
||||
"github.com/traefik/traefik/v3/pkg/middlewares/accesslog"
|
||||
"github.com/traefik/traefik/v3/pkg/middlewares/capture"
|
||||
metricsmiddleware "github.com/traefik/traefik/v3/pkg/middlewares/metrics"
|
||||
mTracing "github.com/traefik/traefik/v3/pkg/middlewares/tracing"
|
||||
"github.com/traefik/traefik/v3/pkg/tracing"
|
||||
metricsMiddle "github.com/traefik/traefik/v3/pkg/middlewares/metrics"
|
||||
tracingMiddle "github.com/traefik/traefik/v3/pkg/middlewares/tracing"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
// ChainBuilder Creates a middleware chain by entry point. It is used for middlewares that are created almost systematically and that need to be created before all others.
|
||||
type ChainBuilder struct {
|
||||
metricsRegistry metrics.Registry
|
||||
accessLoggerMiddleware *accesslog.Handler
|
||||
tracer *tracing.Tracing
|
||||
tracer trace.Tracer
|
||||
}
|
||||
|
||||
// NewChainBuilder Creates a new ChainBuilder.
|
||||
func NewChainBuilder(metricsRegistry metrics.Registry, accessLoggerMiddleware *accesslog.Handler, tracer *tracing.Tracing) *ChainBuilder {
|
||||
func NewChainBuilder(metricsRegistry metrics.Registry, accessLoggerMiddleware *accesslog.Handler, tracer trace.Tracer) *ChainBuilder {
|
||||
return &ChainBuilder{
|
||||
metricsRegistry: metricsRegistry,
|
||||
accessLoggerMiddleware: accessLoggerMiddleware,
|
||||
|
@ -42,11 +42,12 @@ func (c *ChainBuilder) Build(ctx context.Context, entryPointName string) alice.C
|
|||
}
|
||||
|
||||
if c.tracer != nil {
|
||||
chain = chain.Append(mTracing.WrapEntryPointHandler(ctx, c.tracer, entryPointName))
|
||||
chain = chain.Append(tracingMiddle.WrapEntryPointHandler(ctx, c.tracer, entryPointName))
|
||||
}
|
||||
|
||||
if c.metricsRegistry != nil && c.metricsRegistry.IsEpEnabled() {
|
||||
chain = chain.Append(metricsmiddleware.WrapEntryPointHandler(ctx, c.metricsRegistry, entryPointName))
|
||||
metricsHandler := metricsMiddle.WrapEntryPointHandler(ctx, c.metricsRegistry, entryPointName)
|
||||
chain = chain.Append(tracingMiddle.WrapMiddleware(ctx, metricsHandler))
|
||||
}
|
||||
|
||||
return chain
|
||||
|
@ -59,8 +60,4 @@ func (c *ChainBuilder) Close() {
|
|||
log.Error().Err(err).Msg("Could not close the access log file")
|
||||
}
|
||||
}
|
||||
|
||||
if c.tracer != nil {
|
||||
c.tracer.Close()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -372,7 +372,7 @@ func (b *Builder) buildConstructor(ctx context.Context, middlewareName string) (
|
|||
return nil, fmt.Errorf("invalid middleware %q configuration: invalid middleware type or middleware does not exist", middlewareName)
|
||||
}
|
||||
|
||||
return tracing.Wrap(ctx, middleware), nil
|
||||
return tracing.WrapMiddleware(ctx, middleware), nil
|
||||
}
|
||||
|
||||
func inSlice(element string, stack []string) bool {
|
||||
|
|
|
@ -5,12 +5,13 @@ import (
|
|||
"errors"
|
||||
"net/http"
|
||||
|
||||
"github.com/opentracing/opentracing-go/ext"
|
||||
"github.com/traefik/traefik/v3/pkg/config/dynamic"
|
||||
"github.com/traefik/traefik/v3/pkg/plugins"
|
||||
"github.com/traefik/traefik/v3/pkg/tracing"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
const typeName = "Plugin"
|
||||
|
||||
// PluginsBuilder the plugin's builder interface.
|
||||
type PluginsBuilder interface {
|
||||
Build(pName string, config map[string]interface{}, middlewareName string) (plugins.Constructor, error)
|
||||
|
@ -54,6 +55,6 @@ func (s *traceablePlugin) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
|||
s.h.ServeHTTP(rw, req)
|
||||
}
|
||||
|
||||
func (s *traceablePlugin) GetTracingInformation() (string, ext.SpanKindEnum) {
|
||||
return s.name, tracing.SpanKindNoneEnum
|
||||
func (s *traceablePlugin) GetTracingInformation() (string, string, trace.SpanKind) {
|
||||
return s.name, typeName, trace.SpanKindInternal
|
||||
}
|
||||
|
|
|
@ -198,21 +198,20 @@ func (m *Manager) buildHTTPHandler(ctx context.Context, router *runtime.RouterIn
|
|||
|
||||
mHandler := m.middlewaresBuilder.BuildChain(ctx, router.Middlewares)
|
||||
|
||||
tHandler := func(next http.Handler) (http.Handler, error) {
|
||||
return tracing.NewForwarder(ctx, routerName, router.Service, next), nil
|
||||
}
|
||||
|
||||
chain := alice.New()
|
||||
|
||||
chain = chain.Append(tracing.WrapRouterHandler(ctx, routerName, router.Rule, provider.GetQualifiedName(ctx, router.Service)))
|
||||
|
||||
if m.metricsRegistry != nil && m.metricsRegistry.IsRouterEnabled() {
|
||||
chain = chain.Append(metricsMiddle.WrapRouterHandler(ctx, m.metricsRegistry, routerName, provider.GetQualifiedName(ctx, router.Service)))
|
||||
metricsHandler := metricsMiddle.WrapRouterHandler(ctx, m.metricsRegistry, routerName, provider.GetQualifiedName(ctx, router.Service))
|
||||
chain = chain.Append(tracing.WrapMiddleware(ctx, metricsHandler))
|
||||
}
|
||||
|
||||
if router.DefaultRule {
|
||||
chain = chain.Append(denyrouterrecursion.WrapHandler(routerName))
|
||||
}
|
||||
|
||||
return chain.Extend(*mHandler).Append(tHandler).Then(sHandler)
|
||||
return chain.Extend(*mHandler).Then(sHandler)
|
||||
}
|
||||
|
||||
// BuildDefaultHTTPRouter creates a default HTTP router.
|
||||
|
|
|
@ -3,6 +3,7 @@ package server
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"os"
|
||||
"os/signal"
|
||||
"time"
|
||||
|
@ -27,12 +28,12 @@ type Server struct {
|
|||
stopChan chan bool
|
||||
|
||||
routinesPool *safe.Pool
|
||||
|
||||
tracerCloser io.Closer
|
||||
}
|
||||
|
||||
// NewServer returns an initialized Server.
|
||||
func NewServer(routinesPool *safe.Pool, entryPoints TCPEntryPoints, entryPointsUDP UDPEntryPoints, watcher *ConfigurationWatcher,
|
||||
chainBuilder *middleware.ChainBuilder, accessLoggerMiddleware *accesslog.Handler,
|
||||
) *Server {
|
||||
func NewServer(routinesPool *safe.Pool, entryPoints TCPEntryPoints, entryPointsUDP UDPEntryPoints, watcher *ConfigurationWatcher, chainBuilder *middleware.ChainBuilder, accessLoggerMiddleware *accesslog.Handler, tracerCloser io.Closer) *Server {
|
||||
srv := &Server{
|
||||
watcher: watcher,
|
||||
tcpEntryPoints: entryPoints,
|
||||
|
@ -42,6 +43,7 @@ func NewServer(routinesPool *safe.Pool, entryPoints TCPEntryPoints, entryPointsU
|
|||
stopChan: make(chan bool, 1),
|
||||
routinesPool: routinesPool,
|
||||
udpEntryPoints: entryPointsUDP,
|
||||
tracerCloser: tracerCloser,
|
||||
}
|
||||
|
||||
srv.configureSignals()
|
||||
|
@ -105,6 +107,12 @@ func (s *Server) Close() {
|
|||
|
||||
s.chainBuilder.Close()
|
||||
|
||||
if s.tracerCloser != nil {
|
||||
if err := s.tracerCloser.Close(); err != nil {
|
||||
log.Error().Err(err).Msg("Could not close the tracer")
|
||||
}
|
||||
}
|
||||
|
||||
cancel()
|
||||
}
|
||||
|
||||
|
|
|
@ -22,9 +22,13 @@ const StatusClientClosedRequest = 499
|
|||
const StatusClientClosedRequestText = "Client Closed Request"
|
||||
|
||||
func buildSingleHostProxy(target *url.URL, passHostHeader bool, flushInterval time.Duration, roundTripper http.RoundTripper, bufferPool httputil.BufferPool) http.Handler {
|
||||
// Wrapping the roundTripper with the Tracing roundTripper,
|
||||
// to handle the reverseProxy client span creation.
|
||||
tracingRoundTripper := newTracingRoundTripper(roundTripper)
|
||||
|
||||
return &httputil.ReverseProxy{
|
||||
Director: directorBuilder(target, passHostHeader),
|
||||
Transport: roundTripper,
|
||||
Transport: tracingRoundTripper,
|
||||
FlushInterval: flushInterval,
|
||||
BufferPool: bufferPool,
|
||||
ErrorHandler: errorHandler,
|
||||
|
@ -94,23 +98,7 @@ func isWebSocketUpgrade(req *http.Request) bool {
|
|||
}
|
||||
|
||||
func errorHandler(w http.ResponseWriter, req *http.Request, err error) {
|
||||
statusCode := http.StatusInternalServerError
|
||||
|
||||
switch {
|
||||
case errors.Is(err, io.EOF):
|
||||
statusCode = http.StatusBadGateway
|
||||
case errors.Is(err, context.Canceled):
|
||||
statusCode = StatusClientClosedRequest
|
||||
default:
|
||||
var netErr net.Error
|
||||
if errors.As(err, &netErr) {
|
||||
if netErr.Timeout() {
|
||||
statusCode = http.StatusGatewayTimeout
|
||||
} else {
|
||||
statusCode = http.StatusBadGateway
|
||||
}
|
||||
}
|
||||
}
|
||||
statusCode := computeStatusCode(err)
|
||||
|
||||
logger := log.Ctx(req.Context())
|
||||
logger.Debug().Err(err).Msgf("%d %s", statusCode, statusText(statusCode))
|
||||
|
@ -121,6 +109,26 @@ func errorHandler(w http.ResponseWriter, req *http.Request, err error) {
|
|||
}
|
||||
}
|
||||
|
||||
func computeStatusCode(err error) int {
|
||||
switch {
|
||||
case errors.Is(err, io.EOF):
|
||||
return http.StatusBadGateway
|
||||
case errors.Is(err, context.Canceled):
|
||||
return StatusClientClosedRequest
|
||||
default:
|
||||
var netErr net.Error
|
||||
if errors.As(err, &netErr) {
|
||||
if netErr.Timeout() {
|
||||
return http.StatusGatewayTimeout
|
||||
}
|
||||
|
||||
return http.StatusBadGateway
|
||||
}
|
||||
}
|
||||
|
||||
return http.StatusInternalServerError
|
||||
}
|
||||
|
||||
func statusText(statusCode int) string {
|
||||
if statusCode == StatusClientClosedRequest {
|
||||
return StatusClientClosedRequestText
|
||||
|
|
|
@ -14,6 +14,7 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/containous/alice"
|
||||
"github.com/rs/zerolog/log"
|
||||
"github.com/traefik/traefik/v3/pkg/config/dynamic"
|
||||
"github.com/traefik/traefik/v3/pkg/config/runtime"
|
||||
|
@ -22,6 +23,7 @@ import (
|
|||
"github.com/traefik/traefik/v3/pkg/metrics"
|
||||
"github.com/traefik/traefik/v3/pkg/middlewares/accesslog"
|
||||
metricsMiddle "github.com/traefik/traefik/v3/pkg/middlewares/metrics"
|
||||
tracingMiddle "github.com/traefik/traefik/v3/pkg/middlewares/tracing"
|
||||
"github.com/traefik/traefik/v3/pkg/safe"
|
||||
"github.com/traefik/traefik/v3/pkg/server/cookie"
|
||||
"github.com/traefik/traefik/v3/pkg/server/provider"
|
||||
|
@ -305,9 +307,18 @@ func (m *Manager) getLoadBalancerServiceHandler(ctx context.Context, serviceName
|
|||
proxy = accesslog.NewFieldHandler(proxy, accesslog.ServiceName, serviceName, accesslog.AddServiceFields)
|
||||
|
||||
if m.metricsRegistry != nil && m.metricsRegistry.IsSvcEnabled() {
|
||||
proxy = metricsMiddle.NewServiceMiddleware(ctx, proxy, m.metricsRegistry, serviceName)
|
||||
metricsHandler := metricsMiddle.WrapServiceHandler(ctx, m.metricsRegistry, serviceName)
|
||||
|
||||
proxy, err = alice.New().
|
||||
Append(tracingMiddle.WrapMiddleware(ctx, metricsHandler)).
|
||||
Then(proxy)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error wrapping metrics handler: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
proxy = tracingMiddle.NewService(ctx, serviceName, proxy)
|
||||
|
||||
lb.Add(proxyName, proxy, nil)
|
||||
|
||||
// servers are considered UP by default.
|
||||
|
|
42
pkg/server/service/tracing_roundtripper.go
Normal file
42
pkg/server/service/tracing_roundtripper.go
Normal file
|
@ -0,0 +1,42 @@
|
|||
package service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
|
||||
"github.com/traefik/traefik/v3/pkg/tracing"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
)
|
||||
|
||||
type wrapper struct {
|
||||
rt http.RoundTripper
|
||||
}
|
||||
|
||||
func (t *wrapper) RoundTrip(req *http.Request) (*http.Response, error) {
|
||||
var span trace.Span
|
||||
if tracer := tracing.TracerFromContext(req.Context()); tracer != nil {
|
||||
var tracingCtx context.Context
|
||||
tracingCtx, span = tracer.Start(req.Context(), "ReverseProxy", trace.WithSpanKind(trace.SpanKindClient))
|
||||
defer span.End()
|
||||
|
||||
req = req.WithContext(tracingCtx)
|
||||
|
||||
tracing.LogClientRequest(span, req)
|
||||
tracing.InjectContextIntoCarrier(req)
|
||||
}
|
||||
|
||||
response, err := t.rt.RoundTrip(req)
|
||||
if err != nil {
|
||||
statusCode := computeStatusCode(err)
|
||||
tracing.LogResponseCode(span, statusCode, trace.SpanKindClient)
|
||||
return response, err
|
||||
}
|
||||
|
||||
tracing.LogResponseCode(span, response.StatusCode, trace.SpanKindClient)
|
||||
|
||||
return response, nil
|
||||
}
|
||||
|
||||
func newTracingRoundTripper(rt http.RoundTripper) http.RoundTripper {
|
||||
return &wrapper{rt: rt}
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue