fix concurrent provider config reloads

This commit is contained in:
Marco Jantke 2017-11-17 10:26:03 +01:00 committed by Traefiker
parent 722f299306
commit cdab6b1796
6 changed files with 223 additions and 353 deletions

View file

@ -36,7 +36,7 @@ import (
traefikTls "github.com/containous/traefik/tls"
"github.com/containous/traefik/types"
"github.com/containous/traefik/whitelist"
"github.com/streamrail/concurrent-map"
"github.com/eapache/channels"
thoas_stats "github.com/thoas/stats"
"github.com/urfave/negroni"
"github.com/vulcand/oxy/cbreaker"
@ -67,8 +67,6 @@ type Server struct {
leadership *cluster.Leadership
defaultForwardingRoundTripper http.RoundTripper
metricsRegistry metrics.Registry
lastReceivedConfiguration *safe.Safe
lastConfigs cmap.ConcurrentMap
}
type serverEntryPoints map[string]*serverEntryPoint
@ -109,8 +107,6 @@ func NewServer(globalConfiguration configuration.GlobalConfiguration) *Server {
server.routinesPool = safe.NewPool(context.Background())
server.defaultForwardingRoundTripper = createHTTPTransport(globalConfiguration)
server.lastReceivedConfiguration = safe.New(time.Unix(0, 0))
server.lastConfigs = cmap.New()
server.metricsRegistry = metrics.NewVoidRegistry()
if globalConfiguration.Metrics != nil {
@ -345,6 +341,8 @@ func (server *Server) listenProviders(stop chan bool) {
}
func (server *Server) preLoadConfiguration(configMsg types.ConfigMessage) {
providerConfigUpdateMap := map[string]chan types.ConfigMessage{}
providersThrottleDuration := time.Duration(server.globalConfiguration.ProvidersThrottleDuration)
server.defaultConfigurationValues(configMsg.Configuration)
currentConfigurations := server.currentConfigurations.Get().(types.Configurations)
jsonConf, _ := json.Marshal(configMsg.Configuration)
@ -354,28 +352,43 @@ func (server *Server) preLoadConfiguration(configMsg types.ConfigMessage) {
} else if reflect.DeepEqual(currentConfigurations[configMsg.ProviderName], configMsg.Configuration) {
log.Infof("Skipping same configuration for provider %s", configMsg.ProviderName)
} else {
server.lastConfigs.Set(configMsg.ProviderName, &configMsg)
lastReceivedConfigurationValue := server.lastReceivedConfiguration.Get().(time.Time)
providersThrottleDuration := time.Duration(server.globalConfiguration.ProvidersThrottleDuration)
if time.Now().After(lastReceivedConfigurationValue.Add(providersThrottleDuration)) {
log.Debugf("Last %s configuration received more than %s, OK", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration.String())
// last config received more than n server ago
server.configurationValidatedChan <- configMsg
} else {
log.Debugf("Last %s configuration received less than %s, waiting...", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration.String())
safe.Go(func() {
<-time.After(providersThrottleDuration)
lastReceivedConfigurationValue := server.lastReceivedConfiguration.Get().(time.Time)
if time.Now().After(lastReceivedConfigurationValue.Add(time.Duration(providersThrottleDuration))) {
log.Debugf("Waited for %s configuration, OK", configMsg.ProviderName)
if lastConfig, ok := server.lastConfigs.Get(configMsg.ProviderName); ok {
server.configurationValidatedChan <- *lastConfig.(*types.ConfigMessage)
}
}
if _, ok := providerConfigUpdateMap[configMsg.ProviderName]; !ok {
providerConfigUpdate := make(chan types.ConfigMessage)
providerConfigUpdateMap[configMsg.ProviderName] = providerConfigUpdate
server.routinesPool.Go(func(stop chan bool) {
throttleProviderConfigReload(providersThrottleDuration, server.configurationValidatedChan, providerConfigUpdate, stop)
})
}
// Update the last configuration loading time
server.lastReceivedConfiguration.Set(time.Now())
providerConfigUpdateMap[configMsg.ProviderName] <- configMsg
}
}
// throttleProviderConfigReload throttles the configuration reload speed for a single provider.
// It will immediately publish a new configuration and then only publish the next configuration after the throttle duration.
// Note that in the case it receives N new configs in the timeframe of the throttle duration after publishing,
// it will publish the last of the newly received configurations.
func throttleProviderConfigReload(throttle time.Duration, publish chan<- types.ConfigMessage, in <-chan types.ConfigMessage, stop chan bool) {
ring := channels.NewRingChannel(1)
safe.Go(func() {
for {
select {
case <-stop:
return
case nextConfig := <-ring.Out():
publish <- nextConfig.(types.ConfigMessage)
time.Sleep(throttle)
}
}
})
for {
select {
case <-stop:
return
case nextConfig := <-in:
ring.In() <- nextConfig
}
}
}

View file

@ -160,6 +160,189 @@ func TestPrepareServerTimeouts(t *testing.T) {
}
}
func TestListenProvidersSkipsEmptyConfigs(t *testing.T) {
server, stop, invokeStopChan := setupListenProvider(10 * time.Millisecond)
defer invokeStopChan()
go func() {
for {
select {
case <-stop:
return
case <-server.configurationValidatedChan:
t.Error("An empty configuration was published but it should not")
}
}
}()
server.configurationChan <- types.ConfigMessage{ProviderName: "kubernetes"}
// give some time so that the configuration can be processed
time.Sleep(100 * time.Millisecond)
}
func TestListenProvidersSkipsSameConfigurationForProvider(t *testing.T) {
server, stop, invokeStopChan := setupListenProvider(10 * time.Millisecond)
defer invokeStopChan()
publishedConfigCount := 0
go func() {
for {
select {
case <-stop:
return
case config := <-server.configurationValidatedChan:
// set the current configuration
// this is usually done in the processing part of the published configuration
// so we have to emulate the behaviour here
currentConfigurations := server.currentConfigurations.Get().(types.Configurations)
currentConfigurations[config.ProviderName] = config.Configuration
server.currentConfigurations.Set(currentConfigurations)
publishedConfigCount++
if publishedConfigCount > 1 {
t.Error("Same configuration should not be published multiple times")
}
}
}
}()
config := buildDynamicConfig(
withFrontend("frontend", buildFrontend()),
withBackend("backend", buildBackend()),
)
// provide a configuration
server.configurationChan <- types.ConfigMessage{ProviderName: "kubernetes", Configuration: config}
// give some time so that the configuration can be processed
time.Sleep(20 * time.Millisecond)
// provide the same configuration a second time
server.configurationChan <- types.ConfigMessage{ProviderName: "kubernetes", Configuration: config}
// give some time so that the configuration can be processed
time.Sleep(100 * time.Millisecond)
}
func TestListenProvidersPublishesConfigForEachProvider(t *testing.T) {
server, stop, invokeStopChan := setupListenProvider(10 * time.Millisecond)
defer invokeStopChan()
publishedProviderConfigCount := map[string]int{}
publishedConfigCount := 0
consumePublishedConfigsDone := make(chan bool)
go func() {
for {
select {
case <-stop:
return
case newConfig := <-server.configurationValidatedChan:
publishedProviderConfigCount[newConfig.ProviderName]++
publishedConfigCount++
if publishedConfigCount == 2 {
consumePublishedConfigsDone <- true
return
}
}
}
}()
config := buildDynamicConfig(
withFrontend("frontend", buildFrontend()),
withBackend("backend", buildBackend()),
)
server.configurationChan <- types.ConfigMessage{ProviderName: "kubernetes", Configuration: config}
server.configurationChan <- types.ConfigMessage{ProviderName: "marathon", Configuration: config}
select {
case <-consumePublishedConfigsDone:
if val := publishedProviderConfigCount["kubernetes"]; val != 1 {
t.Errorf("Got %d configuration publication(s) for provider %q, want 1", val, "kubernetes")
}
if val := publishedProviderConfigCount["marathon"]; val != 1 {
t.Errorf("Got %d configuration publication(s) for provider %q, want 1", val, "marathon")
}
case <-time.After(100 * time.Millisecond):
t.Errorf("Published configurations were not consumed in time")
}
}
// setupListenProvider configures the Server and starts listenProviders
func setupListenProvider(throttleDuration time.Duration) (server *Server, stop chan bool, invokeStopChan func()) {
stop = make(chan bool)
invokeStopChan = func() {
stop <- true
}
globalConfig := configuration.GlobalConfiguration{
EntryPoints: configuration.EntryPoints{
"http": &configuration.EntryPoint{},
},
ProvidersThrottleDuration: flaeg.Duration(throttleDuration),
}
server = NewServer(globalConfig)
go server.listenProviders(stop)
return server, stop, invokeStopChan
}
func TestThrottleProviderConfigReload(t *testing.T) {
throttleDuration := 30 * time.Millisecond
publishConfig := make(chan types.ConfigMessage)
providerConfig := make(chan types.ConfigMessage)
stop := make(chan bool)
defer func() {
stop <- true
}()
go throttleProviderConfigReload(throttleDuration, publishConfig, providerConfig, stop)
publishedConfigCount := 0
stopConsumeConfigs := make(chan bool)
go func() {
for {
select {
case <-stop:
return
case <-stopConsumeConfigs:
return
case <-publishConfig:
publishedConfigCount++
}
}
}()
// publish 5 new configs, one new config each 10 milliseconds
for i := 0; i < 5; i++ {
providerConfig <- types.ConfigMessage{}
time.Sleep(10 * time.Millisecond)
}
// after 50 milliseconds 5 new configs were published
// with a throttle duration of 30 milliseconds this means, we should have received 2 new configs
wantPublishedConfigCount := 2
if publishedConfigCount != wantPublishedConfigCount {
t.Errorf("%d times configs were published, want %d times", publishedConfigCount, wantPublishedConfigCount)
}
stopConsumeConfigs <- true
select {
case <-publishConfig:
// There should be exactly one more message that we receive after ~60 milliseconds since the start of the test.
select {
case <-publishConfig:
t.Error("extra config publication found")
case <-time.After(100 * time.Millisecond):
return
}
case <-time.After(100 * time.Millisecond):
t.Error("Last config was not published in time")
}
}
func TestServerMultipleFrontendRules(t *testing.T) {
cases := []struct {
expression string