1
0
Fork 0

Merge branch v3.5 into master

This commit is contained in:
romain 2025-09-09 17:47:13 +02:00
commit c09d3fb03c
243 changed files with 6720 additions and 4386 deletions

View file

@ -156,7 +156,7 @@ func extractType(element interface{}) string {
for i := range v.NumField() {
field := v.Field(i)
if field.Kind() == reflect.Map && field.Type().Elem() == reflect.TypeOf(dynamic.PluginConf{}) {
if field.Kind() == reflect.Map && field.Type().Elem() == reflect.TypeFor[dynamic.PluginConf]() {
if keys := field.MapKeys(); len(keys) == 1 {
return keys[0].String()
}

View file

@ -232,7 +232,7 @@ func getProviders(conf static.Configuration) []string {
if !field.IsNil() {
providers = append(providers, v.Type().Field(i).Name)
}
} else if field.Kind() == reflect.Map && field.Type().Elem() == reflect.TypeOf(static.PluginConf{}) {
} else if field.Kind() == reflect.Map && field.Type().Elem() == reflect.TypeFor[static.PluginConf]() {
for _, value := range field.MapKeys() {
providers = append(providers, "plugin-"+value.String())
}

View file

@ -55,7 +55,7 @@ func fill(field reflect.Value) error {
setTyped(field, int32(defaultNumber))
case reflect.Int64:
switch field.Type() {
case reflect.TypeOf(types.Duration(time.Second)):
case reflect.TypeFor[types.Duration]():
setTyped(field, types.Duration(defaultNumber*time.Second))
default:
setTyped(field, int64(defaultNumber))

View file

@ -84,10 +84,12 @@ type RouterTCPTLSConfig struct {
// TCPServersLoadBalancer holds the LoadBalancerService configuration.
type TCPServersLoadBalancer struct {
ProxyProtocol *ProxyProtocol `json:"proxyProtocol,omitempty" toml:"proxyProtocol,omitempty" yaml:"proxyProtocol,omitempty" label:"allowEmpty" file:"allowEmpty" kv:"allowEmpty" export:"true"`
Servers []TCPServer `json:"servers,omitempty" toml:"servers,omitempty" yaml:"servers,omitempty" label-slice-as-struct:"server" export:"true"`
ServersTransport string `json:"serversTransport,omitempty" toml:"serversTransport,omitempty" yaml:"serversTransport,omitempty" export:"true"`
Servers []TCPServer `json:"servers,omitempty" toml:"servers,omitempty" yaml:"servers,omitempty" label-slice-as-struct:"server" export:"true"`
ServersTransport string `json:"serversTransport,omitempty" toml:"serversTransport,omitempty" yaml:"serversTransport,omitempty" export:"true"`
// ProxyProtocol holds the PROXY Protocol configuration.
// Deprecated: use ServersTransport to configure ProxyProtocol instead.
ProxyProtocol *ProxyProtocol `json:"proxyProtocol,omitempty" toml:"proxyProtocol,omitempty" yaml:"proxyProtocol,omitempty" label:"allowEmpty" file:"allowEmpty" kv:"allowEmpty" export:"true"`
// TerminationDelay, corresponds to the deadline that the proxy sets, after one
// of its connected peers indicates it has closed the writing capability of its
// connection, to close the reading capability as well, hence fully terminating the
@ -131,7 +133,7 @@ type ProxyProtocol struct {
// Version defines the PROXY Protocol version to use.
// +kubebuilder:validation:Minimum=1
// +kubebuilder:validation:Maximum=2
Version int `json:"version,omitempty" toml:"version,omitempty" yaml:"version,omitempty" export:"true"`
Version int `description:"Defines the PROXY Protocol version to use." json:"version,omitempty" toml:"version,omitempty" yaml:"version,omitempty" export:"true"`
}
// SetDefaults Default values for a ProxyProtocol.
@ -145,6 +147,8 @@ func (p *ProxyProtocol) SetDefaults() {
type TCPServersTransport struct {
DialKeepAlive ptypes.Duration `description:"Defines the interval between keep-alive probes for an active network connection. If zero, keep-alive probes are sent with a default value (currently 15 seconds), if supported by the protocol and operating system. Network protocols or operating systems that do not support keep-alives ignore this field. If negative, keep-alive probes are disabled" json:"dialKeepAlive,omitempty" toml:"dialKeepAlive,omitempty" yaml:"dialKeepAlive,omitempty" export:"true"`
DialTimeout ptypes.Duration `description:"Defines the amount of time to wait until a connection to a backend server can be established. If zero, no timeout exists." json:"dialTimeout,omitempty" toml:"dialTimeout,omitempty" yaml:"dialTimeout,omitempty" export:"true"`
// ProxyProtocol holds the PROXY Protocol configuration.
ProxyProtocol *ProxyProtocol `description:"Defines the PROXY Protocol configuration." json:"proxyProtocol,omitempty" toml:"proxyProtocol,omitempty" yaml:"proxyProtocol,omitempty" label:"allowEmpty" file:"allowEmpty" kv:"allowEmpty" export:"true"`
// TerminationDelay, corresponds to the deadline that the proxy sets, after one
// of its connected peers indicates it has closed the writing capability of its
// connection, to close the reading capability as well, hence fully terminating the
@ -154,6 +158,13 @@ type TCPServersTransport struct {
TLS *TLSClientConfig `description:"Defines the TLS configuration." json:"tls,omitempty" toml:"tls,omitempty" yaml:"tls,omitempty" label:"allowEmpty" file:"allowEmpty" kv:"allowEmpty" export:"true"`
}
// SetDefaults sets the default values for a TCPServersTransport.
func (t *TCPServersTransport) SetDefaults() {
t.DialTimeout = ptypes.Duration(30 * time.Second)
t.DialKeepAlive = ptypes.Duration(15 * time.Second)
t.TerminationDelay = ptypes.Duration(100 * time.Millisecond)
}
// +k8s:deepcopy-gen=true
// TLSClientConfig options to configure TLS communication between Traefik and the servers.
@ -165,10 +176,3 @@ type TLSClientConfig struct {
PeerCertURI string `description:"Defines the URI used to match against SAN URI during the peer certificate verification." json:"peerCertURI,omitempty" toml:"peerCertURI,omitempty" yaml:"peerCertURI,omitempty" export:"true"`
Spiffe *Spiffe `description:"Defines the SPIFFE TLS configuration." json:"spiffe,omitempty" toml:"spiffe,omitempty" yaml:"spiffe,omitempty" label:"allowEmpty" file:"allowEmpty" export:"true"`
}
// SetDefaults sets the default values for a TCPServersTransport.
func (t *TCPServersTransport) SetDefaults() {
t.DialTimeout = ptypes.Duration(30 * time.Second)
t.DialKeepAlive = ptypes.Duration(15 * time.Second)
t.TerminationDelay = ptypes.Duration(100 * time.Millisecond)
}

View file

@ -2004,16 +2004,16 @@ func (in *TCPServer) DeepCopy() *TCPServer {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *TCPServersLoadBalancer) DeepCopyInto(out *TCPServersLoadBalancer) {
*out = *in
if in.ProxyProtocol != nil {
in, out := &in.ProxyProtocol, &out.ProxyProtocol
*out = new(ProxyProtocol)
**out = **in
}
if in.Servers != nil {
in, out := &in.Servers, &out.Servers
*out = make([]TCPServer, len(*in))
copy(*out, *in)
}
if in.ProxyProtocol != nil {
in, out := &in.ProxyProtocol, &out.ProxyProtocol
*out = new(ProxyProtocol)
**out = **in
}
if in.TerminationDelay != nil {
in, out := &in.TerminationDelay, &out.TerminationDelay
*out = new(int)
@ -2035,6 +2035,11 @@ func (in *TCPServersLoadBalancer) DeepCopy() *TCPServersLoadBalancer {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *TCPServersTransport) DeepCopyInto(out *TCPServersTransport) {
*out = *in
if in.ProxyProtocol != nil {
in, out := &in.ProxyProtocol, &out.ProxyProtocol
*out = new(ProxyProtocol)
**out = **in
}
if in.TLS != nil {
in, out := &in.TLS, &out.TLS
*out = new(TLSClientConfig)

View file

@ -357,6 +357,25 @@ func (c *Configuration) SetEffectiveConfiguration() {
}
}
for _, resolver := range c.CertificatesResolvers {
if resolver.ACME == nil {
continue
}
if resolver.ACME.DNSChallenge == nil {
continue
}
switch resolver.ACME.DNSChallenge.Provider {
case "googledomains", "cloudxns", "brandit":
log.Warn().Msgf("%s DNS provider is deprecated.", resolver.ACME.DNSChallenge.Provider)
case "dnspod":
log.Warn().Msgf("%s provider is deprecated, please use 'tencentcloud' provider instead.", resolver.ACME.DNSChallenge.Provider)
case "azure":
log.Warn().Msgf("%s provider is deprecated, please use 'azuredns' provider instead.", resolver.ACME.DNSChallenge.Provider)
}
}
c.initACMEProvider()
}

View file

@ -37,6 +37,9 @@ const (
// CommonFormat is the common logging format (CLF).
CommonFormat string = "common"
// GenericCLFFormat is the generic CLF format.
GenericCLFFormat string = "genericCLF"
// JSONFormat is the JSON logging format.
JSONFormat string = "json"
)
@ -101,6 +104,8 @@ func NewHandler(ctx context.Context, config *types.AccessLog) (*Handler, error)
switch config.Format {
case CommonFormat:
formatter = new(CommonLogFormatter)
case GenericCLFFormat:
formatter = new(GenericCLFLogFormatter)
case JSONFormat:
formatter = new(logrus.JSONFormatter)
default:

View file

@ -52,6 +52,35 @@ func (f *CommonLogFormatter) Format(entry *logrus.Entry) ([]byte, error) {
return b.Bytes(), err
}
// GenericCLFLogFormatter provides formatting in the generic CLF log format.
type GenericCLFLogFormatter struct{}
// Format formats the log entry in the generic CLF log format.
func (f *GenericCLFLogFormatter) Format(entry *logrus.Entry) ([]byte, error) {
b := &bytes.Buffer{}
timestamp := defaultValue
if v, ok := entry.Data[StartUTC]; ok {
timestamp = v.(time.Time).Format(commonLogTimeFormat)
} else if v, ok := entry.Data[StartLocal]; ok {
timestamp = v.(time.Time).Local().Format(commonLogTimeFormat)
}
_, err := fmt.Fprintf(b, "%s - %s [%s] \"%s %s %s\" %v %v %s %s\n",
toLog(entry.Data, ClientHost, defaultValue, false),
toLog(entry.Data, ClientUsername, defaultValue, false),
timestamp,
toLog(entry.Data, RequestMethod, defaultValue, false),
toLog(entry.Data, RequestPath, defaultValue, false),
toLog(entry.Data, RequestProtocol, defaultValue, false),
toLog(entry.Data, DownstreamStatus, defaultValue, true),
toLog(entry.Data, DownstreamContentSize, defaultValue, true),
toLog(entry.Data, "request_Referer", `"-"`, true),
toLog(entry.Data, "request_User-Agent", `"-"`, true))
return b.Bytes(), err
}
func toLog(fields logrus.Fields, key, defaultValue string, quoted bool) interface{} {
if v, ok := fields[key]; ok {
if v == nil {

View file

@ -101,6 +101,97 @@ func TestCommonLogFormatter_Format(t *testing.T) {
}
}
func TestGenericCLFLogFormatter_Format(t *testing.T) {
clf := GenericCLFLogFormatter{}
testCases := []struct {
name string
data map[string]interface{}
expectedLog string
}{
{
name: "DownstreamStatus & DownstreamContentSize are nil",
data: map[string]interface{}{
StartUTC: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
Duration: 123 * time.Second,
ClientHost: "10.0.0.1",
ClientUsername: "Client",
RequestMethod: http.MethodGet,
RequestPath: "/foo",
RequestProtocol: "http",
DownstreamStatus: nil,
DownstreamContentSize: nil,
RequestRefererHeader: "",
RequestUserAgentHeader: "",
RequestCount: 0,
RouterName: "",
ServiceURL: "",
},
expectedLog: `10.0.0.1 - Client [10/Nov/2009:23:00:00 +0000] "GET /foo http" - - "-" "-"
`,
},
{
name: "all data",
data: map[string]interface{}{
StartUTC: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
Duration: 123 * time.Second,
ClientHost: "10.0.0.1",
ClientUsername: "Client",
RequestMethod: http.MethodGet,
RequestPath: "/foo",
RequestProtocol: "http",
DownstreamStatus: 123,
DownstreamContentSize: 132,
RequestRefererHeader: "referer",
RequestUserAgentHeader: "agent",
RequestCount: nil,
RouterName: "foo",
ServiceURL: "http://10.0.0.2/toto",
},
expectedLog: `10.0.0.1 - Client [10/Nov/2009:23:00:00 +0000] "GET /foo http" 123 132 "referer" "agent"
`,
},
{
name: "all data with local time",
data: map[string]interface{}{
StartLocal: time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC),
Duration: 123 * time.Second,
ClientHost: "10.0.0.1",
ClientUsername: "Client",
RequestMethod: http.MethodGet,
RequestPath: "/foo",
RequestProtocol: "http",
DownstreamStatus: 123,
DownstreamContentSize: 132,
RequestRefererHeader: "referer",
RequestUserAgentHeader: "agent",
RequestCount: nil,
RouterName: "foo",
ServiceURL: "http://10.0.0.2/toto",
},
expectedLog: `10.0.0.1 - Client [10/Nov/2009:14:00:00 -0900] "GET /foo http" 123 132 "referer" "agent"
`,
},
}
var err error
time.Local, err = time.LoadLocation("Etc/GMT+9")
require.NoError(t, err)
for _, test := range testCases {
t.Run(test.name, func(t *testing.T) {
t.Parallel()
entry := &logrus.Entry{Data: test.data}
raw, err := clf.Format(entry)
assert.NoError(t, err)
assert.Equal(t, test.expectedLog, string(raw))
})
}
}
func Test_toLog(t *testing.T) {
testCases := []struct {
desc string

View file

@ -68,7 +68,17 @@ func TestOTelAccessLogWithBody(t *testing.T) {
bodyCheckFn: func(t *testing.T, log string) {
t.Helper()
// For common format, verify the body contains the CLF formatted string
// For common format, verify the body contains the Traefik common log formatted string
assert.Regexp(t, `"body":{"stringValue":".*- /health -.*200.*[0-9]+ms.*"}`, log)
},
},
{
desc: "Generic CLF format with log body",
format: GenericCLFFormat,
bodyCheckFn: func(t *testing.T, log string) {
t.Helper()
// For generic CLF format, verify the body contains the CLF formatted string
assert.Regexp(t, `"body":{"stringValue":".*- /health -.*200.*"}`, log)
},
},
@ -375,7 +385,7 @@ func TestLoggerHeaderFields(t *testing.T) {
}
}
func TestLoggerCLF(t *testing.T) {
func TestCommonLogger(t *testing.T) {
logFilePath := filepath.Join(t.TempDir(), logFileNameSuffix)
config := &types.AccessLog{FilePath: logFilePath, Format: CommonFormat}
doLogging(t, config, false)
@ -384,10 +394,10 @@ func TestLoggerCLF(t *testing.T) {
require.NoError(t, err)
expectedLog := ` TestHost - TestUser [13/Apr/2016:07:14:19 -0700] "POST testpath HTTP/0.0" 123 12 "testReferer" "testUserAgent" 1 "testRouter" "http://127.0.0.1/testService" 1ms`
assertValidLogData(t, expectedLog, logData)
assertValidCommonLogData(t, expectedLog, logData)
}
func TestLoggerCLFWithBufferingSize(t *testing.T) {
func TestCommonLoggerWithBufferingSize(t *testing.T) {
logFilePath := filepath.Join(t.TempDir(), logFileNameSuffix)
config := &types.AccessLog{FilePath: logFilePath, Format: CommonFormat, BufferingSize: 1024}
doLogging(t, config, false)
@ -399,7 +409,34 @@ func TestLoggerCLFWithBufferingSize(t *testing.T) {
require.NoError(t, err)
expectedLog := ` TestHost - TestUser [13/Apr/2016:07:14:19 -0700] "POST testpath HTTP/0.0" 123 12 "testReferer" "testUserAgent" 1 "testRouter" "http://127.0.0.1/testService" 1ms`
assertValidLogData(t, expectedLog, logData)
assertValidCommonLogData(t, expectedLog, logData)
}
func TestLoggerGenericCLF(t *testing.T) {
logFilePath := filepath.Join(t.TempDir(), logFileNameSuffix)
config := &types.AccessLog{FilePath: logFilePath, Format: GenericCLFFormat}
doLogging(t, config, false)
logData, err := os.ReadFile(logFilePath)
require.NoError(t, err)
expectedLog := ` TestHost - TestUser [13/Apr/2016:07:14:19 -0700] "POST testpath HTTP/0.0" 123 12 "testReferer" "testUserAgent"`
assertValidGenericCLFLogData(t, expectedLog, logData)
}
func TestLoggerGenericCLFWithBufferingSize(t *testing.T) {
logFilePath := filepath.Join(t.TempDir(), logFileNameSuffix)
config := &types.AccessLog{FilePath: logFilePath, Format: GenericCLFFormat, BufferingSize: 1024}
doLogging(t, config, false)
// wait a bit for the buffer to be written in the file.
time.Sleep(50 * time.Millisecond)
logData, err := os.ReadFile(logFilePath)
require.NoError(t, err)
expectedLog := ` TestHost - TestUser [13/Apr/2016:07:14:19 -0700] "POST testpath HTTP/0.0" 123 12 "testReferer" "testUserAgent"`
assertValidGenericCLFLogData(t, expectedLog, logData)
}
func assertString(exp string) func(t *testing.T, actual interface{}) {
@ -963,12 +1000,12 @@ func TestNewLogHandlerOutputStdout(t *testing.T) {
written, err := os.ReadFile(file.Name())
require.NoError(t, err, "unable to read captured stdout from file")
assertValidLogData(t, test.expectedLog, written)
assertValidCommonLogData(t, test.expectedLog, written)
})
}
}
func assertValidLogData(t *testing.T, expected string, logData []byte) {
func assertValidCommonLogData(t *testing.T, expected string, logData []byte) {
t.Helper()
if len(expected) == 0 {
@ -1001,6 +1038,35 @@ func assertValidLogData(t *testing.T, expected string, logData []byte) {
assert.Regexp(t, `\d*ms`, result[Duration], formatErrMessage)
}
func assertValidGenericCLFLogData(t *testing.T, expected string, logData []byte) {
t.Helper()
if len(expected) == 0 {
assert.Empty(t, logData)
t.Log(string(logData))
return
}
result, err := ParseAccessLog(string(logData))
require.NoError(t, err)
resultExpected, err := ParseAccessLog(expected)
require.NoError(t, err)
formatErrMessage := fmt.Sprintf("Expected:\t%q\nActual:\t%q", expected, string(logData))
require.Len(t, result, len(resultExpected), formatErrMessage)
assert.Equal(t, resultExpected[ClientHost], result[ClientHost], formatErrMessage)
assert.Equal(t, resultExpected[ClientUsername], result[ClientUsername], formatErrMessage)
assert.Equal(t, resultExpected[RequestMethod], result[RequestMethod], formatErrMessage)
assert.Equal(t, resultExpected[RequestPath], result[RequestPath], formatErrMessage)
assert.Equal(t, resultExpected[RequestProtocol], result[RequestProtocol], formatErrMessage)
assert.Equal(t, resultExpected[OriginStatus], result[OriginStatus], formatErrMessage)
assert.Equal(t, resultExpected[OriginContentSize], result[OriginContentSize], formatErrMessage)
assert.Equal(t, resultExpected[RequestRefererHeader], result[RequestRefererHeader], formatErrMessage)
assert.Equal(t, resultExpected[RequestUserAgentHeader], result[RequestUserAgentHeader], formatErrMessage)
}
func captureStdout(t *testing.T) (out *os.File, restoreStdout func()) {
t.Helper()

View file

@ -122,11 +122,18 @@ func (c *customErrors) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
}
var query string
scheme := "http"
if req.TLS != nil {
scheme = "https"
}
orig := &url.URL{Scheme: scheme, Host: req.Host, Path: req.URL.Path, RawPath: req.URL.RawPath, RawQuery: req.URL.RawQuery, Fragment: req.URL.Fragment}
if len(c.backendQuery) > 0 {
query = "/" + strings.TrimPrefix(c.backendQuery, "/")
query = strings.ReplaceAll(query, "{status}", strconv.Itoa(code))
query = strings.ReplaceAll(query, "{originalStatus}", strconv.Itoa(originalCode))
query = strings.ReplaceAll(query, "{url}", url.QueryEscape(req.URL.String()))
query = strings.ReplaceAll(query, "{url}", url.QueryEscape(orig.String()))
}
pageReq, err := newRequest("http://" + req.Host + query)

View file

@ -175,6 +175,10 @@ func TestHandler(t *testing.T) {
req := testhelpers.MustNewRequest(http.MethodGet, "http://localhost/test?foo=bar&baz=buz", nil)
// Client like browser and curl will issue a relative HTTP request, which not have a host and scheme in the URL. But the http.NewRequest will set them automatically.
req.URL.Host = ""
req.URL.Scheme = ""
recorder := httptest.NewRecorder()
errorPageHandler.ServeHTTP(recorder, req)

View file

@ -55,7 +55,10 @@ func (e *entryPointTracing) ServeHTTP(rw http.ResponseWriter, req *http.Request)
tracingCtx := tracing.ExtractCarrierIntoContext(req.Context(), req.Header)
start := time.Now()
tracingCtx, span := e.tracer.Start(tracingCtx, "EntryPoint", trace.WithSpanKind(trace.SpanKindServer), trace.WithTimestamp(start))
// Follow semantic conventions defined by OTEL: https://opentelemetry.io/docs/specs/semconv/http/http-spans/#name
// At the moment this implementation only gets the {method} as there is no guarantee {target} is low cardinality.
tracingCtx, span := e.tracer.Start(tracingCtx, req.Method, trace.WithSpanKind(trace.SpanKindServer), trace.WithTimestamp(start))
// Associate the request context with the logger.
// This allows the logger to be aware of the tracing context and log accordingly (TraceID, SpanID, etc.).

View file

@ -6,6 +6,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/traefik/traefik/v3/pkg/tracing"
"go.opentelemetry.io/otel/attribute"
)
@ -19,13 +20,15 @@ func TestEntryPointMiddleware_tracing(t *testing.T) {
testCases := []struct {
desc string
entryPoint string
method string
expected expected
}{
{
desc: "basic test",
desc: "GET",
entryPoint: "test",
method: http.MethodGet,
expected: expected{
name: "EntryPoint",
name: "GET",
attributes: []attribute.KeyValue{
attribute.String("span.kind", "server"),
attribute.String("entry_point", "test"),
@ -47,11 +50,38 @@ func TestEntryPointMiddleware_tracing(t *testing.T) {
},
},
},
{
desc: "POST",
entryPoint: "test",
method: http.MethodPost,
expected: expected{
name: "POST",
attributes: []attribute.KeyValue{
attribute.String("span.kind", "server"),
attribute.String("entry_point", "test"),
attribute.String("http.request.method", "POST"),
attribute.String("network.protocol.version", "1.1"),
attribute.Int64("http.request.body.size", int64(0)),
attribute.String("url.path", "/search"),
attribute.String("url.query", "q=Opentelemetry&token=REDACTED"),
attribute.String("url.scheme", "http"),
attribute.String("user_agent.original", "entrypoint-test"),
attribute.String("server.address", "www.test.com"),
attribute.String("network.peer.address", "10.0.0.1"),
attribute.String("client.address", "10.0.0.1"),
attribute.Int64("client.port", int64(1234)),
attribute.Int64("network.peer.port", int64(1234)),
attribute.StringSlice("http.request.header.x-foo", []string{"foo", "bar"}),
attribute.Int64("http.response.status_code", int64(404)),
attribute.StringSlice("http.response.header.x-bar", []string{"foo", "bar"}),
},
},
},
}
for _, test := range testCases {
t.Run(test.desc, func(t *testing.T) {
req := httptest.NewRequest(http.MethodGet, "http://www.test.com/search?q=Opentelemetry&token=123", nil)
req := httptest.NewRequest(test.method, "http://www.test.com/search?q=Opentelemetry&token=123", nil)
rw := httptest.NewRecorder()
req.RemoteAddr = "10.0.0.1:1234"
req.Header.Set("User-Agent", "entrypoint-test")
@ -67,13 +97,17 @@ func TestEntryPointMiddleware_tracing(t *testing.T) {
tracer := &mockTracer{}
// Injection of the observability variables in the request context.
req = req.WithContext(WithObservability(req.Context(), Observability{
TracingEnabled: true,
}))
handler := newEntryPoint(t.Context(), tracing.NewTracer(tracer, []string{"X-Foo"}, []string{"X-Bar"}, []string{"q"}), test.entryPoint, next)
handler.ServeHTTP(rw, req)
for _, span := range tracer.spans {
assert.Equal(t, test.expected.name, span.name)
assert.Equal(t, test.expected.attributes, span.attributes)
}
require.Len(t, tracer.spans, 1)
assert.Equal(t, test.expected.name, tracer.spans[0].name)
assert.Equal(t, test.expected.attributes, tracer.spans[0].attributes)
})
}
}

View file

@ -37,7 +37,7 @@ func (t *mockTracer) Start(ctx context.Context, name string, opts ...trace.SpanS
return trace.ContextWithSpan(ctx, span), span
}
// mockSpan is an implementation of Span that preforms no operations.
// mockSpan is an implementation of Span that performs no operations.
type mockSpan struct {
embedded.Span

View file

@ -30,7 +30,7 @@ func TestNewRouter(t *testing.T) {
routerRule: "Path(`/`)",
expected: []expected{
{
name: "EntryPoint",
name: "GET",
attributes: []attribute.KeyValue{
attribute.String("span.kind", "server"),
},
@ -63,7 +63,7 @@ func TestNewRouter(t *testing.T) {
req.Header.Set("User-Agent", "router-test")
tracer := &mockTracer{}
tracingCtx, entryPointSpan := tracer.Start(req.Context(), "EntryPoint", trace.WithSpanKind(trace.SpanKindServer))
tracingCtx, entryPointSpan := tracer.Start(req.Context(), http.MethodGet, trace.WithSpanKind(trace.SpanKindServer))
defer entryPointSpan.End()
req = req.WithContext(tracingCtx)

View file

@ -26,7 +26,7 @@ func TestNewService(t *testing.T) {
service: "myService",
expected: []expected{
{
name: "EntryPoint",
name: "GET",
attributes: []attribute.KeyValue{
attribute.String("span.kind", "server"),
},
@ -57,7 +57,7 @@ func TestNewService(t *testing.T) {
req.Header.Set("User-Agent", "service-test")
tracer := &mockTracer{}
tracingCtx, entryPointSpan := tracer.Start(req.Context(), "EntryPoint", trace.WithSpanKind(trace.SpanKindServer))
tracingCtx, entryPointSpan := tracer.Start(req.Context(), http.MethodGet, trace.WithSpanKind(trace.SpanKindServer))
defer entryPointSpan.End()
req = req.WithContext(tracingCtx)

View file

@ -1,22 +1,21 @@
package docker
import (
dockertypes "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/api/types/swarm"
containertypes "github.com/docker/docker/api/types/container"
networktypes "github.com/docker/docker/api/types/network"
swarmtypes "github.com/docker/docker/api/types/swarm"
"github.com/docker/go-connections/nat"
)
func containerJSON(ops ...func(*dockertypes.ContainerJSON)) dockertypes.ContainerJSON {
c := &dockertypes.ContainerJSON{
ContainerJSONBase: &dockertypes.ContainerJSONBase{
func containerJSON(ops ...func(*containertypes.InspectResponse)) containertypes.InspectResponse {
c := &containertypes.InspectResponse{
ContainerJSONBase: &containertypes.ContainerJSONBase{
Name: "fake",
HostConfig: &container.HostConfig{},
HostConfig: &containertypes.HostConfig{},
},
Config: &container.Config{},
NetworkSettings: &dockertypes.NetworkSettings{
NetworkSettingsBase: dockertypes.NetworkSettingsBase{},
Config: &containertypes.Config{},
NetworkSettings: &containertypes.NetworkSettings{
NetworkSettingsBase: containertypes.NetworkSettingsBase{},
},
}
@ -27,58 +26,50 @@ func containerJSON(ops ...func(*dockertypes.ContainerJSON)) dockertypes.Containe
return *c
}
func name(name string) func(*dockertypes.ContainerJSON) {
return func(c *dockertypes.ContainerJSON) {
func name(name string) func(*containertypes.InspectResponse) {
return func(c *containertypes.InspectResponse) {
c.ContainerJSONBase.Name = name
}
}
func networkMode(mode string) func(*dockertypes.ContainerJSON) {
return func(c *dockertypes.ContainerJSON) {
c.ContainerJSONBase.HostConfig.NetworkMode = container.NetworkMode(mode)
func networkMode(mode string) func(*containertypes.InspectResponse) {
return func(c *containertypes.InspectResponse) {
c.ContainerJSONBase.HostConfig.NetworkMode = containertypes.NetworkMode(mode)
}
}
func nodeIP(ip string) func(*dockertypes.ContainerJSON) {
return func(c *dockertypes.ContainerJSON) {
c.ContainerJSONBase.Node = &dockertypes.ContainerNode{
IPAddress: ip,
}
}
}
func ports(portMap nat.PortMap) func(*dockertypes.ContainerJSON) {
return func(c *dockertypes.ContainerJSON) {
func ports(portMap nat.PortMap) func(*containertypes.InspectResponse) {
return func(c *containertypes.InspectResponse) {
c.NetworkSettings.NetworkSettingsBase.Ports = portMap
}
}
func withNetwork(name string, ops ...func(*network.EndpointSettings)) func(*dockertypes.ContainerJSON) {
return func(c *dockertypes.ContainerJSON) {
func withNetwork(name string, ops ...func(*networktypes.EndpointSettings)) func(*containertypes.InspectResponse) {
return func(c *containertypes.InspectResponse) {
if c.NetworkSettings.Networks == nil {
c.NetworkSettings.Networks = map[string]*network.EndpointSettings{}
c.NetworkSettings.Networks = map[string]*networktypes.EndpointSettings{}
}
c.NetworkSettings.Networks[name] = &network.EndpointSettings{}
c.NetworkSettings.Networks[name] = &networktypes.EndpointSettings{}
for _, op := range ops {
op(c.NetworkSettings.Networks[name])
}
}
}
func ipv4(ip string) func(*network.EndpointSettings) {
return func(s *network.EndpointSettings) {
func ipv4(ip string) func(*networktypes.EndpointSettings) {
return func(s *networktypes.EndpointSettings) {
s.IPAddress = ip
}
}
func ipv6(ip string) func(*network.EndpointSettings) {
return func(s *network.EndpointSettings) {
func ipv6(ip string) func(*networktypes.EndpointSettings) {
return func(s *networktypes.EndpointSettings) {
s.GlobalIPv6Address = ip
}
}
func swarmTask(id string, ops ...func(*swarm.Task)) swarm.Task {
task := &swarm.Task{
func swarmTask(id string, ops ...func(*swarmtypes.Task)) swarmtypes.Task {
task := &swarmtypes.Task{
ID: id,
}
@ -89,22 +80,28 @@ func swarmTask(id string, ops ...func(*swarm.Task)) swarm.Task {
return *task
}
func taskSlot(slot int) func(*swarm.Task) {
return func(task *swarm.Task) {
func taskSlot(slot int) func(*swarmtypes.Task) {
return func(task *swarmtypes.Task) {
task.Slot = slot
}
}
func taskNetworkAttachment(id, name, driver string, addresses []string) func(*swarm.Task) {
return func(task *swarm.Task) {
task.NetworksAttachments = append(task.NetworksAttachments, swarm.NetworkAttachment{
Network: swarm.Network{
func taskNodeID(id string) func(*swarmtypes.Task) {
return func(task *swarmtypes.Task) {
task.NodeID = id
}
}
func taskNetworkAttachment(id, name, driver string, addresses []string) func(*swarmtypes.Task) {
return func(task *swarmtypes.Task) {
task.NetworksAttachments = append(task.NetworksAttachments, swarmtypes.NetworkAttachment{
Network: swarmtypes.Network{
ID: id,
Spec: swarm.NetworkSpec{
Annotations: swarm.Annotations{
Spec: swarmtypes.NetworkSpec{
Annotations: swarmtypes.Annotations{
Name: name,
},
DriverConfiguration: &swarm.Driver{
DriverConfiguration: &swarmtypes.Driver{
Name: driver,
},
},
@ -114,9 +111,9 @@ func taskNetworkAttachment(id, name, driver string, addresses []string) func(*sw
}
}
func taskStatus(ops ...func(*swarm.TaskStatus)) func(*swarm.Task) {
return func(task *swarm.Task) {
status := &swarm.TaskStatus{}
func taskStatus(ops ...func(*swarmtypes.TaskStatus)) func(*swarmtypes.Task) {
return func(task *swarmtypes.Task) {
status := &swarmtypes.TaskStatus{}
for _, op := range ops {
op(status)
@ -126,25 +123,25 @@ func taskStatus(ops ...func(*swarm.TaskStatus)) func(*swarm.Task) {
}
}
func taskState(state swarm.TaskState) func(*swarm.TaskStatus) {
return func(status *swarm.TaskStatus) {
func taskState(state swarmtypes.TaskState) func(*swarmtypes.TaskStatus) {
return func(status *swarmtypes.TaskStatus) {
status.State = state
}
}
func taskContainerStatus(id string) func(*swarm.TaskStatus) {
return func(status *swarm.TaskStatus) {
status.ContainerStatus = &swarm.ContainerStatus{
func taskContainerStatus(id string) func(*swarmtypes.TaskStatus) {
return func(status *swarmtypes.TaskStatus) {
status.ContainerStatus = &swarmtypes.ContainerStatus{
ContainerID: id,
}
}
}
func swarmService(ops ...func(*swarm.Service)) swarm.Service {
service := &swarm.Service{
func swarmService(ops ...func(*swarmtypes.Service)) swarmtypes.Service {
service := &swarmtypes.Service{
ID: "serviceID",
Spec: swarm.ServiceSpec{
Annotations: swarm.Annotations{
Spec: swarmtypes.ServiceSpec{
Annotations: swarmtypes.Annotations{
Name: "defaultServiceName",
},
},
@ -157,21 +154,21 @@ func swarmService(ops ...func(*swarm.Service)) swarm.Service {
return *service
}
func serviceName(name string) func(service *swarm.Service) {
return func(service *swarm.Service) {
func serviceName(name string) func(service *swarmtypes.Service) {
return func(service *swarmtypes.Service) {
service.Spec.Annotations.Name = name
}
}
func serviceLabels(labels map[string]string) func(service *swarm.Service) {
return func(service *swarm.Service) {
func serviceLabels(labels map[string]string) func(service *swarmtypes.Service) {
return func(service *swarmtypes.Service) {
service.Spec.Annotations.Labels = labels
}
}
func withEndpoint(ops ...func(*swarm.Endpoint)) func(*swarm.Service) {
return func(service *swarm.Service) {
endpoint := &swarm.Endpoint{}
func withEndpoint(ops ...func(*swarmtypes.Endpoint)) func(*swarmtypes.Service) {
return func(service *swarmtypes.Service) {
endpoint := &swarmtypes.Endpoint{}
for _, op := range ops {
op(endpoint)
@ -181,21 +178,21 @@ func withEndpoint(ops ...func(*swarm.Endpoint)) func(*swarm.Service) {
}
}
func virtualIP(networkID, addr string) func(*swarm.Endpoint) {
return func(endpoint *swarm.Endpoint) {
func virtualIP(networkID, addr string) func(*swarmtypes.Endpoint) {
return func(endpoint *swarmtypes.Endpoint) {
if endpoint.VirtualIPs == nil {
endpoint.VirtualIPs = []swarm.EndpointVirtualIP{}
endpoint.VirtualIPs = []swarmtypes.EndpointVirtualIP{}
}
endpoint.VirtualIPs = append(endpoint.VirtualIPs, swarm.EndpointVirtualIP{
endpoint.VirtualIPs = append(endpoint.VirtualIPs, swarmtypes.EndpointVirtualIP{
NetworkID: networkID,
Addr: addr,
})
}
}
func withEndpointSpec(ops ...func(*swarm.EndpointSpec)) func(*swarm.Service) {
return func(service *swarm.Service) {
endpointSpec := &swarm.EndpointSpec{}
func withEndpointSpec(ops ...func(*swarmtypes.EndpointSpec)) func(*swarmtypes.Service) {
return func(service *swarmtypes.Service) {
endpointSpec := &swarmtypes.EndpointSpec{}
for _, op := range ops {
op(endpointSpec)
@ -205,10 +202,10 @@ func withEndpointSpec(ops ...func(*swarm.EndpointSpec)) func(*swarm.Service) {
}
}
func modeDNSRR(spec *swarm.EndpointSpec) {
spec.Mode = swarm.ResolutionModeDNSRR
func modeDNSRR(spec *swarmtypes.EndpointSpec) {
spec.Mode = swarmtypes.ResolutionModeDNSRR
}
func modeVIP(spec *swarm.EndpointSpec) {
spec.Mode = swarm.ResolutionModeVIP
func modeVIP(spec *swarmtypes.EndpointSpec) {
spec.Mode = swarmtypes.ResolutionModeVIP
}

View file

@ -7,7 +7,7 @@ import (
"net"
"strings"
dockertypes "github.com/docker/docker/api/types"
containertypes "github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/docker/go-connections/nat"
"github.com/rs/zerolog/log"
@ -114,7 +114,7 @@ func (p *DynConfBuilder) buildTCPServiceConfiguration(ctx context.Context, conta
}
}
if container.Health != "" && container.Health != dockertypes.Healthy {
if container.Health != "" && container.Health != containertypes.Healthy {
return nil
}
@ -138,7 +138,7 @@ func (p *DynConfBuilder) buildUDPServiceConfiguration(ctx context.Context, conta
}
}
if container.Health != "" && container.Health != dockertypes.Healthy {
if container.Health != "" && container.Health != containertypes.Healthy {
return nil
}
@ -164,7 +164,7 @@ func (p *DynConfBuilder) buildServiceConfiguration(ctx context.Context, containe
}
}
if container.Health != "" && container.Health != dockertypes.Healthy {
if container.Health != "" && container.Health != containertypes.Healthy {
return nil
}
@ -196,7 +196,7 @@ func (p *DynConfBuilder) keepContainer(ctx context.Context, container dockerData
return false
}
if !p.AllowEmptyServices && container.Health != "" && container.Health != dockertypes.Healthy {
if !p.AllowEmptyServices && container.Health != "" && container.Health != containertypes.Healthy {
logger.Debug().Msg("Filtering unhealthy or starting container")
return false
}
@ -344,8 +344,8 @@ func (p *DynConfBuilder) getIPAddress(ctx context.Context, container dockerData)
}
if container.NetworkSettings.NetworkMode.IsHost() {
if container.Node != nil && container.Node.IPAddress != "" {
return container.Node.IPAddress
if container.NodeIP != "" {
return container.NodeIP
}
if host, err := net.LookupHost("host.docker.internal"); err == nil {
return host[0]

View file

@ -5,9 +5,9 @@ import (
"testing"
"time"
docker "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/api/types/swarm"
containertypes "github.com/docker/docker/api/types/container"
networktypes "github.com/docker/docker/api/types/network"
swarmtypes "github.com/docker/docker/api/types/swarm"
"github.com/docker/go-connections/nat"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -2746,7 +2746,7 @@ func TestDynConfBuilder_build(t *testing.T) {
{
ServiceName: "Test",
Name: "Test",
Health: docker.Unhealthy,
Health: containertypes.Unhealthy,
},
},
expected: &dynamic.Configuration{
@ -2778,7 +2778,7 @@ func TestDynConfBuilder_build(t *testing.T) {
{
ServiceName: "Test",
Name: "Test",
Health: docker.Unhealthy,
Health: containertypes.Unhealthy,
},
},
expected: &dynamic.Configuration{
@ -2825,7 +2825,7 @@ func TestDynConfBuilder_build(t *testing.T) {
{
ServiceName: "Test",
Name: "Test",
Health: docker.Unhealthy,
Health: containertypes.Unhealthy,
Labels: map[string]string{
"traefik.tcp.routers.foo.rule": "HostSNI(`foo.bar`)",
},
@ -2860,7 +2860,7 @@ func TestDynConfBuilder_build(t *testing.T) {
{
ServiceName: "Test",
Name: "Test",
Health: docker.Unhealthy,
Health: containertypes.Unhealthy,
Labels: map[string]string{
"traefik.tcp.routers.foo.rule": "HostSNI(`foo.bar`)",
},
@ -2903,7 +2903,7 @@ func TestDynConfBuilder_build(t *testing.T) {
{
ServiceName: "Test",
Name: "Test",
Health: docker.Unhealthy,
Health: containertypes.Unhealthy,
Labels: map[string]string{
"traefik.udp.routers.foo": "true",
},
@ -2941,7 +2941,7 @@ func TestDynConfBuilder_build(t *testing.T) {
Labels: map[string]string{
"traefik.udp.routers.foo": "true",
},
Health: docker.Unhealthy,
Health: containertypes.Unhealthy,
},
},
expected: &dynamic.Configuration{
@ -3944,7 +3944,7 @@ func TestDynConfBuilder_getIPPort_docker(t *testing.T) {
testCases := []struct {
desc string
container docker.ContainerJSON
container containertypes.InspectResponse
serverPort string
expected expected
}{
@ -4115,8 +4115,9 @@ func TestDynConfBuilder_getIPPort_docker(t *testing.T) {
func TestDynConfBuilder_getIPAddress_docker(t *testing.T) {
testCases := []struct {
desc string
container docker.ContainerJSON
container containertypes.InspectResponse
network string
nodeIP string
expected string
}{
{
@ -4192,10 +4193,10 @@ func TestDynConfBuilder_getIPAddress_docker(t *testing.T) {
expected: "127.0.0.1",
},
{
desc: "no network, no network label, mode host, node IP",
desc: "no network, no network label, mode host, node IP",
nodeIP: "10.0.0.5",
container: containerJSON(
networkMode("host"),
nodeIP("10.0.0.5"),
),
expected: "10.0.0.5",
},
@ -4210,6 +4211,9 @@ func TestDynConfBuilder_getIPAddress_docker(t *testing.T) {
}
dData := parseContainer(test.container)
if test.nodeIP != "" {
dData.NodeIP = test.nodeIP
}
dData.ExtraConf.Network = conf.Network
if len(test.network) > 0 {
@ -4226,14 +4230,14 @@ func TestDynConfBuilder_getIPAddress_docker(t *testing.T) {
func TestDynConfBuilder_getIPAddress_swarm(t *testing.T) {
testCases := []struct {
service swarm.Service
service swarmtypes.Service
expected string
networks map[string]*network.Summary
networks map[string]*networktypes.Summary
}{
{
service: swarmService(withEndpointSpec(modeDNSRR)),
expected: "",
networks: map[string]*network.Summary{},
networks: map[string]*networktypes.Summary{},
},
{
service: swarmService(
@ -4241,7 +4245,7 @@ func TestDynConfBuilder_getIPAddress_swarm(t *testing.T) {
withEndpoint(virtualIP("1", "10.11.12.13/24")),
),
expected: "10.11.12.13",
networks: map[string]*network.Summary{
networks: map[string]*networktypes.Summary{
"1": {
Name: "foo",
},
@ -4259,7 +4263,7 @@ func TestDynConfBuilder_getIPAddress_swarm(t *testing.T) {
),
),
expected: "10.11.12.99",
networks: map[string]*network.Summary{
networks: map[string]*networktypes.Summary{
"1": {
Name: "foonet",
},

View file

@ -1,8 +1,7 @@
package docker
import (
dockertypes "github.com/docker/docker/api/types"
dockercontainertypes "github.com/docker/docker/api/types/container"
containertypes "github.com/docker/docker/api/types/container"
"github.com/docker/go-connections/nat"
)
@ -14,13 +13,13 @@ type dockerData struct {
Labels map[string]string // List of labels set to container or service
NetworkSettings networkSettings
Health string
Node *dockertypes.ContainerNode
NodeIP string // Only filled in Swarm mode.
ExtraConf configuration
}
// NetworkSettings holds the networks data to the provider.
type networkSettings struct {
NetworkMode dockercontainertypes.NetworkMode
NetworkMode containertypes.NetworkMode
Ports nat.PortMap
Networks map[string]*networkData
}

View file

@ -9,7 +9,6 @@ import (
"time"
"github.com/cenkalti/backoff/v4"
dockertypes "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
eventtypes "github.com/docker/docker/api/types/events"
"github.com/docker/docker/api/types/filters"
@ -104,7 +103,7 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
if p.Watch {
f := filters.NewArgs()
f.Add("type", "container")
options := dockertypes.EventsOptions{
options := eventtypes.ListOptions{
Filters: f,
}

View file

@ -8,8 +8,8 @@ import (
"time"
"github.com/cenkalti/backoff/v4"
dockertypes "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/filters"
networktypes "github.com/docker/docker/api/types/network"
swarmtypes "github.com/docker/docker/api/types/swarm"
"github.com/docker/docker/api/types/versions"
"github.com/docker/docker/client"
@ -161,7 +161,7 @@ func (p *SwarmProvider) Provide(configurationChan chan<- dynamic.Message, pool *
func (p *SwarmProvider) listServices(ctx context.Context, dockerClient client.APIClient) ([]dockerData, error) {
logger := log.Ctx(ctx)
serviceList, err := dockerClient.ServiceList(ctx, dockertypes.ServiceListOptions{})
serviceList, err := dockerClient.ServiceList(ctx, swarmtypes.ServiceListOptions{})
if err != nil {
return nil, err
}
@ -179,13 +179,13 @@ func (p *SwarmProvider) listServices(ctx context.Context, dockerClient client.AP
networkListArgs.Add("driver", "overlay")
}
networkList, err := dockerClient.NetworkList(ctx, dockertypes.NetworkListOptions{Filters: networkListArgs})
networkList, err := dockerClient.NetworkList(ctx, networktypes.ListOptions{Filters: networkListArgs})
if err != nil {
logger.Debug().Err(err).Msg("Failed to network inspect on client for docker")
return nil, err
}
networkMap := make(map[string]*dockertypes.NetworkResource)
networkMap := make(map[string]*networktypes.Summary)
for _, network := range networkList {
networkMap[network.ID] = &network
}
@ -218,7 +218,7 @@ func (p *SwarmProvider) listServices(ctx context.Context, dockerClient client.AP
return dockerDataList, err
}
func (p *SwarmProvider) parseService(ctx context.Context, service swarmtypes.Service, networkMap map[string]*dockertypes.NetworkResource) (dockerData, error) {
func (p *SwarmProvider) parseService(ctx context.Context, service swarmtypes.Service, networkMap map[string]*networktypes.Summary) (dockerData, error) {
logger := log.Ctx(ctx)
dData := dockerData{
@ -267,13 +267,13 @@ func (p *SwarmProvider) parseService(ctx context.Context, service swarmtypes.Ser
}
func listTasks(ctx context.Context, dockerClient client.APIClient, serviceID string,
serviceDockerData dockerData, networkMap map[string]*dockertypes.NetworkResource, isGlobalSvc bool,
serviceDockerData dockerData, networkMap map[string]*networktypes.Summary, isGlobalSvc bool,
) ([]dockerData, error) {
serviceIDFilter := filters.NewArgs()
serviceIDFilter.Add("service", serviceID)
serviceIDFilter.Add("desired-state", "running")
taskList, err := dockerClient.TaskList(ctx, dockertypes.TaskListOptions{Filters: serviceIDFilter})
taskList, err := dockerClient.TaskList(ctx, swarmtypes.TaskListOptions{Filters: serviceIDFilter})
if err != nil {
return nil, err
}
@ -283,7 +283,11 @@ func listTasks(ctx context.Context, dockerClient client.APIClient, serviceID str
if task.Status.State != swarmtypes.TaskStateRunning {
continue
}
dData := parseTasks(ctx, task, serviceDockerData, networkMap, isGlobalSvc)
dData, err := parseTasks(ctx, dockerClient, task, serviceDockerData, networkMap, isGlobalSvc)
if err != nil {
log.Ctx(ctx).Warn().Err(err).Msgf("Error while parsing task %s", getServiceName(dData))
continue
}
if len(dData.NetworkSettings.Networks) > 0 {
dockerDataList = append(dockerDataList, dData)
}
@ -291,9 +295,9 @@ func listTasks(ctx context.Context, dockerClient client.APIClient, serviceID str
return dockerDataList, err
}
func parseTasks(ctx context.Context, task swarmtypes.Task, serviceDockerData dockerData,
networkMap map[string]*dockertypes.NetworkResource, isGlobalSvc bool,
) dockerData {
func parseTasks(ctx context.Context, dockerClient client.APIClient, task swarmtypes.Task, serviceDockerData dockerData,
networkMap map[string]*networktypes.Summary, isGlobalSvc bool,
) (dockerData, error) {
dData := dockerData{
ID: task.ID,
ServiceName: serviceDockerData.Name,
@ -307,6 +311,14 @@ func parseTasks(ctx context.Context, task swarmtypes.Task, serviceDockerData doc
dData.Name = serviceDockerData.Name + "." + task.ID
}
if task.NodeID != "" {
node, _, err := dockerClient.NodeInspectWithRaw(ctx, task.NodeID)
if err != nil {
return dockerData{}, fmt.Errorf("inspecting node %s: %w", task.NodeID, err)
}
dData.NodeIP = node.Status.Addr
}
if task.NetworksAttachments != nil {
dData.NetworkSettings.Networks = make(map[string]*networkData)
for _, virtualIP := range task.NetworksAttachments {
@ -328,5 +340,5 @@ func parseTasks(ctx context.Context, task swarmtypes.Task, serviceDockerData doc
}
}
}
return dData
return dData, nil
}

View file

@ -4,35 +4,47 @@ import (
"context"
dockertypes "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/swarm"
containertypes "github.com/docker/docker/api/types/container"
networktypes "github.com/docker/docker/api/types/network"
swarmtypes "github.com/docker/docker/api/types/swarm"
dockerclient "github.com/docker/docker/client"
)
type fakeTasksClient struct {
dockerclient.APIClient
tasks []swarm.Task
container dockertypes.ContainerJSON
tasks []swarmtypes.Task
container containertypes.InspectResponse
err error
}
func (c *fakeTasksClient) TaskList(ctx context.Context, options dockertypes.TaskListOptions) ([]swarm.Task, error) {
func (c *fakeTasksClient) TaskList(ctx context.Context, options swarmtypes.TaskListOptions) ([]swarmtypes.Task, error) {
return c.tasks, c.err
}
func (c *fakeTasksClient) ContainerInspect(ctx context.Context, container string) (dockertypes.ContainerJSON, error) {
func (c *fakeTasksClient) ContainerInspect(ctx context.Context, container string) (containertypes.InspectResponse, error) {
return c.container, c.err
}
type fakeServicesClient struct {
dockerclient.APIClient
dockerVersion string
networks []dockertypes.NetworkResource
services []swarm.Service
tasks []swarm.Task
networks []networktypes.Summary
nodes []swarmtypes.Node
services []swarmtypes.Service
tasks []swarmtypes.Task
err error
}
func (c *fakeServicesClient) ServiceList(ctx context.Context, options dockertypes.ServiceListOptions) ([]swarm.Service, error) {
func (c *fakeServicesClient) NodeInspectWithRaw(ctx context.Context, nodeID string) (swarmtypes.Node, []byte, error) {
for _, node := range c.nodes {
if node.ID == nodeID {
return node, nil, nil
}
}
return swarmtypes.Node{}, nil, c.err
}
func (c *fakeServicesClient) ServiceList(ctx context.Context, options swarmtypes.ServiceListOptions) ([]swarmtypes.Service, error) {
return c.services, c.err
}
@ -40,10 +52,10 @@ func (c *fakeServicesClient) ServerVersion(ctx context.Context) (dockertypes.Ver
return dockertypes.Version{APIVersion: c.dockerVersion}, c.err
}
func (c *fakeServicesClient) NetworkList(ctx context.Context, options dockertypes.NetworkListOptions) ([]dockertypes.NetworkResource, error) {
func (c *fakeServicesClient) NetworkList(ctx context.Context, options networktypes.ListOptions) ([]networktypes.Summary, error) {
return c.networks, c.err
}
func (c *fakeServicesClient) TaskList(ctx context.Context, options dockertypes.TaskListOptions) ([]swarm.Task, error) {
func (c *fakeServicesClient) TaskList(ctx context.Context, options swarmtypes.TaskListOptions) ([]swarmtypes.Task, error) {
return c.tasks, c.err
}

View file

@ -5,32 +5,32 @@ import (
"testing"
"time"
"github.com/docker/docker/api/types/network"
"github.com/docker/docker/api/types/swarm"
networktypes "github.com/docker/docker/api/types/network"
swarmtypes "github.com/docker/docker/api/types/swarm"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestListTasks(t *testing.T) {
testCases := []struct {
service swarm.Service
tasks []swarm.Task
service swarmtypes.Service
tasks []swarmtypes.Task
isGlobalSVC bool
expectedTasks []string
networks map[string]*network.Summary
networks map[string]*networktypes.Summary
}{
{
service: swarmService(serviceName("container")),
tasks: []swarm.Task{
tasks: []swarmtypes.Task{
swarmTask("id1",
taskSlot(1),
taskNetworkAttachment("1", "network1", "overlay", []string{"127.0.0.1"}),
taskStatus(taskState(swarm.TaskStateRunning)),
taskStatus(taskState(swarmtypes.TaskStateRunning)),
),
swarmTask("id2",
taskSlot(2),
taskNetworkAttachment("1", "network1", "overlay", []string{"127.0.0.2"}),
taskStatus(taskState(swarm.TaskStatePending)),
taskStatus(taskState(swarmtypes.TaskStatePending)),
),
swarmTask("id3",
taskSlot(3),
@ -39,12 +39,12 @@ func TestListTasks(t *testing.T) {
swarmTask("id4",
taskSlot(4),
taskNetworkAttachment("1", "network1", "overlay", []string{"127.0.0.4"}),
taskStatus(taskState(swarm.TaskStateRunning)),
taskStatus(taskState(swarmtypes.TaskStateRunning)),
),
swarmTask("id5",
taskSlot(5),
taskNetworkAttachment("1", "network1", "overlay", []string{"127.0.0.5"}),
taskStatus(taskState(swarm.TaskStateFailed)),
taskStatus(taskState(swarmtypes.TaskStateFailed)),
),
},
isGlobalSVC: false,
@ -52,7 +52,7 @@ func TestListTasks(t *testing.T) {
"container.1",
"container.4",
},
networks: map[string]*network.Summary{
networks: map[string]*networktypes.Summary{
"1": {
Name: "foo",
},
@ -89,15 +89,15 @@ func TestListTasks(t *testing.T) {
func TestSwarmProvider_listServices(t *testing.T) {
testCases := []struct {
desc string
services []swarm.Service
tasks []swarm.Task
services []swarmtypes.Service
tasks []swarmtypes.Task
dockerVersion string
networks []network.Summary
networks []networktypes.Summary
expectedServices []string
}{
{
desc: "Should return no service due to no networks defined",
services: []swarm.Service{
services: []swarmtypes.Service{
swarmService(
serviceName("service1"),
serviceLabels(map[string]string{
@ -118,12 +118,12 @@ func TestSwarmProvider_listServices(t *testing.T) {
withEndpointSpec(modeDNSRR)),
},
dockerVersion: "1.30",
networks: []network.Summary{},
networks: []networktypes.Summary{},
expectedServices: []string{},
},
{
desc: "Should return only service1",
services: []swarm.Service{
services: []swarmtypes.Service{
swarmService(
serviceName("service1"),
serviceLabels(map[string]string{
@ -144,7 +144,7 @@ func TestSwarmProvider_listServices(t *testing.T) {
withEndpointSpec(modeDNSRR)),
},
dockerVersion: "1.30",
networks: []network.Summary{
networks: []networktypes.Summary{
{
Name: "network_name",
ID: "yk6l57rfwizjzxxzftn4amaot",
@ -156,8 +156,8 @@ func TestSwarmProvider_listServices(t *testing.T) {
Ingress: false,
ConfigOnly: false,
Options: map[string]string{
"com.docker.network.driver.overlay.vxlanid_list": "4098",
"com.docker.network.enable_ipv6": "false",
"com.docker.networktypes.driver.overlay.vxlanid_list": "4098",
"com.docker.networktypes.enable_ipv6": "false",
},
Labels: map[string]string{
"com.docker.stack.namespace": "test",
@ -170,7 +170,7 @@ func TestSwarmProvider_listServices(t *testing.T) {
},
{
desc: "Should return service1 and service2",
services: []swarm.Service{
services: []swarmtypes.Service{
swarmService(
serviceName("service1"),
serviceLabels(map[string]string{
@ -188,18 +188,18 @@ func TestSwarmProvider_listServices(t *testing.T) {
}),
withEndpointSpec(modeDNSRR)),
},
tasks: []swarm.Task{
tasks: []swarmtypes.Task{
swarmTask("id1",
taskNetworkAttachment("yk6l57rfwizjzxxzftn4amaot", "network_name", "overlay", []string{"127.0.0.1"}),
taskStatus(taskState(swarm.TaskStateRunning)),
taskStatus(taskState(swarmtypes.TaskStateRunning)),
),
swarmTask("id2",
taskNetworkAttachment("yk6l57rfwizjzxxzftn4amaot", "network_name", "overlay", []string{"127.0.0.1"}),
taskStatus(taskState(swarm.TaskStateRunning)),
taskStatus(taskState(swarmtypes.TaskStateRunning)),
),
},
dockerVersion: "1.30",
networks: []network.Summary{
networks: []networktypes.Summary{
{
Name: "network_name",
ID: "yk6l57rfwizjzxxzftn4amaot",
@ -211,8 +211,8 @@ func TestSwarmProvider_listServices(t *testing.T) {
Ingress: false,
ConfigOnly: false,
Options: map[string]string{
"com.docker.network.driver.overlay.vxlanid_list": "4098",
"com.docker.network.enable_ipv6": "false",
"com.docker.networktypes.driver.overlay.vxlanid_list": "4098",
"com.docker.networktypes.enable_ipv6": "false",
},
Labels: map[string]string{
"com.docker.stack.namespace": "test",
@ -253,15 +253,16 @@ func TestSwarmProvider_listServices(t *testing.T) {
func TestSwarmProvider_parseService_task(t *testing.T) {
testCases := []struct {
service swarm.Service
tasks []swarm.Task
service swarmtypes.Service
tasks []swarmtypes.Task
nodes []swarmtypes.Node
isGlobalSVC bool
expected map[string]dockerData
networks map[string]*network.Summary
networks map[string]*networktypes.Summary
}{
{
service: swarmService(serviceName("container")),
tasks: []swarm.Task{
tasks: []swarmtypes.Task{
swarmTask("id1", taskSlot(1)),
swarmTask("id2", taskSlot(2)),
swarmTask("id3", taskSlot(3)),
@ -278,7 +279,7 @@ func TestSwarmProvider_parseService_task(t *testing.T) {
Name: "container.3",
},
},
networks: map[string]*network.Summary{
networks: map[string]*networktypes.Summary{
"1": {
Name: "foo",
},
@ -286,7 +287,7 @@ func TestSwarmProvider_parseService_task(t *testing.T) {
},
{
service: swarmService(serviceName("container")),
tasks: []swarm.Task{
tasks: []swarmtypes.Task{
swarmTask("id1"),
swarmTask("id2"),
swarmTask("id3"),
@ -303,7 +304,7 @@ func TestSwarmProvider_parseService_task(t *testing.T) {
Name: "container.id3",
},
},
networks: map[string]*network.Summary{
networks: map[string]*networktypes.Summary{
"1": {
Name: "foo",
},
@ -317,12 +318,12 @@ func TestSwarmProvider_parseService_task(t *testing.T) {
virtualIP("1", ""),
),
),
tasks: []swarm.Task{
tasks: []swarmtypes.Task{
swarmTask(
"id1",
taskNetworkAttachment("1", "vlan", "macvlan", []string{"127.0.0.1"}),
taskStatus(
taskState(swarm.TaskStateRunning),
taskState(swarmtypes.TaskStateRunning),
taskContainerStatus("c1"),
),
),
@ -341,12 +342,40 @@ func TestSwarmProvider_parseService_task(t *testing.T) {
},
},
},
networks: map[string]*network.Summary{
networks: map[string]*networktypes.Summary{
"1": {
Name: "vlan",
},
},
},
{
service: swarmService(serviceName("container")),
tasks: []swarmtypes.Task{
swarmTask("id1",
taskSlot(1),
taskNodeID("id1"),
),
},
nodes: []swarmtypes.Node{
{
ID: "id1",
Status: swarmtypes.NodeStatus{
Addr: "10.11.12.13",
},
},
},
expected: map[string]dockerData{
"id1": {
Name: "container.1",
NodeIP: "10.11.12.13",
},
},
networks: map[string]*networktypes.Summary{
"1": {
Name: "foo",
},
},
},
}
for caseID, test := range testCases {
@ -359,10 +388,18 @@ func TestSwarmProvider_parseService_task(t *testing.T) {
dData, err := p.parseService(t.Context(), test.service, test.networks)
require.NoError(t, err)
dockerClient := &fakeServicesClient{
tasks: test.tasks,
nodes: test.nodes,
}
for _, task := range test.tasks {
taskDockerData := parseTasks(t.Context(), task, dData, test.networks, test.isGlobalSVC)
taskDockerData, err := parseTasks(t.Context(), dockerClient, task, dData, test.networks, test.isGlobalSVC)
require.NoError(t, err)
expected := test.expected[task.ID]
assert.Equal(t, expected.Name, taskDockerData.Name)
assert.Equal(t, expected.NodeIP, taskDockerData.NodeIP)
}
})
}

View file

@ -9,7 +9,7 @@ import (
"time"
"github.com/docker/cli/cli/connhelper"
dockertypes "github.com/docker/docker/api/types"
containertypes "github.com/docker/docker/api/types/container"
"github.com/docker/docker/client"
"github.com/docker/go-connections/nat"
"github.com/docker/go-connections/sockets"
@ -52,7 +52,7 @@ func inspectContainers(ctx context.Context, dockerClient client.ContainerAPIClie
return dockerData{}
}
func parseContainer(container dockertypes.ContainerJSON) dockerData {
func parseContainer(container containertypes.InspectResponse) dockerData {
dData := dockerData{
NetworkSettings: networkSettings{},
}
@ -61,7 +61,6 @@ func parseContainer(container dockertypes.ContainerJSON) dockerData {
dData.ID = container.ContainerJSONBase.ID
dData.Name = container.ContainerJSONBase.Name
dData.ServiceName = dData.Name // Default ServiceName to be the container's Name.
dData.Node = container.ContainerJSONBase.Node
if container.ContainerJSONBase.HostConfig != nil {
dData.NetworkSettings.NetworkMode = container.ContainerJSONBase.HostConfig.NetworkMode

View file

@ -4,8 +4,9 @@ import (
"strconv"
"testing"
docker "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/swarm"
containertypes "github.com/docker/docker/api/types/container"
networktypes "github.com/docker/docker/api/types/network"
swarmtypes "github.com/docker/docker/api/types/swarm"
"github.com/docker/go-connections/nat"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -14,7 +15,7 @@ import (
func Test_getPort_docker(t *testing.T) {
testCases := []struct {
desc string
container docker.ContainerJSON
container containertypes.InspectResponse
serverPort string
expected string
}{
@ -78,16 +79,16 @@ func Test_getPort_docker(t *testing.T) {
func Test_getPort_swarm(t *testing.T) {
testCases := []struct {
service swarm.Service
service swarmtypes.Service
serverPort string
networks map[string]*docker.NetworkResource
networks map[string]*networktypes.Summary
expected string
}{
{
service: swarmService(
withEndpointSpec(modeDNSRR),
),
networks: map[string]*docker.NetworkResource{},
networks: map[string]*networktypes.Summary{},
serverPort: "8080",
expected: "8080",
},

View file

@ -490,6 +490,10 @@ func (p *Provider) loadConfigurationFromCRD(ctx context.Context, client Client)
}
}
if serversTransportTCP.Spec.ProxyProtocol != nil {
tcpServerTransport.ProxyProtocol = serversTransportTCP.Spec.ProxyProtocol
}
if serversTransportTCP.Spec.TLS != nil {
if len(serversTransportTCP.Spec.TLS.RootCAsSecrets) > 0 {
logger.Warn().Msg("RootCAsSecrets option is deprecated, please use the RootCA option instead.")

View file

@ -86,6 +86,7 @@ type ServiceTCP struct {
TerminationDelay *int `json:"terminationDelay,omitempty"`
// ProxyProtocol defines the PROXY protocol configuration.
// More info: https://doc.traefik.io/traefik/v3.5/routing/services/#proxy-protocol
// Deprecated: ProxyProtocol will not be supported in future APIVersions, please use ServersTransport to configure ProxyProtocol instead.
ProxyProtocol *dynamic.ProxyProtocol `json:"proxyProtocol,omitempty"`
// ServersTransport defines the name of ServersTransportTCP resource to use.
// It allows to configure the transport between Traefik and your servers.

View file

@ -35,12 +35,14 @@ type ServersTransportTCPSpec struct {
// +kubebuilder:validation:Pattern="^([0-9]+(ns|us|µs|ms|s|m|h)?)+$"
// +kubebuilder:validation:XIntOrString
DialKeepAlive *intstr.IntOrString `json:"dialKeepAlive,omitempty"`
// ProxyProtocol holds the PROXY Protocol configuration.
ProxyProtocol *dynamic.ProxyProtocol `json:"proxyProtocol,omitempty"`
// TerminationDelay defines the delay to wait before fully terminating the connection, after one connected peer has closed its writing capability.
// +kubebuilder:validation:Pattern="^([0-9]+(ns|us|µs|ms|s|m|h)?)+$"
// +kubebuilder:validation:XIntOrString
TerminationDelay *intstr.IntOrString `json:"terminationDelay,omitempty"`
// TLS defines the TLS configuration
TLS *TLSClientConfig `description:"Defines the TLS configuration." json:"tls,omitempty"`
TLS *TLSClientConfig `json:"tls,omitempty"`
}
// TLSClientConfig defines the desired state of a TLSClientConfig.

View file

@ -1522,6 +1522,11 @@ func (in *ServersTransportTCPSpec) DeepCopyInto(out *ServersTransportTCPSpec) {
*out = new(intstr.IntOrString)
**out = **in
}
if in.ProxyProtocol != nil {
in, out := &in.ProxyProtocol, &out.ProxyProtocol
*out = new(dynamic.ProxyProtocol)
**out = **in
}
if in.TerminationDelay != nil {
in, out := &in.TerminationDelay, &out.TerminationDelay
*out = new(intstr.IntOrString)

View file

@ -59,3 +59,13 @@ spec:
weight: 1
kind: Service
group: ""
- name: whoami-HTTP
port: 80
weight: 1
kind: Service
group: ""
- name: whoami-HTTPS
port: 443
weight: 1
kind: Service
group: ""

View file

@ -430,6 +430,80 @@ spec:
name: wss
appProtocol: kubernetes.io/wss
---
kind: EndpointSlice
apiVersion: discovery.k8s.io/v1
metadata:
name: whoami-HTTPS-abc
namespace: default
labels:
kubernetes.io/service-name: whoami-HTTPS
addressType: IPv4
ports:
- name: websecure
port: 8443
endpoints:
- addresses:
- 10.10.0.16
conditions:
ready: true
---
apiVersion: v1
kind: Service
metadata:
name: whoami-HTTPS
namespace: default
spec:
ports:
- name: websecure
protocol: TCP
appProtocol: HTTPS
port: 443
targetPort: websecure
selector:
app: containous
task: whoami-HTTPS
---
kind: EndpointSlice
apiVersion: discovery.k8s.io/v1
metadata:
name: whoami-HTTP-abc
namespace: default
labels:
kubernetes.io/service-name: whoami-HTTP
addressType: IPv4
ports:
- name: web
port: 8080
endpoints:
- addresses:
- 10.10.0.17
conditions:
ready: true
---
apiVersion: v1
kind: Service
metadata:
name: whoami-HTTP
namespace: default
spec:
ports:
- name: web
protocol: TCP
port: 80
appProtocol: HTTP
targetPort: web
selector:
app: containous
task: whoami-HTTP
---
apiVersion: v1
kind: Service

View file

@ -430,7 +430,7 @@ func getGRPCServiceProtocol(portSpec corev1.ServicePort) (string, error) {
return schemeH2C, nil
}
switch ap := *portSpec.AppProtocol; ap {
switch ap := strings.ToLower(*portSpec.AppProtocol); ap {
case appProtocolH2C:
return schemeH2C, nil
case appProtocolHTTPS:

View file

@ -817,7 +817,7 @@ func getHTTPServiceProtocol(portSpec corev1.ServicePort) (string, error) {
return protocol, nil
}
switch ap := *portSpec.AppProtocol; ap {
switch ap := strings.ToLower(*portSpec.AppProtocol); ap {
case appProtocolH2C:
return schemeH2C, nil
case appProtocolHTTP, appProtocolWS:

View file

@ -2959,6 +2959,14 @@ func TestLoadHTTPRoutes_backendExtensionRef(t *testing.T) {
Name: "default-whoami-wss-http-80",
Weight: ptr.To(1),
},
{
Name: "default-whoami-HTTP-http-80",
Weight: ptr.To(1),
},
{
Name: "default-whoami-HTTPS-http-443",
Weight: ptr.To(1),
},
},
},
},
@ -3004,6 +3012,34 @@ func TestLoadHTTPRoutes_backendExtensionRef(t *testing.T) {
},
},
},
"default-whoami-HTTPS-http-443": {
LoadBalancer: &dynamic.ServersLoadBalancer{
Strategy: dynamic.BalancerStrategyWRR,
Servers: []dynamic.Server{
{
URL: "https://10.10.0.16:8443",
},
},
PassHostHeader: ptr.To(true),
ResponseForwarding: &dynamic.ResponseForwarding{
FlushInterval: ptypes.Duration(100 * time.Millisecond),
},
},
},
"default-whoami-HTTP-http-80": {
LoadBalancer: &dynamic.ServersLoadBalancer{
Strategy: dynamic.BalancerStrategyWRR,
Servers: []dynamic.Server{
{
URL: "http://10.10.0.17:8080",
},
},
PassHostHeader: ptr.To(true),
ResponseForwarding: &dynamic.ResponseForwarding{
FlushInterval: ptypes.Duration(100 * time.Millisecond),
},
},
},
},
ServersTransports: map[string]*dynamic.ServersTransport{},
},

View file

@ -70,7 +70,7 @@ func doOnJSON(input string) string {
}
func doOnStruct(field reflect.Value, tag string, redactByDefault bool) error {
if field.Type().AssignableTo(reflect.TypeOf(dynamic.PluginConf{})) {
if field.Type().AssignableTo(reflect.TypeFor[dynamic.PluginConf]()) {
resetPlugin(field)
return nil
}
@ -164,7 +164,7 @@ func reset(field reflect.Value, name string) error {
}
case reflect.String:
if field.String() != "" {
if field.Type().AssignableTo(reflect.TypeOf(types.FileOrContent(""))) {
if field.Type().AssignableTo(reflect.TypeFor[types.FileOrContent]()) {
field.Set(reflect.ValueOf(types.FileOrContent(maskShort)))
} else {
field.Set(reflect.ValueOf(maskShort))

View file

@ -12,6 +12,7 @@ import (
"os"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
@ -56,15 +57,19 @@ type connState struct {
type httpForwarder struct {
net.Listener
connChan chan net.Conn
errChan chan error
connChan chan net.Conn
errChan chan error
closeChan chan struct{}
closeOnce sync.Once
}
func newHTTPForwarder(ln net.Listener) *httpForwarder {
return &httpForwarder{
Listener: ln,
connChan: make(chan net.Conn),
errChan: make(chan error),
Listener: ln,
connChan: make(chan net.Conn),
errChan: make(chan error),
closeChan: make(chan struct{}),
}
}
@ -76,6 +81,8 @@ func (h *httpForwarder) ServeTCP(conn tcp.WriteCloser) {
// Accept retrieves a served connection in ServeTCP.
func (h *httpForwarder) Accept() (net.Conn, error) {
select {
case <-h.closeChan:
return nil, errors.New("listener closed")
case conn := <-h.connChan:
return conn, nil
case err := <-h.errChan:
@ -83,6 +90,14 @@ func (h *httpForwarder) Accept() (net.Conn, error) {
}
}
// Close closes the wrapped listener and unblocks Accept.
func (h *httpForwarder) Close() error {
h.closeOnce.Do(func() {
close(h.closeChan)
})
return h.Listener.Close()
}
// TCPEntryPoints holds a map of TCPEntryPoint (the entrypoint names being the keys).
type TCPEntryPoints map[string]*TCPEntryPoint
@ -162,8 +177,9 @@ type TCPEntryPoint struct {
tracker *connectionTracker
httpServer *httpServer
httpsServer *httpServer
http3Server *http3server
http3Server *http3server
// inShutdown reports whether the Shutdown method has been called.
inShutdown atomic.Bool
}
// NewTCPEntryPoint creates a new TCPEntryPoint.
@ -172,31 +188,31 @@ func NewTCPEntryPoint(ctx context.Context, name string, config *static.EntryPoin
listener, err := buildListener(ctx, name, config)
if err != nil {
return nil, fmt.Errorf("error preparing server: %w", err)
return nil, fmt.Errorf("building listener: %w", err)
}
rt, err := tcprouter.NewRouter()
if err != nil {
return nil, fmt.Errorf("error preparing tcp router: %w", err)
return nil, fmt.Errorf("creating TCP router: %w", err)
}
reqDecorator := requestdecorator.New(hostResolverConfig)
httpServer, err := createHTTPServer(ctx, listener, config, true, reqDecorator)
httpServer, err := newHTTPServer(ctx, listener, config, true, reqDecorator)
if err != nil {
return nil, fmt.Errorf("error preparing http server: %w", err)
return nil, fmt.Errorf("creating HTTP server: %w", err)
}
rt.SetHTTPForwarder(httpServer.Forwarder)
httpsServer, err := createHTTPServer(ctx, listener, config, false, reqDecorator)
httpsServer, err := newHTTPServer(ctx, listener, config, false, reqDecorator)
if err != nil {
return nil, fmt.Errorf("error preparing https server: %w", err)
return nil, fmt.Errorf("creating HTTPS server: %w", err)
}
h3Server, err := newHTTP3Server(ctx, name, config, httpsServer)
if err != nil {
return nil, fmt.Errorf("error preparing http3 server: %w", err)
return nil, fmt.Errorf("creating HTTP3 server: %w", err)
}
rt.SetHTTPSForwarder(httpsServer.Forwarder)
@ -226,6 +242,11 @@ func (e *TCPEntryPoint) Start(ctx context.Context) {
for {
conn, err := e.listener.Accept()
// As the Shutdown method has been called, an error is expected.
// Thus, it is not necessary to log it.
if err != nil && e.inShutdown.Load() {
return
}
if err != nil {
logger.Error().Err(err).Send()
@ -277,6 +298,8 @@ func (e *TCPEntryPoint) Start(ctx context.Context) {
func (e *TCPEntryPoint) Shutdown(ctx context.Context) {
logger := log.Ctx(ctx)
e.inShutdown.Store(true)
reqAcceptGraceTimeOut := time.Duration(e.transportConfiguration.LifeCycle.RequestAcceptGraceTimeout)
if reqAcceptGraceTimeOut > 0 {
logger.Info().Msgf("Waiting %s for incoming requests to cease", reqAcceptGraceTimeOut)
@ -461,6 +484,18 @@ func buildProxyProtocolListener(ctx context.Context, entryPoint *static.EntryPoi
return proxyListener, nil
}
type onceCloseListener struct {
net.Listener
once sync.Once
closeErr error
}
func (oc *onceCloseListener) Close() error {
oc.once.Do(func() { oc.closeErr = oc.Listener.Close() })
return oc.closeErr
}
func buildListener(ctx context.Context, name string, config *static.EntryPoint) (net.Listener, error) {
var listener net.Listener
var err error
@ -496,7 +531,7 @@ func buildListener(ctx context.Context, name string, config *static.EntryPoint)
return nil, fmt.Errorf("error creating proxy protocol listener: %w", err)
}
}
return listener, nil
return &onceCloseListener{Listener: listener}, nil
}
func newConnectionTracker(openConnectionsGauge gokitmetrics.Gauge) *connectionTracker {
@ -592,7 +627,7 @@ type httpServer struct {
Switcher *middlewares.HTTPHandlerSwitcher
}
func createHTTPServer(ctx context.Context, ln net.Listener, configuration *static.EntryPoint, withH2c bool, reqDecorator *requestdecorator.RequestDecorator) (*httpServer, error) {
func newHTTPServer(ctx context.Context, ln net.Listener, configuration *static.EntryPoint, withH2c bool, reqDecorator *requestdecorator.RequestDecorator) (*httpServer, error) {
if configuration.HTTP2.MaxConcurrentStreams < 0 {
return nil, errors.New("max concurrent streams value must be greater than or equal to zero")
}
@ -692,7 +727,7 @@ func createHTTPServer(ctx context.Context, ln net.Listener, configuration *stati
go func() {
err := serverHTTP.Serve(listener)
if err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Ctx(ctx).Error().Err(err).Msg("Error while starting server")
log.Ctx(ctx).Error().Err(err).Msg("Error while running HTTP server")
}
}()
return &httpServer{

View file

@ -511,7 +511,7 @@ func TestNormalizePath_malformedPercentEncoding(t *testing.T) {
}
}
// TestPathOperations tests the whole behavior of normalizePath, and sanitizePath combined through the use of the createHTTPServer func.
// TestPathOperations tests the whole behavior of normalizePath, and sanitizePath combined through the use of the newHTTPServer func.
// It aims to guarantee the server entrypoint handler is secure regarding a large variety of cases that could lead to path traversal attacks.
func TestPathOperations(t *testing.T) {
// Create a listener for the server.
@ -525,8 +525,8 @@ func TestPathOperations(t *testing.T) {
configuration := &static.EntryPoint{}
configuration.SetDefaults()
// Create the HTTP server using createHTTPServer.
server, err := createHTTPServer(t.Context(), ln, configuration, false, requestdecorator.New(nil))
// Create the HTTP server using newHTTPServer.
server, err := newHTTPServer(t.Context(), ln, configuration, false, requestdecorator.New(nil))
require.NoError(t, err)
server.Switcher.UpdateHandler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

View file

@ -13,7 +13,6 @@ import (
"github.com/traefik/traefik/v3/pkg/logs"
"github.com/traefik/traefik/v3/pkg/server/provider"
"github.com/traefik/traefik/v3/pkg/tcp"
"golang.org/x/net/proxy"
)
// Manager is the TCPHandlers factory.
@ -58,6 +57,10 @@ func (m *Manager) BuildTCP(rootCtx context.Context, serviceName string) (tcp.Han
log.Ctx(ctx).Warn().Msgf("Service %q load balancer uses `TerminationDelay`, but this option is deprecated, please use ServersTransport configuration instead.", serviceName)
}
if conf.LoadBalancer.ProxyProtocol != nil {
log.Ctx(ctx).Warn().Msgf("Service %q load balancer uses `ProxyProtocol`, but this option is deprecated, please use ServersTransport configuration instead.", serviceName)
}
if len(conf.LoadBalancer.ServersTransport) > 0 {
conf.LoadBalancer.ServersTransport = provider.GetQualifiedName(ctx, conf.LoadBalancer.ServersTransport)
}
@ -72,20 +75,12 @@ func (m *Manager) BuildTCP(rootCtx context.Context, serviceName string) (tcp.Han
continue
}
dialer, err := m.dialerManager.Get(conf.LoadBalancer.ServersTransport, server.TLS)
dialer, err := m.dialerManager.Build(conf.LoadBalancer, server.TLS)
if err != nil {
return nil, err
}
// Handle TerminationDelay deprecated option.
if conf.LoadBalancer.ServersTransport == "" && conf.LoadBalancer.TerminationDelay != nil {
dialer = &dialerWrapper{
Dialer: dialer,
terminationDelay: time.Duration(*conf.LoadBalancer.TerminationDelay),
}
}
handler, err := tcp.NewProxy(server.Address, conf.LoadBalancer.ProxyProtocol, dialer)
handler, err := tcp.NewProxy(server.Address, dialer)
if err != nil {
srvLogger.Error().Err(err).Msg("Failed to create server")
continue
@ -126,13 +121,3 @@ func shuffle[T any](values []T, r *rand.Rand) []T {
return shuffled
}
// dialerWrapper is only used to handle TerminationDelay deprecated option on TCPServersLoadBalancer.
type dialerWrapper struct {
proxy.Dialer
terminationDelay time.Duration
}
func (d dialerWrapper) TerminationDelay() time.Duration {
return d.terminationDelay
}

View file

@ -231,7 +231,7 @@ func TestManager_BuildTCP(t *testing.T) {
},
},
providerName: "provider-1",
expectedError: "TCP dialer not found myServersTransport@provider-1",
expectedError: "no transport configuration found for \"myServersTransport@provider-1\"",
},
}

View file

@ -9,11 +9,13 @@ import (
"sync"
"time"
"github.com/pires/go-proxyproto"
"github.com/rs/zerolog/log"
"github.com/spiffe/go-spiffe/v2/bundle/x509bundle"
"github.com/spiffe/go-spiffe/v2/spiffeid"
"github.com/spiffe/go-spiffe/v2/spiffetls/tlsconfig"
"github.com/spiffe/go-spiffe/v2/svid/x509svid"
ptypes "github.com/traefik/paerser/types"
"github.com/traefik/traefik/v3/pkg/config/dynamic"
traefiktls "github.com/traefik/traefik/v3/pkg/tls"
"github.com/traefik/traefik/v3/pkg/types"
@ -27,15 +29,54 @@ type Dialer interface {
}
type tcpDialer struct {
proxy.Dialer
dialer *net.Dialer
terminationDelay time.Duration
proxyProtocol *dynamic.ProxyProtocol
}
func (d tcpDialer) TerminationDelay() time.Duration {
return d.terminationDelay
}
// SpiffeX509Source allows to retrieve a x509 SVID and bundle.
func (d tcpDialer) Dial(network, addr string) (net.Conn, error) {
conn, err := d.dialer.Dial(network, addr)
if err != nil {
return nil, err
}
if d.proxyProtocol != nil && d.proxyProtocol.Version > 0 && d.proxyProtocol.Version < 3 {
header := proxyproto.HeaderProxyFromAddrs(byte(d.proxyProtocol.Version), conn.RemoteAddr(), conn.LocalAddr())
if _, err := header.WriteTo(conn); err != nil {
_ = conn.Close()
return nil, fmt.Errorf("writing PROXY Protocol header: %w", err)
}
}
return conn, nil
}
type tcpTLSDialer struct {
tcpDialer
tlsConfig *tls.Config
}
func (d tcpTLSDialer) Dial(network, addr string) (net.Conn, error) {
conn, err := d.tcpDialer.Dial(network, addr)
if err != nil {
return nil, err
}
// Now perform TLS handshake on the connection
tlsConn := tls.Client(conn, d.tlsConfig)
if err := tlsConn.Handshake(); err != nil {
_ = conn.Close()
return nil, fmt.Errorf("TLS handshake failed: %w", err)
}
return tlsConn, nil
}
// SpiffeX509Source allows retrieving a x509 SVID and bundle.
type SpiffeX509Source interface {
x509svid.Source
x509bundle.Source
@ -43,118 +84,106 @@ type SpiffeX509Source interface {
// DialerManager handles dialer for the reverse proxy.
type DialerManager struct {
rtLock sync.RWMutex
dialers map[string]Dialer
dialersTLS map[string]Dialer
spiffeX509Source SpiffeX509Source
serversTransportsMu sync.RWMutex
serversTransports map[string]*dynamic.TCPServersTransport
spiffeX509Source SpiffeX509Source
}
// NewDialerManager creates a new DialerManager.
func NewDialerManager(spiffeX509Source SpiffeX509Source) *DialerManager {
return &DialerManager{
dialers: make(map[string]Dialer),
dialersTLS: make(map[string]Dialer),
spiffeX509Source: spiffeX509Source,
serversTransports: make(map[string]*dynamic.TCPServersTransport),
spiffeX509Source: spiffeX509Source,
}
}
// Update updates the dialers configurations.
// Update updates the TCP serversTransport configurations.
func (d *DialerManager) Update(configs map[string]*dynamic.TCPServersTransport) {
d.rtLock.Lock()
defer d.rtLock.Unlock()
d.serversTransportsMu.Lock()
defer d.serversTransportsMu.Unlock()
d.dialers = make(map[string]Dialer)
d.dialersTLS = make(map[string]Dialer)
for configName, config := range configs {
if err := d.createDialers(configName, config); err != nil {
log.Debug().
Str("dialer", configName).
Err(err).
Msg("Create TCP Dialer")
}
}
d.serversTransports = configs
}
// Get gets a dialer by name.
func (d *DialerManager) Get(name string, tls bool) (Dialer, error) {
if len(name) == 0 {
name = "default@internal"
// Build builds a dialer by name.
func (d *DialerManager) Build(config *dynamic.TCPServersLoadBalancer, isTLS bool) (Dialer, error) {
name := "default@internal"
if config.ServersTransport != "" {
name = config.ServersTransport
}
d.rtLock.RLock()
defer d.rtLock.RUnlock()
if tls {
if rt, ok := d.dialersTLS[name]; ok {
return rt, nil
}
return nil, fmt.Errorf("TCP dialer not found %s", name)
var st *dynamic.TCPServersTransport
d.serversTransportsMu.RLock()
st, ok := d.serversTransports[name]
d.serversTransportsMu.RUnlock()
if !ok || st == nil {
return nil, fmt.Errorf("no transport configuration found for %q", name)
}
if rt, ok := d.dialers[name]; ok {
return rt, nil
// Handle TerminationDelay and ProxyProtocol deprecated options.
var terminationDelay ptypes.Duration
if config.TerminationDelay != nil {
terminationDelay = ptypes.Duration(*config.TerminationDelay)
}
proxyProtocol := config.ProxyProtocol
if config.ServersTransport != "" {
terminationDelay = st.TerminationDelay
proxyProtocol = st.ProxyProtocol
}
return nil, fmt.Errorf("TCP dialer not found %s", name)
}
// createDialers creates the dialers according to the TCPServersTransport configuration.
func (d *DialerManager) createDialers(name string, cfg *dynamic.TCPServersTransport) error {
if cfg == nil {
return errors.New("no transport configuration given")
}
dialer := &net.Dialer{
Timeout: time.Duration(cfg.DialTimeout),
KeepAlive: time.Duration(cfg.DialKeepAlive),
if proxyProtocol != nil && (proxyProtocol.Version < 1 || proxyProtocol.Version > 2) {
return nil, fmt.Errorf("unknown proxyProtocol version: %d", proxyProtocol.Version)
}
var tlsConfig *tls.Config
if cfg.TLS != nil {
if cfg.TLS.Spiffe != nil {
if st.TLS != nil {
if st.TLS.Spiffe != nil {
if d.spiffeX509Source == nil {
return errors.New("SPIFFE is enabled for this transport, but not configured")
return nil, errors.New("SPIFFE is enabled for this transport, but not configured")
}
authorizer, err := buildSpiffeAuthorizer(cfg.TLS.Spiffe)
authorizer, err := buildSpiffeAuthorizer(st.TLS.Spiffe)
if err != nil {
return fmt.Errorf("unable to build SPIFFE authorizer: %w", err)
return nil, fmt.Errorf("unable to build SPIFFE authorizer: %w", err)
}
tlsConfig = tlsconfig.MTLSClientConfig(d.spiffeX509Source, d.spiffeX509Source, authorizer)
}
if cfg.TLS.InsecureSkipVerify || len(cfg.TLS.RootCAs) > 0 || len(cfg.TLS.ServerName) > 0 || len(cfg.TLS.Certificates) > 0 || cfg.TLS.PeerCertURI != "" {
if st.TLS.InsecureSkipVerify || len(st.TLS.RootCAs) > 0 || len(st.TLS.ServerName) > 0 || len(st.TLS.Certificates) > 0 || st.TLS.PeerCertURI != "" {
if tlsConfig != nil {
return errors.New("TLS and SPIFFE configuration cannot be defined at the same time")
return nil, errors.New("TLS and SPIFFE configuration cannot be defined at the same time")
}
tlsConfig = &tls.Config{
ServerName: cfg.TLS.ServerName,
InsecureSkipVerify: cfg.TLS.InsecureSkipVerify,
RootCAs: createRootCACertPool(cfg.TLS.RootCAs),
Certificates: cfg.TLS.Certificates.GetCertificates(),
ServerName: st.TLS.ServerName,
InsecureSkipVerify: st.TLS.InsecureSkipVerify,
RootCAs: createRootCACertPool(st.TLS.RootCAs),
Certificates: st.TLS.Certificates.GetCertificates(),
}
if cfg.TLS.PeerCertURI != "" {
if st.TLS.PeerCertURI != "" {
tlsConfig.VerifyPeerCertificate = func(rawCerts [][]byte, _ [][]*x509.Certificate) error {
return traefiktls.VerifyPeerCertificate(cfg.TLS.PeerCertURI, tlsConfig, rawCerts)
return traefiktls.VerifyPeerCertificate(st.TLS.PeerCertURI, tlsConfig, rawCerts)
}
}
}
}
tlsDialer := &tls.Dialer{
NetDialer: dialer,
Config: tlsConfig,
dialer := tcpDialer{
dialer: &net.Dialer{
Timeout: time.Duration(st.DialTimeout),
KeepAlive: time.Duration(st.DialKeepAlive),
},
terminationDelay: time.Duration(terminationDelay),
proxyProtocol: proxyProtocol,
}
d.dialers[name] = tcpDialer{dialer, time.Duration(cfg.TerminationDelay)}
d.dialersTLS[name] = tcpDialer{tlsDialer, time.Duration(cfg.TerminationDelay)}
return nil
if !isTLS {
return dialer, nil
}
return tcpTLSDialer{dialer, tlsConfig}, nil
}
func createRootCACertPool(rootCAs []types.FileOrContent) *x509.CertPool {

View file

@ -7,6 +7,7 @@ import (
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"errors"
"io"
"math/big"
"net"
@ -14,6 +15,7 @@ import (
"testing"
"time"
"github.com/pires/go-proxyproto"
"github.com/spiffe/go-spiffe/v2/bundle/x509bundle"
"github.com/spiffe/go-spiffe/v2/spiffeid"
"github.com/spiffe/go-spiffe/v2/spiffetls/tlsconfig"
@ -131,7 +133,7 @@ func TestConflictingConfig(t *testing.T) {
dialerManager.Update(dynamicConf)
_, err := dialerManager.Get("test", false)
_, err := dialerManager.Build(&dynamic.TCPServersLoadBalancer{ServersTransport: "test"}, false)
require.Error(t, err)
}
@ -140,7 +142,7 @@ func TestNoTLS(t *testing.T) {
require.NoError(t, err)
defer backendListener.Close()
go fakeRedis(t, backendListener)
go fakeServer(t, backendListener)
_, port, err := net.SplitHostPort(backendListener.Addr().String())
require.NoError(t, err)
@ -155,7 +157,7 @@ func TestNoTLS(t *testing.T) {
dialerManager.Update(dynamicConf)
dialer, err := dialerManager.Get("test", false)
dialer, err := dialerManager.Build(&dynamic.TCPServersLoadBalancer{ServersTransport: "test"}, false)
require.NoError(t, err)
conn, err := dialer.Dial("tcp", ":"+port)
@ -186,7 +188,7 @@ func TestTLS(t *testing.T) {
tlsListener := tls.NewListener(backendListener, &tls.Config{Certificates: []tls.Certificate{cert}})
defer tlsListener.Close()
go fakeRedis(t, tlsListener)
go fakeServer(t, tlsListener)
_, port, err := net.SplitHostPort(tlsListener.Addr().String())
require.NoError(t, err)
@ -204,7 +206,7 @@ func TestTLS(t *testing.T) {
dialerManager.Update(dynamicConf)
dialer, err := dialerManager.Get("test", true)
dialer, err := dialerManager.Build(&dynamic.TCPServersLoadBalancer{ServersTransport: "test"}, true)
require.NoError(t, err)
conn, err := dialer.Dial("tcp", ":"+port)
@ -236,7 +238,7 @@ func TestTLSWithInsecureSkipVerify(t *testing.T) {
tlsListener := tls.NewListener(backendListener, &tls.Config{Certificates: []tls.Certificate{cert}})
defer tlsListener.Close()
go fakeRedis(t, tlsListener)
go fakeServer(t, tlsListener)
_, port, err := net.SplitHostPort(tlsListener.Addr().String())
require.NoError(t, err)
@ -255,7 +257,7 @@ func TestTLSWithInsecureSkipVerify(t *testing.T) {
dialerManager.Update(dynamicConf)
dialer, err := dialerManager.Get("test", true)
dialer, err := dialerManager.Build(&dynamic.TCPServersLoadBalancer{ServersTransport: "test"}, true)
require.NoError(t, err)
conn, err := dialer.Dial("tcp", ":"+port)
@ -297,7 +299,7 @@ func TestMTLS(t *testing.T) {
})
defer tlsListener.Close()
go fakeRedis(t, tlsListener)
go fakeServer(t, tlsListener)
_, port, err := net.SplitHostPort(tlsListener.Addr().String())
require.NoError(t, err)
@ -324,7 +326,7 @@ func TestMTLS(t *testing.T) {
dialerManager.Update(dynamicConf)
dialer, err := dialerManager.Get("test", true)
dialer, err := dialerManager.Build(&dynamic.TCPServersLoadBalancer{ServersTransport: "test"}, true)
require.NoError(t, err)
conn, err := dialer.Dial("tcp", ":"+port)
@ -444,7 +446,7 @@ func TestSpiffeMTLS(t *testing.T) {
for _, test := range testCases {
t.Run(test.desc, func(t *testing.T) {
go fakeRedis(t, tlsListener)
go fakeServer(t, tlsListener)
dialerManager := NewDialerManager(test.clientSource)
@ -458,7 +460,7 @@ func TestSpiffeMTLS(t *testing.T) {
dialerManager.Update(dynamicConf)
dialer, err := dialerManager.Get("test", true)
dialer, err := dialerManager.Build(&dynamic.TCPServersLoadBalancer{ServersTransport: "test"}, true)
require.NoError(t, err)
conn, err := dialer.Dial("tcp", ":"+port)
@ -487,6 +489,226 @@ func TestSpiffeMTLS(t *testing.T) {
}
}
func TestProxyProtocol(t *testing.T) {
testCases := []struct {
desc string
version int
}{
{
desc: "proxy protocol v1",
version: 1,
},
{
desc: "proxy protocol v2",
version: 2,
},
}
for _, test := range testCases {
t.Run(test.desc, func(t *testing.T) {
backendListener, err := net.Listen("tcp", ":0")
require.NoError(t, err)
var version int
proxyBackendListener := proxyproto.Listener{
Listener: backendListener,
ValidateHeader: func(h *proxyproto.Header) error {
version = int(h.Version)
return nil
},
Policy: func(upstream net.Addr) (proxyproto.Policy, error) {
switch test.version {
case 1, 2:
return proxyproto.USE, nil
default:
return proxyproto.REQUIRE, errors.New("unsupported version")
}
},
}
defer proxyBackendListener.Close()
go fakeServer(t, &proxyBackendListener)
_, port, err := net.SplitHostPort(backendListener.Addr().String())
require.NoError(t, err)
dialerManager := NewDialerManager(nil)
dialerManager.Update(map[string]*dynamic.TCPServersTransport{
"test": {
ProxyProtocol: &dynamic.ProxyProtocol{
Version: test.version,
},
},
})
dialer, err := dialerManager.Build(&dynamic.TCPServersLoadBalancer{ServersTransport: "test"}, false)
require.NoError(t, err)
conn, err := dialer.Dial("tcp", ":"+port)
require.NoError(t, err)
defer conn.Close()
_, err = conn.Write([]byte("ping"))
require.NoError(t, err)
buf := make([]byte, 64)
n, err := conn.Read(buf)
require.NoError(t, err)
assert.Equal(t, 4, n)
assert.Equal(t, "PONG", string(buf[:4]))
assert.Equal(t, test.version, version)
})
}
}
func TestProxyProtocolWithTLS(t *testing.T) {
testCases := []struct {
desc string
version int
}{
{
desc: "proxy protocol v1",
version: 1,
},
{
desc: "proxy protocol v2",
version: 2,
},
}
for _, test := range testCases {
t.Run(test.desc, func(t *testing.T) {
cert, err := tls.X509KeyPair(LocalhostCert, LocalhostKey)
require.NoError(t, err)
backendListener, err := net.Listen("tcp", ":0")
require.NoError(t, err)
var version int
proxyBackendListener := proxyproto.Listener{
Listener: backendListener,
ValidateHeader: func(h *proxyproto.Header) error {
version = int(h.Version)
return nil
},
Policy: func(upstream net.Addr) (proxyproto.Policy, error) {
switch test.version {
case 1, 2:
return proxyproto.USE, nil
default:
return proxyproto.REQUIRE, errors.New("unsupported version")
}
},
}
defer proxyBackendListener.Close()
go func() {
conn, err := proxyBackendListener.Accept()
require.NoError(t, err)
defer conn.Close()
// Now wrap with TLS and perform handshake
tlsConn := tls.Server(conn, &tls.Config{Certificates: []tls.Certificate{cert}})
defer tlsConn.Close()
err = tlsConn.Handshake()
require.NoError(t, err)
buf := make([]byte, 64)
n, err := tlsConn.Read(buf)
require.NoError(t, err)
if bytes.Equal(buf[:n], []byte("ping")) {
_, _ = tlsConn.Write([]byte("PONG"))
}
}()
_, port, err := net.SplitHostPort(backendListener.Addr().String())
require.NoError(t, err)
dialerManager := NewDialerManager(nil)
dialerManager.Update(map[string]*dynamic.TCPServersTransport{
"test": {
TLS: &dynamic.TLSClientConfig{
ServerName: "example.com",
RootCAs: []types.FileOrContent{types.FileOrContent(LocalhostCert)},
InsecureSkipVerify: true,
},
ProxyProtocol: &dynamic.ProxyProtocol{
Version: test.version,
},
},
})
dialer, err := dialerManager.Build(&dynamic.TCPServersLoadBalancer{
ServersTransport: "test",
}, true)
require.NoError(t, err)
conn, err := dialer.Dial("tcp", ":"+port)
require.NoError(t, err)
defer conn.Close()
_, err = conn.Write([]byte("ping"))
require.NoError(t, err)
buf := make([]byte, 64)
n, err := conn.Read(buf)
require.NoError(t, err)
assert.Equal(t, 4, n)
assert.Equal(t, "PONG", string(buf[:4]))
assert.Equal(t, test.version, version)
})
}
}
func TestProxyProtocolDisabled(t *testing.T) {
backendListener, err := net.Listen("tcp", ":0")
require.NoError(t, err)
defer backendListener.Close()
go func() {
conn, err := backendListener.Accept()
require.NoError(t, err)
defer conn.Close()
buf := make([]byte, 64)
n, err := conn.Read(buf)
require.NoError(t, err)
if bytes.Equal(buf[:n], []byte("ping")) {
_, _ = conn.Write([]byte("PONG"))
}
}()
_, port, err := net.SplitHostPort(backendListener.Addr().String())
require.NoError(t, err)
// No proxy protocol configuration.
dialerManager := NewDialerManager(nil)
dialerManager.Update(map[string]*dynamic.TCPServersTransport{
"test": {},
})
dialer, err := dialerManager.Build(&dynamic.TCPServersLoadBalancer{ServersTransport: "test"}, false)
require.NoError(t, err)
conn, err := dialer.Dial("tcp", ":"+port)
require.NoError(t, err)
_, err = conn.Write([]byte("ping"))
require.NoError(t, err)
buf := make([]byte, 64)
n, err := conn.Read(buf)
require.NoError(t, err)
assert.Equal(t, 4, n)
assert.Equal(t, "PONG", string(buf[:4]))
}
// fakeSpiffePKI simulates a SPIFFE aware PKI and allows generating multiple valid SVIDs.
type fakeSpiffePKI struct {
caPrivateKey *rsa.PrivateKey

View file

@ -2,34 +2,25 @@ package tcp
import (
"errors"
"fmt"
"io"
"net"
"syscall"
"time"
"github.com/pires/go-proxyproto"
"github.com/rs/zerolog/log"
"github.com/traefik/traefik/v3/pkg/config/dynamic"
)
// Proxy forwards a TCP request to a TCP service.
type Proxy struct {
address string
proxyProtocol *dynamic.ProxyProtocol
dialer Dialer
address string
dialer Dialer
}
// NewProxy creates a new Proxy.
func NewProxy(address string, proxyProtocol *dynamic.ProxyProtocol, dialer Dialer) (*Proxy, error) {
if proxyProtocol != nil && (proxyProtocol.Version < 1 || proxyProtocol.Version > 2) {
return nil, fmt.Errorf("unknown proxyProtocol version: %d", proxyProtocol.Version)
}
func NewProxy(address string, dialer Dialer) (*Proxy, error) {
return &Proxy{
address: address,
proxyProtocol: proxyProtocol,
dialer: dialer,
address: address,
dialer: dialer,
}, nil
}
@ -53,14 +44,6 @@ func (p *Proxy) ServeTCP(conn WriteCloser) {
defer connBackend.Close()
errChan := make(chan error)
if p.proxyProtocol != nil && p.proxyProtocol.Version > 0 && p.proxyProtocol.Version < 3 {
header := proxyproto.HeaderProxyFromAddrs(byte(p.proxyProtocol.Version), conn.RemoteAddr(), conn.LocalAddr())
if _, err := header.WriteTo(connBackend); err != nil {
log.Error().Err(err).Msg("Error while writing TCP proxy protocol headers to backend connection")
return
}
}
go p.connCopy(conn, connBackend, errChan)
go p.connCopy(connBackend, conn, errChan)

View file

@ -2,59 +2,25 @@ package tcp
import (
"bytes"
"errors"
"io"
"net"
"testing"
"time"
"github.com/pires/go-proxyproto"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/traefik/traefik/v3/pkg/config/dynamic"
)
func fakeRedis(t *testing.T, listener net.Listener) {
t.Helper()
for {
conn, err := listener.Accept()
require.NoError(t, err)
for {
withErr := false
buf := make([]byte, 64)
if _, err := conn.Read(buf); err != nil {
withErr = true
}
if string(buf[:4]) == "ping" {
time.Sleep(1 * time.Millisecond)
if _, err := conn.Write([]byte("PONG")); err != nil {
_ = conn.Close()
return
}
}
if withErr {
_ = conn.Close()
return
}
}
}
}
func TestCloseWrite(t *testing.T) {
backendListener, err := net.Listen("tcp", ":0")
require.NoError(t, err)
go fakeRedis(t, backendListener)
go fakeServer(t, backendListener)
_, port, err := net.SplitHostPort(backendListener.Addr().String())
require.NoError(t, err)
dialer := tcpDialer{&net.Dialer{}, 10 * time.Millisecond}
dialer := tcpDialer{&net.Dialer{}, 10 * time.Millisecond, nil}
proxy, err := NewProxy(":"+port, nil, dialer)
proxy, err := NewProxy(":"+port, dialer)
require.NoError(t, err)
proxyListener, err := net.Listen("tcp", ":0")
@ -84,90 +50,37 @@ func TestCloseWrite(t *testing.T) {
buffer := bytes.NewBuffer(buf)
n, err := io.Copy(buffer, conn)
require.NoError(t, err)
require.Equal(t, int64(4), n)
require.Equal(t, "PONG", buffer.String())
}
func TestProxyProtocol(t *testing.T) {
testCases := []struct {
desc string
version int
}{
{
desc: "PROXY protocol v1",
version: 1,
},
{
desc: "PROXY protocol v2",
version: 2,
},
}
func fakeServer(t *testing.T, listener net.Listener) {
t.Helper()
for _, test := range testCases {
t.Run(test.desc, func(t *testing.T) {
backendListener, err := net.Listen("tcp", ":0")
require.NoError(t, err)
for {
conn, err := listener.Accept()
require.NoError(t, err)
var version int
proxyBackendListener := proxyproto.Listener{
Listener: backendListener,
ValidateHeader: func(h *proxyproto.Header) error {
version = int(h.Version)
return nil
},
Policy: func(upstream net.Addr) (proxyproto.Policy, error) {
switch test.version {
case 1, 2:
return proxyproto.USE, nil
default:
return proxyproto.REQUIRE, errors.New("unsupported version")
}
},
for {
withErr := false
buf := make([]byte, 64)
if _, err := conn.Read(buf); err != nil {
withErr = true
}
defer proxyBackendListener.Close()
go fakeRedis(t, &proxyBackendListener)
_, port, err := net.SplitHostPort(proxyBackendListener.Addr().String())
require.NoError(t, err)
dialer := tcpDialer{&net.Dialer{}, 10 * time.Millisecond}
proxy, err := NewProxy(":"+port, &dynamic.ProxyProtocol{Version: test.version}, dialer)
require.NoError(t, err)
proxyListener, err := net.Listen("tcp", ":0")
require.NoError(t, err)
go func() {
for {
conn, err := proxyListener.Accept()
require.NoError(t, err)
proxy.ServeTCP(conn.(*net.TCPConn))
if string(buf[:4]) == "ping" {
time.Sleep(1 * time.Millisecond)
if _, err := conn.Write([]byte("PONG")); err != nil {
_ = conn.Close()
return
}
}()
}
_, port, err = net.SplitHostPort(proxyListener.Addr().String())
require.NoError(t, err)
conn, err := net.Dial("tcp", ":"+port)
require.NoError(t, err)
_, err = conn.Write([]byte("ping\n"))
require.NoError(t, err)
err = conn.(*net.TCPConn).CloseWrite()
require.NoError(t, err)
var buf []byte
buffer := bytes.NewBuffer(buf)
n, err := io.Copy(buffer, conn)
require.NoError(t, err)
assert.Equal(t, int64(4), n)
assert.Equal(t, "PONG", buffer.String())
assert.Equal(t, test.version, version)
})
if withErr {
_ = conn.Close()
return
}
}
}
}

View file

@ -127,8 +127,8 @@ func NewTracer(tracer trace.Tracer, capturedRequestHeaders, capturedResponseHead
return &Tracer{
Tracer: tracer,
safeQueryParams: safeQueryParams,
capturedRequestHeaders: capturedRequestHeaders,
capturedResponseHeaders: capturedResponseHeaders,
capturedRequestHeaders: canonicalizeHeaders(capturedRequestHeaders),
capturedResponseHeaders: canonicalizeHeaders(capturedResponseHeaders),
}
}
@ -346,3 +346,18 @@ func defaultStatus(code int) (codes.Code, string) {
}
return codes.Unset, ""
}
// canonicalizeHeaders converts a slice of header keys to their canonical form.
// It uses http.CanonicalHeaderKey to ensure that the headers are in a consistent format.
func canonicalizeHeaders(headers []string) []string {
if headers == nil {
return nil
}
canonicalHeaders := make([]string, len(headers))
for i, header := range headers {
canonicalHeaders[i] = http.CanonicalHeaderKey(header)
}
return canonicalHeaders
}

View file

@ -17,6 +17,7 @@ import (
"github.com/traefik/traefik/v3/pkg/types"
"go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
)
func Test_safeFullURL(t *testing.T) {
@ -414,3 +415,52 @@ func TestTracerProvider(t *testing.T) {
span.TracerProvider().Tracer("github.com/traefik/traefik")
span.TracerProvider().Tracer("other")
}
// TestNewTracer_HeadersCanonicalization tests that NewTracer properly canonicalizes headers.
func TestNewTracer_HeadersCanonicalization(t *testing.T) {
testCases := []struct {
desc string
inputHeaders []string
expectedCanonicalHeaders []string
}{
{
desc: "Empty headers",
inputHeaders: []string{},
expectedCanonicalHeaders: []string{},
},
{
desc: "Already canonical headers",
inputHeaders: []string{"Content-Type", "User-Agent", "Accept-Encoding"},
expectedCanonicalHeaders: []string{"Content-Type", "User-Agent", "Accept-Encoding"},
},
{
desc: "Lowercase headers",
inputHeaders: []string{"content-type", "user-agent", "accept-encoding"},
expectedCanonicalHeaders: []string{"Content-Type", "User-Agent", "Accept-Encoding"},
},
{
desc: "Mixed case headers",
inputHeaders: []string{"CoNtEnT-tYpE", "uSeR-aGeNt", "aCcEpT-eNcOdInG"},
expectedCanonicalHeaders: []string{"Content-Type", "User-Agent", "Accept-Encoding"},
},
}
for _, test := range testCases {
t.Run(test.desc, func(t *testing.T) {
t.Parallel()
// Create a mock tracer using a no-op tracer from OpenTelemetry
mockTracer := noop.NewTracerProvider().Tracer("test")
// Test capturedRequestHeaders
tracer := NewTracer(mockTracer, test.inputHeaders, nil, nil)
assert.Equal(t, test.expectedCanonicalHeaders, tracer.capturedRequestHeaders)
assert.Nil(t, tracer.capturedResponseHeaders)
// Test capturedResponseHeaders
tracer = NewTracer(mockTracer, nil, test.inputHeaders, nil)
assert.Equal(t, test.expectedCanonicalHeaders, tracer.capturedResponseHeaders)
assert.Nil(t, tracer.capturedRequestHeaders)
})
}
}

View file

@ -58,7 +58,7 @@ func (l *TraefikLog) SetDefaults() {
// AccessLog holds the configuration settings for the access logger (middlewares/accesslog).
type AccessLog struct {
FilePath string `description:"Access log file path. Stdout is used when omitted or empty." json:"filePath,omitempty" toml:"filePath,omitempty" yaml:"filePath,omitempty"`
Format string `description:"Access log format: json | common" json:"format,omitempty" toml:"format,omitempty" yaml:"format,omitempty" export:"true"`
Format string `description:"Access log format: json, common, or genericCLF" json:"format,omitempty" toml:"format,omitempty" yaml:"format,omitempty" export:"true"`
Filters *AccessLogFilters `description:"Access log filters, used to keep only specific access logs." json:"filters,omitempty" toml:"filters,omitempty" yaml:"filters,omitempty" export:"true"`
Fields *AccessLogFields `description:"AccessLogFields." json:"fields,omitempty" toml:"fields,omitempty" yaml:"fields,omitempty" export:"true"`
BufferingSize int64 `description:"Number of access log lines to process in a buffered way." json:"bufferingSize,omitempty" toml:"bufferingSize,omitempty" yaml:"bufferingSize,omitempty" export:"true"`