Add passive health checks
parent c20802b07e · commit fc0fac8543
20 changed files with 696 additions and 6 deletions
@@ -244,10 +244,12 @@ type ServersLoadBalancer struct {
 	// children servers of this load-balancer. To propagate status changes (e.g. all
 	// servers of this service are down) upwards, HealthCheck must also be enabled on
 	// the parent(s) of this service.
 	HealthCheck *ServerHealthCheck `json:"healthCheck,omitempty" toml:"healthCheck,omitempty" yaml:"healthCheck,omitempty" export:"true"`
+	// PassiveHealthCheck enables passive health checks for children servers of this load-balancer.
+	PassiveHealthCheck *PassiveServerHealthCheck `json:"passiveHealthCheck,omitempty" toml:"passiveHealthCheck,omitempty" yaml:"passiveHealthCheck,omitempty" export:"true"`
 	PassHostHeader *bool `json:"passHostHeader" toml:"passHostHeader" yaml:"passHostHeader" export:"true"`
 	ResponseForwarding *ResponseForwarding `json:"responseForwarding,omitempty" toml:"responseForwarding,omitempty" yaml:"responseForwarding,omitempty" export:"true"`
 	ServersTransport string `json:"serversTransport,omitempty" toml:"serversTransport,omitempty" yaml:"serversTransport,omitempty" export:"true"`
 }

 // Mergeable tells if the given service is mergeable.

@@ -336,6 +338,20 @@ func (h *ServerHealthCheck) SetDefaults() {

+// +k8s:deepcopy-gen=true
+
+type PassiveServerHealthCheck struct {
+	// FailureWindow defines the time window during which the failed attempts must occur for the server to be marked as unhealthy. It also defines for how long the server will be considered unhealthy.
+	FailureWindow ptypes.Duration `json:"failureWindow,omitempty" toml:"failureWindow,omitempty" yaml:"failureWindow,omitempty" export:"true"`
+	// MaxFailedAttempts is the number of consecutive failed attempts allowed within the failure window before marking the server as unhealthy.
+	MaxFailedAttempts int `json:"maxFailedAttempts,omitempty" toml:"maxFailedAttempts,omitempty" yaml:"maxFailedAttempts,omitempty" export:"true"`
+}
+
+func (p *PassiveServerHealthCheck) SetDefaults() {
+	p.FailureWindow = ptypes.Duration(10 * time.Second)
+	p.MaxFailedAttempts = 1
+}
+
 // +k8s:deepcopy-gen=true

 // HealthCheck controls healthcheck awareness and propagation at the services level.
 type HealthCheck struct{}
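For orientation, a minimal sketch (not part of this commit) of how these dynamic-configuration types fit together. Import paths are taken from the healthcheck hunk further down; the Servers field and dynamic.Server with its URL field are assumed from the existing Traefik codebase.

package main

import (
	"time"

	ptypes "github.com/traefik/paerser/types"
	"github.com/traefik/traefik/v3/pkg/config/dynamic"
)

func main() {
	// Start from the defaults introduced above (10s window, 1 failed attempt),
	// then tighten them for this particular service.
	phc := &dynamic.PassiveServerHealthCheck{}
	phc.SetDefaults()
	phc.FailureWindow = ptypes.Duration(30 * time.Second)
	phc.MaxFailedAttempts = 3

	_ = &dynamic.ServersLoadBalancer{
		Servers:            []dynamic.Server{{URL: "http://10.0.0.1:8080"}},
		PassiveHealthCheck: phc,
	}
}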
@@ -1071,6 +1071,22 @@ func (in *PassTLSClientCert) DeepCopy() *PassTLSClientCert {
 	return out
 }

+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *PassiveServerHealthCheck) DeepCopyInto(out *PassiveServerHealthCheck) {
+	*out = *in
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PassiveServerHealthCheck.
+func (in *PassiveServerHealthCheck) DeepCopy() *PassiveServerHealthCheck {
+	if in == nil {
+		return nil
+	}
+	out := new(PassiveServerHealthCheck)
+	in.DeepCopyInto(out)
+	return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *ProxyProtocol) DeepCopyInto(out *ProxyProtocol) {
 	*out = *in

@@ -1478,6 +1494,11 @@ func (in *ServersLoadBalancer) DeepCopyInto(out *ServersLoadBalancer) {
 		*out = new(ServerHealthCheck)
 		(*in).DeepCopyInto(*out)
 	}
+	if in.PassiveHealthCheck != nil {
+		in, out := &in.PassiveHealthCheck, &out.PassiveHealthCheck
+		*out = new(PassiveServerHealthCheck)
+		**out = **in
+	}
 	if in.PassHostHeader != nil {
 		in, out := &in.PassHostHeader, &out.PassHostHeader
 		*out = new(bool)
@@ -1,19 +1,24 @@
 package healthcheck

 import (
+	"bufio"
 	"context"
 	"errors"
 	"fmt"
+	"net"
 	"net/http"
+	"net/http/httptrace"
 	"net/url"
 	"strconv"
+	"sync"
 	"time"

 	gokitmetrics "github.com/go-kit/kit/metrics"
 	"github.com/rs/zerolog/log"
 	ptypes "github.com/traefik/paerser/types"
 	"github.com/traefik/traefik/v3/pkg/config/dynamic"
 	"github.com/traefik/traefik/v3/pkg/config/runtime"
+	"golang.org/x/sync/singleflight"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials/insecure"
@@ -322,3 +327,160 @@ func (shc *ServiceHealthChecker) checkHealthGRPC(ctx context.Context, serverURL

 	return nil
 }
+
+type PassiveServiceHealthChecker struct {
+	serviceName string
+	balancer    StatusSetter
+	metrics     metricsHealthCheck
+
+	maxFailedAttempts    int
+	failureWindow        ptypes.Duration
+	hasActiveHealthCheck bool
+
+	failuresMu sync.RWMutex
+	failures   map[string][]time.Time
+
+	timersGroup singleflight.Group
+	timers      sync.Map
+}
+
+func NewPassiveHealthChecker(serviceName string, balancer StatusSetter, maxFailedAttempts int, failureWindow ptypes.Duration, hasActiveHealthCheck bool, metrics metricsHealthCheck) *PassiveServiceHealthChecker {
+	return &PassiveServiceHealthChecker{
+		serviceName:          serviceName,
+		balancer:             balancer,
+		failures:             make(map[string][]time.Time),
+		maxFailedAttempts:    maxFailedAttempts,
+		failureWindow:        failureWindow,
+		hasActiveHealthCheck: hasActiveHealthCheck,
+		metrics:              metrics,
+	}
+}
+
+func (p *PassiveServiceHealthChecker) WrapHandler(ctx context.Context, next http.Handler, targetURL string) http.Handler {
+	return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
+		var backendCalled bool
+		trace := &httptrace.ClientTrace{
+			WroteHeaders: func() {
+				backendCalled = true
+			},
+			WroteRequest: func(httptrace.WroteRequestInfo) {
+				backendCalled = true
+			},
+		}
+		clientTraceCtx := httptrace.WithClientTrace(req.Context(), trace)
+
+		codeCatcher := &codeCatcher{
+			ResponseWriter: rw,
+		}
+
+		next.ServeHTTP(codeCatcher, req.WithContext(clientTraceCtx))
+
+		if backendCalled && codeCatcher.statusCode < http.StatusInternalServerError {
+			p.failuresMu.Lock()
+			p.failures[targetURL] = nil
+			p.failuresMu.Unlock()
+			return
+		}
+
+		p.failuresMu.Lock()
+		p.failures[targetURL] = append(p.failures[targetURL], time.Now())
+		p.failuresMu.Unlock()
+
+		if p.healthy(targetURL) {
+			return
+		}
+
+		// We need to guarantee that only one goroutine (request) will update the status and create a timer for the target.
+		_, _, _ = p.timersGroup.Do(targetURL, func() (interface{}, error) {
+			// A timer is already running for this target;
+			// it means that the target is already considered unhealthy.
+			if _, ok := p.timers.Load(targetURL); ok {
+				return nil, nil
+			}
+
+			p.balancer.SetStatus(ctx, targetURL, false)
+			p.metrics.ServiceServerUpGauge().With("service", p.serviceName, "url", targetURL).Set(0)
+
+			// If the service has an active health check, the passive health checker should not reset the status.
+			// The active health check will handle the status updates.
+			if p.hasActiveHealthCheck {
+				return nil, nil
+			}
+
+			go func() {
+				timer := time.NewTimer(time.Duration(p.failureWindow))
+				defer timer.Stop()
+
+				p.timers.Store(targetURL, timer)
+
+				select {
+				case <-ctx.Done():
+				case <-timer.C:
+					p.timers.Delete(targetURL)
+
+					p.balancer.SetStatus(ctx, targetURL, true)
+					p.metrics.ServiceServerUpGauge().With("service", p.serviceName, "url", targetURL).Set(1)
+				}
+			}()
+
+			return nil, nil
+		})
+	})
+}
+
+func (p *PassiveServiceHealthChecker) healthy(targetURL string) bool {
+	windowStart := time.Now().Add(-time.Duration(p.failureWindow))
+
+	p.failuresMu.Lock()
+	defer p.failuresMu.Unlock()
+
+	// Filter failures within the sliding window.
+	failures := p.failures[targetURL]
+	for i, t := range failures {
+		if t.After(windowStart) {
+			p.failures[targetURL] = failures[i:]
+			break
+		}
+	}
+
+	// Check if failures exceed maxFailedAttempts.
+	return len(p.failures[targetURL]) < p.maxFailedAttempts
+}
+
+type codeCatcher struct {
+	http.ResponseWriter
+
+	statusCode int
+}
+
+func (c *codeCatcher) WriteHeader(statusCode int) {
+	// Here we allow the overriding of the status code,
+	// for the health check we care about the last status code written.
+	c.statusCode = statusCode
+	c.ResponseWriter.WriteHeader(statusCode)
+}
+
+func (c *codeCatcher) Write(bytes []byte) (int, error) {
+	// At the time of writing, if the status code is not set,
+	// or set to an informational status code (1xx),
+	// we set it to http.StatusOK (200).
+	if c.statusCode < http.StatusOK {
+		c.statusCode = http.StatusOK
+	}
+
+	return c.ResponseWriter.Write(bytes)
+}
+
+func (c *codeCatcher) Flush() {
+	if flusher, ok := c.ResponseWriter.(http.Flusher); ok {
+		flusher.Flush()
+	}
+}
+
+func (c *codeCatcher) Hijack() (net.Conn, *bufio.ReadWriter, error) {
+	if h, ok := c.ResponseWriter.(http.Hijacker); ok {
+		return h.Hijack()
+	}
+
+	return nil, nil, fmt.Errorf("not a hijacker: %T", c.ResponseWriter)
+}
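A standalone restatement of the sliding-window check in healthy() above (illustration only, not part of the commit): a target stays healthy as long as fewer than maxFailedAttempts failures fall inside the trailing failureWindow.

package main

import (
	"fmt"
	"time"
)

// healthy mirrors the logic of PassiveServiceHealthChecker.healthy: count the
// failures recorded inside the trailing window and compare against the allowed
// number of failed attempts.
func healthy(failures []time.Time, window time.Duration, maxFailedAttempts int) bool {
	windowStart := time.Now().Add(-window)

	recent := 0
	for _, t := range failures {
		if t.After(windowStart) {
			recent++
		}
	}

	return recent < maxFailedAttempts
}

func main() {
	now := time.Now()
	failures := []time.Time{
		now.Add(-45 * time.Second), // outside a 30s window, ignored
		now.Add(-5 * time.Second),
		now.Add(-1 * time.Second),
	}

	fmt.Println(healthy(failures, 30*time.Second, 3)) // true: only 2 recent failures
	fmt.Println(healthy(failures, 60*time.Second, 3)) // false: all 3 fall inside the window
}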
@@ -392,6 +392,21 @@ func (c configBuilder) buildServersLB(namespace string, svc traefikv1alpha1.Load
 		}
 	}

+	if svc.PassiveHealthCheck != nil {
+		lb.PassiveHealthCheck = &dynamic.PassiveServerHealthCheck{}
+		lb.PassiveHealthCheck.SetDefaults()
+
+		if svc.PassiveHealthCheck.MaxFailedAttempts != nil {
+			lb.PassiveHealthCheck.MaxFailedAttempts = *svc.PassiveHealthCheck.MaxFailedAttempts
+		}
+
+		if svc.PassiveHealthCheck.FailureWindow != nil {
+			if err := lb.PassiveHealthCheck.FailureWindow.Set(svc.PassiveHealthCheck.FailureWindow.String()); err != nil {
+				return nil, err
+			}
+		}
+	}
+
 	conf := svc
 	lb.PassHostHeader = conf.PassHostHeader
 	if lb.PassHostHeader == nil {
@@ -144,6 +144,8 @@ type LoadBalancerSpec struct {
 	NodePortLB bool `json:"nodePortLB,omitempty"`
 	// Healthcheck defines health checks for ExternalName services.
 	HealthCheck *ServerHealthCheck `json:"healthCheck,omitempty"`
+	// PassiveHealthCheck defines passive health checks for ExternalName services.
+	PassiveHealthCheck *PassiveServerHealthCheck `json:"passiveHealthCheck,omitempty"`
 }

 type ResponseForwarding struct {

@@ -189,6 +191,13 @@ type ServerHealthCheck struct {
 	Headers map[string]string `json:"headers,omitempty"`
 }

+type PassiveServerHealthCheck struct {
+	// FailureWindow defines the time window during which the failed attempts must occur for the server to be marked as unhealthy. It also defines for how long the server will be considered unhealthy.
+	FailureWindow *intstr.IntOrString `json:"failureWindow,omitempty"`
+	// MaxFailedAttempts is the number of consecutive failed attempts allowed within the failure window before marking the server as unhealthy.
+	MaxFailedAttempts *int `json:"maxFailedAttempts,omitempty"`
+}
+
 // Service defines an upstream HTTP service to proxy traffic to.
 type Service struct {
 	LoadBalancerSpec `json:",inline"`
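A small, self-contained sketch (not from the commit) of how the CRD-level FailureWindow, an IntOrString, is turned into the dynamic ptypes.Duration via the Set(...String()) call in buildServersLB above. The paerser parsing detail noted in the comment is an assumption about that library's behavior.

package main

import (
	"fmt"
	"time"

	ptypes "github.com/traefik/paerser/types"
	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	// CRD side: FailureWindow is an IntOrString, so either "30s" or a bare 30 is accepted.
	fromString := intstr.FromString("30s")
	fromInt := intstr.FromInt(30)

	// Dynamic-config side: buildServersLB feeds IntOrString.String() to ptypes.Duration.Set,
	// which parses duration strings and (per paerser) treats bare integers as seconds.
	var d1, d2 ptypes.Duration
	if err := d1.Set(fromString.String()); err != nil {
		panic(err)
	}
	if err := d2.Set(fromInt.String()); err != nil {
		panic(err)
	}

	fmt.Println(time.Duration(d1), time.Duration(d2)) // 30s 30s
}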
@@ -657,6 +657,11 @@ func (in *LoadBalancerSpec) DeepCopyInto(out *LoadBalancerSpec) {
 		*out = new(ServerHealthCheck)
 		(*in).DeepCopyInto(*out)
 	}
+	if in.PassiveHealthCheck != nil {
+		in, out := &in.PassiveHealthCheck, &out.PassiveHealthCheck
+		*out = new(PassiveServerHealthCheck)
+		(*in).DeepCopyInto(*out)
+	}
 	return
 }

@@ -1047,6 +1052,32 @@ func (in *ObjectReference) DeepCopy() *ObjectReference {
 	return out
 }

+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *PassiveServerHealthCheck) DeepCopyInto(out *PassiveServerHealthCheck) {
+	*out = *in
+	if in.FailureWindow != nil {
+		in, out := &in.FailureWindow, &out.FailureWindow
+		*out = new(intstr.IntOrString)
+		**out = **in
+	}
+	if in.MaxFailedAttempts != nil {
+		in, out := &in.MaxFailedAttempts, &out.MaxFailedAttempts
+		*out = new(int)
+		**out = **in
+	}
+	return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new PassiveServerHealthCheck.
+func (in *PassiveServerHealthCheck) DeepCopy() *PassiveServerHealthCheck {
+	if in == nil {
+		return nil
+	}
+	out := new(PassiveServerHealthCheck)
+	in.DeepCopyInto(out)
+	return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *RateLimit) DeepCopyInto(out *RateLimit) {
 	*out = *in
@@ -341,7 +341,7 @@ func (m *Manager) getLoadBalancerServiceHandler(ctx context.Context, serviceName
 	var lb serverBalancer
 	switch service.Strategy {
 	// Here we are handling the empty value to comply with providers that are not applying defaults (e.g. REST provider)
-	// TODO: remove this when all providers apply default values.
+	// TODO: remove this empty check when all providers apply default values.
 	case dynamic.BalancerStrategyWRR, "":
 		lb = wrr.New(service.Sticky, service.HealthCheck != nil)
 	case dynamic.BalancerStrategyP2C:

@@ -350,6 +350,17 @@ func (m *Manager) getLoadBalancerServiceHandler(ctx context.Context, serviceName
 		return nil, fmt.Errorf("unsupported load-balancer strategy %q", service.Strategy)
 	}

+	var passiveHealthChecker *healthcheck.PassiveServiceHealthChecker
+	if service.PassiveHealthCheck != nil {
+		passiveHealthChecker = healthcheck.NewPassiveHealthChecker(
+			serviceName,
+			lb,
+			service.PassiveHealthCheck.MaxFailedAttempts,
+			service.PassiveHealthCheck.FailureWindow,
+			service.HealthCheck != nil,
+			m.observabilityMgr.MetricsRegistry())
+	}
+
 	healthCheckTargets := make(map[string]*url.URL)

 	for i, server := range shuffle(service.Servers, m.rand) {

@@ -368,6 +379,11 @@ func (m *Manager) getLoadBalancerServiceHandler(ctx context.Context, serviceName
 			return nil, fmt.Errorf("error building proxy for server URL %s: %w", server.URL, err)
 		}

+		if passiveHealthChecker != nil {
+			// If passive health check is enabled, we wrap the proxy with the passive health checker.
+			proxy = passiveHealthChecker.WrapHandler(ctx, proxy, target.String())
+		}
+
 		// The retry wrapping must be done just before the proxy handler,
 		// to make sure that the retry will not be triggered/disabled by
 		// middlewares in the chain.
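The proxy built here is wrapped by WrapHandler, which relies on the codeCatcher from the healthcheck hunk: a response only counts against the server when the backend was actually reached and answered with a 5xx. Below is a small self-contained sketch of that classification rule; it re-declares a minimal local copy of the wrapper for demonstration and is not the code added by this commit.

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// codeCatcher (local sketch): records the last status code written so the
// caller can decide, after ServeHTTP returns, whether the response was >= 500.
type codeCatcher struct {
	http.ResponseWriter
	statusCode int
}

func (c *codeCatcher) WriteHeader(code int) {
	c.statusCode = code
	c.ResponseWriter.WriteHeader(code)
}

func main() {
	next := http.HandlerFunc(func(rw http.ResponseWriter, _ *http.Request) {
		rw.WriteHeader(http.StatusBadGateway) // the proxied backend answered 502
	})

	rec := httptest.NewRecorder()
	catcher := &codeCatcher{ResponseWriter: rec}
	next.ServeHTTP(catcher, httptest.NewRequest(http.MethodGet, "/", nil))

	failure := catcher.statusCode >= http.StatusInternalServerError
	fmt.Println(failure) // true: this request would be recorded as a failed attempt
}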
@@ -10,9 +10,11 @@ import (
 	"net/textproto"
 	"strings"
 	"testing"
+	"time"

 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	ptypes "github.com/traefik/paerser/types"
 	"github.com/traefik/traefik/v3/pkg/config/dynamic"
 	"github.com/traefik/traefik/v3/pkg/config/runtime"
 	"github.com/traefik/traefik/v3/pkg/proxy/httputil"

@@ -67,6 +69,19 @@ func TestGetLoadBalancer(t *testing.T) {
 			fwd:         &forwarderMock{},
 			expectError: false,
 		},
+		{
+			desc:        "Succeeds when passive health checker is set",
+			serviceName: "test",
+			service: &dynamic.ServersLoadBalancer{
+				Strategy: dynamic.BalancerStrategyWRR,
+				PassiveHealthCheck: &dynamic.PassiveServerHealthCheck{
+					FailureWindow:     ptypes.Duration(30 * time.Second),
+					MaxFailedAttempts: 3,
+				},
+			},
+			fwd:         &forwarderMock{},
+			expectError: false,
+		},
 	}

 	for _, test := range testCases {