1
0
Fork 0

Refactor into dual Rancher API/Metadata providers

Introduces Rancher's metadata service as an optional provider source for
Traefik, enabled by setting `rancher.MetadataService`.

The provider uses a long polling technique to watch the metadata service and
obtain near instantaneous updates. Alternatively it can be configured to poll
the metadata service every `rancher.RefreshSeconds` by setting
`rancher.MetadataPoll`.

The refactor splits API and metadata service code into separate source
files respectively, and specific configuration is deferred to
sub-structs.

Incorporates bugfix #1414
This commit is contained in:
Martin Baillie 2017-05-08 11:20:38 +10:00 committed by Ludovic Fernandez
parent 984ea1040f
commit 9cb07d026f
14 changed files with 1006 additions and 272 deletions

234
provider/rancher/api.go Normal file
View file

@ -0,0 +1,234 @@
package rancher
import (
"context"
"os"
"time"
"github.com/cenk/backoff"
"github.com/containous/traefik/job"
"github.com/containous/traefik/log"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
rancher "github.com/rancher/go-rancher/client"
)
var (
withoutPagination *rancher.ListOpts
)
// APIConfiguration contains configuration properties specific to the Rancher
// API provider.
type APIConfiguration struct {
Endpoint string `description:"Rancher server API HTTP(S) endpoint"`
AccessKey string `description:"Rancher server API access key"`
SecretKey string `description:"Rancher server API secret key"`
}
func init() {
withoutPagination = &rancher.ListOpts{
Filters: map[string]interface{}{"limit": 0},
}
}
func (p *Provider) createClient() (*rancher.RancherClient, error) {
rancherURL := getenv("CATTLE_URL", p.API.Endpoint)
accessKey := getenv("CATTLE_ACCESS_KEY", p.API.AccessKey)
secretKey := getenv("CATTLE_SECRET_KEY", p.API.SecretKey)
return rancher.NewRancherClient(&rancher.ClientOpts{
Url: rancherURL,
AccessKey: accessKey,
SecretKey: secretKey,
})
}
func getenv(key, fallback string) string {
value := os.Getenv(key)
if len(value) == 0 {
return fallback
}
return value
}
func (p *Provider) apiProvide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
p.Constraints = append(p.Constraints, constraints...)
if p.API == nil {
p.API = &APIConfiguration{}
}
safe.Go(func() {
operation := func() error {
rancherClient, err := p.createClient()
if err != nil {
log.Errorf("Failed to create a client for rancher, error: %s", err)
return err
}
ctx := context.Background()
var environments = listRancherEnvironments(rancherClient)
var services = listRancherServices(rancherClient)
var container = listRancherContainer(rancherClient)
var rancherData = parseAPISourcedRancherData(environments, services, container)
configuration := p.loadRancherConfig(rancherData)
configurationChan <- types.ConfigMessage{
ProviderName: "rancher",
Configuration: configuration,
}
if p.Watch {
_, cancel := context.WithCancel(ctx)
ticker := time.NewTicker(time.Second * time.Duration(p.RefreshSeconds))
pool.Go(func(stop chan bool) {
for {
select {
case <-ticker.C:
log.Debugf("Refreshing new Data from Provider API")
var environments = listRancherEnvironments(rancherClient)
var services = listRancherServices(rancherClient)
var container = listRancherContainer(rancherClient)
rancherData := parseAPISourcedRancherData(environments, services, container)
configuration := p.loadRancherConfig(rancherData)
if configuration != nil {
configurationChan <- types.ConfigMessage{
ProviderName: "rancher",
Configuration: configuration,
}
}
case <-stop:
ticker.Stop()
cancel()
return
}
}
})
}
return nil
}
notify := func(err error, time time.Duration) {
log.Errorf("Provider connection error %+v, retrying in %s", err, time)
}
err := backoff.RetryNotify(operation, job.NewBackOff(backoff.NewExponentialBackOff()), notify)
if err != nil {
log.Errorf("Cannot connect to Provider Endpoint %+v", err)
}
})
return nil
}
func listRancherEnvironments(client *rancher.RancherClient) []*rancher.Project {
// Rancher Environment in frontend UI is actually project in API
// https://forums.rancher.com/t/api-key-for-all-environments/279/9
var environmentList = []*rancher.Project{}
environments, err := client.Project.List(nil)
if err != nil {
log.Errorf("Cannot get Rancher Environments %+v", err)
}
for k := range environments.Data {
environmentList = append(environmentList, &environments.Data[k])
}
return environmentList
}
func listRancherServices(client *rancher.RancherClient) []*rancher.Service {
var servicesList = []*rancher.Service{}
services, err := client.Service.List(withoutPagination)
if err != nil {
log.Errorf("Cannot get Provider Services %+v", err)
}
for k := range services.Data {
servicesList = append(servicesList, &services.Data[k])
}
return servicesList
}
func listRancherContainer(client *rancher.RancherClient) []*rancher.Container {
containerList := []*rancher.Container{}
container, err := client.Container.List(withoutPagination)
if err != nil {
log.Errorf("Cannot get Provider Services %+v", err)
}
valid := true
for valid {
for k := range container.Data {
containerList = append(containerList, &container.Data[k])
}
container, err = container.Next()
if err != nil {
break
}
if container == nil || len(container.Data) == 0 {
valid = false
}
}
return containerList
}
func parseAPISourcedRancherData(environments []*rancher.Project, services []*rancher.Service, containers []*rancher.Container) []rancherData {
var rancherDataList []rancherData
for _, environment := range environments {
for _, service := range services {
if service.EnvironmentId != environment.Id {
continue
}
rancherData := rancherData{
Name: environment.Name + "/" + service.Name,
Health: service.HealthState,
State: service.State,
Labels: make(map[string]string),
Containers: []string{},
}
if service.LaunchConfig == nil || service.LaunchConfig.Labels == nil {
log.Warnf("Rancher Service Labels are missing. Environment: %s, service: %s", environment.Name, service.Name)
} else {
for key, value := range service.LaunchConfig.Labels {
rancherData.Labels[key] = value.(string)
}
}
for _, container := range containers {
if container.Labels["io.rancher.stack_service.name"] == rancherData.Name &&
containerFilter(container.Name, container.HealthState, container.State) {
rancherData.Containers = append(rancherData.Containers, container.PrimaryIpAddress)
}
}
rancherDataList = append(rancherDataList, rancherData)
}
}
return rancherDataList
}

View file

@ -0,0 +1,136 @@
package rancher
import (
"context"
"fmt"
"time"
"github.com/Sirupsen/logrus"
"github.com/cenk/backoff"
"github.com/containous/traefik/job"
"github.com/containous/traefik/log"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
rancher "github.com/rancher/go-rancher-metadata/metadata"
)
// MetadataConfiguration contains configuration properties specific to
// the Rancher metadata service provider.
type MetadataConfiguration struct {
IntervalPoll bool `description:"Poll the Rancher metadata service every 'rancher.refreshseconds' (less accurate)"`
Prefix string `description:"Prefix used for accessing the Rancher metadata service"`
}
func (p *Provider) metadataProvide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
p.Constraints = append(p.Constraints, constraints...)
metadataServiceURL := fmt.Sprintf("http://rancher-metadata.rancher.internal/%s", p.Metadata.Prefix)
safe.Go(func() {
operation := func() error {
client, err := rancher.NewClientAndWait(metadataServiceURL)
if err != nil {
log.Errorln("Failed to create Rancher metadata service client: %s", err)
return err
}
updateConfiguration := func(version string) {
log.WithField("metadata_version", version).Debugln("Refreshing configuration from Rancher metadata service")
services, err := client.GetServices()
if err != nil {
log.Errorf("Failed to query Rancher metadata service: %s", err)
return
}
rancherData := parseMetadataSourcedRancherData(services)
configuration := p.loadRancherConfig(rancherData)
configurationChan <- types.ConfigMessage{
ProviderName: "rancher",
Configuration: configuration,
}
}
updateConfiguration("init")
if p.Watch {
pool.Go(func(stop chan bool) {
switch {
case p.Metadata.IntervalPoll:
p.intervalPoll(client, updateConfiguration, stop)
default:
p.longPoll(client, updateConfiguration, stop)
}
})
}
return nil
}
notify := func(err error, time time.Duration) {
log.WithFields(logrus.Fields{
"error": err,
"retry_in": time,
}).Errorln("Rancher metadata service connection error")
}
if err := backoff.RetryNotify(operation, job.NewBackOff(backoff.NewExponentialBackOff()), notify); err != nil {
log.WithField("endpoint", metadataServiceURL).Errorln("Cannot connect to Rancher metadata service")
}
})
return nil
}
func (p *Provider) intervalPoll(client rancher.Client, updateConfiguration func(string), stop chan bool) {
_, cancel := context.WithCancel(context.Background())
defer cancel()
ticker := time.NewTicker(time.Duration(p.RefreshSeconds))
defer ticker.Stop()
var version string
for {
select {
case <-ticker.C:
newVersion, err := client.GetVersion()
if err != nil {
log.WithField("error", err).Errorln("Failed to read Rancher metadata service version")
} else if version != newVersion {
version = newVersion
updateConfiguration(version)
}
case <-stop:
return
}
}
}
func (p *Provider) longPoll(client rancher.Client, updateConfiguration func(string), stop chan bool) {
_, cancel := context.WithCancel(context.Background())
defer cancel()
// Holds the connection until there is either a change in the metadata
// repository or `p.RefreshSeconds` has elapsed. Long polling should be
// favoured for the most accurate configuration updates.
go client.OnChange(p.RefreshSeconds, updateConfiguration)
<-stop
}
func parseMetadataSourcedRancherData(services []rancher.Service) (rancherDataList []rancherData) {
for _, service := range services {
var containerIPAddresses []string
for _, container := range service.Containers {
if containerFilter(container.Name, container.HealthState, container.State) {
containerIPAddresses = append(containerIPAddresses, container.PrimaryIp)
}
}
rancherDataList = append(rancherDataList, rancherData{
Name: service.Name,
State: service.State,
Labels: service.Labels,
Containers: containerIPAddresses,
})
}
return rancherDataList
}

View file

@ -1,28 +1,17 @@
package rancher
import (
"context"
"errors"
"fmt"
"math"
"os"
"strconv"
"strings"
"text/template"
"time"
"github.com/BurntSushi/ty/fun"
"github.com/cenk/backoff"
"github.com/containous/traefik/job"
"github.com/containous/traefik/log"
"github.com/containous/traefik/provider"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
rancher "github.com/rancher/go-rancher/client"
)
var (
withoutPagination *rancher.ListOpts
)
var _ provider.Provider = (*Provider)(nil)
@ -30,13 +19,13 @@ var _ provider.Provider = (*Provider)(nil)
// Provider holds configurations of the provider.
type Provider struct {
provider.BaseProvider `mapstructure:",squash"`
Endpoint string `description:"Rancher server HTTP(S) endpoint."`
AccessKey string `description:"Rancher server access key."`
SecretKey string `description:"Rancher server Secret Key."`
ExposedByDefault bool `description:"Expose Services by default"`
Domain string `description:"Default domain used"`
RefreshSeconds int `description:"Polling interval (in seconds)"`
EnableServiceHealthFilter bool `description:"Filter services with unhealthy states and health states."`
APIConfiguration `mapstructure:",squash"` // Provide backwards compatibility
API *APIConfiguration `description:"Enable the Rancher API provider"`
Metadata *MetadataConfiguration `description:"Enable the Rancher metadata service provider"`
Domain string `description:"Default domain used"`
RefreshSeconds int `description:"Polling interval (in seconds)"`
ExposedByDefault bool `description:"Expose services by default"`
EnableServiceHealthFilter bool `description:"Filter services with unhealthy states and inactive states"`
}
type rancherData struct {
@ -47,12 +36,6 @@ type rancherData struct {
State string
}
func init() {
withoutPagination = &rancher.ListOpts{
Filters: map[string]interface{}{"limit": 0},
}
}
func (r rancherData) String() string {
return fmt.Sprintf("{name:%s, labels:%v, containers: %v, health: %s, state: %s}", r.Name, r.Labels, r.Containers, r.Health, r.State)
}
@ -207,205 +190,16 @@ func getServiceLabel(service rancherData, label string) (string, error) {
return value, nil
}
}
return "", errors.New("Label not found:" + label)
return "", fmt.Errorf("label not found: %s", label)
}
func (p *Provider) createClient() (*rancher.RancherClient, error) {
rancherURL := getenv("CATTLE_URL", p.Endpoint)
accessKey := getenv("CATTLE_ACCESS_KEY", p.AccessKey)
secretKey := getenv("CATTLE_SECRET_KEY", p.SecretKey)
return rancher.NewRancherClient(&rancher.ClientOpts{
Url: rancherURL,
AccessKey: accessKey,
SecretKey: secretKey,
})
}
func getenv(key, fallback string) string {
value := os.Getenv(key)
if len(value) == 0 {
return fallback
}
return value
}
// Provide allows the rancher provider to provide configurations to traefik
// using the given configuration channel.
// Provide allows either the Rancher API or metadata service provider to
// seed configuration into Traefik using the given configuration channel.
func (p *Provider) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
p.Constraints = append(p.Constraints, constraints...)
safe.Go(func() {
operation := func() error {
rancherClient, err := p.createClient()
if err != nil {
log.Errorf("Failed to create a client for rancher, error: %s", err)
return err
}
ctx := context.Background()
var environments = listRancherEnvironments(rancherClient)
var services = listRancherServices(rancherClient)
var container = listRancherContainer(rancherClient)
var rancherData = parseRancherData(environments, services, container)
configuration := p.loadRancherConfig(rancherData)
configurationChan <- types.ConfigMessage{
ProviderName: "rancher",
Configuration: configuration,
}
if p.Watch {
_, cancel := context.WithCancel(ctx)
ticker := time.NewTicker(time.Second * time.Duration(p.RefreshSeconds))
pool.Go(func(stop chan bool) {
for {
select {
case <-ticker.C:
log.Debugf("Refreshing new Data from Provider API")
var environments = listRancherEnvironments(rancherClient)
var services = listRancherServices(rancherClient)
var container = listRancherContainer(rancherClient)
rancherData := parseRancherData(environments, services, container)
configuration := p.loadRancherConfig(rancherData)
if configuration != nil {
configurationChan <- types.ConfigMessage{
ProviderName: "rancher",
Configuration: configuration,
}
}
case <-stop:
ticker.Stop()
cancel()
return
}
}
})
}
return nil
}
notify := func(err error, time time.Duration) {
log.Errorf("Provider connection error %+v, retrying in %s", err, time)
}
err := backoff.RetryNotify(operation, job.NewBackOff(backoff.NewExponentialBackOff()), notify)
if err != nil {
log.Errorf("Cannot connect to Provider Endpoint %+v", err)
}
})
return nil
}
func listRancherEnvironments(client *rancher.RancherClient) []*rancher.Environment {
var environmentList = []*rancher.Environment{}
environments, err := client.Environment.List(withoutPagination)
if err != nil {
log.Errorf("Cannot get Provider Environments %+v", err)
if p.Metadata == nil {
return p.apiProvide(configurationChan, pool, constraints)
}
for k := range environments.Data {
environmentList = append(environmentList, &environments.Data[k])
}
return environmentList
}
func listRancherServices(client *rancher.RancherClient) []*rancher.Service {
var servicesList = []*rancher.Service{}
services, err := client.Service.List(withoutPagination)
if err != nil {
log.Errorf("Cannot get Provider Services %+v", err)
}
for k := range services.Data {
servicesList = append(servicesList, &services.Data[k])
}
return servicesList
}
func listRancherContainer(client *rancher.RancherClient) []*rancher.Container {
containerList := []*rancher.Container{}
container, err := client.Container.List(withoutPagination)
log.Debugf("first container len: %i", len(container.Data))
if err != nil {
log.Errorf("Cannot get Provider Services %+v", err)
}
valid := true
for valid {
for k := range container.Data {
containerList = append(containerList, &container.Data[k])
}
container, err = container.Next()
if err != nil {
break
}
if container == nil || len(container.Data) == 0 {
valid = false
}
}
return containerList
}
func parseRancherData(environments []*rancher.Environment, services []*rancher.Service, containers []*rancher.Container) []rancherData {
var rancherDataList []rancherData
for _, environment := range environments {
for _, service := range services {
if service.EnvironmentId != environment.Id {
continue
}
rancherData := rancherData{
Name: environment.Name + "/" + service.Name,
Health: service.HealthState,
State: service.State,
Labels: make(map[string]string),
Containers: []string{},
}
if service.LaunchConfig == nil || service.LaunchConfig.Labels == nil {
log.Warnf("Rancher Service Labels are missing. Environment: %s, service: %s", environment.Name, service.Name)
} else {
for key, value := range service.LaunchConfig.Labels {
rancherData.Labels[key] = value.(string)
}
}
for _, container := range containers {
if container.Labels["io.rancher.stack_service.name"] == rancherData.Name && containerFilter(container) {
rancherData.Containers = append(rancherData.Containers, container.PrimaryIpAddress)
}
}
rancherDataList = append(rancherDataList, rancherData)
}
}
return rancherDataList
return p.metadataProvide(configurationChan, pool, constraints)
}
func (p *Provider) loadRancherConfig(services []rancherData) *types.Configuration {
@ -464,14 +258,14 @@ func (p *Provider) loadRancherConfig(services []rancherData) *types.Configuratio
}
func containerFilter(container *rancher.Container) bool {
if container.HealthState != "" && container.HealthState != "healthy" && container.HealthState != "updating-healthy" {
log.Debugf("Filtering container %s with healthState of %s", container.Name, container.HealthState)
func containerFilter(name, healthState, state string) bool {
if healthState != "" && healthState != "healthy" && healthState != "updating-healthy" {
log.Debugf("Filtering container %s with healthState of %s", name, healthState)
return false
}
if container.State != "" && container.State != "running" && container.State != "updating-running" {
log.Debugf("Filtering container %s with state of %s", container.Name, container.State)
if state != "" && state != "running" && state != "updating-running" {
log.Debugf("Filtering container %s with state of %s", name, state)
return false
}

View file

@ -6,7 +6,6 @@ import (
"testing"
"github.com/containous/traefik/types"
rancher "github.com/rancher/go-rancher/client"
)
func TestRancherServiceFilter(t *testing.T) {
@ -114,49 +113,41 @@ func TestRancherServiceFilter(t *testing.T) {
func TestRancherContainerFilter(t *testing.T) {
containers := []struct {
container *rancher.Container
expected bool
name string
healthState string
state string
expected bool
}{
{
container: &rancher.Container{
HealthState: "unhealthy",
State: "running",
},
healthState: "unhealthy",
state: "running",
expected: false,
},
{
healthState: "healthy",
state: "stopped",
expected: false,
},
{
state: "stopped",
expected: false,
},
{
container: &rancher.Container{
HealthState: "healthy",
State: "stopped",
},
expected: false,
healthState: "healthy",
state: "running",
expected: true,
},
{
container: &rancher.Container{
State: "stopped",
},
expected: false,
},
{
container: &rancher.Container{
HealthState: "healthy",
State: "running",
},
expected: true,
},
{
container: &rancher.Container{
HealthState: "updating-healthy",
State: "updating-running",
},
expected: true,
healthState: "updating-healthy",
state: "updating-running",
expected: true,
},
}
for _, e := range containers {
actual := containerFilter(e.container)
if actual != e.expected {
t.Fatalf("expected %t, got %t", e.expected, actual)
for _, container := range containers {
actual := containerFilter(container.name, container.healthState, container.state)
if actual != container.expected {
t.Fatalf("expected %t, got %t", container.expected, actual)
}
}
}
@ -506,7 +497,7 @@ func TestRancherGetLabel(t *testing.T) {
service: rancherData{
Name: "test-service",
},
expected: "Label not found",
expected: "label not found",
},
{
service: rancherData{
@ -593,9 +584,7 @@ func TestRancherLoadRancherConfig(t *testing.T) {
for _, c := range cases {
var rancherDataList []rancherData
for _, service := range c.services {
rancherDataList = append(rancherDataList, service)
}
rancherDataList = append(rancherDataList, c.services...)
actualConfig := provider.loadRancherConfig(rancherDataList)