diff --git a/.githooks/pre-commit b/.githooks/pre-commit index 811d231..519b7e4 100755 --- a/.githooks/pre-commit +++ b/.githooks/pre-commit @@ -4,7 +4,6 @@ changed="$(gofmt -d .)" [ -n "$changed" ] && { echo 'Some files are not formatted' delta <<< "$changed" - go fmt ./... exit 1 } go vet ./... diff --git a/Dockerfile b/Dockerfile index 24e18e4..b50dc45 100644 --- a/Dockerfile +++ b/Dockerfile @@ -15,7 +15,6 @@ RUN --mount=type=cache,target="/cache/go-build" mkdir -p /cache/go-build; make h FROM alpine EXPOSE 56714/tcp -EXPOSE 56715/tcp WORKDIR /app VOLUME /conf diff --git a/TODO.md b/TODO.md index 68b1a09..bdfaf3c 100644 --- a/TODO.md +++ b/TODO.md @@ -1,58 +1,3 @@ -# Background -Some background first: -the node can have multiple roles -this includes (but not limited to) -* Host (can generate events) -* DNS (can consume the events and act on them) -* Something else that I might come up with (the architecture has to be expandable) - -# Control pane (3+ nodes) -* Quorum - * Consists of $n / 2 + 1$ nodes - * Cluster is considered "degraded" if no quorum can be created -* Stores an event log - * **Only** leader can append to the log (with quorum permission) -* Membership authority - * No joins without quorum approval - * Leaves are not propagated without quorum -* Manages epoch (useful for GC) - * Node $N$ with $N.epoch != cluster.epoch$ can **not** join the cluster, and has to re-join (bootstrap) -* Can (but doesn't have to) be a bootstrap point - -# Membership -* Membership is managed though SWIM -* Each node contains a small slice of the entire network -## Joining -Each node has an array of roles: -1. That it performs -2. That it requires to operate (can be moved out to the master, or the shared type) -3. That it needs for bootstrapping (analogous to 2.) - -Node can join via a master or via other nodes -When a node requests to join, the responder makes a request to the CP and asks for a permission to add this node -* If master allows - 1. The node gets a membership digest from the CP. - 2. The node *can* be brought up to speed using it's neighbors from 1. - 3. Node join event gets broadcasted over SWIM gossiping -* Otherwise, nothing happens - -# Host node -## Bootstrap -Host node requests `dns` nodes on join (and other node types, such as `ns`, `nginx`, etc... They should really be called something like `dns_processor`, and the internals (how it processes the dns) should not be visible to the cluster, but that's a task for a future me) -When a new update occurs, it sends the update to *some* `dns` hosts. - -# DNS node -## Bootstrap -First, it gets all the available `hosts` from the CP -Then it requests their configs and sets map[hostName]seq accordingly -## Simple join (when other nodes exist) -It requests it's config from other nodes and that's it - - - -# Minor To-Do -- auth middleware lol -- move request logging out of the request handling into a middleware - nginx role - think about choosing the master for the keepalive message (should be somewhat load-balanced) - hivemind lite should not just print `hivemind-lite` lol diff --git a/cmd/hivemind/main.go b/cmd/hivemind/main.go index 75dc6a2..9ce289b 100644 --- a/cmd/hivemind/main.go +++ b/cmd/hivemind/main.go @@ -10,7 +10,6 @@ import ( "strings" "syscall" - "git.wzray.com/homelab/hivemind/internal/app" "git.wzray.com/homelab/hivemind/internal/config" "git.wzray.com/homelab/hivemind/internal/registry" "git.wzray.com/homelab/hivemind/internal/roles" @@ -18,14 +17,11 @@ import ( "git.wzray.com/homelab/hivemind/internal/roles/host" "git.wzray.com/homelab/hivemind/internal/roles/master" "git.wzray.com/homelab/hivemind/internal/roles/node" - dns_transport "git.wzray.com/homelab/hivemind/internal/transport/dns" - host_transport "git.wzray.com/homelab/hivemind/internal/transport/host" - master_transport "git.wzray.com/homelab/hivemind/internal/transport/master" - node_transport "git.wzray.com/homelab/hivemind/internal/transport/node" + "git.wzray.com/homelab/hivemind/internal/state" "git.wzray.com/homelab/hivemind/internal/types" - web_client "git.wzray.com/homelab/hivemind/internal/web/client" - web_middleware "git.wzray.com/homelab/hivemind/internal/web/middleware" - web_server "git.wzray.com/homelab/hivemind/internal/web/server" + "git.wzray.com/homelab/hivemind/internal/web/client" + "git.wzray.com/homelab/hivemind/internal/web/middleware" + "git.wzray.com/homelab/hivemind/internal/web/server" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/rs/zerolog/pkgerrors" @@ -91,23 +87,18 @@ func main() { filestore.EnsureExists() registry := registry.New(filestore, self) - var builder web_middleware.MiddlewareBuilder - middlewares := builder.Prepare() - - client := web_client.New(middlewares) - - listenAddr := fmt.Sprintf("%v:%v", configuration.Node.ListenOn, configuration.Node.Port) - server := web_server.New(listenAddr, middlewares) - - state := app.NewState(registry, self, app.Clients{ - Master: master_transport.New(client), - DNS: dns_transport.New(client), - Host: host_transport.New(client), - Node: node_transport.New(client), - }) + state := state.New(registry, self) nodeRole := node.New(state, configuration.Node) + var builder middleware.MiddlewareBuilder + middlewares := builder.Prepare() + + client.Init(middlewares) + + listenAddr := fmt.Sprintf("%v:%v", configuration.Node.ListenOn, configuration.Node.Port) + server := server.NewServer(listenAddr, middlewares) + roles := make([]roles.Role, 0) roles = append(roles, nodeRole) diff --git a/go.mod b/go.mod index 53b5b0a..eceeb1b 100644 --- a/go.mod +++ b/go.mod @@ -2,12 +2,10 @@ module git.wzray.com/homelab/hivemind go 1.25.5 -require ( - github.com/rs/zerolog v1.34.0 - github.com/BurntSushi/toml v1.6.0 -) +require github.com/rs/zerolog v1.34.0 require ( + github.com/BurntSushi/toml v1.6.0 // indirect github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/pkg/errors v0.9.1 // indirect diff --git a/internal/app/state.go b/internal/app/state.go deleted file mode 100644 index 74d6139..0000000 --- a/internal/app/state.go +++ /dev/null @@ -1,35 +0,0 @@ -package app - -import ( - "git.wzray.com/homelab/hivemind/internal/registry" - "git.wzray.com/homelab/hivemind/internal/transport/dns" - "git.wzray.com/homelab/hivemind/internal/transport/host" - "git.wzray.com/homelab/hivemind/internal/transport/master" - "git.wzray.com/homelab/hivemind/internal/transport/node" - "git.wzray.com/homelab/hivemind/internal/types" -) - -type Clients struct { - Master *master.Client - DNS *dns.Client - Host *host.Client - Node *node.Client -} - -type State struct { - Registry *registry.Registry - Self types.Node - Clients Clients -} - -func NewState( - registry *registry.Registry, - self types.Node, - clients Clients, -) *State { - return &State{ - Registry: registry, - Self: self, - Clients: clients, - } -} diff --git a/internal/config/defaults.go b/internal/config/defaults.go index df4a64e..317770d 100644 --- a/internal/config/defaults.go +++ b/internal/config/defaults.go @@ -14,8 +14,5 @@ var DefaultConfig = Config{ BackoffCount: 4, NodeTimeout: 120, }, - Host: HostConfig{ - ListenAddress: "0.0.0.0:56715", - }, }, } diff --git a/internal/config/host.go b/internal/config/host.go index e3b3af5..8e131ad 100644 --- a/internal/config/host.go +++ b/internal/config/host.go @@ -12,7 +12,6 @@ type HostConfig struct { LocalAddress string `toml:"local_address"` InternalEntrypoint string `toml:"internal_entrypoint"` ExternalEntrypoint string `toml:"external_entrypoint"` - ListenAddress string `toml:"listen_address"` baseRoleConfig } @@ -41,10 +40,6 @@ func (c HostConfig) Validate() error { return errors.New("missing external entrypoint") } - if c.ListenAddress == "" { - return errors.New("missing listen address") - } - return nil } @@ -72,8 +67,4 @@ func (c *HostConfig) Merge(other HostConfig) { if other.ExternalEntrypoint != "" { c.ExternalEntrypoint = other.ExternalEntrypoint } - - if other.ListenAddress != "" { - c.ListenAddress = other.ListenAddress - } } diff --git a/internal/config/node.go b/internal/config/node.go index 0ee3da0..fbeb29d 100644 --- a/internal/config/node.go +++ b/internal/config/node.go @@ -53,12 +53,8 @@ func (c NodeConfig) Validate() error { return errors.New("invalid keepalive_interval") } - if c.ListenOn == "" { - return errors.New("missing listen_on") - } - if net.ParseIP(c.ListenOn) == nil { - return fmt.Errorf("invalid listen_on: %v", c.ListenOn) + return errors.New("invalid listen_on") } return nil diff --git a/internal/roles/dns/dns.go b/internal/roles/dns/dns.go index 9de4898..6d01deb 100644 --- a/internal/roles/dns/dns.go +++ b/internal/roles/dns/dns.go @@ -8,23 +8,22 @@ import ( "strings" "sync" - "git.wzray.com/homelab/hivemind/internal/app" "git.wzray.com/homelab/hivemind/internal/config" - "git.wzray.com/homelab/hivemind/internal/transport" - "git.wzray.com/homelab/hivemind/internal/transport/dns" + "git.wzray.com/homelab/hivemind/internal/state" "git.wzray.com/homelab/hivemind/internal/types" + "git.wzray.com/homelab/hivemind/internal/web/client" "github.com/rs/zerolog/log" ) const hostsDir = "/etc/hosts.d/" type Role struct { - state *app.State + state *state.RuntimeState config config.DnsConfig group sync.WaitGroup } -func New(state *app.State, config config.DnsConfig) *Role { +func New(state *state.RuntimeState, config config.DnsConfig) *Role { r := &Role{ state: state, config: config, @@ -33,6 +32,28 @@ func New(state *app.State, config config.DnsConfig) *Role { return r } +func (r *Role) updateDnsmasq(filename string, data []byte) error { + if err := os.WriteFile(filename, data, 0644); err != nil { + return fmt.Errorf("write endpoint file %q: %w", filename, err) + } + + if err := r.reload(); err != nil { + return fmt.Errorf("reload dnsmasq: %w", err) + } + + return nil +} + +func parseState(state types.HostState) (string, []byte) { + var builder strings.Builder + + for _, d := range state.Domains { + builder.WriteString(fmt.Sprintf("%s %s\n", state.Address, d)) + } + + return hostsDir + state.Hostname, []byte(builder.String()) +} + func (r *Role) OnStartup(ctx context.Context) error { r.group.Go(func() { r.syncFromRegistry() @@ -53,46 +74,15 @@ func (r *Role) OnStartup(ctx context.Context) error { return nil } -func (r *Role) OnShutdown() error { - r.group.Wait() - return nil -} - -func (r *Role) RegisterHandlers(rg transport.Registrator) { - dns.Register(rg, r) -} - -func (r *Role) Callback(state types.HostState) (bool, error) { - filename, data := parseState(state) - - if err := r.updateDnsmasq(filename, data); err != nil { - return false, err - } - - return true, nil -} - -func (r *Role) updateDnsmasq(filename string, data []byte) error { - if err := os.WriteFile(filename, data, 0644); err != nil { - return fmt.Errorf("write endpoint file %q: %w", filename, err) - } - - if err := r.reload(); err != nil { - return fmt.Errorf("reload dnsmasq: %w", err) - } - - return nil -} - func (r *Role) syncFromRegistry() { for _, n := range r.state.Registry.ByRole(types.HostRole) { - state, err := r.state.Clients.Host.Dns(n.Endpoint) + state, err := client.Get[types.HostState](n.Endpoint, types.PathHostDns) if err != nil { log.Warn().Str("name", n.Hostname).Err(err).Msg("unable to get host config") continue } - filename, data := parseState(state) + filename, data := parseState(*state) if err := r.updateDnsmasq(filename, data); err != nil { log.Warn().Str("name", n.Hostname).Err(err).Msg("unable to update dnsmasq") continue @@ -100,6 +90,11 @@ func (r *Role) syncFromRegistry() { } } +func (r *Role) OnShutdown() error { + r.group.Wait() + return nil +} + func (r *Role) reload() error { var err error @@ -112,12 +107,16 @@ func (r *Role) reload() error { return err } -func parseState(state types.HostState) (string, []byte) { - var builder strings.Builder +func (r *Role) onCallback(state types.HostState) (bool, error) { + filename, data := parseState(state) - for _, d := range state.Domains { - builder.WriteString(fmt.Sprintf("%s %s\n", state.Address, d)) + if err := r.updateDnsmasq(filename, data); err != nil { + return false, err } - return hostsDir + state.Hostname, []byte(builder.String()) + return true, nil +} + +func (r *Role) RegisterHandlers(rg types.Registrator) { + rg.Register(types.PostEndpoint(types.PathDnsCallback, r.onCallback)) } diff --git a/internal/roles/host/gateway.go b/internal/roles/host/gateway.go deleted file mode 100644 index 7e16807..0000000 --- a/internal/roles/host/gateway.go +++ /dev/null @@ -1,96 +0,0 @@ -package host - -import ( - "context" - "encoding/json" - "fmt" - "net/http" - "net/url" - - "git.wzray.com/homelab/hivemind/internal/config" - "github.com/rs/zerolog/log" -) - -type TraefikListener interface { - OnTraefikUpdate(traefikResponse) -} - -type TraefikGateway struct { - client *http.Client - server *http.Server - listener TraefikListener - address url.URL - domain string -} - -func NewTraefikGateway(cfg config.HostConfig, listener TraefikListener) *TraefikGateway { - mux := http.NewServeMux() - gw := &TraefikGateway{ - client: &http.Client{}, - - server: &http.Server{ - Addr: cfg.ListenAddress, - Handler: mux, - }, - listener: listener, - address: url.URL{ - Scheme: "http", - Host: cfg.LocalAddress, - }, - domain: cfg.Domain, - } - - mux.HandleFunc("/callback", gw.onCallback) - return gw -} - -func (g *TraefikGateway) Listen() error { - return g.server.ListenAndServe() -} - -func (g *TraefikGateway) Shutdown(ctx context.Context) error { - return g.server.Shutdown(ctx) -} - -func (g *TraefikGateway) GetRawData() (*traefikResponse, error) { - var raw TraefikRawResponse - - url := g.address - url.Path = "/api/rawdata" - - req := http.Request{ - Method: http.MethodGet, - URL: &url, - } - - req.Host = g.domain - - r, err := g.client.Do(&req) - if err != nil { - return nil, fmt.Errorf("make request: %w", err) - } - defer r.Body.Close() - - if err := json.NewDecoder(r.Body).Decode(&raw); err != nil { - return nil, fmt.Errorf("unmarshal body: %w", err) - } - - out := parseTraefikResponse(raw) - return &out, nil -} - -func (g *TraefikGateway) onCallback(w http.ResponseWriter, req *http.Request) { - var raw TraefikRawResponse - if err := json.NewDecoder(req.Body).Decode(&raw); err != nil { - w.WriteHeader(http.StatusInternalServerError) - log.Err(err).Msg("unable to decode traefik callback data") - return - } - - resp := parseTraefikResponse(raw) - if g.listener != nil { - g.listener.OnTraefikUpdate(resp) - } - - w.Write([]byte("OK")) -} diff --git a/internal/roles/host/host.go b/internal/roles/host/host.go index a97a80b..ca61e2f 100644 --- a/internal/roles/host/host.go +++ b/internal/roles/host/host.go @@ -2,37 +2,36 @@ package host import ( "context" + "encoding/json" "fmt" + "net/http" "slices" "sync" - "git.wzray.com/homelab/hivemind/internal/app" "git.wzray.com/homelab/hivemind/internal/config" - "git.wzray.com/homelab/hivemind/internal/transport" - "git.wzray.com/homelab/hivemind/internal/transport/host" + "git.wzray.com/homelab/hivemind/internal/state" "git.wzray.com/homelab/hivemind/internal/types" + "git.wzray.com/homelab/hivemind/internal/web/client" "github.com/rs/zerolog/log" ) type Role struct { - state *app.State + state *state.RuntimeState config config.HostConfig - gateway *TraefikGateway + client *traefikClient tasksGroup sync.WaitGroup externalDomains []string // TODO: i don't like hardcoding external/internal logic here internalDomains []string } -func New(state *app.State, config config.HostConfig) *Role { - r := &Role{ +func New(state *state.RuntimeState, config config.HostConfig) *Role { + return &Role{ + client: newClient(config.Domain, config.LocalAddress), state: state, config: config, } - - r.gateway = NewTraefikGateway(config, r) - return r } func (r *Role) sendUpdate(domains []string, role types.Role) { @@ -46,7 +45,7 @@ func (r *Role) sendUpdate(domains []string, role types.Role) { r.tasksGroup.Go(func() { logger := log.With().Str("name", node.Hostname).Logger() logger.Debug().Msg("sending update") - if _, err := r.state.Clients.DNS.Callback(node.Endpoint, state); err != nil { + if _, err := client.Post[any](node.Endpoint, types.PathDnsCallback, state); err != nil { logger.Warn().Err(err).Msg("unable to send dns info") } else { logger.Debug().Msg("update sent") @@ -55,7 +54,7 @@ func (r *Role) sendUpdate(domains []string, role types.Role) { } } -func (r *Role) OnTraefikUpdate(resp traefikResponse) { +func (r *Role) mutateState(resp traefikResponse) { newInternal := resp.Domains(r.config.InternalEntrypoint) newExternal := resp.Domains(r.config.ExternalEntrypoint) @@ -72,7 +71,20 @@ func (r *Role) OnTraefikUpdate(resp traefikResponse) { } } -func (r *Role) Dns() (types.HostState, error) { +func (r *Role) onCallback(w http.ResponseWriter, req *http.Request) { + var resp traefikResponse + if err := json.NewDecoder(req.Body).Decode(&resp); err != nil { + w.WriteHeader(http.StatusInternalServerError) + log.Err(err).Msg("unable to decode traefik callback data") + return + } + + r.mutateState(resp) + + w.Write([]byte("OK")) +} + +func (r *Role) getInternal() (types.HostState, error) { return types.HostState{ Domains: r.internalDomains, Address: r.config.IpAddress, @@ -80,7 +92,7 @@ func (r *Role) Dns() (types.HostState, error) { }, nil } -func (r *Role) Nameserver() (types.HostState, error) { +func (r *Role) getExternal() (types.HostState, error) { return types.HostState{ Domains: r.externalDomains, Address: r.config.IpAddress, @@ -89,25 +101,14 @@ func (r *Role) Nameserver() (types.HostState, error) { } -func (r *Role) RegisterHandlers(rg transport.Registrator) { - host.Register(rg, r) +func (r *Role) RegisterHandlers(rg types.Registrator) { + rg.RegisterRaw(http.MethodPost, types.PathHostCallback.String(), r.onCallback) + rg.Register(types.GetEndpoint(types.PathHostDns, r.getInternal)) + rg.Register(types.GetEndpoint(types.PathHostNs, r.getExternal)) } func (r *Role) OnStartup(ctx context.Context) error { - r.tasksGroup.Go(func() { - if err := r.gateway.Listen(); err != nil { - log.Err(err).Msg("traefik gateway stopped") - } - }) - - r.tasksGroup.Go(func() { - <-ctx.Done() - if err := r.gateway.Shutdown(context.Background()); err != nil { - log.Err(err).Msg("failed to shutdown traefik gateway") - } - }) - - resp, err := r.gateway.GetRawData() + resp, err := r.client.GetRawData() if err != nil { return fmt.Errorf("get traefik state: %w", err) } @@ -115,7 +116,7 @@ func (r *Role) OnStartup(ctx context.Context) error { log.Info().Msg("got raw data from traefik") log.Debug().Interface("response", resp).Send() - r.OnTraefikUpdate(*resp) + r.mutateState(*resp) return nil } diff --git a/internal/roles/host/http.go b/internal/roles/host/http.go index cd147b7..1eb647c 100644 --- a/internal/roles/host/http.go +++ b/internal/roles/host/http.go @@ -1 +1,58 @@ package host + +import ( + "crypto/tls" + "encoding/json" + "fmt" + "net/http" + "net/url" +) + +type traefikClient struct { + client *http.Client + domain string + address url.URL +} + +func newClient(domain string, addr string) *traefikClient { + return &traefikClient{ + domain: domain, + address: url.URL{ + Scheme: "https", + Host: addr, + }, + client: &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + ServerName: domain, + }, + }, + }, + } +} + +func (c *traefikClient) GetRawData() (*traefikResponse, error) { + var out traefikResponse + + url := c.address + url.Path = "/api/rawdata" + + req := http.Request{ + Method: "GET", + URL: &url, + } + + req.Host = c.domain + + r, err := c.client.Do(&req) + if err != nil { + return nil, fmt.Errorf("make request: %w", err) + } + defer r.Body.Close() + + if err := json.NewDecoder(r.Body).Decode(&out); err != nil { + return nil, fmt.Errorf("unmarshal body: %w", err) + } + + return &out, nil +} diff --git a/internal/roles/host/types.go b/internal/roles/host/types.go index e0b5e5a..ad1c9fd 100644 --- a/internal/roles/host/types.go +++ b/internal/roles/host/types.go @@ -1,66 +1,64 @@ package host import ( + "encoding/json" "regexp" "slices" ) var hostRegex = regexp.MustCompile("Host\\(`([^()`]+\\.[^()`]+)`\\)") -type traefikRule struct { +type rule struct { Raw string Domains []string Valid bool } -type traefikRouter struct { - Rule traefikRule - Entrypoints []string -} +func (r *rule) UnmarshalJSON(data []byte) error { + r.Valid = false -type traefikResponse struct { - Routers []traefikRouter -} - -type TraefikRawResponse struct { - Routers map[string]TraefikRawRouter `json:"routers"` -} - -type TraefikRawRouter struct { - Rule string `json:"rule"` - Entrypoints []string `json:"entryPoints"` -} - -func parseTraefikResponse(raw TraefikRawResponse) traefikResponse { - out := traefikResponse{ - Routers: make([]traefikRouter, 0, len(raw.Routers)), - } - - for _, router := range raw.Routers { - out.Routers = append(out.Routers, traefikRouter{ - Rule: parseTraefikRule(router.Rule), - Entrypoints: router.Entrypoints, - }) - } - - return out -} - -func parseTraefikRule(raw string) traefikRule { - rule := traefikRule{ - Raw: raw, + raw := "" + if err := json.Unmarshal(data, &raw); err != nil { + return err } matches := hostRegex.FindAllStringSubmatch(raw, -1) + for _, match := range matches { if len(match) <= 1 { continue } - rule.Domains = append(rule.Domains, match[1:]...) + r.Domains = append(r.Domains, match[1:]...) } - rule.Valid = len(rule.Domains) > 0 - return rule + r.Valid = len(r.Domains) > 0 + + return nil +} + +type router struct { + Rule rule `json:"rule"` + Entrypoints []string `json:"entryPoints"` +} + +type traefikResponse struct { + Routers []router +} + +func (r *traefikResponse) UnmarshalJSON(data []byte) error { + var raw struct { + Routers map[string]router `json:"routers"` + } + + if err := json.Unmarshal(data, &raw); err != nil { + return err + } + + for _, v := range raw.Routers { + r.Routers = append(r.Routers, v) + } + + return nil } func (r traefikResponse) Domains(entrypoint string) []string { diff --git a/internal/roles/master/master.go b/internal/roles/master/master.go index fef6453..dee25e3 100644 --- a/internal/roles/master/master.go +++ b/internal/roles/master/master.go @@ -4,24 +4,22 @@ import ( "context" "sync" - "git.wzray.com/homelab/hivemind/internal/app" "git.wzray.com/homelab/hivemind/internal/config" "git.wzray.com/homelab/hivemind/internal/roles" - "git.wzray.com/homelab/hivemind/internal/transport" - "git.wzray.com/homelab/hivemind/internal/transport/master" + "git.wzray.com/homelab/hivemind/internal/state" "git.wzray.com/homelab/hivemind/internal/types" - "github.com/rs/zerolog/log" + "git.wzray.com/homelab/hivemind/internal/web/client" ) type Role struct { - state *app.State + state *state.RuntimeState config config.MasterConfig tasksGroup sync.WaitGroup observer *observer roles.BaseRole } -func New(state *app.State, config config.MasterConfig) *Role { +func New(state *state.RuntimeState, config config.MasterConfig) *Role { return &Role{ state: state, config: config, @@ -38,7 +36,7 @@ func New(state *app.State, config config.MasterConfig) *Role { func (r *Role) OnStartup(ctx context.Context) error { r.tasksGroup.Go(func() { r.observer.Start(ctx, func(n types.Node) error { - _, err := r.onLeave(n, true) + _, err := r.onLeave(n) return err }) }) @@ -51,95 +49,48 @@ func (r *Role) OnShutdown() error { return nil } -func (c *Role) RegisterHandlers(r transport.Registrator) { - master.Register(r, c) +func (r *Role) notify(path types.Path, v any) { + for _, n := range r.state.Registry.Nodes() { + addr := n.Endpoint + r.tasksGroup.Go(func() { + client.Post[any](addr, path, v) + }) + } } -func (r *Role) Heartbeat(node types.Node) (types.Nodes, error) { - return r.onKeepAlive(node, true) -} - -func (r *Role) Join(node types.Node) (types.Nodes, error) { - return r.onJoin(node, true) -} - -func (r *Role) Leave(node types.Node) error { - _, err := r.onLeave(node, true) - return err -} - -func (r *Role) EventHeartbeat(node types.Node) (types.Nodes, error) { - return r.onKeepAlive(node, false) -} - -func (r *Role) EventJoin(node types.Node) (types.Nodes, error) { - return r.onJoin(node, false) -} - -func (r *Role) EventLeave(node types.Node) error { - _, err := r.onLeave(node, false) - return err -} - -func (r *Role) onJoin(node types.Node, notify bool) (map[string]types.Node, error) { +func (r *Role) onJoin(node types.Node) (map[string]types.Node, error) { if err := r.state.Registry.AddNode(node); err != nil { return nil, err } - if notify { - propagate(r, noReturn(r.state.Clients.Master.EventJoin), node) - } + r.notify(types.PathNodeJoin, node) return r.state.Registry.AllNodes(), nil } -func (r *Role) onLeave(node types.Node, notify bool) (bool, error) { +func (r *Role) onLeave(node types.Node) (bool, error) { if err := r.state.Registry.RemoveNode(node); err != nil { return false, err } - if notify { - propagate(r, r.state.Clients.Master.EventLeave, node) - } + r.notify(types.PathNodeLeave, node) return true, nil } -func (r *Role) onKeepAlive(node types.Node, notify bool) (map[string]types.Node, error) { +func (r *Role) onKeepAlive(node types.Node) (bool, error) { r.observer.onKeepAlive(node) if ok := r.state.Registry.Exists(node.Hostname); !ok { - // TODO: i don't like this side effect - if _, err := r.onJoin(node, true); err != nil { - log.Warn().Err(err).Msg("unable to add node to the registry from keepalive") - } + _, err := r.onJoin(node) + return true, err } - if notify { - propagate(r, noReturn(r.state.Clients.Master.EventHeartbeat), node) - } - - return r.state.Registry.AllNodes(), nil + return false, nil } -func eventFunc[R any](fn func(types.Node, bool) (R, error), notify bool) func(types.Node) (R, error) { - return func(n types.Node) (R, error) { - return fn(n, notify) - } -} - -func noReturn[T, V any](fn func(string, T) (V, error)) func(string, T) error { - return func(s string, t T) error { - _, err := fn(s, t) - return err - } -} - -func propagate[T any](r *Role, handler func(string, T) error, v T) { - for _, n := range r.state.Registry.ByRole(types.MasterRole) { - addr := n.Endpoint - r.tasksGroup.Go(func() { - handler(addr, v) - }) - } +func (c *Role) RegisterHandlers(r types.Registrator) { + r.Register(types.PostEndpoint(types.PathMasterJoin, c.onJoin)) + r.Register(types.PostEndpoint(types.PathMasterLeave, c.onLeave)) + r.Register(types.PostEndpoint(types.PathMasterKeepalive, c.onKeepAlive)) } diff --git a/internal/roles/master/observer.go b/internal/roles/master/observer.go index fab3478..4d07a98 100644 --- a/internal/roles/master/observer.go +++ b/internal/roles/master/observer.go @@ -5,14 +5,15 @@ import ( "sync" "time" - "git.wzray.com/homelab/hivemind/internal/app" "git.wzray.com/homelab/hivemind/internal/registry" + "git.wzray.com/homelab/hivemind/internal/state" "git.wzray.com/homelab/hivemind/internal/types" + "git.wzray.com/homelab/hivemind/internal/web/client" "github.com/rs/zerolog/log" ) type observer struct { - state *app.State + state *state.RuntimeState interval int backoff int backoffCount int @@ -22,7 +23,7 @@ type observer struct { } func newObserver( - state *app.State, + state *state.RuntimeState, interval int, backoff int, backoffCount int, @@ -62,7 +63,7 @@ func (o *observer) pollNodes(ctx context.Context, onLeave func(types.Node) error delay := time.Duration(o.backoff) alive := false for i := o.backoffCount - 1; i >= 0; i-- { - _, err := o.state.Clients.Node.Healthcheck(n.Endpoint) + _, err := client.Get[any](n.Endpoint, types.PathNodeHealthcheck) if err == nil { logger.Debug().Msg("node is alive") diff --git a/internal/roles/node/node.go b/internal/roles/node/node.go index 083a86a..a09355b 100644 --- a/internal/roles/node/node.go +++ b/internal/roles/node/node.go @@ -7,41 +7,26 @@ import ( "sync" "time" - "git.wzray.com/homelab/hivemind/internal/app" "git.wzray.com/homelab/hivemind/internal/config" - "git.wzray.com/homelab/hivemind/internal/transport" - "git.wzray.com/homelab/hivemind/internal/transport/node" + "git.wzray.com/homelab/hivemind/internal/state" "git.wzray.com/homelab/hivemind/internal/types" + "git.wzray.com/homelab/hivemind/internal/web/client" "github.com/rs/zerolog/log" ) type Role struct { - state *app.State + state *state.RuntimeState keepaliveGroup sync.WaitGroup config config.NodeConfig } -func New(state *app.State, config config.NodeConfig) *Role { +func New(state *state.RuntimeState, config config.NodeConfig) *Role { return &Role{ state: state, config: config, } } -func (r *Role) OnStartup(ctx context.Context) error { - r.keepaliveGroup.Go(r.keepaliveFunc(ctx)) - return nil -} - -func (r *Role) OnShutdown() error { - r.keepaliveGroup.Wait() - return nil -} - -func (n *Role) RegisterHandlers(r transport.Registrator) { - node.Register(r, n) -} - func (r *Role) Join(bootstrap string) error { masters := make(map[string]struct{}) for _, node := range r.state.Registry.ByRole(types.MasterRole) { @@ -61,14 +46,14 @@ func (r *Role) Join(bootstrap string) error { logger := log.With().Str("host", m).Logger() logger.Debug().Msg("trying to join via master") - nodes, err := r.state.Clients.Master.Join(m, r.state.Self) + nodes, err := client.Post[map[string]types.Node](m, types.PathMasterJoin, r.state.Self) if err != nil { errs = append(errs, err) logger.Debug().Err(err).Msg("unable to join") continue } - if err := r.state.Registry.Set(nodes); err != nil { + if err := r.state.Registry.Set(*nodes); err != nil { logger.Debug().Err(err).Msg("unable to set master's nodes") errs = append(errs, err) continue @@ -91,7 +76,8 @@ func (r *Role) Leave() error { logger := log.With().Str("name", m.Hostname).Logger() logger.Debug().Msg("sending leave message") - if err := r.state.Clients.Master.Leave(m.Endpoint, r.state.Self); err != nil { + _, err := client.Post[any](m.Endpoint, types.PathMasterLeave, r.state.Self) + if err != nil { logger.Debug().Err(err).Msg("unable to send leave message") errs = append(errs, err) continue @@ -104,8 +90,15 @@ func (r *Role) Leave() error { return fmt.Errorf("unable to send leave message to any master: %w", errors.Join(errs...)) } -func (r *Role) Healthcheck() (string, error) { - return "OK", nil +func (r *Role) OnStartup(ctx context.Context) error { + r.keepaliveGroup.Go(r.keepaliveFunc(ctx)) + + return nil +} + +func (r *Role) OnShutdown() error { + r.keepaliveGroup.Wait() + return nil } func (r *Role) keepaliveFunc(ctx context.Context) func() { @@ -114,20 +107,11 @@ func (r *Role) keepaliveFunc(ctx context.Context) func() { logger := log.With().Str("name", m.Hostname).Logger() logger.Debug().Msg("sending keepalive packet") - nodes, err := r.state.Clients.Master.Heartbeat(m.Endpoint, r.state.Self) - if err != nil { + if _, err := client.Post[any](m.Endpoint, types.PathMasterKeepalive, r.state.Self); err != nil { logger.Info().Err(err).Msg("unable to send keepalive packet") - continue + } else { + logger.Debug().Msg("keepalive packet sent") } - - logger.Debug().Msg("keepalive packet sent") - - if err := r.state.Registry.Set(nodes); err != nil { - logger.Warn().Err(err).Msg("unable to set masters nodes") - continue - } - - break } } @@ -142,3 +126,27 @@ func (r *Role) keepaliveFunc(ctx context.Context) func() { } } } + +func (r *Role) onJoin(node types.Node) (bool, error) { + if err := r.state.Registry.AddNode(node); err != nil { + return false, err + } + return true, nil +} + +func (r *Role) onLeave(node types.Node) (bool, error) { + if err := r.state.Registry.RemoveNode(node); err != nil { + return false, err + } + return true, nil +} + +func healthcheck() (string, error) { + return "OK", nil +} + +func (n *Role) RegisterHandlers(r types.Registrator) { + r.Register(types.GetEndpoint(types.PathNodeHealthcheck, healthcheck)) + r.Register(types.PostEndpoint(types.PathNodeJoin, n.onJoin)) + r.Register(types.PostEndpoint(types.PathNodeLeave, n.onLeave)) +} diff --git a/internal/roles/role.go b/internal/roles/role.go index e3cd5f7..4d42f6d 100644 --- a/internal/roles/role.go +++ b/internal/roles/role.go @@ -3,18 +3,18 @@ package roles import ( "context" - "git.wzray.com/homelab/hivemind/internal/transport" + "git.wzray.com/homelab/hivemind/internal/types" ) type Role interface { - RegisterHandlers(transport.Registrator) + RegisterHandlers(types.Registrator) OnStartup(context.Context) error OnShutdown() error } type BaseRole struct{} -func (r *BaseRole) RegisterHandlers(transport.Registrator) {} +func (r *BaseRole) RegisterHandlers(types.Registrator) {} func (r *BaseRole) OnStartup(context.Context) error { return nil } diff --git a/internal/state/runtime.go b/internal/state/runtime.go new file mode 100644 index 0000000..8b814d2 --- /dev/null +++ b/internal/state/runtime.go @@ -0,0 +1,18 @@ +package state + +import ( + "git.wzray.com/homelab/hivemind/internal/registry" + "git.wzray.com/homelab/hivemind/internal/types" +) + +type RuntimeState struct { + Registry *registry.Registry + Self types.Node +} + +func New(r *registry.Registry, n types.Node) *RuntimeState { + return &RuntimeState{ + Registry: r, + Self: n, + } +} diff --git a/internal/transport/caller.go b/internal/transport/caller.go deleted file mode 100644 index ec82499..0000000 --- a/internal/transport/caller.go +++ /dev/null @@ -1,5 +0,0 @@ -package transport - -type Caller interface { - Call(host string, path string, data any, out any) error -} diff --git a/internal/transport/codec/codec.go b/internal/transport/codec/codec.go deleted file mode 100644 index 0c67c16..0000000 --- a/internal/transport/codec/codec.go +++ /dev/null @@ -1,16 +0,0 @@ -package codec - -type Codec interface { - Decode(data []byte, out any) error - Encode(data any) ([]byte, error) -} - -func Decode[T any](c Codec, data []byte) (T, error) { - var out T - err := c.Decode(data, &out) - return out, err -} - -func Encode[T any](c Codec, data T) ([]byte, error) { - return c.Encode(data) -} diff --git a/internal/transport/codec/json.go b/internal/transport/codec/json.go deleted file mode 100644 index 9be4a82..0000000 --- a/internal/transport/codec/json.go +++ /dev/null @@ -1,15 +0,0 @@ -package codec - -import "encoding/json" - -type jsonCodec struct{} - -var JSON = jsonCodec{} - -func (jsonCodec) Decode(data []byte, out any) error { - return json.Unmarshal(data, out) -} - -func (jsonCodec) Encode(data any) ([]byte, error) { - return json.Marshal(data) -} diff --git a/internal/transport/dns/client.go b/internal/transport/dns/client.go deleted file mode 100644 index 43020c0..0000000 --- a/internal/transport/dns/client.go +++ /dev/null @@ -1,20 +0,0 @@ -package dns - -import ( - "git.wzray.com/homelab/hivemind/internal/transport" - "git.wzray.com/homelab/hivemind/internal/types" -) - -type Client struct { - caller transport.Caller -} - -func New(caller transport.Caller) *Client { - return &Client{ - caller: caller, - } -} - -func (c *Client) Callback(endpoint string, state types.HostState) (bool, error) { - return callbackRoute.Call(c.caller, endpoint, state) -} diff --git a/internal/transport/dns/handlers.go b/internal/transport/dns/handlers.go deleted file mode 100644 index 0455af7..0000000 --- a/internal/transport/dns/handlers.go +++ /dev/null @@ -1,14 +0,0 @@ -package dns - -import ( - "git.wzray.com/homelab/hivemind/internal/transport" - "git.wzray.com/homelab/hivemind/internal/types" -) - -type DnsHandlers interface { - Callback(types.HostState) (bool, error) -} - -func Register(registrator transport.Registrator, h DnsHandlers) { - callbackRoute.Register(registrator, h.Callback) -} diff --git a/internal/transport/dns/routes.go b/internal/transport/dns/routes.go deleted file mode 100644 index 6b505f8..0000000 --- a/internal/transport/dns/routes.go +++ /dev/null @@ -1,10 +0,0 @@ -package dns - -import ( - "git.wzray.com/homelab/hivemind/internal/transport" - "git.wzray.com/homelab/hivemind/internal/types" -) - -var ( - callbackRoute = transport.NewRoute[types.HostState, bool]("/dns/callback") -) diff --git a/internal/transport/helpers.go b/internal/transport/helpers.go deleted file mode 100644 index 93ca5f5..0000000 --- a/internal/transport/helpers.go +++ /dev/null @@ -1,9 +0,0 @@ -package transport - -func WithoutInput[T any](f func() (T, error)) func(struct{}) (T, error) { - return func(s struct{}) (T, error) { return f() } -} - -func WithoutOutput[T any](f func(T) error) func(T) (struct{}, error) { - return func(t T) (struct{}, error) { return struct{}{}, f(t) } -} diff --git a/internal/transport/host/client.go b/internal/transport/host/client.go deleted file mode 100644 index d2580b2..0000000 --- a/internal/transport/host/client.go +++ /dev/null @@ -1,24 +0,0 @@ -package host - -import ( - "git.wzray.com/homelab/hivemind/internal/transport" - "git.wzray.com/homelab/hivemind/internal/types" -) - -type Client struct { - caller transport.Caller -} - -func New(caller transport.Caller) *Client { - return &Client{ - caller: caller, - } -} - -func (c *Client) Dns(endpoint string) (types.HostState, error) { - return dnsRoute.CallNoInput(c.caller, endpoint) -} - -func (c *Client) Nameserver(endpoint string) (types.HostState, error) { - return nsRoute.CallNoInput(c.caller, endpoint) -} diff --git a/internal/transport/host/handlers.go b/internal/transport/host/handlers.go deleted file mode 100644 index 81d98e0..0000000 --- a/internal/transport/host/handlers.go +++ /dev/null @@ -1,16 +0,0 @@ -package host - -import ( - "git.wzray.com/homelab/hivemind/internal/transport" - "git.wzray.com/homelab/hivemind/internal/types" -) - -type HostHandlers interface { - Dns() (types.HostState, error) - Nameserver() (types.HostState, error) -} - -func Register(registrator transport.Registrator, h HostHandlers) { - dnsRoute.Register(registrator, transport.WithoutInput(h.Dns)) - nsRoute.Register(registrator, transport.WithoutInput(h.Nameserver)) -} diff --git a/internal/transport/host/routes.go b/internal/transport/host/routes.go deleted file mode 100644 index a907cb8..0000000 --- a/internal/transport/host/routes.go +++ /dev/null @@ -1,11 +0,0 @@ -package host - -import ( - "git.wzray.com/homelab/hivemind/internal/transport" - "git.wzray.com/homelab/hivemind/internal/types" -) - -var ( - dnsRoute = transport.NewRoute[struct{}, types.HostState]("/host/dns") - nsRoute = transport.NewRoute[struct{}, types.HostState]("/host/ns") -) diff --git a/internal/transport/master/client.go b/internal/transport/master/client.go deleted file mode 100644 index 08a78c6..0000000 --- a/internal/transport/master/client.go +++ /dev/null @@ -1,40 +0,0 @@ -package master - -import ( - "git.wzray.com/homelab/hivemind/internal/transport" - "git.wzray.com/homelab/hivemind/internal/types" -) - -type Client struct { - caller transport.Caller -} - -func New(caller transport.Caller) *Client { - return &Client{ - caller: caller, - } -} - -func (c *Client) Heartbeat(endpoint string, n types.Node) (types.Nodes, error) { - return heartbeatRoute.Call(c.caller, endpoint, n) -} - -func (c *Client) Join(endpoint string, n types.Node) (types.Nodes, error) { - return joinRoute.Call(c.caller, endpoint, n) -} - -func (c *Client) Leave(endpoint string, n types.Node) error { - return leaveRoute.CallNoOutput(c.caller, endpoint, n) -} - -func (c *Client) EventHeartbeat(endpoint string, n types.Node) (types.Nodes, error) { - return eventHeartbeatRoute.Call(c.caller, endpoint, n) -} - -func (c *Client) EventJoin(endpoint string, n types.Node) (types.Nodes, error) { - return eventJoinRoute.Call(c.caller, endpoint, n) -} - -func (c *Client) EventLeave(endpoint string, n types.Node) error { - return eventLeaveRoute.CallNoOutput(c.caller, endpoint, n) -} diff --git a/internal/transport/master/handlers.go b/internal/transport/master/handlers.go deleted file mode 100644 index b67ef7b..0000000 --- a/internal/transport/master/handlers.go +++ /dev/null @@ -1,26 +0,0 @@ -package master - -import ( - "git.wzray.com/homelab/hivemind/internal/transport" - "git.wzray.com/homelab/hivemind/internal/types" -) - -type MasterHandlers interface { - Heartbeat(types.Node) (types.Nodes, error) - Join(types.Node) (types.Nodes, error) - Leave(types.Node) error - - EventHeartbeat(types.Node) (types.Nodes, error) - EventJoin(types.Node) (types.Nodes, error) - EventLeave(types.Node) error -} - -func Register(registrator transport.Registrator, h MasterHandlers) { - heartbeatRoute.Register(registrator, h.Heartbeat) - joinRoute.Register(registrator, h.Join) - leaveRoute.Register(registrator, transport.WithoutOutput(h.Leave)) - - eventHeartbeatRoute.Register(registrator, h.EventHeartbeat) - eventJoinRoute.Register(registrator, h.EventJoin) - eventLeaveRoute.Register(registrator, transport.WithoutOutput(h.EventLeave)) -} diff --git a/internal/transport/master/routes.go b/internal/transport/master/routes.go deleted file mode 100644 index d59b53d..0000000 --- a/internal/transport/master/routes.go +++ /dev/null @@ -1,16 +0,0 @@ -package master - -import ( - "git.wzray.com/homelab/hivemind/internal/transport" - "git.wzray.com/homelab/hivemind/internal/types" -) - -var ( - heartbeatRoute = transport.NewRoute[types.Node, types.Nodes]("/master/heartbeat") - joinRoute = transport.NewRoute[types.Node, types.Nodes]("/master/join") - leaveRoute = transport.NewRoute[types.Node, struct{}]("/master/leave") - - eventHeartbeatRoute = transport.NewRoute[types.Node, types.Nodes]("/master/event/heartbeat") - eventJoinRoute = transport.NewRoute[types.Node, types.Nodes]("/master/event/join") - eventLeaveRoute = transport.NewRoute[types.Node, struct{}]("/master/event/leave") -) diff --git a/internal/transport/node/client.go b/internal/transport/node/client.go deleted file mode 100644 index 82f5a29..0000000 --- a/internal/transport/node/client.go +++ /dev/null @@ -1,17 +0,0 @@ -package node - -import ( - "git.wzray.com/homelab/hivemind/internal/transport" -) - -type Client struct { - caller transport.Caller -} - -func New(caller transport.Caller) *Client { - return &Client{caller: caller} -} - -func (c *Client) Healthcheck(endpoint string) (string, error) { - return healthcheckRoute.CallNoInput(c.caller, endpoint) -} diff --git a/internal/transport/node/handlers.go b/internal/transport/node/handlers.go deleted file mode 100644 index f2c7228..0000000 --- a/internal/transport/node/handlers.go +++ /dev/null @@ -1,13 +0,0 @@ -package node - -import ( - "git.wzray.com/homelab/hivemind/internal/transport" -) - -type NodeHandlers interface { - Healthcheck() (string, error) -} - -func Register(registrator transport.Registrator, h NodeHandlers) { - healthcheckRoute.Register(registrator, transport.WithoutInput(h.Healthcheck)) -} diff --git a/internal/transport/node/routes.go b/internal/transport/node/routes.go deleted file mode 100644 index 1be2a69..0000000 --- a/internal/transport/node/routes.go +++ /dev/null @@ -1,9 +0,0 @@ -package node - -import ( - "git.wzray.com/homelab/hivemind/internal/transport" -) - -var ( - healthcheckRoute = transport.NewRoute[struct{}, string]("/node/healthcheck") -) diff --git a/internal/transport/registrator.go b/internal/transport/registrator.go deleted file mode 100644 index e98c5ad..0000000 --- a/internal/transport/registrator.go +++ /dev/null @@ -1,22 +0,0 @@ -package transport - -import ( - "git.wzray.com/homelab/hivemind/internal/transport/codec" -) - -type Handler struct { - path string - handler func(codec.Codec, []byte) ([]byte, error) -} - -func (h Handler) Path() string { - return h.path -} - -func (h Handler) Handle(c codec.Codec, v []byte) ([]byte, error) { - return h.handler(c, v) -} - -type Registrator interface { - Register(endpoint Handler) -} diff --git a/internal/transport/route.go b/internal/transport/route.go deleted file mode 100644 index a5714eb..0000000 --- a/internal/transport/route.go +++ /dev/null @@ -1,65 +0,0 @@ -package transport - -import ( - "fmt" - - "git.wzray.com/homelab/hivemind/internal/transport/codec" -) - -type route[In, Out any] struct { - path string -} - -func NewRoute[In, Out any](path string) route[In, Out] { - return route[In, Out]{ - path: path, - } -} - -func routeToHandler[In, Out any](r route[In, Out], handler func(In) (Out, error)) Handler { - return Handler{ - path: r.path, - handler: func(c codec.Codec, b []byte) ([]byte, error) { - data, err := codec.Decode[In](c, b) - if err != nil { - return nil, fmt.Errorf("unable to decode body: %w", err) - } - - out, err := handler(data) - if err != nil { - return nil, fmt.Errorf("error while handling request: %w", err) - } - - raw, err := codec.Encode(c, out) - if err != nil { - return nil, fmt.Errorf("unable to encode body: %w", err) - } - - return raw, nil - }, - } -} - -func (e route[In, Out]) Call(caller Caller, host string, data In) (Out, error) { - var out Out - err := caller.Call(host, e.path, data, &out) - return out, err -} - -func (e route[In, Out]) CallNoInput(caller Caller, host string) (Out, error) { - var out Out - err := caller.Call(host, e.path, struct{}{}, &out) - return out, err -} - -func (e route[In, Out]) CallNoOutput(caller Caller, host string, data In) error { - return caller.Call(host, e.path, data, nil) -} - -func (e route[In, Out]) Path() string { - return e.path -} - -func (e route[In, Out]) Register(r Registrator, h func(In) (Out, error)) { - r.Register(routeToHandler(e, h)) -} diff --git a/internal/types/node.go b/internal/types/node.go index 25f937e..d1ed308 100644 --- a/internal/types/node.go +++ b/internal/types/node.go @@ -2,15 +2,13 @@ package types import "fmt" -type Nodes map[string]Node - // TODO: consider moving this type back to registry type Node struct { Hostname string `json:"hostname"` Address string `json:"address"` Port int `json:"port"` Roles []Role `json:"roles"` - Endpoint string `json:"endpoint"` // TODO: make endpoint into a separate type + Endpoint string `json:"endpoint"` } func NewNode( diff --git a/internal/types/web.go b/internal/types/web.go index 0514376..1416d6a 100644 --- a/internal/types/web.go +++ b/internal/types/web.go @@ -1,60 +1,78 @@ package types -// type Path string -// -// func (p Path) String() string { -// return string(p) -// } -// -// const ( -// PathMasterJoin Path = "/master/join" -// PathMasterLeave Path = "/master/leave" -// PathMasterKeepalive Path = "/master/keepalive" -// PathMasterEventJoin Path = "/master/event_join" -// PathMasterEventLeave Path = "/master/event_leave" -// PathMasterEventKeepalive Path = "/master/event_keepalive" -// -// PathNodeHealthcheck Path = "/node/healthcheck" -// -// PathDnsCallback Path = "/dns/callback" -// -// PathHostCallback Path = "/host/callback" -// PathHostDns Path = "/host/dns" -// PathHostNs Path = "/host/ns" -// ) -// -// type Route interface { -// Path() string -// Handle([]byte) (any, error) -// } -// -// type endpoint struct { -// path string -// handler func([]byte) (any, error) -// } -// -// func (e endpoint) Path() string { return e.path } -// -// func (e endpoint) Handle(v []byte) (any, error) { return e.handler(v) } -// -// func PostEndpoint[T any, V any](path Path, handler func(T) (V, error)) Route { -// return endpoint{ -// path: "POST " + path.String(), -// handler: func(a []byte) (any, error) { -// var r T -// if err := json.Unmarshal(a, &r); err != nil { -// return nil, fmt.Errorf("unable to unmarshal json: %w", err) -// } -// return handler(r) -// }, -// } -// } -// -// func GetEndpoint[T any](path Path, handler func() (T, error)) Route { -// return endpoint{ -// path: "GET " + path.String(), -// handler: func(a []byte) (any, error) { -// return handler() -// }, -// } -// } +import ( + "encoding/json" + "fmt" + "net/http" +) + +// TODO: split this up + +type Path string + +func (p Path) String() string { + return string(p) +} + +const ( + PathMasterJoin Path = "/master/join" + PathMasterLeave Path = "/master/leave" + PathMasterKeepalive Path = "/master/keepalive" + + PathNodeHealthcheck Path = "/node/healthcheck" + PathNodeJoin Path = "/node/join" + PathNodeLeave Path = "/node/leave" + + PathDnsCallback Path = "/dns/callback" + + PathHostCallback Path = "/host/callback" + PathHostDns Path = "/host/dns" + PathHostNs Path = "/host/ns" +) + +type Response[T any] struct { + Ok bool `json:"ok"` + Data T `json:"data,omitempty"` + Err string `json:"err,omitempty"` +} + +type Route interface { + Path() string + Handle([]byte) (any, error) +} + +type endpoint struct { + path string + handler func([]byte) (any, error) +} + +func (e endpoint) Path() string { return e.path } + +func (e endpoint) Handle(v []byte) (any, error) { return e.handler(v) } + +func PostEndpoint[T any, V any](path Path, handler func(T) (V, error)) Route { + return endpoint{ + path: "POST " + path.String(), + handler: func(a []byte) (any, error) { + var r T + if err := json.Unmarshal(a, &r); err != nil { + return nil, fmt.Errorf("unable to unmarshal json: %w", err) + } + return handler(r) + }, + } +} + +func GetEndpoint[T any](path Path, handler func() (T, error)) Route { + return endpoint{ + path: "GET " + path.String(), + handler: func(a []byte) (any, error) { + return handler() + }, + } +} + +type Registrator interface { + Register(endpoint Route) + RegisterRaw(method string, pattern string, handler func(http.ResponseWriter, *http.Request)) +} diff --git a/internal/web/client/client.go b/internal/web/client/client.go index 3396f6f..aa98ad1 100644 --- a/internal/web/client/client.go +++ b/internal/web/client/client.go @@ -10,18 +10,20 @@ import ( "net/url" "time" - "git.wzray.com/homelab/hivemind/internal/web" + "git.wzray.com/homelab/hivemind/internal/types" "git.wzray.com/homelab/hivemind/internal/web/middleware" ) -type Client struct { +type client struct { http *http.Client middleware middleware.Middleware } +var defaultClient *client + const timeout = time.Duration(2) * time.Second -func (c *Client) Call(host string, path string, data any, out any) error { +func (c *client) makeRequest(method string, host string, path types.Path, data any, out any) error { var body io.Reader if data != nil { raw, err := json.Marshal(data) @@ -34,10 +36,10 @@ func (c *Client) Call(host string, path string, data any, out any) error { uri := (&url.URL{ Scheme: "http", Host: host, - Path: path, + Path: path.String(), }).String() - r, err := http.NewRequest("POST", uri, body) + r, err := http.NewRequest(method, uri, body) if err != nil { return fmt.Errorf("build http request: %w", err) } @@ -50,41 +52,60 @@ func (c *Client) Call(host string, path string, data any, out any) error { return fmt.Errorf("apply middleware: %w", err) } - httpResponse, err := c.http.Do(r) + resp, err := c.http.Do(r) if err != nil { return fmt.Errorf("send request: %w", err) } - defer httpResponse.Body.Close() - if httpResponse.StatusCode < 200 || httpResponse.StatusCode >= 300 { - b, _ := io.ReadAll(httpResponse.Body) - return fmt.Errorf("http %d: %s", httpResponse.StatusCode, string(b)) + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + b, _ := io.ReadAll(resp.Body) + return fmt.Errorf("http %d: %s", resp.StatusCode, string(b)) } + defer resp.Body.Close() if out != nil { - var resp web.Response[json.RawMessage] - if err := json.NewDecoder(httpResponse.Body).Decode(&resp); err != nil { - return fmt.Errorf("decode response wrapper: %w", err) - } - - if !resp.Ok { - return fmt.Errorf("error on the remote: %w", errors.New(resp.Err)) - } - - if err := json.Unmarshal(resp.Data, out); err != nil { - return fmt.Errorf("decode response body: %w", err) + if err := json.NewDecoder(resp.Body).Decode(out); err != nil { + return fmt.Errorf("decode body: %w", err) } } - io.Copy(io.Discard, httpResponse.Body) + io.Copy(io.Discard, resp.Body) return nil } -func New(middleware middleware.Middleware) *Client { - return &Client{ +func Init(mw middleware.Middleware) { + if defaultClient != nil { + panic("web.client: Init called twice") + } + + defaultClient = &client{ http: &http.Client{ Timeout: timeout, }, - middleware: middleware, + middleware: mw, } } + +func request[Out any, In any](method string, host string, path types.Path, data In) (*Out, error) { + out := &types.Response[Out]{} + err := defaultClient.makeRequest(method, host, path, data, out) + if err != nil { + return nil, err + } + + if !out.Ok { + return nil, errors.New(out.Err) + } + + return &out.Data, err +} + +// TODO: out should not be a pointer +func Get[Out any](host string, path types.Path) (*Out, error) { + return request[Out, any](http.MethodGet, host, path, nil) +} + +// TODO: out should not be a pointer +func Post[Out any, In any](host string, path types.Path, data In) (*Out, error) { + return request[Out](http.MethodPost, host, path, data) +} diff --git a/internal/web/server/server.go b/internal/web/server/server.go index 2e579cb..9ce1c53 100644 --- a/internal/web/server/server.go +++ b/internal/web/server/server.go @@ -5,8 +5,7 @@ import ( "io" "net/http" - "git.wzray.com/homelab/hivemind/internal/transport" - "git.wzray.com/homelab/hivemind/internal/transport/codec" + "git.wzray.com/homelab/hivemind/internal/types" "git.wzray.com/homelab/hivemind/internal/web/middleware" "github.com/rs/zerolog/log" ) @@ -16,7 +15,7 @@ type Server struct { httpServer http.Server } -func New(addr string, middleware middleware.Middleware) *Server { +func NewServer(addr string, middleware middleware.Middleware) *Server { mux := http.NewServeMux() s := &Server{ mux: mux, @@ -36,7 +35,7 @@ func (s *Server) Shutdown(ctx context.Context) error { return s.httpServer.Shutdown(ctx) } -func (s *Server) handleFunc(route transport.Handler) func(w http.ResponseWriter, r *http.Request) { +func (s *Server) handleFunc(route types.Route) func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) { log.Debug(). // TODO: make this a middleware Str("method", r.Method). @@ -47,36 +46,35 @@ func (s *Server) handleFunc(route transport.Handler) func(w http.ResponseWriter, w.Header().Set("Content-Type", "application/json; charset=utf-8") - raw, err := io.ReadAll(r.Body) + body, err := io.ReadAll(r.Body) if err != nil { w.Write(fail("read request body: %v", err)) log.Err(err).Msg("unable to read request body") return } - resp, err := route.Handle(codec.JSON, raw) + raw, err := route.Handle(body) if err != nil { w.Write(fail("handle request: %v", err)) log.Err(err).Msg("unable to handle request") return } - payload, err := ok(resp) + data, err := ok(raw) if err != nil { w.Write(fail("marshal response: %v", err)) log.Err(err).Msg("unable to marshal response") return } - w.Write(payload) + w.Write(data) } } -func (s *Server) Register(endpoint transport.Handler) { +func (s *Server) Register(endpoint types.Route) { s.mux.HandleFunc(endpoint.Path(), s.handleFunc(endpoint)) } -// TODO: i don't think that I need this? func (s *Server) RegisterRaw(method string, pattern string, handler func(http.ResponseWriter, *http.Request)) { s.mux.HandleFunc(method+" "+pattern, handler) } diff --git a/internal/web/server/util.go b/internal/web/server/util.go index 34bd7d3..6ba2288 100644 --- a/internal/web/server/util.go +++ b/internal/web/server/util.go @@ -4,20 +4,20 @@ import ( "encoding/json" "fmt" - "git.wzray.com/homelab/hivemind/internal/web" + "git.wzray.com/homelab/hivemind/internal/types" ) func fail(format string, a ...any) []byte { - r, _ := json.Marshal(web.Response[string]{ + r, _ := json.Marshal(types.Response[string]{ Ok: false, Err: fmt.Sprintf(format, a...), }) return r } -func ok(data []byte) ([]byte, error) { - return json.Marshal(web.Response[json.RawMessage]{ +func ok[T any](data T) ([]byte, error) { + return json.Marshal(types.Response[T]{ Ok: true, - Data: json.RawMessage(data), + Data: data, }) } diff --git a/internal/web/types.go b/internal/web/types.go deleted file mode 100644 index d9ee3b4..0000000 --- a/internal/web/types.go +++ /dev/null @@ -1,7 +0,0 @@ -package web - -type Response[T any] struct { - Ok bool `json:"ok"` - Data T `json:"data,omitempty"` - Err string `json:"err,omitempty"` -}