Merge branch v3.3 into master
commit 30fe11eccf
100 changed files with 13204 additions and 916 deletions

@@ -1,16 +1,16 @@
package logs
import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/smithy-go/logging"
"github.com/rs/zerolog"
)
func NewAWSWrapper(logger zerolog.Logger) aws.LoggerFunc {
func NewAWSWrapper(logger zerolog.Logger) logging.LoggerFunc {
if logger.GetLevel() > zerolog.DebugLevel {
return func(args ...interface{}) {}
return func(classification logging.Classification, format string, args ...interface{}) {}
}
return func(args ...interface{}) {
return func(classification logging.Classification, format string, args ...interface{}) {
logger.Debug().CallerSkipFrame(2).MsgFunc(msgFunc(args...))
}
}
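The hunk above swaps the aws-sdk-go v1 `aws.LoggerFunc` for the smithy-go `logging.LoggerFunc` that aws-sdk-go-v2 consumes. Below is a minimal, self-contained sketch of how such an adapter is typically wired to zerolog; it mirrors the wrapper's shape but formats messages with a plain `Msgf` call, and the `newAWSWrapper` name and the log message are illustrative only, not the project's actual code.

```go
package main

import (
	"os"

	"github.com/aws/smithy-go/logging"
	"github.com/rs/zerolog"
)

// newAWSWrapper adapts a zerolog.Logger to the smithy-go logging.Logger
// interface (via logging.LoggerFunc) used by aws-sdk-go-v2 clients.
func newAWSWrapper(logger zerolog.Logger) logging.LoggerFunc {
	if logger.GetLevel() > zerolog.DebugLevel {
		// Discard SDK logs entirely when debug logging is off.
		return func(classification logging.Classification, format string, args ...interface{}) {}
	}

	return func(classification logging.Classification, format string, args ...interface{}) {
		logger.Debug().Msgf(format, args...)
	}
}

func main() {
	logger := zerolog.New(os.Stderr).Level(zerolog.DebugLevel)

	// LoggerFunc satisfies logging.Logger, so the SDK can call Logf on it.
	var awsLogger logging.Logger = newAWSWrapper(logger)
	awsLogger.Logf(logging.Debug, "resolved endpoint for %s", "ecs")
}
```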
@@ -6,6 +6,7 @@ import (
"testing"
"time"
"github.com/aws/smithy-go/logging"
"github.com/rs/zerolog"
"github.com/stretchr/testify/assert"
)
@@ -18,7 +19,7 @@ func TestNewAWSWrapper(t *testing.T) {
logger := NewAWSWrapper(zerolog.New(out).With().Caller().Logger())
logger.Log("foo")
logger.Logf(logging.Debug, "%s", "foo")
assert.Equal(t, "<nil> DBG aws_test.go:21 > foo\n", buf.String())
assert.Equal(t, "<nil> DBG aws_test.go:22 > foo\n", buf.String())
}
@@ -609,83 +609,106 @@ func TestMinResponseBodyBytes(t *testing.T) {
func Test1xxResponses(t *testing.T) {
fakeBody := generateBytes(100000)
next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
h := w.Header()
h.Add("Link", "</style.css>; rel=preload; as=style")
h.Add("Link", "</script.js>; rel=preload; as=script")
w.WriteHeader(http.StatusEarlyHints)
h.Add("Link", "</foo.js>; rel=preload; as=script")
w.WriteHeader(http.StatusProcessing)
if _, err := w.Write(fakeBody); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
})
cfg := dynamic.Compress{
MinResponseBodyBytes: 1024,
Encodings: defaultSupportedEncodings,
}
compress, err := New(context.Background(), next, cfg, "testing")
require.NoError(t, err)
server := httptest.NewServer(compress)
t.Cleanup(server.Close)
frontendClient := server.Client()
checkLinkHeaders := func(t *testing.T, expected, got []string) {
t.Helper()
if len(expected) != len(got) {
t.Errorf("Expected %d link headers; got %d", len(expected), len(got))
}
for i := range expected {
if i >= len(got) {
t.Errorf("Expected %q link header; got nothing", expected[i])
continue
}
if expected[i] != got[i] {
t.Errorf("Expected %q link header; got %q", expected[i], got[i])
}
}
}
var respCounter uint8
trace := &httptrace.ClientTrace{
Got1xxResponse: func(code int, header textproto.MIMEHeader) error {
switch code {
case http.StatusEarlyHints:
checkLinkHeaders(t, []string{"</style.css>; rel=preload; as=style", "</script.js>; rel=preload; as=script"}, header["Link"])
case http.StatusProcessing:
checkLinkHeaders(t, []string{"</style.css>; rel=preload; as=style", "</script.js>; rel=preload; as=script", "</foo.js>; rel=preload; as=script"}, header["Link"])
default:
t.Error("Unexpected 1xx response")
}
respCounter++
return nil
testCases := []struct {
desc string
encoding string
}{
{
desc: "gzip",
encoding: gzipName,
},
{
desc: "brotli",
encoding: brotliName,
},
{
desc: "zstd",
encoding: zstdName,
},
}
req, _ := http.NewRequestWithContext(httptrace.WithClientTrace(context.Background(), trace), http.MethodGet, server.URL, nil)
req.Header.Add(acceptEncodingHeader, gzipName)
for _, test := range testCases {
t.Run(test.desc, func(t *testing.T) {
t.Parallel()
res, err := frontendClient.Do(req)
assert.NoError(t, err)
next := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
h := w.Header()
h.Add("Link", "</style.css>; rel=preload; as=style")
h.Add("Link", "</script.js>; rel=preload; as=script")
w.WriteHeader(http.StatusEarlyHints)
defer res.Body.Close()
h.Add("Link", "</foo.js>; rel=preload; as=script")
w.WriteHeader(http.StatusProcessing)
if respCounter != 2 {
t.Errorf("Expected 2 1xx responses; got %d", respCounter)
if _, err := w.Write(fakeBody); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
})
cfg := dynamic.Compress{
MinResponseBodyBytes: 1024,
Encodings: defaultSupportedEncodings,
}
compress, err := New(context.Background(), next, cfg, "testing")
require.NoError(t, err)
server := httptest.NewServer(compress)
t.Cleanup(server.Close)
frontendClient := server.Client()
checkLinkHeaders := func(t *testing.T, expected, got []string) {
t.Helper()
if len(expected) != len(got) {
t.Errorf("Expected %d link headers; got %d", len(expected), len(got))
}
for i := range expected {
if i >= len(got) {
t.Errorf("Expected %q link header; got nothing", expected[i])
continue
}
if expected[i] != got[i] {
t.Errorf("Expected %q link header; got %q", expected[i], got[i])
}
}
}
var respCounter uint8
trace := &httptrace.ClientTrace{
Got1xxResponse: func(code int, header textproto.MIMEHeader) error {
switch code {
case http.StatusEarlyHints:
checkLinkHeaders(t, []string{"</style.css>; rel=preload; as=style", "</script.js>; rel=preload; as=script"}, header["Link"])
case http.StatusProcessing:
checkLinkHeaders(t, []string{"</style.css>; rel=preload; as=style", "</script.js>; rel=preload; as=script", "</foo.js>; rel=preload; as=script"}, header["Link"])
default:
t.Error("Unexpected 1xx response")
}
respCounter++
return nil
},
}
req, _ := http.NewRequestWithContext(httptrace.WithClientTrace(context.Background(), trace), http.MethodGet, server.URL, nil)
req.Header.Add(acceptEncodingHeader, test.encoding)
res, err := frontendClient.Do(req)
assert.NoError(t, err)
defer res.Body.Close()
if respCounter != 2 {
t.Errorf("Expected 2 1xx responses; got %d", respCounter)
}
checkLinkHeaders(t, []string{"</style.css>; rel=preload; as=style", "</script.js>; rel=preload; as=script", "</foo.js>; rel=preload; as=script"}, res.Header["Link"])
assert.Equal(t, test.encoding, res.Header.Get(contentEncodingHeader))
body, _ := io.ReadAll(res.Body)
assert.NotEqualValues(t, body, fakeBody)
})
}
checkLinkHeaders(t, []string{"</style.css>; rel=preload; as=style", "</script.js>; rel=preload; as=script", "</foo.js>; rel=preload; as=script"}, res.Header["Link"])
assert.Equal(t, gzipName, res.Header.Get(contentEncodingHeader))
body, _ := io.ReadAll(res.Body)
assert.NotEqualValues(t, body, fakeBody)
}
func BenchmarkCompressGzip(b *testing.B) {
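The test above drives 1xx handling through `net/http/httptrace` on the client side. A stripped-down sketch of the same mechanism outside Traefik is shown below, assuming Go 1.19+ so the standard server forwards `WriteHeader(http.StatusEarlyHints)` immediately; the handler body and printed output are illustrative.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
	"net/http/httptrace"
	"net/textproto"
)

func main() {
	// Backend that emits an Early Hints response before the final one,
	// similar to the handler used in the test above.
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Add("Link", "</style.css>; rel=preload; as=style")
		w.WriteHeader(http.StatusEarlyHints)
		fmt.Fprint(w, "hello") // implicit final 200
	}))
	defer srv.Close()

	trace := &httptrace.ClientTrace{
		// Got1xxResponse fires for each informational response received
		// before the final status.
		Got1xxResponse: func(code int, header textproto.MIMEHeader) error {
			fmt.Println("got 1xx:", code, header["Link"])
			return nil
		},
	}

	req, _ := http.NewRequestWithContext(httptrace.WithClientTrace(context.Background(), trace), http.MethodGet, srv.URL, nil)
	res, err := srv.Client().Do(req)
	if err != nil {
		panic(err)
	}
	defer res.Body.Close()
	fmt.Println("final status:", res.StatusCode)
}
```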
@@ -192,12 +192,17 @@ func (r *responseWriter) Header() http.Header {
}
func (r *responseWriter) WriteHeader(statusCode int) {
if r.statusCodeSet {
// Handle informational headers
// This is gated to not forward 1xx responses on builds prior to go1.20.
if statusCode >= 100 && statusCode <= 199 {
r.rw.WriteHeader(statusCode)
return
}
r.statusCode = statusCode
r.statusCodeSet = true
if !r.statusCodeSet {
r.statusCode = statusCode
r.statusCodeSet = true
}
}
func (r *responseWriter) Write(p []byte) (int, error) {
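The WriteHeader change above forwards informational (1xx) statuses immediately while only recording the final status. Here is a minimal standalone sketch of that wrapper pattern; the `statusRecorder` type and the demo in `main` are hypothetical and are not the middleware's actual implementation.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// statusRecorder wraps an http.ResponseWriter: 1xx statuses are forwarded
// right away, the final status is latched once and written lazily.
type statusRecorder struct {
	rw            http.ResponseWriter
	statusCode    int
	statusCodeSet bool
	headersSent   bool
}

func (r *statusRecorder) Header() http.Header { return r.rw.Header() }

func (r *statusRecorder) WriteHeader(statusCode int) {
	// 1xx responses can occur several times before the final status,
	// so they must not latch statusCodeSet.
	if statusCode >= 100 && statusCode <= 199 {
		r.rw.WriteHeader(statusCode)
		return
	}
	if !r.statusCodeSet {
		r.statusCode = statusCode
		r.statusCodeSet = true
	}
}

func (r *statusRecorder) Write(p []byte) (int, error) {
	if !r.statusCodeSet {
		r.statusCode = http.StatusOK
		r.statusCodeSet = true
	}
	if !r.headersSent {
		// Flush the recorded final status exactly once.
		r.rw.WriteHeader(r.statusCode)
		r.headersSent = true
	}
	return r.rw.Write(p)
}

func main() {
	h := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		_, _ = w.Write([]byte("ok")) // no explicit WriteHeader: lazy 200
	})

	rec := httptest.NewRecorder()
	h.ServeHTTP(&statusRecorder{rw: rec}, httptest.NewRequest(http.MethodGet, "/", nil))
	fmt.Println(rec.Code, rec.Body.String()) // 200 ok
}
```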
@@ -319,11 +324,16 @@ func (r *responseWriter) Flush() {
}
// Here, nothing was ever written either to rw or to bw (since we're still
// waiting to decide whether to compress), so we do not need to flush anything.
// Note that we diverge with klauspost's gzip behavior, where they instead
// force compression and flush whatever was in the buffer in this case.
// waiting to decide whether to compress), so to be aligned with klauspost's
// gzip behavior we force the compression and flush whatever was in the buffer in this case.
if !r.compressionStarted {
return
r.rw.Header().Del(contentLength)
r.rw.Header().Set(contentEncoding, r.compressionWriter.ContentEncoding())
r.rw.WriteHeader(r.statusCode)
r.headersSent = true
r.compressionStarted = true
}
// Conversely, we here know that something was already written to bw (or is
@@ -498,6 +498,73 @@ func Test_FlushAfterAllWrites(t *testing.T) {
}
}
func Test_FlushForceCompress(t *testing.T) {
testCases := []struct {
desc string
cfg Config
algo string
readerBuilder func(io.Reader) (io.Reader, error)
acceptEncoding string
}{
{
desc: "brotli",
cfg: Config{MinSize: 1024, MiddlewareName: "Test"},
algo: brotliName,
readerBuilder: func(reader io.Reader) (io.Reader, error) {
return brotli.NewReader(reader), nil
},
acceptEncoding: "br",
},
{
desc: "zstd",
cfg: Config{MinSize: 1024, MiddlewareName: "Test"},
algo: zstdName,
readerBuilder: func(reader io.Reader) (io.Reader, error) {
return zstd.NewReader(reader)
},
acceptEncoding: "zstd",
},
}
for _, test := range testCases {
t.Run(test.desc, func(t *testing.T) {
t.Parallel()
next := http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(http.StatusOK)
_, err := rw.Write(smallTestBody)
require.NoError(t, err)
rw.(http.Flusher).Flush()
})
srv := httptest.NewServer(mustNewCompressionHandler(t, test.cfg, test.algo, next))
defer srv.Close()
req, err := http.NewRequest(http.MethodGet, srv.URL, http.NoBody)
require.NoError(t, err)
req.Header.Set(acceptEncoding, test.acceptEncoding)
res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
defer res.Body.Close()
assert.Equal(t, http.StatusOK, res.StatusCode)
assert.Equal(t, test.acceptEncoding, res.Header.Get(contentEncoding))
reader, err := test.readerBuilder(res.Body)
require.NoError(t, err)
got, err := io.ReadAll(reader)
require.NoError(t, err)
assert.Equal(t, smallTestBody, got)
})
}
}
func Test_ExcludedContentTypes(t *testing.T) {
testCases := []struct {
desc string
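The `readerBuilder` functions above decode the compressed response bodies. A small sketch of choosing a decompressing reader from `Content-Encoding` is shown below, assuming the `andybalholm/brotli` and `klauspost/compress/zstd` packages (they match the `brotli.NewReader` and `zstd.NewReader` constructors used in the test, but the import paths are an assumption); the gzip round trip in `main` is only a stand-in for a real compressed response.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"net/http"
	"strings"

	"github.com/andybalholm/brotli"
	"github.com/klauspost/compress/zstd"
)

// decodedBody returns a reader that decompresses res.Body according to the
// Content-Encoding header, covering the three encodings exercised above.
func decodedBody(res *http.Response) (io.Reader, error) {
	switch strings.ToLower(res.Header.Get("Content-Encoding")) {
	case "br":
		return brotli.NewReader(res.Body), nil
	case "zstd":
		return zstd.NewReader(res.Body)
	case "gzip":
		return gzip.NewReader(res.Body)
	default:
		return res.Body, nil
	}
}

func main() {
	// Build a gzip-compressed payload to stand in for a server response.
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	_, _ = zw.Write([]byte("hello"))
	_ = zw.Close()

	res := &http.Response{
		Header: http.Header{"Content-Encoding": []string{"gzip"}},
		Body:   io.NopCloser(&buf),
	}

	r, err := decodedBody(res)
	if err != nil {
		panic(err)
	}
	plain, _ := io.ReadAll(r)
	fmt.Println(string(plain)) // hello
}
```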
@@ -36,6 +36,48 @@ type Listener interface {
// each of them about a retry attempt.
type Listeners []Listener
// Retried exists to implement the Listener interface. It calls Retried on each of its slice entries.
func (l Listeners) Retried(req *http.Request, attempt int) {
for _, listener := range l {
listener.Retried(req, attempt)
}
}
type shouldRetryContextKey struct{}
// ShouldRetry is a function allowing to enable/disable the retry middleware mechanism.
type ShouldRetry func(shouldRetry bool)
// ContextShouldRetry returns the ShouldRetry function if it has been set by the Retry middleware in the chain.
func ContextShouldRetry(ctx context.Context) ShouldRetry {
f, _ := ctx.Value(shouldRetryContextKey{}).(ShouldRetry)
return f
}
// WrapHandler wraps a given http.Handler to inject the httptrace.ClientTrace in the request context when it is needed
// by the retry middleware.
func WrapHandler(next http.Handler) http.Handler {
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
if shouldRetry := ContextShouldRetry(req.Context()); shouldRetry != nil {
shouldRetry(true)
trace := &httptrace.ClientTrace{
WroteHeaders: func() {
shouldRetry(false)
},
WroteRequest: func(httptrace.WroteRequestInfo) {
shouldRetry(false)
},
}
newCtx := httptrace.WithClientTrace(req.Context(), trace)
next.ServeHTTP(rw, req.WithContext(newCtx))
return
}
next.ServeHTTP(rw, req)
})
}
// retry is a middleware that retries requests.
type retry struct {
attempts int
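The hunk above introduces a `ShouldRetry` callback carried in the request context, replacing the earlier httptrace-only detection of "data already sent to the backend". The sketch below reimplements that pattern in isolation: a context-carried callback that the downstream handler flips once retrying is no longer safe, plus a simplified retry loop. Names, retry criteria, and the recorder-based replay are illustrative; this is not Traefik's actual retry middleware.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// shouldRetryKey carries the callback through the request context.
type shouldRetryKey struct{}

// ShouldRetry mirrors the callback type added in the hunk above.
type ShouldRetry func(shouldRetry bool)

func contextShouldRetry(ctx context.Context) ShouldRetry {
	f, _ := ctx.Value(shouldRetryKey{}).(ShouldRetry)
	return f
}

// retryHandler retries failed attempts as long as the downstream side has
// not disabled retries via the context callback.
func retryHandler(next http.Handler, attempts int) http.Handler {
	return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
		for attempt := 1; attempt <= attempts; attempt++ {
			retryAllowed := false
			cb := ShouldRetry(func(b bool) { retryAllowed = b })

			rec := httptest.NewRecorder()
			next.ServeHTTP(rec, req.Clone(context.WithValue(req.Context(), shouldRetryKey{}, cb)))

			// Retry only failed attempts that were still marked retryable.
			if retryAllowed && rec.Code >= http.StatusInternalServerError && attempt < attempts {
				continue
			}

			// Replay the final attempt's response to the real writer.
			for k, vals := range rec.Header() {
				for _, v := range vals {
					rw.Header().Add(k, v)
				}
			}
			rw.WriteHeader(rec.Code)
			_, _ = rw.Write(rec.Body.Bytes())
			return
		}
	})
}

func main() {
	calls := 0
	backend := http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
		calls++
		if cb := contextShouldRetry(req.Context()); cb != nil {
			cb(true) // a connection to the backend will be established
		}
		if calls < 3 {
			http.Error(rw, "bad gateway", http.StatusBadGateway)
			return
		}
		if cb := contextShouldRetry(req.Context()); cb != nil {
			cb(false) // request data has been sent; retrying is no longer safe
		}
		rw.WriteHeader(http.StatusOK)
	})

	rec := httptest.NewRecorder()
	retryHandler(backend, 3).ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/", nil))
	fmt.Println(rec.Code, calls) // 200 3
}
```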
@@ -101,19 +143,13 @@ func (r *retry) ServeHTTP(rw http.ResponseWriter, req *http.Request) {
req = req.WithContext(tracingCtx)
}
shouldRetry := attempts < r.attempts
retryResponseWriter := newResponseWriter(rw, shouldRetry)
remainAttempts := attempts < r.attempts
retryResponseWriter := newResponseWriter(rw)
// Disable retries when the backend already received request data
clientTrace := &httptrace.ClientTrace{
WroteHeaders: func() {
retryResponseWriter.DisableRetries()
},
WroteRequest: func(httptrace.WroteRequestInfo) {
retryResponseWriter.DisableRetries()
},
var shouldRetry ShouldRetry = func(shouldRetry bool) {
retryResponseWriter.SetShouldRetry(remainAttempts && shouldRetry)
}
newCtx := httptrace.WithClientTrace(req.Context(), clientTrace)
newCtx := context.WithValue(req.Context(), shouldRetryContextKey{}, shouldRetry)
r.next.ServeHTTP(retryResponseWriter, req.Clone(newCtx))
@@ -164,18 +200,10 @@ func (r *retry) newBackOff() backoff.BackOff {
return b
}
// Retried exists to implement the Listener interface. It calls Retried on each of its slice entries.
func (l Listeners) Retried(req *http.Request, attempt int) {
for _, listener := range l {
listener.Retried(req, attempt)
}
}
func newResponseWriter(rw http.ResponseWriter, shouldRetry bool) *responseWriter {
func newResponseWriter(rw http.ResponseWriter) *responseWriter {
return &responseWriter{
responseWriter: rw,
headers: make(http.Header),
shouldRetry: shouldRetry,
}
}
@@ -190,8 +218,8 @@ func (r *responseWriter) ShouldRetry() bool {
return r.shouldRetry
}
func (r *responseWriter) DisableRetries() {
r.shouldRetry = false
func (r *responseWriter) SetShouldRetry(shouldRetry bool) {
r.shouldRetry = shouldRetry
}
func (r *responseWriter) Header() http.Header {
@@ -205,20 +233,14 @@ func (r *responseWriter) Write(buf []byte) (int, error) {
if r.ShouldRetry() {
return len(buf), nil
}
if !r.written {
r.WriteHeader(http.StatusOK)
}
return r.responseWriter.Write(buf)
}
func (r *responseWriter) WriteHeader(code int) {
if r.ShouldRetry() && code == http.StatusServiceUnavailable {
// We get a 503 HTTP Status Code when there is no backend server in the pool
// to which the request could be sent. Also, note that r.ShouldRetry()
// will never return true in case there was a connection established to
// the backend server and so we can be sure that the 503 was produced
// inside Traefik already and we don't have to retry in this cases.
r.DisableRetries()
}
if r.ShouldRetry() || r.written {
if r.shouldRetry || r.written {
return
}
@@ -105,12 +105,21 @@ func TestRetry(t *testing.T) {
t.Parallel()
retryAttempts := 0
next := http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
next := http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
// This signals that a connection will be established with the backend
// to enable the Retry middleware mechanism.
shouldRetry := ContextShouldRetry(req.Context())
if shouldRetry != nil {
shouldRetry(true)
}
retryAttempts++
if retryAttempts > test.amountFaultyEndpoints {
// calls WroteHeaders on httptrace.
_ = r.Write(io.Discard)
// This signals that request headers have been sent to the backend.
if shouldRetry != nil {
shouldRetry(false)
}
rw.WriteHeader(http.StatusOK)
return
@@ -152,27 +161,16 @@ func TestRetryEmptyServerList(t *testing.T) {
assert.Equal(t, 0, retryListener.timesCalled)
}
func TestRetryListeners(t *testing.T) {
req := httptest.NewRequest(http.MethodGet, "/", nil)
retryListeners := Listeners{&countingRetryListener{}, &countingRetryListener{}}
retryListeners.Retried(req, 1)
retryListeners.Retried(req, 1)
for _, retryListener := range retryListeners {
listener := retryListener.(*countingRetryListener)
if listener.timesCalled != 2 {
t.Errorf("retry listener was called %d time(s), want %d time(s)", listener.timesCalled, 2)
}
}
}
func TestMultipleRetriesShouldNotLooseHeaders(t *testing.T) {
attempt := 0
expectedHeaderName := "X-Foo-Test-2"
expectedHeaderValue := "bar"
next := http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
shouldRetry := ContextShouldRetry(req.Context())
if shouldRetry != nil {
shouldRetry(true)
}
headerName := fmt.Sprintf("X-Foo-Test-%d", attempt)
rw.Header().Add(headerName, expectedHeaderValue)
if attempt < 2 {
@@ -181,43 +179,54 @@ func TestMultipleRetriesShouldNotLooseHeaders(t *testing.T) {
}
// Request has been successfully written to backend
trace := httptrace.ContextClientTrace(req.Context())
trace.WroteHeaders()
shouldRetry(false)
// And we decide to answer to client
// And we decide to answer to client.
rw.WriteHeader(http.StatusNoContent)
})
retry, err := New(context.Background(), next, dynamic.Retry{Attempts: 3}, &countingRetryListener{}, "traefikTest")
require.NoError(t, err)
responseRecorder := httptest.NewRecorder()
retry.ServeHTTP(responseRecorder, testhelpers.MustNewRequest(http.MethodGet, "http://test", http.NoBody))
res := httptest.NewRecorder()
retry.ServeHTTP(res, testhelpers.MustNewRequest(http.MethodGet, "http://test", http.NoBody))
headerValue := responseRecorder.Header().Get(expectedHeaderName)
// Validate if we have the correct header
if headerValue != expectedHeaderValue {
t.Errorf("Expected to have %s for header %s, got %s", expectedHeaderValue, expectedHeaderName, headerValue)
}
// The third header attempt is kept.
headerValue := res.Header().Get("X-Foo-Test-2")
assert.Equal(t, expectedHeaderValue, headerValue)
// Validate that we don't have headers from previous attempts
for i := range attempt {
headerName := fmt.Sprintf("X-Foo-Test-%d", i)
headerValue = responseRecorder.Header().Get("headerName")
headerValue = res.Header().Get(headerName)
if headerValue != "" {
t.Errorf("Expected no value for header %s, got %s", headerName, headerValue)
}
}
}
// countingRetryListener is a Listener implementation to count the times the Retried fn is called.
type countingRetryListener struct {
timesCalled int
}
func TestRetryShouldNotLooseHeadersOnWrite(t *testing.T) {
next := http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.Header().Add("X-Foo-Test", "bar")
func (l *countingRetryListener) Retried(req *http.Request, attempt int) {
l.timesCalled++
// Request has been successfully written to backend.
shouldRetry := ContextShouldRetry(req.Context())
if shouldRetry != nil {
shouldRetry(false)
}
// And we decide to answer to client without calling WriteHeader.
_, err := rw.Write([]byte("bar"))
require.NoError(t, err)
})
retry, err := New(context.Background(), next, dynamic.Retry{Attempts: 3}, &countingRetryListener{}, "traefikTest")
require.NoError(t, err)
res := httptest.NewRecorder()
retry.ServeHTTP(res, testhelpers.MustNewRequest(http.MethodGet, "http://test", http.NoBody))
headerValue := res.Header().Get("X-Foo-Test")
assert.Equal(t, "bar", headerValue)
}
func TestRetryWithFlush(t *testing.T) {
@@ -275,12 +284,24 @@ func TestRetryWebsocket(t *testing.T) {
t.Parallel()
retryAttempts := 0
next := http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) {
next := http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
// This signals that a connection will be established with the backend
// to enable the Retry middleware mechanism.
shouldRetry := ContextShouldRetry(req.Context())
if shouldRetry != nil {
shouldRetry(true)
}
retryAttempts++
if retryAttempts > test.amountFaultyEndpoints {
// This signals that request headers have been sent to the backend.
if shouldRetry != nil {
shouldRetry(false)
}
upgrader := websocket.Upgrader{}
_, err := upgrader.Upgrade(rw, r, nil)
_, err := upgrader.Upgrade(rw, req, nil)
if err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
}
@@ -387,3 +408,12 @@ func Test1xxResponses(t *testing.T) {
assert.Equal(t, 0, retryListener.timesCalled)
}
// countingRetryListener is a Listener implementation to count the times the Retried fn is called.
type countingRetryListener struct {
timesCalled int
}
func (l *countingRetryListener) Retried(req *http.Request, attempt int) {
l.timesCalled++
}
@@ -1,10 +1,13 @@
package ecs
import "github.com/aws/aws-sdk-go/service/ecs"
import (
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
ecstypes "github.com/aws/aws-sdk-go-v2/service/ecs/types"
)
func instance(ops ...func(*ecsInstance)) ecsInstance {
e := &ecsInstance{
containerDefinition: &ecs.ContainerDefinition{},
containerDefinition: &ecstypes.ContainerDefinition{},
}
for _, op := range ops {
@@ -36,7 +39,7 @@ func iMachine(opts ...func(*machine)) func(*ecsInstance) {
}
}
func mState(state string) func(*machine) {
func mState(state ec2types.InstanceStateName) func(*machine) {
return func(m *machine) {
m.state = state
}
@@ -48,7 +51,7 @@ func mPrivateIP(ip string) func(*machine) {
}
}
func mHealthStatus(status string) func(*machine) {
func mHealthStatus(status ecstypes.HealthStatus) func(*machine) {
return func(m *machine) {
m.healthStatus = status
}
@@ -64,10 +67,10 @@ func mPorts(opts ...func(*portMapping)) func(*machine) {
}
}
func mPort(containerPort, hostPort int32, protocol string) func(*portMapping) {
func mPort(containerPort, hostPort int32, protocol ecstypes.TransportProtocol) func(*portMapping) {
return func(pm *portMapping) {
pm.containerPort = int64(containerPort)
pm.hostPort = int64(hostPort)
pm.containerPort = containerPort
pm.hostPort = hostPort
pm.protocol = protocol
}
}
@@ -6,9 +6,9 @@ import (
"fmt"
"net"
"strconv"
"strings"
"github.com/aws/aws-sdk-go/service/ec2"
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
ecstypes "github.com/aws/aws-sdk-go-v2/service/ecs/types"
"github.com/docker/go-connections/nat"
"github.com/rs/zerolog/log"
"github.com/traefik/traefik/v3/pkg/config/dynamic"
@@ -161,12 +161,12 @@ func (p *Provider) filterInstance(ctx context.Context, instance ecsInstance) boo
return false
}
if strings.ToLower(instance.machine.state) != ec2.InstanceStateNameRunning {
if instance.machine.state != ec2types.InstanceStateNameRunning {
logger.Debug().Msgf("Filtering ecs instance with an incorrect state %s (%s) (state = %s)", instance.Name, instance.ID, instance.machine.state)
return false
}
if instance.machine.healthStatus == "UNHEALTHY" {
if instance.machine.healthStatus == ecstypes.HealthStatusUnhealthy {
logger.Debug().Msgf("Filtering unhealthy ecs instance %s (%s)", instance.Name, instance.ID)
return false
}
@@ -300,9 +300,9 @@ func (p *Provider) getIPPort(instance ecsInstance, serverPort string) (string, s
func getPort(instance ecsInstance, serverPort string) string {
if len(serverPort) > 0 {
for _, port := range instance.machine.ports {
containerPort := strconv.FormatInt(port.containerPort, 10)
containerPort := strconv.FormatInt(int64(port.containerPort), 10)
if serverPort == containerPort {
return strconv.FormatInt(port.hostPort, 10)
return strconv.FormatInt(int64(port.hostPort), 10)
}
}
@@ -311,7 +311,7 @@ func getPort(instance ecsInstance, serverPort string) string {
var ports []nat.Port
for _, port := range instance.machine.ports {
natPort, err := nat.NewPort(port.protocol, strconv.FormatInt(port.hostPort, 10))
natPort, err := nat.NewPort(string(port.protocol), strconv.FormatInt(int64(port.hostPort), 10))
if err != nil {
continue
}
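The filterInstance changes above replace raw string comparisons with the SDK v2 typed enums. A tiny sketch of the same comparisons is shown below; `HealthStatusHealthy` does not appear in the diff and is assumed from the standard `ecstypes` enum set.

```go
package main

import (
	"fmt"

	ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
	ecstypes "github.com/aws/aws-sdk-go-v2/service/ecs/types"
)

// keep reports whether an instance should be exposed, using the typed enums
// instead of lowercased strings and the "UNHEALTHY" literal.
func keep(state ec2types.InstanceStateName, health ecstypes.HealthStatus) bool {
	if state != ec2types.InstanceStateNameRunning {
		return false
	}
	return health != ecstypes.HealthStatusUnhealthy
}

func main() {
	fmt.Println(keep(ec2types.InstanceStateNameRunning, ecstypes.HealthStatusHealthy))   // true
	fmt.Println(keep(ec2types.InstanceStateNamePending, ecstypes.HealthStatusHealthy))   // false
	fmt.Println(keep(ec2types.InstanceStateNameRunning, ecstypes.HealthStatusUnhealthy)) // false
}
```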
@@ -5,7 +5,8 @@ import (
"testing"
"time"
"github.com/aws/aws-sdk-go/service/ec2"
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
ecstypes "github.com/aws/aws-sdk-go-v2/service/ecs/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
ptypes "github.com/traefik/paerser/types"
@ -31,10 +32,10 @@ func TestDefaultRule(t *testing.T) {
|
|||
id("1"),
|
||||
labels(map[string]string{}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("10.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 1337, "TCP"),
|
||||
mPort(0, 1337, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -90,10 +91,10 @@ func TestDefaultRule(t *testing.T) {
|
|||
name("Test"),
|
||||
labels(map[string]string{}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -151,10 +152,10 @@ func TestDefaultRule(t *testing.T) {
|
|||
"traefik.domain": "foo.bar",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -210,10 +211,10 @@ func TestDefaultRule(t *testing.T) {
|
|||
name("Test"),
|
||||
labels(map[string]string{}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -263,10 +264,10 @@ func TestDefaultRule(t *testing.T) {
|
|||
name("Test"),
|
||||
labels(map[string]string{}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -316,10 +317,10 @@ func TestDefaultRule(t *testing.T) {
|
|||
name("Test"),
|
||||
labels(map[string]string{}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -412,10 +413,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.services.test": "",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -451,10 +452,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.tcp.services.test": "",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -490,10 +491,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.udp.services.test": "",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -527,10 +528,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
name("Test"),
|
||||
labels(map[string]string{}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -585,10 +586,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
name("Test"),
|
||||
labels(map[string]string{}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -596,10 +597,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
name("Test2"),
|
||||
labels(map[string]string{}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.2"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -674,10 +675,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
name("Test"),
|
||||
labels(map[string]string{}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -686,10 +687,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
name("Test"),
|
||||
labels(map[string]string{}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.2"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -749,10 +750,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.services.Service1.loadbalancer.passhostheader": "true",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -811,10 +812,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.routers.Router1.service": "Service1",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -870,10 +871,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.routers.Router1.rule": "Host(`foo.com`)",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -930,10 +931,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.services.Service1.loadbalancer.passhostheader": "true",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -991,10 +992,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.services.Service2.loadbalancer.passhostheader": "true",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -1060,10 +1061,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.routers.Router1.service": "Service1",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -1120,10 +1121,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.services.Service1.loadbalancer.passhostheader": "true",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -1134,10 +1135,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.services.Service1.loadbalancer.passhostheader": "false",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -1180,10 +1181,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.services.Service1.loadbalancer.passhostheader": "false",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -1194,10 +1195,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.services.Service1.loadbalancer.passhostheader": "true",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -1208,10 +1209,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.services.Service1.loadbalancer.passhostheader": "true",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -1254,10 +1255,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.services.Service1.loadbalancer.passhostheader": "true",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -1268,10 +1269,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.services.Service1.loadbalancer.passhostheader": "true",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.2"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -1331,10 +1332,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.middlewares.Middleware1.inflightreq.amount": "42",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -1398,10 +1399,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.middlewares.Middleware1.inflightreq.amount": "42",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -1412,10 +1413,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.middlewares.Middleware1.inflightreq.amount": "42",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.2"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -1482,10 +1483,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.middlewares.Middleware1.inflightreq.amount": "42",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -1496,10 +1497,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.middlewares.Middleware1.inflightreq.amount": "41",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.2"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -1560,10 +1561,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.middlewares.Middleware1.inflightreq.amount": "42",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -1574,10 +1575,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.middlewares.Middleware1.inflightreq.amount": "41",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.2"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -1588,10 +1589,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.middlewares.Middleware1.inflightreq.amount": "40",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.3"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -1655,10 +1656,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.routers.Router1.rule": "Host(`foo.com`)",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -1669,10 +1670,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.routers.Router1.rule": "Host(`bar.com`)",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.2"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -1727,10 +1728,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.routers.Router1.rule": "Host(`foo.com`)",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -1741,10 +1742,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.routers.Router1.rule": "Host(`bar.com`)",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.2"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -1755,10 +1756,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.routers.Router1.rule": "Host(`foobar.com`)",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.3"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -1816,10 +1817,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.routers.Router1.rule": "Host(`foo.com`)",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -1831,10 +1832,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.routers.Router1.rule": "Host(`foo.com`)",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.2"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -1893,10 +1894,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.routers.Router1.rule": "Host(`foo.com`)",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -1906,10 +1907,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.routers.Router1.rule": "Host(`foo.com`)",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.2"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -1974,10 +1975,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.wrong.label": "42",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -2035,10 +2036,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.services.Service1.LoadBalancer.server.port": "80",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(80, 8080, "tcp"),
|
||||
mPort(80, 8080, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -2095,7 +2096,7 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.services.Service1.LoadBalancer.server.url": "http://1.2.3.4:5678",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(80, 8080, "tcp"),
|
||||
|
@ -2156,7 +2157,7 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.services.Service1.LoadBalancer.server.preservepath": "true",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(80, 8080, "tcp"),
|
||||
|
@ -2218,7 +2219,7 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.services.Service1.LoadBalancer.server.port": "1234",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(80, 8080, "tcp"),
|
||||
|
@ -2258,7 +2259,7 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.services.Service1.LoadBalancer.server.scheme": "https",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(80, 8080, "tcp"),
|
||||
|
@ -2298,10 +2299,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.services.Service1.LoadBalancer.server.port": "8040",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(80, 8080, "tcp"),
|
||||
mPort(80, 8080, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -2363,11 +2364,11 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.services.Service2.LoadBalancer.server.port": "4444",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(4444, 32123, "tcp"),
|
||||
mPort(4445, 32124, "tcp"),
|
||||
mPort(4444, 32123, ecstypes.TransportProtocolTcp),
|
||||
mPort(4445, 32124, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -2442,10 +2443,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.services.Service2.LoadBalancer.server.port": "8080",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -2508,7 +2509,7 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
name("Test"),
|
||||
labels(map[string]string{}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(),
|
||||
),
|
||||
|
@ -2545,7 +2546,7 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.middlewares.Middleware1.inflightreq.amount": "42",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(),
|
||||
),
|
||||
|
@ -2582,10 +2583,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.enable": "false",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -2621,11 +2622,11 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.enable": "false",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mHealthStatus("UNHEALTHY"),
|
||||
mHealthStatus(ecstypes.HealthStatusUnhealthy),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -2661,10 +2662,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.enable": "false",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNamePending),
|
||||
mState(ec2types.InstanceStateNamePending),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -2700,10 +2701,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.tags": "foo",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -2740,10 +2741,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.tags": "foo",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -2802,10 +2803,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.http.routers.Test.middlewares": "Middleware1",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -2874,10 +2875,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.tcp.routers.Test.middlewares": "Middleware1",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -2936,10 +2937,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.tcp.routers.foo.tls": "true",
|
||||
}),
|
||||
iMachine(
|
||||
mState(ec2.InstanceStateNameRunning),
|
||||
mState(ec2types.InstanceStateNameRunning),
|
||||
mPrivateIP("127.0.0.1"),
|
||||
mPorts(
|
||||
mPort(0, 80, "tcp"),
|
||||
mPort(0, 80, ecstypes.TransportProtocolTcp),
|
||||
),
|
||||
),
|
||||
),
|
||||
|
@ -2991,10 +2992,10 @@ func Test_buildConfiguration(t *testing.T) {
|
|||
"traefik.udp.routers.foo.entrypoints": "mydns",
|
||||
}),
|
||||
iMachine(
mState(ec2.InstanceStateNameRunning),
mState(ec2types.InstanceStateNameRunning),
mPrivateIP("127.0.0.1"),
mPorts(
mPort(0, 80, "udp"),
mPort(0, 80, ecstypes.TransportProtocolUdp),
),
),
),

@@ -3045,10 +3046,10 @@ func Test_buildConfiguration(t *testing.T) {
"traefik.tcp.routers.foo.tls": "true",
}),
iMachine(
mState(ec2.InstanceStateNameRunning),
mState(ec2types.InstanceStateNameRunning),
mPrivateIP("127.0.0.1"),
mPorts(
mPort(0, 80, "tcp"),
mPort(0, 80, ecstypes.TransportProtocolTcp),
),
),
),

@@ -3096,10 +3097,10 @@ func Test_buildConfiguration(t *testing.T) {
"traefik.tcp.services.foo.loadbalancer.server.port": "80",
}),
iMachine(
mState(ec2.InstanceStateNameRunning),
mState(ec2types.InstanceStateNameRunning),
mPrivateIP("127.0.0.1"),
mPorts(
mPort(80, 8080, "tcp"),
mPort(80, 8080, ecstypes.TransportProtocolTcp),
),
),
),

@@ -3154,10 +3155,10 @@ func Test_buildConfiguration(t *testing.T) {
"traefik.udp.services.foo.loadbalancer.server.port": "80",
}),
iMachine(
mState(ec2.InstanceStateNameRunning),
mState(ec2types.InstanceStateNameRunning),
mPrivateIP("127.0.0.1"),
mPorts(
mPort(80, 8080, "udp"),
mPort(80, 8080, ecstypes.TransportProtocolUdp),
),
),
),

@@ -3210,10 +3211,10 @@ func Test_buildConfiguration(t *testing.T) {
"traefik.http.services.Service1.loadbalancer.passhostheader": "true",
}),
iMachine(
mState(ec2.InstanceStateNameRunning),
mState(ec2types.InstanceStateNameRunning),
mPrivateIP("127.0.0.1"),
mPorts(
mPort(0, 80, "tcp"),
mPort(0, 80, ecstypes.TransportProtocolTcp),
),
),
),

@@ -3226,10 +3227,10 @@ func Test_buildConfiguration(t *testing.T) {
"traefik.http.services.Service1.loadbalancer.passhostheader": "true",
}),
iMachine(
mState(ec2.InstanceStateNameRunning),
mState(ec2types.InstanceStateNameRunning),
mPrivateIP("127.0.0.2"),
mPorts(
mPort(0, 80, "tcp"),
mPort(0, 80, ecstypes.TransportProtocolTcp),
),
),
),

@@ -3307,10 +3308,10 @@ func Test_buildConfiguration(t *testing.T) {
"traefik.udp.services.foo.loadbalancer.server.port": "8080",
}),
iMachine(
mState(ec2.InstanceStateNameRunning),
mState(ec2types.InstanceStateNameRunning),
mPrivateIP("127.0.0.1"),
mPorts(
mPort(0, 80, "tcp"),
mPort(0, 80, ecstypes.TransportProtocolTcp),
),
),
),

@@ -3357,10 +3358,10 @@ func Test_buildConfiguration(t *testing.T) {
"traefik.tcp.services.foo.loadbalancer.server.port": "80",
}),
iMachine(
mState(ec2.InstanceStateNameRunning),
mState(ec2types.InstanceStateNameRunning),
mPrivateIP("127.0.0.1"),
mPorts(
mPort(80, 8080, "tcp"),
mPort(80, 8080, ecstypes.TransportProtocolTcp),
),
),
),

@@ -3408,10 +3409,10 @@ func Test_buildConfiguration(t *testing.T) {
"traefik.tls.stores.default.defaultgeneratedcert.domain.sans": "foobar, fiibar",
}),
iMachine(
mState(ec2.InstanceStateNameRunning),
mState(ec2types.InstanceStateNameRunning),
mPrivateIP("127.0.0.1"),
mPorts(
mPort(0, 80, "tcp"),
mPort(0, 80, ecstypes.TransportProtocolTcp),
),
),
),
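Note: the fixtures above swap raw protocol strings for the SDK v2 typed constants. A minimal sketch of that mapping, assuming a hypothetical toTransportProtocol helper that is not part of this diff:

package main

import (
	"fmt"
	"strings"

	ecstypes "github.com/aws/aws-sdk-go-v2/service/ecs/types"
)

// toTransportProtocol is a hypothetical helper: it maps the old fixtures'
// raw protocol strings onto the typed aws-sdk-go-v2 constants.
func toTransportProtocol(s string) ecstypes.TransportProtocol {
	if strings.EqualFold(s, "udp") {
		return ecstypes.TransportProtocolUdp
	}
	return ecstypes.TransportProtocolTcp
}

func main() {
	fmt.Println(toTransportProtocol("udp")) // udp
	fmt.Println(toTransportProtocol("tcp")) // tcp
}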
@@ -3,21 +3,21 @@ package ecs
import (
"context"
"fmt"
"os"
"iter"
"slices"
"strings"
"text/template"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/credentials/stscreds"
"github.com/aws/aws-sdk-go/aws/defaults"
"github.com/aws/aws-sdk-go/aws/ec2metadata"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/ec2"
"github.com/aws/aws-sdk-go/service/ecs"
"github.com/aws/aws-sdk-go/service/ssm"
"github.com/aws/aws-sdk-go/service/sts"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/ec2"
ec2types "github.com/aws/aws-sdk-go-v2/service/ec2/types"
"github.com/aws/aws-sdk-go-v2/service/ecs"
ecstypes "github.com/aws/aws-sdk-go-v2/service/ecs/types"
"github.com/aws/aws-sdk-go-v2/service/ssm"
ssmtypes "github.com/aws/aws-sdk-go-v2/service/ssm/types"
"github.com/cenkalti/backoff/v4"
"github.com/patrickmn/go-cache"
"github.com/rs/zerolog"

@@ -50,29 +50,29 @@ type Provider struct {
type ecsInstance struct {
Name string
ID string
containerDefinition *ecs.ContainerDefinition
containerDefinition *ecstypes.ContainerDefinition
machine *machine
Labels map[string]string
ExtraConf configuration
}

type portMapping struct {
containerPort int64
hostPort int64
protocol string
containerPort int32
hostPort int32
protocol ecstypes.TransportProtocol
}

type machine struct {
state string
state ec2types.InstanceStateName
privateIP string
ports []portMapping
healthStatus string
healthStatus ecstypes.HealthStatus
}

type awsClient struct {
ecs *ecs.ECS
ec2 *ec2.EC2
ssm *ssm.SSM
ecs *ecs.Client
ec2 *ec2.Client
ssm *ssm.Client
}

// DefaultTemplateRule The default template for the default rule.
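Note: the struct changes above ride on the v2 value-conversion helpers: the v1 aws.StringValue/aws.Int64Value dereference helpers become aws.ToString/aws.ToInt32 (and friends), while aws.String/aws.Int32 still build pointers. A minimal standalone sketch, not part of the diff:

package main

import (
	"fmt"

	"github.com/aws/aws-sdk-go-v2/aws"
)

func main() {
	name := aws.String("traefik") // *string
	port := aws.Int32(80)         // *int32

	// aws.ToString / aws.ToInt32 dereference safely: a nil pointer yields the zero value.
	fmt.Println(aws.ToString(name)) // traefik
	fmt.Println(aws.ToInt32(port))  // 80

	var missing *string
	fmt.Printf("%q\n", aws.ToString(missing)) // ""
}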
@@ -104,54 +104,38 @@ func (p *Provider) Init() error {
return nil
}

func (p *Provider) createClient(logger zerolog.Logger) (*awsClient, error) {
sess, err := session.NewSessionWithOptions(session.Options{
SharedConfigState: session.SharedConfigEnable,
})
func (p *Provider) createClient(ctx context.Context, logger zerolog.Logger) (*awsClient, error) {
optFns := []func(*config.LoadOptions) error{
config.WithLogger(logs.NewAWSWrapper(logger)),
}
if p.Region != "" {
optFns = append(optFns, config.WithRegion(p.Region))
} else {
logger.Info().Msg("No region provided, will retrieve region from the EC2 Metadata service")
optFns = append(optFns, config.WithEC2IMDSRegion())
}

if p.AccessKeyID != "" && p.SecretAccessKey != "" {
// From https://docs.aws.amazon.com/sdk-for-go/v2/developer-guide/configure-gosdk.html#specify-credentials-programmatically:
// "If you explicitly provide credentials, as in this example, the SDK uses only those credentials."
// this makes sure that user-defined credentials always have the highest priority
staticCreds := aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(p.AccessKeyID, p.SecretAccessKey, ""))
optFns = append(optFns, config.WithCredentialsProvider(staticCreds))

// If the access key and secret access key are not provided, config.LoadDefaultConfig
// will look for the credentials in the default credential chain.
// See https://docs.aws.amazon.com/sdk-for-go/v2/developer-guide/configure-gosdk.html#specifying-credentials.
}

cfg, err := config.LoadDefaultConfig(ctx, optFns...)
if err != nil {
return nil, err
}

ec2meta := ec2metadata.New(sess)
if p.Region == "" && ec2meta.Available() {
logger.Info().Msg("No region provided, querying instance metadata endpoint...")
identity, err := ec2meta.GetInstanceIdentityDocument()
if err != nil {
return nil, err
}
p.Region = identity.Region
}

cfg := aws.NewConfig().
WithCredentials(credentials.NewChainCredentials([]credentials.Provider{
&credentials.StaticProvider{
Value: credentials.Value{
AccessKeyID: p.AccessKeyID,
SecretAccessKey: p.SecretAccessKey,
},
},
&credentials.EnvProvider{},
&credentials.SharedCredentialsProvider{},
defaults.RemoteCredProvider(*(defaults.Config()), defaults.Handlers()),
stscreds.NewWebIdentityRoleProviderWithOptions(
sts.New(sess),
os.Getenv("AWS_ROLE_ARN"),
"",
stscreds.FetchTokenPath(os.Getenv("AWS_WEB_IDENTITY_TOKEN_FILE")),
),
}))

// Set the region if it is defined by the user or resolved from the EC2 metadata.
if p.Region != "" {
cfg.Region = &p.Region
}

cfg.WithLogger(logs.NewAWSWrapper(logger))

return &awsClient{
ecs.New(sess, cfg),
ec2.New(sess, cfg),
ssm.New(sess, cfg),
ecs.NewFromConfig(cfg),
ec2.NewFromConfig(cfg),
ssm.NewFromConfig(cfg),
}, nil
}
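Note: a minimal sketch of the SDK v2 client-construction pattern used above (shared-config loading, optional region, optional static credentials), written as a standalone function rather than the provider method itself; names like newECSClient are illustrative:

package main

import (
	"context"
	"log"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/credentials"
	"github.com/aws/aws-sdk-go-v2/service/ecs"
)

func newECSClient(ctx context.Context, region, accessKey, secretKey string) (*ecs.Client, error) {
	// Functional options replace the v1 session.Options struct.
	var optFns []func(*config.LoadOptions) error

	if region != "" {
		optFns = append(optFns, config.WithRegion(region))
	} else {
		// Fall back to the EC2 instance metadata service for the region.
		optFns = append(optFns, config.WithEC2IMDSRegion())
	}

	if accessKey != "" && secretKey != "" {
		// Explicit credentials take priority over the default credential chain.
		optFns = append(optFns, config.WithCredentialsProvider(
			aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(accessKey, secretKey, "")),
		))
	}

	cfg, err := config.LoadDefaultConfig(ctx, optFns...)
	if err != nil {
		return nil, err
	}

	// NewFromConfig replaces the v1 ecs.New(sess, cfg) constructor.
	return ecs.NewFromConfig(cfg), nil
}

func main() {
	client, err := newECSClient(context.Background(), "eu-west-1", "", "")
	if err != nil {
		log.Fatal(err)
	}
	_ = client
}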
@@ -162,7 +146,7 @@ func (p *Provider) Provide(configurationChan chan<- dynamic.Message, pool *safe.
ctxLog := logger.WithContext(routineCtx)

operation := func() error {
awsClient, err := p.createClient(logger)
awsClient, err := p.createClient(ctxLog, logger)
if err != nil {
return fmt.Errorf("unable to create AWS client: %w", err)
}

@@ -220,28 +204,19 @@ func (p *Provider) loadConfiguration(ctx context.Context, client *awsClient, con
func (p *Provider) listInstances(ctx context.Context, client *awsClient) ([]ecsInstance, error) {
logger := log.Ctx(ctx)

var clustersArn []*string
var clusters []string

if p.AutoDiscoverClusters {
input := &ecs.ListClustersInput{}
for {
result, err := client.ecs.ListClusters(input)

paginator := ecs.NewListClustersPaginator(client.ecs, input)
for paginator.HasMorePages() {
page, err := paginator.NextPage(ctx)
if err != nil {
return nil, err
}
if result != nil {
clustersArn = append(clustersArn, result.ClusterArns...)
input.NextToken = result.NextToken
if result.NextToken == nil {
break
}
} else {
break
}
}
for _, cArn := range clustersArn {
clusters = append(clusters, *cArn)

clusters = append(clusters, page.ClusterArns...)
}
} else {
clusters = p.Clusters
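Note: the manual NextToken loop above collapses into the v2 paginator helper; the same HasMorePages/NextPage shape is reused below for ListTasks, DescribeInstances and DescribeInstanceInformation. A minimal sketch, assuming an already-built ecs.Client:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/aws/aws-sdk-go-v2/config"
	"github.com/aws/aws-sdk-go-v2/service/ecs"
)

// listClusterArns drains all ListClusters pages; the paginator tracks the
// NextToken internally, so no manual token handling is needed.
func listClusterArns(ctx context.Context, client *ecs.Client) ([]string, error) {
	var arns []string

	paginator := ecs.NewListClustersPaginator(client, &ecs.ListClustersInput{})
	for paginator.HasMorePages() {
		page, err := paginator.NextPage(ctx)
		if err != nil {
			return nil, fmt.Errorf("listing clusters: %w", err)
		}
		arns = append(arns, page.ClusterArns...)
	}

	return arns, nil
}

func main() {
	ctx := context.Background()

	cfg, err := config.LoadDefaultConfig(ctx)
	if err != nil {
		log.Fatal(err)
	}

	arns, err := listClusterArns(ctx, ecs.NewFromConfig(cfg))
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(arns)
}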
@@ -253,13 +228,19 @@ func (p *Provider) listInstances(ctx context.Context, client *awsClient) ([]ecsI
for _, c := range clusters {
input := &ecs.ListTasksInput{
Cluster: &c,
DesiredStatus: aws.String(ecs.DesiredStatusRunning),
DesiredStatus: ecstypes.DesiredStatusRunning,
}

tasks := make(map[string]*ecs.Task)
err := client.ecs.ListTasksPagesWithContext(ctx, input, func(page *ecs.ListTasksOutput, lastPage bool) bool {
tasks := make(map[string]ecstypes.Task)

paginator := ecs.NewListTasksPaginator(client.ecs, input)
for paginator.HasMorePages() {
page, err := paginator.NextPage(ctx)
if err != nil {
return nil, fmt.Errorf("listing tasks: %w", err)
}
if len(page.TaskArns) > 0 {
resp, err := client.ecs.DescribeTasksWithContext(ctx, &ecs.DescribeTasksInput{
resp, err := client.ecs.DescribeTasks(ctx, &ecs.DescribeTasksInput{
Tasks: page.TaskArns,
Cluster: &c,
})

@@ -267,19 +248,15 @@ func (p *Provider) listInstances(ctx context.Context, client *awsClient) ([]ecsI
logger.Error().Msgf("Unable to describe tasks for %v", page.TaskArns)
} else {
for _, t := range resp.Tasks {
if p.HealthyTasksOnly && aws.StringValue(t.HealthStatus) != ecs.HealthStatusHealthy {
logger.Debug().Msgf("Skipping unhealthy task %s", aws.StringValue(t.TaskArn))
if p.HealthyTasksOnly && t.HealthStatus != ecstypes.HealthStatusHealthy {
logger.Debug().Msgf("Skipping unhealthy task %s", aws.ToString(t.TaskArn))
continue
}

tasks[aws.StringValue(t.TaskArn)] = t
tasks[aws.ToString(t.TaskArn)] = t
}
}
}
return !lastPage
})
if err != nil {
return nil, fmt.Errorf("listing tasks: %w", err)
}

// Skip to the next cluster if there are no tasks found on

@@ -293,7 +270,7 @@ func (p *Provider) listInstances(ctx context.Context, client *awsClient) ([]ecsI
return nil, err
}

miInstances := make(map[string]*ssm.InstanceInformation)
miInstances := make(map[string]ssmtypes.InstanceInformation)
if p.ECSAnywhere {
// Try looking up for instances on ECS Anywhere
miInstances, err = p.lookupMiInstances(ctx, client, &c, tasks)

@@ -308,74 +285,67 @@ func (p *Provider) listInstances(ctx context.Context, client *awsClient) ([]ecsI
}

for key, task := range tasks {
containerInstance := ec2Instances[aws.StringValue(task.ContainerInstanceArn)]
containerInstance, hasContainerInstance := ec2Instances[aws.ToString(task.ContainerInstanceArn)]
taskDef := taskDefinitions[key]

for _, container := range task.Containers {
var containerDefinition *ecs.ContainerDefinition
var containerDefinition *ecstypes.ContainerDefinition
for _, def := range taskDef.ContainerDefinitions {
if aws.StringValue(container.Name) == aws.StringValue(def.Name) {
containerDefinition = def
if aws.ToString(container.Name) == aws.ToString(def.Name) {
containerDefinition = &def
break
}
}

if containerDefinition == nil {
logger.Debug().Msgf("Unable to find container definition for %s", aws.StringValue(container.Name))
logger.Debug().Msgf("Unable to find container definition for %s", aws.ToString(container.Name))
continue
}

var mach *machine
if aws.StringValue(taskDef.NetworkMode) == "awsvpc" && len(task.Attachments) != 0 {
if taskDef.NetworkMode == ecstypes.NetworkModeAwsvpc && len(task.Attachments) != 0 {
if len(container.NetworkInterfaces) == 0 {
logger.Error().Msgf("Skip container %s: no network interfaces", aws.StringValue(container.Name))
logger.Error().Msgf("Skip container %s: no network interfaces", aws.ToString(container.Name))
continue
}

var ports []portMapping
for _, mapping := range containerDefinition.PortMappings {
if mapping != nil {
protocol := "TCP"
if aws.StringValue(mapping.Protocol) == "udp" {
protocol = "UDP"
}

ports = append(ports, portMapping{
hostPort: aws.Int64Value(mapping.HostPort),
containerPort: aws.Int64Value(mapping.ContainerPort),
protocol: protocol,
})
}
ports = append(ports, portMapping{
hostPort: aws.ToInt32(mapping.HostPort),
containerPort: aws.ToInt32(mapping.ContainerPort),
protocol: mapping.Protocol,
})
}
mach = &machine{
privateIP: aws.StringValue(container.NetworkInterfaces[0].PrivateIpv4Address),
privateIP: aws.ToString(container.NetworkInterfaces[0].PrivateIpv4Address),
ports: ports,
state: aws.StringValue(task.LastStatus),
healthStatus: aws.StringValue(task.HealthStatus),
state: ec2types.InstanceStateName(strings.ToLower(aws.ToString(task.LastStatus))),
healthStatus: task.HealthStatus,
}
} else {
miContainerInstance := miInstances[aws.StringValue(task.ContainerInstanceArn)]
if containerInstance == nil && miContainerInstance == nil {
logger.Error().Msgf("Unable to find container instance information for %s", aws.StringValue(container.Name))
miContainerInstance, hasMiContainerInstance := miInstances[aws.ToString(task.ContainerInstanceArn)]
if !hasContainerInstance && !hasMiContainerInstance {
logger.Error().Msgf("Unable to find container instance information for %s", aws.ToString(container.Name))
continue
}

var ports []portMapping
for _, mapping := range container.NetworkBindings {
if mapping != nil {
ports = append(ports, portMapping{
hostPort: aws.Int64Value(mapping.HostPort),
containerPort: aws.Int64Value(mapping.ContainerPort),
})
}
ports = append(ports, portMapping{
hostPort: aws.ToInt32(mapping.HostPort),
containerPort: aws.ToInt32(mapping.ContainerPort),
protocol: mapping.Protocol,
})
}
var privateIPAddress, stateName string
if containerInstance != nil {
privateIPAddress = aws.StringValue(containerInstance.PrivateIpAddress)
stateName = aws.StringValue(containerInstance.State.Name)
} else if miContainerInstance != nil {
privateIPAddress = aws.StringValue(miContainerInstance.IPAddress)
stateName = aws.StringValue(task.LastStatus)
var privateIPAddress string
var stateName ec2types.InstanceStateName
if hasContainerInstance {
privateIPAddress = aws.ToString(containerInstance.PrivateIpAddress)
stateName = containerInstance.State.Name
} else if hasMiContainerInstance {
privateIPAddress = aws.ToString(miContainerInstance.IPAddress)
stateName = ec2types.InstanceStateName(strings.ToLower(aws.ToString(task.LastStatus)))
}

mach = &machine{

@@ -386,11 +356,11 @@ func (p *Provider) listInstances(ctx context.Context, client *awsClient) ([]ecsI
}

instance := ecsInstance{
Name: fmt.Sprintf("%s-%s", strings.Replace(aws.StringValue(task.Group), ":", "-", 1), *container.Name),
Name: fmt.Sprintf("%s-%s", strings.Replace(aws.ToString(task.Group), ":", "-", 1), aws.ToString(container.Name)),
ID: key[len(key)-12:],
containerDefinition: containerDefinition,
machine: mach,
Labels: aws.StringValueMap(containerDefinition.DockerLabels),
Labels: containerDefinition.DockerLabels,
}

extraConf, err := p.getConfiguration(instance)

@@ -408,21 +378,21 @@ func (p *Provider) listInstances(ctx context.Context, client *awsClient) ([]ecsI
return instances, nil
}

func (p *Provider) lookupMiInstances(ctx context.Context, client *awsClient, clusterName *string, ecsDatas map[string]*ecs.Task) (map[string]*ssm.InstanceInformation, error) {
func (p *Provider) lookupMiInstances(ctx context.Context, client *awsClient, clusterName *string, ecsDatas map[string]ecstypes.Task) (map[string]ssmtypes.InstanceInformation, error) {
instanceIDs := make(map[string]string)
miInstances := make(map[string]*ssm.InstanceInformation)
miInstances := make(map[string]ssmtypes.InstanceInformation)

var containerInstancesArns []*string
var instanceArns []*string
var containerInstancesArns []string
var instanceArns []string

for _, task := range ecsDatas {
if task.ContainerInstanceArn != nil {
containerInstancesArns = append(containerInstancesArns, task.ContainerInstanceArn)
containerInstancesArns = append(containerInstancesArns, *task.ContainerInstanceArn)
}
}

for _, arns := range p.chunkIDs(containerInstancesArns) {
resp, err := client.ecs.DescribeContainerInstancesWithContext(ctx, &ecs.DescribeContainerInstancesInput{
for arns := range chunkIDs(containerInstancesArns) {
resp, err := client.ecs.DescribeContainerInstances(ctx, &ecs.DescribeContainerInstancesInput{
ContainerInstances: arns,
Cluster: clusterName,
})

@@ -431,23 +401,21 @@ func (p *Provider) lookupMiInstances(ctx context.Context, client *awsClient, clu
}

for _, container := range resp.ContainerInstances {
instanceIDs[aws.StringValue(container.Ec2InstanceId)] = aws.StringValue(container.ContainerInstanceArn)
instanceIDs[aws.ToString(container.Ec2InstanceId)] = aws.ToString(container.ContainerInstanceArn)

// Disallow EC2 Instance IDs
// This prevents considering EC2 instances in ECS
// and getting InvalidInstanceID.Malformed error when calling the describe-instances endpoint.
if !strings.HasPrefix(aws.StringValue(container.Ec2InstanceId), "mi-") {
continue
if strings.HasPrefix(aws.ToString(container.Ec2InstanceId), "mi-") {
instanceArns = append(instanceArns, *container.Ec2InstanceId)
}

instanceArns = append(instanceArns, container.Ec2InstanceId)
}
}

if len(instanceArns) > 0 {
for _, ids := range p.chunkIDs(instanceArns) {
for ids := range chunkIDs(instanceArns) {
input := &ssm.DescribeInstanceInformationInput{
Filters: []*ssm.InstanceInformationStringFilter{
Filters: []ssmtypes.InstanceInformationStringFilter{
{
Key: aws.String("InstanceIds"),
Values: ids,

@@ -455,18 +423,18 @@ func (p *Provider) lookupMiInstances(ctx context.Context, client *awsClient, clu
},
}

err := client.ssm.DescribeInstanceInformationPagesWithContext(ctx, input, func(page *ssm.DescribeInstanceInformationOutput, lastPage bool) bool {
if len(page.InstanceInformationList) > 0 {
for _, i := range page.InstanceInformationList {
if i.InstanceId != nil {
miInstances[instanceIDs[aws.StringValue(i.InstanceId)]] = i
}
paginator := ssm.NewDescribeInstanceInformationPaginator(client.ssm, input)
for paginator.HasMorePages() {
page, err := paginator.NextPage(ctx)
if err != nil {
return nil, fmt.Errorf("describing instances: %w", err)
}

for _, i := range page.InstanceInformationList {
if i.InstanceId != nil {
miInstances[instanceIDs[aws.ToString(i.InstanceId)]] = i
}
}
return !lastPage
})
if err != nil {
return nil, fmt.Errorf("describing instances: %w", err)
}
}
}

@@ -474,21 +442,21 @@ func (p *Provider) lookupMiInstances(ctx context.Context, client *awsClient, clu
return miInstances, nil
}

func (p *Provider) lookupEc2Instances(ctx context.Context, client *awsClient, clusterName *string, ecsDatas map[string]*ecs.Task) (map[string]*ec2.Instance, error) {
func (p *Provider) lookupEc2Instances(ctx context.Context, client *awsClient, clusterName *string, ecsDatas map[string]ecstypes.Task) (map[string]ec2types.Instance, error) {
instanceIDs := make(map[string]string)
ec2Instances := make(map[string]*ec2.Instance)
ec2Instances := make(map[string]ec2types.Instance)

var containerInstancesArns []*string
var instanceArns []*string
var containerInstancesArns []string
var instanceArns []string

for _, task := range ecsDatas {
if task.ContainerInstanceArn != nil {
containerInstancesArns = append(containerInstancesArns, task.ContainerInstanceArn)
containerInstancesArns = append(containerInstancesArns, *task.ContainerInstanceArn)
}
}

for _, arns := range p.chunkIDs(containerInstancesArns) {
resp, err := client.ecs.DescribeContainerInstancesWithContext(ctx, &ecs.DescribeContainerInstancesInput{
for arns := range chunkIDs(containerInstancesArns) {
resp, err := client.ecs.DescribeContainerInstances(ctx, &ecs.DescribeContainerInstancesInput{
ContainerInstances: arns,
Cluster: clusterName,
})

@@ -497,38 +465,38 @@ func (p *Provider) lookupEc2Instances(ctx context.Context, client *awsClient, cl
}

for _, container := range resp.ContainerInstances {
instanceIDs[aws.StringValue(container.Ec2InstanceId)] = aws.StringValue(container.ContainerInstanceArn)
instanceIDs[aws.ToString(container.Ec2InstanceId)] = aws.ToString(container.ContainerInstanceArn)
// Disallow Instance IDs of the form mi-*
// This prevents considering external instances in ECS Anywhere setups
// and getting InvalidInstanceID.Malformed error when calling the describe-instances endpoint.
if strings.HasPrefix(aws.StringValue(container.Ec2InstanceId), "mi-") {
if strings.HasPrefix(aws.ToString(container.Ec2InstanceId), "mi-") {
continue
}

instanceArns = append(instanceArns, container.Ec2InstanceId)
if container.Ec2InstanceId != nil {
instanceArns = append(instanceArns, *container.Ec2InstanceId)
}
}
}

if len(instanceArns) > 0 {
for _, ids := range p.chunkIDs(instanceArns) {
for ids := range chunkIDs(instanceArns) {
input := &ec2.DescribeInstancesInput{
InstanceIds: ids,
}

err := client.ec2.DescribeInstancesPagesWithContext(ctx, input, func(page *ec2.DescribeInstancesOutput, lastPage bool) bool {
if len(page.Reservations) > 0 {
for _, r := range page.Reservations {
for _, i := range r.Instances {
if i.InstanceId != nil {
ec2Instances[instanceIDs[aws.StringValue(i.InstanceId)]] = i
}
paginator := ec2.NewDescribeInstancesPaginator(client.ec2, input)
for paginator.HasMorePages() {
page, err := paginator.NextPage(ctx)
if err != nil {
return nil, fmt.Errorf("describing instances: %w", err)
}
for _, r := range page.Reservations {
for _, i := range r.Instances {
if i.InstanceId != nil {
ec2Instances[instanceIDs[aws.ToString(i.InstanceId)]] = i
}
}
}
return !lastPage
})
if err != nil {
return nil, fmt.Errorf("describing instances: %w", err)
}
}
}

@@ -536,16 +504,16 @@ func (p *Provider) lookupEc2Instances(ctx context.Context, client *awsClient, cl
return ec2Instances, nil
}

func (p *Provider) lookupTaskDefinitions(ctx context.Context, client *awsClient, taskDefArns map[string]*ecs.Task) (map[string]*ecs.TaskDefinition, error) {
func (p *Provider) lookupTaskDefinitions(ctx context.Context, client *awsClient, taskDefArns map[string]ecstypes.Task) (map[string]*ecstypes.TaskDefinition, error) {
logger := log.Ctx(ctx)
taskDef := make(map[string]*ecs.TaskDefinition)
taskDef := make(map[string]*ecstypes.TaskDefinition)

for arn, task := range taskDefArns {
if definition, ok := existingTaskDefCache.Get(arn); ok {
taskDef[arn] = definition.(*ecs.TaskDefinition)
taskDef[arn] = definition.(*ecstypes.TaskDefinition)
logger.Debug().Msgf("Found cached task definition for %s. Skipping the call", arn)
} else {
resp, err := client.ecs.DescribeTaskDefinitionWithContext(ctx, &ecs.DescribeTaskDefinitionInput{
resp, err := client.ecs.DescribeTaskDefinition(ctx, &ecs.DescribeTaskDefinitionInput{
TaskDefinition: task.TaskDefinitionArn,
})
if err != nil {
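Note: existingTaskDefCache above appears to be a patrickmn/go-cache instance (it is imported at the top of the file), so only the asserted type changes here. A minimal sketch of that get-or-fetch pattern with a hypothetical fetch callback standing in for DescribeTaskDefinition:

package main

import (
	"fmt"
	"time"

	ecstypes "github.com/aws/aws-sdk-go-v2/service/ecs/types"
	"github.com/patrickmn/go-cache"
)

var taskDefCache = cache.New(30*time.Minute, 5*time.Minute)

// cachedTaskDefinition returns the cached definition for arn, or fetches and
// stores it; fetch is a stand-in for the real DescribeTaskDefinition call.
func cachedTaskDefinition(arn string, fetch func(string) (*ecstypes.TaskDefinition, error)) (*ecstypes.TaskDefinition, error) {
	if def, ok := taskDefCache.Get(arn); ok {
		return def.(*ecstypes.TaskDefinition), nil
	}

	def, err := fetch(arn)
	if err != nil {
		return nil, err
	}

	taskDefCache.Set(arn, def, cache.DefaultExpiration)
	return def, nil
}

func main() {
	def, _ := cachedTaskDefinition("arn-example", func(string) (*ecstypes.TaskDefinition, error) {
		return &ecstypes.TaskDefinition{}, nil
	})
	fmt.Println(def != nil)
}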
@@ -561,16 +529,6 @@ func (p *Provider) lookupTaskDefinitions(ctx context.Context, client *awsClient,

// chunkIDs ECS expects no more than 100 parameters be passed to a API call;
// thus, pack each string into an array capped at 100 elements.
func (p *Provider) chunkIDs(ids []*string) [][]*string {
var chunked [][]*string
for i := 0; i < len(ids); i += 100 {
var sliceEnd int
if i+100 < len(ids) {
sliceEnd = i + 100
} else {
sliceEnd = len(ids)
}
chunked = append(chunked, ids[i:sliceEnd])
}
return chunked
func chunkIDs(ids []string) iter.Seq[[]string] {
return slices.Chunk(ids, 100)
}
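Note: the hand-rolled chunking above collapses into the Go 1.23 slices.Chunk iterator, which yields sub-slices of at most n elements and is consumed with range-over-func. A minimal standalone sketch:

package main

import (
	"fmt"
	"slices"
)

func main() {
	ids := make([]string, 0, 250)
	for i := range 250 {
		ids = append(ids, fmt.Sprintf("id-%d", i))
	}

	// slices.Chunk returns an iter.Seq[[]string]; each yielded chunk holds
	// at most 100 elements, so the printed sizes are 100, 100 and 50.
	for chunk := range slices.Chunk(ids, 100) {
		fmt.Println(len(chunk))
	}
}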
@@ -3,13 +3,10 @@ package ecs
import (
"testing"

"github.com/aws/aws-sdk-go/aws"
"github.com/stretchr/testify/assert"
)

func TestChunkIDs(t *testing.T) {
provider := &Provider{}

testCases := []struct {
desc string
count int

@@ -71,13 +68,13 @@ func TestChunkIDs(t *testing.T) {
t.Run(test.desc, func(t *testing.T) {
t.Parallel()

var IDs []*string
var IDs []string
for range test.count {
IDs = append(IDs, aws.String("a"))
IDs = append(IDs, "a")
}

var outCount []int
for _, el := range provider.chunkIDs(IDs) {
for el := range chunkIDs(IDs) {
outCount = append(outCount, len(el))
}

@@ -190,6 +190,9 @@ func (p *Provider) loadWRRService(ctx context.Context, listener gatewayListener,
svcName, errCondition := p.loadService(ctx, listener, conf, route, backendRef)
weight := ptr.To(int(ptr.Deref(backendRef.Weight, 1)))
if errCondition != nil {
log.Ctx(ctx).Error().
Msgf("Unable to load HTTPRoute backend: %s", errCondition.Message)

condition = errCondition
wrr.Services = append(wrr.Services, dynamic.WRRService{
Name: svcName,
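Note: the added weight line defaults an optional *int32 to 1 and re-wraps the result as *int using the k8s.io/utils/ptr generics. A small sketch of that defaulting step, detached from the provider; weightOrDefault is an illustrative name:

package main

import (
	"fmt"

	"k8s.io/utils/ptr"
)

// weightOrDefault mirrors ptr.To(int(ptr.Deref(w, 1))): Deref returns the
// pointed-to value or the fallback, To re-creates a pointer to the result.
func weightOrDefault(w *int32) *int {
	return ptr.To(int(ptr.Deref(w, 1)))
}

func main() {
	fmt.Println(*weightOrDefault(nil))              // 1
	fmt.Println(*weightOrDefault(ptr.To[int32](3))) // 3
}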
@@ -272,92 +275,14 @@ func (p *Provider) loadService(ctx context.Context, listener gatewayListener, co
portStr := strconv.FormatInt(int64(port), 10)
serviceName = provider.Normalize(serviceName + "-" + portStr)

lb, svcPort, errCondition := p.loadHTTPServers(namespace, route, backendRef)
lb, st, errCondition := p.loadHTTPServers(ctx, namespace, route, backendRef, listener)
if errCondition != nil {
return serviceName, errCondition
}

if !p.ExperimentalChannel {
conf.HTTP.Services[serviceName] = &dynamic.Service{LoadBalancer: lb}

return serviceName, nil
}

servicePolicies, err := p.client.ListBackendTLSPoliciesForService(namespace, string(backendRef.Name))
if err != nil {
return serviceName, &metav1.Condition{
Type: string(gatev1.RouteConditionResolvedRefs),
Status: metav1.ConditionFalse,
ObservedGeneration: route.Generation,
LastTransitionTime: metav1.Now(),
Reason: string(gatev1.RouteReasonRefNotPermitted),
Message: fmt.Sprintf("Cannot list BackendTLSPolicies for Service %s/%s: %s", namespace, string(backendRef.Name), err),
}
}

var matchedPolicy *gatev1alpha3.BackendTLSPolicy
for _, policy := range servicePolicies {
matched := false
for _, targetRef := range policy.Spec.TargetRefs {
if targetRef.SectionName == nil || svcPort.Name == string(*targetRef.SectionName) {
matchedPolicy = policy
matched = true
break
}
}

// If the policy targets the service, but doesn't match any port.
if !matched {
// update policy status
status := gatev1alpha2.PolicyStatus{
Ancestors: []gatev1alpha2.PolicyAncestorStatus{{
AncestorRef: gatev1alpha2.ParentReference{
Group: ptr.To(gatev1.Group(groupGateway)),
Kind: ptr.To(gatev1.Kind(kindGateway)),
Namespace: ptr.To(gatev1.Namespace(namespace)),
Name: gatev1.ObjectName(listener.GWName),
SectionName: ptr.To(gatev1.SectionName(listener.Name)),
},
ControllerName: controllerName,
Conditions: []metav1.Condition{{
Type: string(gatev1.RouteConditionResolvedRefs),
Status: metav1.ConditionFalse,
ObservedGeneration: route.Generation,
LastTransitionTime: metav1.Now(),
Reason: string(gatev1.RouteReasonBackendNotFound),
Message: fmt.Sprintf("BackendTLSPolicy has no valid TargetRef for Service %s/%s", namespace, string(backendRef.Name)),
}},
}},
}

if err := p.client.UpdateBackendTLSPolicyStatus(ctx, ktypes.NamespacedName{Namespace: policy.Namespace, Name: policy.Name}, status); err != nil {
logger := log.Ctx(ctx).With().
Str("http_route", route.Name).
Str("namespace", route.Namespace).Logger()
logger.Warn().
Err(err).
Msg("Unable to update TLSRoute status")
}
}
}

if matchedPolicy != nil {
st, err := p.loadServersTransport(namespace, *matchedPolicy)
if err != nil {
return serviceName, &metav1.Condition{
Type: string(gatev1.RouteConditionResolvedRefs),
Status: metav1.ConditionFalse,
ObservedGeneration: route.Generation,
LastTransitionTime: metav1.Now(),
Reason: string(gatev1.RouteReasonRefNotPermitted),
Message: fmt.Sprintf("Cannot apply BackendTLSPolicy for Service %s/%s: %s", namespace, string(backendRef.Name), err),
}
}

if st != nil {
lb.ServersTransport = serviceName
conf.HTTP.ServersTransports[serviceName] = st
}
if st != nil {
lb.ServersTransport = serviceName
conf.HTTP.ServersTransports[serviceName] = st
}

conf.HTTP.Services[serviceName] = &dynamic.Service{LoadBalancer: lb}

@@ -473,10 +398,10 @@ func (p *Provider) loadHTTPRouteFilterExtensionRef(namespace string, extensionRe
return filterFunc(string(extensionRef.Name), namespace)
}

func (p *Provider) loadHTTPServers(namespace string, route *gatev1.HTTPRoute, backendRef gatev1.HTTPBackendRef) (*dynamic.ServersLoadBalancer, corev1.ServicePort, *metav1.Condition) {
func (p *Provider) loadHTTPServers(ctx context.Context, namespace string, route *gatev1.HTTPRoute, backendRef gatev1.HTTPBackendRef, listener gatewayListener) (*dynamic.ServersLoadBalancer, *dynamic.ServersTransport, *metav1.Condition) {
backendAddresses, svcPort, err := p.getBackendAddresses(namespace, backendRef.BackendRef)
if err != nil {
return nil, corev1.ServicePort{}, &metav1.Condition{
return nil, nil, &metav1.Condition{
Type: string(gatev1.RouteConditionResolvedRefs),
Status: metav1.ConditionFalse,
ObservedGeneration: route.Generation,

@@ -486,27 +411,104 @@ func (p *Provider) loadHTTPServers(namespace string, route *gatev1.HTTPRoute, ba
}
}

protocol, err := getHTTPServiceProtocol(svcPort)
if err != nil {
return nil, corev1.ServicePort{}, &metav1.Condition{
Type: string(gatev1.RouteConditionResolvedRefs),
Status: metav1.ConditionFalse,
ObservedGeneration: route.Generation,
LastTransitionTime: metav1.Now(),
Reason: string(gatev1.RouteReasonUnsupportedProtocol),
Message: fmt.Sprintf("Cannot load HTTPBackendRef %s/%s: %s", namespace, backendRef.Name, err),
var st *dynamic.ServersTransport
var protocol string
if p.ExperimentalChannel {
servicePolicies, err := p.client.ListBackendTLSPoliciesForService(namespace, string(backendRef.Name))
if err != nil {
return nil, nil, &metav1.Condition{
Type: string(gatev1.RouteConditionResolvedRefs),
Status: metav1.ConditionFalse,
ObservedGeneration: route.Generation,
LastTransitionTime: metav1.Now(),
Reason: string(gatev1.RouteReasonRefNotPermitted),
Message: fmt.Sprintf("Cannot list BackendTLSPolicies for Service %s/%s: %s", namespace, string(backendRef.Name), err),
}
}

var matchedPolicy *gatev1alpha3.BackendTLSPolicy
for _, policy := range servicePolicies {
matched := false
for _, targetRef := range policy.Spec.TargetRefs {
if targetRef.SectionName == nil || svcPort.Name == string(*targetRef.SectionName) {
matchedPolicy = policy
matched = true
break
}
}

// If the policy targets the service, but doesn't match any port.
if !matched {
// update policy status
status := gatev1alpha2.PolicyStatus{
Ancestors: []gatev1alpha2.PolicyAncestorStatus{{
AncestorRef: gatev1alpha2.ParentReference{
Group: ptr.To(gatev1.Group(groupGateway)),
Kind: ptr.To(gatev1.Kind(kindGateway)),
Namespace: ptr.To(gatev1.Namespace(namespace)),
Name: gatev1.ObjectName(listener.GWName),
SectionName: ptr.To(gatev1.SectionName(listener.Name)),
},
ControllerName: controllerName,
Conditions: []metav1.Condition{{
Type: string(gatev1.RouteConditionResolvedRefs),
Status: metav1.ConditionFalse,
ObservedGeneration: route.Generation,
LastTransitionTime: metav1.Now(),
Reason: string(gatev1.RouteReasonBackendNotFound),
Message: fmt.Sprintf("BackendTLSPolicy has no valid TargetRef for Service %s/%s", namespace, string(backendRef.Name)),
}},
}},
}

if err := p.client.UpdateBackendTLSPolicyStatus(ctx, ktypes.NamespacedName{Namespace: policy.Namespace, Name: policy.Name}, status); err != nil {
log.Ctx(ctx).Warn().Err(err).
Msg("Unable to update BackendTLSPolicy status")
}
}
}

if matchedPolicy != nil {
st, err = p.loadServersTransport(namespace, *matchedPolicy)
if err != nil {
return nil, nil, &metav1.Condition{
Type: string(gatev1.RouteConditionResolvedRefs),
Status: metav1.ConditionFalse,
ObservedGeneration: route.Generation,
LastTransitionTime: metav1.Now(),
Reason: string(gatev1.RouteReasonRefNotPermitted),
Message: fmt.Sprintf("Cannot apply BackendTLSPolicy for Service %s/%s: %s", namespace, string(backendRef.Name), err),
}
}
// A backend TLS policy has been found for the service, a serversTransport configuration has been created, use/force HTTPS.
protocol = "https"
}
}

lb := &dynamic.ServersLoadBalancer{}
lb.SetDefaults()

// Guess the protocol from the service port if not set by the backend TLS policy
if protocol == "" {
protocol, err = getHTTPServiceProtocol(svcPort)
if err != nil {
return nil, nil, &metav1.Condition{
Type: string(gatev1.RouteConditionResolvedRefs),
Status: metav1.ConditionFalse,
ObservedGeneration: route.Generation,
LastTransitionTime: metav1.Now(),
Reason: string(gatev1.RouteReasonUnsupportedProtocol),
Message: fmt.Sprintf("Cannot load HTTPBackendRef %s/%s: %s", namespace, backendRef.Name, err),
}
}
}

for _, ba := range backendAddresses {
lb.Servers = append(lb.Servers, dynamic.Server{
URL: fmt.Sprintf("%s://%s", protocol, net.JoinHostPort(ba.IP, strconv.Itoa(int(ba.Port)))),
})
}
return lb, svcPort, nil
return lb, st, nil
}

func (p *Provider) loadServersTransport(namespace string, policy gatev1alpha3.BackendTLSPolicy) (*dynamic.ServersTransport, error) {
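Note: with the refactor above, the scheme is forced to https as soon as a BackendTLSPolicy matched (and a ServersTransport was produced), and only otherwise guessed from the service port; the server URL itself is just scheme plus host:port. A tiny sketch of that final assembly, detached from the Gateway API types; serverURL is an illustrative name:

package main

import (
	"fmt"
	"net"
	"strconv"
)

// serverURL assembles a load-balancer server URL the same way as above;
// net.JoinHostPort also brackets IPv6 addresses correctly.
func serverURL(protocol, ip string, port int32) string {
	return fmt.Sprintf("%s://%s", protocol, net.JoinHostPort(ip, strconv.Itoa(int(port))))
}

func main() {
	// Without a matching BackendTLSPolicy the port-derived protocol is used...
	fmt.Println(serverURL("http", "10.10.0.1", 80)) // http://10.10.0.1:80
	// ...with one, https is forced regardless of the port.
	fmt.Println(serverURL("https", "10.10.0.1", 80))    // https://10.10.0.1:80
	fmt.Println(serverURL("https", "2001:db8::1", 443)) // https://[2001:db8::1]:443
}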
@@ -2309,10 +2309,10 @@ func TestLoadHTTPRoutes(t *testing.T) {
Strategy: dynamic.BalancerStrategyWRR,
Servers: []dynamic.Server{
{
URL: "http://10.10.0.1:80",
URL: "https://10.10.0.1:80",
},
{
URL: "http://10.10.0.2:80",
URL: "https://10.10.0.2:80",
},
},
PassHostHeader: ptr.To(true),

@@ -2381,10 +2381,10 @@ func TestLoadHTTPRoutes(t *testing.T) {
Strategy: dynamic.BalancerStrategyWRR,
Servers: []dynamic.Server{
{
URL: "http://10.10.0.1:80",
URL: "https://10.10.0.1:80",
},
{
URL: "http://10.10.0.2:80",
URL: "https://10.10.0.2:80",
},
},
PassHostHeader: ptr.To(true),

@@ -22,6 +22,7 @@ import (
"github.com/traefik/traefik/v3/pkg/middlewares/capture"
metricsMiddle "github.com/traefik/traefik/v3/pkg/middlewares/metrics"
"github.com/traefik/traefik/v3/pkg/middlewares/observability"
"github.com/traefik/traefik/v3/pkg/middlewares/retry"
"github.com/traefik/traefik/v3/pkg/proxy/httputil"
"github.com/traefik/traefik/v3/pkg/safe"
"github.com/traefik/traefik/v3/pkg/server/cookie"

@@ -368,6 +369,10 @@ func (m *Manager) getLoadBalancerServiceHandler(ctx context.Context, serviceName
if err != nil {
return nil, fmt.Errorf("error building proxy for server URL %s: %w", server.URL, err)
}
// The retry wrapping must be done just before the proxy handler,
// to make sure that the retry will not be triggered/disabled by
// middlewares in the chain.
proxy = retry.WrapHandler(proxy)

// Prevents from enabling observability for internal resources.
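Note: the hunk above wraps the reverse proxy with the retry handler before any other middleware is applied, so retries sit innermost and cannot be short-circuited by outer middlewares. A generic illustration of that wrapping order with plain net/http handlers (not Traefik's actual middleware chain or retry implementation):

package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// wrap labels a handler so the call order becomes visible.
func wrap(name string, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Println("enter", name)
		next.ServeHTTP(w, r)
	})
}

func main() {
	proxy := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fmt.Println("proxy")
	})

	// The innermost wrapper is applied first, outer middlewares afterwards:
	// request flow is observability -> retry -> proxy.
	var h http.Handler = wrap("retry", proxy)
	h = wrap("observability", h)

	h.ServeHTTP(httptest.NewRecorder(), httptest.NewRequest(http.MethodGet, "/", nil))
}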
@@ -151,7 +151,7 @@ func (t *Tracer) Start(ctx context.Context, spanName string, opts ...trace.SpanS

spanCtx, span := t.Tracer.Start(ctx, spanName, opts...)

wrappedSpan := &Span{Span: span, tracerProvider: &TracerProvider{tracer: t}}
wrappedSpan := &Span{Span: span, tracerProvider: &TracerProvider{TracerProvider: span.TracerProvider(), tracer: t}}

return trace.ContextWithSpan(spanCtx, wrappedSpan), wrappedSpan
}
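Note: the one-line change above seeds the wrapping TracerProvider with the underlying span's real provider, so span.TracerProvider().Tracer(...) keeps working for components that create their own tracers (the new TestTracerProvider below exercises exactly that). A reduced sketch of the wrapping pattern using only the OpenTelemetry interfaces; type and field names here are illustrative, not Traefik's:

package main

import (
	"context"
	"fmt"

	"go.opentelemetry.io/otel/trace"
	"go.opentelemetry.io/otel/trace/noop"
)

// wrappedSpan embeds the real span and overrides TracerProvider so callers
// get a usable provider instead of a zero-value wrapper.
type wrappedSpan struct {
	trace.Span
	provider trace.TracerProvider
}

func (s wrappedSpan) TracerProvider() trace.TracerProvider { return s.provider }

func main() {
	tp := noop.NewTracerProvider()
	_, span := tp.Tracer("example").Start(context.Background(), "test")

	ws := wrappedSpan{Span: span, provider: span.TracerProvider()}
	// The wrapped span can still hand out tracers from the original provider.
	fmt.Println(ws.TracerProvider().Tracer("other") != nil)
}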
@@ -2,6 +2,7 @@ package tracing

import (
"compress/gzip"
"context"
"encoding/json"
"io"
"net/http"

@@ -392,3 +393,25 @@ func TestTracing(t *testing.T) {
})
}
}

// TestTracerProvider ensures that Tracer returns a valid TracerProvider
// when using the default Traefik Tracer and a custom one.
func TestTracerProvider(t *testing.T) {
t.Parallel()

otlpConfig := &types.OTelTracing{}
otlpConfig.SetDefaults()

config := &static.Tracing{OTLP: otlpConfig}
tracer, closer, err := NewTracing(config)
if err != nil {
t.Fatal(err)
}
closer.Close()

_, span := tracer.Start(context.Background(), "test")
defer span.End()

span.TracerProvider().Tracer("github.com/traefik/traefik")
span.TracerProvider().Tracer("other")
}