Add p2c load-balancing strategy for servers load-balancer

Co-authored-by: Ian Ross <ifross@gmail.com>
Co-authored-by: Kevin Pollet <pollet.kevin@gmail.com>
Romain 2025-03-10 12:12:04 +01:00 committed by GitHub
parent 550d96ea67
commit 9e029a84c4
50 changed files with 1621 additions and 382 deletions
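For readers new to the strategy: "power of two random choices" (P2C) picks two backends at random and sends the request to the one with fewer in-flight requests. Below is a minimal standalone sketch of just that selection step, with a hypothetical backend type and pickP2C function; the actual implementation added by this commit follows in the diff.

package p2csketch

import "math/rand"

// backend is a hypothetical stand-in for a load-balanced server.
type backend struct {
	name     string
	inflight int64
}

// pickP2C chooses two distinct backends at random and returns the one with
// fewer in-flight requests. It assumes at least one backend; the real Balancer
// returns errNoAvailableServer when none are healthy.
func pickP2C(r *rand.Rand, backends []*backend) *backend {
	if len(backends) == 1 {
		return backends[0]
	}
	n1, n2 := r.Intn(len(backends)), r.Intn(len(backends))
	if n2 == n1 {
		n2 = (n2 + 1) % len(backends)
	}
	if backends[n2].inflight < backends[n1].inflight {
		return backends[n2]
	}
	return backends[n1]
}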


@@ -0,0 +1,227 @@
package p2c
import (
"context"
"errors"
"math/rand"
"net/http"
"sync"
"sync/atomic"
"time"
"github.com/rs/zerolog/log"
"github.com/traefik/traefik/v3/pkg/config/dynamic"
"github.com/traefik/traefik/v3/pkg/server/service/loadbalancer"
)
type namedHandler struct {
http.Handler
// name is the handler name.
name string
// inflight is the number of inflight requests.
// It is used to implement the "power-of-two-random-choices" algorithm.
inflight atomic.Int64
}
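// ServeHTTP counts the request as in-flight for its whole duration while
// delegating to the wrapped handler.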
func (h *namedHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
h.inflight.Add(1)
defer h.inflight.Add(-1)
h.Handler.ServeHTTP(rw, req)
}
type rnd interface {
Intn(n int) int
}
// Balancer implements the power-of-two-random-choices algorithm for load balancing.
// The idea is to randomly select two of the available backends and choose the one with the fewest in-flight requests.
// This algorithm balances the load more effectively than a round-robin approach, while maintaining a constant time for the selection.
// The strategy also has more advantageous "herd" behavior than the "fewest connections" algorithm, especially when the load balancer
// doesn't have perfect knowledge of the global number of connections to the backend, for example, when running in a distributed fashion.
type Balancer struct {
wantsHealthCheck bool
handlersMu sync.RWMutex
handlers []*namedHandler
// status is a record of which child services of the Balancer are healthy, keyed
// by name of child service. A service is initially added to the map when it is
// created via Add, and it is later removed or added to the map as needed,
// through the SetStatus method.
status map[string]struct{}
// updaters is the list of hooks that are run (to update the Balancer
// parent(s)), whenever the Balancer status changes.
updaters []func(bool)
// fenced is the list of terminating yet still serving child services.
fenced map[string]struct{}
sticky *loadbalancer.Sticky
rand rnd
}
// New creates a new power-of-two-random-choices load balancer.
func New(stickyConfig *dynamic.Sticky, wantsHealthCheck bool) *Balancer {
balancer := &Balancer{
status: make(map[string]struct{}),
fenced: make(map[string]struct{}),
wantsHealthCheck: wantsHealthCheck,
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
}
if stickyConfig != nil && stickyConfig.Cookie != nil {
balancer.sticky = loadbalancer.NewSticky(*stickyConfig.Cookie)
}
return balancer
}
// SetStatus records whether the given child service of the balancer is up, and
// propagates the balancer's own status to its registered updaters when it changes.
func (b *Balancer) SetStatus(ctx context.Context, childName string, up bool) {
b.handlersMu.Lock()
defer b.handlersMu.Unlock()
upBefore := len(b.status) > 0
status := "DOWN"
if up {
status = "UP"
}
log.Ctx(ctx).Debug().Msgf("Setting status of %s to %v", childName, status)
if up {
b.status[childName] = struct{}{}
} else {
delete(b.status, childName)
}
upAfter := len(b.status) > 0
status = "DOWN"
if upAfter {
status = "UP"
}
// No Status Change
if upBefore == upAfter {
// We're still with the same status, no need to propagate
log.Ctx(ctx).Debug().Msgf("Still %s, no need to propagate", status)
return
}
// Status Change
log.Ctx(ctx).Debug().Msgf("Propagating new %s status", status)
for _, fn := range b.updaters {
fn(upAfter)
}
}
// RegisterStatusUpdater adds fn to the list of hooks that are run when the
// status of the Balancer changes.
// Not thread safe.
func (b *Balancer) RegisterStatusUpdater(fn func(up bool)) error {
if !b.wantsHealthCheck {
return errors.New("healthCheck not enabled in config for this weighted service")
}
b.updaters = append(b.updaters, fn)
return nil
}
var errNoAvailableServer = errors.New("no available server")
func (b *Balancer) nextServer() (*namedHandler, error) {
// We kept the same representation (map) as in the WRR strategy to improve maintainability.
// However, with the P2C strategy, we only need a slice of healthy servers.
b.handlersMu.RLock()
var healthy []*namedHandler
for _, h := range b.handlers {
if _, ok := b.status[h.name]; ok {
if _, fenced := b.fenced[h.name]; !fenced {
healthy = append(healthy, h)
}
}
}
b.handlersMu.RUnlock()
if len(healthy) == 0 {
return nil, errNoAvailableServer
}
// If there is only one healthy server, return it.
if len(healthy) == 1 {
return healthy[0], nil
}
// To avoid comparing the same backend with itself, if the two random indexes are
// equal we shift the second one to the next backend, wrapping around if needed.
n1, n2 := b.rand.Intn(len(healthy)), b.rand.Intn(len(healthy))
if n2 == n1 {
n2 = (n2 + 1) % len(healthy)
}
h1, h2 := healthy[n1], healthy[n2]
// Return whichever of the two picked handlers has fewer in-flight requests.
if h2.inflight.Load() < h1.inflight.Load() {
log.Debug().Msgf("Service selected by P2C: %s", h2.name)
return h2, nil
}
log.Debug().Msgf("Service selected by P2C: %s", h1.name)
return h1, nil
}
func (b *Balancer) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
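// A request carrying a sticky cookie that matches a handler still marked up
// (fenced handlers included) is routed to that handler directly, skipping the
// P2C selection.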
if b.sticky != nil {
h, rewrite, err := b.sticky.StickyHandler(req)
if err != nil {
log.Error().Err(err).Msg("Error while getting sticky handler")
} else if h != nil {
if _, ok := b.status[h.Name]; ok {
if rewrite {
if err := b.sticky.WriteStickyCookie(rw, h.Name); err != nil {
log.Error().Err(err).Msg("Writing sticky cookie")
}
}
h.ServeHTTP(rw, req)
return
}
}
}
server, err := b.nextServer()
if err != nil {
if errors.Is(err, errNoAvailableServer) {
http.Error(rw, errNoAvailableServer.Error(), http.StatusServiceUnavailable)
} else {
http.Error(rw, err.Error(), http.StatusInternalServerError)
}
return
}
if b.sticky != nil {
if err := b.sticky.WriteStickyCookie(rw, server.name); err != nil {
log.Error().Err(err).Msg("Error while writing sticky cookie")
}
}
server.ServeHTTP(rw, req)
}
// AddServer adds a handler with a server.
func (b *Balancer) AddServer(name string, handler http.Handler, server dynamic.Server) {
h := &namedHandler{Handler: handler, name: name}
b.handlersMu.Lock()
b.handlers = append(b.handlers, h)
b.status[name] = struct{}{}
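// Fenced servers stay registered as up so that sticky sessions can still reach
// them, but nextServer excludes them from the P2C selection.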
if server.Fenced {
b.fenced[name] = struct{}{}
}
b.handlersMu.Unlock()
if b.sticky != nil {
b.sticky.AddHandler(name, h)
}
}
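Not part of the diff: a brief usage sketch of the exported API above. The p2c import path is inferred from the loadbalancer import in this file, and the backend URLs and names are hypothetical.

package main

import (
	"log"
	"net/http"
	"net/http/httputil"
	"net/url"

	"github.com/traefik/traefik/v3/pkg/config/dynamic"
	"github.com/traefik/traefik/v3/pkg/server/service/loadbalancer/p2c"
)

func main() {
	u1, _ := url.Parse("http://127.0.0.1:9001") // hypothetical backend
	u2, _ := url.Parse("http://127.0.0.1:9002") // hypothetical backend

	balancer := p2c.New(nil, false)
	balancer.AddServer("backend-1", httputil.NewSingleHostReverseProxy(u1), dynamic.Server{})
	balancer.AddServer("backend-2", httputil.NewSingleHostReverseProxy(u2), dynamic.Server{})

	// Each request goes to whichever of two randomly chosen healthy backends
	// currently has fewer requests in flight.
	log.Fatal(http.ListenAndServe(":8080", balancer))
}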

@@ -0,0 +1,288 @@
package p2c
import (
"context"
"net/http"
"net/http/httptest"
"strconv"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/traefik/traefik/v3/pkg/config/dynamic"
)
func TestP2C(t *testing.T) {
testCases := []struct {
desc string
handlers []*namedHandler
rand *mockRand
expectedHandler string
}{
{
desc: "one healthy handler",
handlers: testHandlers(0),
rand: nil,
expectedHandler: "0",
},
{
desc: "two handlers zero in flight",
handlers: testHandlers(0, 0),
rand: &mockRand{vals: []int{1, 0}},
expectedHandler: "1",
},
{
desc: "chooses lower of two",
handlers: testHandlers(0, 1),
rand: &mockRand{vals: []int{1, 0}},
expectedHandler: "0",
},
{
desc: "chooses lower of three",
handlers: testHandlers(10, 90, 40),
rand: &mockRand{vals: []int{1, 1}},
expectedHandler: "2",
},
}
for _, test := range testCases {
t.Run(test.desc, func(t *testing.T) {
t.Parallel()
balancer := New(nil, false)
balancer.rand = test.rand
for _, h := range test.handlers {
balancer.handlers = append(balancer.handlers, h)
balancer.status[h.name] = struct{}{}
}
got, err := balancer.nextServer()
require.NoError(t, err)
assert.Equal(t, test.expectedHandler, got.name)
})
}
}
func TestSticky(t *testing.T) {
balancer := New(&dynamic.Sticky{
Cookie: &dynamic.Cookie{
Name: "test",
Secure: true,
HTTPOnly: true,
SameSite: "none",
MaxAge: 42,
Path: func(v string) *string { return &v }("/foo"),
},
}, false)
balancer.rand = &mockRand{vals: []int{1, 0}}
balancer.AddServer("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "first")
rw.WriteHeader(http.StatusOK)
}), dynamic.Server{})
balancer.AddServer("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "second")
rw.WriteHeader(http.StatusOK)
}), dynamic.Server{})
recorder := &responseRecorder{
ResponseRecorder: httptest.NewRecorder(),
save: map[string]int{},
cookies: make(map[string]*http.Cookie),
}
req := httptest.NewRequest(http.MethodGet, "/", nil)
for range 3 {
for _, cookie := range recorder.Result().Cookies() {
assert.NotContains(t, "first", cookie.Value)
assert.NotContains(t, "second", cookie.Value)
req.AddCookie(cookie)
}
recorder.ResponseRecorder = httptest.NewRecorder()
balancer.ServeHTTP(recorder, req)
}
assert.Equal(t, 0, recorder.save["first"])
assert.Equal(t, 3, recorder.save["second"])
assert.True(t, recorder.cookies["test"].HttpOnly)
assert.True(t, recorder.cookies["test"].Secure)
assert.Equal(t, http.SameSiteNoneMode, recorder.cookies["test"].SameSite)
assert.Equal(t, 42, recorder.cookies["test"].MaxAge)
assert.Equal(t, "/foo", recorder.cookies["test"].Path)
}
func TestSticky_Fallback(t *testing.T) {
balancer := New(&dynamic.Sticky{
Cookie: &dynamic.Cookie{Name: "test"},
}, false)
balancer.rand = &mockRand{vals: []int{1, 0}}
balancer.AddServer("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "first")
rw.WriteHeader(http.StatusOK)
}), dynamic.Server{})
balancer.AddServer("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "second")
rw.WriteHeader(http.StatusOK)
}), dynamic.Server{})
recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}, cookies: make(map[string]*http.Cookie)}
req := httptest.NewRequest(http.MethodGet, "/", nil)
req.AddCookie(&http.Cookie{Name: "test", Value: "second"})
for range 3 {
recorder.ResponseRecorder = httptest.NewRecorder()
balancer.ServeHTTP(recorder, req)
}
assert.Equal(t, 0, recorder.save["first"])
assert.Equal(t, 3, recorder.save["second"])
}
// TestSticky_Fenced checks that fenced nodes receive traffic if their sticky cookie matches.
func TestSticky_Fenced(t *testing.T) {
balancer := New(&dynamic.Sticky{Cookie: &dynamic.Cookie{Name: "test"}}, false)
balancer.rand = &mockRand{vals: []int{1, 0, 1, 0}}
balancer.AddServer("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "first")
rw.WriteHeader(http.StatusOK)
}), dynamic.Server{})
balancer.AddServer("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "second")
rw.WriteHeader(http.StatusOK)
}), dynamic.Server{})
balancer.AddServer("fenced", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "fenced")
rw.WriteHeader(http.StatusOK)
}), dynamic.Server{Fenced: true})
recorder := &responseRecorder{ResponseRecorder: httptest.NewRecorder(), save: map[string]int{}, cookies: make(map[string]*http.Cookie)}
stickyReq := httptest.NewRequest(http.MethodGet, "/", nil)
stickyReq.AddCookie(&http.Cookie{Name: "test", Value: "fenced"})
req := httptest.NewRequest(http.MethodGet, "/", nil)
for range 2 {
recorder.ResponseRecorder = httptest.NewRecorder()
balancer.ServeHTTP(recorder, stickyReq)
balancer.ServeHTTP(recorder, req)
}
assert.Equal(t, 2, recorder.save["fenced"])
assert.Equal(t, 0, recorder.save["first"])
assert.Equal(t, 2, recorder.save["second"])
}
func TestBalancerPropagate(t *testing.T) {
balancer := New(nil, true)
balancer.AddServer("first", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "first")
rw.WriteHeader(http.StatusOK)
}), dynamic.Server{})
balancer.AddServer("second", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Set("server", "second")
rw.WriteHeader(http.StatusOK)
}), dynamic.Server{})
var calls int
err := balancer.RegisterStatusUpdater(func(up bool) {
calls++
})
require.NoError(t, err)
recorder := httptest.NewRecorder()
balancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
assert.Equal(t, http.StatusOK, recorder.Code)
// second gets downed, but the balancer is still up since first is still up.
balancer.SetStatus(context.Background(), "second", false)
assert.Equal(t, 0, calls)
recorder = httptest.NewRecorder()
balancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
assert.Equal(t, http.StatusOK, recorder.Code)
assert.Equal(t, "first", recorder.Header().Get("server"))
// first gets downed, balancer is down.
balancer.SetStatus(context.Background(), "first", false)
assert.Equal(t, 1, calls)
recorder = httptest.NewRecorder()
balancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
assert.Equal(t, http.StatusServiceUnavailable, recorder.Code)
// second comes back up, so the balancer is up again.
balancer.SetStatus(context.Background(), "second", true)
assert.Equal(t, 2, calls)
recorder = httptest.NewRecorder()
balancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
assert.Equal(t, http.StatusOK, recorder.Code)
assert.Equal(t, "second", recorder.Header().Get("server"))
}
func TestBalancerAllServersFenced(t *testing.T) {
balancer := New(nil, false)
balancer.AddServer("test", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), dynamic.Server{Fenced: true})
balancer.AddServer("test2", http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), dynamic.Server{Fenced: true})
recorder := httptest.NewRecorder()
balancer.ServeHTTP(recorder, httptest.NewRequest(http.MethodGet, "/", nil))
assert.Equal(t, http.StatusServiceUnavailable, recorder.Result().StatusCode)
}
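// responseRecorder counts responses per server (as reported in the "server"
// header), and records their order, status codes, and cookies.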
type responseRecorder struct {
*httptest.ResponseRecorder
save map[string]int
sequence []string
status []int
cookies map[string]*http.Cookie
}
func (r *responseRecorder) WriteHeader(statusCode int) {
r.save[r.Header().Get("server")]++
r.sequence = append(r.sequence, r.Header().Get("server"))
r.status = append(r.status, statusCode)
for _, cookie := range r.Result().Cookies() {
r.cookies[cookie.Name] = cookie
}
r.ResponseRecorder.WriteHeader(statusCode)
}
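// mockRand returns a scripted sequence of values, letting tests control which
// backends the P2C selection compares.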
type mockRand struct {
vals []int
calls int
}
func (m *mockRand) Intn(int) int {
defer func() {
m.calls++
}()
return m.vals[m.calls]
}
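// testHandlers builds handlers named "0", "1", ... with the given in-flight counts.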
func testHandlers(inflights ...int) []*namedHandler {
var out []*namedHandler
for i, inflight := range inflights {
h := &namedHandler{
name: strconv.Itoa(i),
}
h.inflight.Store(int64(inflight))
out = append(out, h)
}
return out
}
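Not part of the diff either: a small additional test one might write to check the in-flight accounting done by namedHandler.ServeHTTP, reusing only types and imports already present in this test file.

func TestNamedHandlerInflight(t *testing.T) {
	h := &namedHandler{name: "test"}
	h.Handler = http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
		// While the wrapped handler runs, exactly one request is in flight.
		assert.Equal(t, int64(1), h.inflight.Load())
	})
	h.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest(http.MethodGet, "/", nil))
	// Once the request completes, the counter drops back to zero.
	assert.Equal(t, int64(0), h.inflight.Load())
}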