diff --git a/middlewares/cbreaker.go b/middlewares/cbreaker.go index d5c97d8f7..4a1859330 100644 --- a/middlewares/cbreaker.go +++ b/middlewares/cbreaker.go @@ -12,9 +12,12 @@ type CircuitBreaker struct { } // NewCircuitBreaker returns a new CircuitBreaker. -func NewCircuitBreaker(next http.Handler, expression string, options ...cbreaker.CircuitBreakerOption) *CircuitBreaker { - circuitBreaker, _ := cbreaker.New(next, expression, options...) - return &CircuitBreaker{circuitBreaker} +func NewCircuitBreaker(next http.Handler, expression string, options ...cbreaker.CircuitBreakerOption) (*CircuitBreaker, error) { + circuitBreaker, err := cbreaker.New(next, expression, options...) + if err != nil { + return nil, err + } + return &CircuitBreaker{circuitBreaker}, nil } func (cb *CircuitBreaker) ServeHTTP(rw http.ResponseWriter, r *http.Request, next http.HandlerFunc) { diff --git a/provider/kubernetes.go b/provider/kubernetes.go index e46b165d4..d9fc1bf0e 100644 --- a/provider/kubernetes.go +++ b/provider/kubernetes.go @@ -10,6 +10,7 @@ import ( "io" "io/ioutil" "os" + "reflect" "strconv" "strings" "text/template" @@ -53,6 +54,7 @@ type Kubernetes struct { Endpoint string `description:"Kubernetes server endpoint"` DisablePassHostHeaders bool `description:"Kubernetes disable PassHost Headers"` Namespaces Namespaces `description:"Kubernetes namespaces"` + lastConfiguration safe.Safe } func (provider *Kubernetes) createClient() (k8s.Client, error) { @@ -124,9 +126,14 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage if err != nil { return err } - configurationChan <- types.ConfigMessage{ - ProviderName: "kubernetes", - Configuration: provider.loadConfig(*templateObjects), + if reflect.DeepEqual(provider.lastConfiguration.Get(), templateObjects) { + log.Debugf("Skipping event from kubernetes %+v", event) + } else { + provider.lastConfiguration.Set(templateObjects) + configurationChan <- types.ConfigMessage{ + ProviderName: "kubernetes", + Configuration: provider.loadConfig(*templateObjects), + } } } } @@ -146,9 +153,14 @@ func (provider *Kubernetes) Provide(configurationChan chan<- types.ConfigMessage if err != nil { return err } - configurationChan <- types.ConfigMessage{ - ProviderName: "kubernetes", - Configuration: provider.loadConfig(*templateObjects), + if reflect.DeepEqual(provider.lastConfiguration.Get(), templateObjects) { + log.Debugf("Skipping configuration from kubernetes %+v", templateObjects) + } else { + provider.lastConfiguration.Set(templateObjects) + configurationChan <- types.ConfigMessage{ + ProviderName: "kubernetes", + Configuration: provider.loadConfig(*templateObjects), + } } return nil diff --git a/rules.go b/rules.go index 229f58203..f0c03409d 100644 --- a/rules.go +++ b/rules.go @@ -106,6 +106,11 @@ func (r *Rules) Parse(expression string) (*mux.Route, error) { "Headers": r.headers, "HeadersRegexp": r.headersRegexp, } + + if len(expression) == 0 { + return nil, errors.New("Empty rule") + } + f := func(c rune) bool { return c == ':' } diff --git a/server.go b/server.go index 409f0b848..a8adaa288 100644 --- a/server.go +++ b/server.go @@ -137,28 +137,48 @@ func (server *Server) listenProviders(stop chan bool) { if !ok { return } + server.defaultConfigurationValues(configMsg.Configuration) + currentConfigurations := server.currentConfigurations.Get().(configs) jsonConf, _ := json.Marshal(configMsg.Configuration) log.Debugf("Configuration received from provider %s: %s", configMsg.ProviderName, string(jsonConf)) - lastConfigs.Set(configMsg.ProviderName, &configMsg) - lastReceivedConfigurationValue := lastReceivedConfiguration.Get().(time.Time) - if time.Now().After(lastReceivedConfigurationValue.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) { - log.Debugf("Last %s config received more than %s, OK", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration) - // last config received more than n s ago - server.configurationValidatedChan <- configMsg + if configMsg.Configuration == nil || configMsg.Configuration.Backends == nil && configMsg.Configuration.Frontends == nil { + log.Infof("Skipping empty Configuration for provider %s", configMsg.ProviderName) + } else if reflect.DeepEqual(currentConfigurations[configMsg.ProviderName], configMsg.Configuration) { + log.Infof("Skipping same configuration for provider %s", configMsg.ProviderName) } else { - log.Debugf("Last %s config received less than %s, waiting...", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration) - safe.Go(func() { - <-time.After(server.globalConfiguration.ProvidersThrottleDuration) - lastReceivedConfigurationValue := lastReceivedConfiguration.Get().(time.Time) - if time.Now().After(lastReceivedConfigurationValue.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) { - log.Debugf("Waited for %s config, OK", configMsg.ProviderName) - if lastConfig, ok := lastConfigs.Get(configMsg.ProviderName); ok { - server.configurationValidatedChan <- *lastConfig.(*types.ConfigMessage) + lastConfigs.Set(configMsg.ProviderName, &configMsg) + lastReceivedConfigurationValue := lastReceivedConfiguration.Get().(time.Time) + if time.Now().After(lastReceivedConfigurationValue.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) { + log.Debugf("Last %s config received more than %s, OK", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration) + // last config received more than n s ago + server.configurationValidatedChan <- configMsg + } else { + log.Debugf("Last %s config received less than %s, waiting...", configMsg.ProviderName, server.globalConfiguration.ProvidersThrottleDuration) + safe.Go(func() { + <-time.After(server.globalConfiguration.ProvidersThrottleDuration) + lastReceivedConfigurationValue := lastReceivedConfiguration.Get().(time.Time) + if time.Now().After(lastReceivedConfigurationValue.Add(time.Duration(server.globalConfiguration.ProvidersThrottleDuration))) { + log.Debugf("Waited for %s config, OK", configMsg.ProviderName) + if lastConfig, ok := lastConfigs.Get(configMsg.ProviderName); ok { + server.configurationValidatedChan <- *lastConfig.(*types.ConfigMessage) + } } - } - }) + }) + } + lastReceivedConfiguration.Set(time.Now()) } - lastReceivedConfiguration.Set(time.Now()) + } + } +} + +func (server *Server) defaultConfigurationValues(configuration *types.Configuration) { + if configuration == nil || configuration.Frontends == nil { + return + } + for _, frontend := range configuration.Frontends { + // default endpoints if not defined in frontends + if len(frontend.EntryPoints) == 0 { + frontend.EntryPoints = server.globalConfiguration.DefaultEntryPoints } } } @@ -173,28 +193,23 @@ func (server *Server) listenConfigurations(stop chan bool) { return } currentConfigurations := server.currentConfigurations.Get().(configs) - if configMsg.Configuration == nil { - log.Infof("Skipping empty Configuration for provider %s", configMsg.ProviderName) - } else if reflect.DeepEqual(currentConfigurations[configMsg.ProviderName], configMsg.Configuration) { - log.Infof("Skipping same configuration for provider %s", configMsg.ProviderName) - } else { - // Copy configurations to new map so we don't change current if LoadConfig fails - newConfigurations := make(configs) - for k, v := range currentConfigurations { - newConfigurations[k] = v - } - newConfigurations[configMsg.ProviderName] = configMsg.Configuration - newServerEntryPoints, err := server.loadConfig(newConfigurations, server.globalConfiguration) - if err == nil { - for newServerEntryPointName, newServerEntryPoint := range newServerEntryPoints { - server.serverEntryPoints[newServerEntryPointName].httpRouter.UpdateHandler(newServerEntryPoint.httpRouter.GetHandler()) - log.Infof("Server configuration reloaded on %s", server.serverEntryPoints[newServerEntryPointName].httpServer.Addr) - } - server.currentConfigurations.Set(newConfigurations) - } else { - log.Error("Error loading new configuration, aborted ", err) + // Copy configurations to new map so we don't change current if LoadConfig fails + newConfigurations := make(configs) + for k, v := range currentConfigurations { + newConfigurations[k] = v + } + newConfigurations[configMsg.ProviderName] = configMsg.Configuration + + newServerEntryPoints, err := server.loadConfig(newConfigurations, server.globalConfiguration) + if err == nil { + for newServerEntryPointName, newServerEntryPoint := range newServerEntryPoints { + server.serverEntryPoints[newServerEntryPointName].httpRouter.UpdateHandler(newServerEntryPoint.httpRouter.GetHandler()) + log.Infof("Server configuration reloaded on %s", server.serverEntryPoints[newServerEntryPointName].httpServer.Addr) } + server.currentConfigurations.Set(newConfigurations) + } else { + log.Error("Error loading new configuration, aborted ", err) } } } @@ -376,10 +391,6 @@ func (server *Server) loadConfig(configurations configs, globalConfiguration Glo log.Debugf("Creating frontend %s", frontendName) fwd, _ := forward.New(forward.Logger(oxyLogger), forward.PassHostHeader(frontend.PassHostHeader)) saveBackend := middlewares.NewSaveBackend(fwd) - // default endpoints if not defined in frontends - if len(frontend.EntryPoints) == 0 { - frontend.EntryPoints = globalConfiguration.DefaultEntryPoints - } if len(frontend.EntryPoints) == 0 { log.Errorf("No entrypoint defined for frontend %s, defaultEntryPoints:%s", frontendName, globalConfiguration.DefaultEntryPoints) log.Errorf("Skipping frontend %s...", frontendName) @@ -496,7 +507,13 @@ func (server *Server) loadConfig(configurations configs, globalConfiguration Glo var negroni = negroni.New() if configuration.Backends[frontend.Backend].CircuitBreaker != nil { log.Debugf("Creating circuit breaker %s", configuration.Backends[frontend.Backend].CircuitBreaker.Expression) - negroni.Use(middlewares.NewCircuitBreaker(lb, configuration.Backends[frontend.Backend].CircuitBreaker.Expression, cbreaker.Logger(oxyLogger))) + cbreaker, err := middlewares.NewCircuitBreaker(lb, configuration.Backends[frontend.Backend].CircuitBreaker.Expression, cbreaker.Logger(oxyLogger)) + if err != nil { + log.Errorf("Error creating circuit breaker: %v", err) + log.Errorf("Skipping frontend %s...", frontendName) + continue frontend + } + negroni.Use(cbreaker) } else { negroni.UseHandler(lb) } diff --git a/templates/consul_catalog.tmpl b/templates/consul_catalog.tmpl index 20d0a8f19..cac301bbd 100644 --- a/templates/consul_catalog.tmpl +++ b/templates/consul_catalog.tmpl @@ -1,7 +1,7 @@ [backends] {{range $index, $node := .Nodes}} {{if ne (getAttribute "enable" $node.Service.Tags "true") "false"}} - [backends.backend-{{getBackend $node}}.servers.{{getBackendName $node $index}}] + [backends."backend-{{getBackend $node}}".servers."{{getBackendName $node $index}}"] url = "{{getAttribute "protocol" $node.Service.Tags "http"}}://{{getBackendAddress $node}}:{{$node.Service.Port}}" {{$weight := getAttribute "backend.weight" $node.Service.Tags ""}} {{with $weight}} @@ -14,20 +14,20 @@ {{$service := .ServiceName}} {{$circuitBreaker := getAttribute "backend.circuitbreaker" .Attributes ""}} {{with $circuitBreaker}} - [backends.backend-{{$service}}.circuitbreaker] + [backends."backend-{{$service}}".circuitbreaker] expression = "{{$circuitBreaker}}" {{end}} {{$loadBalancer := getAttribute "backend.loadbalancer" .Attributes ""}} {{with $loadBalancer}} - [backends.backend-{{$service}}.loadbalancer] + [backends."backend-{{$service}}".loadbalancer] method = "{{$loadBalancer}}" {{end}} {{end}} [frontends] {{range .Services}} - [frontends.frontend-{{.ServiceName}}] + [frontends."frontend-{{.ServiceName}}"] backend = "backend-{{.ServiceName}}" passHostHeader = {{getAttribute "frontend.passHostHeader" .Attributes "true"}} priority = {{getAttribute "frontend.priority" .Attributes "0"}} @@ -37,6 +37,6 @@ "{{.}}", {{end}}] {{end}} - [frontends.frontend-{{.ServiceName}}.routes.route-host-{{.ServiceName}}] + [frontends."frontend-{{.ServiceName}}".routes."route-host-{{.ServiceName}}"] rule = "{{getFrontendRule .}}" {{end}}