Bump AWS SDK to v2
This commit is contained in:
parent
7cfd10db62
commit
14e400bcd0
8 changed files with 352 additions and 387 deletions
|
@ -3,21 +3,22 @@ package ecs
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"os"
|
||||
"iter"
|
||||
"slices"
|
||||
"strings"
|
||||
"text/template"
|
||||
"time"
|
||||
|
||||
"github.com/aws/aws-sdk-go/aws"
|
||||
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
|
||||
"github.com/aws/aws-sdk-go/aws/defaults"
|
||||
"github.com/aws/aws-sdk-go/aws/ec2metadata"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/ec2"
|
||||
"github.com/aws/aws-sdk-go/service/ecs"
|
||||
"github.com/aws/aws-sdk-go/service/ssm"
|
||||
"github.com/aws/aws-sdk-go/service/sts"
|
||||
"github.com/aws/aws-sdk-go-v2/aws"
|
||||
"github.com/aws/aws-sdk-go-v2/config"
|
||||
"github.com/aws/aws-sdk-go-v2/credentials"
|
||||
"github.com/aws/aws-sdk-go-v2/service/ec2"
|
||||
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
|
||||
"github.com/aws/aws-sdk-go-v2/service/ecs"
|
||||
ecstypes "github.com/aws/aws-sdk-go-v2/service/ecs/types"
|
||||
"github.com/aws/aws-sdk-go-v2/service/ssm"
|
||||
ssmtypes "github.com/aws/aws-sdk-go-v2/service/ssm/types"
|
||||
"github.com/aws/smithy-go/logging"
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/patrickmn/go-cache"
|
||||
"github.com/traefik/traefik/v2/pkg/config/dynamic"
|
||||
|
@ -47,29 +48,29 @@ type Provider struct {
|
|||
type ecsInstance struct {
|
||||
Name string
|
||||
ID string
|
||||
containerDefinition *ecs.ContainerDefinition
|
||||
containerDefinition *ecstypes.ContainerDefinition
|
||||
machine *machine
|
||||
Labels map[string]string
|
||||
ExtraConf configuration
|
||||
}
|
||||
|
||||
type portMapping struct {
|
||||
containerPort int64
|
||||
hostPort int64
|
||||
protocol string
|
||||
containerPort int32
|
||||
hostPort int32
|
||||
protocol ecstypes.TransportProtocol
|
||||
}
|
||||
|
||||
type machine struct {
|
||||
state string
|
||||
state ec2types.InstanceStateName
|
||||
privateIP string
|
||||
ports []portMapping
|
||||
healthStatus string
|
||||
healthStatus ecstypes.HealthStatus
|
||||
}
|
||||
|
||||
type awsClient struct {
|
||||
ecs *ecs.ECS
|
||||
ec2 *ec2.EC2
|
||||
ssm *ssm.SSM
|
||||
ecs *ecs.Client
|
||||
ec2 *ec2.Client
|
||||
ssm *ssm.Client
|
||||
}
|
||||
|
||||
// DefaultTemplateRule The default template for the default rule.
|
||||
|
@ -100,56 +101,40 @@ func (p *Provider) Init() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (p *Provider) createClient(logger log.Logger) (*awsClient, error) {
|
||||
sess, err := session.NewSessionWithOptions(session.Options{
|
||||
SharedConfigState: session.SharedConfigEnable,
|
||||
})
|
||||
func (p *Provider) createClient(ctx context.Context, logger log.Logger) (*awsClient, error) {
|
||||
optFns := []func(*config.LoadOptions) error{
|
||||
config.WithLogger(logging.LoggerFunc(func(_ logging.Classification, format string, args ...interface{}) {
|
||||
logger.Debugf(format, args...)
|
||||
})),
|
||||
}
|
||||
if p.Region != "" {
|
||||
optFns = append(optFns, config.WithRegion(p.Region))
|
||||
} else {
|
||||
logger.Infoln("No region provided, will retrieve region from the EC2 Metadata service")
|
||||
optFns = append(optFns, config.WithEC2IMDSRegion())
|
||||
}
|
||||
|
||||
if p.AccessKeyID != "" && p.SecretAccessKey != "" {
|
||||
// From https://docs.aws.amazon.com/sdk-for-go/v2/developer-guide/configure-gosdk.html#specify-credentials-programmatically:
|
||||
// "If you explicitly provide credentials, as in this example, the SDK uses only those credentials."
|
||||
// this makes sure that user-defined credentials always have the highest priority
|
||||
staticCreds := aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(p.AccessKeyID, p.SecretAccessKey, ""))
|
||||
optFns = append(optFns, config.WithCredentialsProvider(staticCreds))
|
||||
|
||||
// If the access key and secret access key are not provided, config.LoadDefaultConfig
|
||||
// will look for the credentials in the default credential chain.
|
||||
// See https://docs.aws.amazon.com/sdk-for-go/v2/developer-guide/configure-gosdk.html#specifying-credentials.
|
||||
}
|
||||
|
||||
cfg, err := config.LoadDefaultConfig(ctx, optFns...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ec2meta := ec2metadata.New(sess)
|
||||
if p.Region == "" && ec2meta.Available() {
|
||||
logger.Infoln("No region provided, querying instance metadata endpoint...")
|
||||
identity, err := ec2meta.GetInstanceIdentityDocument()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
p.Region = identity.Region
|
||||
}
|
||||
|
||||
cfg := aws.NewConfig().
|
||||
WithCredentials(credentials.NewChainCredentials([]credentials.Provider{
|
||||
&credentials.StaticProvider{
|
||||
Value: credentials.Value{
|
||||
AccessKeyID: p.AccessKeyID,
|
||||
SecretAccessKey: p.SecretAccessKey,
|
||||
},
|
||||
},
|
||||
&credentials.EnvProvider{},
|
||||
&credentials.SharedCredentialsProvider{},
|
||||
defaults.RemoteCredProvider(*(defaults.Config()), defaults.Handlers()),
|
||||
stscreds.NewWebIdentityRoleProviderWithOptions(
|
||||
sts.New(sess),
|
||||
os.Getenv("AWS_ROLE_ARN"),
|
||||
"",
|
||||
stscreds.FetchTokenPath(os.Getenv("AWS_WEB_IDENTITY_TOKEN_FILE")),
|
||||
),
|
||||
}))
|
||||
|
||||
// Set the region if it is defined by the user or resolved from the EC2 metadata.
|
||||
if p.Region != "" {
|
||||
cfg.Region = &p.Region
|
||||
}
|
||||
|
||||
cfg.WithLogger(aws.LoggerFunc(func(args ...interface{}) {
|
||||
logger.Debug(args...)
|
||||
}))
|
||||
|
||||
return &awsClient{
|
||||
ecs.New(sess, cfg),
|
||||
ec2.New(sess, cfg),
|
||||
ssm.New(sess, cfg),
|
||||
ecs.NewFromConfig(cfg),
|
||||
ec2.NewFromConfig(cfg),
|
||||
ssm.NewFromConfig(cfg),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -160,7 +145,7 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
|
|||
logger := log.FromContext(ctxLog)
|
||||
|
||||
operation := func() error {
|
||||
awsClient, err := p.createClient(logger)
|
||||
awsClient, err := p.createClient(ctxLog, logger)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to create AWS client: %w", err)
|
||||
}
|
||||
|
@ -218,28 +203,19 @@ func (p *Provider) loadConfiguration(ctx context.Context, client *awsClient, con
|
|||
func (p *Provider) listInstances(ctx context.Context, client *awsClient) ([]ecsInstance, error) {
|
||||
logger := log.FromContext(ctx)
|
||||
|
||||
var clustersArn []*string
|
||||
var clusters []string
|
||||
|
||||
if p.AutoDiscoverClusters {
|
||||
input := &ecs.ListClustersInput{}
|
||||
for {
|
||||
result, err := client.ecs.ListClusters(input)
|
||||
|
||||
paginator := ecs.NewListClustersPaginator(client.ecs, input)
|
||||
for paginator.HasMorePages() {
|
||||
page, err := paginator.NextPage(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if result != nil {
|
||||
clustersArn = append(clustersArn, result.ClusterArns...)
|
||||
input.NextToken = result.NextToken
|
||||
if result.NextToken == nil {
|
||||
break
|
||||
}
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
for _, cArn := range clustersArn {
|
||||
clusters = append(clusters, *cArn)
|
||||
|
||||
clusters = append(clusters, page.ClusterArns...)
|
||||
}
|
||||
} else {
|
||||
clusters = p.Clusters
|
||||
|
@ -251,13 +227,19 @@ func (p *Provider) listInstances(ctx context.Context, client *awsClient) ([]ecsI
|
|||
for _, c := range clusters {
|
||||
input := &ecs.ListTasksInput{
|
||||
Cluster: &c,
|
||||
DesiredStatus: aws.String(ecs.DesiredStatusRunning),
|
||||
DesiredStatus: ecstypes.DesiredStatusRunning,
|
||||
}
|
||||
|
||||
tasks := make(map[string]*ecs.Task)
|
||||
err := client.ecs.ListTasksPagesWithContext(ctx, input, func(page *ecs.ListTasksOutput, lastPage bool) bool {
|
||||
tasks := make(map[string]ecstypes.Task)
|
||||
|
||||
paginator := ecs.NewListTasksPaginator(client.ecs, input)
|
||||
for paginator.HasMorePages() {
|
||||
page, err := paginator.NextPage(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("listing tasks: %w", err)
|
||||
}
|
||||
if len(page.TaskArns) > 0 {
|
||||
resp, err := client.ecs.DescribeTasksWithContext(ctx, &ecs.DescribeTasksInput{
|
||||
resp, err := client.ecs.DescribeTasks(ctx, &ecs.DescribeTasksInput{
|
||||
Tasks: page.TaskArns,
|
||||
Cluster: &c,
|
||||
})
|
||||
|
@ -265,16 +247,12 @@ func (p *Provider) listInstances(ctx context.Context, client *awsClient) ([]ecsI
|
|||
logger.Errorf("Unable to describe tasks for %v", page.TaskArns)
|
||||
} else {
|
||||
for _, t := range resp.Tasks {
|
||||
if aws.StringValue(t.LastStatus) == ecs.DesiredStatusRunning {
|
||||
tasks[aws.StringValue(t.TaskArn)] = t
|
||||
if aws.ToString(t.LastStatus) == string(ecstypes.DesiredStatusRunning) {
|
||||
tasks[aws.ToString(t.TaskArn)] = t
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return !lastPage
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("listing tasks: %w", err)
|
||||
}
|
||||
|
||||
// Skip to the next cluster if there are no tasks found on
|
||||
|
@ -288,7 +266,7 @@ func (p *Provider) listInstances(ctx context.Context, client *awsClient) ([]ecsI
|
|||
return nil, err
|
||||
}
|
||||
|
||||
miInstances := make(map[string]*ssm.InstanceInformation)
|
||||
miInstances := make(map[string]ssmtypes.InstanceInformation)
|
||||
if p.ECSAnywhere {
|
||||
// Try looking up for instances on ECS Anywhere
|
||||
miInstances, err = p.lookupMiInstances(ctx, client, &c, tasks)
|
||||
|
@ -303,74 +281,67 @@ func (p *Provider) listInstances(ctx context.Context, client *awsClient) ([]ecsI
|
|||
}
|
||||
|
||||
for key, task := range tasks {
|
||||
containerInstance := ec2Instances[aws.StringValue(task.ContainerInstanceArn)]
|
||||
containerInstance, hasContainerInstance := ec2Instances[aws.ToString(task.ContainerInstanceArn)]
|
||||
taskDef := taskDefinitions[key]
|
||||
|
||||
for _, container := range task.Containers {
|
||||
var containerDefinition *ecs.ContainerDefinition
|
||||
var containerDefinition *ecstypes.ContainerDefinition
|
||||
for _, def := range taskDef.ContainerDefinitions {
|
||||
if aws.StringValue(container.Name) == aws.StringValue(def.Name) {
|
||||
containerDefinition = def
|
||||
if aws.ToString(container.Name) == aws.ToString(def.Name) {
|
||||
containerDefinition = &def
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if containerDefinition == nil {
|
||||
logger.Debugf("Unable to find container definition for %s", aws.StringValue(container.Name))
|
||||
logger.Debugf("Unable to find container definition for %s", aws.ToString(container.Name))
|
||||
continue
|
||||
}
|
||||
|
||||
var mach *machine
|
||||
if aws.StringValue(taskDef.NetworkMode) == "awsvpc" && len(task.Attachments) != 0 {
|
||||
if taskDef.NetworkMode == ecstypes.NetworkModeAwsvpc && len(task.Attachments) != 0 {
|
||||
if len(container.NetworkInterfaces) == 0 {
|
||||
logger.Errorf("Skip container %s: no network interfaces", aws.StringValue(container.Name))
|
||||
logger.Errorf("Skip container %s: no network interfaces", aws.ToString(container.Name))
|
||||
continue
|
||||
}
|
||||
|
||||
var ports []portMapping
|
||||
for _, mapping := range containerDefinition.PortMappings {
|
||||
if mapping != nil {
|
||||
protocol := "TCP"
|
||||
if aws.StringValue(mapping.Protocol) == "udp" {
|
||||
protocol = "UDP"
|
||||
}
|
||||
|
||||
ports = append(ports, portMapping{
|
||||
hostPort: aws.Int64Value(mapping.HostPort),
|
||||
containerPort: aws.Int64Value(mapping.ContainerPort),
|
||||
protocol: protocol,
|
||||
})
|
||||
}
|
||||
ports = append(ports, portMapping{
|
||||
hostPort: aws.ToInt32(mapping.HostPort),
|
||||
containerPort: aws.ToInt32(mapping.ContainerPort),
|
||||
protocol: mapping.Protocol,
|
||||
})
|
||||
}
|
||||
mach = &machine{
|
||||
privateIP: aws.StringValue(container.NetworkInterfaces[0].PrivateIpv4Address),
|
||||
privateIP: aws.ToString(container.NetworkInterfaces[0].PrivateIpv4Address),
|
||||
ports: ports,
|
||||
state: aws.StringValue(task.LastStatus),
|
||||
healthStatus: aws.StringValue(task.HealthStatus),
|
||||
state: ec2types.InstanceStateName(strings.ToLower(aws.ToString(task.LastStatus))),
|
||||
healthStatus: task.HealthStatus,
|
||||
}
|
||||
} else {
|
||||
miContainerInstance := miInstances[aws.StringValue(task.ContainerInstanceArn)]
|
||||
if containerInstance == nil && miContainerInstance == nil {
|
||||
logger.Errorf("Unable to find container instance information for %s", aws.StringValue(container.Name))
|
||||
miContainerInstance, hasMiContainerInstance := miInstances[aws.ToString(task.ContainerInstanceArn)]
|
||||
if !hasContainerInstance && !hasMiContainerInstance {
|
||||
logger.Errorf("Unable to find container instance information for %s", aws.ToString(container.Name))
|
||||
continue
|
||||
}
|
||||
|
||||
var ports []portMapping
|
||||
for _, mapping := range container.NetworkBindings {
|
||||
if mapping != nil {
|
||||
ports = append(ports, portMapping{
|
||||
hostPort: aws.Int64Value(mapping.HostPort),
|
||||
containerPort: aws.Int64Value(mapping.ContainerPort),
|
||||
})
|
||||
}
|
||||
ports = append(ports, portMapping{
|
||||
hostPort: aws.ToInt32(mapping.HostPort),
|
||||
containerPort: aws.ToInt32(mapping.ContainerPort),
|
||||
protocol: mapping.Protocol,
|
||||
})
|
||||
}
|
||||
var privateIPAddress, stateName string
|
||||
if containerInstance != nil {
|
||||
privateIPAddress = aws.StringValue(containerInstance.PrivateIpAddress)
|
||||
stateName = aws.StringValue(containerInstance.State.Name)
|
||||
} else if miContainerInstance != nil {
|
||||
privateIPAddress = aws.StringValue(miContainerInstance.IPAddress)
|
||||
stateName = aws.StringValue(task.LastStatus)
|
||||
var privateIPAddress string
|
||||
var stateName ec2types.InstanceStateName
|
||||
if hasContainerInstance {
|
||||
privateIPAddress = aws.ToString(containerInstance.PrivateIpAddress)
|
||||
stateName = containerInstance.State.Name
|
||||
} else if hasMiContainerInstance {
|
||||
privateIPAddress = aws.ToString(miContainerInstance.IPAddress)
|
||||
stateName = ec2types.InstanceStateName(strings.ToLower(aws.ToString(task.LastStatus)))
|
||||
}
|
||||
|
||||
mach = &machine{
|
||||
|
@ -381,11 +352,11 @@ func (p *Provider) listInstances(ctx context.Context, client *awsClient) ([]ecsI
|
|||
}
|
||||
|
||||
instance := ecsInstance{
|
||||
Name: fmt.Sprintf("%s-%s", strings.Replace(aws.StringValue(task.Group), ":", "-", 1), *container.Name),
|
||||
Name: fmt.Sprintf("%s-%s", strings.Replace(aws.ToString(task.Group), ":", "-", 1), aws.ToString(container.Name)),
|
||||
ID: key[len(key)-12:],
|
||||
containerDefinition: containerDefinition,
|
||||
machine: mach,
|
||||
Labels: aws.StringValueMap(containerDefinition.DockerLabels),
|
||||
Labels: containerDefinition.DockerLabels,
|
||||
}
|
||||
|
||||
extraConf, err := p.getConfiguration(instance)
|
||||
|
@ -403,21 +374,21 @@ func (p *Provider) listInstances(ctx context.Context, client *awsClient) ([]ecsI
|
|||
return instances, nil
|
||||
}
|
||||
|
||||
func (p *Provider) lookupMiInstances(ctx context.Context, client *awsClient, clusterName *string, ecsDatas map[string]*ecs.Task) (map[string]*ssm.InstanceInformation, error) {
|
||||
func (p *Provider) lookupMiInstances(ctx context.Context, client *awsClient, clusterName *string, ecsDatas map[string]ecstypes.Task) (map[string]ssmtypes.InstanceInformation, error) {
|
||||
instanceIDs := make(map[string]string)
|
||||
miInstances := make(map[string]*ssm.InstanceInformation)
|
||||
miInstances := make(map[string]ssmtypes.InstanceInformation)
|
||||
|
||||
var containerInstancesArns []*string
|
||||
var instanceArns []*string
|
||||
var containerInstancesArns []string
|
||||
var instanceArns []string
|
||||
|
||||
for _, task := range ecsDatas {
|
||||
if task.ContainerInstanceArn != nil {
|
||||
containerInstancesArns = append(containerInstancesArns, task.ContainerInstanceArn)
|
||||
containerInstancesArns = append(containerInstancesArns, *task.ContainerInstanceArn)
|
||||
}
|
||||
}
|
||||
|
||||
for _, arns := range p.chunkIDs(containerInstancesArns) {
|
||||
resp, err := client.ecs.DescribeContainerInstancesWithContext(ctx, &ecs.DescribeContainerInstancesInput{
|
||||
for arns := range chunkIDs(containerInstancesArns) {
|
||||
resp, err := client.ecs.DescribeContainerInstances(ctx, &ecs.DescribeContainerInstancesInput{
|
||||
ContainerInstances: arns,
|
||||
Cluster: clusterName,
|
||||
})
|
||||
|
@ -426,23 +397,21 @@ func (p *Provider) lookupMiInstances(ctx context.Context, client *awsClient, clu
|
|||
}
|
||||
|
||||
for _, container := range resp.ContainerInstances {
|
||||
instanceIDs[aws.StringValue(container.Ec2InstanceId)] = aws.StringValue(container.ContainerInstanceArn)
|
||||
instanceIDs[aws.ToString(container.Ec2InstanceId)] = aws.ToString(container.ContainerInstanceArn)
|
||||
|
||||
// Disallow EC2 Instance IDs
|
||||
// This prevents considering EC2 instances in ECS
|
||||
// and getting InvalidInstanceID.Malformed error when calling the describe-instances endpoint.
|
||||
if !strings.HasPrefix(aws.StringValue(container.Ec2InstanceId), "mi-") {
|
||||
continue
|
||||
if strings.HasPrefix(aws.ToString(container.Ec2InstanceId), "mi-") {
|
||||
instanceArns = append(instanceArns, *container.Ec2InstanceId)
|
||||
}
|
||||
|
||||
instanceArns = append(instanceArns, container.Ec2InstanceId)
|
||||
}
|
||||
}
|
||||
|
||||
if len(instanceArns) > 0 {
|
||||
for _, ids := range p.chunkIDs(instanceArns) {
|
||||
for ids := range chunkIDs(instanceArns) {
|
||||
input := &ssm.DescribeInstanceInformationInput{
|
||||
Filters: []*ssm.InstanceInformationStringFilter{
|
||||
Filters: []ssmtypes.InstanceInformationStringFilter{
|
||||
{
|
||||
Key: aws.String("InstanceIds"),
|
||||
Values: ids,
|
||||
|
@ -450,18 +419,18 @@ func (p *Provider) lookupMiInstances(ctx context.Context, client *awsClient, clu
|
|||
},
|
||||
}
|
||||
|
||||
err := client.ssm.DescribeInstanceInformationPagesWithContext(ctx, input, func(page *ssm.DescribeInstanceInformationOutput, lastPage bool) bool {
|
||||
if len(page.InstanceInformationList) > 0 {
|
||||
for _, i := range page.InstanceInformationList {
|
||||
if i.InstanceId != nil {
|
||||
miInstances[instanceIDs[aws.StringValue(i.InstanceId)]] = i
|
||||
}
|
||||
paginator := ssm.NewDescribeInstanceInformationPaginator(client.ssm, input)
|
||||
for paginator.HasMorePages() {
|
||||
page, err := paginator.NextPage(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("describing instances: %w", err)
|
||||
}
|
||||
|
||||
for _, i := range page.InstanceInformationList {
|
||||
if i.InstanceId != nil {
|
||||
miInstances[instanceIDs[aws.ToString(i.InstanceId)]] = i
|
||||
}
|
||||
}
|
||||
return !lastPage
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("describing instances: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -469,21 +438,21 @@ func (p *Provider) lookupMiInstances(ctx context.Context, client *awsClient, clu
|
|||
return miInstances, nil
|
||||
}
|
||||
|
||||
func (p *Provider) lookupEc2Instances(ctx context.Context, client *awsClient, clusterName *string, ecsDatas map[string]*ecs.Task) (map[string]*ec2.Instance, error) {
|
||||
func (p *Provider) lookupEc2Instances(ctx context.Context, client *awsClient, clusterName *string, ecsDatas map[string]ecstypes.Task) (map[string]ec2types.Instance, error) {
|
||||
instanceIDs := make(map[string]string)
|
||||
ec2Instances := make(map[string]*ec2.Instance)
|
||||
ec2Instances := make(map[string]ec2types.Instance)
|
||||
|
||||
var containerInstancesArns []*string
|
||||
var instanceArns []*string
|
||||
var containerInstancesArns []string
|
||||
var instanceArns []string
|
||||
|
||||
for _, task := range ecsDatas {
|
||||
if task.ContainerInstanceArn != nil {
|
||||
containerInstancesArns = append(containerInstancesArns, task.ContainerInstanceArn)
|
||||
containerInstancesArns = append(containerInstancesArns, *task.ContainerInstanceArn)
|
||||
}
|
||||
}
|
||||
|
||||
for _, arns := range p.chunkIDs(containerInstancesArns) {
|
||||
resp, err := client.ecs.DescribeContainerInstancesWithContext(ctx, &ecs.DescribeContainerInstancesInput{
|
||||
for arns := range chunkIDs(containerInstancesArns) {
|
||||
resp, err := client.ecs.DescribeContainerInstances(ctx, &ecs.DescribeContainerInstancesInput{
|
||||
ContainerInstances: arns,
|
||||
Cluster: clusterName,
|
||||
})
|
||||
|
@ -492,38 +461,38 @@ func (p *Provider) lookupEc2Instances(ctx context.Context, client *awsClient, cl
|
|||
}
|
||||
|
||||
for _, container := range resp.ContainerInstances {
|
||||
instanceIDs[aws.StringValue(container.Ec2InstanceId)] = aws.StringValue(container.ContainerInstanceArn)
|
||||
instanceIDs[aws.ToString(container.Ec2InstanceId)] = aws.ToString(container.ContainerInstanceArn)
|
||||
// Disallow Instance IDs of the form mi-*
|
||||
// This prevents considering external instances in ECS Anywhere setups
|
||||
// and getting InvalidInstanceID.Malformed error when calling the describe-instances endpoint.
|
||||
if strings.HasPrefix(aws.StringValue(container.Ec2InstanceId), "mi-") {
|
||||
if strings.HasPrefix(aws.ToString(container.Ec2InstanceId), "mi-") {
|
||||
continue
|
||||
}
|
||||
|
||||
instanceArns = append(instanceArns, container.Ec2InstanceId)
|
||||
if container.Ec2InstanceId != nil {
|
||||
instanceArns = append(instanceArns, *container.Ec2InstanceId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(instanceArns) > 0 {
|
||||
for _, ids := range p.chunkIDs(instanceArns) {
|
||||
for ids := range chunkIDs(instanceArns) {
|
||||
input := &ec2.DescribeInstancesInput{
|
||||
InstanceIds: ids,
|
||||
}
|
||||
|
||||
err := client.ec2.DescribeInstancesPagesWithContext(ctx, input, func(page *ec2.DescribeInstancesOutput, lastPage bool) bool {
|
||||
if len(page.Reservations) > 0 {
|
||||
for _, r := range page.Reservations {
|
||||
for _, i := range r.Instances {
|
||||
if i.InstanceId != nil {
|
||||
ec2Instances[instanceIDs[aws.StringValue(i.InstanceId)]] = i
|
||||
}
|
||||
paginator := ec2.NewDescribeInstancesPaginator(client.ec2, input)
|
||||
for paginator.HasMorePages() {
|
||||
page, err := paginator.NextPage(ctx)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("describing instances: %w", err)
|
||||
}
|
||||
for _, r := range page.Reservations {
|
||||
for _, i := range r.Instances {
|
||||
if i.InstanceId != nil {
|
||||
ec2Instances[instanceIDs[aws.ToString(i.InstanceId)]] = i
|
||||
}
|
||||
}
|
||||
}
|
||||
return !lastPage
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("describing instances: %w", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -531,16 +500,16 @@ func (p *Provider) lookupEc2Instances(ctx context.Context, client *awsClient, cl
|
|||
return ec2Instances, nil
|
||||
}
|
||||
|
||||
func (p *Provider) lookupTaskDefinitions(ctx context.Context, client *awsClient, taskDefArns map[string]*ecs.Task) (map[string]*ecs.TaskDefinition, error) {
|
||||
func (p *Provider) lookupTaskDefinitions(ctx context.Context, client *awsClient, taskDefArns map[string]ecstypes.Task) (map[string]*ecstypes.TaskDefinition, error) {
|
||||
logger := log.FromContext(ctx)
|
||||
taskDef := make(map[string]*ecs.TaskDefinition)
|
||||
taskDef := make(map[string]*ecstypes.TaskDefinition)
|
||||
|
||||
for arn, task := range taskDefArns {
|
||||
if definition, ok := existingTaskDefCache.Get(arn); ok {
|
||||
taskDef[arn] = definition.(*ecs.TaskDefinition)
|
||||
taskDef[arn] = definition.(*ecstypes.TaskDefinition)
|
||||
logger.Debugf("Found cached task definition for %s. Skipping the call", arn)
|
||||
} else {
|
||||
resp, err := client.ecs.DescribeTaskDefinitionWithContext(ctx, &ecs.DescribeTaskDefinitionInput{
|
||||
resp, err := client.ecs.DescribeTaskDefinition(ctx, &ecs.DescribeTaskDefinitionInput{
|
||||
TaskDefinition: task.TaskDefinitionArn,
|
||||
})
|
||||
if err != nil {
|
||||
|
@ -556,16 +525,6 @@ func (p *Provider) lookupTaskDefinitions(ctx context.Context, client *awsClient,
|
|||
|
||||
// chunkIDs ECS expects no more than 100 parameters be passed to a API call;
|
||||
// thus, pack each string into an array capped at 100 elements.
|
||||
func (p *Provider) chunkIDs(ids []*string) [][]*string {
|
||||
var chunked [][]*string
|
||||
for i := 0; i < len(ids); i += 100 {
|
||||
var sliceEnd int
|
||||
if i+100 < len(ids) {
|
||||
sliceEnd = i + 100
|
||||
} else {
|
||||
sliceEnd = len(ids)
|
||||
}
|
||||
chunked = append(chunked, ids[i:sliceEnd])
|
||||
}
|
||||
return chunked
|
||||
func chunkIDs(ids []string) iter.Seq[[]string] {
|
||||
return slices.Chunk(ids, 100)
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue