1
0
Fork 0

OpenTelemetry Logs and Access Logs

Co-authored-by: Kevin Pollet <pollet.kevin@gmail.com>
This commit is contained in:
Romain 2024-12-06 14:50:04 +01:00 committed by GitHub
parent 33c1d700c0
commit 826a2b74aa
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
33 changed files with 2297 additions and 475 deletions

View file

@ -23,6 +23,7 @@ import (
"github.com/traefik/traefik/v3/pkg/middlewares/capture"
traefiktls "github.com/traefik/traefik/v3/pkg/tls"
"github.com/traefik/traefik/v3/pkg/types"
"go.opentelemetry.io/contrib/bridges/otellogrus"
)
type key string
@ -52,6 +53,7 @@ func (n noopCloser) Close() error {
}
type handlerParams struct {
ctx context.Context
logDataTable *LogData
}
@ -106,6 +108,16 @@ func NewHandler(config *types.AccessLog) (*Handler, error) {
Level: logrus.InfoLevel,
}
if config.OTLP != nil {
otelLoggerProvider, err := config.OTLP.NewLoggerProvider()
if err != nil {
return nil, fmt.Errorf("setting up OpenTelemetry logger provider: %w", err)
}
logger.Hooks.Add(otellogrus.NewHook("traefik", otellogrus.WithLoggerProvider(otelLoggerProvider)))
logger.Out = io.Discard
}
// Transform header names to a canonical form, to be used as is without further transformations,
// and transform field names to lower case, to enable case-insensitive lookup.
if config.Fields != nil {
@ -150,7 +162,7 @@ func NewHandler(config *types.AccessLog) (*Handler, error) {
go func() {
defer logHandler.wg.Done()
for handlerParams := range logHandler.logHandlerChan {
logHandler.logTheRoundTrip(handlerParams.logDataTable)
logHandler.logTheRoundTrip(handlerParams.ctx, handlerParams.logDataTable)
}
}()
}
@ -256,12 +268,13 @@ func (h *Handler) ServeHTTP(rw http.ResponseWriter, req *http.Request, next http
if h.config.BufferingSize > 0 {
h.logHandlerChan <- handlerParams{
ctx: req.Context(),
logDataTable: logDataTable,
}
return
}
h.logTheRoundTrip(logDataTable)
h.logTheRoundTrip(req.Context(), logDataTable)
}()
next.ServeHTTP(rw, reqWithDataTable)
@ -313,7 +326,7 @@ func usernameIfPresent(theURL *url.URL) string {
}
// Logging handler to log frontend name, backend name, and elapsed time.
func (h *Handler) logTheRoundTrip(logDataTable *LogData) {
func (h *Handler) logTheRoundTrip(ctx context.Context, logDataTable *LogData) {
core := logDataTable.Core
retryAttempts, ok := core[RetryAttempts].(int)
@ -359,7 +372,7 @@ func (h *Handler) logTheRoundTrip(logDataTable *LogData) {
h.mu.Lock()
defer h.mu.Unlock()
h.logger.WithFields(fields).Println()
h.logger.WithContext(ctx).WithFields(fields).Println()
}
}

View file

@ -2,6 +2,7 @@ package accesslog
import (
"bytes"
"compress/gzip"
"context"
"crypto/tls"
"crypto/x509"
@ -25,6 +26,8 @@ import (
ptypes "github.com/traefik/paerser/types"
"github.com/traefik/traefik/v3/pkg/middlewares/capture"
"github.com/traefik/traefik/v3/pkg/types"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
"go.opentelemetry.io/otel/trace"
)
const delta float64 = 1e-10
@ -49,6 +52,75 @@ var (
testStart = time.Now()
)
func TestOTelAccessLog(t *testing.T) {
logCh := make(chan string)
collector := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
gzr, err := gzip.NewReader(r.Body)
require.NoError(t, err)
body, err := io.ReadAll(gzr)
require.NoError(t, err)
req := plogotlp.NewExportRequest()
err = req.UnmarshalProto(body)
require.NoError(t, err)
marshalledReq, err := json.Marshal(req)
require.NoError(t, err)
logCh <- string(marshalledReq)
}))
t.Cleanup(collector.Close)
config := &types.AccessLog{
OTLP: &types.OTelLog{
ServiceName: "test",
ResourceAttributes: map[string]string{"resource": "attribute"},
HTTP: &types.OTelHTTP{
Endpoint: collector.URL,
},
},
}
logHandler, err := NewHandler(config)
require.NoError(t, err)
t.Cleanup(func() {
err := logHandler.Close()
require.NoError(t, err)
})
req := &http.Request{
Header: map[string][]string{},
URL: &url.URL{
Path: testPath,
},
}
ctx := trace.ContextWithSpanContext(context.Background(), trace.NewSpanContext(trace.SpanContextConfig{
TraceID: trace.TraceID{0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8},
SpanID: trace.SpanID{0x1, 0x2, 0x3, 0x4, 0x5, 0x6, 0x7, 0x8},
}))
req = req.WithContext(ctx)
chain := alice.New()
chain = chain.Append(capture.Wrap)
chain = chain.Append(WrapHandler(logHandler))
handler, err := chain.Then(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
rw.WriteHeader(http.StatusOK)
}))
require.NoError(t, err)
handler.ServeHTTP(httptest.NewRecorder(), req)
select {
case <-time.After(5 * time.Second):
t.Error("AccessLog not exported")
case log := <-logCh:
assert.Regexp(t, `{"key":"resource","value":{"stringValue":"attribute"}}`, log)
assert.Regexp(t, `{"key":"service.name","value":{"stringValue":"test"}}`, log)
assert.Regexp(t, `{"key":"DownstreamStatus","value":{"intValue":"200"}}`, log)
assert.Regexp(t, `"traceId":"01020304050607080000000000000000","spanId":"0102030405060708"`, log)
}
}
func TestLogRotation(t *testing.T) {
fileName := filepath.Join(t.TempDir(), "traefik.log")
rotatedFileName := fileName + ".rotated"

View file

@ -9,6 +9,7 @@ import (
"time"
"github.com/containous/alice"
"github.com/rs/zerolog/log"
"github.com/traefik/traefik/v3/pkg/metrics"
"github.com/traefik/traefik/v3/pkg/middlewares"
"github.com/traefik/traefik/v3/pkg/middlewares/accesslog"
@ -64,7 +65,11 @@ func (e *entryPointTracing) ServeHTTP(rw http.ResponseWriter, req *http.Request)
start := time.Now()
tracingCtx, span := e.tracer.Start(tracingCtx, "EntryPoint", trace.WithSpanKind(trace.SpanKindServer), trace.WithTimestamp(start))
req = req.WithContext(tracingCtx)
// Associate the request context with the logger.
logger := log.Ctx(tracingCtx).With().Ctx(tracingCtx).Logger()
loggerCtx := logger.WithContext(tracingCtx)
req = req.WithContext(loggerCtx)
span.SetAttributes(attribute.String("entry_point", e.entryPoint))