// Source: hivemind/internal/roles/master/observer.go (Go, 139 lines, 3 KiB)

package master
import (
"context"
"sync"
"time"
"git.wzray.com/homelab/hivemind/internal/registry"
"git.wzray.com/homelab/hivemind/internal/state"
"git.wzray.com/homelab/hivemind/internal/types"
"git.wzray.com/homelab/hivemind/internal/web/client"
"github.com/rs/zerolog/log"
)
// observer tracks the last time each node was heard from and periodically
// health-checks the ones that have gone quiet.
type observer struct {
	// state gives access to the node registry and to this node's identity.
	state *state.RuntimeState

	// interval is the pause between polls, in seconds.
	// backoff is the initial delay between healthcheck retries, in seconds;
	// it is doubled after each wait.
	// backoffCount is the maximum number of healthcheck attempts per node.
	interval, backoff, backoffCount int

	// nodeTimeout is the keepalive age, in milliseconds, after which a node
	// is health-checked.
	nodeTimeout int64

	// lock is held by onKeepAlive and onEvent while they modify keepalives.
	lock sync.RWMutex
	// keepalives maps a node hostname to the Unix time, in milliseconds, at
	// which the node was last seen.
	keepalives map[string]int64
}
// newObserver builds an observer with an empty keepalive table.
//
// interval and backoff are stored as given and are later interpreted as
// seconds; backoffCount is the number of healthcheck attempts per node.
// nodeTimeout is given in seconds and stored in milliseconds so it can be
// compared directly against millisecond timestamps.
func newObserver(
	state *state.RuntimeState,
	interval int,
	backoff int,
	backoffCount int,
	nodeTimeout int,
) *observer {
	const millisPerSecond = 1000

	o := new(observer)
	o.state = state
	o.interval = interval
	o.backoff = backoff
	o.backoffCount = backoffCount
	o.nodeTimeout = int64(nodeTimeout) * millisPerSecond
	o.keepalives = make(map[string]int64)
	return o
}
// pollNodes health-checks every node whose last keepalive is older than
// o.nodeTimeout and calls onLeave for each node that stays unreachable.
//
// The local node (o.state.Self.Hostname) is never checked. Each stale node
// gets up to o.backoffCount healthcheck requests; between attempts the
// function sleeps, starting at o.backoff seconds and doubling after every
// wait. If o.backoffCount is not positive, no request is made and the node is
// treated as dead.
//
// When ctx is cancelled during a wait the function returns immediately
// without calling onLeave: an interrupted check says nothing about whether
// the node is actually dead. Errors returned by onLeave are logged and do not
// stop the remaining checks.
func (o *observer) pollNodes(ctx context.Context, onLeave func(types.Node) error) {
	nodes := o.state.Registry.Nodes()
	now := time.Now().UnixMilli()

	// Snapshot the stale nodes under the read lock. onKeepAlive and onEvent
	// write to o.keepalives while holding o.lock, and iterating the map
	// concurrently with such a write is a data race. The lock is released
	// before any network call so keepalives are not blocked by slow probes.
	o.lock.RLock()
	toCheck := make([]types.Node, 0, len(nodes))
	for name, last := range o.keepalives {
		if name == o.state.Self.Hostname {
			continue
		}
		if now-last <= o.nodeTimeout {
			continue
		}
		n, ok := nodes[name]
		if !ok {
			// The registry snapshot has no such node; probing a zero-value
			// node and reporting it through onLeave would be meaningless.
			continue
		}
		toCheck = append(toCheck, n)
	}
	o.lock.RUnlock()

	// TODO: think about this (probing strategy carried over unchanged).
	for _, n := range toCheck {
		logger := log.With().Str("name", n.Hostname).Logger()
		logger.Debug().Msg("checking node")

		// delay holds a number of seconds; it is scaled by time.Second only
		// at the point of sleeping.
		delay := time.Duration(o.backoff)
		alive := false
		for i := o.backoffCount - 1; i >= 0; i-- {
			_, err := client.Get[any](n.Endpoint, types.PathNodeHealthcheck)
			if err == nil {
				logger.Debug().Msg("node is alive")
				alive = true
				break
			}
			if i == 0 {
				// Last attempt failed; no point sleeping before giving up.
				break
			}

			logger.Info().Any("delay", delay).Msg("node didn't respond, sleeping")
			select {
			case <-ctx.Done():
				return
			case <-time.After(delay * time.Second):
				delay *= 2
			}
		}

		if !alive {
			logger.Info().Msg("node is dead, removing")
			if err := onLeave(n); err != nil {
				logger.Warn().Err(err).Msg("onLeave call failed")
			}
		}
	}
}
// onKeepAlive records the current time, in Unix milliseconds, as the moment
// the given node was last seen. The keepalive table is updated while holding
// the observer's lock.
func (o *observer) onKeepAlive(node types.Node) {
	o.lock.Lock()
	defer o.lock.Unlock()

	hostname := node.Hostname
	o.keepalives[hostname] = time.Now().UnixMilli()
}
// onEvent applies a registry event to the keepalive table while holding the
// observer's lock:
//
//   - EventNodeJoin stamps every listed node with the current time;
//   - EventNodeLeave removes every listed node;
//   - EventSet replaces the whole table with the listed nodes, all stamped
//     with the same timestamp.
//
// Any other event kind leaves the table untouched.
func (o *observer) onEvent(event registry.RegistryEvent) {
	o.lock.Lock()
	defer o.lock.Unlock()

	switch event.Event {
	case registry.EventSet:
		o.keepalives = make(map[string]int64)
		stamp := time.Now().UnixMilli()
		for _, member := range event.Nodes {
			o.keepalives[member.Hostname] = stamp
		}

	case registry.EventNodeLeave:
		for _, gone := range event.Nodes {
			delete(o.keepalives, gone.Hostname)
		}

	case registry.EventNodeJoin:
		for _, joined := range event.Nodes {
			o.keepalives[joined.Hostname] = time.Now().UnixMilli()
		}
	}
}
// Start runs the observer loop until ctx is cancelled.
//
// It subscribes to registry events and forwards each one to onEvent, and it
// calls pollNodes once every o.interval seconds, passing onLeave through. The
// poll interval is measured from the end of the previous poll.
//
// A single timer is used instead of a fresh time.After per iteration, so a
// steady stream of registry events can no longer keep postponing the poll.
func (o *observer) Start(ctx context.Context, onLeave func(types.Node) error) {
	registryEvents := o.state.Registry.Subscribe()

	interval := time.Duration(o.interval) * time.Second
	timer := time.NewTimer(interval)
	defer timer.Stop()

	for {
		select {
		case n, ok := <-registryEvents:
			if !ok {
				// A closed channel is always ready to receive; stop
				// selecting on it so the loop does not spin on zero-value
				// events. Polling continues as usual.
				registryEvents = nil
				continue
			}
			o.onEvent(n)
		case <-ctx.Done():
			return
		case <-timer.C:
			o.pollNodes(ctx, onLeave)
			// The timer channel was just drained, so Reset is safe here.
			timer.Reset(interval)
		}
	}
}