Merge branch v3.3 into master

This commit is contained in:
kevinpollet 2025-02-19 10:48:30 +01:00
commit f0849e8ee6
No known key found for this signature in database
GPG key ID: 0C9A5DDD1B292453
23 changed files with 470 additions and 191 deletions

View file

@ -11,7 +11,6 @@ const (
ServiceName = "serviceName"
MetricsProviderName = "metricsProviderName"
TracingProviderName = "tracingProviderName"
ServerName = "serverName"
ServerIndex = "serverIndex"
TLSStoreName = "tlsStoreName"
ServersTransportName = "serversTransport"

View file

@ -132,7 +132,7 @@ func RegisterOpenTelemetry(ctx context.Context, config *types.OTLP) Registry {
"How many HTTP requests with TLS processed on an entrypoint, partitioned by TLS Version and TLS cipher Used.")
reg.entryPointReqDurationHistogram, _ = NewHistogramWithScale(newOTLPHistogramFrom(meter, entryPointReqDurationName,
"How long it took to process the request on an entrypoint, partitioned by status code, protocol, and method.",
"ms"), time.Second)
"s"), time.Second)
reg.entryPointReqsBytesCounter = newOTLPCounterFrom(meter, entryPointReqsBytesTotalName,
"The total size of requests in bytes handled by an entrypoint, partitioned by status code, protocol, and method.")
reg.entryPointRespsBytesCounter = newOTLPCounterFrom(meter, entryPointRespsBytesTotalName,
@ -146,7 +146,7 @@ func RegisterOpenTelemetry(ctx context.Context, config *types.OTLP) Registry {
"How many HTTP requests with TLS are processed on a router, partitioned by service, TLS Version, and TLS cipher Used.")
reg.routerReqDurationHistogram, _ = NewHistogramWithScale(newOTLPHistogramFrom(meter, routerReqDurationName,
"How long it took to process the request on a router, partitioned by service, status code, protocol, and method.",
"ms"), time.Second)
"s"), time.Second)
reg.routerReqsBytesCounter = newOTLPCounterFrom(meter, routerReqsBytesTotalName,
"The total size of requests in bytes handled by a router, partitioned by status code, protocol, and method.")
reg.routerRespsBytesCounter = newOTLPCounterFrom(meter, routerRespsBytesTotalName,
@ -160,7 +160,7 @@ func RegisterOpenTelemetry(ctx context.Context, config *types.OTLP) Registry {
"How many HTTP requests with TLS processed on a service, partitioned by TLS version and TLS cipher.")
reg.serviceReqDurationHistogram, _ = NewHistogramWithScale(newOTLPHistogramFrom(meter, serviceReqDurationName,
"How long it took to process the request on a service, partitioned by status code, protocol, and method.",
"ms"), time.Second)
"s"), time.Second)
reg.serviceRetriesCounter = newOTLPCounterFrom(meter, serviceRetriesTotalName,
"How many request retries happened on a service.")
reg.serviceServerUpGauge = newOTLPGaugeFrom(meter, serviceServerUpName,

View file

@ -376,7 +376,7 @@ func TestOpenTelemetry(t *testing.T) {
expectedEntryPoints := []string{
`({"name":"traefik_entrypoint_requests_total","description":"How many HTTP requests processed on an entrypoint, partitioned by status code, protocol, and method.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"code","value":{"stringValue":"200"}},{"key":"entrypoint","value":{"stringValue":"test1"}},{"key":"method","value":{"stringValue":"GET"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
`({"name":"traefik_entrypoint_requests_tls_total","description":"How many HTTP requests with TLS processed on an entrypoint, partitioned by TLS Version and TLS cipher Used.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"entrypoint","value":{"stringValue":"test2"}},{"key":"tls_cipher","value":{"stringValue":"bar"}},{"key":"tls_version","value":{"stringValue":"foo"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
`({"name":"traefik_entrypoint_request_duration_seconds","description":"How long it took to process the request on an entrypoint, partitioned by status code, protocol, and method.","unit":"ms","histogram":{"dataPoints":\[{"attributes":\[{"key":"entrypoint","value":{"stringValue":"test3"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","count":"1","sum":10000,"bucketCounts":\["0","0","0","0","0","0","0","0","0","0","0","0","0","0","1"\],"explicitBounds":\[0.005,0.01,0.025,0.05,0.075,0.1,0.25,0.5,0.75,1,2.5,5,7.5,10\],"min":10000,"max":10000}\],"aggregationTemporality":2}})`,
`({"name":"traefik_entrypoint_request_duration_seconds","description":"How long it took to process the request on an entrypoint, partitioned by status code, protocol, and method.","unit":"s","histogram":{"dataPoints":\[{"attributes":\[{"key":"entrypoint","value":{"stringValue":"test3"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","count":"1","sum":10000,"bucketCounts":\["0","0","0","0","0","0","0","0","0","0","0","0","0","0","1"\],"explicitBounds":\[0.005,0.01,0.025,0.05,0.075,0.1,0.25,0.5,0.75,1,2.5,5,7.5,10\],"min":10000,"max":10000}\],"aggregationTemporality":2}})`,
`({"name":"traefik_entrypoint_requests_bytes_total","description":"The total size of requests in bytes handled by an entrypoint, partitioned by status code, protocol, and method.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"code","value":{"stringValue":"200"}},{"key":"entrypoint","value":{"stringValue":"test1"}},{"key":"method","value":{"stringValue":"GET"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
`({"name":"traefik_entrypoint_responses_bytes_total","description":"The total size of responses in bytes handled by an entrypoint, partitioned by status code, protocol, and method.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"code","value":{"stringValue":"200"}},{"key":"entrypoint","value":{"stringValue":"test1"}},{"key":"method","value":{"stringValue":"GET"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
}
@ -392,7 +392,7 @@ func TestOpenTelemetry(t *testing.T) {
expectedRouters := []string{
`({"name":"traefik_router_requests_total","description":"How many HTTP requests are processed on a router, partitioned by service, status code, protocol, and method.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"code","value":{"stringValue":"(?:200|404)"}},{"key":"method","value":{"stringValue":"GET"}},{"key":"router","value":{"stringValue":"RouterReqsCounter"}},{"key":"service","value":{"stringValue":"test"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1},{"attributes":\[{"key":"code","value":{"stringValue":"(?:200|404)"}},{"key":"method","value":{"stringValue":"GET"}},{"key":"router","value":{"stringValue":"RouterReqsCounter"}},{"key":"service","value":{"stringValue":"test"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
`({"name":"traefik_router_requests_tls_total","description":"How many HTTP requests with TLS are processed on a router, partitioned by service, TLS Version, and TLS cipher Used.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"router","value":{"stringValue":"demo"}},{"key":"service","value":{"stringValue":"test"}},{"key":"tls_cipher","value":{"stringValue":"bar"}},{"key":"tls_version","value":{"stringValue":"foo"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
`({"name":"traefik_router_request_duration_seconds","description":"How long it took to process the request on a router, partitioned by service, status code, protocol, and method.","unit":"ms","histogram":{"dataPoints":\[{"attributes":\[{"key":"code","value":{"stringValue":"200"}},{"key":"router","value":{"stringValue":"demo"}},{"key":"service","value":{"stringValue":"test"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","count":"1","sum":10000,"bucketCounts":\["0","0","0","0","0","0","0","0","0","0","0","0","0","0","1"\],"explicitBounds":\[0.005,0.01,0.025,0.05,0.075,0.1,0.25,0.5,0.75,1,2.5,5,7.5,10\],"min":10000,"max":10000}\],"aggregationTemporality":2}})`,
`({"name":"traefik_router_request_duration_seconds","description":"How long it took to process the request on a router, partitioned by service, status code, protocol, and method.","unit":"s","histogram":{"dataPoints":\[{"attributes":\[{"key":"code","value":{"stringValue":"200"}},{"key":"router","value":{"stringValue":"demo"}},{"key":"service","value":{"stringValue":"test"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","count":"1","sum":10000,"bucketCounts":\["0","0","0","0","0","0","0","0","0","0","0","0","0","0","1"\],"explicitBounds":\[0.005,0.01,0.025,0.05,0.075,0.1,0.25,0.5,0.75,1,2.5,5,7.5,10\],"min":10000,"max":10000}\],"aggregationTemporality":2}})`,
`({"name":"traefik_router_requests_bytes_total","description":"The total size of requests in bytes handled by a router, partitioned by status code, protocol, and method.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"code","value":{"stringValue":"404"}},{"key":"method","value":{"stringValue":"GET"}},{"key":"router","value":{"stringValue":"RouterReqsCounter"}},{"key":"service","value":{"stringValue":"test"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
`({"name":"traefik_router_responses_bytes_total","description":"The total size of responses in bytes handled by a router, partitioned by status code, protocol, and method.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"code","value":{"stringValue":"404"}},{"key":"method","value":{"stringValue":"GET"}},{"key":"router","value":{"stringValue":"RouterReqsCounter"}},{"key":"service","value":{"stringValue":"test"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
}
@ -409,7 +409,7 @@ func TestOpenTelemetry(t *testing.T) {
expectedServices := []string{
`({"name":"traefik_service_requests_total","description":"How many HTTP requests processed on a service, partitioned by status code, protocol, and method.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"code","value":{"stringValue":"(?:200|404)"}},{"key":"method","value":{"stringValue":"GET"}},{"key":"service","value":{"stringValue":"ServiceReqsCounter"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1},{"attributes":\[{"key":"code","value":{"stringValue":"(?:200|404)"}},{"key":"method","value":{"stringValue":"GET"}},{"key":"service","value":{"stringValue":"ServiceReqsCounter"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
`({"name":"traefik_service_requests_tls_total","description":"How many HTTP requests with TLS processed on a service, partitioned by TLS version and TLS cipher.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"service","value":{"stringValue":"test"}},{"key":"tls_cipher","value":{"stringValue":"bar"}},{"key":"tls_version","value":{"stringValue":"foo"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
`({"name":"traefik_service_request_duration_seconds","description":"How long it took to process the request on a service, partitioned by status code, protocol, and method.","unit":"ms","histogram":{"dataPoints":\[{"attributes":\[{"key":"code","value":{"stringValue":"200"}},{"key":"service","value":{"stringValue":"test"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","count":"1","sum":10000,"bucketCounts":\["0","0","0","0","0","0","0","0","0","0","0","0","0","0","1"\],"explicitBounds":\[0.005,0.01,0.025,0.05,0.075,0.1,0.25,0.5,0.75,1,2.5,5,7.5,10\],"min":10000,"max":10000}\],"aggregationTemporality":2}})`,
`({"name":"traefik_service_request_duration_seconds","description":"How long it took to process the request on a service, partitioned by status code, protocol, and method.","unit":"s","histogram":{"dataPoints":\[{"attributes":\[{"key":"code","value":{"stringValue":"200"}},{"key":"service","value":{"stringValue":"test"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","count":"1","sum":10000,"bucketCounts":\["0","0","0","0","0","0","0","0","0","0","0","0","0","0","1"\],"explicitBounds":\[0.005,0.01,0.025,0.05,0.075,0.1,0.25,0.5,0.75,1,2.5,5,7.5,10\],"min":10000,"max":10000}\],"aggregationTemporality":2}})`,
`({"name":"traefik_service_server_up","description":"service server is up, described by gauge value of 0 or 1.","unit":"1","gauge":{"dataPoints":\[{"attributes":\[{"key":"service","value":{"stringValue":"test"}},{"key":"url","value":{"stringValue":"http://127.0.0.1"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\]}})`,
`({"name":"traefik_service_requests_bytes_total","description":"The total size of requests in bytes received by a service, partitioned by status code, protocol, and method.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"code","value":{"stringValue":"404"}},{"key":"method","value":{"stringValue":"GET"}},{"key":"service","value":{"stringValue":"ServiceReqsCounter"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,
`({"name":"traefik_service_responses_bytes_total","description":"The total size of responses in bytes returned by a service, partitioned by status code, protocol, and method.","unit":"1","sum":{"dataPoints":\[{"attributes":\[{"key":"code","value":{"stringValue":"404"}},{"key":"method","value":{"stringValue":"GET"}},{"key":"service","value":{"stringValue":"ServiceReqsCounter"}}\],"startTimeUnixNano":"[\d]{19}","timeUnixNano":"[\d]{19}","asDouble":1}\],"aggregationTemporality":2,"isMonotonic":true}})`,

View file

@ -215,34 +215,20 @@ func (c *conn) handleResponse(r rwWithUpgrade) error {
return nil
}
hasContentLength := len(res.Header.Peek("Content-Length")) > 0
if hasContentLength && res.Header.ContentLength() == 0 {
return nil
}
// When a body is not allowed for a given status code the body is ignored.
// The connection will be marked as broken by the next Peek in the readloop.
if !isBodyAllowedForStatus(res.StatusCode()) {
return nil
}
if !hasContentLength {
b := c.bufferPool.Get()
if b == nil {
b = make([]byte, bufferSize)
}
defer c.bufferPool.Put(b)
if _, err := io.CopyBuffer(r.RW, c.br, b); err != nil {
return err
}
contentLength := res.Header.ContentLength()
if contentLength == 0 {
return nil
}
// Chunked response, Content-Length is set to -1 by FastProxy when "Transfer-Encoding: chunked" header is received.
if res.Header.ContentLength() == -1 {
if contentLength == -1 {
cbr := httputil.NewChunkedReader(c.br)
b := c.bufferPool.Get()
@ -282,6 +268,23 @@ func (c *conn) handleResponse(r rwWithUpgrade) error {
return nil
}
// Response without Content-Length header.
// The message body length is determined by the number of bytes received prior to the server closing the connection.
if contentLength == -2 {
b := c.bufferPool.Get()
if b == nil {
b = make([]byte, bufferSize)
}
defer c.bufferPool.Put(b)
if _, err := io.CopyBuffer(r.RW, c.br, b); err != nil {
return err
}
return nil
}
// Response with a valid Content-Length header.
brl := c.limitedReaderPool.Get()
if brl == nil {
brl = &io.LimitedReader{}

View file

@ -171,6 +171,7 @@ func (p *ReverseProxy) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
if reqUpType != "" {
outReq.Header.Set("Connection", "Upgrade")
outReq.Header.Set("Upgrade", reqUpType)
if strings.EqualFold(reqUpType, "websocket") {
cleanWebSocketHeaders(&outReq.Header)
}
@ -351,7 +352,7 @@ func isGraphic(s string) bool {
type fasthttpHeader interface {
Peek(key string) []byte
Set(key string, value string)
SetBytesV(key string, value []byte)
SetCanonical(key []byte, value []byte)
DelBytes(key []byte)
Del(key string)
ConnectionUpgrade() bool
@ -382,18 +383,33 @@ func fixPragmaCacheControl(header fasthttpHeader) {
// Sec-WebSocket-Protocol and Sec-WebSocket-Version to be case-sensitive.
// https://tools.ietf.org/html/rfc6455#page-20
func cleanWebSocketHeaders(headers fasthttpHeader) {
headers.SetBytesV("Sec-WebSocket-Key", headers.Peek("Sec-Websocket-Key"))
headers.Del("Sec-Websocket-Key")
secWebsocketKey := headers.Peek("Sec-Websocket-Key")
if len(secWebsocketKey) > 0 {
headers.SetCanonical([]byte("Sec-WebSocket-Key"), secWebsocketKey)
headers.Del("Sec-Websocket-Key")
}
headers.SetBytesV("Sec-WebSocket-Extensions", headers.Peek("Sec-Websocket-Extensions"))
headers.Del("Sec-Websocket-Extensions")
secWebsocketExtensions := headers.Peek("Sec-Websocket-Extensions")
if len(secWebsocketExtensions) > 0 {
headers.SetCanonical([]byte("Sec-WebSocket-Extensions"), secWebsocketExtensions)
headers.Del("Sec-Websocket-Extensions")
}
headers.SetBytesV("Sec-WebSocket-Accept", headers.Peek("Sec-Websocket-Accept"))
headers.Del("Sec-Websocket-Accept")
secWebsocketAccept := headers.Peek("Sec-Websocket-Accept")
if len(secWebsocketAccept) > 0 {
headers.SetCanonical([]byte("Sec-WebSocket-Accept"), secWebsocketAccept)
headers.Del("Sec-Websocket-Accept")
}
headers.SetBytesV("Sec-WebSocket-Protocol", headers.Peek("Sec-Websocket-Protocol"))
headers.Del("Sec-Websocket-Protocol")
secWebsocketProtocol := headers.Peek("Sec-Websocket-Protocol")
if len(secWebsocketProtocol) > 0 {
headers.SetCanonical([]byte("Sec-WebSocket-Protocol"), secWebsocketProtocol)
headers.Del("Sec-Websocket-Protocol")
}
headers.SetBytesV("Sec-WebSocket-Version", headers.Peek("Sec-Websocket-Version"))
headers.DelBytes([]byte("Sec-Websocket-Version"))
secWebsocketVersion := headers.Peek("Sec-Websocket-Version")
if len(secWebsocketVersion) > 0 {
headers.SetCanonical([]byte("Sec-WebSocket-Version"), secWebsocketVersion)
headers.Del("Sec-Websocket-Version")
}
}

View file

@ -306,7 +306,7 @@ func TestHeadRequest(t *testing.T) {
assert.Equal(t, http.StatusOK, res.Code)
}
func TestNoContentLengthResponse(t *testing.T) {
func TestNoContentLength(t *testing.T) {
backendListener, err := net.Listen("tcp", ":0")
require.NoError(t, err)
@ -346,6 +346,45 @@ func TestNoContentLengthResponse(t *testing.T) {
assert.Equal(t, "foo", res.Body.String())
}
func TestTransferEncodingChunked(t *testing.T) {
backendServer := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
flusher, ok := rw.(http.Flusher)
require.True(t, ok)
for i := range 3 {
_, err := rw.Write([]byte(fmt.Sprintf("chunk %d\n", i)))
require.NoError(t, err)
flusher.Flush()
}
}))
t.Cleanup(backendServer.Close)
builder := NewProxyBuilder(&transportManagerMock{}, static.FastProxyConfig{})
proxyHandler, err := builder.Build("", testhelpers.MustParseURL(backendServer.URL), true, true)
require.NoError(t, err)
proxyServer := httptest.NewServer(proxyHandler)
t.Cleanup(proxyServer.Close)
req, err := http.NewRequest(http.MethodGet, proxyServer.URL, http.NoBody)
require.NoError(t, err)
res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
t.Cleanup(func() { _ = res.Body.Close() })
assert.Equal(t, http.StatusOK, res.StatusCode)
assert.Equal(t, []string{"chunked"}, res.TransferEncoding)
body, err := io.ReadAll(res.Body)
require.NoError(t, err)
assert.Equal(t, "chunk 0\nchunk 1\nchunk 2\n", string(body))
}
func newCertificate(t *testing.T, domain string) *tls.Certificate {
t.Helper()

View file

@ -18,9 +18,12 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/traefik/traefik/v3/pkg/testhelpers"
"github.com/valyala/fasthttp"
"golang.org/x/net/websocket"
)
const dialTimeout = time.Second
func TestWebSocketUpgradeCase(t *testing.T) {
srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
challengeKey := r.Header.Get("Sec-Websocket-Key")
@ -49,6 +52,31 @@ func TestWebSocketUpgradeCase(t *testing.T) {
conn.Close()
}
func TestCleanWebSocketHeaders(t *testing.T) {
// Asserts that no headers are sent if the request contain anything.
req := fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(req)
cleanWebSocketHeaders(&req.Header)
want := "GET / HTTP/1.1\r\n\r\n"
assert.Equal(t, want, req.Header.String())
// Asserts that the Sec-WebSocket-* is enforced.
req.Reset()
req.Header.Set("Sec-Websocket-Key", "key")
req.Header.Set("Sec-Websocket-Extensions", "extensions")
req.Header.Set("Sec-Websocket-Accept", "accept")
req.Header.Set("Sec-Websocket-Protocol", "protocol")
req.Header.Set("Sec-Websocket-Version", "version")
cleanWebSocketHeaders(&req.Header)
want = "GET / HTTP/1.1\r\nSec-WebSocket-Key: key\r\nSec-WebSocket-Extensions: extensions\r\nSec-WebSocket-Accept: accept\r\nSec-WebSocket-Protocol: protocol\r\nSec-WebSocket-Version: version\r\n\r\n"
assert.Equal(t, want, req.Header.String())
}
func TestWebSocketTCPClose(t *testing.T) {
errChan := make(chan error, 1)
upgrader := gorillawebsocket.Upgrader{}
@ -535,29 +563,6 @@ func TestForwardsWebsocketTraffic(t *testing.T) {
assert.Equal(t, "ok", resp)
}
func createTLSWebsocketServer() *httptest.Server {
upgrader := gorillawebsocket.Upgrader{}
srv := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
defer conn.Close()
for {
mt, message, err := conn.ReadMessage()
if err != nil {
break
}
err = conn.WriteMessage(mt, message)
if err != nil {
break
}
}
}))
return srv
}
func TestWebSocketTransferTLSConfig(t *testing.T) {
srv := createTLSWebsocketServer()
defer srv.Close()
@ -592,7 +597,28 @@ func TestWebSocketTransferTLSConfig(t *testing.T) {
assert.Equal(t, "ok", resp)
}
const dialTimeout = time.Second
func createTLSWebsocketServer() *httptest.Server {
upgrader := gorillawebsocket.Upgrader{}
srv := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
defer conn.Close()
for {
mt, message, err := conn.ReadMessage()
if err != nil {
break
}
err = conn.WriteMessage(mt, message)
if err != nil {
break
}
}
}))
return srv
}
type websocketRequestOpt func(w *websocketRequest)

View file

@ -70,7 +70,9 @@ func directorBuilder(target *url.URL, passHostHeader bool, preservePath bool) fu
outReq.Host = outReq.URL.Host
}
cleanWebSocketHeaders(outReq)
if isWebSocketUpgrade(outReq) {
cleanWebSocketHeaders(outReq)
}
}
}
@ -79,10 +81,6 @@ func directorBuilder(target *url.URL, passHostHeader bool, preservePath bool) fu
// Sec-WebSocket-Protocol and Sec-WebSocket-Version to be case-sensitive.
// https://tools.ietf.org/html/rfc6455#page-20
func cleanWebSocketHeaders(req *http.Request) {
if !isWebSocketUpgrade(req) {
return
}
req.Header["Sec-WebSocket-Key"] = req.Header["Sec-Websocket-Key"]
delete(req.Header, "Sec-Websocket-Key")

View file

@ -2,6 +2,7 @@ package httputil
import (
"bufio"
"bytes"
"crypto/tls"
"errors"
"fmt"
@ -18,6 +19,8 @@ import (
"golang.org/x/net/websocket"
)
const dialTimeout = time.Second
func TestWebSocketTCPClose(t *testing.T) {
errChan := make(chan error, 1)
upgrader := gorillawebsocket.Upgrader{}
@ -419,28 +422,6 @@ func TestForwardsWebsocketTraffic(t *testing.T) {
assert.Equal(t, "ok", resp)
}
func createTLSWebsocketServer() *httptest.Server {
upgrader := gorillawebsocket.Upgrader{}
srv := httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
defer conn.Close()
for {
mt, message, err := conn.ReadMessage()
if err != nil {
break
}
err = conn.WriteMessage(mt, message)
if err != nil {
break
}
}
}))
return srv
}
func TestWebSocketTransferTLSConfig(t *testing.T) {
srv := createTLSWebsocketServer()
defer srv.Close()
@ -495,7 +476,58 @@ func TestWebSocketTransferTLSConfig(t *testing.T) {
assert.Equal(t, "ok", resp)
}
const dialTimeout = time.Second
func TestCleanWebSocketHeaders(t *testing.T) {
// Asserts that no headers are sent if the request contain anything.
req := httptest.NewRequest(http.MethodGet, "/", http.NoBody)
req.Header.Del("User-Agent")
cleanWebSocketHeaders(req)
b := bytes.NewBuffer(nil)
err := req.Header.Write(b)
require.NoError(t, err)
assert.Empty(t, b)
// Asserts that the Sec-WebSocket-* is enforced.
req.Header.Set("Sec-Websocket-Key", "key")
req.Header.Set("Sec-Websocket-Extensions", "extensions")
req.Header.Set("Sec-Websocket-Accept", "accept")
req.Header.Set("Sec-Websocket-Protocol", "protocol")
req.Header.Set("Sec-Websocket-Version", "version")
cleanWebSocketHeaders(req)
want := http.Header{
"Sec-WebSocket-Key": {"key"},
"Sec-WebSocket-Extensions": {"extensions"},
"Sec-WebSocket-Accept": {"accept"},
"Sec-WebSocket-Protocol": {"protocol"},
"Sec-WebSocket-Version": {"version"},
}
assert.Equal(t, want, req.Header)
}
func createTLSWebsocketServer() *httptest.Server {
var upgrader gorillawebsocket.Upgrader
return httptest.NewTLSServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
return
}
defer conn.Close()
for {
mt, message, err := conn.ReadMessage()
if err != nil {
break
}
err = conn.WriteMessage(mt, message)
if err != nil {
break
}
}
}))
}
type websocketRequestOpt func(w *websocketRequest)

View file

@ -3,6 +3,8 @@ package wrr
import (
"container/heap"
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"hash/fnv"
"net/http"
@ -15,9 +17,10 @@ import (
type namedHandler struct {
http.Handler
name string
weight float64
deadline float64
name string
hashedName string
weight float64
deadline float64
}
type stickyCookie struct {
@ -53,9 +56,10 @@ type Balancer struct {
handlersMu sync.RWMutex
// References all the handlers by name and also by the hashed value of the name.
handlerMap map[string]*namedHandler
handlers []*namedHandler
curDeadline float64
stickyMap map[string]*namedHandler
compatibilityStickyMap map[string]*namedHandler
handlers []*namedHandler
curDeadline float64
// status is a record of which child services of the Balancer are healthy, keyed
// by name of child service. A service is initially added to the map when it is
// created via Add, and it is later removed or added to the map as needed,
@ -73,7 +77,6 @@ func New(sticky *dynamic.Sticky, wantHealthCheck bool) *Balancer {
balancer := &Balancer{
status: make(map[string]struct{}),
fenced: make(map[string]struct{}),
handlerMap: make(map[string]*namedHandler),
wantsHealthCheck: wantHealthCheck,
}
if sticky != nil && sticky.Cookie != nil {
@ -88,6 +91,9 @@ func New(sticky *dynamic.Sticky, wantHealthCheck bool) *Balancer {
if sticky.Cookie.Path != nil {
balancer.stickyCookie.path = *sticky.Cookie.Path
}
balancer.stickyMap = make(map[string]*namedHandler)
balancer.compatibilityStickyMap = make(map[string]*namedHandler)
}
return balancer
@ -218,7 +224,7 @@ func (b *Balancer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if err == nil && cookie != nil {
b.handlersMu.RLock()
handler, ok := b.handlerMap[cookie.Value]
handler, ok := b.stickyMap[cookie.Value]
b.handlersMu.RUnlock()
if ok && handler != nil {
@ -230,6 +236,22 @@ func (b *Balancer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}
}
b.handlersMu.RLock()
handler, ok = b.compatibilityStickyMap[cookie.Value]
b.handlersMu.RUnlock()
if ok && handler != nil {
b.handlersMu.RLock()
_, isHealthy := b.status[handler.name]
b.handlersMu.RUnlock()
if isHealthy {
b.writeStickyCookie(w, handler)
handler.ServeHTTP(w, req)
return
}
}
}
}
@ -244,21 +266,25 @@ func (b *Balancer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
if b.stickyCookie != nil {
cookie := &http.Cookie{
Name: b.stickyCookie.name,
Value: hash(server.name),
Path: b.stickyCookie.path,
HttpOnly: b.stickyCookie.httpOnly,
Secure: b.stickyCookie.secure,
SameSite: convertSameSite(b.stickyCookie.sameSite),
MaxAge: b.stickyCookie.maxAge,
}
http.SetCookie(w, cookie)
b.writeStickyCookie(w, server)
}
server.ServeHTTP(w, req)
}
func (b *Balancer) writeStickyCookie(w http.ResponseWriter, handler *namedHandler) {
cookie := &http.Cookie{
Name: b.stickyCookie.name,
Value: handler.hashedName,
Path: b.stickyCookie.path,
HttpOnly: b.stickyCookie.httpOnly,
Secure: b.stickyCookie.secure,
SameSite: convertSameSite(b.stickyCookie.sameSite),
MaxAge: b.stickyCookie.maxAge,
}
http.SetCookie(w, cookie)
}
// Add adds a handler.
// A handler with a non-positive weight is ignored.
func (b *Balancer) Add(name string, handler http.Handler, weight *int, fenced bool) {
@ -280,15 +306,41 @@ func (b *Balancer) Add(name string, handler http.Handler, weight *int, fenced bo
if fenced {
b.fenced[name] = struct{}{}
}
b.handlerMap[name] = h
b.handlerMap[hash(name)] = h
if b.stickyCookie != nil {
sha256HashedName := sha256Hash(name)
h.hashedName = sha256HashedName
b.stickyMap[sha256HashedName] = h
b.compatibilityStickyMap[name] = h
hashedName := fnvHash(name)
b.compatibilityStickyMap[hashedName] = h
// server.URL was fnv hashed in service.Manager
// so we can have "double" fnv hash in already existing cookies
hashedName = fnvHash(hashedName)
b.compatibilityStickyMap[hashedName] = h
}
b.handlersMu.Unlock()
}
func hash(input string) string {
func fnvHash(input string) string {
hasher := fnv.New64()
// We purposely ignore the error because the implementation always returns nil.
_, _ = hasher.Write([]byte(input))
return strconv.FormatUint(hasher.Sum64(), 16)
}
func sha256Hash(input string) string {
hash := sha256.New()
// We purposely ignore the error because the implementation always returns nil.
_, _ = hash.Write([]byte(input))
hashedInput := hex.EncodeToString(hash.Sum(nil))
if len(hashedInput) < 16 {
return hashedInput
}
return hashedInput[:16]
}

View file

@ -296,7 +296,7 @@ func TestSticky_FallBack(t *testing.T) {
rw.WriteHeader(http.StatusOK)
}), pointer(2), false)
recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}, cookies: make(map[string]*http.Cookie)}
req := httptest.NewRequest(http.MethodGet, "/", nil)
req.AddCookie(&http.Cookie{Name: "test", Value: "second"})
@ -373,7 +373,7 @@ func TestSticky_Fenced(t *testing.T) {
rw.WriteHeader(http.StatusOK)
}), pointer(1), true)
recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}, cookies: make(map[string]*http.Cookie)}
stickyReq := httptest.NewRequest(http.MethodGet, "/", nil)
stickyReq.AddCookie(&http.Cookie{Name: "test", Value: "fenced"})
@ -391,3 +391,99 @@ func TestSticky_Fenced(t *testing.T) {
assert.Equal(t, 2, recorder.save["first"])
assert.Equal(t, 2, recorder.save["second"])
}
func TestStickyWithCompatibility(t *testing.T) {
testCases := []struct {
desc string
servers []string
cookies []*http.Cookie
expectedCookies []*http.Cookie
expectedServer string
}{
{
desc: "No previous cookie",
servers: []string{"first"},
expectedServer: "first",
expectedCookies: []*http.Cookie{
{Name: "test", Value: sha256Hash("first")},
},
},
{
desc: "Sha256 previous cookie",
servers: []string{"first", "second"},
cookies: []*http.Cookie{
{Name: "test", Value: sha256Hash("first")},
},
expectedServer: "first",
expectedCookies: []*http.Cookie{},
},
{
desc: "Raw previous cookie",
servers: []string{"first", "second"},
cookies: []*http.Cookie{
{Name: "test", Value: "first"},
},
expectedServer: "first",
expectedCookies: []*http.Cookie{
{Name: "test", Value: sha256Hash("first")},
},
},
{
desc: "Fnv previous cookie",
servers: []string{"first", "second"},
cookies: []*http.Cookie{
{Name: "test", Value: fnvHash("first")},
},
expectedServer: "first",
expectedCookies: []*http.Cookie{
{Name: "test", Value: sha256Hash("first")},
},
},
{
desc: "Double fnv previous cookie",
servers: []string{"first", "second"},
cookies: []*http.Cookie{
{Name: "test", Value: fnvHash(fnvHash("first"))},
},
expectedServer: "first",
expectedCookies: []*http.Cookie{
{Name: "test", Value: sha256Hash("first")},
},
},
}
for _, test := range testCases {
t.Run(test.desc, func(t *testing.T) {
t.Parallel()
balancer := New(&dynamic.Sticky{Cookie: &dynamic.Cookie{Name: "test"}}, false)
for _, server := range test.servers {
balancer.Add(server, http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(http.StatusOK)
_, _ = rw.Write([]byte(server))
}), pointer(1), false)
}
// Do it twice, to be sure it's not just the luck.
for range 2 {
req := httptest.NewRequest(http.MethodGet, "/", nil)
for _, cookie := range test.cookies {
req.AddCookie(cookie)
}
recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}, cookies: make(map[string]*http.Cookie)}
balancer.ServeHTTP(recorder, req)
assert.Equal(t, test.expectedServer, recorder.Body.String())
assert.Len(t, recorder.cookies, len(test.expectedCookies))
for _, cookie := range test.expectedCookies {
assert.Equal(t, cookie.Value, recorder.cookies[cookie.Name].Value)
}
}
})
}
}

View file

@ -2,11 +2,9 @@ package service
import (
"context"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"hash/fnv"
"math/rand"
"net/http"
"net/url"
@ -335,18 +333,13 @@ func (m *Manager) getLoadBalancerServiceHandler(ctx context.Context, serviceName
lb := wrr.New(service.Sticky, service.HealthCheck != nil)
healthCheckTargets := make(map[string]*url.URL)
for _, server := range shuffle(service.Servers, m.rand) {
hasher := fnv.New64a()
_, _ = hasher.Write([]byte(server.URL)) // this will never return an error.
proxyName := hex.EncodeToString(hasher.Sum(nil))
for i, server := range shuffle(service.Servers, m.rand) {
target, err := url.Parse(server.URL)
if err != nil {
return nil, fmt.Errorf("error parsing server URL %s: %w", server.URL, err)
}
logger.Debug().Str(logs.ServerName, proxyName).Stringer("target", target).
logger.Debug().Int(logs.ServerIndex, i).Str("URL", server.URL).
Msg("Creating server")
qualifiedSvcName := provider.GetQualifiedName(ctx, serviceName)
@ -392,12 +385,12 @@ func (m *Manager) getLoadBalancerServiceHandler(ctx context.Context, serviceName
proxy, _ = capture.Wrap(proxy)
}
lb.Add(proxyName, proxy, server.Weight, server.Fenced)
lb.Add(server.URL, proxy, server.Weight, server.Fenced)
// servers are considered UP by default.
info.UpdateServerStatus(target.String(), runtime.StatusUp)
healthCheckTargets[proxyName] = target
healthCheckTargets[server.URL] = target
}
if service.HealthCheck != nil {

View file

@ -25,7 +25,7 @@ import (
// Backend is an abstraction for tracking backend (OpenTelemetry, ...).
type Backend interface {
Setup(serviceName string, sampleRate float64, globalAttributes map[string]string) (trace.Tracer, io.Closer, error)
Setup(serviceName string, sampleRate float64, resourceAttributes map[string]string) (trace.Tracer, io.Closer, error)
}
// NewTracing Creates a Tracing.
@ -44,7 +44,7 @@ func NewTracing(conf *static.Tracing) (*Tracer, io.Closer, error) {
otel.SetTextMapPropagator(autoprop.NewTextMapPropagator())
tr, closer, err := backend.Setup(conf.ServiceName, conf.SampleRate, conf.GlobalAttributes)
tr, closer, err := backend.Setup(conf.ServiceName, conf.SampleRate, conf.ResourceAttributes)
if err != nil {
return nil, nil, err
}

View file

@ -73,6 +73,7 @@ func TestTracing(t *testing.T) {
desc string
propagators string
headers map[string]string
resourceAttributes map[string]string
wantServiceHeadersFn func(t *testing.T, headers http.Header)
assertFn func(*testing.T, string)
}{
@ -85,6 +86,17 @@ func TestTracing(t *testing.T) {
assert.Regexp(t, `({"key":"service.version","value":{"stringValue":"dev"}})`, trace)
},
},
{
desc: "resource attributes must be propagated",
resourceAttributes: map[string]string{
"service.environment": "custom",
},
assertFn: func(t *testing.T, trace string) {
t.Helper()
assert.Regexp(t, `({"key":"service.environment","value":{"stringValue":"custom"}})`, trace)
},
},
{
desc: "TraceContext propagation",
propagators: "tracecontext",
@ -328,8 +340,9 @@ func TestTracing(t *testing.T) {
})
tracingConfig := &static.Tracing{
ServiceName: "traefik",
SampleRate: 1.0,
ServiceName: "traefik",
SampleRate: 1.0,
ResourceAttributes: test.resourceAttributes,
OTLP: &types.OTelTracing{
HTTP: &types.OTelHTTP{
Endpoint: collector.URL,

View file

@ -36,7 +36,7 @@ func (c *OTelTracing) SetDefaults() {
}
// Setup sets up the tracer.
func (c *OTelTracing) Setup(serviceName string, sampleRate float64, globalAttributes map[string]string) (trace.Tracer, io.Closer, error) {
func (c *OTelTracing) Setup(serviceName string, sampleRate float64, resourceAttributes map[string]string) (trace.Tracer, io.Closer, error) {
var (
err error
exporter *otlptrace.Exporter
@ -55,7 +55,7 @@ func (c *OTelTracing) Setup(serviceName string, sampleRate float64, globalAttrib
semconv.ServiceVersionKey.String(version.Version),
}
for k, v := range globalAttributes {
for k, v := range resourceAttributes {
attr = append(attr, attribute.String(k, v))
}