1
0
Fork 0

Merge branch v2.11 into v3.1

This commit is contained in:
kevinpollet 2024-09-16 16:24:08 +02:00
commit 093989fc14
No known key found for this signature in database
GPG key ID: 0C9A5DDD1B292453
66 changed files with 904 additions and 136 deletions

View file

@ -2,6 +2,7 @@ package accesslog
import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
@ -24,6 +25,7 @@ import (
"github.com/stretchr/testify/require"
ptypes "github.com/traefik/paerser/types"
"github.com/traefik/traefik/v3/pkg/middlewares/capture"
"github.com/traefik/traefik/v3/pkg/middlewares/recovery"
"github.com/traefik/traefik/v3/pkg/types"
)
@ -164,7 +166,7 @@ func TestLoggerHeaderFields(t *testing.T) {
},
},
{
desc: "with case insensitive match on header name",
desc: "with case-insensitive match on header name",
header: "User-Agent",
expected: types.AccessLogKeep,
accessLogFields: types.AccessLogFields{
@ -465,6 +467,32 @@ func TestLoggerJSON(t *testing.T) {
RequestRefererHeader: assertString(testReferer),
},
},
{
desc: "fields and headers with unconventional letter case",
config: &types.AccessLog{
FilePath: "",
Format: JSONFormat,
Fields: &types.AccessLogFields{
DefaultMode: "drop",
Names: map[string]string{
"rEqUeStHoSt": "keep",
},
Headers: &types.FieldHeaders{
DefaultMode: "drop",
Names: map[string]string{
"ReFeReR": "keep",
},
},
},
},
expected: map[string]func(t *testing.T, value interface{}){
RequestHost: assertString(testHostname),
"level": assertString("info"),
"msg": assertString(""),
"time": assertNotEmpty(),
RequestRefererHeader: assertString(testReferer),
},
},
}
for _, test := range testCases {
@ -496,6 +524,64 @@ func TestLoggerJSON(t *testing.T) {
}
}
func TestLogger_AbortedRequest(t *testing.T) {
expected := map[string]func(t *testing.T, value interface{}){
RequestContentSize: assertFloat64(0),
RequestHost: assertString(testHostname),
RequestAddr: assertString(testHostname),
RequestMethod: assertString(testMethod),
RequestPath: assertString(""),
RequestProtocol: assertString(testProto),
RequestScheme: assertString(testScheme),
RequestPort: assertString("-"),
DownstreamStatus: assertFloat64(float64(200)),
DownstreamContentSize: assertFloat64(float64(40)),
RequestRefererHeader: assertString(testReferer),
RequestUserAgentHeader: assertString(testUserAgent),
ServiceURL: assertString("http://stream"),
ServiceAddr: assertString("127.0.0.1"),
ServiceName: assertString("stream"),
ClientUsername: assertString(testUsername),
ClientHost: assertString(testHostname),
ClientPort: assertString(strconv.Itoa(testPort)),
ClientAddr: assertString(fmt.Sprintf("%s:%d", testHostname, testPort)),
"level": assertString("info"),
"msg": assertString(""),
RequestCount: assertFloat64NotZero(),
Duration: assertFloat64NotZero(),
Overhead: assertFloat64NotZero(),
RetryAttempts: assertFloat64(float64(0)),
"time": assertNotEmpty(),
StartLocal: assertNotEmpty(),
StartUTC: assertNotEmpty(),
"downstream_Content-Type": assertString("text/plain"),
"downstream_Transfer-Encoding": assertString("chunked"),
"downstream_Cache-Control": assertString("no-cache"),
}
config := &types.AccessLog{
FilePath: filepath.Join(t.TempDir(), logFileNameSuffix),
Format: JSONFormat,
}
doLoggingWithAbortedStream(t, config)
logData, err := os.ReadFile(config.FilePath)
require.NoError(t, err)
jsonData := make(map[string]interface{})
err = json.Unmarshal(logData, &jsonData)
require.NoError(t, err)
assert.Equal(t, len(expected), len(jsonData))
for field, assertion := range expected {
assertion(t, jsonData[field])
if t.Failed() {
return
}
}
}
func TestNewLogHandlerOutputStdout(t *testing.T) {
testCases := []struct {
desc string
@ -832,3 +918,89 @@ func logWriterTestHandlerFunc(rw http.ResponseWriter, r *http.Request) {
rw.WriteHeader(testStatus)
}
func doLoggingWithAbortedStream(t *testing.T, config *types.AccessLog) {
t.Helper()
logger, err := NewHandler(config)
require.NoError(t, err)
t.Cleanup(func() {
err := logger.Close()
require.NoError(t, err)
})
if config.FilePath != "" {
_, err = os.Stat(config.FilePath)
require.NoError(t, err, "logger should create "+config.FilePath)
}
reqContext, cancelRequest := context.WithCancel(context.Background())
req := &http.Request{
Header: map[string][]string{
"User-Agent": {testUserAgent},
"Referer": {testReferer},
},
Proto: testProto,
Host: testHostname,
Method: testMethod,
RemoteAddr: fmt.Sprintf("%s:%d", testHostname, testPort),
URL: &url.URL{
User: url.UserPassword(testUsername, ""),
},
Body: nil,
}
req = req.WithContext(reqContext)
chain := alice.New()
chain = chain.Append(func(next http.Handler) (http.Handler, error) {
return recovery.New(context.Background(), next)
})
chain = chain.Append(capture.Wrap)
chain = chain.Append(WrapHandler(logger))
service := NewFieldHandler(http.HandlerFunc(streamBackend), ServiceURL, "http://stream", nil)
service = NewFieldHandler(service, ServiceAddr, "127.0.0.1", nil)
service = NewFieldHandler(service, ServiceName, "stream", AddServiceFields)
handler, err := chain.Then(service)
require.NoError(t, err)
go func() {
time.Sleep(499 * time.Millisecond)
cancelRequest()
}()
handler.ServeHTTP(httptest.NewRecorder(), req)
}
func streamBackend(rw http.ResponseWriter, r *http.Request) {
// Get the Flusher to flush the response to the client
flusher, ok := rw.(http.Flusher)
if !ok {
http.Error(rw, "Streaming unsupported!", http.StatusInternalServerError)
return
}
// Set the headers for streaming
rw.Header().Set("Content-Type", "text/plain")
rw.Header().Set("Transfer-Encoding", "chunked")
rw.Header().Set("Cache-Control", "no-cache")
for {
time.Sleep(100 * time.Millisecond)
select {
case <-r.Context().Done():
panic(http.ErrAbortHandler)
default:
if _, err := fmt.Fprint(rw, "FOOBAR!!!!"); err != nil {
http.Error(rw, err.Error(), http.StatusInternalServerError)
return
}
flusher.Flush()
}
}
}