package master import ( "context" "sync" "time" "git.wzray.com/homelab/hivemind/internal/registry" "git.wzray.com/homelab/hivemind/internal/state" "git.wzray.com/homelab/hivemind/internal/types" "git.wzray.com/homelab/hivemind/internal/web/client" "github.com/rs/zerolog/log" ) type observer struct { state *state.RuntimeState interval int backoff int backoffCount int nodeTimeout int64 keepalives map[string]int64 lock sync.RWMutex } func newObserver( state *state.RuntimeState, interval int, backoff int, backoffCount int, nodeTimeout int, ) *observer { return &observer{ state: state, interval: interval, backoff: backoff, backoffCount: backoffCount, nodeTimeout: int64(nodeTimeout) * 1000, keepalives: make(map[string]int64), } } func (o *observer) pollNodes(ctx context.Context, onLeave func(types.Node) error) { nodes := o.state.Registry.Nodes() toCheck := make([]types.Node, 0, len(nodes)) now := time.Now().UnixMilli() for name, last := range o.keepalives { if name == o.state.Self.Hostname { continue } if now-last > o.nodeTimeout { toCheck = append(toCheck, nodes[name]) } } // TODO: think about this for _, n := range toCheck { name := n.Hostname logger := log.With().Str("name", name).Logger() logger.Debug().Msg("checking node") delay := time.Duration(o.backoff) alive := false for i := o.backoffCount - 1; i >= 0; i-- { _, err := client.Get[any](n.Endpoint, types.PathNodeHealthcheck) if err == nil { logger.Debug().Msg("node is alive") alive = true break } if i == 0 { break } logger.Info().Any("delay", delay).Msg("node didn't respond, sleeping") select { case <-ctx.Done(): goto dead case <-time.After(delay * time.Second): delay *= 2 } } dead: if !alive { logger.Info().Msg("node is dead, removing") if err := onLeave(n); err != nil { logger.Warn().Err(err).Msg("onLeave call failed") } } } } func (o *observer) onKeepAlive(node types.Node) { o.lock.Lock() defer o.lock.Unlock() o.keepalives[node.Hostname] = time.Now().UnixMilli() } func (o *observer) onEvent(event registry.RegistryEvent) { o.lock.Lock() defer o.lock.Unlock() switch event.Event { case registry.EventNodeJoin: for _, node := range event.Nodes { o.keepalives[node.Hostname] = time.Now().UnixMilli() } case registry.EventNodeLeave: for _, node := range event.Nodes { delete(o.keepalives, node.Hostname) } case registry.EventSet: o.keepalives = make(map[string]int64) now := time.Now().UnixMilli() for _, node := range event.Nodes { o.keepalives[node.Hostname] = now } } } func (o *observer) Start(ctx context.Context, onLeave func(types.Node) error) { registryEvents := o.state.Registry.Subscribe() for { select { case n := <-registryEvents: o.onEvent(n) case <-ctx.Done(): return case <-time.After(time.Duration(o.interval) * time.Second): o.pollNodes(ctx, onLeave) } } }