refactor: move http api to a new transport layer
This commit is contained in:
parent
476c4b056f
commit
0448f66ab2
41 changed files with 822 additions and 390 deletions
|
|
@ -8,22 +8,23 @@ import (
|
|||
"strings"
|
||||
"sync"
|
||||
|
||||
"git.wzray.com/homelab/hivemind/internal/app"
|
||||
"git.wzray.com/homelab/hivemind/internal/config"
|
||||
"git.wzray.com/homelab/hivemind/internal/state"
|
||||
"git.wzray.com/homelab/hivemind/internal/transport"
|
||||
"git.wzray.com/homelab/hivemind/internal/transport/dns"
|
||||
"git.wzray.com/homelab/hivemind/internal/types"
|
||||
"git.wzray.com/homelab/hivemind/internal/web/client"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
const hostsDir = "/etc/hosts.d/"
|
||||
|
||||
type Role struct {
|
||||
state *state.RuntimeState
|
||||
state *app.State
|
||||
config config.DnsConfig
|
||||
group sync.WaitGroup
|
||||
}
|
||||
|
||||
func New(state *state.RuntimeState, config config.DnsConfig) *Role {
|
||||
func New(state *app.State, config config.DnsConfig) *Role {
|
||||
r := &Role{
|
||||
state: state,
|
||||
config: config,
|
||||
|
|
@ -32,28 +33,6 @@ func New(state *state.RuntimeState, config config.DnsConfig) *Role {
|
|||
return r
|
||||
}
|
||||
|
||||
func (r *Role) updateDnsmasq(filename string, data []byte) error {
|
||||
if err := os.WriteFile(filename, data, 0644); err != nil {
|
||||
return fmt.Errorf("write endpoint file %q: %w", filename, err)
|
||||
}
|
||||
|
||||
if err := r.reload(); err != nil {
|
||||
return fmt.Errorf("reload dnsmasq: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func parseState(state types.HostState) (string, []byte) {
|
||||
var builder strings.Builder
|
||||
|
||||
for _, d := range state.Domains {
|
||||
builder.WriteString(fmt.Sprintf("%s %s\n", state.Address, d))
|
||||
}
|
||||
|
||||
return hostsDir + state.Hostname, []byte(builder.String())
|
||||
}
|
||||
|
||||
func (r *Role) OnStartup(ctx context.Context) error {
|
||||
r.group.Go(func() {
|
||||
r.syncFromRegistry()
|
||||
|
|
@ -74,15 +53,46 @@ func (r *Role) OnStartup(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (r *Role) OnShutdown() error {
|
||||
r.group.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Role) RegisterHandlers(rg transport.Registrator) {
|
||||
dns.Register(rg, r)
|
||||
}
|
||||
|
||||
func (r *Role) Callback(state types.HostState) (bool, error) {
|
||||
filename, data := parseState(state)
|
||||
|
||||
if err := r.updateDnsmasq(filename, data); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (r *Role) updateDnsmasq(filename string, data []byte) error {
|
||||
if err := os.WriteFile(filename, data, 0644); err != nil {
|
||||
return fmt.Errorf("write endpoint file %q: %w", filename, err)
|
||||
}
|
||||
|
||||
if err := r.reload(); err != nil {
|
||||
return fmt.Errorf("reload dnsmasq: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Role) syncFromRegistry() {
|
||||
for _, n := range r.state.Registry.ByRole(types.HostRole) {
|
||||
state, err := client.Get[types.HostState](n.Endpoint, types.PathHostDns)
|
||||
state, err := r.state.Clients.Host.Dns(n.Endpoint)
|
||||
if err != nil {
|
||||
log.Warn().Str("name", n.Hostname).Err(err).Msg("unable to get host config")
|
||||
continue
|
||||
}
|
||||
|
||||
filename, data := parseState(*state)
|
||||
filename, data := parseState(state)
|
||||
if err := r.updateDnsmasq(filename, data); err != nil {
|
||||
log.Warn().Str("name", n.Hostname).Err(err).Msg("unable to update dnsmasq")
|
||||
continue
|
||||
|
|
@ -90,11 +100,6 @@ func (r *Role) syncFromRegistry() {
|
|||
}
|
||||
}
|
||||
|
||||
func (r *Role) OnShutdown() error {
|
||||
r.group.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Role) reload() error {
|
||||
var err error
|
||||
|
||||
|
|
@ -107,16 +112,12 @@ func (r *Role) reload() error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (r *Role) onCallback(state types.HostState) (bool, error) {
|
||||
filename, data := parseState(state)
|
||||
func parseState(state types.HostState) (string, []byte) {
|
||||
var builder strings.Builder
|
||||
|
||||
if err := r.updateDnsmasq(filename, data); err != nil {
|
||||
return false, err
|
||||
for _, d := range state.Domains {
|
||||
builder.WriteString(fmt.Sprintf("%s %s\n", state.Address, d))
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (r *Role) RegisterHandlers(rg types.Registrator) {
|
||||
rg.Register(types.PostEndpoint(types.PathDnsCallback, r.onCallback))
|
||||
return hostsDir + state.Hostname, []byte(builder.String())
|
||||
}
|
||||
|
|
|
|||
96
internal/roles/host/gateway.go
Normal file
96
internal/roles/host/gateway.go
Normal file
|
|
@ -0,0 +1,96 @@
|
|||
package host
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
|
||||
"git.wzray.com/homelab/hivemind/internal/config"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
type TraefikListener interface {
|
||||
OnTraefikUpdate(traefikResponse)
|
||||
}
|
||||
|
||||
type TraefikGateway struct {
|
||||
client *http.Client
|
||||
server *http.Server
|
||||
listener TraefikListener
|
||||
address url.URL
|
||||
domain string
|
||||
}
|
||||
|
||||
func NewTraefikGateway(cfg config.HostConfig, listener TraefikListener) *TraefikGateway {
|
||||
mux := http.NewServeMux()
|
||||
gw := &TraefikGateway{
|
||||
client: &http.Client{},
|
||||
|
||||
server: &http.Server{
|
||||
Addr: cfg.ListenAddress,
|
||||
Handler: mux,
|
||||
},
|
||||
listener: listener,
|
||||
address: url.URL{
|
||||
Scheme: "http",
|
||||
Host: cfg.LocalAddress,
|
||||
},
|
||||
domain: cfg.Domain,
|
||||
}
|
||||
|
||||
mux.HandleFunc("/callback", gw.onCallback)
|
||||
return gw
|
||||
}
|
||||
|
||||
func (g *TraefikGateway) Listen() error {
|
||||
return g.server.ListenAndServe()
|
||||
}
|
||||
|
||||
func (g *TraefikGateway) Shutdown(ctx context.Context) error {
|
||||
return g.server.Shutdown(ctx)
|
||||
}
|
||||
|
||||
func (g *TraefikGateway) GetRawData() (*traefikResponse, error) {
|
||||
var raw TraefikRawResponse
|
||||
|
||||
url := g.address
|
||||
url.Path = "/api/rawdata"
|
||||
|
||||
req := http.Request{
|
||||
Method: http.MethodGet,
|
||||
URL: &url,
|
||||
}
|
||||
|
||||
req.Host = g.domain
|
||||
|
||||
r, err := g.client.Do(&req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("make request: %w", err)
|
||||
}
|
||||
defer r.Body.Close()
|
||||
|
||||
if err := json.NewDecoder(r.Body).Decode(&raw); err != nil {
|
||||
return nil, fmt.Errorf("unmarshal body: %w", err)
|
||||
}
|
||||
|
||||
out := parseTraefikResponse(raw)
|
||||
return &out, nil
|
||||
}
|
||||
|
||||
func (g *TraefikGateway) onCallback(w http.ResponseWriter, req *http.Request) {
|
||||
var raw TraefikRawResponse
|
||||
if err := json.NewDecoder(req.Body).Decode(&raw); err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
log.Err(err).Msg("unable to decode traefik callback data")
|
||||
return
|
||||
}
|
||||
|
||||
resp := parseTraefikResponse(raw)
|
||||
if g.listener != nil {
|
||||
g.listener.OnTraefikUpdate(resp)
|
||||
}
|
||||
|
||||
w.Write([]byte("OK"))
|
||||
}
|
||||
|
|
@ -2,36 +2,37 @@ package host
|
|||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"slices"
|
||||
"sync"
|
||||
|
||||
"git.wzray.com/homelab/hivemind/internal/app"
|
||||
"git.wzray.com/homelab/hivemind/internal/config"
|
||||
"git.wzray.com/homelab/hivemind/internal/state"
|
||||
"git.wzray.com/homelab/hivemind/internal/transport"
|
||||
"git.wzray.com/homelab/hivemind/internal/transport/host"
|
||||
"git.wzray.com/homelab/hivemind/internal/types"
|
||||
"git.wzray.com/homelab/hivemind/internal/web/client"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
type Role struct {
|
||||
state *state.RuntimeState
|
||||
state *app.State
|
||||
config config.HostConfig
|
||||
|
||||
client *traefikClient
|
||||
gateway *TraefikGateway
|
||||
tasksGroup sync.WaitGroup
|
||||
|
||||
externalDomains []string // TODO: i don't like hardcoding external/internal logic here
|
||||
internalDomains []string
|
||||
}
|
||||
|
||||
func New(state *state.RuntimeState, config config.HostConfig) *Role {
|
||||
return &Role{
|
||||
client: newClient(config.Domain, config.LocalAddress),
|
||||
func New(state *app.State, config config.HostConfig) *Role {
|
||||
r := &Role{
|
||||
state: state,
|
||||
config: config,
|
||||
}
|
||||
|
||||
r.gateway = NewTraefikGateway(config, r)
|
||||
return r
|
||||
}
|
||||
|
||||
func (r *Role) sendUpdate(domains []string, role types.Role) {
|
||||
|
|
@ -45,7 +46,7 @@ func (r *Role) sendUpdate(domains []string, role types.Role) {
|
|||
r.tasksGroup.Go(func() {
|
||||
logger := log.With().Str("name", node.Hostname).Logger()
|
||||
logger.Debug().Msg("sending update")
|
||||
if _, err := client.Post[any](node.Endpoint, types.PathDnsCallback, state); err != nil {
|
||||
if _, err := r.state.Clients.DNS.Callback(node.Endpoint, state); err != nil {
|
||||
logger.Warn().Err(err).Msg("unable to send dns info")
|
||||
} else {
|
||||
logger.Debug().Msg("update sent")
|
||||
|
|
@ -54,7 +55,7 @@ func (r *Role) sendUpdate(domains []string, role types.Role) {
|
|||
}
|
||||
}
|
||||
|
||||
func (r *Role) mutateState(resp traefikResponse) {
|
||||
func (r *Role) OnTraefikUpdate(resp traefikResponse) {
|
||||
newInternal := resp.Domains(r.config.InternalEntrypoint)
|
||||
newExternal := resp.Domains(r.config.ExternalEntrypoint)
|
||||
|
||||
|
|
@ -71,20 +72,7 @@ func (r *Role) mutateState(resp traefikResponse) {
|
|||
}
|
||||
}
|
||||
|
||||
func (r *Role) onCallback(w http.ResponseWriter, req *http.Request) {
|
||||
var resp traefikResponse
|
||||
if err := json.NewDecoder(req.Body).Decode(&resp); err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
log.Err(err).Msg("unable to decode traefik callback data")
|
||||
return
|
||||
}
|
||||
|
||||
r.mutateState(resp)
|
||||
|
||||
w.Write([]byte("OK"))
|
||||
}
|
||||
|
||||
func (r *Role) getInternal() (types.HostState, error) {
|
||||
func (r *Role) Dns() (types.HostState, error) {
|
||||
return types.HostState{
|
||||
Domains: r.internalDomains,
|
||||
Address: r.config.IpAddress,
|
||||
|
|
@ -92,7 +80,7 @@ func (r *Role) getInternal() (types.HostState, error) {
|
|||
}, nil
|
||||
}
|
||||
|
||||
func (r *Role) getExternal() (types.HostState, error) {
|
||||
func (r *Role) Nameserver() (types.HostState, error) {
|
||||
return types.HostState{
|
||||
Domains: r.externalDomains,
|
||||
Address: r.config.IpAddress,
|
||||
|
|
@ -101,14 +89,25 @@ func (r *Role) getExternal() (types.HostState, error) {
|
|||
|
||||
}
|
||||
|
||||
func (r *Role) RegisterHandlers(rg types.Registrator) {
|
||||
rg.RegisterRaw(http.MethodPost, types.PathHostCallback.String(), r.onCallback)
|
||||
rg.Register(types.GetEndpoint(types.PathHostDns, r.getInternal))
|
||||
rg.Register(types.GetEndpoint(types.PathHostNs, r.getExternal))
|
||||
func (r *Role) RegisterHandlers(rg transport.Registrator) {
|
||||
host.Register(rg, r)
|
||||
}
|
||||
|
||||
func (r *Role) OnStartup(ctx context.Context) error {
|
||||
resp, err := r.client.GetRawData()
|
||||
r.tasksGroup.Go(func() {
|
||||
if err := r.gateway.Listen(); err != nil {
|
||||
log.Err(err).Msg("traefik gateway stopped")
|
||||
}
|
||||
})
|
||||
|
||||
r.tasksGroup.Go(func() {
|
||||
<-ctx.Done()
|
||||
if err := r.gateway.Shutdown(context.Background()); err != nil {
|
||||
log.Err(err).Msg("failed to shutdown traefik gateway")
|
||||
}
|
||||
})
|
||||
|
||||
resp, err := r.gateway.GetRawData()
|
||||
if err != nil {
|
||||
return fmt.Errorf("get traefik state: %w", err)
|
||||
}
|
||||
|
|
@ -116,7 +115,7 @@ func (r *Role) OnStartup(ctx context.Context) error {
|
|||
log.Info().Msg("got raw data from traefik")
|
||||
log.Debug().Interface("response", resp).Send()
|
||||
|
||||
r.mutateState(*resp)
|
||||
r.OnTraefikUpdate(*resp)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -1,58 +1 @@
|
|||
package host
|
||||
|
||||
import (
|
||||
"crypto/tls"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
)
|
||||
|
||||
type traefikClient struct {
|
||||
client *http.Client
|
||||
domain string
|
||||
address url.URL
|
||||
}
|
||||
|
||||
func newClient(domain string, addr string) *traefikClient {
|
||||
return &traefikClient{
|
||||
domain: domain,
|
||||
address: url.URL{
|
||||
Scheme: "https",
|
||||
Host: addr,
|
||||
},
|
||||
client: &http.Client{
|
||||
Transport: &http.Transport{
|
||||
TLSClientConfig: &tls.Config{
|
||||
ServerName: domain,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (c *traefikClient) GetRawData() (*traefikResponse, error) {
|
||||
var out traefikResponse
|
||||
|
||||
url := c.address
|
||||
url.Path = "/api/rawdata"
|
||||
|
||||
req := http.Request{
|
||||
Method: "GET",
|
||||
URL: &url,
|
||||
}
|
||||
|
||||
req.Host = c.domain
|
||||
|
||||
r, err := c.client.Do(&req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("make request: %w", err)
|
||||
}
|
||||
defer r.Body.Close()
|
||||
|
||||
if err := json.NewDecoder(r.Body).Decode(&out); err != nil {
|
||||
return nil, fmt.Errorf("unmarshal body: %w", err)
|
||||
}
|
||||
|
||||
return &out, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,64 +1,66 @@
|
|||
package host
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"regexp"
|
||||
"slices"
|
||||
)
|
||||
|
||||
var hostRegex = regexp.MustCompile("Host\\(`([^()`]+\\.[^()`]+)`\\)")
|
||||
|
||||
type rule struct {
|
||||
type traefikRule struct {
|
||||
Raw string
|
||||
Domains []string
|
||||
Valid bool
|
||||
}
|
||||
|
||||
func (r *rule) UnmarshalJSON(data []byte) error {
|
||||
r.Valid = false
|
||||
type traefikRouter struct {
|
||||
Rule traefikRule
|
||||
Entrypoints []string
|
||||
}
|
||||
|
||||
raw := ""
|
||||
if err := json.Unmarshal(data, &raw); err != nil {
|
||||
return err
|
||||
type traefikResponse struct {
|
||||
Routers []traefikRouter
|
||||
}
|
||||
|
||||
type TraefikRawResponse struct {
|
||||
Routers map[string]TraefikRawRouter `json:"routers"`
|
||||
}
|
||||
|
||||
type TraefikRawRouter struct {
|
||||
Rule string `json:"rule"`
|
||||
Entrypoints []string `json:"entryPoints"`
|
||||
}
|
||||
|
||||
func parseTraefikResponse(raw TraefikRawResponse) traefikResponse {
|
||||
out := traefikResponse{
|
||||
Routers: make([]traefikRouter, 0, len(raw.Routers)),
|
||||
}
|
||||
|
||||
for _, router := range raw.Routers {
|
||||
out.Routers = append(out.Routers, traefikRouter{
|
||||
Rule: parseTraefikRule(router.Rule),
|
||||
Entrypoints: router.Entrypoints,
|
||||
})
|
||||
}
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
func parseTraefikRule(raw string) traefikRule {
|
||||
rule := traefikRule{
|
||||
Raw: raw,
|
||||
}
|
||||
|
||||
matches := hostRegex.FindAllStringSubmatch(raw, -1)
|
||||
|
||||
for _, match := range matches {
|
||||
if len(match) <= 1 {
|
||||
continue
|
||||
}
|
||||
r.Domains = append(r.Domains, match[1:]...)
|
||||
rule.Domains = append(rule.Domains, match[1:]...)
|
||||
}
|
||||
|
||||
r.Valid = len(r.Domains) > 0
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type router struct {
|
||||
Rule rule `json:"rule"`
|
||||
Entrypoints []string `json:"entryPoints"`
|
||||
}
|
||||
|
||||
type traefikResponse struct {
|
||||
Routers []router
|
||||
}
|
||||
|
||||
func (r *traefikResponse) UnmarshalJSON(data []byte) error {
|
||||
var raw struct {
|
||||
Routers map[string]router `json:"routers"`
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(data, &raw); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for _, v := range raw.Routers {
|
||||
r.Routers = append(r.Routers, v)
|
||||
}
|
||||
|
||||
return nil
|
||||
rule.Valid = len(rule.Domains) > 0
|
||||
return rule
|
||||
}
|
||||
|
||||
func (r traefikResponse) Domains(entrypoint string) []string {
|
||||
|
|
|
|||
|
|
@ -4,23 +4,24 @@ import (
|
|||
"context"
|
||||
"sync"
|
||||
|
||||
"git.wzray.com/homelab/hivemind/internal/app"
|
||||
"git.wzray.com/homelab/hivemind/internal/config"
|
||||
"git.wzray.com/homelab/hivemind/internal/roles"
|
||||
"git.wzray.com/homelab/hivemind/internal/state"
|
||||
"git.wzray.com/homelab/hivemind/internal/transport"
|
||||
"git.wzray.com/homelab/hivemind/internal/transport/master"
|
||||
"git.wzray.com/homelab/hivemind/internal/types"
|
||||
"git.wzray.com/homelab/hivemind/internal/web/client"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
type Role struct {
|
||||
state *state.RuntimeState
|
||||
state *app.State
|
||||
config config.MasterConfig
|
||||
tasksGroup sync.WaitGroup
|
||||
observer *observer
|
||||
roles.BaseRole
|
||||
}
|
||||
|
||||
func New(state *state.RuntimeState, config config.MasterConfig) *Role {
|
||||
func New(state *app.State, config config.MasterConfig) *Role {
|
||||
return &Role{
|
||||
state: state,
|
||||
config: config,
|
||||
|
|
@ -50,13 +51,34 @@ func (r *Role) OnShutdown() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (r *Role) notify(path types.Path, v any) {
|
||||
for _, n := range r.state.Registry.ByRole(types.MasterRole) {
|
||||
addr := n.Endpoint
|
||||
r.tasksGroup.Go(func() {
|
||||
client.Post[any](addr, path, v)
|
||||
})
|
||||
}
|
||||
func (c *Role) RegisterHandlers(r transport.Registrator) {
|
||||
master.Register(r, c)
|
||||
}
|
||||
|
||||
func (r *Role) Heartbeat(node types.Node) (types.Nodes, error) {
|
||||
return r.onKeepAlive(node, true)
|
||||
}
|
||||
|
||||
func (r *Role) Join(node types.Node) (types.Nodes, error) {
|
||||
return r.onJoin(node, true)
|
||||
}
|
||||
|
||||
func (r *Role) Leave(node types.Node) error {
|
||||
_, err := r.onLeave(node, true)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *Role) EventHeartbeat(node types.Node) (types.Nodes, error) {
|
||||
return r.onKeepAlive(node, false)
|
||||
}
|
||||
|
||||
func (r *Role) EventJoin(node types.Node) (types.Nodes, error) {
|
||||
return r.onJoin(node, false)
|
||||
}
|
||||
|
||||
func (r *Role) EventLeave(node types.Node) error {
|
||||
_, err := r.onLeave(node, false)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *Role) onJoin(node types.Node, notify bool) (map[string]types.Node, error) {
|
||||
|
|
@ -65,7 +87,7 @@ func (r *Role) onJoin(node types.Node, notify bool) (map[string]types.Node, erro
|
|||
}
|
||||
|
||||
if notify {
|
||||
r.notify(types.PathMasterEventJoin, node)
|
||||
propagate(r, noReturn(r.state.Clients.Master.EventJoin), node)
|
||||
}
|
||||
|
||||
return r.state.Registry.AllNodes(), nil
|
||||
|
|
@ -77,7 +99,7 @@ func (r *Role) onLeave(node types.Node, notify bool) (bool, error) {
|
|||
}
|
||||
|
||||
if notify {
|
||||
r.notify(types.PathMasterEventLeave, node)
|
||||
propagate(r, r.state.Clients.Master.EventLeave, node)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
|
|
@ -94,7 +116,7 @@ func (r *Role) onKeepAlive(node types.Node, notify bool) (map[string]types.Node,
|
|||
}
|
||||
|
||||
if notify {
|
||||
r.notify(types.PathMasterEventKeepalive, node)
|
||||
propagate(r, noReturn(r.state.Clients.Master.EventHeartbeat), node)
|
||||
}
|
||||
|
||||
return r.state.Registry.AllNodes(), nil
|
||||
|
|
@ -106,11 +128,18 @@ func eventFunc[R any](fn func(types.Node, bool) (R, error), notify bool) func(ty
|
|||
}
|
||||
}
|
||||
|
||||
func (c *Role) RegisterHandlers(r types.Registrator) {
|
||||
r.Register(types.PostEndpoint(types.PathMasterKeepalive, eventFunc(c.onKeepAlive, true)))
|
||||
r.Register(types.PostEndpoint(types.PathMasterEventKeepalive, eventFunc(c.onKeepAlive, false)))
|
||||
r.Register(types.PostEndpoint(types.PathMasterJoin, eventFunc(c.onJoin, true)))
|
||||
r.Register(types.PostEndpoint(types.PathMasterLeave, eventFunc(c.onLeave, true)))
|
||||
r.Register(types.PostEndpoint(types.PathMasterEventJoin, eventFunc(c.onJoin, false)))
|
||||
r.Register(types.PostEndpoint(types.PathMasterEventLeave, eventFunc(c.onLeave, false)))
|
||||
func noReturn[T, V any](fn func(string, T) (V, error)) func(string, T) error {
|
||||
return func(s string, t T) error {
|
||||
_, err := fn(s, t)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
func propagate[T any](r *Role, handler func(string, T) error, v T) {
|
||||
for _, n := range r.state.Registry.ByRole(types.MasterRole) {
|
||||
addr := n.Endpoint
|
||||
r.tasksGroup.Go(func() {
|
||||
handler(addr, v)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,15 +5,14 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"git.wzray.com/homelab/hivemind/internal/app"
|
||||
"git.wzray.com/homelab/hivemind/internal/registry"
|
||||
"git.wzray.com/homelab/hivemind/internal/state"
|
||||
"git.wzray.com/homelab/hivemind/internal/types"
|
||||
"git.wzray.com/homelab/hivemind/internal/web/client"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
type observer struct {
|
||||
state *state.RuntimeState
|
||||
state *app.State
|
||||
interval int
|
||||
backoff int
|
||||
backoffCount int
|
||||
|
|
@ -23,7 +22,7 @@ type observer struct {
|
|||
}
|
||||
|
||||
func newObserver(
|
||||
state *state.RuntimeState,
|
||||
state *app.State,
|
||||
interval int,
|
||||
backoff int,
|
||||
backoffCount int,
|
||||
|
|
@ -63,7 +62,7 @@ func (o *observer) pollNodes(ctx context.Context, onLeave func(types.Node) error
|
|||
delay := time.Duration(o.backoff)
|
||||
alive := false
|
||||
for i := o.backoffCount - 1; i >= 0; i-- {
|
||||
_, err := client.Get[any](n.Endpoint, types.PathNodeHealthcheck)
|
||||
_, err := o.state.Clients.Node.Healthcheck(n.Endpoint)
|
||||
|
||||
if err == nil {
|
||||
logger.Debug().Msg("node is alive")
|
||||
|
|
|
|||
|
|
@ -7,26 +7,41 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"git.wzray.com/homelab/hivemind/internal/app"
|
||||
"git.wzray.com/homelab/hivemind/internal/config"
|
||||
"git.wzray.com/homelab/hivemind/internal/state"
|
||||
"git.wzray.com/homelab/hivemind/internal/transport"
|
||||
"git.wzray.com/homelab/hivemind/internal/transport/node"
|
||||
"git.wzray.com/homelab/hivemind/internal/types"
|
||||
"git.wzray.com/homelab/hivemind/internal/web/client"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
type Role struct {
|
||||
state *state.RuntimeState
|
||||
state *app.State
|
||||
keepaliveGroup sync.WaitGroup
|
||||
config config.NodeConfig
|
||||
}
|
||||
|
||||
func New(state *state.RuntimeState, config config.NodeConfig) *Role {
|
||||
func New(state *app.State, config config.NodeConfig) *Role {
|
||||
return &Role{
|
||||
state: state,
|
||||
config: config,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Role) OnStartup(ctx context.Context) error {
|
||||
r.keepaliveGroup.Go(r.keepaliveFunc(ctx))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Role) OnShutdown() error {
|
||||
r.keepaliveGroup.Wait()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *Role) RegisterHandlers(r transport.Registrator) {
|
||||
node.Register(r, n)
|
||||
}
|
||||
|
||||
func (r *Role) Join(bootstrap string) error {
|
||||
masters := make(map[string]struct{})
|
||||
for _, node := range r.state.Registry.ByRole(types.MasterRole) {
|
||||
|
|
@ -46,14 +61,14 @@ func (r *Role) Join(bootstrap string) error {
|
|||
logger := log.With().Str("host", m).Logger()
|
||||
logger.Debug().Msg("trying to join via master")
|
||||
|
||||
nodes, err := client.Post[map[string]types.Node](m, types.PathMasterJoin, r.state.Self)
|
||||
nodes, err := r.state.Clients.Master.Join(m, r.state.Self)
|
||||
if err != nil {
|
||||
errs = append(errs, err)
|
||||
logger.Debug().Err(err).Msg("unable to join")
|
||||
continue
|
||||
}
|
||||
|
||||
if err := r.state.Registry.Set(*nodes); err != nil {
|
||||
if err := r.state.Registry.Set(nodes); err != nil {
|
||||
logger.Debug().Err(err).Msg("unable to set master's nodes")
|
||||
errs = append(errs, err)
|
||||
continue
|
||||
|
|
@ -76,8 +91,7 @@ func (r *Role) Leave() error {
|
|||
logger := log.With().Str("name", m.Hostname).Logger()
|
||||
logger.Debug().Msg("sending leave message")
|
||||
|
||||
_, err := client.Post[any](m.Endpoint, types.PathMasterLeave, r.state.Self)
|
||||
if err != nil {
|
||||
if err := r.state.Clients.Master.Leave(m.Endpoint, r.state.Self); err != nil {
|
||||
logger.Debug().Err(err).Msg("unable to send leave message")
|
||||
errs = append(errs, err)
|
||||
continue
|
||||
|
|
@ -90,14 +104,8 @@ func (r *Role) Leave() error {
|
|||
return fmt.Errorf("unable to send leave message to any master: %w", errors.Join(errs...))
|
||||
}
|
||||
|
||||
func (r *Role) OnStartup(ctx context.Context) error {
|
||||
r.keepaliveGroup.Go(r.keepaliveFunc(ctx))
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Role) OnShutdown() error {
|
||||
r.keepaliveGroup.Wait()
|
||||
return nil
|
||||
func (r *Role) Healthcheck() (string, error) {
|
||||
return "OK", nil
|
||||
}
|
||||
|
||||
func (r *Role) keepaliveFunc(ctx context.Context) func() {
|
||||
|
|
@ -106,7 +114,7 @@ func (r *Role) keepaliveFunc(ctx context.Context) func() {
|
|||
logger := log.With().Str("name", m.Hostname).Logger()
|
||||
logger.Debug().Msg("sending keepalive packet")
|
||||
|
||||
nodes, err := client.Post[map[string]types.Node](m.Endpoint, types.PathMasterKeepalive, r.state.Self)
|
||||
nodes, err := r.state.Clients.Master.Heartbeat(m.Endpoint, r.state.Self)
|
||||
if err != nil {
|
||||
logger.Info().Err(err).Msg("unable to send keepalive packet")
|
||||
continue
|
||||
|
|
@ -114,7 +122,7 @@ func (r *Role) keepaliveFunc(ctx context.Context) func() {
|
|||
|
||||
logger.Debug().Msg("keepalive packet sent")
|
||||
|
||||
if err := r.state.Registry.Set(*nodes); err != nil {
|
||||
if err := r.state.Registry.Set(nodes); err != nil {
|
||||
logger.Warn().Err(err).Msg("unable to set masters nodes")
|
||||
continue
|
||||
}
|
||||
|
|
@ -134,11 +142,3 @@ func (r *Role) keepaliveFunc(ctx context.Context) func() {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func healthcheck() (string, error) {
|
||||
return "OK", nil
|
||||
}
|
||||
|
||||
func (n *Role) RegisterHandlers(r types.Registrator) {
|
||||
r.Register(types.GetEndpoint(types.PathNodeHealthcheck, healthcheck))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,18 +3,18 @@ package roles
|
|||
import (
|
||||
"context"
|
||||
|
||||
"git.wzray.com/homelab/hivemind/internal/types"
|
||||
"git.wzray.com/homelab/hivemind/internal/transport"
|
||||
)
|
||||
|
||||
type Role interface {
|
||||
RegisterHandlers(types.Registrator)
|
||||
RegisterHandlers(transport.Registrator)
|
||||
OnStartup(context.Context) error
|
||||
OnShutdown() error
|
||||
}
|
||||
|
||||
type BaseRole struct{}
|
||||
|
||||
func (r *BaseRole) RegisterHandlers(types.Registrator) {}
|
||||
func (r *BaseRole) RegisterHandlers(transport.Registrator) {}
|
||||
|
||||
func (r *BaseRole) OnStartup(context.Context) error { return nil }
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue