1
0
Fork 0

Speeding up health change detection by separating it from catalog services check.

This commit is contained in:
vholovko 2017-06-18 12:38:35 +03:00 committed by Ludovic Fernandez
parent 9cb07d026f
commit db1baf80a9
2 changed files with 240 additions and 23 deletions

View file

@ -76,18 +76,25 @@ func (a nodeSorter) Less(i int, j int) bool {
return lentr.Service.Port < rentr.Service.Port
}
func (p *CatalogProvider) watchServices(stopCh <-chan struct{}) <-chan map[string][]string {
watchCh := make(chan map[string][]string)
func getChangedKeys(currState map[string][]string, prevState map[string][]string) ([]string, []string) {
currKeySet := fun.Set(fun.Keys(currState).([]string)).(map[string]bool)
prevKeySet := fun.Set(fun.Keys(prevState).([]string)).(map[string]bool)
catalog := p.client.Catalog()
addedKeys := fun.Difference(currKeySet, prevKeySet).(map[string]bool)
removedKeys := fun.Difference(prevKeySet, currKeySet).(map[string]bool)
return fun.Keys(addedKeys).([]string), fun.Keys(removedKeys).([]string)
}
func (p *CatalogProvider) watchHealthState(stopCh <-chan struct{}, watchCh chan<- map[string][]string) {
health := p.client.Health()
var lastHealthIndex uint64
catalog := p.client.Catalog()
safe.Go(func() {
defer close(watchCh)
// variable to hold previous state
var flashback map[string][]string
catalogOptions := &api.QueryOptions{WaitTime: DefaultWatchWaitTime}
healthOptions := &api.QueryOptions{}
options := &api.QueryOptions{WaitTime: DefaultWatchWaitTime}
for {
select {
@ -96,16 +103,10 @@ func (p *CatalogProvider) watchServices(stopCh <-chan struct{}) <-chan map[strin
default:
}
data, catalogMeta, err := catalog.Services(catalogOptions)
if err != nil {
log.WithError(err).Error("Failed to list services")
return
}
// Listening to changes that leads to `passing` state or degrades from it.
// The call is used just as a trigger for further actions
// (intentionally there is no interest in the received data).
_, healthMeta, err := health.State("passing", healthOptions)
_, meta, err := health.State("passing", options)
if err != nil {
log.WithError(err).Error("Failed to retrieve health checks")
return
@ -113,21 +114,89 @@ func (p *CatalogProvider) watchServices(stopCh <-chan struct{}) <-chan map[strin
// If LastIndex didn't change then it means `Get` returned
// because of the WaitTime and the key didn't changed.
sameServiceAmount := catalogOptions.WaitIndex == catalogMeta.LastIndex
sameServiceHealth := lastHealthIndex == healthMeta.LastIndex
if sameServiceAmount && sameServiceHealth {
if options.WaitIndex == meta.LastIndex {
continue
}
catalogOptions.WaitIndex = catalogMeta.LastIndex
lastHealthIndex = healthMeta.LastIndex
options.WaitIndex = meta.LastIndex
// The response should be unified with watchCatalogServices
data, _, err := catalog.Services(&api.QueryOptions{})
if err != nil {
log.Errorf("Failed to list services: %s", err)
return
}
if data != nil {
watchCh <- data
// A critical note is that the return of a blocking request is no guarantee of a change.
// It is possible that there was an idempotent write that does not affect the result of the query.
// Thus it is required to do extra check for changes...
addedKeys, removedKeys := getChangedKeys(data, flashback)
if len(addedKeys) > 0 {
log.WithField("DiscoveredServices", addedKeys).Debug("Health State change detected.")
watchCh <- data
flashback = data
}
if len(removedKeys) > 0 {
log.WithField("MissingServices", removedKeys).Debug("Health State change detected.")
watchCh <- data
flashback = data
}
}
}
})
}
return watchCh
func (p *CatalogProvider) watchCatalogServices(stopCh <-chan struct{}, watchCh chan<- map[string][]string) {
catalog := p.client.Catalog()
safe.Go(func() {
// variable to hold previous state
var flashback map[string][]string
options := &api.QueryOptions{WaitTime: DefaultWatchWaitTime}
for {
select {
case <-stopCh:
return
default:
}
data, meta, err := catalog.Services(options)
if err != nil {
log.Errorf("Failed to list services: %s", err)
return
}
if options.WaitIndex == meta.LastIndex {
continue
}
options.WaitIndex = meta.LastIndex
if data != nil {
// A critical note is that the return of a blocking request is no guarantee of a change.
// It is possible that there was an idempotent write that does not affect the result of the query.
// Thus it is required to do extra check for changes...
addedKeys, removedKeys := getChangedKeys(data, flashback)
if len(addedKeys) > 0 {
log.WithField("DiscoveredServices", addedKeys).Debug("Catalog Services change detected.")
watchCh <- data
flashback = data
}
if len(removedKeys) > 0 {
log.WithField("MissingServices", removedKeys).Debug("Catalog Services change detected.")
watchCh <- data
flashback = data
}
}
}
})
}
func (p *CatalogProvider) healthyNodes(service string) (catalogUpdate, error) {
@ -357,15 +426,19 @@ func (p *CatalogProvider) getNodes(index map[string][]string) ([]catalogUpdate,
func (p *CatalogProvider) watch(configurationChan chan<- types.ConfigMessage, stop chan bool) error {
stopCh := make(chan struct{})
serviceCatalog := p.watchServices(stopCh)
watchCh := make(chan map[string][]string)
p.watchHealthState(stopCh, watchCh)
p.watchCatalogServices(stopCh, watchCh)
defer close(stopCh)
defer close(watchCh)
for {
select {
case <-stop:
return nil
case index, ok := <-serviceCatalog:
case index, ok := <-watchCh:
if !ok {
return errors.New("Consul service list nil")
}