1
0
Fork 0

Remove observability for internal resources

This commit is contained in:
Romain 2024-01-30 16:28:05 +01:00 committed by GitHub
parent d02be003ab
commit 8b77f0c2dd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
36 changed files with 594 additions and 317 deletions

View file

@ -1,63 +0,0 @@
package middleware
import (
"context"
"github.com/containous/alice"
"github.com/rs/zerolog/log"
"github.com/traefik/traefik/v3/pkg/metrics"
"github.com/traefik/traefik/v3/pkg/middlewares/accesslog"
"github.com/traefik/traefik/v3/pkg/middlewares/capture"
metricsMiddle "github.com/traefik/traefik/v3/pkg/middlewares/metrics"
tracingMiddle "github.com/traefik/traefik/v3/pkg/middlewares/tracing"
"go.opentelemetry.io/otel/trace"
)
// ChainBuilder Creates a middleware chain by entry point. It is used for middlewares that are created almost systematically and that need to be created before all others.
type ChainBuilder struct {
metricsRegistry metrics.Registry
accessLoggerMiddleware *accesslog.Handler
tracer trace.Tracer
}
// NewChainBuilder Creates a new ChainBuilder.
func NewChainBuilder(metricsRegistry metrics.Registry, accessLoggerMiddleware *accesslog.Handler, tracer trace.Tracer) *ChainBuilder {
return &ChainBuilder{
metricsRegistry: metricsRegistry,
accessLoggerMiddleware: accessLoggerMiddleware,
tracer: tracer,
}
}
// Build a middleware chain by entry point.
func (c *ChainBuilder) Build(ctx context.Context, entryPointName string) alice.Chain {
chain := alice.New()
if c.accessLoggerMiddleware != nil || c.metricsRegistry != nil && (c.metricsRegistry.IsEpEnabled() || c.metricsRegistry.IsRouterEnabled() || c.metricsRegistry.IsSvcEnabled()) {
chain = chain.Append(capture.Wrap)
}
if c.accessLoggerMiddleware != nil {
chain = chain.Append(accesslog.WrapHandler(c.accessLoggerMiddleware))
}
if c.tracer != nil {
chain = chain.Append(tracingMiddle.WrapEntryPointHandler(ctx, c.tracer, entryPointName))
}
if c.metricsRegistry != nil && c.metricsRegistry.IsEpEnabled() {
metricsHandler := metricsMiddle.WrapEntryPointHandler(ctx, c.metricsRegistry, entryPointName)
chain = chain.Append(tracingMiddle.WrapMiddleware(ctx, metricsHandler))
}
return chain
}
// Close accessLogger and tracer.
func (c *ChainBuilder) Close() {
if c.accessLoggerMiddleware != nil {
if err := c.accessLoggerMiddleware.Close(); err != nil {
log.Error().Err(err).Msg("Could not close the access log file")
}
}
}

View file

@ -387,6 +387,9 @@ func (b *Builder) buildConstructor(ctx context.Context, middlewareName string) (
return nil, fmt.Errorf("invalid middleware %q configuration: invalid middleware type or middleware does not exist", middlewareName)
}
// The tracing middleware is a NOOP if tracing is not setup on the middleware chain.
// Hence, regarding internal resources' observability deactivation,
// this would not enable tracing.
return tracing.WrapMiddleware(ctx, middleware), nil
}

View file

@ -0,0 +1,140 @@
package middleware
import (
"context"
"io"
"net/http"
"strings"
"github.com/containous/alice"
"github.com/rs/zerolog/log"
"github.com/traefik/traefik/v3/pkg/config/static"
"github.com/traefik/traefik/v3/pkg/logs"
"github.com/traefik/traefik/v3/pkg/metrics"
"github.com/traefik/traefik/v3/pkg/middlewares/accesslog"
"github.com/traefik/traefik/v3/pkg/middlewares/capture"
metricsMiddle "github.com/traefik/traefik/v3/pkg/middlewares/metrics"
tracingMiddle "github.com/traefik/traefik/v3/pkg/middlewares/tracing"
"go.opentelemetry.io/otel/trace"
)
// ObservabilityMgr is a manager for observability (AccessLogs, Metrics and Tracing) enablement.
type ObservabilityMgr struct {
config static.Configuration
accessLoggerMiddleware *accesslog.Handler
metricsRegistry metrics.Registry
tracer trace.Tracer
tracerCloser io.Closer
}
// NewObservabilityMgr creates a new ObservabilityMgr.
func NewObservabilityMgr(config static.Configuration, metricsRegistry metrics.Registry, accessLoggerMiddleware *accesslog.Handler, tracer trace.Tracer, tracerCloser io.Closer) *ObservabilityMgr {
return &ObservabilityMgr{
config: config,
metricsRegistry: metricsRegistry,
accessLoggerMiddleware: accessLoggerMiddleware,
tracer: tracer,
tracerCloser: tracerCloser,
}
}
// BuildEPChain an observability middleware chain by entry point.
func (c *ObservabilityMgr) BuildEPChain(ctx context.Context, entryPointName string, resourceName string) alice.Chain {
chain := alice.New()
if c == nil {
return chain
}
if c.accessLoggerMiddleware != nil || c.metricsRegistry != nil && (c.metricsRegistry.IsEpEnabled() || c.metricsRegistry.IsRouterEnabled() || c.metricsRegistry.IsSvcEnabled()) {
if c.ShouldAddAccessLogs(resourceName) || c.ShouldAddMetrics(resourceName) {
chain = chain.Append(capture.Wrap)
}
}
if c.accessLoggerMiddleware != nil && c.ShouldAddAccessLogs(resourceName) {
chain = chain.Append(accesslog.WrapHandler(c.accessLoggerMiddleware))
chain = chain.Append(func(next http.Handler) (http.Handler, error) {
return accesslog.NewFieldHandler(next, logs.EntryPointName, entryPointName, accesslog.InitServiceFields), nil
})
}
if c.tracer != nil && c.ShouldAddTracing(resourceName) {
chain = chain.Append(tracingMiddle.WrapEntryPointHandler(ctx, c.tracer, entryPointName))
}
if c.metricsRegistry != nil && c.metricsRegistry.IsEpEnabled() && c.ShouldAddMetrics(resourceName) {
metricsHandler := metricsMiddle.WrapEntryPointHandler(ctx, c.metricsRegistry, entryPointName)
if c.tracer != nil && c.ShouldAddTracing(resourceName) {
chain = chain.Append(tracingMiddle.WrapMiddleware(ctx, metricsHandler))
} else {
chain = chain.Append(metricsHandler)
}
}
return chain
}
// ShouldAddAccessLogs returns whether the access logs should be enabled for the given resource.
func (c *ObservabilityMgr) ShouldAddAccessLogs(resourceName string) bool {
if c == nil {
return false
}
return c.config.AccessLog != nil && (c.config.AccessLog.AddInternals || !strings.HasSuffix(resourceName, "@internal"))
}
// ShouldAddMetrics returns whether the metrics should be enabled for the given resource.
func (c *ObservabilityMgr) ShouldAddMetrics(resourceName string) bool {
if c == nil {
return false
}
return c.config.Metrics != nil && (c.config.Metrics.AddInternals || !strings.HasSuffix(resourceName, "@internal"))
}
// ShouldAddTracing returns whether the tracing should be enabled for the given resource.
func (c *ObservabilityMgr) ShouldAddTracing(resourceName string) bool {
if c == nil {
return false
}
return c.config.Tracing != nil && (c.config.Tracing.AddInternals || !strings.HasSuffix(resourceName, "@internal"))
}
// MetricsRegistry is an accessor to the metrics registry.
func (c *ObservabilityMgr) MetricsRegistry() metrics.Registry {
if c == nil {
return nil
}
return c.metricsRegistry
}
// Close closes the accessLogger and tracer.
func (c *ObservabilityMgr) Close() {
if c == nil {
return
}
if c.accessLoggerMiddleware != nil {
if err := c.accessLoggerMiddleware.Close(); err != nil {
log.Error().Err(err).Msg("Could not close the access log file")
}
}
if c.tracerCloser != nil {
if err := c.tracerCloser.Close(); err != nil {
log.Error().Err(err).Msg("Could not close the tracer")
}
}
}
func (c *ObservabilityMgr) RotateAccessLogs() error {
if c.accessLoggerMiddleware == nil {
return nil
}
return c.accessLoggerMiddleware.Rotate()
}

View file

@ -10,7 +10,6 @@ import (
"github.com/rs/zerolog/log"
"github.com/traefik/traefik/v3/pkg/config/runtime"
"github.com/traefik/traefik/v3/pkg/logs"
"github.com/traefik/traefik/v3/pkg/metrics"
"github.com/traefik/traefik/v3/pkg/middlewares/accesslog"
"github.com/traefik/traefik/v3/pkg/middlewares/denyrouterrecursion"
metricsMiddle "github.com/traefik/traefik/v3/pkg/middlewares/metrics"
@ -35,21 +34,19 @@ type serviceManager interface {
type Manager struct {
routerHandlers map[string]http.Handler
serviceManager serviceManager
metricsRegistry metrics.Registry
observabilityMgr *middleware.ObservabilityMgr
middlewaresBuilder middlewareBuilder
chainBuilder *middleware.ChainBuilder
conf *runtime.Configuration
tlsManager *tls.Manager
}
// NewManager creates a new Manager.
func NewManager(conf *runtime.Configuration, serviceManager serviceManager, middlewaresBuilder middlewareBuilder, chainBuilder *middleware.ChainBuilder, metricsRegistry metrics.Registry, tlsManager *tls.Manager) *Manager {
func NewManager(conf *runtime.Configuration, serviceManager serviceManager, middlewaresBuilder middlewareBuilder, observabilityMgr *middleware.ObservabilityMgr, tlsManager *tls.Manager) *Manager {
return &Manager{
routerHandlers: make(map[string]http.Handler),
serviceManager: serviceManager,
metricsRegistry: metricsRegistry,
observabilityMgr: observabilityMgr,
middlewaresBuilder: middlewaresBuilder,
chainBuilder: chainBuilder,
conf: conf,
tlsManager: tlsManager,
}
@ -73,49 +70,49 @@ func (m *Manager) BuildHandlers(rootCtx context.Context, entryPoints []string, t
logger := log.Ctx(rootCtx).With().Str(logs.EntryPointName, entryPointName).Logger()
ctx := logger.WithContext(rootCtx)
handler, err := m.buildEntryPointHandler(ctx, routers)
handler, err := m.buildEntryPointHandler(ctx, entryPointName, routers)
if err != nil {
logger.Error().Err(err).Send()
continue
}
handlerWithAccessLog, err := alice.New(func(next http.Handler) (http.Handler, error) {
return accesslog.NewFieldHandler(next, logs.EntryPointName, entryPointName, accesslog.InitServiceFields), nil
}).Then(handler)
if err != nil {
logger.Error().Err(err).Send()
entryPointHandlers[entryPointName] = handler
} else {
entryPointHandlers[entryPointName] = handlerWithAccessLog
}
entryPointHandlers[entryPointName] = handler
}
// Create default handlers.
for _, entryPointName := range entryPoints {
logger := log.Ctx(rootCtx).With().Str(logs.EntryPointName, entryPointName).Logger()
ctx := logger.WithContext(rootCtx)
handler, ok := entryPointHandlers[entryPointName]
if !ok || handler == nil {
handler = BuildDefaultHTTPRouter()
if ok || handler != nil {
continue
}
handlerWithMiddlewares, err := m.chainBuilder.Build(ctx, entryPointName).Then(handler)
handler, err := m.observabilityMgr.BuildEPChain(ctx, entryPointName, "").Then(BuildDefaultHTTPRouter())
if err != nil {
logger.Error().Err(err).Send()
continue
}
entryPointHandlers[entryPointName] = handlerWithMiddlewares
entryPointHandlers[entryPointName] = handler
}
return entryPointHandlers
}
func (m *Manager) buildEntryPointHandler(ctx context.Context, configs map[string]*runtime.RouterInfo) (http.Handler, error) {
func (m *Manager) buildEntryPointHandler(ctx context.Context, entryPointName string, configs map[string]*runtime.RouterInfo) (http.Handler, error) {
muxer, err := httpmuxer.NewMuxer()
if err != nil {
return nil, err
}
defaultHandler, err := m.observabilityMgr.BuildEPChain(ctx, entryPointName, "defaultHandler").Then(http.NotFoundHandler())
if err != nil {
return nil, err
}
muxer.SetDefaultHandler(defaultHandler)
for routerName, routerConfig := range configs {
logger := log.Ctx(ctx).With().Str(logs.RouterName, routerName).Logger()
ctxRouter := logger.WithContext(provider.AddInContext(ctx, routerName))
@ -131,6 +128,14 @@ func (m *Manager) buildEntryPointHandler(ctx context.Context, configs map[string
continue
}
observabilityChain := m.observabilityMgr.BuildEPChain(ctx, entryPointName, routerConfig.Service)
handler, err = observabilityChain.Then(handler)
if err != nil {
routerConfig.AddError(err, true)
logger.Error().Err(err).Send()
continue
}
if err = muxer.AddRoute(routerConfig.Rule, routerConfig.RuleSyntax, routerConfig.Priority, handler); err != nil {
routerConfig.AddError(err, true)
logger.Error().Err(err).Send()
@ -167,6 +172,12 @@ func (m *Manager) buildRouterHandler(ctx context.Context, routerName string, rou
return nil, err
}
// Prevents from enabling observability for internal resources.
if !m.observabilityMgr.ShouldAddAccessLogs(provider.GetQualifiedName(ctx, routerConfig.Service)) {
m.routerHandlers[routerName] = handler
return m.routerHandlers[routerName], nil
}
handlerWithAccessLog, err := alice.New(func(next http.Handler) (http.Handler, error) {
return accesslog.NewFieldHandler(next, accesslog.RouterName, routerName, nil), nil
}).Then(handler)
@ -200,10 +211,20 @@ func (m *Manager) buildHTTPHandler(ctx context.Context, router *runtime.RouterIn
chain := alice.New()
if m.observabilityMgr.MetricsRegistry() != nil && m.observabilityMgr.MetricsRegistry().IsRouterEnabled() &&
m.observabilityMgr.ShouldAddMetrics(provider.GetQualifiedName(ctx, router.Service)) {
chain = chain.Append(metricsMiddle.WrapRouterHandler(ctx, m.observabilityMgr.MetricsRegistry(), routerName, provider.GetQualifiedName(ctx, router.Service)))
}
// Prevents from enabling tracing for internal resources.
if !m.observabilityMgr.ShouldAddTracing(provider.GetQualifiedName(ctx, router.Service)) {
return chain.Extend(*mHandler).Then(sHandler)
}
chain = chain.Append(tracing.WrapRouterHandler(ctx, routerName, router.Rule, provider.GetQualifiedName(ctx, router.Service)))
if m.metricsRegistry != nil && m.metricsRegistry.IsRouterEnabled() {
metricsHandler := metricsMiddle.WrapRouterHandler(ctx, m.metricsRegistry, routerName, provider.GetQualifiedName(ctx, router.Service))
if m.observabilityMgr.MetricsRegistry() != nil && m.observabilityMgr.MetricsRegistry().IsRouterEnabled() {
metricsHandler := metricsMiddle.WrapRouterHandler(ctx, m.observabilityMgr.MetricsRegistry(), routerName, provider.GetQualifiedName(ctx, router.Service))
chain = chain.Append(tracing.WrapMiddleware(ctx, metricsHandler))
}

View file

@ -9,21 +9,15 @@ import (
"testing"
"time"
"github.com/containous/alice"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
ptypes "github.com/traefik/paerser/types"
"github.com/traefik/traefik/v3/pkg/config/dynamic"
"github.com/traefik/traefik/v3/pkg/config/runtime"
"github.com/traefik/traefik/v3/pkg/metrics"
"github.com/traefik/traefik/v3/pkg/middlewares/accesslog"
"github.com/traefik/traefik/v3/pkg/middlewares/capture"
"github.com/traefik/traefik/v3/pkg/middlewares/requestdecorator"
"github.com/traefik/traefik/v3/pkg/server/middleware"
"github.com/traefik/traefik/v3/pkg/server/service"
"github.com/traefik/traefik/v3/pkg/testhelpers"
"github.com/traefik/traefik/v3/pkg/tls"
"github.com/traefik/traefik/v3/pkg/types"
)
func TestRouterManager_Get(t *testing.T) {
@ -319,10 +313,9 @@ func TestRouterManager_Get(t *testing.T) {
roundTripperManager.Update(map[string]*dynamic.ServersTransport{"default@internal": {}})
serviceManager := service.NewManager(rtConf.Services, nil, nil, roundTripperManager)
middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager, nil)
chainBuilder := middleware.NewChainBuilder(nil, nil, nil)
tlsManager := tls.NewManager()
routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, chainBuilder, metrics.NewVoidRegistry(), tlsManager)
routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, nil, tlsManager)
handlers := routerManager.BuildHandlers(context.Background(), test.entryPoints, false)
@ -341,126 +334,6 @@ func TestRouterManager_Get(t *testing.T) {
}
}
func TestAccessLog(t *testing.T) {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
t.Cleanup(func() { server.Close() })
testCases := []struct {
desc string
routersConfig map[string]*dynamic.Router
serviceConfig map[string]*dynamic.Service
middlewaresConfig map[string]*dynamic.Middleware
entryPoints []string
expected string
}{
{
desc: "apply routerName in accesslog (first match)",
routersConfig: map[string]*dynamic.Router{
"foo": {
EntryPoints: []string{"web"},
Service: "foo-service",
Rule: "Host(`foo.bar`)",
},
"bar": {
EntryPoints: []string{"web"},
Service: "foo-service",
Rule: "Host(`bar.foo`)",
},
},
serviceConfig: map[string]*dynamic.Service{
"foo-service": {
LoadBalancer: &dynamic.ServersLoadBalancer{
Servers: []dynamic.Server{
{
URL: server.URL,
},
},
},
},
},
entryPoints: []string{"web"},
expected: "foo",
},
{
desc: "apply routerName in accesslog (second match)",
routersConfig: map[string]*dynamic.Router{
"foo": {
EntryPoints: []string{"web"},
Service: "foo-service",
Rule: "Host(`bar.foo`)",
},
"bar": {
EntryPoints: []string{"web"},
Service: "foo-service",
Rule: "Host(`foo.bar`)",
},
},
serviceConfig: map[string]*dynamic.Service{
"foo-service": {
LoadBalancer: &dynamic.ServersLoadBalancer{
Servers: []dynamic.Server{
{
URL: server.URL,
},
},
},
},
},
entryPoints: []string{"web"},
expected: "bar",
},
}
for _, test := range testCases {
t.Run(test.desc, func(t *testing.T) {
rtConf := runtime.NewConfig(dynamic.Configuration{
HTTP: &dynamic.HTTPConfiguration{
Services: test.serviceConfig,
Routers: test.routersConfig,
Middlewares: test.middlewaresConfig,
},
})
roundTripperManager := service.NewRoundTripperManager(nil)
roundTripperManager.Update(map[string]*dynamic.ServersTransport{"default@internal": {}})
serviceManager := service.NewManager(rtConf.Services, nil, nil, roundTripperManager)
middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager, nil)
chainBuilder := middleware.NewChainBuilder(nil, nil, nil)
tlsManager := tls.NewManager()
routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, chainBuilder, metrics.NewVoidRegistry(), tlsManager)
handlers := routerManager.BuildHandlers(context.Background(), test.entryPoints, false)
w := httptest.NewRecorder()
req := testhelpers.MustNewRequest(http.MethodGet, "http://foo.bar/", nil)
accesslogger, err := accesslog.NewHandler(&types.AccessLog{
Format: "json",
})
require.NoError(t, err)
reqHost := requestdecorator.New(nil)
chain := alice.New()
chain = chain.Append(capture.Wrap)
chain = chain.Append(accesslog.WrapHandler(accesslogger))
handler, err := chain.Then(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
reqHost.ServeHTTP(w, req, handlers["web"].ServeHTTP)
data := accesslog.GetLogData(req)
require.NotNil(t, data)
assert.Equal(t, test.expected, data.Core[accesslog.RouterName])
}))
require.NoError(t, err)
handler.ServeHTTP(w, req)
})
}
}
func TestRuntimeConfiguration(t *testing.T) {
testCases := []struct {
desc string
@ -788,11 +661,10 @@ func TestRuntimeConfiguration(t *testing.T) {
roundTripperManager.Update(map[string]*dynamic.ServersTransport{"default@internal": {}})
serviceManager := service.NewManager(rtConf.Services, nil, nil, roundTripperManager)
middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager, nil)
chainBuilder := middleware.NewChainBuilder(nil, nil, nil)
tlsManager := tls.NewManager()
tlsManager.UpdateConfigs(context.Background(), nil, test.tlsOptions, nil)
routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, chainBuilder, metrics.NewVoidRegistry(), tlsManager)
routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, nil, tlsManager)
_ = routerManager.BuildHandlers(context.Background(), entryPoints, false)
_ = routerManager.BuildHandlers(context.Background(), entryPoints, true)
@ -866,10 +738,9 @@ func TestProviderOnMiddlewares(t *testing.T) {
roundTripperManager.Update(map[string]*dynamic.ServersTransport{"default@internal": {}})
serviceManager := service.NewManager(rtConf.Services, nil, nil, roundTripperManager)
middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager, nil)
chainBuilder := middleware.NewChainBuilder(nil, nil, nil)
tlsManager := tls.NewManager()
routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, chainBuilder, metrics.NewVoidRegistry(), tlsManager)
routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, nil, tlsManager)
_ = routerManager.BuildHandlers(context.Background(), entryPoints, false)
@ -935,10 +806,9 @@ func BenchmarkRouterServe(b *testing.B) {
serviceManager := service.NewManager(rtConf.Services, nil, nil, staticRoundTripperGetter{res})
middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager, nil)
chainBuilder := middleware.NewChainBuilder(nil, nil, nil)
tlsManager := tls.NewManager()
routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, chainBuilder, metrics.NewVoidRegistry(), tlsManager)
routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, nil, tlsManager)
handlers := routerManager.BuildHandlers(context.Background(), entryPoints, false)

View file

@ -6,7 +6,6 @@ import (
"github.com/rs/zerolog/log"
"github.com/traefik/traefik/v3/pkg/config/runtime"
"github.com/traefik/traefik/v3/pkg/config/static"
"github.com/traefik/traefik/v3/pkg/metrics"
"github.com/traefik/traefik/v3/pkg/server/middleware"
tcpmiddleware "github.com/traefik/traefik/v3/pkg/server/middleware/tcp"
"github.com/traefik/traefik/v3/pkg/server/router"
@ -25,13 +24,12 @@ type RouterFactory struct {
entryPointsTCP []string
entryPointsUDP []string
managerFactory *service.ManagerFactory
metricsRegistry metrics.Registry
managerFactory *service.ManagerFactory
pluginBuilder middleware.PluginsBuilder
chainBuilder *middleware.ChainBuilder
tlsManager *tls.Manager
observabilityMgr *middleware.ObservabilityMgr
tlsManager *tls.Manager
dialerManager *tcp.DialerManager
@ -40,7 +38,7 @@ type RouterFactory struct {
// NewRouterFactory creates a new RouterFactory.
func NewRouterFactory(staticConfiguration static.Configuration, managerFactory *service.ManagerFactory, tlsManager *tls.Manager,
chainBuilder *middleware.ChainBuilder, pluginBuilder middleware.PluginsBuilder, metricsRegistry metrics.Registry, dialerManager *tcp.DialerManager,
observabilityMgr *middleware.ObservabilityMgr, pluginBuilder middleware.PluginsBuilder, dialerManager *tcp.DialerManager,
) *RouterFactory {
var entryPointsTCP, entryPointsUDP []string
for name, cfg := range staticConfiguration.EntryPoints {
@ -58,14 +56,13 @@ func NewRouterFactory(staticConfiguration static.Configuration, managerFactory *
}
return &RouterFactory{
entryPointsTCP: entryPointsTCP,
entryPointsUDP: entryPointsUDP,
managerFactory: managerFactory,
metricsRegistry: metricsRegistry,
tlsManager: tlsManager,
chainBuilder: chainBuilder,
pluginBuilder: pluginBuilder,
dialerManager: dialerManager,
entryPointsTCP: entryPointsTCP,
entryPointsUDP: entryPointsUDP,
managerFactory: managerFactory,
observabilityMgr: observabilityMgr,
tlsManager: tlsManager,
pluginBuilder: pluginBuilder,
dialerManager: dialerManager,
}
}
@ -83,7 +80,7 @@ func (f *RouterFactory) CreateRouters(rtConf *runtime.Configuration) (map[string
middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager, f.pluginBuilder)
routerManager := router.NewManager(rtConf, serviceManager, middlewaresBuilder, f.chainBuilder, f.metricsRegistry, f.tlsManager)
routerManager := router.NewManager(rtConf, serviceManager, middlewaresBuilder, f.observabilityMgr, f.tlsManager)
handlersNonTLS := routerManager.BuildHandlers(ctx, f.entryPointsTCP, false)
handlersTLS := routerManager.BuildHandlers(ctx, f.entryPointsTCP, true)

View file

@ -9,7 +9,6 @@ import (
"github.com/traefik/traefik/v3/pkg/config/dynamic"
"github.com/traefik/traefik/v3/pkg/config/runtime"
"github.com/traefik/traefik/v3/pkg/config/static"
"github.com/traefik/traefik/v3/pkg/metrics"
"github.com/traefik/traefik/v3/pkg/server/middleware"
"github.com/traefik/traefik/v3/pkg/server/service"
"github.com/traefik/traefik/v3/pkg/tcp"
@ -51,12 +50,12 @@ func TestReuseService(t *testing.T) {
roundTripperManager := service.NewRoundTripperManager(nil)
roundTripperManager.Update(map[string]*dynamic.ServersTransport{"default@internal": {}})
managerFactory := service.NewManagerFactory(staticConfig, nil, metrics.NewVoidRegistry(), roundTripperManager, nil)
managerFactory := service.NewManagerFactory(staticConfig, nil, nil, roundTripperManager, nil)
tlsManager := tls.NewManager()
dialerManager := tcp.NewDialerManager(nil)
dialerManager.Update(map[string]*dynamic.TCPServersTransport{"default@internal": {}})
factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(nil, nil, nil), nil, metrics.NewVoidRegistry(), dialerManager)
factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, nil, nil, dialerManager)
entryPointsHandlers, _ := factory.CreateRouters(runtime.NewConfig(dynamic.Configuration{HTTP: dynamicConfigs}))
@ -189,12 +188,13 @@ func TestServerResponseEmptyBackend(t *testing.T) {
roundTripperManager := service.NewRoundTripperManager(nil)
roundTripperManager.Update(map[string]*dynamic.ServersTransport{"default@internal": {}})
managerFactory := service.NewManagerFactory(staticConfig, nil, metrics.NewVoidRegistry(), roundTripperManager, nil)
managerFactory := service.NewManagerFactory(staticConfig, nil, nil, roundTripperManager, nil)
tlsManager := tls.NewManager()
dialerManager := tcp.NewDialerManager(nil)
dialerManager.Update(map[string]*dynamic.TCPServersTransport{"default@internal": {}})
factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(nil, nil, nil), nil, metrics.NewVoidRegistry(), dialerManager)
observabiltyMgr := middleware.NewObservabilityMgr(staticConfig, nil, nil, nil, nil)
factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, observabiltyMgr, nil, dialerManager)
entryPointsHandlers, _ := factory.CreateRouters(runtime.NewConfig(dynamic.Configuration{HTTP: test.config(testServer.URL)}))
@ -232,14 +232,12 @@ func TestInternalServices(t *testing.T) {
roundTripperManager := service.NewRoundTripperManager(nil)
roundTripperManager.Update(map[string]*dynamic.ServersTransport{"default@internal": {}})
managerFactory := service.NewManagerFactory(staticConfig, nil, metrics.NewVoidRegistry(), roundTripperManager, nil)
managerFactory := service.NewManagerFactory(staticConfig, nil, nil, roundTripperManager, nil)
tlsManager := tls.NewManager()
voidRegistry := metrics.NewVoidRegistry()
dialerManager := tcp.NewDialerManager(nil)
dialerManager.Update(map[string]*dynamic.TCPServersTransport{"default@internal": {}})
factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(voidRegistry, nil, nil), nil, voidRegistry, dialerManager)
factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, nil, nil, dialerManager)
entryPointsHandlers, _ := factory.CreateRouters(runtime.NewConfig(dynamic.Configuration{HTTP: dynamicConfigs}))

View file

@ -3,47 +3,39 @@ package server
import (
"context"
"errors"
"io"
"os"
"os/signal"
"time"
"github.com/rs/zerolog/log"
"github.com/traefik/traefik/v3/pkg/metrics"
"github.com/traefik/traefik/v3/pkg/middlewares/accesslog"
"github.com/traefik/traefik/v3/pkg/safe"
"github.com/traefik/traefik/v3/pkg/server/middleware"
)
// Server is the reverse-proxy/load-balancer engine.
type Server struct {
watcher *ConfigurationWatcher
tcpEntryPoints TCPEntryPoints
udpEntryPoints UDPEntryPoints
chainBuilder *middleware.ChainBuilder
accessLoggerMiddleware *accesslog.Handler
watcher *ConfigurationWatcher
tcpEntryPoints TCPEntryPoints
udpEntryPoints UDPEntryPoints
observabilityMgr *middleware.ObservabilityMgr
signals chan os.Signal
stopChan chan bool
routinesPool *safe.Pool
tracerCloser io.Closer
}
// NewServer returns an initialized Server.
func NewServer(routinesPool *safe.Pool, entryPoints TCPEntryPoints, entryPointsUDP UDPEntryPoints, watcher *ConfigurationWatcher, chainBuilder *middleware.ChainBuilder, accessLoggerMiddleware *accesslog.Handler, tracerCloser io.Closer) *Server {
func NewServer(routinesPool *safe.Pool, entryPoints TCPEntryPoints, entryPointsUDP UDPEntryPoints, watcher *ConfigurationWatcher, observabilityMgr *middleware.ObservabilityMgr) *Server {
srv := &Server{
watcher: watcher,
tcpEntryPoints: entryPoints,
chainBuilder: chainBuilder,
accessLoggerMiddleware: accessLoggerMiddleware,
signals: make(chan os.Signal, 1),
stopChan: make(chan bool, 1),
routinesPool: routinesPool,
udpEntryPoints: entryPointsUDP,
tracerCloser: tracerCloser,
watcher: watcher,
tcpEntryPoints: entryPoints,
observabilityMgr: observabilityMgr,
signals: make(chan os.Signal, 1),
stopChan: make(chan bool, 1),
routinesPool: routinesPool,
udpEntryPoints: entryPointsUDP,
}
srv.configureSignals()
@ -105,13 +97,7 @@ func (s *Server) Close() {
close(s.stopChan)
s.chainBuilder.Close()
if s.tracerCloser != nil {
if err := s.tracerCloser.Close(); err != nil {
log.Error().Err(err).Msg("Could not close the tracer")
}
}
s.observabilityMgr.Close()
cancel()
}

View file

@ -24,10 +24,8 @@ func (s *Server) listenSignals(ctx context.Context) {
if sig == syscall.SIGUSR1 {
log.Info().Msgf("Closing and re-opening log files for rotation: %+v", sig)
if s.accessLoggerMiddleware != nil {
if err := s.accessLoggerMiddleware.Rotate(); err != nil {
log.Error().Err(err).Msg("Error rotating access log")
}
if err := s.observabilityMgr.RotateAccessLogs(); err != nil {
log.Error().Err(err).Msg("Error rotating access log")
}
}
}

View file

@ -10,11 +10,12 @@ import (
"github.com/traefik/traefik/v3/pkg/config/static"
"github.com/traefik/traefik/v3/pkg/metrics"
"github.com/traefik/traefik/v3/pkg/safe"
"github.com/traefik/traefik/v3/pkg/server/middleware"
)
// ManagerFactory a factory of service manager.
type ManagerFactory struct {
metricsRegistry metrics.Registry
observabilityMgr *middleware.ObservabilityMgr
roundTripperManager *RoundTripperManager
@ -29,9 +30,9 @@ type ManagerFactory struct {
}
// NewManagerFactory creates a new ManagerFactory.
func NewManagerFactory(staticConfiguration static.Configuration, routinesPool *safe.Pool, metricsRegistry metrics.Registry, roundTripperManager *RoundTripperManager, acmeHTTPHandler http.Handler) *ManagerFactory {
func NewManagerFactory(staticConfiguration static.Configuration, routinesPool *safe.Pool, observabilityMgr *middleware.ObservabilityMgr, roundTripperManager *RoundTripperManager, acmeHTTPHandler http.Handler) *ManagerFactory {
factory := &ManagerFactory{
metricsRegistry: metricsRegistry,
observabilityMgr: observabilityMgr,
routinesPool: routinesPool,
roundTripperManager: roundTripperManager,
acmeHTTPHandler: acmeHTTPHandler,
@ -72,7 +73,7 @@ func NewManagerFactory(staticConfiguration static.Configuration, routinesPool *s
// Build creates a service manager.
func (f *ManagerFactory) Build(configuration *runtime.Configuration) *InternalHandlers {
svcManager := NewManager(configuration.Services, f.metricsRegistry, f.routinesPool, f.roundTripperManager)
svcManager := NewManager(configuration.Services, f.observabilityMgr, f.routinesPool, f.roundTripperManager)
var apiHandler http.Handler
if f.api != nil {

View file

@ -20,12 +20,12 @@ import (
"github.com/traefik/traefik/v3/pkg/config/runtime"
"github.com/traefik/traefik/v3/pkg/healthcheck"
"github.com/traefik/traefik/v3/pkg/logs"
"github.com/traefik/traefik/v3/pkg/metrics"
"github.com/traefik/traefik/v3/pkg/middlewares/accesslog"
metricsMiddle "github.com/traefik/traefik/v3/pkg/middlewares/metrics"
tracingMiddle "github.com/traefik/traefik/v3/pkg/middlewares/tracing"
"github.com/traefik/traefik/v3/pkg/safe"
"github.com/traefik/traefik/v3/pkg/server/cookie"
"github.com/traefik/traefik/v3/pkg/server/middleware"
"github.com/traefik/traefik/v3/pkg/server/provider"
"github.com/traefik/traefik/v3/pkg/server/service/loadbalancer/failover"
"github.com/traefik/traefik/v3/pkg/server/service/loadbalancer/mirror"
@ -42,7 +42,7 @@ type RoundTripperGetter interface {
// Manager The service manager.
type Manager struct {
routinePool *safe.Pool
metricsRegistry metrics.Registry
observabilityMgr *middleware.ObservabilityMgr
bufferPool httputil.BufferPool
roundTripperManager RoundTripperGetter
@ -53,10 +53,10 @@ type Manager struct {
}
// NewManager creates a new Manager.
func NewManager(configs map[string]*runtime.ServiceInfo, metricsRegistry metrics.Registry, routinePool *safe.Pool, roundTripperManager RoundTripperGetter) *Manager {
func NewManager(configs map[string]*runtime.ServiceInfo, observabilityMgr *middleware.ObservabilityMgr, routinePool *safe.Pool, roundTripperManager RoundTripperGetter) *Manager {
return &Manager{
routinePool: routinePool,
metricsRegistry: metricsRegistry,
observabilityMgr: observabilityMgr,
bufferPool: newBufferPool(),
roundTripperManager: roundTripperManager,
services: make(map[string]http.Handler),
@ -302,12 +302,17 @@ func (m *Manager) getLoadBalancerServiceHandler(ctx context.Context, serviceName
proxy := buildSingleHostProxy(target, passHostHeader, time.Duration(flushInterval), roundTripper, m.bufferPool)
proxy = accesslog.NewFieldHandler(proxy, accesslog.ServiceURL, target.String(), nil)
proxy = accesslog.NewFieldHandler(proxy, accesslog.ServiceAddr, target.Host, nil)
proxy = accesslog.NewFieldHandler(proxy, accesslog.ServiceName, serviceName, accesslog.AddServiceFields)
// Prevents from enabling observability for internal resources.
if m.metricsRegistry != nil && m.metricsRegistry.IsSvcEnabled() {
metricsHandler := metricsMiddle.WrapServiceHandler(ctx, m.metricsRegistry, serviceName)
if m.observabilityMgr.ShouldAddAccessLogs(provider.GetQualifiedName(ctx, serviceName)) {
proxy = accesslog.NewFieldHandler(proxy, accesslog.ServiceURL, target.String(), nil)
proxy = accesslog.NewFieldHandler(proxy, accesslog.ServiceAddr, target.Host, nil)
proxy = accesslog.NewFieldHandler(proxy, accesslog.ServiceName, serviceName, accesslog.AddServiceFields)
}
if m.observabilityMgr.MetricsRegistry() != nil && m.observabilityMgr.MetricsRegistry().IsSvcEnabled() &&
m.observabilityMgr.ShouldAddMetrics(provider.GetQualifiedName(ctx, serviceName)) {
metricsHandler := metricsMiddle.WrapServiceHandler(ctx, m.observabilityMgr.MetricsRegistry(), serviceName)
proxy, err = alice.New().
Append(tracingMiddle.WrapMiddleware(ctx, metricsHandler)).
@ -317,7 +322,9 @@ func (m *Manager) getLoadBalancerServiceHandler(ctx context.Context, serviceName
}
}
proxy = tracingMiddle.NewService(ctx, serviceName, proxy)
if m.observabilityMgr.ShouldAddTracing(provider.GetQualifiedName(ctx, serviceName)) {
proxy = tracingMiddle.NewService(ctx, serviceName, proxy)
}
lb.Add(proxyName, proxy, server.Weight)
@ -330,7 +337,7 @@ func (m *Manager) getLoadBalancerServiceHandler(ctx context.Context, serviceName
if service.HealthCheck != nil {
m.healthCheckers[serviceName] = healthcheck.NewServiceHealthChecker(
ctx,
m.metricsRegistry,
m.observabilityMgr.MetricsRegistry(),
service.HealthCheck,
lb,
info,