1
0
Fork 0

Add SO_REUSEPORT support for EntryPoints

This commit is contained in:
Aofei Sheng 2024-01-30 21:56:05 +08:00 committed by GitHub
parent 40de310927
commit d02be003ab
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
19 changed files with 279 additions and 43 deletions

View file

@ -12,6 +12,7 @@ import (
// EntryPoint holds the entry point configuration.
type EntryPoint struct {
Address string `description:"Entry point address." json:"address,omitempty" toml:"address,omitempty" yaml:"address,omitempty"`
ReusePort bool `description:"Enables EntryPoints from the same or different processes listening on the same TCP/UDP port." json:"reusePort,omitempty" toml:"reusePort,omitempty" yaml:"reusePort,omitempty"`
AsDefault bool `description:"Adds this EntryPoint to the list of default EntryPoints to be used on routers that don't have any Entrypoint defined." json:"asDefault,omitempty" toml:"asDefault,omitempty" yaml:"asDefault,omitempty"`
Transport *EntryPointsTransport `description:"Configures communication between clients and Traefik." json:"transport,omitempty" toml:"transport,omitempty" yaml:"transport,omitempty" export:"true"`
ProxyProtocol *ProxyProtocol `description:"Proxy-Protocol configuration." json:"proxyProtocol,omitempty" toml:"proxyProtocol,omitempty" yaml:"proxyProtocol,omitempty" label:"allowEmpty" file:"allowEmpty" export:"true"`

View file

@ -0,0 +1,15 @@
//go:build !(linux || freebsd || openbsd || darwin)
package server
import (
"net"
"github.com/traefik/traefik/v3/pkg/config/static"
)
// newListenConfig creates a new net.ListenConfig for the given configuration of
// the entry point.
func newListenConfig(configuration *static.EntryPoint) (lc net.ListenConfig) {
return
}

View file

@ -0,0 +1,44 @@
//go:build !(linux || freebsd || openbsd || darwin)
package server
import (
"context"
"net"
"testing"
"github.com/stretchr/testify/require"
"github.com/traefik/traefik/v3/pkg/config/static"
)
func TestNewListenConfig(t *testing.T) {
ep := static.EntryPoint{Address: ":0"}
listenConfig := newListenConfig(&ep)
require.Nil(t, listenConfig.Control)
require.Zero(t, listenConfig.KeepAlive)
l1, err := listenConfig.Listen(context.Background(), "tcp", ep.Address)
require.NoError(t, err)
require.NotNil(t, l1)
defer l1.Close()
l2, err := listenConfig.Listen(context.Background(), "tcp", l1.Addr().String())
require.Error(t, err)
require.ErrorContains(t, err, "address already in use")
require.Nil(t, l2)
ep = static.EntryPoint{Address: ":0", ReusePort: true}
listenConfig = newListenConfig(&ep)
require.Nil(t, listenConfig.Control)
require.Zero(t, listenConfig.KeepAlive)
l3, err := listenConfig.Listen(context.Background(), "tcp", ep.Address)
require.NoError(t, err)
require.NotNil(t, l3)
defer l3.Close()
l4, err := listenConfig.Listen(context.Background(), "tcp", l3.Addr().String())
require.Error(t, err)
require.ErrorContains(t, err, "address already in use")
require.Nil(t, l4)
}

View file

@ -0,0 +1,44 @@
//go:build linux || freebsd || openbsd || darwin
package server
import (
"fmt"
"net"
"syscall"
"github.com/traefik/traefik/v3/pkg/config/static"
"golang.org/x/sys/unix"
)
// newListenConfig creates a new net.ListenConfig for the given configuration of
// the entry point.
func newListenConfig(configuration *static.EntryPoint) (lc net.ListenConfig) {
if configuration != nil && configuration.ReusePort {
lc.Control = controlReusePort
}
return
}
// controlReusePort is a net.ListenConfig.Control function that enables SO_REUSEPORT
// on the socket.
func controlReusePort(network, address string, c syscall.RawConn) error {
var setSockOptErr error
err := c.Control(func(fd uintptr) {
// Note that net.ListenConfig enables unix.SO_REUSEADDR by default,
// as seen in https://go.dev/src/net/sockopt_linux.go. Therefore, no
// additional action is required to enable it here.
setSockOptErr = unix.SetsockoptInt(int(fd), unix.SOL_SOCKET, unixSOREUSEPORT, 1)
if setSockOptErr != nil {
return
}
})
if err != nil {
return fmt.Errorf("control: %w", err)
}
if setSockOptErr != nil {
return fmt.Errorf("setsockopt: %w", setSockOptErr)
}
return nil
}

View file

@ -0,0 +1,7 @@
//go:build freebsd
package server
import "golang.org/x/sys/unix"
const unixSOREUSEPORT = unix.SO_REUSEPORT_LB

View file

@ -0,0 +1,7 @@
//go:build linux || openbsd || darwin
package server
import "golang.org/x/sys/unix"
const unixSOREUSEPORT = unix.SO_REUSEPORT

View file

@ -0,0 +1,56 @@
//go:build linux || freebsd || openbsd || darwin
package server
import (
"context"
"net"
"testing"
"github.com/stretchr/testify/require"
"github.com/traefik/traefik/v3/pkg/config/static"
)
func TestNewListenConfig(t *testing.T) {
ep := static.EntryPoint{Address: ":0"}
listenConfig := newListenConfig(&ep)
require.Nil(t, listenConfig.Control)
require.Zero(t, listenConfig.KeepAlive)
l1, err := listenConfig.Listen(context.Background(), "tcp", ep.Address)
require.NoError(t, err)
require.NotNil(t, l1)
defer l1.Close()
l2, err := listenConfig.Listen(context.Background(), "tcp", l1.Addr().String())
require.Error(t, err)
require.ErrorContains(t, err, "address already in use")
require.Nil(t, l2)
ep = static.EntryPoint{Address: ":0", ReusePort: true}
listenConfig = newListenConfig(&ep)
require.NotNil(t, listenConfig.Control)
require.Zero(t, listenConfig.KeepAlive)
l3, err := listenConfig.Listen(context.Background(), "tcp", ep.Address)
require.NoError(t, err)
require.NotNil(t, l3)
defer l3.Close()
l4, err := listenConfig.Listen(context.Background(), "tcp", l3.Addr().String())
require.NoError(t, err)
require.NotNil(t, l4)
defer l4.Close()
_, l3Port, err := net.SplitHostPort(l3.Addr().String())
require.NoError(t, err)
l5, err := listenConfig.Listen(context.Background(), "tcp", "127.0.0.1:"+l3Port)
require.NoError(t, err)
require.NotNil(t, l5)
defer l5.Close()
l6, err := listenConfig.Listen(context.Background(), "tcp", l1.Addr().String())
require.Error(t, err)
require.ErrorContains(t, err, "address already in use")
require.Nil(t, l6)
}

View file

@ -460,7 +460,8 @@ func buildProxyProtocolListener(ctx context.Context, entryPoint *static.EntryPoi
}
func buildListener(ctx context.Context, entryPoint *static.EntryPoint) (net.Listener, error) {
listener, err := net.Listen("tcp", entryPoint.GetAddress())
listenConfig := newListenConfig(entryPoint)
listener, err := listenConfig.Listen(ctx, "tcp", entryPoint.GetAddress())
if err != nil {
return nil, fmt.Errorf("error opening listener: %w", err)
}

View file

@ -33,7 +33,8 @@ func newHTTP3Server(ctx context.Context, configuration *static.EntryPoint, https
return nil, errors.New("advertised port must be greater than or equal to zero")
}
conn, err := net.ListenPacket("udp", configuration.GetAddress())
listenConfig := newListenConfig(configuration)
conn, err := listenConfig.ListenPacket(ctx, "udp", configuration.GetAddress())
if err != nil {
return nil, fmt.Errorf("starting listener: %w", err)
}

View file

@ -3,7 +3,6 @@ package server
import (
"context"
"fmt"
"net"
"sync"
"time"
@ -87,12 +86,8 @@ type UDPEntryPoint struct {
// NewUDPEntryPoint returns a UDP entry point.
func NewUDPEntryPoint(cfg *static.EntryPoint) (*UDPEntryPoint, error) {
addr, err := net.ResolveUDPAddr("udp", cfg.GetAddress())
if err != nil {
return nil, err
}
listener, err := udp.Listen("udp", addr, time.Duration(cfg.UDP.Timeout))
listenConfig := newListenConfig(cfg)
listener, err := udp.Listen(listenConfig, "udp", cfg.GetAddress(), time.Duration(cfg.UDP.Timeout))
if err != nil {
return nil, err
}

View file

@ -1,7 +1,9 @@
package udp
import (
"context"
"errors"
"fmt"
"io"
"net"
"sync"
@ -33,18 +35,22 @@ type Listener struct {
}
// Listen creates a new listener.
func Listen(network string, laddr *net.UDPAddr, timeout time.Duration) (*Listener, error) {
func Listen(listenConfig net.ListenConfig, network, address string, timeout time.Duration) (*Listener, error) {
if timeout <= 0 {
return nil, errors.New("timeout should be greater than zero")
}
conn, err := net.ListenUDP(network, laddr)
packetConn, err := listenConfig.ListenPacket(context.Background(), network, address)
if err != nil {
return nil, err
return nil, fmt.Errorf("listen packet: %w", err)
}
pConn, ok := packetConn.(*net.UDPConn)
if !ok {
return nil, errors.New("packet conn is not an UDPConn")
}
l := &Listener{
pConn: conn,
pConn: pConn,
acceptCh: make(chan *Conn),
conns: make(map[string]*Conn),
accepting: true,

View file

@ -14,10 +14,7 @@ import (
)
func TestConsecutiveWrites(t *testing.T) {
addr, err := net.ResolveUDPAddr("udp", ":0")
require.NoError(t, err)
ln, err := Listen("udp", addr, 3*time.Second)
ln, err := Listen(net.ListenConfig{}, "udp", ":0", 3*time.Second)
require.NoError(t, err)
defer func() {
err := ln.Close()
@ -75,11 +72,7 @@ func TestConsecutiveWrites(t *testing.T) {
}
func TestListenNotBlocking(t *testing.T) {
addr, err := net.ResolveUDPAddr("udp", ":0")
require.NoError(t, err)
ln, err := Listen("udp", addr, 3*time.Second)
ln, err := Listen(net.ListenConfig{}, "udp", ":0", 3*time.Second)
require.NoError(t, err)
defer func() {
err := ln.Close()
@ -165,10 +158,7 @@ func TestListenNotBlocking(t *testing.T) {
}
func TestListenWithZeroTimeout(t *testing.T) {
addr, err := net.ResolveUDPAddr("udp", ":0")
require.NoError(t, err)
_, err = Listen("udp", addr, 0)
_, err := Listen(net.ListenConfig{}, "udp", ":0", 0)
assert.Error(t, err)
}
@ -183,10 +173,7 @@ func TestTimeoutWithoutRead(t *testing.T) {
func testTimeout(t *testing.T, withRead bool) {
t.Helper()
addr, err := net.ResolveUDPAddr("udp", ":0")
require.NoError(t, err)
ln, err := Listen("udp", addr, 3*time.Second)
ln, err := Listen(net.ListenConfig{}, "udp", ":0", 3*time.Second)
require.NoError(t, err)
defer func() {
err := ln.Close()
@ -227,10 +214,7 @@ func testTimeout(t *testing.T, withRead bool) {
}
func TestShutdown(t *testing.T) {
addr, err := net.ResolveUDPAddr("udp", ":0")
require.NoError(t, err)
l, err := Listen("udp", addr, 3*time.Second)
l, err := Listen(net.ListenConfig{}, "udp", ":0", 3*time.Second)
require.NoError(t, err)
go func() {
@ -331,10 +315,7 @@ func TestReadLoopMaxDataSize(t *testing.T) {
doneCh := make(chan struct{})
addr, err := net.ResolveUDPAddr("udp", ":0")
require.NoError(t, err)
l, err := Listen("udp", addr, 3*time.Second)
l, err := Listen(net.ListenConfig{}, "udp", ":0", 3*time.Second)
require.NoError(t, err)
defer func() {

View file

@ -96,10 +96,7 @@ func TestProxy_ServeUDP_MaxDataSize(t *testing.T) {
func newServer(t *testing.T, addr string, handler Handler) {
t.Helper()
addrL, err := net.ResolveUDPAddr("udp", addr)
require.NoError(t, err)
listener, err := Listen("udp", addrL, 3*time.Second)
listener, err := Listen(net.ListenConfig{}, "udp", addr, 3*time.Second)
require.NoError(t, err)
for {