1
0
Fork 0

Add k8s resource attributes automatically

Co-authored-by: Romain <rtribotte@users.noreply.github.com>
This commit is contained in:
Kevin Pollet 2025-07-21 12:06:04 +02:00 committed by GitHub
parent 7b78128d4e
commit 78cc85283c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
15 changed files with 170 additions and 60 deletions

View file

@ -1,6 +1,7 @@
package logs
import (
"context"
"encoding/json"
"fmt"
"reflect"
@ -12,12 +13,12 @@ import (
)
// SetupOTelLogger sets up the OpenTelemetry logger.
func SetupOTelLogger(logger zerolog.Logger, config *types.OTelLog) (zerolog.Logger, error) {
func SetupOTelLogger(ctx context.Context, logger zerolog.Logger, config *types.OTelLog) (zerolog.Logger, error) {
if config == nil {
return logger, nil
}
provider, err := config.NewLoggerProvider()
provider, err := config.NewLoggerProvider(ctx)
if err != nil {
return zerolog.Logger{}, fmt.Errorf("setting up OpenTelemetry logger provider: %w", err)
}

View file

@ -171,7 +171,7 @@ func TestLog(t *testing.T) {
out := zerolog.MultiLevelWriter(zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: time.RFC3339})
logger := zerolog.New(out).With().Caller().Logger()
logger, err := SetupOTelLogger(logger, config)
logger, err := SetupOTelLogger(t.Context(), logger, config)
require.NoError(t, err)
ctx := trace.ContextWithSpanContext(t.Context(), trace.NewSpanContext(trace.SpanContextConfig{

View file

@ -217,6 +217,7 @@ func newOpenTelemetryMeterProvider(ctx context.Context, config *types.OTLP) (*sd
resource.WithOS(),
resource.WithProcess(),
resource.WithTelemetrySDK(),
resource.WithDetectors(types.K8sAttributesDetector{}),
// The following order allows the user to override the service name and version,
// as well as any other attributes set by the above detectors.
resource.WithAttributes(

View file

@ -85,7 +85,7 @@ func (h *Handler) AliceConstructor() alice.Constructor {
}
// NewHandler creates a new Handler.
func NewHandler(config *types.AccessLog) (*Handler, error) {
func NewHandler(ctx context.Context, config *types.AccessLog) (*Handler, error) {
var file io.WriteCloser = noopCloser{os.Stdout}
if len(config.FilePath) > 0 {
f, err := openAccessLogFile(config.FilePath)
@ -116,7 +116,7 @@ func NewHandler(config *types.AccessLog) (*Handler, error) {
}
if config.OTLP != nil {
otelLoggerProvider, err := config.OTLP.NewLoggerProvider()
otelLoggerProvider, err := config.OTLP.NewLoggerProvider(ctx)
if err != nil {
return nil, fmt.Errorf("setting up OpenTelemetry logger provider: %w", err)
}

View file

@ -85,7 +85,7 @@ func TestOTelAccessLog(t *testing.T) {
},
},
}
logHandler, err := NewHandler(config)
logHandler, err := NewHandler(t.Context(), config)
require.NoError(t, err)
t.Cleanup(func() {
err := logHandler.Close()
@ -138,7 +138,7 @@ func TestLogRotation(t *testing.T) {
rotatedFileName := fileName + ".rotated"
config := &types.AccessLog{FilePath: fileName, Format: CommonFormat}
logHandler, err := NewHandler(config)
logHandler, err := NewHandler(t.Context(), config)
require.NoError(t, err)
t.Cleanup(func() {
err := logHandler.Close()
@ -282,7 +282,7 @@ func TestLoggerHeaderFields(t *testing.T) {
Fields: &test.accessLogFields,
}
logger, err := NewHandler(config)
logger, err := NewHandler(t.Context(), config)
require.NoError(t, err)
t.Cleanup(func() {
err := logger.Close()
@ -979,7 +979,7 @@ func captureStdout(t *testing.T) (out *os.File, restoreStdout func()) {
func doLoggingTLSOpt(t *testing.T, config *types.AccessLog, enableTLS, tracing bool) {
t.Helper()
logger, err := NewHandler(config)
logger, err := NewHandler(t.Context(), config)
require.NoError(t, err)
t.Cleanup(func() {
err := logger.Close()
@ -1076,7 +1076,7 @@ func logWriterTestHandlerFunc(rw http.ResponseWriter, r *http.Request) {
func doLoggingWithAbortedStream(t *testing.T, config *types.AccessLog) {
t.Helper()
logger, err := NewHandler(config)
logger, err := NewHandler(t.Context(), config)
require.NoError(t, err)
t.Cleanup(func() {
err := logger.Close()

View file

@ -25,11 +25,11 @@ import (
// Backend is an abstraction for tracking backend (OpenTelemetry, ...).
type Backend interface {
Setup(serviceName string, sampleRate float64, resourceAttributes map[string]string) (trace.Tracer, io.Closer, error)
Setup(ctx context.Context, serviceName string, sampleRate float64, resourceAttributes map[string]string) (trace.Tracer, io.Closer, error)
}
// NewTracing Creates a Tracing.
func NewTracing(conf *static.Tracing) (*Tracer, io.Closer, error) {
func NewTracing(ctx context.Context, conf *static.Tracing) (*Tracer, io.Closer, error) {
var backend Backend
if conf.OTLP != nil {
@ -44,7 +44,7 @@ func NewTracing(conf *static.Tracing) (*Tracer, io.Closer, error) {
otel.SetTextMapPropagator(autoprop.NewTextMapPropagator())
tr, closer, err := backend.Setup(conf.ServiceName, conf.SampleRate, conf.ResourceAttributes)
tr, closer, err := backend.Setup(ctx, conf.ServiceName, conf.SampleRate, conf.ResourceAttributes)
if err != nil {
return nil, nil, err
}
@ -84,13 +84,6 @@ func InjectContextIntoCarrier(req *http.Request) {
propagator.Inject(req.Context(), propagation.HeaderCarrier(req.Header))
}
// SetStatusErrorf flags the span as in error and log an event.
func SetStatusErrorf(ctx context.Context, format string, args ...interface{}) {
if span := trace.SpanFromContext(ctx); span != nil {
span.SetStatus(codes.Error, fmt.Sprintf(format, args...))
}
}
// Span is trace.Span wrapping the Traefik TracerProvider.
type Span struct {
trace.Span

View file

@ -350,7 +350,7 @@ func TestTracing(t *testing.T) {
},
}
tracer, closer, err := NewTracing(tracingConfig)
tracer, closer, err := NewTracing(t.Context(), tracingConfig)
require.NoError(t, err)
t.Cleanup(func() {
_ = closer.Close()
@ -402,7 +402,7 @@ func TestTracerProvider(t *testing.T) {
otlpConfig.SetDefaults()
config := &static.Tracing{OTLP: otlpConfig}
tracer, closer, err := NewTracing(config)
tracer, closer, err := NewTracing(t.Context(), config)
if err != nil {
t.Fatal(err)
}

70
pkg/types/k8sdetector.go Normal file
View file

@ -0,0 +1,70 @@
package types
import (
"context"
"errors"
"fmt"
"os"
"strings"
"github.com/rs/zerolog/log"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
kerror "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
// K8sAttributesDetector detects the metadata of the Traefik pod running in a Kubernetes cluster.
// It reads the pod name from the hostname file and the namespace from the service account namespace file and queries the Kubernetes API to get the pod's UID.
type K8sAttributesDetector struct{}
func (K8sAttributesDetector) Detect(ctx context.Context) (*resource.Resource, error) {
attrs := os.Getenv("OTEL_RESOURCE_ATTRIBUTES")
if strings.Contains(attrs, string(semconv.K8SPodNameKey)) || strings.Contains(attrs, string(semconv.K8SPodUIDKey)) {
return resource.Empty(), nil
}
// The InClusterConfig function returns a config for the Kubernetes API server
// when it is running inside a Kubernetes cluster.
config, err := rest.InClusterConfig()
if err != nil && errors.Is(err, rest.ErrNotInCluster) {
return resource.Empty(), nil
}
if err != nil {
return nil, fmt.Errorf("creating in cluster config: %w", err)
}
client, err := kclientset.NewForConfig(config)
if err != nil {
return nil, fmt.Errorf("creating Kubernetes client: %w", err)
}
podName, err := os.Hostname()
if err != nil {
return nil, fmt.Errorf("getting pod name: %w", err)
}
podNamespaceBytes, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
if err != nil {
return nil, fmt.Errorf("getting pod namespace: %w", err)
}
podNamespace := string(podNamespaceBytes)
pod, err := client.CoreV1().Pods(podNamespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil && kerror.IsForbidden(err) {
log.Error().Err(err).Msg("Unable to build K8s resource attributes for Traefik pod")
return resource.Empty(), nil
}
if err != nil {
return nil, fmt.Errorf("getting pod metadata: %w", err)
}
// To avoid version conflicts with other detectors, we use a Schemaless resource.
return resource.NewSchemaless(
semconv.K8SPodUID(string(pod.UID)),
semconv.K8SPodName(pod.Name),
semconv.K8SNamespaceName(podNamespace),
), nil
}

View file

@ -13,7 +13,7 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/otlplog/otlploghttp"
otelsdk "go.opentelemetry.io/otel/sdk/log"
"go.opentelemetry.io/otel/sdk/resource"
semconv "go.opentelemetry.io/otel/semconv/v1.27.0"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/encoding/gzip"
)
@ -164,7 +164,7 @@ func (o *OTelLog) SetDefaults() {
}
// NewLoggerProvider creates a new OpenTelemetry logger provider.
func (o *OTelLog) NewLoggerProvider() (*otelsdk.LoggerProvider, error) {
func (o *OTelLog) NewLoggerProvider(ctx context.Context) (*otelsdk.LoggerProvider, error) {
var (
err error
exporter otelsdk.Exporter
@ -178,23 +178,27 @@ func (o *OTelLog) NewLoggerProvider() (*otelsdk.LoggerProvider, error) {
return nil, fmt.Errorf("setting up exporter: %w", err)
}
attr := []attribute.KeyValue{
semconv.ServiceNameKey.String(o.ServiceName),
semconv.ServiceVersionKey.String(version.Version),
}
var resAttrs []attribute.KeyValue
for k, v := range o.ResourceAttributes {
attr = append(attr, attribute.String(k, v))
resAttrs = append(resAttrs, attribute.String(k, v))
}
res, err := resource.New(context.Background(),
resource.WithAttributes(attr...),
res, err := resource.New(ctx,
resource.WithContainer(),
resource.WithFromEnv(),
resource.WithHost(),
resource.WithOS(),
resource.WithProcess(),
resource.WithTelemetrySDK(),
resource.WithDetectors(K8sAttributesDetector{}),
// The following order allows the user to override the service name and version,
// as well as any other attributes set by the above detectors.
resource.WithAttributes(
semconv.ServiceName(o.ServiceName),
semconv.ServiceVersion(version.Version),
),
resource.WithAttributes(resAttrs...),
// Use the environment variables to allow overriding above resource attributes.
resource.WithFromEnv(),
)
if err != nil {
return nil, fmt.Errorf("building resource: %w", err)

View file

@ -17,7 +17,7 @@ import (
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.27.0"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/encoding/gzip"
@ -52,7 +52,7 @@ func (c *OTelTracing) SetDefaults() {
}
// Setup sets up the tracer.
func (c *OTelTracing) Setup(serviceName string, sampleRate float64, resourceAttributes map[string]string) (trace.Tracer, io.Closer, error) {
func (c *OTelTracing) Setup(ctx context.Context, serviceName string, sampleRate float64, resourceAttributes map[string]string) (trace.Tracer, io.Closer, error) {
var (
err error
exporter *otlptrace.Exporter
@ -66,23 +66,27 @@ func (c *OTelTracing) Setup(serviceName string, sampleRate float64, resourceAttr
return nil, nil, fmt.Errorf("setting up exporter: %w", err)
}
attr := []attribute.KeyValue{
semconv.ServiceNameKey.String(serviceName),
semconv.ServiceVersionKey.String(version.Version),
}
var resAttrs []attribute.KeyValue
for k, v := range resourceAttributes {
attr = append(attr, attribute.String(k, v))
resAttrs = append(resAttrs, attribute.String(k, v))
}
res, err := resource.New(context.Background(),
resource.WithAttributes(attr...),
res, err := resource.New(ctx,
resource.WithContainer(),
resource.WithFromEnv(),
resource.WithHost(),
resource.WithOS(),
resource.WithProcess(),
resource.WithTelemetrySDK(),
resource.WithDetectors(K8sAttributesDetector{}),
// The following order allows the user to override the service name and version,
// as well as any other attributes set by the above detectors.
resource.WithAttributes(
semconv.ServiceName(serviceName),
semconv.ServiceVersion(version.Version),
),
resource.WithAttributes(resAttrs...),
// Use the environment variables to allow overriding above resource attributes.
resource.WithFromEnv(),
)
if err != nil {
return nil, nil, fmt.Errorf("building resource: %w", err)