1
0
Fork 0

Feature: Exponential Backoff in Retry Middleware

This commit is contained in:
Daniel Adams 2020-11-05 10:14:04 -05:00 committed by GitHub
parent 3a8cb3f010
commit 74d1d55051
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
12 changed files with 218 additions and 44 deletions

View file

@ -5,10 +5,13 @@ import (
"context"
"fmt"
"io/ioutil"
"math"
"net"
"net/http"
"net/http/httptrace"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/opentracing/opentracing-go/ext"
"github.com/traefik/traefik/v2/pkg/config/dynamic"
"github.com/traefik/traefik/v2/pkg/log"
@ -34,12 +37,18 @@ type Listener interface {
// each of them about a retry attempt.
type Listeners []Listener
// nexter returns the duration to wait before retrying the operation.
type nexter interface {
NextBackOff() time.Duration
}
// retry is a middleware that retries requests.
type retry struct {
attempts int
next http.Handler
listener Listener
name string
attempts int
initialInterval time.Duration
next http.Handler
listener Listener
name string
}
// New returns a new retry middleware.
@ -51,10 +60,11 @@ func New(ctx context.Context, next http.Handler, config dynamic.Retry, listener
}
return &retry{
attempts: config.Attempts,
next: next,
listener: listener,
name: name,
attempts: config.Attempts,
initialInterval: time.Duration(config.InitialInterval),
next: next,
listener: listener,
name: name,
}, nil
}
@ -72,36 +82,65 @@ func (r *retry) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
}
attempts := 1
backOff := r.newBackOff()
currentInterval := 0 * time.Millisecond
for {
shouldRetry := attempts < r.attempts
retryResponseWriter := newResponseWriter(rw, shouldRetry)
select {
case <-time.After(currentInterval):
// Disable retries when the backend already received request data
trace := &httptrace.ClientTrace{
WroteHeaders: func() {
retryResponseWriter.DisableRetries()
},
WroteRequest: func(httptrace.WroteRequestInfo) {
retryResponseWriter.DisableRetries()
},
shouldRetry := attempts < r.attempts
retryResponseWriter := newResponseWriter(rw, shouldRetry)
// Disable retries when the backend already received request data
trace := &httptrace.ClientTrace{
WroteHeaders: func() {
retryResponseWriter.DisableRetries()
},
WroteRequest: func(httptrace.WroteRequestInfo) {
retryResponseWriter.DisableRetries()
},
}
newCtx := httptrace.WithClientTrace(req.Context(), trace)
r.next.ServeHTTP(retryResponseWriter, req.WithContext(newCtx))
if !retryResponseWriter.ShouldRetry() {
return
}
currentInterval = backOff.NextBackOff()
attempts++
log.FromContext(middlewares.GetLoggerCtx(req.Context(), r.name, typeName)).
Debugf("New attempt %d for request: %v", attempts, req.URL)
r.listener.Retried(req, attempts)
case <-req.Context().Done():
return
}
newCtx := httptrace.WithClientTrace(req.Context(), trace)
r.next.ServeHTTP(retryResponseWriter, req.WithContext(newCtx))
if !retryResponseWriter.ShouldRetry() {
break
}
attempts++
log.FromContext(middlewares.GetLoggerCtx(req.Context(), r.name, typeName)).
Debugf("New attempt %d for request: %v", attempts, req.URL)
r.listener.Retried(req, attempts)
}
}
func (r *retry) newBackOff() nexter {
if r.attempts < 2 || r.initialInterval <= 0 {
return &backoff.ZeroBackOff{}
}
b := backoff.NewExponentialBackOff()
b.InitialInterval = r.initialInterval
// calculate the multiplier for the given number of attempts
// so that applying the multiplier for the given number of attempts will not exceed 2 times the initial interval
// it allows to control the progression along the attempts
b.Multiplier = math.Pow(2, 1/float64(r.attempts-1))
// according to docs, b.Reset() must be called before using
b.Reset()
return b
}
// Retried exists to implement the Listener interface. It calls Retried on each of its slice entries.
func (l Listeners) Retried(req *http.Request, attempt int) {
for _, listener := range l {