Add Traefik Hub Integration (Experimental Feature)
This commit is contained in:
parent
8d58f33a28
commit
fbdb6e6e78
20 changed files with 992 additions and 11 deletions
146
pkg/provider/hub/handler.go
Normal file
146
pkg/provider/hub/handler.go
Normal file
|
@ -0,0 +1,146 @@
|
|||
package hub
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/traefik/traefik/v2/pkg/config/dynamic"
|
||||
"github.com/traefik/traefik/v2/pkg/log"
|
||||
)
|
||||
|
||||
type handler struct {
|
||||
mux *http.ServeMux
|
||||
|
||||
client http.Client
|
||||
|
||||
entryPoint string
|
||||
port int
|
||||
tlsCfg *TLS
|
||||
|
||||
// Accessed atomically.
|
||||
lastCfgUnixNano int64
|
||||
|
||||
cfgChan chan<- dynamic.Message
|
||||
}
|
||||
|
||||
func newHandler(entryPoint string, port int, cfgChan chan<- dynamic.Message, tlsCfg *TLS, client http.Client) http.Handler {
|
||||
h := &handler{
|
||||
mux: http.NewServeMux(),
|
||||
entryPoint: entryPoint,
|
||||
port: port,
|
||||
cfgChan: cfgChan,
|
||||
tlsCfg: tlsCfg,
|
||||
client: client,
|
||||
}
|
||||
|
||||
h.mux.HandleFunc("/config", h.handleConfig)
|
||||
h.mux.HandleFunc("/discover-ip", h.handleDiscoverIP)
|
||||
h.mux.HandleFunc("/state", h.handleState)
|
||||
|
||||
return h
|
||||
}
|
||||
|
||||
type configRequest struct {
|
||||
UnixNano int64 `json:"unixNano"`
|
||||
Configuration *dynamic.Configuration `json:"configuration"`
|
||||
}
|
||||
|
||||
func (h *handler) handleConfig(rw http.ResponseWriter, req *http.Request) {
|
||||
if req.Method != http.MethodPost {
|
||||
http.Error(rw, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
payload := &configRequest{Configuration: emptyDynamicConfiguration()}
|
||||
if err := json.NewDecoder(req.Body).Decode(payload); err != nil {
|
||||
err = fmt.Errorf("decoding config request: %w", err)
|
||||
log.WithoutContext().Errorf("Handling config: %v", err)
|
||||
http.Error(rw, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
|
||||
cfg := payload.Configuration
|
||||
patchDynamicConfiguration(cfg, h.entryPoint, h.port, h.tlsCfg)
|
||||
|
||||
// We can safely drop messages here if the other end is not ready to receive them
|
||||
// as the agent will re-apply the same configuration.
|
||||
select {
|
||||
case h.cfgChan <- dynamic.Message{ProviderName: "hub", Configuration: cfg}:
|
||||
atomic.StoreInt64(&h.lastCfgUnixNano, payload.UnixNano)
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (h *handler) handleDiscoverIP(rw http.ResponseWriter, req *http.Request) {
|
||||
if req.Method != http.MethodGet {
|
||||
http.Error(rw, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
xff := req.Header.Get("X-Forwarded-For")
|
||||
port := req.URL.Query().Get("port")
|
||||
nonce := req.URL.Query().Get("nonce")
|
||||
|
||||
if err := h.doDiscoveryReq(req.Context(), xff, port, nonce); err != nil {
|
||||
err = fmt.Errorf("doing discovery request: %w", err)
|
||||
log.WithoutContext().Errorf("Handling IP discovery: %v", err)
|
||||
http.Error(rw, http.StatusText(http.StatusBadGateway), http.StatusBadGateway)
|
||||
return
|
||||
}
|
||||
|
||||
if err := json.NewEncoder(rw).Encode(xff); err != nil {
|
||||
err = fmt.Errorf("encoding discover ip response: %w", err)
|
||||
log.WithoutContext().Errorf("Handling IP discovery: %v", err)
|
||||
http.Error(rw, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (h *handler) doDiscoveryReq(ctx context.Context, ip, port, nonce string) error {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("https://%s:%s", ip, port), http.NoBody)
|
||||
if err != nil {
|
||||
return fmt.Errorf("creating request: %w", err)
|
||||
}
|
||||
|
||||
q := make(url.Values)
|
||||
q.Set("nonce", nonce)
|
||||
req.URL.RawQuery = q.Encode()
|
||||
req.Host = "agent.traefik"
|
||||
|
||||
resp, err := h.client.Do(req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("doing request: %w", err)
|
||||
}
|
||||
defer func() { _ = resp.Body.Close() }()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type stateResponse struct {
|
||||
LastConfigUnixNano int64 `json:"lastConfigUnixNano"`
|
||||
}
|
||||
|
||||
func (h *handler) handleState(rw http.ResponseWriter, req *http.Request) {
|
||||
if req.Method != http.MethodGet {
|
||||
http.Error(rw, http.StatusText(http.StatusMethodNotAllowed), http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
|
||||
resp := stateResponse{
|
||||
LastConfigUnixNano: atomic.LoadInt64(&h.lastCfgUnixNano),
|
||||
}
|
||||
if err := json.NewEncoder(rw).Encode(resp); err != nil {
|
||||
err = fmt.Errorf("encoding last config received response: %w", err)
|
||||
log.WithoutContext().Errorf("Handling state: %v", err)
|
||||
http.Error(rw, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
func (h *handler) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
||||
h.mux.ServeHTTP(rw, req)
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue