Merge branch 'v1.5' into master
This commit is contained in:
commit
f6520727a3
44 changed files with 1035 additions and 463 deletions
|
@ -42,6 +42,7 @@ type Service struct {
|
|||
Name string
|
||||
Tags []string
|
||||
Nodes []string
|
||||
Ports []int
|
||||
}
|
||||
|
||||
type serviceUpdate struct {
|
||||
|
@ -185,19 +186,25 @@ func (p *CatalogProvider) watchCatalogServices(stopCh <-chan struct{}, watchCh c
|
|||
errorCh <- err
|
||||
return
|
||||
}
|
||||
|
||||
nodesID := getServiceIds(nodes)
|
||||
ports := getServicePorts(nodes)
|
||||
|
||||
if service, ok := current[key]; ok {
|
||||
service.Tags = value
|
||||
service.Nodes = nodesID
|
||||
service.Ports = ports
|
||||
} else {
|
||||
service := Service{
|
||||
Name: key,
|
||||
Tags: value,
|
||||
Nodes: nodesID,
|
||||
Ports: ports,
|
||||
}
|
||||
current[key] = service
|
||||
}
|
||||
}
|
||||
|
||||
// A critical note is that the return of a blocking request is no guarantee of a change.
|
||||
// It is possible that there was an idempotent write that does not affect the result of the query.
|
||||
// Thus it is required to do extra check for changes...
|
||||
|
@ -304,8 +311,11 @@ func (p *CatalogProvider) getNodes(index map[string][]string) ([]catalogUpdate,
|
|||
}
|
||||
|
||||
func hasChanged(current map[string]Service, previous map[string]Service) bool {
|
||||
if len(current) != len(previous) {
|
||||
return true
|
||||
}
|
||||
addedServiceKeys, removedServiceKeys := getChangedServiceKeys(current, previous)
|
||||
return len(removedServiceKeys) > 0 || len(addedServiceKeys) > 0 || hasNodeOrTagsChanged(current, previous)
|
||||
return len(removedServiceKeys) > 0 || len(addedServiceKeys) > 0 || hasServiceChanged(current, previous)
|
||||
}
|
||||
|
||||
func getChangedServiceKeys(current map[string]Service, previous map[string]Service) ([]string, []string) {
|
||||
|
@ -318,20 +328,24 @@ func getChangedServiceKeys(current map[string]Service, previous map[string]Servi
|
|||
return fun.Keys(addedKeys).([]string), fun.Keys(removedKeys).([]string)
|
||||
}
|
||||
|
||||
func hasNodeOrTagsChanged(current map[string]Service, previous map[string]Service) bool {
|
||||
var added []string
|
||||
var removed []string
|
||||
func hasServiceChanged(current map[string]Service, previous map[string]Service) bool {
|
||||
for key, value := range current {
|
||||
if prevValue, ok := previous[key]; ok {
|
||||
addedNodesKeys, removedNodesKeys := getChangedStringKeys(value.Nodes, prevValue.Nodes)
|
||||
added = append(added, addedNodesKeys...)
|
||||
removed = append(removed, removedNodesKeys...)
|
||||
if len(addedNodesKeys) > 0 || len(removedNodesKeys) > 0 {
|
||||
return true
|
||||
}
|
||||
addedTagsKeys, removedTagsKeys := getChangedStringKeys(value.Tags, prevValue.Tags)
|
||||
added = append(added, addedTagsKeys...)
|
||||
removed = append(removed, removedTagsKeys...)
|
||||
if len(addedTagsKeys) > 0 || len(removedTagsKeys) > 0 {
|
||||
return true
|
||||
}
|
||||
addedPortsKeys, removedPortsKeys := getChangedIntKeys(value.Ports, prevValue.Ports)
|
||||
if len(addedPortsKeys) > 0 || len(removedPortsKeys) > 0 {
|
||||
return true
|
||||
}
|
||||
}
|
||||
}
|
||||
return len(added) > 0 || len(removed) > 0
|
||||
return false
|
||||
}
|
||||
|
||||
func getChangedStringKeys(currState []string, prevState []string) ([]string, []string) {
|
||||
|
@ -344,14 +358,32 @@ func getChangedStringKeys(currState []string, prevState []string) ([]string, []s
|
|||
return fun.Keys(addedKeys).([]string), fun.Keys(removedKeys).([]string)
|
||||
}
|
||||
|
||||
func getChangedIntKeys(currState []int, prevState []int) ([]int, []int) {
|
||||
currKeySet := fun.Set(currState).(map[int]bool)
|
||||
prevKeySet := fun.Set(prevState).(map[int]bool)
|
||||
|
||||
addedKeys := fun.Difference(currKeySet, prevKeySet).(map[int]bool)
|
||||
removedKeys := fun.Difference(prevKeySet, currKeySet).(map[int]bool)
|
||||
|
||||
return fun.Keys(addedKeys).([]int), fun.Keys(removedKeys).([]int)
|
||||
}
|
||||
|
||||
func getServiceIds(services []*api.CatalogService) []string {
|
||||
var serviceIds []string
|
||||
for _, service := range services {
|
||||
serviceIds = append(serviceIds, service.ServiceID)
|
||||
serviceIds = append(serviceIds, service.ID)
|
||||
}
|
||||
return serviceIds
|
||||
}
|
||||
|
||||
func getServicePorts(services []*api.CatalogService) []int {
|
||||
var servicePorts []int
|
||||
for _, service := range services {
|
||||
servicePorts = append(servicePorts, service.ServicePort)
|
||||
}
|
||||
return servicePorts
|
||||
}
|
||||
|
||||
func (p *CatalogProvider) healthyNodes(service string) (catalogUpdate, error) {
|
||||
health := p.client.Health()
|
||||
opts := &api.QueryOptions{}
|
||||
|
@ -364,7 +396,7 @@ func (p *CatalogProvider) healthyNodes(service string) (catalogUpdate, error) {
|
|||
return p.nodeFilter(service, node)
|
||||
}, data).([]*api.ServiceEntry)
|
||||
|
||||
//Merge tags of nodes matching constraints, in a single slice.
|
||||
// Merge tags of nodes matching constraints, in a single slice.
|
||||
tags := fun.Foldl(func(node *api.ServiceEntry, set []string) []string {
|
||||
return fun.Keys(fun.Union(
|
||||
fun.Set(set),
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue