diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..543c121 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,6 @@ +/build/ +/.env +/config.toml +/Dockerfile +/.git +/.gitignore diff --git a/.env b/.env new file mode 100644 index 0000000..780cb59 --- /dev/null +++ b/.env @@ -0,0 +1,2 @@ +HIVEMIND_CONFIG_FILE=./config.toml +HIVEMIND_REGISTRY_FILE=./registry.json diff --git a/.gitignore b/.gitignore index b11e0f8..84c048a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1 @@ -.env -.venv/ +/build/ diff --git a/Dockerfile b/Dockerfile index 637eb4e..86b496d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,11 +1,25 @@ -FROM python:3.12-alpine - -STOPSIGNAL SIGINT -EXPOSE 80/tcp +FROM golang:alpine AS builder WORKDIR /app -COPY requirements.txt . -RUN pip install -r requirements.txt -COPY app.py . -CMD ["python3", "-u", "app.py"] +RUN apk --no-cache add make; + +COPY . . + +RUN make hivemind; + +FROM alpine + +EXPOSE 56714/tcp + +WORKDIR /app + +VOLUME /conf +VOLUME /data + +ENV HIVEMIND_CONFIG_FILE=/conf/config.toml +ENV HIVEMIND_REGISTRY_FILE=/data/registry.json + +COPY --from=builder /app/build/hivemind . + +CMD ["./hivemind"] diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..9ad8070 --- /dev/null +++ b/Makefile @@ -0,0 +1,13 @@ + +all: hivemind hivemind-lite + +hivemind: + go build -o build/hivemind ./cmd/hivemind + +hivemind-lite: + CC=musl-gcc go build \ + -ldflags="-linkmode external -extldflags '-static'" \ + -o build/hivemind-lite \ + ./cmd/hivemind-lite + +.phony: all hivemind hivemind-lite diff --git a/app.py b/app.py deleted file mode 100644 index 4d2199a..0000000 --- a/app.py +++ /dev/null @@ -1,204 +0,0 @@ -import base64 -import os -import re -import urllib3 -from concurrent.futures import Future, ThreadPoolExecutor, wait as gather_futures - -import fabric -import paramiko.auth_strategy -from flask import Flask, request - -NODE_NAME = (os.getenv('NODE_NAME') or '') + "_traefik" -TRAEFIK_INSTANCE = os.getenv('TRAEFIK_INSTANCE') or '' -TRAEFIK_HOST = os.getenv('TRAEFIK_HOST') or '' -EXTERNAL_HOST = os.getenv('EXTERNAL_HOST') or '' - -INTERNAL_DESTS = (os.getenv('INTERNAL_DESTS') or '').split(',') or [] -EXTERNAL_DESTS = (os.getenv('EXTERNAL_DESTS') or '').split(',') or [] - -PRIVATE_KEY = os.getenv('PRIVATE_KEY') or '' -CLOUDFLARE_API_KEY = os.getenv('CLOUDFLARE_API_KEY') or '' -CLOUDFLARE_ZONE_ID = os.getenv('CLOUDFLARE_ZONE_ID') or '' - -HOST = os.getenv('HOST') or '0.0.0.0' -PORT = os.getenv('PORT') or '80' - -assert NODE_NAME != '_traefik' -for x in [ - TRAEFIK_INSTANCE, - TRAEFIK_HOST, - EXTERNAL_HOST, - - INTERNAL_DESTS, - EXTERNAL_DESTS, - - PRIVATE_KEY, - CLOUDFLARE_API_KEY, - CLOUDFLARE_ZONE_ID, - - HOST, - PORT, -]: assert x - -app = Flask(__name__) -PATTERN = re.compile(r"Host\(`((?:[a-zA-Z0-9_-]+\.)*wzray\.com)`\)") -INTERNAL_ENTRYPOINT = 'https' -EXTERNAL_ENTRYPOINT = 'ehttps' - - -def Connection(host: str, user: str = 'root', port = None) -> fabric.Connection: - """this actually is the fuckiest way to get a key.""" - key = base64.b64decode(PRIVATE_KEY) - privkey = paramiko.Ed25519Key.__new__(paramiko.Ed25519Key) - privkey.public_blob = None - privkey._signing_key = privkey._parse_signing_key_data(key, None) # type: ignore - privkey._verifying_key = None # type: ignore - return fabric.Connection(host=host, user=user, port=port, connect_kwargs= - {'auth_strategy': paramiko.auth_strategy.InMemoryPrivateKey(user, privkey)}) - - -def cf_request(method: str, target: str, json = None): - r = urllib3.request(method, 'https://api.cloudflare.com/client/v4/' + target, - headers={'Authorization': f'Bearer {CLOUDFLARE_API_KEY}'}, json=json) - if r.status != 200: - raise Exception(f'Error {r.status}: {r.data.decode()}') - return r.json()['result'] - - -class Observer: - internal: set[str] = set() - external: set[str] = set() - executor: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=128) - job: Future | None = None - - - @classmethod - def _init(cls): - try: - with urllib3.HTTPSConnectionPool( - 'host.docker.internal', - server_hostname=TRAEFIK_INSTANCE - ) as pool: - runtime_conf = pool.request( - 'GET', - '/api/rawdata', - headers={'Host': TRAEFIK_INSTANCE}, - assert_same_host=False - ).json() - except Exception: - return - - dynamic_conf = { - 'http': { - 'middlewares': runtime_conf.get('middlewares') or {}, - 'routers': runtime_conf.get('routers') or {}, - 'services': runtime_conf.get('services') or {}, - }, - 'tcp': { - 'middlewares': runtime_conf.get('tcpMiddlewares') or {}, - 'routers': runtime_conf.get('tcpRouters') or {}, - 'services': runtime_conf.get('tcpServices') or {}, - }, - 'udp': { - 'routers': runtime_conf.get('udpRouters') or {}, - 'services': runtime_conf.get('udpServices') or {}, - } - } - cls.update(dynamic_conf) - print("Initialized!") - - - @classmethod - def _close(cls): - cls.executor.shutdown() - - - @classmethod - def update_internal(cls): - hosts_string = '\n'.join(f'{TRAEFIK_HOST} {x}' for x in cls.internal) - def _update(ip: str): - c = Connection(ip) - with c.sftp().open(f'/etc/hosts.d/00_{NODE_NAME}.hosts', 'w') as f: - f.write(hosts_string) - c.run('/etc/init.d/dnsmasq reload') - return [cls.executor.submit(_update, x) for x in INTERNAL_DESTS] - - - @classmethod - def update_external(cls): - hosts_string = '\n'.join(f'{x} {NODE_NAME};' for x in cls.external) - def _update(ip: str): - c = Connection(ip) - with c.sftp().open(f'/etc/nginx/services.d/{NODE_NAME}_extra.conf', 'w') as f: - f.write(hosts_string) - c.run('systemctl reload nginx') - return [cls.executor.submit(_update, x) for x in EXTERNAL_DESTS] - - - @classmethod - def update_cloudflare(cls): - url = f'zones/{CLOUDFLARE_ZONE_ID}/dns_records' - to_delete = [{'id': x['id']} for x in filter( - lambda x: x['comment'] == NODE_NAME, cf_request('GET', url))] - if not cls.external and not to_delete: - return - cf_request('POST', url + '/batch', { - 'deletes': to_delete, - 'posts': [{ - 'comment': NODE_NAME, - 'content': EXTERNAL_HOST, - 'type': 'A', - 'proxied': False, - 'name': x, - - } for x in cls.external] - }) - - - @classmethod - def update(cls, data): - def _job(): - gather_futures([ - *cls.update_external(), - *cls.update_internal(), - cls.executor.submit(cls.update_cloudflare) - ]) - cls.job = None - cls.internal, cls.external = cls.parse_raw_data(data) - print(f"{cls.internal=}, {cls.external=}") - if cls.job != None: - cls.job.result() - cls.job = cls.executor.submit(_job) - - - @classmethod - def parse_raw_data(cls, data: dict): - http = data.get('http') or {} - - flt = lambda ep: set([z - for y in [re.findall(PATTERN, x['rule']) - for x in http['routers'].values() if ep in x['entryPoints']] if y for z in y]) - - internal = flt(INTERNAL_ENTRYPOINT) - external = flt(EXTERNAL_ENTRYPOINT) - - return internal, external - - -Observer._init() - - -@app.post("/callback") -def callback(): - data = request.json - if not data: - return {"error": "missing data", "ok": False}, 400 - Observer.update(data) - return {"ok": True} - - -if __name__ == "__main__": - try: - app.run(host=HOST, port=int(PORT)) - finally: - Observer._close() diff --git a/cmd/hivemind-lite/main.go b/cmd/hivemind-lite/main.go new file mode 100644 index 0000000..e41a223 --- /dev/null +++ b/cmd/hivemind-lite/main.go @@ -0,0 +1,7 @@ +package main + +import "fmt" + +func main() { + fmt.Println("hivemind lite") +} diff --git a/cmd/hivemind/main.go b/cmd/hivemind/main.go new file mode 100644 index 0000000..e6cff3c --- /dev/null +++ b/cmd/hivemind/main.go @@ -0,0 +1,169 @@ +package main + +import ( + "context" + "fmt" + "os" + "os/signal" + "path/filepath" + "strconv" + "syscall" + + "git.wzray.com/homelab/mastermind/internal/config" + "git.wzray.com/homelab/mastermind/internal/registry" + "git.wzray.com/homelab/mastermind/internal/roles" + "git.wzray.com/homelab/mastermind/internal/roles/dns" + "git.wzray.com/homelab/mastermind/internal/roles/host" + "git.wzray.com/homelab/mastermind/internal/roles/master" + "git.wzray.com/homelab/mastermind/internal/roles/node" + "git.wzray.com/homelab/mastermind/internal/state" + "git.wzray.com/homelab/mastermind/internal/types" + "git.wzray.com/homelab/mastermind/internal/web/client" + "git.wzray.com/homelab/mastermind/internal/web/middleware" + "git.wzray.com/homelab/mastermind/internal/web/server" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" + "github.com/rs/zerolog/pkgerrors" +) + +var ( + configFile = "/etc/hivemind/config.toml" + registryFile = "/var/lib/hivemind/registry" +) + +func levelToZerolog(l config.LogLevel) zerolog.Level { + switch l { + case config.LogLevelDebug: + return zerolog.DebugLevel + case config.LogLevelInfo: + return zerolog.InfoLevel + case config.LogLevelWarn: + return zerolog.WarnLevel + case config.LogLevelError: + return zerolog.ErrorLevel + default: + panic("unreachable") + } +} + +func init() { + zerolog.CallerMarshalFunc = func(pc uintptr, file string, line int) string { + return filepath.Base(file) + ":" + strconv.Itoa(line) + } + log.Logger = log.With().Caller().Stack().Logger() + log.Logger = log.Output(zerolog.ConsoleWriter{Out: os.Stderr}) + zerolog.ErrorStackMarshaler = pkgerrors.MarshalStack + + if e := os.Getenv("HIVEMIND_CONFIG_FILE"); e != "" { + configFile = e + } + + if e := os.Getenv("HIVEMIND_REGISTRY_FILE"); e != "" { + registryFile = e + } +} + +func main() { + config, err := config.FromFile(configFile) + if err != nil { + log.Fatal().Err(err).Msg("unable to read config file") + } + + if err := config.Validate(); err != nil { + log.Fatal().Err(err).Msg("invalid configuration") + } + + zerolog.SetGlobalLevel(levelToZerolog(config.Node.LogLevel)) + + self := types.NewNode( + fmt.Sprintf("%v:%v", config.Node.Endpoint, config.Node.Port), + config.Node.Hostname, + config.Roles, + ) + + filestore := registry.NewFileStorage(registryFile) + filestore.EnsureExists() + registry := registry.New(filestore, self) + + state := state.New(registry, self) + + nodeRole := node.New(state, config.Node) + + var builder middleware.MiddlewareBuilder + middlewares := builder.Prepare() + + client.Init(middlewares) + + listenAddr := fmt.Sprintf("%v:%v", config.Node.ListenOn, config.Node.Port) + server := server.NewServer(listenAddr, middlewares) + + roles := make([]roles.Role, 0) + roles = append(roles, nodeRole) + + for _, role := range config.Roles { + switch role { + case types.MasterRole: + role := master.New(state, config.Configs.Master) + roles = append(roles, role) + case types.DnsRole: + role := dns.New(state, config.Configs.Dns) + roles = append(roles, role) + case types.HostRole: + role := host.New(state, config.Configs.Host) + roles = append(roles, role) + } + } + + for _, role := range roles { + role.RegisterHandlers(server) + } + + serverError := make(chan error) + go func() { + log.Info().Str("addr", listenAddr).Msg("started listening") + serverError <- server.Listen() + }() + + if err := nodeRole.Join(config.Node.BootstrapMaster); err != nil { + log.Warn().Err(err).Msg("unable to join") + } else { + log.Info().Msg("joined") + } + + ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + + for _, role := range roles { + logger := log.With().Str("role", fmt.Sprintf("%T", role)).Logger() + logger.Debug().Msg("running OnStartup handler") + if err := role.OnStartup(ctx); err != nil { + logger.Err(err).Msg("failed to initialize role") + } + } + + log.Debug().Msg("finished role startup") + + select { + case err := <-serverError: + log.Err(err).Send() + case <-ctx.Done(): + log.Info().Msg("got stop signal") + } + + cancel() + + if err := server.Shutdown(context.Background()); err != nil { + log.Err(err).Msg("error while shutting down the server") + } + + if err := nodeRole.Leave(); err != nil { + log.Info().Err(err).Msg("error while sending shutdown packet") + } + + for _, role := range roles { + if err := role.OnShutdown(); err != nil { + log.Err(err).Interface("role", role).Msg("failed to shutdown role") + } + } + + registry.Save() +} diff --git a/config.toml b/config.toml new file mode 100644 index 0000000..092bfb1 --- /dev/null +++ b/config.toml @@ -0,0 +1,19 @@ +[node] +log_level = "DEBUG" +hostname = "" +endpoint = "" +bootstrap_master = "" +keepalive_interval = 1 + +[roles.master] +observer_interval = 4 + +[roles.dns] +use_systemd = false + +[roles.host] +domain = "" +ip = "" +local_address = "" +internal_entrypoint = "" +external_entrypoint = "" diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..9923039 --- /dev/null +++ b/go.mod @@ -0,0 +1,13 @@ +module git.wzray.com/homelab/mastermind + +go 1.25.5 + +require github.com/rs/zerolog v1.34.0 + +require ( + github.com/BurntSushi/toml v1.6.0 // indirect + github.com/mattn/go-colorable v0.1.14 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/pkg/errors v0.9.1 // indirect + golang.org/x/sys v0.39.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..8a3e287 --- /dev/null +++ b/go.sum @@ -0,0 +1,21 @@ +github.com/BurntSushi/toml v1.6.0 h1:dRaEfpa2VI55EwlIW72hMRHdWouJeRF7TPYhI+AUQjk= +github.com/BurntSushi/toml v1.6.0/go.mod h1:ukJfTF/6rtPPRCnwkur4qwRxa8vTRFBF0uk2lLoLwho= +github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE= +github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8= +github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= +github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/rs/xid v1.6.0/go.mod h1:7XoLgs4eV+QndskICGsho+ADou8ySMSjJKDIan90Nz0= +github.com/rs/zerolog v1.34.0 h1:k43nTLIwcTVQAncfCw4KZ2VY6ukYoZaBPNOE8txlOeY= +github.com/rs/zerolog v1.34.0/go.mod h1:bJsvje4Z08ROH4Nhs5iH600c3IkWhwp44iRc54W6wYQ= +golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk= +golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= diff --git a/internal/config/config.go b/internal/config/config.go new file mode 100644 index 0000000..dafd481 --- /dev/null +++ b/internal/config/config.go @@ -0,0 +1,95 @@ +package config + +import ( + "errors" + "fmt" + "os" + + "git.wzray.com/homelab/mastermind/internal/types" + "github.com/BurntSushi/toml" +) + +type Configs struct { + Master MasterConfig + Dns DnsConfig + Host HostConfig +} + +type Config struct { + Node NodeConfig + Configs Configs + Roles []types.Role +} + +func FromFile(filename string) (Config, error) { + data, err := os.ReadFile(filename) + if err != nil { + return Config{}, fmt.Errorf("read config file: %w", err) + } + + var temp struct { + Node NodeConfig `toml:"node"` + Configs struct { + Host *HostConfig `toml:"host"` + Dns *DnsConfig `toml:"dns"` + Master *MasterConfig `toml:"master"` + } `toml:"roles"` + } + + if err := toml.Unmarshal(data, &temp); err != nil { + return Config{}, fmt.Errorf("parse config file: %w", err) + } + + config := defaultConfig + config.Node.Merge(temp.Node) + + if c := temp.Configs.Master; c != nil { + c.set = true + config.Roles = append(config.Roles, types.MasterRole) + config.Configs.Master.Merge(*c) + } + + if c := temp.Configs.Dns; c != nil { + c.set = true + config.Roles = append(config.Roles, types.DnsRole) + config.Configs.Dns.Merge(*c) + } + + if c := temp.Configs.Host; c != nil { + c.set = true + config.Roles = append(config.Roles, types.HostRole) + config.Configs.Host.Merge(*c) + } + + return config, nil +} + +func (c Config) Validate() error { + if err := c.Node.Validate(); err != nil { + return fmt.Errorf("node: %w", err) + } + + if c.Configs.Host.set { + if err := c.Configs.Host.Validate(); err != nil { + return fmt.Errorf("configs.host: %w", err) + } + } + + if c.Configs.Dns.set { + if err := c.Configs.Dns.Validate(); err != nil { + return fmt.Errorf("configs.dns: %w", err) + } + } + + if c.Configs.Host.set { + if err := c.Configs.Master.Validate(); err != nil { + return fmt.Errorf("configs.master: %w", err) + } + } + + if len(c.Roles) == 0 { + return errors.New("no roles configured") + } + + return nil +} diff --git a/internal/config/defaults.go b/internal/config/defaults.go new file mode 100644 index 0000000..c788047 --- /dev/null +++ b/internal/config/defaults.go @@ -0,0 +1,17 @@ +package config + +var defaultConfig = Config{ + Node: NodeConfig{ + ListenOn: "0.0.0.0", + Port: 56714, + KeepaliveInterval: 1, + LogLevel: LogLevelInfo, + }, + Configs: Configs{ + Master: MasterConfig{ + ObserverInterval: 10, + BackoffSeconds: 2, + BackoffCount: 3, + }, + }, +} diff --git a/internal/config/dns.go b/internal/config/dns.go new file mode 100644 index 0000000..88bc735 --- /dev/null +++ b/internal/config/dns.go @@ -0,0 +1,20 @@ +package config + +type DnsConfig struct { + UseSystemd bool `toml:"use_systemd"` + baseRoleConfig +} + +func (c DnsConfig) Validate() error { + return nil +} + +func (c *DnsConfig) Merge(other DnsConfig) { + if other.set { + c.set = other.set + } + + if other.UseSystemd { + c.UseSystemd = other.UseSystemd + } +} diff --git a/internal/config/host.go b/internal/config/host.go new file mode 100644 index 0000000..8e131ad --- /dev/null +++ b/internal/config/host.go @@ -0,0 +1,70 @@ +package config + +import ( + "errors" + "fmt" + "net" +) + +type HostConfig struct { + Domain string `toml:"domain"` + IpAddress string `toml:"ip"` + LocalAddress string `toml:"local_address"` + InternalEntrypoint string `toml:"internal_entrypoint"` + ExternalEntrypoint string `toml:"external_entrypoint"` + baseRoleConfig +} + +func (c HostConfig) Validate() error { + if c.Domain == "" { + return errors.New("missing domain") + } + + if c.IpAddress == "" { + return errors.New("missing ip") + } + + if net.ParseIP(c.IpAddress) == nil { + return fmt.Errorf("invalid ip: %q", c.IpAddress) + } + + if c.LocalAddress == "" { + return errors.New("missing local address") + } + + if c.InternalEntrypoint == "" { + return errors.New("missing internal entrypoint") + } + + if c.ExternalEntrypoint == "" { + return errors.New("missing external entrypoint") + } + + return nil +} + +func (c *HostConfig) Merge(other HostConfig) { + if other.set { + c.set = other.set + } + + if other.Domain != "" { + c.Domain = other.Domain + } + + if other.IpAddress != "" { + c.IpAddress = other.IpAddress + } + + if other.LocalAddress != "" { + c.LocalAddress = other.LocalAddress + } + + if other.InternalEntrypoint != "" { + c.InternalEntrypoint = other.InternalEntrypoint + } + + if other.ExternalEntrypoint != "" { + c.ExternalEntrypoint = other.ExternalEntrypoint + } +} diff --git a/internal/config/master.go b/internal/config/master.go new file mode 100644 index 0000000..61fa30d --- /dev/null +++ b/internal/config/master.go @@ -0,0 +1,47 @@ +package config + +import "errors" + +type MasterConfig struct { + ObserverInterval int `toml:"observer_interval"` + + BackoffSeconds int `toml:"backoff_seconds"` + BackoffCount int `toml:"backoff_count"` + + baseRoleConfig +} + +func (c MasterConfig) Validate() error { + if c.ObserverInterval < 1 { + return errors.New("invalid observer_interval") + } + + if c.BackoffSeconds < 1 { + return errors.New("invalid backoff_seconds") + } + + if c.BackoffCount < 1 { + return errors.New("invalid backoff_count") + } + + return nil +} + +func (c *MasterConfig) Merge(other MasterConfig) { + if other.set { + c.set = true + } + + if other.ObserverInterval != 0 { + c.ObserverInterval = other.ObserverInterval + } + + if other.BackoffSeconds != 0 { + c.BackoffSeconds = other.BackoffSeconds + } + + if other.BackoffCount != 0 { + c.BackoffCount = other.BackoffCount + } +} + diff --git a/internal/config/node.go b/internal/config/node.go new file mode 100644 index 0000000..2ffa6db --- /dev/null +++ b/internal/config/node.go @@ -0,0 +1,85 @@ +package config + +import ( + "errors" + "fmt" + "strings" +) + +type LogLevel string + +const ( + LogLevelDebug LogLevel = "DEBUG" + LogLevelInfo LogLevel = "INFO" + LogLevelWarn LogLevel = "WARN" + LogLevelError LogLevel = "ERROR" +) + +func (l *LogLevel) UnmarshalText(data []byte) error { + raw := strings.ToUpper(string(data)) + + switch LogLevel(raw) { + case LogLevelDebug, LogLevelInfo, LogLevelWarn, LogLevelError: + *l = LogLevel(raw) + return nil + default: + return fmt.Errorf("invalid log level: %q", data) + } +} + +type NodeConfig struct { + Hostname string `toml:"hostname"` + Endpoint string `toml:"endpoint"` + KeepaliveInterval int `toml:"keepalive_interval"` + LogLevel LogLevel `toml:"log_level"` + + BootstrapMaster string `toml:"bootstrap_master"` + ListenOn string `toml:"listen_on"` + Port int `toml:"port"` +} + +func (c NodeConfig) Validate() error { + if c.Hostname == "" { + return errors.New("missing hostname") + } + + if c.Endpoint == "" { + return errors.New("missing endpoint") + } + + if c.KeepaliveInterval < 1 && c.KeepaliveInterval != -1 { + return errors.New("invalid keepalive_interval") + } + + return nil +} + +func (c *NodeConfig) Merge(other NodeConfig) { + if other.Hostname != "" { + c.Hostname = other.Hostname + } + + if other.Endpoint != "" { + c.Endpoint = other.Endpoint + } + + if other.BootstrapMaster != "" { + c.BootstrapMaster = other.BootstrapMaster + } + + if other.ListenOn != "" { + c.ListenOn = other.ListenOn + } + + if other.Port != 0 { + c.Port = other.Port + } + + if other.KeepaliveInterval != 0 { + c.KeepaliveInterval = other.KeepaliveInterval + } + + if other.LogLevel != "" { + c.LogLevel = other.LogLevel + } +} diff --git a/internal/config/role.go b/internal/config/role.go new file mode 100644 index 0000000..4d7069d --- /dev/null +++ b/internal/config/role.go @@ -0,0 +1,5 @@ +package config + +type baseRoleConfig struct { + set bool +} diff --git a/internal/registry/filestorage.go b/internal/registry/filestorage.go new file mode 100644 index 0000000..3fe4247 --- /dev/null +++ b/internal/registry/filestorage.go @@ -0,0 +1,68 @@ +package registry + +import ( + "encoding/json" + "fmt" + "os" + "path" + "sync" + "time" + + "git.wzray.com/homelab/mastermind/internal/types" +) + +type FileStorage struct { + filename string + lock sync.Mutex +} + +func NewFileStorage(filename string) *FileStorage { + return &FileStorage{ + filename: filename, + } +} + +func (fs *FileStorage) EnsureExists() { + dirname := path.Dir(fs.filename) + if _, err := os.Stat(dirname); os.IsNotExist(err) { + os.MkdirAll(dirname, 0755) + } + + if _, err := os.Stat(fs.filename); os.IsNotExist(err) { + fs.Save(&storedConfig{ + LastUpdate: time.Now().UnixMilli(), + Nodes: make(map[string]types.Node), + }) + } +} + +func (fs *FileStorage) Save(cfg *storedConfig) error { + fs.lock.Lock() + defer fs.lock.Unlock() + + buf, err := json.Marshal(cfg) + if err != nil { + return fmt.Errorf("marshal registry: %w", err) + } + + if err := os.WriteFile(fs.filename, buf, 0644); err != nil { + return fmt.Errorf("write registry file: %w", err) + } + + return nil +} + +func (fs *FileStorage) Load(cfg *storedConfig) error { + fs.lock.Lock() + defer fs.lock.Unlock() + + data, err := os.ReadFile(fs.filename) + if err != nil { + return fmt.Errorf("read registry file: %w", err) + } + if err := json.Unmarshal(data, cfg); err != nil { + return fmt.Errorf("unmarshal registry: %w", err) + } + + return nil +} diff --git a/internal/registry/registry.go b/internal/registry/registry.go new file mode 100644 index 0000000..d3a5a4d --- /dev/null +++ b/internal/registry/registry.go @@ -0,0 +1,153 @@ +package registry + +import ( + "maps" + "slices" + "sync" + "time" + + "git.wzray.com/homelab/mastermind/internal/types" + "github.com/rs/zerolog/log" +) + +type Registry struct { + LastUpdate time.Time + nodes map[string]types.Node + storage Storage + lock sync.RWMutex + self types.Node + observers []chan<- []types.Node +} + +func New(storage Storage, self types.Node) *Registry { + r := &Registry{ + storage: storage, + nodes: make(map[string]types.Node), + self: self, + } + + var storedData storedConfig + if err := storage.Load(&storedData); err != nil { + log.Warn().Err(err).Msg("unable to load registry from storage") + goto ret + } + + r.LastUpdate = time.UnixMilli(storedData.LastUpdate) + r.nodes = storedData.Nodes + +ret: + r.nodes[self.Name] = self + return r +} + +func (r *Registry) snapshot() *storedConfig { + return &storedConfig{ + LastUpdate: r.LastUpdate.UnixMilli(), + Nodes: maps.Clone(r.nodes), + } +} + +func (r *Registry) notify() { + nodes := r.Nodes() + for _, c := range r.observers { + c <- nodes + } +} + +func (r *Registry) AllNodes() []types.Node { + r.lock.RLock() + defer r.lock.RUnlock() + + nodes := make([]types.Node, 0, len(r.nodes)) + for _, n := range r.nodes { + nodes = append(nodes, n) + } + return nodes +} + +func (r *Registry) Nodes() []types.Node { + nodes := r.AllNodes() + nodes = slices.DeleteFunc(nodes, func(n types.Node) bool { + return n.Name == r.self.Name + }) + return nodes +} + +func (r *Registry) ByRole(role types.Role) []types.Node { + r.lock.RLock() + defer r.lock.RUnlock() + + o := make([]types.Node, 0, len(r.nodes)) + for _, node := range r.nodes { + if slices.Contains(node.Roles, role) && node.Name != r.self.Name { + o = append(o, node) + } + } + return o +} + +func (r *Registry) AddNode(node types.Node) error { + r.lock.Lock() + r.nodes[node.Name] = node + r.LastUpdate = time.Now() + snapshot := r.snapshot() + r.lock.Unlock() + + if err := r.storage.Save(snapshot); err != nil { + return err + } + return nil +} + +func (r *Registry) RemoveNode(nodeName string) error { + r.lock.Lock() + delete(r.nodes, nodeName) + r.LastUpdate = time.Now() + snapshot := r.snapshot() + r.lock.Unlock() + + if err := r.storage.Save(snapshot); err != nil { + return err + } + + r.notify() + + return nil +} + +func (r *Registry) Set(nodes []types.Node) error { + r.lock.Lock() + r.nodes = make(map[string]types.Node) + for _, n := range nodes { + r.nodes[n.Name] = n + } + snapshot := r.snapshot() + r.lock.Unlock() + + if err := r.storage.Save(snapshot); err != nil { + return err + } + + r.notify() + + return nil +} + +func (r *Registry) Exists(name string) bool { + _, ok := r.nodes[name] + return ok +} + +func (r *Registry) OnChanged() <-chan []types.Node { // TODO: rename this + c := make(chan []types.Node, 1) + r.observers = append(r.observers, c) + return c +} + +func (r *Registry) Save() { + r.lock.RLock() + snapshot := r.snapshot() + r.lock.RUnlock() + + r.storage.Save(snapshot) +} diff --git a/internal/registry/storage.go b/internal/registry/storage.go new file mode 100644 index 0000000..6f8ceab --- /dev/null +++ b/internal/registry/storage.go @@ -0,0 +1,13 @@ +package registry + +import "git.wzray.com/homelab/mastermind/internal/types" + +type Storage interface { + Save(*storedConfig) error + Load(*storedConfig) error +} + +type storedConfig struct { + LastUpdate int64 `json:"last_update"` + Nodes map[string]types.Node `json:"nodes"` +} diff --git a/internal/roles/dns/dns.go b/internal/roles/dns/dns.go new file mode 100644 index 0000000..ffe3a66 --- /dev/null +++ b/internal/roles/dns/dns.go @@ -0,0 +1,122 @@ +package dns + +import ( + "context" + "fmt" + "os" + "os/exec" + "strings" + "sync" + + "git.wzray.com/homelab/mastermind/internal/config" + "git.wzray.com/homelab/mastermind/internal/state" + "git.wzray.com/homelab/mastermind/internal/types" + "git.wzray.com/homelab/mastermind/internal/web/client" + "github.com/rs/zerolog/log" +) + +const hostsDir = "/etc/hosts.d/" + +type Role struct { + state *state.RuntimeState + config config.DnsConfig + group sync.WaitGroup +} + +func New(state *state.RuntimeState, config config.DnsConfig) *Role { + r := &Role{ + state: state, + config: config, + } + + return r +} + +func (r *Role) updateDnsmasq(filename string, data []byte) error { + if err := os.WriteFile(filename, data, 0644); err != nil { + return fmt.Errorf("write endpoint file %q: %w", filename, err) + } + + if err := r.reload(); err != nil { + return fmt.Errorf("reload dnsmasq: %w", err) + } + + return nil +} + +func parseState(state types.HostState) (string, []byte) { + var builder strings.Builder + + for _, d := range state.Domains { + builder.WriteString(fmt.Sprintf("%s %s\n", state.Name, d)) + } + + return hostsDir + state.Endpoint, []byte(builder.String()) +} + +func (r *Role) OnStartup(ctx context.Context) error { + r.group.Go(func() { + r.syncFromRegistry() + }) + + c := r.state.Registry.OnChanged() + r.group.Go(func() { + for { + select { + case <-ctx.Done(): + return + case <-c: + r.syncFromRegistry() + } + } + }) + + return nil +} + +func (r *Role) syncFromRegistry() { + for _, n := range r.state.Registry.ByRole(types.HostRole) { + state, err := client.Get[types.HostState](n.Address, types.PathHostDns) + if err != nil { + log.Warn().Str("name", n.Name).Err(err).Msg("unable to get host config") + continue + } + + filename, data := parseState(*state) + if err := r.updateDnsmasq(filename, data); err != nil { + log.Warn().Str("name", n.Name).Err(err).Msg("unable to update dnsmasq") + continue + } + } +} + +func (r *Role) OnShutdown() error { + r.group.Wait() + return nil +} + +func (r *Role) reload() error { + var err error + + if r.config.UseSystemd { + err = exec.Command("systemctl", "reload", "dnsmasq").Run() + } else { + err = exec.Command("/etc/init.d/dnsmasq", "reload").Run() + } + + return err +} + +func (r *Role) onCallback(state types.HostState) (bool, error) { + filename, data := parseState(state) + + if err := r.updateDnsmasq(filename, data); err != nil { + return false, err + } + + return true, nil +} + +func (r *Role) RegisterHandlers(rg types.Registrator) { + rg.Register(types.PostEndpoint(types.PathDnsCallback, r.onCallback)) +} diff --git a/internal/roles/host/host.go b/internal/roles/host/host.go new file mode 100644 index 0000000..e67c2bf --- /dev/null +++ b/internal/roles/host/host.go @@ -0,0 +1,121 @@ +package host + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "slices" + "sync" + + "git.wzray.com/homelab/mastermind/internal/config" + "git.wzray.com/homelab/mastermind/internal/state" + "git.wzray.com/homelab/mastermind/internal/types" + "git.wzray.com/homelab/mastermind/internal/web/client" + "github.com/rs/zerolog/log" +) + +type Role struct { + state *state.RuntimeState + config config.HostConfig + + client *traefikClient + tasksGroup sync.WaitGroup + + externalDomains []string // TODO: i don't like hardcoding external/internal logic here + internalDomains []string +} + +func New(state *state.RuntimeState, config config.HostConfig) *Role { + return &Role{ + client: newClient(config.Domain, config.IpAddress), + state: state, + config: config, + } +} + +func (r *Role) sendUpdate(domains []string, role types.Role) { + state := types.HostState{ + Domains: domains, + Name: r.state.Self.Name, + Endpoint: r.state.Self.Address, + } + + for _, node := range r.state.Registry.ByRole(role) { + r.tasksGroup.Go(func() { + logger := log.With().Str("name", node.Name).Logger() + logger.Debug().Msg("sending update") + if _, err := client.Post[any](node.Address, types.PathDnsCallback, state); err != nil { + logger.Warn().Err(err).Msg("unable to send dns info") + } else { + logger.Debug().Msg("update sent") + } + }) + } +} + +func (r *Role) mutateState(resp traefikResponse) { + newInternal := resp.Domains(r.config.InternalEntrypoint) + newExternal := resp.Domains(r.config.ExternalEntrypoint) + + if !slices.Equal(newInternal, r.internalDomains) { + log.Info().Msg("internal domains updated, propogating") + r.internalDomains = newInternal + r.sendUpdate(newInternal, types.DnsRole) + } + + if !slices.Equal(newExternal, r.externalDomains) { + log.Info().Msg("internal domains updated, propogating") + r.externalDomains = newExternal + r.sendUpdate(newExternal, types.NameserverRole) + } +} + +func (r *Role) onCallback(w http.ResponseWriter, req *http.Request) { + var resp traefikResponse + if err := json.NewDecoder(req.Body).Decode(&resp); err != nil { + w.WriteHeader(http.StatusInternalServerError) + log.Err(err).Msg("unable to decode traefik callback data") + return + } + + r.mutateState(resp) + + w.Write([]byte("OK")) +} + +func (r *Role) getInternal() (types.HostState, error) { + return types.HostState{ + Domains: r.internalDomains, + Endpoint: r.state.Self.Address, + Name: r.state.Self.Name, + }, nil +} + +func (r *Role) getExternal() (types.HostState, error) { + return types.HostState{}, nil +} + +func (r *Role) RegisterHandlers(rg types.Registrator) { + rg.RegisterRaw(http.MethodPost, types.PathHostCallback.String(), r.onCallback) + rg.Register(types.GetEndpoint(types.PathHostDns, r.getInternal)) + rg.Register(types.GetEndpoint(types.PathHostNs, r.getExternal)) +} + +func (r *Role) OnStartup(ctx context.Context) error { + resp, err := r.client.GetRawData() + if err != nil { + return fmt.Errorf("get traefik state: %w", err) + } + + log.Info().Msg("got raw data from traefik") + log.Debug().Interface("response", resp).Send() + + r.mutateState(*resp) + return nil +} + +func (r *Role) OnShutdown() error { + r.tasksGroup.Wait() + return nil +} diff --git a/internal/roles/host/http.go b/internal/roles/host/http.go new file mode 100644 index 0000000..1eb647c --- /dev/null +++ b/internal/roles/host/http.go @@ -0,0 +1,58 @@ +package host + +import ( + "crypto/tls" + "encoding/json" + "fmt" + "net/http" + "net/url" +) + +type traefikClient struct { + client *http.Client + domain string + address url.URL +} + +func newClient(domain string, addr string) *traefikClient { + return &traefikClient{ + domain: domain, + address: url.URL{ + Scheme: "https", + Host: addr, + }, + client: &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + ServerName: domain, + }, + }, + }, + } +} + +func (c *traefikClient) GetRawData() (*traefikResponse, error) { + var out traefikResponse + + url := c.address + url.Path = "/api/rawdata" + + req := http.Request{ + Method: "GET", + URL: &url, + } + + req.Host = c.domain + + r, err := c.client.Do(&req) + if err != nil { + return nil, fmt.Errorf("make request: %w", err) + } + defer r.Body.Close() + + if err := json.NewDecoder(r.Body).Decode(&out); err != nil { + return nil, fmt.Errorf("unmarshal body: %w", err) + } + + return &out, nil +} diff --git a/internal/roles/host/types.go b/internal/roles/host/types.go new file mode 100644 index 0000000..ad1c9fd --- /dev/null +++ b/internal/roles/host/types.go @@ -0,0 +1,74 @@ +package host + +import ( + "encoding/json" + "regexp" + "slices" +) + +var hostRegex = regexp.MustCompile("Host\\(`([^()`]+\\.[^()`]+)`\\)") + +type rule struct { + Raw string + Domains []string + Valid bool +} + +func (r *rule) UnmarshalJSON(data []byte) error { + r.Valid = false + + raw := "" + if err := json.Unmarshal(data, &raw); err != nil { + return err + } + + matches := hostRegex.FindAllStringSubmatch(raw, -1) + + for _, match := range matches { + if len(match) <= 1 { + continue + } + r.Domains = append(r.Domains, match[1:]...) + } + + r.Valid = len(r.Domains) > 0 + + return nil +} + +type router struct { + Rule rule `json:"rule"` + Entrypoints []string `json:"entryPoints"` +} + +type traefikResponse struct { + Routers []router +} + +func (r *traefikResponse) UnmarshalJSON(data []byte) error { + var raw struct { + Routers map[string]router `json:"routers"` + } + + if err := json.Unmarshal(data, &raw); err != nil { + return err + } + + for _, v := range raw.Routers { + r.Routers = append(r.Routers, v) + } + + return nil +} + +func (r traefikResponse) Domains(entrypoint string) []string { + out := make([]string, 0, len(r.Routers)) + + for _, router := range r.Routers { + if router.Rule.Valid && slices.Contains(router.Entrypoints, entrypoint) { + out = append(out, router.Rule.Domains...) + } + } + + return out +} diff --git a/internal/roles/master/master.go b/internal/roles/master/master.go new file mode 100644 index 0000000..823b385 --- /dev/null +++ b/internal/roles/master/master.go @@ -0,0 +1,93 @@ +package master + +import ( + "context" + "sync" + + "git.wzray.com/homelab/mastermind/internal/config" + "git.wzray.com/homelab/mastermind/internal/roles" + "git.wzray.com/homelab/mastermind/internal/state" + "git.wzray.com/homelab/mastermind/internal/types" + "git.wzray.com/homelab/mastermind/internal/web/client" +) + +type Role struct { + state *state.RuntimeState + config config.MasterConfig + tasksGroup sync.WaitGroup + observer *observer + roles.BaseRole +} + +func New(state *state.RuntimeState, config config.MasterConfig) *Role { + return &Role{ + state: state, + config: config, + observer: newObserver( + state, + config.ObserverInterval, + config.BackoffSeconds, + config.BackoffCount, + ), + } +} + +func (r *Role) OnStartup(ctx context.Context) error { + r.tasksGroup.Go(func() { + r.observer.Start(ctx, func(n types.Node) error { + _, err := r.onLeave(n) + return err + }) + }) + + return nil +} + +func (r *Role) OnShutdown() error { + r.tasksGroup.Wait() + return nil +} + +func (r *Role) notify(path types.Path, v any) { + for _, n := range r.state.Registry.Nodes() { + addr := n.Address + r.tasksGroup.Go(func() { + client.Post[any](addr, path, v) + }) + } +} + +func (r *Role) onJoin(node types.Node) ([]types.Node, error) { + if err := r.state.Registry.AddNode(node); err != nil { + return nil, err + } + + r.notify(types.PathNodeJoin, node) + + return r.state.Registry.AllNodes(), nil +} + +func (r *Role) onLeave(node types.Node) (bool, error) { + if err := r.state.Registry.RemoveNode(node.Name); err != nil { + return false, err + } + + r.notify(types.PathNodeLeave, node.Name) + + return true, nil +} + +func (r *Role) onKeepAlive(node types.Node) (bool, error) { + if ok := r.state.Registry.Exists(node.Name); !ok { + _, err := r.onJoin(node) + return true, err + } + + return false, nil +} + +func (c *Role) RegisterHandlers(r types.Registrator) { + r.Register(types.PostEndpoint(types.PathMasterJoin, c.onJoin)) + r.Register(types.PostEndpoint(types.PathMasterLeave, c.onLeave)) + r.Register(types.PostEndpoint(types.PathMasterKeepalive, c.onKeepAlive)) +} diff --git a/internal/roles/master/observer.go b/internal/roles/master/observer.go new file mode 100644 index 0000000..f293133 --- /dev/null +++ b/internal/roles/master/observer.go @@ -0,0 +1,83 @@ +package master + +import ( + "context" + "time" + + "git.wzray.com/homelab/mastermind/internal/state" + "git.wzray.com/homelab/mastermind/internal/types" + "git.wzray.com/homelab/mastermind/internal/web/client" + "github.com/rs/zerolog/log" +) + +type observer struct { + state *state.RuntimeState + interval int + backoff int + backoffCount int +} + +func newObserver( + state *state.RuntimeState, + interval int, + backoff int, + backoffCount int, +) *observer { + return &observer{ + state: state, + interval: interval, + backoff: backoff, + backoffCount: backoffCount, + } +} + +func (o *observer) pollNodes(ctx context.Context, onLeave func(types.Node) error) { + for _, n := range o.state.Registry.Nodes() { + name := n.Name + logger := log.With().Str("name", name).Logger() + logger.Debug().Msg("checking node") + + delay := time.Duration(o.backoff) + alive := false + for i := o.backoffCount; i > 0; i-- { + _, err := client.Get[any](n.Address, types.PathNodeHealthcheck) + + if err == nil { + logger.Debug().Msg("node is alive") + alive = true + break + } + + if i == 0 { + break + } + + logger.Info().Any("delay", delay).Msg("node didn't respond, sleeping") + select { + case <-ctx.Done(): + goto dead + case <-time.After(delay * time.Second): + delay *= 2 + } + } + + dead: + if !alive { + logger.Info().Msg("node is dead, removing") + if err := onLeave(n); err != nil { + logger.Warn().Err(err).Msg("onLeave call failed") + } + } + } +} + +func (o *observer) Start(ctx context.Context, onLeave func(types.Node) error) { + for { + select { + case <-ctx.Done(): + return + case <-time.After(time.Duration(o.interval) * time.Second): + o.pollNodes(ctx, onLeave) + } + } +} diff --git a/internal/roles/node/node.go b/internal/roles/node/node.go new file mode 100644 index 0000000..b795835 --- /dev/null +++ b/internal/roles/node/node.go @@ -0,0 +1,168 @@ +package node + +import ( + "context" + "errors" + "sync" + "time" + + "git.wzray.com/homelab/mastermind/internal/config" + "git.wzray.com/homelab/mastermind/internal/state" + "git.wzray.com/homelab/mastermind/internal/types" + "git.wzray.com/homelab/mastermind/internal/web/client" + "github.com/rs/zerolog/log" +) + +type Role struct { + state *state.RuntimeState + keepaliveGroup sync.WaitGroup + config config.NodeConfig +} + +func New(state *state.RuntimeState, config config.NodeConfig) *Role { + return &Role{ + state: state, + config: config, + } +} + +func (r *Role) Join(bootstrap string) error { + masters := make(map[string]struct{}) + for _, node := range r.state.Registry.ByRole(types.MasterRole) { + if node.Name == r.state.Self.Name { + continue + } + masters[node.Address] = struct{}{} + } + if bootstrap != "" { + masters[bootstrap] = struct{}{} + } else if len(masters) == 0 { + return errors.New("no masters configured") + } + + for m := range masters { + logger := log.With().Str("host", m).Logger() + logger.Debug().Msg("trying to join via master") + + nodes, err := client.Post[[]types.Node](m, "/master/join", r.state.Self) + if err != nil { + logger.Debug().Err(err).Msg("unable to join") + continue + } + + if err := r.state.Registry.Set(*nodes); err != nil { + logger.Debug().Err(err).Msg("unable to set master's nodes") + continue + } + + return nil + } + + return errors.New("unable to join") +} + +func (r *Role) Leave() error { + masters := r.state.Registry.ByRole(types.MasterRole) + if len(masters) == 0 { + return nil + } + + sent := false + for _, m := range masters { + logger := log.With().Str("name", m.Name).Logger() + logger.Debug().Msg("sending leave message") + + _, err := client.Post[any](m.Address, types.PathMasterLeave, r.state.Self) + if err != nil { + logger.Debug().Err(err).Msg("unable to send leave message") + continue + } else { + sent = true + logger.Debug().Msg("leave message sent") + break + } + } + + if !sent { + return errors.New("unable to send leave message") + } + + return nil +} + +func (r *Role) OnStartup(ctx context.Context) error { + if r.config.KeepaliveInterval != -1 { + r.keepaliveGroup.Go(r.keepaliveFunc(ctx)) + } else { + log.Debug().Msg("keepalive disabled") + } + + return nil +} + +func (r *Role) OnShutdown() error { + r.keepaliveGroup.Wait() + return nil +} + +func (r *Role) keepaliveFunc(ctx context.Context) func() { + sendKeepalive := func() { + masters := r.state.Registry.ByRole(types.MasterRole) + if len(masters) == 0 { + return + } + + sent := false + for _, m := range masters { + logger := log.With().Str("name", m.Name).Logger() + logger.Debug().Msg("sending keepalive packet") + + if _, err := client.Post[any](m.Address, types.PathMasterKeepalive, r.state.Self); err != nil { + continue + } else { + logger.Debug().Msg("keepalive packet sent") + sent = true + break + } + } + + if !sent { + log.Info().Msg("unable to send keepalive packet") + } + } + + return func() { + for { + select { + case <-ctx.Done(): + return + case <-time.After(time.Duration(r.config.KeepaliveInterval) * time.Second): + sendKeepalive() + } + } + } +} + +func (r *Role) onJoin(node types.Node) (bool, error) { + if err := r.state.Registry.AddNode(node); err != nil { + return false, err + } + return true, nil +} + +func (r *Role) onLeave(node types.Node) (bool, error) { + if err := r.state.Registry.RemoveNode(node.Name); err != nil { + return false, err + } + return true, nil +} + +func healthcheck() (string, error) { + return "OK", nil +} + +func (n *Role) RegisterHandlers(r types.Registrator) { + r.Register(types.GetEndpoint(types.PathNodeHealthcheck, healthcheck)) + r.Register(types.PostEndpoint(types.PathNodeJoin, n.onJoin)) + r.Register(types.PostEndpoint(types.PathNodeLeave, n.onLeave)) +} diff --git a/internal/roles/role.go b/internal/roles/role.go new file mode 100644 index 0000000..b0e9413 --- /dev/null +++ b/internal/roles/role.go @@ -0,0 +1,21 @@ +package roles + +import ( + "context" + + "git.wzray.com/homelab/mastermind/internal/types" +) + +type Role interface { + RegisterHandlers(types.Registrator) + OnStartup(context.Context) error + OnShutdown() error +} + +type BaseRole struct{} + +func (r *BaseRole) RegisterHandlers(types.Registrator) {} + +func (r *BaseRole) OnStartup(context.Context) error { return nil } + +func (r *BaseRole) OnShutdown() error { return nil } diff --git a/internal/state/runtime.go b/internal/state/runtime.go new file mode 100644 index 0000000..d6eb8ee --- /dev/null +++ b/internal/state/runtime.go @@ -0,0 +1,18 @@ +package state + +import ( + "git.wzray.com/homelab/mastermind/internal/registry" + "git.wzray.com/homelab/mastermind/internal/types" +) + +type RuntimeState struct { + Registry *registry.Registry + Self types.Node +} + +func New(r *registry.Registry, n types.Node) *RuntimeState { + return &RuntimeState{ + Registry: r, + Self: n, + } +} diff --git a/internal/types/host.go b/internal/types/host.go new file mode 100644 index 0000000..c483e75 --- /dev/null +++ b/internal/types/host.go @@ -0,0 +1,7 @@ +package types + +type HostState struct { + Domains []string + Endpoint string + Name string +} diff --git a/internal/types/node.go b/internal/types/node.go new file mode 100644 index 0000000..a01b651 --- /dev/null +++ b/internal/types/node.go @@ -0,0 +1,16 @@ +package types + +// TODO: consider moving this type back to registry +type Node struct { + Address string `json:"address"` + Name string `json:"name"` + Roles []Role `json:"roles"` +} + +func NewNode(address string, name string, roles []Role) Node { + return Node{ + Address: address, + Name: name, + Roles: roles, + } +} diff --git a/internal/types/roles.go b/internal/types/roles.go new file mode 100644 index 0000000..3122d8d --- /dev/null +++ b/internal/types/roles.go @@ -0,0 +1,39 @@ +package types + +type Role string + +const ( + MasterRole Role = "master" + HostRole Role = "host" + DnsRole Role = "dns" + NameserverRole Role = "ns" +) + +var Roles = []Role{ + MasterRole, + HostRole, + DnsRole, + NameserverRole, +} + +var Names = func() []Role { + o := make([]Role, 0, len(Roles)) + for _, r := range Roles { + o = append(o, r) + } + return o +}() + +func (r Role) String() string { + return string(r) +} + +func Parse(s string) (Role, bool) { + for _, r := range Roles { + if s == r.String() { + return r, true + } + } + + return "", false +} diff --git a/internal/types/rpc.go b/internal/types/rpc.go new file mode 100644 index 0000000..ab1254f --- /dev/null +++ b/internal/types/rpc.go @@ -0,0 +1 @@ +package types diff --git a/internal/types/web.go b/internal/types/web.go new file mode 100644 index 0000000..17ab7f9 --- /dev/null +++ b/internal/types/web.go @@ -0,0 +1,76 @@ +package types + +import ( + "encoding/json" + "fmt" + "net/http" +) + +type Path string + +func (p Path) String() string { + return string(p) +} + +const ( + PathMasterJoin Path = "/master/join" + PathMasterLeave Path = "/master/leave" + PathMasterKeepalive Path = "/master/keepalive" + + PathNodeHealthcheck Path = "/node/healthcheck" + PathNodeJoin Path = "/node/join" + PathNodeLeave Path = "/node/leave" + + PathDnsCallback Path = "/dns/callback" + + PathHostCallback Path = "/host/callback" + PathHostDns Path = "/host/dns" + PathHostNs Path = "/host/ns" +) + +type Response[T any] struct { + Ok bool `json:"ok"` + Data T `json:"data,omitempty"` + Err string `json:"err,omitempty"` +} + +type Route interface { + Path() string + Handle([]byte) (any, error) +} + +type endpoint struct { + path string + handler func([]byte) (any, error) +} + +func (e endpoint) Path() string { return e.path } + +func (e endpoint) Handle(v []byte) (any, error) { return e.handler(v) } + +func PostEndpoint[T any, V any](path Path, handler func(T) (V, error)) Route { + return endpoint{ + path: "POST " + path.String(), + handler: func(a []byte) (any, error) { + var r T + if err := json.Unmarshal(a, &r); err != nil { + return nil, fmt.Errorf("unable to unmarshal json: %w", err) + } + return handler(r) + }, + } +} + +func GetEndpoint[T any](path Path, handler func() (T, error)) Route { + return endpoint{ + path: "GET " + path.String(), + handler: func(a []byte) (any, error) { + return handler() + }, + } +} + +type Registrator interface { + Register(endpoint Route) + RegisterRaw(method string, pattern string, handler func(http.ResponseWriter, *http.Request)) +} diff --git a/internal/web/client/client.go b/internal/web/client/client.go new file mode 100644 index 0000000..0c5f4e9 --- /dev/null +++ b/internal/web/client/client.go @@ -0,0 +1,108 @@ +package client + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "time" + + "git.wzray.com/homelab/mastermind/internal/types" + "git.wzray.com/homelab/mastermind/internal/web/middleware" +) + +type client struct { + http *http.Client + middleware middleware.Middleware +} + +var defaultClient *client +const timeout = time.Duration(2) * time.Second + +func (c *client) makeRequest(method string, host string, path types.Path, data any, out any) error { + var body io.Reader + if data != nil { + raw, err := json.Marshal(data) + if err != nil { + return fmt.Errorf("marshal body: %w", err) + } + body = bytes.NewReader(raw) + } + + uri := (&url.URL{ + Scheme: "http", + Host: host, + Path: path.String(), + }).String() + + r, err := http.NewRequest(method, uri, body) + if err != nil { + return fmt.Errorf("build http request: %w", err) + } + + if body != nil { + r.Header.Set("Content-Type", "application/json; charset=utf-8") + } + + if err := c.middleware.Client(r); err != nil { + return fmt.Errorf("apply middleware: %w", err) + } + + resp, err := c.http.Do(r) + if err != nil { + return fmt.Errorf("send request: %w", err) + } + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + b, _ := io.ReadAll(resp.Body) + return fmt.Errorf("http %d: %s", resp.StatusCode, string(b)) + } + + defer resp.Body.Close() + if out != nil { + if err := json.NewDecoder(resp.Body).Decode(out); err != nil { + return fmt.Errorf("decode body: %w", err) + } + } + + io.Copy(io.Discard, resp.Body) + return nil +} + +func Init(mw middleware.Middleware) { + if defaultClient != nil { + panic("web.client: Init called twice") + } + + defaultClient = &client{ + http: &http.Client{ + Timeout: timeout, + }, + middleware: mw, + } +} + +func request[Out any, In any](method string, host string, path types.Path, data In) (*Out, error) { + out := &types.Response[Out]{} + err := defaultClient.makeRequest(method, host, path, data, out) + if err != nil { + return nil, err + } + + if !out.Ok { + return nil, errors.New(out.Err) + } + + return &out.Data, err +} + +func Get[Out any](host string, path types.Path) (*Out, error) { // TODO: out should not be a pointer + return request[Out, any](http.MethodGet, host, path, nil) +} + +func Post[Out any, In any](host string, path types.Path, data In) (*Out, error) { + return request[Out](http.MethodPost, host, path, data) +} diff --git a/internal/web/middleware/middleware.go b/internal/web/middleware/middleware.go new file mode 100644 index 0000000..b7585f1 --- /dev/null +++ b/internal/web/middleware/middleware.go @@ -0,0 +1,53 @@ +package middleware + +import ( + "fmt" + "net/http" + "slices" +) + +type Middleware interface { + Client(r *http.Request) error + Handler(http.Handler) http.Handler +} + +type BaseMiddleware struct{} + +func (BaseMiddleware) Client(*http.Request) error { return nil } + +func (BaseMiddleware) Handler(h http.Handler) http.Handler { return h } + +type MiddlewareBuilder struct { + middlewares []Middleware +} + +func (b *MiddlewareBuilder) Use(middleware Middleware) *MiddlewareBuilder { + b.middlewares = append(b.middlewares, middleware) + return b +} + +type middlewareProxy struct { + middlewares []Middleware +} + +func (p *middlewareProxy) Client(r *http.Request) error { + for _, m := range p.middlewares { + if err := m.Client(r); err != nil { + return fmt.Errorf("%T: %w", m, err) + } + } + return nil +} + +func (p *middlewareProxy) Handler(h http.Handler) http.Handler { + for _, f := range slices.Backward(p.middlewares) { + h = f.Handler(h) + } + return h +} + +func (b *MiddlewareBuilder) Prepare() Middleware { + return &middlewareProxy{ + middlewares: append([]Middleware(nil), b.middlewares...), + } +} diff --git a/internal/web/server/server.go b/internal/web/server/server.go new file mode 100644 index 0000000..e9b8e8e --- /dev/null +++ b/internal/web/server/server.go @@ -0,0 +1,80 @@ +package server + +import ( + "context" + "io" + "net/http" + + "git.wzray.com/homelab/mastermind/internal/types" + "git.wzray.com/homelab/mastermind/internal/web/middleware" + "github.com/rs/zerolog/log" +) + +type Server struct { + mux *http.ServeMux + httpServer http.Server +} + +func NewServer(addr string, middleware middleware.Middleware) *Server { + mux := http.NewServeMux() + s := &Server{ + mux: mux, + httpServer: http.Server{ + Addr: addr, + Handler: middleware.Handler(mux), + }, + } + return s +} + +func (s *Server) Listen() error { + return s.httpServer.ListenAndServe() +} + +func (s *Server) Shutdown(ctx context.Context) error { + return s.httpServer.Shutdown(ctx) +} + +func (s *Server) handleFunc(route types.Route) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + log.Debug(). // TODO: make this a middleware + Str("method", r.Method). + Str("path", r.URL.Path). + Str("query", r.URL.RawQuery). + Str("remoteAddr", r.RemoteAddr). + Send() + + w.Header().Set("Content-Type", "application/json; charset=utf-8") + + body, err := io.ReadAll(r.Body) + if err != nil { + w.Write(fail("read request body: %v", err)) + log.Err(err).Msg("unable to read request body") + return + } + + raw, err := route.Handle(body) + if err != nil { + w.Write(fail("handle request: %v", err)) + log.Err(err).Msg("unable to handle request") + return + } + + data, err := ok(raw) + if err != nil { + w.Write(fail("marshal response: %v", err)) + log.Err(err).Msg("unable to marshal response") + return + } + + w.Write(data) + } +} + +func (s *Server) Register(endpoint types.Route) { + s.mux.HandleFunc(endpoint.Path(), s.handleFunc(endpoint)) +} + +func (s *Server) RegisterRaw(method string, pattern string, handler func(http.ResponseWriter, *http.Request)) { + s.mux.HandleFunc(method+" "+pattern, handler) +} diff --git a/internal/web/server/util.go b/internal/web/server/util.go new file mode 100644 index 0000000..78e06b8 --- /dev/null +++ b/internal/web/server/util.go @@ -0,0 +1,23 @@ +package server + +import ( + "encoding/json" + "fmt" + + "git.wzray.com/homelab/mastermind/internal/types" +) + +func fail(format string, a ...any) []byte { + r, _ := json.Marshal(types.Response[string]{ + Ok: false, + Err: fmt.Sprintf(format, a...), + }) + return r +} + +func ok[T any](data T) ([]byte, error) { + return json.Marshal(types.Response[T]{ + Ok: true, + Data: data, + }) +} diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 9d9c081..0000000 --- a/requirements.txt +++ /dev/null @@ -1,3 +0,0 @@ -fabric -requests -flask