IP-per-task: (#841)

Support IP per task with marathon/mesos
This commit is contained in:
Diego de Oliveira 2017-01-06 13:26:50 -02:00 committed by Emile Vauge
parent 8004132a3a
commit d74ea22d7d
6 changed files with 279 additions and 74 deletions

View file

@ -83,9 +83,10 @@ func (provider *Marathon) Provide(configurationChan chan<- types.ConfigMessage,
return err
}
provider.marathonClient = client
update := make(marathon.EventsChannel, 5)
if provider.Watch {
if err := client.AddEventsListener(update, marathon.EventIDApplications); err != nil {
update, err := client.AddEventsListener(marathon.EventIDApplications)
if err != nil {
log.Errorf("Failed to register for events, %s", err)
return err
}
@ -129,6 +130,7 @@ func (provider *Marathon) Provide(configurationChan chan<- types.ConfigMessage,
func (provider *Marathon) loadMarathonConfig() *types.Configuration {
var MarathonFuncMap = template.FuncMap{
"getBackend": provider.getBackend,
"getBackendServer": provider.getBackendServer,
"getPort": provider.getPort,
"getWeight": provider.getWeight,
"getDomain": provider.getDomain,
@ -188,15 +190,16 @@ func (provider *Marathon) loadMarathonConfig() *types.Configuration {
}
func (provider *Marathon) taskFilter(task marathon.Task, applications *marathon.Applications, exposedByDefaultFlag bool) bool {
if len(task.Ports) == 0 {
log.Debug("Filtering marathon task without port %s", task.AppID)
return false
}
application, err := getApplication(task, applications.Apps)
if err != nil {
log.Errorf("Unable to get marathon application from task %s", task.AppID)
return false
}
ports := processPorts(application, task)
if len(ports) == 0 {
log.Debug("Filtering marathon task without port %s", task.AppID)
return false
}
label, _ := provider.getLabel(application, "traefik.tags")
constraintTags := strings.Split(label, ",")
if provider.MarathonLBCompatibility {
@ -223,10 +226,9 @@ func (provider *Marathon) taskFilter(task marathon.Task, applications *marathon.
log.Debugf("Filtering marathon task %s specifying both traefik.portIndex and traefik.port labels", task.AppID)
return false
}
if portIndexLabel != "" {
index, err := strconv.Atoi((*application.Labels)["traefik.portIndex"])
if err != nil || index < 0 || index > len(application.Ports)-1 {
if err != nil || index < 0 || index > len(ports)-1 {
log.Debugf("Filtering marathon task %s with unexpected value for traefik.portIndex label", task.AppID)
return false
}
@ -239,7 +241,7 @@ func (provider *Marathon) taskFilter(task marathon.Task, applications *marathon.
}
var foundPort bool
for _, exposedPort := range task.Ports {
for _, exposedPort := range ports {
if port == exposedPort {
foundPort = true
break
@ -315,17 +317,17 @@ func (provider *Marathon) getPort(task marathon.Task, applications []marathon.Ap
log.Errorf("Unable to get marathon application from task %s", task.AppID)
return ""
}
ports := processPorts(application, task)
if portIndexLabel, err := provider.getLabel(application, "traefik.portIndex"); err == nil {
if index, err := strconv.Atoi(portIndexLabel); err == nil {
return strconv.Itoa(task.Ports[index])
return strconv.Itoa(ports[index])
}
}
if portValueLabel, err := provider.getLabel(application, "traefik.port"); err == nil {
return portValueLabel
}
for _, port := range task.Ports {
for _, port := range ports {
return strconv.Itoa(port)
}
return ""
@ -488,3 +490,62 @@ func (provider *Marathon) getCircuitBreakerExpression(application marathon.Appli
}
return "NetworkErrorRatio() > 1"
}
func processPorts(application marathon.Application, task marathon.Task) []int {
// First using application ports
if len(application.Ports) > 0 {
return application.Ports
}
// Using default port configuration
if task.Ports != nil && len(task.Ports) > 0 {
return task.Ports
}
// Using port definition if available
if application.PortDefinitions != nil && len(*application.PortDefinitions) > 0 {
ports := make([]int, 0)
for _, def := range *application.PortDefinitions {
if def.Port != nil {
ports = append(ports, *def.Port)
}
}
return ports
}
// If using IP-per-task using this port definition
if application.IPAddressPerTask != nil && len(*((*application.IPAddressPerTask).Discovery).Ports) > 0 {
ports := make([]int, 0)
for _, def := range *((*application.IPAddressPerTask).Discovery).Ports {
ports = append(ports, def.Number)
}
return ports
}
return []int{}
}
func (provider *Marathon) getBackendServer(task marathon.Task, applications []marathon.Application) string {
application, err := getApplication(task, applications)
if err != nil {
log.Errorf("Unable to get marathon application from task %s", task.AppID)
return ""
}
if len(task.IPAddresses) == 0 {
return ""
} else if len(task.IPAddresses) == 1 {
return task.IPAddresses[0].IPAddress
} else {
ipAddressIdxStr, err := provider.getLabel(application, "traefik.ipAddressIdx")
if err != nil {
log.Errorf("Unable to get marathon IPAddress from task %s", task.AppID)
return ""
}
ipAddressIdx, err := strconv.Atoi(ipAddressIdxStr)
if err != nil {
log.Errorf("Invalid marathon IPAddress from task %s", task.AppID)
return ""
}
return task.IPAddresses[ipAddressIdx].IPAddress
}
}