Merge branch 'v2.1' into master
This commit is contained in:
commit
829649e905
73 changed files with 1497 additions and 517 deletions
|
@ -4,18 +4,20 @@ package cli
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
// Command structure contains program/command information (command name and description).
|
||||
type Command struct {
|
||||
Name string
|
||||
Description string
|
||||
Configuration interface{}
|
||||
Resources []ResourceLoader
|
||||
Run func([]string) error
|
||||
Hidden bool
|
||||
Name string
|
||||
Description string
|
||||
Configuration interface{}
|
||||
Resources []ResourceLoader
|
||||
Run func([]string) error
|
||||
CustomHelpFunc func(io.Writer, *Command) error
|
||||
Hidden bool
|
||||
// AllowArg if not set, disallows any argument that is not a known command or a sub-command.
|
||||
AllowArg bool
|
||||
subCommands []*Command
|
||||
|
@ -35,6 +37,15 @@ func (c *Command) AddCommand(cmd *Command) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// PrintHelp calls the custom help function of the command if it's set.
|
||||
// Otherwise, it calls the default help function.
|
||||
func (c *Command) PrintHelp(w io.Writer) error {
|
||||
if c.CustomHelpFunc != nil {
|
||||
return c.CustomHelpFunc(w, c)
|
||||
}
|
||||
return PrintHelp(w, c)
|
||||
}
|
||||
|
||||
// Execute Executes a command.
|
||||
func Execute(cmd *Command) error {
|
||||
return execute(cmd, os.Args, true)
|
||||
|
@ -61,10 +72,12 @@ func execute(cmd *Command, args []string, root bool) error {
|
|||
|
||||
// Calls command by its name.
|
||||
if len(args) >= 2 && cmd.Name == args[1] {
|
||||
if err := run(cmd, args[2:]); err != nil {
|
||||
return fmt.Errorf("command %s error: %v", cmd.Name, err)
|
||||
if len(args) < 3 || !contains(cmd.subCommands, args[2]) {
|
||||
if err := run(cmd, args[2:]); err != nil {
|
||||
return fmt.Errorf("command %s error: %v", cmd.Name, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// No sub-command, calls the current command.
|
||||
|
@ -78,6 +91,9 @@ func execute(cmd *Command, args []string, root bool) error {
|
|||
// Trying to find the sub-command.
|
||||
for _, subCmd := range cmd.subCommands {
|
||||
if len(args) >= 2 && subCmd.Name == args[1] {
|
||||
return execute(subCmd, args, false)
|
||||
}
|
||||
if len(args) >= 3 && subCmd.Name == args[2] {
|
||||
return execute(subCmd, args[1:], false)
|
||||
}
|
||||
}
|
||||
|
@ -87,16 +103,16 @@ func execute(cmd *Command, args []string, root bool) error {
|
|||
|
||||
func run(cmd *Command, args []string) error {
|
||||
if len(args) > 0 && !isFlag(args[0]) && !cmd.AllowArg {
|
||||
_ = PrintHelp(os.Stdout, cmd)
|
||||
_ = cmd.PrintHelp(os.Stdout)
|
||||
return fmt.Errorf("command not found: %s", args[0])
|
||||
}
|
||||
|
||||
if isHelp(args) {
|
||||
return PrintHelp(os.Stdout, cmd)
|
||||
return cmd.PrintHelp(os.Stdout)
|
||||
}
|
||||
|
||||
if cmd.Run == nil {
|
||||
_ = PrintHelp(os.Stdout, cmd)
|
||||
_ = cmd.PrintHelp(os.Stdout)
|
||||
return fmt.Errorf("command %s is not runnable", cmd.Name)
|
||||
}
|
||||
|
||||
|
|
|
@ -1,6 +1,10 @@
|
|||
package cli
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"strings"
|
||||
|
@ -55,6 +59,63 @@ func TestCommand_AddCommand(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestCommand_PrintHelp(t *testing.T) {
|
||||
testCases := []struct {
|
||||
desc string
|
||||
command *Command
|
||||
expectedOutput string
|
||||
expectedError error
|
||||
}{
|
||||
{
|
||||
desc: "print default help",
|
||||
command: &Command{},
|
||||
expectedOutput: " \n\nUsage: [command] [flags] [arguments]\n\nUse \" [command] --help\" for help on any command.\n\n",
|
||||
},
|
||||
{
|
||||
desc: "print custom help",
|
||||
command: &Command{
|
||||
Name: "root",
|
||||
Description: "Description for root",
|
||||
Configuration: &struct {
|
||||
Foo []struct {
|
||||
Field string
|
||||
}
|
||||
}{},
|
||||
Run: func(args []string) error {
|
||||
return nil
|
||||
},
|
||||
CustomHelpFunc: func(w io.Writer, _ *Command) error {
|
||||
_, _ = fmt.Fprintln(w, "test")
|
||||
return nil
|
||||
},
|
||||
},
|
||||
expectedOutput: "test\n",
|
||||
},
|
||||
{
|
||||
desc: "error is returned from called help",
|
||||
command: &Command{
|
||||
CustomHelpFunc: func(_ io.Writer, _ *Command) error {
|
||||
return errors.New("test")
|
||||
},
|
||||
},
|
||||
expectedError: errors.New("test"),
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range testCases {
|
||||
test := test
|
||||
t.Run(test.desc, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
buffer := &bytes.Buffer{}
|
||||
err := test.command.PrintHelp(buffer)
|
||||
|
||||
assert.Equal(t, test.expectedError, err)
|
||||
assert.Equal(t, test.expectedOutput, buffer.String())
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func Test_execute(t *testing.T) {
|
||||
var called string
|
||||
|
||||
|
@ -559,6 +620,88 @@ func Test_execute(t *testing.T) {
|
|||
},
|
||||
expected: expected{result: "root---foo=bar--fii=bir"},
|
||||
},
|
||||
{
|
||||
desc: "sub command help",
|
||||
args: []string{"", "test", "subtest", "--help"},
|
||||
command: func() *Command {
|
||||
rootCmd := &Command{
|
||||
Name: "test",
|
||||
Resources: []ResourceLoader{&FlagLoader{}},
|
||||
}
|
||||
|
||||
subCmd := &Command{
|
||||
Name: "subtest",
|
||||
Resources: []ResourceLoader{&FlagLoader{}},
|
||||
}
|
||||
|
||||
err := rootCmd.AddCommand(subCmd)
|
||||
require.NoError(t, err)
|
||||
|
||||
subSubCmd := &Command{
|
||||
Name: "subsubtest",
|
||||
Resources: []ResourceLoader{&FlagLoader{}},
|
||||
}
|
||||
|
||||
err = subCmd.AddCommand(subSubCmd)
|
||||
require.NoError(t, err)
|
||||
|
||||
subSubSubCmd := &Command{
|
||||
Name: "subsubsubtest",
|
||||
Resources: []ResourceLoader{&FlagLoader{}},
|
||||
Run: func([]string) error {
|
||||
called = "subsubsubtest"
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
err = subSubCmd.AddCommand(subSubSubCmd)
|
||||
require.NoError(t, err)
|
||||
|
||||
return rootCmd
|
||||
},
|
||||
expected: expected{},
|
||||
},
|
||||
{
|
||||
desc: "sub sub command help",
|
||||
args: []string{"", "test", "subtest", "subsubtest", "--help"},
|
||||
command: func() *Command {
|
||||
rootCmd := &Command{
|
||||
Name: "test",
|
||||
Resources: []ResourceLoader{&FlagLoader{}},
|
||||
}
|
||||
|
||||
subCmd := &Command{
|
||||
Name: "subtest",
|
||||
Resources: []ResourceLoader{&FlagLoader{}},
|
||||
}
|
||||
|
||||
err := rootCmd.AddCommand(subCmd)
|
||||
require.NoError(t, err)
|
||||
|
||||
subSubCmd := &Command{
|
||||
Name: "subsubtest",
|
||||
Resources: []ResourceLoader{&FlagLoader{}},
|
||||
}
|
||||
|
||||
err = subCmd.AddCommand(subSubCmd)
|
||||
require.NoError(t, err)
|
||||
|
||||
subSubSubCmd := &Command{
|
||||
Name: "subsubsubtest",
|
||||
Resources: []ResourceLoader{&FlagLoader{}},
|
||||
Run: func([]string) error {
|
||||
called = "subsubsubtest"
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
err = subSubCmd.AddCommand(subSubSubCmd)
|
||||
require.NoError(t, err)
|
||||
|
||||
return rootCmd
|
||||
},
|
||||
expected: expected{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range testCases {
|
||||
|
@ -756,3 +899,43 @@ Flags:
|
|||
|
||||
`, string(out))
|
||||
}
|
||||
|
||||
func TestName(t *testing.T) {
|
||||
rootCmd := &Command{
|
||||
Name: "test",
|
||||
Resources: []ResourceLoader{&FlagLoader{}},
|
||||
}
|
||||
|
||||
subCmd := &Command{
|
||||
Name: "subtest",
|
||||
Resources: []ResourceLoader{&FlagLoader{}},
|
||||
}
|
||||
|
||||
err := rootCmd.AddCommand(subCmd)
|
||||
require.NoError(t, err)
|
||||
|
||||
subSubCmd := &Command{
|
||||
Name: "subsubtest",
|
||||
Resources: []ResourceLoader{&FlagLoader{}},
|
||||
Run: func([]string) error {
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
err = subCmd.AddCommand(subSubCmd)
|
||||
require.NoError(t, err)
|
||||
|
||||
subSubSubCmd := &Command{
|
||||
Name: "subsubsubtest",
|
||||
Resources: []ResourceLoader{&FlagLoader{}},
|
||||
Run: func([]string) error {
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
err = subSubCmd.AddCommand(subSubSubCmd)
|
||||
require.NoError(t, err)
|
||||
|
||||
err = execute(rootCmd, []string{"", "test", "subtest", "subsubtest", "subsubsubtest", "--help"}, true)
|
||||
require.NoError(t, err)
|
||||
}
|
||||
|
|
|
@ -25,7 +25,7 @@ func (f *FileLoader) GetFilename() string {
|
|||
func (f *FileLoader) Load(args []string, cmd *Command) (bool, error) {
|
||||
ref, err := flag.Parse(args, cmd.Configuration)
|
||||
if err != nil {
|
||||
_ = PrintHelp(os.Stdout, cmd)
|
||||
_ = cmd.PrintHelp(os.Stdout)
|
||||
return false, err
|
||||
}
|
||||
|
||||
|
|
|
@ -25,14 +25,20 @@ const (
|
|||
var singleton *HealthCheck
|
||||
var once sync.Once
|
||||
|
||||
// BalancerHandler includes functionality for load-balancing management.
|
||||
type BalancerHandler interface {
|
||||
ServeHTTP(w http.ResponseWriter, req *http.Request)
|
||||
// Balancer is the set of operations required to manage the list of servers in a
|
||||
// load-balancer.
|
||||
type Balancer interface {
|
||||
Servers() []*url.URL
|
||||
RemoveServer(u *url.URL) error
|
||||
UpsertServer(u *url.URL, options ...roundrobin.ServerOption) error
|
||||
}
|
||||
|
||||
// BalancerHandler includes functionality for load-balancing management.
|
||||
type BalancerHandler interface {
|
||||
ServeHTTP(w http.ResponseWriter, req *http.Request)
|
||||
Balancer
|
||||
}
|
||||
|
||||
// metricsRegistry is a local interface in the health check package, exposing only the required metrics
|
||||
// necessary for the health check package. This makes it easier for the tests.
|
||||
type metricsRegistry interface {
|
||||
|
@ -49,7 +55,7 @@ type Options struct {
|
|||
Transport http.RoundTripper
|
||||
Interval time.Duration
|
||||
Timeout time.Duration
|
||||
LB BalancerHandler
|
||||
LB Balancer
|
||||
}
|
||||
|
||||
func (opt Options) String() string {
|
||||
|
@ -146,18 +152,18 @@ func (hc *HealthCheck) checkBackend(ctx context.Context, backend *BackendConfig)
|
|||
enabledURLs := backend.LB.Servers()
|
||||
var newDisabledURLs []backendURL
|
||||
// FIXME re enable metrics
|
||||
for _, disableURL := range backend.disabledURLs {
|
||||
for _, disabledURL := range backend.disabledURLs {
|
||||
// FIXME serverUpMetricValue := float64(0)
|
||||
if err := checkHealth(disableURL.url, backend); err == nil {
|
||||
if err := checkHealth(disabledURL.url, backend); err == nil {
|
||||
logger.Warnf("Health check up: Returning to server list. Backend: %q URL: %q Weight: %d",
|
||||
backend.name, disableURL.url.String(), disableURL.weight)
|
||||
if err = backend.LB.UpsertServer(disableURL.url, roundrobin.Weight(disableURL.weight)); err != nil {
|
||||
backend.name, disabledURL.url.String(), disabledURL.weight)
|
||||
if err = backend.LB.UpsertServer(disabledURL.url, roundrobin.Weight(disabledURL.weight)); err != nil {
|
||||
logger.Error(err)
|
||||
}
|
||||
// FIXME serverUpMetricValue = 1
|
||||
} else {
|
||||
logger.Warnf("Health check still failing. Backend: %q URL: %q Reason: %s", backend.name, disableURL.url.String(), err)
|
||||
newDisabledURLs = append(newDisabledURLs, disableURL)
|
||||
logger.Warnf("Health check still failing. Backend: %q URL: %q Reason: %s", backend.name, disabledURL.url.String(), err)
|
||||
newDisabledURLs = append(newDisabledURLs, disabledURL)
|
||||
}
|
||||
// FIXME labelValues := []string{"backend", backend.name, "url", backendurl.url.String()}
|
||||
// FIXME hc.metrics.BackendServerUpGauge().With(labelValues...).Set(serverUpMetricValue)
|
||||
|
@ -177,7 +183,7 @@ func (hc *HealthCheck) checkBackend(ctx context.Context, backend *BackendConfig)
|
|||
weight = 1
|
||||
}
|
||||
}
|
||||
logger.Warnf("Health check failed: Remove from server list. Backend: %q URL: %q Weight: %d Reason: %s", backend.name, enableURL.String(), weight, err)
|
||||
logger.Warnf("Health check failed, removing from server list. Backend: %q URL: %q Weight: %d Reason: %s", backend.name, enableURL.String(), weight, err)
|
||||
if err := backend.LB.RemoveServer(enableURL); err != nil {
|
||||
logger.Error(err)
|
||||
}
|
||||
|
@ -281,3 +287,38 @@ func (lb *LbStatusUpdater) UpsertServer(u *url.URL, options ...roundrobin.Server
|
|||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Balancers is a list of Balancers(s) that implements the Balancer interface.
|
||||
type Balancers []Balancer
|
||||
|
||||
// Servers returns the servers url from all the BalancerHandler
|
||||
func (b Balancers) Servers() []*url.URL {
|
||||
var servers []*url.URL
|
||||
for _, lb := range b {
|
||||
servers = append(servers, lb.Servers()...)
|
||||
}
|
||||
|
||||
return servers
|
||||
}
|
||||
|
||||
// RemoveServer removes the given server from all the BalancerHandler,
|
||||
// and updates the status of the server to "DOWN".
|
||||
func (b Balancers) RemoveServer(u *url.URL) error {
|
||||
for _, lb := range b {
|
||||
if err := lb.RemoveServer(u); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpsertServer adds the given server to all the BalancerHandler,
|
||||
// and updates the status of the server to "UP".
|
||||
func (b Balancers) UpsertServer(u *url.URL, options ...roundrobin.ServerOption) error {
|
||||
for _, lb := range b {
|
||||
if err := lb.UpsertServer(u, options...); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -116,7 +116,18 @@ type CoreLogData map[string]interface{}
|
|||
// LogData is the data captured by the middleware so that it can be logged.
|
||||
type LogData struct {
|
||||
Core CoreLogData
|
||||
Request http.Header
|
||||
Request request
|
||||
OriginResponse http.Header
|
||||
DownstreamResponse http.Header
|
||||
DownstreamResponse downstreamResponse
|
||||
}
|
||||
|
||||
type downstreamResponse struct {
|
||||
headers http.Header
|
||||
status int
|
||||
size int64
|
||||
}
|
||||
|
||||
type request struct {
|
||||
headers http.Header
|
||||
count int64
|
||||
}
|
||||
|
|
|
@ -47,8 +47,6 @@ func (n noopCloser) Close() error {
|
|||
|
||||
type handlerParams struct {
|
||||
logDataTable *LogData
|
||||
crr *captureRequestReader
|
||||
crw *captureResponseWriter
|
||||
}
|
||||
|
||||
// Handler will write each request and its response to the access log.
|
||||
|
@ -122,7 +120,7 @@ func NewHandler(config *types.AccessLog) (*Handler, error) {
|
|||
go func() {
|
||||
defer logHandler.wg.Done()
|
||||
for handlerParams := range logHandler.logHandlerChan {
|
||||
logHandler.logTheRoundTrip(handlerParams.logDataTable, handlerParams.crr, handlerParams.crw)
|
||||
logHandler.logTheRoundTrip(handlerParams.logDataTable)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
@ -162,7 +160,12 @@ func (h *Handler) ServeHTTP(rw http.ResponseWriter, req *http.Request, next http
|
|||
StartLocal: now.Local(),
|
||||
}
|
||||
|
||||
logDataTable := &LogData{Core: core, Request: req.Header}
|
||||
logDataTable := &LogData{
|
||||
Core: core,
|
||||
Request: request{
|
||||
headers: req.Header,
|
||||
},
|
||||
}
|
||||
|
||||
reqWithDataTable := req.WithContext(context.WithValue(req.Context(), DataTableKey, logDataTable))
|
||||
|
||||
|
@ -205,16 +208,21 @@ func (h *Handler) ServeHTTP(rw http.ResponseWriter, req *http.Request, next http
|
|||
core[ClientUsername] = usernameIfPresent(reqWithDataTable.URL)
|
||||
}
|
||||
|
||||
logDataTable.DownstreamResponse = crw.Header()
|
||||
logDataTable.DownstreamResponse = downstreamResponse{
|
||||
headers: crw.Header().Clone(),
|
||||
status: crw.Status(),
|
||||
size: crw.Size(),
|
||||
}
|
||||
if crr != nil {
|
||||
logDataTable.Request.count = crr.count
|
||||
}
|
||||
|
||||
if h.config.BufferingSize > 0 {
|
||||
h.logHandlerChan <- handlerParams{
|
||||
logDataTable: logDataTable,
|
||||
crr: crr,
|
||||
crw: crw,
|
||||
}
|
||||
} else {
|
||||
h.logTheRoundTrip(logDataTable, crr, crw)
|
||||
h.logTheRoundTrip(logDataTable)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -264,7 +272,7 @@ func usernameIfPresent(theURL *url.URL) string {
|
|||
}
|
||||
|
||||
// Logging handler to log frontend name, backend name, and elapsed time.
|
||||
func (h *Handler) logTheRoundTrip(logDataTable *LogData, crr *captureRequestReader, crw *captureResponseWriter) {
|
||||
func (h *Handler) logTheRoundTrip(logDataTable *LogData) {
|
||||
core := logDataTable.Core
|
||||
|
||||
retryAttempts, ok := core[RetryAttempts].(int)
|
||||
|
@ -272,23 +280,22 @@ func (h *Handler) logTheRoundTrip(logDataTable *LogData, crr *captureRequestRead
|
|||
retryAttempts = 0
|
||||
}
|
||||
core[RetryAttempts] = retryAttempts
|
||||
core[RequestContentSize] = logDataTable.Request.count
|
||||
|
||||
if crr != nil {
|
||||
core[RequestContentSize] = crr.count
|
||||
}
|
||||
|
||||
core[DownstreamStatus] = crw.Status()
|
||||
status := logDataTable.DownstreamResponse.status
|
||||
core[DownstreamStatus] = status
|
||||
|
||||
// n.b. take care to perform time arithmetic using UTC to avoid errors at DST boundaries.
|
||||
totalDuration := time.Now().UTC().Sub(core[StartUTC].(time.Time))
|
||||
core[Duration] = totalDuration
|
||||
|
||||
if h.keepAccessLog(crw.Status(), retryAttempts, totalDuration) {
|
||||
core[DownstreamContentSize] = crw.Size()
|
||||
if h.keepAccessLog(status, retryAttempts, totalDuration) {
|
||||
size := logDataTable.DownstreamResponse.size
|
||||
core[DownstreamContentSize] = size
|
||||
if original, ok := core[OriginContentSize]; ok {
|
||||
o64 := original.(int64)
|
||||
if crw.Size() != o64 && crw.Size() != 0 {
|
||||
core[GzipRatio] = float64(o64) / float64(crw.Size())
|
||||
if size != o64 && size != 0 {
|
||||
core[GzipRatio] = float64(o64) / float64(size)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -305,9 +312,9 @@ func (h *Handler) logTheRoundTrip(logDataTable *LogData, crr *captureRequestRead
|
|||
}
|
||||
}
|
||||
|
||||
h.redactHeaders(logDataTable.Request, fields, "request_")
|
||||
h.redactHeaders(logDataTable.Request.headers, fields, "request_")
|
||||
h.redactHeaders(logDataTable.OriginResponse, fields, "origin_")
|
||||
h.redactHeaders(logDataTable.DownstreamResponse, fields, "downstream_")
|
||||
h.redactHeaders(logDataTable.DownstreamResponse.headers, fields, "downstream_")
|
||||
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
|
|
@ -192,6 +192,7 @@ func TestLoggerJSON(t *testing.T) {
|
|||
Format: JSONFormat,
|
||||
},
|
||||
expected: map[string]func(t *testing.T, value interface{}){
|
||||
RequestContentSize: assertFloat64(0),
|
||||
RequestHost: assertString(testHostname),
|
||||
RequestAddr: assertString(testHostname),
|
||||
RequestMethod: assertString(testMethod),
|
||||
|
|
|
@ -221,13 +221,11 @@ func (s *Header) processCorsHeaders(rw http.ResponseWriter, req *http.Request) b
|
|||
}
|
||||
|
||||
reqAcMethod := req.Header.Get("Access-Control-Request-Method")
|
||||
reqAcHeaders := req.Header.Get("Access-Control-Request-Headers")
|
||||
originHeader := req.Header.Get("Origin")
|
||||
|
||||
if reqAcMethod != "" && reqAcHeaders != "" && originHeader != "" && req.Method == http.MethodOptions {
|
||||
if reqAcMethod != "" && originHeader != "" && req.Method == http.MethodOptions {
|
||||
// If the request is an OPTIONS request with an Access-Control-Request-Method header,
|
||||
// and Access-Control-Request-Headers headers, and Origin headers,
|
||||
// then it is a CORS preflight request,
|
||||
// and Origin headers, then it is a CORS preflight request,
|
||||
// and we need to build a custom response: https://www.w3.org/TR/cors/#preflight-request
|
||||
if s.headers.AccessControlAllowCredentials {
|
||||
rw.Header().Set("Access-Control-Allow-Credentials", "true")
|
||||
|
|
|
@ -275,6 +275,25 @@ func TestCORSPreflights(t *testing.T) {
|
|||
"Access-Control-Allow-Headers": {"origin,X-Forwarded-For"},
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "No Request Headers Preflight",
|
||||
header: NewHeader(emptyHandler, dynamic.Headers{
|
||||
AccessControlAllowMethods: []string{"GET", "OPTIONS", "PUT"},
|
||||
AccessControlAllowOrigin: "*",
|
||||
AccessControlAllowHeaders: []string{"origin", "X-Forwarded-For"},
|
||||
AccessControlMaxAge: 600,
|
||||
}),
|
||||
requestHeaders: map[string][]string{
|
||||
"Access-Control-Request-Method": {"GET", "OPTIONS"},
|
||||
"Origin": {"https://foo.bar.org"},
|
||||
},
|
||||
expected: map[string][]string{
|
||||
"Access-Control-Allow-Origin": {"*"},
|
||||
"Access-Control-Max-Age": {"600"},
|
||||
"Access-Control-Allow-Methods": {"GET,OPTIONS,PUT"},
|
||||
"Access-Control-Allow-Headers": {"origin,X-Forwarded-For"},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range testCases {
|
||||
|
|
|
@ -34,7 +34,11 @@ type entryPointMiddleware struct {
|
|||
}
|
||||
|
||||
func (e *entryPointMiddleware) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
|
||||
spanCtx, _ := e.Extract(opentracing.HTTPHeaders, tracing.HTTPHeadersCarrier(req.Header))
|
||||
spanCtx, err := e.Extract(opentracing.HTTPHeaders, opentracing.HTTPHeadersCarrier(req.Header))
|
||||
if err != nil {
|
||||
log.FromContext(middlewares.GetLoggerCtx(req.Context(), "tracing", entryPointTypeName)).
|
||||
Debugf("Failed to extract the context: %v", err)
|
||||
}
|
||||
|
||||
span, req, finish := e.StartSpanf(req, ext.SpanKindRPCServerEnum, "EntryPoint", []string{e.entryPoint, req.Host}, " ", ext.RPCServerOption(spanCtx))
|
||||
defer finish()
|
||||
|
|
|
@ -12,23 +12,23 @@ import (
|
|||
// It is used in order to create a specific and unique pattern for these labels.
|
||||
const MarathonConstraintPrefix = "Traefik-Marathon-505F9E15-BDC7-45E7-828D-C06C7BAB8091"
|
||||
|
||||
type constraintFunc func(map[string]string) bool
|
||||
type constraintLabelFunc func(map[string]string) bool
|
||||
|
||||
// Match reports whether the expression matches with the given labels.
|
||||
// MatchLabels reports whether the expression matches with the given labels.
|
||||
// The expression must match any logical boolean combination of:
|
||||
// - `Label(labelName, labelValue)`
|
||||
// - `LabelRegex(labelName, regexValue)`
|
||||
// - `MarathonConstraint(field:operator:value)`
|
||||
func Match(labels map[string]string, expr string) (bool, error) {
|
||||
func MatchLabels(labels map[string]string, expr string) (bool, error) {
|
||||
if expr == "" {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
p, err := predicate.NewParser(predicate.Def{
|
||||
Operators: predicate.Operators{
|
||||
AND: andFunc,
|
||||
NOT: notFunc,
|
||||
OR: orFunc,
|
||||
AND: andLabelFunc,
|
||||
NOT: notLabelFunc,
|
||||
OR: orLabelFunc,
|
||||
},
|
||||
Functions: map[string]interface{}{
|
||||
"Label": labelFn,
|
||||
|
@ -45,20 +45,20 @@ func Match(labels map[string]string, expr string) (bool, error) {
|
|||
return false, err
|
||||
}
|
||||
|
||||
fn, ok := parse.(constraintFunc)
|
||||
fn, ok := parse.(constraintLabelFunc)
|
||||
if !ok {
|
||||
return false, errors.New("not a constraintFunc")
|
||||
return false, errors.New("not a constraintLabelFunc")
|
||||
}
|
||||
return fn(labels), nil
|
||||
}
|
||||
|
||||
func labelFn(name, value string) constraintFunc {
|
||||
func labelFn(name, value string) constraintLabelFunc {
|
||||
return func(labels map[string]string) bool {
|
||||
return labels[name] == value
|
||||
}
|
||||
}
|
||||
|
||||
func labelRegexFn(name, expr string) constraintFunc {
|
||||
func labelRegexFn(name, expr string) constraintLabelFunc {
|
||||
return func(labels map[string]string) bool {
|
||||
matched, err := regexp.MatchString(expr, labels[name])
|
||||
if err != nil {
|
||||
|
@ -68,7 +68,7 @@ func labelRegexFn(name, expr string) constraintFunc {
|
|||
}
|
||||
}
|
||||
|
||||
func marathonFn(value string) constraintFunc {
|
||||
func marathonFn(value string) constraintLabelFunc {
|
||||
return func(labels map[string]string) bool {
|
||||
for k, v := range labels {
|
||||
if strings.HasPrefix(k, MarathonConstraintPrefix) {
|
||||
|
@ -81,19 +81,19 @@ func marathonFn(value string) constraintFunc {
|
|||
}
|
||||
}
|
||||
|
||||
func andFunc(a, b constraintFunc) constraintFunc {
|
||||
func andLabelFunc(a, b constraintLabelFunc) constraintLabelFunc {
|
||||
return func(labels map[string]string) bool {
|
||||
return a(labels) && b(labels)
|
||||
}
|
||||
}
|
||||
|
||||
func orFunc(a, b constraintFunc) constraintFunc {
|
||||
func orLabelFunc(a, b constraintLabelFunc) constraintLabelFunc {
|
||||
return func(labels map[string]string) bool {
|
||||
return a(labels) || b(labels)
|
||||
}
|
||||
}
|
||||
|
||||
func notFunc(a constraintFunc) constraintFunc {
|
||||
func notLabelFunc(a constraintLabelFunc) constraintLabelFunc {
|
||||
return func(labels map[string]string) bool {
|
||||
return !a(labels)
|
||||
}
|
|
@ -7,7 +7,7 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestMatch(t *testing.T) {
|
||||
func TestMatchLabels(t *testing.T) {
|
||||
testCases := []struct {
|
||||
expr string
|
||||
labels map[string]string
|
||||
|
@ -192,7 +192,7 @@ func TestMatch(t *testing.T) {
|
|||
t.Run(test.expr, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
matches, err := Match(test.labels, test.expr)
|
||||
matches, err := MatchLabels(test.labels, test.expr)
|
||||
if test.expectedErr {
|
||||
require.Error(t, err)
|
||||
} else {
|
92
pkg/provider/constraints/constraints_tags.go
Normal file
92
pkg/provider/constraints/constraints_tags.go
Normal file
|
@ -0,0 +1,92 @@
|
|||
package constraints
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"regexp"
|
||||
|
||||
"github.com/vulcand/predicate"
|
||||
)
|
||||
|
||||
type constraintTagFunc func([]string) bool
|
||||
|
||||
// MatchTags reports whether the expression matches with the given tags.
|
||||
// The expression must match any logical boolean combination of:
|
||||
// - `Tag(tagValue)`
|
||||
// - `TagRegex(regexValue)`
|
||||
func MatchTags(tags []string, expr string) (bool, error) {
|
||||
if expr == "" {
|
||||
return true, nil
|
||||
}
|
||||
|
||||
p, err := predicate.NewParser(predicate.Def{
|
||||
Operators: predicate.Operators{
|
||||
AND: andTagFunc,
|
||||
NOT: notTagFunc,
|
||||
OR: orTagFunc,
|
||||
},
|
||||
Functions: map[string]interface{}{
|
||||
"Tag": tagFn,
|
||||
"TagRegex": tagRegexFn,
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
parse, err := p.Parse(expr)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
fn, ok := parse.(constraintTagFunc)
|
||||
if !ok {
|
||||
return false, errors.New("not a constraintTagFunc")
|
||||
}
|
||||
return fn(tags), nil
|
||||
}
|
||||
|
||||
func tagFn(name string) constraintTagFunc {
|
||||
return func(tags []string) bool {
|
||||
for _, tag := range tags {
|
||||
if tag == name {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func tagRegexFn(expr string) constraintTagFunc {
|
||||
return func(tags []string) bool {
|
||||
exp, err := regexp.Compile(expr)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
for _, tag := range tags {
|
||||
if exp.MatchString(tag) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func andTagFunc(a, b constraintTagFunc) constraintTagFunc {
|
||||
return func(tags []string) bool {
|
||||
return a(tags) && b(tags)
|
||||
}
|
||||
}
|
||||
|
||||
func orTagFunc(a, b constraintTagFunc) constraintTagFunc {
|
||||
return func(tags []string) bool {
|
||||
return a(tags) || b(tags)
|
||||
}
|
||||
}
|
||||
|
||||
func notTagFunc(a constraintTagFunc) constraintTagFunc {
|
||||
return func(tags []string) bool {
|
||||
return !a(tags)
|
||||
}
|
||||
}
|
111
pkg/provider/constraints/constraints_tags_test.go
Normal file
111
pkg/provider/constraints/constraints_tags_test.go
Normal file
|
@ -0,0 +1,111 @@
|
|||
package constraints
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestMatchTags(t *testing.T) {
|
||||
testCases := []struct {
|
||||
expr string
|
||||
tags []string
|
||||
expected bool
|
||||
expectedErr bool
|
||||
}{
|
||||
{
|
||||
expr: `Tag("world")`,
|
||||
tags: []string{"hello", "world"},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
expr: `Tag("worlds")`,
|
||||
tags: []string{"hello", "world"},
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
expr: `!Tag("world")`,
|
||||
tags: []string{"hello", "world"},
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
expr: `Tag("hello") && Tag("world")`,
|
||||
tags: []string{"hello", "world"},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
expr: `Tag("hello") && Tag("worlds")`,
|
||||
tags: []string{"hello", "world"},
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
expr: `Tag("hello") && !Tag("world")`,
|
||||
tags: []string{"hello", "world"},
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
expr: `Tag("hello") || Tag( "world")`,
|
||||
tags: []string{"hello", "world"},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
expr: `Tag( "worlds") || Tag("hello")`,
|
||||
tags: []string{"hello", "world"},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
expr: `Tag("hello") || !Tag("world")`,
|
||||
tags: []string{"hello", "world"},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
expr: `Tag()`,
|
||||
tags: []string{"hello", "world"},
|
||||
expectedErr: true,
|
||||
},
|
||||
{
|
||||
expr: `Foo("hello")`,
|
||||
tags: []string{"hello", "world"},
|
||||
expectedErr: true,
|
||||
},
|
||||
{
|
||||
expr: `Tag("hello")`,
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
expr: ``,
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
expr: `TagRegex("hel\\w+")`,
|
||||
tags: []string{"hello", "world"},
|
||||
expected: true,
|
||||
},
|
||||
{
|
||||
expr: `TagRegex("hell\\w+s")`,
|
||||
tags: []string{"hello", "world"},
|
||||
expected: false,
|
||||
},
|
||||
{
|
||||
expr: `!TagRegex("hel\\w+")`,
|
||||
tags: []string{"hello", "world"},
|
||||
expected: false,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range testCases {
|
||||
test := test
|
||||
t.Run(test.expr, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
matches, err := MatchTags(test.tags, test.expr)
|
||||
if test.expectedErr {
|
||||
require.Error(t, err)
|
||||
} else {
|
||||
require.NoError(t, err)
|
||||
}
|
||||
assert.Equal(t, test.expected, matches)
|
||||
})
|
||||
}
|
||||
}
|
|
@ -18,7 +18,7 @@ func (p *Provider) buildConfiguration(ctx context.Context, items []itemData) *dy
|
|||
configurations := make(map[string]*dynamic.Configuration)
|
||||
|
||||
for _, item := range items {
|
||||
svcName := item.Name + "-" + item.ID
|
||||
svcName := item.Node + "-" + item.Name + "-" + item.ID
|
||||
ctxSvc := log.With(ctx, log.Str("serviceName", svcName))
|
||||
|
||||
if !p.keepContainer(ctxSvc, item) {
|
||||
|
@ -80,7 +80,7 @@ func (p *Provider) keepContainer(ctx context.Context, item itemData) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
matches, err := constraints.Match(item.Labels, p.Constraints)
|
||||
matches, err := constraints.MatchTags(item.Tags, p.Constraints)
|
||||
if err != nil {
|
||||
logger.Errorf("Error matching constraints expression: %v", err)
|
||||
return false
|
||||
|
|
|
@ -2,6 +2,7 @@ package consulcatalog
|
|||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"github.com/containous/traefik/v2/pkg/config/dynamic"
|
||||
|
@ -25,6 +26,7 @@ func TestDefaultRule(t *testing.T) {
|
|||
items: []itemData{
|
||||
{
|
||||
ID: "id",
|
||||
Node: "Node1",
|
||||
Name: "Test",
|
||||
Address: "127.0.0.1",
|
||||
Port: "80",
|
||||
|
@ -66,6 +68,7 @@ func TestDefaultRule(t *testing.T) {
|
|||
items: []itemData{
|
||||
{
|
||||
ID: "id",
|
||||
Node: "Node1",
|
||||
Name: "Test",
|
||||
Address: "127.0.0.1",
|
||||
Port: "80",
|
||||
|
@ -109,6 +112,7 @@ func TestDefaultRule(t *testing.T) {
|
|||
items: []itemData{
|
||||
{
|
||||
ID: "Test",
|
||||
Node: "Node1",
|
||||
Name: "Test",
|
||||
Labels: map[string]string{},
|
||||
Address: "127.0.0.1",
|
||||
|
@ -145,6 +149,7 @@ func TestDefaultRule(t *testing.T) {
|
|||
items: []itemData{
|
||||
{
|
||||
ID: "Test",
|
||||
Node: "Node1",
|
||||
Name: "Test",
|
||||
Labels: map[string]string{},
|
||||
Address: "127.0.0.1",
|
||||
|
@ -181,6 +186,7 @@ func TestDefaultRule(t *testing.T) {
|
|||
items: []itemData{
|
||||
{
|
||||
ID: "Test",
|
||||
Node: "Node1",
|
||||
Name: "Test",
|
||||
Labels: map[string]string{},
|
||||
Address: "127.0.0.1",
|
||||
|
@ -257,6 +263,7 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
items: []itemData{
|
||||
{
|
||||
ID: "Test",
|
||||
Node: "Node1",
|
||||
Name: "Test",
|
||||
Labels: map[string]string{},
|
||||
Address: "127.0.0.1",
|
||||
|
@ -297,6 +304,7 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
items: []itemData{
|
||||
{
|
||||
ID: "Test",
|
||||
Node: "Node1",
|
||||
Name: "Test",
|
||||
Labels: map[string]string{},
|
||||
Address: "127.0.0.1",
|
||||
|
@ -305,6 +313,7 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
},
|
||||
{
|
||||
ID: "Test2",
|
||||
Node: "Node1",
|
||||
Name: "Test2",
|
||||
Labels: map[string]string{},
|
||||
Address: "127.0.0.2",
|
||||
|
@ -359,6 +368,7 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
items: []itemData{
|
||||
{
|
||||
ID: "1",
|
||||
Node: "Node1",
|
||||
Name: "Test",
|
||||
Labels: map[string]string{},
|
||||
Address: "127.0.0.1",
|
||||
|
@ -367,6 +377,110 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
},
|
||||
{
|
||||
ID: "2",
|
||||
Node: "Node1",
|
||||
Name: "Test",
|
||||
Labels: map[string]string{},
|
||||
Address: "127.0.0.2",
|
||||
Port: "80",
|
||||
Status: api.HealthPassing,
|
||||
},
|
||||
},
|
||||
expected: &dynamic.Configuration{
|
||||
TCP: &dynamic.TCPConfiguration{
|
||||
Routers: map[string]*dynamic.TCPRouter{},
|
||||
Services: map[string]*dynamic.TCPService{},
|
||||
},
|
||||
HTTP: &dynamic.HTTPConfiguration{
|
||||
Routers: map[string]*dynamic.Router{
|
||||
"Test": {
|
||||
Service: "Test",
|
||||
Rule: "Host(`Test.traefik.wtf`)",
|
||||
},
|
||||
},
|
||||
Middlewares: map[string]*dynamic.Middleware{},
|
||||
Services: map[string]*dynamic.Service{
|
||||
"Test": {
|
||||
LoadBalancer: &dynamic.ServersLoadBalancer{
|
||||
Servers: []dynamic.Server{
|
||||
{
|
||||
URL: "http://127.0.0.1:80",
|
||||
},
|
||||
{
|
||||
URL: "http://127.0.0.2:80",
|
||||
},
|
||||
},
|
||||
PassHostHeader: Bool(true),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "two containers with same service name & id no label on same node",
|
||||
items: []itemData{
|
||||
{
|
||||
ID: "1",
|
||||
Node: "Node1",
|
||||
Name: "Test",
|
||||
Labels: map[string]string{},
|
||||
Address: "127.0.0.1",
|
||||
Port: "80",
|
||||
Status: api.HealthPassing,
|
||||
},
|
||||
{
|
||||
ID: "1",
|
||||
Node: "Node1",
|
||||
Name: "Test",
|
||||
Labels: map[string]string{},
|
||||
Address: "127.0.0.2",
|
||||
Port: "80",
|
||||
Status: api.HealthPassing,
|
||||
},
|
||||
},
|
||||
expected: &dynamic.Configuration{
|
||||
TCP: &dynamic.TCPConfiguration{
|
||||
Routers: map[string]*dynamic.TCPRouter{},
|
||||
Services: map[string]*dynamic.TCPService{},
|
||||
},
|
||||
HTTP: &dynamic.HTTPConfiguration{
|
||||
Routers: map[string]*dynamic.Router{
|
||||
"Test": {
|
||||
Service: "Test",
|
||||
Rule: "Host(`Test.traefik.wtf`)",
|
||||
},
|
||||
},
|
||||
Middlewares: map[string]*dynamic.Middleware{},
|
||||
Services: map[string]*dynamic.Service{
|
||||
"Test": {
|
||||
LoadBalancer: &dynamic.ServersLoadBalancer{
|
||||
Servers: []dynamic.Server{
|
||||
{
|
||||
URL: "http://127.0.0.2:80",
|
||||
},
|
||||
},
|
||||
PassHostHeader: Bool(true),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
desc: "two containers with same service name & id no label on different nodes",
|
||||
items: []itemData{
|
||||
{
|
||||
ID: "1",
|
||||
Node: "Node1",
|
||||
Name: "Test",
|
||||
Labels: map[string]string{},
|
||||
Address: "127.0.0.1",
|
||||
Port: "80",
|
||||
Status: api.HealthPassing,
|
||||
},
|
||||
{
|
||||
ID: "1",
|
||||
Node: "Node2",
|
||||
Name: "Test",
|
||||
Labels: map[string]string{},
|
||||
Address: "127.0.0.2",
|
||||
|
@ -1320,6 +1434,7 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
items: []itemData{
|
||||
{
|
||||
ID: "Test",
|
||||
Node: "Node1",
|
||||
Name: "Test",
|
||||
Labels: map[string]string{},
|
||||
Address: "127.0.0.2",
|
||||
|
@ -1393,6 +1508,7 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
items: []itemData{
|
||||
{
|
||||
ID: "Test",
|
||||
Node: "Node1",
|
||||
Name: "Test",
|
||||
Labels: map[string]string{},
|
||||
Address: "127.0.0.1",
|
||||
|
@ -1426,7 +1542,7 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
Status: api.HealthPassing,
|
||||
},
|
||||
},
|
||||
constraints: `Label("traefik.tags", "bar")`,
|
||||
constraints: `Tag("traefik.tags=bar")`,
|
||||
expected: &dynamic.Configuration{
|
||||
TCP: &dynamic.TCPConfiguration{
|
||||
Routers: map[string]*dynamic.TCPRouter{},
|
||||
|
@ -1453,7 +1569,7 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
Status: api.HealthPassing,
|
||||
},
|
||||
},
|
||||
constraints: `Label("traefik.tags", "foo")`,
|
||||
constraints: `Tag("traefik.tags=foo")`,
|
||||
expected: &dynamic.Configuration{
|
||||
TCP: &dynamic.TCPConfiguration{
|
||||
Routers: map[string]*dynamic.TCPRouter{},
|
||||
|
@ -1840,6 +1956,12 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
var err error
|
||||
test.items[i].ExtraConf, err = p.getConfiguration(test.items[i])
|
||||
require.NoError(t, err)
|
||||
|
||||
var tags []string
|
||||
for k, v := range test.items[i].Labels {
|
||||
tags = append(tags, fmt.Sprintf("%s=%s", k, v))
|
||||
}
|
||||
test.items[i].Tags = tags
|
||||
}
|
||||
|
||||
configuration := p.buildConfiguration(context.Background(), test.items)
|
||||
|
|
|
@ -24,11 +24,13 @@ var _ provider.Provider = (*Provider)(nil)
|
|||
|
||||
type itemData struct {
|
||||
ID string
|
||||
Node string
|
||||
Name string
|
||||
Address string
|
||||
Port string
|
||||
Status string
|
||||
Labels map[string]string
|
||||
Tags []string
|
||||
ExtraConf configuration
|
||||
}
|
||||
|
||||
|
@ -156,7 +158,6 @@ func (p *Provider) getConsulServicesData(ctx context.Context) ([]itemData, error
|
|||
}
|
||||
|
||||
for _, consulService := range consulServices {
|
||||
labels := tagsToNeutralLabels(consulService.ServiceTags, p.Prefix)
|
||||
address := consulService.ServiceAddress
|
||||
if address == "" {
|
||||
address = consulService.Address
|
||||
|
@ -164,10 +165,12 @@ func (p *Provider) getConsulServicesData(ctx context.Context) ([]itemData, error
|
|||
|
||||
item := itemData{
|
||||
ID: consulService.ServiceID,
|
||||
Node: consulService.Node,
|
||||
Name: consulService.ServiceName,
|
||||
Address: address,
|
||||
Port: strconv.Itoa(consulService.ServicePort),
|
||||
Labels: labels,
|
||||
Labels: tagsToNeutralLabels(consulService.ServiceTags, p.Prefix),
|
||||
Tags: consulService.ServiceTags,
|
||||
Status: consulService.Checks.AggregatedStatus(),
|
||||
}
|
||||
|
||||
|
|
|
@ -6,7 +6,7 @@ import (
|
|||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestTagsToNeutralLabels(t *testing.T) {
|
||||
func Test_tagsToNeutralLabels(t *testing.T) {
|
||||
testCases := []struct {
|
||||
desc string
|
||||
tags []string
|
||||
|
|
|
@ -127,7 +127,7 @@ func (p *Provider) keepContainer(ctx context.Context, container dockerData) bool
|
|||
return false
|
||||
}
|
||||
|
||||
matches, err := constraints.Match(container.Labels, p.Constraints)
|
||||
matches, err := constraints.MatchLabels(container.Labels, p.Constraints)
|
||||
if err != nil {
|
||||
logger.Errorf("Error matching constraints expression: %v", err)
|
||||
return false
|
||||
|
|
|
@ -185,7 +185,7 @@ func (p *Provider) keepApplication(ctx context.Context, extraConf configuration,
|
|||
}
|
||||
|
||||
// Filter by constraints.
|
||||
matches, err := constraints.Match(labels, p.Constraints)
|
||||
matches, err := constraints.MatchLabels(labels, p.Constraints)
|
||||
if err != nil {
|
||||
logger.Errorf("Error matching constraints expression: %v", err)
|
||||
return false
|
||||
|
|
|
@ -121,7 +121,7 @@ func (p *Provider) keepService(ctx context.Context, service rancherData) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
matches, err := constraints.Match(service.Labels, p.Constraints)
|
||||
matches, err := constraints.MatchLabels(service.Labels, p.Constraints)
|
||||
if err != nil {
|
||||
logger.Errorf("Error matching constraints expression: %v", err)
|
||||
return false
|
||||
|
|
|
@ -193,7 +193,7 @@ func (p *Provider) parseMetadataSourcedRancherData(ctx context.Context, stacks [
|
|||
}
|
||||
|
||||
service := rancherData{
|
||||
Name: service.Name + "/" + stack.Name,
|
||||
Name: service.Name + "_" + stack.Name,
|
||||
State: service.State,
|
||||
Labels: service.Labels,
|
||||
Port: servicePort,
|
||||
|
|
|
@ -40,7 +40,7 @@ func NewManager(configs map[string]*runtime.ServiceInfo, defaultRoundTripper htt
|
|||
metricsRegistry: metricsRegistry,
|
||||
bufferPool: newBufferPool(),
|
||||
defaultRoundTripper: defaultRoundTripper,
|
||||
balancers: make(map[string][]healthcheck.BalancerHandler),
|
||||
balancers: make(map[string]healthcheck.Balancers),
|
||||
configs: configs,
|
||||
}
|
||||
}
|
||||
|
@ -51,8 +51,12 @@ type Manager struct {
|
|||
metricsRegistry metrics.Registry
|
||||
bufferPool httputil.BufferPool
|
||||
defaultRoundTripper http.RoundTripper
|
||||
balancers map[string][]healthcheck.BalancerHandler
|
||||
configs map[string]*runtime.ServiceInfo
|
||||
// balancers is the map of all Balancers, keyed by service name.
|
||||
// There is one Balancer per service handler, and there is one service handler per reference to a service
|
||||
// (e.g. if 2 routers refer to the same service name, 2 service handlers are created),
|
||||
// which is why there is not just one Balancer per service name.
|
||||
balancers map[string]healthcheck.Balancers
|
||||
configs map[string]*runtime.ServiceInfo
|
||||
}
|
||||
|
||||
// BuildHTTP Creates a http.Handler for a service configuration.
|
||||
|
@ -92,14 +96,14 @@ func (m *Manager) BuildHTTP(rootCtx context.Context, serviceName string, respons
|
|||
}
|
||||
case conf.Weighted != nil:
|
||||
var err error
|
||||
lb, err = m.getLoadBalancerWRRServiceHandler(ctx, serviceName, conf.Weighted, responseModifier)
|
||||
lb, err = m.getWRRServiceHandler(ctx, serviceName, conf.Weighted, responseModifier)
|
||||
if err != nil {
|
||||
conf.AddError(err, true)
|
||||
return nil, err
|
||||
}
|
||||
case conf.Mirroring != nil:
|
||||
var err error
|
||||
lb, err = m.getLoadBalancerMirrorServiceHandler(ctx, serviceName, conf.Mirroring, responseModifier)
|
||||
lb, err = m.getMirrorServiceHandler(ctx, conf.Mirroring, responseModifier)
|
||||
if err != nil {
|
||||
conf.AddError(err, true)
|
||||
return nil, err
|
||||
|
@ -113,7 +117,7 @@ func (m *Manager) BuildHTTP(rootCtx context.Context, serviceName string, respons
|
|||
return lb, nil
|
||||
}
|
||||
|
||||
func (m *Manager) getLoadBalancerMirrorServiceHandler(ctx context.Context, serviceName string, config *dynamic.Mirroring, responseModifier func(*http.Response) error) (http.Handler, error) {
|
||||
func (m *Manager) getMirrorServiceHandler(ctx context.Context, config *dynamic.Mirroring, responseModifier func(*http.Response) error) (http.Handler, error) {
|
||||
serviceHandler, err := m.BuildHTTP(ctx, config.Service, responseModifier)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -134,7 +138,7 @@ func (m *Manager) getLoadBalancerMirrorServiceHandler(ctx context.Context, servi
|
|||
return handler, nil
|
||||
}
|
||||
|
||||
func (m *Manager) getLoadBalancerWRRServiceHandler(ctx context.Context, serviceName string, config *dynamic.WeightedRoundRobin, responseModifier func(*http.Response) error) (http.Handler, error) {
|
||||
func (m *Manager) getWRRServiceHandler(ctx context.Context, serviceName string, config *dynamic.WeightedRoundRobin, responseModifier func(*http.Response) error) (http.Handler, error) {
|
||||
// TODO Handle accesslog and metrics with multiple service name
|
||||
if config.Sticky != nil && config.Sticky.Cookie != nil {
|
||||
config.Sticky.Cookie.Name = cookie.GetName(config.Sticky.Cookie.Name, serviceName)
|
||||
|
@ -200,15 +204,12 @@ func (m *Manager) LaunchHealthCheck() {
|
|||
for serviceName, balancers := range m.balancers {
|
||||
ctx := log.With(context.Background(), log.Str(log.ServiceName, serviceName))
|
||||
|
||||
// TODO aggregate
|
||||
balancer := balancers[0]
|
||||
|
||||
// TODO Should all the services handle healthcheck? Handle different types
|
||||
service := m.configs[serviceName].LoadBalancer
|
||||
|
||||
// Health Check
|
||||
var backendHealthCheck *healthcheck.BackendConfig
|
||||
if hcOpts := buildHealthCheckOptions(ctx, balancer, serviceName, service.HealthCheck); hcOpts != nil {
|
||||
if hcOpts := buildHealthCheckOptions(ctx, balancers, serviceName, service.HealthCheck); hcOpts != nil {
|
||||
log.FromContext(ctx).Debugf("Setting up healthcheck for service %s with %s", serviceName, *hcOpts)
|
||||
|
||||
hcOpts.Transport = m.defaultRoundTripper
|
||||
|
@ -224,7 +225,7 @@ func (m *Manager) LaunchHealthCheck() {
|
|||
healthcheck.GetHealthCheck().SetBackendsConfiguration(context.Background(), backendConfigs)
|
||||
}
|
||||
|
||||
func buildHealthCheckOptions(ctx context.Context, lb healthcheck.BalancerHandler, backend string, hc *dynamic.HealthCheck) *healthcheck.Options {
|
||||
func buildHealthCheckOptions(ctx context.Context, lb healthcheck.Balancer, backend string, hc *dynamic.HealthCheck) *healthcheck.Options {
|
||||
if hc == nil || hc.Path == "" {
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -1,25 +0,0 @@
|
|||
package tracing
|
||||
|
||||
import "net/http"
|
||||
|
||||
// HTTPHeadersCarrier custom implementation to fix duplicated headers
|
||||
// It has been fixed in https://github.com/opentracing/opentracing-go/pull/191
|
||||
type HTTPHeadersCarrier http.Header
|
||||
|
||||
// Set conforms to the TextMapWriter interface.
|
||||
func (c HTTPHeadersCarrier) Set(key, val string) {
|
||||
h := http.Header(c)
|
||||
h.Set(key, val)
|
||||
}
|
||||
|
||||
// ForeachKey conforms to the TextMapReader interface.
|
||||
func (c HTTPHeadersCarrier) ForeachKey(handler func(key, val string) error) error {
|
||||
for k, vals := range c {
|
||||
for _, v := range vals {
|
||||
if err := handler(k, v); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -134,7 +134,7 @@ func InjectRequestHeaders(r *http.Request) {
|
|||
err := opentracing.GlobalTracer().Inject(
|
||||
span.Context(),
|
||||
opentracing.HTTPHeaders,
|
||||
HTTPHeadersCarrier(r.Header))
|
||||
opentracing.HTTPHeadersCarrier(r.Header))
|
||||
if err != nil {
|
||||
log.FromContext(r.Context()).Error(err)
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue