1
0
Fork 0

Fix consul catalog refresh problems

This commit is contained in:
SALLEYRON Julien 2017-09-08 20:50:04 +02:00 committed by Traefiker
parent ecf31097ea
commit f80a6ef2a6
5 changed files with 259 additions and 111 deletions

View file

@ -77,7 +77,7 @@ func (a nodeSorter) Less(i int, j int) bool {
return lentr.Service.Port < rentr.Service.Port
}
func getChangedKeys(currState map[string][]string, prevState map[string][]string) ([]string, []string) {
func getChangedServiceKeys(currState map[string]Service, prevState map[string]Service) ([]string, []string) {
currKeySet := fun.Set(fun.Keys(currState).([]string)).(map[string]bool)
prevKeySet := fun.Set(fun.Keys(prevState).([]string)).(map[string]bool)
@ -87,13 +87,36 @@ func getChangedKeys(currState map[string][]string, prevState map[string][]string
return fun.Keys(addedKeys).([]string), fun.Keys(removedKeys).([]string)
}
func getChangedServiceNodeKeys(currState map[string]Service, prevState map[string]Service) ([]string, []string) {
var addedNodeKeys []string
var removedNodeKeys []string
for key, value := range currState {
if prevValue, ok := prevState[key]; ok {
addedKeys, removedKeys := getChangedHealthyKeys(value.Nodes, prevValue.Nodes)
addedNodeKeys = append(addedKeys)
removedNodeKeys = append(removedKeys)
}
}
return addedNodeKeys, removedNodeKeys
}
func getChangedHealthyKeys(currState []string, prevState []string) ([]string, []string) {
currKeySet := fun.Set(currState).(map[string]bool)
prevKeySet := fun.Set(prevState).(map[string]bool)
addedKeys := fun.Difference(currKeySet, prevKeySet).(map[string]bool)
removedKeys := fun.Difference(prevKeySet, currKeySet).(map[string]bool)
return fun.Keys(addedKeys).([]string), fun.Keys(removedKeys).([]string)
}
func (p *CatalogProvider) watchHealthState(stopCh <-chan struct{}, watchCh chan<- map[string][]string) {
health := p.client.Health()
catalog := p.client.Catalog()
safe.Go(func() {
// variable to hold previous state
var flashback map[string][]string
var flashback []string
options := &api.QueryOptions{WaitTime: DefaultWatchWaitTime}
@ -105,14 +128,20 @@ func (p *CatalogProvider) watchHealthState(stopCh <-chan struct{}, watchCh chan<
}
// Listening to changes that leads to `passing` state or degrades from it.
// The call is used just as a trigger for further actions
// (intentionally there is no interest in the received data).
_, meta, err := health.State("passing", options)
healthyState, meta, err := health.State("passing", options)
if err != nil {
log.WithError(err).Error("Failed to retrieve health checks")
return
}
var current []string
if healthyState != nil {
for _, healthy := range healthyState {
current = append(current, healthy.ServiceID)
}
}
// If LastIndex didn't change then it means `Get` returned
// because of the WaitTime and the key didn't changed.
if options.WaitIndex == meta.LastIndex {
@ -132,30 +161,38 @@ func (p *CatalogProvider) watchHealthState(stopCh <-chan struct{}, watchCh chan<
// A critical note is that the return of a blocking request is no guarantee of a change.
// It is possible that there was an idempotent write that does not affect the result of the query.
// Thus it is required to do extra check for changes...
addedKeys, removedKeys := getChangedKeys(data, flashback)
addedKeys, removedKeys := getChangedHealthyKeys(current, flashback)
if len(addedKeys) > 0 {
log.WithField("DiscoveredServices", addedKeys).Debug("Health State change detected.")
watchCh <- data
flashback = data
flashback = current
}
if len(removedKeys) > 0 {
log.WithField("MissingServices", removedKeys).Debug("Health State change detected.")
watchCh <- data
flashback = data
flashback = current
}
}
}
})
}
// Service represent a Consul service.
type Service struct {
Name string
Tags []string
Nodes []string
}
func (p *CatalogProvider) watchCatalogServices(stopCh <-chan struct{}, watchCh chan<- map[string][]string) {
catalog := p.client.Catalog()
safe.Go(func() {
current := make(map[string]Service)
// variable to hold previous state
var flashback map[string][]string
var flashback map[string]Service
options := &api.QueryOptions{WaitTime: DefaultWatchWaitTime}
@ -179,26 +216,55 @@ func (p *CatalogProvider) watchCatalogServices(stopCh <-chan struct{}, watchCh c
options.WaitIndex = meta.LastIndex
if data != nil {
for key, value := range data {
nodes, _, err := catalog.Service(key, "", options)
if err != nil {
log.Errorf("Failed to get detail of service %s: %s", key, err)
return
}
nodesID := getServiceIds(nodes)
if service, ok := current[key]; ok {
service.Tags = value
service.Nodes = nodesID
} else {
service := Service{
Name: key,
Tags: value,
Nodes: nodesID,
}
current[key] = service
}
}
// A critical note is that the return of a blocking request is no guarantee of a change.
// It is possible that there was an idempotent write that does not affect the result of the query.
// Thus it is required to do extra check for changes...
addedKeys, removedKeys := getChangedKeys(data, flashback)
addedServiceKeys, removedServiceKeys := getChangedServiceKeys(current, flashback)
if len(addedKeys) > 0 {
log.WithField("DiscoveredServices", addedKeys).Debug("Catalog Services change detected.")
addedServiceNodeKeys, removedServiceNodeKeys := getChangedServiceNodeKeys(current, flashback)
if len(addedServiceKeys) > 0 || len(addedServiceNodeKeys) > 0 {
log.WithField("DiscoveredServices", addedServiceKeys).Debug("Catalog Services change detected.")
watchCh <- data
flashback = data
flashback = current
}
if len(removedKeys) > 0 {
log.WithField("MissingServices", removedKeys).Debug("Catalog Services change detected.")
if len(removedServiceKeys) > 0 || len(removedServiceNodeKeys) > 0 {
log.WithField("MissingServices", removedServiceKeys).Debug("Catalog Services change detected.")
watchCh <- data
flashback = data
flashback = current
}
}
}
})
}
func getServiceIds(services []*api.CatalogService) []string {
var serviceIds []string
for _, service := range services {
serviceIds = append(serviceIds, service.ServiceID)
}
return serviceIds
}
func (p *CatalogProvider) healthyNodes(service string) (catalogUpdate, error) {
health := p.client.Health()

View file

@ -613,8 +613,8 @@ func TestConsulCatalogNodeSorter(t *testing.T) {
func TestConsulCatalogGetChangedKeys(t *testing.T) {
type Input struct {
currState map[string][]string
prevState map[string][]string
currState map[string]Service
prevState map[string]Service
}
type Output struct {
@ -628,37 +628,37 @@ func TestConsulCatalogGetChangedKeys(t *testing.T) {
}{
{
input: Input{
currState: map[string][]string{
"foo-service": {"v1"},
"bar-service": {"v1"},
"baz-service": {"v1"},
"qux-service": {"v1"},
"quux-service": {"v1"},
"quuz-service": {"v1"},
"corge-service": {"v1"},
"grault-service": {"v1"},
"garply-service": {"v1"},
"waldo-service": {"v1"},
"fred-service": {"v1"},
"plugh-service": {"v1"},
"xyzzy-service": {"v1"},
"thud-service": {"v1"},
currState: map[string]Service{
"foo-service": {Name: "v1"},
"bar-service": {Name: "v1"},
"baz-service": {Name: "v1"},
"qux-service": {Name: "v1"},
"quux-service": {Name: "v1"},
"quuz-service": {Name: "v1"},
"corge-service": {Name: "v1"},
"grault-service": {Name: "v1"},
"garply-service": {Name: "v1"},
"waldo-service": {Name: "v1"},
"fred-service": {Name: "v1"},
"plugh-service": {Name: "v1"},
"xyzzy-service": {Name: "v1"},
"thud-service": {Name: "v1"},
},
prevState: map[string][]string{
"foo-service": {"v1"},
"bar-service": {"v1"},
"baz-service": {"v1"},
"qux-service": {"v1"},
"quux-service": {"v1"},
"quuz-service": {"v1"},
"corge-service": {"v1"},
"grault-service": {"v1"},
"garply-service": {"v1"},
"waldo-service": {"v1"},
"fred-service": {"v1"},
"plugh-service": {"v1"},
"xyzzy-service": {"v1"},
"thud-service": {"v1"},
prevState: map[string]Service{
"foo-service": {Name: "v1"},
"bar-service": {Name: "v1"},
"baz-service": {Name: "v1"},
"qux-service": {Name: "v1"},
"quux-service": {Name: "v1"},
"quuz-service": {Name: "v1"},
"corge-service": {Name: "v1"},
"grault-service": {Name: "v1"},
"garply-service": {Name: "v1"},
"waldo-service": {Name: "v1"},
"fred-service": {Name: "v1"},
"plugh-service": {Name: "v1"},
"xyzzy-service": {Name: "v1"},
"thud-service": {Name: "v1"},
},
},
output: Output{
@ -668,34 +668,34 @@ func TestConsulCatalogGetChangedKeys(t *testing.T) {
},
{
input: Input{
currState: map[string][]string{
"foo-service": {"v1"},
"bar-service": {"v1"},
"baz-service": {"v1"},
"qux-service": {"v1"},
"quux-service": {"v1"},
"quuz-service": {"v1"},
"corge-service": {"v1"},
"grault-service": {"v1"},
"garply-service": {"v1"},
"waldo-service": {"v1"},
"fred-service": {"v1"},
"plugh-service": {"v1"},
"xyzzy-service": {"v1"},
"thud-service": {"v1"},
currState: map[string]Service{
"foo-service": {Name: "v1"},
"bar-service": {Name: "v1"},
"baz-service": {Name: "v1"},
"qux-service": {Name: "v1"},
"quux-service": {Name: "v1"},
"quuz-service": {Name: "v1"},
"corge-service": {Name: "v1"},
"grault-service": {Name: "v1"},
"garply-service": {Name: "v1"},
"waldo-service": {Name: "v1"},
"fred-service": {Name: "v1"},
"plugh-service": {Name: "v1"},
"xyzzy-service": {Name: "v1"},
"thud-service": {Name: "v1"},
},
prevState: map[string][]string{
"foo-service": {"v1"},
"bar-service": {"v1"},
"baz-service": {"v1"},
"corge-service": {"v1"},
"grault-service": {"v1"},
"garply-service": {"v1"},
"waldo-service": {"v1"},
"fred-service": {"v1"},
"plugh-service": {"v1"},
"xyzzy-service": {"v1"},
"thud-service": {"v1"},
prevState: map[string]Service{
"foo-service": {Name: "v1"},
"bar-service": {Name: "v1"},
"baz-service": {Name: "v1"},
"corge-service": {Name: "v1"},
"grault-service": {Name: "v1"},
"garply-service": {Name: "v1"},
"waldo-service": {Name: "v1"},
"fred-service": {Name: "v1"},
"plugh-service": {Name: "v1"},
"xyzzy-service": {Name: "v1"},
"thud-service": {Name: "v1"},
},
},
output: Output{
@ -705,33 +705,33 @@ func TestConsulCatalogGetChangedKeys(t *testing.T) {
},
{
input: Input{
currState: map[string][]string{
"foo-service": {"v1"},
"qux-service": {"v1"},
"quux-service": {"v1"},
"quuz-service": {"v1"},
"corge-service": {"v1"},
"grault-service": {"v1"},
"garply-service": {"v1"},
"waldo-service": {"v1"},
"fred-service": {"v1"},
"plugh-service": {"v1"},
"xyzzy-service": {"v1"},
"thud-service": {"v1"},
currState: map[string]Service{
"foo-service": {Name: "v1"},
"qux-service": {Name: "v1"},
"quux-service": {Name: "v1"},
"quuz-service": {Name: "v1"},
"corge-service": {Name: "v1"},
"grault-service": {Name: "v1"},
"garply-service": {Name: "v1"},
"waldo-service": {Name: "v1"},
"fred-service": {Name: "v1"},
"plugh-service": {Name: "v1"},
"xyzzy-service": {Name: "v1"},
"thud-service": {Name: "v1"},
},
prevState: map[string][]string{
"foo-service": {"v1"},
"bar-service": {"v1"},
"baz-service": {"v1"},
"qux-service": {"v1"},
"quux-service": {"v1"},
"quuz-service": {"v1"},
"corge-service": {"v1"},
"waldo-service": {"v1"},
"fred-service": {"v1"},
"plugh-service": {"v1"},
"xyzzy-service": {"v1"},
"thud-service": {"v1"},
prevState: map[string]Service{
"foo-service": {Name: "v1"},
"bar-service": {Name: "v1"},
"baz-service": {Name: "v1"},
"qux-service": {Name: "v1"},
"quux-service": {Name: "v1"},
"quuz-service": {Name: "v1"},
"corge-service": {Name: "v1"},
"waldo-service": {Name: "v1"},
"fred-service": {Name: "v1"},
"plugh-service": {Name: "v1"},
"xyzzy-service": {Name: "v1"},
"thud-service": {Name: "v1"},
},
},
output: Output{
@ -742,7 +742,7 @@ func TestConsulCatalogGetChangedKeys(t *testing.T) {
}
for _, c := range cases {
addedKeys, removedKeys := getChangedKeys(c.input.currState, c.input.prevState)
addedKeys, removedKeys := getChangedServiceKeys(c.input.currState, c.input.prevState)
if !reflect.DeepEqual(fun.Set(addedKeys), fun.Set(c.output.addedKeys)) {
t.Fatalf("Added keys comparison results: got %q, want %q", addedKeys, c.output.addedKeys)