1
0
Fork 0

refactor(mesos): rewrite configuration system.

This commit is contained in:
Fernandez Ludovic 2017-12-02 19:29:34 +01:00 committed by Traefiker
parent ca680710a2
commit be718aea11
5 changed files with 729 additions and 698 deletions

View file

@ -1,14 +1,10 @@
package mesos
import (
"errors"
"fmt"
"strconv"
"strings"
"text/template"
"time"
"github.com/BurntSushi/ty/fun"
"github.com/cenk/backoff"
"github.com/containous/traefik/job"
"github.com/containous/traefik/log"
@ -20,8 +16,6 @@ import (
_ "github.com/mesos/mesos-go/detector/zoo"
"github.com/mesosphere/mesos-dns/detect"
"github.com/mesosphere/mesos-dns/logging"
"github.com/mesosphere/mesos-dns/records"
"github.com/mesosphere/mesos-dns/records/state"
"github.com/mesosphere/mesos-dns/util"
)
@ -82,7 +76,7 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s
for {
select {
case <-reload.C:
configuration := p.loadMesosConfig()
configuration := p.buildConfiguration()
if configuration != nil {
configurationChan <- types.ConfigMessage{
ProviderName: "mesos",
@ -98,7 +92,7 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s
}
log.Debugf("new masters detected: %v", masters)
p.Masters = masters
configuration := p.loadMesosConfig()
configuration := p.buildConfiguration()
if configuration != nil {
configurationChan <- types.ConfigMessage{
ProviderName: "mesos",
@ -121,272 +115,6 @@ func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *s
return nil
}
func (p *Provider) loadMesosConfig() *types.Configuration {
var mesosFuncMap = template.FuncMap{
"getBackend": p.getBackend,
"getPort": p.getPort,
"getHost": p.getHost,
"getWeight": p.getWeight,
"getDomain": p.getDomain,
"getProtocol": p.getProtocol,
"getPassHostHeader": p.getPassHostHeader,
"getPriority": p.getPriority,
"getEntryPoints": p.getEntryPoints,
"getFrontendRule": p.getFrontendRule,
"getFrontendBackend": p.getFrontendBackend,
"getID": p.getID,
"getFrontEndName": p.getFrontEndName,
}
t := records.NewRecordGenerator(time.Duration(p.StateTimeoutSecond) * time.Second)
sj, err := t.FindMaster(p.Masters...)
if err != nil {
log.Errorf("Failed to create a client for Mesos, error: %s", err)
return nil
}
tasks := p.taskRecords(sj)
//filter tasks
filteredTasks := fun.Filter(func(task state.Task) bool {
return mesosTaskFilter(task, p.ExposedByDefault)
}, tasks).([]state.Task)
filteredApps := []state.Task{}
for _, value := range filteredTasks {
if !taskInSlice(value, filteredApps) {
filteredApps = append(filteredApps, value)
}
}
templateObjects := struct {
Applications []state.Task
Tasks []state.Task
Domain string
}{
filteredApps,
filteredTasks,
p.Domain,
}
configuration, err := p.GetConfiguration("templates/mesos.tmpl", mesosFuncMap, templateObjects)
if err != nil {
log.Error(err)
}
return configuration
}
func taskInSlice(a state.Task, list []state.Task) bool {
for _, b := range list {
if b.DiscoveryInfo.Name == a.DiscoveryInfo.Name {
return true
}
}
return false
}
// labels returns all given Status.[]Labels' values whose keys are equal
// to the given key
func labels(task state.Task, key string) string {
for _, l := range task.Labels {
if l.Key == key {
return l.Value
}
}
return ""
}
func mesosTaskFilter(task state.Task, exposedByDefaultFlag bool) bool {
if len(task.DiscoveryInfo.Ports.DiscoveryPorts) == 0 {
log.Debugf("Filtering Mesos task without port %s", task.Name)
return false
}
if !isMesosApplicationEnabled(task, exposedByDefaultFlag) {
log.Debugf("Filtering disabled Mesos task %s", task.DiscoveryInfo.Name)
return false
}
//filter indeterminable task port
portIndexLabel := labels(task, types.LabelPortIndex)
portValueLabel := labels(task, types.LabelPort)
if portIndexLabel != "" && portValueLabel != "" {
log.Debugf("Filtering Mesos task %s specifying both traefik.portIndex and traefik.port labels", task.Name)
return false
}
if portIndexLabel != "" {
index, err := strconv.Atoi(labels(task, types.LabelPortIndex))
if err != nil || index < 0 || index > len(task.DiscoveryInfo.Ports.DiscoveryPorts)-1 {
log.Debugf("Filtering Mesos task %s with unexpected value for traefik.portIndex label", task.Name)
return false
}
}
if portValueLabel != "" {
port, err := strconv.Atoi(labels(task, types.LabelPort))
if err != nil {
log.Debugf("Filtering Mesos task %s with unexpected value for traefik.port label", task.Name)
return false
}
var foundPort bool
for _, exposedPort := range task.DiscoveryInfo.Ports.DiscoveryPorts {
if port == exposedPort.Number {
foundPort = true
break
}
}
if !foundPort {
log.Debugf("Filtering Mesos task %s without a matching port for traefik.port label", task.Name)
return false
}
}
//filter healthchecks
if task.Statuses != nil && len(task.Statuses) > 0 && task.Statuses[0].Healthy != nil && !*task.Statuses[0].Healthy {
log.Debugf("Filtering Mesos task %s with bad healthcheck", task.DiscoveryInfo.Name)
return false
}
return true
}
func getMesos(task state.Task, apps []state.Task) (state.Task, error) {
for _, application := range apps {
if application.DiscoveryInfo.Name == task.DiscoveryInfo.Name {
return application, nil
}
}
return state.Task{}, errors.New("Application not found: " + task.DiscoveryInfo.Name)
}
func isMesosApplicationEnabled(task state.Task, exposedByDefault bool) bool {
return exposedByDefault && labels(task, types.LabelEnable) != "false" || labels(task, types.LabelEnable) == "true"
}
func (p *Provider) getLabel(task state.Task, label string) (string, error) {
for _, tmpLabel := range task.Labels {
if tmpLabel.Key == label {
return tmpLabel.Value, nil
}
}
return "", errors.New("Label not found:" + label)
}
func (p *Provider) getPort(task state.Task, applications []state.Task) string {
application, err := getMesos(task, applications)
if err != nil {
log.Errorf("Unable to get Mesos application from task %s", task.DiscoveryInfo.Name)
return ""
}
if portIndexLabel, err := p.getLabel(application, types.LabelPortIndex); err == nil {
if index, err := strconv.Atoi(portIndexLabel); err == nil {
return strconv.Itoa(task.DiscoveryInfo.Ports.DiscoveryPorts[index].Number)
}
}
if portValueLabel, err := p.getLabel(application, types.LabelPort); err == nil {
return portValueLabel
}
for _, port := range task.DiscoveryInfo.Ports.DiscoveryPorts {
return strconv.Itoa(port.Number)
}
return ""
}
func (p *Provider) getWeight(task state.Task, applications []state.Task) string {
application, errApp := getMesos(task, applications)
if errApp != nil {
log.Errorf("Unable to get Mesos application from task %s", task.DiscoveryInfo.Name)
return "0"
}
if label, err := p.getLabel(application, types.LabelWeight); err == nil {
return label
}
return "0"
}
func (p *Provider) getDomain(task state.Task) string {
if label, err := p.getLabel(task, types.LabelDomain); err == nil {
return label
}
return p.Domain
}
func (p *Provider) getProtocol(task state.Task, applications []state.Task) string {
application, errApp := getMesos(task, applications)
if errApp != nil {
log.Errorf("Unable to get Mesos application from task %s", task.DiscoveryInfo.Name)
return "http"
}
if label, err := p.getLabel(application, types.LabelProtocol); err == nil {
return label
}
return "http"
}
func (p *Provider) getPassHostHeader(task state.Task) string {
if passHostHeader, err := p.getLabel(task, types.LabelFrontendPassHostHeader); err == nil {
return passHostHeader
}
return "false"
}
func (p *Provider) getPriority(task state.Task) string {
if priority, err := p.getLabel(task, types.LabelFrontendPriority); err == nil {
return priority
}
return "0"
}
func (p *Provider) getEntryPoints(task state.Task) []string {
if entryPoints, err := p.getLabel(task, types.LabelFrontendEntryPoints); err == nil {
return strings.Split(entryPoints, ",")
}
return []string{}
}
// getFrontendRule returns the frontend rule for the specified application, using
// it's label. It returns a default one (Host) if the label is not present.
func (p *Provider) getFrontendRule(task state.Task) string {
if label, err := p.getLabel(task, types.LabelFrontendRule); err == nil {
return label
}
return "Host:" + strings.ToLower(strings.Replace(p.getSubDomain(task.DiscoveryInfo.Name), "_", "-", -1)) + "." + p.Domain
}
func (p *Provider) getBackend(task state.Task, applications []state.Task) string {
application, errApp := getMesos(task, applications)
if errApp != nil {
log.Errorf("Unable to get Mesos application from task %s", task.DiscoveryInfo.Name)
return ""
}
return p.getFrontendBackend(application)
}
func (p *Provider) getFrontendBackend(task state.Task) string {
if label, err := p.getLabel(task, types.LabelBackend); err == nil {
return label
}
return "-" + cleanupSpecialChars(task.DiscoveryInfo.Name)
}
func (p *Provider) getHost(task state.Task) string {
return task.IP(strings.Split(p.IPSources, ",")...)
}
func (p *Provider) getID(task state.Task) string {
return cleanupSpecialChars(task.ID)
}
func (p *Provider) getFrontEndName(task state.Task) string {
return strings.Replace(cleanupSpecialChars(task.ID), "/", "-", -1)
}
func cleanupSpecialChars(s string) string {
return strings.Replace(strings.Replace(strings.Replace(s, ".", "-", -1), ":", "-", -1), "_", "-", -1)
}
func detectMasters(zk string, masters []string) <-chan []string {
changed := make(chan []string, 1)
if zk != "" {
@ -401,43 +129,3 @@ func detectMasters(zk string, masters []string) <-chan []string {
}
return changed
}
func (p *Provider) taskRecords(sj state.State) []state.Task {
var tasks []state.Task // == nil
for _, f := range sj.Frameworks {
for _, task := range f.Tasks {
for _, slave := range sj.Slaves {
if task.SlaveID == slave.ID {
task.SlaveIP = slave.Hostname
}
}
// only do running and discoverable tasks
if task.State == "TASK_RUNNING" {
tasks = append(tasks, task)
}
}
}
return tasks
}
// ErrorFunction A function definition that returns an error
// to be passed to the Ignore or Panic error handler
type ErrorFunction func() error
// Ignore Calls an ErrorFunction, and ignores the result.
// This allows us to be more explicit when there is no error
// handling to be done, for example in defers
func Ignore(f ErrorFunction) {
_ = f()
}
func (p *Provider) getSubDomain(name string) string {
if p.GroupsAsSubDomains {
splitedName := strings.Split(strings.TrimPrefix(name, "/"), "/")
provider.ReverseStringSlice(&splitedName)
reverseName := strings.Join(splitedName, ".")
return reverseName
}
return strings.Replace(strings.TrimPrefix(name, "/"), "/", "-", -1)
}