Rework servers load-balancer to use the WRR
Co-authored-by: Kevin Pollet <pollet.kevin@gmail.com>
This commit is contained in:
parent
67d9c8da0b
commit
fadee5e87b
70 changed files with 2085 additions and 2211 deletions
|
@ -4,36 +4,28 @@ import (
|
|||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"math/rand"
|
||||
"net/http"
|
||||
"net/http/httputil"
|
||||
"net/url"
|
||||
"reflect"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/containous/alice"
|
||||
"github.com/traefik/traefik/v2/pkg/config/dynamic"
|
||||
"github.com/traefik/traefik/v2/pkg/config/runtime"
|
||||
"github.com/traefik/traefik/v2/pkg/healthcheck"
|
||||
"github.com/traefik/traefik/v2/pkg/log"
|
||||
"github.com/traefik/traefik/v2/pkg/metrics"
|
||||
"github.com/traefik/traefik/v2/pkg/middlewares/accesslog"
|
||||
"github.com/traefik/traefik/v2/pkg/middlewares/emptybackendhandler"
|
||||
metricsMiddle "github.com/traefik/traefik/v2/pkg/middlewares/metrics"
|
||||
"github.com/traefik/traefik/v2/pkg/middlewares/pipelining"
|
||||
"github.com/traefik/traefik/v2/pkg/safe"
|
||||
"github.com/traefik/traefik/v2/pkg/server/cookie"
|
||||
"github.com/traefik/traefik/v2/pkg/server/provider"
|
||||
"github.com/traefik/traefik/v2/pkg/server/service/loadbalancer/failover"
|
||||
"github.com/traefik/traefik/v2/pkg/server/service/loadbalancer/mirror"
|
||||
"github.com/traefik/traefik/v2/pkg/server/service/loadbalancer/wrr"
|
||||
"github.com/vulcand/oxy/roundrobin"
|
||||
"github.com/vulcand/oxy/roundrobin/stickycookie"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultHealthCheckInterval = 30 * time.Second
|
||||
defaultHealthCheckTimeout = 5 * time.Second
|
||||
)
|
||||
|
||||
const defaultMaxBodySize int64 = -1
|
||||
|
@ -43,6 +35,19 @@ type RoundTripperGetter interface {
|
|||
Get(name string) (http.RoundTripper, error)
|
||||
}
|
||||
|
||||
// Manager The service manager.
|
||||
type Manager struct {
|
||||
routinePool *safe.Pool
|
||||
metricsRegistry metrics.Registry
|
||||
bufferPool httputil.BufferPool
|
||||
roundTripperManager RoundTripperGetter
|
||||
|
||||
services map[string]http.Handler
|
||||
configs map[string]*runtime.ServiceInfo
|
||||
healthCheckers map[string]*healthcheck.ServiceHealthChecker
|
||||
rand *rand.Rand // For the initial shuffling of load-balancers.
|
||||
}
|
||||
|
||||
// NewManager creates a new Manager.
|
||||
func NewManager(configs map[string]*runtime.ServiceInfo, metricsRegistry metrics.Registry, routinePool *safe.Pool, roundTripperManager RoundTripperGetter) *Manager {
|
||||
return &Manager{
|
||||
|
@ -50,27 +55,13 @@ func NewManager(configs map[string]*runtime.ServiceInfo, metricsRegistry metrics
|
|||
metricsRegistry: metricsRegistry,
|
||||
bufferPool: newBufferPool(),
|
||||
roundTripperManager: roundTripperManager,
|
||||
balancers: make(map[string]healthcheck.Balancers),
|
||||
services: make(map[string]http.Handler),
|
||||
configs: configs,
|
||||
healthCheckers: make(map[string]*healthcheck.ServiceHealthChecker),
|
||||
rand: rand.New(rand.NewSource(time.Now().UnixNano())),
|
||||
}
|
||||
}
|
||||
|
||||
// Manager The service manager.
|
||||
type Manager struct {
|
||||
routinePool *safe.Pool
|
||||
metricsRegistry metrics.Registry
|
||||
bufferPool httputil.BufferPool
|
||||
roundTripperManager RoundTripperGetter
|
||||
// balancers is the map of all Balancers, keyed by service name.
|
||||
// There is one Balancer per service handler, and there is one service handler per reference to a service
|
||||
// (e.g. if 2 routers refer to the same service name, 2 service handlers are created),
|
||||
// which is why there is not just one Balancer per service name.
|
||||
balancers map[string]healthcheck.Balancers
|
||||
configs map[string]*runtime.ServiceInfo
|
||||
rand *rand.Rand // For the initial shuffling of load-balancers.
|
||||
}
|
||||
|
||||
// BuildHTTP Creates a http.Handler for a service configuration.
|
||||
func (m *Manager) BuildHTTP(rootCtx context.Context, serviceName string) (http.Handler, error) {
|
||||
ctx := log.With(rootCtx, log.Str(log.ServiceName, serviceName))
|
||||
|
@ -78,11 +69,20 @@ func (m *Manager) BuildHTTP(rootCtx context.Context, serviceName string) (http.H
|
|||
serviceName = provider.GetQualifiedName(ctx, serviceName)
|
||||
ctx = provider.AddInContext(ctx, serviceName)
|
||||
|
||||
handler, ok := m.services[serviceName]
|
||||
if ok {
|
||||
return handler, nil
|
||||
}
|
||||
|
||||
conf, ok := m.configs[serviceName]
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("the service %q does not exist", serviceName)
|
||||
}
|
||||
|
||||
if conf.Status == runtime.StatusDisabled {
|
||||
return nil, errors.New(strings.Join(conf.Err, ", "))
|
||||
}
|
||||
|
||||
value := reflect.ValueOf(*conf.Service)
|
||||
var count int
|
||||
for i := 0; i < value.NumField(); i++ {
|
||||
|
@ -101,7 +101,7 @@ func (m *Manager) BuildHTTP(rootCtx context.Context, serviceName string) (http.H
|
|||
switch {
|
||||
case conf.LoadBalancer != nil:
|
||||
var err error
|
||||
lb, err = m.getLoadBalancerServiceHandler(ctx, serviceName, conf.LoadBalancer)
|
||||
lb, err = m.getLoadBalancerServiceHandler(ctx, serviceName, conf)
|
||||
if err != nil {
|
||||
conf.AddError(err, true)
|
||||
return nil, err
|
||||
|
@ -133,6 +133,8 @@ func (m *Manager) BuildHTTP(rootCtx context.Context, serviceName string) (http.H
|
|||
return nil, sErr
|
||||
}
|
||||
|
||||
m.services[serviceName] = lb
|
||||
|
||||
return lb, nil
|
||||
}
|
||||
|
||||
|
@ -214,14 +216,14 @@ func (m *Manager) getWRRServiceHandler(ctx context.Context, serviceName string,
|
|||
config.Sticky.Cookie.Name = cookie.GetName(config.Sticky.Cookie.Name, serviceName)
|
||||
}
|
||||
|
||||
balancer := wrr.New(config.Sticky, config.HealthCheck)
|
||||
balancer := wrr.New(config.Sticky, config.HealthCheck != nil)
|
||||
for _, service := range shuffle(config.Services, m.rand) {
|
||||
serviceHandler, err := m.BuildHTTP(ctx, service.Name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
balancer.AddService(service.Name, serviceHandler, service.Weight)
|
||||
balancer.Add(service.Name, serviceHandler, service.Weight)
|
||||
|
||||
if config.HealthCheck == nil {
|
||||
continue
|
||||
|
@ -245,216 +247,91 @@ func (m *Manager) getWRRServiceHandler(ctx context.Context, serviceName string,
|
|||
return balancer, nil
|
||||
}
|
||||
|
||||
func (m *Manager) getLoadBalancerServiceHandler(ctx context.Context, serviceName string, service *dynamic.ServersLoadBalancer) (http.Handler, error) {
|
||||
if service.PassHostHeader == nil {
|
||||
defaultPassHostHeader := true
|
||||
service.PassHostHeader = &defaultPassHostHeader
|
||||
func (m *Manager) getLoadBalancerServiceHandler(ctx context.Context, serviceName string, info *runtime.ServiceInfo) (http.Handler, error) {
|
||||
service := info.LoadBalancer
|
||||
|
||||
logger := log.FromContext(ctx)
|
||||
logger.Debug("Creating load-balancer")
|
||||
|
||||
// TODO: should we keep this config value as Go is now handling stream response correctly?
|
||||
flushInterval := dynamic.DefaultFlushInterval
|
||||
if service.ResponseForwarding != nil {
|
||||
flushInterval = service.ResponseForwarding.FlushInterval
|
||||
}
|
||||
|
||||
if len(service.ServersTransport) > 0 {
|
||||
service.ServersTransport = provider.GetQualifiedName(ctx, service.ServersTransport)
|
||||
}
|
||||
|
||||
if service.Sticky != nil && service.Sticky.Cookie != nil {
|
||||
service.Sticky.Cookie.Name = cookie.GetName(service.Sticky.Cookie.Name, serviceName)
|
||||
}
|
||||
|
||||
// We make sure that the PassHostHeader value is defined to avoid panics.
|
||||
passHostHeader := dynamic.DefaultPassHostHeader
|
||||
if service.PassHostHeader != nil {
|
||||
passHostHeader = *service.PassHostHeader
|
||||
}
|
||||
|
||||
roundTripper, err := m.roundTripperManager.Get(service.ServersTransport)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
fwd, err := buildProxy(service.PassHostHeader, service.ResponseForwarding, roundTripper, m.bufferPool)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
lb := wrr.New(service.Sticky, service.HealthCheck != nil)
|
||||
healthCheckTargets := make(map[string]*url.URL)
|
||||
|
||||
for _, server := range shuffle(service.Servers, m.rand) {
|
||||
hasher := fnv.New64a()
|
||||
_, _ = hasher.Write([]byte(server.URL)) // this will never return an error.
|
||||
|
||||
proxyName := fmt.Sprintf("%x", hasher.Sum(nil))
|
||||
|
||||
target, err := url.Parse(server.URL)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error parsing server URL %s: %w", server.URL, err)
|
||||
}
|
||||
|
||||
logger.WithField(log.ServerName, proxyName).Debugf("Creating server %s", target)
|
||||
|
||||
proxy := buildSingleHostProxy(target, passHostHeader, time.Duration(flushInterval), roundTripper, m.bufferPool)
|
||||
|
||||
proxy = accesslog.NewFieldHandler(proxy, accesslog.ServiceURL, target.String(), nil)
|
||||
proxy = accesslog.NewFieldHandler(proxy, accesslog.ServiceAddr, target.Host, nil)
|
||||
proxy = accesslog.NewFieldHandler(proxy, accesslog.ServiceName, serviceName, nil)
|
||||
|
||||
if m.metricsRegistry != nil && m.metricsRegistry.IsSvcEnabled() {
|
||||
proxy = metricsMiddle.NewServiceMiddleware(ctx, proxy, m.metricsRegistry, serviceName)
|
||||
}
|
||||
|
||||
lb.Add(proxyName, proxy, nil)
|
||||
|
||||
// servers are considered UP by default.
|
||||
info.UpdateServerStatus(target.String(), runtime.StatusUp)
|
||||
|
||||
healthCheckTargets[proxyName] = target
|
||||
}
|
||||
|
||||
alHandler := func(next http.Handler) (http.Handler, error) {
|
||||
return accesslog.NewFieldHandler(next, accesslog.ServiceName, serviceName, accesslog.AddServiceFields), nil
|
||||
}
|
||||
chain := alice.New()
|
||||
if m.metricsRegistry != nil && m.metricsRegistry.IsSvcEnabled() {
|
||||
chain = chain.Append(metricsMiddle.WrapServiceHandler(ctx, m.metricsRegistry, serviceName))
|
||||
if service.HealthCheck != nil {
|
||||
m.healthCheckers[serviceName] = healthcheck.NewServiceHealthChecker(
|
||||
ctx,
|
||||
m.metricsRegistry,
|
||||
service.HealthCheck,
|
||||
lb,
|
||||
info,
|
||||
roundTripper,
|
||||
healthCheckTargets,
|
||||
)
|
||||
}
|
||||
|
||||
handler, err := chain.Append(alHandler).Then(pipelining.New(ctx, fwd, "pipelining"))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
balancer, err := m.getLoadBalancer(ctx, serviceName, service, handler)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// TODO rename and checks
|
||||
m.balancers[serviceName] = append(m.balancers[serviceName], balancer)
|
||||
|
||||
// Empty (backend with no servers)
|
||||
return emptybackendhandler.New(balancer), nil
|
||||
return lb, nil
|
||||
}
|
||||
|
||||
// LaunchHealthCheck launches the health checks.
|
||||
func (m *Manager) LaunchHealthCheck() {
|
||||
backendConfigs := make(map[string]*healthcheck.BackendConfig)
|
||||
|
||||
for serviceName, balancers := range m.balancers {
|
||||
ctx := log.With(context.Background(), log.Str(log.ServiceName, serviceName))
|
||||
|
||||
service := m.configs[serviceName].LoadBalancer
|
||||
|
||||
// Health Check
|
||||
hcOpts := buildHealthCheckOptions(ctx, balancers, serviceName, service.HealthCheck)
|
||||
if hcOpts == nil {
|
||||
continue
|
||||
}
|
||||
hcOpts.Transport, _ = m.roundTripperManager.Get(service.ServersTransport)
|
||||
log.FromContext(ctx).Debugf("Setting up healthcheck for service %s with %s", serviceName, *hcOpts)
|
||||
|
||||
backendConfigs[serviceName] = healthcheck.NewBackendConfig(*hcOpts, serviceName)
|
||||
}
|
||||
|
||||
healthcheck.GetHealthCheck(m.metricsRegistry).SetBackendsConfiguration(context.Background(), backendConfigs)
|
||||
}
|
||||
|
||||
func buildHealthCheckOptions(ctx context.Context, lb healthcheck.Balancer, backend string, hc *dynamic.ServerHealthCheck) *healthcheck.Options {
|
||||
if hc == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
logger := log.FromContext(ctx)
|
||||
|
||||
if hc.Path == "" {
|
||||
logger.Errorf("Ignoring heath check configuration for '%s': no path provided", backend)
|
||||
return nil
|
||||
}
|
||||
|
||||
interval := defaultHealthCheckInterval
|
||||
if hc.Interval != "" {
|
||||
intervalOverride, err := time.ParseDuration(hc.Interval)
|
||||
switch {
|
||||
case err != nil:
|
||||
logger.Errorf("Illegal health check interval for '%s': %s", backend, err)
|
||||
case intervalOverride <= 0:
|
||||
logger.Errorf("Health check interval smaller than zero for service '%s'", backend)
|
||||
default:
|
||||
interval = intervalOverride
|
||||
}
|
||||
}
|
||||
|
||||
timeout := defaultHealthCheckTimeout
|
||||
if hc.Timeout != "" {
|
||||
timeoutOverride, err := time.ParseDuration(hc.Timeout)
|
||||
switch {
|
||||
case err != nil:
|
||||
logger.Errorf("Illegal health check timeout for backend '%s': %s", backend, err)
|
||||
case timeoutOverride <= 0:
|
||||
logger.Errorf("Health check timeout smaller than zero for backend '%s', backend", backend)
|
||||
default:
|
||||
timeout = timeoutOverride
|
||||
}
|
||||
}
|
||||
|
||||
if timeout >= interval {
|
||||
logger.Warnf("Health check timeout for backend '%s' should be lower than the health check interval. Interval set to timeout + 1 second (%s).", backend, interval)
|
||||
}
|
||||
|
||||
followRedirects := true
|
||||
if hc.FollowRedirects != nil {
|
||||
followRedirects = *hc.FollowRedirects
|
||||
}
|
||||
|
||||
mode := healthcheck.HTTPMode
|
||||
switch hc.Mode {
|
||||
case "":
|
||||
mode = healthcheck.HTTPMode
|
||||
case healthcheck.GRPCMode, healthcheck.HTTPMode:
|
||||
mode = hc.Mode
|
||||
default:
|
||||
logger.Errorf("Illegal health check mode for backend '%s'", backend)
|
||||
}
|
||||
|
||||
return &healthcheck.Options{
|
||||
Scheme: hc.Scheme,
|
||||
Mode: mode,
|
||||
Path: hc.Path,
|
||||
Method: hc.Method,
|
||||
Port: hc.Port,
|
||||
Interval: interval,
|
||||
Timeout: timeout,
|
||||
LB: lb,
|
||||
Hostname: hc.Hostname,
|
||||
Headers: hc.Headers,
|
||||
FollowRedirects: followRedirects,
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) getLoadBalancer(ctx context.Context, serviceName string, service *dynamic.ServersLoadBalancer, fwd http.Handler) (healthcheck.BalancerStatusHandler, error) {
|
||||
logger := log.FromContext(ctx)
|
||||
logger.Debug("Creating load-balancer")
|
||||
|
||||
var options []roundrobin.LBOption
|
||||
|
||||
var cookieName string
|
||||
if service.Sticky != nil && service.Sticky.Cookie != nil {
|
||||
cookieName = cookie.GetName(service.Sticky.Cookie.Name, serviceName)
|
||||
|
||||
opts := roundrobin.CookieOptions{
|
||||
HTTPOnly: service.Sticky.Cookie.HTTPOnly,
|
||||
Secure: service.Sticky.Cookie.Secure,
|
||||
SameSite: convertSameSite(service.Sticky.Cookie.SameSite),
|
||||
}
|
||||
|
||||
// Sticky Cookie Value
|
||||
cv, err := stickycookie.NewFallbackValue(&stickycookie.RawValue{}, &stickycookie.HashValue{})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
options = append(options, roundrobin.EnableStickySession(roundrobin.NewStickySessionWithOptions(cookieName, opts).SetCookieValue(cv)))
|
||||
|
||||
logger.Debugf("Sticky session cookie name: %v", cookieName)
|
||||
}
|
||||
|
||||
lb, err := roundrobin.New(fwd, options...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
lbsu := healthcheck.NewLBStatusUpdater(lb, m.configs[serviceName], service.HealthCheck)
|
||||
if err := m.upsertServers(ctx, lbsu, service.Servers); err != nil {
|
||||
return nil, fmt.Errorf("error configuring load balancer for service %s: %w", serviceName, err)
|
||||
}
|
||||
|
||||
return lbsu, nil
|
||||
}
|
||||
|
||||
func (m *Manager) upsertServers(ctx context.Context, lb healthcheck.BalancerHandler, servers []dynamic.Server) error {
|
||||
logger := log.FromContext(ctx)
|
||||
|
||||
for name, srv := range shuffle(servers, m.rand) {
|
||||
u, err := url.Parse(srv.URL)
|
||||
if err != nil {
|
||||
return fmt.Errorf("error parsing server URL %s: %w", srv.URL, err)
|
||||
}
|
||||
|
||||
logger.WithField(log.ServerName, name).Debugf("Creating server %d %s", name, u)
|
||||
|
||||
if err := lb.UpsertServer(u, roundrobin.Weight(1)); err != nil {
|
||||
return fmt.Errorf("error adding server %s to load balancer: %w", srv.URL, err)
|
||||
}
|
||||
|
||||
// TODO Handle Metrics
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func convertSameSite(sameSite string) http.SameSite {
|
||||
switch sameSite {
|
||||
case "none":
|
||||
return http.SameSiteNoneMode
|
||||
case "lax":
|
||||
return http.SameSiteLaxMode
|
||||
case "strict":
|
||||
return http.SameSiteStrictMode
|
||||
default:
|
||||
return 0
|
||||
func (m *Manager) LaunchHealthCheck(ctx context.Context) {
|
||||
for serviceName, hc := range m.healthCheckers {
|
||||
ctx = log.With(ctx, log.Str(log.ServiceName, serviceName))
|
||||
go hc.Launch(ctx)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue