1
0
Fork 0

fix: nodes no longer beg to be kept alive

This commit is contained in:
Arthur K. 2026-01-19 22:50:12 +03:00
parent 7c4154a459
commit a32b0f728e
Signed by: wzray
GPG key ID: B97F30FDC4636357
12 changed files with 146 additions and 62 deletions

View file

@ -59,7 +59,7 @@ func (r *Role) OnStartup(ctx context.Context) error {
r.syncFromRegistry()
})
c := r.state.Registry.OnChanged()
c := r.state.Registry.Subscribe()
r.group.Go(func() {
for {
select {

View file

@ -28,6 +28,7 @@ func New(state *state.RuntimeState, config config.MasterConfig) *Role {
config.ObserverInterval,
config.BackoffSeconds,
config.BackoffCount,
config.NodeTimeout,
),
}
}
@ -57,7 +58,7 @@ func (r *Role) notify(path types.Path, v any) {
}
}
func (r *Role) onJoin(node types.Node) ([]types.Node, error) {
func (r *Role) onJoin(node types.Node) (map[string]types.Node, error) {
if err := r.state.Registry.AddNode(node); err != nil {
return nil, err
}
@ -68,7 +69,7 @@ func (r *Role) onJoin(node types.Node) ([]types.Node, error) {
}
func (r *Role) onLeave(node types.Node) (bool, error) {
if err := r.state.Registry.RemoveNode(node.Hostname); err != nil {
if err := r.state.Registry.RemoveNode(node); err != nil {
return false, err
}
@ -78,6 +79,8 @@ func (r *Role) onLeave(node types.Node) (bool, error) {
}
func (r *Role) onKeepAlive(node types.Node) (bool, error) {
r.observer.onKeepAlive(node)
if ok := r.state.Registry.Exists(node.Hostname); !ok {
_, err := r.onJoin(node)
return true, err

View file

@ -2,8 +2,10 @@ package master
import (
"context"
"sync"
"time"
"git.wzray.com/homelab/hivemind/internal/registry"
"git.wzray.com/homelab/hivemind/internal/state"
"git.wzray.com/homelab/hivemind/internal/types"
"git.wzray.com/homelab/hivemind/internal/web/client"
@ -15,6 +17,9 @@ type observer struct {
interval int
backoff int
backoffCount int
nodeTimeout int64
keepalives map[string]int64
lock sync.RWMutex
}
func newObserver(
@ -22,17 +27,35 @@ func newObserver(
interval int,
backoff int,
backoffCount int,
nodeTimeout int,
) *observer {
return &observer{
state: state,
interval: interval,
backoff: backoff,
backoffCount: backoffCount,
nodeTimeout: int64(nodeTimeout) * 1000,
keepalives: make(map[string]int64),
}
}
func (o *observer) pollNodes(ctx context.Context, onLeave func(types.Node) error) {
for _, n := range o.state.Registry.Nodes() {
nodes := o.state.Registry.Nodes()
toCheck := make([]types.Node, 0, len(nodes))
now := time.Now().UnixMilli()
for name, last := range o.keepalives {
if name == o.state.Self.Hostname {
continue
}
if now-last > o.nodeTimeout {
toCheck = append(toCheck, nodes[name])
}
}
// TODO: think about this
for _, n := range toCheck {
name := n.Hostname
logger := log.With().Str("name", name).Logger()
logger.Debug().Msg("checking node")
@ -71,9 +94,42 @@ func (o *observer) pollNodes(ctx context.Context, onLeave func(types.Node) error
}
}
func (o *observer) onKeepAlive(node types.Node) {
o.lock.Lock()
defer o.lock.Unlock()
o.keepalives[node.Hostname] = time.Now().UnixMilli()
}
func (o *observer) onEvent(event registry.RegistryEvent) {
o.lock.Lock()
defer o.lock.Unlock()
switch event.Event {
case registry.EventNodeJoin:
for _, node := range event.Nodes {
o.keepalives[node.Hostname] = time.Now().UnixMilli()
}
case registry.EventNodeLeave:
for _, node := range event.Nodes {
delete(o.keepalives, node.Hostname)
}
case registry.EventSet:
o.keepalives = make(map[string]int64)
now := time.Now().UnixMilli()
for _, node := range event.Nodes {
o.keepalives[node.Hostname] = now
}
}
}
func (o *observer) Start(ctx context.Context, onLeave func(types.Node) error) {
registryEvents := o.state.Registry.Subscribe()
for {
select {
case n := <-registryEvents:
o.onEvent(n)
case <-ctx.Done():
return
case <-time.After(time.Duration(o.interval) * time.Second):

View file

@ -46,7 +46,7 @@ func (r *Role) Join(bootstrap string) error {
logger := log.With().Str("host", m).Logger()
logger.Debug().Msg("trying to join via master")
nodes, err := client.Post[[]types.Node](m, "/master/join", r.state.Self)
nodes, err := client.Post[map[string]types.Node](m, types.PathMasterJoin, r.state.Self)
if err != nil {
errs = append(errs, err)
logger.Debug().Err(err).Msg("unable to join")
@ -91,11 +91,7 @@ func (r *Role) Leave() error {
}
func (r *Role) OnStartup(ctx context.Context) error {
if r.config.KeepaliveInterval != -1 {
r.keepaliveGroup.Go(r.keepaliveFunc(ctx))
} else {
log.Debug().Msg("keepalive disabled")
}
r.keepaliveGroup.Go(r.keepaliveFunc(ctx))
return nil
}
@ -107,28 +103,16 @@ func (r *Role) OnShutdown() error {
func (r *Role) keepaliveFunc(ctx context.Context) func() {
sendKeepalive := func() {
masters := r.state.Registry.ByRole(types.MasterRole)
if len(masters) == 0 {
return
}
sent := false
for _, m := range masters {
for _, m := range r.state.Registry.ByRole(types.MasterRole) {
logger := log.With().Str("name", m.Hostname).Logger()
logger.Debug().Msg("sending keepalive packet")
if _, err := client.Post[any](m.Endpoint, types.PathMasterKeepalive, r.state.Self); err != nil {
continue
logger.Info().Err(err).Msg("unable to send keepalive packet")
} else {
logger.Debug().Msg("keepalive packet sent")
sent = true
break
}
}
if !sent {
log.Info().Msg("unable to send keepalive packet")
}
}
return func() {
@ -151,7 +135,7 @@ func (r *Role) onJoin(node types.Node) (bool, error) {
}
func (r *Role) onLeave(node types.Node) (bool, error) {
if err := r.state.Registry.RemoveNode(node.Hostname); err != nil {
if err := r.state.Registry.RemoveNode(node); err != nil {
return false, err
}
return true, nil