1
0
Fork 0

Merge branch v2.1 into master

This commit is contained in:
Fernandez Ludovic 2020-02-10 16:03:39 +01:00
commit aa21351d0d
76 changed files with 392 additions and 395 deletions

View file

@ -67,7 +67,7 @@ func TestHandler_EntryPoints(t *testing.T) {
TrustedIPs: []string{"192.168.1.3", "192.168.1.4"},
},
},
"web-secure": {
"websecure": {
Address: ":443",
Transport: &static.EntryPointsTransport{
LifeCycle: &static.LifeCycle{

View file

@ -37,7 +37,7 @@
"192.168.1.40"
]
},
"name": "web-secure",
"name": "websecure",
"proxyProtocol": {
"insecure": true,
"trustedIPs": [

View file

@ -2,12 +2,12 @@ package auth
import (
"context"
"crypto/tls"
"fmt"
"io/ioutil"
"net"
"net/http"
"strings"
"time"
"github.com/containous/traefik/v2/pkg/config/dynamic"
"github.com/containous/traefik/v2/pkg/log"
@ -29,7 +29,7 @@ type forwardAuth struct {
authResponseHeaders []string
next http.Handler
name string
tlsConfig *tls.Config
client http.Client
trustForwardHeader bool
}
@ -45,13 +45,23 @@ func NewForward(ctx context.Context, next http.Handler, config dynamic.ForwardAu
trustForwardHeader: config.TrustForwardHeader,
}
// Ensure our request client does not follow redirects
fa.client = http.Client{
CheckRedirect: func(r *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
Timeout: 30 * time.Second,
}
if config.TLS != nil {
tlsConfig, err := config.TLS.CreateTLSConfig()
if err != nil {
return nil, err
}
fa.tlsConfig = tlsConfig
tr := http.DefaultTransport.(*http.Transport).Clone()
tr.TLSClientConfig = tlsConfig
fa.client.Transport = tr
}
return fa, nil
@ -64,19 +74,6 @@ func (fa *forwardAuth) GetTracingInformation() (string, ext.SpanKindEnum) {
func (fa *forwardAuth) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
logger := log.FromContext(middlewares.GetLoggerCtx(req.Context(), fa.name, forwardedTypeName))
// Ensure our request client does not follow redirects
httpClient := http.Client{
CheckRedirect: func(r *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
if fa.tlsConfig != nil {
httpClient.Transport = &http.Transport{
TLSClientConfig: fa.tlsConfig,
}
}
forwardReq, err := http.NewRequest(http.MethodGet, fa.address, nil)
tracing.LogRequest(tracing.GetSpan(req), forwardReq)
if err != nil {
@ -94,7 +91,7 @@ func (fa *forwardAuth) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
writeHeader(req, forwardReq, fa.trustForwardHeader)
forwardResponse, forwardErr := httpClient.Do(forwardReq)
forwardResponse, forwardErr := fa.client.Do(forwardReq)
if forwardErr != nil {
logMessage := fmt.Sprintf("Error calling %s. Cause: %s", fa.address, forwardErr)
logger.Debug(logMessage)

View file

@ -179,12 +179,12 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
p.renewCertificates(ctx)
ticker := time.NewTicker(24 * time.Hour)
pool.Go(func(stop chan bool) {
pool.GoCtx(func(ctxPool context.Context) {
for {
select {
case <-ticker.C:
p.renewCertificates(ctx)
case <-stop:
case <-ctxPool.Done():
ticker.Stop()
return
}
@ -341,7 +341,7 @@ func (p *Provider) resolveDomains(ctx context.Context, domains []string, tlsStor
}
func (p *Provider) watchNewDomains(ctx context.Context) {
p.pool.Go(func(stop chan bool) {
p.pool.GoCtx(func(ctxPool context.Context) {
for {
select {
case config := <-p.configFromListenerChan:
@ -415,7 +415,7 @@ func (p *Provider) watchNewDomains(ctx context.Context) {
p.resolveDomains(ctxRouter, domains, tlsStore)
}
}
case <-stop:
case <-ctxPool.Done():
return
}
}
@ -556,7 +556,7 @@ func deleteUnnecessaryDomains(ctx context.Context, domains []types.Domain) []typ
func (p *Provider) watchCertificate(ctx context.Context) {
p.certsChan = make(chan *CertAndStore)
p.pool.Go(func(stop chan bool) {
p.pool.GoCtx(func(ctxPool context.Context) {
for {
select {
case cert := <-p.certsChan:
@ -576,7 +576,7 @@ func (p *Provider) watchCertificate(ctx context.Context) {
if err != nil {
log.FromContext(ctx).Error(err)
}
case <-stop:
case <-ctxPool.Done():
return
}
}

View file

@ -103,11 +103,11 @@ func (p *Provider) addWatcher(pool *safe.Pool, directory string, configurationCh
}
// Process events
pool.Go(func(stop chan bool) {
pool.GoCtx(func(ctx context.Context) {
defer watcher.Close()
for {
select {
case <-stop:
case <-ctx.Done():
return
case evt := <-watcher.Events:
if p.Directory == "" {

View file

@ -66,7 +66,7 @@ metadata:
spec:
ports:
- name: web-secure
- name: websecure
port: 443
targetPort: 8443
selector:
@ -85,7 +85,7 @@ subsets:
- ip: 10.10.0.5
- ip: 10.10.0.6
ports:
- name: web-secure
- name: websecure
port: 8443
---
@ -97,7 +97,7 @@ metadata:
spec:
ports:
- name: web-secure2
- name: websecure2
port: 8443
scheme: https
selector:
@ -116,5 +116,5 @@ subsets:
- ip: 10.10.0.7
- ip: 10.10.0.8
ports:
- name: web-secure2
- name: websecure2
port: 8443

View file

@ -66,7 +66,7 @@ metadata:
spec:
ports:
- name: web-secure
- name: websecure
port: 443
selector:
app: containous
@ -84,7 +84,7 @@ subsets:
- ip: 10.10.0.5
- ip: 10.10.0.6
ports:
- name: web-secure
- name: websecure
port: 443
---

View file

@ -98,11 +98,9 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
return err
}
pool.Go(func(stop chan bool) {
pool.GoCtx(func(ctxPool context.Context) {
operation := func() error {
stopWatch := make(chan struct{}, 1)
defer close(stopWatch)
eventsChan, err := k8sClient.WatchAll(p.Namespaces, stopWatch)
eventsChan, err := k8sClient.WatchAll(p.Namespaces, ctxPool.Done())
if err != nil {
logger.Errorf("Error watching kubernetes events: %v", err)
@ -110,20 +108,20 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
select {
case <-timer.C:
return err
case <-stop:
case <-ctxPool.Done():
return nil
}
}
throttleDuration := time.Duration(p.ThrottleDuration)
throttledChan := throttleEvents(ctxLog, throttleDuration, stop, eventsChan)
throttledChan := throttleEvents(ctxLog, throttleDuration, pool, eventsChan)
if throttledChan != nil {
eventsChan = throttledChan
}
for {
select {
case <-stop:
case <-ctxPool.Done():
return nil
case event := <-eventsChan:
// Note that event is the *first* event that came in during this throttling interval -- if we're hitting our throttle, we may have dropped events.
@ -156,7 +154,7 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
notify := func(err error, time time.Duration) {
logger.Errorf("Provider connection error: %v; retrying in %s", err, time)
}
err := backoff.RetryNotify(safe.OperationWithRecover(operation), job.NewBackOff(backoff.NewExponentialBackOff()), notify)
err := backoff.RetryNotify(safe.OperationWithRecover(operation), backoff.WithContext(job.NewBackOff(backoff.NewExponentialBackOff()), ctxPool), notify)
if err != nil {
logger.Errorf("Cannot connect to Provider: %v", err)
}
@ -625,7 +623,7 @@ func getCABlocks(secret *corev1.Secret, namespace, secretName string) (string, e
return cert, nil
}
func throttleEvents(ctx context.Context, throttleDuration time.Duration, stop chan bool, eventsChan <-chan interface{}) chan interface{} {
func throttleEvents(ctx context.Context, throttleDuration time.Duration, pool *safe.Pool, eventsChan <-chan interface{}) chan interface{} {
if throttleDuration == 0 {
return nil
}
@ -635,10 +633,10 @@ func throttleEvents(ctx context.Context, throttleDuration time.Duration, stop ch
// Run a goroutine that reads events from eventChan and does a non-blocking write to pendingEvent.
// This guarantees that writing to eventChan will never block,
// and that pendingEvent will have something in it if there's been an event since we read from that channel.
go func() {
pool.GoCtx(func(ctxPool context.Context) {
for {
select {
case <-stop:
case <-ctxPool.Done():
return
case nextEvent := <-eventsChan:
select {
@ -650,7 +648,7 @@ func throttleEvents(ctx context.Context, throttleDuration time.Duration, stop ch
}
}
}
}()
})
return eventsChanBuffered
}

View file

@ -0,0 +1,11 @@
kind: Endpoints
apiVersion: v1
metadata:
name: service1
namespace: testing
subsets:
- addresses:
- ip: 10.10.0.1
ports:
- port: 8080

View file

@ -0,0 +1,15 @@
kind: Ingress
apiVersion: networking.k8s.io/v1beta1
metadata:
name: ""
namespace: testing
spec:
rules:
- host: "*.foobar.com"
http:
paths:
- path: /bar
backend:
serviceName: service1
servicePort: 80

View file

@ -0,0 +1,11 @@
---
kind: Service
apiVersion: v1
metadata:
name: service1
namespace: testing
spec:
ports:
- port: 80
clusterIp: 10.0.0.1

View file

@ -105,32 +105,29 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
return err
}
pool.Go(func(stop chan bool) {
pool.GoCtx(func(ctxPool context.Context) {
operation := func() error {
stopWatch := make(chan struct{}, 1)
defer close(stopWatch)
eventsChan, err := k8sClient.WatchAll(p.Namespaces, stopWatch)
eventsChan, err := k8sClient.WatchAll(p.Namespaces, ctxPool.Done())
if err != nil {
logger.Errorf("Error watching kubernetes events: %v", err)
timer := time.NewTimer(1 * time.Second)
select {
case <-timer.C:
return err
case <-stop:
case <-ctxPool.Done():
return nil
}
}
throttleDuration := time.Duration(p.ThrottleDuration)
throttledChan := throttleEvents(ctxLog, throttleDuration, stop, eventsChan)
throttledChan := throttleEvents(ctxLog, throttleDuration, pool, eventsChan)
if throttledChan != nil {
eventsChan = throttledChan
}
for {
select {
case <-stop:
case <-ctxPool.Done():
return nil
case event := <-eventsChan:
// Note that event is the *first* event that came in during this
@ -165,7 +162,8 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
notify := func(err error, time time.Duration) {
logger.Errorf("Provider connection error: %s; retrying in %s", err, time)
}
err := backoff.RetryNotify(safe.OperationWithRecover(operation), job.NewBackOff(backoff.NewExponentialBackOff()), notify)
err := backoff.RetryNotify(safe.OperationWithRecover(operation), backoff.WithContext(job.NewBackOff(backoff.NewExponentialBackOff()), ctxPool), notify)
if err != nil {
logger.Errorf("Cannot connect to Provider: %s", err)
}
@ -267,6 +265,7 @@ func (p *Provider) loadConfigurationFromIngresses(ctx context.Context, client Cl
serviceName := provider.Normalize(ingress.Namespace + "-" + pa.Backend.ServiceName + "-" + pa.Backend.ServicePort.String())
conf.HTTP.Services[serviceName] = service
conf.HTTP.Services[serviceName] = service
routerKey := strings.TrimPrefix(provider.Normalize(rule.Host+pa.Path), "-")
conf.HTTP.Routers[routerKey] = loadRouter(ingress, rule, pa, rtConfig, serviceName)
@ -323,6 +322,14 @@ func (p *Provider) updateIngressStatus(ing *v1beta1.Ingress, k8sClient Client) e
return k8sClient.UpdateIngressStatus(ing, service.Status.LoadBalancer.Ingress[0].IP, service.Status.LoadBalancer.Ingress[0].Hostname)
}
func buildHostRule(host string) string {
if strings.HasPrefix(host, "*.") {
return "HostRegexp(`" + strings.Replace(host, "*.", "{subdomain:[a-zA-Z0-9-]+}.", 1) + "`)"
}
return "Host(`" + host + "`)"
}
func shouldProcessIngress(ingressClass string, ingressClassAnnotation string) bool {
return ingressClass == ingressClassAnnotation ||
(len(ingressClass) == 0 && ingressClassAnnotation == traefikDefaultIngressClass)
@ -522,7 +529,7 @@ func getProtocol(portSpec corev1.ServicePort, portName string, svcConfig *Servic
func loadRouter(ingress *v1beta1.Ingress, rule v1beta1.IngressRule, pa v1beta1.HTTPIngressPath, rtConfig *RouterConfig, serviceName string) *dynamic.Router {
var rules []string
if len(rule.Host) > 0 {
rules = []string{"Host(`" + rule.Host + "`)"}
rules = []string{buildHostRule(rule.Host)}
}
if len(pa.Path) > 0 {
@ -562,7 +569,7 @@ func checkStringQuoteValidity(value string) error {
return err
}
func throttleEvents(ctx context.Context, throttleDuration time.Duration, stop chan bool, eventsChan <-chan interface{}) chan interface{} {
func throttleEvents(ctx context.Context, throttleDuration time.Duration, pool *safe.Pool, eventsChan <-chan interface{}) chan interface{} {
if throttleDuration == 0 {
return nil
}
@ -573,10 +580,10 @@ func throttleEvents(ctx context.Context, throttleDuration time.Duration, stop ch
// non-blocking write to pendingEvent. This guarantees that writing to
// eventChan will never block, and that pendingEvent will have
// something in it if there's been an event since we read from that channel.
go func() {
pool.GoCtx(func(ctxPool context.Context) {
for {
select {
case <-stop:
case <-ctxPool.Done():
return
case nextEvent := <-eventsChan:
select {
@ -590,7 +597,7 @@ func throttleEvents(ctx context.Context, throttleDuration time.Duration, stop ch
}
}
}
}()
})
return eventsChanBuffered
}

View file

@ -980,6 +980,35 @@ func TestLoadConfigurationFromIngresses(t *testing.T) {
},
},
},
{
desc: "Ingress with wildcard host",
expected: &dynamic.Configuration{
TCP: &dynamic.TCPConfiguration{},
HTTP: &dynamic.HTTPConfiguration{
Middlewares: map[string]*dynamic.Middleware{},
Routers: map[string]*dynamic.Router{
"foobar-com-bar": {
Rule: "HostRegexp(`{subdomain:[a-zA-Z0-9-]+}.foobar.com`) && PathPrefix(`/bar`)",
Service: "testing-service1-80",
},
},
Services: map[string]*dynamic.Service{
"testing-service1-80": {
LoadBalancer: &dynamic.ServersLoadBalancer{
PassHostHeader: Bool(true),
Servers: []dynamic.Server{
{
URL: "http://10.10.0.1:8080",
Scheme: "",
Port: "",
},
},
},
},
},
},
},
},
}
for _, test := range testCases {

View file

@ -61,7 +61,6 @@ func (p *Provider) Init(storeType store.Backend, name string) error {
// Provide allows the docker provider to provide configurations to traefik using the given configuration channel.
func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.Pool) error {
ctx := log.With(context.Background(), log.Str(log.ProviderName, p.name))
logger := log.FromContext(ctx)
operation := func() error {
@ -89,8 +88,10 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
}
}
pool.Go(func(stop chan bool) {
err := p.watchKv(ctx, configurationChan, p.RootKey, stop)
pool.GoCtx(func(ctxPool context.Context) {
ctxLog := log.With(ctxPool, log.Str(log.ProviderName, p.name))
err := p.watchKv(ctxLog, configurationChan)
if err != nil {
logger.Errorf("Cannot watch KV store: %v", err)
}
@ -99,16 +100,16 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
return nil
}
func (p *Provider) watchKv(ctx context.Context, configurationChan chan<- dynamic.Message, prefix string, stop chan bool) error {
func (p *Provider) watchKv(ctx context.Context, configurationChan chan<- dynamic.Message) error {
operation := func() error {
events, err := p.kvClient.WatchTree(p.RootKey, make(chan struct{}), nil)
events, err := p.kvClient.WatchTree(p.RootKey, ctx.Done(), nil)
if err != nil {
return fmt.Errorf("failed to watch KV: %w", err)
}
for {
select {
case <-stop:
case <-ctx.Done():
return nil
case _, ok := <-events:
if !ok {
@ -133,7 +134,9 @@ func (p *Provider) watchKv(ctx context.Context, configurationChan chan<- dynamic
notify := func(err error, time time.Duration) {
log.FromContext(ctx).Errorf("KV connection error: %+v, retrying in %s", err, time)
}
err := backoff.RetryNotify(safe.OperationWithRecover(operation), job.NewBackOff(backoff.NewExponentialBackOff()), notify)
err := backoff.RetryNotify(safe.OperationWithRecover(operation),
backoff.WithContext(job.NewBackOff(backoff.NewExponentialBackOff()), ctx), notify)
if err != nil {
return fmt.Errorf("cannot connect to KV server: %w", err)
}

View file

@ -857,7 +857,7 @@ func TestKvWatchTree(t *testing.T) {
configChan := make(chan dynamic.Message)
go func() {
err := provider.watchKv(context.Background(), configChan, "prefix", make(chan bool, 1))
err := provider.watchKv(context.Background(), configChan)
require.NoError(t, err)
}()

View file

@ -159,11 +159,11 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
logger.Errorf("Failed to register for events, %s", err)
return err
}
pool.Go(func(stop chan bool) {
pool.GoCtx(func(ctxPool context.Context) {
defer close(update)
for {
select {
case <-stop:
case <-ctxPool.Done():
return
case event := <-update:
logger.Debugf("Received provider event %s", event)

View file

@ -5,6 +5,7 @@ import (
"net/http"
"github.com/containous/traefik/v2/pkg/config/runtime"
"github.com/containous/traefik/v2/pkg/server/provider"
)
// NewBuilder creates a builder.
@ -22,21 +23,28 @@ func (f *Builder) Build(ctx context.Context, names []string) func(*http.Response
var modifiers []func(*http.Response) error
for _, middleName := range names {
if conf, ok := f.configs[middleName]; ok {
if conf == nil || conf.Middleware == nil {
getLogger(ctx, middleName, "undefined").Error("Invalid Middleware configuration (ResponseModifier)")
continue
}
if conf.Headers != nil {
getLogger(ctx, middleName, "Headers").Debug("Creating Middleware (ResponseModifier)")
modifiers = append(modifiers, buildHeaders(conf.Headers))
} else if conf.Chain != nil {
getLogger(ctx, middleName, "Chain").Debug("Creating Middleware (ResponseModifier)")
modifiers = append(modifiers, f.Build(ctx, conf.Chain.Middlewares))
conf, ok := f.configs[middleName]
if !ok {
getLogger(ctx, middleName, "undefined").Debug("Middleware name not found in config (ResponseModifier)")
continue
}
if conf == nil || conf.Middleware == nil {
getLogger(ctx, middleName, "undefined").Error("Invalid Middleware configuration (ResponseModifier)")
continue
}
if conf.Headers != nil {
getLogger(ctx, middleName, "Headers").Debug("Creating Middleware (ResponseModifier)")
modifiers = append(modifiers, buildHeaders(conf.Headers))
} else if conf.Chain != nil {
chainCtx := provider.AddInContext(ctx, middleName)
getLogger(chainCtx, middleName, "Chain").Debug("Creating Middleware (ResponseModifier)")
var qualifiedNames []string
for _, name := range conf.Chain.Middlewares {
qualifiedNames = append(qualifiedNames, provider.GetQualifiedName(chainCtx, name))
}
modifiers = append(modifiers, f.Build(ctx, qualifiedNames))
}
}

View file

@ -10,88 +10,37 @@ import (
"github.com/containous/traefik/v2/pkg/log"
)
type routine struct {
goroutine func(chan bool)
stop chan bool
}
type routineCtx func(ctx context.Context)
// Pool is a pool of go routines
type Pool struct {
routines []routine
waitGroup sync.WaitGroup
lock sync.Mutex
baseCtx context.Context
baseCancel context.CancelFunc
ctx context.Context
cancel context.CancelFunc
waitGroup sync.WaitGroup
ctx context.Context
cancel context.CancelFunc
}
// NewPool creates a Pool
func NewPool(parentCtx context.Context) *Pool {
baseCtx, baseCancel := context.WithCancel(parentCtx)
ctx, cancel := context.WithCancel(baseCtx)
ctx, cancel := context.WithCancel(parentCtx)
return &Pool{
baseCtx: baseCtx,
baseCancel: baseCancel,
ctx: ctx,
cancel: cancel,
ctx: ctx,
cancel: cancel,
}
}
// Ctx returns main context
func (p *Pool) Ctx() context.Context {
return p.baseCtx
}
// GoCtx starts a recoverable goroutine with a context
func (p *Pool) GoCtx(goroutine routineCtx) {
p.lock.Lock()
p.waitGroup.Add(1)
Go(func() {
defer p.waitGroup.Done()
goroutine(p.ctx)
})
p.lock.Unlock()
}
// Go starts a recoverable goroutine, and can be stopped with stop chan
func (p *Pool) Go(goroutine func(stop chan bool)) {
p.lock.Lock()
newRoutine := routine{
goroutine: goroutine,
stop: make(chan bool, 1),
}
p.routines = append(p.routines, newRoutine)
p.waitGroup.Add(1)
Go(func() {
defer p.waitGroup.Done()
goroutine(newRoutine.stop)
})
p.lock.Unlock()
}
// Stop stops all started routines, waiting for their termination
func (p *Pool) Stop() {
p.lock.Lock()
defer p.lock.Unlock()
p.cancel()
for _, routine := range p.routines {
routine.stop <- true
}
p.waitGroup.Wait()
for _, routine := range p.routines {
close(routine.stop)
}
}
// Cleanup releases resources used by the pool, and should be called when the pool will no longer be used
func (p *Pool) Cleanup() {
p.Stop()
p.lock.Lock()
defer p.lock.Unlock()
p.baseCancel()
}
// Go starts a recoverable goroutine

View file

@ -18,12 +18,13 @@ func TestNewPoolContext(t *testing.T) {
ctx := context.WithValue(context.Background(), testKey, "test")
p := NewPool(ctx)
retCtx := p.Ctx()
retCtxVal, ok := retCtx.Value(testKey).(string)
if !ok || retCtxVal != "test" {
t.Errorf("Pool.Ctx() did not return a derived context, got %#v, expected context with test value", retCtx)
}
p.GoCtx(func(ctx context.Context) {
retCtxVal, ok := ctx.Value(testKey).(string)
if !ok || retCtxVal != "test" {
t.Errorf("Pool.Ctx() did not return a derived context, got %#v, expected context with test value", ctx)
}
})
p.Stop()
}
type fakeRoutine struct {
@ -46,14 +47,6 @@ func (tr *fakeRoutine) routineCtx(ctx context.Context) {
<-ctx.Done()
}
func (tr *fakeRoutine) routine(stop chan bool) {
tr.Lock()
tr.started = true
tr.Unlock()
tr.startSig <- true
<-stop
}
func TestPoolWithCtx(t *testing.T) {
testRoutine := newFakeRoutine()
@ -79,12 +72,12 @@ func TestPoolWithCtx(t *testing.T) {
defer timer.Stop()
test.fn(p)
defer p.Cleanup()
defer p.Stop()
testDone := make(chan bool, 1)
go func() {
<-testRoutine.startSig
p.Cleanup()
p.Stop()
testDone <- true
}()
@ -100,89 +93,30 @@ func TestPoolWithCtx(t *testing.T) {
}
}
func TestPoolWithStopChan(t *testing.T) {
testRoutine := newFakeRoutine()
func TestPoolCleanupWithGoPanicking(t *testing.T) {
p := NewPool(context.Background())
timer := time.NewTimer(500 * time.Millisecond)
defer timer.Stop()
p.Go(testRoutine.routine)
if len(p.routines) != 1 {
t.Fatalf("After Pool.Go(func), Pool did have %d goroutines, expected 1", len(p.routines))
}
p.GoCtx(func(ctx context.Context) {
panic("BOOM")
})
testDone := make(chan bool, 1)
go func() {
<-testRoutine.startSig
p.Cleanup()
p.Stop()
testDone <- true
}()
select {
case <-timer.C:
testRoutine.Lock()
defer testRoutine.Unlock()
t.Fatalf("Pool test did not complete in time, goroutine started equals '%t'", testRoutine.started)
t.Fatalf("Pool.Cleanup() did not complete in time with a panicking goroutine")
case <-testDone:
return
}
}
func TestPoolCleanupWithGoPanicking(t *testing.T) {
testRoutine := func(stop chan bool) {
panic("BOOM")
}
testCtxRoutine := func(ctx context.Context) {
panic("BOOM")
}
testCases := []struct {
desc string
fn func(*Pool)
}{
{
desc: "Go()",
fn: func(p *Pool) {
p.Go(testRoutine)
},
},
{
desc: "GoCtx()",
fn: func(p *Pool) {
p.GoCtx(testCtxRoutine)
},
},
}
for _, test := range testCases {
test := test
t.Run(test.desc, func(t *testing.T) {
p := NewPool(context.Background())
timer := time.NewTimer(500 * time.Millisecond)
defer timer.Stop()
test.fn(p)
testDone := make(chan bool, 1)
go func() {
p.Cleanup()
testDone <- true
}()
select {
case <-timer.C:
t.Fatalf("Pool.Cleanup() did not complete in time with a panicking goroutine")
case <-testDone:
return
}
})
}
}
func TestGoroutineRecover(t *testing.T) {
// if recover fails the test will panic
Go(func() {

View file

@ -3,7 +3,7 @@ package server
import (
"github.com/containous/traefik/v2/pkg/config/dynamic"
"github.com/containous/traefik/v2/pkg/log"
"github.com/containous/traefik/v2/pkg/server/internal"
"github.com/containous/traefik/v2/pkg/server/provider"
"github.com/containous/traefik/v2/pkg/tls"
)
@ -25,25 +25,25 @@ func mergeConfiguration(configurations dynamic.Configurations) dynamic.Configura
}
var defaultTLSOptionProviders []string
for provider, configuration := range configurations {
for pvd, configuration := range configurations {
if configuration.HTTP != nil {
for routerName, router := range configuration.HTTP.Routers {
conf.HTTP.Routers[internal.MakeQualifiedName(provider, routerName)] = router
conf.HTTP.Routers[provider.MakeQualifiedName(pvd, routerName)] = router
}
for middlewareName, middleware := range configuration.HTTP.Middlewares {
conf.HTTP.Middlewares[internal.MakeQualifiedName(provider, middlewareName)] = middleware
conf.HTTP.Middlewares[provider.MakeQualifiedName(pvd, middlewareName)] = middleware
}
for serviceName, service := range configuration.HTTP.Services {
conf.HTTP.Services[internal.MakeQualifiedName(provider, serviceName)] = service
conf.HTTP.Services[provider.MakeQualifiedName(pvd, serviceName)] = service
}
}
if configuration.TCP != nil {
for routerName, router := range configuration.TCP.Routers {
conf.TCP.Routers[internal.MakeQualifiedName(provider, routerName)] = router
conf.TCP.Routers[provider.MakeQualifiedName(pvd, routerName)] = router
}
for serviceName, service := range configuration.TCP.Services {
conf.TCP.Services[internal.MakeQualifiedName(provider, serviceName)] = service
conf.TCP.Services[provider.MakeQualifiedName(pvd, serviceName)] = service
}
}
@ -56,9 +56,9 @@ func mergeConfiguration(configurations dynamic.Configurations) dynamic.Configura
for tlsOptionsName, options := range configuration.TLS.Options {
if tlsOptionsName != "default" {
tlsOptionsName = internal.MakeQualifiedName(provider, tlsOptionsName)
tlsOptionsName = provider.MakeQualifiedName(pvd, tlsOptionsName)
} else {
defaultTLSOptionProviders = append(defaultTLSOptionProviders, provider)
defaultTLSOptionProviders = append(defaultTLSOptionProviders, pvd)
}
conf.TLS.Options[tlsOptionsName] = options

View file

@ -1,6 +1,7 @@
package server
import (
"context"
"encoding/json"
"reflect"
"time"
@ -49,8 +50,8 @@ func NewConfigurationWatcher(routinesPool *safe.Pool, pvd provider.Provider, pro
// Start the configuration watcher.
func (c *ConfigurationWatcher) Start() {
c.routinesPool.Go(c.listenProviders)
c.routinesPool.Go(c.listenConfigurations)
c.routinesPool.GoCtx(c.listenProviders)
c.routinesPool.GoCtx(c.listenConfigurations)
c.startProvider()
}
@ -90,10 +91,10 @@ func (c *ConfigurationWatcher) startProvider() {
// listenProviders receives configuration changes from the providers.
// The configuration message then gets passed along a series of check
// to finally end up in a throttler that sends it to listenConfigurations (through c. configurationValidatedChan).
func (c *ConfigurationWatcher) listenProviders(stop chan bool) {
func (c *ConfigurationWatcher) listenProviders(ctx context.Context) {
for {
select {
case <-stop:
case <-ctx.Done():
return
case configMsg, ok := <-c.configurationChan:
if !ok {
@ -111,10 +112,10 @@ func (c *ConfigurationWatcher) listenProviders(stop chan bool) {
}
}
func (c *ConfigurationWatcher) listenConfigurations(stop chan bool) {
func (c *ConfigurationWatcher) listenConfigurations(ctx context.Context) {
for {
select {
case <-stop:
case <-ctx.Done():
return
case configMsg, ok := <-c.configurationValidatedChan:
if !ok || configMsg.Configuration == nil {
@ -150,8 +151,10 @@ func (c *ConfigurationWatcher) preLoadConfiguration(configMsg dynamic.Message) {
if copyConf.TLS != nil {
copyConf.TLS.Certificates = nil
for _, v := range copyConf.TLS.Stores {
v.DefaultCertificate = nil
for k := range copyConf.TLS.Stores {
st := copyConf.TLS.Stores[k]
st.DefaultCertificate = nil
copyConf.TLS.Stores[k] = st
}
}
@ -178,8 +181,8 @@ func (c *ConfigurationWatcher) preLoadConfiguration(configMsg dynamic.Message) {
if !ok {
providerConfigUpdateCh = make(chan dynamic.Message)
c.providerConfigUpdateMap[configMsg.ProviderName] = providerConfigUpdateCh
c.routinesPool.Go(func(stop chan bool) {
c.throttleProviderConfigReload(c.providersThrottleDuration, c.configurationValidatedChan, providerConfigUpdateCh, stop)
c.routinesPool.GoCtx(func(ctxPool context.Context) {
c.throttleProviderConfigReload(ctxPool, c.providersThrottleDuration, c.configurationValidatedChan, providerConfigUpdateCh)
})
}
@ -190,14 +193,14 @@ func (c *ConfigurationWatcher) preLoadConfiguration(configMsg dynamic.Message) {
// It will immediately publish a new configuration and then only publish the next configuration after the throttle duration.
// Note that in the case it receives N new configs in the timeframe of the throttle duration after publishing,
// it will publish the last of the newly received configurations.
func (c *ConfigurationWatcher) throttleProviderConfigReload(throttle time.Duration, publish chan<- dynamic.Message, in <-chan dynamic.Message, stop chan bool) {
func (c *ConfigurationWatcher) throttleProviderConfigReload(ctx context.Context, throttle time.Duration, publish chan<- dynamic.Message, in <-chan dynamic.Message) {
ring := channels.NewRingChannel(1)
defer ring.Close()
c.routinesPool.Go(func(stop chan bool) {
c.routinesPool.GoCtx(func(ctxPool context.Context) {
for {
select {
case <-stop:
case <-ctxPool.Done():
return
case nextConfig := <-ring.Out():
if config, ok := nextConfig.(dynamic.Message); ok {
@ -210,7 +213,7 @@ func (c *ConfigurationWatcher) throttleProviderConfigReload(throttle time.Durati
for {
select {
case <-stop:
case <-ctx.Done():
return
case nextConfig := <-in:
ring.In() <- nextConfig

View file

@ -28,7 +28,7 @@ import (
"github.com/containous/traefik/v2/pkg/middlewares/stripprefix"
"github.com/containous/traefik/v2/pkg/middlewares/stripprefixregex"
"github.com/containous/traefik/v2/pkg/middlewares/tracing"
"github.com/containous/traefik/v2/pkg/server/internal"
"github.com/containous/traefik/v2/pkg/server/provider"
)
type middlewareStackType int
@ -56,10 +56,10 @@ func NewBuilder(configs map[string]*runtime.MiddlewareInfo, serviceBuilder servi
func (b *Builder) BuildChain(ctx context.Context, middlewares []string) *alice.Chain {
chain := alice.New()
for _, name := range middlewares {
middlewareName := internal.GetQualifiedName(ctx, name)
middlewareName := provider.GetQualifiedName(ctx, name)
chain = chain.Append(func(next http.Handler) (http.Handler, error) {
constructorContext := internal.AddProviderInContext(ctx, middlewareName)
constructorContext := provider.AddInContext(ctx, middlewareName)
if midInf, ok := b.configs[middlewareName]; !ok || midInf.Middleware == nil {
return nil, fmt.Errorf("middleware %q does not exist", middlewareName)
}
@ -144,7 +144,7 @@ func (b *Builder) buildConstructor(ctx context.Context, middlewareName string) (
var qualifiedNames []string
for _, name := range config.Chain.Middlewares {
qualifiedNames = append(qualifiedNames, internal.GetQualifiedName(ctx, name))
qualifiedNames = append(qualifiedNames, provider.GetQualifiedName(ctx, name))
}
config.Chain.Middlewares = qualifiedNames
middleware = func(next http.Handler) (http.Handler, error) {

View file

@ -9,7 +9,7 @@ import (
"github.com/containous/traefik/v2/pkg/config/dynamic"
"github.com/containous/traefik/v2/pkg/config/runtime"
"github.com/containous/traefik/v2/pkg/server/internal"
"github.com/containous/traefik/v2/pkg/server/provider"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -262,7 +262,7 @@ func TestBuilder_BuildChainWithContext(t *testing.T) {
ctx := context.Background()
if len(test.contextProvider) > 0 {
ctx = internal.AddProviderInContext(ctx, "foobar@"+test.contextProvider)
ctx = provider.AddInContext(ctx, "foobar@"+test.contextProvider)
}
rtConf := runtime.NewConfig(dynamic.Configuration{

View file

@ -1,4 +1,4 @@
package internal
package provider
import (
"context"
@ -10,29 +10,29 @@ import (
type contextKey int
const (
providerKey contextKey = iota
key contextKey = iota
)
// AddProviderInContext Adds the provider name in the context
func AddProviderInContext(ctx context.Context, elementName string) context.Context {
// AddInContext Adds the provider name in the context
func AddInContext(ctx context.Context, elementName string) context.Context {
parts := strings.Split(elementName, "@")
if len(parts) == 1 {
log.FromContext(ctx).Debugf("Could not find a provider for %s.", elementName)
return ctx
}
if name, ok := ctx.Value(providerKey).(string); ok && name == parts[1] {
if name, ok := ctx.Value(key).(string); ok && name == parts[1] {
return ctx
}
return context.WithValue(ctx, providerKey, parts[1])
return context.WithValue(ctx, key, parts[1])
}
// GetQualifiedName Gets the fully qualified name.
func GetQualifiedName(ctx context.Context, elementName string) string {
parts := strings.Split(elementName, "@")
if len(parts) == 1 {
if providerName, ok := ctx.Value(providerKey).(string); ok {
if providerName, ok := ctx.Value(key).(string); ok {
return MakeQualifiedName(providerName, parts[0])
}
}

View file

@ -1,4 +1,4 @@
package internal
package provider
import (
"context"
@ -28,19 +28,19 @@ func TestAddProviderInContext(t *testing.T) {
},
{
desc: "provider name in context",
ctx: context.WithValue(context.Background(), providerKey, "foo"),
ctx: context.WithValue(context.Background(), key, "foo"),
name: "test",
expected: "foo",
},
{
desc: "provider name in context and different provider name embedded in element name",
ctx: context.WithValue(context.Background(), providerKey, "foo"),
ctx: context.WithValue(context.Background(), key, "foo"),
name: "test@fii",
expected: "fii",
},
{
desc: "provider name in context and same provider name embedded in element name",
ctx: context.WithValue(context.Background(), providerKey, "foo"),
ctx: context.WithValue(context.Background(), key, "foo"),
name: "test@foo",
expected: "foo",
},
@ -51,10 +51,10 @@ func TestAddProviderInContext(t *testing.T) {
t.Run(test.desc, func(t *testing.T) {
t.Parallel()
newCtx := AddProviderInContext(test.ctx, test.name)
newCtx := AddInContext(test.ctx, test.name)
var providerName string
if name, ok := newCtx.Value(providerKey).(string); ok {
if name, ok := newCtx.Value(key).(string); ok {
providerName = name
}
@ -90,13 +90,13 @@ func TestGetQualifiedName(t *testing.T) {
},
{
desc: "with provider in context",
ctx: context.WithValue(context.Background(), providerKey, "foo"),
ctx: context.WithValue(context.Background(), key, "foo"),
name: "test",
expected: "test@foo",
},
{
desc: "with provider in context and explicit name",
ctx: context.WithValue(context.Background(), providerKey, "foo"),
ctx: context.WithValue(context.Background(), key, "foo"),
name: "test@fii",
expected: "test@fii",
},

View file

@ -12,8 +12,8 @@ import (
"github.com/containous/traefik/v2/pkg/middlewares/recovery"
"github.com/containous/traefik/v2/pkg/middlewares/tracing"
"github.com/containous/traefik/v2/pkg/rules"
"github.com/containous/traefik/v2/pkg/server/internal"
"github.com/containous/traefik/v2/pkg/server/middleware"
"github.com/containous/traefik/v2/pkg/server/provider"
)
const (
@ -121,7 +121,7 @@ func (m *Manager) buildEntryPointHandler(ctx context.Context, configs map[string
}
for routerName, routerConfig := range configs {
ctxRouter := log.With(internal.AddProviderInContext(ctx, routerName), log.Str(log.RouterName, routerName))
ctxRouter := log.With(provider.AddInContext(ctx, routerName), log.Str(log.RouterName, routerName))
logger := log.FromContext(ctxRouter)
handler, err := m.buildRouterHandler(ctxRouter, routerName, routerConfig)
@ -175,7 +175,7 @@ func (m *Manager) buildRouterHandler(ctx context.Context, routerName string, rou
func (m *Manager) buildHTTPHandler(ctx context.Context, router *runtime.RouterInfo, routerName string) (http.Handler, error) {
var qualifiedNames []string
for _, name := range router.Middlewares {
qualifiedNames = append(qualifiedNames, internal.GetQualifiedName(ctx, name))
qualifiedNames = append(qualifiedNames, provider.GetQualifiedName(ctx, name))
}
router.Middlewares = qualifiedNames
rm := m.modifierBuilder.Build(ctx, qualifiedNames)

View file

@ -10,7 +10,7 @@ import (
"github.com/containous/traefik/v2/pkg/config/runtime"
"github.com/containous/traefik/v2/pkg/log"
"github.com/containous/traefik/v2/pkg/rules"
"github.com/containous/traefik/v2/pkg/server/internal"
"github.com/containous/traefik/v2/pkg/server/provider"
tcpservice "github.com/containous/traefik/v2/pkg/server/service/tcp"
"github.com/containous/traefik/v2/pkg/tcp"
traefiktls "github.com/containous/traefik/v2/pkg/tls"
@ -112,7 +112,7 @@ func (m *Manager) buildEntryPointHandler(ctx context.Context, configs map[string
continue
}
ctxRouter := log.With(internal.AddProviderInContext(ctx, routerHTTPName), log.Str(log.RouterName, routerHTTPName))
ctxRouter := log.With(provider.AddInContext(ctx, routerHTTPName), log.Str(log.RouterName, routerHTTPName))
logger := log.FromContext(ctxRouter)
domains, err := rules.ParseDomains(routerHTTPConfig.Rule)
@ -131,7 +131,7 @@ func (m *Manager) buildEntryPointHandler(ctx context.Context, configs map[string
if routerHTTPConfig.TLS != nil {
tlsOptionsName := routerHTTPConfig.TLS.Options
if tlsOptionsName != defaultTLSConfigName {
tlsOptionsName = internal.GetQualifiedName(ctxRouter, routerHTTPConfig.TLS.Options)
tlsOptionsName = provider.GetQualifiedName(ctxRouter, routerHTTPConfig.TLS.Options)
}
tlsConf, err := m.tlsManager.Get(defaultTLSStoreName, tlsOptionsName)
@ -180,7 +180,7 @@ func (m *Manager) buildEntryPointHandler(ctx context.Context, configs map[string
}
for routerName, routerConfig := range configs {
ctxRouter := log.With(internal.AddProviderInContext(ctx, routerName), log.Str(log.RouterName, routerName))
ctxRouter := log.With(provider.AddInContext(ctx, routerName), log.Str(log.RouterName, routerName))
logger := log.FromContext(ctxRouter)
if routerConfig.Service == "" {
@ -226,7 +226,7 @@ func (m *Manager) buildEntryPointHandler(ctx context.Context, configs map[string
}
if tlsOptionsName != defaultTLSConfigName {
tlsOptionsName = internal.GetQualifiedName(ctxRouter, tlsOptionsName)
tlsOptionsName = provider.GetQualifiedName(ctxRouter, tlsOptionsName)
}
tlsConf, err := m.tlsManager.Get(defaultTLSStoreName, tlsOptionsName)

View file

@ -58,7 +58,7 @@ func (s *Server) Start(ctx context.Context) {
s.tcpEntryPoints.Start()
s.watcher.Start()
s.routinesPool.Go(s.listenSignals)
s.routinesPool.GoCtx(s.listenSignals)
}
// Wait blocks until the server shutdown.
@ -90,7 +90,7 @@ func (s *Server) Close() {
stopMetricsClients()
s.routinesPool.Cleanup()
s.routinesPool.Stop()
signal.Stop(s.signals)
close(s.signals)

View file

@ -3,6 +3,7 @@
package server
import (
"context"
"os/signal"
"syscall"
@ -13,10 +14,10 @@ func (s *Server) configureSignals() {
signal.Notify(s.signals, syscall.SIGUSR1)
}
func (s *Server) listenSignals(stop chan bool) {
func (s *Server) listenSignals(ctx context.Context) {
for {
select {
case <-stop:
case <-ctx.Done():
return
case sig := <-s.signals:
if sig == syscall.SIGUSR1 {

View file

@ -2,6 +2,8 @@
package server
import "context"
func (s *Server) configureSignals() {}
func (s *Server) listenSignals(stop chan bool) {}
func (s *Server) listenSignals(ctx context.Context) {}

View file

@ -22,7 +22,7 @@ import (
"github.com/containous/traefik/v2/pkg/middlewares/pipelining"
"github.com/containous/traefik/v2/pkg/safe"
"github.com/containous/traefik/v2/pkg/server/cookie"
"github.com/containous/traefik/v2/pkg/server/internal"
"github.com/containous/traefik/v2/pkg/server/provider"
"github.com/containous/traefik/v2/pkg/server/service/loadbalancer/mirror"
"github.com/containous/traefik/v2/pkg/server/service/loadbalancer/wrr"
"github.com/vulcand/oxy/roundrobin"
@ -63,8 +63,8 @@ type Manager struct {
func (m *Manager) BuildHTTP(rootCtx context.Context, serviceName string, responseModifier func(*http.Response) error) (http.Handler, error) {
ctx := log.With(rootCtx, log.Str(log.ServiceName, serviceName))
serviceName = internal.GetQualifiedName(ctx, serviceName)
ctx = internal.AddProviderInContext(ctx, serviceName)
serviceName = provider.GetQualifiedName(ctx, serviceName)
ctx = provider.AddInContext(ctx, serviceName)
conf, ok := m.configs[serviceName]
if !ok {

View file

@ -9,7 +9,7 @@ import (
"github.com/containous/traefik/v2/pkg/config/dynamic"
"github.com/containous/traefik/v2/pkg/config/runtime"
"github.com/containous/traefik/v2/pkg/server/internal"
"github.com/containous/traefik/v2/pkg/server/provider"
"github.com/containous/traefik/v2/pkg/testhelpers"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -336,7 +336,7 @@ func TestManager_Build(t *testing.T) {
ctx := context.Background()
if len(test.providerName) > 0 {
ctx = internal.AddProviderInContext(ctx, "foobar@"+test.providerName)
ctx = provider.AddInContext(ctx, "foobar@"+test.providerName)
}
_, err := manager.BuildHTTP(ctx, test.serviceName, nil)

View file

@ -9,7 +9,7 @@ import (
"github.com/containous/traefik/v2/pkg/config/runtime"
"github.com/containous/traefik/v2/pkg/log"
"github.com/containous/traefik/v2/pkg/server/internal"
"github.com/containous/traefik/v2/pkg/server/provider"
"github.com/containous/traefik/v2/pkg/tcp"
)
@ -27,8 +27,8 @@ func NewManager(conf *runtime.Configuration) *Manager {
// BuildTCP Creates a tcp.Handler for a service configuration.
func (m *Manager) BuildTCP(rootCtx context.Context, serviceName string) (tcp.Handler, error) {
serviceQualifiedName := internal.GetQualifiedName(rootCtx, serviceName)
ctx := internal.AddProviderInContext(rootCtx, serviceQualifiedName)
serviceQualifiedName := provider.GetQualifiedName(rootCtx, serviceName)
ctx := provider.AddInContext(rootCtx, serviceQualifiedName)
ctx = log.With(ctx, log.Str(log.ServiceName, serviceName))
conf, ok := m.configs[serviceQualifiedName]

View file

@ -6,7 +6,7 @@ import (
"github.com/containous/traefik/v2/pkg/config/dynamic"
"github.com/containous/traefik/v2/pkg/config/runtime"
"github.com/containous/traefik/v2/pkg/server/internal"
"github.com/containous/traefik/v2/pkg/server/provider"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@ -184,7 +184,7 @@ func TestManager_BuildTCP(t *testing.T) {
ctx := context.Background()
if len(test.providerName) > 0 {
ctx = internal.AddProviderInContext(ctx, "foobar@"+test.providerName)
ctx = provider.AddInContext(ctx, "foobar@"+test.providerName)
}
handler, err := manager.BuildTCP(ctx, test.serviceName)