// Package node provides the Role run by a cluster node: it joins and leaves
// the cluster through a master, periodically sends keepalives to a master, and
// registers the node-side request handlers.
package node

import (
	"context"
	"errors"
	"sync"
	"time"

	"git.wzray.com/homelab/hivemind/internal/config"
	"git.wzray.com/homelab/hivemind/internal/state"
	"git.wzray.com/homelab/hivemind/internal/types"
	"git.wzray.com/homelab/hivemind/internal/web/client"
	"github.com/rs/zerolog/log"
)

// Role holds the runtime state and configuration used by the node role, plus
// the wait group that tracks the background keepalive goroutine.
type Role struct {
	state          *state.RuntimeState
	keepaliveGroup sync.WaitGroup
	config         config.NodeConfig
}

// New returns a Role that operates on the given runtime state and node
// configuration.
func New(state *state.RuntimeState, config config.NodeConfig) *Role {
	return &Role{
		state:  state,
		config: config,
	}
}

// Join registers this node with the cluster.
//
// The candidate set is the endpoints of all masters in the registry (excluding
// an entry whose hostname equals this node's) plus bootstrap, when it is
// non-empty. Candidates are tried one at a time, in map-iteration (unspecified)
// order; the first one that answers and whose node list is accepted by the
// registry wins.
//
// It returns an error when there are no candidates at all, or when every
// candidate failed. Per-candidate failures are only logged at debug level.
func (r *Role) Join(bootstrap string) error {
	// A set keyed by endpoint, so bootstrap is not tried twice when it is
	// already a known master.
	masters := make(map[string]struct{})
	for _, node := range r.state.Registry.ByRole(types.MasterRole) {
		if node.Hostname == r.state.Self.Hostname {
			continue
		}
		masters[node.Endpoint] = struct{}{}
	}

	if bootstrap != "" {
		masters[bootstrap] = struct{}{}
	} else if len(masters) == 0 {
		return errors.New("no masters configured")
	}

	for m := range masters {
		logger := log.With().Str("host", m).Logger()
		logger.Debug().Msg("trying to join via master")

		// NOTE(review): the other master calls in this file use types.Path*
		// constants; this path is a literal — consider a shared constant.
		nodes, err := client.Post[[]types.Node](m, "/master/join", r.state.Self)
		if err != nil {
			logger.Debug().Err(err).Msg("unable to join")
			continue
		}

		if err := r.state.Registry.Set(*nodes); err != nil {
			logger.Debug().Err(err).Msg("unable to set master's nodes")
			continue
		}

		return nil
	}

	return errors.New("unable to join")
}

// Leave notifies the cluster that this node is going away.
//
// It posts this node's description to each known master in turn and stops at
// the first successful delivery. With no known masters it is a no-op and
// returns nil; if every master fails it returns an error.
func (r *Role) Leave() error {
	masters := r.state.Registry.ByRole(types.MasterRole)
	if len(masters) == 0 {
		return nil
	}

	for _, m := range masters {
		logger := log.With().Str("name", m.Hostname).Logger()
		logger.Debug().Msg("sending leave message")

		if _, err := client.Post[any](m.Endpoint, types.PathMasterLeave, r.state.Self); err != nil {
			logger.Debug().Err(err).Msg("unable to send leave message")
			continue
		}

		// One master receiving the message is enough.
		logger.Debug().Msg("leave message sent")
		return nil
	}

	return errors.New("unable to send leave message")
}

// OnStartup starts the background keepalive loop unless the configured
// keepalive interval is -1, which disables it. The loop runs until ctx is
// cancelled. It always returns nil.
//
// NOTE(review): only -1 is treated as "disabled"; an interval of 0 or any
// other negative value would make the timer fire immediately on every
// iteration — confirm that config validation rejects such values.
func (r *Role) OnStartup(ctx context.Context) error {
	if r.config.KeepaliveInterval != -1 {
		r.keepaliveGroup.Go(r.keepaliveFunc(ctx))
	} else {
		log.Debug().Msg("keepalive disabled")
	}
	return nil
}

// OnShutdown blocks until the keepalive goroutine (if one was started) has
// returned. The goroutine only exits once the context given to OnStartup is
// done, so that context must be cancelled for this call to return. It always
// returns nil.
func (r *Role) OnShutdown() error {
	r.keepaliveGroup.Wait()
	return nil
}

// keepaliveFunc returns the body of the keepalive goroutine.
//
// The returned function waits one keepalive interval, sends a keepalive to the
// first master that accepts it, and repeats until ctx is done. The interval is
// measured from the end of one send attempt to the start of the next.
func (r *Role) keepaliveFunc(ctx context.Context) func() {
	sendKeepalive := func() {
		masters := r.state.Registry.ByRole(types.MasterRole)
		if len(masters) == 0 {
			return
		}

		for _, m := range masters {
			logger := log.With().Str("name", m.Hostname).Logger()
			logger.Debug().Msg("sending keepalive packet")

			if _, err := client.Post[any](m.Endpoint, types.PathMasterKeepalive, r.state.Self); err != nil {
				// Record why this master was skipped instead of dropping
				// the error; the next master is tried regardless.
				logger.Debug().Err(err).Msg("unable to send keepalive packet")
				continue
			}

			// One master receiving the keepalive is enough.
			logger.Debug().Msg("keepalive packet sent")
			return
		}

		log.Info().Msg("unable to send keepalive packet")
	}

	return func() {
		interval := time.Duration(r.config.KeepaliveInterval) * time.Second

		// A single reusable timer replaces a fresh time.After allocation on
		// every iteration. It is re-armed only after the send attempt, so the
		// spacing between attempts is unchanged.
		timer := time.NewTimer(interval)
		defer timer.Stop()

		for {
			select {
			case <-ctx.Done():
				return
			case <-timer.C:
				sendKeepalive()
				// The channel was just drained, so Reset is safe here.
				timer.Reset(interval)
			}
		}
	}
}

// onJoin handles a node-join notification by adding node to the registry.
// It returns true on success, or false and the registry's error.
func (r *Role) onJoin(node types.Node) (bool, error) {
	if err := r.state.Registry.AddNode(node); err != nil {
		return false, err
	}
	return true, nil
}

// onLeave handles a node-leave notification by removing the node with the
// given hostname from the registry. It returns true on success, or false and
// the registry's error.
func (r *Role) onLeave(node types.Node) (bool, error) {
	if err := r.state.Registry.RemoveNode(node.Hostname); err != nil {
		return false, err
	}
	return true, nil
}

// healthcheck is the handler for the node healthcheck endpoint; it always
// reports "OK".
func healthcheck() (string, error) {
	return "OK", nil
}

// RegisterHandlers registers the node-side endpoints with reg: a GET
// healthcheck and POST handlers for join and leave notifications.
func (r *Role) RegisterHandlers(reg types.Registrator) {
	reg.Register(types.GetEndpoint(types.PathNodeHealthcheck, healthcheck))
	reg.Register(types.PostEndpoint(types.PathNodeJoin, r.onJoin))
	reg.Register(types.PostEndpoint(types.PathNodeLeave, r.onLeave))
}