refactor(kv): split provide and configuration.
This commit is contained in:
parent
e3d1201b46
commit
be0dd71bb4
9 changed files with 944 additions and 546 deletions
|
@ -4,10 +4,8 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"text/template"
|
||||
"time"
|
||||
|
||||
"github.com/BurntSushi/ty/fun"
|
||||
"github.com/cenk/backoff"
|
||||
"github.com/containous/traefik/job"
|
||||
"github.com/containous/traefik/log"
|
||||
|
@ -27,7 +25,7 @@ type Provider struct {
|
|||
Username string `description:"KV Username"`
|
||||
Password string `description:"KV Password"`
|
||||
storeType store.Backend
|
||||
kvclient store.Store
|
||||
kvClient store.Store
|
||||
}
|
||||
|
||||
// CreateStore create the K/V store
|
||||
|
@ -58,16 +56,16 @@ func (p *Provider) SetStoreType(storeType store.Backend) {
|
|||
p.storeType = storeType
|
||||
}
|
||||
|
||||
// SetKVClient kvclient setter
|
||||
// SetKVClient kvClient setter
|
||||
func (p *Provider) SetKVClient(kvClient store.Store) {
|
||||
p.kvclient = kvClient
|
||||
p.kvClient = kvClient
|
||||
}
|
||||
|
||||
func (p *Provider) watchKv(configurationChan chan<- types.ConfigMessage, prefix string, stop chan bool) error {
|
||||
operation := func() error {
|
||||
events, err := p.kvclient.WatchTree(p.Prefix, make(chan struct{}), nil)
|
||||
events, err := p.kvClient.WatchTree(p.Prefix, make(chan struct{}), nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to KV WatchTree: %v", err)
|
||||
return fmt.Errorf("failed to KV WatchTree: %v", err)
|
||||
}
|
||||
for {
|
||||
select {
|
||||
|
@ -77,7 +75,7 @@ func (p *Provider) watchKv(configurationChan chan<- types.ConfigMessage, prefix
|
|||
if !ok {
|
||||
return errors.New("watchtree channel closed")
|
||||
}
|
||||
configuration := p.loadConfig()
|
||||
configuration := p.buildConfiguration()
|
||||
if configuration != nil {
|
||||
configurationChan <- types.ConfigMessage{
|
||||
ProviderName: string(p.storeType),
|
||||
|
@ -93,7 +91,7 @@ func (p *Provider) watchKv(configurationChan chan<- types.ConfigMessage, prefix
|
|||
}
|
||||
err := backoff.RetryNotify(safe.OperationWithRecover(operation), job.NewBackOff(backoff.NewExponentialBackOff()), notify)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Cannot connect to KV server: %v", err)
|
||||
return fmt.Errorf("cannot connect to KV server: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -102,8 +100,8 @@ func (p *Provider) watchKv(configurationChan chan<- types.ConfigMessage, prefix
|
|||
func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
|
||||
p.Constraints = append(p.Constraints, constraints...)
|
||||
operation := func() error {
|
||||
if _, err := p.kvclient.Exists(p.Prefix+"/qmslkjdfmqlskdjfmqlksjazçueznbvbwzlkajzebvkwjdcqmlsfj", nil); err != nil {
|
||||
return fmt.Errorf("Failed to test KV store connection: %v", err)
|
||||
if _, err := p.kvClient.Exists(p.Prefix+"/qmslkjdfmqlskdjfmqlksjazçueznbvbwzlkajzebvkwjdcqmlsfj", nil); err != nil {
|
||||
return fmt.Errorf("failed to test KV store connection: %v", err)
|
||||
}
|
||||
if p.Watch {
|
||||
pool.Go(func(stop chan bool) {
|
||||
|
@ -113,7 +111,7 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s
|
|||
}
|
||||
})
|
||||
}
|
||||
configuration := p.loadConfig()
|
||||
configuration := p.buildConfiguration()
|
||||
configurationChan <- types.ConfigMessage{
|
||||
ProviderName: string(p.storeType),
|
||||
Configuration: configuration,
|
||||
|
@ -125,142 +123,7 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s
|
|||
}
|
||||
err := backoff.RetryNotify(safe.OperationWithRecover(operation), job.NewBackOff(backoff.NewExponentialBackOff()), notify)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Cannot connect to KV server: %v", err)
|
||||
return fmt.Errorf("cannot connect to KV server: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Provider) loadConfig() *types.Configuration {
|
||||
templateObjects := struct {
|
||||
Prefix string
|
||||
}{
|
||||
// Allow `/traefik/alias` to supersede `p.Prefix`
|
||||
strings.TrimSuffix(p.get(p.Prefix, p.Prefix+"/alias"), "/"),
|
||||
}
|
||||
|
||||
var KvFuncMap = template.FuncMap{
|
||||
"List": p.list,
|
||||
"ListServers": p.listServers,
|
||||
"Get": p.get,
|
||||
"SplitGet": p.splitGet,
|
||||
"Last": p.last,
|
||||
"getSticky": p.getSticky,
|
||||
"hasStickinessLabel": p.hasStickinessLabel,
|
||||
"getStickinessCookieName": p.getStickinessCookieName,
|
||||
}
|
||||
|
||||
configuration, err := p.GetConfiguration("templates/kv.tmpl", KvFuncMap, templateObjects)
|
||||
if err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
|
||||
for key, frontend := range configuration.Frontends {
|
||||
if _, ok := configuration.Backends[frontend.Backend]; !ok {
|
||||
delete(configuration.Frontends, key)
|
||||
}
|
||||
}
|
||||
|
||||
return configuration
|
||||
}
|
||||
|
||||
func (p *Provider) list(keys ...string) []string {
|
||||
joinedKeys := strings.Join(keys, "")
|
||||
keysPairs, err := p.kvclient.List(joinedKeys, nil)
|
||||
if err != nil {
|
||||
log.Debugf("Cannot get keys %s %s ", joinedKeys, err)
|
||||
return nil
|
||||
}
|
||||
directoryKeys := make(map[string]string)
|
||||
for _, key := range keysPairs {
|
||||
directory := strings.Split(strings.TrimPrefix(key.Key, joinedKeys), "/")[0]
|
||||
directoryKeys[directory] = joinedKeys + directory
|
||||
}
|
||||
return fun.Values(directoryKeys).([]string)
|
||||
}
|
||||
|
||||
func (p *Provider) listServers(backend string) []string {
|
||||
serverNames := p.list(backend, "/servers/")
|
||||
return fun.Filter(func(serverName string) bool {
|
||||
key := fmt.Sprint(serverName, "/url")
|
||||
if _, err := p.kvclient.Get(key, nil); err != nil {
|
||||
if err != store.ErrKeyNotFound {
|
||||
log.Errorf("Failed to retrieve value for key %s: %s", key, err)
|
||||
}
|
||||
return false
|
||||
}
|
||||
return p.checkConstraints(serverName, "/tags")
|
||||
}, serverNames).([]string)
|
||||
}
|
||||
|
||||
func (p *Provider) get(defaultValue string, keys ...string) string {
|
||||
joinedKeys := strings.Join(keys, "")
|
||||
if p.storeType == store.ETCD {
|
||||
joinedKeys = strings.TrimPrefix(joinedKeys, "/")
|
||||
}
|
||||
keyPair, err := p.kvclient.Get(joinedKeys, nil)
|
||||
if err != nil {
|
||||
log.Debugf("Cannot get key %s %s, setting default %s", joinedKeys, err, defaultValue)
|
||||
return defaultValue
|
||||
} else if keyPair == nil {
|
||||
log.Debugf("Cannot get key %s, setting default %s", joinedKeys, defaultValue)
|
||||
return defaultValue
|
||||
}
|
||||
return string(keyPair.Value)
|
||||
}
|
||||
|
||||
func (p *Provider) splitGet(keys ...string) []string {
|
||||
joinedKeys := strings.Join(keys, "")
|
||||
keyPair, err := p.kvclient.Get(joinedKeys, nil)
|
||||
if err != nil {
|
||||
log.Debugf("Cannot get key %s %s, setting default empty", joinedKeys, err)
|
||||
return []string{}
|
||||
} else if keyPair == nil {
|
||||
log.Debugf("Cannot get key %s, setting default %empty", joinedKeys)
|
||||
return []string{}
|
||||
}
|
||||
return strings.Split(string(keyPair.Value), ",")
|
||||
}
|
||||
|
||||
func (p *Provider) last(key string) string {
|
||||
splittedKey := strings.Split(key, "/")
|
||||
return splittedKey[len(splittedKey)-1]
|
||||
}
|
||||
|
||||
func (p *Provider) checkConstraints(keys ...string) bool {
|
||||
joinedKeys := strings.Join(keys, "")
|
||||
keyPair, err := p.kvclient.Get(joinedKeys, nil)
|
||||
|
||||
value := ""
|
||||
if err == nil && keyPair != nil && keyPair.Value != nil {
|
||||
value = string(keyPair.Value)
|
||||
}
|
||||
|
||||
constraintTags := strings.Split(value, ",")
|
||||
ok, failingConstraint := p.MatchConstraints(constraintTags)
|
||||
if !ok {
|
||||
if failingConstraint != nil {
|
||||
log.Debugf("Constraint %v not matching with following tags: %v", failingConstraint.String(), value)
|
||||
}
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (p *Provider) getSticky(rootPath string) string {
|
||||
stickyValue := p.get("", rootPath, "/loadbalancer", "/sticky")
|
||||
if len(stickyValue) > 0 {
|
||||
log.Warnf("Deprecated configuration found: %s. Please use %s.", "loadbalancer/sticky", "loadbalancer/stickiness")
|
||||
} else {
|
||||
stickyValue = "false"
|
||||
}
|
||||
return stickyValue
|
||||
}
|
||||
|
||||
func (p *Provider) hasStickinessLabel(rootPath string) bool {
|
||||
stickinessValue := p.get("false", rootPath, "/loadbalancer", "/stickiness")
|
||||
return len(stickinessValue) > 0 && strings.EqualFold(strings.TrimSpace(stickinessValue), "true")
|
||||
}
|
||||
|
||||
func (p *Provider) getStickinessCookieName(rootPath string) string {
|
||||
return p.get("", rootPath, "/loadbalancer", "/stickiness", "/cookiename")
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue