AWS ECS Fargate
This commit is contained in:
parent
e76836b948
commit
a7200a292b
39 changed files with 6240 additions and 1529 deletions
|
@ -10,7 +10,6 @@ import (
|
|||
"github.com/aws/aws-sdk-go/aws/credentials"
|
||||
"github.com/aws/aws-sdk-go/aws/defaults"
|
||||
"github.com/aws/aws-sdk-go/aws/ec2metadata"
|
||||
"github.com/aws/aws-sdk-go/aws/request"
|
||||
"github.com/aws/aws-sdk-go/aws/session"
|
||||
"github.com/aws/aws-sdk-go/service/ec2"
|
||||
"github.com/aws/aws-sdk-go/service/ecs"
|
||||
|
@ -44,14 +43,18 @@ type Provider struct {
|
|||
type ecsInstance struct {
|
||||
Name string
|
||||
ID string
|
||||
task *ecs.Task
|
||||
taskDefinition *ecs.TaskDefinition
|
||||
container *ecs.Container
|
||||
containerDefinition *ecs.ContainerDefinition
|
||||
machine *ec2.Instance
|
||||
machine *machine
|
||||
TraefikLabels map[string]string
|
||||
}
|
||||
|
||||
type machine struct {
|
||||
name string
|
||||
state string
|
||||
privateIP string
|
||||
port int64
|
||||
}
|
||||
|
||||
type awsClient struct {
|
||||
ecs *ecs.ECS
|
||||
ec2 *ec2.EC2
|
||||
|
@ -171,11 +174,6 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s
|
|||
return nil
|
||||
}
|
||||
|
||||
func wrapAws(ctx context.Context, req *request.Request) error {
|
||||
req.HTTPRequest = req.HTTPRequest.WithContext(ctx)
|
||||
return req.Send()
|
||||
}
|
||||
|
||||
// Find all running Provider tasks in a cluster, also collect the task definitions (for docker labels)
|
||||
// and the EC2 instance data
|
||||
func (p *Provider) listInstances(ctx context.Context, client *awsClient) ([]ecsInstance, error) {
|
||||
|
@ -217,94 +215,97 @@ func (p *Provider) listInstances(ctx context.Context, client *awsClient) ([]ecsI
|
|||
|
||||
for _, c := range clusters {
|
||||
|
||||
req, _ := client.ecs.ListTasksRequest(&ecs.ListTasksInput{
|
||||
input := &ecs.ListTasksInput{
|
||||
Cluster: &c,
|
||||
DesiredStatus: aws.String(ecs.DesiredStatusRunning),
|
||||
}
|
||||
tasks := make(map[string]*ecs.Task)
|
||||
err := client.ecs.ListTasksPagesWithContext(ctx, input, func(page *ecs.ListTasksOutput, lastPage bool) bool {
|
||||
if len(page.TaskArns) > 0 {
|
||||
resp, err := client.ecs.DescribeTasksWithContext(ctx, &ecs.DescribeTasksInput{
|
||||
Tasks: page.TaskArns,
|
||||
Cluster: &c,
|
||||
})
|
||||
if err != nil {
|
||||
log.Errorf("Unable to describe tasks for %s", page.TaskArns)
|
||||
} else {
|
||||
for _, t := range resp.Tasks {
|
||||
tasks[aws.StringValue(t.TaskArn)] = t
|
||||
}
|
||||
}
|
||||
}
|
||||
return !lastPage
|
||||
})
|
||||
|
||||
var taskArns []*string
|
||||
|
||||
for ; req != nil; req = req.NextPage() {
|
||||
if err := wrapAws(ctx, req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
taskArns = append(taskArns, req.Data.(*ecs.ListTasksOutput).TaskArns...)
|
||||
if err != nil {
|
||||
log.Error("Unable to list tasks")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Skip to the next cluster if there are no tasks found on
|
||||
// this cluster.
|
||||
if len(taskArns) == 0 {
|
||||
if len(tasks) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
chunkedTaskArns := chunkedTaskArns(taskArns)
|
||||
var tasks []*ecs.Task
|
||||
|
||||
for _, arns := range chunkedTaskArns {
|
||||
req, taskResp := client.ecs.DescribeTasksRequest(&ecs.DescribeTasksInput{
|
||||
Tasks: arns,
|
||||
Cluster: &c,
|
||||
})
|
||||
|
||||
if err := wrapAws(ctx, req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
tasks = append(tasks, taskResp.Tasks...)
|
||||
|
||||
}
|
||||
|
||||
containerInstanceArns := make([]*string, 0)
|
||||
byContainerInstance := make(map[string]int)
|
||||
|
||||
taskDefinitionArns := make([]*string, 0)
|
||||
byTaskDefinition := make(map[string]int)
|
||||
|
||||
for _, task := range tasks {
|
||||
if _, found := byContainerInstance[aws.StringValue(task.ContainerInstanceArn)]; !found {
|
||||
byContainerInstance[aws.StringValue(task.ContainerInstanceArn)] = len(containerInstanceArns)
|
||||
containerInstanceArns = append(containerInstanceArns, task.ContainerInstanceArn)
|
||||
}
|
||||
if _, found := byTaskDefinition[aws.StringValue(task.TaskDefinitionArn)]; !found {
|
||||
byTaskDefinition[aws.StringValue(task.TaskDefinitionArn)] = len(taskDefinitionArns)
|
||||
taskDefinitionArns = append(taskDefinitionArns, task.TaskDefinitionArn)
|
||||
}
|
||||
}
|
||||
|
||||
machines, err := p.lookupEc2Instances(ctx, client, &c, containerInstanceArns)
|
||||
ec2Instances, err := p.lookupEc2Instances(ctx, client, &c, tasks)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
taskDefinitions, err := p.lookupTaskDefinitions(ctx, client, taskDefinitionArns)
|
||||
taskDefinitions, err := p.lookupTaskDefinitions(ctx, client, tasks)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for _, task := range tasks {
|
||||
for key, task := range tasks {
|
||||
|
||||
machineIdx := byContainerInstance[aws.StringValue(task.ContainerInstanceArn)]
|
||||
taskDefIdx := byTaskDefinition[aws.StringValue(task.TaskDefinitionArn)]
|
||||
containerInstance := ec2Instances[aws.StringValue(task.ContainerInstanceArn)]
|
||||
taskDef := taskDefinitions[key]
|
||||
|
||||
for _, container := range task.Containers {
|
||||
|
||||
taskDefinition := taskDefinitions[taskDefIdx]
|
||||
var containerDefinition *ecs.ContainerDefinition
|
||||
for _, def := range taskDefinition.ContainerDefinitions {
|
||||
for _, def := range taskDef.ContainerDefinitions {
|
||||
if aws.StringValue(container.Name) == aws.StringValue(def.Name) {
|
||||
containerDefinition = def
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if containerDefinition == nil {
|
||||
log.Debugf("Unable to find container definition for %s", aws.StringValue(container.Name))
|
||||
continue
|
||||
}
|
||||
|
||||
var mach *machine
|
||||
if aws.StringValue(task.LaunchType) == ecs.LaunchTypeFargate {
|
||||
var hostPort int64
|
||||
if len(containerDefinition.PortMappings) > 0 && containerDefinition.PortMappings[0] != nil {
|
||||
hostPort = aws.Int64Value(containerDefinition.PortMappings[0].HostPort)
|
||||
}
|
||||
mach = &machine{
|
||||
privateIP: aws.StringValue(container.NetworkInterfaces[0].PrivateIpv4Address),
|
||||
port: hostPort,
|
||||
state: aws.StringValue(task.LastStatus),
|
||||
}
|
||||
} else {
|
||||
var hostPort int64
|
||||
if len(container.NetworkBindings) > 0 && container.NetworkBindings[0] != nil {
|
||||
hostPort = aws.Int64Value(container.NetworkBindings[0].HostPort)
|
||||
}
|
||||
mach = &machine{
|
||||
privateIP: aws.StringValue(containerInstance.PrivateIpAddress),
|
||||
port: hostPort,
|
||||
state: aws.StringValue(containerInstance.State.Name),
|
||||
}
|
||||
}
|
||||
|
||||
instances = append(instances, ecsInstance{
|
||||
Name: fmt.Sprintf("%s-%s", strings.Replace(aws.StringValue(task.Group), ":", "-", 1), *container.Name),
|
||||
ID: (aws.StringValue(task.TaskArn))[len(aws.StringValue(task.TaskArn))-12:],
|
||||
task: task,
|
||||
taskDefinition: taskDefinition,
|
||||
container: container,
|
||||
ID: key[len(key)-12:],
|
||||
containerDefinition: containerDefinition,
|
||||
machine: machines[machineIdx],
|
||||
machine: mach,
|
||||
TraefikLabels: aws.StringValueMap(containerDefinition.DockerLabels),
|
||||
})
|
||||
}
|
||||
|
@ -314,68 +315,77 @@ func (p *Provider) listInstances(ctx context.Context, client *awsClient) ([]ecsI
|
|||
return instances, nil
|
||||
}
|
||||
|
||||
func (p *Provider) lookupEc2Instances(ctx context.Context, client *awsClient, clusterName *string, containerArns []*string) ([]*ec2.Instance, error) {
|
||||
func (p *Provider) lookupEc2Instances(ctx context.Context, client *awsClient, clusterName *string, ecsDatas map[string]*ecs.Task) (map[string]*ec2.Instance, error) {
|
||||
|
||||
order := make(map[string]int)
|
||||
instanceIds := make([]*string, len(containerArns))
|
||||
instances := make([]*ec2.Instance, len(containerArns))
|
||||
for i, arn := range containerArns {
|
||||
order[aws.StringValue(arn)] = i
|
||||
instanceIds := make(map[string]string)
|
||||
ec2Instances := make(map[string]*ec2.Instance)
|
||||
|
||||
var containerInstancesArns []*string
|
||||
var instanceArns []*string
|
||||
|
||||
for _, task := range ecsDatas {
|
||||
if task.ContainerInstanceArn != nil {
|
||||
containerInstancesArns = append(containerInstancesArns, task.ContainerInstanceArn)
|
||||
}
|
||||
}
|
||||
|
||||
req, _ := client.ecs.DescribeContainerInstancesRequest(&ecs.DescribeContainerInstancesInput{
|
||||
ContainerInstances: containerArns,
|
||||
resp, err := client.ecs.DescribeContainerInstancesWithContext(ctx, &ecs.DescribeContainerInstancesInput{
|
||||
ContainerInstances: containerInstancesArns,
|
||||
Cluster: clusterName,
|
||||
})
|
||||
|
||||
for ; req != nil; req = req.NextPage() {
|
||||
if err := wrapAws(ctx, req); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
containerResp := req.Data.(*ecs.DescribeContainerInstancesOutput)
|
||||
for i, container := range containerResp.ContainerInstances {
|
||||
order[aws.StringValue(container.Ec2InstanceId)] = order[aws.StringValue(container.ContainerInstanceArn)]
|
||||
instanceIds[i] = container.Ec2InstanceId
|
||||
}
|
||||
if err != nil {
|
||||
log.Errorf("Unable to describe container instances: %s", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
req, _ = client.ec2.DescribeInstancesRequest(&ec2.DescribeInstancesInput{
|
||||
InstanceIds: instanceIds,
|
||||
})
|
||||
for _, container := range resp.ContainerInstances {
|
||||
instanceIds[aws.StringValue(container.Ec2InstanceId)] = aws.StringValue(container.ContainerInstanceArn)
|
||||
instanceArns = append(instanceArns, container.Ec2InstanceId)
|
||||
}
|
||||
|
||||
for ; req != nil; req = req.NextPage() {
|
||||
if err := wrapAws(ctx, req); err != nil {
|
||||
return nil, err
|
||||
if len(instanceArns) > 0 {
|
||||
input := &ec2.DescribeInstancesInput{
|
||||
InstanceIds: instanceArns,
|
||||
}
|
||||
|
||||
instancesResp := req.Data.(*ec2.DescribeInstancesOutput)
|
||||
for _, r := range instancesResp.Reservations {
|
||||
for _, i := range r.Instances {
|
||||
if i.InstanceId != nil {
|
||||
instances[order[aws.StringValue(i.InstanceId)]] = i
|
||||
err = client.ec2.DescribeInstancesPagesWithContext(ctx, input, func(page *ec2.DescribeInstancesOutput, lastPage bool) bool {
|
||||
if len(page.Reservations) > 0 {
|
||||
for _, r := range page.Reservations {
|
||||
for _, i := range r.Instances {
|
||||
if i.InstanceId != nil {
|
||||
ec2Instances[instanceIds[aws.StringValue(i.InstanceId)]] = i
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return instances, nil
|
||||
}
|
||||
|
||||
func (p *Provider) lookupTaskDefinitions(ctx context.Context, client *awsClient, taskDefArns []*string) ([]*ecs.TaskDefinition, error) {
|
||||
taskDefinitions := make([]*ecs.TaskDefinition, len(taskDefArns))
|
||||
for i, arn := range taskDefArns {
|
||||
|
||||
req, resp := client.ecs.DescribeTaskDefinitionRequest(&ecs.DescribeTaskDefinitionInput{
|
||||
TaskDefinition: arn,
|
||||
return !lastPage
|
||||
})
|
||||
|
||||
if err := wrapAws(ctx, req); err != nil {
|
||||
if err != nil {
|
||||
log.Errorf("Unable to describe instances: %s", err)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return ec2Instances, nil
|
||||
}
|
||||
|
||||
func (p *Provider) lookupTaskDefinitions(ctx context.Context, client *awsClient, taskDefArns map[string]*ecs.Task) (map[string]*ecs.TaskDefinition, error) {
|
||||
taskDef := make(map[string]*ecs.TaskDefinition)
|
||||
for arn, task := range taskDefArns {
|
||||
resp, err := client.ecs.DescribeTaskDefinitionWithContext(ctx, &ecs.DescribeTaskDefinitionInput{
|
||||
TaskDefinition: task.TaskDefinitionArn,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
log.Errorf("Unable to describe task definition: %s", err)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
taskDefinitions[i] = resp.TaskDefinition
|
||||
taskDef[arn] = resp.TaskDefinition
|
||||
}
|
||||
return taskDefinitions, nil
|
||||
return taskDef, nil
|
||||
}
|
||||
|
||||
func (p *Provider) loadECSConfig(ctx context.Context, client *awsClient) (*types.Configuration, error) {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue