fix: clean code related to Hub
This commit is contained in:
parent
1522afe2ec
commit
511762cbf3
27 changed files with 97 additions and 1076 deletions
|
@ -1,147 +0,0 @@
|
|||
package hub
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/traefik/traefik/v2/pkg/config/dynamic"
|
||||
"github.com/traefik/traefik/v2/pkg/log"
|
||||
)
|
||||
|
||||
type handler struct {
|
||||
mux *http.ServeMux
|
||||
|
||||
client http.Client
|
||||
|
||||
entryPoint string
|
||||
port int
|
||||
tlsCfg *TLS
|
||||
|
||||
// Accessed atomically.
|
||||
lastCfgUnixNano int64
|
||||
|
||||
cfgChan chan<- dynamic.Message
|
||||
}
|
||||
|
||||
func newHandler(entryPoint string, port int, cfgChan chan<- dynamic.Message, tlsCfg *TLS, client http.Client) http.Handler {
|
||||
h := &handler{
|
||||
mux: http.NewServeMux(),
|
||||
entryPoint: entryPoint,
|
||||
port: port,
|
||||
cfgChan: cfgChan,
|
||||
tlsCfg: tlsCfg,
|
||||
client: client,
|
||||
}
|
||||
|
||||
h.mux.HandleFunc("/config", h.handleConfig)
|
||||
h.mux.HandleFunc("/discover-ip", h.handleDiscoverIP)
|
||||
h.mux.HandleFunc("/state", h.handleState)
|
||||
|
||||
return h
|
||||
}
|
||||
|
||||
type configRequest struct {
|
||||
UnixNano int64 `json:"unixNano"`
|
||||
Configuration *dynamic.Configuration `json:"configuration"`
|
||||
}
|
||||
|
||||
func (h *handler) handleConfig(rw http.ResponseWriter, req *http.Request) {
|
||||
if req.Method != http.MethodPost {
|
||||
http.Error(rw, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
payload := &configRequest{Configuration: emptyDynamicConfiguration()}
|
||||
if err := json.NewDecoder(req.Body).Decode(payload); err != nil {
|
||||
err = fmt.Errorf("decoding config request: %w", err)
|
||||
log.WithoutContext().Errorf("Handling config: %v", err)
|
||||
http.Error(rw, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
cfg := payload.Configuration
|
||||
patchDynamicConfiguration(cfg, h.entryPoint, h.port, h.tlsCfg)
|
||||
|
||||
// We can safely drop messages here if the other end is not ready to receive them
|
||||
// as the agent will re-apply the same configuration.
|
||||
select {
|
||||
case h.cfgChan <- dynamic.Message{ProviderName: "hub", Configuration: cfg}:
|
||||
atomic.StoreInt64(&h.lastCfgUnixNano, payload.UnixNano)
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (h *handler) handleDiscoverIP(rw http.ResponseWriter, req *http.Request) {
|
||||
if req.Method != http.MethodGet {
|
||||
http.Error(rw, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
xff := req.Header.Get("X-Forwarded-For")
|
||||
port := req.URL.Query().Get("port")
|
||||
nonce := req.URL.Query().Get("nonce")
|
||||
|
||||
if err := h.doDiscoveryReq(req.Context(), xff, port, nonce); err != nil {
|
||||
err = fmt.Errorf("doing discovery request: %w", err)
|
||||
log.WithoutContext().Errorf("Handling IP discovery: %v", err)
|
||||
http.Error(rw, http.StatusText(http.StatusBadGateway), http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(rw).Encode(xff); err != nil {
|
||||
err = fmt.Errorf("encoding discover ip response: %w", err)
|
||||
log.WithoutContext().Errorf("Handling IP discovery: %v", err)
|
||||
http.Error(rw, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (h *handler) doDiscoveryReq(ctx context.Context, ip, port, nonce string) error {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("https://%s", net.JoinHostPort(ip, port)), http.NoBody)
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating request: %w", err)
|
||||
}
|
||||
|
||||
q := make(url.Values)
|
||||
q.Set("nonce", nonce)
|
||||
req.URL.RawQuery = q.Encode()
|
||||
req.Host = "agent.traefik"
|
||||
|
||||
resp, err := h.client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("doing request: %w", err)
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type stateResponse struct {
|
||||
LastConfigUnixNano int64 `json:"lastConfigUnixNano"`
|
||||
}
|
||||
|
||||
func (h *handler) handleState(rw http.ResponseWriter, req *http.Request) {
|
||||
if req.Method != http.MethodGet {
|
||||
http.Error(rw, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
resp := stateResponse{
|
||||
LastConfigUnixNano: atomic.LoadInt64(&h.lastCfgUnixNano),
|
||||
}
|
||||
if err := json.NewEncoder(rw).Encode(resp); err != nil {
|
||||
err = fmt.Errorf("encoding last config received response: %w", err)
|
||||
log.WithoutContext().Errorf("Handling state: %v", err)
|
||||
http.Error(rw, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (h *handler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
||||
h.mux.ServeHTTP(rw, req)
|
||||
}
|
|
@ -1,168 +0,0 @@
|
|||
package hub
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
"github.com/traefik/traefik/v2/pkg/config/dynamic"
|
||||
"github.com/traefik/traefik/v2/pkg/tls/generate"
|
||||
)
|
||||
|
||||
func TestHandleConfig(t *testing.T) {
|
||||
cfgChan := make(chan dynamic.Message, 1)
|
||||
|
||||
client, err := createAgentClient(&TLS{Insecure: true})
|
||||
require.NoError(t, err)
|
||||
h := newHandler("traefik-hub-ep", 42, cfgChan, nil, client)
|
||||
|
||||
cfg := emptyDynamicConfiguration()
|
||||
cfg.HTTP.Routers["foo"] = &dynamic.Router{
|
||||
EntryPoints: []string{"ep"},
|
||||
Service: "bar",
|
||||
Rule: "Host(`foo.com`)",
|
||||
}
|
||||
|
||||
req := configRequest{Configuration: cfg}
|
||||
|
||||
b, err := json.Marshal(req)
|
||||
require.NoError(t, err)
|
||||
|
||||
server := httptest.NewServer(h)
|
||||
t.Cleanup(server.Close)
|
||||
|
||||
resp, err := http.Post(server.URL+"/config", "application/json", bytes.NewReader(b))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, http.StatusOK, resp.StatusCode)
|
||||
|
||||
select {
|
||||
case gotCfgRaw := <-cfgChan:
|
||||
patchDynamicConfiguration(cfg, "traefik-hub-ep", 42, nil)
|
||||
assert.Equal(t, cfg, gotCfgRaw.Configuration)
|
||||
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("Configuration not received")
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandle_Config_MethodNotAllowed(t *testing.T) {
|
||||
cfgChan := make(chan dynamic.Message, 1)
|
||||
client, err := createAgentClient(&TLS{Insecure: true})
|
||||
require.NoError(t, err)
|
||||
h := newHandler("traefik-hub-ep", 42, cfgChan, nil, client)
|
||||
|
||||
server := httptest.NewServer(h)
|
||||
t.Cleanup(server.Close)
|
||||
|
||||
resp, err := http.Get(server.URL + "/config")
|
||||
require.NoError(t, err)
|
||||
|
||||
err = resp.Body.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, http.StatusMethodNotAllowed, resp.StatusCode)
|
||||
}
|
||||
|
||||
func TestHandle_DiscoverIP(t *testing.T) {
|
||||
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
require.NoError(t, err)
|
||||
|
||||
port := listener.Addr().(*net.TCPAddr).Port
|
||||
nonce := "XVlBzgbaiCMRAjWw"
|
||||
|
||||
mux := http.NewServeMux()
|
||||
|
||||
var handlerCallCount int
|
||||
mux.HandleFunc("/", func(_ http.ResponseWriter, req *http.Request) {
|
||||
handlerCallCount++
|
||||
assert.Equal(t, nonce, req.URL.Query().Get("nonce"))
|
||||
})
|
||||
|
||||
certificate, err := generate.DefaultCertificate()
|
||||
require.NoError(t, err)
|
||||
agentServer := &http.Server{
|
||||
Handler: mux,
|
||||
TLSConfig: &tls.Config{
|
||||
Certificates: []tls.Certificate{*certificate},
|
||||
InsecureSkipVerify: true,
|
||||
MinVersion: tls.VersionTLS13,
|
||||
},
|
||||
}
|
||||
t.Cleanup(func() { _ = agentServer.Close() })
|
||||
|
||||
rdy := make(chan struct{})
|
||||
|
||||
go func(s *http.Server) {
|
||||
close(rdy)
|
||||
if err = s.ServeTLS(listener, "", ""); errors.Is(err, http.ErrServerClosed) {
|
||||
return
|
||||
}
|
||||
}(agentServer)
|
||||
|
||||
<-rdy
|
||||
|
||||
cfgChan := make(chan dynamic.Message, 1)
|
||||
client, err := createAgentClient(&TLS{Insecure: true})
|
||||
require.NoError(t, err)
|
||||
h := newHandler("traefik-hub-ep", 42, cfgChan, nil, client)
|
||||
|
||||
traefikServer := httptest.NewServer(h)
|
||||
t.Cleanup(traefikServer.Close)
|
||||
|
||||
req, err := http.NewRequest(http.MethodGet, traefikServer.URL+"/discover-ip", http.NoBody)
|
||||
require.NoError(t, err)
|
||||
|
||||
q := make(url.Values)
|
||||
q.Set("port", strconv.Itoa(port))
|
||||
q.Set("nonce", nonce)
|
||||
req.URL.RawQuery = q.Encode()
|
||||
|
||||
// Simulate a call from behind different proxies.
|
||||
req.Header.Add("X-Forwarded-For", "127.0.0.1")
|
||||
req.Header.Add("X-Forwarded-For", "10.10.0.13")
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
require.NoError(t, err)
|
||||
|
||||
defer func() {
|
||||
err = resp.Body.Close()
|
||||
require.NoError(t, err)
|
||||
}()
|
||||
|
||||
assert.Equal(t, 1, handlerCallCount)
|
||||
assert.Equal(t, http.StatusOK, resp.StatusCode)
|
||||
|
||||
var ip string
|
||||
err = json.NewDecoder(resp.Body).Decode(&ip)
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, "127.0.0.1", ip)
|
||||
}
|
||||
|
||||
func TestHandle_DiscoverIP_MethodNotAllowed(t *testing.T) {
|
||||
cfgChan := make(chan dynamic.Message, 1)
|
||||
client, err := createAgentClient(&TLS{Insecure: true})
|
||||
require.NoError(t, err)
|
||||
h := newHandler("traefik-hub-ep", 42, cfgChan, nil, client)
|
||||
|
||||
server := httptest.NewServer(h)
|
||||
t.Cleanup(server.Close)
|
||||
|
||||
resp, err := http.Post(server.URL+"/discover-ip", "", http.NoBody)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = resp.Body.Close()
|
||||
require.NoError(t, err)
|
||||
|
||||
assert.Equal(t, http.StatusMethodNotAllowed, resp.StatusCode)
|
||||
}
|
|
@ -1,217 +0,0 @@
|
|||
package hub
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"crypto/x509"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
|
||||
"github.com/traefik/traefik/v2/pkg/config/dynamic"
|
||||
"github.com/traefik/traefik/v2/pkg/log"
|
||||
"github.com/traefik/traefik/v2/pkg/provider"
|
||||
"github.com/traefik/traefik/v2/pkg/safe"
|
||||
ttls "github.com/traefik/traefik/v2/pkg/tls"
|
||||
)
|
||||
|
||||
var _ provider.Provider = (*Provider)(nil)
|
||||
|
||||
// Entrypoints created for Hub.
|
||||
const (
|
||||
APIEntrypoint = "traefikhub-api"
|
||||
TunnelEntrypoint = "traefikhub-tunl"
|
||||
)
|
||||
|
||||
// Provider holds configurations of the provider.
|
||||
type Provider struct {
|
||||
TLS *TLS `description:"TLS configuration for mTLS communication between Traefik and Hub Agent." json:"tls,omitempty" toml:"tls,omitempty" yaml:"tls,omitempty" export:"true"`
|
||||
|
||||
server *http.Server
|
||||
}
|
||||
|
||||
// TLS configures the mTLS connection between Traefik Proxy and the Traefik Hub Agent.
|
||||
type TLS struct {
|
||||
Insecure bool `description:"Enables an insecure TLS connection that uses default credentials, and which has no peer authentication between Traefik Proxy and the Traefik Hub Agent." json:"insecure,omitempty" toml:"insecure,omitempty" yaml:"insecure,omitempty" export:"true"`
|
||||
CA ttls.FileOrContent `description:"The certificate authority authenticates the Traefik Hub Agent certificate." json:"ca,omitempty" toml:"ca,omitempty" yaml:"ca,omitempty" loggable:"false"`
|
||||
Cert ttls.FileOrContent `description:"The TLS certificate for Traefik Proxy as a TLS client." json:"cert,omitempty" toml:"cert,omitempty" yaml:"cert,omitempty" loggable:"false"`
|
||||
Key ttls.FileOrContent `description:"The TLS key for Traefik Proxy as a TLS client." json:"key,omitempty" toml:"key,omitempty" yaml:"key,omitempty" loggable:"false"`
|
||||
}
|
||||
|
||||
// Init the provider.
|
||||
func (p *Provider) Init() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Provide allows the hub provider to provide configurations to traefik using the given configuration channel.
|
||||
func (p *Provider) Provide(configurationChan chan<- dynamic.Message, _ *safe.Pool) error {
|
||||
if p.TLS == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
listener, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
return fmt.Errorf("listener: %w", err)
|
||||
}
|
||||
|
||||
port := listener.Addr().(*net.TCPAddr).Port
|
||||
|
||||
client, err := createAgentClient(p.TLS)
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating Hub Agent HTTP client: %w", err)
|
||||
}
|
||||
|
||||
p.server = &http.Server{Handler: newHandler(APIEntrypoint, port, configurationChan, p.TLS, client)}
|
||||
|
||||
// TODO: this is going to be leaky (because no context to make it terminate)
|
||||
// if/when Provide lifecycle differs with Traefik lifecycle.
|
||||
go func() {
|
||||
if err = p.server.Serve(listener); err != nil {
|
||||
log.WithoutContext().WithField(log.ProviderName, "hub").Errorf("Unexpected error while running server: %v", err)
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
exposeAPIAndMetrics(configurationChan, APIEntrypoint, port, p.TLS)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func exposeAPIAndMetrics(cfgChan chan<- dynamic.Message, ep string, port int, tlsCfg *TLS) {
|
||||
cfg := emptyDynamicConfiguration()
|
||||
|
||||
patchDynamicConfiguration(cfg, ep, port, tlsCfg)
|
||||
|
||||
cfgChan <- dynamic.Message{ProviderName: "hub", Configuration: cfg}
|
||||
}
|
||||
|
||||
func patchDynamicConfiguration(cfg *dynamic.Configuration, ep string, port int, tlsCfg *TLS) {
|
||||
cfg.HTTP.Routers["traefik-hub-agent-api"] = &dynamic.Router{
|
||||
EntryPoints: []string{ep},
|
||||
Service: "api@internal",
|
||||
Rule: "Host(`proxy.traefik`) && PathPrefix(`/api`)",
|
||||
}
|
||||
cfg.HTTP.Routers["traefik-hub-agent-metrics"] = &dynamic.Router{
|
||||
EntryPoints: []string{ep},
|
||||
Service: "prometheus@internal",
|
||||
Rule: "Host(`proxy.traefik`) && PathPrefix(`/metrics`)",
|
||||
}
|
||||
|
||||
cfg.HTTP.Routers["traefik-hub-agent-service"] = &dynamic.Router{
|
||||
EntryPoints: []string{ep},
|
||||
Service: "traefik-hub-agent-service",
|
||||
Rule: "Host(`proxy.traefik`) && PathPrefix(`/config`, `/discover-ip`, `/state`)",
|
||||
}
|
||||
|
||||
cfg.HTTP.Services["traefik-hub-agent-service"] = &dynamic.Service{
|
||||
LoadBalancer: &dynamic.ServersLoadBalancer{
|
||||
Servers: []dynamic.Server{
|
||||
{
|
||||
URL: fmt.Sprintf("http://127.0.0.1:%d", port),
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
if tlsCfg == nil {
|
||||
return
|
||||
}
|
||||
|
||||
if tlsCfg.Insecure {
|
||||
cfg.TLS.Options["traefik-hub"] = ttls.Options{
|
||||
MinVersion: "VersionTLS13",
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
cfg.TLS.Options["traefik-hub"] = ttls.Options{
|
||||
ClientAuth: ttls.ClientAuth{
|
||||
CAFiles: []ttls.FileOrContent{tlsCfg.CA},
|
||||
ClientAuthType: "RequireAndVerifyClientCert",
|
||||
},
|
||||
SniStrict: true,
|
||||
MinVersion: "VersionTLS13",
|
||||
}
|
||||
|
||||
cfg.TLS.Certificates = append(cfg.TLS.Certificates, &ttls.CertAndStores{
|
||||
Certificate: ttls.Certificate{
|
||||
CertFile: tlsCfg.Cert,
|
||||
KeyFile: tlsCfg.Key,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
func emptyDynamicConfiguration() *dynamic.Configuration {
|
||||
return &dynamic.Configuration{
|
||||
HTTP: &dynamic.HTTPConfiguration{
|
||||
Routers: make(map[string]*dynamic.Router),
|
||||
Middlewares: make(map[string]*dynamic.Middleware),
|
||||
Services: make(map[string]*dynamic.Service),
|
||||
ServersTransports: make(map[string]*dynamic.ServersTransport),
|
||||
},
|
||||
TCP: &dynamic.TCPConfiguration{
|
||||
Routers: make(map[string]*dynamic.TCPRouter),
|
||||
Services: make(map[string]*dynamic.TCPService),
|
||||
},
|
||||
TLS: &dynamic.TLSConfiguration{
|
||||
Stores: make(map[string]ttls.Store),
|
||||
Options: make(map[string]ttls.Options),
|
||||
},
|
||||
UDP: &dynamic.UDPConfiguration{
|
||||
Routers: make(map[string]*dynamic.UDPRouter),
|
||||
Services: make(map[string]*dynamic.UDPService),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func createAgentClient(tlsCfg *TLS) (http.Client, error) {
|
||||
var client http.Client
|
||||
if tlsCfg.Insecure {
|
||||
client.Transport = &http.Transport{
|
||||
TLSClientConfig: &tls.Config{
|
||||
InsecureSkipVerify: true,
|
||||
MinVersion: tls.VersionTLS13,
|
||||
},
|
||||
}
|
||||
|
||||
return client, nil
|
||||
}
|
||||
|
||||
caContent, err := tlsCfg.CA.Read()
|
||||
if err != nil {
|
||||
return client, fmt.Errorf("reading CA: %w", err)
|
||||
}
|
||||
|
||||
roots := x509.NewCertPool()
|
||||
if ok := roots.AppendCertsFromPEM(caContent); !ok {
|
||||
return client, errors.New("appending CA error")
|
||||
}
|
||||
|
||||
certContent, err := tlsCfg.Cert.Read()
|
||||
if err != nil {
|
||||
return client, fmt.Errorf("reading Cert: %w", err)
|
||||
}
|
||||
keyContent, err := tlsCfg.Key.Read()
|
||||
if err != nil {
|
||||
return client, fmt.Errorf("reading Key: %w", err)
|
||||
}
|
||||
|
||||
certificate, err := tls.X509KeyPair(certContent, keyContent)
|
||||
if err != nil {
|
||||
return client, fmt.Errorf("creating key pair: %w", err)
|
||||
}
|
||||
|
||||
// mTLS
|
||||
client.Transport = &http.Transport{
|
||||
TLSClientConfig: &tls.Config{
|
||||
RootCAs: roots,
|
||||
Certificates: []tls.Certificate{certificate},
|
||||
ServerName: "agent.traefik",
|
||||
ClientAuth: tls.RequireAndVerifyClientCert,
|
||||
MinVersion: tls.VersionTLS13,
|
||||
},
|
||||
}
|
||||
|
||||
return client, nil
|
||||
}
|
|
@ -25,6 +25,7 @@ import (
|
|||
"github.com/traefik/traefik/v2/pkg/log"
|
||||
"github.com/traefik/traefik/v2/pkg/provider"
|
||||
traefikv1alpha1 "github.com/traefik/traefik/v2/pkg/provider/kubernetes/crd/traefikio/v1alpha1"
|
||||
"github.com/traefik/traefik/v2/pkg/provider/kubernetes/k8s"
|
||||
"github.com/traefik/traefik/v2/pkg/safe"
|
||||
"github.com/traefik/traefik/v2/pkg/tls"
|
||||
"github.com/traefik/traefik/v2/pkg/types"
|
||||
|
@ -56,7 +57,25 @@ type Provider struct {
|
|||
IngressClass string `description:"Value of kubernetes.io/ingress.class annotation to watch for." json:"ingressClass,omitempty" toml:"ingressClass,omitempty" yaml:"ingressClass,omitempty" export:"true"`
|
||||
ThrottleDuration ptypes.Duration `description:"Ingress refresh throttle duration" json:"throttleDuration,omitempty" toml:"throttleDuration,omitempty" yaml:"throttleDuration,omitempty" export:"true"`
|
||||
AllowEmptyServices bool `description:"Allow the creation of services without endpoints." json:"allowEmptyServices,omitempty" toml:"allowEmptyServices,omitempty" yaml:"allowEmptyServices,omitempty" export:"true"`
|
||||
lastConfiguration safe.Safe
|
||||
|
||||
lastConfiguration safe.Safe
|
||||
|
||||
routerTransform k8s.RouterTransform
|
||||
}
|
||||
|
||||
func (p *Provider) SetRouterTransform(routerTransform k8s.RouterTransform) {
|
||||
p.routerTransform = routerTransform
|
||||
}
|
||||
|
||||
func (p *Provider) applyRouterTransform(ctx context.Context, rt *dynamic.Router, ingress *traefikv1alpha1.IngressRoute) {
|
||||
if p.routerTransform == nil {
|
||||
return
|
||||
}
|
||||
|
||||
err := p.routerTransform.Apply(ctx, rt, ingress.Annotations)
|
||||
if err != nil {
|
||||
log.FromContext(ctx).WithError(err).Error("Apply router transform")
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Provider) newK8sClient(ctx context.Context) (*clientWrapper, error) {
|
||||
|
|
|
@ -148,6 +148,8 @@ func (p *Provider) loadIngressRouteConfiguration(ctx context.Context, client Cli
|
|||
}
|
||||
}
|
||||
|
||||
p.applyRouterTransform(ctx, r, ingressRoute)
|
||||
|
||||
conf.Routers[normalized] = r
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,6 +22,7 @@ import (
|
|||
"github.com/traefik/traefik/v2/pkg/provider"
|
||||
containousv1alpha1 "github.com/traefik/traefik/v2/pkg/provider/kubernetes/crd/traefikcontainous/v1alpha1"
|
||||
traefikv1alpha1 "github.com/traefik/traefik/v2/pkg/provider/kubernetes/crd/traefikio/v1alpha1"
|
||||
"github.com/traefik/traefik/v2/pkg/provider/kubernetes/k8s"
|
||||
"github.com/traefik/traefik/v2/pkg/safe"
|
||||
"github.com/traefik/traefik/v2/pkg/tls"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
|
@ -53,6 +54,23 @@ type Provider struct {
|
|||
EntryPoints map[string]Entrypoint `json:"-" toml:"-" yaml:"-" label:"-" file:"-"`
|
||||
|
||||
lastConfiguration safe.Safe
|
||||
|
||||
routerTransform k8s.RouterTransform
|
||||
}
|
||||
|
||||
func (p *Provider) SetRouterTransform(routerTransform k8s.RouterTransform) {
|
||||
p.routerTransform = routerTransform
|
||||
}
|
||||
|
||||
func (p *Provider) applyRouterTransform(ctx context.Context, rt *dynamic.Router, route *gatev1alpha2.HTTPRoute) {
|
||||
if p.routerTransform == nil {
|
||||
return
|
||||
}
|
||||
|
||||
err := p.routerTransform.Apply(ctx, rt, route.Annotations)
|
||||
if err != nil {
|
||||
log.FromContext(ctx).WithError(err).Error("Apply router transform")
|
||||
}
|
||||
}
|
||||
|
||||
// Entrypoint defines the available entry points.
|
||||
|
@ -495,7 +513,7 @@ func (p *Provider) fillGatewayConf(ctx context.Context, client Client, gateway *
|
|||
for _, routeKind := range routeKinds {
|
||||
switch routeKind.Kind {
|
||||
case kindHTTPRoute:
|
||||
listenerStatuses[i].Conditions = append(listenerStatuses[i].Conditions, gatewayHTTPRouteToHTTPConf(ctx, ep, listener, gateway, client, conf)...)
|
||||
listenerStatuses[i].Conditions = append(listenerStatuses[i].Conditions, p.gatewayHTTPRouteToHTTPConf(ctx, ep, listener, gateway, client, conf)...)
|
||||
case kindTCPRoute:
|
||||
listenerStatuses[i].Conditions = append(listenerStatuses[i].Conditions, gatewayTCPRouteToTCPConf(ctx, ep, listener, gateway, client, conf)...)
|
||||
case kindTLSRoute:
|
||||
|
@ -654,7 +672,7 @@ func getAllowedRouteKinds(listener gatev1alpha2.Listener, supportedKinds []gatev
|
|||
return routeKinds, conditions
|
||||
}
|
||||
|
||||
func gatewayHTTPRouteToHTTPConf(ctx context.Context, ep string, listener gatev1alpha2.Listener, gateway *gatev1alpha2.Gateway, client Client, conf *dynamic.Configuration) []metav1.Condition {
|
||||
func (p *Provider) gatewayHTTPRouteToHTTPConf(ctx context.Context, ep string, listener gatev1alpha2.Listener, gateway *gatev1alpha2.Gateway, client Client, conf *dynamic.Configuration) []metav1.Condition {
|
||||
if listener.AllowedRoutes == nil {
|
||||
// Should not happen due to validation.
|
||||
return nil
|
||||
|
@ -787,8 +805,11 @@ func gatewayHTTPRouteToHTTPConf(ctx context.Context, ep string, listener gatev1a
|
|||
router.Service = serviceName
|
||||
}
|
||||
|
||||
rt := &router
|
||||
p.applyRouterTransform(ctx, rt, route)
|
||||
|
||||
routerKey = provider.Normalize(routerKey)
|
||||
conf.HTTP.Routers[routerKey] = &router
|
||||
conf.HTTP.Routers[routerKey] = rt
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -20,6 +20,7 @@ import (
|
|||
"github.com/traefik/traefik/v2/pkg/job"
|
||||
"github.com/traefik/traefik/v2/pkg/log"
|
||||
"github.com/traefik/traefik/v2/pkg/provider"
|
||||
"github.com/traefik/traefik/v2/pkg/provider/kubernetes/k8s"
|
||||
"github.com/traefik/traefik/v2/pkg/safe"
|
||||
"github.com/traefik/traefik/v2/pkg/tls"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
|
@ -46,7 +47,25 @@ type Provider struct {
|
|||
ThrottleDuration ptypes.Duration `description:"Ingress refresh throttle duration" json:"throttleDuration,omitempty" toml:"throttleDuration,omitempty" yaml:"throttleDuration,omitempty" export:"true"`
|
||||
AllowEmptyServices bool `description:"Allow creation of services without endpoints." json:"allowEmptyServices,omitempty" toml:"allowEmptyServices,omitempty" yaml:"allowEmptyServices,omitempty" export:"true"`
|
||||
AllowExternalNameServices bool `description:"Allow ExternalName services." json:"allowExternalNameServices,omitempty" toml:"allowExternalNameServices,omitempty" yaml:"allowExternalNameServices,omitempty" export:"true"`
|
||||
lastConfiguration safe.Safe
|
||||
|
||||
lastConfiguration safe.Safe
|
||||
|
||||
routerTransform k8s.RouterTransform
|
||||
}
|
||||
|
||||
func (p *Provider) SetRouterTransform(routerTransform k8s.RouterTransform) {
|
||||
p.routerTransform = routerTransform
|
||||
}
|
||||
|
||||
func (p *Provider) applyRouterTransform(ctx context.Context, rt *dynamic.Router, ingress *netv1.Ingress) {
|
||||
if p.routerTransform == nil {
|
||||
return
|
||||
}
|
||||
|
||||
err := p.routerTransform.Apply(ctx, rt, ingress.Annotations)
|
||||
if err != nil {
|
||||
log.FromContext(ctx).WithError(err).Error("Apply router transform")
|
||||
}
|
||||
}
|
||||
|
||||
// EndpointIngress holds the endpoint information for the Kubernetes provider.
|
||||
|
@ -262,6 +281,8 @@ func (p *Provider) loadConfigurationFromIngresses(ctx context.Context, client Cl
|
|||
rt.TLS = rtConfig.Router.TLS
|
||||
}
|
||||
|
||||
p.applyRouterTransform(ctx, rt, ingress)
|
||||
|
||||
conf.HTTP.Routers["default-router"] = rt
|
||||
conf.HTTP.Services["default-backend"] = service
|
||||
}
|
||||
|
@ -304,8 +325,13 @@ func (p *Provider) loadConfigurationFromIngresses(ctx context.Context, client Cl
|
|||
serviceName := provider.Normalize(ingress.Namespace + "-" + pa.Backend.Service.Name + "-" + portString)
|
||||
conf.HTTP.Services[serviceName] = service
|
||||
|
||||
rt := loadRouter(rule, pa, rtConfig, serviceName)
|
||||
|
||||
p.applyRouterTransform(ctx, rt, ingress)
|
||||
|
||||
routerKey := strings.TrimPrefix(provider.Normalize(ingress.Namespace+"-"+ingress.Name+"-"+rule.Host+pa.Path), "-")
|
||||
routers[routerKey] = append(routers[routerKey], loadRouter(rule, pa, rtConfig, serviceName))
|
||||
|
||||
routers[routerKey] = append(routers[routerKey], rt)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
11
pkg/provider/kubernetes/k8s/router_transform.go
Normal file
11
pkg/provider/kubernetes/k8s/router_transform.go
Normal file
|
@ -0,0 +1,11 @@
|
|||
package k8s
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/traefik/traefik/v2/pkg/config/dynamic"
|
||||
)
|
||||
|
||||
type RouterTransform interface {
|
||||
Apply(ctx context.Context, rt *dynamic.Router, annotations map[string]string) error
|
||||
}
|
|
@ -167,7 +167,7 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
|
|||
case <-ctxPool.Done():
|
||||
return
|
||||
case event := <-update:
|
||||
logger.Debugf("Received provider event %s", event)
|
||||
logger.Debugf("Received provider event %v", event)
|
||||
|
||||
conf := p.getConfigurations(ctx)
|
||||
if conf != nil {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue