1
0
Fork 0

added support for tcp proxyProtocol v1&v2 to backend

This commit is contained in:
Matthias Schneider 2020-11-17 13:04:04 +01:00 committed by GitHub
parent 520fcf82ae
commit 84b125bdde
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
28 changed files with 388 additions and 83 deletions

View file

@ -72,8 +72,9 @@ type TCPServersLoadBalancer struct {
// connection, to close the reading capability as well, hence fully terminating the
// connection. It is a duration in milliseconds, defaulting to 100. A negative value
// means an infinite deadline (i.e. the reading capability is never closed).
TerminationDelay *int `json:"terminationDelay,omitempty" toml:"terminationDelay,omitempty" yaml:"terminationDelay,omitempty"`
Servers []TCPServer `json:"servers,omitempty" toml:"servers,omitempty" yaml:"servers,omitempty" label-slice-as-struct:"server"`
TerminationDelay *int `json:"terminationDelay,omitempty" toml:"terminationDelay,omitempty" yaml:"terminationDelay,omitempty"`
ProxyProtocol *ProxyProtocol `json:"proxyProtocol,omitempty" toml:"proxyProtocol,omitempty" yaml:"proxyProtocol,omitempty" label:"allowEmpty" file:"allowEmpty"`
Servers []TCPServer `json:"servers,omitempty" toml:"servers,omitempty" yaml:"servers,omitempty" label-slice-as-struct:"server"`
}
// SetDefaults Default values for a TCPServersLoadBalancer.
@ -106,3 +107,15 @@ type TCPServer struct {
Address string `json:"address,omitempty" toml:"address,omitempty" yaml:"address,omitempty" label:"-"`
Port string `toml:"-" json:"-" yaml:"-"`
}
// +k8s:deepcopy-gen=true
// ProxyProtocol holds the ProxyProtocol configuration.
type ProxyProtocol struct {
Version int `json:"version,omitempty" toml:"version,omitempty" yaml:"version,omitempty"`
}
// SetDefaults Default values for a ProxyProtocol.
func (p *ProxyProtocol) SetDefaults() {
p.Version = 2
}

View file

@ -880,6 +880,22 @@ func (in *PassTLSClientCert) DeepCopy() *PassTLSClientCert {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ProxyProtocol) DeepCopyInto(out *ProxyProtocol) {
*out = *in
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ProxyProtocol.
func (in *ProxyProtocol) DeepCopy() *ProxyProtocol {
if in == nil {
return nil
}
out := new(ProxyProtocol)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *RateLimit) DeepCopyInto(out *RateLimit) {
*out = *in
@ -1373,6 +1389,11 @@ func (in *TCPServersLoadBalancer) DeepCopyInto(out *TCPServersLoadBalancer) {
*out = new(int)
**out = **in
}
if in.ProxyProtocol != nil {
in, out := &in.ProxyProtocol, &out.ProxyProtocol
*out = new(ProxyProtocol)
**out = **in
}
if in.Servers != nil {
in, out := &in.Servers, &out.Servers
*out = make([]TCPServer, len(*in))

View file

@ -182,8 +182,10 @@ func TestDecodeConfiguration(t *testing.T) {
"traefik.tcp.routers.Router1.tls.passthrough": "false",
"traefik.tcp.services.Service0.loadbalancer.server.Port": "42",
"traefik.tcp.services.Service0.loadbalancer.TerminationDelay": "42",
"traefik.tcp.services.Service0.loadbalancer.proxyProtocol.version": "42",
"traefik.tcp.services.Service1.loadbalancer.server.Port": "42",
"traefik.tcp.services.Service1.loadbalancer.TerminationDelay": "42",
"traefik.tcp.services.Service1.loadbalancer.proxyProtocol": "true",
"traefik.udp.routers.Router0.entrypoints": "foobar, fiibar",
"traefik.udp.routers.Router0.service": "foobar",
@ -233,6 +235,7 @@ func TestDecodeConfiguration(t *testing.T) {
},
},
TerminationDelay: func(i int) *int { return &i }(42),
ProxyProtocol: &dynamic.ProxyProtocol{Version: 42},
},
},
"Service1": {
@ -243,6 +246,7 @@ func TestDecodeConfiguration(t *testing.T) {
},
},
TerminationDelay: func(i int) *int { return &i }(42),
ProxyProtocol: &dynamic.ProxyProtocol{Version: 2},
},
},
},

View file

@ -0,0 +1,17 @@
apiVersion: traefik.containo.us/v1alpha1
kind: IngressRouteTCP
metadata:
name: test.route
namespace: default
spec:
entryPoints:
- foo
routes:
- match: HostSNI(`foo.com`)
services:
- name: whoamitcp
port: 8000
proxyProtocol:
version: 2

View file

@ -140,6 +140,15 @@ func createLoadBalancerServerTCP(client Client, namespace string, service v1alph
},
}
if service.ProxyProtocol != nil {
tcpService.LoadBalancer.ProxyProtocol = &dynamic.ProxyProtocol{}
tcpService.LoadBalancer.ProxyProtocol.SetDefaults()
if service.ProxyProtocol.Version != 0 {
tcpService.LoadBalancer.ProxyProtocol.Version = service.ProxyProtocol.Version
}
}
if service.TerminationDelay != nil {
tcpService.LoadBalancer.TerminationDelay = service.TerminationDelay
}

View file

@ -2878,6 +2878,49 @@ func TestLoadIngressRoutes(t *testing.T) {
TLS: &dynamic.TLSConfiguration{},
},
},
{
desc: "TCP with proxyProtocol Version",
paths: []string{"tcp/services.yml", "tcp/with_proxyprotocol.yml"},
expected: &dynamic.Configuration{
TLS: &dynamic.TLSConfiguration{},
UDP: &dynamic.UDPConfiguration{
Routers: map[string]*dynamic.UDPRouter{},
Services: map[string]*dynamic.UDPService{},
},
TCP: &dynamic.TCPConfiguration{
Routers: map[string]*dynamic.TCPRouter{
"default-test.route-fdd3e9338e47a45efefc": {
EntryPoints: []string{"foo"},
Service: "default-test.route-fdd3e9338e47a45efefc",
Rule: "HostSNI(`foo.com`)",
},
},
Services: map[string]*dynamic.TCPService{
"default-test.route-fdd3e9338e47a45efefc": {
LoadBalancer: &dynamic.TCPServersLoadBalancer{
Servers: []dynamic.TCPServer{
{
Address: "10.10.0.1:8000",
Port: "",
},
{
Address: "10.10.0.2:8000",
Port: "",
},
},
ProxyProtocol: &dynamic.ProxyProtocol{Version: 2},
},
},
},
},
HTTP: &dynamic.HTTPConfiguration{
ServersTransports: map[string]*dynamic.ServersTransport{},
Routers: map[string]*dynamic.Router{},
Middlewares: map[string]*dynamic.Middleware{},
Services: map[string]*dynamic.Service{},
},
},
},
{
desc: "TLS with tls store",
paths: []string{"services.yml", "with_tls_store.yml"},

View file

@ -1,6 +1,7 @@
package v1alpha1
import (
"github.com/traefik/traefik/v2/pkg/config/dynamic"
"github.com/traefik/traefik/v2/pkg/types"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
@ -53,11 +54,12 @@ type TLSStoreTCPRef struct {
// ServiceTCP defines an upstream to proxy traffic.
type ServiceTCP struct {
Name string `json:"name"`
Namespace string `json:"namespace"`
Port int32 `json:"port"`
Weight *int `json:"weight,omitempty"`
TerminationDelay *int `json:"terminationDelay,omitempty"`
Name string `json:"name"`
Namespace string `json:"namespace"`
Port int32 `json:"port"`
Weight *int `json:"weight,omitempty"`
TerminationDelay *int `json:"terminationDelay,omitempty"`
ProxyProtocol *dynamic.ProxyProtocol `json:"proxyProtocol,omitempty"`
}
// +genclient

View file

@ -1011,6 +1011,11 @@ func (in *ServiceTCP) DeepCopyInto(out *ServiceTCP) {
*out = new(int)
**out = **in
}
if in.ProxyProtocol != nil {
in, out := &in.ProxyProtocol, &out.ProxyProtocol
*out = new(dynamic.ProxyProtocol)
**out = **in
}
return
}

View file

@ -11,7 +11,7 @@ import (
"syscall"
"time"
proxyprotocol "github.com/c0va23/go-proxyprotocol"
"github.com/pires/go-proxyproto"
"github.com/sirupsen/logrus"
"github.com/traefik/traefik/v2/pkg/config/static"
"github.com/traefik/traefik/v2/pkg/ip"
@ -316,10 +316,10 @@ func (c *writeCloserWrapper) CloseWrite() error {
// implementation, if any was found within the underlying conn.
func writeCloser(conn net.Conn) (tcp.WriteCloser, error) {
switch typedConn := conn.(type) {
case *proxyprotocol.Conn:
underlying, err := writeCloser(typedConn.Conn)
if err != nil {
return nil, err
case *proxyproto.Conn:
underlying, ok := typedConn.TCPConn()
if !ok {
return nil, fmt.Errorf("underlying connection is not a tcp connection")
}
return &writeCloserWrapper{writeCloser: underlying, Conn: typedConn}, nil
case *net.TCPConn:
@ -356,42 +356,35 @@ func (ln tcpKeepAliveListener) Accept() (net.Conn, error) {
return tc, nil
}
type proxyProtocolLogger struct {
log.Logger
}
// Printf force log level to debug.
func (p proxyProtocolLogger) Printf(format string, v ...interface{}) {
p.Debugf(format, v...)
}
func buildProxyProtocolListener(ctx context.Context, entryPoint *static.EntryPoint, listener net.Listener) (net.Listener, error) {
var sourceCheck func(addr net.Addr) (bool, error)
proxyListener := &proxyproto.Listener{Listener: listener}
if entryPoint.ProxyProtocol.Insecure {
sourceCheck = func(_ net.Addr) (bool, error) {
return true, nil
}
} else {
checker, err := ip.NewChecker(entryPoint.ProxyProtocol.TrustedIPs)
if err != nil {
return nil, err
log.FromContext(ctx).Infof("Enabling ProxyProtocol without trusted IPs: Insecure")
return proxyListener, nil
}
checker, err := ip.NewChecker(entryPoint.ProxyProtocol.TrustedIPs)
if err != nil {
return nil, err
}
proxyListener.Policy = func(upstream net.Addr) (proxyproto.Policy, error) {
ipAddr, ok := upstream.(*net.TCPAddr)
if !ok {
return proxyproto.REJECT, fmt.Errorf("type error %v", upstream)
}
sourceCheck = func(addr net.Addr) (bool, error) {
ipAddr, ok := addr.(*net.TCPAddr)
if !ok {
return false, fmt.Errorf("type error %v", addr)
}
return checker.ContainsIP(ipAddr.IP), nil
if !checker.ContainsIP(ipAddr.IP) {
log.FromContext(ctx).Debugf("IP %s is not in trusted IPs list, ignoring ProxyProtocol Headers and bypass connection", ipAddr.IP)
return proxyproto.IGNORE, nil
}
return proxyproto.USE, nil
}
log.FromContext(ctx).Infof("Enabling ProxyProtocol for trusted IPs %v", entryPoint.ProxyProtocol.TrustedIPs)
return proxyprotocol.NewDefaultListener(listener).
WithSourceChecker(sourceCheck).
WithLogger(proxyProtocolLogger{Logger: log.FromContext(ctx)}), nil
return proxyListener, nil
}
func buildListener(ctx context.Context, entryPoint *static.EntryPoint) (net.Listener, error) {

View file

@ -59,7 +59,7 @@ func (m *Manager) BuildTCP(rootCtx context.Context, serviceName string) (tcp.Han
continue
}
handler, err := tcp.NewProxy(server.Address, duration)
handler, err := tcp.NewProxy(server.Address, duration, conf.LoadBalancer.ProxyProtocol)
if err != nil {
logger.Errorf("In service %q server %q: %v", serviceQualifiedName, server.Address, err)
continue

View file

@ -1,10 +1,13 @@
package tcp
import (
"fmt"
"io"
"net"
"time"
"github.com/pires/go-proxyproto"
"github.com/traefik/traefik/v2/pkg/config/dynamic"
"github.com/traefik/traefik/v2/pkg/log"
)
@ -12,16 +15,21 @@ import (
type Proxy struct {
target *net.TCPAddr
terminationDelay time.Duration
proxyProtocol *dynamic.ProxyProtocol
}
// NewProxy creates a new Proxy.
func NewProxy(address string, terminationDelay time.Duration) (*Proxy, error) {
func NewProxy(address string, terminationDelay time.Duration, proxyProtocol *dynamic.ProxyProtocol) (*Proxy, error) {
tcpAddr, err := net.ResolveTCPAddr("tcp", address)
if err != nil {
return nil, err
}
return &Proxy{target: tcpAddr, terminationDelay: terminationDelay}, nil
if proxyProtocol != nil && (proxyProtocol.Version < 1 || proxyProtocol.Version > 2) {
return nil, fmt.Errorf("unknown proxyProtocol version: %d", proxyProtocol.Version)
}
return &Proxy{target: tcpAddr, terminationDelay: terminationDelay, proxyProtocol: proxyProtocol}, nil
}
// ServeTCP forwards the connection to a service.
@ -39,8 +47,16 @@ func (p *Proxy) ServeTCP(conn WriteCloser) {
// maybe not needed, but just in case
defer connBackend.Close()
errChan := make(chan error)
if p.proxyProtocol != nil && p.proxyProtocol.Version > 0 && p.proxyProtocol.Version < 3 {
header := proxyproto.HeaderProxyFromAddrs(byte(p.proxyProtocol.Version), conn.RemoteAddr(), conn.LocalAddr())
if _, err := header.WriteTo(connBackend); err != nil {
log.WithoutContext().Errorf("Error while writing proxy protocol headers to backend connection: %v", err)
return
}
}
go p.connCopy(conn, connBackend, errChan)
go p.connCopy(connBackend, conn, errChan)

View file

@ -2,13 +2,17 @@ package tcp
import (
"bytes"
"errors"
"fmt"
"io"
"net"
"testing"
"time"
"github.com/pires/go-proxyproto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/traefik/traefik/v2/pkg/config/dynamic"
)
func fakeRedis(t *testing.T, listener net.Listener) {
@ -16,6 +20,7 @@ func fakeRedis(t *testing.T, listener net.Listener) {
conn, err := listener.Accept()
fmt.Println("Accept on server")
require.NoError(t, err)
for {
withErr := false
buf := make([]byte, 64)
@ -26,12 +31,13 @@ func fakeRedis(t *testing.T, listener net.Listener) {
if string(buf[:4]) == "ping" {
time.Sleep(1 * time.Millisecond)
if _, err := conn.Write([]byte("PONG")); err != nil {
conn.Close()
_ = conn.Close()
return
}
}
if withErr {
conn.Close()
_ = conn.Close()
return
}
}
@ -46,7 +52,7 @@ func TestCloseWrite(t *testing.T) {
_, port, err := net.SplitHostPort(backendListener.Addr().String())
require.NoError(t, err)
proxy, err := NewProxy(":"+port, 10*time.Millisecond)
proxy, err := NewProxy(":"+port, 10*time.Millisecond, nil)
require.NoError(t, err)
proxyListener, err := net.Listen("tcp", ":0")
@ -79,3 +85,87 @@ func TestCloseWrite(t *testing.T) {
require.Equal(t, int64(4), n)
require.Equal(t, "PONG", buffer.String())
}
func TestProxyProtocol(t *testing.T) {
testCases := []struct {
desc string
version int
}{
{
desc: "PROXY protocol v1",
version: 1,
},
{
desc: "PROXY protocol v2",
version: 2,
},
}
for _, test := range testCases {
test := test
t.Run(test.desc, func(t *testing.T) {
backendListener, err := net.Listen("tcp", ":0")
require.NoError(t, err)
var version int
proxyBackendListener := proxyproto.Listener{
Listener: backendListener,
ValidateHeader: func(h *proxyproto.Header) error {
version = int(h.Version)
return nil
},
Policy: func(upstream net.Addr) (proxyproto.Policy, error) {
switch test.version {
case 1, 2:
return proxyproto.USE, nil
default:
return proxyproto.REQUIRE, errors.New("unsupported version")
}
},
}
defer proxyBackendListener.Close()
go fakeRedis(t, &proxyBackendListener)
_, port, err := net.SplitHostPort(proxyBackendListener.Addr().String())
require.NoError(t, err)
proxy, err := NewProxy(":"+port, 10*time.Millisecond, &dynamic.ProxyProtocol{Version: test.version})
require.NoError(t, err)
proxyListener, err := net.Listen("tcp", ":0")
require.NoError(t, err)
go func() {
for {
conn, err := proxyListener.Accept()
require.NoError(t, err)
proxy.ServeTCP(conn.(*net.TCPConn))
}
}()
_, port, err = net.SplitHostPort(proxyListener.Addr().String())
require.NoError(t, err)
conn, err := net.Dial("tcp", ":"+port)
require.NoError(t, err)
_, err = conn.Write([]byte("ping\n"))
require.NoError(t, err)
err = conn.(*net.TCPConn).CloseWrite()
require.NoError(t, err)
var buf []byte
buffer := bytes.NewBuffer(buf)
n, err := io.Copy(buffer, conn)
require.NoError(t, err)
assert.Equal(t, int64(4), n)
assert.Equal(t, "PONG", buffer.String())
assert.Equal(t, test.version, version)
})
}
}