1
0
Fork 0

UDP support

Co-authored-by: Julien Salleyron <julien.salleyron@gmail.com>
This commit is contained in:
mpl 2020-02-11 01:26:04 +01:00 committed by GitHub
parent 8988c8f9af
commit 115d42e0f0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
72 changed files with 4730 additions and 321 deletions

View file

@ -18,6 +18,10 @@ func mergeConfiguration(configurations dynamic.Configurations) dynamic.Configura
Routers: make(map[string]*dynamic.TCPRouter),
Services: make(map[string]*dynamic.TCPService),
},
UDP: &dynamic.UDPConfiguration{
Routers: make(map[string]*dynamic.UDPRouter),
Services: make(map[string]*dynamic.UDPService),
},
TLS: &dynamic.TLSConfiguration{
Stores: make(map[string]tls.Store),
Options: make(map[string]tls.Options),
@ -47,6 +51,15 @@ func mergeConfiguration(configurations dynamic.Configurations) dynamic.Configura
}
}
if configuration.UDP != nil {
for routerName, router := range configuration.UDP.Routers {
conf.UDP.Routers[provider.MakeQualifiedName(pvd, routerName)] = router
}
for serviceName, service := range configuration.UDP.Services {
conf.UDP.Services[provider.MakeQualifiedName(pvd, serviceName)] = service
}
}
if configuration.TLS != nil {
conf.TLS.Certificates = append(conf.TLS.Certificates, configuration.TLS.Certificates...)

View file

@ -79,6 +79,10 @@ func TestNewConfigurationWatcher(t *testing.T) {
},
Stores: map[string]tls.Store{},
},
UDP: &dynamic.UDPConfiguration{
Routers: map[string]*dynamic.UDPRouter{},
Services: map[string]*dynamic.UDPService{},
},
}
assert.Equal(t, expected, conf)
@ -222,6 +226,10 @@ func TestListenProvidersPublishesConfigForEachProvider(t *testing.T) {
},
Stores: map[string]tls.Store{},
},
UDP: &dynamic.UDPConfiguration{
Routers: map[string]*dynamic.UDPRouter{},
Services: map[string]*dynamic.UDPService{},
},
}
assert.Equal(t, expected, publishedProviderConfig)

View file

