diff --git a/.githooks/pre-commit b/.githooks/pre-commit new file mode 100755 index 0000000..519b7e4 --- /dev/null +++ b/.githooks/pre-commit @@ -0,0 +1,9 @@ +#!/bin/bash + +changed="$(gofmt -d .)" +[ -n "$changed" ] && { + echo 'Some files are not formatted' + delta <<< "$changed" + exit 1 +} +go vet ./... diff --git a/TODO.md b/TODO.md index 27f744a..bdfaf3c 100644 --- a/TODO.md +++ b/TODO.md @@ -1,5 +1,4 @@ - nginx role -- don't make roles beg to keep them alive, instead wait for a timeout since the last keepalive message and then probe them - think about choosing the master for the keepalive message (should be somewhat load-balanced) - hivemind lite should not just print `hivemind-lite` lol - different transport (maybe something like a custom binary protocol) diff --git a/internal/config/defaults.go b/internal/config/defaults.go index d0c2a30..317770d 100644 --- a/internal/config/defaults.go +++ b/internal/config/defaults.go @@ -9,9 +9,10 @@ var DefaultConfig = Config{ }, Configs: Configs{ Master: MasterConfig{ - ObserverInterval: 120, + ObserverInterval: 20, BackoffSeconds: 1, BackoffCount: 4, + NodeTimeout: 120, }, }, } diff --git a/internal/config/master.go b/internal/config/master.go index 29ca18b..fb4b266 100644 --- a/internal/config/master.go +++ b/internal/config/master.go @@ -4,9 +4,9 @@ import "errors" type MasterConfig struct { ObserverInterval int `toml:"observer_interval"` - - BackoffSeconds int `toml:"backoff_seconds"` - BackoffCount int `toml:"backoff_count"` + BackoffSeconds int `toml:"backoff_seconds"` + BackoffCount int `toml:"backoff_count"` + NodeTimeout int `toml:"node_timeout"` baseRoleConfig } @@ -24,6 +24,10 @@ func (c MasterConfig) Validate() error { return errors.New("invalid backoff_count") } + if c.NodeTimeout < 1 { + return errors.New("invalid node_timeout") + } + return nil } @@ -43,4 +47,8 @@ func (c *MasterConfig) Merge(other MasterConfig) { if other.BackoffCount != 0 { c.BackoffCount = other.BackoffCount } + + if other.NodeTimeout != 0 { + c.NodeTimeout = other.NodeTimeout + } } diff --git a/internal/config/node.go b/internal/config/node.go index 4049bfe..fbeb29d 100644 --- a/internal/config/node.go +++ b/internal/config/node.go @@ -49,7 +49,7 @@ func (c NodeConfig) Validate() error { return errors.New("missing hostname") } - if c.KeepaliveInterval < 1 && c.KeepaliveInterval != -1 { + if c.KeepaliveInterval < 1 { return errors.New("invalid keepalive_interval") } diff --git a/internal/registry/event.go b/internal/registry/event.go new file mode 100644 index 0000000..4924508 --- /dev/null +++ b/internal/registry/event.go @@ -0,0 +1,16 @@ +package registry + +import "git.wzray.com/homelab/hivemind/internal/types" + +type Event int + +const ( + EventNodeJoin Event = iota + EventNodeLeave + EventSet +) + +type RegistryEvent struct { + Event Event + Nodes map[string]types.Node +} diff --git a/internal/registry/registry.go b/internal/registry/registry.go index 17f7186..610e153 100644 --- a/internal/registry/registry.go +++ b/internal/registry/registry.go @@ -16,7 +16,7 @@ type Registry struct { storage Storage lock sync.RWMutex self types.Node - observers []chan<- []types.Node + observers []chan<- RegistryEvent } func New(storage Storage, self types.Node) *Registry { @@ -47,42 +47,35 @@ func (r *Registry) snapshot() *storedConfig { } } -func (r *Registry) notify() { - nodes := r.Nodes() +func (r *Registry) notify(event RegistryEvent) { for _, c := range r.observers { - c <- nodes + c <- event } } -func (r *Registry) AllNodes() []types.Node { +func (r *Registry) AllNodes() map[string]types.Node { r.lock.RLock() defer r.lock.RUnlock() - - nodes := make([]types.Node, 0, len(r.nodes)) - for _, n := range r.nodes { - nodes = append(nodes, n) - } - return nodes + return maps.Clone(r.nodes) } -func (r *Registry) Nodes() []types.Node { +func (r *Registry) Nodes() map[string]types.Node { nodes := r.AllNodes() - nodes = slices.DeleteFunc(nodes, func(n types.Node) bool { - return n.Hostname == r.self.Hostname - }) + delete(nodes, r.self.Hostname) return nodes } -func (r *Registry) ByRole(role types.Role) []types.Node { +func (r *Registry) ByRole(role types.Role) map[string]types.Node { r.lock.RLock() defer r.lock.RUnlock() - o := make([]types.Node, 0, len(r.nodes)) - for _, node := range r.nodes { + o := make(map[string]types.Node) + for name, node := range r.nodes { if slices.Contains(node.Roles, role) && node.Hostname != r.self.Hostname { - o = append(o, node) + o[name] = node } } + return o } @@ -96,12 +89,20 @@ func (r *Registry) AddNode(node types.Node) error { if err := r.storage.Save(snapshot); err != nil { return err } + + r.notify(RegistryEvent{ + EventNodeJoin, + map[string]types.Node{ + node.Hostname: node, + }, + }) + return nil } -func (r *Registry) RemoveNode(nodeName string) error { +func (r *Registry) RemoveNode(node types.Node) error { r.lock.Lock() - delete(r.nodes, nodeName) + delete(r.nodes, node.Hostname) r.LastUpdate = time.Now() snapshot := r.snapshot() r.lock.Unlock() @@ -110,17 +111,19 @@ func (r *Registry) RemoveNode(nodeName string) error { return err } - r.notify() + r.notify(RegistryEvent{ + EventNodeLeave, + map[string]types.Node{ + node.Hostname: node, + }, + }) return nil } -func (r *Registry) Set(nodes []types.Node) error { +func (r *Registry) Set(nodes map[string]types.Node) error { r.lock.Lock() - r.nodes = make(map[string]types.Node) - for _, n := range nodes { - r.nodes[n.Hostname] = n - } + r.nodes = maps.Clone(nodes) snapshot := r.snapshot() r.lock.Unlock() @@ -128,7 +131,10 @@ func (r *Registry) Set(nodes []types.Node) error { return err } - r.notify() + r.notify(RegistryEvent{ + EventSet, + nodes, + }) return nil } @@ -138,8 +144,8 @@ func (r *Registry) Exists(name string) bool { return ok } -func (r *Registry) OnChanged() <-chan []types.Node { // TODO: rename this - c := make(chan []types.Node, 1) +func (r *Registry) Subscribe() <-chan RegistryEvent { // TODO: rename this + c := make(chan RegistryEvent, 1) r.observers = append(r.observers, c) return c } diff --git a/internal/roles/dns/dns.go b/internal/roles/dns/dns.go index d527c60..6d01deb 100644 --- a/internal/roles/dns/dns.go +++ b/internal/roles/dns/dns.go @@ -59,7 +59,7 @@ func (r *Role) OnStartup(ctx context.Context) error { r.syncFromRegistry() }) - c := r.state.Registry.OnChanged() + c := r.state.Registry.Subscribe() r.group.Go(func() { for { select { diff --git a/internal/roles/master/master.go b/internal/roles/master/master.go index 6d2884b..dee25e3 100644 --- a/internal/roles/master/master.go +++ b/internal/roles/master/master.go @@ -28,6 +28,7 @@ func New(state *state.RuntimeState, config config.MasterConfig) *Role { config.ObserverInterval, config.BackoffSeconds, config.BackoffCount, + config.NodeTimeout, ), } } @@ -57,7 +58,7 @@ func (r *Role) notify(path types.Path, v any) { } } -func (r *Role) onJoin(node types.Node) ([]types.Node, error) { +func (r *Role) onJoin(node types.Node) (map[string]types.Node, error) { if err := r.state.Registry.AddNode(node); err != nil { return nil, err } @@ -68,7 +69,7 @@ func (r *Role) onJoin(node types.Node) ([]types.Node, error) { } func (r *Role) onLeave(node types.Node) (bool, error) { - if err := r.state.Registry.RemoveNode(node.Hostname); err != nil { + if err := r.state.Registry.RemoveNode(node); err != nil { return false, err } @@ -78,6 +79,8 @@ func (r *Role) onLeave(node types.Node) (bool, error) { } func (r *Role) onKeepAlive(node types.Node) (bool, error) { + r.observer.onKeepAlive(node) + if ok := r.state.Registry.Exists(node.Hostname); !ok { _, err := r.onJoin(node) return true, err diff --git a/internal/roles/master/observer.go b/internal/roles/master/observer.go index 46153b2..4d07a98 100644 --- a/internal/roles/master/observer.go +++ b/internal/roles/master/observer.go @@ -2,8 +2,10 @@ package master import ( "context" + "sync" "time" + "git.wzray.com/homelab/hivemind/internal/registry" "git.wzray.com/homelab/hivemind/internal/state" "git.wzray.com/homelab/hivemind/internal/types" "git.wzray.com/homelab/hivemind/internal/web/client" @@ -15,6 +17,9 @@ type observer struct { interval int backoff int backoffCount int + nodeTimeout int64 + keepalives map[string]int64 + lock sync.RWMutex } func newObserver( @@ -22,17 +27,35 @@ func newObserver( interval int, backoff int, backoffCount int, + nodeTimeout int, ) *observer { return &observer{ state: state, interval: interval, backoff: backoff, backoffCount: backoffCount, + nodeTimeout: int64(nodeTimeout) * 1000, + keepalives: make(map[string]int64), } } func (o *observer) pollNodes(ctx context.Context, onLeave func(types.Node) error) { - for _, n := range o.state.Registry.Nodes() { + nodes := o.state.Registry.Nodes() + toCheck := make([]types.Node, 0, len(nodes)) + now := time.Now().UnixMilli() + + for name, last := range o.keepalives { + if name == o.state.Self.Hostname { + continue + } + + if now-last > o.nodeTimeout { + toCheck = append(toCheck, nodes[name]) + } + } + + // TODO: think about this + for _, n := range toCheck { name := n.Hostname logger := log.With().Str("name", name).Logger() logger.Debug().Msg("checking node") @@ -71,9 +94,42 @@ func (o *observer) pollNodes(ctx context.Context, onLeave func(types.Node) error } } +func (o *observer) onKeepAlive(node types.Node) { + o.lock.Lock() + defer o.lock.Unlock() + o.keepalives[node.Hostname] = time.Now().UnixMilli() +} + +func (o *observer) onEvent(event registry.RegistryEvent) { + o.lock.Lock() + defer o.lock.Unlock() + + switch event.Event { + case registry.EventNodeJoin: + for _, node := range event.Nodes { + o.keepalives[node.Hostname] = time.Now().UnixMilli() + } + case registry.EventNodeLeave: + for _, node := range event.Nodes { + delete(o.keepalives, node.Hostname) + } + case registry.EventSet: + o.keepalives = make(map[string]int64) + now := time.Now().UnixMilli() + + for _, node := range event.Nodes { + o.keepalives[node.Hostname] = now + } + } +} + func (o *observer) Start(ctx context.Context, onLeave func(types.Node) error) { + registryEvents := o.state.Registry.Subscribe() + for { select { + case n := <-registryEvents: + o.onEvent(n) case <-ctx.Done(): return case <-time.After(time.Duration(o.interval) * time.Second): diff --git a/internal/roles/node/node.go b/internal/roles/node/node.go index 5bc113e..a09355b 100644 --- a/internal/roles/node/node.go +++ b/internal/roles/node/node.go @@ -46,7 +46,7 @@ func (r *Role) Join(bootstrap string) error { logger := log.With().Str("host", m).Logger() logger.Debug().Msg("trying to join via master") - nodes, err := client.Post[[]types.Node](m, "/master/join", r.state.Self) + nodes, err := client.Post[map[string]types.Node](m, types.PathMasterJoin, r.state.Self) if err != nil { errs = append(errs, err) logger.Debug().Err(err).Msg("unable to join") @@ -91,11 +91,7 @@ func (r *Role) Leave() error { } func (r *Role) OnStartup(ctx context.Context) error { - if r.config.KeepaliveInterval != -1 { - r.keepaliveGroup.Go(r.keepaliveFunc(ctx)) - } else { - log.Debug().Msg("keepalive disabled") - } + r.keepaliveGroup.Go(r.keepaliveFunc(ctx)) return nil } @@ -107,28 +103,16 @@ func (r *Role) OnShutdown() error { func (r *Role) keepaliveFunc(ctx context.Context) func() { sendKeepalive := func() { - masters := r.state.Registry.ByRole(types.MasterRole) - if len(masters) == 0 { - return - } - - sent := false - for _, m := range masters { + for _, m := range r.state.Registry.ByRole(types.MasterRole) { logger := log.With().Str("name", m.Hostname).Logger() logger.Debug().Msg("sending keepalive packet") if _, err := client.Post[any](m.Endpoint, types.PathMasterKeepalive, r.state.Self); err != nil { - continue + logger.Info().Err(err).Msg("unable to send keepalive packet") } else { logger.Debug().Msg("keepalive packet sent") - sent = true - break } } - - if !sent { - log.Info().Msg("unable to send keepalive packet") - } } return func() { @@ -151,7 +135,7 @@ func (r *Role) onJoin(node types.Node) (bool, error) { } func (r *Role) onLeave(node types.Node) (bool, error) { - if err := r.state.Registry.RemoveNode(node.Hostname); err != nil { + if err := r.state.Registry.RemoveNode(node); err != nil { return false, err } return true, nil diff --git a/internal/web/client/client.go b/internal/web/client/client.go index a6444c8..aa98ad1 100644 --- a/internal/web/client/client.go +++ b/internal/web/client/client.go @@ -100,10 +100,12 @@ func request[Out any, In any](method string, host string, path types.Path, data return &out.Data, err } -func Get[Out any](host string, path types.Path) (*Out, error) { // TODO: out should not be a pointer +// TODO: out should not be a pointer +func Get[Out any](host string, path types.Path) (*Out, error) { return request[Out, any](http.MethodGet, host, path, nil) } +// TODO: out should not be a pointer func Post[Out any, In any](host string, path types.Path, data In) (*Out, error) { return request[Out](http.MethodPost, host, path, data) }