package master import ( "context" "sync" "git.wzray.com/homelab/hivemind/internal/app" "git.wzray.com/homelab/hivemind/internal/config" "git.wzray.com/homelab/hivemind/internal/roles" "git.wzray.com/homelab/hivemind/internal/transport" "git.wzray.com/homelab/hivemind/internal/transport/master" "git.wzray.com/homelab/hivemind/internal/types" "github.com/rs/zerolog/log" ) type Role struct { state *app.State config config.MasterConfig tasksGroup sync.WaitGroup observer *observer roles.BaseRole } func New(state *app.State, config config.MasterConfig) *Role { return &Role{ state: state, config: config, observer: newObserver( state, config.ObserverInterval, config.BackoffSeconds, config.BackoffCount, config.NodeTimeout, ), } } func (r *Role) OnStartup(ctx context.Context) error { r.tasksGroup.Go(func() { r.observer.Start(ctx, func(n types.Node) error { _, err := r.onLeave(n, true) return err }) }) return nil } func (r *Role) OnShutdown() error { r.tasksGroup.Wait() return nil } func (c *Role) RegisterHandlers(r transport.Registrator) { master.Register(r, c) } func (r *Role) Heartbeat(node types.Node) (types.Nodes, error) { return r.onKeepAlive(node, true) } func (r *Role) Join(node types.Node) (types.Nodes, error) { return r.onJoin(node, true) } func (r *Role) Leave(node types.Node) error { _, err := r.onLeave(node, true) return err } func (r *Role) EventHeartbeat(node types.Node) (types.Nodes, error) { return r.onKeepAlive(node, false) } func (r *Role) EventJoin(node types.Node) (types.Nodes, error) { return r.onJoin(node, false) } func (r *Role) EventLeave(node types.Node) error { _, err := r.onLeave(node, false) return err } func (r *Role) onJoin(node types.Node, notify bool) (map[string]types.Node, error) { if err := r.state.Registry.AddNode(node); err != nil { return nil, err } if notify { propagate(r, noReturn(r.state.Clients.Master.EventJoin), node) } return r.state.Registry.AllNodes(), nil } func (r *Role) onLeave(node types.Node, notify bool) (bool, error) { if err := r.state.Registry.RemoveNode(node); err != nil { return false, err } if notify { propagate(r, r.state.Clients.Master.EventLeave, node) } return true, nil } func (r *Role) onKeepAlive(node types.Node, notify bool) (map[string]types.Node, error) { r.observer.onKeepAlive(node) if ok := r.state.Registry.Exists(node.Hostname); !ok { // TODO: i don't like this side effect if _, err := r.onJoin(node, true); err != nil { log.Warn().Err(err).Msg("unable to add node to the registry from keepalive") } } if notify { propagate(r, noReturn(r.state.Clients.Master.EventHeartbeat), node) } return r.state.Registry.AllNodes(), nil } func eventFunc[R any](fn func(types.Node, bool) (R, error), notify bool) func(types.Node) (R, error) { return func(n types.Node) (R, error) { return fn(n, notify) } } func noReturn[T, V any](fn func(string, T) (V, error)) func(string, T) error { return func(s string, t T) error { _, err := fn(s, t) return err } } func propagate[T any](r *Role, handler func(string, T) error, v T) { for _, n := range r.state.Registry.ByRole(types.MasterRole) { addr := n.Endpoint r.tasksGroup.Go(func() { handler(addr, v) }) } }