Merge branch v2.11 into v3.0
This commit is contained in:
commit
a69c1ba3b7
112 changed files with 1133 additions and 238 deletions
29
pkg/server/keep_alive_middleware.go
Normal file
29
pkg/server/keep_alive_middleware.go
Normal file
|
@ -0,0 +1,29 @@
|
|||
package server
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/rs/zerolog/log"
|
||||
ptypes "github.com/traefik/paerser/types"
|
||||
)
|
||||
|
||||
func newKeepAliveMiddleware(next http.Handler, maxRequests int, maxTime ptypes.Duration) http.Handler {
|
||||
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||
state, ok := req.Context().Value(connStateKey).(*connState)
|
||||
if ok {
|
||||
state.HTTPRequestCount++
|
||||
if maxRequests > 0 && state.HTTPRequestCount >= maxRequests {
|
||||
log.Debug().Msg("Close because of too many requests")
|
||||
state.KeepAliveState = "Close because of too many requests"
|
||||
rw.Header().Set("Connection", "close")
|
||||
}
|
||||
if maxTime > 0 && time.Now().After(state.Start.Add(time.Duration(maxTime))) {
|
||||
log.Debug().Msg("Close because of too long connection")
|
||||
state.KeepAliveState = "Close because of too long connection"
|
||||
rw.Header().Set("Connection", "close")
|
||||
}
|
||||
}
|
||||
next.ServeHTTP(rw, req)
|
||||
})
|
||||
}
|
|
@ -3,6 +3,7 @@ package server
|
|||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"expvar"
|
||||
"fmt"
|
||||
stdlog "log"
|
||||
"net"
|
||||
|
@ -36,6 +37,25 @@ import (
|
|||
"golang.org/x/net/http2/h2c"
|
||||
)
|
||||
|
||||
type key string
|
||||
|
||||
const (
|
||||
connStateKey key = "connState"
|
||||
debugConnectionEnv string = "DEBUG_CONNECTION"
|
||||
)
|
||||
|
||||
var (
|
||||
clientConnectionStates = map[string]*connState{}
|
||||
clientConnectionStatesMu = sync.RWMutex{}
|
||||
)
|
||||
|
||||
type connState struct {
|
||||
State string
|
||||
KeepAliveState string
|
||||
Start time.Time
|
||||
HTTPRequestCount int
|
||||
}
|
||||
|
||||
type httpForwarder struct {
|
||||
net.Listener
|
||||
connChan chan net.Conn
|
||||
|
@ -70,6 +90,11 @@ type TCPEntryPoints map[string]*TCPEntryPoint
|
|||
|
||||
// NewTCPEntryPoints creates a new TCPEntryPoints.
|
||||
func NewTCPEntryPoints(entryPointsConfig static.EntryPoints, hostResolverConfig *types.HostResolverConfig, metricsRegistry metrics.Registry) (TCPEntryPoints, error) {
|
||||
if os.Getenv(debugConnectionEnv) != "" {
|
||||
expvar.Publish("clientConnectionStates", expvar.Func(func() any {
|
||||
return clientConnectionStates
|
||||
}))
|
||||
}
|
||||
serverEntryPointsTCP := make(TCPEntryPoints)
|
||||
for entryPointName, config := range entryPointsConfig {
|
||||
protocol, err := config.GetProtocol()
|
||||
|
@ -399,7 +424,12 @@ func (ln tcpKeepAliveListener) Accept() (net.Conn, error) {
|
|||
}
|
||||
|
||||
func buildProxyProtocolListener(ctx context.Context, entryPoint *static.EntryPoint, listener net.Listener) (net.Listener, error) {
|
||||
proxyListener := &proxyproto.Listener{Listener: listener}
|
||||
timeout := entryPoint.Transport.RespondingTimeouts.ReadTimeout
|
||||
// proxyproto use 200ms if ReadHeaderTimeout is set to 0 and not no timeout
|
||||
if timeout == 0 {
|
||||
timeout = -1
|
||||
}
|
||||
proxyListener := &proxyproto.Listener{Listener: listener, ReadHeaderTimeout: time.Duration(timeout)}
|
||||
|
||||
if entryPoint.ProxyProtocol.Insecure {
|
||||
log.Ctx(ctx).Info().Msg("Enabling ProxyProtocol without trusted IPs: Insecure")
|
||||
|
@ -568,6 +598,11 @@ func createHTTPServer(ctx context.Context, ln net.Listener, configuration *stati
|
|||
})
|
||||
}
|
||||
|
||||
debugConnection := os.Getenv(debugConnectionEnv) != ""
|
||||
if debugConnection || (configuration.Transport != nil && (configuration.Transport.KeepAliveMaxTime > 0 || configuration.Transport.KeepAliveMaxRequests > 0)) {
|
||||
handler = newKeepAliveMiddleware(handler, configuration.Transport.KeepAliveMaxRequests, configuration.Transport.KeepAliveMaxTime)
|
||||
}
|
||||
|
||||
serverHTTP := &http.Server{
|
||||
Handler: handler,
|
||||
ErrorLog: stdlog.New(logs.NoLevel(log.Logger, zerolog.DebugLevel), "", 0),
|
||||
|
@ -575,6 +610,27 @@ func createHTTPServer(ctx context.Context, ln net.Listener, configuration *stati
|
|||
WriteTimeout: time.Duration(configuration.Transport.RespondingTimeouts.WriteTimeout),
|
||||
IdleTimeout: time.Duration(configuration.Transport.RespondingTimeouts.IdleTimeout),
|
||||
}
|
||||
if debugConnection || (configuration.Transport != nil && (configuration.Transport.KeepAliveMaxTime > 0 || configuration.Transport.KeepAliveMaxRequests > 0)) {
|
||||
serverHTTP.ConnContext = func(ctx context.Context, c net.Conn) context.Context {
|
||||
cState := &connState{Start: time.Now()}
|
||||
if debugConnection {
|
||||
clientConnectionStatesMu.Lock()
|
||||
clientConnectionStates[getConnKey(c)] = cState
|
||||
clientConnectionStatesMu.Unlock()
|
||||
}
|
||||
return context.WithValue(ctx, connStateKey, cState)
|
||||
}
|
||||
|
||||
if debugConnection {
|
||||
serverHTTP.ConnState = func(c net.Conn, state http.ConnState) {
|
||||
clientConnectionStatesMu.Lock()
|
||||
if clientConnectionStates[getConnKey(c)] != nil {
|
||||
clientConnectionStates[getConnKey(c)].State = state.String()
|
||||
}
|
||||
clientConnectionStatesMu.Unlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ConfigureServer configures HTTP/2 with the MaxConcurrentStreams option for the given server.
|
||||
// Also keeping behavior the same as
|
||||
|
@ -604,6 +660,10 @@ func createHTTPServer(ctx context.Context, ln net.Listener, configuration *stati
|
|||
}, nil
|
||||
}
|
||||
|
||||
func getConnKey(conn net.Conn) string {
|
||||
return fmt.Sprintf("%s => %s", conn.RemoteAddr(), conn.LocalAddr())
|
||||
}
|
||||
|
||||
func newTrackedConnection(conn tcp.WriteCloser, tracker *connectionTracker) *trackedConnection {
|
||||
tracker.AddConnection(conn)
|
||||
return &trackedConnection{
|
||||
|
|
|
@ -230,3 +230,91 @@ func TestReadTimeoutWithFirstByte(t *testing.T) {
|
|||
t.Error("Timeout while read")
|
||||
}
|
||||
}
|
||||
|
||||
func TestKeepAliveMaxRequests(t *testing.T) {
|
||||
epConfig := &static.EntryPointsTransport{}
|
||||
epConfig.SetDefaults()
|
||||
epConfig.KeepAliveMaxRequests = 3
|
||||
|
||||
entryPoint, err := NewTCPEntryPoint(context.Background(), &static.EntryPoint{
|
||||
Address: ":0",
|
||||
Transport: epConfig,
|
||||
ForwardedHeaders: &static.ForwardedHeaders{},
|
||||
HTTP2: &static.HTTP2Config{},
|
||||
}, nil, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
router := &tcprouter.Router{}
|
||||
router.SetHTTPHandler(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
|
||||
conn, err := startEntrypoint(entryPoint, router)
|
||||
require.NoError(t, err)
|
||||
|
||||
http.DefaultClient.Transport = &http.Transport{
|
||||
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||
return conn, nil
|
||||
},
|
||||
}
|
||||
|
||||
resp, err := http.Get("http://" + entryPoint.listener.Addr().String())
|
||||
require.NoError(t, err)
|
||||
require.False(t, resp.Close)
|
||||
err = resp.Body.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
resp, err = http.Get("http://" + entryPoint.listener.Addr().String())
|
||||
require.NoError(t, err)
|
||||
require.False(t, resp.Close)
|
||||
err = resp.Body.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
resp, err = http.Get("http://" + entryPoint.listener.Addr().String())
|
||||
require.NoError(t, err)
|
||||
require.True(t, resp.Close)
|
||||
err = resp.Body.Close()
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
||||
func TestKeepAliveMaxTime(t *testing.T) {
|
||||
epConfig := &static.EntryPointsTransport{}
|
||||
epConfig.SetDefaults()
|
||||
epConfig.KeepAliveMaxTime = ptypes.Duration(time.Millisecond)
|
||||
|
||||
entryPoint, err := NewTCPEntryPoint(context.Background(), &static.EntryPoint{
|
||||
Address: ":0",
|
||||
Transport: epConfig,
|
||||
ForwardedHeaders: &static.ForwardedHeaders{},
|
||||
HTTP2: &static.HTTP2Config{},
|
||||
}, nil, nil)
|
||||
require.NoError(t, err)
|
||||
|
||||
router := &tcprouter.Router{}
|
||||
router.SetHTTPHandler(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
|
||||
conn, err := startEntrypoint(entryPoint, router)
|
||||
require.NoError(t, err)
|
||||
|
||||
http.DefaultClient.Transport = &http.Transport{
|
||||
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||
return conn, nil
|
||||
},
|
||||
}
|
||||
|
||||
resp, err := http.Get("http://" + entryPoint.listener.Addr().String())
|
||||
require.NoError(t, err)
|
||||
require.False(t, resp.Close)
|
||||
err = resp.Body.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
time.Sleep(time.Millisecond)
|
||||
|
||||
resp, err = http.Get("http://" + entryPoint.listener.Addr().String())
|
||||
require.NoError(t, err)
|
||||
require.True(t, resp.Close)
|
||||
err = resp.Body.Close()
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
|
|
@ -4,6 +4,8 @@ import (
|
|||
"container/heap"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
|
@ -47,7 +49,9 @@ type Balancer struct {
|
|||
stickyCookie *stickyCookie
|
||||
wantsHealthCheck bool
|
||||
|
||||
mutex sync.RWMutex
|
||||
handlersMu sync.RWMutex
|
||||
// References all the handlers by name and also by the hashed value of the name.
|
||||
handlerMap map[string]*namedHandler
|
||||
handlers []*namedHandler
|
||||
curDeadline float64
|
||||
// status is a record of which child services of the Balancer are healthy, keyed
|
||||
|
@ -64,6 +68,7 @@ type Balancer struct {
|
|||
func New(sticky *dynamic.Sticky, wantHealthCheck bool) *Balancer {
|
||||
balancer := &Balancer{
|
||||
status: make(map[string]struct{}),
|
||||
handlerMap: make(map[string]*namedHandler),
|
||||
wantsHealthCheck: wantHealthCheck,
|
||||
}
|
||||
if sticky != nil && sticky.Cookie != nil {
|
||||
|
@ -74,6 +79,7 @@ func New(sticky *dynamic.Sticky, wantHealthCheck bool) *Balancer {
|
|||
sameSite: sticky.Cookie.SameSite,
|
||||
}
|
||||
}
|
||||
|
||||
return balancer
|
||||
}
|
||||
|
||||
|
@ -111,8 +117,8 @@ func (b *Balancer) Pop() interface{} {
|
|||
// SetStatus sets on the balancer that its given child is now of the given
|
||||
// status. balancerName is only needed for logging purposes.
|
||||
func (b *Balancer) SetStatus(ctx context.Context, childName string, up bool) {
|
||||
b.mutex.Lock()
|
||||
defer b.mutex.Unlock()
|
||||
b.handlersMu.Lock()
|
||||
defer b.handlersMu.Unlock()
|
||||
|
||||
upBefore := len(b.status) > 0
|
||||
|
||||
|
@ -163,8 +169,8 @@ func (b *Balancer) RegisterStatusUpdater(fn func(up bool)) error {
|
|||
var errNoAvailableServer = errors.New("no available server")
|
||||
|
||||
func (b *Balancer) nextServer() (*namedHandler, error) {
|
||||
b.mutex.Lock()
|
||||
defer b.mutex.Unlock()
|
||||
b.handlersMu.Lock()
|
||||
defer b.handlersMu.Unlock()
|
||||
|
||||
if len(b.handlers) == 0 || len(b.status) == 0 {
|
||||
return nil, errNoAvailableServer
|
||||
|
@ -198,22 +204,18 @@ func (b *Balancer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
|||
}
|
||||
|
||||
if err == nil && cookie != nil {
|
||||
for _, handler := range b.handlers {
|
||||
if handler.name != cookie.Value {
|
||||
continue
|
||||
}
|
||||
b.handlersMu.RLock()
|
||||
handler, ok := b.handlerMap[cookie.Value]
|
||||
b.handlersMu.RUnlock()
|
||||
|
||||
b.mutex.RLock()
|
||||
_, ok := b.status[handler.name]
|
||||
b.mutex.RUnlock()
|
||||
if !ok {
|
||||
// because we already are in the only iteration that matches the cookie, so none
|
||||
// of the following iterations are going to be a match for the cookie anyway.
|
||||
break
|
||||
if ok && handler != nil {
|
||||
b.handlersMu.RLock()
|
||||
_, isHealthy := b.status[handler.name]
|
||||
b.handlersMu.RUnlock()
|
||||
if isHealthy {
|
||||
handler.ServeHTTP(w, req)
|
||||
return
|
||||
}
|
||||
|
||||
handler.ServeHTTP(w, req)
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -231,7 +233,7 @@ func (b *Balancer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
|||
if b.stickyCookie != nil {
|
||||
cookie := &http.Cookie{
|
||||
Name: b.stickyCookie.name,
|
||||
Value: server.name,
|
||||
Value: hash(server.name),
|
||||
Path: "/",
|
||||
HttpOnly: b.stickyCookie.httpOnly,
|
||||
Secure: b.stickyCookie.secure,
|
||||
|
@ -257,9 +259,19 @@ func (b *Balancer) Add(name string, handler http.Handler, weight *int) {
|
|||
|
||||
h := &namedHandler{Handler: handler, name: name, weight: float64(w)}
|
||||
|
||||
b.mutex.Lock()
|
||||
b.handlersMu.Lock()
|
||||
h.deadline = b.curDeadline + 1/h.weight
|
||||
heap.Push(b, h)
|
||||
b.status[name] = struct{}{}
|
||||
b.mutex.Unlock()
|
||||
b.handlerMap[name] = h
|
||||
b.handlerMap[hash(name)] = h
|
||||
b.handlersMu.Unlock()
|
||||
}
|
||||
|
||||
func hash(input string) string {
|
||||
hasher := fnv.New64()
|
||||
// We purposely ignore the error because the implementation always returns nil.
|
||||
_, _ = hasher.Write([]byte(input))
|
||||
|
||||
return fmt.Sprintf("%x", hasher.Sum64())
|
||||
}
|
||||
|
|
|
@ -247,6 +247,8 @@ func TestSticky(t *testing.T) {
|
|||
req := httptest.NewRequest(http.MethodGet, "/", nil)
|
||||
for i := 0; i < 3; i++ {
|
||||
for _, cookie := range recorder.Result().Cookies() {
|
||||
assert.NotContains(t, "test=first", cookie.Value)
|
||||
assert.NotContains(t, "test=second", cookie.Value)
|
||||
req.AddCookie(cookie)
|
||||
}
|
||||
recorder.ResponseRecorder = httptest.NewRecorder()
|
||||
|
@ -261,6 +263,35 @@ func TestSticky(t *testing.T) {
|
|||
assert.Equal(t, http.SameSiteNoneMode, recorder.cookies["test"].SameSite)
|
||||
}
|
||||
|
||||
func TestSticky_FallBack(t *testing.T) {
|
||||
balancer := New(&dynamic.Sticky{
|
||||
Cookie: &dynamic.Cookie{Name: "test"},
|
||||
}, false)
|
||||
|
||||
balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||
rw.Header().Set("server", "first")
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
}), Int(1))
|
||||
|
||||
balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||
rw.Header().Set("server", "second")
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
}), Int(2))
|
||||
|
||||
recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
|
||||
|
||||
req := httptest.NewRequest(http.MethodGet, "/", nil)
|
||||
req.AddCookie(&http.Cookie{Name: "test", Value: "second"})
|
||||
for i := 0; i < 3; i++ {
|
||||
recorder.ResponseRecorder = httptest.NewRecorder()
|
||||
|
||||
balancer.ServeHTTP(recorder, req)
|
||||
}
|
||||
|
||||
assert.Equal(t, 0, recorder.save["first"])
|
||||
assert.Equal(t, 3, recorder.save["second"])
|
||||
}
|
||||
|
||||
// TestBalancerBias makes sure that the WRR algorithm spreads elements evenly right from the start,
|
||||
// and that it does not "over-favor" the high-weighted ones with a biased start-up regime.
|
||||
func TestBalancerBias(t *testing.T) {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue