116 lines
2.9 KiB
Go
116 lines
2.9 KiB
Go
package master
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
|
|
"git.wzray.com/homelab/hivemind/internal/config"
|
|
"git.wzray.com/homelab/hivemind/internal/roles"
|
|
"git.wzray.com/homelab/hivemind/internal/state"
|
|
"git.wzray.com/homelab/hivemind/internal/types"
|
|
"git.wzray.com/homelab/hivemind/internal/web/client"
|
|
"github.com/rs/zerolog/log"
|
|
)
|
|
|
|
type Role struct {
|
|
state *state.RuntimeState
|
|
config config.MasterConfig
|
|
tasksGroup sync.WaitGroup
|
|
observer *observer
|
|
roles.BaseRole
|
|
}
|
|
|
|
func New(state *state.RuntimeState, config config.MasterConfig) *Role {
|
|
return &Role{
|
|
state: state,
|
|
config: config,
|
|
observer: newObserver(
|
|
state,
|
|
config.ObserverInterval,
|
|
config.BackoffSeconds,
|
|
config.BackoffCount,
|
|
config.NodeTimeout,
|
|
),
|
|
}
|
|
}
|
|
|
|
func (r *Role) OnStartup(ctx context.Context) error {
|
|
r.tasksGroup.Go(func() {
|
|
r.observer.Start(ctx, func(n types.Node) error {
|
|
_, err := r.onLeave(n, true)
|
|
return err
|
|
})
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *Role) OnShutdown() error {
|
|
r.tasksGroup.Wait()
|
|
return nil
|
|
}
|
|
|
|
func (r *Role) notify(path types.Path, v any) {
|
|
for _, n := range r.state.Registry.ByRole(types.MasterRole) {
|
|
addr := n.Endpoint
|
|
r.tasksGroup.Go(func() {
|
|
client.Post[any](addr, path, v)
|
|
})
|
|
}
|
|
}
|
|
|
|
func (r *Role) onJoin(node types.Node, notify bool) (map[string]types.Node, error) {
|
|
if err := r.state.Registry.AddNode(node); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if notify {
|
|
r.notify(types.PathMasterEventJoin, node)
|
|
}
|
|
|
|
return r.state.Registry.AllNodes(), nil
|
|
}
|
|
|
|
func (r *Role) onLeave(node types.Node, notify bool) (bool, error) {
|
|
if err := r.state.Registry.RemoveNode(node); err != nil {
|
|
return false, err
|
|
}
|
|
|
|
if notify {
|
|
r.notify(types.PathMasterEventLeave, node)
|
|
}
|
|
|
|
return true, nil
|
|
}
|
|
|
|
func (r *Role) onKeepAlive(node types.Node, notify bool) (map[string]types.Node, error) {
|
|
r.observer.onKeepAlive(node)
|
|
|
|
if ok := r.state.Registry.Exists(node.Hostname); !ok {
|
|
// TODO: i don't like this side effect
|
|
if _, err := r.onJoin(node, true); err != nil {
|
|
log.Warn().Err(err).Msg("unable to add node to the registry from keepalive")
|
|
}
|
|
}
|
|
|
|
if notify {
|
|
r.notify(types.PathMasterEventKeepalive, node)
|
|
}
|
|
|
|
return r.state.Registry.AllNodes(), nil
|
|
}
|
|
|
|
func eventFunc[R any](fn func(types.Node, bool) (R, error), notify bool) func(types.Node) (R, error) {
|
|
return func(n types.Node) (R, error) {
|
|
return fn(n, notify)
|
|
}
|
|
}
|
|
|
|
func (c *Role) RegisterHandlers(r types.Registrator) {
|
|
r.Register(types.PostEndpoint(types.PathMasterKeepalive, eventFunc(c.onKeepAlive, true)))
|
|
r.Register(types.PostEndpoint(types.PathMasterEventKeepalive, eventFunc(c.onKeepAlive, false)))
|
|
r.Register(types.PostEndpoint(types.PathMasterJoin, eventFunc(c.onJoin, true)))
|
|
r.Register(types.PostEndpoint(types.PathMasterLeave, eventFunc(c.onLeave, true)))
|
|
r.Register(types.PostEndpoint(types.PathMasterEventJoin, eventFunc(c.onJoin, false)))
|
|
r.Register(types.PostEndpoint(types.PathMasterEventLeave, eventFunc(c.onLeave, false)))
|
|
}
|