From dd0701dd1688910ed24c68928cb5d0877c15d189 Mon Sep 17 00:00:00 2001 From: Julien Salleyron Date: Thu, 25 Feb 2021 17:20:04 +0100 Subject: [PATCH] fix: wait for file and internal before applying configurations Co-authored-by: Ludovic Fernandez --- cmd/traefik/traefik.go | 1 + pkg/provider/aggregator/aggregator.go | 10 ++++++---- pkg/provider/aggregator/aggregator_test.go | 5 ++--- pkg/server/configurationwatcher.go | 10 ++++++++-- pkg/server/configurationwatcher_test.go | 16 ++++++++-------- 5 files changed, 25 insertions(+), 17 deletions(-) diff --git a/cmd/traefik/traefik.go b/cmd/traefik/traefik.go index 6126a1a7c..9ad068e4f 100644 --- a/cmd/traefik/traefik.go +++ b/cmd/traefik/traefik.go @@ -256,6 +256,7 @@ func setupServer(staticConfiguration *static.Configuration) (*server.Server, err providerAggregator, time.Duration(staticConfiguration.Providers.ProvidersThrottleDuration), getDefaultsEntrypoints(staticConfiguration), + "internal", ) // TLS diff --git a/pkg/provider/aggregator/aggregator.go b/pkg/provider/aggregator/aggregator.go index d000fa8af..89bcae4d5 100644 --- a/pkg/provider/aggregator/aggregator.go +++ b/pkg/provider/aggregator/aggregator.go @@ -119,10 +119,6 @@ func (p ProviderAggregator) Init() error { // Provide calls the provide method of every providers. func (p ProviderAggregator) Provide(configurationChan chan<- dynamic.Message, pool *safe.Pool) error { - if p.internalProvider != nil { - launchProvider(configurationChan, pool, p.internalProvider) - } - if p.fileProvider != nil { launchProvider(configurationChan, pool, p.fileProvider) } @@ -134,6 +130,12 @@ func (p ProviderAggregator) Provide(configurationChan chan<- dynamic.Message, po }) } + // internal provider must be the last because we use it to know if all the providers are loaded. + // ConfigurationWatcher will wait for this requiredProvider before applying configurations. + if p.internalProvider != nil { + launchProvider(configurationChan, pool, p.internalProvider) + } + return nil } diff --git a/pkg/provider/aggregator/aggregator_test.go b/pkg/provider/aggregator/aggregator_test.go index a88a791ba..a6363e819 100644 --- a/pkg/provider/aggregator/aggregator_test.go +++ b/pkg/provider/aggregator/aggregator_test.go @@ -32,12 +32,11 @@ func TestProviderAggregator_Provide(t *testing.T) { errCh <- aggregator.Provide(cfgCh, pool) }() - // Make sure the internal provider is always called first, followed by the file provider. - requireReceivedMessageFromProviders(t, cfgCh, []string{"internal"}) + // Make sure the file provider is always called first. requireReceivedMessageFromProviders(t, cfgCh, []string{"file"}) // Check if all providers have been called, the order doesn't matter. - requireReceivedMessageFromProviders(t, cfgCh, []string{"salad", "tomato", "onion"}) + requireReceivedMessageFromProviders(t, cfgCh, []string{"salad", "tomato", "onion", "internal"}) require.NoError(t, <-errCh) } diff --git a/pkg/server/configurationwatcher.go b/pkg/server/configurationwatcher.go index 260dcfb07..1d71fe472 100644 --- a/pkg/server/configurationwatcher.go +++ b/pkg/server/configurationwatcher.go @@ -28,6 +28,7 @@ type ConfigurationWatcher struct { configurationValidatedChan chan dynamic.Message providerConfigUpdateMap map[string]chan dynamic.Message + requiredProvider string configurationListeners []func(dynamic.Configuration) routinesPool *safe.Pool @@ -39,6 +40,7 @@ func NewConfigurationWatcher( pvd provider.Provider, providersThrottleDuration time.Duration, defaultEntryPoints []string, + requiredProvider string, ) *ConfigurationWatcher { watcher := &ConfigurationWatcher{ provider: pvd, @@ -48,6 +50,7 @@ func NewConfigurationWatcher( providersThrottleDuration: providersThrottleDuration, routinesPool: routinesPool, defaultEntryPoints: defaultEntryPoints, + requiredProvider: requiredProvider, } currentConfigurations := make(dynamic.Configurations) @@ -146,8 +149,11 @@ func (c *ConfigurationWatcher) loadMessage(configMsg dynamic.Message) { conf := mergeConfiguration(newConfigurations, c.defaultEntryPoints) conf = applyModel(conf) - for _, listener := range c.configurationListeners { - listener(conf) + // We wait for first configuration of the require provider before applying configurations. + if _, ok := newConfigurations[c.requiredProvider]; c.requiredProvider == "" || ok { + for _, listener := range c.configurationListeners { + listener(conf) + } } } diff --git a/pkg/server/configurationwatcher_test.go b/pkg/server/configurationwatcher_test.go index 44c4454b1..5c31f46df 100644 --- a/pkg/server/configurationwatcher_test.go +++ b/pkg/server/configurationwatcher_test.go @@ -55,7 +55,7 @@ func TestNewConfigurationWatcher(t *testing.T) { }}, } - watcher := NewConfigurationWatcher(routinesPool, pvd, time.Second, []string{}) + watcher := NewConfigurationWatcher(routinesPool, pvd, time.Second, []string{}, "") run := make(chan struct{}) @@ -112,7 +112,7 @@ func TestListenProvidersThrottleProviderConfigReload(t *testing.T) { }) } - watcher := NewConfigurationWatcher(routinesPool, pvd, 30*time.Millisecond, []string{}) + watcher := NewConfigurationWatcher(routinesPool, pvd, 30*time.Millisecond, []string{}, "") publishedConfigCount := 0 watcher.AddListener(func(_ dynamic.Configuration) { @@ -136,7 +136,7 @@ func TestListenProvidersSkipsEmptyConfigs(t *testing.T) { messages: []dynamic.Message{{ProviderName: "mock"}}, } - watcher := NewConfigurationWatcher(routinesPool, pvd, time.Second, []string{}) + watcher := NewConfigurationWatcher(routinesPool, pvd, time.Second, []string{}, "") watcher.AddListener(func(_ dynamic.Configuration) { t.Error("An empty configuration was published but it should not") }) @@ -162,7 +162,7 @@ func TestListenProvidersSkipsSameConfigurationForProvider(t *testing.T) { messages: []dynamic.Message{message, message}, } - watcher := NewConfigurationWatcher(routinesPool, pvd, 0, []string{}) + watcher := NewConfigurationWatcher(routinesPool, pvd, 0, []string{}, "") alreadyCalled := false watcher.AddListener(func(_ dynamic.Configuration) { @@ -205,7 +205,7 @@ func TestListenProvidersDoesNotSkipFlappingConfiguration(t *testing.T) { }, } - watcher := NewConfigurationWatcher(routinesPool, pvd, 15*time.Millisecond, []string{"defaultEP"}) + watcher := NewConfigurationWatcher(routinesPool, pvd, 15*time.Millisecond, []string{"defaultEP"}, "") var lastConfig dynamic.Configuration watcher.AddListener(func(conf dynamic.Configuration) { @@ -260,7 +260,7 @@ func TestListenProvidersPublishesConfigForEachProvider(t *testing.T) { }, } - watcher := NewConfigurationWatcher(routinesPool, pvd, 0, []string{"defaultEP"}) + watcher := NewConfigurationWatcher(routinesPool, pvd, 0, []string{"defaultEP"}, "") var publishedProviderConfig dynamic.Configuration @@ -327,7 +327,7 @@ func TestPublishConfigUpdatedByProvider(t *testing.T) { }, } - watcher := NewConfigurationWatcher(routinesPool, pvd, 30*time.Millisecond, []string{}) + watcher := NewConfigurationWatcher(routinesPool, pvd, 30*time.Millisecond, []string{}, "") publishedConfigCount := 0 watcher.AddListener(func(configuration dynamic.Configuration) { @@ -375,7 +375,7 @@ func TestPublishConfigUpdatedByConfigWatcherListener(t *testing.T) { }, } - watcher := NewConfigurationWatcher(routinesPool, pvd, 30*time.Millisecond, []string{}) + watcher := NewConfigurationWatcher(routinesPool, pvd, 30*time.Millisecond, []string{}, "") publishedConfigCount := 0 watcher.AddListener(func(configuration dynamic.Configuration) {