Implement Traefik provider for Nomad orchestrator

This commit is contained in:
Seth Hoenig 2022-06-10 11:32:08 -05:00 committed by GitHub
parent becee5e393
commit aa0b5466a9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 4328 additions and 7 deletions

View file

@ -114,6 +114,10 @@ func NewProviderAggregator(conf static.Providers) ProviderAggregator {
}
}
if conf.Nomad != nil {
p.quietAddProvider(conf.Nomad)
}
if conf.Consul != nil {
for _, pvd := range conf.Consul.BuildProviders() {
p.quietAddProvider(pvd)

View file

@ -0,0 +1,267 @@
package nomad
import (
"context"
"errors"
"fmt"
"net"
"strconv"
"github.com/traefik/traefik/v2/pkg/config/dynamic"
"github.com/traefik/traefik/v2/pkg/config/label"
"github.com/traefik/traefik/v2/pkg/log"
"github.com/traefik/traefik/v2/pkg/provider"
"github.com/traefik/traefik/v2/pkg/provider/constraints"
)
// buildConfig converts the given Nomad service instances into one merged
// dynamic configuration.
//
// For every item that passes keepItem, the item's tags are turned into labels
// and decoded into a per-instance configuration; TCP, UDP and HTTP sections are
// then completed with a server pointing at the instance. Items whose labels
// cannot be decoded, or whose server cannot be built, are logged and skipped.
// The per-instance configurations are finally combined with provider.Merge.
func (p *Provider) buildConfig(ctx context.Context, items []item) *dynamic.Configuration {
	configurations := make(map[string]*dynamic.Configuration)

	for _, i := range items {
		svcName := provider.Normalize(i.Node + "-" + i.Name + "-" + i.ID)
		ctxSvc := log.With(ctx, log.Str(log.ServiceName, svcName))

		if !p.keepItem(ctxSvc, i) {
			continue
		}

		// Use the service-scoped context so every message below carries the
		// service name; the provider-level context would drop it.
		logger := log.FromContext(ctxSvc)

		labels := tagsToLabels(i.Tags, p.Prefix)

		config, err := label.DecodeConfiguration(labels)
		if err != nil {
			logger.Errorf("Failed to decode configuration: %v", err)
			continue
		}

		var tcpOrUDP bool

		if len(config.TCP.Routers) > 0 || len(config.TCP.Services) > 0 {
			tcpOrUDP = true
			if err := p.buildTCPConfig(i, config.TCP); err != nil {
				logger.Errorf("Failed to build TCP service configuration: %v", err)
				continue
			}
			provider.BuildTCPRouterConfiguration(ctxSvc, config.TCP)
		}

		if len(config.UDP.Routers) > 0 || len(config.UDP.Services) > 0 {
			tcpOrUDP = true
			if err := p.buildUDPConfig(i, config.UDP); err != nil {
				logger.Errorf("Failed to build UDP service configuration: %v", err)
				continue
			}
			provider.BuildUDPRouterConfiguration(ctxSvc, config.UDP)
		}

		// tcp/udp, skip configuring http service
		if tcpOrUDP && len(config.HTTP.Routers) == 0 &&
			len(config.HTTP.Middlewares) == 0 &&
			len(config.HTTP.Services) == 0 {
			configurations[svcName] = config
			continue
		}

		// configure http service
		if err := p.buildServiceConfig(i, config.HTTP); err != nil {
			logger.Errorf("Failed to build HTTP service configuration: %v", err)
			continue
		}

		// model is the data exposed to the default rule template.
		model := struct {
			Name   string
			Labels map[string]string
		}{
			Name:   i.Name,
			Labels: labels,
		}

		provider.BuildRouterConfiguration(ctxSvc, config.HTTP, provider.Normalize(i.Name), p.defaultRuleTpl, model)
		configurations[svcName] = config
	}

	return provider.Merge(ctx, configurations)
}
// buildTCPConfig completes the TCP section for item i. When no TCP service is
// declared, a default one named after the normalized item name is created.
// Every service then gets its server filled in by addServerTCP; the first
// error encountered is returned.
func (p *Provider) buildTCPConfig(i item, configuration *dynamic.TCPConfiguration) error {
	if len(configuration.Services) == 0 {
		defaultLB := &dynamic.TCPServersLoadBalancer{}
		defaultLB.SetDefaults()

		configuration.Services = map[string]*dynamic.TCPService{
			provider.Normalize(i.Name): {LoadBalancer: defaultLB},
		}
	}

	for _, svc := range configuration.Services {
		err := p.addServerTCP(i, svc.LoadBalancer)
		if err != nil {
			return err
		}
	}

	return nil
}
// buildUDPConfig completes the UDP section for item i. When no UDP service is
// declared, a default one named after the normalized item name is created.
// Every service then gets its server filled in by addServerUDP; the first
// error encountered is returned.
func (p *Provider) buildUDPConfig(i item, configuration *dynamic.UDPConfiguration) error {
	if len(configuration.Services) == 0 {
		configuration.Services = map[string]*dynamic.UDPService{
			provider.Normalize(i.Name): {LoadBalancer: &dynamic.UDPServersLoadBalancer{}},
		}
	}

	for _, svc := range configuration.Services {
		err := p.addServerUDP(i, svc.LoadBalancer)
		if err != nil {
			return err
		}
	}

	return nil
}
// buildServiceConfig completes the HTTP section for item i. When no HTTP
// service is declared, a default one named after the normalized item name is
// created. Every service then gets its server filled in by addServer; the
// first error encountered is returned.
func (p *Provider) buildServiceConfig(i item, configuration *dynamic.HTTPConfiguration) error {
	if len(configuration.Services) == 0 {
		defaultLB := &dynamic.ServersLoadBalancer{}
		defaultLB.SetDefaults()

		configuration.Services = map[string]*dynamic.Service{
			provider.Normalize(i.Name): {LoadBalancer: defaultLB},
		}
	}

	for _, svc := range configuration.Services {
		err := p.addServer(i, svc.LoadBalancer)
		if err != nil {
			return err
		}
	}

	return nil
}
// keepItem reports whether item i should be turned into configuration: it must
// be enabled and its tags must satisfy the provider constraints. The reason
// for rejecting an item is logged through the logger carried by ctx.
//
// TODO: check whether it is mandatory to filter again.
func (p *Provider) keepItem(ctx context.Context, i item) bool {
	logger := log.FromContext(ctx)

	if !i.ExtraConf.Enable {
		logger.Debug("Filtering disabled item")
		return false
	}

	ok, err := constraints.MatchTags(i.Tags, p.Constraints)
	switch {
	case err != nil:
		logger.Errorf("Error matching constraint expressions: %v", err)
		return false
	case !ok:
		logger.Debugf("Filtering out item due to constraints: %q", p.Constraints)
		return false
	}

	// TODO: filter on health when that information exists (nomad 1.4+)

	return true
}
// addServerTCP points the first server of lb at item i. A port already set on
// that server takes precedence over the item's port. An error is returned when
// lb is nil or when no port or address is available.
func (p *Provider) addServerTCP(i item, lb *dynamic.TCPServersLoadBalancer) error {
	if lb == nil {
		return errors.New("load-balancer is missing")
	}

	if len(lb.Servers) == 0 {
		lb.Servers = []dynamic.TCPServer{{}}
	}
	server := &lb.Servers[0]

	// The port is folded into the address below, so it is cleared on the server.
	port := server.Port
	server.Port = ""
	if port == "" && i.Port != 0 {
		port = strconv.Itoa(i.Port)
	}

	switch {
	case port == "":
		return errors.New("port is missing")
	case i.Address == "":
		return errors.New("address is missing")
	}

	server.Address = net.JoinHostPort(i.Address, port)

	return nil
}
// addServerUDP points the first server of lb at item i. A port already set on
// that server takes precedence over the item's port. An error is returned when
// lb is nil or when no port or address is available.
func (p *Provider) addServerUDP(i item, lb *dynamic.UDPServersLoadBalancer) error {
	if lb == nil {
		return errors.New("load-balancer is missing")
	}

	if len(lb.Servers) == 0 {
		lb.Servers = []dynamic.UDPServer{{}}
	}
	server := &lb.Servers[0]

	// The port is folded into the address below, so it is cleared on the server.
	port := server.Port
	server.Port = ""
	if port == "" && i.Port != 0 {
		port = strconv.Itoa(i.Port)
	}

	switch {
	case port == "":
		return errors.New("port is missing")
	case i.Address == "":
		return errors.New("address is missing")
	}

	server.Address = net.JoinHostPort(i.Address, port)

	return nil
}
// addServer points the first server of lb at item i by building its URL from
// the server's scheme, the item's address and the port. A port already set on
// that server takes precedence over the item's port. An error is returned when
// lb is nil or when no port or address is available.
func (p *Provider) addServer(i item, lb *dynamic.ServersLoadBalancer) error {
	if lb == nil {
		return errors.New("load-balancer is missing")
	}

	if len(lb.Servers) == 0 {
		var defaultServer dynamic.Server
		defaultServer.SetDefaults()
		lb.Servers = []dynamic.Server{defaultServer}
	}
	server := &lb.Servers[0]

	// The port is folded into the URL below, so it is cleared on the server.
	port := server.Port
	server.Port = ""
	if port == "" && i.Port != 0 {
		port = strconv.Itoa(i.Port)
	}

	switch {
	case port == "":
		return errors.New("port is missing")
	case i.Address == "":
		return errors.New("address is missing")
	}

	// Same for the scheme: it ends up in the URL only.
	scheme := server.Scheme
	server.Scheme = ""
	server.URL = fmt.Sprintf("%s://%s", scheme, net.JoinHostPort(i.Address, port))

	return nil
}

File diff suppressed because it is too large Load diff

278
pkg/provider/nomad/nomad.go Normal file
View file

@ -0,0 +1,278 @@
package nomad
import (
"context"
"fmt"
"strings"
"text/template"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/hashicorp/nomad/api"
ptypes "github.com/traefik/paerser/types"
"github.com/traefik/traefik/v2/pkg/config/dynamic"
"github.com/traefik/traefik/v2/pkg/job"
"github.com/traefik/traefik/v2/pkg/log"
"github.com/traefik/traefik/v2/pkg/provider"
"github.com/traefik/traefik/v2/pkg/provider/constraints"
"github.com/traefik/traefik/v2/pkg/safe"
"github.com/traefik/traefik/v2/pkg/types"
)
const (
// providerName is the name of this provider; it labels the provider's log
// entries and the configuration messages it emits.
providerName = "nomad"
// defaultTemplateRule is the rule template that SetDefaults installs as the
// provider's default rule.
defaultTemplateRule = "Host(`{{ normalize .Name }}`)"
// defaultPrefix is the default prefix used in tag values indicating the service
// should be consumed and exposed via traefik.
defaultPrefix = "traefik"
)
// Compile-time check that *Provider satisfies the provider.Provider interface.
var _ provider.Provider = (*Provider)(nil)
// item is one Nomad service instance as collected by getNomadServiceData and
// consumed by buildConfig.
type item struct {
ID string // service ID
Name string // service name
Namespace string // service namespace
Node string // node ID
Datacenter string // datacenter reported by the service registration
Address string // service address
Port int // service port
Tags []string // service tags
ExtraConf configuration // global options
}
// Provider holds configurations of the provider.
//
// The exported fields are the user-facing settings; client and defaultRuleTpl
// are runtime state populated by Provide and Init respectively.
type Provider struct {
DefaultRule string `description:"Default rule." json:"defaultRule,omitempty" toml:"defaultRule,omitempty" yaml:"defaultRule,omitempty"`
Constraints string `description:"Constraints is an expression that Traefik matches against the Nomad service's tags to determine whether to create route(s) for that service." json:"constraints,omitempty" toml:"constraints,omitempty" yaml:"constraints,omitempty" export:"true"`
Endpoint *EndpointConfig `description:"Nomad endpoint settings" json:"endpoint,omitempty" toml:"endpoint,omitempty" yaml:"endpoint,omitempty" export:"true"`
Prefix string `description:"Prefix for nomad service tags." json:"prefix,omitempty" toml:"prefix,omitempty" yaml:"prefix,omitempty" export:"true"`
Stale bool `description:"Use stale consistency for catalog reads." json:"stale,omitempty" toml:"stale,omitempty" yaml:"stale,omitempty" export:"true"`
Namespace string `description:"Sets the Nomad namespace used to discover services." json:"namespace,omitempty" toml:"namespace,omitempty" yaml:"namespace,omitempty" export:"true"`
ExposedByDefault bool `description:"Expose Nomad services by default." json:"exposedByDefault,omitempty" toml:"exposedByDefault,omitempty" yaml:"exposedByDefault,omitempty" export:"true"`
RefreshInterval ptypes.Duration `description:"Interval for polling Nomad API." json:"refreshInterval,omitempty" toml:"refreshInterval,omitempty" yaml:"refreshInterval,omitempty" export:"true"`
client *api.Client // client for Nomad API
defaultRuleTpl *template.Template // default routing rule
}
// EndpointConfig holds the connection settings from which createClient builds
// the Nomad API client.
type EndpointConfig struct {
// Address is the Nomad endpoint address, if empty it defaults to NOMAD_ADDR or "http://localhost:4646".
Address string `description:"The address of the Nomad server, including scheme and port." json:"address,omitempty" toml:"address,omitempty" yaml:"address,omitempty"`
// Region is the Nomad region, if empty it defaults to NOMAD_REGION or "global".
Region string `description:"Nomad region to use. If not provided, the local agent region is used." json:"region,omitempty" toml:"region,omitempty" yaml:"region,omitempty"`
// Token is the ACL token to connect with Nomad, if empty it defaults to NOMAD_TOKEN.
Token string `description:"Token is used to provide a per-request ACL token." json:"token,omitempty" toml:"token,omitempty" yaml:"token,omitempty" loggable:"false"`
// TLS holds the optional client TLS settings (CA, certificate, key, verification).
TLS *types.ClientTLS `description:"Configure TLS." json:"tls,omitempty" toml:"tls,omitempty" yaml:"tls,omitempty" export:"true"`
// EndpointWaitTime is passed to the Nomad client as its wait time.
EndpointWaitTime ptypes.Duration `description:"WaitTime limits how long a Watch will block. If not provided, the agent default values will be used" json:"endpointWaitTime,omitempty" toml:"endpointWaitTime,omitempty" yaml:"endpointWaitTime,omitempty" export:"true"`
}
// SetDefaults resets the provider to its default settings: an empty endpoint
// configuration, the default tag prefix and rule template, services exposed
// by default, and a 15 second refresh interval.
func (p *Provider) SetDefaults() {
	p.DefaultRule = defaultTemplateRule
	p.Prefix = defaultPrefix
	p.ExposedByDefault = true
	p.RefreshInterval = ptypes.Duration(15 * time.Second)
	p.Endpoint = new(EndpointConfig)
}
// Init prepares the provider for use by compiling DefaultRule into the
// template applied to routers that do not declare a rule. It returns an error
// when the rule cannot be parsed, in which case the provider is left untouched.
func (p *Provider) Init() error {
	tpl, err := provider.MakeDefaultRuleTemplate(p.DefaultRule, nil)
	if err != nil {
		return fmt.Errorf("error while parsing default rule: %w", err)
	}

	p.defaultRuleTpl = tpl

	return nil
}
// Provide allows the Nomad Traefik Provider to provide configurations to traefik
// using the given configuration channel.
//
// It creates the Nomad API client, then starts a goroutine on pool that loads
// the configuration once and reloads it every RefreshInterval until the
// routine context is cancelled. Failures are retried with exponential backoff.
//
// An error is returned when RefreshInterval is not strictly positive or when
// the API client cannot be created.
func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.Pool) error {
	// time.NewTicker panics on a non-positive interval. Inside the retried
	// operation that panic would only surface as an endless "connection error"
	// retry loop, so reject the misconfiguration up front instead.
	refreshInterval := time.Duration(p.RefreshInterval)
	if refreshInterval <= 0 {
		return fmt.Errorf("invalid nomad refresh interval %s: must be greater than zero", refreshInterval)
	}

	var err error
	p.client, err = createClient(p.Namespace, p.Endpoint)
	if err != nil {
		return fmt.Errorf("failed to create nomad API client: %w", err)
	}

	pool.GoCtx(func(routineCtx context.Context) {
		ctxLog := log.With(routineCtx, log.Str(log.ProviderName, providerName))
		logger := log.FromContext(ctxLog)

		operation := func() error {
			ctx, cancel := context.WithCancel(ctxLog)
			defer cancel()

			// load initial configuration
			if err := p.loadConfiguration(ctx, configurationChan); err != nil {
				return fmt.Errorf("failed to load initial nomad services: %w", err)
			}

			// issue periodic refreshes in the background
			// (Nomad does not support Watch style observations)
			ticker := time.NewTicker(refreshInterval)
			defer ticker.Stop()

			// enter loop where we wait for and respond to notifications
			for {
				select {
				case <-ctx.Done():
					return nil
				case <-ticker.C:
				}

				// load services due to refresh
				if err := p.loadConfiguration(ctx, configurationChan); err != nil {
					return fmt.Errorf("failed to refresh nomad services: %w", err)
				}
			}
		}

		failure := func(err error, d time.Duration) {
			logger.Errorf("Provider connection error %+v, retrying in %s", err, d)
		}

		if retryErr := backoff.RetryNotify(
			safe.OperationWithRecover(operation),
			backoff.WithContext(job.NewBackOff(backoff.NewExponentialBackOff()), ctxLog),
			failure,
		); retryErr != nil {
			logger.Errorf("Cannot connect to Nomad server %+v", retryErr)
		}
	})

	return nil
}
// loadConfiguration fetches the current Nomad services, builds the matching
// dynamic configuration and sends it on configurationC. It returns the error
// from the service lookup, if any; nothing is sent in that case.
func (p *Provider) loadConfiguration(ctx context.Context, configurationC chan<- dynamic.Message) error {
	items, err := p.getNomadServiceData(ctx)
	if err != nil {
		return err
	}

	msg := dynamic.Message{
		ProviderName:  providerName,
		Configuration: p.buildConfig(ctx, items),
	}
	configurationC <- msg

	return nil
}
// createClient builds a Nomad API client for the given namespace from the
// endpoint settings: address, region, ACL token, wait time and, when present,
// the client TLS options.
func createClient(namespace string, endpoint *EndpointConfig) (*api.Client, error) {
	config := api.Config{
		Address:   endpoint.Address,
		Namespace: namespace,
		Region:    endpoint.Region,
		// Forward the configured ACL token; without it the setting is silently
		// ignored and requests go out unauthenticated.
		SecretID: endpoint.Token,
		WaitTime: time.Duration(endpoint.EndpointWaitTime),
	}

	if endpoint.TLS != nil {
		config.TLSConfig = &api.TLSConfig{
			CACert:     endpoint.TLS.CA,
			ClientCert: endpoint.TLS.Cert,
			ClientKey:  endpoint.TLS.Key,
			Insecure:   endpoint.TLS.InsecureSkipVerify,
		}
	}

	return api.NewClient(&config)
}
// configuration contains information from the service's tags that are globals
// (not specific to the dynamic configuration). It is computed by globalConfig.
type configuration struct {
Enable bool // <prefix>.enable
}
// globalConfig extracts the settings that are not part of the dynamic
// configuration from a service's tags. Enable starts from ExposedByDefault and
// is overridden by the "<prefix>.enable" tag when present; any value other
// than a case-insensitive "true" disables the service.
func (p *Provider) globalConfig(tags []string) configuration {
	cfg := configuration{Enable: p.ExposedByDefault}

	// tagsToLabels rewrites the configured prefix to "traefik".
	if value, ok := tagsToLabels(tags, p.Prefix)["traefik.enable"]; ok {
		cfg.Enable = strings.EqualFold(value, "true")
	}

	return cfg
}
// getNomadServiceData lists the services known to Nomad and returns one item
// per registered instance of every service that is enabled and matches the
// provider constraints.
//
// Services are filtered on the tags from the listing before their instances
// are fetched. A failure to list services or to fetch instances aborts the
// whole lookup; an invalid constraint expression only skips that service.
func (p *Provider) getNomadServiceData(ctx context.Context) ([]item, error) {
	// first, get list of service stubs
	listOpts := (&api.QueryOptions{AllowStale: p.Stale}).WithContext(ctx)

	stubs, _, err := p.client.Services().List(listOpts)
	if err != nil {
		return nil, err
	}

	var items []item

	for _, stub := range stubs {
		for _, service := range stub.Services {
			ctxSvc := log.With(ctx, log.Str("serviceName", service.ServiceName))
			logger := log.FromContext(ctxSvc)

			if !p.globalConfig(service.Tags).Enable {
				logger.Debug("Filter Nomad service that is not enabled")
				continue
			}

			ok, err := constraints.MatchTags(service.Tags, p.Constraints)
			if err != nil {
				logger.Errorf("Error matching constraint expressions: %v", err)
				continue
			}
			if !ok {
				logger.Debugf("Filter Nomad service not matching constraints: %q", p.Constraints)
				continue
			}

			registrations, err := p.fetchService(ctx, service.ServiceName)
			if err != nil {
				return nil, err
			}

			for _, reg := range registrations {
				entry := item{
					ID:         reg.ID,
					Name:       reg.ServiceName,
					Namespace:  reg.Namespace,
					Node:       reg.NodeID,
					Datacenter: reg.Datacenter,
					Address:    reg.Address,
					Port:       reg.Port,
					Tags:       reg.Tags,
					ExtraConf:  p.globalConfig(reg.Tags),
				}
				items = append(items, entry)
			}
		}
	}

	return items, nil
}
// fetchService queries the Nomad API for the registrations of the service
// called name. When services are not exposed by default, the query is
// restricted to registrations carrying the "<prefix>.enable=true" tag.
func (p *Provider) fetchService(ctx context.Context, name string) ([]*api.ServiceRegistration, error) {
	// TODO: Nomad currently (v1.3.0) does not support health checks,
	//  and as such does not yet return health status information.
	//  When it does, refactor this section to include health status.
	opts := &api.QueryOptions{AllowStale: p.Stale}

	if !p.ExposedByDefault {
		enableTag := fmt.Sprintf("%s.enable=true", p.Prefix)
		opts.Filter = fmt.Sprintf(`Tags contains %q`, enableTag)
	}

	registrations, _, err := p.client.Services().Get(name, opts.WithContext(ctx))
	if err != nil {
		return nil, fmt.Errorf("failed to fetch services: %w", err)
	}

	return registrations, nil
}

View file

@ -0,0 +1,169 @@
package nomad
import (
"context"
"net/http"
"net/http/httptest"
"strings"
"testing"
"github.com/stretchr/testify/require"
)
// Test_globalConfig checks how the enable flag is derived from the
// ExposedByDefault setting and the "<prefix>.enable" tag, for both the default
// and a custom prefix.
func Test_globalConfig(t *testing.T) {
	type testCase struct {
		name             string
		prefix           string
		tags             []string
		exposedByDefault bool
		want             configuration
	}

	testCases := []testCase{
		{
			name:             "expose_by_default_no_tags",
			prefix:           "traefik",
			tags:             nil,
			exposedByDefault: true,
			want:             configuration{Enable: true},
		},
		{
			name:             "not_expose_by_default_no_tags",
			prefix:           "traefik",
			tags:             nil,
			exposedByDefault: false,
			want:             configuration{Enable: false},
		},
		{
			name:             "expose_by_default_tags_enable",
			prefix:           "traefik",
			tags:             []string{"traefik.enable=true"},
			exposedByDefault: true,
			want:             configuration{Enable: true},
		},
		{
			name:             "expose_by_default_tags_disable",
			prefix:           "traefik",
			tags:             []string{"traefik.enable=false"},
			exposedByDefault: true,
			want:             configuration{Enable: false},
		},
		{
			name:             "expose_by_default_tags_enable_custom_prefix",
			prefix:           "custom",
			tags:             []string{"custom.enable=true"},
			exposedByDefault: true,
			want:             configuration{Enable: true},
		},
		{
			name:             "expose_by_default_tags_disable_custom_prefix",
			prefix:           "custom",
			tags:             []string{"custom.enable=false"},
			exposedByDefault: true,
			want:             configuration{Enable: false},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			p := Provider{
				ExposedByDefault: tc.exposedByDefault,
				Prefix:           tc.prefix,
			}

			require.Equal(t, tc.want, p.globalConfig(tc.tags))
		})
	}
}
// Test_getNomadServiceData runs the service lookup against a stub HTTP server
// that serves canned responses for the service listing and for the two listed
// services, and expects one item per service instance.
func Test_getNomadServiceData(t *testing.T) {
	handler := func(w http.ResponseWriter, r *http.Request) {
		uri := r.RequestURI

		if strings.HasSuffix(uri, "/v1/services") {
			_, _ = w.Write([]byte(services))
		} else if strings.HasSuffix(uri, "/v1/service/redis") {
			_, _ = w.Write([]byte(redis))
		} else if strings.HasSuffix(uri, "/v1/service/hello-nomad") {
			_, _ = w.Write([]byte(hello))
		}
	}

	server := httptest.NewServer(http.HandlerFunc(handler))
	t.Cleanup(server.Close)

	p := &Provider{}
	p.SetDefaults()
	p.Endpoint.Address = server.URL

	require.NoError(t, p.Init())

	// fudge client, avoid starting up via Provide
	client, err := createClient(p.Namespace, p.Endpoint)
	p.client = client
	require.NoError(t, err)

	// make the query for services
	got, err := p.getNomadServiceData(context.TODO())
	require.NoError(t, err)
	require.Len(t, got, 2)
}
// services is the canned body the stub server in Test_getNomadServiceData
// returns for the service listing endpoint (/v1/services).
const services = `
[
{
"Namespace": "default",
"Services": [
{
"ServiceName": "redis",
"Tags": [
"traefik.enable=true"
]
},
{
"ServiceName": "hello-nomad",
"Tags": [
"traefik.enable=true",
"traefik.http.routers.hellon.entrypoints=web",
"traefik.http.routers.hellon.service=hello-nomad"
]
}
]
}
]
`
// redis is the canned body the stub server in Test_getNomadServiceData returns
// for /v1/service/redis: a single registration of the "redis" service.
const redis = `
[
{
"Address": "127.0.0.1",
"AllocID": "07501480-8175-8071-7da6-133bd1ff890f",
"CreateIndex": 46,
"Datacenter": "dc1",
"ID": "_nomad-task-07501480-8175-8071-7da6-133bd1ff890f-group-redis-redis-redis",
"JobID": "echo",
"ModifyIndex": 46,
"Namespace": "default",
"NodeID": "6d7f412e-e7ff-2e66-d47b-867b0e9d8726",
"Port": 30826,
"ServiceName": "redis",
"Tags": [
"traefik.enable=true"
]
}
]
`
// hello is the canned body the stub server in Test_getNomadServiceData returns
// for /v1/service/hello-nomad: a single registration of the "hello-nomad" service.
const hello = `
[
{
"Address": "127.0.0.1",
"AllocID": "71a63a80-a98a-93ee-4fd7-73b808577c20",
"CreateIndex": 18,
"Datacenter": "dc1",
"ID": "_nomad-task-71a63a80-a98a-93ee-4fd7-73b808577c20-group-hello-nomad-hello-nomad-http",
"JobID": "echo",
"ModifyIndex": 18,
"Namespace": "default",
"NodeID": "6d7f412e-e7ff-2e66-d47b-867b0e9d8726",
"Port": 20627,
"ServiceName": "hello-nomad",
"Tags": [
"traefik.enable=true",
"traefik.http.routers.hellon.entrypoints=web",
"traefik.http.routers.hellon.service=hello-nomad"
]
}
]
`

19
pkg/provider/nomad/tag.go Normal file
View file

@ -0,0 +1,19 @@
package nomad
import (
"strings"
)
// tagsToLabels converts Nomad service tags of the form "<prefix>.<key>=<value>"
// into labels keyed by "traefik.<key>".
//
// Only tags that belong to the prefix namespace (that is, start with
// "<prefix>.") are kept, so a tag such as "traefikee.foo=bar" is not mistaken
// for a "traefik" tag. An empty prefix selects every tag. Tags without an "="
// are ignored; the value may itself contain "=". Whitespace around the key and
// the value is trimmed.
func tagsToLabels(tags []string, prefix string) map[string]string {
	labels := make(map[string]string, len(tags))

	namespace := prefix + "."

	for _, tag := range tags {
		// Require the "." boundary after a non-empty prefix; a bare prefix
		// match would also pick up unrelated tags sharing the same leading text.
		if prefix != "" && !strings.HasPrefix(tag, namespace) {
			continue
		}

		parts := strings.SplitN(tag, "=", 2)
		if len(parts) != 2 {
			continue
		}

		left, right := strings.TrimSpace(parts[0]), strings.TrimSpace(parts[1])
		labels["traefik."+strings.TrimPrefix(left, namespace)] = right
	}

	return labels
}

View file

@ -0,0 +1,109 @@
package nomad
import (
"testing"
"github.com/stretchr/testify/assert"
)
func Test_tagsToLabels(t *testing.T) {
testCases := []struct {
desc string
tags []string
prefix string
expected map[string]string
}{
{
desc: "no tags",
tags: []string{},
prefix: "traefik",
expected: map[string]string{},
},
{
desc: "minimal global config",
tags: []string{"traefik.enable=false"},
prefix: "traefik",
expected: map[string]string{
"traefik.enable": "false",
},
},
{
desc: "config with domain",
tags: []string{
"traefik.enable=true",
"traefik.domain=example.com",
},
prefix: "traefik",
expected: map[string]string{
"traefik.enable": "true",
"traefik.domain": "example.com",
},
},
{
desc: "config with custom prefix",
tags: []string{
"custom.enable=true",
"custom.domain=example.com",
},
prefix: "custom",
expected: map[string]string{
"traefik.enable": "true",
"traefik.domain": "example.com",
},
},
{
desc: "config with spaces in tags",
tags: []string{
"custom.enable = true",
"custom.domain = example.com",
},
prefix: "custom",
expected: map[string]string{
"traefik.enable": "true",
"traefik.domain": "example.com",
},
},
{
desc: "with a prefix",
prefix: "test",
tags: []string{
"test.aaa=01",
"test.bbb=02",
"ccc=03",
"test.ddd=04=to",
},
expected: map[string]string{
"traefik.aaa": "01",
"traefik.bbb": "02",
"traefik.ddd": "04=to",
},
},
{
desc: "with an empty prefix",
prefix: "",
tags: []string{
"test.aaa=01",
"test.bbb=02",
"ccc=03",
"test.ddd=04=to",
},
expected: map[string]string{
"traefik.test.aaa": "01",
"traefik.test.bbb": "02",
"traefik.ccc": "03",
"traefik.test.ddd": "04=to",
},
},
}
for _, test := range testCases {
test := test
t.Run(test.desc, func(t *testing.T) {
t.Parallel()
labels := tagsToLabels(test.tags, test.prefix)
assert.Equal(t, test.expected, labels)
})
}
}