Add HTTP Provider
* feat: add HTTP provider implementation * refactor: add SetDefaults and struct tag for the new file parser * feat: add TLS configuration property * refactor: rework HTTP provider implementation * feat: provide config only once if fetched config is unchanged * style: lint * ui: add HTTP provider icon * tests: simplify and fix integration test * docs: add reference config for file * docs: move http reference config for file Co-authored-by: Daniel Tomcej <daniel.tomcej@gmail.com>
This commit is contained in:
parent
285ded6e49
commit
1ef93fead7
15 changed files with 816 additions and 1 deletions
172
pkg/provider/http/http.go
Normal file
172
pkg/provider/http/http.go
Normal file
|
@ -0,0 +1,172 @@
|
|||
package http
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"hash/fnv"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/cenkalti/backoff/v4"
|
||||
"github.com/containous/traefik/v2/pkg/config/dynamic"
|
||||
"github.com/containous/traefik/v2/pkg/config/file"
|
||||
"github.com/containous/traefik/v2/pkg/job"
|
||||
"github.com/containous/traefik/v2/pkg/log"
|
||||
"github.com/containous/traefik/v2/pkg/provider"
|
||||
"github.com/containous/traefik/v2/pkg/safe"
|
||||
"github.com/containous/traefik/v2/pkg/tls"
|
||||
"github.com/containous/traefik/v2/pkg/types"
|
||||
)
|
||||
|
||||
var _ provider.Provider = (*Provider)(nil)
|
||||
|
||||
// Provider is a provider.Provider implementation that queries an HTTP(s) endpoint for a configuration.
|
||||
type Provider struct {
|
||||
Endpoint string `description:"Load configuration from this endpoint." json:"endpoint" toml:"endpoint" yaml:"endpoint" export:"true"`
|
||||
PollInterval types.Duration `description:"Polling interval for endpoint." json:"pollInterval,omitempty" toml:"pollInterval,omitempty" yaml:"pollInterval,omitempty"`
|
||||
PollTimeout types.Duration `description:"Polling timeout for endpoint." json:"pollTimeout,omitempty" toml:"pollTimeout,omitempty" yaml:"pollTimeout,omitempty"`
|
||||
TLS *types.ClientTLS `description:"Enable TLS support." json:"tls,omitempty" toml:"tls,omitempty" yaml:"tls,omitempty" export:"true"`
|
||||
httpClient *http.Client
|
||||
lastConfigurationHash uint64
|
||||
}
|
||||
|
||||
// SetDefaults sets the default values.
|
||||
func (p *Provider) SetDefaults() {
|
||||
p.PollInterval = types.Duration(5 * time.Second)
|
||||
p.PollTimeout = types.Duration(5 * time.Second)
|
||||
}
|
||||
|
||||
// Init the provider.
|
||||
func (p *Provider) Init() error {
|
||||
if p.Endpoint == "" {
|
||||
return fmt.Errorf("non-empty endpoint is required")
|
||||
}
|
||||
|
||||
if p.PollInterval <= 0 {
|
||||
return fmt.Errorf("poll interval must be greater than 0")
|
||||
}
|
||||
|
||||
p.httpClient = &http.Client{
|
||||
Timeout: time.Duration(p.PollTimeout),
|
||||
}
|
||||
|
||||
if p.TLS != nil {
|
||||
tlsConfig, err := p.TLS.CreateTLSConfig(context.Background())
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to create TLS configuration: %w", err)
|
||||
}
|
||||
|
||||
p.httpClient.Transport = &http.Transport{
|
||||
TLSClientConfig: tlsConfig,
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Provide allows the provider to provide configurations to traefik using the given configuration channel.
|
||||
func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.Pool) error {
|
||||
pool.GoCtx(func(routineCtx context.Context) {
|
||||
ctxLog := log.With(routineCtx, log.Str(log.ProviderName, "http"))
|
||||
logger := log.FromContext(ctxLog)
|
||||
|
||||
operation := func() error {
|
||||
ticker := time.NewTicker(time.Duration(p.PollInterval))
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
configData, err := p.fetchConfigurationData()
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot fetch configuration data: %w", err)
|
||||
}
|
||||
|
||||
fnvHasher := fnv.New64()
|
||||
|
||||
_, err = fnvHasher.Write(configData)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot hash configuration data: %w", err)
|
||||
}
|
||||
|
||||
hash := fnvHasher.Sum64()
|
||||
if hash == p.lastConfigurationHash {
|
||||
continue
|
||||
}
|
||||
|
||||
p.lastConfigurationHash = hash
|
||||
|
||||
configuration, err := decodeConfiguration(configData)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot decode configuration data: %w", err)
|
||||
}
|
||||
|
||||
configurationChan <- dynamic.Message{
|
||||
ProviderName: "http",
|
||||
Configuration: configuration,
|
||||
}
|
||||
|
||||
case <-routineCtx.Done():
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
notify := func(err error, time time.Duration) {
|
||||
logger.Errorf("Provider connection error %+v, retrying in %s", err, time)
|
||||
}
|
||||
err := backoff.RetryNotify(safe.OperationWithRecover(operation), backoff.WithContext(job.NewBackOff(backoff.NewExponentialBackOff()), ctxLog), notify)
|
||||
if err != nil {
|
||||
logger.Errorf("Cannot connect to server endpoint %+v", err)
|
||||
}
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// fetchConfigurationData fetches the configuration data from the configured endpoint.
|
||||
func (p *Provider) fetchConfigurationData() ([]byte, error) {
|
||||
res, err := p.httpClient.Get(p.Endpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
defer res.Body.Close()
|
||||
|
||||
if res.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("received non-ok response code: %d", res.StatusCode)
|
||||
}
|
||||
|
||||
return ioutil.ReadAll(res.Body)
|
||||
}
|
||||
|
||||
// decodeConfiguration decodes and returns the dynamic configuration from the given data.
|
||||
func decodeConfiguration(data []byte) (*dynamic.Configuration, error) {
|
||||
configuration := &dynamic.Configuration{
|
||||
HTTP: &dynamic.HTTPConfiguration{
|
||||
Routers: make(map[string]*dynamic.Router),
|
||||
Middlewares: make(map[string]*dynamic.Middleware),
|
||||
Services: make(map[string]*dynamic.Service),
|
||||
},
|
||||
TCP: &dynamic.TCPConfiguration{
|
||||
Routers: make(map[string]*dynamic.TCPRouter),
|
||||
Services: make(map[string]*dynamic.TCPService),
|
||||
},
|
||||
TLS: &dynamic.TLSConfiguration{
|
||||
Stores: make(map[string]tls.Store),
|
||||
Options: make(map[string]tls.Options),
|
||||
},
|
||||
UDP: &dynamic.UDPConfiguration{
|
||||
Routers: make(map[string]*dynamic.UDPRouter),
|
||||
Services: make(map[string]*dynamic.UDPService),
|
||||
},
|
||||
}
|
||||
|
||||
err := file.DecodeContent(string(data), ".yaml", configuration)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return configuration, nil
|
||||
}
|
253
pkg/provider/http/http_test.go
Normal file
253
pkg/provider/http/http_test.go
Normal file
|
@ -0,0 +1,253 @@
|
|||
package http
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/containous/traefik/v2/pkg/config/dynamic"
|
||||
"github.com/containous/traefik/v2/pkg/safe"
|
||||
"github.com/containous/traefik/v2/pkg/tls"
|
||||
"github.com/containous/traefik/v2/pkg/types"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestProvider_Init(t *testing.T) {
|
||||
tests := []struct {
|
||||
desc string
|
||||
endpoint string
|
||||
pollInterval types.Duration
|
||||
expErr bool
|
||||
}{
|
||||
{
|
||||
desc: "should return an error if no endpoint is configured",
|
||||
expErr: true,
|
||||
},
|
||||
{
|
||||
desc: "should return an error if pollInterval is equal to 0",
|
||||
endpoint: "http://localhost:8080",
|
||||
expErr: true,
|
||||
},
|
||||
{
|
||||
desc: "should not return an error",
|
||||
endpoint: "http://localhost:8080",
|
||||
pollInterval: types.Duration(time.Second),
|
||||
expErr: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.desc, func(t *testing.T) {
|
||||
provider := &Provider{
|
||||
Endpoint: test.endpoint,
|
||||
PollInterval: test.pollInterval,
|
||||
}
|
||||
|
||||
err := provider.Init()
|
||||
if test.expErr {
|
||||
require.Error(t, err)
|
||||
return
|
||||
}
|
||||
|
||||
require.NoError(t, err)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestProvider_SetDefaults(t *testing.T) {
|
||||
provider := &Provider{}
|
||||
|
||||
provider.SetDefaults()
|
||||
|
||||
assert.Equal(t, provider.PollInterval, types.Duration(5*time.Second))
|
||||
assert.Equal(t, provider.PollTimeout, types.Duration(5*time.Second))
|
||||
}
|
||||
|
||||
func TestProvider_fetchConfigurationData(t *testing.T) {
|
||||
tests := []struct {
|
||||
desc string
|
||||
handler func(rw http.ResponseWriter, req *http.Request)
|
||||
expData []byte
|
||||
expErr bool
|
||||
}{
|
||||
{
|
||||
desc: "should return the fetched configuration data",
|
||||
expData: []byte("{}"),
|
||||
handler: func(rw http.ResponseWriter, req *http.Request) {
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
_, _ = fmt.Fprintf(rw, "{}")
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "should return an error if endpoint does not return an OK status code",
|
||||
expErr: true,
|
||||
handler: func(rw http.ResponseWriter, req *http.Request) {
|
||||
rw.WriteHeader(http.StatusNoContent)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.desc, func(t *testing.T) {
|
||||
server := httptest.NewServer(http.HandlerFunc(test.handler))
|
||||
defer server.Close()
|
||||
|
||||
provider := Provider{
|
||||
Endpoint: server.URL,
|
||||
PollInterval: types.Duration(1 * time.Second),
|
||||
PollTimeout: types.Duration(1 * time.Second),
|
||||
}
|
||||
|
||||
err := provider.Init()
|
||||
require.NoError(t, err)
|
||||
|
||||
configData, err := provider.fetchConfigurationData()
|
||||
if test.expErr {
|
||||
require.Error(t, err)
|
||||
return
|
||||
}
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, test.expData, configData)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestProvider_decodeConfiguration(t *testing.T) {
|
||||
tests := []struct {
|
||||
desc string
|
||||
configData []byte
|
||||
expConfig *dynamic.Configuration
|
||||
expErr bool
|
||||
}{
|
||||
{
|
||||
desc: "should return an error if the configuration data cannot be decoded",
|
||||
expErr: true,
|
||||
configData: []byte("{"),
|
||||
},
|
||||
{
|
||||
desc: "should return the decoded dynamic configuration",
|
||||
configData: []byte("{\"tcp\":{\"routers\":{\"foo\":{}}}}"),
|
||||
expConfig: &dynamic.Configuration{
|
||||
HTTP: &dynamic.HTTPConfiguration{
|
||||
Routers: make(map[string]*dynamic.Router),
|
||||
Middlewares: make(map[string]*dynamic.Middleware),
|
||||
Services: make(map[string]*dynamic.Service),
|
||||
},
|
||||
TCP: &dynamic.TCPConfiguration{
|
||||
Routers: map[string]*dynamic.TCPRouter{
|
||||
"foo": {},
|
||||
},
|
||||
Services: make(map[string]*dynamic.TCPService),
|
||||
},
|
||||
TLS: &dynamic.TLSConfiguration{
|
||||
Stores: make(map[string]tls.Store),
|
||||
Options: make(map[string]tls.Options),
|
||||
},
|
||||
UDP: &dynamic.UDPConfiguration{
|
||||
Routers: make(map[string]*dynamic.UDPRouter),
|
||||
Services: make(map[string]*dynamic.UDPService),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range tests {
|
||||
t.Run(test.desc, func(t *testing.T) {
|
||||
configuration, err := decodeConfiguration(test.configData)
|
||||
if test.expErr {
|
||||
require.Error(t, err)
|
||||
return
|
||||
}
|
||||
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, test.expConfig, configuration)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestProvider_Provide(t *testing.T) {
|
||||
handler := func(rw http.ResponseWriter, req *http.Request) {
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
_, _ = fmt.Fprintf(rw, "{}")
|
||||
}
|
||||
|
||||
server := httptest.NewServer(http.HandlerFunc(handler))
|
||||
defer server.Close()
|
||||
|
||||
provider := Provider{
|
||||
Endpoint: server.URL,
|
||||
PollTimeout: types.Duration(1 * time.Second),
|
||||
PollInterval: types.Duration(100 * time.Millisecond),
|
||||
}
|
||||
|
||||
err := provider.Init()
|
||||
require.NoError(t, err)
|
||||
|
||||
configurationChan := make(chan dynamic.Message)
|
||||
|
||||
expConfiguration := &dynamic.Configuration{
|
||||
HTTP: &dynamic.HTTPConfiguration{
|
||||
Routers: make(map[string]*dynamic.Router),
|
||||
Middlewares: make(map[string]*dynamic.Middleware),
|
||||
Services: make(map[string]*dynamic.Service),
|
||||
},
|
||||
TCP: &dynamic.TCPConfiguration{
|
||||
Routers: make(map[string]*dynamic.TCPRouter),
|
||||
Services: make(map[string]*dynamic.TCPService),
|
||||
},
|
||||
TLS: &dynamic.TLSConfiguration{
|
||||
Stores: make(map[string]tls.Store),
|
||||
Options: make(map[string]tls.Options),
|
||||
},
|
||||
UDP: &dynamic.UDPConfiguration{
|
||||
Routers: make(map[string]*dynamic.UDPRouter),
|
||||
Services: make(map[string]*dynamic.UDPService),
|
||||
},
|
||||
}
|
||||
|
||||
err = provider.Provide(configurationChan, safe.NewPool(context.Background()))
|
||||
require.NoError(t, err)
|
||||
|
||||
timeout := time.After(time.Second)
|
||||
|
||||
select {
|
||||
case configuration := <-configurationChan:
|
||||
assert.NotNil(t, configuration.Configuration)
|
||||
assert.Equal(t, expConfiguration, configuration.Configuration)
|
||||
case <-timeout:
|
||||
t.Errorf("timeout while waiting for config")
|
||||
}
|
||||
}
|
||||
|
||||
func TestProvider_ProvideConfigurationOnlyOnceIfUnchanged(t *testing.T) {
|
||||
handler := func(rw http.ResponseWriter, req *http.Request) {
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
_, _ = fmt.Fprintf(rw, "{}")
|
||||
}
|
||||
|
||||
server := httptest.NewServer(http.HandlerFunc(handler))
|
||||
defer server.Close()
|
||||
|
||||
provider := Provider{
|
||||
Endpoint: server.URL + "/endpoint",
|
||||
PollTimeout: types.Duration(1 * time.Second),
|
||||
PollInterval: types.Duration(100 * time.Millisecond),
|
||||
}
|
||||
|
||||
err := provider.Init()
|
||||
require.NoError(t, err)
|
||||
|
||||
configurationChan := make(chan dynamic.Message, 10)
|
||||
|
||||
err = provider.Provide(configurationChan, safe.NewPool(context.Background()))
|
||||
require.NoError(t, err)
|
||||
|
||||
time.Sleep(time.Second)
|
||||
|
||||
assert.Equal(t, 1, len(configurationChan))
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue