Add AWS ECS provider

* add ecs provider

* add ecs docs

* fix test after rebase

* add provider icon

* add missing addProvider call

* Fix for review

* Fix documentation

* Fix for review

* Fix documentation

* fix ctx usage

* autoDiscoverClusters setDefaults false

* Fix for review

* review: doc.

* Fix for review: add ctx in backoff retry

* review: linter.

Co-authored-by: Michael <michael.matur@gmail.com>
Co-authored-by: romain <romain@containo.us>
Co-authored-by: Fernandez Ludovic <ludovic@containo.us>
This commit is contained in:
Alessandro Chitolina 2020-07-15 16:28:04 +02:00 committed by GitHub
parent 6e4f5821dc
commit 285ded6e49
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 4348 additions and 0 deletions

View file

@ -49,6 +49,10 @@ func NewProviderAggregator(conf static.Providers) ProviderAggregator {
p.quietAddProvider(conf.Rancher)
}
if conf.Ecs != nil {
p.quietAddProvider(conf.Ecs)
}
if conf.ConsulCatalog != nil {
p.quietAddProvider(conf.ConsulCatalog)
}

View file

@ -0,0 +1,79 @@
package ecs
import "github.com/aws/aws-sdk-go/service/ecs"
func instance(ops ...func(*ecsInstance)) ecsInstance {
e := &ecsInstance{
containerDefinition: &ecs.ContainerDefinition{},
}
for _, op := range ops {
op(e)
}
return *e
}
func name(name string) func(*ecsInstance) {
return func(e *ecsInstance) {
e.Name = name
}
}
func id(id string) func(*ecsInstance) {
return func(e *ecsInstance) {
e.ID = id
}
}
func iMachine(opts ...func(*machine)) func(*ecsInstance) {
return func(e *ecsInstance) {
e.machine = &machine{}
for _, opt := range opts {
opt(e.machine)
}
}
}
func mState(state string) func(*machine) {
return func(m *machine) {
m.state = state
}
}
func mPrivateIP(ip string) func(*machine) {
return func(m *machine) {
m.privateIP = ip
}
}
func mHealthStatus(status string) func(*machine) {
return func(m *machine) {
m.healthStatus = status
}
}
func mPorts(opts ...func(*portMapping)) func(*machine) {
return func(m *machine) {
for _, opt := range opts {
p := &portMapping{}
opt(p)
m.ports = append(m.ports, *p)
}
}
}
func mPort(containerPort, hostPort int32, protocol string) func(*portMapping) {
return func(pm *portMapping) {
pm.containerPort = int64(containerPort)
pm.hostPort = int64(hostPort)
pm.protocol = protocol
}
}
func labels(labels map[string]string) func(*ecsInstance) {
return func(c *ecsInstance) {
c.Labels = labels
}
}

339
pkg/provider/ecs/config.go Normal file
View file

@ -0,0 +1,339 @@
package ecs
import (
"context"
"errors"
"fmt"
"net"
"strconv"
"strings"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/containous/traefik/v2/pkg/config/dynamic"
"github.com/containous/traefik/v2/pkg/config/label"
"github.com/containous/traefik/v2/pkg/log"
"github.com/containous/traefik/v2/pkg/provider"
"github.com/containous/traefik/v2/pkg/provider/constraints"
"github.com/docker/go-connections/nat"
)
func (p *Provider) buildConfiguration(ctx context.Context, instances []ecsInstance) *dynamic.Configuration {
configurations := make(map[string]*dynamic.Configuration)
for _, instance := range instances {
instanceName := getServiceName(instance) + "-" + instance.ID
ctxContainer := log.With(ctx, log.Str("ecs-instance", instanceName))
if !p.filterInstance(ctxContainer, instance) {
continue
}
logger := log.FromContext(ctxContainer)
confFromLabel, err := label.DecodeConfiguration(instance.Labels)
if err != nil {
logger.Error(err)
continue
}
var tcpOrUDP bool
if len(confFromLabel.TCP.Routers) > 0 || len(confFromLabel.TCP.Services) > 0 {
tcpOrUDP = true
err := p.buildTCPServiceConfiguration(instance, confFromLabel.TCP)
if err != nil {
logger.Error(err)
continue
}
provider.BuildTCPRouterConfiguration(ctxContainer, confFromLabel.TCP)
}
if len(confFromLabel.UDP.Routers) > 0 || len(confFromLabel.UDP.Services) > 0 {
tcpOrUDP = true
err := p.buildUDPServiceConfiguration(instance, confFromLabel.UDP)
if err != nil {
logger.Error(err)
continue
}
provider.BuildUDPRouterConfiguration(ctxContainer, confFromLabel.UDP)
}
if tcpOrUDP && len(confFromLabel.HTTP.Routers) == 0 &&
len(confFromLabel.HTTP.Middlewares) == 0 &&
len(confFromLabel.HTTP.Services) == 0 {
configurations[instanceName] = confFromLabel
continue
}
err = p.buildServiceConfiguration(ctxContainer, instance, confFromLabel.HTTP)
if err != nil {
logger.Error(err)
continue
}
serviceName := getServiceName(instance)
model := struct {
Name string
Labels map[string]string
}{
Name: serviceName,
Labels: instance.Labels,
}
provider.BuildRouterConfiguration(ctx, confFromLabel.HTTP, serviceName, p.defaultRuleTpl, model)
configurations[instanceName] = confFromLabel
}
return provider.Merge(ctx, configurations)
}
func (p *Provider) buildTCPServiceConfiguration(instance ecsInstance, configuration *dynamic.TCPConfiguration) error {
serviceName := getServiceName(instance)
if len(configuration.Services) == 0 {
configuration.Services = make(map[string]*dynamic.TCPService)
lb := &dynamic.TCPServersLoadBalancer{}
lb.SetDefaults()
configuration.Services[serviceName] = &dynamic.TCPService{
LoadBalancer: lb,
}
}
for name, service := range configuration.Services {
err := p.addServerTCP(instance, service.LoadBalancer)
if err != nil {
return fmt.Errorf("service %q error: %w", name, err)
}
}
return nil
}
func (p *Provider) buildUDPServiceConfiguration(instance ecsInstance, configuration *dynamic.UDPConfiguration) error {
serviceName := getServiceName(instance)
if len(configuration.Services) == 0 {
configuration.Services = make(map[string]*dynamic.UDPService)
lb := &dynamic.UDPServersLoadBalancer{}
configuration.Services[serviceName] = &dynamic.UDPService{
LoadBalancer: lb,
}
}
for name, service := range configuration.Services {
err := p.addServerUDP(instance, service.LoadBalancer)
if err != nil {
return fmt.Errorf("service %q error: %w", name, err)
}
}
return nil
}
func (p *Provider) buildServiceConfiguration(_ context.Context, instance ecsInstance, configuration *dynamic.HTTPConfiguration) error {
serviceName := getServiceName(instance)
if len(configuration.Services) == 0 {
configuration.Services = make(map[string]*dynamic.Service)
lb := &dynamic.ServersLoadBalancer{}
lb.SetDefaults()
configuration.Services[serviceName] = &dynamic.Service{
LoadBalancer: lb,
}
}
for name, service := range configuration.Services {
err := p.addServer(instance, service.LoadBalancer)
if err != nil {
return fmt.Errorf("service %q error: %w", name, err)
}
}
return nil
}
func (p *Provider) filterInstance(ctx context.Context, instance ecsInstance) bool {
logger := log.FromContext(ctx)
if instance.machine == nil {
logger.Debug("Filtering ecs instance with nil machine")
return false
}
if strings.ToLower(instance.machine.state) != ec2.InstanceStateNameRunning {
logger.Debugf("Filtering ecs instance with an incorrect state %s (%s) (state = %s)", instance.Name, instance.ID, instance.machine.state)
return false
}
if instance.machine.healthStatus == "UNHEALTHY" {
logger.Debugf("Filtering unhealthy ecs instance %s (%s)", instance.Name, instance.ID)
return false
}
if len(instance.machine.privateIP) == 0 {
logger.Debugf("Filtering ecs instance without an ip address %s (%s)", instance.Name, instance.ID)
return false
}
if !instance.ExtraConf.Enable {
logger.Debugf("Filtering disabled ecs instance %s (%s)", instance.Name, instance.ID)
return false
}
matches, err := constraints.MatchLabels(instance.Labels, p.Constraints)
if err != nil {
logger.Errorf("Error matching constraints expression: %v", err)
return false
}
if !matches {
logger.Debugf("Container pruned by constraint expression: %q", p.Constraints)
return false
}
return true
}
func (p *Provider) addServerTCP(instance ecsInstance, loadBalancer *dynamic.TCPServersLoadBalancer) error {
if loadBalancer == nil {
return errors.New("load-balancer is not defined")
}
var serverPort string
if len(loadBalancer.Servers) > 0 {
serverPort = loadBalancer.Servers[0].Port
loadBalancer.Servers[0].Port = ""
}
ip, port, err := p.getIPPort(instance, serverPort)
if err != nil {
return err
}
if len(loadBalancer.Servers) == 0 {
server := dynamic.TCPServer{}
loadBalancer.Servers = []dynamic.TCPServer{server}
}
if port == "" {
return errors.New("port is missing")
}
loadBalancer.Servers[0].Address = net.JoinHostPort(ip, port)
return nil
}
func (p *Provider) addServerUDP(instance ecsInstance, loadBalancer *dynamic.UDPServersLoadBalancer) error {
if loadBalancer == nil {
return errors.New("load-balancer is not defined")
}
var serverPort string
if len(loadBalancer.Servers) > 0 {
serverPort = loadBalancer.Servers[0].Port
loadBalancer.Servers[0].Port = ""
}
ip, port, err := p.getIPPort(instance, serverPort)
if err != nil {
return err
}
if len(loadBalancer.Servers) == 0 {
server := dynamic.UDPServer{}
loadBalancer.Servers = []dynamic.UDPServer{server}
}
if port == "" {
return errors.New("port is missing")
}
loadBalancer.Servers[0].Address = net.JoinHostPort(ip, port)
return nil
}
func (p *Provider) addServer(instance ecsInstance, loadBalancer *dynamic.ServersLoadBalancer) error {
if loadBalancer == nil {
return errors.New("load-balancer is not defined")
}
var serverPort string
if len(loadBalancer.Servers) > 0 {
serverPort = loadBalancer.Servers[0].Port
loadBalancer.Servers[0].Port = ""
}
ip, port, err := p.getIPPort(instance, serverPort)
if err != nil {
return err
}
if len(loadBalancer.Servers) == 0 {
server := dynamic.Server{}
server.SetDefaults()
loadBalancer.Servers = []dynamic.Server{server}
}
if port == "" {
return errors.New("port is missing")
}
loadBalancer.Servers[0].URL = fmt.Sprintf("%s://%s", loadBalancer.Servers[0].Scheme, net.JoinHostPort(ip, port))
loadBalancer.Servers[0].Scheme = ""
return nil
}
func (p *Provider) getIPPort(instance ecsInstance, serverPort string) (string, string, error) {
var ip, port string
ip = p.getIPAddress(instance)
port = getPort(instance, serverPort)
if len(ip) == 0 {
return "", "", fmt.Errorf("unable to find the IP address for the instance %q: the server is ignored", instance.Name)
}
return ip, port, nil
}
func (p Provider) getIPAddress(instance ecsInstance) string {
return instance.machine.privateIP
}
func getPort(instance ecsInstance, serverPort string) string {
if len(serverPort) > 0 {
return serverPort
}
var ports []nat.Port
for _, port := range instance.machine.ports {
natPort, err := nat.NewPort(port.protocol, strconv.FormatInt(port.hostPort, 10))
if err != nil {
continue
}
ports = append(ports, natPort)
}
less := func(i, j nat.Port) bool {
return i.Int() < j.Int()
}
nat.Sort(ports, less)
if len(ports) > 0 {
min := ports[0]
return min.Port()
}
return ""
}
func getServiceName(instance ecsInstance) string {
return provider.Normalize(instance.Name)
}

File diff suppressed because it is too large Load diff

469
pkg/provider/ecs/ecs.go Normal file
View file

@ -0,0 +1,469 @@
package ecs
import (
"context"
"fmt"
"strings"
"text/template"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/defaults"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ecs"
"github.com/cenkalti/backoff/v4"
"github.com/patrickmn/go-cache"
"github.com/containous/traefik/v2/pkg/config/dynamic"
"github.com/containous/traefik/v2/pkg/job"
"github.com/containous/traefik/v2/pkg/log"
"github.com/containous/traefik/v2/pkg/provider"
"github.com/containous/traefik/v2/pkg/safe"
)
// Provider holds configurations of the provider.
type Provider struct {
Constraints string `description:"Constraints is an expression that Traefik matches against the container's labels to determine whether to create any route for that container." json:"constraints,omitempty" toml:"constraints,omitempty" yaml:"constraints,omitempty" export:"true"`
ExposedByDefault bool `description:"Expose services by default" json:"exposedByDefault,omitempty" toml:"exposedByDefault,omitempty" yaml:"exposedByDefault,omitempty" export:"true"`
RefreshSeconds int `description:"Polling interval (in seconds)" json:"refreshSeconds,omitempty" toml:"refreshSeconds,omitempty" yaml:"refreshSeconds,omitempty" export:"true"`
DefaultRule string `description:"Default rule." json:"defaultRule,omitempty" toml:"defaultRule,omitempty" yaml:"defaultRule,omitempty"`
// Provider lookup parameters.
Clusters []string `description:"ECS Clusters name" json:"clusters,omitempty" toml:"clusters,omitempty" yaml:"clusters,omitempty" export:"true"`
AutoDiscoverClusters bool `description:"Auto discover cluster" json:"autoDiscoverClusters,omitempty" toml:"autoDiscoverClusters,omitempty" yaml:"autoDiscoverClusters,omitempty" export:"true"`
Region string `description:"The AWS region to use for requests" json:"region,omitempty" toml:"region,omitempty" yaml:"region,omitempty" export:"true"`
AccessKeyID string `description:"The AWS credentials access key to use for making requests" json:"accessKeyID,omitempty" toml:"accessKeyID,omitempty" yaml:"accessKeyID,omitempty"`
SecretAccessKey string `description:"The AWS credentials access key to use for making requests" json:"secretAccessKey,omitempty" toml:"secretAccessKey,omitempty" yaml:"secretAccessKey,omitempty"`
defaultRuleTpl *template.Template
}
type ecsInstance struct {
Name string
ID string
containerDefinition *ecs.ContainerDefinition
machine *machine
Labels map[string]string
ExtraConf configuration
}
type portMapping struct {
containerPort int64
hostPort int64
protocol string
}
type machine struct {
state string
privateIP string
ports []portMapping
healthStatus string
}
type awsClient struct {
ecs *ecs.ECS
ec2 *ec2.EC2
}
// DefaultTemplateRule The default template for the default rule.
const DefaultTemplateRule = "Host(`{{ normalize .Name }}`)"
var (
_ provider.Provider = (*Provider)(nil)
existingTaskDefCache = cache.New(30*time.Minute, 5*time.Minute)
)
// SetDefaults sets the default values.
func (p *Provider) SetDefaults() {
p.Clusters = []string{"default"}
p.AutoDiscoverClusters = false
p.ExposedByDefault = true
p.RefreshSeconds = 15
p.DefaultRule = DefaultTemplateRule
}
// Init the provider.
func (p *Provider) Init() error {
defaultRuleTpl, err := provider.MakeDefaultRuleTemplate(p.DefaultRule, nil)
if err != nil {
return fmt.Errorf("error while parsing default rule: %w", err)
}
p.defaultRuleTpl = defaultRuleTpl
return nil
}
func (p *Provider) createClient(logger log.Logger) (*awsClient, error) {
sess, err := session.NewSession()
if err != nil {
return nil, err
}
ec2meta := ec2metadata.New(sess)
if p.Region == "" {
logger.Infoln("No EC2 region provided, querying instance metadata endpoint...")
identity, err := ec2meta.GetInstanceIdentityDocument()
if err != nil {
return nil, err
}
p.Region = identity.Region
}
cfg := &aws.Config{
Region: &p.Region,
Credentials: credentials.NewChainCredentials(
[]credentials.Provider{
&credentials.StaticProvider{
Value: credentials.Value{
AccessKeyID: p.AccessKeyID,
SecretAccessKey: p.SecretAccessKey,
},
},
&credentials.EnvProvider{},
&credentials.SharedCredentialsProvider{},
defaults.RemoteCredProvider(*(defaults.Config()), defaults.Handlers()),
}),
}
cfg.WithLogger(aws.LoggerFunc(func(args ...interface{}) {
logger.Debug(args...)
}))
return &awsClient{
ecs.New(sess, cfg),
ec2.New(sess, cfg),
}, nil
}
// Provide configuration to traefik from ECS.
func (p Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.Pool) error {
pool.GoCtx(func(routineCtx context.Context) {
ctxLog := log.With(routineCtx, log.Str(log.ProviderName, "ecs"))
logger := log.FromContext(ctxLog)
operation := func() error {
awsClient, err := p.createClient(logger)
if err != nil {
return err
}
configuration, err := p.loadECSConfig(ctxLog, awsClient)
if err != nil {
return err
}
configurationChan <- dynamic.Message{
ProviderName: "ecs",
Configuration: configuration,
}
reload := time.NewTicker(time.Second * time.Duration(p.RefreshSeconds))
defer reload.Stop()
for {
select {
case <-reload.C:
configuration, err := p.loadECSConfig(ctxLog, awsClient)
if err != nil {
logger.Errorf("Failed to load ECS configuration, error %s", err)
return err
}
configurationChan <- dynamic.Message{
ProviderName: "ecs",
Configuration: configuration,
}
case <-routineCtx.Done():
return nil
}
}
}
notify := func(err error, time time.Duration) {
logger.Errorf("Provider connection error %+v, retrying in %s", err, time)
}
err := backoff.RetryNotify(safe.OperationWithRecover(operation), backoff.WithContext(job.NewBackOff(backoff.NewExponentialBackOff()), routineCtx), notify)
if err != nil {
logger.Errorf("Cannot connect to Provider api %+v", err)
}
})
return nil
}
// Find all running Provider tasks in a cluster, also collect the task definitions (for docker labels)
// and the EC2 instance data.
func (p *Provider) listInstances(ctx context.Context, client *awsClient) ([]ecsInstance, error) {
logger := log.FromContext(ctx)
var clustersArn []*string
var clusters []string
if p.AutoDiscoverClusters {
input := &ecs.ListClustersInput{}
for {
result, err := client.ecs.ListClusters(input)
if err != nil {
return nil, err
}
if result != nil {
clustersArn = append(clustersArn, result.ClusterArns...)
input.NextToken = result.NextToken
if result.NextToken == nil {
break
}
} else {
break
}
}
for _, cArn := range clustersArn {
clusters = append(clusters, *cArn)
}
} else {
clusters = p.Clusters
}
var instances []ecsInstance
logger.Debugf("ECS Clusters: %s", clusters)
for _, c := range clusters {
input := &ecs.ListTasksInput{
Cluster: &c,
DesiredStatus: aws.String(ecs.DesiredStatusRunning),
}
tasks := make(map[string]*ecs.Task)
err := client.ecs.ListTasksPagesWithContext(ctx, input, func(page *ecs.ListTasksOutput, lastPage bool) bool {
if len(page.TaskArns) > 0 {
resp, err := client.ecs.DescribeTasksWithContext(ctx, &ecs.DescribeTasksInput{
Tasks: page.TaskArns,
Cluster: &c,
})
if err != nil {
logger.Errorf("Unable to describe tasks for %v", page.TaskArns)
} else {
for _, t := range resp.Tasks {
if aws.StringValue(t.LastStatus) == ecs.DesiredStatusRunning {
tasks[aws.StringValue(t.TaskArn)] = t
}
}
}
}
return !lastPage
})
if err != nil {
logger.Error("Unable to list tasks")
return nil, err
}
// Skip to the next cluster if there are no tasks found on
// this cluster.
if len(tasks) == 0 {
continue
}
ec2Instances, err := p.lookupEc2Instances(ctx, client, &c, tasks)
if err != nil {
return nil, err
}
taskDefinitions, err := p.lookupTaskDefinitions(ctx, client, tasks)
if err != nil {
return nil, err
}
for key, task := range tasks {
containerInstance := ec2Instances[aws.StringValue(task.ContainerInstanceArn)]
taskDef := taskDefinitions[key]
for _, container := range task.Containers {
var containerDefinition *ecs.ContainerDefinition
for _, def := range taskDef.ContainerDefinitions {
if aws.StringValue(container.Name) == aws.StringValue(def.Name) {
containerDefinition = def
break
}
}
if containerDefinition == nil {
logger.Debugf("Unable to find container definition for %s", aws.StringValue(container.Name))
continue
}
var mach *machine
if len(task.Attachments) != 0 {
var ports []portMapping
for _, mapping := range containerDefinition.PortMappings {
if mapping != nil {
protocol := "TCP"
if aws.StringValue(mapping.Protocol) == "udp" {
protocol = "UDP"
}
ports = append(ports, portMapping{
hostPort: aws.Int64Value(mapping.HostPort),
containerPort: aws.Int64Value(mapping.ContainerPort),
protocol: protocol,
})
}
}
mach = &machine{
privateIP: aws.StringValue(container.NetworkInterfaces[0].PrivateIpv4Address),
ports: ports,
state: aws.StringValue(task.LastStatus),
healthStatus: aws.StringValue(task.HealthStatus),
}
} else {
if containerInstance == nil {
logger.Errorf("Unable to find container instance information for %s", aws.StringValue(container.Name))
continue
}
var ports []portMapping
for _, mapping := range container.NetworkBindings {
if mapping != nil {
ports = append(ports, portMapping{
hostPort: aws.Int64Value(mapping.HostPort),
containerPort: aws.Int64Value(mapping.ContainerPort),
})
}
}
mach = &machine{
privateIP: aws.StringValue(containerInstance.PrivateIpAddress),
ports: ports,
state: aws.StringValue(containerInstance.State.Name),
}
}
instance := ecsInstance{
Name: fmt.Sprintf("%s-%s", strings.Replace(aws.StringValue(task.Group), ":", "-", 1), *container.Name),
ID: key[len(key)-12:],
containerDefinition: containerDefinition,
machine: mach,
Labels: aws.StringValueMap(containerDefinition.DockerLabels),
}
extraConf, err := p.getConfiguration(instance)
if err != nil {
log.FromContext(ctx).Errorf("Skip container %s: %w", getServiceName(instance), err)
continue
}
instance.ExtraConf = extraConf
instances = append(instances, instance)
}
}
}
return instances, nil
}
func (p *Provider) loadECSConfig(ctx context.Context, client *awsClient) (*dynamic.Configuration, error) {
instances, err := p.listInstances(ctx, client)
if err != nil {
return nil, err
}
return p.buildConfiguration(ctx, instances), nil
}
func (p *Provider) lookupEc2Instances(ctx context.Context, client *awsClient, clusterName *string, ecsDatas map[string]*ecs.Task) (map[string]*ec2.Instance, error) {
logger := log.FromContext(ctx)
instanceIds := make(map[string]string)
ec2Instances := make(map[string]*ec2.Instance)
var containerInstancesArns []*string
var instanceArns []*string
for _, task := range ecsDatas {
if task.ContainerInstanceArn != nil {
containerInstancesArns = append(containerInstancesArns, task.ContainerInstanceArn)
}
}
for _, arns := range p.chunkIDs(containerInstancesArns) {
resp, err := client.ecs.DescribeContainerInstancesWithContext(ctx, &ecs.DescribeContainerInstancesInput{
ContainerInstances: arns,
Cluster: clusterName,
})
if err != nil {
logger.Errorf("Unable to describe container instances: %v", err)
return nil, err
}
for _, container := range resp.ContainerInstances {
instanceIds[aws.StringValue(container.Ec2InstanceId)] = aws.StringValue(container.ContainerInstanceArn)
instanceArns = append(instanceArns, container.Ec2InstanceId)
}
}
if len(instanceArns) > 0 {
for _, ids := range p.chunkIDs(instanceArns) {
input := &ec2.DescribeInstancesInput{
InstanceIds: ids,
}
err := client.ec2.DescribeInstancesPagesWithContext(ctx, input, func(page *ec2.DescribeInstancesOutput, lastPage bool) bool {
if len(page.Reservations) > 0 {
for _, r := range page.Reservations {
for _, i := range r.Instances {
if i.InstanceId != nil {
ec2Instances[instanceIds[aws.StringValue(i.InstanceId)]] = i
}
}
}
}
return !lastPage
})
if err != nil {
logger.Errorf("Unable to describe instances: %v", err)
return nil, err
}
}
}
return ec2Instances, nil
}
func (p *Provider) lookupTaskDefinitions(ctx context.Context, client *awsClient, taskDefArns map[string]*ecs.Task) (map[string]*ecs.TaskDefinition, error) {
logger := log.FromContext(ctx)
taskDef := make(map[string]*ecs.TaskDefinition)
for arn, task := range taskDefArns {
if definition, ok := existingTaskDefCache.Get(arn); ok {
taskDef[arn] = definition.(*ecs.TaskDefinition)
logger.Debugf("Found cached task definition for %s. Skipping the call", arn)
} else {
resp, err := client.ecs.DescribeTaskDefinitionWithContext(ctx, &ecs.DescribeTaskDefinitionInput{
TaskDefinition: task.TaskDefinitionArn,
})
if err != nil {
logger.Errorf("Unable to describe task definition: %v", err)
return nil, err
}
taskDef[arn] = resp.TaskDefinition
existingTaskDefCache.Set(arn, resp.TaskDefinition, cache.DefaultExpiration)
}
}
return taskDef, nil
}
// chunkIDs ECS expects no more than 100 parameters be passed to a API call;
// thus, pack each string into an array capped at 100 elements.
func (p *Provider) chunkIDs(ids []*string) [][]*string {
var chuncked [][]*string
for i := 0; i < len(ids); i += 100 {
var sliceEnd int
if i+100 < len(ids) {
sliceEnd = i + 100
} else {
sliceEnd = len(ids)
}
chuncked = append(chuncked, ids[i:sliceEnd])
}
return chuncked
}

View file

@ -0,0 +1,88 @@
package ecs
import (
"testing"
"github.com/aws/aws-sdk-go/aws"
"github.com/stretchr/testify/assert"
)
func TestChunkIDs(t *testing.T) {
provider := &Provider{}
testCases := []struct {
desc string
count int
expected []int
}{
{
desc: "0 element",
count: 0,
expected: []int(nil),
},
{
desc: "1 element",
count: 1,
expected: []int{1},
},
{
desc: "99 elements, 1 chunk",
count: 99,
expected: []int{99},
},
{
desc: "100 elements, 1 chunk",
count: 100,
expected: []int{100},
},
{
desc: "101 elements, 2 chunks",
count: 101,
expected: []int{100, 1},
},
{
desc: "199 elements, 2 chunks",
count: 199,
expected: []int{100, 99},
},
{
desc: "200 elements, 2 chunks",
count: 200,
expected: []int{100, 100},
},
{
desc: "201 elements, 3 chunks",
count: 201,
expected: []int{100, 100, 1},
},
{
desc: "555 elements, 5 chunks",
count: 555,
expected: []int{100, 100, 100, 100, 100, 55},
},
{
desc: "1001 elements, 11 chunks",
count: 1001,
expected: []int{100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 1},
},
}
for _, test := range testCases {
test := test
t.Run(test.desc, func(t *testing.T) {
t.Parallel()
var IDs []*string
for v := 0; v < test.count; v++ {
IDs = append(IDs, aws.String("a"))
}
var outCount []int
for _, el := range provider.chunkIDs(IDs) {
outCount = append(outCount, len(el))
}
assert.Equal(t, test.expected, outCount)
})
}
}

23
pkg/provider/ecs/label.go Normal file
View file

@ -0,0 +1,23 @@
package ecs
import (
"github.com/containous/traefik/v2/pkg/config/label"
)
// configuration Contains information from the labels that are globals (not related to the dynamic configuration) or specific to the provider.
type configuration struct {
Enable bool
}
func (p *Provider) getConfiguration(instance ecsInstance) (configuration, error) {
conf := configuration{
Enable: p.ExposedByDefault,
}
err := label.Decode(instance.Labels, &conf, "traefik.ecs.", "traefik.enable")
if err != nil {
return configuration{}, err
}
return conf, nil
}