diff --git a/docs/content/routing/providers/kubernetes-crd.md b/docs/content/routing/providers/kubernetes-crd.md index 5d2873e5f..edd3e4a3b 100644 --- a/docs/content/routing/providers/kubernetes-crd.md +++ b/docs/content/routing/providers/kubernetes-crd.md @@ -982,6 +982,9 @@ More information in the dedicated [mirroring](../services/index.md#mirroring-ser As explained in the section about [Sticky sessions](../../services/#sticky-sessions), for stickiness to work all the way, it must be specified at each load-balancing level. +When stickiness is enabled, Traefik uses Kubernetes [serving](https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/#serving) endpoints status to detect and mark servers as fenced. +Fenced servers can still process requests tied to sticky cookies, while they are terminating. + For instance, in the example below, there is a first level of load-balancing because there is a (Weighted Round Robin) load-balancing of the two `whoami` services, and there is a second level because each whoami service is a `replicaset` and is thus handled as a load-balancer of servers. diff --git a/docs/content/routing/providers/kubernetes-ingress.md b/docs/content/routing/providers/kubernetes-ingress.md index efbf3a0de..39993b60a 100644 --- a/docs/content/routing/providers/kubernetes-ingress.md +++ b/docs/content/routing/providers/kubernetes-ingress.md @@ -391,6 +391,11 @@ which in turn will create the resulting routers, services, handlers, etc. traefik.ingress.kubernetes.io/service.sticky.cookie.path: /foobar ``` +## Stickiness and load-balancing + +When stickiness is enabled, Traefik uses Kubernetes [serving](https://kubernetes.io/docs/concepts/services-networking/endpoint-slices/#serving) endpoints status to detect and mark servers as fenced. +Fenced servers can still process requests tied to sticky cookies, while they are terminating. + ## Path Types on Kubernetes 1.18+ If the Kubernetes cluster version is 1.18+, diff --git a/pkg/config/dynamic/http_config.go b/pkg/config/dynamic/http_config.go index c92b1328a..7655036b0 100644 --- a/pkg/config/dynamic/http_config.go +++ b/pkg/config/dynamic/http_config.go @@ -257,6 +257,7 @@ type Server struct { URL string `json:"url,omitempty" toml:"url,omitempty" yaml:"url,omitempty" label:"-"` Weight *int `json:"weight,omitempty" toml:"weight,omitempty" yaml:"weight,omitempty" label:"weight" export:"true"` PreservePath bool `json:"preservePath,omitempty" toml:"preservePath,omitempty" yaml:"preservePath,omitempty" label:"-" export:"true"` + Fenced bool `json:"fenced,omitempty" toml:"-" yaml:"-" label:"-" file:"-" kv:"-"` Scheme string `json:"-" toml:"-" yaml:"-" file:"-"` Port string `json:"-" toml:"-" yaml:"-" file:"-"` } diff --git a/pkg/provider/kubernetes/crd/kubernetes_http.go b/pkg/provider/kubernetes/crd/kubernetes_http.go index c76a8ad3f..fb972a6c0 100644 --- a/pkg/provider/kubernetes/crd/kubernetes_http.go +++ b/pkg/provider/kubernetes/crd/kubernetes_http.go @@ -13,9 +13,11 @@ import ( "github.com/traefik/traefik/v3/pkg/logs" "github.com/traefik/traefik/v3/pkg/provider" traefikv1alpha1 "github.com/traefik/traefik/v3/pkg/provider/kubernetes/crd/traefikio/v1alpha1" + "github.com/traefik/traefik/v3/pkg/provider/kubernetes/k8s" "github.com/traefik/traefik/v3/pkg/tls" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/intstr" + "k8s.io/utils/ptr" ) const ( @@ -544,7 +546,7 @@ func (c configBuilder) loadServers(parentNamespace string, svc traefikv1alpha1.L } for _, endpoint := range endpointSlice.Endpoints { - if endpoint.Conditions.Ready == nil || !*endpoint.Conditions.Ready { + if !k8s.EndpointServing(endpoint) { continue } @@ -555,7 +557,8 @@ func (c configBuilder) loadServers(parentNamespace string, svc traefikv1alpha1.L addresses[address] = struct{}{} servers = append(servers, dynamic.Server{ - URL: fmt.Sprintf("%s://%s", protocol, net.JoinHostPort(address, strconv.Itoa(int(port)))), + URL: fmt.Sprintf("%s://%s", protocol, net.JoinHostPort(address, strconv.Itoa(int(port)))), + Fenced: ptr.Deref(endpoint.Conditions.Serving, false), }) } } diff --git a/pkg/provider/kubernetes/crd/kubernetes_test.go b/pkg/provider/kubernetes/crd/kubernetes_test.go index bc0ff82cc..29856eef1 100644 --- a/pkg/provider/kubernetes/crd/kubernetes_test.go +++ b/pkg/provider/kubernetes/crd/kubernetes_test.go @@ -4737,6 +4737,14 @@ func TestLoadIngressRoutes(t *testing.T) { { URL: "http://10.10.0.2:80", }, + { + URL: "http://10.10.0.3:80", + Fenced: true, + }, + { + URL: "http://10.10.0.4:80", + Fenced: true, + }, { URL: "http://10.10.0.5:80", }, diff --git a/pkg/provider/kubernetes/ingress/convert_test.go b/pkg/provider/kubernetes/ingress/convert_test.go index 285a6cb40..96cbc36fd 100644 --- a/pkg/provider/kubernetes/ingress/convert_test.go +++ b/pkg/provider/kubernetes/ingress/convert_test.go @@ -18,7 +18,7 @@ func Test_convertSlice_corev1_to_networkingv1(t *testing.T) { { Port: 123, Protocol: "https", - Error: ptr("test"), + Error: pointer("test"), }, }, }, @@ -35,7 +35,7 @@ func Test_convertSlice_corev1_to_networkingv1(t *testing.T) { { Port: 123, Protocol: "https", - Error: ptr("test"), + Error: pointer("test"), }, }, }, @@ -52,7 +52,7 @@ func Test_convert(t *testing.T) { { Port: 123, Protocol: "https", - Error: ptr("test"), + Error: pointer("test"), }, }, } @@ -67,14 +67,10 @@ func Test_convert(t *testing.T) { { Port: 123, Protocol: "https", - Error: ptr("test"), + Error: pointer("test"), }, }, } assert.Equal(t, expected, actual) } - -func ptr[T any](v T) *T { - return &v -} diff --git a/pkg/provider/kubernetes/ingress/kubernetes.go b/pkg/provider/kubernetes/ingress/kubernetes.go index 08f945511..ad53bb5eb 100644 --- a/pkg/provider/kubernetes/ingress/kubernetes.go +++ b/pkg/provider/kubernetes/ingress/kubernetes.go @@ -30,6 +30,7 @@ import ( corev1 "k8s.io/api/core/v1" netv1 "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/labels" + "k8s.io/utils/ptr" ) const ( @@ -587,7 +588,7 @@ func (p *Provider) loadService(client Client, namespace string, backend netv1.In protocol := getProtocol(portSpec, portName, svcConfig) for _, endpoint := range endpointSlice.Endpoints { - if endpoint.Conditions.Ready == nil || !*endpoint.Conditions.Ready { + if !k8s.EndpointServing(endpoint) { continue } @@ -598,7 +599,8 @@ func (p *Provider) loadService(client Client, namespace string, backend netv1.In addresses[address] = struct{}{} svc.LoadBalancer.Servers = append(svc.LoadBalancer.Servers, dynamic.Server{ - URL: fmt.Sprintf("%s://%s", protocol, net.JoinHostPort(address, strconv.Itoa(int(port)))), + URL: fmt.Sprintf("%s://%s", protocol, net.JoinHostPort(address, strconv.Itoa(int(port)))), + Fenced: ptr.Deref(endpoint.Conditions.Serving, false), }) } } diff --git a/pkg/provider/kubernetes/k8s/endpoint.go b/pkg/provider/kubernetes/k8s/endpoint.go new file mode 100644 index 000000000..415613de3 --- /dev/null +++ b/pkg/provider/kubernetes/k8s/endpoint.go @@ -0,0 +1,11 @@ +package k8s + +import ( + v1 "k8s.io/api/discovery/v1" + "k8s.io/utils/ptr" +) + +// EndpointServing returns true if the endpoint is still serving the service. +func EndpointServing(endpoint v1.Endpoint) bool { + return ptr.Deref(endpoint.Conditions.Ready, false) || ptr.Deref(endpoint.Conditions.Serving, false) +} diff --git a/pkg/provider/kubernetes/k8s/endpoint_test.go b/pkg/provider/kubernetes/k8s/endpoint_test.go new file mode 100644 index 000000000..9eba33f3a --- /dev/null +++ b/pkg/provider/kubernetes/k8s/endpoint_test.go @@ -0,0 +1,75 @@ +package k8s + +import ( + "testing" + + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/discovery/v1" +) + +func TestEndpointServing(t *testing.T) { + tests := []struct { + name string + endpoint v1.Endpoint + want bool + }{ + { + name: "no status", + endpoint: v1.Endpoint{ + Conditions: v1.EndpointConditions{ + Ready: nil, + Serving: nil, + }, + }, + want: false, + }, + { + name: "ready", + endpoint: v1.Endpoint{ + Conditions: v1.EndpointConditions{ + Ready: pointer(true), + Serving: nil, + }, + }, + want: true, + }, + { + name: "not ready", + endpoint: v1.Endpoint{ + Conditions: v1.EndpointConditions{ + Ready: pointer(false), + Serving: nil, + }, + }, + want: false, + }, + { + name: "not ready and serving", + endpoint: v1.Endpoint{ + Conditions: v1.EndpointConditions{ + Ready: pointer(false), + Serving: pointer(true), + }, + }, + want: true, + }, + { + name: "not ready and not serving", + endpoint: v1.Endpoint{ + Conditions: v1.EndpointConditions{ + Ready: pointer(false), + Serving: pointer(false), + }, + }, + want: false, + }, + } + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + got := EndpointServing(test.endpoint) + assert.Equal(t, test.want, got) + }) + } +} + +func pointer[T any](v T) *T { return &v } diff --git a/pkg/server/server_entrypoint_listenconfig_other_test.go b/pkg/server/server_entrypoint_listenconfig_other_test.go index f2c736d26..b143db1c6 100644 --- a/pkg/server/server_entrypoint_listenconfig_other_test.go +++ b/pkg/server/server_entrypoint_listenconfig_other_test.go @@ -4,7 +4,6 @@ package server import ( "context" - "net" "testing" "github.com/stretchr/testify/require" diff --git a/pkg/server/service/loadbalancer/wrr/wrr.go b/pkg/server/service/loadbalancer/wrr/wrr.go index 3d258877b..72ce19ff1 100644 --- a/pkg/server/service/loadbalancer/wrr/wrr.go +++ b/pkg/server/service/loadbalancer/wrr/wrr.go @@ -64,12 +64,15 @@ type Balancer struct { // updaters is the list of hooks that are run (to update the Balancer // parent(s)), whenever the Balancer status changes. updaters []func(bool) + // fenced is the list of terminating yet still serving child services. + fenced map[string]struct{} } // New creates a new load balancer. func New(sticky *dynamic.Sticky, wantHealthCheck bool) *Balancer { balancer := &Balancer{ status: make(map[string]struct{}), + fenced: make(map[string]struct{}), handlerMap: make(map[string]*namedHandler), wantsHealthCheck: wantHealthCheck, } @@ -179,7 +182,7 @@ func (b *Balancer) nextServer() (*namedHandler, error) { b.handlersMu.Lock() defer b.handlersMu.Unlock() - if len(b.handlers) == 0 || len(b.status) == 0 { + if len(b.handlers) == 0 || len(b.status) == 0 || len(b.fenced) == len(b.handlers) { return nil, errNoAvailableServer } @@ -194,7 +197,10 @@ func (b *Balancer) nextServer() (*namedHandler, error) { heap.Push(b, handler) if _, ok := b.status[handler.name]; ok { - break + if _, ok := b.fenced[handler.name]; !ok { + // do not select a fenced handler. + break + } } } @@ -255,7 +261,7 @@ func (b *Balancer) ServeHTTP(w http.ResponseWriter, req *http.Request) { // Add adds a handler. // A handler with a non-positive weight is ignored. -func (b *Balancer) Add(name string, handler http.Handler, weight *int) { +func (b *Balancer) Add(name string, handler http.Handler, weight *int, fenced bool) { w := 1 if weight != nil { w = *weight @@ -271,6 +277,9 @@ func (b *Balancer) Add(name string, handler http.Handler, weight *int) { h.deadline = b.curDeadline + 1/h.weight heap.Push(b, h) b.status[name] = struct{}{} + if fenced { + b.fenced[name] = struct{}{} + } b.handlerMap[name] = h b.handlerMap[hash(name)] = h b.handlersMu.Unlock() diff --git a/pkg/server/service/loadbalancer/wrr/wrr_test.go b/pkg/server/service/loadbalancer/wrr/wrr_test.go index a7e03e154..d969bd279 100644 --- a/pkg/server/service/loadbalancer/wrr/wrr_test.go +++ b/pkg/server/service/loadbalancer/wrr/wrr_test.go @@ -18,12 +18,12 @@ func TestBalancer(t *testing.T) { balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { rw.Header().Set("server", "first") rw.WriteHeader(http.StatusOK) - }), pointer(3)) + }), pointer(3), false) balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { rw.Header().Set("server", "second") rw.WriteHeader(http.StatusOK) - }), pointer(1)) + }), pointer(1), false) recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}} for range 4 { @@ -49,9 +49,9 @@ func TestBalancerOneServerZeroWeight(t *testing.T) { balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { rw.Header().Set("server", "first") rw.WriteHeader(http.StatusOK) - }), pointer(1)) + }), pointer(1), false) - balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), pointer(0)) + balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), pointer(0), false) recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}} for range 3 { @@ -70,11 +70,11 @@ func TestBalancerNoServiceUp(t *testing.T) { balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { rw.WriteHeader(http.StatusInternalServerError) - }), pointer(1)) + }), pointer(1), false) balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { rw.WriteHeader(http.StatusInternalServerError) - }), pointer(1)) + }), pointer(1), false) balancer.SetStatus(context.WithValue(context.Background(), serviceName, "parent"), "first", false) balancer.SetStatus(context.WithValue(context.Background(), serviceName, "parent"), "second", false) @@ -91,11 +91,11 @@ func TestBalancerOneServerDown(t *testing.T) { balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { rw.Header().Set("server", "first") rw.WriteHeader(http.StatusOK) - }), pointer(1)) + }), pointer(1), false) balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { rw.WriteHeader(http.StatusInternalServerError) - }), pointer(1)) + }), pointer(1), false) balancer.SetStatus(context.WithValue(context.Background(), serviceName, "parent"), "second", false) recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}} @@ -112,12 +112,12 @@ func TestBalancerDownThenUp(t *testing.T) { balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { rw.Header().Set("server", "first") rw.WriteHeader(http.StatusOK) - }), pointer(1)) + }), pointer(1), false) balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { rw.Header().Set("server", "second") rw.WriteHeader(http.StatusOK) - }), pointer(1)) + }), pointer(1), false) balancer.SetStatus(context.WithValue(context.Background(), serviceName, "parent"), "second", false) recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}} @@ -141,30 +141,30 @@ func TestBalancerPropagate(t *testing.T) { balancer1.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { rw.Header().Set("server", "first") rw.WriteHeader(http.StatusOK) - }), pointer(1)) + }), pointer(1), false) balancer1.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { rw.Header().Set("server", "second") rw.WriteHeader(http.StatusOK) - }), pointer(1)) + }), pointer(1), false) balancer2 := New(nil, true) balancer2.Add("third", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { rw.Header().Set("server", "third") rw.WriteHeader(http.StatusOK) - }), pointer(1)) + }), pointer(1), false) balancer2.Add("fourth", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { rw.Header().Set("server", "fourth") rw.WriteHeader(http.StatusOK) - }), pointer(1)) + }), pointer(1), false) topBalancer := New(nil, true) - topBalancer.Add("balancer1", balancer1, pointer(1)) + topBalancer.Add("balancer1", balancer1, pointer(1), false) _ = balancer1.RegisterStatusUpdater(func(up bool) { topBalancer.SetStatus(context.WithValue(context.Background(), serviceName, "top"), "balancer1", up) // TODO(mpl): if test gets flaky, add channel or something here to signal that // propagation is done, and wait on it before sending request. }) - topBalancer.Add("balancer2", balancer2, pointer(1)) + topBalancer.Add("balancer2", balancer2, pointer(1), false) _ = balancer2.RegisterStatusUpdater(func(up bool) { topBalancer.SetStatus(context.WithValue(context.Background(), serviceName, "top"), "balancer2", up) }) @@ -211,8 +211,20 @@ func TestBalancerPropagate(t *testing.T) { func TestBalancerAllServersZeroWeight(t *testing.T) { balancer := New(nil, false) - balancer.Add("test", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), pointer(0)) - balancer.Add("test2", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), pointer(0)) + balancer.Add("test", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), pointer(0), false) + balancer.Add("test2", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), pointer(0), false) + + recorder := httptest.NewRecorder() + balancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil)) + + assert.Equal(t, http.StatusServiceUnavailable, recorder.Result().StatusCode) +} + +func TestBalancerAllServersFenced(t *testing.T) { + balancer := New(nil, false) + + balancer.Add("test", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), pointer(1), true) + balancer.Add("test2", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), pointer(1), true) recorder := httptest.NewRecorder() balancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil)) @@ -235,12 +247,12 @@ func TestSticky(t *testing.T) { balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { rw.Header().Set("server", "first") rw.WriteHeader(http.StatusOK) - }), pointer(1)) + }), pointer(1), false) balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { rw.Header().Set("server", "second") rw.WriteHeader(http.StatusOK) - }), pointer(2)) + }), pointer(2), false) recorder := &responseRecorder{ ResponseRecorder: httptest.NewRecorder(), @@ -277,12 +289,12 @@ func TestSticky_FallBack(t *testing.T) { balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { rw.Header().Set("server", "first") rw.WriteHeader(http.StatusOK) - }), pointer(1)) + }), pointer(1), false) balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { rw.Header().Set("server", "second") rw.WriteHeader(http.StatusOK) - }), pointer(2)) + }), pointer(2), false) recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}} @@ -306,12 +318,12 @@ func TestBalancerBias(t *testing.T) { balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { rw.Header().Set("server", "A") rw.WriteHeader(http.StatusOK) - }), pointer(11)) + }), pointer(11), false) balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { rw.Header().Set("server", "B") rw.WriteHeader(http.StatusOK) - }), pointer(3)) + }), pointer(3), false) recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}} @@ -341,3 +353,41 @@ func (r *responseRecorder) WriteHeader(statusCode int) { } r.ResponseRecorder.WriteHeader(statusCode) } + +// TestSticky_Fenced checks that fenced node receive traffic if their sticky cookie matches. +func TestSticky_Fenced(t *testing.T) { + balancer := New(&dynamic.Sticky{Cookie: &dynamic.Cookie{Name: "test"}}, false) + + balancer.Add("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + rw.Header().Set("server", "first") + rw.WriteHeader(http.StatusOK) + }), pointer(1), false) + + balancer.Add("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + rw.Header().Set("server", "second") + rw.WriteHeader(http.StatusOK) + }), pointer(1), false) + + balancer.Add("fenced", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + rw.Header().Set("server", "fenced") + rw.WriteHeader(http.StatusOK) + }), pointer(1), true) + + recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}} + + stickyReq := httptest.NewRequest(http.MethodGet, "/", nil) + stickyReq.AddCookie(&http.Cookie{Name: "test", Value: "fenced"}) + + req := httptest.NewRequest(http.MethodGet, "/", nil) + + for range 4 { + recorder.ResponseRecorder = httptest.NewRecorder() + + balancer.ServeHTTP(recorder, stickyReq) + balancer.ServeHTTP(recorder, req) + } + + assert.Equal(t, 4, recorder.save["fenced"]) + assert.Equal(t, 2, recorder.save["first"]) + assert.Equal(t, 2, recorder.save["second"]) +} diff --git a/pkg/server/service/service.go b/pkg/server/service/service.go index 1c7d01642..602c7bd45 100644 --- a/pkg/server/service/service.go +++ b/pkg/server/service/service.go @@ -258,7 +258,7 @@ func (m *Manager) getWRRServiceHandler(ctx context.Context, serviceName string, return nil, err } - balancer.Add(service.Name, serviceHandler, service.Weight) + balancer.Add(service.Name, serviceHandler, service.Weight, false) if config.HealthCheck == nil { continue @@ -397,7 +397,7 @@ func (m *Manager) getLoadBalancerServiceHandler(ctx context.Context, serviceName proxy, _ = capture.Wrap(proxy) } - lb.Add(proxyName, proxy, server.Weight) + lb.Add(proxyName, proxy, server.Weight, server.Fenced) // servers are considered UP by default. info.UpdateServerStatus(target.String(), runtime.StatusUp)