Fix kubernetes providers shutdown and clean safe.Pool

This commit is contained in:
Julien Salleyron 2020-02-03 17:56:04 +01:00 committed by GitHub
parent c80d53e7e5
commit 1b63c95c4e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 73 additions and 190 deletions

View file

@ -98,11 +98,9 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
return err
}
pool.Go(func(stop chan bool) {
pool.GoCtx(func(ctxPool context.Context) {
operation := func() error {
stopWatch := make(chan struct{}, 1)
defer close(stopWatch)
eventsChan, err := k8sClient.WatchAll(p.Namespaces, stopWatch)
eventsChan, err := k8sClient.WatchAll(p.Namespaces, ctxPool.Done())
if err != nil {
logger.Errorf("Error watching kubernetes events: %v", err)
@ -110,20 +108,20 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
select {
case <-timer.C:
return err
case <-stop:
case <-ctxPool.Done():
return nil
}
}
throttleDuration := time.Duration(p.ThrottleDuration)
throttledChan := throttleEvents(ctxLog, throttleDuration, stop, eventsChan)
throttledChan := throttleEvents(ctxLog, throttleDuration, pool, eventsChan)
if throttledChan != nil {
eventsChan = throttledChan
}
for {
select {
case <-stop:
case <-ctxPool.Done():
return nil
case event := <-eventsChan:
// Note that event is the *first* event that came in during this throttling interval -- if we're hitting our throttle, we may have dropped events.
@ -156,7 +154,7 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
notify := func(err error, time time.Duration) {
logger.Errorf("Provider connection error: %v; retrying in %s", err, time)
}
err := backoff.RetryNotify(safe.OperationWithRecover(operation), job.NewBackOff(backoff.NewExponentialBackOff()), notify)
err := backoff.RetryNotify(safe.OperationWithRecover(operation), backoff.WithContext(job.NewBackOff(backoff.NewExponentialBackOff()), ctxPool), notify)
if err != nil {
logger.Errorf("Cannot connect to Provider: %v", err)
}
@ -625,7 +623,7 @@ func getCABlocks(secret *corev1.Secret, namespace, secretName string) (string, e
return cert, nil
}
func throttleEvents(ctx context.Context, throttleDuration time.Duration, stop chan bool, eventsChan <-chan interface{}) chan interface{} {
func throttleEvents(ctx context.Context, throttleDuration time.Duration, pool *safe.Pool, eventsChan <-chan interface{}) chan interface{} {
if throttleDuration == 0 {
return nil
}
@ -635,10 +633,10 @@ func throttleEvents(ctx context.Context, throttleDuration time.Duration, stop ch
// Run a goroutine that reads events from eventChan and does a non-blocking write to pendingEvent.
// This guarantees that writing to eventChan will never block,
// and that pendingEvent will have something in it if there's been an event since we read from that channel.
go func() {
pool.GoCtx(func(ctxPool context.Context) {
for {
select {
case <-stop:
case <-ctxPool.Done():
return
case nextEvent := <-eventsChan:
select {
@ -650,7 +648,7 @@ func throttleEvents(ctx context.Context, throttleDuration time.Duration, stop ch
}
}
}
}()
})
return eventsChanBuffered
}