Compare commits
No commits in common. "0448f66ab272487d7195297bcdf00d83f3c90400" and "a32b0f728e4ba3b3ff288f3de2085d6fea12049f" have entirely different histories.
0448f66ab2
...
a32b0f728e
43 changed files with 414 additions and 916 deletions
|
|
@ -4,7 +4,6 @@ changed="$(gofmt -d .)"
|
||||||
[ -n "$changed" ] && {
|
[ -n "$changed" ] && {
|
||||||
echo 'Some files are not formatted'
|
echo 'Some files are not formatted'
|
||||||
delta <<< "$changed"
|
delta <<< "$changed"
|
||||||
go fmt ./...
|
|
||||||
exit 1
|
exit 1
|
||||||
}
|
}
|
||||||
go vet ./...
|
go vet ./...
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,6 @@ RUN --mount=type=cache,target="/cache/go-build" mkdir -p /cache/go-build; make h
|
||||||
FROM alpine
|
FROM alpine
|
||||||
|
|
||||||
EXPOSE 56714/tcp
|
EXPOSE 56714/tcp
|
||||||
EXPOSE 56715/tcp
|
|
||||||
WORKDIR /app
|
WORKDIR /app
|
||||||
|
|
||||||
VOLUME /conf
|
VOLUME /conf
|
||||||
|
|
|
||||||
55
TODO.md
55
TODO.md
|
|
@ -1,58 +1,3 @@
|
||||||
# Background
|
|
||||||
Some background first:
|
|
||||||
the node can have multiple roles
|
|
||||||
this includes (but not limited to)
|
|
||||||
* Host (can generate events)
|
|
||||||
* DNS (can consume the events and act on them)
|
|
||||||
* Something else that I might come up with (the architecture has to be expandable)
|
|
||||||
|
|
||||||
# Control pane (3+ nodes)
|
|
||||||
* Quorum
|
|
||||||
* Consists of $n / 2 + 1$ nodes
|
|
||||||
* Cluster is considered "degraded" if no quorum can be created
|
|
||||||
* Stores an event log
|
|
||||||
* **Only** leader can append to the log (with quorum permission)
|
|
||||||
* Membership authority
|
|
||||||
* No joins without quorum approval
|
|
||||||
* Leaves are not propagated without quorum
|
|
||||||
* Manages epoch (useful for GC)
|
|
||||||
* Node $N$ with $N.epoch != cluster.epoch$ can **not** join the cluster, and has to re-join (bootstrap)
|
|
||||||
* Can (but doesn't have to) be a bootstrap point
|
|
||||||
|
|
||||||
# Membership
|
|
||||||
* Membership is managed though SWIM
|
|
||||||
* Each node contains a small slice of the entire network
|
|
||||||
## Joining
|
|
||||||
Each node has an array of roles:
|
|
||||||
1. That it performs
|
|
||||||
2. That it requires to operate (can be moved out to the master, or the shared type)
|
|
||||||
3. That it needs for bootstrapping (analogous to 2.)
|
|
||||||
|
|
||||||
Node can join via a master or via other nodes
|
|
||||||
When a node requests to join, the responder makes a request to the CP and asks for a permission to add this node
|
|
||||||
* If master allows
|
|
||||||
1. The node gets a membership digest from the CP.
|
|
||||||
2. The node *can* be brought up to speed using it's neighbors from 1.
|
|
||||||
3. Node join event gets broadcasted over SWIM gossiping
|
|
||||||
* Otherwise, nothing happens
|
|
||||||
|
|
||||||
# Host node
|
|
||||||
## Bootstrap
|
|
||||||
Host node requests `dns` nodes on join (and other node types, such as `ns`, `nginx`, etc... They should really be called something like `dns_processor`, and the internals (how it processes the dns) should not be visible to the cluster, but that's a task for a future me)
|
|
||||||
When a new update occurs, it sends the update to *some* `dns` hosts.
|
|
||||||
|
|
||||||
# DNS node
|
|
||||||
## Bootstrap
|
|
||||||
First, it gets all the available `hosts` from the CP
|
|
||||||
Then it requests their configs and sets map[hostName]seq accordingly
|
|
||||||
## Simple join (when other nodes exist)
|
|
||||||
It requests it's config from other nodes and that's it
|
|
||||||
|
|
||||||
<!-- TODO: finish the TODO file lol -->
|
|
||||||
|
|
||||||
# Minor To-Do
|
|
||||||
- auth middleware lol
|
|
||||||
- move request logging out of the request handling into a middleware
|
|
||||||
- nginx role
|
- nginx role
|
||||||
- think about choosing the master for the keepalive message (should be somewhat load-balanced)
|
- think about choosing the master for the keepalive message (should be somewhat load-balanced)
|
||||||
- hivemind lite should not just print `hivemind-lite` lol
|
- hivemind lite should not just print `hivemind-lite` lol
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,6 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/app"
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/config"
|
"git.wzray.com/homelab/hivemind/internal/config"
|
||||||
"git.wzray.com/homelab/hivemind/internal/registry"
|
"git.wzray.com/homelab/hivemind/internal/registry"
|
||||||
"git.wzray.com/homelab/hivemind/internal/roles"
|
"git.wzray.com/homelab/hivemind/internal/roles"
|
||||||
|
|
@ -18,14 +17,11 @@ import (
|
||||||
"git.wzray.com/homelab/hivemind/internal/roles/host"
|
"git.wzray.com/homelab/hivemind/internal/roles/host"
|
||||||
"git.wzray.com/homelab/hivemind/internal/roles/master"
|
"git.wzray.com/homelab/hivemind/internal/roles/master"
|
||||||
"git.wzray.com/homelab/hivemind/internal/roles/node"
|
"git.wzray.com/homelab/hivemind/internal/roles/node"
|
||||||
dns_transport "git.wzray.com/homelab/hivemind/internal/transport/dns"
|
"git.wzray.com/homelab/hivemind/internal/state"
|
||||||
host_transport "git.wzray.com/homelab/hivemind/internal/transport/host"
|
|
||||||
master_transport "git.wzray.com/homelab/hivemind/internal/transport/master"
|
|
||||||
node_transport "git.wzray.com/homelab/hivemind/internal/transport/node"
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/types"
|
"git.wzray.com/homelab/hivemind/internal/types"
|
||||||
web_client "git.wzray.com/homelab/hivemind/internal/web/client"
|
"git.wzray.com/homelab/hivemind/internal/web/client"
|
||||||
web_middleware "git.wzray.com/homelab/hivemind/internal/web/middleware"
|
"git.wzray.com/homelab/hivemind/internal/web/middleware"
|
||||||
web_server "git.wzray.com/homelab/hivemind/internal/web/server"
|
"git.wzray.com/homelab/hivemind/internal/web/server"
|
||||||
"github.com/rs/zerolog"
|
"github.com/rs/zerolog"
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
"github.com/rs/zerolog/pkgerrors"
|
"github.com/rs/zerolog/pkgerrors"
|
||||||
|
|
@ -91,23 +87,18 @@ func main() {
|
||||||
filestore.EnsureExists()
|
filestore.EnsureExists()
|
||||||
registry := registry.New(filestore, self)
|
registry := registry.New(filestore, self)
|
||||||
|
|
||||||
var builder web_middleware.MiddlewareBuilder
|
state := state.New(registry, self)
|
||||||
middlewares := builder.Prepare()
|
|
||||||
|
|
||||||
client := web_client.New(middlewares)
|
|
||||||
|
|
||||||
listenAddr := fmt.Sprintf("%v:%v", configuration.Node.ListenOn, configuration.Node.Port)
|
|
||||||
server := web_server.New(listenAddr, middlewares)
|
|
||||||
|
|
||||||
state := app.NewState(registry, self, app.Clients{
|
|
||||||
Master: master_transport.New(client),
|
|
||||||
DNS: dns_transport.New(client),
|
|
||||||
Host: host_transport.New(client),
|
|
||||||
Node: node_transport.New(client),
|
|
||||||
})
|
|
||||||
|
|
||||||
nodeRole := node.New(state, configuration.Node)
|
nodeRole := node.New(state, configuration.Node)
|
||||||
|
|
||||||
|
var builder middleware.MiddlewareBuilder
|
||||||
|
middlewares := builder.Prepare()
|
||||||
|
|
||||||
|
client.Init(middlewares)
|
||||||
|
|
||||||
|
listenAddr := fmt.Sprintf("%v:%v", configuration.Node.ListenOn, configuration.Node.Port)
|
||||||
|
server := server.NewServer(listenAddr, middlewares)
|
||||||
|
|
||||||
roles := make([]roles.Role, 0)
|
roles := make([]roles.Role, 0)
|
||||||
roles = append(roles, nodeRole)
|
roles = append(roles, nodeRole)
|
||||||
|
|
||||||
|
|
|
||||||
6
go.mod
6
go.mod
|
|
@ -2,12 +2,10 @@ module git.wzray.com/homelab/hivemind
|
||||||
|
|
||||||
go 1.25.5
|
go 1.25.5
|
||||||
|
|
||||||
require (
|
require github.com/rs/zerolog v1.34.0
|
||||||
github.com/rs/zerolog v1.34.0
|
|
||||||
github.com/BurntSushi/toml v1.6.0
|
|
||||||
)
|
|
||||||
|
|
||||||
require (
|
require (
|
||||||
|
github.com/BurntSushi/toml v1.6.0 // indirect
|
||||||
github.com/mattn/go-colorable v0.1.14 // indirect
|
github.com/mattn/go-colorable v0.1.14 // indirect
|
||||||
github.com/mattn/go-isatty v0.0.20 // indirect
|
github.com/mattn/go-isatty v0.0.20 // indirect
|
||||||
github.com/pkg/errors v0.9.1 // indirect
|
github.com/pkg/errors v0.9.1 // indirect
|
||||||
|
|
|
||||||
|
|
@ -1,35 +0,0 @@
|
||||||
package app
|
|
||||||
|
|
||||||
import (
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/registry"
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/transport/dns"
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/transport/host"
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/transport/master"
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/transport/node"
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/types"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Clients struct {
|
|
||||||
Master *master.Client
|
|
||||||
DNS *dns.Client
|
|
||||||
Host *host.Client
|
|
||||||
Node *node.Client
|
|
||||||
}
|
|
||||||
|
|
||||||
type State struct {
|
|
||||||
Registry *registry.Registry
|
|
||||||
Self types.Node
|
|
||||||
Clients Clients
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewState(
|
|
||||||
registry *registry.Registry,
|
|
||||||
self types.Node,
|
|
||||||
clients Clients,
|
|
||||||
) *State {
|
|
||||||
return &State{
|
|
||||||
Registry: registry,
|
|
||||||
Self: self,
|
|
||||||
Clients: clients,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
@ -14,8 +14,5 @@ var DefaultConfig = Config{
|
||||||
BackoffCount: 4,
|
BackoffCount: 4,
|
||||||
NodeTimeout: 120,
|
NodeTimeout: 120,
|
||||||
},
|
},
|
||||||
Host: HostConfig{
|
|
||||||
ListenAddress: "0.0.0.0:56715",
|
|
||||||
},
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -12,7 +12,6 @@ type HostConfig struct {
|
||||||
LocalAddress string `toml:"local_address"`
|
LocalAddress string `toml:"local_address"`
|
||||||
InternalEntrypoint string `toml:"internal_entrypoint"`
|
InternalEntrypoint string `toml:"internal_entrypoint"`
|
||||||
ExternalEntrypoint string `toml:"external_entrypoint"`
|
ExternalEntrypoint string `toml:"external_entrypoint"`
|
||||||
ListenAddress string `toml:"listen_address"`
|
|
||||||
baseRoleConfig
|
baseRoleConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -41,10 +40,6 @@ func (c HostConfig) Validate() error {
|
||||||
return errors.New("missing external entrypoint")
|
return errors.New("missing external entrypoint")
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.ListenAddress == "" {
|
|
||||||
return errors.New("missing listen address")
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -72,8 +67,4 @@ func (c *HostConfig) Merge(other HostConfig) {
|
||||||
if other.ExternalEntrypoint != "" {
|
if other.ExternalEntrypoint != "" {
|
||||||
c.ExternalEntrypoint = other.ExternalEntrypoint
|
c.ExternalEntrypoint = other.ExternalEntrypoint
|
||||||
}
|
}
|
||||||
|
|
||||||
if other.ListenAddress != "" {
|
|
||||||
c.ListenAddress = other.ListenAddress
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -53,12 +53,8 @@ func (c NodeConfig) Validate() error {
|
||||||
return errors.New("invalid keepalive_interval")
|
return errors.New("invalid keepalive_interval")
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.ListenOn == "" {
|
|
||||||
return errors.New("missing listen_on")
|
|
||||||
}
|
|
||||||
|
|
||||||
if net.ParseIP(c.ListenOn) == nil {
|
if net.ParseIP(c.ListenOn) == nil {
|
||||||
return fmt.Errorf("invalid listen_on: %v", c.ListenOn)
|
return errors.New("invalid listen_on")
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
|
|
@ -8,23 +8,22 @@ import (
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/app"
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/config"
|
"git.wzray.com/homelab/hivemind/internal/config"
|
||||||
"git.wzray.com/homelab/hivemind/internal/transport"
|
"git.wzray.com/homelab/hivemind/internal/state"
|
||||||
"git.wzray.com/homelab/hivemind/internal/transport/dns"
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/types"
|
"git.wzray.com/homelab/hivemind/internal/types"
|
||||||
|
"git.wzray.com/homelab/hivemind/internal/web/client"
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
const hostsDir = "/etc/hosts.d/"
|
const hostsDir = "/etc/hosts.d/"
|
||||||
|
|
||||||
type Role struct {
|
type Role struct {
|
||||||
state *app.State
|
state *state.RuntimeState
|
||||||
config config.DnsConfig
|
config config.DnsConfig
|
||||||
group sync.WaitGroup
|
group sync.WaitGroup
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(state *app.State, config config.DnsConfig) *Role {
|
func New(state *state.RuntimeState, config config.DnsConfig) *Role {
|
||||||
r := &Role{
|
r := &Role{
|
||||||
state: state,
|
state: state,
|
||||||
config: config,
|
config: config,
|
||||||
|
|
@ -33,6 +32,28 @@ func New(state *app.State, config config.DnsConfig) *Role {
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Role) updateDnsmasq(filename string, data []byte) error {
|
||||||
|
if err := os.WriteFile(filename, data, 0644); err != nil {
|
||||||
|
return fmt.Errorf("write endpoint file %q: %w", filename, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := r.reload(); err != nil {
|
||||||
|
return fmt.Errorf("reload dnsmasq: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func parseState(state types.HostState) (string, []byte) {
|
||||||
|
var builder strings.Builder
|
||||||
|
|
||||||
|
for _, d := range state.Domains {
|
||||||
|
builder.WriteString(fmt.Sprintf("%s %s\n", state.Address, d))
|
||||||
|
}
|
||||||
|
|
||||||
|
return hostsDir + state.Hostname, []byte(builder.String())
|
||||||
|
}
|
||||||
|
|
||||||
func (r *Role) OnStartup(ctx context.Context) error {
|
func (r *Role) OnStartup(ctx context.Context) error {
|
||||||
r.group.Go(func() {
|
r.group.Go(func() {
|
||||||
r.syncFromRegistry()
|
r.syncFromRegistry()
|
||||||
|
|
@ -53,46 +74,15 @@ func (r *Role) OnStartup(ctx context.Context) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Role) OnShutdown() error {
|
|
||||||
r.group.Wait()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Role) RegisterHandlers(rg transport.Registrator) {
|
|
||||||
dns.Register(rg, r)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Role) Callback(state types.HostState) (bool, error) {
|
|
||||||
filename, data := parseState(state)
|
|
||||||
|
|
||||||
if err := r.updateDnsmasq(filename, data); err != nil {
|
|
||||||
return false, err
|
|
||||||
}
|
|
||||||
|
|
||||||
return true, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Role) updateDnsmasq(filename string, data []byte) error {
|
|
||||||
if err := os.WriteFile(filename, data, 0644); err != nil {
|
|
||||||
return fmt.Errorf("write endpoint file %q: %w", filename, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := r.reload(); err != nil {
|
|
||||||
return fmt.Errorf("reload dnsmasq: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Role) syncFromRegistry() {
|
func (r *Role) syncFromRegistry() {
|
||||||
for _, n := range r.state.Registry.ByRole(types.HostRole) {
|
for _, n := range r.state.Registry.ByRole(types.HostRole) {
|
||||||
state, err := r.state.Clients.Host.Dns(n.Endpoint)
|
state, err := client.Get[types.HostState](n.Endpoint, types.PathHostDns)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn().Str("name", n.Hostname).Err(err).Msg("unable to get host config")
|
log.Warn().Str("name", n.Hostname).Err(err).Msg("unable to get host config")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
filename, data := parseState(state)
|
filename, data := parseState(*state)
|
||||||
if err := r.updateDnsmasq(filename, data); err != nil {
|
if err := r.updateDnsmasq(filename, data); err != nil {
|
||||||
log.Warn().Str("name", n.Hostname).Err(err).Msg("unable to update dnsmasq")
|
log.Warn().Str("name", n.Hostname).Err(err).Msg("unable to update dnsmasq")
|
||||||
continue
|
continue
|
||||||
|
|
@ -100,6 +90,11 @@ func (r *Role) syncFromRegistry() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Role) OnShutdown() error {
|
||||||
|
r.group.Wait()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func (r *Role) reload() error {
|
func (r *Role) reload() error {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
|
|
@ -112,12 +107,16 @@ func (r *Role) reload() error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func parseState(state types.HostState) (string, []byte) {
|
func (r *Role) onCallback(state types.HostState) (bool, error) {
|
||||||
var builder strings.Builder
|
filename, data := parseState(state)
|
||||||
|
|
||||||
for _, d := range state.Domains {
|
if err := r.updateDnsmasq(filename, data); err != nil {
|
||||||
builder.WriteString(fmt.Sprintf("%s %s\n", state.Address, d))
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return hostsDir + state.Hostname, []byte(builder.String())
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Role) RegisterHandlers(rg types.Registrator) {
|
||||||
|
rg.Register(types.PostEndpoint(types.PathDnsCallback, r.onCallback))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,96 +0,0 @@
|
||||||
package host
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"net/http"
|
|
||||||
"net/url"
|
|
||||||
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/config"
|
|
||||||
"github.com/rs/zerolog/log"
|
|
||||||
)
|
|
||||||
|
|
||||||
type TraefikListener interface {
|
|
||||||
OnTraefikUpdate(traefikResponse)
|
|
||||||
}
|
|
||||||
|
|
||||||
type TraefikGateway struct {
|
|
||||||
client *http.Client
|
|
||||||
server *http.Server
|
|
||||||
listener TraefikListener
|
|
||||||
address url.URL
|
|
||||||
domain string
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewTraefikGateway(cfg config.HostConfig, listener TraefikListener) *TraefikGateway {
|
|
||||||
mux := http.NewServeMux()
|
|
||||||
gw := &TraefikGateway{
|
|
||||||
client: &http.Client{},
|
|
||||||
|
|
||||||
server: &http.Server{
|
|
||||||
Addr: cfg.ListenAddress,
|
|
||||||
Handler: mux,
|
|
||||||
},
|
|
||||||
listener: listener,
|
|
||||||
address: url.URL{
|
|
||||||
Scheme: "http",
|
|
||||||
Host: cfg.LocalAddress,
|
|
||||||
},
|
|
||||||
domain: cfg.Domain,
|
|
||||||
}
|
|
||||||
|
|
||||||
mux.HandleFunc("/callback", gw.onCallback)
|
|
||||||
return gw
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *TraefikGateway) Listen() error {
|
|
||||||
return g.server.ListenAndServe()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *TraefikGateway) Shutdown(ctx context.Context) error {
|
|
||||||
return g.server.Shutdown(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *TraefikGateway) GetRawData() (*traefikResponse, error) {
|
|
||||||
var raw TraefikRawResponse
|
|
||||||
|
|
||||||
url := g.address
|
|
||||||
url.Path = "/api/rawdata"
|
|
||||||
|
|
||||||
req := http.Request{
|
|
||||||
Method: http.MethodGet,
|
|
||||||
URL: &url,
|
|
||||||
}
|
|
||||||
|
|
||||||
req.Host = g.domain
|
|
||||||
|
|
||||||
r, err := g.client.Do(&req)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("make request: %w", err)
|
|
||||||
}
|
|
||||||
defer r.Body.Close()
|
|
||||||
|
|
||||||
if err := json.NewDecoder(r.Body).Decode(&raw); err != nil {
|
|
||||||
return nil, fmt.Errorf("unmarshal body: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
out := parseTraefikResponse(raw)
|
|
||||||
return &out, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (g *TraefikGateway) onCallback(w http.ResponseWriter, req *http.Request) {
|
|
||||||
var raw TraefikRawResponse
|
|
||||||
if err := json.NewDecoder(req.Body).Decode(&raw); err != nil {
|
|
||||||
w.WriteHeader(http.StatusInternalServerError)
|
|
||||||
log.Err(err).Msg("unable to decode traefik callback data")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
resp := parseTraefikResponse(raw)
|
|
||||||
if g.listener != nil {
|
|
||||||
g.listener.OnTraefikUpdate(resp)
|
|
||||||
}
|
|
||||||
|
|
||||||
w.Write([]byte("OK"))
|
|
||||||
}
|
|
||||||
|
|
@ -2,37 +2,36 @@ package host
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net/http"
|
||||||
"slices"
|
"slices"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/app"
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/config"
|
"git.wzray.com/homelab/hivemind/internal/config"
|
||||||
"git.wzray.com/homelab/hivemind/internal/transport"
|
"git.wzray.com/homelab/hivemind/internal/state"
|
||||||
"git.wzray.com/homelab/hivemind/internal/transport/host"
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/types"
|
"git.wzray.com/homelab/hivemind/internal/types"
|
||||||
|
"git.wzray.com/homelab/hivemind/internal/web/client"
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Role struct {
|
type Role struct {
|
||||||
state *app.State
|
state *state.RuntimeState
|
||||||
config config.HostConfig
|
config config.HostConfig
|
||||||
|
|
||||||
gateway *TraefikGateway
|
client *traefikClient
|
||||||
tasksGroup sync.WaitGroup
|
tasksGroup sync.WaitGroup
|
||||||
|
|
||||||
externalDomains []string // TODO: i don't like hardcoding external/internal logic here
|
externalDomains []string // TODO: i don't like hardcoding external/internal logic here
|
||||||
internalDomains []string
|
internalDomains []string
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(state *app.State, config config.HostConfig) *Role {
|
func New(state *state.RuntimeState, config config.HostConfig) *Role {
|
||||||
r := &Role{
|
return &Role{
|
||||||
|
client: newClient(config.Domain, config.LocalAddress),
|
||||||
state: state,
|
state: state,
|
||||||
config: config,
|
config: config,
|
||||||
}
|
}
|
||||||
|
|
||||||
r.gateway = NewTraefikGateway(config, r)
|
|
||||||
return r
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Role) sendUpdate(domains []string, role types.Role) {
|
func (r *Role) sendUpdate(domains []string, role types.Role) {
|
||||||
|
|
@ -46,7 +45,7 @@ func (r *Role) sendUpdate(domains []string, role types.Role) {
|
||||||
r.tasksGroup.Go(func() {
|
r.tasksGroup.Go(func() {
|
||||||
logger := log.With().Str("name", node.Hostname).Logger()
|
logger := log.With().Str("name", node.Hostname).Logger()
|
||||||
logger.Debug().Msg("sending update")
|
logger.Debug().Msg("sending update")
|
||||||
if _, err := r.state.Clients.DNS.Callback(node.Endpoint, state); err != nil {
|
if _, err := client.Post[any](node.Endpoint, types.PathDnsCallback, state); err != nil {
|
||||||
logger.Warn().Err(err).Msg("unable to send dns info")
|
logger.Warn().Err(err).Msg("unable to send dns info")
|
||||||
} else {
|
} else {
|
||||||
logger.Debug().Msg("update sent")
|
logger.Debug().Msg("update sent")
|
||||||
|
|
@ -55,7 +54,7 @@ func (r *Role) sendUpdate(domains []string, role types.Role) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Role) OnTraefikUpdate(resp traefikResponse) {
|
func (r *Role) mutateState(resp traefikResponse) {
|
||||||
newInternal := resp.Domains(r.config.InternalEntrypoint)
|
newInternal := resp.Domains(r.config.InternalEntrypoint)
|
||||||
newExternal := resp.Domains(r.config.ExternalEntrypoint)
|
newExternal := resp.Domains(r.config.ExternalEntrypoint)
|
||||||
|
|
||||||
|
|
@ -72,7 +71,20 @@ func (r *Role) OnTraefikUpdate(resp traefikResponse) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Role) Dns() (types.HostState, error) {
|
func (r *Role) onCallback(w http.ResponseWriter, req *http.Request) {
|
||||||
|
var resp traefikResponse
|
||||||
|
if err := json.NewDecoder(req.Body).Decode(&resp); err != nil {
|
||||||
|
w.WriteHeader(http.StatusInternalServerError)
|
||||||
|
log.Err(err).Msg("unable to decode traefik callback data")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
r.mutateState(resp)
|
||||||
|
|
||||||
|
w.Write([]byte("OK"))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Role) getInternal() (types.HostState, error) {
|
||||||
return types.HostState{
|
return types.HostState{
|
||||||
Domains: r.internalDomains,
|
Domains: r.internalDomains,
|
||||||
Address: r.config.IpAddress,
|
Address: r.config.IpAddress,
|
||||||
|
|
@ -80,7 +92,7 @@ func (r *Role) Dns() (types.HostState, error) {
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Role) Nameserver() (types.HostState, error) {
|
func (r *Role) getExternal() (types.HostState, error) {
|
||||||
return types.HostState{
|
return types.HostState{
|
||||||
Domains: r.externalDomains,
|
Domains: r.externalDomains,
|
||||||
Address: r.config.IpAddress,
|
Address: r.config.IpAddress,
|
||||||
|
|
@ -89,25 +101,14 @@ func (r *Role) Nameserver() (types.HostState, error) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Role) RegisterHandlers(rg transport.Registrator) {
|
func (r *Role) RegisterHandlers(rg types.Registrator) {
|
||||||
host.Register(rg, r)
|
rg.RegisterRaw(http.MethodPost, types.PathHostCallback.String(), r.onCallback)
|
||||||
|
rg.Register(types.GetEndpoint(types.PathHostDns, r.getInternal))
|
||||||
|
rg.Register(types.GetEndpoint(types.PathHostNs, r.getExternal))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Role) OnStartup(ctx context.Context) error {
|
func (r *Role) OnStartup(ctx context.Context) error {
|
||||||
r.tasksGroup.Go(func() {
|
resp, err := r.client.GetRawData()
|
||||||
if err := r.gateway.Listen(); err != nil {
|
|
||||||
log.Err(err).Msg("traefik gateway stopped")
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
r.tasksGroup.Go(func() {
|
|
||||||
<-ctx.Done()
|
|
||||||
if err := r.gateway.Shutdown(context.Background()); err != nil {
|
|
||||||
log.Err(err).Msg("failed to shutdown traefik gateway")
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
resp, err := r.gateway.GetRawData()
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("get traefik state: %w", err)
|
return fmt.Errorf("get traefik state: %w", err)
|
||||||
}
|
}
|
||||||
|
|
@ -115,7 +116,7 @@ func (r *Role) OnStartup(ctx context.Context) error {
|
||||||
log.Info().Msg("got raw data from traefik")
|
log.Info().Msg("got raw data from traefik")
|
||||||
log.Debug().Interface("response", resp).Send()
|
log.Debug().Interface("response", resp).Send()
|
||||||
|
|
||||||
r.OnTraefikUpdate(*resp)
|
r.mutateState(*resp)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1 +1,58 @@
|
||||||
package host
|
package host
|
||||||
|
|
||||||
|
import (
|
||||||
|
"crypto/tls"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
)
|
||||||
|
|
||||||
|
type traefikClient struct {
|
||||||
|
client *http.Client
|
||||||
|
domain string
|
||||||
|
address url.URL
|
||||||
|
}
|
||||||
|
|
||||||
|
func newClient(domain string, addr string) *traefikClient {
|
||||||
|
return &traefikClient{
|
||||||
|
domain: domain,
|
||||||
|
address: url.URL{
|
||||||
|
Scheme: "https",
|
||||||
|
Host: addr,
|
||||||
|
},
|
||||||
|
client: &http.Client{
|
||||||
|
Transport: &http.Transport{
|
||||||
|
TLSClientConfig: &tls.Config{
|
||||||
|
ServerName: domain,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *traefikClient) GetRawData() (*traefikResponse, error) {
|
||||||
|
var out traefikResponse
|
||||||
|
|
||||||
|
url := c.address
|
||||||
|
url.Path = "/api/rawdata"
|
||||||
|
|
||||||
|
req := http.Request{
|
||||||
|
Method: "GET",
|
||||||
|
URL: &url,
|
||||||
|
}
|
||||||
|
|
||||||
|
req.Host = c.domain
|
||||||
|
|
||||||
|
r, err := c.client.Do(&req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("make request: %w", err)
|
||||||
|
}
|
||||||
|
defer r.Body.Close()
|
||||||
|
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&out); err != nil {
|
||||||
|
return nil, fmt.Errorf("unmarshal body: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &out, nil
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,66 +1,64 @@
|
||||||
package host
|
package host
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"encoding/json"
|
||||||
"regexp"
|
"regexp"
|
||||||
"slices"
|
"slices"
|
||||||
)
|
)
|
||||||
|
|
||||||
var hostRegex = regexp.MustCompile("Host\\(`([^()`]+\\.[^()`]+)`\\)")
|
var hostRegex = regexp.MustCompile("Host\\(`([^()`]+\\.[^()`]+)`\\)")
|
||||||
|
|
||||||
type traefikRule struct {
|
type rule struct {
|
||||||
Raw string
|
Raw string
|
||||||
Domains []string
|
Domains []string
|
||||||
Valid bool
|
Valid bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type traefikRouter struct {
|
func (r *rule) UnmarshalJSON(data []byte) error {
|
||||||
Rule traefikRule
|
r.Valid = false
|
||||||
Entrypoints []string
|
|
||||||
}
|
|
||||||
|
|
||||||
type traefikResponse struct {
|
raw := ""
|
||||||
Routers []traefikRouter
|
if err := json.Unmarshal(data, &raw); err != nil {
|
||||||
}
|
return err
|
||||||
|
|
||||||
type TraefikRawResponse struct {
|
|
||||||
Routers map[string]TraefikRawRouter `json:"routers"`
|
|
||||||
}
|
|
||||||
|
|
||||||
type TraefikRawRouter struct {
|
|
||||||
Rule string `json:"rule"`
|
|
||||||
Entrypoints []string `json:"entryPoints"`
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseTraefikResponse(raw TraefikRawResponse) traefikResponse {
|
|
||||||
out := traefikResponse{
|
|
||||||
Routers: make([]traefikRouter, 0, len(raw.Routers)),
|
|
||||||
}
|
|
||||||
|
|
||||||
for _, router := range raw.Routers {
|
|
||||||
out.Routers = append(out.Routers, traefikRouter{
|
|
||||||
Rule: parseTraefikRule(router.Rule),
|
|
||||||
Entrypoints: router.Entrypoints,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
return out
|
|
||||||
}
|
|
||||||
|
|
||||||
func parseTraefikRule(raw string) traefikRule {
|
|
||||||
rule := traefikRule{
|
|
||||||
Raw: raw,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
matches := hostRegex.FindAllStringSubmatch(raw, -1)
|
matches := hostRegex.FindAllStringSubmatch(raw, -1)
|
||||||
|
|
||||||
for _, match := range matches {
|
for _, match := range matches {
|
||||||
if len(match) <= 1 {
|
if len(match) <= 1 {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
rule.Domains = append(rule.Domains, match[1:]...)
|
r.Domains = append(r.Domains, match[1:]...)
|
||||||
}
|
}
|
||||||
|
|
||||||
rule.Valid = len(rule.Domains) > 0
|
r.Valid = len(r.Domains) > 0
|
||||||
return rule
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type router struct {
|
||||||
|
Rule rule `json:"rule"`
|
||||||
|
Entrypoints []string `json:"entryPoints"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type traefikResponse struct {
|
||||||
|
Routers []router
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *traefikResponse) UnmarshalJSON(data []byte) error {
|
||||||
|
var raw struct {
|
||||||
|
Routers map[string]router `json:"routers"`
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := json.Unmarshal(data, &raw); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, v := range raw.Routers {
|
||||||
|
r.Routers = append(r.Routers, v)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r traefikResponse) Domains(entrypoint string) []string {
|
func (r traefikResponse) Domains(entrypoint string) []string {
|
||||||
|
|
|
||||||
|
|
@ -4,24 +4,22 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/app"
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/config"
|
"git.wzray.com/homelab/hivemind/internal/config"
|
||||||
"git.wzray.com/homelab/hivemind/internal/roles"
|
"git.wzray.com/homelab/hivemind/internal/roles"
|
||||||
"git.wzray.com/homelab/hivemind/internal/transport"
|
"git.wzray.com/homelab/hivemind/internal/state"
|
||||||
"git.wzray.com/homelab/hivemind/internal/transport/master"
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/types"
|
"git.wzray.com/homelab/hivemind/internal/types"
|
||||||
"github.com/rs/zerolog/log"
|
"git.wzray.com/homelab/hivemind/internal/web/client"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Role struct {
|
type Role struct {
|
||||||
state *app.State
|
state *state.RuntimeState
|
||||||
config config.MasterConfig
|
config config.MasterConfig
|
||||||
tasksGroup sync.WaitGroup
|
tasksGroup sync.WaitGroup
|
||||||
observer *observer
|
observer *observer
|
||||||
roles.BaseRole
|
roles.BaseRole
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(state *app.State, config config.MasterConfig) *Role {
|
func New(state *state.RuntimeState, config config.MasterConfig) *Role {
|
||||||
return &Role{
|
return &Role{
|
||||||
state: state,
|
state: state,
|
||||||
config: config,
|
config: config,
|
||||||
|
|
@ -38,7 +36,7 @@ func New(state *app.State, config config.MasterConfig) *Role {
|
||||||
func (r *Role) OnStartup(ctx context.Context) error {
|
func (r *Role) OnStartup(ctx context.Context) error {
|
||||||
r.tasksGroup.Go(func() {
|
r.tasksGroup.Go(func() {
|
||||||
r.observer.Start(ctx, func(n types.Node) error {
|
r.observer.Start(ctx, func(n types.Node) error {
|
||||||
_, err := r.onLeave(n, true)
|
_, err := r.onLeave(n)
|
||||||
return err
|
return err
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
@ -51,95 +49,48 @@ func (r *Role) OnShutdown() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Role) RegisterHandlers(r transport.Registrator) {
|
func (r *Role) notify(path types.Path, v any) {
|
||||||
master.Register(r, c)
|
for _, n := range r.state.Registry.Nodes() {
|
||||||
|
addr := n.Endpoint
|
||||||
|
r.tasksGroup.Go(func() {
|
||||||
|
client.Post[any](addr, path, v)
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Role) Heartbeat(node types.Node) (types.Nodes, error) {
|
func (r *Role) onJoin(node types.Node) (map[string]types.Node, error) {
|
||||||
return r.onKeepAlive(node, true)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Role) Join(node types.Node) (types.Nodes, error) {
|
|
||||||
return r.onJoin(node, true)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Role) Leave(node types.Node) error {
|
|
||||||
_, err := r.onLeave(node, true)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Role) EventHeartbeat(node types.Node) (types.Nodes, error) {
|
|
||||||
return r.onKeepAlive(node, false)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Role) EventJoin(node types.Node) (types.Nodes, error) {
|
|
||||||
return r.onJoin(node, false)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Role) EventLeave(node types.Node) error {
|
|
||||||
_, err := r.onLeave(node, false)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Role) onJoin(node types.Node, notify bool) (map[string]types.Node, error) {
|
|
||||||
if err := r.state.Registry.AddNode(node); err != nil {
|
if err := r.state.Registry.AddNode(node); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if notify {
|
r.notify(types.PathNodeJoin, node)
|
||||||
propagate(r, noReturn(r.state.Clients.Master.EventJoin), node)
|
|
||||||
}
|
|
||||||
|
|
||||||
return r.state.Registry.AllNodes(), nil
|
return r.state.Registry.AllNodes(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Role) onLeave(node types.Node, notify bool) (bool, error) {
|
func (r *Role) onLeave(node types.Node) (bool, error) {
|
||||||
if err := r.state.Registry.RemoveNode(node); err != nil {
|
if err := r.state.Registry.RemoveNode(node); err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if notify {
|
r.notify(types.PathNodeLeave, node)
|
||||||
propagate(r, r.state.Clients.Master.EventLeave, node)
|
|
||||||
}
|
|
||||||
|
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Role) onKeepAlive(node types.Node, notify bool) (map[string]types.Node, error) {
|
func (r *Role) onKeepAlive(node types.Node) (bool, error) {
|
||||||
r.observer.onKeepAlive(node)
|
r.observer.onKeepAlive(node)
|
||||||
|
|
||||||
if ok := r.state.Registry.Exists(node.Hostname); !ok {
|
if ok := r.state.Registry.Exists(node.Hostname); !ok {
|
||||||
// TODO: i don't like this side effect
|
_, err := r.onJoin(node)
|
||||||
if _, err := r.onJoin(node, true); err != nil {
|
return true, err
|
||||||
log.Warn().Err(err).Msg("unable to add node to the registry from keepalive")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if notify {
|
return false, nil
|
||||||
propagate(r, noReturn(r.state.Clients.Master.EventHeartbeat), node)
|
|
||||||
}
|
|
||||||
|
|
||||||
return r.state.Registry.AllNodes(), nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func eventFunc[R any](fn func(types.Node, bool) (R, error), notify bool) func(types.Node) (R, error) {
|
func (c *Role) RegisterHandlers(r types.Registrator) {
|
||||||
return func(n types.Node) (R, error) {
|
r.Register(types.PostEndpoint(types.PathMasterJoin, c.onJoin))
|
||||||
return fn(n, notify)
|
r.Register(types.PostEndpoint(types.PathMasterLeave, c.onLeave))
|
||||||
}
|
r.Register(types.PostEndpoint(types.PathMasterKeepalive, c.onKeepAlive))
|
||||||
}
|
|
||||||
|
|
||||||
func noReturn[T, V any](fn func(string, T) (V, error)) func(string, T) error {
|
|
||||||
return func(s string, t T) error {
|
|
||||||
_, err := fn(s, t)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func propagate[T any](r *Role, handler func(string, T) error, v T) {
|
|
||||||
for _, n := range r.state.Registry.ByRole(types.MasterRole) {
|
|
||||||
addr := n.Endpoint
|
|
||||||
r.tasksGroup.Go(func() {
|
|
||||||
handler(addr, v)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,14 +5,15 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/app"
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/registry"
|
"git.wzray.com/homelab/hivemind/internal/registry"
|
||||||
|
"git.wzray.com/homelab/hivemind/internal/state"
|
||||||
"git.wzray.com/homelab/hivemind/internal/types"
|
"git.wzray.com/homelab/hivemind/internal/types"
|
||||||
|
"git.wzray.com/homelab/hivemind/internal/web/client"
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
type observer struct {
|
type observer struct {
|
||||||
state *app.State
|
state *state.RuntimeState
|
||||||
interval int
|
interval int
|
||||||
backoff int
|
backoff int
|
||||||
backoffCount int
|
backoffCount int
|
||||||
|
|
@ -22,7 +23,7 @@ type observer struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
func newObserver(
|
func newObserver(
|
||||||
state *app.State,
|
state *state.RuntimeState,
|
||||||
interval int,
|
interval int,
|
||||||
backoff int,
|
backoff int,
|
||||||
backoffCount int,
|
backoffCount int,
|
||||||
|
|
@ -62,7 +63,7 @@ func (o *observer) pollNodes(ctx context.Context, onLeave func(types.Node) error
|
||||||
delay := time.Duration(o.backoff)
|
delay := time.Duration(o.backoff)
|
||||||
alive := false
|
alive := false
|
||||||
for i := o.backoffCount - 1; i >= 0; i-- {
|
for i := o.backoffCount - 1; i >= 0; i-- {
|
||||||
_, err := o.state.Clients.Node.Healthcheck(n.Endpoint)
|
_, err := client.Get[any](n.Endpoint, types.PathNodeHealthcheck)
|
||||||
|
|
||||||
if err == nil {
|
if err == nil {
|
||||||
logger.Debug().Msg("node is alive")
|
logger.Debug().Msg("node is alive")
|
||||||
|
|
|
||||||
|
|
@ -7,41 +7,26 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/app"
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/config"
|
"git.wzray.com/homelab/hivemind/internal/config"
|
||||||
"git.wzray.com/homelab/hivemind/internal/transport"
|
"git.wzray.com/homelab/hivemind/internal/state"
|
||||||
"git.wzray.com/homelab/hivemind/internal/transport/node"
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/types"
|
"git.wzray.com/homelab/hivemind/internal/types"
|
||||||
|
"git.wzray.com/homelab/hivemind/internal/web/client"
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Role struct {
|
type Role struct {
|
||||||
state *app.State
|
state *state.RuntimeState
|
||||||
keepaliveGroup sync.WaitGroup
|
keepaliveGroup sync.WaitGroup
|
||||||
config config.NodeConfig
|
config config.NodeConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(state *app.State, config config.NodeConfig) *Role {
|
func New(state *state.RuntimeState, config config.NodeConfig) *Role {
|
||||||
return &Role{
|
return &Role{
|
||||||
state: state,
|
state: state,
|
||||||
config: config,
|
config: config,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Role) OnStartup(ctx context.Context) error {
|
|
||||||
r.keepaliveGroup.Go(r.keepaliveFunc(ctx))
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Role) OnShutdown() error {
|
|
||||||
r.keepaliveGroup.Wait()
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (n *Role) RegisterHandlers(r transport.Registrator) {
|
|
||||||
node.Register(r, n)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (r *Role) Join(bootstrap string) error {
|
func (r *Role) Join(bootstrap string) error {
|
||||||
masters := make(map[string]struct{})
|
masters := make(map[string]struct{})
|
||||||
for _, node := range r.state.Registry.ByRole(types.MasterRole) {
|
for _, node := range r.state.Registry.ByRole(types.MasterRole) {
|
||||||
|
|
@ -61,14 +46,14 @@ func (r *Role) Join(bootstrap string) error {
|
||||||
logger := log.With().Str("host", m).Logger()
|
logger := log.With().Str("host", m).Logger()
|
||||||
logger.Debug().Msg("trying to join via master")
|
logger.Debug().Msg("trying to join via master")
|
||||||
|
|
||||||
nodes, err := r.state.Clients.Master.Join(m, r.state.Self)
|
nodes, err := client.Post[map[string]types.Node](m, types.PathMasterJoin, r.state.Self)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errs = append(errs, err)
|
errs = append(errs, err)
|
||||||
logger.Debug().Err(err).Msg("unable to join")
|
logger.Debug().Err(err).Msg("unable to join")
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := r.state.Registry.Set(nodes); err != nil {
|
if err := r.state.Registry.Set(*nodes); err != nil {
|
||||||
logger.Debug().Err(err).Msg("unable to set master's nodes")
|
logger.Debug().Err(err).Msg("unable to set master's nodes")
|
||||||
errs = append(errs, err)
|
errs = append(errs, err)
|
||||||
continue
|
continue
|
||||||
|
|
@ -91,7 +76,8 @@ func (r *Role) Leave() error {
|
||||||
logger := log.With().Str("name", m.Hostname).Logger()
|
logger := log.With().Str("name", m.Hostname).Logger()
|
||||||
logger.Debug().Msg("sending leave message")
|
logger.Debug().Msg("sending leave message")
|
||||||
|
|
||||||
if err := r.state.Clients.Master.Leave(m.Endpoint, r.state.Self); err != nil {
|
_, err := client.Post[any](m.Endpoint, types.PathMasterLeave, r.state.Self)
|
||||||
|
if err != nil {
|
||||||
logger.Debug().Err(err).Msg("unable to send leave message")
|
logger.Debug().Err(err).Msg("unable to send leave message")
|
||||||
errs = append(errs, err)
|
errs = append(errs, err)
|
||||||
continue
|
continue
|
||||||
|
|
@ -104,8 +90,15 @@ func (r *Role) Leave() error {
|
||||||
return fmt.Errorf("unable to send leave message to any master: %w", errors.Join(errs...))
|
return fmt.Errorf("unable to send leave message to any master: %w", errors.Join(errs...))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Role) Healthcheck() (string, error) {
|
func (r *Role) OnStartup(ctx context.Context) error {
|
||||||
return "OK", nil
|
r.keepaliveGroup.Go(r.keepaliveFunc(ctx))
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Role) OnShutdown() error {
|
||||||
|
r.keepaliveGroup.Wait()
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Role) keepaliveFunc(ctx context.Context) func() {
|
func (r *Role) keepaliveFunc(ctx context.Context) func() {
|
||||||
|
|
@ -114,20 +107,11 @@ func (r *Role) keepaliveFunc(ctx context.Context) func() {
|
||||||
logger := log.With().Str("name", m.Hostname).Logger()
|
logger := log.With().Str("name", m.Hostname).Logger()
|
||||||
logger.Debug().Msg("sending keepalive packet")
|
logger.Debug().Msg("sending keepalive packet")
|
||||||
|
|
||||||
nodes, err := r.state.Clients.Master.Heartbeat(m.Endpoint, r.state.Self)
|
if _, err := client.Post[any](m.Endpoint, types.PathMasterKeepalive, r.state.Self); err != nil {
|
||||||
if err != nil {
|
|
||||||
logger.Info().Err(err).Msg("unable to send keepalive packet")
|
logger.Info().Err(err).Msg("unable to send keepalive packet")
|
||||||
continue
|
} else {
|
||||||
|
logger.Debug().Msg("keepalive packet sent")
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Debug().Msg("keepalive packet sent")
|
|
||||||
|
|
||||||
if err := r.state.Registry.Set(nodes); err != nil {
|
|
||||||
logger.Warn().Err(err).Msg("unable to set masters nodes")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
break
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -142,3 +126,27 @@ func (r *Role) keepaliveFunc(ctx context.Context) func() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *Role) onJoin(node types.Node) (bool, error) {
|
||||||
|
if err := r.state.Registry.AddNode(node); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *Role) onLeave(node types.Node) (bool, error) {
|
||||||
|
if err := r.state.Registry.RemoveNode(node); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func healthcheck() (string, error) {
|
||||||
|
return "OK", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (n *Role) RegisterHandlers(r types.Registrator) {
|
||||||
|
r.Register(types.GetEndpoint(types.PathNodeHealthcheck, healthcheck))
|
||||||
|
r.Register(types.PostEndpoint(types.PathNodeJoin, n.onJoin))
|
||||||
|
r.Register(types.PostEndpoint(types.PathNodeLeave, n.onLeave))
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,18 +3,18 @@ package roles
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/transport"
|
"git.wzray.com/homelab/hivemind/internal/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Role interface {
|
type Role interface {
|
||||||
RegisterHandlers(transport.Registrator)
|
RegisterHandlers(types.Registrator)
|
||||||
OnStartup(context.Context) error
|
OnStartup(context.Context) error
|
||||||
OnShutdown() error
|
OnShutdown() error
|
||||||
}
|
}
|
||||||
|
|
||||||
type BaseRole struct{}
|
type BaseRole struct{}
|
||||||
|
|
||||||
func (r *BaseRole) RegisterHandlers(transport.Registrator) {}
|
func (r *BaseRole) RegisterHandlers(types.Registrator) {}
|
||||||
|
|
||||||
func (r *BaseRole) OnStartup(context.Context) error { return nil }
|
func (r *BaseRole) OnStartup(context.Context) error { return nil }
|
||||||
|
|
||||||
|
|
|
||||||
18
internal/state/runtime.go
Normal file
18
internal/state/runtime.go
Normal file
|
|
@ -0,0 +1,18 @@
|
||||||
|
package state
|
||||||
|
|
||||||
|
import (
|
||||||
|
"git.wzray.com/homelab/hivemind/internal/registry"
|
||||||
|
"git.wzray.com/homelab/hivemind/internal/types"
|
||||||
|
)
|
||||||
|
|
||||||
|
type RuntimeState struct {
|
||||||
|
Registry *registry.Registry
|
||||||
|
Self types.Node
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(r *registry.Registry, n types.Node) *RuntimeState {
|
||||||
|
return &RuntimeState{
|
||||||
|
Registry: r,
|
||||||
|
Self: n,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,5 +0,0 @@
|
||||||
package transport
|
|
||||||
|
|
||||||
type Caller interface {
|
|
||||||
Call(host string, path string, data any, out any) error
|
|
||||||
}
|
|
||||||
|
|
@ -1,16 +0,0 @@
|
||||||
package codec
|
|
||||||
|
|
||||||
type Codec interface {
|
|
||||||
Decode(data []byte, out any) error
|
|
||||||
Encode(data any) ([]byte, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Decode[T any](c Codec, data []byte) (T, error) {
|
|
||||||
var out T
|
|
||||||
err := c.Decode(data, &out)
|
|
||||||
return out, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func Encode[T any](c Codec, data T) ([]byte, error) {
|
|
||||||
return c.Encode(data)
|
|
||||||
}
|
|
||||||
|
|
@ -1,15 +0,0 @@
|
||||||
package codec
|
|
||||||
|
|
||||||
import "encoding/json"
|
|
||||||
|
|
||||||
type jsonCodec struct{}
|
|
||||||
|
|
||||||
var JSON = jsonCodec{}
|
|
||||||
|
|
||||||
func (jsonCodec) Decode(data []byte, out any) error {
|
|
||||||
return json.Unmarshal(data, out)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (jsonCodec) Encode(data any) ([]byte, error) {
|
|
||||||
return json.Marshal(data)
|
|
||||||
}
|
|
||||||
|
|
@ -1,20 +0,0 @@
|
||||||
package dns
|
|
||||||
|
|
||||||
import (
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/transport"
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/types"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Client struct {
|
|
||||||
caller transport.Caller
|
|
||||||
}
|
|
||||||
|
|
||||||
func New(caller transport.Caller) *Client {
|
|
||||||
return &Client{
|
|
||||||
caller: caller,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) Callback(endpoint string, state types.HostState) (bool, error) {
|
|
||||||
return callbackRoute.Call(c.caller, endpoint, state)
|
|
||||||
}
|
|
||||||
|
|
@ -1,14 +0,0 @@
|
||||||
package dns
|
|
||||||
|
|
||||||
import (
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/transport"
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/types"
|
|
||||||
)
|
|
||||||
|
|
||||||
type DnsHandlers interface {
|
|
||||||
Callback(types.HostState) (bool, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Register(registrator transport.Registrator, h DnsHandlers) {
|
|
||||||
callbackRoute.Register(registrator, h.Callback)
|
|
||||||
}
|
|
||||||
|
|
@ -1,10 +0,0 @@
|
||||||
package dns
|
|
||||||
|
|
||||||
import (
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/transport"
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/types"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
callbackRoute = transport.NewRoute[types.HostState, bool]("/dns/callback")
|
|
||||||
)
|
|
||||||
|
|
@ -1,9 +0,0 @@
|
||||||
package transport
|
|
||||||
|
|
||||||
func WithoutInput[T any](f func() (T, error)) func(struct{}) (T, error) {
|
|
||||||
return func(s struct{}) (T, error) { return f() }
|
|
||||||
}
|
|
||||||
|
|
||||||
func WithoutOutput[T any](f func(T) error) func(T) (struct{}, error) {
|
|
||||||
return func(t T) (struct{}, error) { return struct{}{}, f(t) }
|
|
||||||
}
|
|
||||||
|
|
@ -1,24 +0,0 @@
|
||||||
package host
|
|
||||||
|
|
||||||
import (
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/transport"
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/types"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Client struct {
|
|
||||||
caller transport.Caller
|
|
||||||
}
|
|
||||||
|
|
||||||
func New(caller transport.Caller) *Client {
|
|
||||||
return &Client{
|
|
||||||
caller: caller,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) Dns(endpoint string) (types.HostState, error) {
|
|
||||||
return dnsRoute.CallNoInput(c.caller, endpoint)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) Nameserver(endpoint string) (types.HostState, error) {
|
|
||||||
return nsRoute.CallNoInput(c.caller, endpoint)
|
|
||||||
}
|
|
||||||
|
|
@ -1,16 +0,0 @@
|
||||||
package host
|
|
||||||
|
|
||||||
import (
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/transport"
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/types"
|
|
||||||
)
|
|
||||||
|
|
||||||
type HostHandlers interface {
|
|
||||||
Dns() (types.HostState, error)
|
|
||||||
Nameserver() (types.HostState, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Register(registrator transport.Registrator, h HostHandlers) {
|
|
||||||
dnsRoute.Register(registrator, transport.WithoutInput(h.Dns))
|
|
||||||
nsRoute.Register(registrator, transport.WithoutInput(h.Nameserver))
|
|
||||||
}
|
|
||||||
|
|
@ -1,11 +0,0 @@
|
||||||
package host
|
|
||||||
|
|
||||||
import (
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/transport"
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/types"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
dnsRoute = transport.NewRoute[struct{}, types.HostState]("/host/dns")
|
|
||||||
nsRoute = transport.NewRoute[struct{}, types.HostState]("/host/ns")
|
|
||||||
)
|
|
||||||
|
|
@ -1,40 +0,0 @@
|
||||||
package master
|
|
||||||
|
|
||||||
import (
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/transport"
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/types"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Client struct {
|
|
||||||
caller transport.Caller
|
|
||||||
}
|
|
||||||
|
|
||||||
func New(caller transport.Caller) *Client {
|
|
||||||
return &Client{
|
|
||||||
caller: caller,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) Heartbeat(endpoint string, n types.Node) (types.Nodes, error) {
|
|
||||||
return heartbeatRoute.Call(c.caller, endpoint, n)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) Join(endpoint string, n types.Node) (types.Nodes, error) {
|
|
||||||
return joinRoute.Call(c.caller, endpoint, n)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) Leave(endpoint string, n types.Node) error {
|
|
||||||
return leaveRoute.CallNoOutput(c.caller, endpoint, n)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) EventHeartbeat(endpoint string, n types.Node) (types.Nodes, error) {
|
|
||||||
return eventHeartbeatRoute.Call(c.caller, endpoint, n)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) EventJoin(endpoint string, n types.Node) (types.Nodes, error) {
|
|
||||||
return eventJoinRoute.Call(c.caller, endpoint, n)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) EventLeave(endpoint string, n types.Node) error {
|
|
||||||
return eventLeaveRoute.CallNoOutput(c.caller, endpoint, n)
|
|
||||||
}
|
|
||||||
|
|
@ -1,26 +0,0 @@
|
||||||
package master
|
|
||||||
|
|
||||||
import (
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/transport"
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/types"
|
|
||||||
)
|
|
||||||
|
|
||||||
type MasterHandlers interface {
|
|
||||||
Heartbeat(types.Node) (types.Nodes, error)
|
|
||||||
Join(types.Node) (types.Nodes, error)
|
|
||||||
Leave(types.Node) error
|
|
||||||
|
|
||||||
EventHeartbeat(types.Node) (types.Nodes, error)
|
|
||||||
EventJoin(types.Node) (types.Nodes, error)
|
|
||||||
EventLeave(types.Node) error
|
|
||||||
}
|
|
||||||
|
|
||||||
func Register(registrator transport.Registrator, h MasterHandlers) {
|
|
||||||
heartbeatRoute.Register(registrator, h.Heartbeat)
|
|
||||||
joinRoute.Register(registrator, h.Join)
|
|
||||||
leaveRoute.Register(registrator, transport.WithoutOutput(h.Leave))
|
|
||||||
|
|
||||||
eventHeartbeatRoute.Register(registrator, h.EventHeartbeat)
|
|
||||||
eventJoinRoute.Register(registrator, h.EventJoin)
|
|
||||||
eventLeaveRoute.Register(registrator, transport.WithoutOutput(h.EventLeave))
|
|
||||||
}
|
|
||||||
|
|
@ -1,16 +0,0 @@
|
||||||
package master
|
|
||||||
|
|
||||||
import (
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/transport"
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/types"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
heartbeatRoute = transport.NewRoute[types.Node, types.Nodes]("/master/heartbeat")
|
|
||||||
joinRoute = transport.NewRoute[types.Node, types.Nodes]("/master/join")
|
|
||||||
leaveRoute = transport.NewRoute[types.Node, struct{}]("/master/leave")
|
|
||||||
|
|
||||||
eventHeartbeatRoute = transport.NewRoute[types.Node, types.Nodes]("/master/event/heartbeat")
|
|
||||||
eventJoinRoute = transport.NewRoute[types.Node, types.Nodes]("/master/event/join")
|
|
||||||
eventLeaveRoute = transport.NewRoute[types.Node, struct{}]("/master/event/leave")
|
|
||||||
)
|
|
||||||
|
|
@ -1,17 +0,0 @@
|
||||||
package node
|
|
||||||
|
|
||||||
import (
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/transport"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Client struct {
|
|
||||||
caller transport.Caller
|
|
||||||
}
|
|
||||||
|
|
||||||
func New(caller transport.Caller) *Client {
|
|
||||||
return &Client{caller: caller}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (c *Client) Healthcheck(endpoint string) (string, error) {
|
|
||||||
return healthcheckRoute.CallNoInput(c.caller, endpoint)
|
|
||||||
}
|
|
||||||
|
|
@ -1,13 +0,0 @@
|
||||||
package node
|
|
||||||
|
|
||||||
import (
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/transport"
|
|
||||||
)
|
|
||||||
|
|
||||||
type NodeHandlers interface {
|
|
||||||
Healthcheck() (string, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
func Register(registrator transport.Registrator, h NodeHandlers) {
|
|
||||||
healthcheckRoute.Register(registrator, transport.WithoutInput(h.Healthcheck))
|
|
||||||
}
|
|
||||||
|
|
@ -1,9 +0,0 @@
|
||||||
package node
|
|
||||||
|
|
||||||
import (
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/transport"
|
|
||||||
)
|
|
||||||
|
|
||||||
var (
|
|
||||||
healthcheckRoute = transport.NewRoute[struct{}, string]("/node/healthcheck")
|
|
||||||
)
|
|
||||||
|
|
@ -1,22 +0,0 @@
|
||||||
package transport
|
|
||||||
|
|
||||||
import (
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/transport/codec"
|
|
||||||
)
|
|
||||||
|
|
||||||
type Handler struct {
|
|
||||||
path string
|
|
||||||
handler func(codec.Codec, []byte) ([]byte, error)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h Handler) Path() string {
|
|
||||||
return h.path
|
|
||||||
}
|
|
||||||
|
|
||||||
func (h Handler) Handle(c codec.Codec, v []byte) ([]byte, error) {
|
|
||||||
return h.handler(c, v)
|
|
||||||
}
|
|
||||||
|
|
||||||
type Registrator interface {
|
|
||||||
Register(endpoint Handler)
|
|
||||||
}
|
|
||||||
|
|
@ -1,65 +0,0 @@
|
||||||
package transport
|
|
||||||
|
|
||||||
import (
|
|
||||||
"fmt"
|
|
||||||
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/transport/codec"
|
|
||||||
)
|
|
||||||
|
|
||||||
type route[In, Out any] struct {
|
|
||||||
path string
|
|
||||||
}
|
|
||||||
|
|
||||||
func NewRoute[In, Out any](path string) route[In, Out] {
|
|
||||||
return route[In, Out]{
|
|
||||||
path: path,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func routeToHandler[In, Out any](r route[In, Out], handler func(In) (Out, error)) Handler {
|
|
||||||
return Handler{
|
|
||||||
path: r.path,
|
|
||||||
handler: func(c codec.Codec, b []byte) ([]byte, error) {
|
|
||||||
data, err := codec.Decode[In](c, b)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("unable to decode body: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
out, err := handler(data)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("error while handling request: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
raw, err := codec.Encode(c, out)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("unable to encode body: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
return raw, nil
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e route[In, Out]) Call(caller Caller, host string, data In) (Out, error) {
|
|
||||||
var out Out
|
|
||||||
err := caller.Call(host, e.path, data, &out)
|
|
||||||
return out, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e route[In, Out]) CallNoInput(caller Caller, host string) (Out, error) {
|
|
||||||
var out Out
|
|
||||||
err := caller.Call(host, e.path, struct{}{}, &out)
|
|
||||||
return out, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e route[In, Out]) CallNoOutput(caller Caller, host string, data In) error {
|
|
||||||
return caller.Call(host, e.path, data, nil)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e route[In, Out]) Path() string {
|
|
||||||
return e.path
|
|
||||||
}
|
|
||||||
|
|
||||||
func (e route[In, Out]) Register(r Registrator, h func(In) (Out, error)) {
|
|
||||||
r.Register(routeToHandler(e, h))
|
|
||||||
}
|
|
||||||
|
|
@ -2,15 +2,13 @@ package types
|
||||||
|
|
||||||
import "fmt"
|
import "fmt"
|
||||||
|
|
||||||
type Nodes map[string]Node
|
|
||||||
|
|
||||||
// TODO: consider moving this type back to registry
|
// TODO: consider moving this type back to registry
|
||||||
type Node struct {
|
type Node struct {
|
||||||
Hostname string `json:"hostname"`
|
Hostname string `json:"hostname"`
|
||||||
Address string `json:"address"`
|
Address string `json:"address"`
|
||||||
Port int `json:"port"`
|
Port int `json:"port"`
|
||||||
Roles []Role `json:"roles"`
|
Roles []Role `json:"roles"`
|
||||||
Endpoint string `json:"endpoint"` // TODO: make endpoint into a separate type
|
Endpoint string `json:"endpoint"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewNode(
|
func NewNode(
|
||||||
|
|
|
||||||
|
|
@ -1,60 +1,78 @@
|
||||||
package types
|
package types
|
||||||
|
|
||||||
// type Path string
|
import (
|
||||||
//
|
"encoding/json"
|
||||||
// func (p Path) String() string {
|
"fmt"
|
||||||
// return string(p)
|
"net/http"
|
||||||
// }
|
)
|
||||||
//
|
|
||||||
// const (
|
// TODO: split this up
|
||||||
// PathMasterJoin Path = "/master/join"
|
|
||||||
// PathMasterLeave Path = "/master/leave"
|
type Path string
|
||||||
// PathMasterKeepalive Path = "/master/keepalive"
|
|
||||||
// PathMasterEventJoin Path = "/master/event_join"
|
func (p Path) String() string {
|
||||||
// PathMasterEventLeave Path = "/master/event_leave"
|
return string(p)
|
||||||
// PathMasterEventKeepalive Path = "/master/event_keepalive"
|
}
|
||||||
//
|
|
||||||
// PathNodeHealthcheck Path = "/node/healthcheck"
|
const (
|
||||||
//
|
PathMasterJoin Path = "/master/join"
|
||||||
// PathDnsCallback Path = "/dns/callback"
|
PathMasterLeave Path = "/master/leave"
|
||||||
//
|
PathMasterKeepalive Path = "/master/keepalive"
|
||||||
// PathHostCallback Path = "/host/callback"
|
|
||||||
// PathHostDns Path = "/host/dns"
|
PathNodeHealthcheck Path = "/node/healthcheck"
|
||||||
// PathHostNs Path = "/host/ns"
|
PathNodeJoin Path = "/node/join"
|
||||||
// )
|
PathNodeLeave Path = "/node/leave"
|
||||||
//
|
|
||||||
// type Route interface {
|
PathDnsCallback Path = "/dns/callback"
|
||||||
// Path() string
|
|
||||||
// Handle([]byte) (any, error)
|
PathHostCallback Path = "/host/callback"
|
||||||
// }
|
PathHostDns Path = "/host/dns"
|
||||||
//
|
PathHostNs Path = "/host/ns"
|
||||||
// type endpoint struct {
|
)
|
||||||
// path string
|
|
||||||
// handler func([]byte) (any, error)
|
type Response[T any] struct {
|
||||||
// }
|
Ok bool `json:"ok"`
|
||||||
//
|
Data T `json:"data,omitempty"`
|
||||||
// func (e endpoint) Path() string { return e.path }
|
Err string `json:"err,omitempty"`
|
||||||
//
|
}
|
||||||
// func (e endpoint) Handle(v []byte) (any, error) { return e.handler(v) }
|
|
||||||
//
|
type Route interface {
|
||||||
// func PostEndpoint[T any, V any](path Path, handler func(T) (V, error)) Route {
|
Path() string
|
||||||
// return endpoint{
|
Handle([]byte) (any, error)
|
||||||
// path: "POST " + path.String(),
|
}
|
||||||
// handler: func(a []byte) (any, error) {
|
|
||||||
// var r T
|
type endpoint struct {
|
||||||
// if err := json.Unmarshal(a, &r); err != nil {
|
path string
|
||||||
// return nil, fmt.Errorf("unable to unmarshal json: %w", err)
|
handler func([]byte) (any, error)
|
||||||
// }
|
}
|
||||||
// return handler(r)
|
|
||||||
// },
|
func (e endpoint) Path() string { return e.path }
|
||||||
// }
|
|
||||||
// }
|
func (e endpoint) Handle(v []byte) (any, error) { return e.handler(v) }
|
||||||
//
|
|
||||||
// func GetEndpoint[T any](path Path, handler func() (T, error)) Route {
|
func PostEndpoint[T any, V any](path Path, handler func(T) (V, error)) Route {
|
||||||
// return endpoint{
|
return endpoint{
|
||||||
// path: "GET " + path.String(),
|
path: "POST " + path.String(),
|
||||||
// handler: func(a []byte) (any, error) {
|
handler: func(a []byte) (any, error) {
|
||||||
// return handler()
|
var r T
|
||||||
// },
|
if err := json.Unmarshal(a, &r); err != nil {
|
||||||
// }
|
return nil, fmt.Errorf("unable to unmarshal json: %w", err)
|
||||||
// }
|
}
|
||||||
|
return handler(r)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func GetEndpoint[T any](path Path, handler func() (T, error)) Route {
|
||||||
|
return endpoint{
|
||||||
|
path: "GET " + path.String(),
|
||||||
|
handler: func(a []byte) (any, error) {
|
||||||
|
return handler()
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type Registrator interface {
|
||||||
|
Register(endpoint Route)
|
||||||
|
RegisterRaw(method string, pattern string, handler func(http.ResponseWriter, *http.Request))
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -10,18 +10,20 @@ import (
|
||||||
"net/url"
|
"net/url"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/web"
|
"git.wzray.com/homelab/hivemind/internal/types"
|
||||||
"git.wzray.com/homelab/hivemind/internal/web/middleware"
|
"git.wzray.com/homelab/hivemind/internal/web/middleware"
|
||||||
)
|
)
|
||||||
|
|
||||||
type Client struct {
|
type client struct {
|
||||||
http *http.Client
|
http *http.Client
|
||||||
middleware middleware.Middleware
|
middleware middleware.Middleware
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var defaultClient *client
|
||||||
|
|
||||||
const timeout = time.Duration(2) * time.Second
|
const timeout = time.Duration(2) * time.Second
|
||||||
|
|
||||||
func (c *Client) Call(host string, path string, data any, out any) error {
|
func (c *client) makeRequest(method string, host string, path types.Path, data any, out any) error {
|
||||||
var body io.Reader
|
var body io.Reader
|
||||||
if data != nil {
|
if data != nil {
|
||||||
raw, err := json.Marshal(data)
|
raw, err := json.Marshal(data)
|
||||||
|
|
@ -34,10 +36,10 @@ func (c *Client) Call(host string, path string, data any, out any) error {
|
||||||
uri := (&url.URL{
|
uri := (&url.URL{
|
||||||
Scheme: "http",
|
Scheme: "http",
|
||||||
Host: host,
|
Host: host,
|
||||||
Path: path,
|
Path: path.String(),
|
||||||
}).String()
|
}).String()
|
||||||
|
|
||||||
r, err := http.NewRequest("POST", uri, body)
|
r, err := http.NewRequest(method, uri, body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("build http request: %w", err)
|
return fmt.Errorf("build http request: %w", err)
|
||||||
}
|
}
|
||||||
|
|
@ -50,41 +52,60 @@ func (c *Client) Call(host string, path string, data any, out any) error {
|
||||||
return fmt.Errorf("apply middleware: %w", err)
|
return fmt.Errorf("apply middleware: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
httpResponse, err := c.http.Do(r)
|
resp, err := c.http.Do(r)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("send request: %w", err)
|
return fmt.Errorf("send request: %w", err)
|
||||||
}
|
}
|
||||||
defer httpResponse.Body.Close()
|
|
||||||
|
|
||||||
if httpResponse.StatusCode < 200 || httpResponse.StatusCode >= 300 {
|
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
|
||||||
b, _ := io.ReadAll(httpResponse.Body)
|
b, _ := io.ReadAll(resp.Body)
|
||||||
return fmt.Errorf("http %d: %s", httpResponse.StatusCode, string(b))
|
return fmt.Errorf("http %d: %s", resp.StatusCode, string(b))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
defer resp.Body.Close()
|
||||||
if out != nil {
|
if out != nil {
|
||||||
var resp web.Response[json.RawMessage]
|
if err := json.NewDecoder(resp.Body).Decode(out); err != nil {
|
||||||
if err := json.NewDecoder(httpResponse.Body).Decode(&resp); err != nil {
|
return fmt.Errorf("decode body: %w", err)
|
||||||
return fmt.Errorf("decode response wrapper: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if !resp.Ok {
|
|
||||||
return fmt.Errorf("error on the remote: %w", errors.New(resp.Err))
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := json.Unmarshal(resp.Data, out); err != nil {
|
|
||||||
return fmt.Errorf("decode response body: %w", err)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
io.Copy(io.Discard, httpResponse.Body)
|
io.Copy(io.Discard, resp.Body)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(middleware middleware.Middleware) *Client {
|
func Init(mw middleware.Middleware) {
|
||||||
return &Client{
|
if defaultClient != nil {
|
||||||
|
panic("web.client: Init called twice")
|
||||||
|
}
|
||||||
|
|
||||||
|
defaultClient = &client{
|
||||||
http: &http.Client{
|
http: &http.Client{
|
||||||
Timeout: timeout,
|
Timeout: timeout,
|
||||||
},
|
},
|
||||||
middleware: middleware,
|
middleware: mw,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func request[Out any, In any](method string, host string, path types.Path, data In) (*Out, error) {
|
||||||
|
out := &types.Response[Out]{}
|
||||||
|
err := defaultClient.makeRequest(method, host, path, data, out)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !out.Ok {
|
||||||
|
return nil, errors.New(out.Err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &out.Data, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: out should not be a pointer
|
||||||
|
func Get[Out any](host string, path types.Path) (*Out, error) {
|
||||||
|
return request[Out, any](http.MethodGet, host, path, nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: out should not be a pointer
|
||||||
|
func Post[Out any, In any](host string, path types.Path, data In) (*Out, error) {
|
||||||
|
return request[Out](http.MethodPost, host, path, data)
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -5,8 +5,7 @@ import (
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/transport"
|
"git.wzray.com/homelab/hivemind/internal/types"
|
||||||
"git.wzray.com/homelab/hivemind/internal/transport/codec"
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/web/middleware"
|
"git.wzray.com/homelab/hivemind/internal/web/middleware"
|
||||||
"github.com/rs/zerolog/log"
|
"github.com/rs/zerolog/log"
|
||||||
)
|
)
|
||||||
|
|
@ -16,7 +15,7 @@ type Server struct {
|
||||||
httpServer http.Server
|
httpServer http.Server
|
||||||
}
|
}
|
||||||
|
|
||||||
func New(addr string, middleware middleware.Middleware) *Server {
|
func NewServer(addr string, middleware middleware.Middleware) *Server {
|
||||||
mux := http.NewServeMux()
|
mux := http.NewServeMux()
|
||||||
s := &Server{
|
s := &Server{
|
||||||
mux: mux,
|
mux: mux,
|
||||||
|
|
@ -36,7 +35,7 @@ func (s *Server) Shutdown(ctx context.Context) error {
|
||||||
return s.httpServer.Shutdown(ctx)
|
return s.httpServer.Shutdown(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) handleFunc(route transport.Handler) func(w http.ResponseWriter, r *http.Request) {
|
func (s *Server) handleFunc(route types.Route) func(w http.ResponseWriter, r *http.Request) {
|
||||||
return func(w http.ResponseWriter, r *http.Request) {
|
return func(w http.ResponseWriter, r *http.Request) {
|
||||||
log.Debug(). // TODO: make this a middleware
|
log.Debug(). // TODO: make this a middleware
|
||||||
Str("method", r.Method).
|
Str("method", r.Method).
|
||||||
|
|
@ -47,36 +46,35 @@ func (s *Server) handleFunc(route transport.Handler) func(w http.ResponseWriter,
|
||||||
|
|
||||||
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
w.Header().Set("Content-Type", "application/json; charset=utf-8")
|
||||||
|
|
||||||
raw, err := io.ReadAll(r.Body)
|
body, err := io.ReadAll(r.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
w.Write(fail("read request body: %v", err))
|
w.Write(fail("read request body: %v", err))
|
||||||
log.Err(err).Msg("unable to read request body")
|
log.Err(err).Msg("unable to read request body")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
resp, err := route.Handle(codec.JSON, raw)
|
raw, err := route.Handle(body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
w.Write(fail("handle request: %v", err))
|
w.Write(fail("handle request: %v", err))
|
||||||
log.Err(err).Msg("unable to handle request")
|
log.Err(err).Msg("unable to handle request")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
payload, err := ok(resp)
|
data, err := ok(raw)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
w.Write(fail("marshal response: %v", err))
|
w.Write(fail("marshal response: %v", err))
|
||||||
log.Err(err).Msg("unable to marshal response")
|
log.Err(err).Msg("unable to marshal response")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
w.Write(payload)
|
w.Write(data)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Server) Register(endpoint transport.Handler) {
|
func (s *Server) Register(endpoint types.Route) {
|
||||||
s.mux.HandleFunc(endpoint.Path(), s.handleFunc(endpoint))
|
s.mux.HandleFunc(endpoint.Path(), s.handleFunc(endpoint))
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: i don't think that I need this?
|
|
||||||
func (s *Server) RegisterRaw(method string, pattern string, handler func(http.ResponseWriter, *http.Request)) {
|
func (s *Server) RegisterRaw(method string, pattern string, handler func(http.ResponseWriter, *http.Request)) {
|
||||||
s.mux.HandleFunc(method+" "+pattern, handler)
|
s.mux.HandleFunc(method+" "+pattern, handler)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -4,20 +4,20 @@ import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"git.wzray.com/homelab/hivemind/internal/web"
|
"git.wzray.com/homelab/hivemind/internal/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
func fail(format string, a ...any) []byte {
|
func fail(format string, a ...any) []byte {
|
||||||
r, _ := json.Marshal(web.Response[string]{
|
r, _ := json.Marshal(types.Response[string]{
|
||||||
Ok: false,
|
Ok: false,
|
||||||
Err: fmt.Sprintf(format, a...),
|
Err: fmt.Sprintf(format, a...),
|
||||||
})
|
})
|
||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
func ok(data []byte) ([]byte, error) {
|
func ok[T any](data T) ([]byte, error) {
|
||||||
return json.Marshal(web.Response[json.RawMessage]{
|
return json.Marshal(types.Response[T]{
|
||||||
Ok: true,
|
Ok: true,
|
||||||
Data: json.RawMessage(data),
|
Data: data,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,7 +0,0 @@
|
||||||
package web
|
|
||||||
|
|
||||||
type Response[T any] struct {
|
|
||||||
Ok bool `json:"ok"`
|
|
||||||
Data T `json:"data,omitempty"`
|
|
||||||
Err string `json:"err,omitempty"`
|
|
||||||
}
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue