mastermind/app.py

153 lines
4.8 KiB
Python

import base64
import os
import re
import requests
import urllib3
from concurrent.futures import Future, ThreadPoolExecutor, wait as gather_futures
import fabric
import paramiko.auth_strategy
from flask import Flask, request
NODE_NAME = (os.getenv('NODE_NAME') or '') + "_traefik"
TRAEFIK_INSTANCE = os.getenv('TRAEFIK_INSTANCE') or ''
TRAEFIK_HOST = os.getenv('TRAEFIK_HOST') or ''
INTERNAL_DESTS = (os.getenv('INTERNAL_DESTS') or '').split(',') or []
EXTERNAL_DESTS = (os.getenv('EXTERNAL_DESTS') or '').split(',') or []
PRIVATE_KEY = os.getenv('PRIVATE_KEY') or ''
HOST = os.getenv('HOST') or '0.0.0.0'
PORT = os.getenv('PORT') or '80'
assert NODE_NAME != '_traefik'
assert TRAEFIK_INSTANCE
assert TRAEFIK_HOST
assert INTERNAL_DESTS
assert EXTERNAL_DESTS
assert PRIVATE_KEY
app = Flask(__name__)
PATTERN = re.compile(r"Host\(`((?:[a-zA-Z0-9_-]+\.)*wzray\.com)`\)")
INTERNAL_ENTRYPOINT = 'https'
EXTERNAL_ENTRYPOINT = 'ehttps'
def Connection(host: str, user: str = 'root', port = None) -> fabric.Connection:
"""this actually is the fuckiest way to get a key."""
key = base64.b64decode(PRIVATE_KEY)
privkey = paramiko.Ed25519Key.__new__(paramiko.Ed25519Key)
privkey.public_blob = None
privkey._signing_key = privkey._parse_signing_key_data(key, None) # type: ignore
privkey._verifying_key = None # type: ignore
return fabric.Connection(host=host, user=user, port=port, connect_kwargs=
{'auth_strategy': paramiko.auth_strategy.InMemoryPrivateKey(user, privkey)})
class Observer:
internal: set[str] = set()
external: set[str] = set()
executor: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=128)
job: Future | None = None
@classmethod
def _init(cls):
try:
with urllib3.HTTPSConnectionPool(
'host.docker.internal',
server_hostname=TRAEFIK_INSTANCE
) as pool:
runtime_conf = pool.request(
'GET',
'/api/rawdata',
headers={'Host': TRAEFIK_INSTANCE},
assert_same_host=False
).json()
except Exception:
return
dynamic_conf = {
'http': {
'middlewares': runtime_conf.get('middlewares') or {},
'routers': runtime_conf.get('routers') or {},
'services': runtime_conf.get('services') or {},
},
'tcp': {
'middlewares': runtime_conf.get('tcpMiddlewares') or {},
'routers': runtime_conf.get('tcpRouters') or {},
'services': runtime_conf.get('tcpServices') or {},
},
'udp': {
'routers': runtime_conf.get('udpRouters') or {},
'services': runtime_conf.get('udpServices') or {},
}
}
cls.update(dynamic_conf)
print("Initialized!")
@classmethod
def _close(cls):
cls.executor.shutdown()
@classmethod
def update_internal(cls):
hosts_string = '\n'.join(f'{TRAEFIK_HOST} {x}' for x in cls.internal)
def _update(ip: str):
c = Connection(ip)
with c.sftp().open(f'/etc/hosts.d/00_{NODE_NAME}.hosts', 'w') as f:
f.write(hosts_string)
c.run('/etc/init.d/dnsmasq reload')
return [cls.executor.submit(_update, x) for x in INTERNAL_DESTS]
@classmethod
def update_external(cls):
hosts_string = '\n'.join(f'{x} {NODE_NAME};' for x in cls.external)
def _update(ip: str):
c = Connection(ip)
with c.sftp().open(f'/etc/nginx/services.d/{NODE_NAME}_extra.conf', 'w') as f:
f.write(hosts_string)
c.run('systemctl reload nginx')
return [cls.executor.submit(_update, x) for x in EXTERNAL_DESTS]
@classmethod
def update(cls, data):
def _job():
gather_futures(cls.update_external() + cls.update_internal())
cls.job = None
cls.internal, cls.external = cls.parse_raw_data(data)
print(f"{cls.internal=}, {cls.external=}")
if cls.job != None:
cls.job.result()
cls.job = cls.executor.submit(_job)
@classmethod
def parse_raw_data(cls, data: dict):
http = data.get('http') or {}
flt = lambda ep: set([x.group(1) for x in [re.match(PATTERN, x['rule']) for x in http['routers'].values() if ep in x['entryPoints']] if x])
internal = flt(INTERNAL_ENTRYPOINT)
external = flt(EXTERNAL_ENTRYPOINT)
return internal, external
Observer._init()
@app.post("/callback")
def callback():
data = request.json
if not data:
return {"error": "missing data", "ok": False}, 400
Observer.update(data)
return {"ok": True}
if __name__ == "__main__":
try:
app.run(host=HOST, port=int(PORT))
finally:
Observer._close()