Extract metrics to own package and refactor implementations
This commit is contained in:
parent
c1b5b740ff
commit
e6c2040ea8
19 changed files with 599 additions and 797 deletions
|
@ -1,95 +0,0 @@
|
|||
package middlewares
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/containous/traefik/log"
|
||||
"github.com/containous/traefik/safe"
|
||||
"github.com/containous/traefik/types"
|
||||
kitlog "github.com/go-kit/kit/log"
|
||||
"github.com/go-kit/kit/metrics"
|
||||
"github.com/go-kit/kit/metrics/dogstatsd"
|
||||
)
|
||||
|
||||
var _ Metrics = (*Datadog)(nil)
|
||||
var _ RetryMetrics = (*Datadog)(nil)
|
||||
|
||||
var datadogClient = dogstatsd.New("traefik.", kitlog.LoggerFunc(func(keyvals ...interface{}) error {
|
||||
log.Info(keyvals)
|
||||
return nil
|
||||
}))
|
||||
|
||||
var datadogTicker *time.Ticker
|
||||
|
||||
// Metric names consistent with https://github.com/DataDog/integrations-extras/pull/64
|
||||
const (
|
||||
ddMetricsReqsName = "requests.total"
|
||||
ddMetricsLatencyName = "request.duration"
|
||||
ddRetriesTotalName = "backend.retries.total"
|
||||
)
|
||||
|
||||
// Datadog is an Implementation for Metrics that exposes datadog metrics for the latency
|
||||
// and the number of requests partitioned by status code and method.
|
||||
// - number of requests partitioned by status code and method
|
||||
// - request durations
|
||||
// - amount of retries happened
|
||||
type Datadog struct {
|
||||
reqsCounter metrics.Counter
|
||||
reqDurationHistogram metrics.Histogram
|
||||
retryCounter metrics.Counter
|
||||
}
|
||||
|
||||
func (dd *Datadog) getReqsCounter() metrics.Counter {
|
||||
return dd.reqsCounter
|
||||
}
|
||||
|
||||
func (dd *Datadog) getReqDurationHistogram() metrics.Histogram {
|
||||
return dd.reqDurationHistogram
|
||||
}
|
||||
|
||||
func (dd *Datadog) getRetryCounter() metrics.Counter {
|
||||
return dd.retryCounter
|
||||
}
|
||||
|
||||
// NewDataDog creates new instance of Datadog
|
||||
func NewDataDog(name string) *Datadog {
|
||||
var m Datadog
|
||||
|
||||
m.reqsCounter = datadogClient.NewCounter(ddMetricsReqsName, 1.0).With("service", name)
|
||||
m.reqDurationHistogram = datadogClient.NewHistogram(ddMetricsLatencyName, 1.0).With("service", name)
|
||||
m.retryCounter = datadogClient.NewCounter(ddRetriesTotalName, 1.0).With("service", name)
|
||||
|
||||
return &m
|
||||
}
|
||||
|
||||
// InitDatadogClient initializes metrics pusher and creates a datadogClient if not created already
|
||||
func InitDatadogClient(config *types.Datadog) *time.Ticker {
|
||||
if datadogTicker == nil {
|
||||
address := config.Address
|
||||
if len(address) == 0 {
|
||||
address = "localhost:8125"
|
||||
}
|
||||
pushInterval, err := time.ParseDuration(config.PushInterval)
|
||||
if err != nil {
|
||||
log.Warnf("Unable to parse %s into pushInterval, using 10s as default value", config.PushInterval)
|
||||
pushInterval = 10 * time.Second
|
||||
}
|
||||
|
||||
report := time.NewTicker(pushInterval)
|
||||
|
||||
safe.Go(func() {
|
||||
datadogClient.SendLoop(report.C, "udp", address)
|
||||
})
|
||||
|
||||
datadogTicker = report
|
||||
}
|
||||
return datadogTicker
|
||||
}
|
||||
|
||||
// StopDatadogClient stops internal datadogTicker which controls the pushing of metrics to DD Agent and resets it to `nil`
|
||||
func StopDatadogClient() {
|
||||
if datadogTicker != nil {
|
||||
datadogTicker.Stop()
|
||||
}
|
||||
datadogTicker = nil
|
||||
}
|
|
@ -1,59 +0,0 @@
|
|||
package middlewares
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/containous/traefik/testhelpers"
|
||||
"github.com/containous/traefik/types"
|
||||
"github.com/stvp/go-udp-testing"
|
||||
"github.com/urfave/negroni"
|
||||
)
|
||||
|
||||
func TestDatadog(t *testing.T) {
|
||||
udp.SetAddr(":18125")
|
||||
// This is needed to make sure that UDP Listener listens for data a bit longer, otherwise it will quit after a millisecond
|
||||
udp.Timeout = 5 * time.Second
|
||||
recorder := httptest.NewRecorder()
|
||||
InitDatadogClient(&types.Datadog{":18125", "1s"})
|
||||
|
||||
n := negroni.New()
|
||||
dd := NewDataDog("test")
|
||||
defer StopDatadogClient()
|
||||
metricsMiddlewareBackend := NewMetricsWrapper(dd)
|
||||
|
||||
n.Use(metricsMiddlewareBackend)
|
||||
r := http.NewServeMux()
|
||||
r.HandleFunc(`/ok`, func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
fmt.Fprintln(w, "ok")
|
||||
})
|
||||
r.HandleFunc(`/not-found`, func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
fmt.Fprintln(w, "not-found")
|
||||
})
|
||||
n.UseHandler(r)
|
||||
|
||||
req1 := testhelpers.MustNewRequest(http.MethodGet, "http://localhost:3000/ok", nil)
|
||||
req2 := testhelpers.MustNewRequest(http.MethodGet, "http://localhost:3000/not-found", nil)
|
||||
|
||||
retryListener := NewMetricsRetryListener(dd)
|
||||
retryListener.Retried(1)
|
||||
retryListener.Retried(2)
|
||||
|
||||
expected := []string{
|
||||
// We are only validating counts, as it is nearly impossible to validate latency, since it varies every run
|
||||
"traefik.requests.total:1.000000|c|#service:test,code:404,method:GET\n",
|
||||
"traefik.requests.total:1.000000|c|#service:test,code:200,method:GET\n",
|
||||
"traefik.backend.retries.total:2.000000|c|#service:test\n",
|
||||
"traefik.request.duration",
|
||||
}
|
||||
|
||||
udp.ShouldReceiveAll(t, expected, func() {
|
||||
n.ServeHTTP(recorder, req1)
|
||||
n.ServeHTTP(recorder, req2)
|
||||
})
|
||||
}
|
|
@ -5,77 +5,23 @@ import (
|
|||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/go-kit/kit/metrics"
|
||||
"github.com/go-kit/kit/metrics/multi"
|
||||
"github.com/containous/traefik/metrics"
|
||||
gokitmetrics "github.com/go-kit/kit/metrics"
|
||||
)
|
||||
|
||||
// Metrics is an Interface that must be satisfied by any system that
|
||||
// wants to expose and monitor Metrics.
|
||||
type Metrics interface {
|
||||
getReqsCounter() metrics.Counter
|
||||
getReqDurationHistogram() metrics.Histogram
|
||||
RetryMetrics
|
||||
}
|
||||
|
||||
// RetryMetrics must be satisfied by any system that wants to collect and
|
||||
// expose retry specific Metrics.
|
||||
type RetryMetrics interface {
|
||||
getRetryCounter() metrics.Counter
|
||||
}
|
||||
|
||||
// MultiMetrics is a struct that provides a wrapper container for multiple Metrics, if they are configured
|
||||
type MultiMetrics struct {
|
||||
wrappedMetrics *[]Metrics
|
||||
reqsCounter metrics.Counter
|
||||
reqDurationHistogram metrics.Histogram
|
||||
retryCounter metrics.Counter
|
||||
}
|
||||
|
||||
// NewMultiMetrics creates a new instance of MultiMetrics
|
||||
func NewMultiMetrics(manyMetrics []Metrics) *MultiMetrics {
|
||||
counters := []metrics.Counter{}
|
||||
histograms := []metrics.Histogram{}
|
||||
retryCounters := []metrics.Counter{}
|
||||
|
||||
for _, m := range manyMetrics {
|
||||
counters = append(counters, m.getReqsCounter())
|
||||
histograms = append(histograms, m.getReqDurationHistogram())
|
||||
retryCounters = append(retryCounters, m.getRetryCounter())
|
||||
}
|
||||
|
||||
var mm MultiMetrics
|
||||
|
||||
mm.wrappedMetrics = &manyMetrics
|
||||
mm.reqsCounter = multi.NewCounter(counters...)
|
||||
mm.reqDurationHistogram = multi.NewHistogram(histograms...)
|
||||
mm.retryCounter = multi.NewCounter(retryCounters...)
|
||||
|
||||
return &mm
|
||||
}
|
||||
|
||||
func (mm *MultiMetrics) getReqsCounter() metrics.Counter {
|
||||
return mm.reqsCounter
|
||||
}
|
||||
|
||||
func (mm *MultiMetrics) getReqDurationHistogram() metrics.Histogram {
|
||||
return mm.reqDurationHistogram
|
||||
}
|
||||
|
||||
func (mm *MultiMetrics) getRetryCounter() metrics.Counter {
|
||||
return mm.retryCounter
|
||||
}
|
||||
|
||||
// MetricsWrapper is a Negroni compatible Handler which relies on a
|
||||
// given Metrics implementation to expose and monitor Traefik Metrics.
|
||||
type MetricsWrapper struct {
|
||||
Impl Metrics
|
||||
registry metrics.Registry
|
||||
serviceName string
|
||||
}
|
||||
|
||||
// NewMetricsWrapper return a MetricsWrapper struct with
|
||||
// a given Metrics implementation e.g Prometheuss
|
||||
func NewMetricsWrapper(impl Metrics) *MetricsWrapper {
|
||||
func NewMetricsWrapper(registry metrics.Registry, service string) *MetricsWrapper {
|
||||
var metricsWrapper = MetricsWrapper{
|
||||
Impl: impl,
|
||||
registry: registry,
|
||||
serviceName: service,
|
||||
}
|
||||
|
||||
return &metricsWrapper
|
||||
|
@ -86,27 +32,30 @@ func (m *MetricsWrapper) ServeHTTP(rw http.ResponseWriter, r *http.Request, next
|
|||
prw := &responseRecorder{rw, http.StatusOK}
|
||||
next(prw, r)
|
||||
|
||||
reqLabels := []string{"code", strconv.Itoa(prw.statusCode), "method", r.Method}
|
||||
m.Impl.getReqsCounter().With(reqLabels...).Add(1)
|
||||
reqLabels := []string{"service", m.serviceName, "code", strconv.Itoa(prw.statusCode), "method", r.Method}
|
||||
m.registry.ReqsCounter().With(reqLabels...).Add(1)
|
||||
|
||||
reqDurationLabels := []string{"code", strconv.Itoa(prw.statusCode)}
|
||||
m.Impl.getReqDurationHistogram().With(reqDurationLabels...).Observe(float64(time.Since(start).Seconds()))
|
||||
reqDurationLabels := []string{"service", m.serviceName, "code", strconv.Itoa(prw.statusCode)}
|
||||
m.registry.ReqDurationHistogram().With(reqDurationLabels...).Observe(float64(time.Since(start).Seconds()))
|
||||
}
|
||||
|
||||
type retryMetrics interface {
|
||||
RetriesCounter() gokitmetrics.Counter
|
||||
}
|
||||
|
||||
// NewMetricsRetryListener instantiates a MetricsRetryListener with the given retryMetrics.
|
||||
func NewMetricsRetryListener(retryMetrics retryMetrics, backendName string) RetryListener {
|
||||
return &MetricsRetryListener{retryMetrics: retryMetrics, backendName: backendName}
|
||||
}
|
||||
|
||||
// MetricsRetryListener is an implementation of the RetryListener interface to
|
||||
// record Metrics about retry attempts.
|
||||
// record RequestMetrics about retry attempts.
|
||||
type MetricsRetryListener struct {
|
||||
retryMetrics RetryMetrics
|
||||
retryMetrics retryMetrics
|
||||
backendName string
|
||||
}
|
||||
|
||||
// Retried tracks the retry in the Metrics implementation.
|
||||
// Retried tracks the retry in the RequestMetrics implementation.
|
||||
func (m *MetricsRetryListener) Retried(attempt int) {
|
||||
if m.retryMetrics != nil {
|
||||
m.retryMetrics.getRetryCounter().Add(1)
|
||||
}
|
||||
}
|
||||
|
||||
// NewMetricsRetryListener instantiates a MetricsRetryListener with the given RetryMetrics.
|
||||
func NewMetricsRetryListener(retryMetrics RetryMetrics) RetryListener {
|
||||
return &MetricsRetryListener{retryMetrics: retryMetrics}
|
||||
m.retryMetrics.RetriesCounter().With("backend", m.backendName).Add(1)
|
||||
}
|
||||
|
|
|
@ -1,18 +1,15 @@
|
|||
package middlewares
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/go-kit/kit/metrics"
|
||||
)
|
||||
|
||||
func TestMetricsRetryListener(t *testing.T) {
|
||||
// nil implementation, nothing should fail
|
||||
retryListener := NewMetricsRetryListener(nil)
|
||||
retryListener.Retried(1)
|
||||
|
||||
retryMetrics := newCollectingMetrics()
|
||||
retryListener = NewMetricsRetryListener(retryMetrics)
|
||||
retryMetrics := newCollectingRetryMetrics()
|
||||
retryListener := NewMetricsRetryListener(retryMetrics, "backendName")
|
||||
retryListener.Retried(1)
|
||||
retryListener.Retried(2)
|
||||
|
||||
|
@ -20,27 +17,34 @@ func TestMetricsRetryListener(t *testing.T) {
|
|||
if retryMetrics.retryCounter.counterValue != wantCounterValue {
|
||||
t.Errorf("got counter value of %d, want %d", retryMetrics.retryCounter.counterValue, wantCounterValue)
|
||||
}
|
||||
|
||||
wantLabelValues := []string{"backend", "backendName"}
|
||||
if !reflect.DeepEqual(retryMetrics.retryCounter.lastLabelValues, wantLabelValues) {
|
||||
t.Errorf("wrong label values %v used, want %v", retryMetrics.retryCounter.lastLabelValues, wantLabelValues)
|
||||
}
|
||||
}
|
||||
|
||||
// collectingRetryMetrics is an implementation of the RetryMetrics interface that can be used inside tests to collect the times Add() was called.
|
||||
// collectingRetryMetrics is an implementation of the retryMetrics interface that can be used inside tests to collect the times Add() was called.
|
||||
type collectingRetryMetrics struct {
|
||||
retryCounter *collectingCounter
|
||||
}
|
||||
|
||||
func newCollectingMetrics() collectingRetryMetrics {
|
||||
func newCollectingRetryMetrics() collectingRetryMetrics {
|
||||
return collectingRetryMetrics{retryCounter: &collectingCounter{}}
|
||||
}
|
||||
|
||||
func (metrics collectingRetryMetrics) getRetryCounter() metrics.Counter {
|
||||
func (metrics collectingRetryMetrics) RetriesCounter() metrics.Counter {
|
||||
return metrics.retryCounter
|
||||
}
|
||||
|
||||
type collectingCounter struct {
|
||||
counterValue float64
|
||||
counterValue float64
|
||||
lastLabelValues []string
|
||||
}
|
||||
|
||||
func (c *collectingCounter) With(labelValues ...string) metrics.Counter {
|
||||
panic("collectingCounter.With not implemented!")
|
||||
c.lastLabelValues = labelValues
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *collectingCounter) Add(delta float64) {
|
||||
|
|
|
@ -1,129 +0,0 @@
|
|||
package middlewares
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/containous/traefik/types"
|
||||
"github.com/go-kit/kit/metrics"
|
||||
"github.com/go-kit/kit/metrics/prometheus"
|
||||
stdprometheus "github.com/prometheus/client_golang/prometheus"
|
||||
)
|
||||
|
||||
const (
|
||||
reqsTotalName = "traefik_requests_total"
|
||||
reqDurationName = "traefik_request_duration_seconds"
|
||||
retriesTotalName = "traefik_backend_retries_total"
|
||||
)
|
||||
|
||||
// Prometheus is an Implementation for Metrics that exposes the following Prometheus metrics:
|
||||
// - number of requests partitioned by status code and method
|
||||
// - request durations partitioned by status code
|
||||
// - amount of retries happened
|
||||
type Prometheus struct {
|
||||
reqsCounter metrics.Counter
|
||||
reqDurationHistogram metrics.Histogram
|
||||
retryCounter metrics.Counter
|
||||
}
|
||||
|
||||
func (p *Prometheus) getReqsCounter() metrics.Counter {
|
||||
return p.reqsCounter
|
||||
}
|
||||
|
||||
func (p *Prometheus) getReqDurationHistogram() metrics.Histogram {
|
||||
return p.reqDurationHistogram
|
||||
}
|
||||
|
||||
func (p *Prometheus) getRetryCounter() metrics.Counter {
|
||||
return p.retryCounter
|
||||
}
|
||||
|
||||
// NewPrometheus returns a new Prometheus Metrics implementation.
|
||||
// With the returned collectors you have the possibility to clean up the internal Prometheus state by unsubscribing the collectors.
|
||||
// This is for example useful while testing the Prometheus implementation.
|
||||
// If any of the Prometheus Metrics can not be registered an error will be returned and the returned Metrics implementation will be nil.
|
||||
func NewPrometheus(name string, config *types.Prometheus) (*Prometheus, []stdprometheus.Collector, error) {
|
||||
var prom Prometheus
|
||||
var collectors []stdprometheus.Collector
|
||||
|
||||
cv := stdprometheus.NewCounterVec(
|
||||
stdprometheus.CounterOpts{
|
||||
Name: reqsTotalName,
|
||||
Help: "How many HTTP requests processed, partitioned by status code and method.",
|
||||
ConstLabels: stdprometheus.Labels{"service": name},
|
||||
},
|
||||
[]string{"code", "method"},
|
||||
)
|
||||
cv, err := registerCounterVec(cv)
|
||||
if err != nil {
|
||||
return nil, collectors, err
|
||||
}
|
||||
prom.reqsCounter = prometheus.NewCounter(cv)
|
||||
collectors = append(collectors, cv)
|
||||
|
||||
var buckets []float64
|
||||
if config.Buckets != nil {
|
||||
buckets = config.Buckets
|
||||
} else {
|
||||
buckets = []float64{0.1, 0.3, 1.2, 5}
|
||||
}
|
||||
hv := stdprometheus.NewHistogramVec(
|
||||
stdprometheus.HistogramOpts{
|
||||
Name: reqDurationName,
|
||||
Help: "How long it took to process the request.",
|
||||
ConstLabels: stdprometheus.Labels{"service": name},
|
||||
Buckets: buckets,
|
||||
},
|
||||
[]string{"code"},
|
||||
)
|
||||
hv, err = registerHistogramVec(hv)
|
||||
if err != nil {
|
||||
return nil, collectors, err
|
||||
}
|
||||
prom.reqDurationHistogram = prometheus.NewHistogram(hv)
|
||||
collectors = append(collectors, hv)
|
||||
|
||||
cv = stdprometheus.NewCounterVec(
|
||||
stdprometheus.CounterOpts{
|
||||
Name: retriesTotalName,
|
||||
Help: "How many request retries happened in total.",
|
||||
ConstLabels: stdprometheus.Labels{"service": name},
|
||||
},
|
||||
[]string{},
|
||||
)
|
||||
cv, err = registerCounterVec(cv)
|
||||
if err != nil {
|
||||
return nil, collectors, err
|
||||
}
|
||||
prom.retryCounter = prometheus.NewCounter(cv)
|
||||
collectors = append(collectors, cv)
|
||||
|
||||
return &prom, collectors, nil
|
||||
}
|
||||
|
||||
func registerCounterVec(cv *stdprometheus.CounterVec) (*stdprometheus.CounterVec, error) {
|
||||
err := stdprometheus.Register(cv)
|
||||
|
||||
if err != nil {
|
||||
e, ok := err.(stdprometheus.AlreadyRegisteredError)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("error registering CounterVec: %s", e)
|
||||
}
|
||||
cv = e.ExistingCollector.(*stdprometheus.CounterVec)
|
||||
}
|
||||
|
||||
return cv, nil
|
||||
}
|
||||
|
||||
func registerHistogramVec(hv *stdprometheus.HistogramVec) (*stdprometheus.HistogramVec, error) {
|
||||
err := stdprometheus.Register(hv)
|
||||
|
||||
if err != nil {
|
||||
e, ok := err.(stdprometheus.AlreadyRegisteredError)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("error registering HistogramVec: %s", e)
|
||||
}
|
||||
hv = e.ExistingCollector.(*stdprometheus.HistogramVec)
|
||||
}
|
||||
|
||||
return hv, nil
|
||||
}
|
|
@ -1,189 +0,0 @@
|
|||
package middlewares
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/containous/traefik/testhelpers"
|
||||
"github.com/containous/traefik/types"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
dto "github.com/prometheus/client_model/go"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/urfave/negroni"
|
||||
)
|
||||
|
||||
func TestPrometheus(t *testing.T) {
|
||||
defer resetPrometheusValues()
|
||||
|
||||
metricsFamilies, err := prometheus.DefaultGatherer.Gather()
|
||||
if err != nil {
|
||||
t.Fatalf("could not gather metrics family: %s", err)
|
||||
}
|
||||
initialMetricsFamilyCount := len(metricsFamilies)
|
||||
|
||||
recorder := httptest.NewRecorder()
|
||||
|
||||
req1 := testhelpers.MustNewRequest(http.MethodGet, "http://localhost:3000/ok", ioutil.NopCloser(nil))
|
||||
req2 := testhelpers.MustNewRequest(http.MethodGet, "http://localhost:3000/metrics", ioutil.NopCloser(nil))
|
||||
|
||||
httpHandler := setupTestHTTPHandler()
|
||||
httpHandler.ServeHTTP(recorder, req1)
|
||||
httpHandler.ServeHTTP(recorder, req2)
|
||||
|
||||
body := recorder.Body.String()
|
||||
if !strings.Contains(body, reqsTotalName) {
|
||||
t.Errorf("body does not contain request total entry '%s'", reqsTotalName)
|
||||
}
|
||||
if !strings.Contains(body, reqDurationName) {
|
||||
t.Errorf("body does not contain request duration entry '%s'", reqDurationName)
|
||||
}
|
||||
if !strings.Contains(body, retriesTotalName) {
|
||||
t.Errorf("body does not contain total retries entry '%s'", retriesTotalName)
|
||||
}
|
||||
|
||||
metricsFamilies, err = prometheus.DefaultGatherer.Gather()
|
||||
if err != nil {
|
||||
t.Fatalf("could not gather metrics families: %s", err)
|
||||
}
|
||||
|
||||
tests := []struct {
|
||||
name string
|
||||
labels map[string]string
|
||||
assert func(*dto.MetricFamily)
|
||||
}{
|
||||
{
|
||||
name: reqsTotalName,
|
||||
labels: map[string]string{
|
||||
"code": "200",
|
||||
"method": http.MethodGet,
|
||||
"service": "test",
|
||||
},
|
||||
assert: func(family *dto.MetricFamily) {
|
||||
cv := uint(family.Metric[0].Counter.GetValue())
|
||||
expectedCv := uint(2)
|
||||
if cv != expectedCv {
|
||||
t.Errorf("gathered metrics do not contain correct value for total requests, got %d expected %d", cv, expectedCv)
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
name: reqDurationName,
|
||||
labels: map[string]string{
|
||||
"service": "test",
|
||||
"code": "200",
|
||||
},
|
||||
assert: func(family *dto.MetricFamily) {
|
||||
sc := family.Metric[0].Histogram.GetSampleCount()
|
||||
expectedSc := uint64(2)
|
||||
if sc != expectedSc {
|
||||
t.Errorf("gathered metrics do not contain correct sample count for request duration, got %d expected %d", sc, expectedSc)
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
name: retriesTotalName,
|
||||
labels: map[string]string{
|
||||
"service": "test",
|
||||
},
|
||||
assert: func(family *dto.MetricFamily) {
|
||||
cv := uint(family.Metric[0].Counter.GetValue())
|
||||
expectedCv := uint(1)
|
||||
if cv != expectedCv {
|
||||
t.Errorf("gathered metrics do not contain correct value for total retries, got '%d' expected '%d'", cv, expectedCv)
|
||||
}
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
assert.Equal(t, len(tests), len(metricsFamilies)-initialMetricsFamilyCount, "gathered Traefik metrics count does not match tests count")
|
||||
|
||||
for _, test := range tests {
|
||||
family := findMetricFamily(test.name, metricsFamilies)
|
||||
if family == nil {
|
||||
t.Errorf("gathered metrics do not contain '%s'", test.name)
|
||||
continue
|
||||
}
|
||||
for _, label := range family.Metric[0].Label {
|
||||
val, ok := test.labels[*label.Name]
|
||||
if !ok {
|
||||
t.Errorf("'%s' metric contains unexpected label '%s'", test.name, label)
|
||||
} else if val != *label.Value {
|
||||
t.Errorf("label '%s' in metric '%s' has wrong value '%s'", label, test.name, *label.Value)
|
||||
}
|
||||
}
|
||||
test.assert(family)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPrometheusRegisterMetricsMultipleTimes(t *testing.T) {
|
||||
defer resetPrometheusValues()
|
||||
|
||||
recorder := httptest.NewRecorder()
|
||||
req1 := testhelpers.MustNewRequest(http.MethodGet, "http://localhost:3000/ok", ioutil.NopCloser(nil))
|
||||
|
||||
httpHandler := setupTestHTTPHandler()
|
||||
httpHandler.ServeHTTP(recorder, req1)
|
||||
|
||||
httpHandler = setupTestHTTPHandler()
|
||||
httpHandler.ServeHTTP(recorder, req1)
|
||||
|
||||
metricsFamilies, err := prometheus.DefaultGatherer.Gather()
|
||||
if err != nil {
|
||||
t.Fatalf("could not gather metrics families: %s", err)
|
||||
}
|
||||
|
||||
reqsTotalFamily := findMetricFamily(reqsTotalName, metricsFamilies)
|
||||
if reqsTotalFamily == nil {
|
||||
t.Fatalf("gathered metrics do not contain '%s'", reqsTotalName)
|
||||
}
|
||||
|
||||
cv := uint(reqsTotalFamily.Metric[0].Counter.GetValue())
|
||||
expectedCv := uint(2)
|
||||
if cv != expectedCv {
|
||||
t.Errorf("wrong counter value when registering metrics multiple times, got '%d' expected '%d'", cv, expectedCv)
|
||||
}
|
||||
}
|
||||
|
||||
func setupTestHTTPHandler() http.Handler {
|
||||
serveMux := http.NewServeMux()
|
||||
serveMux.Handle("/metrics", promhttp.Handler())
|
||||
serveMux.Handle("/ok", &networkFailingHTTPHandler{failAtCalls: []int{1}, netErrorRecorder: &DefaultNetErrorRecorder{}})
|
||||
|
||||
metrics, _ := newPrometheusMetrics()
|
||||
|
||||
n := negroni.New()
|
||||
n.Use(NewMetricsWrapper(metrics))
|
||||
n.UseHandler(NewRetry(2, serveMux, NewMetricsRetryListener(metrics)))
|
||||
|
||||
return n
|
||||
}
|
||||
|
||||
func resetPrometheusValues() {
|
||||
_, collectors := newPrometheusMetrics()
|
||||
|
||||
for _, collector := range collectors {
|
||||
prometheus.Unregister(collector)
|
||||
}
|
||||
}
|
||||
|
||||
func newPrometheusMetrics() (*Prometheus, []prometheus.Collector) {
|
||||
prom, collectors, err := NewPrometheus("test", &types.Prometheus{})
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("Error creating Prometheus Metrics: %s", err))
|
||||
}
|
||||
return prom, collectors
|
||||
}
|
||||
|
||||
func findMetricFamily(name string, families []*dto.MetricFamily) *dto.MetricFamily {
|
||||
for _, family := range families {
|
||||
if family.GetName() == name {
|
||||
return family
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -1,87 +0,0 @@
|
|||
package middlewares
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"github.com/containous/traefik/log"
|
||||
"github.com/containous/traefik/safe"
|
||||
"github.com/containous/traefik/types"
|
||||
kitlog "github.com/go-kit/kit/log"
|
||||
"github.com/go-kit/kit/metrics"
|
||||
"github.com/go-kit/kit/metrics/statsd"
|
||||
)
|
||||
|
||||
var _ Metrics = (*Statsd)(nil)
|
||||
var _ RetryMetrics = (*Statsd)(nil)
|
||||
|
||||
var statsdClient = statsd.New("traefik.", kitlog.LoggerFunc(func(keyvals ...interface{}) error {
|
||||
log.Info(keyvals)
|
||||
return nil
|
||||
}))
|
||||
var statsdTicker *time.Ticker
|
||||
|
||||
// Statsd is an Implementation for Metrics that exposes statsd metrics for the latency
|
||||
// and the number of requests partitioned by status code and method.
|
||||
// - number of requests partitioned by status code and method
|
||||
// - request durations
|
||||
// - amount of retries happened
|
||||
type Statsd struct {
|
||||
reqsCounter metrics.Counter
|
||||
reqDurationHistogram metrics.Histogram
|
||||
retryCounter metrics.Counter
|
||||
}
|
||||
|
||||
func (s *Statsd) getReqsCounter() metrics.Counter {
|
||||
return s.reqsCounter
|
||||
}
|
||||
|
||||
func (s *Statsd) getReqDurationHistogram() metrics.Histogram {
|
||||
return s.reqDurationHistogram
|
||||
}
|
||||
|
||||
func (s *Statsd) getRetryCounter() metrics.Counter {
|
||||
return s.retryCounter
|
||||
}
|
||||
|
||||
// NewStatsD creates new instance of StatsD
|
||||
func NewStatsD(name string) *Statsd {
|
||||
var m Statsd
|
||||
|
||||
m.reqsCounter = statsdClient.NewCounter(ddMetricsReqsName, 1.0).With("service", name)
|
||||
m.reqDurationHistogram = statsdClient.NewTiming(ddMetricsLatencyName, 1.0).With("service", name)
|
||||
m.retryCounter = statsdClient.NewCounter(ddRetriesTotalName, 1.0).With("service", name)
|
||||
|
||||
return &m
|
||||
}
|
||||
|
||||
// InitStatsdClient initializes metrics pusher and creates a statsdClient if not created already
|
||||
func InitStatsdClient(config *types.Statsd) *time.Ticker {
|
||||
if statsdTicker == nil {
|
||||
address := config.Address
|
||||
if len(address) == 0 {
|
||||
address = "localhost:8125"
|
||||
}
|
||||
pushInterval, err := time.ParseDuration(config.PushInterval)
|
||||
if err != nil {
|
||||
log.Warnf("Unable to parse %s into pushInterval, using 10s as default value", config.PushInterval)
|
||||
pushInterval = 10 * time.Second
|
||||
}
|
||||
|
||||
report := time.NewTicker(pushInterval)
|
||||
|
||||
safe.Go(func() {
|
||||
statsdClient.SendLoop(report.C, "udp", address)
|
||||
})
|
||||
|
||||
statsdTicker = report
|
||||
}
|
||||
return statsdTicker
|
||||
}
|
||||
|
||||
// StopStatsdClient stops internal statsdTicker which controls the pushing of metrics to StatsD Agent and resets it to `nil`
|
||||
func StopStatsdClient() {
|
||||
if statsdTicker != nil {
|
||||
statsdTicker.Stop()
|
||||
}
|
||||
statsdTicker = nil
|
||||
}
|
|
@ -1,58 +0,0 @@
|
|||
package middlewares
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/containous/traefik/testhelpers"
|
||||
"github.com/containous/traefik/types"
|
||||
"github.com/stvp/go-udp-testing"
|
||||
"github.com/urfave/negroni"
|
||||
)
|
||||
|
||||
func TestStatsD(t *testing.T) {
|
||||
udp.SetAddr(":18125")
|
||||
// This is needed to make sure that UDP Listener listens for data a bit longer, otherwise it will quit after a millisecond
|
||||
udp.Timeout = 5 * time.Second
|
||||
recorder := httptest.NewRecorder()
|
||||
InitStatsdClient(&types.Statsd{":18125", "1s"})
|
||||
|
||||
n := negroni.New()
|
||||
c := NewStatsD("test")
|
||||
defer StopStatsdClient()
|
||||
metricsMiddlewareBackend := NewMetricsWrapper(c)
|
||||
|
||||
n.Use(metricsMiddlewareBackend)
|
||||
r := http.NewServeMux()
|
||||
r.HandleFunc(`/ok`, func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusOK)
|
||||
fmt.Fprintln(w, "ok")
|
||||
})
|
||||
r.HandleFunc(`/not-found`, func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
fmt.Fprintln(w, "not-found")
|
||||
})
|
||||
n.UseHandler(r)
|
||||
|
||||
req1 := testhelpers.MustNewRequest(http.MethodGet, "http://localhost:3000/ok", nil)
|
||||
req2 := testhelpers.MustNewRequest(http.MethodGet, "http://localhost:3000/not-found", nil)
|
||||
|
||||
retryListener := NewMetricsRetryListener(c)
|
||||
retryListener.Retried(1)
|
||||
retryListener.Retried(2)
|
||||
|
||||
expected := []string{
|
||||
// We are only validating counts, as it is nearly impossible to validate latency, since it varies every run
|
||||
"traefik.requests.total:2.000000|c\n",
|
||||
"traefik.backend.retries.total:2.000000|c\n",
|
||||
"traefik.request.duration",
|
||||
}
|
||||
|
||||
udp.ShouldReceiveAll(t, expected, func() {
|
||||
n.ServeHTTP(recorder, req1)
|
||||
n.ServeHTTP(recorder, req2)
|
||||
})
|
||||
}
|
Loading…
Add table
Add a link
Reference in a new issue