Merge branch 'v1.5' into master

This commit is contained in:
Fernandez Ludovic 2018-03-21 22:43:15 +01:00
commit 007a1fc7f2
8 changed files with 26 additions and 16 deletions

View file

@ -3,6 +3,7 @@ package consulcatalog
import (
"errors"
"strings"
"sync"
"text/template"
"time"
@ -138,8 +139,15 @@ func (p *Provider) watch(configurationChan chan<- types.ConfigMessage, stop chan
watchCh := make(chan map[string][]string)
errorCh := make(chan error)
p.watchHealthState(stopCh, watchCh, errorCh)
p.watchCatalogServices(stopCh, watchCh, errorCh)
var errorOnce sync.Once
notifyError := func(err error) {
errorOnce.Do(func() {
errorCh <- err
})
}
p.watchHealthState(stopCh, watchCh, notifyError)
p.watchCatalogServices(stopCh, watchCh, notifyError)
defer close(stopCh)
defer close(watchCh)
@ -155,7 +163,7 @@ func (p *Provider) watch(configurationChan chan<- types.ConfigMessage, stop chan
log.Debug("List of services changed")
nodes, err := p.getNodes(index)
if err != nil {
return err
notifyError(err)
}
configuration := p.buildConfiguration(nodes)
configurationChan <- types.ConfigMessage{
@ -168,7 +176,7 @@ func (p *Provider) watch(configurationChan chan<- types.ConfigMessage, stop chan
}
}
func (p *Provider) watchCatalogServices(stopCh <-chan struct{}, watchCh chan<- map[string][]string, errorCh chan<- error) {
func (p *Provider) watchCatalogServices(stopCh <-chan struct{}, watchCh chan<- map[string][]string, notifyError func(error)) {
catalog := p.client.Catalog()
safe.Go(func() {
@ -187,7 +195,7 @@ func (p *Provider) watchCatalogServices(stopCh <-chan struct{}, watchCh chan<- m
data, meta, err := catalog.Services(options)
if err != nil {
log.Errorf("Failed to list services: %v", err)
errorCh <- err
notifyError(err)
return
}
@ -203,7 +211,7 @@ func (p *Provider) watchCatalogServices(stopCh <-chan struct{}, watchCh chan<- m
nodes, _, err := catalog.Service(key, "", &api.QueryOptions{})
if err != nil {
log.Errorf("Failed to get detail of service %s: %v", key, err)
errorCh <- err
notifyError(err)
return
}
@ -239,7 +247,7 @@ func (p *Provider) watchCatalogServices(stopCh <-chan struct{}, watchCh chan<- m
})
}
func (p *Provider) watchHealthState(stopCh <-chan struct{}, watchCh chan<- map[string][]string, errorCh chan<- error) {
func (p *Provider) watchHealthState(stopCh <-chan struct{}, watchCh chan<- map[string][]string, notifyError func(error)) {
health := p.client.Health()
catalog := p.client.Catalog()
@ -260,7 +268,7 @@ func (p *Provider) watchHealthState(stopCh <-chan struct{}, watchCh chan<- map[s
healthyState, meta, err := health.State("passing", options)
if err != nil {
log.WithError(err).Error("Failed to retrieve health checks")
errorCh <- err
notifyError(err)
return
}
@ -284,7 +292,7 @@ func (p *Provider) watchHealthState(stopCh <-chan struct{}, watchCh chan<- map[s
data, _, err := catalog.Services(&api.QueryOptions{})
if err != nil {
log.Errorf("Failed to list services: %v", err)
errorCh <- err
notifyError(err)
return
}