
Add unhealthyInterval to the health check configuration

Swastik Sarkar 2025-04-09 13:40:05 +05:30 committed by GitHub
parent 6c3b099c25
commit d7d0017545
36 changed files with 701 additions and 295 deletions
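
For context, the option surfaces as a new optional UnhealthyInterval field on dynamic.ServerHealthCheck, next to the existing Interval and Timeout (see the test changes below). A minimal sketch of how the new field could be populated; the import paths and the generic pointer helper are assumptions mirroring what the tests in this diff rely on:

package main

import (
	"fmt"
	"time"

	ptypes "github.com/traefik/paerser/types"          // assumed import path
	"github.com/traefik/traefik/v3/pkg/config/dynamic" // assumed import path
)

// pointer mirrors the generic take-address helper used by the tests below.
func pointer[T any](v T) *T { return &v }

func main() {
	// Field names and types follow the diff below.
	config := &dynamic.ServerHealthCheck{
		Mode:     "http",
		Path:     "/health",
		Interval: ptypes.Duration(30 * time.Second),
		// Probe targets that failed their last check more aggressively.
		UnhealthyInterval: pointer(ptypes.Duration(5 * time.Second)),
		Timeout:           ptypes.Duration(5 * time.Second),
	}

	// When UnhealthyInterval is left nil, NewServiceHealthChecker falls back
	// to Interval, so unhealthy targets are probed as often as healthy ones.
	fmt.Println(time.Duration(config.Interval), time.Duration(*config.UnhealthyInterval))
}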


@@ -40,18 +40,27 @@ type metricsHealthCheck interface {
     ServiceServerUpGauge() gokitmetrics.Gauge
 }
 
+type target struct {
+    targetURL *url.URL
+    name      string
+}
+
 type ServiceHealthChecker struct {
     balancer StatusSetter
     info     *runtime.ServiceInfo
 
-    config   *dynamic.ServerHealthCheck
-    interval time.Duration
-    timeout  time.Duration
+    config            *dynamic.ServerHealthCheck
+    interval          time.Duration
+    unhealthyInterval time.Duration
+    timeout           time.Duration
 
     metrics metricsHealthCheck
 
-    client  *http.Client
-    targets map[string]*url.URL
+    client           *http.Client
+    healthyTargets   chan target
+    unhealthyTargets chan target
 
     serviceName string
 }
@@ -60,13 +69,26 @@ func NewServiceHealthChecker(ctx context.Context, metrics metricsHealthCheck, co
     interval := time.Duration(config.Interval)
     if interval <= 0 {
-        logger.Error().Msg("Health check interval smaller than zero")
+        logger.Error().Msg("Health check interval smaller than zero, default value will be used instead.")
         interval = time.Duration(dynamic.DefaultHealthCheckInterval)
     }
 
+    // If the unhealthyInterval option is not set, we use the interval option value,
+    // to check the unhealthy targets as often as the healthy ones.
+    var unhealthyInterval time.Duration
+    if config.UnhealthyInterval == nil {
+        unhealthyInterval = interval
+    } else {
+        unhealthyInterval = time.Duration(*config.UnhealthyInterval)
+        if unhealthyInterval <= 0 {
+            logger.Error().Msg("Health check unhealthy interval smaller than zero, default value will be used instead.")
+            unhealthyInterval = time.Duration(dynamic.DefaultHealthCheckInterval)
+        }
+    }
+
     timeout := time.Duration(config.Timeout)
     if timeout <= 0 {
-        logger.Error().Msg("Health check timeout smaller than zero")
+        logger.Error().Msg("Health check timeout smaller than zero, default value will be used instead.")
         timeout = time.Duration(dynamic.DefaultHealthCheckTimeout)
     }
@@ -80,21 +102,38 @@ func NewServiceHealthChecker(ctx context.Context, metrics metricsHealthCheck, co
         }
     }
 
+    healthyTargets := make(chan target, len(targets))
+    for name, targetURL := range targets {
+        healthyTargets <- target{
+            targetURL: targetURL,
+            name:      name,
+        }
+    }
+
+    unhealthyTargets := make(chan target, len(targets))
+
     return &ServiceHealthChecker{
-        balancer:    service,
-        info:        info,
-        config:      config,
-        interval:    interval,
-        timeout:     timeout,
-        targets:     targets,
-        serviceName: serviceName,
-        client:      client,
-        metrics:     metrics,
+        balancer:          service,
+        info:              info,
+        config:            config,
+        interval:          interval,
+        unhealthyInterval: unhealthyInterval,
+        timeout:           timeout,
+        healthyTargets:    healthyTargets,
+        unhealthyTargets:  unhealthyTargets,
+        serviceName:       serviceName,
+        client:            client,
+        metrics:           metrics,
     }
 }
 
 func (shc *ServiceHealthChecker) Launch(ctx context.Context) {
-    ticker := time.NewTicker(shc.interval)
+    go shc.healthcheck(ctx, shc.unhealthyTargets, shc.unhealthyInterval)
+    shc.healthcheck(ctx, shc.healthyTargets, shc.interval)
+}
+
+func (shc *ServiceHealthChecker) healthcheck(ctx context.Context, targets chan target, interval time.Duration) {
+    ticker := time.NewTicker(interval)
     defer ticker.Stop()
 
     for {
@@ -103,7 +142,23 @@ func (shc *ServiceHealthChecker) Launch(ctx context.Context) {
             return
         case <-ticker.C:
-            for proxyName, target := range shc.targets {
+            // We collect the targets to check once for all,
+            // to avoid rechecking a target that has been moved during the health check.
+            var targetsToCheck []target
+            hasMoreTargets := true
+            for hasMoreTargets {
+                select {
+                case <-ctx.Done():
+                    return
+                case target := <-targets:
+                    targetsToCheck = append(targetsToCheck, target)
+                default:
+                    hasMoreTargets = false
+                }
+            }
+
+            // Now we can check the targets.
+            for _, target := range targetsToCheck {
                 select {
                 case <-ctx.Done():
                     return
@@ -113,14 +168,14 @@ func (shc *ServiceHealthChecker) Launch(ctx context.Context) {
                 up := true
                 serverUpMetricValue := float64(1)
 
-                if err := shc.executeHealthCheck(ctx, shc.config, target); err != nil {
+                if err := shc.executeHealthCheck(ctx, shc.config, target.targetURL); err != nil {
                     // The context is canceled when the dynamic configuration is refreshed.
                     if errors.Is(err, context.Canceled) {
                         return
                     }
 
                     log.Ctx(ctx).Warn().
-                        Str("targetURL", target.String()).
+                        Str("targetURL", target.targetURL.String()).
                         Err(err).
                         Msg("Health check failed.")
@@ -128,17 +183,21 @@ func (shc *ServiceHealthChecker) Launch(ctx context.Context) {
                     serverUpMetricValue = float64(0)
                 }
 
-                shc.balancer.SetStatus(ctx, proxyName, up)
+                shc.balancer.SetStatus(ctx, target.name, up)
 
-                statusStr := runtime.StatusDown
+                var statusStr string
                 if up {
                     statusStr = runtime.StatusUp
+                    shc.healthyTargets <- target
+                } else {
+                    statusStr = runtime.StatusDown
+                    shc.unhealthyTargets <- target
                 }
 
-                shc.info.UpdateServerStatus(target.String(), statusStr)
+                shc.info.UpdateServerStatus(target.targetURL.String(), statusStr)
 
                 shc.metrics.ServiceServerUpGauge().
-                    With("service", shc.serviceName, "url", target.String()).
+                    With("service", shc.serviceName, "url", target.targetURL.String()).
                     Set(serverUpMetricValue)
             }
         }
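
The core of the change above is the routing between the two channels: each probe loop drains its own channel on every tick, checks the targets it collected, and then pushes each target onto the channel that matches its new status, so unhealthy targets migrate to the faster loop and back. A stripped-down sketch of that pattern, independent of the Traefik types; every name in it is illustrative and not part of the codebase:

package main

import (
	"context"
	"fmt"
	"time"
)

// probe stands in for a health check call such as executeHealthCheck above;
// it reports whether an item currently passes its check.
type probe func(item string) bool

// loop mirrors the healthcheck loop introduced above: on every tick it drains
// its input channel, checks each collected item once, and re-queues the item
// on whichever channel matches the outcome.
func loop(ctx context.Context, interval time.Duration, in, healthy, unhealthy chan string, check probe) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			// Collect the batch up front so an item moved here by the other
			// loop during this round is not checked twice.
			var batch []string
			for drained := false; !drained; {
				select {
				case item := <-in:
					batch = append(batch, item)
				default:
					drained = true
				}
			}
			for _, item := range batch {
				if check(item) {
					healthy <- item
				} else {
					unhealthy <- item
				}
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()

	healthy := make(chan string, 2)
	unhealthy := make(chan string, 2)
	healthy <- "a"
	healthy <- "b"

	check := func(item string) bool { return item != "b" } // "b" keeps failing

	// The failing item is re-probed every 10ms, the passing one every 100ms,
	// mirroring unhealthyInterval vs. interval.
	go loop(ctx, 10*time.Millisecond, unhealthy, healthy, unhealthy, check)
	loop(ctx, 100*time.Millisecond, healthy, healthy, unhealthy, check)

	fmt.Println("healthy queue:", len(healthy), "unhealthy queue:", len(unhealthy))
}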


@@ -419,11 +419,12 @@ func TestServiceHealthChecker_Launch(t *testing.T) {
             lb := &testLoadBalancer{RWMutex: &sync.RWMutex{}}
 
             config := &dynamic.ServerHealthCheck{
-                Mode:     test.mode,
-                Status:   test.status,
-                Path:     "/path",
-                Interval: ptypes.Duration(500 * time.Millisecond),
-                Timeout:  ptypes.Duration(499 * time.Millisecond),
+                Mode:              test.mode,
+                Status:            test.status,
+                Path:              "/path",
+                Interval:          ptypes.Duration(500 * time.Millisecond),
+                UnhealthyInterval: pointer(ptypes.Duration(500 * time.Millisecond)),
+                Timeout:           ptypes.Duration(499 * time.Millisecond),
             }
 
             gauge := &testhelpers.CollectingGauge{}
@@ -456,3 +457,54 @@ func TestServiceHealthChecker_Launch(t *testing.T) {
         })
     }
 }
+
+func TestDifferentIntervals(t *testing.T) {
+    // The context is passed to the health check and
+    // canonically canceled by the test server once all expected requests have been received.
+    ctx, cancel := context.WithCancel(context.Background())
+    t.Cleanup(cancel)
+
+    healthyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+        w.WriteHeader(http.StatusOK)
+    }))
+    healthyURL := testhelpers.MustParseURL(healthyServer.URL)
+
+    unhealthyServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
+        w.WriteHeader(http.StatusServiceUnavailable)
+    }))
+    unhealthyURL := testhelpers.MustParseURL(unhealthyServer.URL)
+
+    lb := &testLoadBalancer{RWMutex: &sync.RWMutex{}}
+
+    config := &dynamic.ServerHealthCheck{
+        Mode:              "http",
+        Path:              "/path",
+        Interval:          ptypes.Duration(500 * time.Millisecond),
+        UnhealthyInterval: pointer(ptypes.Duration(50 * time.Millisecond)),
+        Timeout:           ptypes.Duration(499 * time.Millisecond),
+    }
+
+    gauge := &testhelpers.CollectingGauge{}
+    serviceInfo := &runtime.ServiceInfo{}
+    hc := NewServiceHealthChecker(ctx, &MetricsMock{gauge}, config, lb, serviceInfo, http.DefaultTransport, map[string]*url.URL{"healthy": healthyURL, "unhealthy": unhealthyURL}, "foobar")
+
+    wg := sync.WaitGroup{}
+    wg.Add(1)
+    go func() {
+        hc.Launch(ctx)
+        wg.Done()
+    }()
+
+    select {
+    case <-time.After(2 * time.Second):
+        break
+    case <-ctx.Done():
+        wg.Wait()
+    }
+
+    lb.Lock()
+    defer lb.Unlock()
+
+    assert.Greater(t, lb.numRemovedServers, lb.numUpsertedServers, "removed servers greater than upserted servers")
+}