Add TCP Servers Transports support

Co-authored-by: Romain <rtribotte@users.noreply.github.com>
This commit is contained in:
Simon Delicata 2022-12-09 09:58:05 +01:00 committed by GitHub
parent c2dac39da1
commit 3eeea2bb2b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
101 changed files with 5956 additions and 1669 deletions

View file

@ -21,9 +21,10 @@ func mergeConfiguration(configurations dynamic.Configurations, defaultEntryPoint
ServersTransports: make(map[string]*dynamic.ServersTransport),
},
TCP: &dynamic.TCPConfiguration{
Routers: make(map[string]*dynamic.TCPRouter),
Services: make(map[string]*dynamic.TCPService),
Middlewares: make(map[string]*dynamic.TCPMiddleware),
Routers: make(map[string]*dynamic.TCPRouter),
Services: make(map[string]*dynamic.TCPService),
Middlewares: make(map[string]*dynamic.TCPMiddleware),
ServersTransports: make(map[string]*dynamic.TCPServersTransport),
},
UDP: &dynamic.UDPConfiguration{
Routers: make(map[string]*dynamic.UDPRouter),
@ -80,6 +81,9 @@ func mergeConfiguration(configurations dynamic.Configurations, defaultEntryPoint
for serviceName, service := range configuration.TCP.Services {
conf.TCP.Services[provider.MakeQualifiedName(pvd, serviceName)] = service
}
for serversTransportName, serversTransport := range configuration.TCP.ServersTransports {
conf.TCP.ServersTransports[provider.MakeQualifiedName(pvd, serversTransportName)] = serversTransport
}
}
if configuration.UDP != nil {

View file

@ -473,6 +473,7 @@ func Test_mergeConfiguration_defaultTCPEntryPoint(t *testing.T) {
Services: map[string]*dynamic.TCPService{
"service-1@provider-1": {},
},
ServersTransports: make(map[string]*dynamic.TCPServersTransport),
}
actual := mergeConfiguration(given, []string{"defaultEP"})

View file

@ -209,6 +209,15 @@ func logConfiguration(logger zerolog.Logger, configMsg dynamic.Message) {
}
}
if copyConf.TCP != nil {
for _, transport := range copyConf.TCP.ServersTransports {
if transport.TLS != nil {
transport.TLS.Certificates = tls.Certificates{}
transport.TLS.RootCAs = []tls.FileOrContent{}
}
}
}
jsonConf, err := json.Marshal(copyConf)
if err != nil {
logger.Error().Err(err).Msg("Could not marshal dynamic configuration")

View file

@ -89,9 +89,10 @@ func TestNewConfigurationWatcher(t *testing.T) {
th.WithLoadBalancerServices(),
),
TCP: &dynamic.TCPConfiguration{
Routers: map[string]*dynamic.TCPRouter{},
Middlewares: map[string]*dynamic.TCPMiddleware{},
Services: map[string]*dynamic.TCPService{},
Routers: map[string]*dynamic.TCPRouter{},
Middlewares: map[string]*dynamic.TCPMiddleware{},
Services: map[string]*dynamic.TCPService{},
ServersTransports: map[string]*dynamic.TCPServersTransport{},
},
TLS: &dynamic.TLSConfiguration{
Options: map[string]tls.Options{
@ -224,9 +225,10 @@ func TestIgnoreTransientConfiguration(t *testing.T) {
th.WithMiddlewares(),
),
TCP: &dynamic.TCPConfiguration{
Routers: map[string]*dynamic.TCPRouter{},
Middlewares: map[string]*dynamic.TCPMiddleware{},
Services: map[string]*dynamic.TCPService{},
Routers: map[string]*dynamic.TCPRouter{},
Middlewares: map[string]*dynamic.TCPMiddleware{},
Services: map[string]*dynamic.TCPService{},
ServersTransports: map[string]*dynamic.TCPServersTransport{},
},
UDP: &dynamic.UDPConfiguration{
Routers: map[string]*dynamic.UDPRouter{},
@ -392,9 +394,10 @@ func TestListenProvidersDoesNotSkipFlappingConfiguration(t *testing.T) {
th.WithMiddlewares(),
),
TCP: &dynamic.TCPConfiguration{
Routers: map[string]*dynamic.TCPRouter{},
Middlewares: map[string]*dynamic.TCPMiddleware{},
Services: map[string]*dynamic.TCPService{},
Routers: map[string]*dynamic.TCPRouter{},
Middlewares: map[string]*dynamic.TCPMiddleware{},
Services: map[string]*dynamic.TCPService{},
ServersTransports: map[string]*dynamic.TCPServersTransport{},
},
UDP: &dynamic.UDPConfiguration{
Routers: map[string]*dynamic.UDPRouter{},
@ -481,9 +484,10 @@ func TestListenProvidersIgnoreSameConfig(t *testing.T) {
th.WithMiddlewares(),
),
TCP: &dynamic.TCPConfiguration{
Routers: map[string]*dynamic.TCPRouter{},
Middlewares: map[string]*dynamic.TCPMiddleware{},
Services: map[string]*dynamic.TCPService{},
Routers: map[string]*dynamic.TCPRouter{},
Middlewares: map[string]*dynamic.TCPMiddleware{},
Services: map[string]*dynamic.TCPService{},
ServersTransports: map[string]*dynamic.TCPServersTransport{},
},
UDP: &dynamic.UDPConfiguration{
Routers: map[string]*dynamic.UDPRouter{},
@ -615,9 +619,10 @@ func TestListenProvidersIgnoreIntermediateConfigs(t *testing.T) {
th.WithMiddlewares(),
),
TCP: &dynamic.TCPConfiguration{
Routers: map[string]*dynamic.TCPRouter{},
Middlewares: map[string]*dynamic.TCPMiddleware{},
Services: map[string]*dynamic.TCPService{},
Routers: map[string]*dynamic.TCPRouter{},
Middlewares: map[string]*dynamic.TCPMiddleware{},
Services: map[string]*dynamic.TCPService{},
ServersTransports: map[string]*dynamic.TCPServersTransport{},
},
UDP: &dynamic.UDPConfiguration{
Routers: map[string]*dynamic.UDPRouter{},
@ -682,9 +687,10 @@ func TestListenProvidersPublishesConfigForEachProvider(t *testing.T) {
th.WithMiddlewares(),
),
TCP: &dynamic.TCPConfiguration{
Routers: map[string]*dynamic.TCPRouter{},
Middlewares: map[string]*dynamic.TCPMiddleware{},
Services: map[string]*dynamic.TCPService{},
Routers: map[string]*dynamic.TCPRouter{},
Middlewares: map[string]*dynamic.TCPMiddleware{},
Services: map[string]*dynamic.TCPService{},
ServersTransports: map[string]*dynamic.TCPServersTransport{},
},
TLS: &dynamic.TLSConfiguration{
Options: map[string]tls.Options{

View file

@ -13,6 +13,7 @@ import (
"github.com/traefik/traefik/v2/pkg/config/runtime"
tcpmiddleware "github.com/traefik/traefik/v2/pkg/server/middleware/tcp"
"github.com/traefik/traefik/v2/pkg/server/service/tcp"
tcp2 "github.com/traefik/traefik/v2/pkg/tcp"
traefiktls "github.com/traefik/traefik/v2/pkg/tls"
)
@ -311,7 +312,9 @@ func TestRuntimeConfiguration(t *testing.T) {
TCPServices: test.tcpServiceConfig,
TCPRouters: test.tcpRouterConfig,
}
serviceManager := tcp.NewManager(conf)
dialerManager := tcp2.NewDialerManager(nil)
dialerManager.Update(map[string]*dynamic.TCPServersTransport{"default@internal": {}})
serviceManager := tcp.NewManager(conf, dialerManager)
tlsManager := traefiktls.NewManager()
tlsManager.UpdateConfigs(
context.Background(),
@ -622,7 +625,7 @@ func TestDomainFronting(t *testing.T) {
Routers: test.routers,
}
serviceManager := tcp.NewManager(conf)
serviceManager := tcp.NewManager(conf, tcp2.NewDialerManager(nil))
tlsManager := traefiktls.NewManager()
tlsManager.UpdateConfigs(context.Background(), map[string]traefiktls.Store{}, test.tlsOptions, []*traefiktls.CertAndStores{})

View file

@ -162,7 +162,9 @@ func Test_Routing(t *testing.T) {
},
}
serviceManager := tcp.NewManager(conf)
dialerManager := tcp2.NewDialerManager(nil)
dialerManager.Update(map[string]*dynamic.TCPServersTransport{"default@internal": {}})
serviceManager := tcp.NewManager(conf, dialerManager)
// Creates the tlsManager and defines the TLS 1.0 and 1.2 TLSOptions.
tlsManager := traefiktls.NewManager()

View file

@ -13,10 +13,11 @@ import (
tcprouter "github.com/traefik/traefik/v2/pkg/server/router/tcp"
udprouter "github.com/traefik/traefik/v2/pkg/server/router/udp"
"github.com/traefik/traefik/v2/pkg/server/service"
"github.com/traefik/traefik/v2/pkg/server/service/tcp"
"github.com/traefik/traefik/v2/pkg/server/service/udp"
tcpsvc "github.com/traefik/traefik/v2/pkg/server/service/tcp"
udpsvc "github.com/traefik/traefik/v2/pkg/server/service/udp"
"github.com/traefik/traefik/v2/pkg/tcp"
"github.com/traefik/traefik/v2/pkg/tls"
udptypes "github.com/traefik/traefik/v2/pkg/udp"
"github.com/traefik/traefik/v2/pkg/udp"
)
// RouterFactory the factory of TCP/UDP routers.
@ -32,12 +33,14 @@ type RouterFactory struct {
chainBuilder *middleware.ChainBuilder
tlsManager *tls.Manager
dialerManager *tcp.DialerManager
cancelPrevState func()
}
// NewRouterFactory creates a new RouterFactory.
func NewRouterFactory(staticConfiguration static.Configuration, managerFactory *service.ManagerFactory, tlsManager *tls.Manager,
chainBuilder *middleware.ChainBuilder, pluginBuilder middleware.PluginsBuilder, metricsRegistry metrics.Registry,
chainBuilder *middleware.ChainBuilder, pluginBuilder middleware.PluginsBuilder, metricsRegistry metrics.Registry, dialerManager *tcp.DialerManager,
) *RouterFactory {
var entryPointsTCP, entryPointsUDP []string
for name, cfg := range staticConfiguration.EntryPoints {
@ -62,11 +65,12 @@ func NewRouterFactory(staticConfiguration static.Configuration, managerFactory *
tlsManager: tlsManager,
chainBuilder: chainBuilder,
pluginBuilder: pluginBuilder,
dialerManager: dialerManager,
}
}
// CreateRouters creates new TCPRouters and UDPRouters.
func (f *RouterFactory) CreateRouters(rtConf *runtime.Configuration) (map[string]*tcprouter.Router, map[string]udptypes.Handler) {
func (f *RouterFactory) CreateRouters(rtConf *runtime.Configuration) (map[string]*tcprouter.Router, map[string]udp.Handler) {
if f.cancelPrevState != nil {
f.cancelPrevState()
}
@ -87,7 +91,7 @@ func (f *RouterFactory) CreateRouters(rtConf *runtime.Configuration) (map[string
serviceManager.LaunchHealthCheck(ctx)
// TCP
svcTCPManager := tcp.NewManager(rtConf)
svcTCPManager := tcpsvc.NewManager(rtConf, f.dialerManager)
middlewaresTCPBuilder := tcpmiddleware.NewBuilder(rtConf.TCPMiddlewares)
@ -95,7 +99,7 @@ func (f *RouterFactory) CreateRouters(rtConf *runtime.Configuration) (map[string
routersTCP := rtTCPManager.BuildHandlers(ctx, f.entryPointsTCP)
// UDP
svcUDPManager := udp.NewManager(rtConf)
svcUDPManager := udpsvc.NewManager(rtConf)
rtUDPManager := udprouter.NewManager(rtConf, svcUDPManager)
routersUDP := rtUDPManager.BuildHandlers(ctx, f.entryPointsUDP)

View file

@ -12,6 +12,7 @@ import (
"github.com/traefik/traefik/v2/pkg/metrics"
"github.com/traefik/traefik/v2/pkg/server/middleware"
"github.com/traefik/traefik/v2/pkg/server/service"
"github.com/traefik/traefik/v2/pkg/tcp"
th "github.com/traefik/traefik/v2/pkg/testhelpers"
"github.com/traefik/traefik/v2/pkg/tls"
)
@ -53,7 +54,9 @@ func TestReuseService(t *testing.T) {
managerFactory := service.NewManagerFactory(staticConfig, nil, metrics.NewVoidRegistry(), roundTripperManager, nil)
tlsManager := tls.NewManager()
factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(nil, nil, nil), nil, metrics.NewVoidRegistry())
dialerManager := tcp.NewDialerManager(nil)
dialerManager.Update(map[string]*dynamic.TCPServersTransport{"default@internal": {}})
factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(nil, nil, nil), nil, metrics.NewVoidRegistry(), dialerManager)
entryPointsHandlers, _ := factory.CreateRouters(runtime.NewConfig(dynamic.Configuration{HTTP: dynamicConfigs}))
@ -189,7 +192,9 @@ func TestServerResponseEmptyBackend(t *testing.T) {
managerFactory := service.NewManagerFactory(staticConfig, nil, metrics.NewVoidRegistry(), roundTripperManager, nil)
tlsManager := tls.NewManager()
factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(nil, nil, nil), nil, metrics.NewVoidRegistry())
dialerManager := tcp.NewDialerManager(nil)
dialerManager.Update(map[string]*dynamic.TCPServersTransport{"default@internal": {}})
factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(nil, nil, nil), nil, metrics.NewVoidRegistry(), dialerManager)
entryPointsHandlers, _ := factory.CreateRouters(runtime.NewConfig(dynamic.Configuration{HTTP: test.config(testServer.URL)}))
@ -232,7 +237,9 @@ func TestInternalServices(t *testing.T) {
voidRegistry := metrics.NewVoidRegistry()
factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(voidRegistry, nil, nil), nil, voidRegistry)
dialerManager := tcp.NewDialerManager(nil)
dialerManager.Update(map[string]*dynamic.TCPServersTransport{"default@internal": {}})
factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(voidRegistry, nil, nil), nil, voidRegistry, dialerManager)
entryPointsHandlers, _ := factory.CreateRouters(runtime.NewConfig(dynamic.Configuration{HTTP: dynamicConfigs}))

View file

@ -17,15 +17,17 @@ import (
// Manager is the TCPHandlers factory.
type Manager struct {
configs map[string]*runtime.TCPServiceInfo
rand *rand.Rand // For the initial shuffling of load-balancers.
dialerManager *tcp.DialerManager
configs map[string]*runtime.TCPServiceInfo
rand *rand.Rand // For the initial shuffling of load-balancers.
}
// NewManager creates a new manager.
func NewManager(conf *runtime.Configuration) *Manager {
func NewManager(conf *runtime.Configuration, dialerManager *tcp.DialerManager) *Manager {
return &Manager{
configs: conf.TCPServices,
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
dialerManager: dialerManager,
configs: conf.TCPServices,
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
}
}
@ -51,11 +53,9 @@ func (m *Manager) BuildTCP(rootCtx context.Context, serviceName string) (tcp.Han
case conf.LoadBalancer != nil:
loadBalancer := tcp.NewWRRLoadBalancer()
if conf.LoadBalancer.TerminationDelay == nil {
defaultTerminationDelay := 100
conf.LoadBalancer.TerminationDelay = &defaultTerminationDelay
if len(conf.LoadBalancer.ServersTransport) > 0 {
conf.LoadBalancer.ServersTransport = provider.GetQualifiedName(ctx, conf.LoadBalancer.ServersTransport)
}
duration := time.Duration(*conf.LoadBalancer.TerminationDelay) * time.Millisecond
for index, server := range shuffle(conf.LoadBalancer.Servers, m.rand) {
srvLogger := logger.With().
@ -67,7 +67,12 @@ func (m *Manager) BuildTCP(rootCtx context.Context, serviceName string) (tcp.Han
continue
}
handler, err := tcp.NewProxy(server.Address, duration, conf.LoadBalancer.ProxyProtocol)
dialer, err := m.dialerManager.Get(conf.LoadBalancer.ServersTransport, server.TLS)
if err != nil {
return nil, err
}
handler, err := tcp.NewProxy(server.Address, conf.LoadBalancer.ProxyProtocol, dialer)
if err != nil {
srvLogger.Error().Err(err).Msg("Failed to create server")
continue

View file

@ -9,6 +9,7 @@ import (
"github.com/traefik/traefik/v2/pkg/config/dynamic"
"github.com/traefik/traefik/v2/pkg/config/runtime"
"github.com/traefik/traefik/v2/pkg/server/provider"
"github.com/traefik/traefik/v2/pkg/tcp"
)
func TestManager_BuildTCP(t *testing.T) {
@ -16,6 +17,7 @@ func TestManager_BuildTCP(t *testing.T) {
desc string
serviceName string
configs map[string]*runtime.TCPServiceInfo
stConfigs map[string]*dynamic.TCPServersTransport
providerName string
expectedError string
}{
@ -38,6 +40,7 @@ func TestManager_BuildTCP(t *testing.T) {
{
desc: "no such host, server is skipped, error is logged",
serviceName: "test",
stConfigs: map[string]*dynamic.TCPServersTransport{"default@internal": {}},
configs: map[string]*runtime.TCPServiceInfo{
"test": {
TCPService: &dynamic.TCPService{
@ -102,6 +105,7 @@ func TestManager_BuildTCP(t *testing.T) {
{
desc: "Server with correct host:port as address",
serviceName: "serviceName",
stConfigs: map[string]*dynamic.TCPServersTransport{"default@internal": {}},
configs: map[string]*runtime.TCPServiceInfo{
"serviceName@provider-1": {
TCPService: &dynamic.TCPService{
@ -120,6 +124,7 @@ func TestManager_BuildTCP(t *testing.T) {
{
desc: "Server with correct ip:port as address",
serviceName: "serviceName",
stConfigs: map[string]*dynamic.TCPServersTransport{"default@internal": {}},
configs: map[string]*runtime.TCPServiceInfo{
"serviceName@provider-1": {
TCPService: &dynamic.TCPService{
@ -135,6 +140,24 @@ func TestManager_BuildTCP(t *testing.T) {
},
providerName: "provider-1",
},
{
desc: "empty server address, server is skipped, error is logged",
serviceName: "serviceName",
configs: map[string]*runtime.TCPServiceInfo{
"serviceName@provider-1": {
TCPService: &dynamic.TCPService{
LoadBalancer: &dynamic.TCPServersLoadBalancer{
Servers: []dynamic.TCPServer{
{
Address: "",
},
},
},
},
},
},
providerName: "provider-1",
},
{
desc: "missing port in address with hostname, server is skipped, error is logged",
serviceName: "serviceName",
@ -171,6 +194,46 @@ func TestManager_BuildTCP(t *testing.T) {
},
providerName: "provider-1",
},
{
desc: "user defined serversTransport reference",
serviceName: "serviceName",
stConfigs: map[string]*dynamic.TCPServersTransport{"myServersTransport@provider-1": {}},
configs: map[string]*runtime.TCPServiceInfo{
"serviceName@provider-1": {
TCPService: &dynamic.TCPService{
LoadBalancer: &dynamic.TCPServersLoadBalancer{
Servers: []dynamic.TCPServer{
{
Address: "192.168.0.12:80",
},
},
ServersTransport: "myServersTransport@provider-1",
},
},
},
},
providerName: "provider-1",
},
{
desc: "user defined serversTransport reference not found",
serviceName: "serviceName",
configs: map[string]*runtime.TCPServiceInfo{
"serviceName@provider-1": {
TCPService: &dynamic.TCPService{
LoadBalancer: &dynamic.TCPServersLoadBalancer{
Servers: []dynamic.TCPServer{
{
Address: "192.168.0.12:80",
},
},
ServersTransport: "myServersTransport@provider-1",
},
},
},
},
providerName: "provider-1",
expectedError: "TCP dialer not found myServersTransport@provider-1",
},
}
for _, test := range testCases {
@ -178,9 +241,14 @@ func TestManager_BuildTCP(t *testing.T) {
t.Run(test.desc, func(t *testing.T) {
t.Parallel()
dialerManager := tcp.NewDialerManager(nil)
if test.stConfigs != nil {
dialerManager.Update(test.stConfigs)
}
manager := NewManager(&runtime.Configuration{
TCPServices: test.configs,
})
}, dialerManager)
ctx := context.Background()
if len(test.providerName) > 0 {