1
0
Fork 0
hivemind/internal/registry/registry.go

153 lines
2.8 KiB
Go

package registry
import (
"maps"
"slices"
"sync"
"time"
"git.wzray.com/homelab/hivemind/internal/types"
"github.com/rs/zerolog/log"
)
type Registry struct {
LastUpdate time.Time
nodes map[string]types.Node
storage Storage
lock sync.RWMutex
self types.Node
observers []chan<- []types.Node
}
func New(storage Storage, self types.Node) *Registry {
r := &Registry{
storage: storage,
nodes: make(map[string]types.Node),
self: self,
}
var storedData storedConfig
if err := storage.Load(&storedData); err != nil {
log.Warn().Err(err).Msg("unable to load registry from storage")
goto ret
}
r.LastUpdate = time.UnixMilli(storedData.LastUpdate)
r.nodes = storedData.Nodes
ret:
r.nodes[self.Hostname] = self
return r
}
func (r *Registry) snapshot() *storedConfig {
return &storedConfig{
LastUpdate: r.LastUpdate.UnixMilli(),
Nodes: maps.Clone(r.nodes),
}
}
func (r *Registry) notify() {
nodes := r.Nodes()
for _, c := range r.observers {
c <- nodes
}
}
func (r *Registry) AllNodes() []types.Node {
r.lock.RLock()
defer r.lock.RUnlock()
nodes := make([]types.Node, 0, len(r.nodes))
for _, n := range r.nodes {
nodes = append(nodes, n)
}
return nodes
}
func (r *Registry) Nodes() []types.Node {
nodes := r.AllNodes()
nodes = slices.DeleteFunc(nodes, func(n types.Node) bool {
return n.Hostname == r.self.Hostname
})
return nodes
}
func (r *Registry) ByRole(role types.Role) []types.Node {
r.lock.RLock()
defer r.lock.RUnlock()
o := make([]types.Node, 0, len(r.nodes))
for _, node := range r.nodes {
if slices.Contains(node.Roles, role) && node.Hostname != r.self.Hostname {
o = append(o, node)
}
}
return o
}
func (r *Registry) AddNode(node types.Node) error {
r.lock.Lock()
r.nodes[node.Hostname] = node
r.LastUpdate = time.Now()
snapshot := r.snapshot()
r.lock.Unlock()
if err := r.storage.Save(snapshot); err != nil {
return err
}
return nil
}
func (r *Registry) RemoveNode(nodeName string) error {
r.lock.Lock()
delete(r.nodes, nodeName)
r.LastUpdate = time.Now()
snapshot := r.snapshot()
r.lock.Unlock()
if err := r.storage.Save(snapshot); err != nil {
return err
}
r.notify()
return nil
}
func (r *Registry) Set(nodes []types.Node) error {
r.lock.Lock()
r.nodes = make(map[string]types.Node)
for _, n := range nodes {
r.nodes[n.Hostname] = n
}
snapshot := r.snapshot()
r.lock.Unlock()
if err := r.storage.Save(snapshot); err != nil {
return err
}
r.notify()
return nil
}
func (r *Registry) Exists(name string) bool {
_, ok := r.nodes[name]
return ok
}
func (r *Registry) OnChanged() <-chan []types.Node { // TODO: rename this
c := make(chan []types.Node, 1)
r.observers = append(r.observers, c)
return c
}
func (r *Registry) Save() {
r.lock.RLock()
snapshot := r.snapshot()
r.lock.RUnlock()
r.storage.Save(snapshot)
}