Add traffic size metrics
Co-authored-by: OmarElawady <omarelawady1998@gmail.com> Co-authored-by: Mathieu Lonjaret <mathieu.lonjaret@gmail.com> Co-authored-by: Romain <rtribotte@users.noreply.github.com>
This commit is contained in:
parent
10528c973a
commit
d578ed7327
26 changed files with 1125 additions and 339 deletions
|
@ -1,20 +0,0 @@
|
|||
package accesslog
|
||||
|
||||
import "io"
|
||||
|
||||
type captureRequestReader struct {
|
||||
// source ReadCloser from where the request body is read.
|
||||
source io.ReadCloser
|
||||
// count Counts the number of bytes read (when captureRequestReader.Read is called).
|
||||
count int64
|
||||
}
|
||||
|
||||
func (r *captureRequestReader) Read(p []byte) (int, error) {
|
||||
n, err := r.source.Read(p)
|
||||
r.count += int64(n)
|
||||
return n, err
|
||||
}
|
||||
|
||||
func (r *captureRequestReader) Close() error {
|
||||
return r.source.Close()
|
||||
}
|
|
@ -1,83 +0,0 @@
|
|||
package accesslog
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
|
||||
"github.com/traefik/traefik/v2/pkg/middlewares"
|
||||
)
|
||||
|
||||
var _ middlewares.Stateful = &captureResponseWriterWithCloseNotify{}
|
||||
|
||||
type capturer interface {
|
||||
http.ResponseWriter
|
||||
Size() int64
|
||||
Status() int
|
||||
}
|
||||
|
||||
func newCaptureResponseWriter(rw http.ResponseWriter) capturer {
|
||||
capt := &captureResponseWriter{rw: rw}
|
||||
if _, ok := rw.(http.CloseNotifier); !ok {
|
||||
return capt
|
||||
}
|
||||
return &captureResponseWriterWithCloseNotify{capt}
|
||||
}
|
||||
|
||||
// captureResponseWriter is a wrapper of type http.ResponseWriter
|
||||
// that tracks request status and size.
|
||||
type captureResponseWriter struct {
|
||||
rw http.ResponseWriter
|
||||
status int
|
||||
size int64
|
||||
}
|
||||
|
||||
type captureResponseWriterWithCloseNotify struct {
|
||||
*captureResponseWriter
|
||||
}
|
||||
|
||||
// CloseNotify returns a channel that receives at most a
|
||||
// single value (true) when the client connection has gone away.
|
||||
func (r *captureResponseWriterWithCloseNotify) CloseNotify() <-chan bool {
|
||||
return r.rw.(http.CloseNotifier).CloseNotify()
|
||||
}
|
||||
|
||||
func (crw *captureResponseWriter) Header() http.Header {
|
||||
return crw.rw.Header()
|
||||
}
|
||||
|
||||
func (crw *captureResponseWriter) Write(b []byte) (int, error) {
|
||||
if crw.status == 0 {
|
||||
crw.status = http.StatusOK
|
||||
}
|
||||
size, err := crw.rw.Write(b)
|
||||
crw.size += int64(size)
|
||||
return size, err
|
||||
}
|
||||
|
||||
func (crw *captureResponseWriter) WriteHeader(s int) {
|
||||
crw.rw.WriteHeader(s)
|
||||
crw.status = s
|
||||
}
|
||||
|
||||
func (crw *captureResponseWriter) Flush() {
|
||||
if f, ok := crw.rw.(http.Flusher); ok {
|
||||
f.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
func (crw *captureResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
|
||||
if h, ok := crw.rw.(http.Hijacker); ok {
|
||||
return h.Hijack()
|
||||
}
|
||||
return nil, nil, fmt.Errorf("not a hijacker: %T", crw.rw)
|
||||
}
|
||||
|
||||
func (crw *captureResponseWriter) Status() int {
|
||||
return crw.status
|
||||
}
|
||||
|
||||
func (crw *captureResponseWriter) Size() int64 {
|
||||
return crw.size
|
||||
}
|
|
@ -1,50 +0,0 @@
|
|||
package accesslog
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
type rwWithCloseNotify struct {
|
||||
*httptest.ResponseRecorder
|
||||
}
|
||||
|
||||
func (r *rwWithCloseNotify) CloseNotify() <-chan bool {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func TestCloseNotifier(t *testing.T) {
|
||||
testCases := []struct {
|
||||
rw http.ResponseWriter
|
||||
desc string
|
||||
implementsCloseNotifier bool
|
||||
}{
|
||||
{
|
||||
rw: httptest.NewRecorder(),
|
||||
desc: "does not implement CloseNotifier",
|
||||
implementsCloseNotifier: false,
|
||||
},
|
||||
{
|
||||
rw: &rwWithCloseNotify{httptest.NewRecorder()},
|
||||
desc: "implements CloseNotifier",
|
||||
implementsCloseNotifier: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, test := range testCases {
|
||||
test := test
|
||||
t.Run(test.desc, func(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
_, ok := test.rw.(http.CloseNotifier)
|
||||
assert.Equal(t, test.implementsCloseNotifier, ok)
|
||||
|
||||
rw := newCaptureResponseWriter(test.rw)
|
||||
_, impl := rw.(http.CloseNotifier)
|
||||
assert.Equal(t, test.implementsCloseNotifier, impl)
|
||||
})
|
||||
}
|
||||
}
|
|
@ -4,6 +4,8 @@ import (
|
|||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/traefik/traefik/v2/pkg/log"
|
||||
"github.com/traefik/traefik/v2/pkg/middlewares/capture"
|
||||
"github.com/vulcand/oxy/utils"
|
||||
)
|
||||
|
||||
|
@ -49,16 +51,24 @@ func AddServiceFields(rw http.ResponseWriter, req *http.Request, next http.Handl
|
|||
|
||||
// AddOriginFields add origin fields.
|
||||
func AddOriginFields(rw http.ResponseWriter, req *http.Request, next http.Handler, data *LogData) {
|
||||
crw := newCaptureResponseWriter(rw)
|
||||
start := time.Now().UTC()
|
||||
|
||||
next.ServeHTTP(crw, req)
|
||||
next.ServeHTTP(rw, req)
|
||||
|
||||
// use UTC to handle switchover of daylight saving correctly
|
||||
data.Core[OriginDuration] = time.Now().UTC().Sub(start)
|
||||
data.Core[OriginStatus] = crw.Status()
|
||||
// make copy of headers so we can ensure there is no subsequent mutation during response processing
|
||||
// make copy of headers, so we can ensure there is no subsequent mutation
|
||||
// during response processing
|
||||
data.OriginResponse = make(http.Header)
|
||||
utils.CopyHeaders(data.OriginResponse, crw.Header())
|
||||
data.Core[OriginContentSize] = crw.Size()
|
||||
utils.CopyHeaders(data.OriginResponse, rw.Header())
|
||||
|
||||
ctx := req.Context()
|
||||
capt, err := capture.FromContext(ctx)
|
||||
if err != nil {
|
||||
log.FromContext(log.With(ctx, log.Str(log.MiddlewareType, "AccessLogs"))).Errorf("Could not get Capture: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
data.Core[OriginStatus] = capt.StatusCode()
|
||||
data.Core[OriginContentSize] = capt.ResponseSize()
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ import (
|
|||
"github.com/sirupsen/logrus"
|
||||
ptypes "github.com/traefik/paerser/types"
|
||||
"github.com/traefik/traefik/v2/pkg/log"
|
||||
"github.com/traefik/traefik/v2/pkg/middlewares/capture"
|
||||
traefiktls "github.com/traefik/traefik/v2/pkg/tls"
|
||||
"github.com/traefik/traefik/v2/pkg/types"
|
||||
)
|
||||
|
@ -182,13 +183,17 @@ func (h *Handler) ServeHTTP(rw http.ResponseWriter, req *http.Request, next http
|
|||
},
|
||||
}
|
||||
|
||||
reqWithDataTable := req.WithContext(context.WithValue(req.Context(), DataTableKey, logDataTable))
|
||||
defer func() {
|
||||
if h.config.BufferingSize > 0 {
|
||||
h.logHandlerChan <- handlerParams{
|
||||
logDataTable: logDataTable,
|
||||
}
|
||||
return
|
||||
}
|
||||
h.logTheRoundTrip(logDataTable)
|
||||
}()
|
||||
|
||||
var crr *captureRequestReader
|
||||
if req.Body != nil {
|
||||
crr = &captureRequestReader{source: req.Body, count: 0}
|
||||
reqWithDataTable.Body = crr
|
||||
}
|
||||
reqWithDataTable := req.WithContext(context.WithValue(req.Context(), DataTableKey, logDataTable))
|
||||
|
||||
core[RequestCount] = nextRequestCount()
|
||||
if req.Host != "" {
|
||||
|
@ -222,30 +227,26 @@ func (h *Handler) ServeHTTP(rw http.ResponseWriter, req *http.Request, next http
|
|||
core[ClientHost] = forwardedFor
|
||||
}
|
||||
|
||||
crw := newCaptureResponseWriter(rw)
|
||||
|
||||
next.ServeHTTP(crw, reqWithDataTable)
|
||||
next.ServeHTTP(rw, reqWithDataTable)
|
||||
|
||||
if _, ok := core[ClientUsername]; !ok {
|
||||
core[ClientUsername] = usernameIfPresent(reqWithDataTable.URL)
|
||||
}
|
||||
|
||||
logDataTable.DownstreamResponse = downstreamResponse{
|
||||
headers: crw.Header().Clone(),
|
||||
status: crw.Status(),
|
||||
size: crw.Size(),
|
||||
}
|
||||
if crr != nil {
|
||||
logDataTable.Request.size = crr.count
|
||||
headers: rw.Header().Clone(),
|
||||
}
|
||||
|
||||
if h.config.BufferingSize > 0 {
|
||||
h.logHandlerChan <- handlerParams{
|
||||
logDataTable: logDataTable,
|
||||
}
|
||||
} else {
|
||||
h.logTheRoundTrip(logDataTable)
|
||||
ctx := req.Context()
|
||||
capt, err := capture.FromContext(ctx)
|
||||
if err != nil {
|
||||
log.FromContext(log.With(ctx, log.Str(log.MiddlewareType, "AccessLogs"))).Errorf("Could not get Capture: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
logDataTable.DownstreamResponse.status = capt.StatusCode()
|
||||
logDataTable.DownstreamResponse.size = capt.ResponseSize()
|
||||
logDataTable.Request.size = capt.RequestSize()
|
||||
}
|
||||
|
||||
// Close closes the Logger (i.e. the file, drain logHandlerChan, etc).
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
package accesslog
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
|
@ -14,9 +16,11 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/containous/alice"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
ptypes "github.com/traefik/paerser/types"
|
||||
"github.com/traefik/traefik/v2/pkg/middlewares/capture"
|
||||
"github.com/traefik/traefik/v2/pkg/types"
|
||||
)
|
||||
|
||||
|
@ -46,23 +50,29 @@ func TestLogRotation(t *testing.T) {
|
|||
|
||||
config := &types.AccessLog{FilePath: fileName, Format: CommonFormat}
|
||||
logHandler, err := NewHandler(config)
|
||||
if err != nil {
|
||||
t.Fatalf("Error creating new log handler: %s", err)
|
||||
}
|
||||
defer logHandler.Close()
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
err := logHandler.Close()
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
chain := alice.New()
|
||||
chain = chain.Append(capture.WrapHandler(&capture.Handler{}))
|
||||
chain = chain.Append(WrapHandler(logHandler))
|
||||
handler, err := chain.Then(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
require.NoError(t, err)
|
||||
|
||||
recorder := httptest.NewRecorder()
|
||||
req := httptest.NewRequest(http.MethodGet, "http://localhost", nil)
|
||||
next := func(rw http.ResponseWriter, req *http.Request) {
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
iterations := 20
|
||||
halfDone := make(chan bool)
|
||||
writeDone := make(chan bool)
|
||||
go func() {
|
||||
for i := 0; i < iterations; i++ {
|
||||
logHandler.ServeHTTP(recorder, req, http.HandlerFunc(next))
|
||||
handler.ServeHTTP(recorder, req)
|
||||
if i == iterations/2 {
|
||||
halfDone <- true
|
||||
}
|
||||
|
@ -178,7 +188,10 @@ func TestLoggerHeaderFields(t *testing.T) {
|
|||
|
||||
logger, err := NewHandler(config)
|
||||
require.NoError(t, err)
|
||||
defer logger.Close()
|
||||
t.Cleanup(func() {
|
||||
err := logger.Close()
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
if config.FilePath != "" {
|
||||
_, err = os.Stat(config.FilePath)
|
||||
|
@ -196,9 +209,14 @@ func TestLoggerHeaderFields(t *testing.T) {
|
|||
req.Header.Add(test.header, s)
|
||||
}
|
||||
|
||||
logger.ServeHTTP(httptest.NewRecorder(), req, http.HandlerFunc(func(writer http.ResponseWriter, r *http.Request) {
|
||||
writer.WriteHeader(http.StatusOK)
|
||||
chain := alice.New()
|
||||
chain = chain.Append(capture.WrapHandler(&capture.Handler{}))
|
||||
chain = chain.Append(WrapHandler(logger))
|
||||
handler, err := chain.Then(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
|
||||
rw.WriteHeader(http.StatusOK)
|
||||
}))
|
||||
require.NoError(t, err)
|
||||
handler.ServeHTTP(httptest.NewRecorder(), req)
|
||||
|
||||
logData, err := os.ReadFile(logFile.Name())
|
||||
require.NoError(t, err)
|
||||
|
@ -224,11 +242,14 @@ func TestLoggerCLF(t *testing.T) {
|
|||
assertValidLogData(t, expectedLog, logData)
|
||||
}
|
||||
|
||||
func TestAsyncLoggerCLF(t *testing.T) {
|
||||
func TestLoggerCLFWithBufferingSize(t *testing.T) {
|
||||
logFilePath := filepath.Join(t.TempDir(), logFileNameSuffix)
|
||||
config := &types.AccessLog{FilePath: logFilePath, Format: CommonFormat, BufferingSize: 1024}
|
||||
doLogging(t, config)
|
||||
|
||||
// wait a bit for the buffer to be written in the file.
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
|
||||
logData, err := os.ReadFile(logFilePath)
|
||||
require.NoError(t, err)
|
||||
|
||||
|
@ -691,9 +712,7 @@ func assertValidLogData(t *testing.T, expected string, logData []byte) {
|
|||
resultExpected, err := ParseAccessLog(expected)
|
||||
require.NoError(t, err)
|
||||
|
||||
formatErrMessage := fmt.Sprintf(`
|
||||
Expected: %s
|
||||
Actual: %s`, expected, string(logData))
|
||||
formatErrMessage := fmt.Sprintf("Expected:\t%q\nActual:\t%q", expected, string(logData))
|
||||
|
||||
require.Equal(t, len(resultExpected), len(result), formatErrMessage)
|
||||
assert.Equal(t, resultExpected[ClientHost], result[ClientHost], formatErrMessage)
|
||||
|
@ -722,7 +741,7 @@ func captureStdout(t *testing.T) (out *os.File, restoreStdout func()) {
|
|||
|
||||
restoreStdout = func() {
|
||||
os.Stdout = original
|
||||
os.RemoveAll(file.Name())
|
||||
_ = os.RemoveAll(file.Name())
|
||||
}
|
||||
|
||||
return file, restoreStdout
|
||||
|
@ -730,10 +749,12 @@ func captureStdout(t *testing.T) (out *os.File, restoreStdout func()) {
|
|||
|
||||
func doLoggingTLSOpt(t *testing.T, config *types.AccessLog, enableTLS bool) {
|
||||
t.Helper()
|
||||
|
||||
logger, err := NewHandler(config)
|
||||
require.NoError(t, err)
|
||||
defer logger.Close()
|
||||
t.Cleanup(func() {
|
||||
err := logger.Close()
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
if config.FilePath != "" {
|
||||
_, err = os.Stat(config.FilePath)
|
||||
|
@ -753,6 +774,7 @@ func doLoggingTLSOpt(t *testing.T, config *types.AccessLog, enableTLS bool) {
|
|||
User: url.UserPassword(testUsername, ""),
|
||||
Path: testPath,
|
||||
},
|
||||
Body: io.NopCloser(bytes.NewReader([]byte("bar"))),
|
||||
}
|
||||
if enableTLS {
|
||||
req.TLS = &tls.ConnectionState{
|
||||
|
@ -761,7 +783,13 @@ func doLoggingTLSOpt(t *testing.T, config *types.AccessLog, enableTLS bool) {
|
|||
}
|
||||
}
|
||||
|
||||
logger.ServeHTTP(httptest.NewRecorder(), req, http.HandlerFunc(logWriterTestHandlerFunc))
|
||||
chain := alice.New()
|
||||
chain = chain.Append(capture.WrapHandler(&capture.Handler{}))
|
||||
chain = chain.Append(WrapHandler(logger))
|
||||
handler, err := chain.Then(http.HandlerFunc(logWriterTestHandlerFunc))
|
||||
require.NoError(t, err)
|
||||
|
||||
handler.ServeHTTP(httptest.NewRecorder(), req)
|
||||
}
|
||||
|
||||
func doLoggingTLS(t *testing.T, config *types.AccessLog) {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue