Merge branch 'v2.1' into master
This commit is contained in:
commit
2d3fc613ec
44 changed files with 923 additions and 392 deletions
|
@ -49,8 +49,8 @@ func NewConfigurationWatcher(routinesPool *safe.Pool, pvd provider.Provider, pro
|
|||
|
||||
// Start the configuration watcher.
|
||||
func (c *ConfigurationWatcher) Start() {
|
||||
c.routinesPool.Go(func(stop chan bool) { c.listenProviders(stop) })
|
||||
c.routinesPool.Go(func(stop chan bool) { c.listenConfigurations(stop) })
|
||||
c.routinesPool.Go(c.listenProviders)
|
||||
c.routinesPool.Go(c.listenConfigurations)
|
||||
c.startProvider()
|
||||
}
|
||||
|
||||
|
|
|
@ -48,7 +48,6 @@ func NewServer(routinesPool *safe.Pool, entryPoints TCPEntryPoints, watcher *Con
|
|||
// Start starts the server and Stop/Close it when context is Done
|
||||
func (s *Server) Start(ctx context.Context) {
|
||||
go func() {
|
||||
defer s.Close()
|
||||
<-ctx.Done()
|
||||
logger := log.FromContext(ctx)
|
||||
logger.Info("I have to go...")
|
||||
|
@ -59,9 +58,7 @@ func (s *Server) Start(ctx context.Context) {
|
|||
s.tcpEntryPoints.Start()
|
||||
s.watcher.Start()
|
||||
|
||||
s.routinesPool.Go(func(stop chan bool) {
|
||||
s.listenSignals(stop)
|
||||
})
|
||||
s.routinesPool.Go(s.listenSignals)
|
||||
}
|
||||
|
||||
// Wait blocks until the server shutdown.
|
||||
|
|
|
@ -158,6 +158,10 @@ func (e *TCPEntryPoint) StartTCP(ctx context.Context) {
|
|||
conn, err := e.listener.Accept()
|
||||
if err != nil {
|
||||
logger.Error(err)
|
||||
if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
|
||||
continue
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"net/http"
|
||||
"sync"
|
||||
|
||||
"github.com/containous/traefik/v2/pkg/middlewares/accesslog"
|
||||
"github.com/containous/traefik/v2/pkg/safe"
|
||||
)
|
||||
|
||||
|
@ -63,11 +64,21 @@ func (m *Mirroring) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
|||
if handler.count*100 < total*uint64(handler.percent) {
|
||||
handler.count++
|
||||
handler.lock.Unlock()
|
||||
|
||||
// In ServeHTTP, we rely on the presence of the accesslog datatable found in the
|
||||
// request's context to know whether we should mutate said datatable (and
|
||||
// contribute some fields to the log). In this instance, we do not want the mirrors
|
||||
// mutating (i.e. changing the service name in) the logs related to the mirrored
|
||||
// server. Especially since it would result in unguarded concurrent reads/writes on
|
||||
// the datatable. Therefore, we reset any potential datatable key in the new
|
||||
// context that we pass around.
|
||||
ctx := context.WithValue(req.Context(), accesslog.DataTableKey, nil)
|
||||
|
||||
// When a request served by m.handler is successful, req.Context will be canceled,
|
||||
// which would trigger a cancellation of the ongoing mirrored requests.
|
||||
// Therefore, we give a new, non-cancellable context to each of the mirrored calls,
|
||||
// so they can terminate by themselves.
|
||||
handler.ServeHTTP(m.rw, req.WithContext(contextStopPropagation{req.Context()}))
|
||||
handler.ServeHTTP(m.rw, req.WithContext(contextStopPropagation{ctx}))
|
||||
} else {
|
||||
handler.lock.Unlock()
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue