Fix races

Signed-off-by: Emile Vauge <emile@vauge.com>
This commit is contained in:
Emile Vauge 2016-04-13 20:36:23 +02:00
parent 4e427b5a9e
commit c1078c4374
No known key found for this signature in database
GPG key ID: D808B4C167352E59
19 changed files with 269 additions and 143 deletions

131
server.go
View file

@ -15,7 +15,6 @@ import (
"regexp"
"sort"
"strconv"
"sync"
"syscall"
"time"
@ -33,6 +32,7 @@ import (
"github.com/containous/traefik/types"
"github.com/gorilla/mux"
"github.com/mailgun/manners"
"github.com/streamrail/concurrent-map"
)
var oxyLogger = &OxyLogger{}
@ -45,10 +45,10 @@ type Server struct {
signals chan os.Signal
stopChan chan bool
providers []provider.Provider
serverLock sync.Mutex
currentConfigurations configs
currentConfigurations safe.Safe
globalConfiguration GlobalConfiguration
loggerMiddleware *middlewares.Logger
routinesPool safe.Pool
}
type serverEntryPoints map[string]*serverEntryPoint
@ -71,10 +71,11 @@ func NewServer(globalConfiguration GlobalConfiguration) *Server {
server.configurationChan = make(chan types.ConfigMessage, 10)
server.configurationValidatedChan = make(chan types.ConfigMessage, 10)
server.signals = make(chan os.Signal, 1)
server.stopChan = make(chan bool)
server.stopChan = make(chan bool, 1)
server.providers = []provider.Provider{}
signal.Notify(server.signals, syscall.SIGINT, syscall.SIGTERM)
server.currentConfigurations = make(configs)
currentConfigurations := make(configs)
server.currentConfigurations.Set(currentConfigurations)
server.globalConfiguration = globalConfiguration
server.loggerMiddleware = middlewares.NewLogger(globalConfiguration.AccessLogsFile)
@ -84,11 +85,11 @@ func NewServer(globalConfiguration GlobalConfiguration) *Server {
// Start starts the server and blocks until server is shutted down.
func (server *Server) Start() {
server.startHTTPServers()
safe.Go(func() {
server.listenProviders()
server.routinesPool.Go(func(stop chan bool) {
server.listenProviders(stop)
})
safe.Go(func() {
server.listenConfigurations()
server.routinesPool.Go(func(stop chan bool) {
server.listenConfigurations(stop)
})
server.configureProviders()
server.startProviders()
@ -106,6 +107,7 @@ func (server *Server) Stop() {
// Close destroys the server
func (server *Server) Close() {
server.routinesPool.Stop()
close(server.configurationChan)
close(server.configurationValidatedChan)
close(server.signals)
@ -126,58 +128,79 @@ func (server *Server) startHTTPServers() {
}
}
func (server *Server) listenProviders() {
lastReceivedConfiguration := time.Unix(0, 0)
lastConfigs := make(map[string]*types.ConfigMessage)
func (server *Server) listenProviders(stop chan bool) {
lastReceivedConfiguration := safe.New(time.Unix(0, 0))
lastConfigs := cmap.New()
for {
configMsg := <-server.configurationChan
jsonConf, _ := json.Marshal(configMsg.Configuration)
log.Debugf("Configuration received from provider %s: %s", configMsg.ProviderName, string(jsonConf))
lastConfigs[configMsg.ProviderName] = &configMsg
if time.Now().After(lastReceivedConfiguration.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) {
log.Debugf("Last %s config received more than %s, OK", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration)
// last config received more than n s ago
server.configurationValidatedChan <- configMsg
} else {
log.Debugf("Last %s config received less than %s, waiting...", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration)
safe.Go(func() {
<-time.After(server.globalConfiguration.ProvidersThrottleDuration)
if time.Now().After(lastReceivedConfiguration.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) {
log.Debugf("Waited for %s config, OK", configMsg.ProviderName)
server.configurationValidatedChan <- *lastConfigs[configMsg.ProviderName]
}
})
select {
case <-stop:
return
case configMsg, ok := <-server.configurationChan:
if !ok {
return
}
jsonConf, _ := json.Marshal(configMsg.Configuration)
log.Debugf("Configuration received from provider %s: %s", configMsg.ProviderName, string(jsonConf))
lastConfigs.Set(configMsg.ProviderName, &configMsg)
lastReceivedConfigurationValue := lastReceivedConfiguration.Get().(time.Time)
if time.Now().After(lastReceivedConfigurationValue.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) {
log.Debugf("Last %s config received more than %s, OK", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration)
// last config received more than n s ago
server.configurationValidatedChan <- configMsg
} else {
log.Debugf("Last %s config received less than %s, waiting...", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration)
server.routinesPool.Go(func(stop chan bool) {
select {
case <-stop:
return
case <-time.After(server.globalConfiguration.ProvidersThrottleDuration):
lastReceivedConfigurationValue := lastReceivedConfiguration.Get().(time.Time)
if time.Now().After(lastReceivedConfigurationValue.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) {
log.Debugf("Waited for %s config, OK", configMsg.ProviderName)
if lastConfig, ok := lastConfigs.Get(configMsg.ProviderName); ok {
server.configurationValidatedChan <- *lastConfig.(*types.ConfigMessage)
}
}
}
})
}
lastReceivedConfiguration.Set(time.Now())
}
lastReceivedConfiguration = time.Now()
}
}
func (server *Server) listenConfigurations() {
func (server *Server) listenConfigurations(stop chan bool) {
for {
configMsg := <-server.configurationValidatedChan
if configMsg.Configuration == nil {
log.Info("Skipping empty Configuration")
} else if reflect.DeepEqual(server.currentConfigurations[configMsg.ProviderName], configMsg.Configuration) {
log.Info("Skipping same configuration")
} else {
// Copy configurations to new map so we don't change current if LoadConfig fails
newConfigurations := make(configs)
for k, v := range server.currentConfigurations {
newConfigurations[k] = v
select {
case <-stop:
return
case configMsg, ok := <-server.configurationValidatedChan:
if !ok {
return
}
newConfigurations[configMsg.ProviderName] = configMsg.Configuration
newServerEntryPoints, err := server.loadConfig(newConfigurations, server.globalConfiguration)
if err == nil {
server.serverLock.Lock()
for newServerEntryPointName, newServerEntryPoint := range newServerEntryPoints {
server.serverEntryPoints[newServerEntryPointName].httpRouter.UpdateHandler(newServerEntryPoint.httpRouter.GetHandler())
log.Infof("Server configuration reloaded on %s", server.serverEntryPoints[newServerEntryPointName].httpServer.Addr)
}
server.currentConfigurations = newConfigurations
server.serverLock.Unlock()
currentConfigurations := server.currentConfigurations.Get().(configs)
if configMsg.Configuration == nil {
log.Info("Skipping empty Configuration")
} else if reflect.DeepEqual(currentConfigurations[configMsg.ProviderName], configMsg.Configuration) {
log.Info("Skipping same configuration")
} else {
log.Error("Error loading new configuration, aborted ", err)
// Copy configurations to new map so we don't change current if LoadConfig fails
newConfigurations := make(configs)
for k, v := range currentConfigurations {
newConfigurations[k] = v
}
newConfigurations[configMsg.ProviderName] = configMsg.Configuration
newServerEntryPoints, err := server.loadConfig(newConfigurations, server.globalConfiguration)
if err == nil {
for newServerEntryPointName, newServerEntryPoint := range newServerEntryPoints {
server.serverEntryPoints[newServerEntryPointName].httpRouter.UpdateHandler(newServerEntryPoint.httpRouter.GetHandler())
log.Infof("Server configuration reloaded on %s", server.serverEntryPoints[newServerEntryPointName].httpServer.Addr)
}
server.currentConfigurations.Set(newConfigurations)
} else {
log.Error("Error loading new configuration, aborted ", err)
}
}
}
}
@ -222,7 +245,7 @@ func (server *Server) startProviders() {
log.Infof("Starting provider %v %s", reflect.TypeOf(provider), jsonConf)
currentProvider := provider
safe.Go(func() {
err := currentProvider.Provide(server.configurationChan)
err := currentProvider.Provide(server.configurationChan, &server.routinesPool)
if err != nil {
log.Errorf("Error starting provider %s", err)
}