1
0
Fork 0
hivemind/internal/roles/node/node.go

168 lines
3.6 KiB
Go

package node
import (
"context"
"errors"
"sync"
"time"
"git.wzray.com/homelab/hivemind/internal/config"
"git.wzray.com/homelab/hivemind/internal/state"
"git.wzray.com/homelab/hivemind/internal/types"
"git.wzray.com/homelab/hivemind/internal/web/client"
"github.com/rs/zerolog/log"
)
type Role struct {
state *state.RuntimeState
keepaliveGroup sync.WaitGroup
config config.NodeConfig
}
func New(state *state.RuntimeState, config config.NodeConfig) *Role {
return &Role{
state: state,
config: config,
}
}
func (r *Role) Join(bootstrap string) error {
masters := make(map[string]struct{})
for _, node := range r.state.Registry.ByRole(types.MasterRole) {
if node.Hostname == r.state.Self.Hostname {
continue
}
masters[node.Endpoint] = struct{}{}
}
if bootstrap != "" {
masters[bootstrap] = struct{}{}
} else if len(masters) == 0 {
return errors.New("no masters configured")
}
for m := range masters {
logger := log.With().Str("host", m).Logger()
logger.Debug().Msg("trying to join via master")
nodes, err := client.Post[[]types.Node](m, "/master/join", r.state.Self)
if err != nil {
logger.Debug().Err(err).Msg("unable to join")
continue
}
if err := r.state.Registry.Set(*nodes); err != nil {
logger.Debug().Err(err).Msg("unable to set master's nodes")
continue
}
return nil
}
return errors.New("unable to join")
}
func (r *Role) Leave() error {
masters := r.state.Registry.ByRole(types.MasterRole)
if len(masters) == 0 {
return nil
}
sent := false
for _, m := range masters {
logger := log.With().Str("name", m.Hostname).Logger()
logger.Debug().Msg("sending leave message")
_, err := client.Post[any](m.Endpoint, types.PathMasterLeave, r.state.Self)
if err != nil {
logger.Debug().Err(err).Msg("unable to send leave message")
continue
} else {
sent = true
logger.Debug().Msg("leave message sent")
break
}
}
if !sent {
return errors.New("unable to send leave message")
}
return nil
}
func (r *Role) OnStartup(ctx context.Context) error {
if r.config.KeepaliveInterval != -1 {
r.keepaliveGroup.Go(r.keepaliveFunc(ctx))
} else {
log.Debug().Msg("keepalive disabled")
}
return nil
}
func (r *Role) OnShutdown() error {
r.keepaliveGroup.Wait()
return nil
}
func (r *Role) keepaliveFunc(ctx context.Context) func() {
sendKeepalive := func() {
masters := r.state.Registry.ByRole(types.MasterRole)
if len(masters) == 0 {
return
}
sent := false
for _, m := range masters {
logger := log.With().Str("name", m.Hostname).Logger()
logger.Debug().Msg("sending keepalive packet")
if _, err := client.Post[any](m.Endpoint, types.PathMasterKeepalive, r.state.Self); err != nil {
continue
} else {
logger.Debug().Msg("keepalive packet sent")
sent = true
break
}
}
if !sent {
log.Info().Msg("unable to send keepalive packet")
}
}
return func() {
for {
select {
case <-ctx.Done():
return
case <-time.After(time.Duration(r.config.KeepaliveInterval) * time.Second):
sendKeepalive()
}
}
}
}
func (r *Role) onJoin(node types.Node) (bool, error) {
if err := r.state.Registry.AddNode(node); err != nil {
return false, err
}
return true, nil
}
func (r *Role) onLeave(node types.Node) (bool, error) {
if err := r.state.Registry.RemoveNode(node.Hostname); err != nil {
return false, err
}
return true, nil
}
func healthcheck() (string, error) {
return "OK", nil
}
func (n *Role) RegisterHandlers(r types.Registrator) {
r.Register(types.GetEndpoint(types.PathNodeHealthcheck, healthcheck))
r.Register(types.PostEndpoint(types.PathNodeJoin, n.onJoin))
r.Register(types.PostEndpoint(types.PathNodeLeave, n.onLeave))
}