Send anonymized dynamic configuration to Pilot
Co-authored-by: Kevin Pollet <pollet.kevin@gmail.com>
This commit is contained in:
parent
a488430f23
commit
64a65cadf3
14 changed files with 1394 additions and 374 deletions
|
@ -5,57 +5,56 @@ import (
|
|||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/traefik/traefik/v2/pkg/config/runtime"
|
||||
"github.com/traefik/traefik/v2/pkg/anonymize"
|
||||
"github.com/traefik/traefik/v2/pkg/config/dynamic"
|
||||
"github.com/traefik/traefik/v2/pkg/log"
|
||||
"github.com/traefik/traefik/v2/pkg/metrics"
|
||||
"github.com/traefik/traefik/v2/pkg/safe"
|
||||
"github.com/traefik/traefik/v2/pkg/version"
|
||||
)
|
||||
|
||||
const baseURL = "https://instance-info.pilot.traefik.io/public"
|
||||
|
||||
const tokenHeader = "X-Token"
|
||||
|
||||
const (
|
||||
pilotTimer = 5 * time.Minute
|
||||
maxElapsedTime = 4 * time.Minute
|
||||
baseInstanceInfoURL = "https://instance-info.pilot.traefik.io/public"
|
||||
baseGatewayURL = "https://gateway.pilot.traefik.io"
|
||||
)
|
||||
|
||||
// RunTimeRepresentation is the configuration information exposed by the API handler.
|
||||
type RunTimeRepresentation struct {
|
||||
Routers map[string]*runtime.RouterInfo `json:"routers,omitempty"`
|
||||
Middlewares map[string]*runtime.MiddlewareInfo `json:"middlewares,omitempty"`
|
||||
Services map[string]*serviceInfoRepresentation `json:"services,omitempty"`
|
||||
TCPRouters map[string]*runtime.TCPRouterInfo `json:"tcpRouters,omitempty"`
|
||||
TCPServices map[string]*runtime.TCPServiceInfo `json:"tcpServices,omitempty"`
|
||||
UDPRouters map[string]*runtime.UDPRouterInfo `json:"udpRouters,omitempty"`
|
||||
UDPServices map[string]*runtime.UDPServiceInfo `json:"udpServices,omitempty"`
|
||||
}
|
||||
const (
|
||||
tokenHeader = "X-Token"
|
||||
tokenHashHeader = "X-Token-Hash"
|
||||
)
|
||||
|
||||
type serviceInfoRepresentation struct {
|
||||
*runtime.ServiceInfo
|
||||
ServerStatus map[string]string `json:"serverStatus,omitempty"`
|
||||
}
|
||||
const (
|
||||
pilotInstanceInfoTimer = 5 * time.Minute
|
||||
pilotDynConfTimer = 12 * time.Hour
|
||||
maxElapsedTime = 4 * time.Minute
|
||||
)
|
||||
|
||||
type instanceInfo struct {
|
||||
ID string `json:"id,omitempty"`
|
||||
Configuration RunTimeRepresentation `json:"configuration,omitempty"`
|
||||
Metrics []metrics.PilotMetric `json:"metrics,omitempty"`
|
||||
ID string `json:"id,omitempty"`
|
||||
Metrics []metrics.PilotMetric `json:"metrics,omitempty"`
|
||||
}
|
||||
|
||||
// New creates a new Pilot.
|
||||
func New(token string, metricsRegistry *metrics.PilotRegistry, pool *safe.Pool) *Pilot {
|
||||
tokenHash := fnv.New64a()
|
||||
|
||||
// the `sum64a` implementation of the `Write` method never returns an error.
|
||||
_, _ = tokenHash.Write([]byte(token))
|
||||
|
||||
return &Pilot{
|
||||
rtConfChan: make(chan *runtime.Configuration),
|
||||
dynamicConfigCh: make(chan dynamic.Configuration),
|
||||
client: &client{
|
||||
token: token,
|
||||
httpClient: &http.Client{Timeout: 5 * time.Second},
|
||||
baseURL: baseURL,
|
||||
token: token,
|
||||
tokenHash: fmt.Sprintf("%x", tokenHash.Sum64()),
|
||||
httpClient: &http.Client{Timeout: 5 * time.Second},
|
||||
baseInstanceInfoURL: baseInstanceInfoURL,
|
||||
baseGatewayURL: baseGatewayURL,
|
||||
},
|
||||
routinesPool: pool,
|
||||
metricsRegistry: metricsRegistry,
|
||||
|
@ -67,44 +66,25 @@ type Pilot struct {
|
|||
routinesPool *safe.Pool
|
||||
client *client
|
||||
|
||||
rtConf *runtime.Configuration
|
||||
rtConfChan chan *runtime.Configuration
|
||||
dynamicConfig dynamic.Configuration
|
||||
dynamicConfigCh chan dynamic.Configuration
|
||||
metricsRegistry *metrics.PilotRegistry
|
||||
}
|
||||
|
||||
// SetRuntimeConfiguration stores the runtime configuration.
|
||||
func (p *Pilot) SetRuntimeConfiguration(rtConf *runtime.Configuration) {
|
||||
p.rtConfChan <- rtConf
|
||||
// SetDynamicConfiguration stores the dynamic configuration.
|
||||
func (p *Pilot) SetDynamicConfiguration(dynamicConfig dynamic.Configuration) {
|
||||
p.dynamicConfigCh <- dynamicConfig
|
||||
}
|
||||
|
||||
func (p *Pilot) getRepresentation() RunTimeRepresentation {
|
||||
if p.rtConf == nil {
|
||||
return RunTimeRepresentation{}
|
||||
func (p *Pilot) sendAnonDynConf(ctx context.Context, config dynamic.Configuration) {
|
||||
err := p.client.SendAnonDynConf(ctx, config)
|
||||
if err != nil {
|
||||
log.WithoutContext().Error(err)
|
||||
}
|
||||
|
||||
siRepr := make(map[string]*serviceInfoRepresentation, len(p.rtConf.Services))
|
||||
for k, v := range p.rtConf.Services {
|
||||
siRepr[k] = &serviceInfoRepresentation{
|
||||
ServiceInfo: v,
|
||||
ServerStatus: v.GetAllStatus(),
|
||||
}
|
||||
}
|
||||
|
||||
result := RunTimeRepresentation{
|
||||
Routers: p.rtConf.Routers,
|
||||
Middlewares: p.rtConf.Middlewares,
|
||||
Services: siRepr,
|
||||
TCPRouters: p.rtConf.TCPRouters,
|
||||
TCPServices: p.rtConf.TCPServices,
|
||||
UDPRouters: p.rtConf.UDPRouters,
|
||||
UDPServices: p.rtConf.UDPServices,
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
func (p *Pilot) sendData(ctx context.Context, conf RunTimeRepresentation, pilotMetrics []metrics.PilotMetric) {
|
||||
err := p.client.SendData(ctx, conf, pilotMetrics)
|
||||
func (p *Pilot) sendInstanceInfo(ctx context.Context, pilotMetrics []metrics.PilotMetric) {
|
||||
err := p.client.SendInstanceInfo(ctx, pilotMetrics)
|
||||
if err != nil {
|
||||
log.WithoutContext().Error(err)
|
||||
}
|
||||
|
@ -112,35 +92,33 @@ func (p *Pilot) sendData(ctx context.Context, conf RunTimeRepresentation, pilotM
|
|||
|
||||
// Tick sends data periodically.
|
||||
func (p *Pilot) Tick(ctx context.Context) {
|
||||
select {
|
||||
case rtConf := <-p.rtConfChan:
|
||||
p.rtConf = rtConf
|
||||
break
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
||||
conf := p.getRepresentation()
|
||||
pilotMetrics := p.metricsRegistry.Data()
|
||||
|
||||
p.routinesPool.GoCtx(func(ctxRt context.Context) {
|
||||
p.sendData(ctxRt, conf, pilotMetrics)
|
||||
p.sendInstanceInfo(ctxRt, pilotMetrics)
|
||||
})
|
||||
|
||||
ticker := time.NewTicker(pilotTimer)
|
||||
instanceInfoTicker := time.NewTicker(pilotInstanceInfoTimer)
|
||||
dynConfTicker := time.NewTicker(pilotDynConfTimer)
|
||||
|
||||
for {
|
||||
select {
|
||||
case tick := <-ticker.C:
|
||||
log.WithoutContext().Debugf("Send to pilot: %s", tick)
|
||||
case tick := <-instanceInfoTicker.C:
|
||||
log.WithoutContext().Debugf("Send instance info to pilot: %s", tick)
|
||||
|
||||
conf := p.getRepresentation()
|
||||
pilotMetrics := p.metricsRegistry.Data()
|
||||
|
||||
p.routinesPool.GoCtx(func(ctxRt context.Context) {
|
||||
p.sendData(ctxRt, conf, pilotMetrics)
|
||||
p.sendInstanceInfo(ctxRt, pilotMetrics)
|
||||
})
|
||||
case rtConf := <-p.rtConfChan:
|
||||
p.rtConf = rtConf
|
||||
case tick := <-dynConfTicker.C:
|
||||
log.WithoutContext().Debugf("Send anonymized dynamic configuration to pilot: %s", tick)
|
||||
|
||||
p.routinesPool.GoCtx(func(ctxRt context.Context) {
|
||||
p.sendAnonDynConf(ctxRt, p.dynamicConfig)
|
||||
})
|
||||
case dynamicConfig := <-p.dynamicConfigCh:
|
||||
p.dynamicConfig = dynamicConfig
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
|
@ -148,15 +126,17 @@ func (p *Pilot) Tick(ctx context.Context) {
|
|||
}
|
||||
|
||||
type client struct {
|
||||
httpClient *http.Client
|
||||
baseURL string
|
||||
token string
|
||||
uuid string
|
||||
httpClient *http.Client
|
||||
baseInstanceInfoURL string
|
||||
baseGatewayURL string
|
||||
token string
|
||||
tokenHash string
|
||||
uuid string
|
||||
}
|
||||
|
||||
func (c *client) createUUID() (string, error) {
|
||||
data := []byte(`{"version":"` + version.Version + `","codeName":"` + version.Codename + `"}`)
|
||||
req, err := http.NewRequest(http.MethodPost, c.baseURL+"/", bytes.NewBuffer(data))
|
||||
req, err := http.NewRequest(http.MethodPost, c.baseInstanceInfoURL+"/", bytes.NewBuffer(data))
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
|
@ -189,22 +169,23 @@ func (c *client) createUUID() (string, error) {
|
|||
return created.ID, nil
|
||||
}
|
||||
|
||||
// SendData sends data to Pilot.
|
||||
func (c *client) SendData(ctx context.Context, rtConf RunTimeRepresentation, pilotMetrics []metrics.PilotMetric) error {
|
||||
exponentialBackOff := backoff.NewExponentialBackOff()
|
||||
exponentialBackOff.MaxElapsedTime = maxElapsedTime
|
||||
// SendAnonDynConf sends anonymized dynamic configuration to Pilot.
|
||||
func (c *client) SendAnonDynConf(ctx context.Context, config dynamic.Configuration) error {
|
||||
anonConfig, err := anonymize.Do(&config, false)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to anonymize dynamic configuration: %w", err)
|
||||
}
|
||||
|
||||
return backoff.RetryNotify(
|
||||
func() error {
|
||||
return c.sendData(rtConf, pilotMetrics)
|
||||
},
|
||||
backoff.WithContext(exponentialBackOff, ctx),
|
||||
func(err error, duration time.Duration) {
|
||||
log.WithoutContext().Errorf("retry in %s due to: %v ", duration, err)
|
||||
})
|
||||
req, err := http.NewRequest(http.MethodPost, c.baseGatewayURL+"/collect", bytes.NewReader([]byte(anonConfig)))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
|
||||
return c.sendDataRetryable(ctx, req)
|
||||
}
|
||||
|
||||
func (c *client) sendData(_ RunTimeRepresentation, pilotMetrics []metrics.PilotMetric) error {
|
||||
// SendInstanceInfo sends instance information to Pilot.
|
||||
func (c *client) SendInstanceInfo(ctx context.Context, pilotMetrics []metrics.PilotMetric) error {
|
||||
if len(c.uuid) == 0 {
|
||||
var err error
|
||||
c.uuid, err = c.createUUID()
|
||||
|
@ -225,29 +206,45 @@ func (c *client) sendData(_ RunTimeRepresentation, pilotMetrics []metrics.PilotM
|
|||
return fmt.Errorf("failed to marshall request body: %w", err)
|
||||
}
|
||||
|
||||
request, err := http.NewRequest(http.MethodPost, c.baseURL+"/command", bytes.NewBuffer(b))
|
||||
req, err := http.NewRequest(http.MethodPost, c.baseInstanceInfoURL+"/command", bytes.NewReader(b))
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create request: %w", err)
|
||||
return fmt.Errorf("failed to create instance info request: %w", err)
|
||||
}
|
||||
|
||||
request.Header.Set("Content-Type", "application/json")
|
||||
request.Header.Set(tokenHeader, c.token)
|
||||
req.Header.Set(tokenHeader, c.token)
|
||||
|
||||
resp, err := c.httpClient.Do(request)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to call Pilot: %w", err)
|
||||
}
|
||||
|
||||
defer resp.Body.Close()
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read response body: %w", err)
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("wrong status code while sending configuration: %d: %s", resp.StatusCode, body)
|
||||
}
|
||||
|
||||
return nil
|
||||
return c.sendDataRetryable(ctx, req)
|
||||
}
|
||||
|
||||
func (c *client) sendDataRetryable(ctx context.Context, req *http.Request) error {
|
||||
exponentialBackOff := backoff.NewExponentialBackOff()
|
||||
exponentialBackOff.MaxElapsedTime = maxElapsedTime
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set(tokenHashHeader, c.tokenHash)
|
||||
|
||||
return backoff.RetryNotify(
|
||||
func() error {
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to call Pilot: %w", err)
|
||||
}
|
||||
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
body, err := ioutil.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read response body: %w", err)
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return fmt.Errorf("wrong status code while sending configuration: %d: %s", resp.StatusCode, body)
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
backoff.WithContext(exponentialBackOff, ctx),
|
||||
func(err error, duration time.Duration) {
|
||||
log.WithoutContext().Errorf("retry in %s due to: %v ", duration, err)
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue