1
0
Fork 0

Improve Prometheus metrics removal

This commit is contained in:
Marco Jantke 2018-06-05 12:32:03 +02:00 committed by Traefiker Bot
parent f317e50136
commit 2c18750537
7 changed files with 480 additions and 215 deletions

View file

@ -36,25 +36,18 @@ const (
backendServerUpName = metricNamePrefix + "backend_server_up"
)
const (
// generationAgeForever indicates that a metric never gets outdated.
generationAgeForever = 0
// generationAgeDefault is the default age of three generations.
generationAgeDefault = 3
)
// promState holds all metric state internally and acts as the only Collector we register for Prometheus.
//
// This enables control to remove metrics that belong to outdated configuration.
// As an example why this is required, consider Traefik learns about a new service.
// It populates the 'traefik_server_backend_up' metric for it with a value of 1 (alive).
// When the backend is undeployed now the metric is still there in the client library
// and will be until Traefik would be restarted.
// and will be returned on the metrics endpoint until Traefik would be restarted.
//
// To solve this problem promState keeps track of configuration generations.
// Every time a new configuration is loaded, the generation is increased by one.
// Metrics that "belong" to a dynamic configuration part of Traefik (e.g. backend, entrypoint)
// are removed, given they were tracked more than 3 generations ago.
// To solve this problem promState keeps track of Traefik's dynamic configuration.
// Metrics that "belong" to a dynamic configuration part like backends or entrypoints
// are removed after they were scraped at least once when the corresponding object
// doesn't exist anymore.
var promState = newPrometheusState()
// PrometheusHandler exposes Prometheus routes.
@ -163,40 +156,66 @@ func RegisterPrometheus(config *types.Prometheus) Registry {
}
}
// OnConfigurationUpdate increases the current generation of the prometheus state.
func OnConfigurationUpdate() {
promState.IncGeneration()
// OnConfigurationUpdate receives the current configuration from Traefik.
// It then converts the configuration to the optimized package internal format
// and sets it to the promState.
func OnConfigurationUpdate(configurations types.Configurations) {
dynamicConfig := newDynamicConfig()
for _, config := range configurations {
for _, frontend := range config.Frontends {
for _, entrypointName := range frontend.EntryPoints {
dynamicConfig.entrypoints[entrypointName] = true
}
}
for backendName, backend := range config.Backends {
dynamicConfig.backends[backendName] = make(map[string]bool)
for _, server := range backend.Servers {
dynamicConfig.backends[backendName][server.URL] = true
}
}
}
promState.SetDynamicConfig(dynamicConfig)
}
func newPrometheusState() *prometheusState {
collectors := make(chan *collector)
state := make(map[string]*collector)
return &prometheusState{
collectors: collectors,
state: state,
collectors: make(chan *collector),
dynamicConfig: newDynamicConfig(),
state: make(map[string]*collector),
}
}
type prometheusState struct {
currentGeneration int
collectors chan *collector
describers []func(ch chan<- *stdprometheus.Desc)
collectors chan *collector
describers []func(ch chan<- *stdprometheus.Desc)
mtx sync.Mutex
state map[string]*collector
mtx sync.Mutex
dynamicConfig *dynamicConfig
state map[string]*collector
}
func (ps *prometheusState) IncGeneration() {
// reset is a utility method for unit testing. It should be called after each
// test run that changes promState internally in order to avoid dependencies
// between unit tests.
func (ps *prometheusState) reset() {
ps.collectors = make(chan *collector)
ps.describers = []func(ch chan<- *stdprometheus.Desc){}
ps.dynamicConfig = newDynamicConfig()
ps.state = make(map[string]*collector)
}
func (ps *prometheusState) SetDynamicConfig(dynamicConfig *dynamicConfig) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
ps.currentGeneration++
ps.dynamicConfig = dynamicConfig
}
func (ps *prometheusState) ListenValueUpdates() {
for collector := range ps.collectors {
ps.mtx.Lock()
collector.lastTrackedGeneration = ps.currentGeneration
ps.state[collector.id] = collector
ps.mtx.Unlock()
}
@ -212,42 +231,89 @@ func (ps *prometheusState) Describe(ch chan<- *stdprometheus.Desc) {
// Collect implements prometheus.Collector. It calls the Collect
// method of all metrics it received on the collectors channel.
// It's also responsible to remove metrics that were tracked
// at least three generations ago. Those metrics are cleaned up
// after the Collect of them were called.
// It's also responsible to remove metrics that belong to an outdated configuration.
// The removal happens only after their Collect method was called to ensure that
// also those metrics will be exported on the current scrape.
func (ps *prometheusState) Collect(ch chan<- stdprometheus.Metric) {
ps.mtx.Lock()
defer ps.mtx.Unlock()
outdatedKeys := []string{}
var outdatedKeys []string
for key, cs := range ps.state {
cs.collector.Collect(ch)
if cs.maxAge == generationAgeForever {
continue
}
if ps.currentGeneration-cs.lastTrackedGeneration >= cs.maxAge {
if ps.isOutdated(cs) {
outdatedKeys = append(outdatedKeys, key)
}
}
for _, key := range outdatedKeys {
ps.state[key].delete()
delete(ps.state, key)
}
}
func newCollector(metricName string, lnvs labelNamesValues, c stdprometheus.Collector) *collector {
maxAge := generationAgeDefault
// isOutdated checks whether the passed collector has labels that mark
// it as belonging to an outdated configuration of Traefik.
func (ps *prometheusState) isOutdated(collector *collector) bool {
labels := collector.labels
// metrics without labels should never become outdated
if len(lnvs) == 0 {
maxAge = generationAgeForever
if entrypointName, ok := labels["entrypoint"]; ok && !ps.dynamicConfig.hasEntrypoint(entrypointName) {
return true
}
if backendName, ok := labels["backend"]; ok {
if !ps.dynamicConfig.hasBackend(backendName) {
return true
}
if url, ok := labels["url"]; ok && !ps.dynamicConfig.hasServerURL(backendName, url) {
return true
}
}
return false
}
func newDynamicConfig() *dynamicConfig {
return &dynamicConfig{
entrypoints: make(map[string]bool),
backends: make(map[string]map[string]bool),
}
}
// dynamicConfig holds the current configuration for entrypoints, backends,
// and server URLs in an optimized way to check for existence. This provides
// a performant way to check whether the collected metrics belong to the
// current configuration or to an outdated one.
type dynamicConfig struct {
entrypoints map[string]bool
backends map[string]map[string]bool
}
func (d *dynamicConfig) hasEntrypoint(entrypointName string) bool {
_, ok := d.entrypoints[entrypointName]
return ok
}
func (d *dynamicConfig) hasBackend(backendName string) bool {
_, ok := d.backends[backendName]
return ok
}
func (d *dynamicConfig) hasServerURL(backendName, serverURL string) bool {
if backend, hasBackend := d.backends[backendName]; hasBackend {
_, ok := backend[serverURL]
return ok
}
return false
}
func newCollector(metricName string, labels stdprometheus.Labels, c stdprometheus.Collector, delete func()) *collector {
return &collector{
id: buildMetricID(metricName, lnvs),
maxAge: maxAge,
id: buildMetricID(metricName, labels),
labels: labels,
collector: c,
delete: delete,
}
}
@ -255,16 +321,19 @@ func newCollector(metricName string, lnvs labelNamesValues, c stdprometheus.Coll
// It adds information on how many generations this metric should be present
// in the /metrics output, relatived to the time it was last tracked.
type collector struct {
id string
collector stdprometheus.Collector
lastTrackedGeneration int
maxAge int
id string
labels stdprometheus.Labels
collector stdprometheus.Collector
delete func()
}
func buildMetricID(metricName string, lnvs labelNamesValues) string {
newLnvs := append([]string{}, lnvs...)
sort.Strings(newLnvs)
return metricName + ":" + strings.Join(newLnvs, "|")
func buildMetricID(metricName string, labels stdprometheus.Labels) string {
var labelNamesValues []string
for name, value := range labels {
labelNamesValues = append(labelNamesValues, name, value)
}
sort.Strings(labelNamesValues)
return metricName + ":" + strings.Join(labelNamesValues, "|")
}
func newCounterFrom(collectors chan<- *collector, opts stdprometheus.CounterOpts, labelNames []string) *counter {
@ -297,9 +366,12 @@ func (c *counter) With(labelValues ...string) metrics.Counter {
}
func (c *counter) Add(delta float64) {
collector := c.cv.With(c.labelNamesValues.ToLabels())
labels := c.labelNamesValues.ToLabels()
collector := c.cv.With(labels)
collector.Add(delta)
c.collectors <- newCollector(c.name, c.labelNamesValues, collector)
c.collectors <- newCollector(c.name, labels, collector, func() {
c.cv.Delete(labels)
})
}
func (c *counter) Describe(ch chan<- *stdprometheus.Desc) {
@ -336,15 +408,21 @@ func (g *gauge) With(labelValues ...string) metrics.Gauge {
}
func (g *gauge) Add(delta float64) {
collector := g.gv.With(g.labelNamesValues.ToLabels())
labels := g.labelNamesValues.ToLabels()
collector := g.gv.With(labels)
collector.Add(delta)
g.collectors <- newCollector(g.name, g.labelNamesValues, collector)
g.collectors <- newCollector(g.name, labels, collector, func() {
g.gv.Delete(labels)
})
}
func (g *gauge) Set(value float64) {
collector := g.gv.With(g.labelNamesValues.ToLabels())
labels := g.labelNamesValues.ToLabels()
collector := g.gv.With(labels)
collector.Set(value)
g.collectors <- newCollector(g.name, g.labelNamesValues, collector)
g.collectors <- newCollector(g.name, labels, collector, func() {
g.gv.Delete(labels)
})
}
func (g *gauge) Describe(ch chan<- *stdprometheus.Desc) {
@ -377,9 +455,12 @@ func (h *histogram) With(labelValues ...string) metrics.Histogram {
}
func (h *histogram) Observe(value float64) {
collector := h.hv.With(h.labelNamesValues.ToLabels())
labels := h.labelNamesValues.ToLabels()
collector := h.hv.With(labels)
collector.Observe(value)
h.collectors <- newCollector(h.name, h.labelNamesValues, collector)
h.collectors <- newCollector(h.name, labels, collector, func() {
h.hv.Delete(labels)
})
}
func (h *histogram) Describe(ch chan<- *stdprometheus.Desc) {

View file

@ -7,12 +7,16 @@ import (
"testing"
"time"
th "github.com/containous/traefik/testhelpers"
"github.com/containous/traefik/types"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
)
func TestPrometheus(t *testing.T) {
// Reset state of global promState.
defer promState.reset()
prometheusRegistry := RegisterPrometheus(&types.Prometheus{})
defer prometheus.Unregister(promState)
@ -177,56 +181,94 @@ func TestPrometheus(t *testing.T) {
}
}
func TestPrometheusGenerationLogicForMetricWithLabel(t *testing.T) {
func TestPrometheusMetricRemoval(t *testing.T) {
// Reset state of global promState.
defer promState.reset()
prometheusRegistry := RegisterPrometheus(&types.Prometheus{})
defer prometheus.Unregister(promState)
// Metrics with labels belonging to a specific configuration in Traefik
// should be removed when the generationMaxAge is exceeded. As example
// we use the traefik_backend_requests_total metric.
configurations := make(types.Configurations)
configurations["providerName"] = th.BuildConfiguration(
th.WithFrontends(
th.WithFrontend("backend1", th.WithEntryPoints("entrypoint1")),
),
th.WithBackends(
th.WithBackendNew("backend1", th.WithServersNew(th.WithServerNew("http://localhost:9000"))),
),
)
OnConfigurationUpdate(configurations)
// Register some metrics manually that are not part of the active configuration.
// Those metrics should be part of the /metrics output on the first scrape but
// should be removed after that scrape.
prometheusRegistry.
EntrypointReqsCounter().
With("entrypoint", "entrypoint2", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http").
Add(1)
prometheusRegistry.
BackendReqsCounter().
With("backend", "backend1", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http").
With("backend", "backend2", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http").
Add(1)
prometheusRegistry.
BackendServerUpGauge().
With("backend", "backend1", "url", "http://localhost:9999").
Set(1)
delayForTrackingCompletion()
assertMetricsExist(t, mustScrape(), entrypointReqsTotalName, backendReqsTotalName, backendServerUpName)
assertMetricsAbsent(t, mustScrape(), entrypointReqsTotalName, backendReqsTotalName, backendServerUpName)
// To verify that metrics belonging to active configurations are not removed
// here the counter examples.
prometheusRegistry.
EntrypointReqsCounter().
With("entrypoint", "entrypoint1", "code", strconv.Itoa(http.StatusOK), "method", http.MethodGet, "protocol", "http").
Add(1)
delayForTrackingCompletion()
assertMetricExists(t, backendReqsTotalName, mustScrape())
// Increase the config generation one more than the max age of a metric.
for i := 0; i < generationAgeDefault+1; i++ {
OnConfigurationUpdate()
}
// On the next scrape the metric still exists and will be removed
// after the scrape completed.
assertMetricExists(t, backendReqsTotalName, mustScrape())
// Now the metric should be absent.
assertMetricAbsent(t, backendReqsTotalName, mustScrape())
assertMetricsExist(t, mustScrape(), entrypointReqsTotalName)
assertMetricsExist(t, mustScrape(), entrypointReqsTotalName)
}
func TestPrometheusGenerationLogicForMetricWithoutLabel(t *testing.T) {
func TestPrometheusRemovedMetricsReset(t *testing.T) {
// Reset state of global promState.
defer promState.reset()
prometheusRegistry := RegisterPrometheus(&types.Prometheus{})
defer prometheus.Unregister(promState)
// Metrics without labels like traefik_config_reloads_total should live forever
// and never get removed.
prometheusRegistry.ConfigReloadsCounter().Add(1)
labelNamesValues := []string{
"backend", "backend",
"code", strconv.Itoa(http.StatusOK),
"method", http.MethodGet,
"protocol", "http",
}
prometheusRegistry.
BackendReqsCounter().
With(labelNamesValues...).
Add(3)
delayForTrackingCompletion()
assertMetricExists(t, configReloadsTotalName, mustScrape())
metricsFamilies := mustScrape()
assertCounterValue(t, 3, findMetricFamily(backendReqsTotalName, metricsFamilies), labelNamesValues...)
// Increase the config generation one more than the max age of a metric.
for i := 0; i < generationAgeDefault+100; i++ {
OnConfigurationUpdate()
}
// There is no dynamic configuration and so this metric will be deleted
// after the first scrape.
assertMetricsAbsent(t, mustScrape(), backendReqsTotalName)
// Scrape two times in order to verify, that it is not removed after the
// first scrape completed.
assertMetricExists(t, configReloadsTotalName, mustScrape())
assertMetricExists(t, configReloadsTotalName, mustScrape())
prometheusRegistry.
BackendReqsCounter().
With(labelNamesValues...).
Add(1)
delayForTrackingCompletion()
metricsFamilies = mustScrape()
assertCounterValue(t, 1, findMetricFamily(backendReqsTotalName, metricsFamilies), labelNamesValues...)
}
// Tracking and gathering the metrics happens concurrently.
@ -247,17 +289,23 @@ func mustScrape() []*dto.MetricFamily {
return families
}
func assertMetricExists(t *testing.T, name string, families []*dto.MetricFamily) {
func assertMetricsExist(t *testing.T, families []*dto.MetricFamily, metricNames ...string) {
t.Helper()
if findMetricFamily(name, families) == nil {
t.Errorf("gathered metrics do not contain %q", name)
for _, metricName := range metricNames {
if findMetricFamily(metricName, families) == nil {
t.Errorf("gathered metrics should contain %q", metricName)
}
}
}
func assertMetricAbsent(t *testing.T, name string, families []*dto.MetricFamily) {
func assertMetricsAbsent(t *testing.T, families []*dto.MetricFamily, metricNames ...string) {
t.Helper()
if findMetricFamily(name, families) != nil {
t.Errorf("gathered metrics contain %q, but should not", name)
for _, metricName := range metricNames {
if findMetricFamily(metricName, families) != nil {
t.Errorf("gathered metrics should not contain %q", metricName)
}
}
}
@ -270,6 +318,58 @@ func findMetricFamily(name string, families []*dto.MetricFamily) *dto.MetricFami
return nil
}
func findMetricByLabelNamesValues(family *dto.MetricFamily, labelNamesValues ...string) *dto.Metric {
if family == nil {
return nil
}
for _, metric := range family.Metric {
if hasMetricAllLabelPairs(metric, labelNamesValues...) {
return metric
}
}
return nil
}
func hasMetricAllLabelPairs(metric *dto.Metric, labelNamesValues ...string) bool {
for i := 0; i < len(labelNamesValues); i += 2 {
name, val := labelNamesValues[i], labelNamesValues[i+1]
if !hasMetricLabelPair(metric, name, val) {
return false
}
}
return true
}
func hasMetricLabelPair(metric *dto.Metric, labelName, labelValue string) bool {
for _, labelPair := range metric.Label {
if labelPair.GetName() == labelName && labelPair.GetValue() == labelValue {
return true
}
}
return false
}
func assertCounterValue(t *testing.T, want float64, family *dto.MetricFamily, labelNamesValues ...string) {
t.Helper()
metric := findMetricByLabelNamesValues(family, labelNamesValues...)
if metric == nil {
t.Error("metric must not be nil")
return
}
if metric.Counter == nil {
t.Errorf("metric %s must be a counter", family.GetName())
return
}
if cv := metric.Counter.GetValue(); cv != want {
t.Errorf("metric %s has value %v, want %v", family.GetName(), cv, want)
}
}
func buildCounterAssert(t *testing.T, metricName string, expectedValue int) func(family *dto.MetricFamily) {
return func(family *dto.MetricFamily) {
if cv := int(family.Metric[0].Counter.GetValue()); cv != expectedValue {