1
0
Fork 0

Remove Pilot support

This commit is contained in:
Ludovic Fernandez 2022-09-14 10:56:08 +02:00 committed by GitHub
parent a002ccfce3
commit ab8d7d2e78
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
23 changed files with 35 additions and 1264 deletions

View file

@ -3,11 +3,6 @@ package static
// Pilot Configuration related to Traefik Pilot.
// Deprecated.
type Pilot struct {
Token string `description:"Traefik Pilot token." json:"token,omitempty" toml:"token,omitempty" yaml:"token,omitempty" loggable:"false"`
Dashboard bool `description:"Enable Traefik Pilot in the dashboard." json:"dashboard,omitempty" toml:"dashboard,omitempty" yaml:"dashboard,omitempty"`
}
// SetDefaults sets the default values.
func (p *Pilot) SetDefaults() {
p.Dashboard = true
Token string `description:"Traefik Pilot token. (Deprecated)" json:"token,omitempty" toml:"token,omitempty" yaml:"token,omitempty" loggable:"false"`
Dashboard bool `description:"Enable Traefik Pilot in the dashboard. (Deprecated)" json:"dashboard,omitempty" toml:"dashboard,omitempty" yaml:"dashboard,omitempty"`
}

View file

@ -79,7 +79,7 @@ type Configuration struct {
CertificatesResolvers map[string]CertificateResolver `description:"Certificates resolvers configuration." json:"certificatesResolvers,omitempty" toml:"certificatesResolvers,omitempty" yaml:"certificatesResolvers,omitempty" export:"true"`
// Deprecated.
Pilot *Pilot `description:"Traefik Pilot configuration." json:"pilot,omitempty" toml:"pilot,omitempty" yaml:"pilot,omitempty" export:"true"`
Pilot *Pilot `description:"Traefik Pilot configuration (Deprecated)." json:"pilot,omitempty" toml:"pilot,omitempty" yaml:"pilot,omitempty" export:"true"`
Hub *hub.Provider `description:"Traefik Hub configuration." json:"hub,omitempty" toml:"hub,omitempty" yaml:"hub,omitempty" label:"allowEmpty" file:"allowEmpty" export:"true"`
@ -250,16 +250,10 @@ func (c *Configuration) SetEffectiveConfiguration() {
}
// Enable anonymous usage when pilot is enabled.
if c.Pilot != nil && c.Pilot.Token != "" {
if c.Pilot != nil {
c.Global.SendAnonymousUsage = true
}
// Create Pilot struct to apply default value on undefined configuration.
if c.Pilot == nil {
c.Pilot = &Pilot{}
c.Pilot.SetDefaults()
}
// Disable Gateway API provider if not enabled in experimental.
if c.Experimental == nil || !c.Experimental.KubernetesGateway {
c.Providers.KubernetesGateway = nil

View file

@ -1,317 +0,0 @@
package metrics
import (
"strings"
"sync"
"time"
"github.com/go-kit/kit/metrics"
"github.com/go-kit/kit/metrics/generic"
)
const (
// server meta information.
pilotConfigPrefix = "config"
pilotConfigReloadsTotalName = pilotConfigPrefix + "ReloadsTotal"
pilotConfigReloadsFailuresTotalName = pilotConfigPrefix + "ReloadsFailureTotal"
pilotConfigLastReloadSuccessName = pilotConfigPrefix + "LastReloadSuccess"
pilotConfigLastReloadFailureName = pilotConfigPrefix + "LastReloadFailure"
// entry point.
pilotEntryPointPrefix = "entrypoint"
pilotEntryPointReqsTotalName = pilotEntryPointPrefix + "RequestsTotal"
pilotEntryPointReqsTLSTotalName = pilotEntryPointPrefix + "RequestsTLSTotal"
pilotEntryPointReqDurationName = pilotEntryPointPrefix + "RequestDurationSeconds"
pilotEntryPointOpenConnsName = pilotEntryPointPrefix + "OpenConnections"
// service level.
pilotServicePrefix = "service"
pilotServiceReqsTotalName = pilotServicePrefix + "RequestsTotal"
pilotServiceReqsTLSTotalName = pilotServicePrefix + "RequestsTLSTotal"
pilotServiceReqDurationName = pilotServicePrefix + "RequestDurationSeconds"
pilotServiceOpenConnsName = pilotServicePrefix + "OpenConnections"
pilotServiceRetriesTotalName = pilotServicePrefix + "RetriesTotal"
pilotServiceServerUpName = pilotServicePrefix + "ServerUp"
)
const root = "value"
// RegisterPilot registers all Pilot metrics.
func RegisterPilot() *PilotRegistry {
standardRegistry := &standardRegistry{
epEnabled: true,
svcEnabled: true,
}
pr := &PilotRegistry{
standardRegistry: standardRegistry,
counters: make(map[string]*pilotCounter),
gauges: make(map[string]*pilotGauge),
histograms: make(map[string]*pilotHistogram),
}
standardRegistry.configReloadsCounter = pr.newCounter(pilotConfigReloadsTotalName)
standardRegistry.configReloadsFailureCounter = pr.newCounter(pilotConfigReloadsFailuresTotalName)
standardRegistry.lastConfigReloadSuccessGauge = pr.newGauge(pilotConfigLastReloadSuccessName)
standardRegistry.lastConfigReloadFailureGauge = pr.newGauge(pilotConfigLastReloadFailureName)
standardRegistry.entryPointReqsCounter = pr.newCounter(pilotEntryPointReqsTotalName)
standardRegistry.entryPointReqsTLSCounter = pr.newCounter(pilotEntryPointReqsTLSTotalName)
standardRegistry.entryPointReqDurationHistogram, _ = NewHistogramWithScale(pr.newHistogram(pilotEntryPointReqDurationName), time.Millisecond)
standardRegistry.entryPointOpenConnsGauge = pr.newGauge(pilotEntryPointOpenConnsName)
standardRegistry.serviceReqsCounter = pr.newCounter(pilotServiceReqsTotalName)
standardRegistry.serviceReqsTLSCounter = pr.newCounter(pilotServiceReqsTLSTotalName)
standardRegistry.serviceReqDurationHistogram, _ = NewHistogramWithScale(pr.newHistogram(pilotServiceReqDurationName), time.Millisecond)
standardRegistry.serviceOpenConnsGauge = pr.newGauge(pilotServiceOpenConnsName)
standardRegistry.serviceRetriesCounter = pr.newCounter(pilotServiceRetriesTotalName)
standardRegistry.serviceServerUpGauge = pr.newGauge(pilotServiceServerUpName)
return pr
}
// PilotMetric is a representation of a metric.
type PilotMetric struct {
Name string `json:"name"`
Type string `json:"type"`
Observations map[string]interface{} `json:"observations"`
}
type pilotHistogramObservation struct {
Total float64 `json:"total"`
Count float64 `json:"count"`
}
// PilotRegistry represents the pilots metrics registry.
type PilotRegistry struct {
counters map[string]*pilotCounter
gauges map[string]*pilotGauge
histograms map[string]*pilotHistogram
*standardRegistry
}
// newCounter register and returns a new pilotCounter.
func (pr *PilotRegistry) newCounter(name string) *pilotCounter {
c := newPilotCounter(name)
pr.counters[name] = c
return c
}
// newGauge register and returns a new pilotGauge.
func (pr *PilotRegistry) newGauge(name string) *pilotGauge {
g := newPilotGauge(name)
pr.gauges[name] = g
return g
}
// newHistogram register and returns a new pilotHistogram.
func (pr *PilotRegistry) newHistogram(name string) *pilotHistogram {
h := newPilotHistogram(name)
pr.histograms[name] = h
return h
}
// Data exports the metrics: metrics name -> labels -> values.
func (pr *PilotRegistry) Data() []PilotMetric {
var pilotMetrics []PilotMetric
for name, counter := range pr.counters {
pilotMetric := PilotMetric{
Name: name,
Type: "COUNTER",
Observations: make(map[string]interface{}),
}
pilotMetrics = append(pilotMetrics, pilotMetric)
counter.counters.Range(func(key, value interface{}) bool {
labels := key.(string)
pc := value.(*pilotCounter)
if labels == "" {
labels = root
}
if labels == root || len(pc.c.LabelValues())%2 == 0 {
pilotMetric.Observations[labels] = pc.c.Value()
}
return true
})
}
for name, gauge := range pr.gauges {
pilotMetric := PilotMetric{
Name: name,
Type: "GAUGE",
Observations: make(map[string]interface{}),
}
pilotMetrics = append(pilotMetrics, pilotMetric)
gauge.gauges.Range(func(key, value interface{}) bool {
labels := key.(string)
pg := value.(*pilotGauge)
if labels == "" {
labels = root
}
if labels == root || len(pg.g.LabelValues())%2 == 0 {
pilotMetric.Observations[labels] = pg.g.Value()
}
return true
})
}
for name, histogram := range pr.histograms {
pilotMetric := PilotMetric{
Name: name,
Type: "HISTOGRAM",
Observations: make(map[string]interface{}),
}
pilotMetrics = append(pilotMetrics, pilotMetric)
histogram.histograms.Range(func(key, value interface{}) bool {
labels := key.(string)
ph := value.(*pilotHistogram)
if labels == "" {
labels = root
}
if labels == root || len(ph.labels)%2 == 0 {
pilotMetric.Observations[labels] = &pilotHistogramObservation{
Total: ph.total.Value(),
Count: ph.count.Value(),
}
}
return true
})
}
return pilotMetrics
}
type pilotCounter struct {
c *generic.Counter
counters *sync.Map
}
func newPilotCounter(name string) *pilotCounter {
return &pilotCounter{
c: generic.NewCounter(name),
counters: &sync.Map{},
}
}
// With returns a new pilotCounter with the given labels.
func (c *pilotCounter) With(labels ...string) metrics.Counter {
newCounter := c.c.With(labels...).(*generic.Counter)
newCounter.ValueReset()
return &pilotCounter{
c: newCounter,
counters: c.counters,
}
}
// Add adds the given delta to the counter.
func (c *pilotCounter) Add(delta float64) {
labelsKey := strings.Join(c.c.LabelValues(), ",")
pc, _ := c.counters.LoadOrStore(labelsKey, c)
pc.(*pilotCounter).c.Add(delta)
}
type pilotGauge struct {
g *generic.Gauge
gauges *sync.Map
}
func newPilotGauge(name string) *pilotGauge {
return &pilotGauge{
g: generic.NewGauge(name),
gauges: &sync.Map{},
}
}
// With returns a new pilotGauge with the given labels.
func (g *pilotGauge) With(labels ...string) metrics.Gauge {
newGauge := g.g.With(labels...).(*generic.Gauge)
newGauge.Set(0)
return &pilotGauge{
g: newGauge,
gauges: g.gauges,
}
}
// Set sets the given value to the gauge.
func (g *pilotGauge) Set(value float64) {
labelsKey := strings.Join(g.g.LabelValues(), ",")
pg, _ := g.gauges.LoadOrStore(labelsKey, g)
pg.(*pilotGauge).g.Set(value)
}
// Add adds the given delta to the gauge.
func (g *pilotGauge) Add(delta float64) {
labelsKey := strings.Join(g.g.LabelValues(), ",")
pg, _ := g.gauges.LoadOrStore(labelsKey, g)
pg.(*pilotGauge).g.Add(delta)
}
type pilotHistogram struct {
name string
labels []string
count *generic.Counter
total *generic.Counter
histograms *sync.Map
}
func newPilotHistogram(name string) *pilotHistogram {
return &pilotHistogram{
name: name,
labels: make([]string, 0),
count: &generic.Counter{},
total: &generic.Counter{},
histograms: &sync.Map{},
}
}
// With returns a new pilotHistogram with the given labels.
func (h *pilotHistogram) With(labels ...string) metrics.Histogram {
var newLabels []string
newLabels = append(newLabels, h.labels...)
newLabels = append(newLabels, labels...)
return &pilotHistogram{
name: h.name,
labels: newLabels,
count: &generic.Counter{},
total: &generic.Counter{},
histograms: h.histograms,
}
}
// Observe records a new value into the histogram.
func (h *pilotHistogram) Observe(value float64) {
labelsKey := strings.Join(h.labels, ",")
ph, _ := h.histograms.LoadOrStore(labelsKey, h)
pHisto := ph.(*pilotHistogram)
pHisto.count.Add(1)
pHisto.total.Add(value)
}

View file

@ -1,365 +0,0 @@
package metrics
import (
"net/http"
"strconv"
"strings"
"testing"
"time"
"github.com/go-kit/kit/metrics"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestPilotCounter(t *testing.T) {
rootCounter := newPilotCounter("rootCounter")
// Checks that a counter without labels can be incremented.
rootCounter.Add(1)
assertPilotCounterValue(t, 1.0, "", rootCounter)
// Checks that a counter with labels can be incremented.
counterWithLabels := rootCounter.With("foo", "bar", "foo", "buz")
counterWithLabels.Add(1)
assertPilotCounterValue(t, 1.0, "foo,bar,foo,buz", counterWithLabels)
// Checks that the derived counter value has not changed.
assertPilotCounterValue(t, 1.0, "", rootCounter)
// Checks that an existing counter (with the same labels) can be incremented.
existingCounterWithLabels := rootCounter.With("foo", "bar").With("foo", "buz")
existingCounterWithLabels.Add(1)
assertPilotCounterValue(t, 2.0, "foo,bar,foo,buz", existingCounterWithLabels)
}
func assertPilotCounterValue(t *testing.T, expValue float64, labels string, c metrics.Counter) {
t.Helper()
counter, ok := c.(*pilotCounter).counters.Load(labels)
require.True(t, ok)
assert.Equal(t, expValue, counter.(*pilotCounter).c.Value())
}
func TestPilotGauge(t *testing.T) {
rootGauge := newPilotGauge("rootGauge")
// Checks that a gauge without labels can be incremented.
rootGauge.Add(1)
assertPilotGaugeValue(t, 1.0, "", rootGauge)
// Checks that a gauge (without labels) value can be set.
rootGauge.Set(5.0)
assertPilotGaugeValue(t, 5.0, "", rootGauge)
// Checks that a gauge with labels can be incremented.
gaugeWithLabels := rootGauge.With("foo", "bar", "foo", "buz")
gaugeWithLabels.Add(1)
assertPilotGaugeValue(t, 1.0, "foo,bar,foo,buz", gaugeWithLabels)
// Checks that the derived gauge value has not changed.
assertPilotGaugeValue(t, 5.0, "", rootGauge)
// Checks that an existing gauge (with the same labels) can be incremented.
existingGaugeWithLabels := rootGauge.With("foo", "bar").With("foo", "buz")
existingGaugeWithLabels.Add(1)
assertPilotGaugeValue(t, 2.0, "foo,bar,foo,buz", existingGaugeWithLabels)
}
func assertPilotGaugeValue(t *testing.T, expValue float64, labels string, g metrics.Gauge) {
t.Helper()
gauge, ok := g.(*pilotGauge).gauges.Load(labels)
require.True(t, ok)
assert.Equal(t, expValue, gauge.(*pilotGauge).g.Value())
}
func TestPilotHistogram(t *testing.T) {
rootHistogram := newPilotHistogram("rootHistogram")
// Checks that an histogram without labels can be updated.
rootHistogram.Observe(1)
assertPilotHistogramValues(t, 1.0, 1.0, "", rootHistogram)
rootHistogram.Observe(2)
assertPilotHistogramValues(t, 2.0, 3.0, "", rootHistogram)
// Checks that an histogram with labels can be updated.
histogramWithLabels := rootHistogram.With("foo", "bar", "foo", "buz")
histogramWithLabels.Observe(1)
assertPilotHistogramValues(t, 1.0, 1.0, "foo,bar,foo,buz", histogramWithLabels)
// Checks that the derived histogram has not changed.
assertPilotHistogramValues(t, 2.0, 3.0, "", rootHistogram)
// Checks that an existing histogram (with the same labels) can be updated.
existingHistogramWithLabels := rootHistogram.With("foo", "bar").With("foo", "buz")
existingHistogramWithLabels.Observe(1)
assertPilotHistogramValues(t, 2.0, 2.0, "foo,bar,foo,buz", existingHistogramWithLabels)
}
func assertPilotHistogramValues(t *testing.T, expCount, expTotal float64, labels string, h metrics.Histogram) {
t.Helper()
histogram, ok := h.(*pilotHistogram).histograms.Load(labels)
require.True(t, ok)
assert.Equal(t, expCount, histogram.(*pilotHistogram).count.Value())
assert.Equal(t, expTotal, histogram.(*pilotHistogram).total.Value())
}
func TestPilotMetrics(t *testing.T) {
pilotRegistry := RegisterPilot()
if !pilotRegistry.IsEpEnabled() || !pilotRegistry.IsSvcEnabled() {
t.Errorf("PilotRegistry should return true for IsEnabled() and IsSvcEnabled()")
}
pilotRegistry.ConfigReloadsCounter().Add(1)
pilotRegistry.ConfigReloadsFailureCounter().Add(1)
pilotRegistry.LastConfigReloadSuccessGauge().Set(float64(time.Now().Unix()))
pilotRegistry.LastConfigReloadFailureGauge().Set(float64(time.Now().Unix()))
pilotRegistry.
EntryPointReqsCounter().
With("code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http", "entrypoint", "http").
Add(1)
pilotRegistry.
EntryPointReqDurationHistogram().
With("code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http", "entrypoint", "http").
Observe(1)
pilotRegistry.
EntryPointOpenConnsGauge().
With("method", http.MethodGet, "protocol", "http", "entrypoint", "http").
Set(1)
pilotRegistry.
ServiceReqsCounter().
With("service", "service1", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http").
Add(1)
pilotRegistry.
ServiceReqDurationHistogram().
With("service", "service1", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http").
Observe(10000)
pilotRegistry.
ServiceOpenConnsGauge().
With("service", "service1", "method", http.MethodGet, "protocol", "http").
Set(1)
pilotRegistry.
ServiceRetriesCounter().
With("service", "service1").
Add(1)
pilotRegistry.
ServiceServerUpGauge().
With("service", "service1", "url", "http://127.0.0.10:80").
Set(1)
data := pilotRegistry.Data()
testCases := []struct {
name string
labels map[string]string
assert func(*PilotMetric)
}{
{
name: pilotConfigReloadsTotalName,
assert: buildPilotCounterAssert(t, pilotConfigReloadsTotalName, 1),
},
{
name: pilotConfigReloadsFailuresTotalName,
assert: buildPilotCounterAssert(t, pilotConfigReloadsFailuresTotalName, 1),
},
{
name: pilotConfigLastReloadSuccessName,
assert: buildPilotTimestampAssert(t, pilotConfigLastReloadSuccessName),
},
{
name: pilotConfigLastReloadFailureName,
assert: buildPilotTimestampAssert(t, pilotConfigLastReloadFailureName),
},
{
name: pilotEntryPointReqsTotalName,
labels: map[string]string{
"code": "200",
"method": http.MethodGet,
"protocol": "http",
"entrypoint": "http",
},
assert: buildPilotCounterAssert(t, pilotEntryPointReqsTotalName, 1),
},
{
name: pilotEntryPointReqDurationName,
labels: map[string]string{
"code": "200",
"method": http.MethodGet,
"protocol": "http",
"entrypoint": "http",
},
assert: buildPilotHistogramAssert(t, pilotEntryPointReqDurationName, 1),
},
{
name: pilotEntryPointOpenConnsName,
labels: map[string]string{
"method": http.MethodGet,
"protocol": "http",
"entrypoint": "http",
},
assert: buildPilotGaugeAssert(t, pilotEntryPointOpenConnsName, 1),
},
{
name: pilotServiceReqsTotalName,
labels: map[string]string{
"code": "200",
"method": http.MethodGet,
"protocol": "http",
"service": "service1",
},
assert: buildPilotCounterAssert(t, pilotServiceReqsTotalName, 1),
},
{
name: pilotServiceReqDurationName,
labels: map[string]string{
"code": "200",
"method": http.MethodGet,
"protocol": "http",
"service": "service1",
},
assert: buildPilotHistogramAssert(t, pilotServiceReqDurationName, 1),
},
{
name: pilotServiceOpenConnsName,
labels: map[string]string{
"method": http.MethodGet,
"protocol": "http",
"service": "service1",
},
assert: buildPilotGaugeAssert(t, pilotServiceOpenConnsName, 1),
},
{
name: pilotServiceRetriesTotalName,
labels: map[string]string{
"service": "service1",
},
assert: buildPilotGreaterThanCounterAssert(t, pilotServiceRetriesTotalName, 1),
},
{
name: pilotServiceServerUpName,
labels: map[string]string{
"service": "service1",
"url": "http://127.0.0.10:80",
},
assert: buildPilotGaugeAssert(t, pilotServiceServerUpName, 1),
},
}
for _, test := range testCases {
test := test
t.Run(test.name, func(t *testing.T) {
metric := findPilotMetric(test.name, data)
if metric == nil {
t.Errorf("metrics do not contain %q", test.name)
return
}
for labels := range metric.Observations {
if len(labels)%2 == 0 {
splitLabels := strings.Split(labels, ",")
for i := 0; i < len(splitLabels); i += 2 {
label := splitLabels[i]
value := splitLabels[i+1]
val, ok := test.labels[label]
if !ok {
t.Errorf("%q metric contains unexpected label %q", test.name, label)
} else if val != value {
t.Errorf("label %q in metric %q has wrong value %q, expected %q", label, test.name, value, val)
}
}
}
}
test.assert(metric)
})
}
}
func findPilotMetric(name string, metrics []PilotMetric) *PilotMetric {
for _, metric := range metrics {
if metric.Name == name {
return &metric
}
}
return nil
}
func buildPilotCounterAssert(t *testing.T, metricName string, expectedValue float64) func(metric *PilotMetric) {
t.Helper()
return func(metric *PilotMetric) {
for _, value := range metric.Observations {
if cv := value.(float64); cv != expectedValue {
t.Errorf("metric %s has value %f, want %f", metricName, cv, expectedValue)
}
break
}
}
}
func buildPilotGreaterThanCounterAssert(t *testing.T, metricName string, expectedMinValue float64) func(metric *PilotMetric) {
t.Helper()
return func(metric *PilotMetric) {
for _, value := range metric.Observations {
if cv := value.(float64); cv < expectedMinValue {
t.Errorf("metric %s has value %f, want at least %f", metricName, cv, expectedMinValue)
}
break
}
}
}
func buildPilotHistogramAssert(t *testing.T, metricName string, expectedSampleCount float64) func(metric *PilotMetric) {
t.Helper()
return func(metric *PilotMetric) {
for _, value := range metric.Observations {
if pho := value.(*pilotHistogramObservation); pho.Count != expectedSampleCount {
t.Errorf("metric %s has sample count value %f, want %f", metricName, pho, expectedSampleCount)
}
break
}
}
}
func buildPilotGaugeAssert(t *testing.T, metricName string, expectedValue float64) func(metric *PilotMetric) {
t.Helper()
return func(metric *PilotMetric) {
for _, value := range metric.Observations {
if gv := value.(float64); gv != expectedValue {
t.Errorf("metric %s has value %f, want %f", metricName, gv, expectedValue)
}
break
}
}
}
func buildPilotTimestampAssert(t *testing.T, metricName string) func(metric *PilotMetric) {
t.Helper()
return func(metric *PilotMetric) {
for _, value := range metric.Observations {
if ts := time.Unix(int64(value.(float64)), 0); time.Since(ts) > time.Minute {
t.Errorf("metric %s has wrong timestamp %v", metricName, ts)
}
break
}
}
}

View file

@ -1,252 +0,0 @@
package pilot
import (
"bytes"
"context"
"encoding/json"
"fmt"
"hash/fnv"
"io"
"net/http"
"time"
"github.com/cenkalti/backoff/v4"
"github.com/traefik/traefik/v2/pkg/config/dynamic"
"github.com/traefik/traefik/v2/pkg/log"
"github.com/traefik/traefik/v2/pkg/metrics"
"github.com/traefik/traefik/v2/pkg/redactor"
"github.com/traefik/traefik/v2/pkg/safe"
"github.com/traefik/traefik/v2/pkg/version"
)
const (
baseInstanceInfoURL = "https://instance-info.pilot.traefik.io/public"
baseGatewayURL = "https://gateway.pilot.traefik.io"
)
const (
tokenHeader = "X-Token"
tokenHashHeader = "X-Token-Hash"
)
const (
pilotInstanceInfoTimer = 5 * time.Minute
pilotDynConfTimer = 12 * time.Hour
maxElapsedTime = 4 * time.Minute
initialInterval = 5 * time.Second
multiplier = 3
)
type instanceInfo struct {
ID string `json:"id,omitempty"`
Metrics []metrics.PilotMetric `json:"metrics,omitempty"`
}
// New creates a new Pilot.
func New(token string, metricsRegistry *metrics.PilotRegistry, pool *safe.Pool) *Pilot {
tokenHash := fnv.New64a()
// the `sum64a` implementation of the `Write` method never returns an error.
_, _ = tokenHash.Write([]byte(token))
return &Pilot{
dynamicConfigCh: make(chan dynamic.Configuration),
client: &client{
token: token,
tokenHash: fmt.Sprintf("%x", tokenHash.Sum64()),
httpClient: &http.Client{Timeout: 5 * time.Second},
baseInstanceInfoURL: baseInstanceInfoURL,
baseGatewayURL: baseGatewayURL,
},
routinesPool: pool,
metricsRegistry: metricsRegistry,
}
}
// Pilot connector with Pilot.
type Pilot struct {
routinesPool *safe.Pool
client *client
dynamicConfig dynamic.Configuration
dynamicConfigCh chan dynamic.Configuration
metricsRegistry *metrics.PilotRegistry
}
// SetDynamicConfiguration stores the dynamic configuration.
func (p *Pilot) SetDynamicConfiguration(dynamicConfig dynamic.Configuration) {
p.dynamicConfigCh <- dynamicConfig
}
func (p *Pilot) sendAnonDynConf(ctx context.Context, config dynamic.Configuration) {
err := p.client.SendAnonDynConf(ctx, config)
if err != nil {
log.WithoutContext().Error(err)
}
}
func (p *Pilot) sendInstanceInfo(ctx context.Context, pilotMetrics []metrics.PilotMetric) {
err := p.client.SendInstanceInfo(ctx, pilotMetrics)
if err != nil {
log.WithoutContext().Error(err)
}
}
// Tick sends data periodically.
func (p *Pilot) Tick(ctx context.Context) {
pilotMetrics := p.metricsRegistry.Data()
p.routinesPool.GoCtx(func(ctxRt context.Context) {
p.sendInstanceInfo(ctxRt, pilotMetrics)
})
instanceInfoTicker := time.NewTicker(pilotInstanceInfoTimer)
dynConfTicker := time.NewTicker(pilotDynConfTimer)
for {
select {
case tick := <-instanceInfoTicker.C:
log.WithoutContext().Debugf("Send instance info to pilot: %s", tick)
pilotMetrics := p.metricsRegistry.Data()
p.routinesPool.GoCtx(func(ctxRt context.Context) {
p.sendInstanceInfo(ctxRt, pilotMetrics)
})
case tick := <-dynConfTicker.C:
log.WithoutContext().Debugf("Send anonymized dynamic configuration to pilot: %s", tick)
p.routinesPool.GoCtx(func(ctxRt context.Context) {
p.sendAnonDynConf(ctxRt, p.dynamicConfig)
})
case dynamicConfig := <-p.dynamicConfigCh:
p.dynamicConfig = dynamicConfig
case <-ctx.Done():
return
}
}
}
type client struct {
httpClient *http.Client
baseInstanceInfoURL string
baseGatewayURL string
token string
tokenHash string
uuid string
}
func (c *client) createUUID() (string, error) {
data := []byte(`{"version":"` + version.Version + `","codeName":"` + version.Codename + `"}`)
req, err := http.NewRequest(http.MethodPost, c.baseInstanceInfoURL+"/", bytes.NewBuffer(data))
if err != nil {
return "", fmt.Errorf("failed to create request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set(tokenHeader, c.token)
resp, err := c.httpClient.Do(req)
if err != nil {
return "", fmt.Errorf("failed call Pilot: %w", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return "", fmt.Errorf("failed read response body: %w", err)
}
if resp.StatusCode/100 != 2 {
return "", fmt.Errorf("wrong status code while sending configuration: %d: %s", resp.StatusCode, body)
}
created := instanceInfo{}
err = json.Unmarshal(body, &created)
if err != nil {
return "", fmt.Errorf("failed to unmarshal response body: %w", err)
}
return created.ID, nil
}
// SendAnonDynConf sends anonymized dynamic configuration to Pilot.
func (c *client) SendAnonDynConf(ctx context.Context, config dynamic.Configuration) error {
anonConfig, err := redactor.Anonymize(&config)
if err != nil {
return fmt.Errorf("unable to anonymize dynamic configuration: %w", err)
}
req, err := http.NewRequest(http.MethodPost, c.baseGatewayURL+"/collect", bytes.NewReader([]byte(anonConfig)))
if err != nil {
return fmt.Errorf("failed to create request: %w", err)
}
return c.sendDataRetryable(ctx, req)
}
// SendInstanceInfo sends instance information to Pilot.
func (c *client) SendInstanceInfo(ctx context.Context, pilotMetrics []metrics.PilotMetric) error {
if len(c.uuid) == 0 {
var err error
c.uuid, err = c.createUUID()
if err != nil {
return fmt.Errorf("failed to create UUID: %w", err)
}
}
info := instanceInfo{
ID: c.uuid,
Metrics: pilotMetrics,
}
b, err := json.Marshal(info)
if err != nil {
return fmt.Errorf("failed to marshall request body: %w", err)
}
req, err := http.NewRequest(http.MethodPost, c.baseInstanceInfoURL+"/command", bytes.NewReader(b))
if err != nil {
return fmt.Errorf("failed to create instance info request: %w", err)
}
req.Header.Set(tokenHeader, c.token)
return c.sendDataRetryable(ctx, req)
}
func (c *client) sendDataRetryable(ctx context.Context, req *http.Request) error {
exponentialBackOff := backoff.NewExponentialBackOff()
exponentialBackOff.MaxElapsedTime = maxElapsedTime
exponentialBackOff.InitialInterval = initialInterval
exponentialBackOff.Multiplier = multiplier
req.Header.Set("Content-Type", "application/json")
req.Header.Set(tokenHashHeader, c.tokenHash)
return backoff.RetryNotify(
func() error {
resp, err := c.httpClient.Do(req)
if err != nil {
return fmt.Errorf("failed to call Pilot: %w", err)
}
defer func() { _ = resp.Body.Close() }()
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("failed to read response body: %w", err)
}
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("wrong status code while sending configuration: %d: %s", resp.StatusCode, body)
}
return nil
},
backoff.WithContext(exponentialBackOff, ctx),
func(err error, duration time.Duration) {
log.WithoutContext().Errorf("retry in %s due to: %v ", duration, err)
})
}

View file

@ -1,225 +0,0 @@
package pilot
import (
"context"
"encoding/json"
"fmt"
"hash/fnv"
"net/http"
"net/http/httptest"
"reflect"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/traefik/traefik/v2/pkg/config/dynamic"
"github.com/traefik/traefik/v2/pkg/metrics"
"github.com/traefik/traefik/v2/pkg/safe"
)
func TestTick(t *testing.T) {
receivedConfig := make(chan bool)
mux := http.NewServeMux()
server := httptest.NewServer(mux)
t.Cleanup(server.Close)
mux.HandleFunc("/", func(rw http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
http.Error(rw, "invalid method", http.StatusMethodNotAllowed)
return
}
err := json.NewEncoder(rw).Encode(instanceInfo{ID: "123"})
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
}
})
mux.HandleFunc("/command", func(rw http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
http.Error(rw, "invalid method", http.StatusMethodNotAllowed)
return
}
receivedConfig <- true
})
pilot := New("token", metrics.RegisterPilot(), safe.NewPool(context.Background()))
pilot.client.baseInstanceInfoURL = server.URL
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
go pilot.Tick(ctx)
pilot.SetDynamicConfiguration(dynamic.Configuration{})
pilot.SetDynamicConfiguration(dynamic.Configuration{})
select {
case <-time.Tick(10 * time.Second):
t.Fatal("Timeout")
case <-receivedConfig:
return
}
}
func TestClient_SendInstanceInfo(t *testing.T) {
myToken := "myToken"
myTokenHash, err := hashToken(myToken)
require.NoError(t, err)
mux := http.NewServeMux()
server := httptest.NewServer(mux)
t.Cleanup(server.Close)
mux.HandleFunc("/", func(rw http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
http.Error(rw, "invalid method", http.StatusMethodNotAllowed)
return
}
tk := req.Header.Get(tokenHeader)
if tk != myToken {
http.Error(rw, fmt.Sprintf("invalid token: %s", tk), http.StatusUnauthorized)
return
}
err := json.NewEncoder(rw).Encode(instanceInfo{ID: "123"})
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
}
})
mux.HandleFunc("/command", func(rw http.ResponseWriter, req *http.Request) {
if req.Method != http.MethodPost {
http.Error(rw, "invalid method", http.StatusMethodNotAllowed)
return
}
tk := req.Header.Get(tokenHeader)
if tk != myToken {
http.Error(rw, fmt.Sprintf("invalid token: %s", tk), http.StatusUnauthorized)
return
}
tkh := req.Header.Get(tokenHashHeader)
if tkh != myTokenHash {
http.Error(rw, fmt.Sprintf("invalid token hash: %s", tkh), http.StatusBadRequest)
return
}
defer func() { _ = req.Body.Close() }()
info := &instanceInfo{}
err := json.NewDecoder(req.Body).Decode(info)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
}
if info.ID != "123" {
http.Error(rw, fmt.Sprintf("invalid ID: %s", info.ID), http.StatusBadRequest)
}
})
client := client{
baseInstanceInfoURL: server.URL,
httpClient: http.DefaultClient,
token: myToken,
tokenHash: myTokenHash,
}
err = client.SendInstanceInfo(context.Background(), []metrics.PilotMetric{})
require.NoError(t, err)
}
func TestClient_SendAnonDynConf(t *testing.T) {
myToken := "myToken"
myTokenHash, err := hashToken(myToken)
require.NoError(t, err)
var count int
mux := http.NewServeMux()
mux.HandleFunc("/collect", func(rw http.ResponseWriter, req *http.Request) {
count++
if count == 1 {
http.Error(rw, "OOPS", http.StatusBadRequest)
return
}
if req.Method != http.MethodPost {
http.Error(rw, "invalid method", http.StatusMethodNotAllowed)
return
}
tkh := req.Header.Get(tokenHashHeader)
if tkh != myTokenHash {
http.Error(rw, fmt.Sprintf("invalid token hash: %s", tkh), http.StatusBadRequest)
return
}
defer func() { _ = req.Body.Close() }()
config := &dynamic.Configuration{}
err := json.NewDecoder(req.Body).Decode(config)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
}
router, exists := config.HTTP.Routers["foo"]
if !exists {
http.Error(rw, "router configuration is missing", http.StatusBadRequest)
return
}
if !reflect.DeepEqual(router, &dynamic.Router{Service: "foo", Rule: "xxxx"}) {
http.Error(rw, fmt.Sprintf("configuration is not anonymized: %+v", router), http.StatusBadRequest)
return
}
})
server := httptest.NewServer(mux)
t.Cleanup(server.Close)
client := client{
baseGatewayURL: server.URL,
httpClient: http.DefaultClient,
token: myToken,
tokenHash: myTokenHash,
}
config := dynamic.Configuration{
HTTP: &dynamic.HTTPConfiguration{
Routers: map[string]*dynamic.Router{
"foo": {
Service: "foo",
Rule: "foo.com",
},
},
},
}
err = client.SendAnonDynConf(context.Background(), config)
require.NoError(t, err)
assert.Equal(t, 2, count)
}
func hashToken(token string) (string, error) {
tokenHash := fnv.New64a()
_, err := tokenHash.Write([]byte(token))
if err != nil {
return "", err
}
return fmt.Sprintf("%x", tokenHash.Sum64()), nil
}

View file

@ -29,20 +29,19 @@ const (
pluginManifest = ".traefik.yml"
)
const pilotURL = "https://plugin.pilot.traefik.io/public/"
const pluginsURL = "https://plugins.traefik.io/public/"
const (
hashHeader = "X-Plugin-Hash"
tokenHeader = "X-Token"
)
// ClientOptions the options of a Traefik Pilot client.
// ClientOptions the options of a Traefik plugins client.
type ClientOptions struct {
Output string
Token string
}
// Client a Traefik Pilot client.
// Client a Traefik plugins client.
type Client struct {
HTTPClient *http.Client
baseURL *url.URL
@ -54,9 +53,9 @@ type Client struct {
sources string
}
// NewClient creates a new Traefik Pilot client.
// NewClient creates a new Traefik plugins client.
func NewClient(opts ClientOptions) (*Client, error) {
baseURL, err := url.Parse(pilotURL)
baseURL, err := url.Parse(pluginsURL)
if err != nil {
return nil, err
}
@ -87,8 +86,6 @@ func NewClient(opts ClientOptions) (*Client, error) {
goPath: goPath,
sources: filepath.Join(goPath, goPathSrc),
token: opts.Token,
}, nil
}

View file

@ -956,10 +956,6 @@ func TestDo_staticConfiguration(t *testing.T) {
},
}
config.Pilot = &static.Pilot{
Token: "token",
}
config.Experimental = &static.Experimental{
Plugins: map[string]plugins.Descriptor{
"Descriptor0": {

View file

@ -475,4 +475,4 @@
}
}
}
}
}

View file

@ -445,9 +445,6 @@
}
}
},
"pilot": {
"token": "xxxx"
},
"experimental": {
"plugins": {
"Descriptor0": {

View file

@ -483,4 +483,4 @@
}
}
}
}
}

View file

@ -37,11 +37,10 @@ func (v Handler) Append(router *mux.Router) {
router.Methods(http.MethodGet).Path("/api/version").
HandlerFunc(func(response http.ResponseWriter, request *http.Request) {
v := struct {
Version string
Codename string
StartDate time.Time `json:"startDate"`
UUID string `json:"uuid,omitempty"`
PilotEnabled bool `json:"pilotEnabled"`
Version string
Codename string
StartDate time.Time `json:"startDate"`
UUID string `json:"uuid,omitempty"`
}{
Version: Version,
Codename: Codename,