1
0
Fork 0

healthcheck: add support at the load-balancers of services level

Co-authored-by: Dmitry Sharshakov <d3dx12.xx@gmail.com>
Co-authored-by: Julien Salleyron <julien.salleyron@gmail.com>
Co-authored-by: Jean-Baptiste Doumenjou <925513+jbdoumenjou@users.noreply.github.com>
Co-authored-by: Romain <rtribotte@users.noreply.github.com>
Co-authored-by: Tom Moulard <tom.moulard@traefik.io>
This commit is contained in:
mpl 2021-06-25 21:08:11 +02:00 committed by GitHub
parent 5e3e47b484
commit 838a8e18d3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
28 changed files with 1196 additions and 120 deletions

View file

@ -77,7 +77,7 @@ func TestDo_dynamicConfiguration(t *testing.T) {
SameSite: "foo",
},
},
HealthCheck: &dynamic.HealthCheck{
HealthCheck: &dynamic.ServerHealthCheck{
Scheme: "foo",
Path: "foo",
Port: 42,

View file

@ -65,6 +65,7 @@ type Mirroring struct {
Service string `json:"service,omitempty" toml:"service,omitempty" yaml:"service,omitempty" export:"true"`
MaxBodySize *int64 `json:"maxBodySize,omitempty" toml:"maxBodySize,omitempty" yaml:"maxBodySize,omitempty" export:"true"`
Mirrors []MirrorService `json:"mirrors,omitempty" toml:"mirrors,omitempty" yaml:"mirrors,omitempty" export:"true"`
HealthCheck *HealthCheck `json:"healthCheck,omitempty" toml:"healthCheck,omitempty" yaml:"healthCheck,omitempty" label:"allowEmpty" file:"allowEmpty" export:"true"`
}
// SetDefaults Default values for a WRRService.
@ -87,6 +88,12 @@ type MirrorService struct {
type WeightedRoundRobin struct {
Services []WRRService `json:"services,omitempty" toml:"services,omitempty" yaml:"services,omitempty" export:"true"`
Sticky *Sticky `json:"sticky,omitempty" toml:"sticky,omitempty" yaml:"sticky,omitempty" export:"true"`
// HealthCheck enables automatic self-healthcheck for this service, i.e.
// whenever one of its children is reported as down, this service becomes aware of it,
// and takes it into account (i.e. it ignores the down child) when running the
// load-balancing algorithm. In addition, if the parent of this service also has
// HealthCheck enabled, this service reports to its parent any status change.
HealthCheck *HealthCheck `json:"healthCheck,omitempty" toml:"healthCheck,omitempty" yaml:"healthCheck,omitempty" label:"allowEmpty" file:"allowEmpty" export:"true"`
}
// +k8s:deepcopy-gen=true
@ -124,9 +131,13 @@ type Cookie struct {
// ServersLoadBalancer holds the ServersLoadBalancer configuration.
type ServersLoadBalancer struct {
Sticky *Sticky `json:"sticky,omitempty" toml:"sticky,omitempty" yaml:"sticky,omitempty" label:"allowEmpty" file:"allowEmpty" export:"true"`
Servers []Server `json:"servers,omitempty" toml:"servers,omitempty" yaml:"servers,omitempty" label-slice-as-struct:"server" export:"true"`
HealthCheck *HealthCheck `json:"healthCheck,omitempty" toml:"healthCheck,omitempty" yaml:"healthCheck,omitempty" export:"true"`
Sticky *Sticky `json:"sticky,omitempty" toml:"sticky,omitempty" yaml:"sticky,omitempty" label:"allowEmpty" file:"allowEmpty" export:"true"`
Servers []Server `json:"servers,omitempty" toml:"servers,omitempty" yaml:"servers,omitempty" label-slice-as-struct:"server" export:"true"`
// HealthCheck enables regular active checks of the responsiveness of the
// children servers of this load-balancer. To propagate status changes (e.g. all
// servers of this service are down) upwards, HealthCheck must also be enabled on
// the parent(s) of this service.
HealthCheck *ServerHealthCheck `json:"healthCheck,omitempty" toml:"healthCheck,omitempty" yaml:"healthCheck,omitempty" export:"true"`
PassHostHeader *bool `json:"passHostHeader" toml:"passHostHeader" yaml:"passHostHeader" export:"true"`
ResponseForwarding *ResponseForwarding `json:"responseForwarding,omitempty" toml:"responseForwarding,omitempty" yaml:"responseForwarding,omitempty" export:"true"`
ServersTransport string `json:"serversTransport,omitempty" toml:"serversTransport,omitempty" yaml:"serversTransport,omitempty" export:"true"`
@ -178,8 +189,8 @@ func (s *Server) SetDefaults() {
// +k8s:deepcopy-gen=true
// HealthCheck holds the HealthCheck configuration.
type HealthCheck struct {
// ServerHealthCheck holds the HealthCheck configuration.
type ServerHealthCheck struct {
Scheme string `json:"scheme,omitempty" toml:"scheme,omitempty" yaml:"scheme,omitempty" export:"true"`
Path string `json:"path,omitempty" toml:"path,omitempty" yaml:"path,omitempty" export:"true"`
Port int `json:"port,omitempty" toml:"port,omitempty,omitzero" yaml:"port,omitempty" export:"true"`
@ -193,13 +204,18 @@ type HealthCheck struct {
}
// SetDefaults Default values for a HealthCheck.
func (h *HealthCheck) SetDefaults() {
func (h *ServerHealthCheck) SetDefaults() {
fr := true
h.FollowRedirects = &fr
}
// +k8s:deepcopy-gen=true
// HealthCheck controls healthcheck awareness and propagation at the services level.
type HealthCheck struct{}
// +k8s:deepcopy-gen=true
// ServersTransport options to configure communication between Traefik and the servers.
type ServersTransport struct {
ServerName string `description:"ServerName used to contact the server" json:"serverName,omitempty" toml:"serverName,omitempty" yaml:"serverName,omitempty"`

View file

@ -513,18 +513,6 @@ func (in *Headers) DeepCopy() *Headers {
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *HealthCheck) DeepCopyInto(out *HealthCheck) {
*out = *in
if in.FollowRedirects != nil {
in, out := &in.FollowRedirects, &out.FollowRedirects
*out = new(bool)
**out = **in
}
if in.Headers != nil {
in, out := &in.Headers, &out.Headers
*out = make(map[string]string, len(*in))
for key, val := range *in {
(*out)[key] = val
}
}
return
}
@ -789,6 +777,11 @@ func (in *Mirroring) DeepCopyInto(out *Mirroring) {
*out = make([]MirrorService, len(*in))
copy(*out, *in)
}
if in.HealthCheck != nil {
in, out := &in.HealthCheck, &out.HealthCheck
*out = new(HealthCheck)
**out = **in
}
return
}
@ -1075,6 +1068,34 @@ func (in *Server) DeepCopy() *Server {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ServerHealthCheck) DeepCopyInto(out *ServerHealthCheck) {
*out = *in
if in.FollowRedirects != nil {
in, out := &in.FollowRedirects, &out.FollowRedirects
*out = new(bool)
**out = **in
}
if in.Headers != nil {
in, out := &in.Headers, &out.Headers
*out = make(map[string]string, len(*in))
for key, val := range *in {
(*out)[key] = val
}
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new ServerHealthCheck.
func (in *ServerHealthCheck) DeepCopy() *ServerHealthCheck {
if in == nil {
return nil
}
out := new(ServerHealthCheck)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ServersLoadBalancer) DeepCopyInto(out *ServersLoadBalancer) {
*out = *in
@ -1090,7 +1111,7 @@ func (in *ServersLoadBalancer) DeepCopyInto(out *ServersLoadBalancer) {
}
if in.HealthCheck != nil {
in, out := &in.HealthCheck, &out.HealthCheck
*out = new(HealthCheck)
*out = new(ServerHealthCheck)
(*in).DeepCopyInto(*out)
}
if in.PassHostHeader != nil {
@ -1826,6 +1847,11 @@ func (in *WeightedRoundRobin) DeepCopyInto(out *WeightedRoundRobin) {
*out = new(Sticky)
(*in).DeepCopyInto(*out)
}
if in.HealthCheck != nil {
in, out := &in.HealthCheck, &out.HealthCheck
*out = new(HealthCheck)
**out = **in
}
return
}

View file

@ -625,7 +625,7 @@ func TestDecodeConfiguration(t *testing.T) {
Port: "8080",
},
},
HealthCheck: &dynamic.HealthCheck{
HealthCheck: &dynamic.ServerHealthCheck{
Scheme: "foobar",
Path: "foobar",
Port: 42,
@ -652,7 +652,7 @@ func TestDecodeConfiguration(t *testing.T) {
Port: "8080",
},
},
HealthCheck: &dynamic.HealthCheck{
HealthCheck: &dynamic.ServerHealthCheck{
Scheme: "foobar",
Path: "foobar",
Port: 42,
@ -1100,7 +1100,7 @@ func TestEncodeConfiguration(t *testing.T) {
Port: "8080",
},
},
HealthCheck: &dynamic.HealthCheck{
HealthCheck: &dynamic.ServerHealthCheck{
Scheme: "foobar",
Path: "foobar",
Port: 42,
@ -1126,7 +1126,7 @@ func TestEncodeConfiguration(t *testing.T) {
Port: "8080",
},
},
HealthCheck: &dynamic.HealthCheck{
HealthCheck: &dynamic.ServerHealthCheck{
Scheme: "foobar",
Path: "foobar",
Port: 42,

View file

@ -48,7 +48,7 @@ func TestPopulateUsedBy(t *testing.T) {
{URL: "http://127.0.0.1:8085"},
{URL: "http://127.0.0.1:8086"},
},
HealthCheck: &dynamic.HealthCheck{
HealthCheck: &dynamic.ServerHealthCheck{
Interval: "500ms",
Path: "/health",
},
@ -158,7 +158,7 @@ func TestPopulateUsedBy(t *testing.T) {
URL: "http://127.0.0.1:8086",
},
},
HealthCheck: &dynamic.HealthCheck{
HealthCheck: &dynamic.ServerHealthCheck{
Interval: "500ms",
Path: "/health",
},
@ -176,7 +176,7 @@ func TestPopulateUsedBy(t *testing.T) {
URL: "http://127.0.0.1:8088",
},
},
HealthCheck: &dynamic.HealthCheck{
HealthCheck: &dynamic.ServerHealthCheck{
Interval: "500ms",
Path: "/health",
},

View file

@ -2,6 +2,7 @@ package healthcheck
import (
"context"
"errors"
"fmt"
"net"
"net/http"
@ -11,6 +12,7 @@ import (
"time"
gokitmetrics "github.com/go-kit/kit/metrics"
"github.com/traefik/traefik/v2/pkg/config/dynamic"
"github.com/traefik/traefik/v2/pkg/config/runtime"
"github.com/traefik/traefik/v2/pkg/log"
"github.com/traefik/traefik/v2/pkg/metrics"
@ -41,6 +43,13 @@ type BalancerHandler interface {
Balancer
}
// BalancerStatusHandler is an http Handler that does load-balancing,
// andupdates its parents of its status.
type BalancerStatusHandler interface {
BalancerHandler
StatusUpdater
}
type metricsHealthcheck struct {
serverUpGauge gokitmetrics.Gauge
}
@ -130,9 +139,10 @@ func (hc *HealthCheck) SetBackendsConfiguration(parentCtx context.Context, backe
func (hc *HealthCheck) execute(ctx context.Context, backend *BackendConfig) {
logger := log.FromContext(ctx)
logger.Debugf("Initial health check for backend: %q", backend.name)
hc.checkBackend(ctx, backend)
logger.Debugf("Initial health check for backend: %q", backend.name)
hc.checkServersLB(ctx, backend)
ticker := time.NewTicker(backend.Interval)
defer ticker.Stop()
for {
@ -141,13 +151,13 @@ func (hc *HealthCheck) execute(ctx context.Context, backend *BackendConfig) {
logger.Debugf("Stopping current health check goroutines of backend: %s", backend.name)
return
case <-ticker.C:
logger.Debugf("Refreshing health check for backend: %s", backend.name)
hc.checkBackend(ctx, backend)
logger.Debugf("Routine health check refresh for backend: %s", backend.name)
hc.checkServersLB(ctx, backend)
}
}
}
func (hc *HealthCheck) checkBackend(ctx context.Context, backend *BackendConfig) {
func (hc *HealthCheck) checkServersLB(ctx context.Context, backend *BackendConfig) {
logger := log.FromContext(ctx)
enabledURLs := backend.LB.Servers()
@ -157,12 +167,11 @@ func (hc *HealthCheck) checkBackend(ctx context.Context, backend *BackendConfig)
serverUpMetricValue := float64(0)
if err := checkHealth(disabledURL.url, backend); err == nil {
logger.Warnf("Health check up: Returning to server list. Backend: %q URL: %q Weight: %d",
logger.Warnf("Health check up: returning to server list. Backend: %q URL: %q Weight: %d",
backend.name, disabledURL.url.String(), disabledURL.weight)
if err = backend.LB.UpsertServer(disabledURL.url, roundrobin.Weight(disabledURL.weight)); err != nil {
logger.Error(err)
}
serverUpMetricValue = 1
} else {
logger.Warnf("Health check still failing. Backend: %q URL: %q Reason: %s", backend.name, disabledURL.url.String(), err)
@ -175,31 +184,31 @@ func (hc *HealthCheck) checkBackend(ctx context.Context, backend *BackendConfig)
backend.disabledURLs = newDisabledURLs
for _, enableURL := range enabledURLs {
for _, enabledURL := range enabledURLs {
serverUpMetricValue := float64(1)
if err := checkHealth(enableURL, backend); err != nil {
if err := checkHealth(enabledURL, backend); err != nil {
weight := 1
rr, ok := backend.LB.(*roundrobin.RoundRobin)
if ok {
var gotWeight bool
weight, gotWeight = rr.ServerWeight(enableURL)
weight, gotWeight = rr.ServerWeight(enabledURL)
if !gotWeight {
weight = 1
}
}
logger.Warnf("Health check failed, removing from server list. Backend: %q URL: %q Weight: %d Reason: %s",
backend.name, enableURL.String(), weight, err)
if err := backend.LB.RemoveServer(enableURL); err != nil {
backend.name, enabledURL.String(), weight, err)
if err := backend.LB.RemoveServer(enabledURL); err != nil {
logger.Error(err)
}
backend.disabledURLs = append(backend.disabledURLs, backendURL{enableURL, weight})
backend.disabledURLs = append(backend.disabledURLs, backendURL{enabledURL, weight})
serverUpMetricValue = 0
}
labelValues := []string{"service", backend.name, "url", enableURL.String()}
labelValues := []string{"service", backend.name, "url", enabledURL.String()}
hc.metrics.serverUpGauge.With(labelValues...).Set(serverUpMetricValue)
}
}
@ -264,11 +273,19 @@ func checkHealth(serverURL *url.URL, backend *BackendConfig) error {
return nil
}
// StatusUpdater should be implemented by a service that, when its status
// changes (e.g. all if its children are down), needs to propagate upwards (to
// their parent(s)) that change.
type StatusUpdater interface {
RegisterStatusUpdater(fn func(up bool)) error
}
// NewLBStatusUpdater returns a new LbStatusUpdater.
func NewLBStatusUpdater(bh BalancerHandler, info *runtime.ServiceInfo) *LbStatusUpdater {
func NewLBStatusUpdater(bh BalancerHandler, info *runtime.ServiceInfo, hc *dynamic.ServerHealthCheck) *LbStatusUpdater {
return &LbStatusUpdater{
BalancerHandler: bh,
serviceInfo: info,
BalancerHandler: bh,
serviceInfo: info,
wantsHealthCheck: hc != nil,
}
}
@ -276,27 +293,83 @@ func NewLBStatusUpdater(bh BalancerHandler, info *runtime.ServiceInfo) *LbStatus
// so it can keep track of the status of a server in the ServiceInfo.
type LbStatusUpdater struct {
BalancerHandler
serviceInfo *runtime.ServiceInfo // can be nil
serviceInfo *runtime.ServiceInfo // can be nil
updaters []func(up bool)
wantsHealthCheck bool
}
// RegisterStatusUpdater adds fn to the list of hooks that are run when the
// status of the Balancer changes.
// Not thread safe.
func (lb *LbStatusUpdater) RegisterStatusUpdater(fn func(up bool)) error {
if !lb.wantsHealthCheck {
return errors.New("healthCheck not enabled in config for this loadbalancer service")
}
lb.updaters = append(lb.updaters, fn)
return nil
}
// RemoveServer removes the given server from the BalancerHandler,
// and updates the status of the server to "DOWN".
func (lb *LbStatusUpdater) RemoveServer(u *url.URL) error {
// TODO(mpl): when we have the freedom to change the signature of RemoveServer
// (kinda stuck because of oxy for now), let's pass around a context to improve
// logging.
ctx := context.TODO()
upBefore := len(lb.BalancerHandler.Servers()) > 0
err := lb.BalancerHandler.RemoveServer(u)
if err == nil && lb.serviceInfo != nil {
if err != nil {
return err
}
if lb.serviceInfo != nil {
lb.serviceInfo.UpdateServerStatus(u.String(), serverDown)
}
return err
log.FromContext(ctx).Debugf("child %s now %s", u.String(), serverDown)
if !upBefore {
// we were already down, and we still are, no need to propagate.
log.FromContext(ctx).Debugf("Still %s, no need to propagate", serverDown)
return nil
}
if len(lb.BalancerHandler.Servers()) > 0 {
// we were up, and we still are, no need to propagate
log.FromContext(ctx).Debugf("Still %s, no need to propagate", serverUp)
return nil
}
log.FromContext(ctx).Debugf("Propagating new %s status", serverDown)
for _, fn := range lb.updaters {
fn(false)
}
return nil
}
// UpsertServer adds the given server to the BalancerHandler,
// and updates the status of the server to "UP".
func (lb *LbStatusUpdater) UpsertServer(u *url.URL, options ...roundrobin.ServerOption) error {
ctx := context.TODO()
upBefore := len(lb.BalancerHandler.Servers()) > 0
err := lb.BalancerHandler.UpsertServer(u, options...)
if err == nil && lb.serviceInfo != nil {
if err != nil {
return err
}
if lb.serviceInfo != nil {
lb.serviceInfo.UpdateServerStatus(u.String(), serverUp)
}
return err
log.FromContext(ctx).Debugf("child %s now %s", u.String(), serverUp)
if upBefore {
// we were up, and we still are, no need to propagate
log.FromContext(ctx).Debugf("Still %s, no need to propagate", serverUp)
return nil
}
log.FromContext(ctx).Debugf("Propagating new %s status", serverUp)
for _, fn := range lb.updaters {
fn(true)
}
return nil
}
// Balancers is a list of Balancers(s) that implements the Balancer interface.

View file

@ -445,7 +445,7 @@ func (th *testHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
func TestLBStatusUpdater(t *testing.T) {
lb := &testLoadBalancer{RWMutex: &sync.RWMutex{}}
svInfo := &runtime.ServiceInfo{}
lbsu := NewLBStatusUpdater(lb, svInfo)
lbsu := NewLBStatusUpdater(lb, svInfo, nil)
newServer, err := url.Parse("http://foo.com")
assert.NoError(t, err)
err = lbsu.UpsertServer(newServer, roundrobin.Weight(1))

View file

@ -10,24 +10,25 @@ import (
// has at least one active Server in respect to the healthchecks and if this
// is not the case, it will stop the middleware chain and respond with 503.
type emptyBackend struct {
next healthcheck.BalancerHandler
healthcheck.BalancerStatusHandler
}
// New creates a new EmptyBackend middleware.
func New(lb healthcheck.BalancerHandler) http.Handler {
return &emptyBackend{next: lb}
func New(lb healthcheck.BalancerStatusHandler) http.Handler {
return &emptyBackend{BalancerStatusHandler: lb}
}
// ServeHTTP responds with 503 when there is no active Server and otherwise
// invokes the next handler in the middleware chain.
func (e *emptyBackend) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
if len(e.next.Servers()) == 0 {
rw.WriteHeader(http.StatusServiceUnavailable)
_, err := rw.Write([]byte(http.StatusText(http.StatusServiceUnavailable)))
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
}
} else {
e.next.ServeHTTP(rw, req)
if len(e.BalancerStatusHandler.Servers()) != 0 {
e.BalancerStatusHandler.ServeHTTP(rw, req)
return
}
rw.WriteHeader(http.StatusServiceUnavailable)
if _, err := rw.Write([]byte(http.StatusText(http.StatusServiceUnavailable))); err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
}
}

View file

@ -48,6 +48,10 @@ type healthCheckLoadBalancer struct {
amountServer int
}
func (lb *healthCheckLoadBalancer) RegisterStatusUpdater(fn func(up bool)) error {
return nil
}
func (lb *healthCheckLoadBalancer) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}

View file

@ -628,7 +628,7 @@ func Test_buildConfiguration(t *testing.T) {
Scheme: "http",
},
},
HealthCheck: &dynamic.HealthCheck{
HealthCheck: &dynamic.ServerHealthCheck{
Scheme: "foobar",
Path: "foobar",
Port: 42,

View file

@ -470,7 +470,7 @@ func TestRuntimeConfiguration(t *testing.T) {
URL: "http://127.0.0.1:8086",
},
},
HealthCheck: &dynamic.HealthCheck{
HealthCheck: &dynamic.ServerHealthCheck{
Interval: "500ms",
Path: "/health",
},

View file

@ -94,8 +94,8 @@ func testShutdown(t *testing.T, router *tcp.Router) {
// We need to do a write on the conn before the shutdown to make it "exist".
// Because the connection indeed exists as far as TCP is concerned,
// but since we only pass it along to the HTTP server after at least one byte is peaked,
// the HTTP server (and hence its shutdown) does not know about the connection until that first byte peaking.
// but since we only pass it along to the HTTP server after at least one byte is peeked,
// the HTTP server (and hence its shutdown) does not know about the connection until that first byte peeked.
err = request.Write(conn)
require.NoError(t, err)

View file

@ -11,6 +11,8 @@ import (
"net/http"
"sync"
"github.com/traefik/traefik/v2/pkg/config/dynamic"
"github.com/traefik/traefik/v2/pkg/healthcheck"
"github.com/traefik/traefik/v2/pkg/log"
"github.com/traefik/traefik/v2/pkg/middlewares/accesslog"
"github.com/traefik/traefik/v2/pkg/safe"
@ -23,19 +25,21 @@ type Mirroring struct {
rw http.ResponseWriter
routinePool *safe.Pool
maxBodySize int64
maxBodySize int64
wantsHealthCheck bool
lock sync.RWMutex
total uint64
}
// New returns a new instance of *Mirroring.
func New(handler http.Handler, pool *safe.Pool, maxBodySize int64) *Mirroring {
func New(handler http.Handler, pool *safe.Pool, maxBodySize int64, hc *dynamic.HealthCheck) *Mirroring {
return &Mirroring{
routinePool: pool,
handler: handler,
rw: blackHoleResponseWriter{},
maxBodySize: maxBodySize,
routinePool: pool,
handler: handler,
rw: blackHoleResponseWriter{},
maxBodySize: maxBodySize,
wantsHealthCheck: hc != nil,
}
}
@ -134,6 +138,31 @@ func (m *Mirroring) AddMirror(handler http.Handler, percent int) error {
return nil
}
// RegisterStatusUpdater adds fn to the list of hooks that are run when the
// status of handler of the Mirroring changes.
// Not thread safe.
func (m *Mirroring) RegisterStatusUpdater(fn func(up bool)) error {
// Since the status propagation is completely transparent through the
// mirroring (because of the recursion on the underlying service), we could maybe
// skip that below, and even not add HealthCheck as a field of
// dynamic.Mirroring. But I think it's easier to understand for the user
// if the HealthCheck is required absolutely everywhere in the config.
if !m.wantsHealthCheck {
return errors.New("healthCheck not enabled in config for this mirroring service")
}
updater, ok := m.handler.(healthcheck.StatusUpdater)
if !ok {
return fmt.Errorf("service of mirroring %T not a healthcheck.StatusUpdater", m.handler)
}
if err := updater.RegisterStatusUpdater(fn); err != nil {
return fmt.Errorf("cannot register service of mirroring as updater: %w", err)
}
return nil
}
type blackHoleResponseWriter struct{}
func (b blackHoleResponseWriter) Flush() {}

View file

@ -21,7 +21,7 @@ func TestMirroringOn100(t *testing.T) {
rw.WriteHeader(http.StatusOK)
})
pool := safe.NewPool(context.Background())
mirror := New(handler, pool, defaultMaxBodySize)
mirror := New(handler, pool, defaultMaxBodySize, nil)
err := mirror.AddMirror(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
atomic.AddInt32(&countMirror1, 1)
}), 10)
@ -50,7 +50,7 @@ func TestMirroringOn10(t *testing.T) {
rw.WriteHeader(http.StatusOK)
})
pool := safe.NewPool(context.Background())
mirror := New(handler, pool, defaultMaxBodySize)
mirror := New(handler, pool, defaultMaxBodySize, nil)
err := mirror.AddMirror(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
atomic.AddInt32(&countMirror1, 1)
}), 10)
@ -74,7 +74,7 @@ func TestMirroringOn10(t *testing.T) {
}
func TestInvalidPercent(t *testing.T) {
mirror := New(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), safe.NewPool(context.Background()), defaultMaxBodySize)
mirror := New(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), safe.NewPool(context.Background()), defaultMaxBodySize, nil)
err := mirror.AddMirror(nil, -1)
assert.Error(t, err)
@ -93,7 +93,7 @@ func TestHijack(t *testing.T) {
rw.WriteHeader(http.StatusOK)
})
pool := safe.NewPool(context.Background())
mirror := New(handler, pool, defaultMaxBodySize)
mirror := New(handler, pool, defaultMaxBodySize, nil)
var mirrorRequest bool
err := mirror.AddMirror(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
@ -117,7 +117,7 @@ func TestFlush(t *testing.T) {
rw.WriteHeader(http.StatusOK)
})
pool := safe.NewPool(context.Background())
mirror := New(handler, pool, defaultMaxBodySize)
mirror := New(handler, pool, defaultMaxBodySize, nil)
var mirrorRequest bool
err := mirror.AddMirror(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
@ -154,7 +154,7 @@ func TestMirroringWithBody(t *testing.T) {
rw.WriteHeader(http.StatusOK)
})
mirror := New(handler, pool, defaultMaxBodySize)
mirror := New(handler, pool, defaultMaxBodySize, nil)
for i := 0; i < numMirrors; i++ {
err := mirror.AddMirror(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {

View file

@ -2,6 +2,7 @@ package wrr
import (
"container/heap"
"context"
"errors"
"fmt"
"net/http"
@ -30,16 +31,28 @@ type stickyCookie struct {
// Entries have deadlines set at currentDeadline + 1 / weight,
// providing weighted round robin behavior with floating point weights and an O(log n) pick time.
type Balancer struct {
stickyCookie *stickyCookie
stickyCookie *stickyCookie
wantsHealthCheck bool
mutex sync.RWMutex
handlers []*namedHandler
curDeadline float64
// status is a record of which child services of the Balancer are healthy, keyed
// by name of child service. A service is initially added to the map when it is
// created via AddService, and it is later removed or added to the map as needed,
// through the SetStatus method.
status map[string]struct{}
// updaters is the list of hooks that are run (to update the Balancer
// parent(s)), whenever the Balancer status changes.
updaters []func(bool)
}
// New creates a new load balancer.
func New(sticky *dynamic.Sticky) *Balancer {
balancer := &Balancer{}
func New(sticky *dynamic.Sticky, hc *dynamic.HealthCheck) *Balancer {
balancer := &Balancer{
status: make(map[string]struct{}),
wantsHealthCheck: hc != nil,
}
if sticky != nil && sticky.Cookie != nil {
balancer.stickyCookie = &stickyCookie{
name: sticky.Cookie.Name,
@ -81,6 +94,58 @@ func (b *Balancer) Pop() interface{} {
return h
}
// SetStatus sets on the balancer that its given child is now of the given
// status. balancerName is only needed for logging purposes.
func (b *Balancer) SetStatus(ctx context.Context, childName string, up bool) {
b.mutex.Lock()
defer b.mutex.Unlock()
upBefore := len(b.status) > 0
status := "DOWN"
if up {
status = "UP"
}
log.FromContext(ctx).Debugf("Setting status of %s to %v", childName, status)
if up {
b.status[childName] = struct{}{}
} else {
delete(b.status, childName)
}
upAfter := len(b.status) > 0
status = "DOWN"
if upAfter {
status = "UP"
}
// No Status Change
if upBefore == upAfter {
// We're still with the same status, no need to propagate
log.FromContext(ctx).Debugf("Still %s, no need to propagate", status)
return
}
// Status Change
log.FromContext(ctx).Debugf("Propagating new %s status", status)
for _, fn := range b.updaters {
fn(upAfter)
}
}
// RegisterStatusUpdater adds fn to the list of hooks that are run when the
// status of the Balancer changes.
// Not thread safe.
func (b *Balancer) RegisterStatusUpdater(fn func(up bool)) error {
if !b.wantsHealthCheck {
return errors.New("healthCheck not enabled in config for this weighted service")
}
b.updaters = append(b.updaters, fn)
return nil
}
var errNoAvailableServer = errors.New("no available server")
func (b *Balancer) nextServer() (*namedHandler, error) {
b.mutex.Lock()
defer b.mutex.Unlock()
@ -88,15 +153,24 @@ func (b *Balancer) nextServer() (*namedHandler, error) {
if len(b.handlers) == 0 {
return nil, fmt.Errorf("no servers in the pool")
}
if len(b.status) == 0 {
return nil, errNoAvailableServer
}
// Pick handler with closest deadline.
handler := heap.Pop(b).(*namedHandler)
var handler *namedHandler
for {
// Pick handler with closest deadline.
handler = heap.Pop(b).(*namedHandler)
// curDeadline should be handler's deadline so that new added entry would have a fair competition environment with the old ones.
b.curDeadline = handler.deadline
handler.deadline += 1 / handler.weight
// curDeadline should be handler's deadline so that new added entry would have a fair competition environment with the old ones.
b.curDeadline = handler.deadline
handler.deadline += 1 / handler.weight
heap.Push(b, handler)
heap.Push(b, handler)
if _, ok := b.status[handler.name]; ok {
break
}
}
log.WithoutContext().Debugf("Service selected by WRR: %s", handler.name)
return handler, nil
@ -112,17 +186,32 @@ func (b *Balancer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
if err == nil && cookie != nil {
for _, handler := range b.handlers {
if handler.name == cookie.Value {
handler.ServeHTTP(w, req)
return
if handler.name != cookie.Value {
continue
}
b.mutex.RLock()
_, ok := b.status[handler.name]
b.mutex.RUnlock()
if !ok {
// because we already are in the only iteration that matches the cookie, so none
// of the following iterations are going to be a match for the cookie anyway.
break
}
handler.ServeHTTP(w, req)
return
}
}
}
server, err := b.nextServer()
if err != nil {
http.Error(w, http.StatusText(http.StatusInternalServerError)+err.Error(), http.StatusInternalServerError)
if errors.Is(err, errNoAvailableServer) {
http.Error(w, errNoAvailableServer.Error(), http.StatusServiceUnavailable)
} else {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
return
}
@ -135,7 +224,6 @@ func (b *Balancer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
// AddService adds a handler.
// It is not thread safe with ServeHTTP.
// A handler with a non-positive weight is ignored.
func (b *Balancer) AddService(name string, handler http.Handler, weight *int) {
w := 1
@ -148,10 +236,9 @@ func (b *Balancer) AddService(name string, handler http.Handler, weight *int) {
h := &namedHandler{Handler: handler, name: name, weight: float64(w)}
// use RWLock to protect b.curDeadline
b.mutex.RLock()
b.mutex.Lock()
h.deadline = b.curDeadline + 1/h.weight
b.mutex.RUnlock()
heap.Push(b, h)
b.status[name] = struct{}{}
b.mutex.Unlock()
}

View file

@ -1,6 +1,7 @@
package wrr
import (
"context"
"net/http"
"net/http/httptest"
"testing"
@ -15,16 +16,18 @@ type responseRecorder struct {
*httptest.ResponseRecorder
save map[string]int
sequence []string
status []int
}
func (r *responseRecorder) WriteHeader(statusCode int) {
r.save[r.Header().Get("server")]++
r.sequence = append(r.sequence, r.Header().Get("server"))
r.status = append(r.status, statusCode)
r.ResponseRecorder.WriteHeader(statusCode)
}
func TestBalancer(t *testing.T) {
balancer := New(nil)
balancer := New(nil, nil)
balancer.AddService("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "first")
@ -46,7 +49,7 @@ func TestBalancer(t *testing.T) {
}
func TestBalancerNoService(t *testing.T) {
balancer := New(nil)
balancer := New(nil, nil)
recorder := httptest.NewRecorder()
balancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
@ -55,7 +58,7 @@ func TestBalancerNoService(t *testing.T) {
}
func TestBalancerOneServerZeroWeight(t *testing.T) {
balancer := New(nil)
balancer := New(nil, nil)
balancer.AddService("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "first")
@ -72,8 +75,155 @@ func TestBalancerOneServerZeroWeight(t *testing.T) {
assert.Equal(t, 3, recorder.save["first"])
}
type key string
const serviceName key = "serviceName"
func TestBalancerNoServiceUp(t *testing.T) {
balancer := New(nil, nil)
balancer.AddService("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(http.StatusInternalServerError)
}), Int(1))
balancer.AddService("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(http.StatusInternalServerError)
}), Int(1))
balancer.SetStatus(context.WithValue(context.Background(), serviceName, "parent"), "first", false)
balancer.SetStatus(context.WithValue(context.Background(), serviceName, "parent"), "second", false)
recorder := httptest.NewRecorder()
balancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
assert.Equal(t, http.StatusServiceUnavailable, recorder.Result().StatusCode)
}
func TestBalancerOneServerDown(t *testing.T) {
balancer := New(nil, nil)
balancer.AddService("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "first")
rw.WriteHeader(http.StatusOK)
}), Int(1))
balancer.AddService("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(http.StatusInternalServerError)
}), Int(1))
balancer.SetStatus(context.WithValue(context.Background(), serviceName, "parent"), "second", false)
recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
for i := 0; i < 3; i++ {
balancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
}
assert.Equal(t, 3, recorder.save["first"])
}
func TestBalancerDownThenUp(t *testing.T) {
balancer := New(nil, nil)
balancer.AddService("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "first")
rw.WriteHeader(http.StatusOK)
}), Int(1))
balancer.AddService("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "second")
rw.WriteHeader(http.StatusOK)
}), Int(1))
balancer.SetStatus(context.WithValue(context.Background(), serviceName, "parent"), "second", false)
recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
for i := 0; i < 3; i++ {
balancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
}
assert.Equal(t, 3, recorder.save["first"])
balancer.SetStatus(context.WithValue(context.Background(), serviceName, "parent"), "second", true)
recorder = &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
for i := 0; i < 2; i++ {
balancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
}
assert.Equal(t, 1, recorder.save["first"])
assert.Equal(t, 1, recorder.save["second"])
}
func TestBalancerPropagate(t *testing.T) {
balancer1 := New(nil, &dynamic.HealthCheck{})
balancer1.AddService("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "first")
rw.WriteHeader(http.StatusOK)
}), Int(1))
balancer1.AddService("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "second")
rw.WriteHeader(http.StatusOK)
}), Int(1))
balancer2 := New(nil, &dynamic.HealthCheck{})
balancer2.AddService("third", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "third")
rw.WriteHeader(http.StatusOK)
}), Int(1))
balancer2.AddService("fourth", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "fourth")
rw.WriteHeader(http.StatusOK)
}), Int(1))
topBalancer := New(nil, &dynamic.HealthCheck{})
topBalancer.AddService("balancer1", balancer1, Int(1))
_ = balancer1.RegisterStatusUpdater(func(up bool) {
topBalancer.SetStatus(context.WithValue(context.Background(), serviceName, "top"), "balancer1", up)
// TODO(mpl): if test gets flaky, add channel or something here to signal that
// propagation is done, and wait on it before sending request.
})
topBalancer.AddService("balancer2", balancer2, Int(1))
_ = balancer2.RegisterStatusUpdater(func(up bool) {
topBalancer.SetStatus(context.WithValue(context.Background(), serviceName, "top"), "balancer2", up)
})
recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
for i := 0; i < 8; i++ {
topBalancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
}
assert.Equal(t, 2, recorder.save["first"])
assert.Equal(t, 2, recorder.save["second"])
assert.Equal(t, 2, recorder.save["third"])
assert.Equal(t, 2, recorder.save["fourth"])
wantStatus := []int{200, 200, 200, 200, 200, 200, 200, 200}
assert.Equal(t, wantStatus, recorder.status)
// fourth gets downed, but balancer2 still up since third is still up.
balancer2.SetStatus(context.WithValue(context.Background(), serviceName, "top"), "fourth", false)
recorder = &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
for i := 0; i < 8; i++ {
topBalancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
}
assert.Equal(t, 2, recorder.save["first"])
assert.Equal(t, 2, recorder.save["second"])
assert.Equal(t, 4, recorder.save["third"])
assert.Equal(t, 0, recorder.save["fourth"])
wantStatus = []int{200, 200, 200, 200, 200, 200, 200, 200}
assert.Equal(t, wantStatus, recorder.status)
// third gets downed, and the propagation triggers balancer2 to be marked as
// down as well for topBalancer.
balancer2.SetStatus(context.WithValue(context.Background(), serviceName, "top"), "third", false)
recorder = &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
for i := 0; i < 8; i++ {
topBalancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
}
assert.Equal(t, 4, recorder.save["first"])
assert.Equal(t, 4, recorder.save["second"])
assert.Equal(t, 0, recorder.save["third"])
assert.Equal(t, 0, recorder.save["fourth"])
wantStatus = []int{200, 200, 200, 200, 200, 200, 200, 200}
assert.Equal(t, wantStatus, recorder.status)
}
func TestBalancerAllServersZeroWeight(t *testing.T) {
balancer := New(nil)
balancer := New(nil, nil)
balancer.AddService("test", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), Int(0))
balancer.AddService("test2", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), Int(0))
@ -87,7 +237,7 @@ func TestBalancerAllServersZeroWeight(t *testing.T) {
func TestSticky(t *testing.T) {
balancer := New(&dynamic.Sticky{
Cookie: &dynamic.Cookie{Name: "test"},
})
}, nil)
balancer.AddService("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "first")
@ -118,7 +268,7 @@ func TestSticky(t *testing.T) {
// TestBalancerBias makes sure that the WRR algorithm spreads elements evenly right from the start,
// and that it does not "over-favor" the high-weighted ones with a biased start-up regime.
func TestBalancerBias(t *testing.T) {
balancer := New(nil)
balancer := New(nil, nil)
balancer.AddService("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "A")

View file

@ -135,7 +135,7 @@ func (m *Manager) getMirrorServiceHandler(ctx context.Context, config *dynamic.M
if config.MaxBodySize != nil {
maxBodySize = *config.MaxBodySize
}
handler := mirror.New(serviceHandler, m.routinePool, maxBodySize)
handler := mirror.New(serviceHandler, m.routinePool, maxBodySize, config.HealthCheck)
for _, mirrorConfig := range config.Mirrors {
mirrorHandler, err := m.BuildHTTP(ctx, mirrorConfig.Name)
if err != nil {
@ -156,7 +156,7 @@ func (m *Manager) getWRRServiceHandler(ctx context.Context, serviceName string,
config.Sticky.Cookie.Name = cookie.GetName(config.Sticky.Cookie.Name, serviceName)
}
balancer := wrr.New(config.Sticky)
balancer := wrr.New(config.Sticky, config.HealthCheck)
for _, service := range config.Services {
serviceHandler, err := m.BuildHTTP(ctx, service.Name)
if err != nil {
@ -164,7 +164,25 @@ func (m *Manager) getWRRServiceHandler(ctx context.Context, serviceName string,
}
balancer.AddService(service.Name, serviceHandler, service.Weight)
if config.HealthCheck == nil {
continue
}
childName := service.Name
updater, ok := serviceHandler.(healthcheck.StatusUpdater)
if !ok {
return nil, fmt.Errorf("child service %v of %v not a healthcheck.StatusUpdater (%T)", childName, serviceName, serviceHandler)
}
if err := updater.RegisterStatusUpdater(func(up bool) {
balancer.SetStatus(ctx, childName, up)
}); err != nil {
return nil, fmt.Errorf("cannot register %v as updater for %v: %w", childName, serviceName, err)
}
log.FromContext(ctx).Debugf("Child service %v will update parent %v on status change", childName, serviceName)
}
return balancer, nil
}
@ -213,34 +231,30 @@ func (m *Manager) getLoadBalancerServiceHandler(ctx context.Context, serviceName
return emptybackendhandler.New(balancer), nil
}
// LaunchHealthCheck Launches the health checks.
// LaunchHealthCheck launches the health checks.
func (m *Manager) LaunchHealthCheck() {
backendConfigs := make(map[string]*healthcheck.BackendConfig)
for serviceName, balancers := range m.balancers {
ctx := log.With(context.Background(), log.Str(log.ServiceName, serviceName))
// TODO Should all the services handle healthcheck? Handle different types
service := m.configs[serviceName].LoadBalancer
// Health Check
var backendHealthCheck *healthcheck.BackendConfig
if hcOpts := buildHealthCheckOptions(ctx, balancers, serviceName, service.HealthCheck); hcOpts != nil {
log.FromContext(ctx).Debugf("Setting up healthcheck for service %s with %s", serviceName, *hcOpts)
hcOpts.Transport, _ = m.roundTripperManager.Get(service.ServersTransport)
backendHealthCheck = healthcheck.NewBackendConfig(*hcOpts, serviceName)
hcOpts := buildHealthCheckOptions(ctx, balancers, serviceName, service.HealthCheck)
if hcOpts == nil {
continue
}
hcOpts.Transport, _ = m.roundTripperManager.Get(service.ServersTransport)
log.FromContext(ctx).Debugf("Setting up healthcheck for service %s with %s", serviceName, *hcOpts)
if backendHealthCheck != nil {
backendConfigs[serviceName] = backendHealthCheck
}
backendConfigs[serviceName] = healthcheck.NewBackendConfig(*hcOpts, serviceName)
}
healthcheck.GetHealthCheck(m.metricsRegistry).SetBackendsConfiguration(context.Background(), backendConfigs)
}
func buildHealthCheckOptions(ctx context.Context, lb healthcheck.Balancer, backend string, hc *dynamic.HealthCheck) *healthcheck.Options {
func buildHealthCheckOptions(ctx context.Context, lb healthcheck.Balancer, backend string, hc *dynamic.ServerHealthCheck) *healthcheck.Options {
if hc == nil || hc.Path == "" {
return nil
}
@ -295,7 +309,7 @@ func buildHealthCheckOptions(ctx context.Context, lb healthcheck.Balancer, backe
}
}
func (m *Manager) getLoadBalancer(ctx context.Context, serviceName string, service *dynamic.ServersLoadBalancer, fwd http.Handler) (healthcheck.BalancerHandler, error) {
func (m *Manager) getLoadBalancer(ctx context.Context, serviceName string, service *dynamic.ServersLoadBalancer, fwd http.Handler) (healthcheck.BalancerStatusHandler, error) {
logger := log.FromContext(ctx)
logger.Debug("Creating load-balancer")
@ -327,7 +341,7 @@ func (m *Manager) getLoadBalancer(ctx context.Context, serviceName string, servi
return nil, err
}
lbsu := healthcheck.NewLBStatusUpdater(lb, m.configs[serviceName])
lbsu := healthcheck.NewLBStatusUpdater(lb, m.configs[serviceName], service.HealthCheck)
if err := m.upsertServers(ctx, lbsu, service.Servers); err != nil {
return nil, fmt.Errorf("error configuring load balancer for service %s: %w", serviceName, err)
}

View file

@ -241,7 +241,7 @@ func TestGetLoadBalancerServiceHandler(t *testing.T) {
},
},
{
desc: "PassHost doesn't passe the host instead of the IP",
desc: "PassHost doesn't pass the host instead of the IP",
serviceName: "test",
service: &dynamic.ServersLoadBalancer{
PassHostHeader: Bool(false),
@ -406,5 +406,3 @@ func TestMultipleTypeOnBuildHTTP(t *testing.T) {
_, err := manager.BuildHTTP(context.Background(), "test@file")
assert.Error(t, err, "cannot create service: multi-types service not supported, consider declaring two different pieces of service instead")
}
// FIXME Add healthcheck tests