Refactor configuration reload/throttling
Co-authored-by: Mathieu Lonjaret <mathieu.lonjaret@gmail.com>
This commit is contained in:
parent
764bf59d4d
commit
5780dc2b15
15 changed files with 872 additions and 242 deletions
|
@ -4,9 +4,7 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/eapache/channels"
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/traefik/traefik/v2/pkg/config/dynamic"
|
||||
"github.com/traefik/traefik/v2/pkg/log"
|
||||
|
@ -17,17 +15,13 @@ import (
|
|||
|
||||
// ConfigurationWatcher watches configuration changes.
|
||||
type ConfigurationWatcher struct {
|
||||
provider provider.Provider
|
||||
providerAggregator provider.Provider
|
||||
|
||||
defaultEntryPoints []string
|
||||
|
||||
providersThrottleDuration time.Duration
|
||||
allProvidersConfigs chan dynamic.Message
|
||||
|
||||
currentConfigurations safe.Safe
|
||||
|
||||
configurationChan chan dynamic.Message
|
||||
configurationValidatedChan chan dynamic.Message
|
||||
providerConfigUpdateMap map[string]chan dynamic.Message
|
||||
newConfigs chan dynamic.Configurations
|
||||
|
||||
requiredProvider string
|
||||
configurationListeners []func(dynamic.Configuration)
|
||||
|
@ -39,38 +33,30 @@ type ConfigurationWatcher struct {
|
|||
func NewConfigurationWatcher(
|
||||
routinesPool *safe.Pool,
|
||||
pvd provider.Provider,
|
||||
providersThrottleDuration time.Duration,
|
||||
defaultEntryPoints []string,
|
||||
requiredProvider string,
|
||||
) *ConfigurationWatcher {
|
||||
watcher := &ConfigurationWatcher{
|
||||
provider: pvd,
|
||||
configurationChan: make(chan dynamic.Message, 100),
|
||||
configurationValidatedChan: make(chan dynamic.Message, 100),
|
||||
providerConfigUpdateMap: make(map[string]chan dynamic.Message),
|
||||
providersThrottleDuration: providersThrottleDuration,
|
||||
routinesPool: routinesPool,
|
||||
defaultEntryPoints: defaultEntryPoints,
|
||||
requiredProvider: requiredProvider,
|
||||
return &ConfigurationWatcher{
|
||||
providerAggregator: pvd,
|
||||
allProvidersConfigs: make(chan dynamic.Message, 100),
|
||||
newConfigs: make(chan dynamic.Configurations),
|
||||
routinesPool: routinesPool,
|
||||
defaultEntryPoints: defaultEntryPoints,
|
||||
requiredProvider: requiredProvider,
|
||||
}
|
||||
|
||||
currentConfigurations := make(dynamic.Configurations)
|
||||
watcher.currentConfigurations.Set(currentConfigurations)
|
||||
|
||||
return watcher
|
||||
}
|
||||
|
||||
// Start the configuration watcher.
|
||||
func (c *ConfigurationWatcher) Start() {
|
||||
c.routinesPool.GoCtx(c.listenProviders)
|
||||
c.routinesPool.GoCtx(c.listenConfigurations)
|
||||
c.startProvider()
|
||||
c.routinesPool.GoCtx(c.receiveConfigurations)
|
||||
c.routinesPool.GoCtx(c.applyConfigurations)
|
||||
c.startProviderAggregator()
|
||||
}
|
||||
|
||||
// Stop the configuration watcher.
|
||||
func (c *ConfigurationWatcher) Stop() {
|
||||
close(c.configurationChan)
|
||||
close(c.configurationValidatedChan)
|
||||
close(c.allProvidersConfigs)
|
||||
close(c.newConfigs)
|
||||
}
|
||||
|
||||
// AddListener adds a new listener function used when new configuration is provided.
|
||||
|
@ -81,180 +67,159 @@ func (c *ConfigurationWatcher) AddListener(listener func(dynamic.Configuration))
|
|||
c.configurationListeners = append(c.configurationListeners, listener)
|
||||
}
|
||||
|
||||
func (c *ConfigurationWatcher) startProvider() {
|
||||
func (c *ConfigurationWatcher) startProviderAggregator() {
|
||||
logger := log.WithoutContext()
|
||||
|
||||
logger.Infof("Starting provider %T", c.provider)
|
||||
|
||||
currentProvider := c.provider
|
||||
logger.Infof("Starting provider aggregator %T", c.providerAggregator)
|
||||
|
||||
safe.Go(func() {
|
||||
err := currentProvider.Provide(c.configurationChan, c.routinesPool)
|
||||
err := c.providerAggregator.Provide(c.allProvidersConfigs, c.routinesPool)
|
||||
if err != nil {
|
||||
logger.Errorf("Error starting provider %T: %s", currentProvider, err)
|
||||
logger.Errorf("Error starting provider aggregator %T: %s", c.providerAggregator, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// listenProviders receives configuration changes from the providers.
|
||||
// The configuration message then gets passed along a series of check
|
||||
// to finally end up in a throttler that sends it to listenConfigurations (through c. configurationValidatedChan).
|
||||
func (c *ConfigurationWatcher) listenProviders(ctx context.Context) {
|
||||
// receiveConfigurations receives configuration changes from the providers.
|
||||
// The configuration message then gets passed along a series of check, notably
|
||||
// to verify that, for a given provider, the configuration that was just received
|
||||
// is at least different from the previously received one.
|
||||
// The full set of configurations is then sent to the throttling goroutine,
|
||||
// (throttleAndApplyConfigurations) via a RingChannel, which ensures that we can
|
||||
// constantly send in a non-blocking way to the throttling goroutine the last
|
||||
// global state we are aware of.
|
||||
func (c *ConfigurationWatcher) receiveConfigurations(ctx context.Context) {
|
||||
newConfigurations := make(dynamic.Configurations)
|
||||
var output chan dynamic.Configurations
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case configMsg, ok := <-c.configurationChan:
|
||||
// DeepCopy is necessary because newConfigurations gets modified later by the consumer of c.newConfigs
|
||||
case output <- newConfigurations.DeepCopy():
|
||||
output = nil
|
||||
|
||||
default:
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case configMsg, ok := <-c.allProvidersConfigs:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
logger := log.WithoutContext().WithField(log.ProviderName, configMsg.ProviderName)
|
||||
|
||||
if configMsg.Configuration == nil {
|
||||
logger.Debug("Skipping nil configuration.")
|
||||
continue
|
||||
}
|
||||
|
||||
if isEmptyConfiguration(configMsg.Configuration) {
|
||||
logger.Debug("Skipping empty configuration.")
|
||||
continue
|
||||
}
|
||||
|
||||
logConfiguration(logger, configMsg)
|
||||
|
||||
if reflect.DeepEqual(newConfigurations[configMsg.ProviderName], configMsg.Configuration) {
|
||||
// no change, do nothing
|
||||
logger.Debug("Skipping unchanged configuration.")
|
||||
continue
|
||||
}
|
||||
|
||||
newConfigurations[configMsg.ProviderName] = configMsg.Configuration.DeepCopy()
|
||||
|
||||
output = c.newConfigs
|
||||
|
||||
// DeepCopy is necessary because newConfigurations gets modified later by the consumer of c.newConfigs
|
||||
case output <- newConfigurations.DeepCopy():
|
||||
output = nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// applyConfigurations blocks on a RingChannel that receives the new
|
||||
// set of configurations that is compiled and sent by receiveConfigurations as soon
|
||||
// as a provider change occurs. If the new set is different from the previous set
|
||||
// that had been applied, the new set is applied, and we sleep for a while before
|
||||
// listening on the channel again.
|
||||
func (c *ConfigurationWatcher) applyConfigurations(ctx context.Context) {
|
||||
var lastConfigurations dynamic.Configurations
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case newConfigs, ok := <-c.newConfigs:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
if configMsg.Configuration == nil {
|
||||
log.WithoutContext().WithField(log.ProviderName, configMsg.ProviderName).
|
||||
Debug("Received nil configuration from provider, skipping.")
|
||||
return
|
||||
// We wait for first configuration of the required provider before applying configurations.
|
||||
if _, ok := newConfigs[c.requiredProvider]; c.requiredProvider != "" && !ok {
|
||||
continue
|
||||
}
|
||||
|
||||
c.preLoadConfiguration(configMsg)
|
||||
if reflect.DeepEqual(newConfigs, lastConfigurations) {
|
||||
continue
|
||||
}
|
||||
|
||||
conf := mergeConfiguration(newConfigs.DeepCopy(), c.defaultEntryPoints)
|
||||
conf = applyModel(conf)
|
||||
|
||||
for _, listener := range c.configurationListeners {
|
||||
listener(conf)
|
||||
}
|
||||
|
||||
lastConfigurations = newConfigs
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ConfigurationWatcher) listenConfigurations(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case configMsg, ok := <-c.configurationValidatedChan:
|
||||
if !ok || configMsg.Configuration == nil {
|
||||
return
|
||||
}
|
||||
c.loadMessage(configMsg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ConfigurationWatcher) loadMessage(configMsg dynamic.Message) {
|
||||
currentConfigurations := c.currentConfigurations.Get().(dynamic.Configurations)
|
||||
|
||||
// Copy configurations to new map so we don't change current if LoadConfig fails
|
||||
newConfigurations := currentConfigurations.DeepCopy()
|
||||
newConfigurations[configMsg.ProviderName] = configMsg.Configuration
|
||||
|
||||
c.currentConfigurations.Set(newConfigurations)
|
||||
|
||||
conf := mergeConfiguration(newConfigurations, c.defaultEntryPoints)
|
||||
conf = applyModel(conf)
|
||||
|
||||
// We wait for first configuration of the require provider before applying configurations.
|
||||
if _, ok := newConfigurations[c.requiredProvider]; c.requiredProvider == "" || ok {
|
||||
for _, listener := range c.configurationListeners {
|
||||
listener(conf)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *ConfigurationWatcher) preLoadConfiguration(configMsg dynamic.Message) {
|
||||
logger := log.WithoutContext().WithField(log.ProviderName, configMsg.ProviderName)
|
||||
if log.GetLevel() == logrus.DebugLevel {
|
||||
copyConf := configMsg.Configuration.DeepCopy()
|
||||
if copyConf.TLS != nil {
|
||||
copyConf.TLS.Certificates = nil
|
||||
|
||||
if copyConf.TLS.Options != nil {
|
||||
cleanedOptions := make(map[string]tls.Options, len(copyConf.TLS.Options))
|
||||
for name, option := range copyConf.TLS.Options {
|
||||
option.ClientAuth.CAFiles = []tls.FileOrContent{}
|
||||
cleanedOptions[name] = option
|
||||
}
|
||||
|
||||
copyConf.TLS.Options = cleanedOptions
|
||||
}
|
||||
|
||||
for k := range copyConf.TLS.Stores {
|
||||
st := copyConf.TLS.Stores[k]
|
||||
st.DefaultCertificate = nil
|
||||
copyConf.TLS.Stores[k] = st
|
||||
}
|
||||
}
|
||||
|
||||
if copyConf.HTTP != nil {
|
||||
for _, transport := range copyConf.HTTP.ServersTransports {
|
||||
transport.Certificates = tls.Certificates{}
|
||||
transport.RootCAs = []tls.FileOrContent{}
|
||||
}
|
||||
}
|
||||
|
||||
jsonConf, err := json.Marshal(copyConf)
|
||||
if err != nil {
|
||||
logger.Errorf("Could not marshal dynamic configuration: %v", err)
|
||||
logger.Debugf("Configuration received from provider %s: [struct] %#v", configMsg.ProviderName, copyConf)
|
||||
} else {
|
||||
logger.Debugf("Configuration received from provider %s: %s", configMsg.ProviderName, string(jsonConf))
|
||||
}
|
||||
}
|
||||
|
||||
if isEmptyConfiguration(configMsg.Configuration) {
|
||||
logger.Infof("Skipping empty Configuration for provider %s", configMsg.ProviderName)
|
||||
func logConfiguration(logger log.Logger, configMsg dynamic.Message) {
|
||||
if log.GetLevel() != logrus.DebugLevel {
|
||||
return
|
||||
}
|
||||
|
||||
providerConfigUpdateCh, ok := c.providerConfigUpdateMap[configMsg.ProviderName]
|
||||
if !ok {
|
||||
providerConfigUpdateCh = make(chan dynamic.Message)
|
||||
c.providerConfigUpdateMap[configMsg.ProviderName] = providerConfigUpdateCh
|
||||
c.routinesPool.GoCtx(func(ctxPool context.Context) {
|
||||
c.throttleProviderConfigReload(ctxPool, c.providersThrottleDuration, c.configurationValidatedChan, providerConfigUpdateCh)
|
||||
})
|
||||
copyConf := configMsg.Configuration.DeepCopy()
|
||||
if copyConf.TLS != nil {
|
||||
copyConf.TLS.Certificates = nil
|
||||
|
||||
if copyConf.TLS.Options != nil {
|
||||
cleanedOptions := make(map[string]tls.Options, len(copyConf.TLS.Options))
|
||||
for name, option := range copyConf.TLS.Options {
|
||||
option.ClientAuth.CAFiles = []tls.FileOrContent{}
|
||||
cleanedOptions[name] = option
|
||||
}
|
||||
|
||||
copyConf.TLS.Options = cleanedOptions
|
||||
}
|
||||
|
||||
for k := range copyConf.TLS.Stores {
|
||||
st := copyConf.TLS.Stores[k]
|
||||
st.DefaultCertificate = nil
|
||||
copyConf.TLS.Stores[k] = st
|
||||
}
|
||||
}
|
||||
|
||||
providerConfigUpdateCh <- configMsg
|
||||
}
|
||||
|
||||
// throttleProviderConfigReload throttles the configuration reload speed for a single provider.
|
||||
// It will immediately publish a new configuration and then only publish the next configuration after the throttle duration.
|
||||
// Note that in the case it receives N new configs in the timeframe of the throttle duration after publishing,
|
||||
// it will publish the last of the newly received configurations.
|
||||
func (c *ConfigurationWatcher) throttleProviderConfigReload(ctx context.Context, throttle time.Duration, publish chan<- dynamic.Message, in <-chan dynamic.Message) {
|
||||
ring := channels.NewRingChannel(1)
|
||||
defer ring.Close()
|
||||
|
||||
c.routinesPool.GoCtx(func(ctxPool context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctxPool.Done():
|
||||
return
|
||||
case nextConfig := <-ring.Out():
|
||||
if config, ok := nextConfig.(dynamic.Message); ok {
|
||||
publish <- config
|
||||
time.Sleep(throttle)
|
||||
}
|
||||
}
|
||||
if copyConf.HTTP != nil {
|
||||
for _, transport := range copyConf.HTTP.ServersTransports {
|
||||
transport.Certificates = tls.Certificates{}
|
||||
transport.RootCAs = []tls.FileOrContent{}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
var previousConfig dynamic.Message
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case nextConfig := <-in:
|
||||
if reflect.DeepEqual(previousConfig, nextConfig) {
|
||||
logger := log.WithoutContext().WithField(log.ProviderName, nextConfig.ProviderName)
|
||||
logger.Debug("Skipping same configuration")
|
||||
continue
|
||||
}
|
||||
previousConfig = *nextConfig.DeepCopy()
|
||||
ring.In() <- *nextConfig.DeepCopy()
|
||||
}
|
||||
jsonConf, err := json.Marshal(copyConf)
|
||||
if err != nil {
|
||||
logger.Errorf("Could not marshal dynamic configuration: %v", err)
|
||||
logger.Debugf("Configuration received: [struct] %#v", copyConf)
|
||||
} else {
|
||||
logger.Debugf("Configuration received: %s", string(jsonConf))
|
||||
}
|
||||
}
|
||||
|
||||
func isEmptyConfiguration(conf *dynamic.Configuration) bool {
|
||||
if conf == nil {
|
||||
return true
|
||||
}
|
||||
|
||||
if conf.TCP == nil {
|
||||
conf.TCP = &dynamic.TCPConfiguration{}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue