Support for watching instead of polling Nomad

This commit is contained in:
Dan Everton 2024-09-26 23:56:04 +10:00 committed by GitHub
parent f8a78b3b25
commit fbf6757ce9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 214 additions and 22 deletions

View file

@ -10,6 +10,7 @@ import (
"github.com/cenkalti/backoff/v4"
"github.com/hashicorp/nomad/api"
"github.com/mitchellh/hashstructure"
"github.com/rs/zerolog/log"
ptypes "github.com/traefik/paerser/types"
"github.com/traefik/traefik/v3/pkg/config/dynamic"
@ -93,6 +94,8 @@ type Configuration struct {
ExposedByDefault bool `description:"Expose Nomad services by default." json:"exposedByDefault,omitempty" toml:"exposedByDefault,omitempty" yaml:"exposedByDefault,omitempty" export:"true"`
RefreshInterval ptypes.Duration `description:"Interval for polling Nomad API." json:"refreshInterval,omitempty" toml:"refreshInterval,omitempty" yaml:"refreshInterval,omitempty" export:"true"`
AllowEmptyServices bool `description:"Allow the creation of services without endpoints." json:"allowEmptyServices,omitempty" toml:"allowEmptyServices,omitempty" yaml:"allowEmptyServices,omitempty" export:"true"`
Watch bool `description:"Watch Nomad Service events." json:"watch,omitempty" toml:"watch,omitempty" yaml:"watch,omitempty" export:"true"`
ThrottleDuration ptypes.Duration `description:"Watch throttle duration." json:"throttleDuration,omitempty" toml:"throttleDuration,omitempty" yaml:"throttleDuration,omitempty" export:"true"`
}
// SetDefaults sets the default values for the Nomad Traefik Provider Configuration.
@ -117,7 +120,7 @@ func (c *Configuration) SetDefaults() {
c.ExposedByDefault = true
c.RefreshInterval = ptypes.Duration(15 * time.Second)
c.DefaultRule = defaultTemplateRule
c.AllowEmptyServices = false
c.ThrottleDuration = ptypes.Duration(0)
}
type EndpointConfig struct {
@ -139,6 +142,8 @@ type Provider struct {
namespace string
client *api.Client // client for Nomad API
defaultRuleTpl *template.Template // default routing rule
lastConfiguration safe.Safe
}
// SetDefaults sets the default values for the Nomad Traefik Provider.
@ -152,6 +157,10 @@ func (p *Provider) Init() error {
return errors.New("wildcard namespace not supported")
}
if p.ThrottleDuration > 0 && !p.Watch {
return errors.New("throttle duration should not be used with polling mode")
}
defaultRuleTpl, err := provider.MakeDefaultRuleTemplate(p.DefaultRule, nil)
if err != nil {
return fmt.Errorf("error while parsing default rule: %w", err)
@ -183,32 +192,63 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
ctx, cancel := context.WithCancel(ctxLog)
defer cancel()
// load initial configuration
if err := p.loadConfiguration(ctx, configurationChan); err != nil {
return fmt.Errorf("failed to load initial nomad services: %w", err)
serviceEventsChan, err := p.pollOrWatch(ctx)
if err != nil {
return fmt.Errorf("watching Nomad events: %w", err)
}
// issue periodic refreshes in the background
// (Nomad does not support Watch style observations)
ticker := time.NewTicker(time.Duration(p.RefreshInterval))
defer ticker.Stop()
throttleDuration := time.Duration(p.ThrottleDuration)
throttledChan := throttleEvents(ctx, throttleDuration, pool, serviceEventsChan)
if throttledChan != nil {
serviceEventsChan = throttledChan
}
conf, err := p.loadConfiguration(ctx)
if err != nil {
return fmt.Errorf("loading configuration: %w", err)
}
if _, err := p.updateLastConfiguration(conf); err != nil {
return fmt.Errorf("updating last configuration: %w", err)
}
configurationChan <- dynamic.Message{
ProviderName: p.name,
Configuration: conf,
}
// enter loop where we wait for and respond to notifications
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
}
// load services due to refresh
if err := p.loadConfiguration(ctx, configurationChan); err != nil {
return fmt.Errorf("failed to refresh nomad services: %w", err)
case event := <-serviceEventsChan:
conf, err = p.loadConfiguration(ctx)
if err != nil {
return fmt.Errorf("loading configuration: %w", err)
}
updated, err := p.updateLastConfiguration(conf)
if err != nil {
return fmt.Errorf("updating last configuration: %w", err)
}
if !updated {
logger.Debug().Msgf("Skipping Nomad event %d with no changes", event.Index)
continue
}
configurationChan <- dynamic.Message{
ProviderName: p.name,
Configuration: conf,
}
// If we're throttling, we sleep here for the throttle duration to
// enforce that we don't refresh faster than our throttle. time.Sleep
// returns immediately if p.ThrottleDuration is 0 (no throttle).
time.Sleep(throttleDuration)
}
}
}
failure := func(err error, d time.Duration) {
logger.Error().Err(err).Msgf("Provider connection error, retrying in %s", d)
logger.Error().Err(err).Msgf("Loading configuration, retrying in %s", d)
}
if retryErr := backoff.RetryNotify(
@ -223,27 +263,70 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
return nil
}
func (p *Provider) loadConfiguration(ctx context.Context, configurationC chan<- dynamic.Message) error {
// pollOrWatch returns the channel of service events that triggers
// configuration reloads.
//
// When p.Watch is set, the channel is the Nomad event stream subscribed to
// every key of the Service topic in p.namespace, requested from index 0; any
// error from opening the stream is returned as is.
//
// Otherwise a background goroutine emits one synthetic event per tick of
// p.RefreshInterval. Such an event only carries an Index, set to the tick time
// in Unix nanoseconds. The goroutine stops once ctx is done.
func (p *Provider) pollOrWatch(ctx context.Context) (<-chan *api.Events, error) {
	if p.Watch {
		return p.client.EventStream().Stream(ctx,
			map[api.Topic][]string{
				api.TopicService: {"*"},
			},
			0,
			&api.QueryOptions{
				Namespace: p.namespace,
			},
		)
	}

	// One slot of buffering lets a tick be queued while the consumer is busy.
	serviceEventsChan := make(chan *api.Events, 1)

	go func() {
		ticker := time.NewTicker(time.Duration(p.RefreshInterval))
		defer ticker.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case t := <-ticker.C:
				event := &api.Events{
					Index: uint64(t.UnixNano()),
				}

				// The send has to be cancellable: if the buffer is already
				// full and the consumer has stopped reading, a plain send
				// would block forever and leak this goroutine and its ticker.
				select {
				case <-ctx.Done():
					return
				case serviceEventsChan <- event:
				}
			}
		}
	}()

	return serviceEventsChan, nil
}
func (p *Provider) loadConfiguration(ctx context.Context) (*dynamic.Configuration, error) {
var items []item
var err error
if p.AllowEmptyServices {
items, err = p.getNomadServiceDataWithEmptyServices(ctx)
if err != nil {
return err
return nil, err
}
} else {
items, err = p.getNomadServiceData(ctx)
if err != nil {
return err
return nil, err
}
}
configurationC <- dynamic.Message{
ProviderName: p.name,
Configuration: p.buildConfig(ctx, items),
return p.buildConfig(ctx, items), nil
}
func (p *Provider) updateLastConfiguration(conf *dynamic.Configuration) (bool, error) {
confHash, err := hashstructure.Hash(conf, nil)
if err != nil {
return false, fmt.Errorf("hashing the configuration: %w", err)
}
return nil
if p.lastConfiguration.Get() == confHash {
return false, nil
}
p.lastConfiguration.Set(confHash)
return true, nil
}
func (p *Provider) getNomadServiceData(ctx context.Context) ([]item, error) {
@ -453,3 +536,38 @@ func createClient(namespace string, endpoint *EndpointConfig) (*api.Client, erro
return api.NewClient(&config)
}
// throttleEvents decouples the producer of eventsChan from a slow consumer by
// forwarding events into a channel with a single buffered slot. While that
// slot is occupied, further events are dropped (and logged at debug level), so
// a burst of events results in at most one pending event.
//
// It returns nil when throttleDuration is 0, meaning throttling is disabled and
// the caller should keep reading from eventsChan directly. Otherwise it returns
// the buffered channel. The returned channel is never closed.
//
// The forwarding goroutine is started on pool and stops when the pool context
// is done, when ctx is done, or when eventsChan is closed.
//
// Copied from the Kubernetes provider.
func throttleEvents(ctx context.Context, throttleDuration time.Duration, pool *safe.Pool, eventsChan <-chan *api.Events) chan *api.Events {
	if throttleDuration == 0 {
		return nil
	}

	// Create a buffered channel to hold the pending event (if we're delaying processing the event due to throttling).
	eventsChanBuffered := make(chan *api.Events, 1)

	// Run a goroutine that reads events from eventsChan and does a
	// non-blocking write to eventsChanBuffered. This guarantees that the
	// producer writing to eventsChan is never blocked by throttling, and that
	// eventsChanBuffered will have something in it if there's been an event
	// since the consumer last read from that channel.
	pool.GoCtx(func(ctxPool context.Context) {
		for {
			select {
			case <-ctxPool.Done():
				return
			case <-ctx.Done():
				// The run that owns this throttle is over; nobody reads
				// eventsChanBuffered anymore, so stop instead of lingering
				// until the whole pool shuts down.
				return
			case nextEvent, ok := <-eventsChan:
				if !ok {
					// A closed channel yields nil events forever: without this
					// check the loop would spin and nextEvent.Index below
					// would dereference a nil pointer.
					return
				}

				select {
				case eventsChanBuffered <- nextEvent:
				default:
					// We already have an event in eventsChanBuffered, so we'll
					// do a refresh as soon as our throttle allows us to. It's fine
					// to drop the event and keep whatever's in the buffer -- we
					// don't do different things for different events.
					log.Ctx(ctx).Debug().Msgf("Dropping event %d due to throttling", nextEvent.Index)
				}
			}
		}
	})

	return eventsChanBuffered
}