From 7fb90dd1dae39c4dd1a254ecddad9c4f41029371 Mon Sep 17 00:00:00 2001 From: "Arthur K." Date: Mon, 19 Jan 2026 23:39:17 +0300 Subject: [PATCH 1/3] doc: add todos --- TODO.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/TODO.md b/TODO.md index bdfaf3c..47c0fea 100644 --- a/TODO.md +++ b/TODO.md @@ -1,3 +1,5 @@ +- auth middleware lol +- move request logging out of the request handling into a middleware - nginx role - think about choosing the master for the keepalive message (should be somewhat load-balanced) - hivemind lite should not just print `hivemind-lite` lol From 476c4b056f7908a70878d276ef252bfe3825e839 Mon Sep 17 00:00:00 2001 From: "Arthur K." Date: Thu, 22 Jan 2026 16:02:44 +0300 Subject: [PATCH 2/3] fix: don't spam nodes with updates and instead pull the registry on keepalive --- TODO.md | 53 +++++++++++++++++++++++++++++++++ go.mod | 6 ++-- internal/roles/master/master.go | 46 ++++++++++++++++++++-------- internal/roles/node/node.go | 32 ++++++++------------ internal/types/web.go | 11 +++---- 5 files changed, 108 insertions(+), 40 deletions(-) diff --git a/TODO.md b/TODO.md index 47c0fea..68b1a09 100644 --- a/TODO.md +++ b/TODO.md @@ -1,3 +1,56 @@ +# Background +Some background first: +the node can have multiple roles +this includes (but not limited to) +* Host (can generate events) +* DNS (can consume the events and act on them) +* Something else that I might come up with (the architecture has to be expandable) + +# Control pane (3+ nodes) +* Quorum + * Consists of $n / 2 + 1$ nodes + * Cluster is considered "degraded" if no quorum can be created +* Stores an event log + * **Only** leader can append to the log (with quorum permission) +* Membership authority + * No joins without quorum approval + * Leaves are not propagated without quorum +* Manages epoch (useful for GC) + * Node $N$ with $N.epoch != cluster.epoch$ can **not** join the cluster, and has to re-join (bootstrap) +* Can (but doesn't have to) be a bootstrap point + +# Membership +* Membership is managed though SWIM +* Each node contains a small slice of the entire network +## Joining +Each node has an array of roles: +1. That it performs +2. That it requires to operate (can be moved out to the master, or the shared type) +3. That it needs for bootstrapping (analogous to 2.) + +Node can join via a master or via other nodes +When a node requests to join, the responder makes a request to the CP and asks for a permission to add this node +* If master allows + 1. The node gets a membership digest from the CP. + 2. The node *can* be brought up to speed using it's neighbors from 1. + 3. Node join event gets broadcasted over SWIM gossiping +* Otherwise, nothing happens + +# Host node +## Bootstrap +Host node requests `dns` nodes on join (and other node types, such as `ns`, `nginx`, etc... They should really be called something like `dns_processor`, and the internals (how it processes the dns) should not be visible to the cluster, but that's a task for a future me) +When a new update occurs, it sends the update to *some* `dns` hosts. + +# DNS node +## Bootstrap +First, it gets all the available `hosts` from the CP +Then it requests their configs and sets map[hostName]seq accordingly +## Simple join (when other nodes exist) +It requests it's config from other nodes and that's it + + + +# Minor To-Do - auth middleware lol - move request logging out of the request handling into a middleware - nginx role diff --git a/go.mod b/go.mod index eceeb1b..53b5b0a 100644 --- a/go.mod +++ b/go.mod @@ -2,10 +2,12 @@ module git.wzray.com/homelab/hivemind go 1.25.5 -require github.com/rs/zerolog v1.34.0 +require ( + github.com/rs/zerolog v1.34.0 + github.com/BurntSushi/toml v1.6.0 +) require ( - github.com/BurntSushi/toml v1.6.0 // indirect github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/pkg/errors v0.9.1 // indirect diff --git a/internal/roles/master/master.go b/internal/roles/master/master.go index dee25e3..9987c69 100644 --- a/internal/roles/master/master.go +++ b/internal/roles/master/master.go @@ -9,6 +9,7 @@ import ( "git.wzray.com/homelab/hivemind/internal/state" "git.wzray.com/homelab/hivemind/internal/types" "git.wzray.com/homelab/hivemind/internal/web/client" + "github.com/rs/zerolog/log" ) type Role struct { @@ -36,7 +37,7 @@ func New(state *state.RuntimeState, config config.MasterConfig) *Role { func (r *Role) OnStartup(ctx context.Context) error { r.tasksGroup.Go(func() { r.observer.Start(ctx, func(n types.Node) error { - _, err := r.onLeave(n) + _, err := r.onLeave(n, true) return err }) }) @@ -50,7 +51,7 @@ func (r *Role) OnShutdown() error { } func (r *Role) notify(path types.Path, v any) { - for _, n := range r.state.Registry.Nodes() { + for _, n := range r.state.Registry.ByRole(types.MasterRole) { addr := n.Endpoint r.tasksGroup.Go(func() { client.Post[any](addr, path, v) @@ -58,39 +59,58 @@ func (r *Role) notify(path types.Path, v any) { } } -func (r *Role) onJoin(node types.Node) (map[string]types.Node, error) { +func (r *Role) onJoin(node types.Node, notify bool) (map[string]types.Node, error) { if err := r.state.Registry.AddNode(node); err != nil { return nil, err } - r.notify(types.PathNodeJoin, node) + if notify { + r.notify(types.PathMasterEventJoin, node) + } return r.state.Registry.AllNodes(), nil } -func (r *Role) onLeave(node types.Node) (bool, error) { +func (r *Role) onLeave(node types.Node, notify bool) (bool, error) { if err := r.state.Registry.RemoveNode(node); err != nil { return false, err } - r.notify(types.PathNodeLeave, node) + if notify { + r.notify(types.PathMasterEventLeave, node) + } return true, nil } -func (r *Role) onKeepAlive(node types.Node) (bool, error) { +func (r *Role) onKeepAlive(node types.Node, notify bool) (map[string]types.Node, error) { r.observer.onKeepAlive(node) if ok := r.state.Registry.Exists(node.Hostname); !ok { - _, err := r.onJoin(node) - return true, err + // TODO: i don't like this side effect + if _, err := r.onJoin(node, true); err != nil { + log.Warn().Err(err).Msg("unable to add node to the registry from keepalive") + } } - return false, nil + if notify { + r.notify(types.PathMasterEventKeepalive, node) + } + + return r.state.Registry.AllNodes(), nil +} + +func eventFunc[R any](fn func(types.Node, bool) (R, error), notify bool) func(types.Node) (R, error) { + return func(n types.Node) (R, error) { + return fn(n, notify) + } } func (c *Role) RegisterHandlers(r types.Registrator) { - r.Register(types.PostEndpoint(types.PathMasterJoin, c.onJoin)) - r.Register(types.PostEndpoint(types.PathMasterLeave, c.onLeave)) - r.Register(types.PostEndpoint(types.PathMasterKeepalive, c.onKeepAlive)) + r.Register(types.PostEndpoint(types.PathMasterKeepalive, eventFunc(c.onKeepAlive, true))) + r.Register(types.PostEndpoint(types.PathMasterEventKeepalive, eventFunc(c.onKeepAlive, false))) + r.Register(types.PostEndpoint(types.PathMasterJoin, eventFunc(c.onJoin, true))) + r.Register(types.PostEndpoint(types.PathMasterLeave, eventFunc(c.onLeave, true))) + r.Register(types.PostEndpoint(types.PathMasterEventJoin, eventFunc(c.onJoin, false))) + r.Register(types.PostEndpoint(types.PathMasterEventLeave, eventFunc(c.onLeave, false))) } diff --git a/internal/roles/node/node.go b/internal/roles/node/node.go index a09355b..0a04fe8 100644 --- a/internal/roles/node/node.go +++ b/internal/roles/node/node.go @@ -92,7 +92,6 @@ func (r *Role) Leave() error { func (r *Role) OnStartup(ctx context.Context) error { r.keepaliveGroup.Go(r.keepaliveFunc(ctx)) - return nil } @@ -107,11 +106,20 @@ func (r *Role) keepaliveFunc(ctx context.Context) func() { logger := log.With().Str("name", m.Hostname).Logger() logger.Debug().Msg("sending keepalive packet") - if _, err := client.Post[any](m.Endpoint, types.PathMasterKeepalive, r.state.Self); err != nil { + nodes, err := client.Post[map[string]types.Node](m.Endpoint, types.PathMasterKeepalive, r.state.Self) + if err != nil { logger.Info().Err(err).Msg("unable to send keepalive packet") - } else { - logger.Debug().Msg("keepalive packet sent") + continue } + + logger.Debug().Msg("keepalive packet sent") + + if err := r.state.Registry.Set(*nodes); err != nil { + logger.Warn().Err(err).Msg("unable to set masters nodes") + continue + } + + break } } @@ -127,26 +135,10 @@ func (r *Role) keepaliveFunc(ctx context.Context) func() { } } -func (r *Role) onJoin(node types.Node) (bool, error) { - if err := r.state.Registry.AddNode(node); err != nil { - return false, err - } - return true, nil -} - -func (r *Role) onLeave(node types.Node) (bool, error) { - if err := r.state.Registry.RemoveNode(node); err != nil { - return false, err - } - return true, nil -} - func healthcheck() (string, error) { return "OK", nil } func (n *Role) RegisterHandlers(r types.Registrator) { r.Register(types.GetEndpoint(types.PathNodeHealthcheck, healthcheck)) - r.Register(types.PostEndpoint(types.PathNodeJoin, n.onJoin)) - r.Register(types.PostEndpoint(types.PathNodeLeave, n.onLeave)) } diff --git a/internal/types/web.go b/internal/types/web.go index 1416d6a..97d86f2 100644 --- a/internal/types/web.go +++ b/internal/types/web.go @@ -15,13 +15,14 @@ func (p Path) String() string { } const ( - PathMasterJoin Path = "/master/join" - PathMasterLeave Path = "/master/leave" - PathMasterKeepalive Path = "/master/keepalive" + PathMasterJoin Path = "/master/join" + PathMasterLeave Path = "/master/leave" + PathMasterKeepalive Path = "/master/keepalive" + PathMasterEventJoin Path = "/master/event_join" + PathMasterEventLeave Path = "/master/event_leave" + PathMasterEventKeepalive Path = "/master/event_keepalive" PathNodeHealthcheck Path = "/node/healthcheck" - PathNodeJoin Path = "/node/join" - PathNodeLeave Path = "/node/leave" PathDnsCallback Path = "/dns/callback" From 0448f66ab272487d7195297bcdf00d83f3c90400 Mon Sep 17 00:00:00 2001 From: "Arthur K." Date: Fri, 23 Jan 2026 09:56:01 +0300 Subject: [PATCH 3/3] refactor: move http api to a new transport layer --- .githooks/pre-commit | 1 + Dockerfile | 1 + cmd/hivemind/main.go | 31 +++--- internal/app/state.go | 35 +++++++ internal/config/defaults.go | 3 + internal/config/host.go | 9 ++ internal/config/node.go | 6 +- internal/roles/dns/dns.go | 85 ++++++++-------- internal/roles/host/gateway.go | 96 ++++++++++++++++++ internal/roles/host/host.go | 63 ++++++------ internal/roles/host/http.go | 57 ----------- internal/roles/host/types.go | 76 ++++++++------- internal/roles/master/master.go | 71 ++++++++++---- internal/roles/master/observer.go | 9 +- internal/roles/node/node.go | 52 +++++----- internal/roles/role.go | 6 +- internal/state/runtime.go | 18 ---- internal/transport/caller.go | 5 + internal/transport/codec/codec.go | 16 +++ internal/transport/codec/json.go | 15 +++ internal/transport/dns/client.go | 20 ++++ internal/transport/dns/handlers.go | 14 +++ internal/transport/dns/routes.go | 10 ++ internal/transport/helpers.go | 9 ++ internal/transport/host/client.go | 24 +++++ internal/transport/host/handlers.go | 16 +++ internal/transport/host/routes.go | 11 +++ internal/transport/master/client.go | 40 ++++++++ internal/transport/master/handlers.go | 26 +++++ internal/transport/master/routes.go | 16 +++ internal/transport/node/client.go | 17 ++++ internal/transport/node/handlers.go | 13 +++ internal/transport/node/routes.go | 9 ++ internal/transport/registrator.go | 22 +++++ internal/transport/route.go | 65 +++++++++++++ internal/types/node.go | 4 +- internal/types/web.go | 135 +++++++++++--------------- internal/web/client/client.go | 71 +++++--------- internal/web/server/server.go | 18 ++-- internal/web/server/util.go | 10 +- internal/web/types.go | 7 ++ 41 files changed, 822 insertions(+), 390 deletions(-) create mode 100644 internal/app/state.go create mode 100644 internal/roles/host/gateway.go delete mode 100644 internal/state/runtime.go create mode 100644 internal/transport/caller.go create mode 100644 internal/transport/codec/codec.go create mode 100644 internal/transport/codec/json.go create mode 100644 internal/transport/dns/client.go create mode 100644 internal/transport/dns/handlers.go create mode 100644 internal/transport/dns/routes.go create mode 100644 internal/transport/helpers.go create mode 100644 internal/transport/host/client.go create mode 100644 internal/transport/host/handlers.go create mode 100644 internal/transport/host/routes.go create mode 100644 internal/transport/master/client.go create mode 100644 internal/transport/master/handlers.go create mode 100644 internal/transport/master/routes.go create mode 100644 internal/transport/node/client.go create mode 100644 internal/transport/node/handlers.go create mode 100644 internal/transport/node/routes.go create mode 100644 internal/transport/registrator.go create mode 100644 internal/transport/route.go create mode 100644 internal/web/types.go diff --git a/.githooks/pre-commit b/.githooks/pre-commit index 519b7e4..811d231 100755 --- a/.githooks/pre-commit +++ b/.githooks/pre-commit @@ -4,6 +4,7 @@ changed="$(gofmt -d .)" [ -n "$changed" ] && { echo 'Some files are not formatted' delta <<< "$changed" + go fmt ./... exit 1 } go vet ./... diff --git a/Dockerfile b/Dockerfile index b50dc45..24e18e4 100644 --- a/Dockerfile +++ b/Dockerfile @@ -15,6 +15,7 @@ RUN --mount=type=cache,target="/cache/go-build" mkdir -p /cache/go-build; make h FROM alpine EXPOSE 56714/tcp +EXPOSE 56715/tcp WORKDIR /app VOLUME /conf diff --git a/cmd/hivemind/main.go b/cmd/hivemind/main.go index 9ce289b..75dc6a2 100644 --- a/cmd/hivemind/main.go +++ b/cmd/hivemind/main.go @@ -10,6 +10,7 @@ import ( "strings" "syscall" + "git.wzray.com/homelab/hivemind/internal/app" "git.wzray.com/homelab/hivemind/internal/config" "git.wzray.com/homelab/hivemind/internal/registry" "git.wzray.com/homelab/hivemind/internal/roles" @@ -17,11 +18,14 @@ import ( "git.wzray.com/homelab/hivemind/internal/roles/host" "git.wzray.com/homelab/hivemind/internal/roles/master" "git.wzray.com/homelab/hivemind/internal/roles/node" - "git.wzray.com/homelab/hivemind/internal/state" + dns_transport "git.wzray.com/homelab/hivemind/internal/transport/dns" + host_transport "git.wzray.com/homelab/hivemind/internal/transport/host" + master_transport "git.wzray.com/homelab/hivemind/internal/transport/master" + node_transport "git.wzray.com/homelab/hivemind/internal/transport/node" "git.wzray.com/homelab/hivemind/internal/types" - "git.wzray.com/homelab/hivemind/internal/web/client" - "git.wzray.com/homelab/hivemind/internal/web/middleware" - "git.wzray.com/homelab/hivemind/internal/web/server" + web_client "git.wzray.com/homelab/hivemind/internal/web/client" + web_middleware "git.wzray.com/homelab/hivemind/internal/web/middleware" + web_server "git.wzray.com/homelab/hivemind/internal/web/server" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/rs/zerolog/pkgerrors" @@ -87,17 +91,22 @@ func main() { filestore.EnsureExists() registry := registry.New(filestore, self) - state := state.New(registry, self) - - nodeRole := node.New(state, configuration.Node) - - var builder middleware.MiddlewareBuilder + var builder web_middleware.MiddlewareBuilder middlewares := builder.Prepare() - client.Init(middlewares) + client := web_client.New(middlewares) listenAddr := fmt.Sprintf("%v:%v", configuration.Node.ListenOn, configuration.Node.Port) - server := server.NewServer(listenAddr, middlewares) + server := web_server.New(listenAddr, middlewares) + + state := app.NewState(registry, self, app.Clients{ + Master: master_transport.New(client), + DNS: dns_transport.New(client), + Host: host_transport.New(client), + Node: node_transport.New(client), + }) + + nodeRole := node.New(state, configuration.Node) roles := make([]roles.Role, 0) roles = append(roles, nodeRole) diff --git a/internal/app/state.go b/internal/app/state.go new file mode 100644 index 0000000..74d6139 --- /dev/null +++ b/internal/app/state.go @@ -0,0 +1,35 @@ +package app + +import ( + "git.wzray.com/homelab/hivemind/internal/registry" + "git.wzray.com/homelab/hivemind/internal/transport/dns" + "git.wzray.com/homelab/hivemind/internal/transport/host" + "git.wzray.com/homelab/hivemind/internal/transport/master" + "git.wzray.com/homelab/hivemind/internal/transport/node" + "git.wzray.com/homelab/hivemind/internal/types" +) + +type Clients struct { + Master *master.Client + DNS *dns.Client + Host *host.Client + Node *node.Client +} + +type State struct { + Registry *registry.Registry + Self types.Node + Clients Clients +} + +func NewState( + registry *registry.Registry, + self types.Node, + clients Clients, +) *State { + return &State{ + Registry: registry, + Self: self, + Clients: clients, + } +} diff --git a/internal/config/defaults.go b/internal/config/defaults.go index 317770d..df4a64e 100644 --- a/internal/config/defaults.go +++ b/internal/config/defaults.go @@ -14,5 +14,8 @@ var DefaultConfig = Config{ BackoffCount: 4, NodeTimeout: 120, }, + Host: HostConfig{ + ListenAddress: "0.0.0.0:56715", + }, }, } diff --git a/internal/config/host.go b/internal/config/host.go index 8e131ad..e3b3af5 100644 --- a/internal/config/host.go +++ b/internal/config/host.go @@ -12,6 +12,7 @@ type HostConfig struct { LocalAddress string `toml:"local_address"` InternalEntrypoint string `toml:"internal_entrypoint"` ExternalEntrypoint string `toml:"external_entrypoint"` + ListenAddress string `toml:"listen_address"` baseRoleConfig } @@ -40,6 +41,10 @@ func (c HostConfig) Validate() error { return errors.New("missing external entrypoint") } + if c.ListenAddress == "" { + return errors.New("missing listen address") + } + return nil } @@ -67,4 +72,8 @@ func (c *HostConfig) Merge(other HostConfig) { if other.ExternalEntrypoint != "" { c.ExternalEntrypoint = other.ExternalEntrypoint } + + if other.ListenAddress != "" { + c.ListenAddress = other.ListenAddress + } } diff --git a/internal/config/node.go b/internal/config/node.go index fbeb29d..0ee3da0 100644 --- a/internal/config/node.go +++ b/internal/config/node.go @@ -53,8 +53,12 @@ func (c NodeConfig) Validate() error { return errors.New("invalid keepalive_interval") } + if c.ListenOn == "" { + return errors.New("missing listen_on") + } + if net.ParseIP(c.ListenOn) == nil { - return errors.New("invalid listen_on") + return fmt.Errorf("invalid listen_on: %v", c.ListenOn) } return nil diff --git a/internal/roles/dns/dns.go b/internal/roles/dns/dns.go index 6d01deb..9de4898 100644 --- a/internal/roles/dns/dns.go +++ b/internal/roles/dns/dns.go @@ -8,22 +8,23 @@ import ( "strings" "sync" + "git.wzray.com/homelab/hivemind/internal/app" "git.wzray.com/homelab/hivemind/internal/config" - "git.wzray.com/homelab/hivemind/internal/state" + "git.wzray.com/homelab/hivemind/internal/transport" + "git.wzray.com/homelab/hivemind/internal/transport/dns" "git.wzray.com/homelab/hivemind/internal/types" - "git.wzray.com/homelab/hivemind/internal/web/client" "github.com/rs/zerolog/log" ) const hostsDir = "/etc/hosts.d/" type Role struct { - state *state.RuntimeState + state *app.State config config.DnsConfig group sync.WaitGroup } -func New(state *state.RuntimeState, config config.DnsConfig) *Role { +func New(state *app.State, config config.DnsConfig) *Role { r := &Role{ state: state, config: config, @@ -32,28 +33,6 @@ func New(state *state.RuntimeState, config config.DnsConfig) *Role { return r } -func (r *Role) updateDnsmasq(filename string, data []byte) error { - if err := os.WriteFile(filename, data, 0644); err != nil { - return fmt.Errorf("write endpoint file %q: %w", filename, err) - } - - if err := r.reload(); err != nil { - return fmt.Errorf("reload dnsmasq: %w", err) - } - - return nil -} - -func parseState(state types.HostState) (string, []byte) { - var builder strings.Builder - - for _, d := range state.Domains { - builder.WriteString(fmt.Sprintf("%s %s\n", state.Address, d)) - } - - return hostsDir + state.Hostname, []byte(builder.String()) -} - func (r *Role) OnStartup(ctx context.Context) error { r.group.Go(func() { r.syncFromRegistry() @@ -74,15 +53,46 @@ func (r *Role) OnStartup(ctx context.Context) error { return nil } +func (r *Role) OnShutdown() error { + r.group.Wait() + return nil +} + +func (r *Role) RegisterHandlers(rg transport.Registrator) { + dns.Register(rg, r) +} + +func (r *Role) Callback(state types.HostState) (bool, error) { + filename, data := parseState(state) + + if err := r.updateDnsmasq(filename, data); err != nil { + return false, err + } + + return true, nil +} + +func (r *Role) updateDnsmasq(filename string, data []byte) error { + if err := os.WriteFile(filename, data, 0644); err != nil { + return fmt.Errorf("write endpoint file %q: %w", filename, err) + } + + if err := r.reload(); err != nil { + return fmt.Errorf("reload dnsmasq: %w", err) + } + + return nil +} + func (r *Role) syncFromRegistry() { for _, n := range r.state.Registry.ByRole(types.HostRole) { - state, err := client.Get[types.HostState](n.Endpoint, types.PathHostDns) + state, err := r.state.Clients.Host.Dns(n.Endpoint) if err != nil { log.Warn().Str("name", n.Hostname).Err(err).Msg("unable to get host config") continue } - filename, data := parseState(*state) + filename, data := parseState(state) if err := r.updateDnsmasq(filename, data); err != nil { log.Warn().Str("name", n.Hostname).Err(err).Msg("unable to update dnsmasq") continue @@ -90,11 +100,6 @@ func (r *Role) syncFromRegistry() { } } -func (r *Role) OnShutdown() error { - r.group.Wait() - return nil -} - func (r *Role) reload() error { var err error @@ -107,16 +112,12 @@ func (r *Role) reload() error { return err } -func (r *Role) onCallback(state types.HostState) (bool, error) { - filename, data := parseState(state) +func parseState(state types.HostState) (string, []byte) { + var builder strings.Builder - if err := r.updateDnsmasq(filename, data); err != nil { - return false, err + for _, d := range state.Domains { + builder.WriteString(fmt.Sprintf("%s %s\n", state.Address, d)) } - return true, nil -} - -func (r *Role) RegisterHandlers(rg types.Registrator) { - rg.Register(types.PostEndpoint(types.PathDnsCallback, r.onCallback)) + return hostsDir + state.Hostname, []byte(builder.String()) } diff --git a/internal/roles/host/gateway.go b/internal/roles/host/gateway.go new file mode 100644 index 0000000..7e16807 --- /dev/null +++ b/internal/roles/host/gateway.go @@ -0,0 +1,96 @@ +package host + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + + "git.wzray.com/homelab/hivemind/internal/config" + "github.com/rs/zerolog/log" +) + +type TraefikListener interface { + OnTraefikUpdate(traefikResponse) +} + +type TraefikGateway struct { + client *http.Client + server *http.Server + listener TraefikListener + address url.URL + domain string +} + +func NewTraefikGateway(cfg config.HostConfig, listener TraefikListener) *TraefikGateway { + mux := http.NewServeMux() + gw := &TraefikGateway{ + client: &http.Client{}, + + server: &http.Server{ + Addr: cfg.ListenAddress, + Handler: mux, + }, + listener: listener, + address: url.URL{ + Scheme: "http", + Host: cfg.LocalAddress, + }, + domain: cfg.Domain, + } + + mux.HandleFunc("/callback", gw.onCallback) + return gw +} + +func (g *TraefikGateway) Listen() error { + return g.server.ListenAndServe() +} + +func (g *TraefikGateway) Shutdown(ctx context.Context) error { + return g.server.Shutdown(ctx) +} + +func (g *TraefikGateway) GetRawData() (*traefikResponse, error) { + var raw TraefikRawResponse + + url := g.address + url.Path = "/api/rawdata" + + req := http.Request{ + Method: http.MethodGet, + URL: &url, + } + + req.Host = g.domain + + r, err := g.client.Do(&req) + if err != nil { + return nil, fmt.Errorf("make request: %w", err) + } + defer r.Body.Close() + + if err := json.NewDecoder(r.Body).Decode(&raw); err != nil { + return nil, fmt.Errorf("unmarshal body: %w", err) + } + + out := parseTraefikResponse(raw) + return &out, nil +} + +func (g *TraefikGateway) onCallback(w http.ResponseWriter, req *http.Request) { + var raw TraefikRawResponse + if err := json.NewDecoder(req.Body).Decode(&raw); err != nil { + w.WriteHeader(http.StatusInternalServerError) + log.Err(err).Msg("unable to decode traefik callback data") + return + } + + resp := parseTraefikResponse(raw) + if g.listener != nil { + g.listener.OnTraefikUpdate(resp) + } + + w.Write([]byte("OK")) +} diff --git a/internal/roles/host/host.go b/internal/roles/host/host.go index ca61e2f..a97a80b 100644 --- a/internal/roles/host/host.go +++ b/internal/roles/host/host.go @@ -2,36 +2,37 @@ package host import ( "context" - "encoding/json" "fmt" - "net/http" "slices" "sync" + "git.wzray.com/homelab/hivemind/internal/app" "git.wzray.com/homelab/hivemind/internal/config" - "git.wzray.com/homelab/hivemind/internal/state" + "git.wzray.com/homelab/hivemind/internal/transport" + "git.wzray.com/homelab/hivemind/internal/transport/host" "git.wzray.com/homelab/hivemind/internal/types" - "git.wzray.com/homelab/hivemind/internal/web/client" "github.com/rs/zerolog/log" ) type Role struct { - state *state.RuntimeState + state *app.State config config.HostConfig - client *traefikClient + gateway *TraefikGateway tasksGroup sync.WaitGroup externalDomains []string // TODO: i don't like hardcoding external/internal logic here internalDomains []string } -func New(state *state.RuntimeState, config config.HostConfig) *Role { - return &Role{ - client: newClient(config.Domain, config.LocalAddress), +func New(state *app.State, config config.HostConfig) *Role { + r := &Role{ state: state, config: config, } + + r.gateway = NewTraefikGateway(config, r) + return r } func (r *Role) sendUpdate(domains []string, role types.Role) { @@ -45,7 +46,7 @@ func (r *Role) sendUpdate(domains []string, role types.Role) { r.tasksGroup.Go(func() { logger := log.With().Str("name", node.Hostname).Logger() logger.Debug().Msg("sending update") - if _, err := client.Post[any](node.Endpoint, types.PathDnsCallback, state); err != nil { + if _, err := r.state.Clients.DNS.Callback(node.Endpoint, state); err != nil { logger.Warn().Err(err).Msg("unable to send dns info") } else { logger.Debug().Msg("update sent") @@ -54,7 +55,7 @@ func (r *Role) sendUpdate(domains []string, role types.Role) { } } -func (r *Role) mutateState(resp traefikResponse) { +func (r *Role) OnTraefikUpdate(resp traefikResponse) { newInternal := resp.Domains(r.config.InternalEntrypoint) newExternal := resp.Domains(r.config.ExternalEntrypoint) @@ -71,20 +72,7 @@ func (r *Role) mutateState(resp traefikResponse) { } } -func (r *Role) onCallback(w http.ResponseWriter, req *http.Request) { - var resp traefikResponse - if err := json.NewDecoder(req.Body).Decode(&resp); err != nil { - w.WriteHeader(http.StatusInternalServerError) - log.Err(err).Msg("unable to decode traefik callback data") - return - } - - r.mutateState(resp) - - w.Write([]byte("OK")) -} - -func (r *Role) getInternal() (types.HostState, error) { +func (r *Role) Dns() (types.HostState, error) { return types.HostState{ Domains: r.internalDomains, Address: r.config.IpAddress, @@ -92,7 +80,7 @@ func (r *Role) getInternal() (types.HostState, error) { }, nil } -func (r *Role) getExternal() (types.HostState, error) { +func (r *Role) Nameserver() (types.HostState, error) { return types.HostState{ Domains: r.externalDomains, Address: r.config.IpAddress, @@ -101,14 +89,25 @@ func (r *Role) getExternal() (types.HostState, error) { } -func (r *Role) RegisterHandlers(rg types.Registrator) { - rg.RegisterRaw(http.MethodPost, types.PathHostCallback.String(), r.onCallback) - rg.Register(types.GetEndpoint(types.PathHostDns, r.getInternal)) - rg.Register(types.GetEndpoint(types.PathHostNs, r.getExternal)) +func (r *Role) RegisterHandlers(rg transport.Registrator) { + host.Register(rg, r) } func (r *Role) OnStartup(ctx context.Context) error { - resp, err := r.client.GetRawData() + r.tasksGroup.Go(func() { + if err := r.gateway.Listen(); err != nil { + log.Err(err).Msg("traefik gateway stopped") + } + }) + + r.tasksGroup.Go(func() { + <-ctx.Done() + if err := r.gateway.Shutdown(context.Background()); err != nil { + log.Err(err).Msg("failed to shutdown traefik gateway") + } + }) + + resp, err := r.gateway.GetRawData() if err != nil { return fmt.Errorf("get traefik state: %w", err) } @@ -116,7 +115,7 @@ func (r *Role) OnStartup(ctx context.Context) error { log.Info().Msg("got raw data from traefik") log.Debug().Interface("response", resp).Send() - r.mutateState(*resp) + r.OnTraefikUpdate(*resp) return nil } diff --git a/internal/roles/host/http.go b/internal/roles/host/http.go index 1eb647c..cd147b7 100644 --- a/internal/roles/host/http.go +++ b/internal/roles/host/http.go @@ -1,58 +1 @@ package host - -import ( - "crypto/tls" - "encoding/json" - "fmt" - "net/http" - "net/url" -) - -type traefikClient struct { - client *http.Client - domain string - address url.URL -} - -func newClient(domain string, addr string) *traefikClient { - return &traefikClient{ - domain: domain, - address: url.URL{ - Scheme: "https", - Host: addr, - }, - client: &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{ - ServerName: domain, - }, - }, - }, - } -} - -func (c *traefikClient) GetRawData() (*traefikResponse, error) { - var out traefikResponse - - url := c.address - url.Path = "/api/rawdata" - - req := http.Request{ - Method: "GET", - URL: &url, - } - - req.Host = c.domain - - r, err := c.client.Do(&req) - if err != nil { - return nil, fmt.Errorf("make request: %w", err) - } - defer r.Body.Close() - - if err := json.NewDecoder(r.Body).Decode(&out); err != nil { - return nil, fmt.Errorf("unmarshal body: %w", err) - } - - return &out, nil -} diff --git a/internal/roles/host/types.go b/internal/roles/host/types.go index ad1c9fd..e0b5e5a 100644 --- a/internal/roles/host/types.go +++ b/internal/roles/host/types.go @@ -1,64 +1,66 @@ package host import ( - "encoding/json" "regexp" "slices" ) var hostRegex = regexp.MustCompile("Host\\(`([^()`]+\\.[^()`]+)`\\)") -type rule struct { +type traefikRule struct { Raw string Domains []string Valid bool } -func (r *rule) UnmarshalJSON(data []byte) error { - r.Valid = false +type traefikRouter struct { + Rule traefikRule + Entrypoints []string +} - raw := "" - if err := json.Unmarshal(data, &raw); err != nil { - return err +type traefikResponse struct { + Routers []traefikRouter +} + +type TraefikRawResponse struct { + Routers map[string]TraefikRawRouter `json:"routers"` +} + +type TraefikRawRouter struct { + Rule string `json:"rule"` + Entrypoints []string `json:"entryPoints"` +} + +func parseTraefikResponse(raw TraefikRawResponse) traefikResponse { + out := traefikResponse{ + Routers: make([]traefikRouter, 0, len(raw.Routers)), + } + + for _, router := range raw.Routers { + out.Routers = append(out.Routers, traefikRouter{ + Rule: parseTraefikRule(router.Rule), + Entrypoints: router.Entrypoints, + }) + } + + return out +} + +func parseTraefikRule(raw string) traefikRule { + rule := traefikRule{ + Raw: raw, } matches := hostRegex.FindAllStringSubmatch(raw, -1) - for _, match := range matches { if len(match) <= 1 { continue } - r.Domains = append(r.Domains, match[1:]...) + rule.Domains = append(rule.Domains, match[1:]...) } - r.Valid = len(r.Domains) > 0 - - return nil -} - -type router struct { - Rule rule `json:"rule"` - Entrypoints []string `json:"entryPoints"` -} - -type traefikResponse struct { - Routers []router -} - -func (r *traefikResponse) UnmarshalJSON(data []byte) error { - var raw struct { - Routers map[string]router `json:"routers"` - } - - if err := json.Unmarshal(data, &raw); err != nil { - return err - } - - for _, v := range raw.Routers { - r.Routers = append(r.Routers, v) - } - - return nil + rule.Valid = len(rule.Domains) > 0 + return rule } func (r traefikResponse) Domains(entrypoint string) []string { diff --git a/internal/roles/master/master.go b/internal/roles/master/master.go index 9987c69..fef6453 100644 --- a/internal/roles/master/master.go +++ b/internal/roles/master/master.go @@ -4,23 +4,24 @@ import ( "context" "sync" + "git.wzray.com/homelab/hivemind/internal/app" "git.wzray.com/homelab/hivemind/internal/config" "git.wzray.com/homelab/hivemind/internal/roles" - "git.wzray.com/homelab/hivemind/internal/state" + "git.wzray.com/homelab/hivemind/internal/transport" + "git.wzray.com/homelab/hivemind/internal/transport/master" "git.wzray.com/homelab/hivemind/internal/types" - "git.wzray.com/homelab/hivemind/internal/web/client" "github.com/rs/zerolog/log" ) type Role struct { - state *state.RuntimeState + state *app.State config config.MasterConfig tasksGroup sync.WaitGroup observer *observer roles.BaseRole } -func New(state *state.RuntimeState, config config.MasterConfig) *Role { +func New(state *app.State, config config.MasterConfig) *Role { return &Role{ state: state, config: config, @@ -50,13 +51,34 @@ func (r *Role) OnShutdown() error { return nil } -func (r *Role) notify(path types.Path, v any) { - for _, n := range r.state.Registry.ByRole(types.MasterRole) { - addr := n.Endpoint - r.tasksGroup.Go(func() { - client.Post[any](addr, path, v) - }) - } +func (c *Role) RegisterHandlers(r transport.Registrator) { + master.Register(r, c) +} + +func (r *Role) Heartbeat(node types.Node) (types.Nodes, error) { + return r.onKeepAlive(node, true) +} + +func (r *Role) Join(node types.Node) (types.Nodes, error) { + return r.onJoin(node, true) +} + +func (r *Role) Leave(node types.Node) error { + _, err := r.onLeave(node, true) + return err +} + +func (r *Role) EventHeartbeat(node types.Node) (types.Nodes, error) { + return r.onKeepAlive(node, false) +} + +func (r *Role) EventJoin(node types.Node) (types.Nodes, error) { + return r.onJoin(node, false) +} + +func (r *Role) EventLeave(node types.Node) error { + _, err := r.onLeave(node, false) + return err } func (r *Role) onJoin(node types.Node, notify bool) (map[string]types.Node, error) { @@ -65,7 +87,7 @@ func (r *Role) onJoin(node types.Node, notify bool) (map[string]types.Node, erro } if notify { - r.notify(types.PathMasterEventJoin, node) + propagate(r, noReturn(r.state.Clients.Master.EventJoin), node) } return r.state.Registry.AllNodes(), nil @@ -77,7 +99,7 @@ func (r *Role) onLeave(node types.Node, notify bool) (bool, error) { } if notify { - r.notify(types.PathMasterEventLeave, node) + propagate(r, r.state.Clients.Master.EventLeave, node) } return true, nil @@ -94,7 +116,7 @@ func (r *Role) onKeepAlive(node types.Node, notify bool) (map[string]types.Node, } if notify { - r.notify(types.PathMasterEventKeepalive, node) + propagate(r, noReturn(r.state.Clients.Master.EventHeartbeat), node) } return r.state.Registry.AllNodes(), nil @@ -106,11 +128,18 @@ func eventFunc[R any](fn func(types.Node, bool) (R, error), notify bool) func(ty } } -func (c *Role) RegisterHandlers(r types.Registrator) { - r.Register(types.PostEndpoint(types.PathMasterKeepalive, eventFunc(c.onKeepAlive, true))) - r.Register(types.PostEndpoint(types.PathMasterEventKeepalive, eventFunc(c.onKeepAlive, false))) - r.Register(types.PostEndpoint(types.PathMasterJoin, eventFunc(c.onJoin, true))) - r.Register(types.PostEndpoint(types.PathMasterLeave, eventFunc(c.onLeave, true))) - r.Register(types.PostEndpoint(types.PathMasterEventJoin, eventFunc(c.onJoin, false))) - r.Register(types.PostEndpoint(types.PathMasterEventLeave, eventFunc(c.onLeave, false))) +func noReturn[T, V any](fn func(string, T) (V, error)) func(string, T) error { + return func(s string, t T) error { + _, err := fn(s, t) + return err + } +} + +func propagate[T any](r *Role, handler func(string, T) error, v T) { + for _, n := range r.state.Registry.ByRole(types.MasterRole) { + addr := n.Endpoint + r.tasksGroup.Go(func() { + handler(addr, v) + }) + } } diff --git a/internal/roles/master/observer.go b/internal/roles/master/observer.go index 4d07a98..fab3478 100644 --- a/internal/roles/master/observer.go +++ b/internal/roles/master/observer.go @@ -5,15 +5,14 @@ import ( "sync" "time" + "git.wzray.com/homelab/hivemind/internal/app" "git.wzray.com/homelab/hivemind/internal/registry" - "git.wzray.com/homelab/hivemind/internal/state" "git.wzray.com/homelab/hivemind/internal/types" - "git.wzray.com/homelab/hivemind/internal/web/client" "github.com/rs/zerolog/log" ) type observer struct { - state *state.RuntimeState + state *app.State interval int backoff int backoffCount int @@ -23,7 +22,7 @@ type observer struct { } func newObserver( - state *state.RuntimeState, + state *app.State, interval int, backoff int, backoffCount int, @@ -63,7 +62,7 @@ func (o *observer) pollNodes(ctx context.Context, onLeave func(types.Node) error delay := time.Duration(o.backoff) alive := false for i := o.backoffCount - 1; i >= 0; i-- { - _, err := client.Get[any](n.Endpoint, types.PathNodeHealthcheck) + _, err := o.state.Clients.Node.Healthcheck(n.Endpoint) if err == nil { logger.Debug().Msg("node is alive") diff --git a/internal/roles/node/node.go b/internal/roles/node/node.go index 0a04fe8..083a86a 100644 --- a/internal/roles/node/node.go +++ b/internal/roles/node/node.go @@ -7,26 +7,41 @@ import ( "sync" "time" + "git.wzray.com/homelab/hivemind/internal/app" "git.wzray.com/homelab/hivemind/internal/config" - "git.wzray.com/homelab/hivemind/internal/state" + "git.wzray.com/homelab/hivemind/internal/transport" + "git.wzray.com/homelab/hivemind/internal/transport/node" "git.wzray.com/homelab/hivemind/internal/types" - "git.wzray.com/homelab/hivemind/internal/web/client" "github.com/rs/zerolog/log" ) type Role struct { - state *state.RuntimeState + state *app.State keepaliveGroup sync.WaitGroup config config.NodeConfig } -func New(state *state.RuntimeState, config config.NodeConfig) *Role { +func New(state *app.State, config config.NodeConfig) *Role { return &Role{ state: state, config: config, } } +func (r *Role) OnStartup(ctx context.Context) error { + r.keepaliveGroup.Go(r.keepaliveFunc(ctx)) + return nil +} + +func (r *Role) OnShutdown() error { + r.keepaliveGroup.Wait() + return nil +} + +func (n *Role) RegisterHandlers(r transport.Registrator) { + node.Register(r, n) +} + func (r *Role) Join(bootstrap string) error { masters := make(map[string]struct{}) for _, node := range r.state.Registry.ByRole(types.MasterRole) { @@ -46,14 +61,14 @@ func (r *Role) Join(bootstrap string) error { logger := log.With().Str("host", m).Logger() logger.Debug().Msg("trying to join via master") - nodes, err := client.Post[map[string]types.Node](m, types.PathMasterJoin, r.state.Self) + nodes, err := r.state.Clients.Master.Join(m, r.state.Self) if err != nil { errs = append(errs, err) logger.Debug().Err(err).Msg("unable to join") continue } - if err := r.state.Registry.Set(*nodes); err != nil { + if err := r.state.Registry.Set(nodes); err != nil { logger.Debug().Err(err).Msg("unable to set master's nodes") errs = append(errs, err) continue @@ -76,8 +91,7 @@ func (r *Role) Leave() error { logger := log.With().Str("name", m.Hostname).Logger() logger.Debug().Msg("sending leave message") - _, err := client.Post[any](m.Endpoint, types.PathMasterLeave, r.state.Self) - if err != nil { + if err := r.state.Clients.Master.Leave(m.Endpoint, r.state.Self); err != nil { logger.Debug().Err(err).Msg("unable to send leave message") errs = append(errs, err) continue @@ -90,14 +104,8 @@ func (r *Role) Leave() error { return fmt.Errorf("unable to send leave message to any master: %w", errors.Join(errs...)) } -func (r *Role) OnStartup(ctx context.Context) error { - r.keepaliveGroup.Go(r.keepaliveFunc(ctx)) - return nil -} - -func (r *Role) OnShutdown() error { - r.keepaliveGroup.Wait() - return nil +func (r *Role) Healthcheck() (string, error) { + return "OK", nil } func (r *Role) keepaliveFunc(ctx context.Context) func() { @@ -106,7 +114,7 @@ func (r *Role) keepaliveFunc(ctx context.Context) func() { logger := log.With().Str("name", m.Hostname).Logger() logger.Debug().Msg("sending keepalive packet") - nodes, err := client.Post[map[string]types.Node](m.Endpoint, types.PathMasterKeepalive, r.state.Self) + nodes, err := r.state.Clients.Master.Heartbeat(m.Endpoint, r.state.Self) if err != nil { logger.Info().Err(err).Msg("unable to send keepalive packet") continue @@ -114,7 +122,7 @@ func (r *Role) keepaliveFunc(ctx context.Context) func() { logger.Debug().Msg("keepalive packet sent") - if err := r.state.Registry.Set(*nodes); err != nil { + if err := r.state.Registry.Set(nodes); err != nil { logger.Warn().Err(err).Msg("unable to set masters nodes") continue } @@ -134,11 +142,3 @@ func (r *Role) keepaliveFunc(ctx context.Context) func() { } } } - -func healthcheck() (string, error) { - return "OK", nil -} - -func (n *Role) RegisterHandlers(r types.Registrator) { - r.Register(types.GetEndpoint(types.PathNodeHealthcheck, healthcheck)) -} diff --git a/internal/roles/role.go b/internal/roles/role.go index 4d42f6d..e3cd5f7 100644 --- a/internal/roles/role.go +++ b/internal/roles/role.go @@ -3,18 +3,18 @@ package roles import ( "context" - "git.wzray.com/homelab/hivemind/internal/types" + "git.wzray.com/homelab/hivemind/internal/transport" ) type Role interface { - RegisterHandlers(types.Registrator) + RegisterHandlers(transport.Registrator) OnStartup(context.Context) error OnShutdown() error } type BaseRole struct{} -func (r *BaseRole) RegisterHandlers(types.Registrator) {} +func (r *BaseRole) RegisterHandlers(transport.Registrator) {} func (r *BaseRole) OnStartup(context.Context) error { return nil } diff --git a/internal/state/runtime.go b/internal/state/runtime.go deleted file mode 100644 index 8b814d2..0000000 --- a/internal/state/runtime.go +++ /dev/null @@ -1,18 +0,0 @@ -package state - -import ( - "git.wzray.com/homelab/hivemind/internal/registry" - "git.wzray.com/homelab/hivemind/internal/types" -) - -type RuntimeState struct { - Registry *registry.Registry - Self types.Node -} - -func New(r *registry.Registry, n types.Node) *RuntimeState { - return &RuntimeState{ - Registry: r, - Self: n, - } -} diff --git a/internal/transport/caller.go b/internal/transport/caller.go new file mode 100644 index 0000000..ec82499 --- /dev/null +++ b/internal/transport/caller.go @@ -0,0 +1,5 @@ +package transport + +type Caller interface { + Call(host string, path string, data any, out any) error +} diff --git a/internal/transport/codec/codec.go b/internal/transport/codec/codec.go new file mode 100644 index 0000000..0c67c16 --- /dev/null +++ b/internal/transport/codec/codec.go @@ -0,0 +1,16 @@ +package codec + +type Codec interface { + Decode(data []byte, out any) error + Encode(data any) ([]byte, error) +} + +func Decode[T any](c Codec, data []byte) (T, error) { + var out T + err := c.Decode(data, &out) + return out, err +} + +func Encode[T any](c Codec, data T) ([]byte, error) { + return c.Encode(data) +} diff --git a/internal/transport/codec/json.go b/internal/transport/codec/json.go new file mode 100644 index 0000000..9be4a82 --- /dev/null +++ b/internal/transport/codec/json.go @@ -0,0 +1,15 @@ +package codec + +import "encoding/json" + +type jsonCodec struct{} + +var JSON = jsonCodec{} + +func (jsonCodec) Decode(data []byte, out any) error { + return json.Unmarshal(data, out) +} + +func (jsonCodec) Encode(data any) ([]byte, error) { + return json.Marshal(data) +} diff --git a/internal/transport/dns/client.go b/internal/transport/dns/client.go new file mode 100644 index 0000000..43020c0 --- /dev/null +++ b/internal/transport/dns/client.go @@ -0,0 +1,20 @@ +package dns + +import ( + "git.wzray.com/homelab/hivemind/internal/transport" + "git.wzray.com/homelab/hivemind/internal/types" +) + +type Client struct { + caller transport.Caller +} + +func New(caller transport.Caller) *Client { + return &Client{ + caller: caller, + } +} + +func (c *Client) Callback(endpoint string, state types.HostState) (bool, error) { + return callbackRoute.Call(c.caller, endpoint, state) +} diff --git a/internal/transport/dns/handlers.go b/internal/transport/dns/handlers.go new file mode 100644 index 0000000..0455af7 --- /dev/null +++ b/internal/transport/dns/handlers.go @@ -0,0 +1,14 @@ +package dns + +import ( + "git.wzray.com/homelab/hivemind/internal/transport" + "git.wzray.com/homelab/hivemind/internal/types" +) + +type DnsHandlers interface { + Callback(types.HostState) (bool, error) +} + +func Register(registrator transport.Registrator, h DnsHandlers) { + callbackRoute.Register(registrator, h.Callback) +} diff --git a/internal/transport/dns/routes.go b/internal/transport/dns/routes.go new file mode 100644 index 0000000..6b505f8 --- /dev/null +++ b/internal/transport/dns/routes.go @@ -0,0 +1,10 @@ +package dns + +import ( + "git.wzray.com/homelab/hivemind/internal/transport" + "git.wzray.com/homelab/hivemind/internal/types" +) + +var ( + callbackRoute = transport.NewRoute[types.HostState, bool]("/dns/callback") +) diff --git a/internal/transport/helpers.go b/internal/transport/helpers.go new file mode 100644 index 0000000..93ca5f5 --- /dev/null +++ b/internal/transport/helpers.go @@ -0,0 +1,9 @@ +package transport + +func WithoutInput[T any](f func() (T, error)) func(struct{}) (T, error) { + return func(s struct{}) (T, error) { return f() } +} + +func WithoutOutput[T any](f func(T) error) func(T) (struct{}, error) { + return func(t T) (struct{}, error) { return struct{}{}, f(t) } +} diff --git a/internal/transport/host/client.go b/internal/transport/host/client.go new file mode 100644 index 0000000..d2580b2 --- /dev/null +++ b/internal/transport/host/client.go @@ -0,0 +1,24 @@ +package host + +import ( + "git.wzray.com/homelab/hivemind/internal/transport" + "git.wzray.com/homelab/hivemind/internal/types" +) + +type Client struct { + caller transport.Caller +} + +func New(caller transport.Caller) *Client { + return &Client{ + caller: caller, + } +} + +func (c *Client) Dns(endpoint string) (types.HostState, error) { + return dnsRoute.CallNoInput(c.caller, endpoint) +} + +func (c *Client) Nameserver(endpoint string) (types.HostState, error) { + return nsRoute.CallNoInput(c.caller, endpoint) +} diff --git a/internal/transport/host/handlers.go b/internal/transport/host/handlers.go new file mode 100644 index 0000000..81d98e0 --- /dev/null +++ b/internal/transport/host/handlers.go @@ -0,0 +1,16 @@ +package host + +import ( + "git.wzray.com/homelab/hivemind/internal/transport" + "git.wzray.com/homelab/hivemind/internal/types" +) + +type HostHandlers interface { + Dns() (types.HostState, error) + Nameserver() (types.HostState, error) +} + +func Register(registrator transport.Registrator, h HostHandlers) { + dnsRoute.Register(registrator, transport.WithoutInput(h.Dns)) + nsRoute.Register(registrator, transport.WithoutInput(h.Nameserver)) +} diff --git a/internal/transport/host/routes.go b/internal/transport/host/routes.go new file mode 100644 index 0000000..a907cb8 --- /dev/null +++ b/internal/transport/host/routes.go @@ -0,0 +1,11 @@ +package host + +import ( + "git.wzray.com/homelab/hivemind/internal/transport" + "git.wzray.com/homelab/hivemind/internal/types" +) + +var ( + dnsRoute = transport.NewRoute[struct{}, types.HostState]("/host/dns") + nsRoute = transport.NewRoute[struct{}, types.HostState]("/host/ns") +) diff --git a/internal/transport/master/client.go b/internal/transport/master/client.go new file mode 100644 index 0000000..08a78c6 --- /dev/null +++ b/internal/transport/master/client.go @@ -0,0 +1,40 @@ +package master + +import ( + "git.wzray.com/homelab/hivemind/internal/transport" + "git.wzray.com/homelab/hivemind/internal/types" +) + +type Client struct { + caller transport.Caller +} + +func New(caller transport.Caller) *Client { + return &Client{ + caller: caller, + } +} + +func (c *Client) Heartbeat(endpoint string, n types.Node) (types.Nodes, error) { + return heartbeatRoute.Call(c.caller, endpoint, n) +} + +func (c *Client) Join(endpoint string, n types.Node) (types.Nodes, error) { + return joinRoute.Call(c.caller, endpoint, n) +} + +func (c *Client) Leave(endpoint string, n types.Node) error { + return leaveRoute.CallNoOutput(c.caller, endpoint, n) +} + +func (c *Client) EventHeartbeat(endpoint string, n types.Node) (types.Nodes, error) { + return eventHeartbeatRoute.Call(c.caller, endpoint, n) +} + +func (c *Client) EventJoin(endpoint string, n types.Node) (types.Nodes, error) { + return eventJoinRoute.Call(c.caller, endpoint, n) +} + +func (c *Client) EventLeave(endpoint string, n types.Node) error { + return eventLeaveRoute.CallNoOutput(c.caller, endpoint, n) +} diff --git a/internal/transport/master/handlers.go b/internal/transport/master/handlers.go new file mode 100644 index 0000000..b67ef7b --- /dev/null +++ b/internal/transport/master/handlers.go @@ -0,0 +1,26 @@ +package master + +import ( + "git.wzray.com/homelab/hivemind/internal/transport" + "git.wzray.com/homelab/hivemind/internal/types" +) + +type MasterHandlers interface { + Heartbeat(types.Node) (types.Nodes, error) + Join(types.Node) (types.Nodes, error) + Leave(types.Node) error + + EventHeartbeat(types.Node) (types.Nodes, error) + EventJoin(types.Node) (types.Nodes, error) + EventLeave(types.Node) error +} + +func Register(registrator transport.Registrator, h MasterHandlers) { + heartbeatRoute.Register(registrator, h.Heartbeat) + joinRoute.Register(registrator, h.Join) + leaveRoute.Register(registrator, transport.WithoutOutput(h.Leave)) + + eventHeartbeatRoute.Register(registrator, h.EventHeartbeat) + eventJoinRoute.Register(registrator, h.EventJoin) + eventLeaveRoute.Register(registrator, transport.WithoutOutput(h.EventLeave)) +} diff --git a/internal/transport/master/routes.go b/internal/transport/master/routes.go new file mode 100644 index 0000000..d59b53d --- /dev/null +++ b/internal/transport/master/routes.go @@ -0,0 +1,16 @@ +package master + +import ( + "git.wzray.com/homelab/hivemind/internal/transport" + "git.wzray.com/homelab/hivemind/internal/types" +) + +var ( + heartbeatRoute = transport.NewRoute[types.Node, types.Nodes]("/master/heartbeat") + joinRoute = transport.NewRoute[types.Node, types.Nodes]("/master/join") + leaveRoute = transport.NewRoute[types.Node, struct{}]("/master/leave") + + eventHeartbeatRoute = transport.NewRoute[types.Node, types.Nodes]("/master/event/heartbeat") + eventJoinRoute = transport.NewRoute[types.Node, types.Nodes]("/master/event/join") + eventLeaveRoute = transport.NewRoute[types.Node, struct{}]("/master/event/leave") +) diff --git a/internal/transport/node/client.go b/internal/transport/node/client.go new file mode 100644 index 0000000..82f5a29 --- /dev/null +++ b/internal/transport/node/client.go @@ -0,0 +1,17 @@ +package node + +import ( + "git.wzray.com/homelab/hivemind/internal/transport" +) + +type Client struct { + caller transport.Caller +} + +func New(caller transport.Caller) *Client { + return &Client{caller: caller} +} + +func (c *Client) Healthcheck(endpoint string) (string, error) { + return healthcheckRoute.CallNoInput(c.caller, endpoint) +} diff --git a/internal/transport/node/handlers.go b/internal/transport/node/handlers.go new file mode 100644 index 0000000..f2c7228 --- /dev/null +++ b/internal/transport/node/handlers.go @@ -0,0 +1,13 @@ +package node + +import ( + "git.wzray.com/homelab/hivemind/internal/transport" +) + +type NodeHandlers interface { + Healthcheck() (string, error) +} + +func Register(registrator transport.Registrator, h NodeHandlers) { + healthcheckRoute.Register(registrator, transport.WithoutInput(h.Healthcheck)) +} diff --git a/internal/transport/node/routes.go b/internal/transport/node/routes.go new file mode 100644 index 0000000..1be2a69 --- /dev/null +++ b/internal/transport/node/routes.go @@ -0,0 +1,9 @@ +package node + +import ( + "git.wzray.com/homelab/hivemind/internal/transport" +) + +var ( + healthcheckRoute = transport.NewRoute[struct{}, string]("/node/healthcheck") +) diff --git a/internal/transport/registrator.go b/internal/transport/registrator.go new file mode 100644 index 0000000..e98c5ad --- /dev/null +++ b/internal/transport/registrator.go @@ -0,0 +1,22 @@ +package transport + +import ( + "git.wzray.com/homelab/hivemind/internal/transport/codec" +) + +type Handler struct { + path string + handler func(codec.Codec, []byte) ([]byte, error) +} + +func (h Handler) Path() string { + return h.path +} + +func (h Handler) Handle(c codec.Codec, v []byte) ([]byte, error) { + return h.handler(c, v) +} + +type Registrator interface { + Register(endpoint Handler) +} diff --git a/internal/transport/route.go b/internal/transport/route.go new file mode 100644 index 0000000..a5714eb --- /dev/null +++ b/internal/transport/route.go @@ -0,0 +1,65 @@ +package transport + +import ( + "fmt" + + "git.wzray.com/homelab/hivemind/internal/transport/codec" +) + +type route[In, Out any] struct { + path string +} + +func NewRoute[In, Out any](path string) route[In, Out] { + return route[In, Out]{ + path: path, + } +} + +func routeToHandler[In, Out any](r route[In, Out], handler func(In) (Out, error)) Handler { + return Handler{ + path: r.path, + handler: func(c codec.Codec, b []byte) ([]byte, error) { + data, err := codec.Decode[In](c, b) + if err != nil { + return nil, fmt.Errorf("unable to decode body: %w", err) + } + + out, err := handler(data) + if err != nil { + return nil, fmt.Errorf("error while handling request: %w", err) + } + + raw, err := codec.Encode(c, out) + if err != nil { + return nil, fmt.Errorf("unable to encode body: %w", err) + } + + return raw, nil + }, + } +} + +func (e route[In, Out]) Call(caller Caller, host string, data In) (Out, error) { + var out Out + err := caller.Call(host, e.path, data, &out) + return out, err +} + +func (e route[In, Out]) CallNoInput(caller Caller, host string) (Out, error) { + var out Out + err := caller.Call(host, e.path, struct{}{}, &out) + return out, err +} + +func (e route[In, Out]) CallNoOutput(caller Caller, host string, data In) error { + return caller.Call(host, e.path, data, nil) +} + +func (e route[In, Out]) Path() string { + return e.path +} + +func (e route[In, Out]) Register(r Registrator, h func(In) (Out, error)) { + r.Register(routeToHandler(e, h)) +} diff --git a/internal/types/node.go b/internal/types/node.go index d1ed308..25f937e 100644 --- a/internal/types/node.go +++ b/internal/types/node.go @@ -2,13 +2,15 @@ package types import "fmt" +type Nodes map[string]Node + // TODO: consider moving this type back to registry type Node struct { Hostname string `json:"hostname"` Address string `json:"address"` Port int `json:"port"` Roles []Role `json:"roles"` - Endpoint string `json:"endpoint"` + Endpoint string `json:"endpoint"` // TODO: make endpoint into a separate type } func NewNode( diff --git a/internal/types/web.go b/internal/types/web.go index 97d86f2..0514376 100644 --- a/internal/types/web.go +++ b/internal/types/web.go @@ -1,79 +1,60 @@ package types -import ( - "encoding/json" - "fmt" - "net/http" -) - -// TODO: split this up - -type Path string - -func (p Path) String() string { - return string(p) -} - -const ( - PathMasterJoin Path = "/master/join" - PathMasterLeave Path = "/master/leave" - PathMasterKeepalive Path = "/master/keepalive" - PathMasterEventJoin Path = "/master/event_join" - PathMasterEventLeave Path = "/master/event_leave" - PathMasterEventKeepalive Path = "/master/event_keepalive" - - PathNodeHealthcheck Path = "/node/healthcheck" - - PathDnsCallback Path = "/dns/callback" - - PathHostCallback Path = "/host/callback" - PathHostDns Path = "/host/dns" - PathHostNs Path = "/host/ns" -) - -type Response[T any] struct { - Ok bool `json:"ok"` - Data T `json:"data,omitempty"` - Err string `json:"err,omitempty"` -} - -type Route interface { - Path() string - Handle([]byte) (any, error) -} - -type endpoint struct { - path string - handler func([]byte) (any, error) -} - -func (e endpoint) Path() string { return e.path } - -func (e endpoint) Handle(v []byte) (any, error) { return e.handler(v) } - -func PostEndpoint[T any, V any](path Path, handler func(T) (V, error)) Route { - return endpoint{ - path: "POST " + path.String(), - handler: func(a []byte) (any, error) { - var r T - if err := json.Unmarshal(a, &r); err != nil { - return nil, fmt.Errorf("unable to unmarshal json: %w", err) - } - return handler(r) - }, - } -} - -func GetEndpoint[T any](path Path, handler func() (T, error)) Route { - return endpoint{ - path: "GET " + path.String(), - handler: func(a []byte) (any, error) { - return handler() - }, - } -} - -type Registrator interface { - Register(endpoint Route) - RegisterRaw(method string, pattern string, handler func(http.ResponseWriter, *http.Request)) -} +// type Path string +// +// func (p Path) String() string { +// return string(p) +// } +// +// const ( +// PathMasterJoin Path = "/master/join" +// PathMasterLeave Path = "/master/leave" +// PathMasterKeepalive Path = "/master/keepalive" +// PathMasterEventJoin Path = "/master/event_join" +// PathMasterEventLeave Path = "/master/event_leave" +// PathMasterEventKeepalive Path = "/master/event_keepalive" +// +// PathNodeHealthcheck Path = "/node/healthcheck" +// +// PathDnsCallback Path = "/dns/callback" +// +// PathHostCallback Path = "/host/callback" +// PathHostDns Path = "/host/dns" +// PathHostNs Path = "/host/ns" +// ) +// +// type Route interface { +// Path() string +// Handle([]byte) (any, error) +// } +// +// type endpoint struct { +// path string +// handler func([]byte) (any, error) +// } +// +// func (e endpoint) Path() string { return e.path } +// +// func (e endpoint) Handle(v []byte) (any, error) { return e.handler(v) } +// +// func PostEndpoint[T any, V any](path Path, handler func(T) (V, error)) Route { +// return endpoint{ +// path: "POST " + path.String(), +// handler: func(a []byte) (any, error) { +// var r T +// if err := json.Unmarshal(a, &r); err != nil { +// return nil, fmt.Errorf("unable to unmarshal json: %w", err) +// } +// return handler(r) +// }, +// } +// } +// +// func GetEndpoint[T any](path Path, handler func() (T, error)) Route { +// return endpoint{ +// path: "GET " + path.String(), +// handler: func(a []byte) (any, error) { +// return handler() +// }, +// } +// } diff --git a/internal/web/client/client.go b/internal/web/client/client.go index aa98ad1..3396f6f 100644 --- a/internal/web/client/client.go +++ b/internal/web/client/client.go @@ -10,20 +10,18 @@ import ( "net/url" "time" - "git.wzray.com/homelab/hivemind/internal/types" + "git.wzray.com/homelab/hivemind/internal/web" "git.wzray.com/homelab/hivemind/internal/web/middleware" ) -type client struct { +type Client struct { http *http.Client middleware middleware.Middleware } -var defaultClient *client - const timeout = time.Duration(2) * time.Second -func (c *client) makeRequest(method string, host string, path types.Path, data any, out any) error { +func (c *Client) Call(host string, path string, data any, out any) error { var body io.Reader if data != nil { raw, err := json.Marshal(data) @@ -36,10 +34,10 @@ func (c *client) makeRequest(method string, host string, path types.Path, data a uri := (&url.URL{ Scheme: "http", Host: host, - Path: path.String(), + Path: path, }).String() - r, err := http.NewRequest(method, uri, body) + r, err := http.NewRequest("POST", uri, body) if err != nil { return fmt.Errorf("build http request: %w", err) } @@ -52,60 +50,41 @@ func (c *client) makeRequest(method string, host string, path types.Path, data a return fmt.Errorf("apply middleware: %w", err) } - resp, err := c.http.Do(r) + httpResponse, err := c.http.Do(r) if err != nil { return fmt.Errorf("send request: %w", err) } + defer httpResponse.Body.Close() - if resp.StatusCode < 200 || resp.StatusCode >= 300 { - b, _ := io.ReadAll(resp.Body) - return fmt.Errorf("http %d: %s", resp.StatusCode, string(b)) + if httpResponse.StatusCode < 200 || httpResponse.StatusCode >= 300 { + b, _ := io.ReadAll(httpResponse.Body) + return fmt.Errorf("http %d: %s", httpResponse.StatusCode, string(b)) } - defer resp.Body.Close() if out != nil { - if err := json.NewDecoder(resp.Body).Decode(out); err != nil { - return fmt.Errorf("decode body: %w", err) + var resp web.Response[json.RawMessage] + if err := json.NewDecoder(httpResponse.Body).Decode(&resp); err != nil { + return fmt.Errorf("decode response wrapper: %w", err) + } + + if !resp.Ok { + return fmt.Errorf("error on the remote: %w", errors.New(resp.Err)) + } + + if err := json.Unmarshal(resp.Data, out); err != nil { + return fmt.Errorf("decode response body: %w", err) } } - io.Copy(io.Discard, resp.Body) + io.Copy(io.Discard, httpResponse.Body) return nil } -func Init(mw middleware.Middleware) { - if defaultClient != nil { - panic("web.client: Init called twice") - } - - defaultClient = &client{ +func New(middleware middleware.Middleware) *Client { + return &Client{ http: &http.Client{ Timeout: timeout, }, - middleware: mw, + middleware: middleware, } } - -func request[Out any, In any](method string, host string, path types.Path, data In) (*Out, error) { - out := &types.Response[Out]{} - err := defaultClient.makeRequest(method, host, path, data, out) - if err != nil { - return nil, err - } - - if !out.Ok { - return nil, errors.New(out.Err) - } - - return &out.Data, err -} - -// TODO: out should not be a pointer -func Get[Out any](host string, path types.Path) (*Out, error) { - return request[Out, any](http.MethodGet, host, path, nil) -} - -// TODO: out should not be a pointer -func Post[Out any, In any](host string, path types.Path, data In) (*Out, error) { - return request[Out](http.MethodPost, host, path, data) -} diff --git a/internal/web/server/server.go b/internal/web/server/server.go index 9ce1c53..2e579cb 100644 --- a/internal/web/server/server.go +++ b/internal/web/server/server.go @@ -5,7 +5,8 @@ import ( "io" "net/http" - "git.wzray.com/homelab/hivemind/internal/types" + "git.wzray.com/homelab/hivemind/internal/transport" + "git.wzray.com/homelab/hivemind/internal/transport/codec" "git.wzray.com/homelab/hivemind/internal/web/middleware" "github.com/rs/zerolog/log" ) @@ -15,7 +16,7 @@ type Server struct { httpServer http.Server } -func NewServer(addr string, middleware middleware.Middleware) *Server { +func New(addr string, middleware middleware.Middleware) *Server { mux := http.NewServeMux() s := &Server{ mux: mux, @@ -35,7 +36,7 @@ func (s *Server) Shutdown(ctx context.Context) error { return s.httpServer.Shutdown(ctx) } -func (s *Server) handleFunc(route types.Route) func(w http.ResponseWriter, r *http.Request) { +func (s *Server) handleFunc(route transport.Handler) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { log.Debug(). // TODO: make this a middleware Str("method", r.Method). @@ -46,35 +47,36 @@ func (s *Server) handleFunc(route types.Route) func(w http.ResponseWriter, r *ht w.Header().Set("Content-Type", "application/json; charset=utf-8") - body, err := io.ReadAll(r.Body) + raw, err := io.ReadAll(r.Body) if err != nil { w.Write(fail("read request body: %v", err)) log.Err(err).Msg("unable to read request body") return } - raw, err := route.Handle(body) + resp, err := route.Handle(codec.JSON, raw) if err != nil { w.Write(fail("handle request: %v", err)) log.Err(err).Msg("unable to handle request") return } - data, err := ok(raw) + payload, err := ok(resp) if err != nil { w.Write(fail("marshal response: %v", err)) log.Err(err).Msg("unable to marshal response") return } - w.Write(data) + w.Write(payload) } } -func (s *Server) Register(endpoint types.Route) { +func (s *Server) Register(endpoint transport.Handler) { s.mux.HandleFunc(endpoint.Path(), s.handleFunc(endpoint)) } +// TODO: i don't think that I need this? func (s *Server) RegisterRaw(method string, pattern string, handler func(http.ResponseWriter, *http.Request)) { s.mux.HandleFunc(method+" "+pattern, handler) } diff --git a/internal/web/server/util.go b/internal/web/server/util.go index 6ba2288..34bd7d3 100644 --- a/internal/web/server/util.go +++ b/internal/web/server/util.go @@ -4,20 +4,20 @@ import ( "encoding/json" "fmt" - "git.wzray.com/homelab/hivemind/internal/types" + "git.wzray.com/homelab/hivemind/internal/web" ) func fail(format string, a ...any) []byte { - r, _ := json.Marshal(types.Response[string]{ + r, _ := json.Marshal(web.Response[string]{ Ok: false, Err: fmt.Sprintf(format, a...), }) return r } -func ok[T any](data T) ([]byte, error) { - return json.Marshal(types.Response[T]{ +func ok(data []byte) ([]byte, error) { + return json.Marshal(web.Response[json.RawMessage]{ Ok: true, - Data: data, + Data: json.RawMessage(data), }) } diff --git a/internal/web/types.go b/internal/web/types.go new file mode 100644 index 0000000..d9ee3b4 --- /dev/null +++ b/internal/web/types.go @@ -0,0 +1,7 @@ +package web + +type Response[T any] struct { + Ok bool `json:"ok"` + Data T `json:"data,omitempty"` + Err string `json:"err,omitempty"` +}