1
0
Fork 0

Support multi-port service routing for containers running on Marathon

This commit is contained in:
Alex Antonov 2017-08-21 10:46:03 +02:00 committed by Traefiker
parent 0367034f93
commit ec3e2c08b8
9 changed files with 498 additions and 134 deletions

View file

@ -7,6 +7,7 @@ import (
"net"
"net/http"
"net/url"
"regexp"
"strconv"
"strings"
"text/template"
@ -45,6 +46,10 @@ const (
var _ provider.Provider = (*Provider)(nil)
// Regexp used to extract the name of the service and the name of the property for this service
// All properties are under the format traefik.<servicename>.frontend.*= except the port/portIndex/weight/protocol/backend directly after traefik.<servicename>.
var servicesPropertiesRegexp = regexp.MustCompile(`^traefik\.(?P<service_name>.+?)\.(?P<property_name>port|portIndex|weight|protocol|backend|frontend\.(.*))$`)
// Provider holds configuration of the provider.
type Provider struct {
provider.BaseProvider
@ -175,6 +180,7 @@ func (p *Provider) loadMarathonConfig() *types.Configuration {
"getPriority": p.getPriority,
"getEntryPoints": p.getEntryPoints,
"getFrontendRule": p.getFrontendRule,
"getFrontendName": p.getFrontendName,
"hasCircuitBreakerLabels": p.hasCircuitBreakerLabels,
"hasLoadBalancerLabels": p.hasLoadBalancerLabels,
"hasMaxConnLabels": p.hasMaxConnLabels,
@ -186,6 +192,9 @@ func (p *Provider) loadMarathonConfig() *types.Configuration {
"hasHealthCheckLabels": p.hasHealthCheckLabels,
"getHealthCheckPath": p.getHealthCheckPath,
"getHealthCheckInterval": p.getHealthCheckInterval,
"hasServices": p.hasServices,
"getServiceNames": p.getServiceNames,
"getServiceNameSuffix": p.getServiceNameSuffix,
"getBasicAuth": p.getBasicAuth,
}
@ -202,7 +211,11 @@ func (p *Provider) loadMarathonConfig() *types.Configuration {
filteredApps := fun.Filter(p.applicationFilter, applications.Apps).([]marathon.Application)
for i, app := range filteredApps {
filteredApps[i].Tasks = fun.Filter(func(task *marathon.Task) bool {
return p.taskFilter(*task, app)
filtered := p.taskFilter(*task, app)
if filtered {
p.logIllegalServices(*task, app)
}
return filtered
}, app.Tasks).([]*marathon.Task)
}
@ -229,10 +242,10 @@ func (p *Provider) applicationFilter(app marathon.Application) bool {
}
// Filter by constraints.
label, _ := p.getLabel(app, types.LabelTags)
label, _ := p.getAppLabel(app, types.LabelTags)
constraintTags := strings.Split(label, ",")
if p.MarathonLBCompatibility {
if label, ok := p.getLabel(app, "HAPROXY_GROUP"); ok {
if label, ok := p.getAppLabel(app, "HAPROXY_GROUP"); ok {
constraintTags = append(constraintTags, label)
}
}
@ -251,19 +264,6 @@ func (p *Provider) taskFilter(task marathon.Task, application marathon.Applicati
return false
}
if _, err := processPorts(application, task); err != nil {
log.Errorf("Filtering Marathon task %s from application %s without port: %s", task.ID, application.ID, err)
return false
}
// Filter illegal port label specification.
_, hasPortIndexLabel := p.getLabel(application, types.LabelPortIndex)
_, hasPortLabel := p.getLabel(application, types.LabelPort)
if hasPortIndexLabel && hasPortLabel {
log.Debugf("Filtering Marathon task %s from application %s specifying both traefik.portIndex and traefik.port labels", task.ID, application.ID)
return false
}
// Filter task with existing, bad health check results.
if application.HasHealthChecks() {
if task.HasHealthCheckResults() {
@ -288,7 +288,107 @@ func isApplicationEnabled(application marathon.Application, exposedByDefault boo
return exposedByDefault && (*application.Labels)[types.LabelEnable] != "false" || (*application.Labels)[types.LabelEnable] == "true"
}
func (p *Provider) getLabel(application marathon.Application, label string) (string, bool) {
// logIllegalServices logs illegal service configurations.
// While we cannot filter on the service level, they will eventually get
// rejected once the server configuration is rendered.
func (p *Provider) logIllegalServices(task marathon.Task, application marathon.Application) {
for _, serviceName := range p.getServiceNames(application) {
// Check for illegal/missing ports.
if _, err := p.processPorts(application, task, serviceName); err != nil {
log.Warnf("%s has an illegal configuration: no proper port available", identifier(application, task, serviceName))
continue
}
// Check for illegal port label combinations.
_, hasPortLabel := p.getLabel(application, types.LabelPort, serviceName)
_, hasPortIndexLabel := p.getLabel(application, types.LabelPortIndex, serviceName)
if hasPortLabel && hasPortIndexLabel {
log.Warnf("%s has both port and port index specified; port will take precedence", identifier(application, task, serviceName))
}
}
}
//servicePropertyValues is a map of services properties
//an example value is: weight=42
type servicePropertyValues map[string]string
//serviceProperties is a map of service properties per service, which we can get with label[serviceName][propertyName]. It yields a property value.
type serviceProperties map[string]servicePropertyValues
//hasServices checks if there are service-defining labels for the given application
func (p *Provider) hasServices(application marathon.Application) bool {
return len(extractServiceProperties(application.Labels)) > 0
}
//extractServiceProperties extracts the service labels for the given application
func extractServiceProperties(labels *map[string]string) serviceProperties {
v := make(serviceProperties)
if labels != nil {
for label, value := range *labels {
matches := servicesPropertiesRegexp.FindStringSubmatch(label)
if matches == nil {
continue
}
// According to the regex, match index 1 is "service_name" and match index 2 is the "property_name"
serviceName := matches[1]
propertyName := matches[2]
if _, ok := v[serviceName]; !ok {
v[serviceName] = make(servicePropertyValues)
}
v[serviceName][propertyName] = value
}
}
return v
}
//getServiceProperty returns the property for a service label searching in all labels of the given application
func getServiceProperty(application marathon.Application, serviceName string, property string) (string, bool) {
value, ok := extractServiceProperties(application.Labels)[serviceName][property]
return value, ok
}
//getServiceNames returns a list of service names for a given application
//An empty name "" will be added if no service specific properties exist, as an indication that there are no sub-services, but only main application
func (p *Provider) getServiceNames(application marathon.Application) []string {
labelServiceProperties := extractServiceProperties(application.Labels)
var names []string
for k := range labelServiceProperties {
names = append(names, k)
}
if len(names) == 0 {
names = append(names, "")
}
return names
}
func (p *Provider) getServiceNameSuffix(serviceName string) string {
if len(serviceName) > 0 {
serviceName = strings.Replace(serviceName, "/", "-", -1)
serviceName = strings.Replace(serviceName, ".", "-", -1)
return "-service-" + serviceName
}
return ""
}
//getAppLabel is a convenience function to get application label, when no serviceName is available
//it is identical to calling getLabel(application, label, "")
func (p *Provider) getAppLabel(application marathon.Application, label string) (string, bool) {
return p.getLabel(application, label, "")
}
//getLabel returns a string value of a corresponding `label` argument
// If serviceName is non-empty, we look for a service label. If none exists or serviceName is empty, we look for an application label.
func (p *Provider) getLabel(application marathon.Application, label string, serviceName string) (string, bool) {
if len(serviceName) > 0 {
property := strings.TrimPrefix(label, types.LabelPrefix)
if value, ok := getServiceProperty(application, serviceName, property); ok {
return value, true
}
}
for key, value := range *application.Labels {
if key == label {
return value, true
@ -297,84 +397,93 @@ func (p *Provider) getLabel(application marathon.Application, label string) (str
return "", false
}
func (p *Provider) getPort(task marathon.Task, application marathon.Application) string {
port, err := processPorts(application, task)
func (p *Provider) getPort(task marathon.Task, application marathon.Application, serviceName string) string {
port, err := p.processPorts(application, task, serviceName)
if err != nil {
log.Errorf("Unable to process ports for Marathon application %s and task %s: %s", application.ID, task.ID, err)
log.Errorf("Unable to process ports for %s: %s", identifier(application, task, serviceName), err)
return ""
}
return strconv.Itoa(port)
}
func (p *Provider) getWeight(application marathon.Application) string {
if label, ok := p.getLabel(application, types.LabelWeight); ok {
func (p *Provider) getWeight(application marathon.Application, serviceName string) string {
if label, ok := p.getLabel(application, types.LabelWeight, serviceName); ok {
return label
}
return "0"
}
func (p *Provider) getDomain(application marathon.Application) string {
if label, ok := p.getLabel(application, types.LabelDomain); ok {
if label, ok := p.getAppLabel(application, types.LabelDomain); ok {
return label
}
return p.Domain
}
func (p *Provider) getProtocol(application marathon.Application) string {
if label, ok := p.getLabel(application, types.LabelProtocol); ok {
func (p *Provider) getProtocol(application marathon.Application, serviceName string) string {
if label, ok := p.getLabel(application, types.LabelProtocol, serviceName); ok {
return label
}
return "http"
}
func (p *Provider) getSticky(application marathon.Application) string {
if sticky, ok := p.getLabel(application, types.LabelBackendLoadbalancerSticky); ok {
if sticky, ok := p.getAppLabel(application, types.LabelBackendLoadbalancerSticky); ok {
return sticky
}
return "false"
}
func (p *Provider) getPassHostHeader(application marathon.Application) string {
if passHostHeader, ok := p.getLabel(application, types.LabelFrontendPassHostHeader); ok {
func (p *Provider) getPassHostHeader(application marathon.Application, serviceName string) string {
if passHostHeader, ok := p.getLabel(application, types.LabelFrontendPassHostHeader, serviceName); ok {
return passHostHeader
}
return "true"
}
func (p *Provider) getPriority(application marathon.Application) string {
if priority, ok := p.getLabel(application, types.LabelFrontendPriority); ok {
func (p *Provider) getPriority(application marathon.Application, serviceName string) string {
if priority, ok := p.getLabel(application, types.LabelFrontendPriority, serviceName); ok {
return priority
}
return "0"
}
func (p *Provider) getEntryPoints(application marathon.Application) []string {
if entryPoints, ok := p.getLabel(application, types.LabelFrontendEntryPoints); ok {
func (p *Provider) getEntryPoints(application marathon.Application, serviceName string) []string {
if entryPoints, ok := p.getLabel(application, types.LabelFrontendEntryPoints, serviceName); ok {
return strings.Split(entryPoints, ",")
}
return []string{}
}
// getFrontendRule returns the frontend rule for the specified application, using
// it's label. It returns a default one (Host) if the label is not present.
func (p *Provider) getFrontendRule(application marathon.Application) string {
if label, ok := p.getLabel(application, types.LabelFrontendRule); ok {
// its label. If service is provided, it will look for serviceName label before generic one.
// It returns a default one (Host) if the label is not present.
func (p *Provider) getFrontendRule(application marathon.Application, serviceName string) string {
if label, ok := p.getLabel(application, types.LabelFrontendRule, serviceName); ok {
return label
}
if p.MarathonLBCompatibility {
if label, ok := p.getLabel(application, "HAPROXY_0_VHOST"); ok {
if label, ok := p.getAppLabel(application, "HAPROXY_0_VHOST"); ok {
return "Host:" + label
}
}
if len(serviceName) > 0 {
return "Host:" + strings.ToLower(provider.Normalize(serviceName)) + "." + p.getSubDomain(application.ID) + "." + p.Domain
}
return "Host:" + p.getSubDomain(application.ID) + "." + p.Domain
}
func (p *Provider) getBackend(application marathon.Application) string {
if label, ok := p.getLabel(application, types.LabelBackend); ok {
func (p *Provider) getBackend(application marathon.Application, serviceName string) string {
if label, ok := p.getLabel(application, types.LabelBackend, serviceName); ok {
return label
}
return strings.Replace(application.ID, "/", "-", -1)
return strings.Replace(application.ID, "/", "-", -1) + p.getServiceNameSuffix(serviceName)
}
func (p *Provider) getFrontendName(application marathon.Application, serviceName string) string {
appName := strings.Replace(application.ID, "/", "-", -1)
return "frontend" + appName + p.getServiceNameSuffix(serviceName)
}
func (p *Provider) getSubDomain(name string) string {
@ -388,26 +497,26 @@ func (p *Provider) getSubDomain(name string) string {
}
func (p *Provider) hasCircuitBreakerLabels(application marathon.Application) bool {
_, ok := p.getLabel(application, types.LabelBackendCircuitbreakerExpression)
_, ok := p.getAppLabel(application, types.LabelBackendCircuitbreakerExpression)
return ok
}
func (p *Provider) hasLoadBalancerLabels(application marathon.Application) bool {
_, errMethod := p.getLabel(application, types.LabelBackendLoadbalancerMethod)
_, errSticky := p.getLabel(application, types.LabelBackendLoadbalancerSticky)
_, errMethod := p.getAppLabel(application, types.LabelBackendLoadbalancerMethod)
_, errSticky := p.getAppLabel(application, types.LabelBackendLoadbalancerSticky)
return errMethod || errSticky
}
func (p *Provider) hasMaxConnLabels(application marathon.Application) bool {
if _, ok := p.getLabel(application, types.LabelBackendMaxconnAmount); !ok {
if _, ok := p.getAppLabel(application, types.LabelBackendMaxconnAmount); !ok {
return false
}
_, ok := p.getLabel(application, types.LabelBackendMaxconnExtractorfunc)
_, ok := p.getAppLabel(application, types.LabelBackendMaxconnExtractorfunc)
return ok
}
func (p *Provider) getMaxConnAmount(application marathon.Application) int64 {
if label, ok := p.getLabel(application, types.LabelBackendMaxconnAmount); ok {
if label, ok := p.getAppLabel(application, types.LabelBackendMaxconnAmount); ok {
i, errConv := strconv.ParseInt(label, 10, 64)
if errConv != nil {
log.Errorf("Unable to parse traefik.backend.maxconn.amount %s", label)
@ -419,21 +528,21 @@ func (p *Provider) getMaxConnAmount(application marathon.Application) int64 {
}
func (p *Provider) getMaxConnExtractorFunc(application marathon.Application) string {
if label, ok := p.getLabel(application, types.LabelBackendMaxconnExtractorfunc); ok {
if label, ok := p.getAppLabel(application, types.LabelBackendMaxconnExtractorfunc); ok {
return label
}
return "request.host"
}
func (p *Provider) getLoadBalancerMethod(application marathon.Application) string {
if label, ok := p.getLabel(application, types.LabelBackendLoadbalancerMethod); ok {
if label, ok := p.getAppLabel(application, types.LabelBackendLoadbalancerMethod); ok {
return label
}
return "wrr"
}
func (p *Provider) getCircuitBreakerExpression(application marathon.Application) string {
if label, ok := p.getLabel(application, types.LabelBackendCircuitbreakerExpression); ok {
if label, ok := p.getAppLabel(application, types.LabelBackendCircuitbreakerExpression); ok {
return label
}
return "NetworkErrorRatio() > 1"
@ -444,33 +553,37 @@ func (p *Provider) hasHealthCheckLabels(application marathon.Application) bool {
}
func (p *Provider) getHealthCheckPath(application marathon.Application) string {
if label, ok := p.getLabel(application, types.LabelBackendHealthcheckPath); ok {
if label, ok := p.getAppLabel(application, types.LabelBackendHealthcheckPath); ok {
return label
}
return ""
}
func (p *Provider) getHealthCheckInterval(application marathon.Application) string {
if label, ok := p.getLabel(application, types.LabelBackendHealthcheckInterval); ok {
if label, ok := p.getAppLabel(application, types.LabelBackendHealthcheckInterval); ok {
return label
}
return ""
}
func (p *Provider) getBasicAuth(application marathon.Application) []string {
if basicAuth, ok := p.getLabel(application, types.LabelFrontendAuthBasic); ok {
func (p *Provider) getBasicAuth(application marathon.Application, serviceName string) []string {
if basicAuth, ok := p.getLabel(application, types.LabelFrontendAuthBasic, serviceName); ok {
return strings.Split(basicAuth, ",")
}
return []string{}
}
func processPorts(application marathon.Application, task marathon.Task) (int, error) {
if portLabel, ok := (*application.Labels)[types.LabelPort]; ok {
// processPorts returns the configured port.
// An explicitly specified port is preferred. If none is specified, it selects
// one of the available port. The first such found port is returned unless an
// optional index is provided.
func (p *Provider) processPorts(application marathon.Application, task marathon.Task, serviceName string) (int, error) {
if portLabel, ok := p.getLabel(application, types.LabelPort, serviceName); ok {
port, err := strconv.Atoi(portLabel)
switch {
case err != nil:
return 0, fmt.Errorf("failed to parse port label: %s", err)
return 0, fmt.Errorf("failed to parse port label %q: %s", portLabel, err)
case port <= 0:
return 0, fmt.Errorf("explicitly specified port %d must be larger than zero", port)
}
@ -483,8 +596,7 @@ func processPorts(application marathon.Application, task marathon.Task) (int, er
}
portIndex := 0
portIndexLabel, ok := (*application.Labels)[types.LabelPortIndex]
if ok {
if portIndexLabel, ok := p.getLabel(application, types.LabelPortIndex, serviceName); ok {
var err error
portIndex, err = parseIndex(portIndexLabel, len(ports))
if err != nil {
@ -533,7 +645,7 @@ func (p *Provider) getBackendServer(task marathon.Task, application marathon.App
case numTaskIPAddresses == 1:
return task.IPAddresses[0].IPAddress
default:
ipAddressIdxStr, ok := p.getLabel(application, "traefik.ipAddressIdx")
ipAddressIdxStr, ok := p.getAppLabel(application, "traefik.ipAddressIdx")
if !ok {
log.Errorf("Found %d task IP addresses but missing IP address index for Marathon application %s on task %s", numTaskIPAddresses, application.ID, task.ID)
return ""
@ -553,10 +665,18 @@ func parseIndex(index string, length int) (int, error) {
parsed, err := strconv.Atoi(index)
switch {
case err != nil:
return 0, fmt.Errorf("failed to parse index '%s': %s", index, err)
return 0, fmt.Errorf("failed to parse index %q: %s", index, err)
case parsed < 0, parsed > length-1:
return 0, fmt.Errorf("index %d must be within range (0, %d)", parsed, length-1)
}
return parsed, nil
}
func identifier(app marathon.Application, task marathon.Task, serviceName string) string {
id := fmt.Sprintf("Marathon task %s from application %s", task.ID, app.ID)
if serviceName != "" {
id += fmt.Sprintf(" (service: %s)", serviceName)
}
return id
}