Refactor traefik with package
Split a bit traefik into package. The idea behind this refactor is to start move inter-dependencies away and do some DRY or SRP. - Adds a `provider` package, with providers except `web.go` - Adds a `types` package with common struct. - Move `gen.go` to an `autogen` package Signed-off-by: Vincent Demeester <vincent@sbr.pm>
This commit is contained in:
parent
6e1a0554c0
commit
de0a57ec76
21 changed files with 195 additions and 167 deletions
16
provider/boltdb.go
Normal file
16
provider/boltdb.go
Normal file
|
@ -0,0 +1,16 @@
|
|||
package provider
|
||||
|
||||
import "github.com/emilevauge/traefik/types"
|
||||
|
||||
type BoltDbProvider struct {
|
||||
Watch bool
|
||||
Endpoint string
|
||||
Prefix string
|
||||
Filename string
|
||||
KvProvider *KvProvider
|
||||
}
|
||||
|
||||
func (provider *BoltDbProvider) Provide(configurationChan chan<- types.ConfigMessage) error {
|
||||
provider.KvProvider = NewBoltDbProvider(provider)
|
||||
return provider.KvProvider.provide(configurationChan)
|
||||
}
|
16
provider/consul.go
Normal file
16
provider/consul.go
Normal file
|
@ -0,0 +1,16 @@
|
|||
package provider
|
||||
|
||||
import "github.com/emilevauge/traefik/types"
|
||||
|
||||
type ConsulProvider struct {
|
||||
Watch bool
|
||||
Endpoint string
|
||||
Prefix string
|
||||
Filename string
|
||||
KvProvider *KvProvider
|
||||
}
|
||||
|
||||
func (provider *ConsulProvider) Provide(configurationChan chan<- types.ConfigMessage) error {
|
||||
provider.KvProvider = NewConsulProvider(provider)
|
||||
return provider.KvProvider.provide(configurationChan)
|
||||
}
|
247
provider/docker.go
Normal file
247
provider/docker.go
Normal file
|
@ -0,0 +1,247 @@
|
|||
package provider
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"text/template"
|
||||
"time"
|
||||
|
||||
"github.com/BurntSushi/toml"
|
||||
"github.com/BurntSushi/ty/fun"
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/cenkalti/backoff"
|
||||
"github.com/emilevauge/traefik/autogen"
|
||||
"github.com/emilevauge/traefik/types"
|
||||
"github.com/fsouza/go-dockerclient"
|
||||
)
|
||||
|
||||
type DockerProvider struct {
|
||||
Watch bool
|
||||
Endpoint string
|
||||
Filename string
|
||||
Domain string
|
||||
}
|
||||
|
||||
func (provider *DockerProvider) Provide(configurationChan chan<- types.ConfigMessage) error {
|
||||
if dockerClient, err := docker.NewClient(provider.Endpoint); err != nil {
|
||||
log.Errorf("Failed to create a client for docker, error: %s", err)
|
||||
return err
|
||||
} else {
|
||||
err := dockerClient.Ping()
|
||||
if err != nil {
|
||||
log.Errorf("Docker connection error %+v", err)
|
||||
return err
|
||||
}
|
||||
log.Debug("Docker connection established")
|
||||
if provider.Watch {
|
||||
dockerEvents := make(chan *docker.APIEvents)
|
||||
dockerClient.AddEventListener(dockerEvents)
|
||||
log.Debug("Docker listening")
|
||||
go func() {
|
||||
operation := func() error {
|
||||
for {
|
||||
event := <-dockerEvents
|
||||
if event == nil {
|
||||
return errors.New("Docker event nil")
|
||||
// log.Fatalf("Docker connection error")
|
||||
}
|
||||
if event.Status == "start" || event.Status == "die" {
|
||||
log.Debugf("Docker event receveived %+v", event)
|
||||
configuration := provider.loadDockerConfig(dockerClient)
|
||||
if configuration != nil {
|
||||
configurationChan <- types.ConfigMessage{"docker", configuration}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
notify := func(err error, time time.Duration) {
|
||||
log.Errorf("Docker connection error %+v, retrying in %s", err, time)
|
||||
}
|
||||
err := backoff.RetryNotify(operation, backoff.NewExponentialBackOff(), notify)
|
||||
if err != nil {
|
||||
log.Fatalf("Cannot connect to docker server %+v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
configuration := provider.loadDockerConfig(dockerClient)
|
||||
configurationChan <- types.ConfigMessage{"docker", configuration}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *DockerProvider) loadDockerConfig(dockerClient *docker.Client) *types.Configuration {
|
||||
var DockerFuncMap = template.FuncMap{
|
||||
"getBackend": func(container docker.Container) string {
|
||||
if label, err := provider.getLabel(container, "traefik.backend"); err == nil {
|
||||
return label
|
||||
}
|
||||
return provider.getEscapedName(container.Name)
|
||||
},
|
||||
"getPort": func(container docker.Container) string {
|
||||
if label, err := provider.getLabel(container, "traefik.port"); err == nil {
|
||||
return label
|
||||
}
|
||||
for key := range container.NetworkSettings.Ports {
|
||||
return key.Port()
|
||||
}
|
||||
return ""
|
||||
},
|
||||
"getWeight": func(container docker.Container) string {
|
||||
if label, err := provider.getLabel(container, "traefik.weight"); err == nil {
|
||||
return label
|
||||
}
|
||||
return "0"
|
||||
},
|
||||
"getDomain": func(container docker.Container) string {
|
||||
if label, err := provider.getLabel(container, "traefik.domain"); err == nil {
|
||||
return label
|
||||
}
|
||||
return provider.Domain
|
||||
},
|
||||
"getProtocol": func(container docker.Container) string {
|
||||
if label, err := provider.getLabel(container, "traefik.protocol"); err == nil {
|
||||
return label
|
||||
}
|
||||
return "http"
|
||||
},
|
||||
"getPassHostHeader": func(container docker.Container) string {
|
||||
if passHostHeader, err := provider.getLabel(container, "traefik.frontend.passHostHeader"); err == nil {
|
||||
return passHostHeader
|
||||
}
|
||||
return "false"
|
||||
},
|
||||
"getFrontendValue": provider.GetFrontendValue,
|
||||
"getFrontendRule": provider.GetFrontendRule,
|
||||
"replace": func(s1 string, s2 string, s3 string) string {
|
||||
return strings.Replace(s3, s1, s2, -1)
|
||||
},
|
||||
}
|
||||
configuration := new(types.Configuration)
|
||||
containerList, _ := dockerClient.ListContainers(docker.ListContainersOptions{})
|
||||
containersInspected := []docker.Container{}
|
||||
frontends := map[string][]docker.Container{}
|
||||
|
||||
// get inspect containers
|
||||
for _, container := range containerList {
|
||||
containerInspected, _ := dockerClient.InspectContainer(container.ID)
|
||||
containersInspected = append(containersInspected, *containerInspected)
|
||||
}
|
||||
|
||||
// filter containers
|
||||
filteredContainers := fun.Filter(func(container docker.Container) bool {
|
||||
if len(container.NetworkSettings.Ports) == 0 {
|
||||
log.Debugf("Filtering container without port %s", container.Name)
|
||||
return false
|
||||
}
|
||||
_, err := strconv.Atoi(container.Config.Labels["traefik.port"])
|
||||
if len(container.NetworkSettings.Ports) > 1 && err != nil {
|
||||
log.Debugf("Filtering container with more than 1 port and no traefik.port label %s", container.Name)
|
||||
return false
|
||||
}
|
||||
if container.Config.Labels["traefik.enable"] == "false" {
|
||||
log.Debugf("Filtering disabled container %s", container.Name)
|
||||
return false
|
||||
}
|
||||
|
||||
if _, err := provider.getLabels(container, []string{"traefik.frontend.rule", "traefik.frontend.value"}); err != nil {
|
||||
log.Debugf("Filtering bad labeled container %s", container.Name)
|
||||
return false
|
||||
}
|
||||
|
||||
return true
|
||||
}, containersInspected).([]docker.Container)
|
||||
|
||||
for _, container := range filteredContainers {
|
||||
frontends[provider.getFrontendName(container)] = append(frontends[provider.getFrontendName(container)], container)
|
||||
}
|
||||
|
||||
templateObjects := struct {
|
||||
Containers []docker.Container
|
||||
Frontends map[string][]docker.Container
|
||||
Domain string
|
||||
}{
|
||||
filteredContainers,
|
||||
frontends,
|
||||
provider.Domain,
|
||||
}
|
||||
tmpl := template.New(provider.Filename).Funcs(DockerFuncMap)
|
||||
if len(provider.Filename) > 0 {
|
||||
_, err := tmpl.ParseFiles(provider.Filename)
|
||||
if err != nil {
|
||||
log.Error("Error reading file", err)
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
buf, err := autogen.Asset("templates/docker.tmpl")
|
||||
if err != nil {
|
||||
log.Error("Error reading file", err)
|
||||
}
|
||||
_, err = tmpl.Parse(string(buf))
|
||||
if err != nil {
|
||||
log.Error("Error reading file", err)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
var buffer bytes.Buffer
|
||||
err := tmpl.Execute(&buffer, templateObjects)
|
||||
if err != nil {
|
||||
log.Error("Error with docker template", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
if _, err := toml.Decode(buffer.String(), configuration); err != nil {
|
||||
log.Error("Error creating docker configuration ", err)
|
||||
return nil
|
||||
}
|
||||
return configuration
|
||||
}
|
||||
|
||||
func (provider *DockerProvider) getFrontendName(container docker.Container) string {
|
||||
// Replace '.' with '-' in quoted keys because of this issue https://github.com/BurntSushi/toml/issues/78
|
||||
frontendName := fmt.Sprintf("%s-%s", provider.GetFrontendRule(container), provider.GetFrontendValue(container))
|
||||
return strings.Replace(frontendName, ".", "-", -1)
|
||||
}
|
||||
|
||||
func (provider *DockerProvider) getEscapedName(name string) string {
|
||||
return strings.Replace(name, "/", "", -1)
|
||||
}
|
||||
|
||||
func (provider *DockerProvider) getLabel(container docker.Container, label string) (string, error) {
|
||||
for key, value := range container.Config.Labels {
|
||||
if key == label {
|
||||
return value, nil
|
||||
}
|
||||
}
|
||||
return "", errors.New("Label not found:" + label)
|
||||
}
|
||||
|
||||
func (provider *DockerProvider) getLabels(container docker.Container, labels []string) (map[string]string, error) {
|
||||
foundLabels := map[string]string{}
|
||||
for _, label := range labels {
|
||||
if foundLabel, err := provider.getLabel(container, label); err != nil {
|
||||
return nil, errors.New("Label not found: " + label)
|
||||
} else {
|
||||
foundLabels[label] = foundLabel
|
||||
}
|
||||
}
|
||||
return foundLabels, nil
|
||||
}
|
||||
|
||||
func (provider *DockerProvider) GetFrontendValue(container docker.Container) string {
|
||||
if label, err := provider.getLabel(container, "traefik.frontend.value"); err == nil {
|
||||
return label
|
||||
}
|
||||
return provider.getEscapedName(container.Name) + "." + provider.Domain
|
||||
}
|
||||
|
||||
func (provider *DockerProvider) GetFrontendRule(container docker.Container) string {
|
||||
if label, err := provider.getLabel(container, "traefik.frontend.rule"); err == nil {
|
||||
return label
|
||||
}
|
||||
return "Host"
|
||||
}
|
16
provider/etcd.go
Normal file
16
provider/etcd.go
Normal file
|
@ -0,0 +1,16 @@
|
|||
package provider
|
||||
|
||||
import "github.com/emilevauge/traefik/types"
|
||||
|
||||
type EtcdProvider struct {
|
||||
Watch bool
|
||||
Endpoint string
|
||||
Prefix string
|
||||
Filename string
|
||||
KvProvider *KvProvider
|
||||
}
|
||||
|
||||
func (provider *EtcdProvider) Provide(configurationChan chan<- types.ConfigMessage) error {
|
||||
provider.KvProvider = NewEtcdProvider(provider)
|
||||
return provider.KvProvider.provide(configurationChan)
|
||||
}
|
71
provider/file.go
Normal file
71
provider/file.go
Normal file
|
@ -0,0 +1,71 @@
|
|||
package provider
|
||||
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"github.com/BurntSushi/toml"
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/emilevauge/traefik/types"
|
||||
"gopkg.in/fsnotify.v1"
|
||||
)
|
||||
|
||||
type FileProvider struct {
|
||||
Watch bool
|
||||
Filename string
|
||||
}
|
||||
|
||||
func (provider *FileProvider) Provide(configurationChan chan<- types.ConfigMessage) error {
|
||||
watcher, err := fsnotify.NewWatcher()
|
||||
if err != nil {
|
||||
log.Error("Error creating file watcher", err)
|
||||
return err
|
||||
}
|
||||
|
||||
file, err := os.Open(provider.Filename)
|
||||
if err != nil {
|
||||
log.Error("Error opening file", err)
|
||||
return err
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
if provider.Watch {
|
||||
// Process events
|
||||
go func() {
|
||||
defer watcher.Close()
|
||||
for {
|
||||
select {
|
||||
case event := <-watcher.Events:
|
||||
if strings.Contains(event.Name, file.Name()) {
|
||||
log.Debug("File event:", event)
|
||||
configuration := provider.LoadFileConfig(file.Name())
|
||||
if configuration != nil {
|
||||
configurationChan <- types.ConfigMessage{"file", configuration}
|
||||
}
|
||||
}
|
||||
case error := <-watcher.Errors:
|
||||
log.Error("Watcher event error", error)
|
||||
}
|
||||
}
|
||||
}()
|
||||
err = watcher.Add(filepath.Dir(file.Name()))
|
||||
if err != nil {
|
||||
log.Error("Error adding file watcher", err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
configuration := provider.LoadFileConfig(file.Name())
|
||||
configurationChan <- types.ConfigMessage{"file", configuration}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *FileProvider) LoadFileConfig(filename string) *types.Configuration {
|
||||
configuration := new(types.Configuration)
|
||||
if _, err := toml.DecodeFile(filename, configuration); err != nil {
|
||||
log.Error("Error reading file:", err)
|
||||
return nil
|
||||
}
|
||||
return configuration
|
||||
}
|
1
provider/file_test.go
Normal file
1
provider/file_test.go
Normal file
|
@ -0,0 +1 @@
|
|||
package provider
|
198
provider/kv.go
Normal file
198
provider/kv.go
Normal file
|
@ -0,0 +1,198 @@
|
|||
/*
|
||||
Copyright
|
||||
*/
|
||||
package provider
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"strings"
|
||||
"text/template"
|
||||
"time"
|
||||
|
||||
"github.com/BurntSushi/toml"
|
||||
"github.com/BurntSushi/ty/fun"
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/docker/libkv"
|
||||
"github.com/docker/libkv/store"
|
||||
"github.com/docker/libkv/store/boltdb"
|
||||
"github.com/docker/libkv/store/consul"
|
||||
"github.com/docker/libkv/store/etcd"
|
||||
"github.com/docker/libkv/store/zookeeper"
|
||||
"github.com/emilevauge/traefik/autogen"
|
||||
"github.com/emilevauge/traefik/types"
|
||||
)
|
||||
|
||||
type KvProvider struct {
|
||||
Watch bool
|
||||
Endpoint string
|
||||
Prefix string
|
||||
Filename string
|
||||
StoreType store.Backend
|
||||
kvclient store.Store
|
||||
}
|
||||
|
||||
func NewConsulProvider(provider *ConsulProvider) *KvProvider {
|
||||
kvProvider := new(KvProvider)
|
||||
kvProvider.Watch = provider.Watch
|
||||
kvProvider.Endpoint = provider.Endpoint
|
||||
kvProvider.Prefix = provider.Prefix
|
||||
kvProvider.Filename = provider.Filename
|
||||
kvProvider.StoreType = store.CONSUL
|
||||
return kvProvider
|
||||
}
|
||||
|
||||
func NewEtcdProvider(provider *EtcdProvider) *KvProvider {
|
||||
kvProvider := new(KvProvider)
|
||||
kvProvider.Watch = provider.Watch
|
||||
kvProvider.Endpoint = provider.Endpoint
|
||||
kvProvider.Prefix = provider.Prefix
|
||||
kvProvider.Filename = provider.Filename
|
||||
kvProvider.StoreType = store.ETCD
|
||||
return kvProvider
|
||||
}
|
||||
|
||||
func NewZkProvider(provider *ZookepperProvider) *KvProvider {
|
||||
kvProvider := new(KvProvider)
|
||||
kvProvider.Watch = provider.Watch
|
||||
kvProvider.Endpoint = provider.Endpoint
|
||||
kvProvider.Prefix = provider.Prefix
|
||||
kvProvider.Filename = provider.Filename
|
||||
kvProvider.StoreType = store.ZK
|
||||
return kvProvider
|
||||
}
|
||||
|
||||
func NewBoltDbProvider(provider *BoltDbProvider) *KvProvider {
|
||||
kvProvider := new(KvProvider)
|
||||
kvProvider.Watch = provider.Watch
|
||||
kvProvider.Endpoint = provider.Endpoint
|
||||
kvProvider.Prefix = provider.Prefix
|
||||
kvProvider.Filename = provider.Filename
|
||||
kvProvider.StoreType = store.BOLTDB
|
||||
return kvProvider
|
||||
}
|
||||
|
||||
func (provider *KvProvider) provide(configurationChan chan<- types.ConfigMessage) error {
|
||||
switch provider.StoreType {
|
||||
case store.CONSUL:
|
||||
consul.Register()
|
||||
case store.ETCD:
|
||||
etcd.Register()
|
||||
case store.ZK:
|
||||
zookeeper.Register()
|
||||
case store.BOLTDB:
|
||||
boltdb.Register()
|
||||
default:
|
||||
return errors.New("Invalid kv store: " + string(provider.StoreType))
|
||||
}
|
||||
kv, err := libkv.NewStore(
|
||||
provider.StoreType,
|
||||
[]string{provider.Endpoint},
|
||||
&store.Config{
|
||||
ConnectionTimeout: 30 * time.Second,
|
||||
Bucket: "traefik",
|
||||
},
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := kv.List(""); err != nil {
|
||||
return err
|
||||
}
|
||||
provider.kvclient = kv
|
||||
if provider.Watch {
|
||||
stopCh := make(chan struct{})
|
||||
chanKeys, err := kv.WatchTree(provider.Prefix, stopCh)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
go func() {
|
||||
for {
|
||||
<-chanKeys
|
||||
configuration := provider.loadConfig()
|
||||
if configuration != nil {
|
||||
configurationChan <- types.ConfigMessage{string(provider.StoreType), configuration}
|
||||
}
|
||||
defer close(stopCh)
|
||||
}
|
||||
}()
|
||||
}
|
||||
configuration := provider.loadConfig()
|
||||
configurationChan <- types.ConfigMessage{string(provider.StoreType), configuration}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *KvProvider) loadConfig() *types.Configuration {
|
||||
configuration := new(types.Configuration)
|
||||
templateObjects := struct {
|
||||
Prefix string
|
||||
}{
|
||||
provider.Prefix,
|
||||
}
|
||||
var KvFuncMap = template.FuncMap{
|
||||
"List": func(keys ...string) []string {
|
||||
joinedKeys := strings.Join(keys, "")
|
||||
keysPairs, err := provider.kvclient.List(joinedKeys)
|
||||
if err != nil {
|
||||
log.Error("Error getting keys: ", joinedKeys, err)
|
||||
return nil
|
||||
}
|
||||
directoryKeys := make(map[string]string)
|
||||
for _, key := range keysPairs {
|
||||
directory := strings.Split(strings.TrimPrefix(key.Key, strings.TrimPrefix(joinedKeys, "/")), "/")[0]
|
||||
directoryKeys[directory] = joinedKeys + directory
|
||||
}
|
||||
return fun.Values(directoryKeys).([]string)
|
||||
},
|
||||
"Get": func(keys ...string) string {
|
||||
joinedKeys := strings.Join(keys, "")
|
||||
keyPair, err := provider.kvclient.Get(joinedKeys)
|
||||
if err != nil {
|
||||
log.Debug("Error getting key: ", joinedKeys, err)
|
||||
return ""
|
||||
} else if keyPair == nil {
|
||||
return ""
|
||||
}
|
||||
return string(keyPair.Value)
|
||||
},
|
||||
"Last": func(key string) string {
|
||||
splittedKey := strings.Split(key, "/")
|
||||
return splittedKey[len(splittedKey)-1]
|
||||
},
|
||||
}
|
||||
|
||||
tmpl := template.New(provider.Filename).Funcs(KvFuncMap)
|
||||
if len(provider.Filename) > 0 {
|
||||
_, err := tmpl.ParseFiles(provider.Filename)
|
||||
if err != nil {
|
||||
log.Error("Error reading file", err)
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
buf, err := autogen.Asset("templates/kv.tmpl")
|
||||
if err != nil {
|
||||
log.Error("Error reading file", err)
|
||||
}
|
||||
_, err = tmpl.Parse(string(buf))
|
||||
if err != nil {
|
||||
log.Error("Error reading file", err)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
var buffer bytes.Buffer
|
||||
|
||||
err := tmpl.Execute(&buffer, templateObjects)
|
||||
if err != nil {
|
||||
log.Error("Error with kv template:", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
if _, err := toml.Decode(buffer.String(), configuration); err != nil {
|
||||
log.Error("Error creating kv configuration:", err)
|
||||
log.Error(buffer.String())
|
||||
return nil
|
||||
}
|
||||
|
||||
return configuration
|
||||
}
|
253
provider/marathon.go
Normal file
253
provider/marathon.go
Normal file
|
@ -0,0 +1,253 @@
|
|||
package provider
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"strconv"
|
||||
"strings"
|
||||
"text/template"
|
||||
|
||||
"github.com/BurntSushi/toml"
|
||||
"github.com/BurntSushi/ty/fun"
|
||||
log "github.com/Sirupsen/logrus"
|
||||
"github.com/emilevauge/traefik/autogen"
|
||||
"github.com/emilevauge/traefik/types"
|
||||
"github.com/gambol99/go-marathon"
|
||||
)
|
||||
|
||||
type MarathonProvider struct {
|
||||
Watch bool
|
||||
Endpoint string
|
||||
marathonClient marathon.Marathon
|
||||
Domain string
|
||||
Filename string
|
||||
NetworkInterface string
|
||||
}
|
||||
|
||||
func (provider *MarathonProvider) Provide(configurationChan chan<- types.ConfigMessage) error {
|
||||
config := marathon.NewDefaultConfig()
|
||||
config.URL = provider.Endpoint
|
||||
config.EventsInterface = provider.NetworkInterface
|
||||
client, err := marathon.NewClient(config)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to create a client for marathon, error: %s", err)
|
||||
return err
|
||||
}
|
||||
provider.marathonClient = client
|
||||
update := make(marathon.EventsChannel, 5)
|
||||
if provider.Watch {
|
||||
if err := client.AddEventsListener(update, marathon.EVENTS_APPLICATIONS); err != nil {
|
||||
log.Errorf("Failed to register for subscriptions, %s", err)
|
||||
} else {
|
||||
go func() {
|
||||
for {
|
||||
event := <-update
|
||||
log.Debug("Marathon event receveived", event)
|
||||
configuration := provider.loadMarathonConfig()
|
||||
if configuration != nil {
|
||||
configurationChan <- types.ConfigMessage{"marathon", configuration}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
configuration := provider.loadMarathonConfig()
|
||||
configurationChan <- types.ConfigMessage{"marathon", configuration}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (provider *MarathonProvider) loadMarathonConfig() *types.Configuration {
|
||||
var MarathonFuncMap = template.FuncMap{
|
||||
"getPort": func(task marathon.Task) string {
|
||||
for _, port := range task.Ports {
|
||||
return strconv.Itoa(port)
|
||||
}
|
||||
return ""
|
||||
},
|
||||
"getWeight": func(task marathon.Task, applications []marathon.Application) string {
|
||||
application, errApp := getApplication(task, applications)
|
||||
if errApp != nil {
|
||||
log.Errorf("Unable to get marathon application from task %s", task.AppID)
|
||||
return "0"
|
||||
}
|
||||
if label, err := provider.getLabel(application, "traefik.weight"); err == nil {
|
||||
return label
|
||||
}
|
||||
return "0"
|
||||
},
|
||||
"getDomain": func(application marathon.Application) string {
|
||||
if label, err := provider.getLabel(application, "traefik.domain"); err == nil {
|
||||
return label
|
||||
}
|
||||
return provider.Domain
|
||||
},
|
||||
"replace": func(s1 string, s2 string, s3 string) string {
|
||||
return strings.Replace(s3, s1, s2, -1)
|
||||
},
|
||||
"getProtocol": func(task marathon.Task, applications []marathon.Application) string {
|
||||
application, errApp := getApplication(task, applications)
|
||||
if errApp != nil {
|
||||
log.Errorf("Unable to get marathon application from task %s", task.AppID)
|
||||
return "http"
|
||||
}
|
||||
if label, err := provider.getLabel(application, "traefik.protocol"); err == nil {
|
||||
return label
|
||||
}
|
||||
return "http"
|
||||
},
|
||||
"getPassHostHeader": func(application marathon.Application) string {
|
||||
if passHostHeader, err := provider.getLabel(application, "traefik.frontend.passHostHeader"); err == nil {
|
||||
return passHostHeader
|
||||
}
|
||||
return "false"
|
||||
},
|
||||
"getFrontendValue": provider.GetFrontendValue,
|
||||
"getFrontendRule": provider.GetFrontendRule,
|
||||
}
|
||||
configuration := new(types.Configuration)
|
||||
|
||||
applications, err := provider.marathonClient.Applications(nil)
|
||||
if err != nil {
|
||||
log.Errorf("Failed to create a client for marathon, error: %s", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
tasks, err := provider.marathonClient.AllTasks()
|
||||
if err != nil {
|
||||
log.Errorf("Failed to create a client for marathon, error: %s", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
//filter tasks
|
||||
filteredTasks := fun.Filter(func(task marathon.Task) bool {
|
||||
if len(task.Ports) == 0 {
|
||||
log.Debug("Filtering marathon task without port %s", task.AppID)
|
||||
return false
|
||||
}
|
||||
application, errApp := getApplication(task, applications.Apps)
|
||||
if errApp != nil {
|
||||
log.Errorf("Unable to get marathon application from task %s", task.AppID)
|
||||
return false
|
||||
}
|
||||
_, err := strconv.Atoi(application.Labels["traefik.port"])
|
||||
if len(application.Ports) > 1 && err != nil {
|
||||
log.Debugf("Filtering marathon task %s with more than 1 port and no traefik.port label", task.AppID)
|
||||
return false
|
||||
}
|
||||
if application.Labels["traefik.enable"] == "false" {
|
||||
log.Debugf("Filtering disabled marathon task %s", task.AppID)
|
||||
return false
|
||||
}
|
||||
//filter healthchecks
|
||||
if application.HasHealthChecks() {
|
||||
if task.HasHealthCheckResults() {
|
||||
for _, healthcheck := range task.HealthCheckResult {
|
||||
// found one bad healthcheck, return false
|
||||
if !healthcheck.Alive {
|
||||
log.Debugf("Filtering marathon task %s with bad healthcheck", task.AppID)
|
||||
return false
|
||||
}
|
||||
}
|
||||
} else {
|
||||
log.Debugf("Filtering marathon task %s with bad healthcheck", task.AppID)
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}, tasks.Tasks).([]marathon.Task)
|
||||
|
||||
//filter apps
|
||||
filteredApps := fun.Filter(func(app marathon.Application) bool {
|
||||
//get ports from app tasks
|
||||
if !fun.Exists(func(task marathon.Task) bool {
|
||||
if task.AppID == app.ID {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}, filteredTasks) {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}, applications.Apps).([]marathon.Application)
|
||||
|
||||
templateObjects := struct {
|
||||
Applications []marathon.Application
|
||||
Tasks []marathon.Task
|
||||
Domain string
|
||||
}{
|
||||
filteredApps,
|
||||
filteredTasks,
|
||||
provider.Domain,
|
||||
}
|
||||
|
||||
tmpl := template.New(provider.Filename).Funcs(MarathonFuncMap)
|
||||
if len(provider.Filename) > 0 {
|
||||
_, err := tmpl.ParseFiles(provider.Filename)
|
||||
if err != nil {
|
||||
log.Error("Error reading file", err)
|
||||
return nil
|
||||
}
|
||||
} else {
|
||||
buf, err := autogen.Asset("templates/marathon.tmpl")
|
||||
if err != nil {
|
||||
log.Error("Error reading file", err)
|
||||
}
|
||||
_, err = tmpl.Parse(string(buf))
|
||||
if err != nil {
|
||||
log.Error("Error reading file", err)
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
var buffer bytes.Buffer
|
||||
|
||||
err = tmpl.Execute(&buffer, templateObjects)
|
||||
if err != nil {
|
||||
log.Error("Error with marathon template:", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
if _, err := toml.Decode(buffer.String(), configuration); err != nil {
|
||||
log.Error("Error creating marathon configuration:", err)
|
||||
return nil
|
||||
}
|
||||
|
||||
return configuration
|
||||
}
|
||||
|
||||
func getApplication(task marathon.Task, apps []marathon.Application) (marathon.Application, error) {
|
||||
for _, application := range apps {
|
||||
if application.ID == task.AppID {
|
||||
return application, nil
|
||||
}
|
||||
}
|
||||
return marathon.Application{}, errors.New("Application not found: " + task.AppID)
|
||||
}
|
||||
|
||||
func (provider *MarathonProvider) getLabel(application marathon.Application, label string) (string, error) {
|
||||
for key, value := range application.Labels {
|
||||
if key == label {
|
||||
return value, nil
|
||||
}
|
||||
}
|
||||
return "", errors.New("Label not found:" + label)
|
||||
}
|
||||
|
||||
func (provider *MarathonProvider) getEscapedName(name string) string {
|
||||
return strings.Replace(name, "/", "", -1)
|
||||
}
|
||||
|
||||
func (provider *MarathonProvider) GetFrontendValue(application marathon.Application) string {
|
||||
if label, err := provider.getLabel(application, "traefik.frontend.value"); err == nil {
|
||||
return label
|
||||
}
|
||||
return provider.getEscapedName(application.ID) + "." + provider.Domain
|
||||
}
|
||||
|
||||
func (provider *MarathonProvider) GetFrontendRule(application marathon.Application) string {
|
||||
if label, err := provider.getLabel(application, "traefik.frontend.rule"); err == nil {
|
||||
return label
|
||||
}
|
||||
return "Host"
|
||||
}
|
7
provider/provider.go
Normal file
7
provider/provider.go
Normal file
|
@ -0,0 +1,7 @@
|
|||
package provider
|
||||
|
||||
import "github.com/emilevauge/traefik/types"
|
||||
|
||||
type Provider interface {
|
||||
Provide(configurationChan chan<- types.ConfigMessage) error
|
||||
}
|
16
provider/zk.go
Normal file
16
provider/zk.go
Normal file
|
@ -0,0 +1,16 @@
|
|||
package provider
|
||||
|
||||
import "github.com/emilevauge/traefik/types"
|
||||
|
||||
type ZookepperProvider struct {
|
||||
Watch bool
|
||||
Endpoint string
|
||||
Prefix string
|
||||
Filename string
|
||||
KvProvider *KvProvider
|
||||
}
|
||||
|
||||
func (provider *ZookepperProvider) Provide(configurationChan chan<- types.ConfigMessage) error {
|
||||
provider.KvProvider = NewZkProvider(provider)
|
||||
return provider.KvProvider.provide(configurationChan)
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue