Add Failover service
Co-authored-by: Kevin Pollet <pollet.kevin@gmail.com>
This commit is contained in:
parent
6622027c7c
commit
79aab5aab8
12 changed files with 583 additions and 3 deletions
140
pkg/server/service/loadbalancer/failover/failover.go
Normal file
140
pkg/server/service/loadbalancer/failover/failover.go
Normal file
|
@ -0,0 +1,140 @@
|
|||
package failover
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"github.com/traefik/traefik/v2/pkg/config/dynamic"
|
||||
"github.com/traefik/traefik/v2/pkg/log"
|
||||
)
|
||||
|
||||
// Failover is an http.Handler that can forward requests to the fallback handler
|
||||
// when the main handler status is down.
|
||||
type Failover struct {
|
||||
wantsHealthCheck bool
|
||||
handler http.Handler
|
||||
fallbackHandler http.Handler
|
||||
// updaters is the list of hooks that are run (to update the Failover
|
||||
// parent(s)), whenever the Failover status changes.
|
||||
updaters []func(bool)
|
||||
|
||||
handlerStatusMu sync.RWMutex
|
||||
handlerStatus bool
|
||||
|
||||
fallbackStatusMu sync.RWMutex
|
||||
fallbackStatus bool
|
||||
}
|
||||
|
||||
// New creates a new Failover handler.
|
||||
func New(hc *dynamic.HealthCheck) *Failover {
|
||||
return &Failover{
|
||||
wantsHealthCheck: hc != nil,
|
||||
}
|
||||
}
|
||||
|
||||
// RegisterStatusUpdater adds fn to the list of hooks that are run when the
|
||||
// status of the Failover changes.
|
||||
// Not thread safe.
|
||||
func (f *Failover) RegisterStatusUpdater(fn func(up bool)) error {
|
||||
if !f.wantsHealthCheck {
|
||||
return errors.New("healthCheck not enabled in config for this failover service")
|
||||
}
|
||||
|
||||
f.updaters = append(f.updaters, fn)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *Failover) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
f.handlerStatusMu.RLock()
|
||||
handlerStatus := f.handlerStatus
|
||||
f.handlerStatusMu.RUnlock()
|
||||
|
||||
if handlerStatus {
|
||||
f.handler.ServeHTTP(w, req)
|
||||
return
|
||||
}
|
||||
|
||||
f.fallbackStatusMu.RLock()
|
||||
fallbackStatus := f.fallbackStatus
|
||||
f.fallbackStatusMu.RUnlock()
|
||||
|
||||
if fallbackStatus {
|
||||
f.fallbackHandler.ServeHTTP(w, req)
|
||||
return
|
||||
}
|
||||
|
||||
http.Error(w, http.StatusText(http.StatusServiceUnavailable), http.StatusServiceUnavailable)
|
||||
}
|
||||
|
||||
// SetHandler sets the main http.Handler.
|
||||
func (f *Failover) SetHandler(handler http.Handler) {
|
||||
f.handlerStatusMu.Lock()
|
||||
defer f.handlerStatusMu.Unlock()
|
||||
|
||||
f.handler = handler
|
||||
f.handlerStatus = true
|
||||
}
|
||||
|
||||
// SetHandlerStatus sets the main handler status.
|
||||
func (f *Failover) SetHandlerStatus(ctx context.Context, up bool) {
|
||||
f.handlerStatusMu.Lock()
|
||||
defer f.handlerStatusMu.Unlock()
|
||||
|
||||
status := "DOWN"
|
||||
if up {
|
||||
status = "UP"
|
||||
}
|
||||
|
||||
if up == f.handlerStatus {
|
||||
// We're still with the same status, no need to propagate.
|
||||
log.FromContext(ctx).Debugf("Still %s, no need to propagate", status)
|
||||
return
|
||||
}
|
||||
|
||||
log.FromContext(ctx).Debugf("Propagating new %s status", status)
|
||||
f.handlerStatus = up
|
||||
|
||||
for _, fn := range f.updaters {
|
||||
// Failover service status is set to DOWN
|
||||
// when main and fallback handlers have a DOWN status.
|
||||
fn(f.handlerStatus || f.fallbackStatus)
|
||||
}
|
||||
}
|
||||
|
||||
// SetFallbackHandler sets the fallback http.Handler.
|
||||
func (f *Failover) SetFallbackHandler(handler http.Handler) {
|
||||
f.fallbackStatusMu.Lock()
|
||||
defer f.fallbackStatusMu.Unlock()
|
||||
|
||||
f.fallbackHandler = handler
|
||||
f.fallbackStatus = true
|
||||
}
|
||||
|
||||
// SetFallbackHandlerStatus sets the fallback handler status.
|
||||
func (f *Failover) SetFallbackHandlerStatus(ctx context.Context, up bool) {
|
||||
f.fallbackStatusMu.Lock()
|
||||
defer f.fallbackStatusMu.Unlock()
|
||||
|
||||
status := "DOWN"
|
||||
if up {
|
||||
status = "UP"
|
||||
}
|
||||
|
||||
if up == f.fallbackStatus {
|
||||
// We're still with the same status, no need to propagate.
|
||||
log.FromContext(ctx).Debugf("Still %s, no need to propagate", status)
|
||||
return
|
||||
}
|
||||
|
||||
log.FromContext(ctx).Debugf("Propagating new %s status", status)
|
||||
f.fallbackStatus = up
|
||||
|
||||
for _, fn := range f.updaters {
|
||||
// Failover service status is set to DOWN
|
||||
// when main and fallback handlers have a DOWN status.
|
||||
fn(f.handlerStatus || f.fallbackStatus)
|
||||
}
|
||||
}
|
163
pkg/server/service/loadbalancer/failover/failover_test.go
Normal file
163
pkg/server/service/loadbalancer/failover/failover_test.go
Normal file
|
@ -0,0 +1,163 @@
|
|||
package failover
|
||||
|
||||
import (
|
||||
"context"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/traefik/traefik/v2/pkg/config/dynamic"
|
||||
)
|
||||
|
||||
type responseRecorder struct {
|
||||
*httptest.ResponseRecorder
|
||||
save map[string]int
|
||||
sequence []string
|
||||
status []int
|
||||
}
|
||||
|
||||
func (r *responseRecorder) WriteHeader(statusCode int) {
|
||||
r.save[r.Header().Get("server")]++
|
||||
r.sequence = append(r.sequence, r.Header().Get("server"))
|
||||
r.status = append(r.status, statusCode)
|
||||
r.ResponseRecorder.WriteHeader(statusCode)
|
||||
}
|
||||
|
||||
func TestFailover(t *testing.T) {
|
||||
failover := New(&dynamic.HealthCheck{})
|
||||
|
||||
status := true
|
||||
require.NoError(t, failover.RegisterStatusUpdater(func(up bool) {
|
||||
status = up
|
||||
}))
|
||||
|
||||
failover.SetHandler(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||
rw.Header().Set("server", "handler")
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
|
||||
failover.SetFallbackHandler(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||
rw.Header().Set("server", "fallback")
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
|
||||
recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
|
||||
failover.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
|
||||
|
||||
assert.Equal(t, 1, recorder.save["handler"])
|
||||
assert.Equal(t, 0, recorder.save["fallback"])
|
||||
assert.Equal(t, []int{200}, recorder.status)
|
||||
assert.True(t, status)
|
||||
|
||||
failover.SetHandlerStatus(context.Background(), false)
|
||||
|
||||
recorder = &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
|
||||
failover.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
|
||||
|
||||
assert.Equal(t, 0, recorder.save["handler"])
|
||||
assert.Equal(t, 1, recorder.save["fallback"])
|
||||
assert.Equal(t, []int{200}, recorder.status)
|
||||
assert.True(t, status)
|
||||
|
||||
failover.SetFallbackHandlerStatus(context.Background(), false)
|
||||
|
||||
recorder = &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
|
||||
failover.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
|
||||
|
||||
assert.Equal(t, 0, recorder.save["handler"])
|
||||
assert.Equal(t, 0, recorder.save["fallback"])
|
||||
assert.Equal(t, []int{503}, recorder.status)
|
||||
assert.False(t, status)
|
||||
}
|
||||
|
||||
func TestFailoverDownThenUp(t *testing.T) {
|
||||
failover := New(nil)
|
||||
|
||||
failover.SetHandler(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||
rw.Header().Set("server", "handler")
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
|
||||
failover.SetFallbackHandler(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||
rw.Header().Set("server", "fallback")
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
|
||||
recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
|
||||
failover.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
|
||||
|
||||
assert.Equal(t, 1, recorder.save["handler"])
|
||||
assert.Equal(t, 0, recorder.save["fallback"])
|
||||
assert.Equal(t, []int{200}, recorder.status)
|
||||
|
||||
failover.SetHandlerStatus(context.Background(), false)
|
||||
|
||||
recorder = &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
|
||||
failover.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
|
||||
|
||||
assert.Equal(t, 0, recorder.save["handler"])
|
||||
assert.Equal(t, 1, recorder.save["fallback"])
|
||||
assert.Equal(t, []int{200}, recorder.status)
|
||||
|
||||
failover.SetHandlerStatus(context.Background(), true)
|
||||
|
||||
recorder = &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
|
||||
failover.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
|
||||
|
||||
assert.Equal(t, 1, recorder.save["handler"])
|
||||
assert.Equal(t, 0, recorder.save["fallback"])
|
||||
assert.Equal(t, []int{200}, recorder.status)
|
||||
}
|
||||
|
||||
func TestFailoverPropagate(t *testing.T) {
|
||||
failover := New(&dynamic.HealthCheck{})
|
||||
failover.SetHandler(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||
rw.Header().Set("server", "handler")
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
failover.SetFallbackHandler(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||
rw.Header().Set("server", "fallback")
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
|
||||
topFailover := New(nil)
|
||||
topFailover.SetHandler(failover)
|
||||
topFailover.SetFallbackHandler(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||
rw.Header().Set("server", "topFailover")
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
err := failover.RegisterStatusUpdater(func(up bool) {
|
||||
topFailover.SetHandlerStatus(context.Background(), up)
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
|
||||
topFailover.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
|
||||
|
||||
assert.Equal(t, 1, recorder.save["handler"])
|
||||
assert.Equal(t, 0, recorder.save["fallback"])
|
||||
assert.Equal(t, 0, recorder.save["topFailover"])
|
||||
assert.Equal(t, []int{200}, recorder.status)
|
||||
|
||||
failover.SetHandlerStatus(context.Background(), false)
|
||||
|
||||
recorder = &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
|
||||
topFailover.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
|
||||
|
||||
assert.Equal(t, 0, recorder.save["handler"])
|
||||
assert.Equal(t, 1, recorder.save["fallback"])
|
||||
assert.Equal(t, 0, recorder.save["topFailover"])
|
||||
assert.Equal(t, []int{200}, recorder.status)
|
||||
|
||||
failover.SetFallbackHandlerStatus(context.Background(), false)
|
||||
|
||||
recorder = &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}}
|
||||
topFailover.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
|
||||
|
||||
assert.Equal(t, 0, recorder.save["handler"])
|
||||
assert.Equal(t, 0, recorder.save["fallback"])
|
||||
assert.Equal(t, 1, recorder.save["topFailover"])
|
||||
assert.Equal(t, []int{200}, recorder.status)
|
||||
}
|
|
@ -29,7 +29,7 @@ type stickyCookie struct {
|
|||
// (https://en.wikipedia.org/wiki/Earliest_deadline_first_scheduling)
|
||||
// Each pick from the schedule has the earliest deadline entry selected.
|
||||
// Entries have deadlines set at currentDeadline + 1 / weight,
|
||||
// providing weighted round robin behavior with floating point weights and an O(log n) pick time.
|
||||
// providing weighted round-robin behavior with floating point weights and an O(log n) pick time.
|
||||
type Balancer struct {
|
||||
stickyCookie *stickyCookie
|
||||
wantsHealthCheck bool
|
||||
|
@ -230,6 +230,7 @@ func (b *Balancer) AddService(name string, handler http.Handler, weight *int) {
|
|||
if weight != nil {
|
||||
w = *weight
|
||||
}
|
||||
|
||||
if w <= 0 { // non-positive weight is meaningless
|
||||
return
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"github.com/traefik/traefik/v2/pkg/safe"
|
||||
"github.com/traefik/traefik/v2/pkg/server/cookie"
|
||||
"github.com/traefik/traefik/v2/pkg/server/provider"
|
||||
"github.com/traefik/traefik/v2/pkg/server/service/loadbalancer/failover"
|
||||
"github.com/traefik/traefik/v2/pkg/server/service/loadbalancer/mirror"
|
||||
"github.com/traefik/traefik/v2/pkg/server/service/loadbalancer/wrr"
|
||||
"github.com/vulcand/oxy/roundrobin"
|
||||
|
@ -116,6 +117,13 @@ func (m *Manager) BuildHTTP(rootCtx context.Context, serviceName string) (http.H
|
|||
conf.AddError(err, true)
|
||||
return nil, err
|
||||
}
|
||||
case conf.Failover != nil:
|
||||
var err error
|
||||
lb, err = m.getFailoverServiceHandler(ctx, serviceName, conf.Failover)
|
||||
if err != nil {
|
||||
conf.AddError(err, true)
|
||||
return nil, err
|
||||
}
|
||||
default:
|
||||
sErr := fmt.Errorf("the service %q does not have any type defined", serviceName)
|
||||
conf.AddError(sErr, true)
|
||||
|
@ -125,6 +133,53 @@ func (m *Manager) BuildHTTP(rootCtx context.Context, serviceName string) (http.H
|
|||
return lb, nil
|
||||
}
|
||||
|
||||
func (m *Manager) getFailoverServiceHandler(ctx context.Context, serviceName string, config *dynamic.Failover) (http.Handler, error) {
|
||||
f := failover.New(config.HealthCheck)
|
||||
|
||||
serviceHandler, err := m.BuildHTTP(ctx, config.Service)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
f.SetHandler(serviceHandler)
|
||||
|
||||
updater, ok := serviceHandler.(healthcheck.StatusUpdater)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("child service %v of %v not a healthcheck.StatusUpdater (%T)", config.Service, serviceName, serviceHandler)
|
||||
}
|
||||
|
||||
if err := updater.RegisterStatusUpdater(func(up bool) {
|
||||
f.SetHandlerStatus(ctx, up)
|
||||
}); err != nil {
|
||||
return nil, fmt.Errorf("cannot register %v as updater for %v: %w", config.Service, serviceName, err)
|
||||
}
|
||||
|
||||
fallbackHandler, err := m.BuildHTTP(ctx, config.Fallback)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
f.SetFallbackHandler(fallbackHandler)
|
||||
|
||||
// Do not report the health of the fallback handler.
|
||||
if config.HealthCheck == nil {
|
||||
return f, nil
|
||||
}
|
||||
|
||||
fallbackUpdater, ok := fallbackHandler.(healthcheck.StatusUpdater)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("child service %v of %v not a healthcheck.StatusUpdater (%T)", config.Fallback, serviceName, fallbackHandler)
|
||||
}
|
||||
|
||||
if err := fallbackUpdater.RegisterStatusUpdater(func(up bool) {
|
||||
f.SetFallbackHandlerStatus(ctx, up)
|
||||
}); err != nil {
|
||||
return nil, fmt.Errorf("cannot register %v as updater for %v: %w", config.Fallback, serviceName, err)
|
||||
}
|
||||
|
||||
return f, nil
|
||||
}
|
||||
|
||||
func (m *Manager) getMirrorServiceHandler(ctx context.Context, config *dynamic.Mirroring) (http.Handler, error) {
|
||||
serviceHandler, err := m.BuildHTTP(ctx, config.Service)
|
||||
if err != nil {
|
||||
|
@ -164,6 +219,7 @@ func (m *Manager) getWRRServiceHandler(ctx context.Context, serviceName string,
|
|||
}
|
||||
|
||||
balancer.AddService(service.Name, serviceHandler, service.Weight)
|
||||
|
||||
if config.HealthCheck == nil {
|
||||
continue
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue