1
0
Fork 0

Compare commits

...

3 commits

43 changed files with 914 additions and 412 deletions

View file

@ -4,6 +4,7 @@ changed="$(gofmt -d .)"
[ -n "$changed" ] && {
echo 'Some files are not formatted'
delta <<< "$changed"
go fmt ./...
exit 1
}
go vet ./...

View file

@ -15,6 +15,7 @@ RUN --mount=type=cache,target="/cache/go-build" mkdir -p /cache/go-build; make h
FROM alpine
EXPOSE 56714/tcp
EXPOSE 56715/tcp
WORKDIR /app
VOLUME /conf

55
TODO.md
View file

@ -1,3 +1,58 @@
# Background
Some background first:
the node can have multiple roles
this includes (but not limited to)
* Host (can generate events)
* DNS (can consume the events and act on them)
* Something else that I might come up with (the architecture has to be expandable)
# Control pane (3+ nodes)
* Quorum
* Consists of $n / 2 + 1$ nodes
* Cluster is considered "degraded" if no quorum can be created
* Stores an event log
* **Only** leader can append to the log (with quorum permission)
* Membership authority
* No joins without quorum approval
* Leaves are not propagated without quorum
* Manages epoch (useful for GC)
* Node $N$ with $N.epoch != cluster.epoch$ can **not** join the cluster, and has to re-join (bootstrap)
* Can (but doesn't have to) be a bootstrap point
# Membership
* Membership is managed though SWIM
* Each node contains a small slice of the entire network
## Joining
Each node has an array of roles:
1. That it performs
2. That it requires to operate (can be moved out to the master, or the shared type)
3. That it needs for bootstrapping (analogous to 2.)
Node can join via a master or via other nodes
When a node requests to join, the responder makes a request to the CP and asks for a permission to add this node
* If master allows
1. The node gets a membership digest from the CP.
2. The node *can* be brought up to speed using it's neighbors from 1.
3. Node join event gets broadcasted over SWIM gossiping
* Otherwise, nothing happens
# Host node
## Bootstrap
Host node requests `dns` nodes on join (and other node types, such as `ns`, `nginx`, etc... They should really be called something like `dns_processor`, and the internals (how it processes the dns) should not be visible to the cluster, but that's a task for a future me)
When a new update occurs, it sends the update to *some* `dns` hosts.
# DNS node
## Bootstrap
First, it gets all the available `hosts` from the CP
Then it requests their configs and sets map[hostName]seq accordingly
## Simple join (when other nodes exist)
It requests it's config from other nodes and that's it
<!-- TODO: finish the TODO file lol -->
# Minor To-Do
- auth middleware lol
- move request logging out of the request handling into a middleware
- nginx role
- think about choosing the master for the keepalive message (should be somewhat load-balanced)
- hivemind lite should not just print `hivemind-lite` lol

View file

@ -10,6 +10,7 @@ import (
"strings"
"syscall"
"git.wzray.com/homelab/hivemind/internal/app"
"git.wzray.com/homelab/hivemind/internal/config"
"git.wzray.com/homelab/hivemind/internal/registry"
"git.wzray.com/homelab/hivemind/internal/roles"
@ -17,11 +18,14 @@ import (
"git.wzray.com/homelab/hivemind/internal/roles/host"
"git.wzray.com/homelab/hivemind/internal/roles/master"
"git.wzray.com/homelab/hivemind/internal/roles/node"
"git.wzray.com/homelab/hivemind/internal/state"
dns_transport "git.wzray.com/homelab/hivemind/internal/transport/dns"
host_transport "git.wzray.com/homelab/hivemind/internal/transport/host"
master_transport "git.wzray.com/homelab/hivemind/internal/transport/master"
node_transport "git.wzray.com/homelab/hivemind/internal/transport/node"
"git.wzray.com/homelab/hivemind/internal/types"
"git.wzray.com/homelab/hivemind/internal/web/client"
"git.wzray.com/homelab/hivemind/internal/web/middleware"
"git.wzray.com/homelab/hivemind/internal/web/server"
web_client "git.wzray.com/homelab/hivemind/internal/web/client"
web_middleware "git.wzray.com/homelab/hivemind/internal/web/middleware"
web_server "git.wzray.com/homelab/hivemind/internal/web/server"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"github.com/rs/zerolog/pkgerrors"
@ -87,17 +91,22 @@ func main() {
filestore.EnsureExists()
registry := registry.New(filestore, self)
state := state.New(registry, self)
nodeRole := node.New(state, configuration.Node)
var builder middleware.MiddlewareBuilder
var builder web_middleware.MiddlewareBuilder
middlewares := builder.Prepare()
client.Init(middlewares)
client := web_client.New(middlewares)
listenAddr := fmt.Sprintf("%v:%v", configuration.Node.ListenOn, configuration.Node.Port)
server := server.NewServer(listenAddr, middlewares)
server := web_server.New(listenAddr, middlewares)
state := app.NewState(registry, self, app.Clients{
Master: master_transport.New(client),
DNS: dns_transport.New(client),
Host: host_transport.New(client),
Node: node_transport.New(client),
})
nodeRole := node.New(state, configuration.Node)
roles := make([]roles.Role, 0)
roles = append(roles, nodeRole)

6
go.mod
View file

@ -2,10 +2,12 @@ module git.wzray.com/homelab/hivemind
go 1.25.5
require github.com/rs/zerolog v1.34.0
require (
github.com/rs/zerolog v1.34.0
github.com/BurntSushi/toml v1.6.0
)
require (
github.com/BurntSushi/toml v1.6.0 // indirect
github.com/mattn/go-colorable v0.1.14 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/pkg/errors v0.9.1 // indirect

35
internal/app/state.go Normal file
View file

@ -0,0 +1,35 @@
package app
import (
"git.wzray.com/homelab/hivemind/internal/registry"
"git.wzray.com/homelab/hivemind/internal/transport/dns"
"git.wzray.com/homelab/hivemind/internal/transport/host"
"git.wzray.com/homelab/hivemind/internal/transport/master"
"git.wzray.com/homelab/hivemind/internal/transport/node"
"git.wzray.com/homelab/hivemind/internal/types"
)
type Clients struct {
Master *master.Client
DNS *dns.Client
Host *host.Client
Node *node.Client
}
type State struct {
Registry *registry.Registry
Self types.Node
Clients Clients
}
func NewState(
registry *registry.Registry,
self types.Node,
clients Clients,
) *State {
return &State{
Registry: registry,
Self: self,
Clients: clients,
}
}

View file

@ -14,5 +14,8 @@ var DefaultConfig = Config{
BackoffCount: 4,
NodeTimeout: 120,
},
Host: HostConfig{
ListenAddress: "0.0.0.0:56715",
},
},
}

View file

@ -12,6 +12,7 @@ type HostConfig struct {
LocalAddress string `toml:"local_address"`
InternalEntrypoint string `toml:"internal_entrypoint"`
ExternalEntrypoint string `toml:"external_entrypoint"`
ListenAddress string `toml:"listen_address"`
baseRoleConfig
}
@ -40,6 +41,10 @@ func (c HostConfig) Validate() error {
return errors.New("missing external entrypoint")
}
if c.ListenAddress == "" {
return errors.New("missing listen address")
}
return nil
}
@ -67,4 +72,8 @@ func (c *HostConfig) Merge(other HostConfig) {
if other.ExternalEntrypoint != "" {
c.ExternalEntrypoint = other.ExternalEntrypoint
}
if other.ListenAddress != "" {
c.ListenAddress = other.ListenAddress
}
}

View file

@ -53,8 +53,12 @@ func (c NodeConfig) Validate() error {
return errors.New("invalid keepalive_interval")
}
if c.ListenOn == "" {
return errors.New("missing listen_on")
}
if net.ParseIP(c.ListenOn) == nil {
return errors.New("invalid listen_on")
return fmt.Errorf("invalid listen_on: %v", c.ListenOn)
}
return nil

View file

@ -8,22 +8,23 @@ import (
"strings"
"sync"
"git.wzray.com/homelab/hivemind/internal/app"
"git.wzray.com/homelab/hivemind/internal/config"
"git.wzray.com/homelab/hivemind/internal/state"
"git.wzray.com/homelab/hivemind/internal/transport"
"git.wzray.com/homelab/hivemind/internal/transport/dns"
"git.wzray.com/homelab/hivemind/internal/types"
"git.wzray.com/homelab/hivemind/internal/web/client"
"github.com/rs/zerolog/log"
)
const hostsDir = "/etc/hosts.d/"
type Role struct {
state *state.RuntimeState
state *app.State
config config.DnsConfig
group sync.WaitGroup
}
func New(state *state.RuntimeState, config config.DnsConfig) *Role {
func New(state *app.State, config config.DnsConfig) *Role {
r := &Role{
state: state,
config: config,
@ -32,28 +33,6 @@ func New(state *state.RuntimeState, config config.DnsConfig) *Role {
return r
}
func (r *Role) updateDnsmasq(filename string, data []byte) error {
if err := os.WriteFile(filename, data, 0644); err != nil {
return fmt.Errorf("write endpoint file %q: %w", filename, err)
}
if err := r.reload(); err != nil {
return fmt.Errorf("reload dnsmasq: %w", err)
}
return nil
}
func parseState(state types.HostState) (string, []byte) {
var builder strings.Builder
for _, d := range state.Domains {
builder.WriteString(fmt.Sprintf("%s %s\n", state.Address, d))
}
return hostsDir + state.Hostname, []byte(builder.String())
}
func (r *Role) OnStartup(ctx context.Context) error {
r.group.Go(func() {
r.syncFromRegistry()
@ -74,15 +53,46 @@ func (r *Role) OnStartup(ctx context.Context) error {
return nil
}
func (r *Role) OnShutdown() error {
r.group.Wait()
return nil
}
func (r *Role) RegisterHandlers(rg transport.Registrator) {
dns.Register(rg, r)
}
func (r *Role) Callback(state types.HostState) (bool, error) {
filename, data := parseState(state)
if err := r.updateDnsmasq(filename, data); err != nil {
return false, err
}
return true, nil
}
func (r *Role) updateDnsmasq(filename string, data []byte) error {
if err := os.WriteFile(filename, data, 0644); err != nil {
return fmt.Errorf("write endpoint file %q: %w", filename, err)
}
if err := r.reload(); err != nil {
return fmt.Errorf("reload dnsmasq: %w", err)
}
return nil
}
func (r *Role) syncFromRegistry() {
for _, n := range r.state.Registry.ByRole(types.HostRole) {
state, err := client.Get[types.HostState](n.Endpoint, types.PathHostDns)
state, err := r.state.Clients.Host.Dns(n.Endpoint)
if err != nil {
log.Warn().Str("name", n.Hostname).Err(err).Msg("unable to get host config")
continue
}
filename, data := parseState(*state)
filename, data := parseState(state)
if err := r.updateDnsmasq(filename, data); err != nil {
log.Warn().Str("name", n.Hostname).Err(err).Msg("unable to update dnsmasq")
continue
@ -90,11 +100,6 @@ func (r *Role) syncFromRegistry() {
}
}
func (r *Role) OnShutdown() error {
r.group.Wait()
return nil
}
func (r *Role) reload() error {
var err error
@ -107,16 +112,12 @@ func (r *Role) reload() error {
return err
}
func (r *Role) onCallback(state types.HostState) (bool, error) {
filename, data := parseState(state)
func parseState(state types.HostState) (string, []byte) {
var builder strings.Builder
if err := r.updateDnsmasq(filename, data); err != nil {
return false, err
for _, d := range state.Domains {
builder.WriteString(fmt.Sprintf("%s %s\n", state.Address, d))
}
return true, nil
}
func (r *Role) RegisterHandlers(rg types.Registrator) {
rg.Register(types.PostEndpoint(types.PathDnsCallback, r.onCallback))
return hostsDir + state.Hostname, []byte(builder.String())
}

View file

@ -0,0 +1,96 @@
package host
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"git.wzray.com/homelab/hivemind/internal/config"
"github.com/rs/zerolog/log"
)
type TraefikListener interface {
OnTraefikUpdate(traefikResponse)
}
type TraefikGateway struct {
client *http.Client
server *http.Server
listener TraefikListener
address url.URL
domain string
}
func NewTraefikGateway(cfg config.HostConfig, listener TraefikListener) *TraefikGateway {
mux := http.NewServeMux()
gw := &TraefikGateway{
client: &http.Client{},
server: &http.Server{
Addr: cfg.ListenAddress,
Handler: mux,
},
listener: listener,
address: url.URL{
Scheme: "http",
Host: cfg.LocalAddress,
},
domain: cfg.Domain,
}
mux.HandleFunc("/callback", gw.onCallback)
return gw
}
func (g *TraefikGateway) Listen() error {
return g.server.ListenAndServe()
}
func (g *TraefikGateway) Shutdown(ctx context.Context) error {
return g.server.Shutdown(ctx)
}
func (g *TraefikGateway) GetRawData() (*traefikResponse, error) {
var raw TraefikRawResponse
url := g.address
url.Path = "/api/rawdata"
req := http.Request{
Method: http.MethodGet,
URL: &url,
}
req.Host = g.domain
r, err := g.client.Do(&req)
if err != nil {
return nil, fmt.Errorf("make request: %w", err)
}
defer r.Body.Close()
if err := json.NewDecoder(r.Body).Decode(&raw); err != nil {
return nil, fmt.Errorf("unmarshal body: %w", err)
}
out := parseTraefikResponse(raw)
return &out, nil
}
func (g *TraefikGateway) onCallback(w http.ResponseWriter, req *http.Request) {
var raw TraefikRawResponse
if err := json.NewDecoder(req.Body).Decode(&raw); err != nil {
w.WriteHeader(http.StatusInternalServerError)
log.Err(err).Msg("unable to decode traefik callback data")
return
}
resp := parseTraefikResponse(raw)
if g.listener != nil {
g.listener.OnTraefikUpdate(resp)
}
w.Write([]byte("OK"))
}

View file

@ -2,36 +2,37 @@ package host
import (
"context"
"encoding/json"
"fmt"
"net/http"
"slices"
"sync"
"git.wzray.com/homelab/hivemind/internal/app"
"git.wzray.com/homelab/hivemind/internal/config"
"git.wzray.com/homelab/hivemind/internal/state"
"git.wzray.com/homelab/hivemind/internal/transport"
"git.wzray.com/homelab/hivemind/internal/transport/host"
"git.wzray.com/homelab/hivemind/internal/types"
"git.wzray.com/homelab/hivemind/internal/web/client"
"github.com/rs/zerolog/log"
)
type Role struct {
state *state.RuntimeState
state *app.State
config config.HostConfig
client *traefikClient
gateway *TraefikGateway
tasksGroup sync.WaitGroup
externalDomains []string // TODO: i don't like hardcoding external/internal logic here
internalDomains []string
}
func New(state *state.RuntimeState, config config.HostConfig) *Role {
return &Role{
client: newClient(config.Domain, config.LocalAddress),
func New(state *app.State, config config.HostConfig) *Role {
r := &Role{
state: state,
config: config,
}
r.gateway = NewTraefikGateway(config, r)
return r
}
func (r *Role) sendUpdate(domains []string, role types.Role) {
@ -45,7 +46,7 @@ func (r *Role) sendUpdate(domains []string, role types.Role) {
r.tasksGroup.Go(func() {
logger := log.With().Str("name", node.Hostname).Logger()
logger.Debug().Msg("sending update")
if _, err := client.Post[any](node.Endpoint, types.PathDnsCallback, state); err != nil {
if _, err := r.state.Clients.DNS.Callback(node.Endpoint, state); err != nil {
logger.Warn().Err(err).Msg("unable to send dns info")
} else {
logger.Debug().Msg("update sent")
@ -54,7 +55,7 @@ func (r *Role) sendUpdate(domains []string, role types.Role) {
}
}
func (r *Role) mutateState(resp traefikResponse) {
func (r *Role) OnTraefikUpdate(resp traefikResponse) {
newInternal := resp.Domains(r.config.InternalEntrypoint)
newExternal := resp.Domains(r.config.ExternalEntrypoint)
@ -71,20 +72,7 @@ func (r *Role) mutateState(resp traefikResponse) {
}
}
func (r *Role) onCallback(w http.ResponseWriter, req *http.Request) {
var resp traefikResponse
if err := json.NewDecoder(req.Body).Decode(&resp); err != nil {
w.WriteHeader(http.StatusInternalServerError)
log.Err(err).Msg("unable to decode traefik callback data")
return
}
r.mutateState(resp)
w.Write([]byte("OK"))
}
func (r *Role) getInternal() (types.HostState, error) {
func (r *Role) Dns() (types.HostState, error) {
return types.HostState{
Domains: r.internalDomains,
Address: r.config.IpAddress,
@ -92,7 +80,7 @@ func (r *Role) getInternal() (types.HostState, error) {
}, nil
}
func (r *Role) getExternal() (types.HostState, error) {
func (r *Role) Nameserver() (types.HostState, error) {
return types.HostState{
Domains: r.externalDomains,
Address: r.config.IpAddress,
@ -101,14 +89,25 @@ func (r *Role) getExternal() (types.HostState, error) {
}
func (r *Role) RegisterHandlers(rg types.Registrator) {
rg.RegisterRaw(http.MethodPost, types.PathHostCallback.String(), r.onCallback)
rg.Register(types.GetEndpoint(types.PathHostDns, r.getInternal))
rg.Register(types.GetEndpoint(types.PathHostNs, r.getExternal))
func (r *Role) RegisterHandlers(rg transport.Registrator) {
host.Register(rg, r)
}
func (r *Role) OnStartup(ctx context.Context) error {
resp, err := r.client.GetRawData()
r.tasksGroup.Go(func() {
if err := r.gateway.Listen(); err != nil {
log.Err(err).Msg("traefik gateway stopped")
}
})
r.tasksGroup.Go(func() {
<-ctx.Done()
if err := r.gateway.Shutdown(context.Background()); err != nil {
log.Err(err).Msg("failed to shutdown traefik gateway")
}
})
resp, err := r.gateway.GetRawData()
if err != nil {
return fmt.Errorf("get traefik state: %w", err)
}
@ -116,7 +115,7 @@ func (r *Role) OnStartup(ctx context.Context) error {
log.Info().Msg("got raw data from traefik")
log.Debug().Interface("response", resp).Send()
r.mutateState(*resp)
r.OnTraefikUpdate(*resp)
return nil
}

View file

@ -1,58 +1 @@
package host
import (
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"net/url"
)
type traefikClient struct {
client *http.Client
domain string
address url.URL
}
func newClient(domain string, addr string) *traefikClient {
return &traefikClient{
domain: domain,
address: url.URL{
Scheme: "https",
Host: addr,
},
client: &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
ServerName: domain,
},
},
},
}
}
func (c *traefikClient) GetRawData() (*traefikResponse, error) {
var out traefikResponse
url := c.address
url.Path = "/api/rawdata"
req := http.Request{
Method: "GET",
URL: &url,
}
req.Host = c.domain
r, err := c.client.Do(&req)
if err != nil {
return nil, fmt.Errorf("make request: %w", err)
}
defer r.Body.Close()
if err := json.NewDecoder(r.Body).Decode(&out); err != nil {
return nil, fmt.Errorf("unmarshal body: %w", err)
}
return &out, nil
}

View file

@ -1,64 +1,66 @@
package host
import (
"encoding/json"
"regexp"
"slices"
)
var hostRegex = regexp.MustCompile("Host\\(`([^()`]+\\.[^()`]+)`\\)")
type rule struct {
type traefikRule struct {
Raw string
Domains []string
Valid bool
}
func (r *rule) UnmarshalJSON(data []byte) error {
r.Valid = false
type traefikRouter struct {
Rule traefikRule
Entrypoints []string
}
raw := ""
if err := json.Unmarshal(data, &raw); err != nil {
return err
type traefikResponse struct {
Routers []traefikRouter
}
type TraefikRawResponse struct {
Routers map[string]TraefikRawRouter `json:"routers"`
}
type TraefikRawRouter struct {
Rule string `json:"rule"`
Entrypoints []string `json:"entryPoints"`
}
func parseTraefikResponse(raw TraefikRawResponse) traefikResponse {
out := traefikResponse{
Routers: make([]traefikRouter, 0, len(raw.Routers)),
}
for _, router := range raw.Routers {
out.Routers = append(out.Routers, traefikRouter{
Rule: parseTraefikRule(router.Rule),
Entrypoints: router.Entrypoints,
})
}
return out
}
func parseTraefikRule(raw string) traefikRule {
rule := traefikRule{
Raw: raw,
}
matches := hostRegex.FindAllStringSubmatch(raw, -1)
for _, match := range matches {
if len(match) <= 1 {
continue
}
r.Domains = append(r.Domains, match[1:]...)
rule.Domains = append(rule.Domains, match[1:]...)
}
r.Valid = len(r.Domains) > 0
return nil
}
type router struct {
Rule rule `json:"rule"`
Entrypoints []string `json:"entryPoints"`
}
type traefikResponse struct {
Routers []router
}
func (r *traefikResponse) UnmarshalJSON(data []byte) error {
var raw struct {
Routers map[string]router `json:"routers"`
}
if err := json.Unmarshal(data, &raw); err != nil {
return err
}
for _, v := range raw.Routers {
r.Routers = append(r.Routers, v)
}
return nil
rule.Valid = len(rule.Domains) > 0
return rule
}
func (r traefikResponse) Domains(entrypoint string) []string {

View file

@ -4,22 +4,24 @@ import (
"context"
"sync"
"git.wzray.com/homelab/hivemind/internal/app"
"git.wzray.com/homelab/hivemind/internal/config"
"git.wzray.com/homelab/hivemind/internal/roles"
"git.wzray.com/homelab/hivemind/internal/state"
"git.wzray.com/homelab/hivemind/internal/transport"
"git.wzray.com/homelab/hivemind/internal/transport/master"
"git.wzray.com/homelab/hivemind/internal/types"
"git.wzray.com/homelab/hivemind/internal/web/client"
"github.com/rs/zerolog/log"
)
type Role struct {
state *state.RuntimeState
state *app.State
config config.MasterConfig
tasksGroup sync.WaitGroup
observer *observer
roles.BaseRole
}
func New(state *state.RuntimeState, config config.MasterConfig) *Role {
func New(state *app.State, config config.MasterConfig) *Role {
return &Role{
state: state,
config: config,
@ -36,7 +38,7 @@ func New(state *state.RuntimeState, config config.MasterConfig) *Role {
func (r *Role) OnStartup(ctx context.Context) error {
r.tasksGroup.Go(func() {
r.observer.Start(ctx, func(n types.Node) error {
_, err := r.onLeave(n)
_, err := r.onLeave(n, true)
return err
})
})
@ -49,48 +51,95 @@ func (r *Role) OnShutdown() error {
return nil
}
func (r *Role) notify(path types.Path, v any) {
for _, n := range r.state.Registry.Nodes() {
addr := n.Endpoint
r.tasksGroup.Go(func() {
client.Post[any](addr, path, v)
})
}
func (c *Role) RegisterHandlers(r transport.Registrator) {
master.Register(r, c)
}
func (r *Role) onJoin(node types.Node) (map[string]types.Node, error) {
func (r *Role) Heartbeat(node types.Node) (types.Nodes, error) {
return r.onKeepAlive(node, true)
}
func (r *Role) Join(node types.Node) (types.Nodes, error) {
return r.onJoin(node, true)
}
func (r *Role) Leave(node types.Node) error {
_, err := r.onLeave(node, true)
return err
}
func (r *Role) EventHeartbeat(node types.Node) (types.Nodes, error) {
return r.onKeepAlive(node, false)
}
func (r *Role) EventJoin(node types.Node) (types.Nodes, error) {
return r.onJoin(node, false)
}
func (r *Role) EventLeave(node types.Node) error {
_, err := r.onLeave(node, false)
return err
}
func (r *Role) onJoin(node types.Node, notify bool) (map[string]types.Node, error) {
if err := r.state.Registry.AddNode(node); err != nil {
return nil, err
}
r.notify(types.PathNodeJoin, node)
if notify {
propagate(r, noReturn(r.state.Clients.Master.EventJoin), node)
}
return r.state.Registry.AllNodes(), nil
}
func (r *Role) onLeave(node types.Node) (bool, error) {
func (r *Role) onLeave(node types.Node, notify bool) (bool, error) {
if err := r.state.Registry.RemoveNode(node); err != nil {
return false, err
}
r.notify(types.PathNodeLeave, node)
if notify {
propagate(r, r.state.Clients.Master.EventLeave, node)
}
return true, nil
}
func (r *Role) onKeepAlive(node types.Node) (bool, error) {
func (r *Role) onKeepAlive(node types.Node, notify bool) (map[string]types.Node, error) {
r.observer.onKeepAlive(node)
if ok := r.state.Registry.Exists(node.Hostname); !ok {
_, err := r.onJoin(node)
return true, err
// TODO: i don't like this side effect
if _, err := r.onJoin(node, true); err != nil {
log.Warn().Err(err).Msg("unable to add node to the registry from keepalive")
}
}
return false, nil
if notify {
propagate(r, noReturn(r.state.Clients.Master.EventHeartbeat), node)
}
func (c *Role) RegisterHandlers(r types.Registrator) {
r.Register(types.PostEndpoint(types.PathMasterJoin, c.onJoin))
r.Register(types.PostEndpoint(types.PathMasterLeave, c.onLeave))
r.Register(types.PostEndpoint(types.PathMasterKeepalive, c.onKeepAlive))
return r.state.Registry.AllNodes(), nil
}
func eventFunc[R any](fn func(types.Node, bool) (R, error), notify bool) func(types.Node) (R, error) {
return func(n types.Node) (R, error) {
return fn(n, notify)
}
}
func noReturn[T, V any](fn func(string, T) (V, error)) func(string, T) error {
return func(s string, t T) error {
_, err := fn(s, t)
return err
}
}
func propagate[T any](r *Role, handler func(string, T) error, v T) {
for _, n := range r.state.Registry.ByRole(types.MasterRole) {
addr := n.Endpoint
r.tasksGroup.Go(func() {
handler(addr, v)
})
}
}

View file

@ -5,15 +5,14 @@ import (
"sync"
"time"
"git.wzray.com/homelab/hivemind/internal/app"
"git.wzray.com/homelab/hivemind/internal/registry"
"git.wzray.com/homelab/hivemind/internal/state"
"git.wzray.com/homelab/hivemind/internal/types"
"git.wzray.com/homelab/hivemind/internal/web/client"
"github.com/rs/zerolog/log"
)
type observer struct {
state *state.RuntimeState
state *app.State
interval int
backoff int
backoffCount int
@ -23,7 +22,7 @@ type observer struct {
}
func newObserver(
state *state.RuntimeState,
state *app.State,
interval int,
backoff int,
backoffCount int,
@ -63,7 +62,7 @@ func (o *observer) pollNodes(ctx context.Context, onLeave func(types.Node) error
delay := time.Duration(o.backoff)
alive := false
for i := o.backoffCount - 1; i >= 0; i-- {
_, err := client.Get[any](n.Endpoint, types.PathNodeHealthcheck)
_, err := o.state.Clients.Node.Healthcheck(n.Endpoint)
if err == nil {
logger.Debug().Msg("node is alive")

View file

@ -7,26 +7,41 @@ import (
"sync"
"time"
"git.wzray.com/homelab/hivemind/internal/app"
"git.wzray.com/homelab/hivemind/internal/config"
"git.wzray.com/homelab/hivemind/internal/state"
"git.wzray.com/homelab/hivemind/internal/transport"
"git.wzray.com/homelab/hivemind/internal/transport/node"
"git.wzray.com/homelab/hivemind/internal/types"
"git.wzray.com/homelab/hivemind/internal/web/client"
"github.com/rs/zerolog/log"
)
type Role struct {
state *state.RuntimeState
state *app.State
keepaliveGroup sync.WaitGroup
config config.NodeConfig
}
func New(state *state.RuntimeState, config config.NodeConfig) *Role {
func New(state *app.State, config config.NodeConfig) *Role {
return &Role{
state: state,
config: config,
}
}
func (r *Role) OnStartup(ctx context.Context) error {
r.keepaliveGroup.Go(r.keepaliveFunc(ctx))
return nil
}
func (r *Role) OnShutdown() error {
r.keepaliveGroup.Wait()
return nil
}
func (n *Role) RegisterHandlers(r transport.Registrator) {
node.Register(r, n)
}
func (r *Role) Join(bootstrap string) error {
masters := make(map[string]struct{})
for _, node := range r.state.Registry.ByRole(types.MasterRole) {
@ -46,14 +61,14 @@ func (r *Role) Join(bootstrap string) error {
logger := log.With().Str("host", m).Logger()
logger.Debug().Msg("trying to join via master")
nodes, err := client.Post[map[string]types.Node](m, types.PathMasterJoin, r.state.Self)
nodes, err := r.state.Clients.Master.Join(m, r.state.Self)
if err != nil {
errs = append(errs, err)
logger.Debug().Err(err).Msg("unable to join")
continue
}
if err := r.state.Registry.Set(*nodes); err != nil {
if err := r.state.Registry.Set(nodes); err != nil {
logger.Debug().Err(err).Msg("unable to set master's nodes")
errs = append(errs, err)
continue
@ -76,8 +91,7 @@ func (r *Role) Leave() error {
logger := log.With().Str("name", m.Hostname).Logger()
logger.Debug().Msg("sending leave message")
_, err := client.Post[any](m.Endpoint, types.PathMasterLeave, r.state.Self)
if err != nil {
if err := r.state.Clients.Master.Leave(m.Endpoint, r.state.Self); err != nil {
logger.Debug().Err(err).Msg("unable to send leave message")
errs = append(errs, err)
continue
@ -90,15 +104,8 @@ func (r *Role) Leave() error {
return fmt.Errorf("unable to send leave message to any master: %w", errors.Join(errs...))
}
func (r *Role) OnStartup(ctx context.Context) error {
r.keepaliveGroup.Go(r.keepaliveFunc(ctx))
return nil
}
func (r *Role) OnShutdown() error {
r.keepaliveGroup.Wait()
return nil
func (r *Role) Healthcheck() (string, error) {
return "OK", nil
}
func (r *Role) keepaliveFunc(ctx context.Context) func() {
@ -107,11 +114,20 @@ func (r *Role) keepaliveFunc(ctx context.Context) func() {
logger := log.With().Str("name", m.Hostname).Logger()
logger.Debug().Msg("sending keepalive packet")
if _, err := client.Post[any](m.Endpoint, types.PathMasterKeepalive, r.state.Self); err != nil {
nodes, err := r.state.Clients.Master.Heartbeat(m.Endpoint, r.state.Self)
if err != nil {
logger.Info().Err(err).Msg("unable to send keepalive packet")
} else {
logger.Debug().Msg("keepalive packet sent")
continue
}
logger.Debug().Msg("keepalive packet sent")
if err := r.state.Registry.Set(nodes); err != nil {
logger.Warn().Err(err).Msg("unable to set masters nodes")
continue
}
break
}
}
@ -126,27 +142,3 @@ func (r *Role) keepaliveFunc(ctx context.Context) func() {
}
}
}
func (r *Role) onJoin(node types.Node) (bool, error) {
if err := r.state.Registry.AddNode(node); err != nil {
return false, err
}
return true, nil
}
func (r *Role) onLeave(node types.Node) (bool, error) {
if err := r.state.Registry.RemoveNode(node); err != nil {
return false, err
}
return true, nil
}
func healthcheck() (string, error) {
return "OK", nil
}
func (n *Role) RegisterHandlers(r types.Registrator) {
r.Register(types.GetEndpoint(types.PathNodeHealthcheck, healthcheck))
r.Register(types.PostEndpoint(types.PathNodeJoin, n.onJoin))
r.Register(types.PostEndpoint(types.PathNodeLeave, n.onLeave))
}

View file

@ -3,18 +3,18 @@ package roles
import (
"context"
"git.wzray.com/homelab/hivemind/internal/types"
"git.wzray.com/homelab/hivemind/internal/transport"
)
type Role interface {
RegisterHandlers(types.Registrator)
RegisterHandlers(transport.Registrator)
OnStartup(context.Context) error
OnShutdown() error
}
type BaseRole struct{}
func (r *BaseRole) RegisterHandlers(types.Registrator) {}
func (r *BaseRole) RegisterHandlers(transport.Registrator) {}
func (r *BaseRole) OnStartup(context.Context) error { return nil }

View file

@ -1,18 +0,0 @@
package state
import (
"git.wzray.com/homelab/hivemind/internal/registry"
"git.wzray.com/homelab/hivemind/internal/types"
)
type RuntimeState struct {
Registry *registry.Registry
Self types.Node
}
func New(r *registry.Registry, n types.Node) *RuntimeState {
return &RuntimeState{
Registry: r,
Self: n,
}
}

View file

@ -0,0 +1,5 @@
package transport
type Caller interface {
Call(host string, path string, data any, out any) error
}

View file

@ -0,0 +1,16 @@
package codec
type Codec interface {
Decode(data []byte, out any) error
Encode(data any) ([]byte, error)
}
func Decode[T any](c Codec, data []byte) (T, error) {
var out T
err := c.Decode(data, &out)
return out, err
}
func Encode[T any](c Codec, data T) ([]byte, error) {
return c.Encode(data)
}

View file

@ -0,0 +1,15 @@
package codec
import "encoding/json"
type jsonCodec struct{}
var JSON = jsonCodec{}
func (jsonCodec) Decode(data []byte, out any) error {
return json.Unmarshal(data, out)
}
func (jsonCodec) Encode(data any) ([]byte, error) {
return json.Marshal(data)
}

View file

@ -0,0 +1,20 @@
package dns
import (
"git.wzray.com/homelab/hivemind/internal/transport"
"git.wzray.com/homelab/hivemind/internal/types"
)
type Client struct {
caller transport.Caller
}
func New(caller transport.Caller) *Client {
return &Client{
caller: caller,
}
}
func (c *Client) Callback(endpoint string, state types.HostState) (bool, error) {
return callbackRoute.Call(c.caller, endpoint, state)
}

View file

@ -0,0 +1,14 @@
package dns
import (
"git.wzray.com/homelab/hivemind/internal/transport"
"git.wzray.com/homelab/hivemind/internal/types"
)
type DnsHandlers interface {
Callback(types.HostState) (bool, error)
}
func Register(registrator transport.Registrator, h DnsHandlers) {
callbackRoute.Register(registrator, h.Callback)
}

View file

@ -0,0 +1,10 @@
package dns
import (
"git.wzray.com/homelab/hivemind/internal/transport"
"git.wzray.com/homelab/hivemind/internal/types"
)
var (
callbackRoute = transport.NewRoute[types.HostState, bool]("/dns/callback")
)

View file

@ -0,0 +1,9 @@
package transport
func WithoutInput[T any](f func() (T, error)) func(struct{}) (T, error) {
return func(s struct{}) (T, error) { return f() }
}
func WithoutOutput[T any](f func(T) error) func(T) (struct{}, error) {
return func(t T) (struct{}, error) { return struct{}{}, f(t) }
}

View file

@ -0,0 +1,24 @@
package host
import (
"git.wzray.com/homelab/hivemind/internal/transport"
"git.wzray.com/homelab/hivemind/internal/types"
)
type Client struct {
caller transport.Caller
}
func New(caller transport.Caller) *Client {
return &Client{
caller: caller,
}
}
func (c *Client) Dns(endpoint string) (types.HostState, error) {
return dnsRoute.CallNoInput(c.caller, endpoint)
}
func (c *Client) Nameserver(endpoint string) (types.HostState, error) {
return nsRoute.CallNoInput(c.caller, endpoint)
}

View file

@ -0,0 +1,16 @@
package host
import (
"git.wzray.com/homelab/hivemind/internal/transport"
"git.wzray.com/homelab/hivemind/internal/types"
)
type HostHandlers interface {
Dns() (types.HostState, error)
Nameserver() (types.HostState, error)
}
func Register(registrator transport.Registrator, h HostHandlers) {
dnsRoute.Register(registrator, transport.WithoutInput(h.Dns))
nsRoute.Register(registrator, transport.WithoutInput(h.Nameserver))
}

View file

@ -0,0 +1,11 @@
package host
import (
"git.wzray.com/homelab/hivemind/internal/transport"
"git.wzray.com/homelab/hivemind/internal/types"
)
var (
dnsRoute = transport.NewRoute[struct{}, types.HostState]("/host/dns")
nsRoute = transport.NewRoute[struct{}, types.HostState]("/host/ns")
)

View file

@ -0,0 +1,40 @@
package master
import (
"git.wzray.com/homelab/hivemind/internal/transport"
"git.wzray.com/homelab/hivemind/internal/types"
)
type Client struct {
caller transport.Caller
}
func New(caller transport.Caller) *Client {
return &Client{
caller: caller,
}
}
func (c *Client) Heartbeat(endpoint string, n types.Node) (types.Nodes, error) {
return heartbeatRoute.Call(c.caller, endpoint, n)
}
func (c *Client) Join(endpoint string, n types.Node) (types.Nodes, error) {
return joinRoute.Call(c.caller, endpoint, n)
}
func (c *Client) Leave(endpoint string, n types.Node) error {
return leaveRoute.CallNoOutput(c.caller, endpoint, n)
}
func (c *Client) EventHeartbeat(endpoint string, n types.Node) (types.Nodes, error) {
return eventHeartbeatRoute.Call(c.caller, endpoint, n)
}
func (c *Client) EventJoin(endpoint string, n types.Node) (types.Nodes, error) {
return eventJoinRoute.Call(c.caller, endpoint, n)
}
func (c *Client) EventLeave(endpoint string, n types.Node) error {
return eventLeaveRoute.CallNoOutput(c.caller, endpoint, n)
}

View file

@ -0,0 +1,26 @@
package master
import (
"git.wzray.com/homelab/hivemind/internal/transport"
"git.wzray.com/homelab/hivemind/internal/types"
)
type MasterHandlers interface {
Heartbeat(types.Node) (types.Nodes, error)
Join(types.Node) (types.Nodes, error)
Leave(types.Node) error
EventHeartbeat(types.Node) (types.Nodes, error)
EventJoin(types.Node) (types.Nodes, error)
EventLeave(types.Node) error
}
func Register(registrator transport.Registrator, h MasterHandlers) {
heartbeatRoute.Register(registrator, h.Heartbeat)
joinRoute.Register(registrator, h.Join)
leaveRoute.Register(registrator, transport.WithoutOutput(h.Leave))
eventHeartbeatRoute.Register(registrator, h.EventHeartbeat)
eventJoinRoute.Register(registrator, h.EventJoin)
eventLeaveRoute.Register(registrator, transport.WithoutOutput(h.EventLeave))
}

View file

@ -0,0 +1,16 @@
package master
import (
"git.wzray.com/homelab/hivemind/internal/transport"
"git.wzray.com/homelab/hivemind/internal/types"
)
var (
heartbeatRoute = transport.NewRoute[types.Node, types.Nodes]("/master/heartbeat")
joinRoute = transport.NewRoute[types.Node, types.Nodes]("/master/join")
leaveRoute = transport.NewRoute[types.Node, struct{}]("/master/leave")
eventHeartbeatRoute = transport.NewRoute[types.Node, types.Nodes]("/master/event/heartbeat")
eventJoinRoute = transport.NewRoute[types.Node, types.Nodes]("/master/event/join")
eventLeaveRoute = transport.NewRoute[types.Node, struct{}]("/master/event/leave")
)

View file

@ -0,0 +1,17 @@
package node
import (
"git.wzray.com/homelab/hivemind/internal/transport"
)
type Client struct {
caller transport.Caller
}
func New(caller transport.Caller) *Client {
return &Client{caller: caller}
}
func (c *Client) Healthcheck(endpoint string) (string, error) {
return healthcheckRoute.CallNoInput(c.caller, endpoint)
}

View file

@ -0,0 +1,13 @@
package node
import (
"git.wzray.com/homelab/hivemind/internal/transport"
)
type NodeHandlers interface {
Healthcheck() (string, error)
}
func Register(registrator transport.Registrator, h NodeHandlers) {
healthcheckRoute.Register(registrator, transport.WithoutInput(h.Healthcheck))
}

View file

@ -0,0 +1,9 @@
package node
import (
"git.wzray.com/homelab/hivemind/internal/transport"
)
var (
healthcheckRoute = transport.NewRoute[struct{}, string]("/node/healthcheck")
)

View file

@ -0,0 +1,22 @@
package transport
import (
"git.wzray.com/homelab/hivemind/internal/transport/codec"
)
type Handler struct {
path string
handler func(codec.Codec, []byte) ([]byte, error)
}
func (h Handler) Path() string {
return h.path
}
func (h Handler) Handle(c codec.Codec, v []byte) ([]byte, error) {
return h.handler(c, v)
}
type Registrator interface {
Register(endpoint Handler)
}

View file

@ -0,0 +1,65 @@
package transport
import (
"fmt"
"git.wzray.com/homelab/hivemind/internal/transport/codec"
)
type route[In, Out any] struct {
path string
}
func NewRoute[In, Out any](path string) route[In, Out] {
return route[In, Out]{
path: path,
}
}
func routeToHandler[In, Out any](r route[In, Out], handler func(In) (Out, error)) Handler {
return Handler{
path: r.path,
handler: func(c codec.Codec, b []byte) ([]byte, error) {
data, err := codec.Decode[In](c, b)
if err != nil {
return nil, fmt.Errorf("unable to decode body: %w", err)
}
out, err := handler(data)
if err != nil {
return nil, fmt.Errorf("error while handling request: %w", err)
}
raw, err := codec.Encode(c, out)
if err != nil {
return nil, fmt.Errorf("unable to encode body: %w", err)
}
return raw, nil
},
}
}
func (e route[In, Out]) Call(caller Caller, host string, data In) (Out, error) {
var out Out
err := caller.Call(host, e.path, data, &out)
return out, err
}
func (e route[In, Out]) CallNoInput(caller Caller, host string) (Out, error) {
var out Out
err := caller.Call(host, e.path, struct{}{}, &out)
return out, err
}
func (e route[In, Out]) CallNoOutput(caller Caller, host string, data In) error {
return caller.Call(host, e.path, data, nil)
}
func (e route[In, Out]) Path() string {
return e.path
}
func (e route[In, Out]) Register(r Registrator, h func(In) (Out, error)) {
r.Register(routeToHandler(e, h))
}

View file

@ -2,13 +2,15 @@ package types
import "fmt"
type Nodes map[string]Node
// TODO: consider moving this type back to registry
type Node struct {
Hostname string `json:"hostname"`
Address string `json:"address"`
Port int `json:"port"`
Roles []Role `json:"roles"`
Endpoint string `json:"endpoint"`
Endpoint string `json:"endpoint"` // TODO: make endpoint into a separate type
}
func NewNode(

View file

@ -1,78 +1,60 @@
package types
import (
"encoding/json"
"fmt"
"net/http"
)
// TODO: split this up
type Path string
func (p Path) String() string {
return string(p)
}
const (
PathMasterJoin Path = "/master/join"
PathMasterLeave Path = "/master/leave"
PathMasterKeepalive Path = "/master/keepalive"
PathNodeHealthcheck Path = "/node/healthcheck"
PathNodeJoin Path = "/node/join"
PathNodeLeave Path = "/node/leave"
PathDnsCallback Path = "/dns/callback"
PathHostCallback Path = "/host/callback"
PathHostDns Path = "/host/dns"
PathHostNs Path = "/host/ns"
)
type Response[T any] struct {
Ok bool `json:"ok"`
Data T `json:"data,omitempty"`
Err string `json:"err,omitempty"`
}
type Route interface {
Path() string
Handle([]byte) (any, error)
}
type endpoint struct {
path string
handler func([]byte) (any, error)
}
func (e endpoint) Path() string { return e.path }
func (e endpoint) Handle(v []byte) (any, error) { return e.handler(v) }
func PostEndpoint[T any, V any](path Path, handler func(T) (V, error)) Route {
return endpoint{
path: "POST " + path.String(),
handler: func(a []byte) (any, error) {
var r T
if err := json.Unmarshal(a, &r); err != nil {
return nil, fmt.Errorf("unable to unmarshal json: %w", err)
}
return handler(r)
},
}
}
func GetEndpoint[T any](path Path, handler func() (T, error)) Route {
return endpoint{
path: "GET " + path.String(),
handler: func(a []byte) (any, error) {
return handler()
},
}
}
type Registrator interface {
Register(endpoint Route)
RegisterRaw(method string, pattern string, handler func(http.ResponseWriter, *http.Request))
}
// type Path string
//
// func (p Path) String() string {
// return string(p)
// }
//
// const (
// PathMasterJoin Path = "/master/join"
// PathMasterLeave Path = "/master/leave"
// PathMasterKeepalive Path = "/master/keepalive"
// PathMasterEventJoin Path = "/master/event_join"
// PathMasterEventLeave Path = "/master/event_leave"
// PathMasterEventKeepalive Path = "/master/event_keepalive"
//
// PathNodeHealthcheck Path = "/node/healthcheck"
//
// PathDnsCallback Path = "/dns/callback"
//
// PathHostCallback Path = "/host/callback"
// PathHostDns Path = "/host/dns"
// PathHostNs Path = "/host/ns"
// )
//
// type Route interface {
// Path() string
// Handle([]byte) (any, error)
// }
//
// type endpoint struct {
// path string
// handler func([]byte) (any, error)
// }
//
// func (e endpoint) Path() string { return e.path }
//
// func (e endpoint) Handle(v []byte) (any, error) { return e.handler(v) }
//
// func PostEndpoint[T any, V any](path Path, handler func(T) (V, error)) Route {
// return endpoint{
// path: "POST " + path.String(),
// handler: func(a []byte) (any, error) {
// var r T
// if err := json.Unmarshal(a, &r); err != nil {
// return nil, fmt.Errorf("unable to unmarshal json: %w", err)
// }
// return handler(r)
// },
// }
// }
//
// func GetEndpoint[T any](path Path, handler func() (T, error)) Route {
// return endpoint{
// path: "GET " + path.String(),
// handler: func(a []byte) (any, error) {
// return handler()
// },
// }
// }

View file

@ -10,20 +10,18 @@ import (
"net/url"
"time"
"git.wzray.com/homelab/hivemind/internal/types"
"git.wzray.com/homelab/hivemind/internal/web"
"git.wzray.com/homelab/hivemind/internal/web/middleware"
)
type client struct {
type Client struct {
http *http.Client
middleware middleware.Middleware
}
var defaultClient *client
const timeout = time.Duration(2) * time.Second
func (c *client) makeRequest(method string, host string, path types.Path, data any, out any) error {
func (c *Client) Call(host string, path string, data any, out any) error {
var body io.Reader
if data != nil {
raw, err := json.Marshal(data)
@ -36,10 +34,10 @@ func (c *client) makeRequest(method string, host string, path types.Path, data a
uri := (&url.URL{
Scheme: "http",
Host: host,
Path: path.String(),
Path: path,
}).String()
r, err := http.NewRequest(method, uri, body)
r, err := http.NewRequest("POST", uri, body)
if err != nil {
return fmt.Errorf("build http request: %w", err)
}
@ -52,60 +50,41 @@ func (c *client) makeRequest(method string, host string, path types.Path, data a
return fmt.Errorf("apply middleware: %w", err)
}
resp, err := c.http.Do(r)
httpResponse, err := c.http.Do(r)
if err != nil {
return fmt.Errorf("send request: %w", err)
}
defer httpResponse.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
b, _ := io.ReadAll(resp.Body)
return fmt.Errorf("http %d: %s", resp.StatusCode, string(b))
if httpResponse.StatusCode < 200 || httpResponse.StatusCode >= 300 {
b, _ := io.ReadAll(httpResponse.Body)
return fmt.Errorf("http %d: %s", httpResponse.StatusCode, string(b))
}
defer resp.Body.Close()
if out != nil {
if err := json.NewDecoder(resp.Body).Decode(out); err != nil {
return fmt.Errorf("decode body: %w", err)
var resp web.Response[json.RawMessage]
if err := json.NewDecoder(httpResponse.Body).Decode(&resp); err != nil {
return fmt.Errorf("decode response wrapper: %w", err)
}
if !resp.Ok {
return fmt.Errorf("error on the remote: %w", errors.New(resp.Err))
}
if err := json.Unmarshal(resp.Data, out); err != nil {
return fmt.Errorf("decode response body: %w", err)
}
}
io.Copy(io.Discard, resp.Body)
io.Copy(io.Discard, httpResponse.Body)
return nil
}
func Init(mw middleware.Middleware) {
if defaultClient != nil {
panic("web.client: Init called twice")
}
defaultClient = &client{
func New(middleware middleware.Middleware) *Client {
return &Client{
http: &http.Client{
Timeout: timeout,
},
middleware: mw,
middleware: middleware,
}
}
func request[Out any, In any](method string, host string, path types.Path, data In) (*Out, error) {
out := &types.Response[Out]{}
err := defaultClient.makeRequest(method, host, path, data, out)
if err != nil {
return nil, err
}
if !out.Ok {
return nil, errors.New(out.Err)
}
return &out.Data, err
}
// TODO: out should not be a pointer
func Get[Out any](host string, path types.Path) (*Out, error) {
return request[Out, any](http.MethodGet, host, path, nil)
}
// TODO: out should not be a pointer
func Post[Out any, In any](host string, path types.Path, data In) (*Out, error) {
return request[Out](http.MethodPost, host, path, data)
}

View file

@ -5,7 +5,8 @@ import (
"io"
"net/http"
"git.wzray.com/homelab/hivemind/internal/types"
"git.wzray.com/homelab/hivemind/internal/transport"
"git.wzray.com/homelab/hivemind/internal/transport/codec"
"git.wzray.com/homelab/hivemind/internal/web/middleware"
"github.com/rs/zerolog/log"
)
@ -15,7 +16,7 @@ type Server struct {
httpServer http.Server
}
func NewServer(addr string, middleware middleware.Middleware) *Server {
func New(addr string, middleware middleware.Middleware) *Server {
mux := http.NewServeMux()
s := &Server{
mux: mux,
@ -35,7 +36,7 @@ func (s *Server) Shutdown(ctx context.Context) error {
return s.httpServer.Shutdown(ctx)
}
func (s *Server) handleFunc(route types.Route) func(w http.ResponseWriter, r *http.Request) {
func (s *Server) handleFunc(route transport.Handler) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
log.Debug(). // TODO: make this a middleware
Str("method", r.Method).
@ -46,35 +47,36 @@ func (s *Server) handleFunc(route types.Route) func(w http.ResponseWriter, r *ht
w.Header().Set("Content-Type", "application/json; charset=utf-8")
body, err := io.ReadAll(r.Body)
raw, err := io.ReadAll(r.Body)
if err != nil {
w.Write(fail("read request body: %v", err))
log.Err(err).Msg("unable to read request body")
return
}
raw, err := route.Handle(body)
resp, err := route.Handle(codec.JSON, raw)
if err != nil {
w.Write(fail("handle request: %v", err))
log.Err(err).Msg("unable to handle request")
return
}
data, err := ok(raw)
payload, err := ok(resp)
if err != nil {
w.Write(fail("marshal response: %v", err))
log.Err(err).Msg("unable to marshal response")
return
}
w.Write(data)
w.Write(payload)
}
}
func (s *Server) Register(endpoint types.Route) {
func (s *Server) Register(endpoint transport.Handler) {
s.mux.HandleFunc(endpoint.Path(), s.handleFunc(endpoint))
}
// TODO: i don't think that I need this?
func (s *Server) RegisterRaw(method string, pattern string, handler func(http.ResponseWriter, *http.Request)) {
s.mux.HandleFunc(method+" "+pattern, handler)
}

View file

@ -4,20 +4,20 @@ import (
"encoding/json"
"fmt"
"git.wzray.com/homelab/hivemind/internal/types"
"git.wzray.com/homelab/hivemind/internal/web"
)
func fail(format string, a ...any) []byte {
r, _ := json.Marshal(types.Response[string]{
r, _ := json.Marshal(web.Response[string]{
Ok: false,
Err: fmt.Sprintf(format, a...),
})
return r
}
func ok[T any](data T) ([]byte, error) {
return json.Marshal(types.Response[T]{
func ok(data []byte) ([]byte, error) {
return json.Marshal(web.Response[json.RawMessage]{
Ok: true,
Data: data,
Data: json.RawMessage(data),
})
}

7
internal/web/types.go Normal file
View file

@ -0,0 +1,7 @@
package web
type Response[T any] struct {
Ok bool `json:"ok"`
Data T `json:"data,omitempty"`
Err string `json:"err,omitempty"`
}