Add Consul integration tests
Signed-off-by: Emile Vauge <emile@vauge.com>
This commit is contained in:
parent
26774d2317
commit
c3aadab615
10 changed files with 306 additions and 42 deletions
|
@ -38,12 +38,11 @@ type KvTLS struct {
|
|||
InsecureSkipVerify bool
|
||||
}
|
||||
|
||||
func (provider *Kv) watchKv(configurationChan chan<- types.ConfigMessage, prefix string, stop chan bool) {
|
||||
func (provider *Kv) watchKv(configurationChan chan<- types.ConfigMessage, prefix string, stop chan bool) error {
|
||||
operation := func() error {
|
||||
events, err := provider.kvclient.WatchTree(provider.Prefix, make(chan struct{}) /* stop chan */)
|
||||
events, err := provider.kvclient.WatchTree(provider.Prefix, make(chan struct{}))
|
||||
if err != nil {
|
||||
log.Errorf("Failed to WatchTree %s", err)
|
||||
return err
|
||||
return fmt.Errorf("Failed to KV WatchTree: %v", err)
|
||||
}
|
||||
for {
|
||||
select {
|
||||
|
@ -65,12 +64,13 @@ func (provider *Kv) watchKv(configurationChan chan<- types.ConfigMessage, prefix
|
|||
}
|
||||
|
||||
notify := func(err error, time time.Duration) {
|
||||
log.Errorf("KV connection error %+v, retrying in %s", err, time)
|
||||
log.Errorf("KV connection error: %+v, retrying in %s", err, time)
|
||||
}
|
||||
err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify)
|
||||
if err != nil {
|
||||
log.Fatalf("Cannot connect to KV server %+v", err)
|
||||
return fmt.Errorf("Cannot connect to KV server: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool) error {
|
||||
|
@ -112,15 +112,18 @@ func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage, pool *
|
|||
storeConfig,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("Failed to Connect to KV store: %v", err)
|
||||
}
|
||||
if _, err := kv.List(""); err != nil {
|
||||
return err
|
||||
if _, err := kv.Exists("qmslkjdfmqlskdjfmqlksjazçueznbvbwzlkajzebvkwjdcqmlsfj"); err != nil {
|
||||
return fmt.Errorf("Failed to test KV store connection: %v", err)
|
||||
}
|
||||
provider.kvclient = kv
|
||||
if provider.Watch {
|
||||
pool.Go(func(stop chan bool) {
|
||||
provider.watchKv(configurationChan, provider.Prefix, stop)
|
||||
err := provider.watchKv(configurationChan, provider.Prefix, stop)
|
||||
if err != nil {
|
||||
log.Errorf("Cannot watch KV store: %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
configuration := provider.loadConfig()
|
||||
|
@ -131,11 +134,11 @@ func (provider *Kv) provide(configurationChan chan<- types.ConfigMessage, pool *
|
|||
return nil
|
||||
}
|
||||
notify := func(err error, time time.Duration) {
|
||||
log.Errorf("KV connection error %+v, retrying in %s", err, time)
|
||||
log.Errorf("KV connection error: %+v, retrying in %s", err, time)
|
||||
}
|
||||
err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify)
|
||||
if err != nil {
|
||||
log.Fatalf("Cannot connect to KV server %+v", err)
|
||||
return fmt.Errorf("Cannot connect to KV server: %v", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue