1
0
Fork 0

feat: initial release

This commit is contained in:
Arthur K. 2026-01-17 18:14:50 +03:00
parent a3cf21f5bd
commit 761174d035
Signed by: wzray
GPG key ID: B97F30FDC4636357
41 changed files with 2008 additions and 217 deletions

95
internal/config/config.go Normal file
View file

@ -0,0 +1,95 @@
package config
import (
"errors"
"fmt"
"os"
"git.wzray.com/homelab/mastermind/internal/types"
"github.com/BurntSushi/toml"
)
type Configs struct {
Master MasterConfig
Dns DnsConfig
Host HostConfig
}
type Config struct {
Node NodeConfig
Configs Configs
Roles []types.Role
}
func FromFile(filename string) (Config, error) {
data, err := os.ReadFile(filename)
if err != nil {
return Config{}, fmt.Errorf("read config file: %w", err)
}
var temp struct {
Node NodeConfig `toml:"node"`
Configs struct {
Host *HostConfig `toml:"host"`
Dns *DnsConfig `toml:"dns"`
Master *MasterConfig `toml:"master"`
} `toml:"roles"`
}
if err := toml.Unmarshal(data, &temp); err != nil {
return Config{}, fmt.Errorf("parse config file: %w", err)
}
config := defaultConfig
config.Node.Merge(temp.Node)
if c := temp.Configs.Master; c != nil {
c.set = true
config.Roles = append(config.Roles, types.MasterRole)
config.Configs.Master.Merge(*c)
}
if c := temp.Configs.Dns; c != nil {
c.set = true
config.Roles = append(config.Roles, types.DnsRole)
config.Configs.Dns.Merge(*c)
}
if c := temp.Configs.Host; c != nil {
c.set = true
config.Roles = append(config.Roles, types.HostRole)
config.Configs.Host.Merge(*c)
}
return config, nil
}
func (c Config) Validate() error {
if err := c.Node.Validate(); err != nil {
return fmt.Errorf("node: %w", err)
}
if c.Configs.Host.set {
if err := c.Configs.Host.Validate(); err != nil {
return fmt.Errorf("configs.host: %w", err)
}
}
if c.Configs.Dns.set {
if err := c.Configs.Dns.Validate(); err != nil {
return fmt.Errorf("configs.dns: %w", err)
}
}
if c.Configs.Host.set {
if err := c.Configs.Master.Validate(); err != nil {
return fmt.Errorf("configs.master: %w", err)
}
}
if len(c.Roles) == 0 {
return errors.New("no roles configured")
}
return nil
}

View file

@ -0,0 +1,17 @@
package config
var defaultConfig = Config{
Node: NodeConfig{
ListenOn: "0.0.0.0",
Port: 56714,
KeepaliveInterval: 1,
LogLevel: LogLevelInfo,
},
Configs: Configs{
Master: MasterConfig{
ObserverInterval: 10,
BackoffSeconds: 2,
BackoffCount: 3,
},
},
}

20
internal/config/dns.go Normal file
View file

@ -0,0 +1,20 @@
package config
type DnsConfig struct {
UseSystemd bool `toml:"use_systemd"`
baseRoleConfig
}
func (c DnsConfig) Validate() error {
return nil
}
func (c *DnsConfig) Merge(other DnsConfig) {
if other.set {
c.set = other.set
}
if other.UseSystemd {
c.UseSystemd = other.UseSystemd
}
}

70
internal/config/host.go Normal file
View file

@ -0,0 +1,70 @@
package config
import (
"errors"
"fmt"
"net"
)
type HostConfig struct {
Domain string `toml:"domain"`
IpAddress string `toml:"ip"`
LocalAddress string `toml:"local_address"`
InternalEntrypoint string `toml:"internal_entrypoint"`
ExternalEntrypoint string `toml:"external_entrypoint"`
baseRoleConfig
}
func (c HostConfig) Validate() error {
if c.Domain == "" {
return errors.New("missing domain")
}
if c.IpAddress == "" {
return errors.New("missing ip")
}
if net.ParseIP(c.IpAddress) == nil {
return fmt.Errorf("invalid ip: %q", c.IpAddress)
}
if c.LocalAddress == "" {
return errors.New("missing local address")
}
if c.InternalEntrypoint == "" {
return errors.New("missing internal entrypoint")
}
if c.ExternalEntrypoint == "" {
return errors.New("missing external entrypoint")
}
return nil
}
func (c *HostConfig) Merge(other HostConfig) {
if other.set {
c.set = other.set
}
if other.Domain != "" {
c.Domain = other.Domain
}
if other.IpAddress != "" {
c.IpAddress = other.IpAddress
}
if other.LocalAddress != "" {
c.LocalAddress = other.LocalAddress
}
if other.InternalEntrypoint != "" {
c.InternalEntrypoint = other.InternalEntrypoint
}
if other.ExternalEntrypoint != "" {
c.ExternalEntrypoint = other.ExternalEntrypoint
}
}

47
internal/config/master.go Normal file
View file

@ -0,0 +1,47 @@
package config
import "errors"
type MasterConfig struct {
ObserverInterval int `toml:"observer_interval"`
BackoffSeconds int `toml:"backoff_seconds"`
BackoffCount int `toml:"backoff_count"`
baseRoleConfig
}
func (c MasterConfig) Validate() error {
if c.ObserverInterval < 1 {
return errors.New("invalid observer_interval")
}
if c.BackoffSeconds < 1 {
return errors.New("invalid backoff_seconds")
}
if c.BackoffCount < 1 {
return errors.New("invalid backoff_count")
}
return nil
}
func (c *MasterConfig) Merge(other MasterConfig) {
if other.set {
c.set = true
}
if other.ObserverInterval != 0 {
c.ObserverInterval = other.ObserverInterval
}
if other.BackoffSeconds != 0 {
c.BackoffSeconds = other.BackoffSeconds
}
if other.BackoffCount != 0 {
c.BackoffCount = other.BackoffCount
}
}

85
internal/config/node.go Normal file
View file

@ -0,0 +1,85 @@
package config
import (
"errors"
"fmt"
"strings"
)
type LogLevel string
const (
LogLevelDebug LogLevel = "DEBUG"
LogLevelInfo LogLevel = "INFO"
LogLevelWarn LogLevel = "WARN"
LogLevelError LogLevel = "ERROR"
)
func (l *LogLevel) UnmarshalText(data []byte) error {
raw := strings.ToUpper(string(data))
switch LogLevel(raw) {
case LogLevelDebug, LogLevelInfo, LogLevelWarn, LogLevelError:
*l = LogLevel(raw)
return nil
default:
return fmt.Errorf("invalid log level: %q", data)
}
}
type NodeConfig struct {
Hostname string `toml:"hostname"`
Endpoint string `toml:"endpoint"`
KeepaliveInterval int `toml:"keepalive_interval"`
LogLevel LogLevel `toml:"log_level"`
BootstrapMaster string `toml:"bootstrap_master"`
ListenOn string `toml:"listen_on"`
Port int `toml:"port"`
}
func (c NodeConfig) Validate() error {
if c.Hostname == "" {
return errors.New("missing hostname")
}
if c.Endpoint == "" {
return errors.New("missing endpoint")
}
if c.KeepaliveInterval < 1 && c.KeepaliveInterval != -1 {
return errors.New("invalid keepalive_interval")
}
return nil
}
func (c *NodeConfig) Merge(other NodeConfig) {
if other.Hostname != "" {
c.Hostname = other.Hostname
}
if other.Endpoint != "" {
c.Endpoint = other.Endpoint
}
if other.BootstrapMaster != "" {
c.BootstrapMaster = other.BootstrapMaster
}
if other.ListenOn != "" {
c.ListenOn = other.ListenOn
}
if other.Port != 0 {
c.Port = other.Port
}
if other.KeepaliveInterval != 0 {
c.KeepaliveInterval = other.KeepaliveInterval
}
if other.LogLevel != "" {
c.LogLevel = other.LogLevel
}
}

5
internal/config/role.go Normal file
View file

@ -0,0 +1,5 @@
package config
type baseRoleConfig struct {
set bool
}

View file

@ -0,0 +1,68 @@
package registry
import (
"encoding/json"
"fmt"
"os"
"path"
"sync"
"time"
"git.wzray.com/homelab/mastermind/internal/types"
)
type FileStorage struct {
filename string
lock sync.Mutex
}
func NewFileStorage(filename string) *FileStorage {
return &FileStorage{
filename: filename,
}
}
func (fs *FileStorage) EnsureExists() {
dirname := path.Dir(fs.filename)
if _, err := os.Stat(dirname); os.IsNotExist(err) {
os.MkdirAll(dirname, 0755)
}
if _, err := os.Stat(fs.filename); os.IsNotExist(err) {
fs.Save(&storedConfig{
LastUpdate: time.Now().UnixMilli(),
Nodes: make(map[string]types.Node),
})
}
}
func (fs *FileStorage) Save(cfg *storedConfig) error {
fs.lock.Lock()
defer fs.lock.Unlock()
buf, err := json.Marshal(cfg)
if err != nil {
return fmt.Errorf("marshal registry: %w", err)
}
if err := os.WriteFile(fs.filename, buf, 0644); err != nil {
return fmt.Errorf("write registry file: %w", err)
}
return nil
}
func (fs *FileStorage) Load(cfg *storedConfig) error {
fs.lock.Lock()
defer fs.lock.Unlock()
data, err := os.ReadFile(fs.filename)
if err != nil {
return fmt.Errorf("read registry file: %w", err)
}
if err := json.Unmarshal(data, cfg); err != nil {
return fmt.Errorf("unmarshal registry: %w", err)
}
return nil
}

View file

@ -0,0 +1,153 @@
package registry
import (
"maps"
"slices"
"sync"
"time"
"git.wzray.com/homelab/mastermind/internal/types"
"github.com/rs/zerolog/log"
)
type Registry struct {
LastUpdate time.Time
nodes map[string]types.Node
storage Storage
lock sync.RWMutex
self types.Node
observers []chan<- []types.Node
}
func New(storage Storage, self types.Node) *Registry {
r := &Registry{
storage: storage,
nodes: make(map[string]types.Node),
self: self,
}
var storedData storedConfig
if err := storage.Load(&storedData); err != nil {
log.Warn().Err(err).Msg("unable to load registry from storage")
goto ret
}
r.LastUpdate = time.UnixMilli(storedData.LastUpdate)
r.nodes = storedData.Nodes
ret:
r.nodes[self.Name] = self
return r
}
func (r *Registry) snapshot() *storedConfig {
return &storedConfig{
LastUpdate: r.LastUpdate.UnixMilli(),
Nodes: maps.Clone(r.nodes),
}
}
func (r *Registry) notify() {
nodes := r.Nodes()
for _, c := range r.observers {
c <- nodes
}
}
func (r *Registry) AllNodes() []types.Node {
r.lock.RLock()
defer r.lock.RUnlock()
nodes := make([]types.Node, 0, len(r.nodes))
for _, n := range r.nodes {
nodes = append(nodes, n)
}
return nodes
}
func (r *Registry) Nodes() []types.Node {
nodes := r.AllNodes()
nodes = slices.DeleteFunc(nodes, func(n types.Node) bool {
return n.Name == r.self.Name
})
return nodes
}
func (r *Registry) ByRole(role types.Role) []types.Node {
r.lock.RLock()
defer r.lock.RUnlock()
o := make([]types.Node, 0, len(r.nodes))
for _, node := range r.nodes {
if slices.Contains(node.Roles, role) && node.Name != r.self.Name {
o = append(o, node)
}
}
return o
}
func (r *Registry) AddNode(node types.Node) error {
r.lock.Lock()
r.nodes[node.Name] = node
r.LastUpdate = time.Now()
snapshot := r.snapshot()
r.lock.Unlock()
if err := r.storage.Save(snapshot); err != nil {
return err
}
return nil
}
func (r *Registry) RemoveNode(nodeName string) error {
r.lock.Lock()
delete(r.nodes, nodeName)
r.LastUpdate = time.Now()
snapshot := r.snapshot()
r.lock.Unlock()
if err := r.storage.Save(snapshot); err != nil {
return err
}
r.notify()
return nil
}
func (r *Registry) Set(nodes []types.Node) error {
r.lock.Lock()
r.nodes = make(map[string]types.Node)
for _, n := range nodes {
r.nodes[n.Name] = n
}
snapshot := r.snapshot()
r.lock.Unlock()
if err := r.storage.Save(snapshot); err != nil {
return err
}
r.notify()
return nil
}
func (r *Registry) Exists(name string) bool {
_, ok := r.nodes[name]
return ok
}
func (r *Registry) OnChanged() <-chan []types.Node { // TODO: rename this
c := make(chan []types.Node, 1)
r.observers = append(r.observers, c)
return c
}
func (r *Registry) Save() {
r.lock.RLock()
snapshot := r.snapshot()
r.lock.RUnlock()
r.storage.Save(snapshot)
}

View file

@ -0,0 +1,13 @@
package registry
import "git.wzray.com/homelab/mastermind/internal/types"
type Storage interface {
Save(*storedConfig) error
Load(*storedConfig) error
}
type storedConfig struct {
LastUpdate int64 `json:"last_update"`
Nodes map[string]types.Node `json:"nodes"`
}

122
internal/roles/dns/dns.go Normal file
View file

@ -0,0 +1,122 @@
package dns
import (
"context"
"fmt"
"os"
"os/exec"
"strings"
"sync"
"git.wzray.com/homelab/mastermind/internal/config"
"git.wzray.com/homelab/mastermind/internal/state"
"git.wzray.com/homelab/mastermind/internal/types"
"git.wzray.com/homelab/mastermind/internal/web/client"
"github.com/rs/zerolog/log"
)
const hostsDir = "/etc/hosts.d/"
type Role struct {
state *state.RuntimeState
config config.DnsConfig
group sync.WaitGroup
}
func New(state *state.RuntimeState, config config.DnsConfig) *Role {
r := &Role{
state: state,
config: config,
}
return r
}
func (r *Role) updateDnsmasq(filename string, data []byte) error {
if err := os.WriteFile(filename, data, 0644); err != nil {
return fmt.Errorf("write endpoint file %q: %w", filename, err)
}
if err := r.reload(); err != nil {
return fmt.Errorf("reload dnsmasq: %w", err)
}
return nil
}
func parseState(state types.HostState) (string, []byte) {
var builder strings.Builder
for _, d := range state.Domains {
builder.WriteString(fmt.Sprintf("%s %s\n", state.Name, d))
}
return hostsDir + state.Endpoint, []byte(builder.String())
}
func (r *Role) OnStartup(ctx context.Context) error {
r.group.Go(func() {
r.syncFromRegistry()
})
c := r.state.Registry.OnChanged()
r.group.Go(func() {
for {
select {
case <-ctx.Done():
return
case <-c:
r.syncFromRegistry()
}
}
})
return nil
}
func (r *Role) syncFromRegistry() {
for _, n := range r.state.Registry.ByRole(types.HostRole) {
state, err := client.Get[types.HostState](n.Address, types.PathHostDns)
if err != nil {
log.Warn().Str("name", n.Name).Err(err).Msg("unable to get host config")
continue
}
filename, data := parseState(*state)
if err := r.updateDnsmasq(filename, data); err != nil {
log.Warn().Str("name", n.Name).Err(err).Msg("unable to update dnsmasq")
continue
}
}
}
func (r *Role) OnShutdown() error {
r.group.Wait()
return nil
}
func (r *Role) reload() error {
var err error
if r.config.UseSystemd {
err = exec.Command("systemctl", "reload", "dnsmasq").Run()
} else {
err = exec.Command("/etc/init.d/dnsmasq", "reload").Run()
}
return err
}
func (r *Role) onCallback(state types.HostState) (bool, error) {
filename, data := parseState(state)
if err := r.updateDnsmasq(filename, data); err != nil {
return false, err
}
return true, nil
}
func (r *Role) RegisterHandlers(rg types.Registrator) {
rg.Register(types.PostEndpoint(types.PathDnsCallback, r.onCallback))
}

121
internal/roles/host/host.go Normal file
View file

@ -0,0 +1,121 @@
package host
import (
"context"
"encoding/json"
"fmt"
"net/http"
"slices"
"sync"
"git.wzray.com/homelab/mastermind/internal/config"
"git.wzray.com/homelab/mastermind/internal/state"
"git.wzray.com/homelab/mastermind/internal/types"
"git.wzray.com/homelab/mastermind/internal/web/client"
"github.com/rs/zerolog/log"
)
type Role struct {
state *state.RuntimeState
config config.HostConfig
client *traefikClient
tasksGroup sync.WaitGroup
externalDomains []string // TODO: i don't like hardcoding external/internal logic here
internalDomains []string
}
func New(state *state.RuntimeState, config config.HostConfig) *Role {
return &Role{
client: newClient(config.Domain, config.IpAddress),
state: state,
config: config,
}
}
func (r *Role) sendUpdate(domains []string, role types.Role) {
state := types.HostState{
Domains: domains,
Name: r.state.Self.Name,
Endpoint: r.state.Self.Address,
}
for _, node := range r.state.Registry.ByRole(role) {
r.tasksGroup.Go(func() {
logger := log.With().Str("name", node.Name).Logger()
logger.Debug().Msg("sending update")
if _, err := client.Post[any](node.Address, types.PathDnsCallback, state); err != nil {
logger.Warn().Err(err).Msg("unable to send dns info")
} else {
logger.Debug().Msg("update sent")
}
})
}
}
func (r *Role) mutateState(resp traefikResponse) {
newInternal := resp.Domains(r.config.InternalEntrypoint)
newExternal := resp.Domains(r.config.ExternalEntrypoint)
if !slices.Equal(newInternal, r.internalDomains) {
log.Info().Msg("internal domains updated, propogating")
r.internalDomains = newInternal
r.sendUpdate(newInternal, types.DnsRole)
}
if !slices.Equal(newExternal, r.externalDomains) {
log.Info().Msg("internal domains updated, propogating")
r.externalDomains = newExternal
r.sendUpdate(newExternal, types.NameserverRole)
}
}
func (r *Role) onCallback(w http.ResponseWriter, req *http.Request) {
var resp traefikResponse
if err := json.NewDecoder(req.Body).Decode(&resp); err != nil {
w.WriteHeader(http.StatusInternalServerError)
log.Err(err).Msg("unable to decode traefik callback data")
return
}
r.mutateState(resp)
w.Write([]byte("OK"))
}
func (r *Role) getInternal() (types.HostState, error) {
return types.HostState{
Domains: r.internalDomains,
Endpoint: r.state.Self.Address,
Name: r.state.Self.Name,
}, nil
}
func (r *Role) getExternal() (types.HostState, error) {
return types.HostState{}, nil
}
func (r *Role) RegisterHandlers(rg types.Registrator) {
rg.RegisterRaw(http.MethodPost, types.PathHostCallback.String(), r.onCallback)
rg.Register(types.GetEndpoint(types.PathHostDns, r.getInternal))
rg.Register(types.GetEndpoint(types.PathHostNs, r.getExternal))
}
func (r *Role) OnStartup(ctx context.Context) error {
resp, err := r.client.GetRawData()
if err != nil {
return fmt.Errorf("get traefik state: %w", err)
}
log.Info().Msg("got raw data from traefik")
log.Debug().Interface("response", resp).Send()
r.mutateState(*resp)
return nil
}
func (r *Role) OnShutdown() error {
r.tasksGroup.Wait()
return nil
}

View file

@ -0,0 +1,58 @@
package host
import (
"crypto/tls"
"encoding/json"
"fmt"
"net/http"
"net/url"
)
type traefikClient struct {
client *http.Client
domain string
address url.URL
}
func newClient(domain string, addr string) *traefikClient {
return &traefikClient{
domain: domain,
address: url.URL{
Scheme: "https",
Host: addr,
},
client: &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
ServerName: domain,
},
},
},
}
}
func (c *traefikClient) GetRawData() (*traefikResponse, error) {
var out traefikResponse
url := c.address
url.Path = "/api/rawdata"
req := http.Request{
Method: "GET",
URL: &url,
}
req.Host = c.domain
r, err := c.client.Do(&req)
if err != nil {
return nil, fmt.Errorf("make request: %w", err)
}
defer r.Body.Close()
if err := json.NewDecoder(r.Body).Decode(&out); err != nil {
return nil, fmt.Errorf("unmarshal body: %w", err)
}
return &out, nil
}

View file

@ -0,0 +1,74 @@
package host
import (
"encoding/json"
"regexp"
"slices"
)
var hostRegex = regexp.MustCompile("Host\\(`([^()`]+\\.[^()`]+)`\\)")
type rule struct {
Raw string
Domains []string
Valid bool
}
func (r *rule) UnmarshalJSON(data []byte) error {
r.Valid = false
raw := ""
if err := json.Unmarshal(data, &raw); err != nil {
return err
}
matches := hostRegex.FindAllStringSubmatch(raw, -1)
for _, match := range matches {
if len(match) <= 1 {
continue
}
r.Domains = append(r.Domains, match[1:]...)
}
r.Valid = len(r.Domains) > 0
return nil
}
type router struct {
Rule rule `json:"rule"`
Entrypoints []string `json:"entryPoints"`
}
type traefikResponse struct {
Routers []router
}
func (r *traefikResponse) UnmarshalJSON(data []byte) error {
var raw struct {
Routers map[string]router `json:"routers"`
}
if err := json.Unmarshal(data, &raw); err != nil {
return err
}
for _, v := range raw.Routers {
r.Routers = append(r.Routers, v)
}
return nil
}
func (r traefikResponse) Domains(entrypoint string) []string {
out := make([]string, 0, len(r.Routers))
for _, router := range r.Routers {
if router.Rule.Valid && slices.Contains(router.Entrypoints, entrypoint) {
out = append(out, router.Rule.Domains...)
}
}
return out
}

View file

@ -0,0 +1,93 @@
package master
import (
"context"
"sync"
"git.wzray.com/homelab/mastermind/internal/config"
"git.wzray.com/homelab/mastermind/internal/roles"
"git.wzray.com/homelab/mastermind/internal/state"
"git.wzray.com/homelab/mastermind/internal/types"
"git.wzray.com/homelab/mastermind/internal/web/client"
)
type Role struct {
state *state.RuntimeState
config config.MasterConfig
tasksGroup sync.WaitGroup
observer *observer
roles.BaseRole
}
func New(state *state.RuntimeState, config config.MasterConfig) *Role {
return &Role{
state: state,
config: config,
observer: newObserver(
state,
config.ObserverInterval,
config.BackoffSeconds,
config.BackoffCount,
),
}
}
func (r *Role) OnStartup(ctx context.Context) error {
r.tasksGroup.Go(func() {
r.observer.Start(ctx, func(n types.Node) error {
_, err := r.onLeave(n)
return err
})
})
return nil
}
func (r *Role) OnShutdown() error {
r.tasksGroup.Wait()
return nil
}
func (r *Role) notify(path types.Path, v any) {
for _, n := range r.state.Registry.Nodes() {
addr := n.Address
r.tasksGroup.Go(func() {
client.Post[any](addr, path, v)
})
}
}
func (r *Role) onJoin(node types.Node) ([]types.Node, error) {
if err := r.state.Registry.AddNode(node); err != nil {
return nil, err
}
r.notify(types.PathNodeJoin, node)
return r.state.Registry.AllNodes(), nil
}
func (r *Role) onLeave(node types.Node) (bool, error) {
if err := r.state.Registry.RemoveNode(node.Name); err != nil {
return false, err
}
r.notify(types.PathNodeLeave, node.Name)
return true, nil
}
func (r *Role) onKeepAlive(node types.Node) (bool, error) {
if ok := r.state.Registry.Exists(node.Name); !ok {
_, err := r.onJoin(node)
return true, err
}
return false, nil
}
func (c *Role) RegisterHandlers(r types.Registrator) {
r.Register(types.PostEndpoint(types.PathMasterJoin, c.onJoin))
r.Register(types.PostEndpoint(types.PathMasterLeave, c.onLeave))
r.Register(types.PostEndpoint(types.PathMasterKeepalive, c.onKeepAlive))
}

View file

@ -0,0 +1,83 @@
package master
import (
"context"
"time"
"git.wzray.com/homelab/mastermind/internal/state"
"git.wzray.com/homelab/mastermind/internal/types"
"git.wzray.com/homelab/mastermind/internal/web/client"
"github.com/rs/zerolog/log"
)
type observer struct {
state *state.RuntimeState
interval int
backoff int
backoffCount int
}
func newObserver(
state *state.RuntimeState,
interval int,
backoff int,
backoffCount int,
) *observer {
return &observer{
state: state,
interval: interval,
backoff: backoff,
backoffCount: backoffCount,
}
}
func (o *observer) pollNodes(ctx context.Context, onLeave func(types.Node) error) {
for _, n := range o.state.Registry.Nodes() {
name := n.Name
logger := log.With().Str("name", name).Logger()
logger.Debug().Msg("checking node")
delay := time.Duration(o.backoff)
alive := false
for i := o.backoffCount; i > 0; i-- {
_, err := client.Get[any](n.Address, types.PathNodeHealthcheck)
if err == nil {
logger.Debug().Msg("node is alive")
alive = true
break
}
if i == 0 {
break
}
logger.Info().Any("delay", delay).Msg("node didn't respond, sleeping")
select {
case <-ctx.Done():
goto dead
case <-time.After(delay * time.Second):
delay *= 2
}
}
dead:
if !alive {
logger.Info().Msg("node is dead, removing")
if err := onLeave(n); err != nil {
logger.Warn().Err(err).Msg("onLeave call failed")
}
}
}
}
func (o *observer) Start(ctx context.Context, onLeave func(types.Node) error) {
for {
select {
case <-ctx.Done():
return
case <-time.After(time.Duration(o.interval) * time.Second):
o.pollNodes(ctx, onLeave)
}
}
}

168
internal/roles/node/node.go Normal file
View file

@ -0,0 +1,168 @@
package node
import (
"context"
"errors"
"sync"
"time"
"git.wzray.com/homelab/mastermind/internal/config"
"git.wzray.com/homelab/mastermind/internal/state"
"git.wzray.com/homelab/mastermind/internal/types"
"git.wzray.com/homelab/mastermind/internal/web/client"
"github.com/rs/zerolog/log"
)
type Role struct {
state *state.RuntimeState
keepaliveGroup sync.WaitGroup
config config.NodeConfig
}
func New(state *state.RuntimeState, config config.NodeConfig) *Role {
return &Role{
state: state,
config: config,
}
}
func (r *Role) Join(bootstrap string) error {
masters := make(map[string]struct{})
for _, node := range r.state.Registry.ByRole(types.MasterRole) {
if node.Name == r.state.Self.Name {
continue
}
masters[node.Address] = struct{}{}
}
if bootstrap != "" {
masters[bootstrap] = struct{}{}
} else if len(masters) == 0 {
return errors.New("no masters configured")
}
for m := range masters {
logger := log.With().Str("host", m).Logger()
logger.Debug().Msg("trying to join via master")
nodes, err := client.Post[[]types.Node](m, "/master/join", r.state.Self)
if err != nil {
logger.Debug().Err(err).Msg("unable to join")
continue
}
if err := r.state.Registry.Set(*nodes); err != nil {
logger.Debug().Err(err).Msg("unable to set master's nodes")
continue
}
return nil
}
return errors.New("unable to join")
}
func (r *Role) Leave() error {
masters := r.state.Registry.ByRole(types.MasterRole)
if len(masters) == 0 {
return nil
}
sent := false
for _, m := range masters {
logger := log.With().Str("name", m.Name).Logger()
logger.Debug().Msg("sending leave message")
_, err := client.Post[any](m.Address, types.PathMasterLeave, r.state.Self)
if err != nil {
logger.Debug().Err(err).Msg("unable to send leave message")
continue
} else {
sent = true
logger.Debug().Msg("leave message sent")
break
}
}
if !sent {
return errors.New("unable to send leave message")
}
return nil
}
func (r *Role) OnStartup(ctx context.Context) error {
if r.config.KeepaliveInterval != -1 {
r.keepaliveGroup.Go(r.keepaliveFunc(ctx))
} else {
log.Debug().Msg("keepalive disabled")
}
return nil
}
func (r *Role) OnShutdown() error {
r.keepaliveGroup.Wait()
return nil
}
func (r *Role) keepaliveFunc(ctx context.Context) func() {
sendKeepalive := func() {
masters := r.state.Registry.ByRole(types.MasterRole)
if len(masters) == 0 {
return
}
sent := false
for _, m := range masters {
logger := log.With().Str("name", m.Name).Logger()
logger.Debug().Msg("sending keepalive packet")
if _, err := client.Post[any](m.Address, types.PathMasterKeepalive, r.state.Self); err != nil {
continue
} else {
logger.Debug().Msg("keepalive packet sent")
sent = true
break
}
}
if !sent {
log.Info().Msg("unable to send keepalive packet")
}
}
return func() {
for {
select {
case <-ctx.Done():
return
case <-time.After(time.Duration(r.config.KeepaliveInterval) * time.Second):
sendKeepalive()
}
}
}
}
func (r *Role) onJoin(node types.Node) (bool, error) {
if err := r.state.Registry.AddNode(node); err != nil {
return false, err
}
return true, nil
}
func (r *Role) onLeave(node types.Node) (bool, error) {
if err := r.state.Registry.RemoveNode(node.Name); err != nil {
return false, err
}
return true, nil
}
func healthcheck() (string, error) {
return "OK", nil
}
func (n *Role) RegisterHandlers(r types.Registrator) {
r.Register(types.GetEndpoint(types.PathNodeHealthcheck, healthcheck))
r.Register(types.PostEndpoint(types.PathNodeJoin, n.onJoin))
r.Register(types.PostEndpoint(types.PathNodeLeave, n.onLeave))
}

21
internal/roles/role.go Normal file
View file

@ -0,0 +1,21 @@
package roles
import (
"context"
"git.wzray.com/homelab/mastermind/internal/types"
)
type Role interface {
RegisterHandlers(types.Registrator)
OnStartup(context.Context) error
OnShutdown() error
}
type BaseRole struct{}
func (r *BaseRole) RegisterHandlers(types.Registrator) {}
func (r *BaseRole) OnStartup(context.Context) error { return nil }
func (r *BaseRole) OnShutdown() error { return nil }

18
internal/state/runtime.go Normal file
View file

@ -0,0 +1,18 @@
package state
import (
"git.wzray.com/homelab/mastermind/internal/registry"
"git.wzray.com/homelab/mastermind/internal/types"
)
type RuntimeState struct {
Registry *registry.Registry
Self types.Node
}
func New(r *registry.Registry, n types.Node) *RuntimeState {
return &RuntimeState{
Registry: r,
Self: n,
}
}

7
internal/types/host.go Normal file
View file

@ -0,0 +1,7 @@
package types
type HostState struct {
Domains []string
Endpoint string
Name string
}

16
internal/types/node.go Normal file
View file

@ -0,0 +1,16 @@
package types
// TODO: consider moving this type back to registry
type Node struct {
Address string `json:"address"`
Name string `json:"name"`
Roles []Role `json:"roles"`
}
func NewNode(address string, name string, roles []Role) Node {
return Node{
Address: address,
Name: name,
Roles: roles,
}
}

39
internal/types/roles.go Normal file
View file

@ -0,0 +1,39 @@
package types
type Role string
const (
MasterRole Role = "master"
HostRole Role = "host"
DnsRole Role = "dns"
NameserverRole Role = "ns"
)
var Roles = []Role{
MasterRole,
HostRole,
DnsRole,
NameserverRole,
}
var Names = func() []Role {
o := make([]Role, 0, len(Roles))
for _, r := range Roles {
o = append(o, r)
}
return o
}()
func (r Role) String() string {
return string(r)
}
func Parse(s string) (Role, bool) {
for _, r := range Roles {
if s == r.String() {
return r, true
}
}
return "", false
}

1
internal/types/rpc.go Normal file
View file

@ -0,0 +1 @@
package types

76
internal/types/web.go Normal file
View file

@ -0,0 +1,76 @@
package types
import (
"encoding/json"
"fmt"
"net/http"
)
type Path string
func (p Path) String() string {
return string(p)
}
const (
PathMasterJoin Path = "/master/join"
PathMasterLeave Path = "/master/leave"
PathMasterKeepalive Path = "/master/keepalive"
PathNodeHealthcheck Path = "/node/healthcheck"
PathNodeJoin Path = "/node/join"
PathNodeLeave Path = "/node/leave"
PathDnsCallback Path = "/dns/callback"
PathHostCallback Path = "/host/callback"
PathHostDns Path = "/host/dns"
PathHostNs Path = "/host/ns"
)
type Response[T any] struct {
Ok bool `json:"ok"`
Data T `json:"data,omitempty"`
Err string `json:"err,omitempty"`
}
type Route interface {
Path() string
Handle([]byte) (any, error)
}
type endpoint struct {
path string
handler func([]byte) (any, error)
}
func (e endpoint) Path() string { return e.path }
func (e endpoint) Handle(v []byte) (any, error) { return e.handler(v) }
func PostEndpoint[T any, V any](path Path, handler func(T) (V, error)) Route {
return endpoint{
path: "POST " + path.String(),
handler: func(a []byte) (any, error) {
var r T
if err := json.Unmarshal(a, &r); err != nil {
return nil, fmt.Errorf("unable to unmarshal json: %w", err)
}
return handler(r)
},
}
}
func GetEndpoint[T any](path Path, handler func() (T, error)) Route {
return endpoint{
path: "GET " + path.String(),
handler: func(a []byte) (any, error) {
return handler()
},
}
}
type Registrator interface {
Register(endpoint Route)
RegisterRaw(method string, pattern string, handler func(http.ResponseWriter, *http.Request))
}

View file

@ -0,0 +1,108 @@
package client
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"time"
"git.wzray.com/homelab/mastermind/internal/types"
"git.wzray.com/homelab/mastermind/internal/web/middleware"
)
type client struct {
http *http.Client
middleware middleware.Middleware
}
var defaultClient *client
const timeout = time.Duration(2) * time.Second
func (c *client) makeRequest(method string, host string, path types.Path, data any, out any) error {
var body io.Reader
if data != nil {
raw, err := json.Marshal(data)
if err != nil {
return fmt.Errorf("marshal body: %w", err)
}
body = bytes.NewReader(raw)
}
uri := (&url.URL{
Scheme: "http",
Host: host,
Path: path.String(),
}).String()
r, err := http.NewRequest(method, uri, body)
if err != nil {
return fmt.Errorf("build http request: %w", err)
}
if body != nil {
r.Header.Set("Content-Type", "application/json; charset=utf-8")
}
if err := c.middleware.Client(r); err != nil {
return fmt.Errorf("apply middleware: %w", err)
}
resp, err := c.http.Do(r)
if err != nil {
return fmt.Errorf("send request: %w", err)
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
b, _ := io.ReadAll(resp.Body)
return fmt.Errorf("http %d: %s", resp.StatusCode, string(b))
}
defer resp.Body.Close()
if out != nil {
if err := json.NewDecoder(resp.Body).Decode(out); err != nil {
return fmt.Errorf("decode body: %w", err)
}
}
io.Copy(io.Discard, resp.Body)
return nil
}
func Init(mw middleware.Middleware) {
if defaultClient != nil {
panic("web.client: Init called twice")
}
defaultClient = &client{
http: &http.Client{
Timeout: timeout,
},
middleware: mw,
}
}
func request[Out any, In any](method string, host string, path types.Path, data In) (*Out, error) {
out := &types.Response[Out]{}
err := defaultClient.makeRequest(method, host, path, data, out)
if err != nil {
return nil, err
}
if !out.Ok {
return nil, errors.New(out.Err)
}
return &out.Data, err
}
func Get[Out any](host string, path types.Path) (*Out, error) { // TODO: out should not be a pointer
return request[Out, any](http.MethodGet, host, path, nil)
}
func Post[Out any, In any](host string, path types.Path, data In) (*Out, error) {
return request[Out](http.MethodPost, host, path, data)
}

View file

@ -0,0 +1,53 @@
package middleware
import (
"fmt"
"net/http"
"slices"
)
type Middleware interface {
Client(r *http.Request) error
Handler(http.Handler) http.Handler
}
type BaseMiddleware struct{}
func (BaseMiddleware) Client(*http.Request) error { return nil }
func (BaseMiddleware) Handler(h http.Handler) http.Handler { return h }
type MiddlewareBuilder struct {
middlewares []Middleware
}
func (b *MiddlewareBuilder) Use(middleware Middleware) *MiddlewareBuilder {
b.middlewares = append(b.middlewares, middleware)
return b
}
type middlewareProxy struct {
middlewares []Middleware
}
func (p *middlewareProxy) Client(r *http.Request) error {
for _, m := range p.middlewares {
if err := m.Client(r); err != nil {
return fmt.Errorf("%T: %w", m, err)
}
}
return nil
}
func (p *middlewareProxy) Handler(h http.Handler) http.Handler {
for _, f := range slices.Backward(p.middlewares) {
h = f.Handler(h)
}
return h
}
func (b *MiddlewareBuilder) Prepare() Middleware {
return &middlewareProxy{
middlewares: append([]Middleware(nil), b.middlewares...),
}
}

View file

@ -0,0 +1,80 @@
package server
import (
"context"
"io"
"net/http"
"git.wzray.com/homelab/mastermind/internal/types"
"git.wzray.com/homelab/mastermind/internal/web/middleware"
"github.com/rs/zerolog/log"
)
type Server struct {
mux *http.ServeMux
httpServer http.Server
}
func NewServer(addr string, middleware middleware.Middleware) *Server {
mux := http.NewServeMux()
s := &Server{
mux: mux,
httpServer: http.Server{
Addr: addr,
Handler: middleware.Handler(mux),
},
}
return s
}
func (s *Server) Listen() error {
return s.httpServer.ListenAndServe()
}
func (s *Server) Shutdown(ctx context.Context) error {
return s.httpServer.Shutdown(ctx)
}
func (s *Server) handleFunc(route types.Route) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
log.Debug(). // TODO: make this a middleware
Str("method", r.Method).
Str("path", r.URL.Path).
Str("query", r.URL.RawQuery).
Str("remoteAddr", r.RemoteAddr).
Send()
w.Header().Set("Content-Type", "application/json; charset=utf-8")
body, err := io.ReadAll(r.Body)
if err != nil {
w.Write(fail("read request body: %v", err))
log.Err(err).Msg("unable to read request body")
return
}
raw, err := route.Handle(body)
if err != nil {
w.Write(fail("handle request: %v", err))
log.Err(err).Msg("unable to handle request")
return
}
data, err := ok(raw)
if err != nil {
w.Write(fail("marshal response: %v", err))
log.Err(err).Msg("unable to marshal response")
return
}
w.Write(data)
}
}
func (s *Server) Register(endpoint types.Route) {
s.mux.HandleFunc(endpoint.Path(), s.handleFunc(endpoint))
}
func (s *Server) RegisterRaw(method string, pattern string, handler func(http.ResponseWriter, *http.Request)) {
s.mux.HandleFunc(method+" "+pattern, handler)
}

View file

@ -0,0 +1,23 @@
package server
import (
"encoding/json"
"fmt"
"git.wzray.com/homelab/mastermind/internal/types"
)
func fail(format string, a ...any) []byte {
r, _ := json.Marshal(types.Response[string]{
Ok: false,
Err: fmt.Sprintf(format, a...),
})
return r
}
func ok[T any](data T) ([]byte, error) {
return json.Marshal(types.Response[T]{
Ok: true,
Data: data,
})
}