Add KV store providers (dynamic configuration only)
Co-authored-by: Jean-Baptiste Doumenjou <jb.doumenjou@gmail.com>
This commit is contained in:
parent
028683666d
commit
9b9f4be6a4
61 changed files with 5825 additions and 70 deletions
191
pkg/provider/kv/kv.go
Normal file
191
pkg/provider/kv/kv.go
Normal file
|
@ -0,0 +1,191 @@
|
|||
package kv
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"path"
|
||||
"time"
|
||||
|
||||
"github.com/abronan/valkeyrie"
|
||||
"github.com/abronan/valkeyrie/store"
|
||||
"github.com/abronan/valkeyrie/store/consul"
|
||||
etcdv3 "github.com/abronan/valkeyrie/store/etcd/v3"
|
||||
"github.com/abronan/valkeyrie/store/redis"
|
||||
"github.com/abronan/valkeyrie/store/zookeeper"
|
||||
"github.com/cenkalti/backoff/v3"
|
||||
"github.com/containous/traefik/v2/pkg/config/dynamic"
|
||||
"github.com/containous/traefik/v2/pkg/config/kv"
|
||||
"github.com/containous/traefik/v2/pkg/job"
|
||||
"github.com/containous/traefik/v2/pkg/log"
|
||||
"github.com/containous/traefik/v2/pkg/safe"
|
||||
"github.com/containous/traefik/v2/pkg/types"
|
||||
)
|
||||
|
||||
// Provider holds configurations of the provider.
|
||||
type Provider struct {
|
||||
RootKey string `description:"Root key used for KV store" export:"true" json:"rootKey,omitempty" toml:"rootKey,omitempty" yaml:"rootKey,omitempty"`
|
||||
|
||||
Endpoints []string `description:"KV store endpoints" json:"endpoints,omitempty" toml:"endpoints,omitempty" yaml:"endpoints,omitempty"`
|
||||
Username string `description:"KV Username" json:"username,omitempty" toml:"username,omitempty" yaml:"username,omitempty"`
|
||||
Password string `description:"KV Password" json:"password,omitempty" toml:"password,omitempty" yaml:"password,omitempty"`
|
||||
TLS *types.ClientTLS `description:"Enable TLS support" export:"true" json:"tls,omitempty" toml:"tls,omitempty" yaml:"tls,omitempty"`
|
||||
|
||||
storeType store.Backend
|
||||
kvClient store.Store
|
||||
name string
|
||||
}
|
||||
|
||||
// SetDefaults sets the default values.
|
||||
func (p *Provider) SetDefaults() {
|
||||
p.RootKey = "traefik"
|
||||
}
|
||||
|
||||
// Init the provider
|
||||
func (p *Provider) Init(storeType store.Backend, name string) error {
|
||||
ctx := log.With(context.Background(), log.Str(log.ProviderName, string(storeType)))
|
||||
|
||||
p.storeType = storeType
|
||||
p.name = name
|
||||
|
||||
kvClient, err := p.createKVClient(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to Connect to KV store: %w", err)
|
||||
}
|
||||
|
||||
p.kvClient = kvClient
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Provide allows the docker provider to provide configurations to traefik using the given configuration channel.
|
||||
func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.Pool) error {
|
||||
ctx := log.With(context.Background(), log.Str(log.ProviderName, string(p.storeType)))
|
||||
|
||||
logger := log.FromContext(ctx)
|
||||
|
||||
operation := func() error {
|
||||
if _, err := p.kvClient.Exists(path.Join(p.RootKey, "qmslkjdfmqlskdjfmqlksjazçueznbvbwzlkajzebvkwjdcqmlsfj"), nil); err != nil {
|
||||
return fmt.Errorf("KV store connection error: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
notify := func(err error, time time.Duration) {
|
||||
logger.Errorf("KV connection error: %+v, retrying in %s", err, time)
|
||||
}
|
||||
err := backoff.RetryNotify(safe.OperationWithRecover(operation), job.NewBackOff(backoff.NewExponentialBackOff()), notify)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot connect to KV server: %w", err)
|
||||
}
|
||||
|
||||
configuration, err := p.buildConfiguration()
|
||||
if err != nil {
|
||||
logger.Errorf("Cannot build the configuration: %v", err)
|
||||
} else {
|
||||
configurationChan <- dynamic.Message{
|
||||
ProviderName: p.name,
|
||||
Configuration: configuration,
|
||||
}
|
||||
}
|
||||
|
||||
pool.Go(func(stop chan bool) {
|
||||
err := p.watchKv(ctx, configurationChan, p.RootKey, stop)
|
||||
if err != nil {
|
||||
logger.Errorf("Cannot watch KV store: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Provider) watchKv(ctx context.Context, configurationChan chan<- dynamic.Message, prefix string, stop chan bool) error {
|
||||
operation := func() error {
|
||||
events, err := p.kvClient.WatchTree(p.RootKey, make(chan struct{}), nil)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to watch KV: %w", err)
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-stop:
|
||||
return nil
|
||||
case _, ok := <-events:
|
||||
if !ok {
|
||||
return errors.New("the WatchTree channel is closed")
|
||||
}
|
||||
|
||||
configuration, errC := p.buildConfiguration()
|
||||
if errC != nil {
|
||||
return errC
|
||||
}
|
||||
|
||||
if configuration != nil {
|
||||
configurationChan <- dynamic.Message{
|
||||
ProviderName: string(p.storeType),
|
||||
Configuration: configuration,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
notify := func(err error, time time.Duration) {
|
||||
log.FromContext(ctx).Errorf("KV connection error: %+v, retrying in %s", err, time)
|
||||
}
|
||||
err := backoff.RetryNotify(safe.OperationWithRecover(operation), job.NewBackOff(backoff.NewExponentialBackOff()), notify)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot connect to KV server: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Provider) buildConfiguration() (*dynamic.Configuration, error) {
|
||||
pairs, err := p.kvClient.List(p.RootKey, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cfg := &dynamic.Configuration{}
|
||||
err = kv.Decode(pairs, cfg, p.RootKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func (p *Provider) createKVClient(ctx context.Context) (store.Store, error) {
|
||||
storeConfig := &store.Config{
|
||||
ConnectionTimeout: 3 * time.Second,
|
||||
Bucket: "traefik",
|
||||
Username: p.Username,
|
||||
Password: p.Password,
|
||||
}
|
||||
|
||||
if p.TLS != nil {
|
||||
var err error
|
||||
storeConfig.TLS, err = p.TLS.CreateTLSConfig(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
switch p.storeType {
|
||||
case store.CONSUL:
|
||||
consul.Register()
|
||||
case store.ETCDV3:
|
||||
etcdv3.Register()
|
||||
case store.ZK:
|
||||
zookeeper.Register()
|
||||
case store.REDIS:
|
||||
redis.Register()
|
||||
}
|
||||
|
||||
kvStore, err := valkeyrie.NewStore(p.storeType, p.Endpoints, storeConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return &storeWrapper{Store: kvStore}, nil
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue