1
0
Fork 0

Compare commits

...

3 commits

43 changed files with 914 additions and 412 deletions

View file

@ -4,6 +4,7 @@ changed="$(gofmt -d .)"
[ -n "$changed" ] && { [ -n "$changed" ] && {
echo 'Some files are not formatted' echo 'Some files are not formatted'
delta <<< "$changed" delta <<< "$changed"
go fmt ./...
exit 1 exit 1
} }
go vet ./... go vet ./...

View file

@ -15,6 +15,7 @@ RUN --mount=type=cache,target="/cache/go-build" mkdir -p /cache/go-build; make h
FROM alpine FROM alpine
EXPOSE 56714/tcp EXPOSE 56714/tcp
EXPOSE 56715/tcp
WORKDIR /app WORKDIR /app
VOLUME /conf VOLUME /conf

55
TODO.md
View file

@ -1,3 +1,58 @@
# Background
Some background first:
the node can have multiple roles
this includes (but not limited to)
* Host (can generate events)
* DNS (can consume the events and act on them)
* Something else that I might come up with (the architecture has to be expandable)
# Control pane (3+ nodes)
* Quorum
* Consists of $n / 2 + 1$ nodes
* Cluster is considered "degraded" if no quorum can be created
* Stores an event log
* **Only** leader can append to the log (with quorum permission)
* Membership authority
* No joins without quorum approval
* Leaves are not propagated without quorum
* Manages epoch (useful for GC)
* Node $N$ with $N.epoch != cluster.epoch$ can **not** join the cluster, and has to re-join (bootstrap)
* Can (but doesn't have to) be a bootstrap point
# Membership
* Membership is managed though SWIM
* Each node contains a small slice of the entire network
## Joining
Each node has an array of roles:
1. That it performs
2. That it requires to operate (can be moved out to the master, or the shared type)
3. That it needs for bootstrapping (analogous to 2.)
Node can join via a master or via other nodes
When a node requests to join, the responder makes a request to the CP and asks for a permission to add this node
* If master allows
1. The node gets a membership digest from the CP.
2. The node *can* be brought up to speed using it's neighbors from 1.
3. Node join event gets broadcasted over SWIM gossiping
* Otherwise, nothing happens
# Host node
## Bootstrap
Host node requests `dns` nodes on join (and other node types, such as `ns`, `nginx`, etc... They should really be called something like `dns_processor`, and the internals (how it processes the dns) should not be visible to the cluster, but that's a task for a future me)
When a new update occurs, it sends the update to *some* `dns` hosts.
# DNS node
## Bootstrap
First, it gets all the available `hosts` from the CP
Then it requests their configs and sets map[hostName]seq accordingly
## Simple join (when other nodes exist)
It requests it's config from other nodes and that's it
<!-- TODO: finish the TODO file lol -->
# Minor To-Do
- auth middleware lol
- move request logging out of the request handling into a middleware
- nginx role - nginx role
- think about choosing the master for the keepalive message (should be somewhat load-balanced) - think about choosing the master for the keepalive message (should be somewhat load-balanced)
- hivemind lite should not just print `hivemind-lite` lol - hivemind lite should not just print `hivemind-lite` lol

View file

@ -10,6 +10,7 @@ import (
"strings" "strings"
"syscall" "syscall"
"git.wzray.com/homelab/hivemind/internal/app"
"git.wzray.com/homelab/hivemind/internal/config" "git.wzray.com/homelab/hivemind/internal/config"
"git.wzray.com/homelab/hivemind/internal/registry" "git.wzray.com/homelab/hivemind/internal/registry"
"git.wzray.com/homelab/hivemind/internal/roles" "git.wzray.com/homelab/hivemind/internal/roles"
@ -17,11 +18,14 @@ import (
"git.wzray.com/homelab/hivemind/internal/roles/host" "git.wzray.com/homelab/hivemind/internal/roles/host"
"git.wzray.com/homelab/hivemind/internal/roles/master" "git.wzray.com/homelab/hivemind/internal/roles/master"
"git.wzray.com/homelab/hivemind/internal/roles/node" "git.wzray.com/homelab/hivemind/internal/roles/node"
"git.wzray.com/homelab/hivemind/internal/state" dns_transport "git.wzray.com/homelab/hivemind/internal/transport/dns"
host_transport "git.wzray.com/homelab/hivemind/internal/transport/host"
master_transport "git.wzray.com/homelab/hivemind/internal/transport/master"
node_transport "git.wzray.com/homelab/hivemind/internal/transport/node"
"git.wzray.com/homelab/hivemind/internal/types" "git.wzray.com/homelab/hivemind/internal/types"
"git.wzray.com/homelab/hivemind/internal/web/client" web_client "git.wzray.com/homelab/hivemind/internal/web/client"
"git.wzray.com/homelab/hivemind/internal/web/middleware" web_middleware "git.wzray.com/homelab/hivemind/internal/web/middleware"
"git.wzray.com/homelab/hivemind/internal/web/server" web_server "git.wzray.com/homelab/hivemind/internal/web/server"
"github.com/rs/zerolog" "github.com/rs/zerolog"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
"github.com/rs/zerolog/pkgerrors" "github.com/rs/zerolog/pkgerrors"
@ -87,17 +91,22 @@ func main() {
filestore.EnsureExists() filestore.EnsureExists()
registry := registry.New(filestore, self) registry := registry.New(filestore, self)
state := state.New(registry, self) var builder web_middleware.MiddlewareBuilder
nodeRole := node.New(state, configuration.Node)
var builder middleware.MiddlewareBuilder
middlewares := builder.Prepare() middlewares := builder.Prepare()
client.Init(middlewares) client := web_client.New(middlewares)
listenAddr := fmt.Sprintf("%v:%v", configuration.Node.ListenOn, configuration.Node.Port) listenAddr := fmt.Sprintf("%v:%v", configuration.Node.ListenOn, configuration.Node.Port)
server := server.NewServer(listenAddr, middlewares) server := web_server.New(listenAddr, middlewares)
state := app.NewState(registry, self, app.Clients{
Master: master_transport.New(client),
DNS: dns_transport.New(client),
Host: host_transport.New(client),
Node: node_transport.New(client),
})
nodeRole := node.New(state, configuration.Node)
roles := make([]roles.Role, 0) roles := make([]roles.Role, 0)
roles = append(roles, nodeRole) roles = append(roles, nodeRole)

6
go.mod
View file

@ -2,10 +2,12 @@ module git.wzray.com/homelab/hivemind
go 1.25.5 go 1.25.5
require github.com/rs/zerolog v1.34.0 require (
github.com/rs/zerolog v1.34.0
github.com/BurntSushi/toml v1.6.0
)
require ( require (
github.com/BurntSushi/toml v1.6.0 // indirect
github.com/mattn/go-colorable v0.1.14 // indirect github.com/mattn/go-colorable v0.1.14 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect github.com/mattn/go-isatty v0.0.20 // indirect
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect

35
internal/app/state.go Normal file
View file

@ -0,0 +1,35 @@
package app
import (
"git.wzray.com/homelab/hivemind/internal/registry"
"git.wzray.com/homelab/hivemind/internal/transport/dns"
"git.wzray.com/homelab/hivemind/internal/transport/host"
"git.wzray.com/homelab/hivemind/internal/transport/master"
"git.wzray.com/homelab/hivemind/internal/transport/node"
"git.wzray.com/homelab/hivemind/internal/types"
)
type Clients struct {
Master *master.Client
DNS *dns.Client
Host *host.Client
Node *node.Client
}
type State struct {
Registry *registry.Registry
Self types.Node
Clients Clients
}
func NewState(
registry *registry.Registry,
self types.Node,
clients Clients,
) *State {
return &State{
Registry: registry,
Self: self,
Clients: clients,
}
}

View file

@ -14,5 +14,8 @@ var DefaultConfig = Config{
BackoffCount: 4, BackoffCount: 4,
NodeTimeout: 120, NodeTimeout: 120,
}, },
Host: HostConfig{
ListenAddress: "0.0.0.0:56715",
},
}, },
} }

View file

@ -12,6 +12,7 @@ type HostConfig struct {
LocalAddress string `toml:"local_address"` LocalAddress string `toml:"local_address"`
InternalEntrypoint string `toml:"internal_entrypoint"` InternalEntrypoint string `toml:"internal_entrypoint"`
ExternalEntrypoint string `toml:"external_entrypoint"` ExternalEntrypoint string `toml:"external_entrypoint"`
ListenAddress string `toml:"listen_address"`
baseRoleConfig baseRoleConfig
} }
@ -40,6 +41,10 @@ func (c HostConfig) Validate() error {
return errors.New("missing external entrypoint") return errors.New("missing external entrypoint")
} }
if c.ListenAddress == "" {
return errors.New("missing listen address")
}
return nil return nil
} }
@ -67,4 +72,8 @@ func (c *HostConfig) Merge(other HostConfig) {
if other.ExternalEntrypoint != "" { if other.ExternalEntrypoint != "" {
c.ExternalEntrypoint = other.ExternalEntrypoint c.ExternalEntrypoint = other.ExternalEntrypoint
} }
if other.ListenAddress != "" {
c.ListenAddress = other.ListenAddress
}
} }

View file

@ -53,8 +53,12 @@ func (c NodeConfig) Validate() error {
return errors.New("invalid keepalive_interval") return errors.New("invalid keepalive_interval")
} }
if c.ListenOn == "" {
return errors.New("missing listen_on")
}
if net.ParseIP(c.ListenOn) == nil { if net.ParseIP(c.ListenOn) == nil {
return errors.New("invalid listen_on") return fmt.Errorf("invalid listen_on: %v", c.ListenOn)
} }
return nil return nil

View file

@ -8,22 +8,23 @@ import (
"strings" "strings"
"sync" "sync"
"git.wzray.com/homelab/hivemind/internal/app"
"git.wzray.com/homelab/hivemind/internal/config" "git.wzray.com/homelab/hivemind/internal/config"
"git.wzray.com/homelab/hivemind/internal/state" "git.wzray.com/homelab/hivemind/internal/transport"
"git.wzray.com/homelab/hivemind/internal/transport/dns"
"git.wzray.com/homelab/hivemind/internal/types" "git.wzray.com/homelab/hivemind/internal/types"
"git.wzray.com/homelab/hivemind/internal/web/client"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
const hostsDir = "/etc/hosts.d/" const hostsDir = "/etc/hosts.d/"
type Role struct { type Role struct {
state *state.RuntimeState state *app.State
config config.DnsConfig config config.DnsConfig
group sync.WaitGroup group sync.WaitGroup
} }
func New(state *state.RuntimeState, config config.DnsConfig) *Role { func New(state *app.State, config config.DnsConfig) *Role {
r := &Role{ r := &Role{
state: state, state: state,
config: config, config: config,
@ -32,28 +33,6 @@ func New(state *state.RuntimeState, config config.DnsConfig) *Role {
return r return r
} }
func (r *Role) updateDnsmasq(filename string, data []byte) error {
if err := os.WriteFile(filename, data, 0644); err != nil {
return fmt.Errorf("write endpoint file %q: %w", filename, err)
}
if err := r.reload(); err != nil {
return fmt.Errorf("reload dnsmasq: %w", err)
}
return nil
}
func parseState(state types.HostState) (string, []byte) {
var builder strings.Builder
for _, d := range state.Domains {
builder.WriteString(fmt.Sprintf("%s %s\n", state.Address, d))
}
return hostsDir + state.Hostname, []byte(builder.String())
}
func (r *Role) OnStartup(ctx context.Context) error { func (r *Role) OnStartup(ctx context.Context) error {
r.group.Go(func() { r.group.Go(func() {
r.syncFromRegistry() r.syncFromRegistry()
@ -74,15 +53,46 @@ func (r *Role) OnStartup(ctx context.Context) error {
return nil return nil
} }
func (r *Role) OnShutdown() error {
r.group.Wait()
return nil
}
func (r *Role) RegisterHandlers(rg transport.Registrator) {
dns.Register(rg, r)
}
func (r *Role) Callback(state types.HostState) (bool, error) {
filename, data := parseState(state)
if err := r.updateDnsmasq(filename, data); err != nil {
return false, err
}
return true, nil
}
func (r *Role) updateDnsmasq(filename string, data []byte) error {
if err := os.WriteFile(filename, data, 0644); err != nil {
return fmt.Errorf("write endpoint file %q: %w", filename, err)
}
if err := r.reload(); err != nil {
return fmt.Errorf("reload dnsmasq: %w", err)
}
return nil
}
func (r *Role) syncFromRegistry() { func (r *Role) syncFromRegistry() {
for _, n := range r.state.Registry.ByRole(types.HostRole) { for _, n := range r.state.Registry.ByRole(types.HostRole) {
state, err := client.Get[types.HostState](n.Endpoint, types.PathHostDns) state, err := r.state.Clients.Host.Dns(n.Endpoint)
if err != nil { if err != nil {
log.Warn().Str("name", n.Hostname).Err(err).Msg("unable to get host config") log.Warn().Str("name", n.Hostname).Err(err).Msg("unable to get host config")
continue continue
} }
filename, data := parseState(*state) filename, data := parseState(state)
if err := r.updateDnsmasq(filename, data); err != nil { if err := r.updateDnsmasq(filename, data); err != nil {
log.Warn().Str("name", n.Hostname).Err(err).Msg("unable to update dnsmasq") log.Warn().Str("name", n.Hostname).Err(err).Msg("unable to update dnsmasq")
continue continue
@ -90,11 +100,6 @@ func (r *Role) syncFromRegistry() {
} }
} }
func (r *Role) OnShutdown() error {
r.group.Wait()
return nil
}
func (r *Role) reload() error { func (r *Role) reload() error {
var err error var err error
@ -107,16 +112,12 @@ func (r *Role) reload() error {
return err return err
} }
func (r *Role) onCallback(state types.HostState) (bool, error) { func parseState(state types.HostState) (string, []byte) {
filename, data := parseState(state) var builder strings.Builder
if err := r.updateDnsmasq(filename, data); err != nil { for _, d := range state.Domains {
return false, err builder.WriteString(fmt.Sprintf("%s %s\n", state.Address, d))
} }
return true, nil return hostsDir + state.Hostname, []byte(builder.String())
}
func (r *Role) RegisterHandlers(rg types.Registrator) {
rg.Register(types.PostEndpoint(types.PathDnsCallback, r.onCallback))
} }

View file

@ -0,0 +1,96 @@
package host
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"git.wzray.com/homelab/hivemind/internal/config"
"github.com/rs/zerolog/log"
)
type TraefikListener interface {
OnTraefikUpdate(traefikResponse)
}
type TraefikGateway struct {
client *http.Client
server *http.Server
listener TraefikListener
address url.URL
domain string
}
func NewTraefikGateway(cfg config.HostConfig, listener TraefikListener) *TraefikGateway {
mux := http.NewServeMux()
gw := &TraefikGateway{
client: &http.Client{},
server: &http.Server{
Addr: cfg.ListenAddress,
Handler: mux,
},
listener: listener,
address: url.URL{
Scheme: "http",
Host: cfg.LocalAddress,
},
domain: cfg.Domain,
}
mux.HandleFunc("/callback", gw.onCallback)
return gw
}
func (g *TraefikGateway) Listen() error {
return g.server.ListenAndServe()
}
func (g *TraefikGateway) Shutdown(ctx context.Context) error {
return g.server.Shutdown(ctx)
}
func (g *TraefikGateway) GetRawData() (*traefikResponse, error) {
var raw TraefikRawResponse
url := g.address
url.Path = "/api/rawdata"
req := http.Request{
Method: http.MethodGet,
URL: &url,
}
req.Host = g.domain
r, err := g.client.Do(&req)
if err != nil {
return nil, fmt.Errorf("make request: %w", err)
}
defer r.Body.Close()
if err := json.NewDecoder(r.Body).Decode(&raw); err != nil {
return nil, fmt.Errorf("unmarshal body: %w", err)
}
out := parseTraefikResponse(raw)
return &out, nil
}
func (g *TraefikGateway) onCallback(w http.ResponseWriter, req *http.Request) {
var raw TraefikRawResponse
if err := json.NewDecoder(req.Body).Decode(&raw); err != nil {
w.WriteHeader(http.StatusInternalServerError)
log.Err(err).Msg("unable to decode traefik callback data")
return
}
resp := parseTraefikResponse(raw)
if g.listener != nil {
g.listener.OnTraefikUpdate(resp)
}
w.Write([]byte("OK"))
}

View file

@ -2,36 +2,37 @@ package host
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"net/http"
"slices" "slices"
"sync" "sync"
"git.wzray.com/homelab/hivemind/internal/app"
"git.wzray.com/homelab/hivemind/internal/config" "git.wzray.com/homelab/hivemind/internal/config"
"git.wzray.com/homelab/hivemind/internal/state" "git.wzray.com/homelab/hivemind/internal/transport"
"git.wzray.com/homelab/hivemind/internal/transport/host"
"git.wzray.com/homelab/hivemind/internal/types" "git.wzray.com/homelab/hivemind/internal/types"
"git.wzray.com/homelab/hivemind/internal/web/client"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
type Role struct { type Role struct {
state *state.RuntimeState state *app.State
config config.HostConfig config config.HostConfig
client *traefikClient gateway *TraefikGateway
tasksGroup sync.WaitGroup tasksGroup sync.WaitGroup
externalDomains []string // TODO: i don't like hardcoding external/internal logic here externalDomains []string // TODO: i don't like hardcoding external/internal logic here
internalDomains []string internalDomains []string
} }
func New(state *state.RuntimeState, config config.HostConfig) *Role { func New(state *app.State, config config.HostConfig) *Role {
return &Role{ r := &Role{
client: newClient(config.Domain, config.LocalAddress),
state: state, state: state,
config: config, config: config,
} }
r.gateway = NewTraefikGateway(config, r)
return r
} }
func (r *Role) sendUpdate(domains []string, role types.Role) { func (r *Role) sendUpdate(domains []string, role types.Role) {
@ -45,7 +46,7 @@ func (r *Role) sendUpdate(domains []string, role types.Role) {
r.tasksGroup.Go(func() { r.tasksGroup.Go(func() {
logger := log.With().Str("name", node.Hostname).Logger() logger := log.With().Str("name", node.Hostname).Logger()
logger.Debug().Msg("sending update") logger.Debug().Msg("sending update")
if _, err := client.Post[any](node.Endpoint, types.PathDnsCallback, state); err != nil { if _, err := r.state.Clients.DNS.Callback(node.Endpoint, state); err != nil {
logger.Warn().Err(err).Msg("unable to send dns info") logger.Warn().Err(err).Msg("unable to send dns info")
} else { } else {
logger.Debug().Msg("update sent") logger.Debug().Msg("update sent")
@ -54,7 +55,7 @@ func (r *Role) sendUpdate(domains []string, role types.Role) {
} }
} }
func (r *Role) mutateState(resp traefikResponse) { func (r *Role) OnTraefikUpdate(resp traefikResponse) {
newInternal := resp.Domains(r.config.InternalEntrypoint) newInternal := resp.Domains(r.config.InternalEntrypoint)
newExternal := resp.Domains(r.config.ExternalEntrypoint) newExternal := resp.Domains(r.config.ExternalEntrypoint)
@ -71,20 +72,7 @@ func (r *Role) mutateState(resp traefikResponse) {
} }
} }
func (r *Role) onCallback(w http.ResponseWriter, req *http.Request) { func (r *Role) Dns() (types.HostState, error) {
var resp traefikResponse
if err := json.NewDecoder(req.Body).Decode(&resp); err != nil {
w.WriteHeader(http.StatusInternalServerError)
log.Err(err).Msg("unable to decode traefik callback data")
return
}
r.mutateState(resp)
w.Write([]byte("OK"))
}
func (r *Role) getInternal() (types.HostState, error) {
return types.HostState{ return types.HostState{
Domains: r.internalDomains, Domains: r.internalDomains,
Address: r.config.IpAddress, Address: r.config.IpAddress,
@ -92,7 +80,7 @@ func (r *Role) getInternal() (types.HostState, error) {
}, nil }, nil
} }
func (r *Role) getExternal() (types.HostState, error) { func (r *Role) Nameserver() (types.HostState, error) {
return types.HostState{ return types.HostState{
Domains: r.externalDomains, Domains: r.externalDomains,
Address: r.config.IpAddress, Address: r.config.IpAddress,
@ -101,14 +89,25 @@ func (r *Role) getExternal() (types.HostState, error) {
} }
func (r *Role) RegisterHandlers(rg types.Registrator) { func (r *Role) RegisterHandlers(rg transport.Registrator) {
rg.RegisterRaw(http.MethodPost, types.PathHostCallback.String(), r.onCallback) host.Register(rg, r)
rg.Register(types.GetEndpoint(types.PathHostDns, r.getInternal))
rg.Register(types.GetEndpoint(types.PathHostNs, r.getExternal))
} }
func (r *Role) OnStartup(ctx context.Context) error { func (r *Role) OnStartup(ctx context.Context) error {
resp, err := r.client.GetRawData() r.tasksGroup.Go(func() {
if err := r.gateway.Listen(); err != nil {
log.Err(err).Msg("traefik gateway stopped")
}
})
r.tasksGroup.Go(func() {
<-ctx.Done()
if err := r.gateway.Shutdown(context.Background()); err != nil {
log.Err(err).Msg("failed to shutdown traefik gateway")
}
})
resp, err := r.gateway.GetRawData()
if err != nil { if err != nil {
return fmt.Errorf("get traefik state: %w", err) return fmt.Errorf("get traefik state: %w", err)
} }
@ -116,7 +115,7 @@ func (r *Role) OnStartup(ctx context.Context) error {
log.Info().Msg("got raw data from traefik") log.Info().Msg("got raw data from traefik")
log.Debug().Interface("response", resp).Send() log.Debug().Interface("response", resp).Send()
r.mutateState(*resp) r.OnTraefikUpdate(*resp)
return nil return nil
} }

View file

@ -1,58 +1 @@
package host package host
import (
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"net/url"
)
type traefikClient struct {
client *http.Client
domain string
address url.URL
}
func newClient(domain string, addr string) *traefikClient {
return &traefikClient{
domain: domain,
address: url.URL{
Scheme: "https",
Host: addr,
},
client: &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
ServerName: domain,
},
},
},
}
}
func (c *traefikClient) GetRawData() (*traefikResponse, error) {
var out traefikResponse
url := c.address
url.Path = "/api/rawdata"
req := http.Request{
Method: "GET",
URL: &url,
}
req.Host = c.domain
r, err := c.client.Do(&req)
if err != nil {
return nil, fmt.Errorf("make request: %w", err)
}
defer r.Body.Close()
if err := json.NewDecoder(r.Body).Decode(&out); err != nil {
return nil, fmt.Errorf("unmarshal body: %w", err)
}
return &out, nil
}

View file

@ -1,64 +1,66 @@
package host package host
import ( import (
"encoding/json"
"regexp" "regexp"
"slices" "slices"
) )
var hostRegex = regexp.MustCompile("Host\\(`([^()`]+\\.[^()`]+)`\\)") var hostRegex = regexp.MustCompile("Host\\(`([^()`]+\\.[^()`]+)`\\)")
type rule struct { type traefikRule struct {
Raw string Raw string
Domains []string Domains []string
Valid bool Valid bool
} }
func (r *rule) UnmarshalJSON(data []byte) error { type traefikRouter struct {
r.Valid = false Rule traefikRule
Entrypoints []string
}
raw := "" type traefikResponse struct {
if err := json.Unmarshal(data, &raw); err != nil { Routers []traefikRouter
return err }
type TraefikRawResponse struct {
Routers map[string]TraefikRawRouter `json:"routers"`
}
type TraefikRawRouter struct {
Rule string `json:"rule"`
Entrypoints []string `json:"entryPoints"`
}
func parseTraefikResponse(raw TraefikRawResponse) traefikResponse {
out := traefikResponse{
Routers: make([]traefikRouter, 0, len(raw.Routers)),
}
for _, router := range raw.Routers {
out.Routers = append(out.Routers, traefikRouter{
Rule: parseTraefikRule(router.Rule),
Entrypoints: router.Entrypoints,
})
}
return out
}
func parseTraefikRule(raw string) traefikRule {
rule := traefikRule{
Raw: raw,
} }
matches := hostRegex.FindAllStringSubmatch(raw, -1) matches := hostRegex.FindAllStringSubmatch(raw, -1)
for _, match := range matches { for _, match := range matches {
if len(match) <= 1 { if len(match) <= 1 {
continue continue
} }
r.Domains = append(r.Domains, match[1:]...) rule.Domains = append(rule.Domains, match[1:]...)
} }
r.Valid = len(r.Domains) > 0 rule.Valid = len(rule.Domains) > 0
return rule
return nil
}
type router struct {
Rule rule `json:"rule"`
Entrypoints []string `json:"entryPoints"`
}
type traefikResponse struct {
Routers []router
}
func (r *traefikResponse) UnmarshalJSON(data []byte) error {
var raw struct {
Routers map[string]router `json:"routers"`
}
if err := json.Unmarshal(data, &raw); err != nil {
return err
}
for _, v := range raw.Routers {
r.Routers = append(r.Routers, v)
}
return nil
} }
func (r traefikResponse) Domains(entrypoint string) []string { func (r traefikResponse) Domains(entrypoint string) []string {

View file

@ -4,22 +4,24 @@ import (
"context" "context"
"sync" "sync"
"git.wzray.com/homelab/hivemind/internal/app"
"git.wzray.com/homelab/hivemind/internal/config" "git.wzray.com/homelab/hivemind/internal/config"
"git.wzray.com/homelab/hivemind/internal/roles" "git.wzray.com/homelab/hivemind/internal/roles"
"git.wzray.com/homelab/hivemind/internal/state" "git.wzray.com/homelab/hivemind/internal/transport"
"git.wzray.com/homelab/hivemind/internal/transport/master"
"git.wzray.com/homelab/hivemind/internal/types" "git.wzray.com/homelab/hivemind/internal/types"
"git.wzray.com/homelab/hivemind/internal/web/client" "github.com/rs/zerolog/log"
) )
type Role struct { type Role struct {
state *state.RuntimeState state *app.State
config config.MasterConfig config config.MasterConfig
tasksGroup sync.WaitGroup tasksGroup sync.WaitGroup
observer *observer observer *observer
roles.BaseRole roles.BaseRole
} }
func New(state *state.RuntimeState, config config.MasterConfig) *Role { func New(state *app.State, config config.MasterConfig) *Role {
return &Role{ return &Role{
state: state, state: state,
config: config, config: config,
@ -36,7 +38,7 @@ func New(state *state.RuntimeState, config config.MasterConfig) *Role {
func (r *Role) OnStartup(ctx context.Context) error { func (r *Role) OnStartup(ctx context.Context) error {
r.tasksGroup.Go(func() { r.tasksGroup.Go(func() {
r.observer.Start(ctx, func(n types.Node) error { r.observer.Start(ctx, func(n types.Node) error {
_, err := r.onLeave(n) _, err := r.onLeave(n, true)
return err return err
}) })
}) })
@ -49,48 +51,95 @@ func (r *Role) OnShutdown() error {
return nil return nil
} }
func (r *Role) notify(path types.Path, v any) { func (c *Role) RegisterHandlers(r transport.Registrator) {
for _, n := range r.state.Registry.Nodes() { master.Register(r, c)
addr := n.Endpoint
r.tasksGroup.Go(func() {
client.Post[any](addr, path, v)
})
}
} }
func (r *Role) onJoin(node types.Node) (map[string]types.Node, error) { func (r *Role) Heartbeat(node types.Node) (types.Nodes, error) {
return r.onKeepAlive(node, true)
}
func (r *Role) Join(node types.Node) (types.Nodes, error) {
return r.onJoin(node, true)
}
func (r *Role) Leave(node types.Node) error {
_, err := r.onLeave(node, true)
return err
}
func (r *Role) EventHeartbeat(node types.Node) (types.Nodes, error) {
return r.onKeepAlive(node, false)
}
func (r *Role) EventJoin(node types.Node) (types.Nodes, error) {
return r.onJoin(node, false)
}
func (r *Role) EventLeave(node types.Node) error {
_, err := r.onLeave(node, false)
return err
}
func (r *Role) onJoin(node types.Node, notify bool) (map[string]types.Node, error) {
if err := r.state.Registry.AddNode(node); err != nil { if err := r.state.Registry.AddNode(node); err != nil {
return nil, err return nil, err
} }
r.notify(types.PathNodeJoin, node) if notify {
propagate(r, noReturn(r.state.Clients.Master.EventJoin), node)
}
return r.state.Registry.AllNodes(), nil return r.state.Registry.AllNodes(), nil
} }
func (r *Role) onLeave(node types.Node) (bool, error) { func (r *Role) onLeave(node types.Node, notify bool) (bool, error) {
if err := r.state.Registry.RemoveNode(node); err != nil { if err := r.state.Registry.RemoveNode(node); err != nil {
return false, err return false, err
} }
r.notify(types.PathNodeLeave, node) if notify {
propagate(r, r.state.Clients.Master.EventLeave, node)
}
return true, nil return true, nil
} }
func (r *Role) onKeepAlive(node types.Node) (bool, error) { func (r *Role) onKeepAlive(node types.Node, notify bool) (map[string]types.Node, error) {
r.observer.onKeepAlive(node) r.observer.onKeepAlive(node)
if ok := r.state.Registry.Exists(node.Hostname); !ok { if ok := r.state.Registry.Exists(node.Hostname); !ok {
_, err := r.onJoin(node) // TODO: i don't like this side effect
return true, err if _, err := r.onJoin(node, true); err != nil {
log.Warn().Err(err).Msg("unable to add node to the registry from keepalive")
}
} }
return false, nil if notify {
propagate(r, noReturn(r.state.Clients.Master.EventHeartbeat), node)
}
return r.state.Registry.AllNodes(), nil
} }
func (c *Role) RegisterHandlers(r types.Registrator) { func eventFunc[R any](fn func(types.Node, bool) (R, error), notify bool) func(types.Node) (R, error) {
r.Register(types.PostEndpoint(types.PathMasterJoin, c.onJoin)) return func(n types.Node) (R, error) {
r.Register(types.PostEndpoint(types.PathMasterLeave, c.onLeave)) return fn(n, notify)
r.Register(types.PostEndpoint(types.PathMasterKeepalive, c.onKeepAlive)) }
}
func noReturn[T, V any](fn func(string, T) (V, error)) func(string, T) error {
return func(s string, t T) error {
_, err := fn(s, t)
return err
}
}
func propagate[T any](r *Role, handler func(string, T) error, v T) {
for _, n := range r.state.Registry.ByRole(types.MasterRole) {
addr := n.Endpoint
r.tasksGroup.Go(func() {
handler(addr, v)
})
}
} }

View file

@ -5,15 +5,14 @@ import (
"sync" "sync"
"time" "time"
"git.wzray.com/homelab/hivemind/internal/app"
"git.wzray.com/homelab/hivemind/internal/registry" "git.wzray.com/homelab/hivemind/internal/registry"
"git.wzray.com/homelab/hivemind/internal/state"
"git.wzray.com/homelab/hivemind/internal/types" "git.wzray.com/homelab/hivemind/internal/types"
"git.wzray.com/homelab/hivemind/internal/web/client"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
type observer struct { type observer struct {
state *state.RuntimeState state *app.State
interval int interval int
backoff int backoff int
backoffCount int backoffCount int
@ -23,7 +22,7 @@ type observer struct {
} }
func newObserver( func newObserver(
state *state.RuntimeState, state *app.State,
interval int, interval int,
backoff int, backoff int,
backoffCount int, backoffCount int,
@ -63,7 +62,7 @@ func (o *observer) pollNodes(ctx context.Context, onLeave func(types.Node) error
delay := time.Duration(o.backoff) delay := time.Duration(o.backoff)
alive := false alive := false
for i := o.backoffCount - 1; i >= 0; i-- { for i := o.backoffCount - 1; i >= 0; i-- {
_, err := client.Get[any](n.Endpoint, types.PathNodeHealthcheck) _, err := o.state.Clients.Node.Healthcheck(n.Endpoint)
if err == nil { if err == nil {
logger.Debug().Msg("node is alive") logger.Debug().Msg("node is alive")

View file

@ -7,26 +7,41 @@ import (
"sync" "sync"
"time" "time"
"git.wzray.com/homelab/hivemind/internal/app"
"git.wzray.com/homelab/hivemind/internal/config" "git.wzray.com/homelab/hivemind/internal/config"
"git.wzray.com/homelab/hivemind/internal/state" "git.wzray.com/homelab/hivemind/internal/transport"
"git.wzray.com/homelab/hivemind/internal/transport/node"
"git.wzray.com/homelab/hivemind/internal/types" "git.wzray.com/homelab/hivemind/internal/types"
"git.wzray.com/homelab/hivemind/internal/web/client"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
type Role struct { type Role struct {
state *state.RuntimeState state *app.State
keepaliveGroup sync.WaitGroup keepaliveGroup sync.WaitGroup
config config.NodeConfig config config.NodeConfig
} }
func New(state *state.RuntimeState, config config.NodeConfig) *Role { func New(state *app.State, config config.NodeConfig) *Role {
return &Role{ return &Role{
state: state, state: state,
config: config, config: config,
} }
} }
func (r *Role) OnStartup(ctx context.Context) error {
r.keepaliveGroup.Go(r.keepaliveFunc(ctx))
return nil
}
func (r *Role) OnShutdown() error {
r.keepaliveGroup.Wait()
return nil
}
func (n *Role) RegisterHandlers(r transport.Registrator) {
node.Register(r, n)
}
func (r *Role) Join(bootstrap string) error { func (r *Role) Join(bootstrap string) error {
masters := make(map[string]struct{}) masters := make(map[string]struct{})
for _, node := range r.state.Registry.ByRole(types.MasterRole) { for _, node := range r.state.Registry.ByRole(types.MasterRole) {
@ -46,14 +61,14 @@ func (r *Role) Join(bootstrap string) error {
logger := log.With().Str("host", m).Logger() logger := log.With().Str("host", m).Logger()
logger.Debug().Msg("trying to join via master") logger.Debug().Msg("trying to join via master")
nodes, err := client.Post[map[string]types.Node](m, types.PathMasterJoin, r.state.Self) nodes, err := r.state.Clients.Master.Join(m, r.state.Self)
if err != nil { if err != nil {
errs = append(errs, err) errs = append(errs, err)
logger.Debug().Err(err).Msg("unable to join") logger.Debug().Err(err).Msg("unable to join")
continue continue
} }
if err := r.state.Registry.Set(*nodes); err != nil { if err := r.state.Registry.Set(nodes); err != nil {
logger.Debug().Err(err).Msg("unable to set master's nodes") logger.Debug().Err(err).Msg("unable to set master's nodes")
errs = append(errs, err) errs = append(errs, err)
continue continue
@ -76,8 +91,7 @@ func (r *Role) Leave() error {
logger := log.With().Str("name", m.Hostname).Logger() logger := log.With().Str("name", m.Hostname).Logger()
logger.Debug().Msg("sending leave message") logger.Debug().Msg("sending leave message")
_, err := client.Post[any](m.Endpoint, types.PathMasterLeave, r.state.Self) if err := r.state.Clients.Master.Leave(m.Endpoint, r.state.Self); err != nil {
if err != nil {
logger.Debug().Err(err).Msg("unable to send leave message") logger.Debug().Err(err).Msg("unable to send leave message")
errs = append(errs, err) errs = append(errs, err)
continue continue
@ -90,15 +104,8 @@ func (r *Role) Leave() error {
return fmt.Errorf("unable to send leave message to any master: %w", errors.Join(errs...)) return fmt.Errorf("unable to send leave message to any master: %w", errors.Join(errs...))
} }
func (r *Role) OnStartup(ctx context.Context) error { func (r *Role) Healthcheck() (string, error) {
r.keepaliveGroup.Go(r.keepaliveFunc(ctx)) return "OK", nil
return nil
}
func (r *Role) OnShutdown() error {
r.keepaliveGroup.Wait()
return nil
} }
func (r *Role) keepaliveFunc(ctx context.Context) func() { func (r *Role) keepaliveFunc(ctx context.Context) func() {
@ -107,11 +114,20 @@ func (r *Role) keepaliveFunc(ctx context.Context) func() {
logger := log.With().Str("name", m.Hostname).Logger() logger := log.With().Str("name", m.Hostname).Logger()
logger.Debug().Msg("sending keepalive packet") logger.Debug().Msg("sending keepalive packet")
if _, err := client.Post[any](m.Endpoint, types.PathMasterKeepalive, r.state.Self); err != nil { nodes, err := r.state.Clients.Master.Heartbeat(m.Endpoint, r.state.Self)
if err != nil {
logger.Info().Err(err).Msg("unable to send keepalive packet") logger.Info().Err(err).Msg("unable to send keepalive packet")
} else { continue
logger.Debug().Msg("keepalive packet sent")
} }
logger.Debug().Msg("keepalive packet sent")
if err := r.state.Registry.Set(nodes); err != nil {
logger.Warn().Err(err).Msg("unable to set masters nodes")
continue
}
break
} }
} }
@ -126,27 +142,3 @@ func (r *Role) keepaliveFunc(ctx context.Context) func() {
} }
} }
} }
func (r *Role) onJoin(node types.Node) (bool, error) {
if err := r.state.Registry.AddNode(node); err != nil {
return false, err
}
return true, nil
}
func (r *Role) onLeave(node types.Node) (bool, error) {
if err := r.state.Registry.RemoveNode(node); err != nil {
return false, err
}
return true, nil
}
func healthcheck() (string, error) {
return "OK", nil
}
func (n *Role) RegisterHandlers(r types.Registrator) {
r.Register(types.GetEndpoint(types.PathNodeHealthcheck, healthcheck))
r.Register(types.PostEndpoint(types.PathNodeJoin, n.onJoin))
r.Register(types.PostEndpoint(types.PathNodeLeave, n.onLeave))
}

View file

@ -3,18 +3,18 @@ package roles
import ( import (
"context" "context"
"git.wzray.com/homelab/hivemind/internal/types" "git.wzray.com/homelab/hivemind/internal/transport"
) )
type Role interface { type Role interface {
RegisterHandlers(types.Registrator) RegisterHandlers(transport.Registrator)
OnStartup(context.Context) error OnStartup(context.Context) error
OnShutdown() error OnShutdown() error
} }
type BaseRole struct{} type BaseRole struct{}
func (r *BaseRole) RegisterHandlers(types.Registrator) {} func (r *BaseRole) RegisterHandlers(transport.Registrator) {}
func (r *BaseRole) OnStartup(context.Context) error { return nil } func (r *BaseRole) OnStartup(context.Context) error { return nil }

View file

@ -1,18 +0,0 @@
package state
import (
"git.wzray.com/homelab/hivemind/internal/registry"
"git.wzray.com/homelab/hivemind/internal/types"
)
type RuntimeState struct {
Registry *registry.Registry
Self types.Node
}
func New(r *registry.Registry, n types.Node) *RuntimeState {
return &RuntimeState{
Registry: r,
Self: n,
}
}

View file

@ -0,0 +1,5 @@
package transport
type Caller interface {
Call(host string, path string, data any, out any) error
}

View file

@ -0,0 +1,16 @@
package codec
type Codec interface {
Decode(data []byte, out any) error
Encode(data any) ([]byte, error)
}
func Decode[T any](c Codec, data []byte) (T, error) {
var out T
err := c.Decode(data, &out)
return out, err
}
func Encode[T any](c Codec, data T) ([]byte, error) {
return c.Encode(data)
}

View file

@ -0,0 +1,15 @@
package codec
import "encoding/json"
type jsonCodec struct{}
var JSON = jsonCodec{}
func (jsonCodec) Decode(data []byte, out any) error {
return json.Unmarshal(data, out)
}
func (jsonCodec) Encode(data any) ([]byte, error) {
return json.Marshal(data)
}

View file

@ -0,0 +1,20 @@
package dns
import (
"git.wzray.com/homelab/hivemind/internal/transport"
"git.wzray.com/homelab/hivemind/internal/types"
)
type Client struct {
caller transport.Caller
}
func New(caller transport.Caller) *Client {
return &Client{
caller: caller,
}
}
func (c *Client) Callback(endpoint string, state types.HostState) (bool, error) {
return callbackRoute.Call(c.caller, endpoint, state)
}

View file

@ -0,0 +1,14 @@
package dns
import (
"git.wzray.com/homelab/hivemind/internal/transport"
"git.wzray.com/homelab/hivemind/internal/types"
)
type DnsHandlers interface {
Callback(types.HostState) (bool, error)
}
func Register(registrator transport.Registrator, h DnsHandlers) {
callbackRoute.Register(registrator, h.Callback)
}

View file

@ -0,0 +1,10 @@
package dns
import (
"git.wzray.com/homelab/hivemind/internal/transport"
"git.wzray.com/homelab/hivemind/internal/types"
)
var (
callbackRoute = transport.NewRoute[types.HostState, bool]("/dns/callback")
)

View file

@ -0,0 +1,9 @@
package transport
func WithoutInput[T any](f func() (T, error)) func(struct{}) (T, error) {
return func(s struct{}) (T, error) { return f() }
}
func WithoutOutput[T any](f func(T) error) func(T) (struct{}, error) {
return func(t T) (struct{}, error) { return struct{}{}, f(t) }
}

View file

@ -0,0 +1,24 @@
package host
import (
"git.wzray.com/homelab/hivemind/internal/transport"
"git.wzray.com/homelab/hivemind/internal/types"
)
type Client struct {
caller transport.Caller
}
func New(caller transport.Caller) *Client {
return &Client{
caller: caller,
}
}
func (c *Client) Dns(endpoint string) (types.HostState, error) {
return dnsRoute.CallNoInput(c.caller, endpoint)
}
func (c *Client) Nameserver(endpoint string) (types.HostState, error) {
return nsRoute.CallNoInput(c.caller, endpoint)
}

View file

@ -0,0 +1,16 @@
package host
import (
"git.wzray.com/homelab/hivemind/internal/transport"
"git.wzray.com/homelab/hivemind/internal/types"
)
type HostHandlers interface {
Dns() (types.HostState, error)
Nameserver() (types.HostState, error)
}
func Register(registrator transport.Registrator, h HostHandlers) {
dnsRoute.Register(registrator, transport.WithoutInput(h.Dns))
nsRoute.Register(registrator, transport.WithoutInput(h.Nameserver))
}

View file

@ -0,0 +1,11 @@
package host
import (
"git.wzray.com/homelab/hivemind/internal/transport"
"git.wzray.com/homelab/hivemind/internal/types"
)
var (
dnsRoute = transport.NewRoute[struct{}, types.HostState]("/host/dns")
nsRoute = transport.NewRoute[struct{}, types.HostState]("/host/ns")
)

View file

@ -0,0 +1,40 @@
package master
import (
"git.wzray.com/homelab/hivemind/internal/transport"
"git.wzray.com/homelab/hivemind/internal/types"
)
type Client struct {
caller transport.Caller
}
func New(caller transport.Caller) *Client {
return &Client{
caller: caller,
}
}
func (c *Client) Heartbeat(endpoint string, n types.Node) (types.Nodes, error) {
return heartbeatRoute.Call(c.caller, endpoint, n)
}
func (c *Client) Join(endpoint string, n types.Node) (types.Nodes, error) {
return joinRoute.Call(c.caller, endpoint, n)
}
func (c *Client) Leave(endpoint string, n types.Node) error {
return leaveRoute.CallNoOutput(c.caller, endpoint, n)
}
func (c *Client) EventHeartbeat(endpoint string, n types.Node) (types.Nodes, error) {
return eventHeartbeatRoute.Call(c.caller, endpoint, n)
}
func (c *Client) EventJoin(endpoint string, n types.Node) (types.Nodes, error) {
return eventJoinRoute.Call(c.caller, endpoint, n)
}
func (c *Client) EventLeave(endpoint string, n types.Node) error {
return eventLeaveRoute.CallNoOutput(c.caller, endpoint, n)
}

View file

@ -0,0 +1,26 @@
package master
import (
"git.wzray.com/homelab/hivemind/internal/transport"
"git.wzray.com/homelab/hivemind/internal/types"
)
type MasterHandlers interface {
Heartbeat(types.Node) (types.Nodes, error)
Join(types.Node) (types.Nodes, error)
Leave(types.Node) error
EventHeartbeat(types.Node) (types.Nodes, error)
EventJoin(types.Node) (types.Nodes, error)
EventLeave(types.Node) error
}
func Register(registrator transport.Registrator, h MasterHandlers) {
heartbeatRoute.Register(registrator, h.Heartbeat)
joinRoute.Register(registrator, h.Join)
leaveRoute.Register(registrator, transport.WithoutOutput(h.Leave))
eventHeartbeatRoute.Register(registrator, h.EventHeartbeat)
eventJoinRoute.Register(registrator, h.EventJoin)
eventLeaveRoute.Register(registrator, transport.WithoutOutput(h.EventLeave))
}

View file

@ -0,0 +1,16 @@
package master
import (
"git.wzray.com/homelab/hivemind/internal/transport"
"git.wzray.com/homelab/hivemind/internal/types"
)
var (
heartbeatRoute = transport.NewRoute[types.Node, types.Nodes]("/master/heartbeat")
joinRoute = transport.NewRoute[types.Node, types.Nodes]("/master/join")
leaveRoute = transport.NewRoute[types.Node, struct{}]("/master/leave")
eventHeartbeatRoute = transport.NewRoute[types.Node, types.Nodes]("/master/event/heartbeat")
eventJoinRoute = transport.NewRoute[types.Node, types.Nodes]("/master/event/join")
eventLeaveRoute = transport.NewRoute[types.Node, struct{}]("/master/event/leave")
)

View file

@ -0,0 +1,17 @@
package node
import (
"git.wzray.com/homelab/hivemind/internal/transport"
)
type Client struct {
caller transport.Caller
}
func New(caller transport.Caller) *Client {
return &Client{caller: caller}
}
func (c *Client) Healthcheck(endpoint string) (string, error) {
return healthcheckRoute.CallNoInput(c.caller, endpoint)
}

View file

@ -0,0 +1,13 @@
package node
import (
"git.wzray.com/homelab/hivemind/internal/transport"
)
type NodeHandlers interface {
Healthcheck() (string, error)
}
func Register(registrator transport.Registrator, h NodeHandlers) {
healthcheckRoute.Register(registrator, transport.WithoutInput(h.Healthcheck))
}

View file

@ -0,0 +1,9 @@
package node
import (
"git.wzray.com/homelab/hivemind/internal/transport"
)
var (
healthcheckRoute = transport.NewRoute[struct{}, string]("/node/healthcheck")
)

View file

@ -0,0 +1,22 @@
package transport
import (
"git.wzray.com/homelab/hivemind/internal/transport/codec"
)
type Handler struct {
path string
handler func(codec.Codec, []byte) ([]byte, error)
}
func (h Handler) Path() string {
return h.path
}
func (h Handler) Handle(c codec.Codec, v []byte) ([]byte, error) {
return h.handler(c, v)
}
type Registrator interface {
Register(endpoint Handler)
}

View file

@ -0,0 +1,65 @@
package transport
import (
"fmt"
"git.wzray.com/homelab/hivemind/internal/transport/codec"
)
type route[In, Out any] struct {
path string
}
func NewRoute[In, Out any](path string) route[In, Out] {
return route[In, Out]{
path: path,
}
}
func routeToHandler[In, Out any](r route[In, Out], handler func(In) (Out, error)) Handler {
return Handler{
path: r.path,
handler: func(c codec.Codec, b []byte) ([]byte, error) {
data, err := codec.Decode[In](c, b)
if err != nil {
return nil, fmt.Errorf("unable to decode body: %w", err)
}
out, err := handler(data)
if err != nil {
return nil, fmt.Errorf("error while handling request: %w", err)
}
raw, err := codec.Encode(c, out)
if err != nil {
return nil, fmt.Errorf("unable to encode body: %w", err)
}
return raw, nil
},
}
}
func (e route[In, Out]) Call(caller Caller, host string, data In) (Out, error) {
var out Out
err := caller.Call(host, e.path, data, &out)
return out, err
}
func (e route[In, Out]) CallNoInput(caller Caller, host string) (Out, error) {
var out Out
err := caller.Call(host, e.path, struct{}{}, &out)
return out, err
}
func (e route[In, Out]) CallNoOutput(caller Caller, host string, data In) error {
return caller.Call(host, e.path, data, nil)
}
func (e route[In, Out]) Path() string {
return e.path
}
func (e route[In, Out]) Register(r Registrator, h func(In) (Out, error)) {
r.Register(routeToHandler(e, h))
}

View file

@ -2,13 +2,15 @@ package types
import "fmt" import "fmt"
type Nodes map[string]Node
// TODO: consider moving this type back to registry // TODO: consider moving this type back to registry
type Node struct { type Node struct {
Hostname string `json:"hostname"` Hostname string `json:"hostname"`
Address string `json:"address"` Address string `json:"address"`
Port int `json:"port"` Port int `json:"port"`
Roles []Role `json:"roles"` Roles []Role `json:"roles"`
Endpoint string `json:"endpoint"` Endpoint string `json:"endpoint"` // TODO: make endpoint into a separate type
} }
func NewNode( func NewNode(

View file

@ -1,78 +1,60 @@
package types package types
import ( // type Path string
"encoding/json" //
"fmt" // func (p Path) String() string {
"net/http" // return string(p)
) // }
//
// TODO: split this up // const (
// PathMasterJoin Path = "/master/join"
type Path string // PathMasterLeave Path = "/master/leave"
// PathMasterKeepalive Path = "/master/keepalive"
func (p Path) String() string { // PathMasterEventJoin Path = "/master/event_join"
return string(p) // PathMasterEventLeave Path = "/master/event_leave"
} // PathMasterEventKeepalive Path = "/master/event_keepalive"
//
const ( // PathNodeHealthcheck Path = "/node/healthcheck"
PathMasterJoin Path = "/master/join" //
PathMasterLeave Path = "/master/leave" // PathDnsCallback Path = "/dns/callback"
PathMasterKeepalive Path = "/master/keepalive" //
// PathHostCallback Path = "/host/callback"
PathNodeHealthcheck Path = "/node/healthcheck" // PathHostDns Path = "/host/dns"
PathNodeJoin Path = "/node/join" // PathHostNs Path = "/host/ns"
PathNodeLeave Path = "/node/leave" // )
//
PathDnsCallback Path = "/dns/callback" // type Route interface {
// Path() string
PathHostCallback Path = "/host/callback" // Handle([]byte) (any, error)
PathHostDns Path = "/host/dns" // }
PathHostNs Path = "/host/ns" //
) // type endpoint struct {
// path string
type Response[T any] struct { // handler func([]byte) (any, error)
Ok bool `json:"ok"` // }
Data T `json:"data,omitempty"` //
Err string `json:"err,omitempty"` // func (e endpoint) Path() string { return e.path }
} //
// func (e endpoint) Handle(v []byte) (any, error) { return e.handler(v) }
type Route interface { //
Path() string // func PostEndpoint[T any, V any](path Path, handler func(T) (V, error)) Route {
Handle([]byte) (any, error) // return endpoint{
} // path: "POST " + path.String(),
// handler: func(a []byte) (any, error) {
type endpoint struct { // var r T
path string // if err := json.Unmarshal(a, &r); err != nil {
handler func([]byte) (any, error) // return nil, fmt.Errorf("unable to unmarshal json: %w", err)
} // }
// return handler(r)
func (e endpoint) Path() string { return e.path } // },
// }
func (e endpoint) Handle(v []byte) (any, error) { return e.handler(v) } // }
//
func PostEndpoint[T any, V any](path Path, handler func(T) (V, error)) Route { // func GetEndpoint[T any](path Path, handler func() (T, error)) Route {
return endpoint{ // return endpoint{
path: "POST " + path.String(), // path: "GET " + path.String(),
handler: func(a []byte) (any, error) { // handler: func(a []byte) (any, error) {
var r T // return handler()
if err := json.Unmarshal(a, &r); err != nil { // },
return nil, fmt.Errorf("unable to unmarshal json: %w", err) // }
} // }
return handler(r)
},
}
}
func GetEndpoint[T any](path Path, handler func() (T, error)) Route {
return endpoint{
path: "GET " + path.String(),
handler: func(a []byte) (any, error) {
return handler()
},
}
}
type Registrator interface {
Register(endpoint Route)
RegisterRaw(method string, pattern string, handler func(http.ResponseWriter, *http.Request))
}

View file

@ -10,20 +10,18 @@ import (
"net/url" "net/url"
"time" "time"
"git.wzray.com/homelab/hivemind/internal/types" "git.wzray.com/homelab/hivemind/internal/web"
"git.wzray.com/homelab/hivemind/internal/web/middleware" "git.wzray.com/homelab/hivemind/internal/web/middleware"
) )
type client struct { type Client struct {
http *http.Client http *http.Client
middleware middleware.Middleware middleware middleware.Middleware
} }
var defaultClient *client
const timeout = time.Duration(2) * time.Second const timeout = time.Duration(2) * time.Second
func (c *client) makeRequest(method string, host string, path types.Path, data any, out any) error { func (c *Client) Call(host string, path string, data any, out any) error {
var body io.Reader var body io.Reader
if data != nil { if data != nil {
raw, err := json.Marshal(data) raw, err := json.Marshal(data)
@ -36,10 +34,10 @@ func (c *client) makeRequest(method string, host string, path types.Path, data a
uri := (&url.URL{ uri := (&url.URL{
Scheme: "http", Scheme: "http",
Host: host, Host: host,
Path: path.String(), Path: path,
}).String() }).String()
r, err := http.NewRequest(method, uri, body) r, err := http.NewRequest("POST", uri, body)
if err != nil { if err != nil {
return fmt.Errorf("build http request: %w", err) return fmt.Errorf("build http request: %w", err)
} }
@ -52,60 +50,41 @@ func (c *client) makeRequest(method string, host string, path types.Path, data a
return fmt.Errorf("apply middleware: %w", err) return fmt.Errorf("apply middleware: %w", err)
} }
resp, err := c.http.Do(r) httpResponse, err := c.http.Do(r)
if err != nil { if err != nil {
return fmt.Errorf("send request: %w", err) return fmt.Errorf("send request: %w", err)
} }
defer httpResponse.Body.Close()
if resp.StatusCode < 200 || resp.StatusCode >= 300 { if httpResponse.StatusCode < 200 || httpResponse.StatusCode >= 300 {
b, _ := io.ReadAll(resp.Body) b, _ := io.ReadAll(httpResponse.Body)
return fmt.Errorf("http %d: %s", resp.StatusCode, string(b)) return fmt.Errorf("http %d: %s", httpResponse.StatusCode, string(b))
} }
defer resp.Body.Close()
if out != nil { if out != nil {
if err := json.NewDecoder(resp.Body).Decode(out); err != nil { var resp web.Response[json.RawMessage]
return fmt.Errorf("decode body: %w", err) if err := json.NewDecoder(httpResponse.Body).Decode(&resp); err != nil {
return fmt.Errorf("decode response wrapper: %w", err)
}
if !resp.Ok {
return fmt.Errorf("error on the remote: %w", errors.New(resp.Err))
}
if err := json.Unmarshal(resp.Data, out); err != nil {
return fmt.Errorf("decode response body: %w", err)
} }
} }
io.Copy(io.Discard, resp.Body) io.Copy(io.Discard, httpResponse.Body)
return nil return nil
} }
func Init(mw middleware.Middleware) { func New(middleware middleware.Middleware) *Client {
if defaultClient != nil { return &Client{
panic("web.client: Init called twice")
}
defaultClient = &client{
http: &http.Client{ http: &http.Client{
Timeout: timeout, Timeout: timeout,
}, },
middleware: mw, middleware: middleware,
} }
} }
func request[Out any, In any](method string, host string, path types.Path, data In) (*Out, error) {
out := &types.Response[Out]{}
err := defaultClient.makeRequest(method, host, path, data, out)
if err != nil {
return nil, err
}
if !out.Ok {
return nil, errors.New(out.Err)
}
return &out.Data, err
}
// TODO: out should not be a pointer
func Get[Out any](host string, path types.Path) (*Out, error) {
return request[Out, any](http.MethodGet, host, path, nil)
}
// TODO: out should not be a pointer
func Post[Out any, In any](host string, path types.Path, data In) (*Out, error) {
return request[Out](http.MethodPost, host, path, data)
}

View file

@ -5,7 +5,8 @@ import (
"io" "io"
"net/http" "net/http"
"git.wzray.com/homelab/hivemind/internal/types" "git.wzray.com/homelab/hivemind/internal/transport"
"git.wzray.com/homelab/hivemind/internal/transport/codec"
"git.wzray.com/homelab/hivemind/internal/web/middleware" "git.wzray.com/homelab/hivemind/internal/web/middleware"
"github.com/rs/zerolog/log" "github.com/rs/zerolog/log"
) )
@ -15,7 +16,7 @@ type Server struct {
httpServer http.Server httpServer http.Server
} }
func NewServer(addr string, middleware middleware.Middleware) *Server { func New(addr string, middleware middleware.Middleware) *Server {
mux := http.NewServeMux() mux := http.NewServeMux()
s := &Server{ s := &Server{
mux: mux, mux: mux,
@ -35,7 +36,7 @@ func (s *Server) Shutdown(ctx context.Context) error {
return s.httpServer.Shutdown(ctx) return s.httpServer.Shutdown(ctx)
} }
func (s *Server) handleFunc(route types.Route) func(w http.ResponseWriter, r *http.Request) { func (s *Server) handleFunc(route transport.Handler) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
log.Debug(). // TODO: make this a middleware log.Debug(). // TODO: make this a middleware
Str("method", r.Method). Str("method", r.Method).
@ -46,35 +47,36 @@ func (s *Server) handleFunc(route types.Route) func(w http.ResponseWriter, r *ht
w.Header().Set("Content-Type", "application/json; charset=utf-8") w.Header().Set("Content-Type", "application/json; charset=utf-8")
body, err := io.ReadAll(r.Body) raw, err := io.ReadAll(r.Body)
if err != nil { if err != nil {
w.Write(fail("read request body: %v", err)) w.Write(fail("read request body: %v", err))
log.Err(err).Msg("unable to read request body") log.Err(err).Msg("unable to read request body")
return return
} }
raw, err := route.Handle(body) resp, err := route.Handle(codec.JSON, raw)
if err != nil { if err != nil {
w.Write(fail("handle request: %v", err)) w.Write(fail("handle request: %v", err))
log.Err(err).Msg("unable to handle request") log.Err(err).Msg("unable to handle request")
return return
} }
data, err := ok(raw) payload, err := ok(resp)
if err != nil { if err != nil {
w.Write(fail("marshal response: %v", err)) w.Write(fail("marshal response: %v", err))
log.Err(err).Msg("unable to marshal response") log.Err(err).Msg("unable to marshal response")
return return
} }
w.Write(data) w.Write(payload)
} }
} }
func (s *Server) Register(endpoint types.Route) { func (s *Server) Register(endpoint transport.Handler) {
s.mux.HandleFunc(endpoint.Path(), s.handleFunc(endpoint)) s.mux.HandleFunc(endpoint.Path(), s.handleFunc(endpoint))
} }
// TODO: i don't think that I need this?
func (s *Server) RegisterRaw(method string, pattern string, handler func(http.ResponseWriter, *http.Request)) { func (s *Server) RegisterRaw(method string, pattern string, handler func(http.ResponseWriter, *http.Request)) {
s.mux.HandleFunc(method+" "+pattern, handler) s.mux.HandleFunc(method+" "+pattern, handler)
} }

View file

@ -4,20 +4,20 @@ import (
"encoding/json" "encoding/json"
"fmt" "fmt"
"git.wzray.com/homelab/hivemind/internal/types" "git.wzray.com/homelab/hivemind/internal/web"
) )
func fail(format string, a ...any) []byte { func fail(format string, a ...any) []byte {
r, _ := json.Marshal(types.Response[string]{ r, _ := json.Marshal(web.Response[string]{
Ok: false, Ok: false,
Err: fmt.Sprintf(format, a...), Err: fmt.Sprintf(format, a...),
}) })
return r return r
} }
func ok[T any](data T) ([]byte, error) { func ok(data []byte) ([]byte, error) {
return json.Marshal(types.Response[T]{ return json.Marshal(web.Response[json.RawMessage]{
Ok: true, Ok: true,
Data: data, Data: json.RawMessage(data),
}) })
} }

7
internal/web/types.go Normal file
View file

@ -0,0 +1,7 @@
package web
type Response[T any] struct {
Ok bool `json:"ok"`
Data T `json:"data,omitempty"`
Err string `json:"err,omitempty"`
}