1
0
Fork 0

Adds mirroring service

This commit is contained in:
Julien Salleyron 2019-08-26 19:00:04 +02:00 committed by Traefiker Bot
parent fd24b1898e
commit 602a2ea541
10 changed files with 465 additions and 10 deletions

View file

@ -306,7 +306,7 @@ func TestRouterManager_Get(t *testing.T) {
Middlewares: test.middlewaresConfig,
},
})
serviceManager := service.NewManager(rtConf.Services, http.DefaultTransport, nil)
serviceManager := service.NewManager(rtConf.Services, http.DefaultTransport, nil, nil)
middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager)
responseModifierFactory := responsemodifiers.NewBuilder(rtConf.Middlewares)
routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, responseModifierFactory)
@ -407,7 +407,7 @@ func TestAccessLog(t *testing.T) {
Middlewares: test.middlewaresConfig,
},
})
serviceManager := service.NewManager(rtConf.Services, http.DefaultTransport, nil)
serviceManager := service.NewManager(rtConf.Services, http.DefaultTransport, nil, nil)
middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager)
responseModifierFactory := responsemodifiers.NewBuilder(rtConf.Middlewares)
routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, responseModifierFactory)
@ -693,7 +693,7 @@ func TestRuntimeConfiguration(t *testing.T) {
Middlewares: test.middlewareConfig,
},
})
serviceManager := service.NewManager(rtConf.Services, http.DefaultTransport, nil)
serviceManager := service.NewManager(rtConf.Services, http.DefaultTransport, nil, nil)
middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager)
responseModifierFactory := responsemodifiers.NewBuilder(map[string]*runtime.MiddlewareInfo{})
routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, responseModifierFactory)
@ -767,7 +767,7 @@ func BenchmarkRouterServe(b *testing.B) {
Middlewares: map[string]*dynamic.Middleware{},
},
})
serviceManager := service.NewManager(rtConf.Services, &staticTransport{res}, nil)
serviceManager := service.NewManager(rtConf.Services, &staticTransport{res}, nil, nil)
middlewaresBuilder := middleware.NewBuilder(rtConf.Middlewares, serviceManager)
responseModifierFactory := responsemodifiers.NewBuilder(rtConf.Middlewares)
routerManager := NewManager(rtConf, serviceManager, middlewaresBuilder, responseModifierFactory)
@ -808,7 +808,7 @@ func BenchmarkService(b *testing.B) {
Services: serviceConfig,
},
})
serviceManager := service.NewManager(rtConf.Services, &staticTransport{res}, nil)
serviceManager := service.NewManager(rtConf.Services, &staticTransport{res}, nil, nil)
w := httptest.NewRecorder()
req := testhelpers.MustNewRequest(http.MethodGet, "http://foo.bar/", nil)

View file

@ -97,7 +97,7 @@ func (s *Server) createTCPRouters(ctx context.Context, configuration *runtime.Co
// createHTTPHandlers returns, for the given configuration and entryPoints, the HTTP handlers for non-TLS connections, and for the TLS ones. the given configuration must not be nil. its fields will get mutated.
func (s *Server) createHTTPHandlers(ctx context.Context, configuration *runtime.Configuration, entryPoints []string) (map[string]http.Handler, map[string]http.Handler) {
serviceManager := service.NewManager(configuration.Services, s.defaultRoundTripper, s.metricsRegistry)
serviceManager := service.NewManager(configuration.Services, s.defaultRoundTripper, s.metricsRegistry, s.routinesPool)
middlewaresBuilder := middleware.NewBuilder(configuration.Middlewares, serviceManager)
responseModifierFactory := responsemodifiers.NewBuilder(configuration.Middlewares)
routerManager := router.NewManager(configuration, serviceManager, middlewaresBuilder, responseModifierFactory)

View file

@ -0,0 +1,104 @@
package mirror
import (
"context"
"errors"
"net/http"
"sync"
"github.com/containous/traefik/v2/pkg/safe"
)
// Mirroring is an http.Handler that can mirror requests.
type Mirroring struct {
handler http.Handler
mirrorHandlers []*mirrorHandler
rw http.ResponseWriter
routinePool *safe.Pool
lock sync.RWMutex
total uint64
}
// New returns a new instance of *Mirroring.
func New(handler http.Handler, pool *safe.Pool) *Mirroring {
return &Mirroring{
routinePool: pool,
handler: handler,
rw: blackholeResponseWriter{},
}
}
func (m *Mirroring) inc() uint64 {
m.lock.Lock()
defer m.lock.Unlock()
m.total++
return m.total
}
type mirrorHandler struct {
http.Handler
percent int
lock sync.RWMutex
count uint64
}
func (m *Mirroring) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
m.handler.ServeHTTP(rw, req)
select {
case <-req.Context().Done():
// No mirroring if request has been canceled during main handler ServeHTTP
return
default:
}
m.routinePool.GoCtx(func(_ context.Context) {
total := m.inc()
for _, handler := range m.mirrorHandlers {
handler.lock.Lock()
if handler.count*100 < total*uint64(handler.percent) {
handler.count++
handler.lock.Unlock()
// When a request served by m.handler is successful, req.Context will be cancelled,
// which would trigger a cancellation of the ongoing mirrored requests.
// Therefore, we give a new, non-cancellable context to each of the mirrored calls,
// so they can terminate by themselves.
handler.ServeHTTP(m.rw, req.WithContext(contextStopPropagation{req.Context()}))
} else {
handler.lock.Unlock()
}
}
})
}
// AddMirror adds an httpHandler to mirror to.
func (m *Mirroring) AddMirror(handler http.Handler, percent int) error {
if percent < 0 || percent >= 100 {
return errors.New("percent must be between 0 and 100")
}
m.mirrorHandlers = append(m.mirrorHandlers, &mirrorHandler{Handler: handler, percent: percent})
return nil
}
type blackholeResponseWriter struct{}
func (b blackholeResponseWriter) Header() http.Header {
return http.Header{}
}
func (b blackholeResponseWriter) Write(bytes []byte) (int, error) {
return len(bytes), nil
}
func (b blackholeResponseWriter) WriteHeader(statusCode int) {
}
type contextStopPropagation struct {
context.Context
}
func (c contextStopPropagation) Done() <-chan struct{} {
return make(chan struct{})
}

View file

@ -0,0 +1,79 @@
package mirror
import (
"context"
"net/http"
"net/http/httptest"
"sync/atomic"
"testing"
"github.com/containous/traefik/v2/pkg/safe"
"github.com/stretchr/testify/assert"
)
func TestMirroringOn100(t *testing.T) {
var countMirror1, countMirror2 int32
handler := http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(http.StatusOK)
})
pool := safe.NewPool(context.Background())
mirror := New(handler, pool)
err := mirror.AddMirror(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
atomic.AddInt32(&countMirror1, 1)
}), 10)
assert.NoError(t, err)
err = mirror.AddMirror(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
atomic.AddInt32(&countMirror2, 1)
}), 50)
assert.NoError(t, err)
for i := 0; i < 100; i++ {
mirror.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest(http.MethodGet, "/", nil))
}
pool.Stop()
val1 := atomic.LoadInt32(&countMirror1)
val2 := atomic.LoadInt32(&countMirror2)
assert.Equal(t, 10, int(val1))
assert.Equal(t, 50, int(val2))
}
func TestMirroringOn10(t *testing.T) {
var countMirror1, countMirror2 int32
handler := http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(http.StatusOK)
})
pool := safe.NewPool(context.Background())
mirror := New(handler, pool)
err := mirror.AddMirror(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
atomic.AddInt32(&countMirror1, 1)
}), 10)
assert.NoError(t, err)
err = mirror.AddMirror(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
atomic.AddInt32(&countMirror2, 1)
}), 50)
assert.NoError(t, err)
for i := 0; i < 10; i++ {
mirror.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest(http.MethodGet, "/", nil))
}
pool.Stop()
val1 := atomic.LoadInt32(&countMirror1)
val2 := atomic.LoadInt32(&countMirror2)
assert.Equal(t, 1, int(val1))
assert.Equal(t, 5, int(val2))
}
func TestInvalidPercent(t *testing.T) {
mirror := New(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {}), safe.NewPool(context.Background()))
err := mirror.AddMirror(nil, -1)
assert.Error(t, err)
err = mirror.AddMirror(nil, 101)
assert.Error(t, err)
}

View file

@ -7,6 +7,7 @@ import (
"net/http"
"net/http/httputil"
"net/url"
"reflect"
"time"
"github.com/containous/alice"
@ -19,8 +20,10 @@ import (
"github.com/containous/traefik/v2/pkg/middlewares/emptybackendhandler"
metricsMiddle "github.com/containous/traefik/v2/pkg/middlewares/metrics"
"github.com/containous/traefik/v2/pkg/middlewares/pipelining"
"github.com/containous/traefik/v2/pkg/safe"
"github.com/containous/traefik/v2/pkg/server/cookie"
"github.com/containous/traefik/v2/pkg/server/internal"
"github.com/containous/traefik/v2/pkg/server/service/loadbalancer/mirror"
"github.com/containous/traefik/v2/pkg/server/service/loadbalancer/wrr"
"github.com/vulcand/oxy/roundrobin"
)
@ -31,8 +34,9 @@ const (
)
// NewManager creates a new Manager
func NewManager(configs map[string]*runtime.ServiceInfo, defaultRoundTripper http.RoundTripper, metricsRegistry metrics.Registry) *Manager {
func NewManager(configs map[string]*runtime.ServiceInfo, defaultRoundTripper http.RoundTripper, metricsRegistry metrics.Registry, routinePool *safe.Pool) *Manager {
return &Manager{
routinePool: routinePool,
metricsRegistry: metricsRegistry,
bufferPool: newBufferPool(),
defaultRoundTripper: defaultRoundTripper,
@ -43,6 +47,7 @@ func NewManager(configs map[string]*runtime.ServiceInfo, defaultRoundTripper htt
// Manager The service manager
type Manager struct {
routinePool *safe.Pool
metricsRegistry metrics.Registry
bufferPool httputil.BufferPool
defaultRoundTripper http.RoundTripper
@ -62,7 +67,14 @@ func (m *Manager) BuildHTTP(rootCtx context.Context, serviceName string, respons
return nil, fmt.Errorf("the service %q does not exist", serviceName)
}
if conf.LoadBalancer != nil && conf.Weighted != nil {
value := reflect.ValueOf(*conf.Service)
var count int
for i := 0; i < value.NumField(); i++ {
if !value.Field(i).IsNil() {
count++
}
}
if count > 1 {
return nil, errors.New("cannot create service: multi-types service not supported, consider declaring two different pieces of service instead")
}
@ -83,6 +95,13 @@ func (m *Manager) BuildHTTP(rootCtx context.Context, serviceName string, respons
conf.AddError(err, true)
return nil, err
}
case conf.Mirroring != nil:
var err error
lb, err = m.getLoadBalancerMirrorServiceHandler(ctx, serviceName, conf.Mirroring, responseModifier)
if err != nil {
conf.AddError(err, true)
return nil, err
}
default:
sErr := fmt.Errorf("the service %q does not have any type defined", serviceName)
conf.AddError(sErr, true)
@ -92,6 +111,27 @@ func (m *Manager) BuildHTTP(rootCtx context.Context, serviceName string, respons
return lb, nil
}
func (m *Manager) getLoadBalancerMirrorServiceHandler(ctx context.Context, serviceName string, config *dynamic.Mirroring, responseModifier func(*http.Response) error) (http.Handler, error) {
serviceHandler, err := m.BuildHTTP(ctx, config.Service, responseModifier)
if err != nil {
return nil, err
}
handler := mirror.New(serviceHandler, m.routinePool)
for _, mirrorConfig := range config.Mirrors {
mirrorHandler, err := m.BuildHTTP(ctx, mirrorConfig.Name, responseModifier)
if err != nil {
return nil, err
}
err = handler.AddMirror(mirrorHandler, mirrorConfig.Percent)
if err != nil {
return nil, err
}
}
return handler, nil
}
func (m *Manager) getLoadBalancerWRRServiceHandler(ctx context.Context, serviceName string, config *dynamic.WeightedRoundRobin, responseModifier func(*http.Response) error) (http.Handler, error) {
// TODO Handle accesslog and metrics with multiple service name
if config.Sticky != nil && config.Sticky.Cookie != nil {

View file

@ -80,7 +80,7 @@ func TestGetLoadBalancer(t *testing.T) {
}
func TestGetLoadBalancerServiceHandler(t *testing.T) {
sm := NewManager(nil, http.DefaultTransport, nil)
sm := NewManager(nil, http.DefaultTransport, nil, nil)
server1 := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("X-From", "first")
@ -332,7 +332,7 @@ func TestManager_Build(t *testing.T) {
t.Run(test.desc, func(t *testing.T) {
t.Parallel()
manager := NewManager(test.configs, http.DefaultTransport, nil)
manager := NewManager(test.configs, http.DefaultTransport, nil, nil)
ctx := context.Background()
if len(test.providerName) > 0 {
@ -345,4 +345,18 @@ func TestManager_Build(t *testing.T) {
}
}
func TestMultipleTypeOnBuildHTTP(t *testing.T) {
manager := NewManager(map[string]*runtime.ServiceInfo{
"test@file": {
Service: &dynamic.Service{
LoadBalancer: &dynamic.ServersLoadBalancer{},
Weighted: &dynamic.WeightedRoundRobin{},
},
},
}, http.DefaultTransport, nil, nil)
_, err := manager.BuildHTTP(context.Background(), "test@file", nil)
assert.Error(t, err, "cannot create service: multi-types service not supported, consider declaring two different pieces of service instead")
}
// FIXME Add healthcheck tests