@ -7,7 +7,7 @@ import (
"github.com/stretchr/testify/assert"
)
func TestAddProviderInContext(t *testing.T) {
func TestAddInContext(t *testing.T) {
testCases := []struct {
desc string
ctx context.Context

View file

@ -0,0 +1,87 @@
package udp
import (
"context"
"errors"
"github.com/containous/traefik/v2/pkg/config/runtime"
"github.com/containous/traefik/v2/pkg/log"
"github.com/containous/traefik/v2/pkg/server/provider"
udpservice "github.com/containous/traefik/v2/pkg/server/service/udp"
"github.com/containous/traefik/v2/pkg/udp"
)
// NewManager Creates a new Manager
func NewManager(conf *runtime.Configuration,
serviceManager *udpservice.Manager,
) *Manager {
return &Manager{
serviceManager: serviceManager,
conf: conf,
}
}
// Manager is a route/router manager
type Manager struct {
serviceManager *udpservice.Manager
conf *runtime.Configuration
}
func (m *Manager) getUDPRouters(ctx context.Context, entryPoints []string) map[string]map[string]*runtime.UDPRouterInfo {
if m.conf != nil {
return m.conf.GetUDPRoutersByEntryPoints(ctx, entryPoints)
}
return make(map[string]map[string]*runtime.UDPRouterInfo)
}
// BuildHandlers builds the handlers for the given entrypoints
func (m *Manager) BuildHandlers(rootCtx context.Context, entryPoints []string) map[string]udp.Handler {
entryPointsRouters := m.getUDPRouters(rootCtx, entryPoints)
entryPointHandlers := make(map[string]udp.Handler)
for _, entryPointName := range entryPoints {
entryPointName := entryPointName
routers := entryPointsRouters[entryPointName]
ctx := log.With(rootCtx, log.Str(log.EntryPointName, entryPointName))
handler, err := m.buildEntryPointHandler(ctx, routers)
if err != nil {
log.FromContext(ctx).Error(err)
continue
}
entryPointHandlers[entryPointName] = handler
}
return entryPointHandlers
}
func (m *Manager) buildEntryPointHandler(ctx context.Context, configs map[string]*runtime.UDPRouterInfo) (udp.Handler, error) {
logger := log.FromContext(ctx)
if len(configs) > 1 {
logger.Warn("Warning: config has more than one udp router for a given entrypoint")
}
for routerName, routerConfig := range configs {
ctxRouter := log.With(provider.AddInContext(ctx, routerName), log.Str(log.RouterName, routerName))
logger := log.FromContext(ctxRouter)
if routerConfig.Service == "" {
err := errors.New("the service is missing on the udp router")
routerConfig.AddError(err, true)
logger.Error(err)
continue
}
handler, err := m.serviceManager.BuildUDP(ctxRouter, routerConfig.Service)
if err != nil {
routerConfig.AddError(err, true)
logger.Error(err)
continue
}
return handler, nil
}
return nil, nil
}

View file

@ -0,0 +1,144 @@
package udp
import (
"context"
"testing"
"github.com/containous/traefik/v2/pkg/config/dynamic"
"github.com/containous/traefik/v2/pkg/config/runtime"
"github.com/containous/traefik/v2/pkg/server/service/udp"
"github.com/stretchr/testify/assert"
)
func TestRuntimeConfiguration(t *testing.T) {
testCases := []struct {
desc string
serviceConfig map[string]*runtime.UDPServiceInfo
routerConfig map[string]*runtime.UDPRouterInfo
expectedError int
}{
{
desc: "No error",
serviceConfig: map[string]*runtime.UDPServiceInfo{
"foo-service": {
UDPService: &dynamic.UDPService{
LoadBalancer: &dynamic.UDPServersLoadBalancer{
Servers: []dynamic.UDPServer{
{
Port: "8085",
Address: "127.0.0.1:8085",
},
{
Address: "127.0.0.1:8086",
Port: "8086",
},
},
},
},
},
},
routerConfig: map[string]*runtime.UDPRouterInfo{
"foo": {
UDPRouter: &dynamic.UDPRouter{
EntryPoints: []string{"web"},
Service: "foo-service",
},
},
"bar": {
UDPRouter: &dynamic.UDPRouter{
EntryPoints: []string{"web"},
Service: "foo-service",
},
},
},
expectedError: 0,
},
{
desc: "Router with unknown service",
serviceConfig: map[string]*runtime.UDPServiceInfo{
"foo-service": {
UDPService: &dynamic.UDPService{
LoadBalancer: &dynamic.UDPServersLoadBalancer{
Servers: []dynamic.UDPServer{
{
Address: "127.0.0.1:80",
},
},
},
},
},
},
routerConfig: map[string]*runtime.UDPRouterInfo{
"foo": {
UDPRouter: &dynamic.UDPRouter{
EntryPoints: []string{"web"},
Service: "wrong-service",
},
},
"bar": {
UDPRouter: &dynamic.UDPRouter{
EntryPoints: []string{"web"},
Service: "foo-service",
},
},
},
expectedError: 1,
},
{
desc: "Router with broken service",
serviceConfig: map[string]*runtime.UDPServiceInfo{
"foo-service": {
UDPService: &dynamic.UDPService{
LoadBalancer: nil,
},
},
},
routerConfig: map[string]*runtime.UDPRouterInfo{
"bar": {
UDPRouter: &dynamic.UDPRouter{
EntryPoints: []string{"web"},
Service: "foo-service",
},
},
},
expectedError: 2,
},
}
for _, test := range testCases {
test := test
t.Run(test.desc, func(t *testing.T) {
t.Parallel()
entryPoints := []string{"web"}
conf := &runtime.Configuration{
UDPServices: test.serviceConfig,
UDPRouters: test.routerConfig,
}
serviceManager := udp.NewManager(conf)
routerManager := NewManager(conf, serviceManager)
_ = routerManager.BuildHandlers(context.Background(), entryPoints)
// even though conf was passed by argument to the manager builders above,
// it's ok to use it as the result we check, because everything worth checking
// can be accessed by pointers in it.
var allErrors int
for _, v := range conf.UDPServices {
if v.Err != nil {
allErrors++
}
}
for _, v := range conf.UDPRouters {
if len(v.Err) > 0 {
allErrors++
}
}
assert.Equal(t, test.expectedError, allErrors)
})
}
}

View file

@ -0,0 +1,91 @@
package server
import (
"context"
"github.com/containous/traefik/v2/pkg/config/dynamic"
"github.com/containous/traefik/v2/pkg/config/runtime"
"github.com/containous/traefik/v2/pkg/config/static"
"github.com/containous/traefik/v2/pkg/log"
"github.com/containous/traefik/v2/pkg/responsemodifiers"
"github.com/containous/traefik/v2/pkg/server/middleware"
"github.com/containous/traefik/v2/pkg/server/router"
routertcp "github.com/containous/traefik/v2/pkg/server/router/tcp"
routerudp "github.com/containous/traefik/v2/pkg/server/router/udp"
"github.com/containous/traefik/v2/pkg/server/service"
"github.com/containous/traefik/v2/pkg/server/service/tcp"
"github.com/containous/traefik/v2/pkg/server/service/udp"
tcpCore "github.com/containous/traefik/v2/pkg/tcp"
"github.com/containous/traefik/v2/pkg/tls"
udpCore "github.com/containous/traefik/v2/pkg/udp"
)
// RouterFactory the factory of TCP/UDP routers.
type RouterFactory struct {
entryPointsTCP []string
entryPointsUDP []string
managerFactory *service.ManagerFactory
chainBuilder *middleware.ChainBuilder
tlsManager *tls.Manager
}
// NewRouterFactory creates a new RouterFactory
func NewRouterFactory(staticConfiguration static.Configuration, managerFactory *service.ManagerFactory, tlsManager *tls.Manager, chainBuilder *middleware.ChainBuilder) *RouterFactory {
var entryPointsTCP, entryPointsUDP []string
for name, cfg := range staticConfiguration.EntryPoints {
protocol, err := cfg.GetProtocol()
if err != nil {
// Should never happen because Traefik should not start if protocol is invalid.
log.WithoutContext().Errorf("Invalid protocol: %v", err)
}
if protocol == "udp" {
entryPointsUDP = append(entryPointsUDP, name)
} else {
entryPointsTCP = append(entryPointsTCP, name)
}
}
return &RouterFactory{
entryPointsTCP: entryPointsTCP,
entryPointsUDP: entryPointsUDP,
managerFactory: managerFactory,
tlsManager: tlsManager,
chainBuilder: chainBuilder,
}
}
// CreateRouters creates new TCPRouters and UDPRouters
func (f *RouterFactory) CreateRouters(conf dynamic.Configuration) (map[string]*tcpCore.Router, map[string]udpCore.Handler) {
ctx := context.Background()
rtConf := runtime.NewConfig(conf)
// HTTP
serviceManager := f.managerFactory.Build(rtConf)
middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager)
responseModifierFactory := responsemodifiers.NewBuilder(rtConf.Middlewares)
routerManager := router.NewManager(rtConf, serviceManager, middlewaresBuilder, responseModifierFactory, f.chainBuilder)
handlersNonTLS := routerManager.BuildHandlers(ctx, f.entryPointsTCP, false)
handlersTLS := routerManager.BuildHandlers(ctx, f.entryPointsTCP, true)
// TCP
svcTCPManager := tcp.NewManager(rtConf)
rtTCPManager := routertcp.NewManager(rtConf, svcTCPManager, handlersNonTLS, handlersTLS, f.tlsManager)
routersTCP := rtTCPManager.BuildHandlers(ctx, f.entryPointsTCP)
// UDP
svcUDPManager := udp.NewManager(rtConf)
rtUDPManager := routerudp.NewManager(rtConf, svcUDPManager)
routersUDP := rtUDPManager.BuildHandlers(ctx, f.entryPointsUDP)
rtConf.PopulateUsedBy()
return routersTCP, routersUDP
}

View file

@ -49,9 +49,9 @@ func TestReuseService(t *testing.T) {
managerFactory := service.NewManagerFactory(staticConfig, nil, metrics.NewVoidRegistry())
tlsManager := tls.NewManager()
factory := NewTCPRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(staticConfig, metrics.NewVoidRegistry(), nil))
factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(staticConfig, metrics.NewVoidRegistry(), nil))
entryPointsHandlers := factory.CreateTCPRouters(dynamic.Configuration{HTTP: dynamicConfigs})
entryPointsHandlers, _ := factory.CreateRouters(dynamic.Configuration{HTTP: dynamicConfigs})
// Test that the /ok path returns a status 200.
responseRecorderOk := &httptest.ResponseRecorder{}
@ -183,9 +183,9 @@ func TestServerResponseEmptyBackend(t *testing.T) {
managerFactory := service.NewManagerFactory(staticConfig, nil, metrics.NewVoidRegistry())
tlsManager := tls.NewManager()
factory := NewTCPRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(staticConfig, metrics.NewVoidRegistry(), nil))
factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(staticConfig, metrics.NewVoidRegistry(), nil))
entryPointsHandlers := factory.CreateTCPRouters(dynamic.Configuration{HTTP: test.config(testServer.URL)})
entryPointsHandlers, _ := factory.CreateRouters(dynamic.Configuration{HTTP: test.config(testServer.URL)})
responseRecorder := &httptest.ResponseRecorder{}
request := httptest.NewRequest(http.MethodGet, testServer.URL+requestPath, nil)
@ -221,9 +221,9 @@ func TestInternalServices(t *testing.T) {
managerFactory := service.NewManagerFactory(staticConfig, nil, metrics.NewVoidRegistry())
tlsManager := tls.NewManager()
factory := NewTCPRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(staticConfig, metrics.NewVoidRegistry(), nil))
factory := NewRouterFactory(staticConfig, managerFactory, tlsManager, middleware.NewChainBuilder(staticConfig, metrics.NewVoidRegistry(), nil))
entryPointsHandlers := factory.CreateTCPRouters(dynamic.Configuration{HTTP: dynamicConfigs})
entryPointsHandlers, _ := factory.CreateRouters(dynamic.Configuration{HTTP: dynamicConfigs})
// Test that the /ok path returns a status 200.
responseRecorderOk := &httptest.ResponseRecorder{}

View file

@ -17,6 +17,7 @@ import (
type Server struct {
watcher *ConfigurationWatcher
tcpEntryPoints TCPEntryPoints
udpEntryPoints UDPEntryPoints
chainBuilder *middleware.ChainBuilder
accessLoggerMiddleware *accesslog.Handler
@ -28,7 +29,7 @@ type Server struct {
}
// NewServer returns an initialized Server.
func NewServer(routinesPool *safe.Pool, entryPoints TCPEntryPoints, watcher *ConfigurationWatcher,
func NewServer(routinesPool *safe.Pool, entryPoints TCPEntryPoints, entryPointsUDP UDPEntryPoints, watcher *ConfigurationWatcher,
chainBuilder *middleware.ChainBuilder, accessLoggerMiddleware *accesslog.Handler) *Server {
srv := &Server{
watcher: watcher,
@ -38,6 +39,7 @@ func NewServer(routinesPool *safe.Pool, entryPoints TCPEntryPoints, watcher *Con
signals: make(chan os.Signal, 1),
stopChan: make(chan bool, 1),
routinesPool: routinesPool,
udpEntryPoints: entryPointsUDP,
}
srv.configureSignals()
@ -56,6 +58,7 @@ func (s *Server) Start(ctx context.Context) {
}()
s.tcpEntryPoints.Start()
s.udpEntryPoints.Start()
s.watcher.Start()
s.routinesPool.GoCtx(s.listenSignals)
@ -71,6 +74,7 @@ func (s *Server) Stop() {
defer log.WithoutContext().Info("Server stopped")
s.tcpEntryPoints.Stop()
s.udpEntryPoints.Stop()
s.stopChan <- true
}

View file

@ -55,9 +55,17 @@ type TCPEntryPoints map[string]*TCPEntryPoint
func NewTCPEntryPoints(entryPointsConfig static.EntryPoints) (TCPEntryPoints, error) {
serverEntryPointsTCP := make(TCPEntryPoints)
for entryPointName, config := range entryPointsConfig {
protocol, err := config.GetProtocol()
if err != nil {
return nil, fmt.Errorf("error while building entryPoint %s: %v", entryPointName, err)
}
if protocol != "tcp" {
continue
}
ctx := log.With(context.Background(), log.Str(log.EntryPointName, entryPointName))
var err error
serverEntryPointsTCP[entryPointName], err = NewTCPEntryPoint(ctx, config)
if err != nil {
return nil, fmt.Errorf("error while building entryPoint %s: %v", entryPointName, err)
@ -70,7 +78,7 @@ func NewTCPEntryPoints(entryPointsConfig static.EntryPoints) (TCPEntryPoints, er
func (eps TCPEntryPoints) Start() {
for entryPointName, serverEntryPoint := range eps {
ctx := log.With(context.Background(), log.Str(log.EntryPointName, entryPointName))
go serverEntryPoint.StartTCP(ctx)
go serverEntryPoint.Start(ctx)
}
}
@ -149,8 +157,8 @@ func NewTCPEntryPoint(ctx context.Context, configuration *static.EntryPoint) (*T
}, nil
}
// StartTCP starts the TCP server.
func (e *TCPEntryPoint) StartTCP(ctx context.Context) {
// Start starts the TCP server.
func (e *TCPEntryPoint) Start(ctx context.Context) {
logger := log.FromContext(ctx)
logger.Debugf("Start TCP Server")
@ -370,7 +378,7 @@ func buildProxyProtocolListener(ctx context.Context, entryPoint *static.EntryPoi
}
func buildListener(ctx context.Context, entryPoint *static.EntryPoint) (net.Listener, error) {
listener, err := net.Listen("tcp", entryPoint.Address)
listener, err := net.Listen("tcp", entryPoint.GetAddress())
if err != nil {
return nil, fmt.Errorf("error opening listener: %v", err)

View file

@ -125,7 +125,7 @@ func testShutdown(t *testing.T, router *tcp.Router) {
}
func startEntrypoint(entryPoint *TCPEntryPoint, router *tcp.Router) (net.Conn, error) {
go entryPoint.StartTCP(context.Background())
go entryPoint.Start(context.Background())
entryPoint.SwitchRouter(router)

View file

@ -0,0 +1,135 @@
package server
import (
"context"
"fmt"
"net"
"sync"
"time"
"github.com/containous/traefik/v2/pkg/config/static"
"github.com/containous/traefik/v2/pkg/log"
"github.com/containous/traefik/v2/pkg/udp"
)
// UDPEntryPoints maps UDP entry points by their names.
type UDPEntryPoints map[string]*UDPEntryPoint
// NewUDPEntryPoints returns all the UDP entry points, keyed by name.
func NewUDPEntryPoints(cfg static.EntryPoints) (UDPEntryPoints, error) {
entryPoints := make(UDPEntryPoints)
for entryPointName, entryPoint := range cfg {
protocol, err := entryPoint.GetProtocol()
if err != nil {
return nil, fmt.Errorf("error while building entryPoint %s: %v", entryPointName, err)
}
if protocol != "udp" {
continue
}
ep, err := NewUDPEntryPoint(entryPoint)
if err != nil {
return nil, fmt.Errorf("error while building entryPoint %s: %v", entryPointName, err)
}
entryPoints[entryPointName] = ep
}
return entryPoints, nil
}
// Start commences the listening for all the entry points.
func (eps UDPEntryPoints) Start() {
for entryPointName, ep := range eps {
ctx := log.With(context.Background(), log.Str(log.EntryPointName, entryPointName))
go ep.Start(ctx)
}
}
// Stop makes all the entry points stop listening, and release associated resources.
func (eps UDPEntryPoints) Stop() {
var wg sync.WaitGroup
for epn, ep := range eps {
wg.Add(1)
go func(entryPointName string, entryPoint *UDPEntryPoint) {
defer wg.Done()
ctx := log.With(context.Background(), log.Str(log.EntryPointName, entryPointName))
entryPoint.Shutdown(ctx)
log.FromContext(ctx).Debugf("Entry point %s closed", entryPointName)
}(epn, ep)
}
wg.Wait()
}
// Switch swaps out all the given handlers in their associated entrypoints.
func (eps UDPEntryPoints) Switch(handlers map[string]udp.Handler) {
for epName, handler := range handlers {
if ep, ok := eps[epName]; ok {
ep.Switch(handler)
continue
}
log.WithoutContext().Errorf("EntryPoint %q does not exist", epName)
}
}
// UDPEntryPoint is an entry point where we listen for UDP packets.
type UDPEntryPoint struct {
listener *udp.Listener
switcher *udp.HandlerSwitcher
transportConfiguration *static.EntryPointsTransport
}
// NewUDPEntryPoint returns a UDP entry point.
func NewUDPEntryPoint(cfg *static.EntryPoint) (*UDPEntryPoint, error) {
addr, err := net.ResolveUDPAddr("udp", cfg.GetAddress())
if err != nil {
return nil, err
}
listener, err := udp.Listen("udp", addr)
if err != nil {
return nil, err
}
return &UDPEntryPoint{listener: listener, switcher: &udp.HandlerSwitcher{}, transportConfiguration: cfg.Transport}, nil
}
// Start commences the listening for ep.
func (ep *UDPEntryPoint) Start(ctx context.Context) {
log.FromContext(ctx).Debug("Start UDP Server")
for {
conn, err := ep.listener.Accept()
if err != nil {
// Only errClosedListener can happen that's why we return
return
}
go ep.switcher.ServeUDP(conn)
}
}
// Shutdown closes ep's listener. It eventually closes all "sessions" and
// releases associated resources, but only after it has waited for a graceTimeout,
// if any was configured.
func (ep *UDPEntryPoint) Shutdown(ctx context.Context) {
logger := log.FromContext(ctx)
reqAcceptGraceTimeOut := time.Duration(ep.transportConfiguration.LifeCycle.RequestAcceptGraceTimeout)
if reqAcceptGraceTimeOut > 0 {
logger.Infof("Waiting %s for incoming requests to cease", reqAcceptGraceTimeOut)
time.Sleep(reqAcceptGraceTimeOut)
}
graceTimeOut := time.Duration(ep.transportConfiguration.LifeCycle.GraceTimeOut)
if err := ep.listener.Shutdown(graceTimeOut); err != nil {
logger.Error(err)
}
}
// Switch replaces ep's handler with the one given as argument.
func (ep *UDPEntryPoint) Switch(handler udp.Handler) {
ep.switcher.Switch(handler)
}

View file

@ -0,0 +1,123 @@
package server
import (
"context"
"io"
"net"
"testing"
"time"
"github.com/containous/traefik/v2/pkg/config/static"
"github.com/containous/traefik/v2/pkg/types"
"github.com/containous/traefik/v2/pkg/udp"
"github.com/stretchr/testify/require"
)
func TestShutdownUDPConn(t *testing.T) {
entryPoint, err := NewUDPEntryPoint(&static.EntryPoint{
Address: ":0",
Transport: &static.EntryPointsTransport{
LifeCycle: &static.LifeCycle{
GraceTimeOut: types.Duration(5 * time.Second),
},
},
})
require.NoError(t, err)
go entryPoint.Start(context.Background())
entryPoint.Switch(udp.HandlerFunc(func(conn *udp.Conn) {
for {
b := make([]byte, 1024*1024)
n, err := conn.Read(b)
require.NoError(t, err)
// We control the termination, otherwise we would block on the Read above, until
// conn is closed by a timeout. Which means we would get an error, and even though
// we are in a goroutine and the current test might be over, go test would still
// yell at us if this happens while other tests are still running.
if string(b[:n]) == "CLOSE" {
return
}
_, err = conn.Write(b[:n])
require.NoError(t, err)
}
}))
conn, err := net.Dial("udp", entryPoint.listener.Addr().String())
require.NoError(t, err)
// Start sending packets, to create a "session" with the server.
requireEcho(t, "TEST", conn, time.Second)
doneChan := make(chan struct{})
go func() {
entryPoint.Shutdown(context.Background())
close(doneChan)
}()
// Make sure that our session is still live even after the shutdown.
requireEcho(t, "TEST2", conn, time.Second)
// And make sure that on the other hand, opening new sessions is not possible anymore.
conn2, err := net.Dial("udp", entryPoint.listener.Addr().String())
require.NoError(t, err)
_, err = conn2.Write([]byte("TEST"))
// Packet is accepted, but dropped
require.NoError(t, err)
// Make sure that our session is yet again still live. This is specifically to
// make sure we don't create a regression in listener's readLoop, i.e. that we only
// terminate the listener's readLoop goroutine by closing its pConn.
requireEcho(t, "TEST3", conn, time.Second)
done := make(chan bool)
go func() {
defer close(done)
b := make([]byte, 1024*1024)
n, err := conn2.Read(b)
require.Error(t, err)
require.Equal(t, 0, n)
}()
conn2.Close()
select {
case <-done:
case <-time.Tick(time.Second):
t.Fatal("Timeout")
}
_, err = conn.Write([]byte("CLOSE"))
require.NoError(t, err)
select {
case <-doneChan:
case <-time.Tick(time.Second * 5):
// In case we introduce a regression that would make the test wait forever.
t.Fatal("Timeout during shutdown")
}
}
// requireEcho tests that the conn session is live and functional, by writing
// data through it, and expecting the same data as a response when reading on it.
// It fatals if the read blocks longer than timeout, which is useful to detect
// regressions that would make a test wait forever.
func requireEcho(t *testing.T, data string, conn io.ReadWriter, timeout time.Duration) {
_, err := conn.Write([]byte(data))
require.NoError(t, err)
doneChan := make(chan struct{})
go func() {
b := make([]byte, 1024*1024)
n, err := conn.Read(b)
require.NoError(t, err)
require.Equal(t, data, string(b[:n]))
close(doneChan)
}()
select {
case <-doneChan:
case <-time.Tick(timeout):
t.Fatalf("Timeout during echo for: %s", data)
}
}

View file

@ -81,7 +81,7 @@ func (m *Manager) BuildTCP(rootCtx context.Context, serviceName string) (tcp.Han
}
return loadBalancer, nil
default:
err := fmt.Errorf("the service %q doesn't have any TCP load balancer", serviceQualifiedName)
err := fmt.Errorf("the service %q does not have any type defined", serviceQualifiedName)
conf.AddError(err, true)
return nil, err
}

View file

@ -33,7 +33,7 @@ func TestManager_BuildTCP(t *testing.T) {
TCPService: &dynamic.TCPService{},
},
},
expectedError: `the service "test" doesn't have any TCP load balancer`,
expectedError: `the service "test" does not have any type defined`,
},
{
desc: "no such host, server is skipped, error is logged",

View file

@ -0,0 +1,81 @@
package udp
import (
"context"
"errors"
"fmt"
"net"
"github.com/containous/traefik/v2/pkg/config/runtime"
"github.com/containous/traefik/v2/pkg/log"
"github.com/containous/traefik/v2/pkg/server/provider"
"github.com/containous/traefik/v2/pkg/udp"
)
// Manager handles UDP services creation.
type Manager struct {
configs map[string]*runtime.UDPServiceInfo
}
// NewManager creates a new manager
func NewManager(conf *runtime.Configuration) *Manager {
return &Manager{
configs: conf.UDPServices,
}
}
// BuildUDP creates the UDP handler for the given service name.
func (m *Manager) BuildUDP(rootCtx context.Context, serviceName string) (udp.Handler, error) {
serviceQualifiedName := provider.GetQualifiedName(rootCtx, serviceName)
ctx := provider.AddInContext(rootCtx, serviceQualifiedName)
ctx = log.With(ctx, log.Str(log.ServiceName, serviceName))
conf, ok := m.configs[serviceQualifiedName]
if !ok {
return nil, fmt.Errorf("the udp service %q does not exist", serviceQualifiedName)
}
if conf.LoadBalancer != nil && conf.Weighted != nil {
err := errors.New("cannot create service: multi-types service not supported, consider declaring two different pieces of service instead")
conf.AddError(err, true)
return nil, err
}
logger := log.FromContext(ctx)
switch {
case conf.LoadBalancer != nil:
loadBalancer := udp.NewWRRLoadBalancer()
for name, server := range conf.LoadBalancer.Servers {
if _, _, err := net.SplitHostPort(server.Address); err != nil {
logger.Errorf("In udp service %q: %v", serviceQualifiedName, err)
continue
}
handler, err := udp.NewProxy(server.Address)
if err != nil {
logger.Errorf("In udp service %q server %q: %v", serviceQualifiedName, server.Address, err)
continue
}
loadBalancer.AddServer(handler)
logger.WithField(log.ServerName, name).Debugf("Creating UDP server %d at %s", name, server.Address)
}
return loadBalancer, nil
case conf.Weighted != nil:
loadBalancer := udp.NewWRRLoadBalancer()
for _, service := range conf.Weighted.Services {
handler, err := m.BuildUDP(rootCtx, service.Name)
if err != nil {
logger.Errorf("In udp service %q: %v", serviceQualifiedName, err)
return nil, err
}
loadBalancer.AddWeightedServer(handler, service.Weight)
}
return loadBalancer, nil
default:
err := fmt.Errorf("the udp service %q does not have any type defined", serviceQualifiedName)
conf.AddError(err, true)
return nil, err
}
}

View file

@ -0,0 +1,201 @@
package udp
import (
"context"
"testing"
"github.com/containous/traefik/v2/pkg/config/dynamic"
"github.com/containous/traefik/v2/pkg/config/runtime"
"github.com/containous/traefik/v2/pkg/server/provider"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestManager_BuildUDP(t *testing.T) {
testCases := []struct {
desc string
serviceName string
configs map[string]*runtime.UDPServiceInfo
providerName string
expectedError string
}{
{
desc: "without configuration",
serviceName: "test",
configs: nil,
expectedError: `the udp service "test" does not exist`,
},
{
desc: "missing lb configuration",
serviceName: "test",
configs: map[string]*runtime.UDPServiceInfo{
"test": {
UDPService: &dynamic.UDPService{},
},
},
expectedError: `the udp service "test" does not have any type defined`,
},
{
desc: "no such host, server is skipped, error is logged",
serviceName: "test",
configs: map[string]*runtime.UDPServiceInfo{
"test": {
UDPService: &dynamic.UDPService{
LoadBalancer: &dynamic.UDPServersLoadBalancer{
Servers: []dynamic.UDPServer{
{Address: "test:31"},
},
},
},
},
},
},
{
desc: "invalid IP address, server is skipped, error is logged",
serviceName: "test",
configs: map[string]*runtime.UDPServiceInfo{
"test": {
UDPService: &dynamic.UDPService{
LoadBalancer: &dynamic.UDPServersLoadBalancer{
Servers: []dynamic.UDPServer{
{Address: "foobar"},
},
},
},
},
},
},
{
desc: "Simple service name",
serviceName: "serviceName",
configs: map[string]*runtime.UDPServiceInfo{
"serviceName": {
UDPService: &dynamic.UDPService{
LoadBalancer: &dynamic.UDPServersLoadBalancer{},
},
},
},
},
{
desc: "Service name with provider",
serviceName: "serviceName@provider-1",
configs: map[string]*runtime.UDPServiceInfo{
"serviceName@provider-1": {
UDPService: &dynamic.UDPService{
LoadBalancer: &dynamic.UDPServersLoadBalancer{},
},
},
},
},
{
desc: "Service name with provider in context",
serviceName: "serviceName",
configs: map[string]*runtime.UDPServiceInfo{
"serviceName@provider-1": {
UDPService: &dynamic.UDPService{
LoadBalancer: &dynamic.UDPServersLoadBalancer{},
},
},
},
providerName: "provider-1",
},
{
desc: "Server with correct host:port as address",
serviceName: "serviceName",
configs: map[string]*runtime.UDPServiceInfo{
"serviceName@provider-1": {
UDPService: &dynamic.UDPService{
LoadBalancer: &dynamic.UDPServersLoadBalancer{
Servers: []dynamic.UDPServer{
{
Address: "foobar.com:80",
},
},
},
},
},
},
providerName: "provider-1",
},
{
desc: "Server with correct ip:port as address",
serviceName: "serviceName",
configs: map[string]*runtime.UDPServiceInfo{
"serviceName@provider-1": {
UDPService: &dynamic.UDPService{
LoadBalancer: &dynamic.UDPServersLoadBalancer{
Servers: []dynamic.UDPServer{
{
Address: "192.168.0.12:80",
},
},
},
},
},
},
providerName: "provider-1",
},
{
desc: "missing port in address with hostname, server is skipped, error is logged",
serviceName: "serviceName",
configs: map[string]*runtime.UDPServiceInfo{
"serviceName@provider-1": {
UDPService: &dynamic.UDPService{
LoadBalancer: &dynamic.UDPServersLoadBalancer{
Servers: []dynamic.UDPServer{
{
Address: "foobar.com",
},
},
},
},
},
},
providerName: "provider-1",
},
{
desc: "missing port in address with ip, server is skipped, error is logged",
serviceName: "serviceName",
configs: map[string]*runtime.UDPServiceInfo{
"serviceName@provider-1": {
UDPService: &dynamic.UDPService{
LoadBalancer: &dynamic.UDPServersLoadBalancer{
Servers: []dynamic.UDPServer{
{
Address: "192.168.0.12",
},
},
},
},
},
},
providerName: "provider-1",
},
}
for _, test := range testCases {
test := test
t.Run(test.desc, func(t *testing.T) {
t.Parallel()
manager := NewManager(&runtime.Configuration{
UDPServices: test.configs,
})
ctx := context.Background()
if len(test.providerName) > 0 {
ctx = provider.AddInContext(ctx, "foobar@"+test.providerName)
}
handler, err := manager.BuildUDP(ctx, test.serviceName)
if test.expectedError != "" {
assert.EqualError(t, err, test.expectedError)
require.Nil(t, handler)
} else {
assert.Nil(t, err)
require.NotNil(t, handler)
}
})
}
}

View file

@ -1,70 +0,0 @@
package server
import (
"context"
"github.com/containous/traefik/v2/pkg/config/dynamic"
"github.com/containous/traefik/v2/pkg/config/runtime"
"github.com/containous/traefik/v2/pkg/config/static"
"github.com/containous/traefik/v2/pkg/responsemodifiers"
"github.com/containous/traefik/v2/pkg/server/middleware"
"github.com/containous/traefik/v2/pkg/server/router"
routertcp "github.com/containous/traefik/v2/pkg/server/router/tcp"
"github.com/containous/traefik/v2/pkg/server/service"
"github.com/containous/traefik/v2/pkg/server/service/tcp"
tcpCore "github.com/containous/traefik/v2/pkg/tcp"
"github.com/containous/traefik/v2/pkg/tls"
)
// TCPRouterFactory the factory of TCP routers.
type TCPRouterFactory struct {
entryPoints []string
managerFactory *service.ManagerFactory
chainBuilder *middleware.ChainBuilder
tlsManager *tls.Manager
}
// NewTCPRouterFactory creates a new TCPRouterFactory
func NewTCPRouterFactory(staticConfiguration static.Configuration, managerFactory *service.ManagerFactory, tlsManager *tls.Manager, chainBuilder *middleware.ChainBuilder) *TCPRouterFactory {
var entryPoints []string
for name := range staticConfiguration.EntryPoints {
entryPoints = append(entryPoints, name)
}
return &TCPRouterFactory{
entryPoints: entryPoints,
managerFactory: managerFactory,
tlsManager: tlsManager,
chainBuilder: chainBuilder,
}
}
// CreateTCPRouters creates new TCPRouters
func (f *TCPRouterFactory) CreateTCPRouters(conf dynamic.Configuration) map[string]*tcpCore.Router {
ctx := context.Background()
rtConf := runtime.NewConfig(conf)
// HTTP
serviceManager := f.managerFactory.Build(rtConf)
middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager)
responseModifierFactory := responsemodifiers.NewBuilder(rtConf.Middlewares)
routerManager := router.NewManager(rtConf, serviceManager, middlewaresBuilder, responseModifierFactory, f.chainBuilder)
handlersNonTLS := routerManager.BuildHandlers(ctx, f.entryPoints, false)
handlersTLS := routerManager.BuildHandlers(ctx, f.entryPoints, true)
// TCP
svcTCPManager := tcp.NewManager(rtConf)
rtTCPManager := routertcp.NewManager(rtConf, svcTCPManager, handlersNonTLS, handlersTLS, f.tlsManager)
routersTCP := rtTCPManager.BuildHandlers(ctx, f.entryPoints)
rtConf.PopulateUsedBy()
return routersTCP
}