Merge pull request #477 from containous/fix-spamming-events

Fix spamming events in listenProviders
This commit is contained in:
Vincent Demeester 2016-06-23 17:09:59 +02:00 committed by GitHub
commit 04ec757083
5 changed files with 93 additions and 56 deletions

View file

@ -12,9 +12,12 @@ type CircuitBreaker struct {
}
// NewCircuitBreaker returns a new CircuitBreaker.
func NewCircuitBreaker(next http.Handler, expression string, options ...cbreaker.CircuitBreakerOption) *CircuitBreaker {
circuitBreaker, _ := cbreaker.New(next, expression, options...)
return &CircuitBreaker{circuitBreaker}
func NewCircuitBreaker(next http.Handler, expression string, options ...cbreaker.CircuitBreakerOption) (*CircuitBreaker, error) {
circuitBreaker, err := cbreaker.New(next, expression, options...)
if err != nil {
return nil, err
}
return &CircuitBreaker{circuitBreaker}, nil
}
func (cb *CircuitBreaker) ServeHTTP(rw http.ResponseWriter, r *http.Request, next http.HandlerFunc) {

View file

@ -10,6 +10,7 @@ import (
"io"
"io/ioutil"
"os"
"reflect"
"strconv"
"strings"
"text/template"
@ -53,6 +54,7 @@ type Kubernetes struct {
Endpoint string `description:"Kubernetes server endpoint"`
DisablePassHostHeaders bool `description:"Kubernetes disable PassHost Headers"`
Namespaces Namespaces `description:"Kubernetes namespaces"`
lastConfiguration safe.Safe
}
func (provider *Kubernetes) createClient() (k8s.Client, error) {
@ -124,9 +126,14 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
if err != nil {
return err
}
configurationChan <- types.ConfigMessage{
ProviderName: "kubernetes",
Configuration: provider.loadConfig(*templateObjects),
if reflect.DeepEqual(provider.lastConfiguration.Get(), templateObjects) {
log.Debugf("Skipping event from kubernetes %+v", event)
} else {
provider.lastConfiguration.Set(templateObjects)
configurationChan <- types.ConfigMessage{
ProviderName: "kubernetes",
Configuration: provider.loadConfig(*templateObjects),
}
}
}
}
@ -146,9 +153,14 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage
if err != nil {
return err
}
configurationChan <- types.ConfigMessage{
ProviderName: "kubernetes",
Configuration: provider.loadConfig(*templateObjects),
if reflect.DeepEqual(provider.lastConfiguration.Get(), templateObjects) {
log.Debugf("Skipping configuration from kubernetes %+v", templateObjects)
} else {
provider.lastConfiguration.Set(templateObjects)
configurationChan <- types.ConfigMessage{
ProviderName: "kubernetes",
Configuration: provider.loadConfig(*templateObjects),
}
}
return nil

View file

@ -106,6 +106,11 @@ func (r *Rules) Parse(expression string) (*mux.Route, error) {
"Headers": r.headers,
"HeadersRegexp": r.headersRegexp,
}
if len(expression) == 0 {
return nil, errors.New("Empty rule")
}
f := func(c rune) bool {
return c == ':'
}

101
server.go
View file

@ -137,28 +137,48 @@ func (server *Server) listenProviders(stop chan bool) {
if !ok {
return
}
server.defaultConfigurationValues(configMsg.Configuration)
currentConfigurations := server.currentConfigurations.Get().(configs)
jsonConf, _ := json.Marshal(configMsg.Configuration)
log.Debugf("Configuration received from provider %s: %s", configMsg.ProviderName, string(jsonConf))
lastConfigs.Set(configMsg.ProviderName, &configMsg)
lastReceivedConfigurationValue := lastReceivedConfiguration.Get().(time.Time)
if time.Now().After(lastReceivedConfigurationValue.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) {
log.Debugf("Last %s config received more than %s, OK", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration)
// last config received more than n s ago
server.configurationValidatedChan <- configMsg
if configMsg.Configuration == nil || configMsg.Configuration.Backends == nil && configMsg.Configuration.Frontends == nil {
log.Infof("Skipping empty Configuration for provider %s", configMsg.ProviderName)
} else if reflect.DeepEqual(currentConfigurations[configMsg.ProviderName], configMsg.Configuration) {
log.Infof("Skipping same configuration for provider %s", configMsg.ProviderName)
} else {
log.Debugf("Last %s config received less than %s, waiting...", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration)
safe.Go(func() {
<-time.After(server.globalConfiguration.ProvidersThrottleDuration)
lastReceivedConfigurationValue := lastReceivedConfiguration.Get().(time.Time)
if time.Now().After(lastReceivedConfigurationValue.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) {
log.Debugf("Waited for %s config, OK", configMsg.ProviderName)
if lastConfig, ok := lastConfigs.Get(configMsg.ProviderName); ok {
server.configurationValidatedChan <- *lastConfig.(*types.ConfigMessage)
lastConfigs.Set(configMsg.ProviderName, &configMsg)
lastReceivedConfigurationValue := lastReceivedConfiguration.Get().(time.Time)
if time.Now().After(lastReceivedConfigurationValue.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) {
log.Debugf("Last %s config received more than %s, OK", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration)
// last config received more than n s ago
server.configurationValidatedChan <- configMsg
} else {
log.Debugf("Last %s config received less than %s, waiting...", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration)
safe.Go(func() {
<-time.After(server.globalConfiguration.ProvidersThrottleDuration)
lastReceivedConfigurationValue := lastReceivedConfiguration.Get().(time.Time)
if time.Now().After(lastReceivedConfigurationValue.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) {
log.Debugf("Waited for %s config, OK", configMsg.ProviderName)
if lastConfig, ok := lastConfigs.Get(configMsg.ProviderName); ok {
server.configurationValidatedChan <- *lastConfig.(*types.ConfigMessage)
}
}
}
})
})
}
lastReceivedConfiguration.Set(time.Now())
}
lastReceivedConfiguration.Set(time.Now())
}
}
}
func (server *Server) defaultConfigurationValues(configuration *types.Configuration) {
if configuration == nil || configuration.Frontends == nil {
return
}
for _, frontend := range configuration.Frontends {
// default endpoints if not defined in frontends
if len(frontend.EntryPoints) == 0 {
frontend.EntryPoints = server.globalConfiguration.DefaultEntryPoints
}
}
}
@ -173,28 +193,23 @@ func (server *Server) listenConfigurations(stop chan bool) {
return
}
currentConfigurations := server.currentConfigurations.Get().(configs)
if configMsg.Configuration == nil {
log.Infof("Skipping empty Configuration for provider %s", configMsg.ProviderName)
} else if reflect.DeepEqual(currentConfigurations[configMsg.ProviderName], configMsg.Configuration) {
log.Infof("Skipping same configuration for provider %s", configMsg.ProviderName)
} else {
// Copy configurations to new map so we don't change current if LoadConfig fails
newConfigurations := make(configs)
for k, v := range currentConfigurations {
newConfigurations[k] = v
}
newConfigurations[configMsg.ProviderName] = configMsg.Configuration
newServerEntryPoints, err := server.loadConfig(newConfigurations, server.globalConfiguration)
if err == nil {
for newServerEntryPointName, newServerEntryPoint := range newServerEntryPoints {
server.serverEntryPoints[newServerEntryPointName].httpRouter.UpdateHandler(newServerEntryPoint.httpRouter.GetHandler())
log.Infof("Server configuration reloaded on %s", server.serverEntryPoints[newServerEntryPointName].httpServer.Addr)
}
server.currentConfigurations.Set(newConfigurations)
} else {
log.Error("Error loading new configuration, aborted ", err)
// Copy configurations to new map so we don't change current if LoadConfig fails
newConfigurations := make(configs)
for k, v := range currentConfigurations {
newConfigurations[k] = v
}
newConfigurations[configMsg.ProviderName] = configMsg.Configuration
newServerEntryPoints, err := server.loadConfig(newConfigurations, server.globalConfiguration)
if err == nil {
for newServerEntryPointName, newServerEntryPoint := range newServerEntryPoints {
server.serverEntryPoints[newServerEntryPointName].httpRouter.UpdateHandler(newServerEntryPoint.httpRouter.GetHandler())
log.Infof("Server configuration reloaded on %s", server.serverEntryPoints[newServerEntryPointName].httpServer.Addr)
}
server.currentConfigurations.Set(newConfigurations)
} else {
log.Error("Error loading new configuration, aborted ", err)
}
}
}
@ -376,10 +391,6 @@ func (server *Server) loadConfig(configurations configs, globalConfiguration Glo
log.Debugf("Creating frontend %s", frontendName)
fwd, _ := forward.New(forward.Logger(oxyLogger), forward.PassHostHeader(frontend.PassHostHeader))
saveBackend := middlewares.NewSaveBackend(fwd)
// default endpoints if not defined in frontends
if len(frontend.EntryPoints) == 0 {
frontend.EntryPoints = globalConfiguration.DefaultEntryPoints
}
if len(frontend.EntryPoints) == 0 {
log.Errorf("No entrypoint defined for frontend %s, defaultEntryPoints:%s", frontendName, globalConfiguration.DefaultEntryPoints)
log.Errorf("Skipping frontend %s...", frontendName)
@ -496,7 +507,13 @@ func (server *Server) loadConfig(configurations configs, globalConfiguration Glo
var negroni = negroni.New()
if configuration.Backends[frontend.Backend].CircuitBreaker != nil {
log.Debugf("Creating circuit breaker %s", configuration.Backends[frontend.Backend].CircuitBreaker.Expression)
negroni.Use(middlewares.NewCircuitBreaker(lb, configuration.Backends[frontend.Backend].CircuitBreaker.Expression, cbreaker.Logger(oxyLogger)))
cbreaker, err := middlewares.NewCircuitBreaker(lb, configuration.Backends[frontend.Backend].CircuitBreaker.Expression, cbreaker.Logger(oxyLogger))
if err != nil {
log.Errorf("Error creating circuit breaker: %v", err)
log.Errorf("Skipping frontend %s...", frontendName)
continue frontend
}
negroni.Use(cbreaker)
} else {
negroni.UseHandler(lb)
}

View file

@ -1,7 +1,7 @@
[backends]
{{range $index, $node := .Nodes}}
{{if ne (getAttribute "enable" $node.Service.Tags "true") "false"}}
[backends.backend-{{getBackend $node}}.servers.{{getBackendName $node $index}}]
[backends."backend-{{getBackend $node}}".servers."{{getBackendName $node $index}}"]
url = "{{getAttribute "protocol" $node.Service.Tags "http"}}://{{getBackendAddress $node}}:{{$node.Service.Port}}"
{{$weight := getAttribute "backend.weight" $node.Service.Tags ""}}
{{with $weight}}
@ -14,20 +14,20 @@
{{$service := .ServiceName}}
{{$circuitBreaker := getAttribute "backend.circuitbreaker" .Attributes ""}}
{{with $circuitBreaker}}
[backends.backend-{{$service}}.circuitbreaker]
[backends."backend-{{$service}}".circuitbreaker]
expression = "{{$circuitBreaker}}"
{{end}}
{{$loadBalancer := getAttribute "backend.loadbalancer" .Attributes ""}}
{{with $loadBalancer}}
[backends.backend-{{$service}}.loadbalancer]
[backends."backend-{{$service}}".loadbalancer]
method = "{{$loadBalancer}}"
{{end}}
{{end}}
[frontends]
{{range .Services}}
[frontends.frontend-{{.ServiceName}}]
[frontends."frontend-{{.ServiceName}}"]
backend = "backend-{{.ServiceName}}"
passHostHeader = {{getAttribute "frontend.passHostHeader" .Attributes "true"}}
priority = {{getAttribute "frontend.priority" .Attributes "0"}}
@ -37,6 +37,6 @@
"{{.}}",
{{end}}]
{{end}}
[frontends.frontend-{{.ServiceName}}.routes.route-host-{{.ServiceName}}]
[frontends."frontend-{{.ServiceName}}".routes."route-host-{{.ServiceName}}"]
rule = "{{getFrontendRule .}}"
{{end}}