139 lines
3 KiB
Go
139 lines
3 KiB
Go
package master
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"git.wzray.com/homelab/hivemind/internal/registry"
|
|
"git.wzray.com/homelab/hivemind/internal/state"
|
|
"git.wzray.com/homelab/hivemind/internal/types"
|
|
"git.wzray.com/homelab/hivemind/internal/web/client"
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
type observer struct {
|
|
state *state.RuntimeState
|
|
interval int
|
|
backoff int
|
|
backoffCount int
|
|
nodeTimeout int64
|
|
keepalives map[string]int64
|
|
lock sync.RWMutex
|
|
}
|
|
|
|
func newObserver(
|
|
state *state.RuntimeState,
|
|
interval int,
|
|
backoff int,
|
|
backoffCount int,
|
|
nodeTimeout int,
|
|
) *observer {
|
|
return &observer{
|
|
state: state,
|
|
interval: interval,
|
|
backoff: backoff,
|
|
backoffCount: backoffCount,
|
|
nodeTimeout: int64(nodeTimeout) * 1000,
|
|
keepalives: make(map[string]int64),
|
|
}
|
|
}
|
|
|
|
func (o *observer) pollNodes(ctx context.Context, onLeave func(types.Node) error) {
|
|
nodes := o.state.Registry.Nodes()
|
|
toCheck := make([]types.Node, 0, len(nodes))
|
|
now := time.Now().UnixMilli()
|
|
|
|
for name, last := range o.keepalives {
|
|
if name == o.state.Self.Hostname {
|
|
continue
|
|
}
|
|
|
|
if now-last > o.nodeTimeout {
|
|
toCheck = append(toCheck, nodes[name])
|
|
}
|
|
}
|
|
|
|
// TODO: think about this
|
|
for _, n := range toCheck {
|
|
name := n.Hostname
|
|
logger := log.With().Str("name", name).Logger()
|
|
logger.Debug().Msg("checking node")
|
|
|
|
delay := time.Duration(o.backoff)
|
|
alive := false
|
|
for i := o.backoffCount - 1; i >= 0; i-- {
|
|
_, err := client.Get[any](n.Endpoint, types.PathNodeHealthcheck)
|
|
|
|
if err == nil {
|
|
logger.Debug().Msg("node is alive")
|
|
alive = true
|
|
break
|
|
}
|
|
|
|
if i == 0 {
|
|
break
|
|
}
|
|
|
|
logger.Info().Any("delay", delay).Msg("node didn't respond, sleeping")
|
|
select {
|
|
case <-ctx.Done():
|
|
goto dead
|
|
case <-time.After(delay * time.Second):
|
|
delay *= 2
|
|
}
|
|
}
|
|
|
|
dead:
|
|
if !alive {
|
|
logger.Info().Msg("node is dead, removing")
|
|
if err := onLeave(n); err != nil {
|
|
logger.Warn().Err(err).Msg("onLeave call failed")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (o *observer) onKeepAlive(node types.Node) {
|
|
o.lock.Lock()
|
|
defer o.lock.Unlock()
|
|
o.keepalives[node.Hostname] = time.Now().UnixMilli()
|
|
}
|
|
|
|
func (o *observer) onEvent(event registry.RegistryEvent) {
|
|
o.lock.Lock()
|
|
defer o.lock.Unlock()
|
|
|
|
switch event.Event {
|
|
case registry.EventNodeJoin:
|
|
for _, node := range event.Nodes {
|
|
o.keepalives[node.Hostname] = time.Now().UnixMilli()
|
|
}
|
|
case registry.EventNodeLeave:
|
|
for _, node := range event.Nodes {
|
|
delete(o.keepalives, node.Hostname)
|
|
}
|
|
case registry.EventSet:
|
|
o.keepalives = make(map[string]int64)
|
|
now := time.Now().UnixMilli()
|
|
|
|
for _, node := range event.Nodes {
|
|
o.keepalives[node.Hostname] = now
|
|
}
|
|
}
|
|
}
|
|
|
|
func (o *observer) Start(ctx context.Context, onLeave func(types.Node) error) {
|
|
registryEvents := o.state.Registry.Subscribe()
|
|
|
|
for {
|
|
select {
|
|
case n := <-registryEvents:
|
|
o.onEvent(n)
|
|
case <-ctx.Done():
|
|
return
|
|
case <-time.After(time.Duration(o.interval) * time.Second):
|
|
o.pollNodes(ctx, onLeave)
|
|
}
|
|
}
|
|
}
|