1
0
Fork 0

Merge branch v3.2 into master

This commit is contained in:
kevinpollet 2024-11-21 11:45:02 +01:00
commit 090db6d4b0
No known key found for this signature in database
GPG key ID: 0C9A5DDD1B292453
76 changed files with 1823 additions and 1016 deletions

View file

@ -8,11 +8,6 @@ import (
"strings"
)
type serviceManager interface {
BuildHTTP(rootCtx context.Context, serviceName string) (http.Handler, error)
LaunchHealthCheck(ctx context.Context)
}
// InternalHandlers is the internal HTTP handlers builder.
type InternalHandlers struct {
api http.Handler
@ -21,26 +16,24 @@ type InternalHandlers struct {
prometheus http.Handler
ping http.Handler
acmeHTTP http.Handler
serviceManager
}
// NewInternalHandlers creates a new InternalHandlers.
func NewInternalHandlers(next serviceManager, apiHandler, rest, metricsHandler, pingHandler, dashboard, acmeHTTP http.Handler) *InternalHandlers {
func NewInternalHandlers(apiHandler, rest, metricsHandler, pingHandler, dashboard, acmeHTTP http.Handler) *InternalHandlers {
return &InternalHandlers{
api: apiHandler,
dashboard: dashboard,
rest: rest,
prometheus: metricsHandler,
ping: pingHandler,
acmeHTTP: acmeHTTP,
serviceManager: next,
api: apiHandler,
dashboard: dashboard,
rest: rest,
prometheus: metricsHandler,
ping: pingHandler,
acmeHTTP: acmeHTTP,
}
}
// BuildHTTP builds an HTTP handler.
func (m *InternalHandlers) BuildHTTP(rootCtx context.Context, serviceName string) (http.Handler, error) {
if !strings.HasSuffix(serviceName, "@internal") {
return m.serviceManager.BuildHTTP(rootCtx, serviceName)
return nil, nil
}
internalHandler, err := m.get(serviceName)

View file

@ -10,18 +10,20 @@ import (
"github.com/traefik/traefik/v3/pkg/config/dynamic"
)
func pointer[T any](v T) *T { return &v }
func TestBalancer(t *testing.T) {
balancer := New(nil, false)
balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "first")
rw.WriteHeader(http.StatusOK)
}), Int(3))
}), pointer(3))
balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "second")
rw.WriteHeader(http.StatusOK)
}), Int(1))
}), pointer(1))
recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
for range 4 {
@ -47,9 +49,9 @@ func TestBalancerOneServerZeroWeight(t *testing.T) {
balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "first")
rw.WriteHeader(http.StatusOK)
}), Int(1))
}), pointer(1))
balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), Int(0))
balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), pointer(0))
recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
for range 3 {
@ -68,11 +70,11 @@ func TestBalancerNoServiceUp(t *testing.T) {
balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(http.StatusInternalServerError)
}), Int(1))
}), pointer(1))
balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(http.StatusInternalServerError)
}), Int(1))
}), pointer(1))
balancer.SetStatus(context.WithValue(context.Background(), serviceName, "parent"), "first", false)
balancer.SetStatus(context.WithValue(context.Background(), serviceName, "parent"), "second", false)
@ -89,11 +91,11 @@ func TestBalancerOneServerDown(t *testing.T) {
balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "first")
rw.WriteHeader(http.StatusOK)
}), Int(1))
}), pointer(1))
balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(http.StatusInternalServerError)
}), Int(1))
}), pointer(1))
balancer.SetStatus(context.WithValue(context.Background(), serviceName, "parent"), "second", false)
recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
@ -110,12 +112,12 @@ func TestBalancerDownThenUp(t *testing.T) {
balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "first")
rw.WriteHeader(http.StatusOK)
}), Int(1))
}), pointer(1))
balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "second")
rw.WriteHeader(http.StatusOK)
}), Int(1))
}), pointer(1))
balancer.SetStatus(context.WithValue(context.Background(), serviceName, "parent"), "second", false)
recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
@ -139,30 +141,30 @@ func TestBalancerPropagate(t *testing.T) {
balancer1.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "first")
rw.WriteHeader(http.StatusOK)
}), Int(1))
}), pointer(1))
balancer1.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "second")
rw.WriteHeader(http.StatusOK)
}), Int(1))
}), pointer(1))
balancer2 := New(nil, true)
balancer2.Add("third", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "third")
rw.WriteHeader(http.StatusOK)
}), Int(1))
}), pointer(1))
balancer2.Add("fourth", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "fourth")
rw.WriteHeader(http.StatusOK)
}), Int(1))
}), pointer(1))
topBalancer := New(nil, true)
topBalancer.Add("balancer1", balancer1, Int(1))
topBalancer.Add("balancer1", balancer1, pointer(1))
_ = balancer1.RegisterStatusUpdater(func(up bool) {
topBalancer.SetStatus(context.WithValue(context.Background(), serviceName, "top"), "balancer1", up)
// TODO(mpl): if test gets flaky, add channel or something here to signal that
// propagation is done, and wait on it before sending request.
})
topBalancer.Add("balancer2", balancer2, Int(1))
topBalancer.Add("balancer2", balancer2, pointer(1))
_ = balancer2.RegisterStatusUpdater(func(up bool) {
topBalancer.SetStatus(context.WithValue(context.Background(), serviceName, "top"), "balancer2", up)
})
@ -209,8 +211,8 @@ func TestBalancerPropagate(t *testing.T) {
func TestBalancerAllServersZeroWeight(t *testing.T) {
balancer := New(nil, false)
balancer.Add("test", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), Int(0))
balancer.Add("test2", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), Int(0))
balancer.Add("test", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), pointer(0))
balancer.Add("test2", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), pointer(0))
recorder := httptest.NewRecorder()
balancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
@ -233,12 +235,12 @@ func TestSticky(t *testing.T) {
balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "first")
rw.WriteHeader(http.StatusOK)
}), Int(1))
}), pointer(1))
balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "second")
rw.WriteHeader(http.StatusOK)
}), Int(2))
}), pointer(2))
recorder := &responseRecorder{
ResponseRecorder: httptest.NewRecorder(),
@ -275,12 +277,12 @@ func TestSticky_FallBack(t *testing.T) {
balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "first")
rw.WriteHeader(http.StatusOK)
}), Int(1))
}), pointer(1))
balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "second")
rw.WriteHeader(http.StatusOK)
}), Int(2))
}), pointer(2))
recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
@ -304,12 +306,12 @@ func TestBalancerBias(t *testing.T) {
balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "A")
rw.WriteHeader(http.StatusOK)
}), Int(11))
}), pointer(11))
balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "B")
rw.WriteHeader(http.StatusOK)
}), Int(3))
}), pointer(3))
recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
@ -322,8 +324,6 @@ func TestBalancerBias(t *testing.T) {
assert.Equal(t, wantSequence, recorder.sequence)
}
func Int(v int) *int { return &v }
type responseRecorder struct {
*httptest.ResponseRecorder
save map[string]int

View file

@ -74,13 +74,12 @@ func NewManagerFactory(staticConfiguration static.Configuration, routinesPool *s
}
// Build creates a service manager.
func (f *ManagerFactory) Build(configuration *runtime.Configuration) *InternalHandlers {
svcManager := NewManager(configuration.Services, f.observabilityMgr, f.routinesPool, f.transportManager, f.proxyBuilder)
func (f *ManagerFactory) Build(configuration *runtime.Configuration) *Manager {
var apiHandler http.Handler
if f.api != nil {
apiHandler = f.api(configuration)
}
return NewInternalHandlers(svcManager, apiHandler, f.restHandler, f.metricsHandler, f.pingHandler, f.dashboardHandler, f.acmeHTTPHandler)
internalHandlers := NewInternalHandlers(apiHandler, f.restHandler, f.metricsHandler, f.pingHandler, f.dashboardHandler, f.acmeHTTPHandler)
return NewManager(configuration.Services, f.observabilityMgr, f.routinesPool, f.transportManager, f.proxyBuilder, internalHandlers)
}

View file

@ -46,12 +46,18 @@ type ProxyBuilder interface {
Update(configs map[string]*dynamic.ServersTransport)
}
// ServiceBuilder is a Service builder.
type ServiceBuilder interface {
BuildHTTP(rootCtx context.Context, serviceName string) (http.Handler, error)
}
// Manager The service manager.
type Manager struct {
routinePool *safe.Pool
observabilityMgr *middleware.ObservabilityMgr
transportManager httputil.TransportManager
proxyBuilder ProxyBuilder
serviceBuilders []ServiceBuilder
services map[string]http.Handler
configs map[string]*runtime.ServiceInfo
@ -60,12 +66,13 @@ type Manager struct {
}
// NewManager creates a new Manager.
func NewManager(configs map[string]*runtime.ServiceInfo, observabilityMgr *middleware.ObservabilityMgr, routinePool *safe.Pool, transportManager httputil.TransportManager, proxyBuilder ProxyBuilder) *Manager {
func NewManager(configs map[string]*runtime.ServiceInfo, observabilityMgr *middleware.ObservabilityMgr, routinePool *safe.Pool, transportManager httputil.TransportManager, proxyBuilder ProxyBuilder, serviceBuilders ...ServiceBuilder) *Manager {
return &Manager{
routinePool: routinePool,
observabilityMgr: observabilityMgr,
transportManager: transportManager,
proxyBuilder: proxyBuilder,
serviceBuilders: serviceBuilders,
services: make(map[string]http.Handler),
configs: configs,
healthCheckers: make(map[string]*healthcheck.ServiceHealthChecker),
@ -85,6 +92,18 @@ func (m *Manager) BuildHTTP(rootCtx context.Context, serviceName string) (http.H
return handler, nil
}
// Must be before we get configs to handle services without config.
for _, builder := range m.serviceBuilders {
handler, err := builder.BuildHTTP(rootCtx, serviceName)
if err != nil {
return nil, err
}
if handler != nil {
m.services[serviceName] = handler
return handler, nil
}
}
conf, ok := m.configs[serviceName]
if !ok {
return nil, fmt.Errorf("the service %q does not exist", serviceName)

View file

@ -20,6 +20,8 @@ import (
"github.com/traefik/traefik/v3/pkg/testhelpers"
)
func pointer[T any](v T) *T { return &v }
func TestGetLoadBalancer(t *testing.T) {
sm := Manager{
transportManager: &transportManagerMock{},
@ -235,7 +237,7 @@ func TestGetLoadBalancerServiceHandler(t *testing.T) {
serviceName: "test",
service: &dynamic.ServersLoadBalancer{
Sticky: &dynamic.Sticky{Cookie: &dynamic.Cookie{}},
PassHostHeader: func(v bool) *bool { return &v }(true),
PassHostHeader: pointer(true),
Servers: []dynamic.Server{
{
URL: serverPassHost.URL,
@ -253,7 +255,7 @@ func TestGetLoadBalancerServiceHandler(t *testing.T) {
desc: "PassHost doesn't pass the host instead of the IP",
serviceName: "test",
service: &dynamic.ServersLoadBalancer{
PassHostHeader: boolPtr(false),
PassHostHeader: pointer(false),
Sticky: &dynamic.Sticky{Cookie: &dynamic.Cookie{}},
Servers: []dynamic.Server{
{
@ -448,6 +450,48 @@ func Test1xxResponses(t *testing.T) {
}
}
type serviceBuilderFunc func(ctx context.Context, serviceName string) (http.Handler, error)
func (s serviceBuilderFunc) BuildHTTP(ctx context.Context, serviceName string) (http.Handler, error) {
return s(ctx, serviceName)
}
type internalHandler struct{}
func (internalHandler) ServeHTTP(_ http.ResponseWriter, _ *http.Request) {}
func TestManager_ServiceBuilders(t *testing.T) {
var internalHandler internalHandler
manager := NewManager(map[string]*runtime.ServiceInfo{
"test@test": {
Service: &dynamic.Service{
LoadBalancer: &dynamic.ServersLoadBalancer{},
},
},
}, nil, nil, &TransportManager{
roundTrippers: map[string]http.RoundTripper{
"default@internal": http.DefaultTransport,
},
}, nil, serviceBuilderFunc(func(rootCtx context.Context, serviceName string) (http.Handler, error) {
if strings.HasSuffix(serviceName, "@internal") {
return internalHandler, nil
}
return nil, nil
}))
h, err := manager.BuildHTTP(context.Background(), "test@internal")
require.NoError(t, err)
assert.Equal(t, internalHandler, h)
h, err = manager.BuildHTTP(context.Background(), "test@test")
require.NoError(t, err)
assert.NotNil(t, h)
_, err = manager.BuildHTTP(context.Background(), "wrong@test")
assert.Error(t, err)
}
func TestManager_Build(t *testing.T) {
testCases := []struct {
desc string

View file

@ -27,10 +27,6 @@ import (
"github.com/traefik/traefik/v3/pkg/types"
)
func Int32(i int32) *int32 {
return &i
}
// LocalhostCert is a PEM-encoded TLS cert
// for host example.com, www.example.com
// expiring at Jan 29 16:00:00 2084 GMT.
@ -128,7 +124,7 @@ func TestKeepConnectionWhenSameConfiguration(t *testing.T) {
rw.WriteHeader(http.StatusOK)
}))
connCount := Int32(0)
connCount := pointer[int32](0)
srv.Config.ConnState = func(conn net.Conn, state http.ConnState) {
if state == http.StateNew {
atomic.AddInt32(connCount, 1)