1
0
Fork 0

Update valkeyrie to a9a70ee

This commit is contained in:
Kevin Pollet 2022-08-11 15:42:07 +02:00 committed by GitHub
parent 4755bb2f33
commit 40db06204b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 93 additions and 72 deletions

View file

@ -67,7 +67,7 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
logger := log.FromContext(ctx)
operation := func() error {
if _, err := p.kvClient.Exists(path.Join(p.RootKey, "qmslkjdfmqlskdjfmqlksjazçueznbvbwzlkajzebvkwjdcqmlsfj"), nil); err != nil {
if _, err := p.kvClient.Exists(ctx, path.Join(p.RootKey, "qmslkjdfmqlskdjfmqlksjazçueznbvbwzlkajzebvkwjdcqmlsfj"), nil); err != nil {
return fmt.Errorf("KV store connection error: %w", err)
}
return nil
@ -76,12 +76,12 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
notify := func(err error, time time.Duration) {
logger.Errorf("KV connection error: %+v, retrying in %s", err, time)
}
err := backoff.RetryNotify(safe.OperationWithRecover(operation), job.NewBackOff(backoff.NewExponentialBackOff()), notify)
err := backoff.RetryNotify(safe.OperationWithRecover(operation), backoff.WithContext(job.NewBackOff(backoff.NewExponentialBackOff()), ctx), notify)
if err != nil {
return fmt.Errorf("cannot connect to KV server: %w", err)
}
configuration, err := p.buildConfiguration()
configuration, err := p.buildConfiguration(ctx)
if err != nil {
logger.Errorf("Cannot build the configuration: %v", err)
} else {
@ -105,7 +105,7 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
func (p *Provider) watchKv(ctx context.Context, configurationChan chan<- dynamic.Message) error {
operation := func() error {
events, err := p.kvClient.WatchTree(p.RootKey, ctx.Done(), nil)
events, err := p.kvClient.WatchTree(ctx, p.RootKey, nil)
if err != nil {
return fmt.Errorf("failed to watch KV: %w", err)
}
@ -119,7 +119,7 @@ func (p *Provider) watchKv(ctx context.Context, configurationChan chan<- dynamic
return errors.New("the WatchTree channel is closed")
}
configuration, errC := p.buildConfiguration()
configuration, errC := p.buildConfiguration(ctx)
if errC != nil {
return errC
}
@ -146,8 +146,8 @@ func (p *Provider) watchKv(ctx context.Context, configurationChan chan<- dynamic
return nil
}
func (p *Provider) buildConfiguration() (*dynamic.Configuration, error) {
pairs, err := p.kvClient.List(p.RootKey, nil)
func (p *Provider) buildConfiguration(ctx context.Context) (*dynamic.Configuration, error) {
pairs, err := p.kvClient.List(ctx, p.RootKey, nil)
if err != nil {
return nil, err
}
@ -190,7 +190,7 @@ func (p *Provider) createKVClient(ctx context.Context) (store.Store, error) {
redis.Register()
}
kvStore, err := valkeyrie.NewStore(p.storeType, p.Endpoints, storeConfig)
kvStore, err := valkeyrie.NewStore(ctx, p.storeType, p.Endpoints, storeConfig)
if err != nil {
return nil, err
}