"""Mirror Traefik router hostnames to DNS and reverse-proxy targets.

The service reads the HTTP routers of a Traefik instance, extracts the
hostnames matched by ``PATTERN`` and, per entry point:

* internal hosts are written to a hosts file on every ``INTERNAL_DESTS``
  machine over SFTP, after which dnsmasq is reloaded;
* external hosts are written to an nginx include file on every
  ``EXTERNAL_DESTS`` machine over SFTP, after which nginx is reloaded, and
  are published as ``A`` records pointing at ``EXTERNAL_HOST`` through the
  Cloudflare API.

A Flask endpoint (``POST /callback``) accepts a new configuration and
triggers a fresh synchronisation.
"""

import base64
import os
import re
from concurrent.futures import Future, ThreadPoolExecutor, wait as gather_futures

import fabric
import paramiko.auth_strategy
import urllib3
from flask import Flask, request


def _env_list(name: str) -> list[str]:
    """Return the comma-separated env var *name* as a list without blanks.

    ``''.split(',')`` yields ``['']`` (truthy), which would hide an unset
    variable from the configuration check below, so empty items are dropped.
    """
    return [item.strip() for item in (os.getenv(name) or '').split(',') if item.strip()]


NODE_NAME = (os.getenv('NODE_NAME') or '') + "_traefik"
TRAEFIK_INSTANCE = os.getenv('TRAEFIK_INSTANCE') or ''
TRAEFIK_HOST = os.getenv('TRAEFIK_HOST') or ''
EXTERNAL_HOST = os.getenv('EXTERNAL_HOST') or ''
INTERNAL_DESTS = _env_list('INTERNAL_DESTS')
EXTERNAL_DESTS = _env_list('EXTERNAL_DESTS')
PRIVATE_KEY = os.getenv('PRIVATE_KEY') or ''
CLOUDFLARE_API_KEY = os.getenv('CLOUDFLARE_API_KEY') or ''
CLOUDFLARE_ZONE_ID = os.getenv('CLOUDFLARE_ZONE_ID') or ''
HOST = os.getenv('HOST') or '0.0.0.0'
PORT = os.getenv('PORT') or '80'

# Fail fast on incomplete configuration. An explicit raise is used instead of
# ``assert`` so the check survives ``python -O``.
_missing_settings = [
    name
    for name, value in {
        # NODE_NAME always carries the "_traefik" suffix; only the prefix
        # supplied through the environment can be empty.
        'NODE_NAME': NODE_NAME.removesuffix('_traefik'),
        'TRAEFIK_INSTANCE': TRAEFIK_INSTANCE,
        'TRAEFIK_HOST': TRAEFIK_HOST,
        'EXTERNAL_HOST': EXTERNAL_HOST,
        'INTERNAL_DESTS': INTERNAL_DESTS,
        'EXTERNAL_DESTS': EXTERNAL_DESTS,
        'PRIVATE_KEY': PRIVATE_KEY,
        'CLOUDFLARE_API_KEY': CLOUDFLARE_API_KEY,
        'CLOUDFLARE_ZONE_ID': CLOUDFLARE_ZONE_ID,
        'HOST': HOST,
        'PORT': PORT,
    }.items()
    if not value
]
if _missing_settings:
    raise RuntimeError(
        'Missing required environment variables: ' + ', '.join(_missing_settings)
    )

app = Flask(__name__)

# Captures the hostname of a Traefik ``Host(`...`)`` matcher for wzray.com
# and any of its subdomains.
PATTERN = re.compile(r"Host\(`((?:[a-zA-Z0-9_-]+\.)*wzray\.com)`\)")
INTERNAL_ENTRYPOINT = 'https'
EXTERNAL_ENTRYPOINT = 'ehttps'


def Connection(host: str, user: str = 'root', port=None) -> fabric.Connection:
    """Open-able SSH connection to *host* authenticated with ``PRIVATE_KEY``.

    ``PRIVATE_KEY`` is base64-decoded and fed straight into paramiko's
    private Ed25519 parsing routine, bypassing the regular constructor, so
    the key never has to be written to disk.

    Args:
        host: Host name or address to connect to.
        user: Remote user name.
        port: Remote SSH port; ``None`` is passed through to fabric as-is.

    Returns:
        A ``fabric.Connection`` configured with an in-memory key strategy.
    """
    key = base64.b64decode(PRIVATE_KEY)
    # HACK: build the key object without running __init__ and fill in the
    # private attributes by hand; this relies on paramiko internals.
    privkey = paramiko.Ed25519Key.__new__(paramiko.Ed25519Key)
    privkey.public_blob = None
    privkey._signing_key = privkey._parse_signing_key_data(key, None)  # type: ignore
    privkey._verifying_key = None  # type: ignore
    return fabric.Connection(
        host=host,
        user=user,
        port=port,
        connect_kwargs={
            'auth_strategy': paramiko.auth_strategy.InMemoryPrivateKey(user, privkey),
        },
    )


def cf_request(method: str, target: str, json=None):
    """Call the Cloudflare v4 API and return the ``result`` member.

    Args:
        method: HTTP method.
        target: Path relative to ``https://api.cloudflare.com/client/v4/``.
        json: Optional JSON-serialisable request body.

    Raises:
        Exception: If the response status is not 200; the message contains
            the status code and the response body.
    """
    r = urllib3.request(
        method,
        'https://api.cloudflare.com/client/v4/' + target,
        headers={'Authorization': f'Bearer {CLOUDFLARE_API_KEY}'},
        json=json,
    )
    if r.status != 200:
        raise Exception(f'Error {r.status}: {r.data.decode()}')
    return r.json()['result']


class Observer:
    """Class-level singleton that tracks hostnames and pushes them out.

    All state lives on the class itself; the class is never instantiated.
    """

    # Hostnames currently routed through the internal / external entry point.
    internal: set[str] = set()
    external: set[str] = set()
    # Shared pool for the per-destination uploads and the coordinating job.
    executor: ThreadPoolExecutor = ThreadPoolExecutor(max_workers=128)
    # The synchronisation job in flight, or None when idle.
    job: Future | None = None

    @classmethod
    def _init(cls):
        """Seed the state from Traefik's ``/api/rawdata`` endpoint.

        Best-effort: if Traefik cannot be reached or the answer cannot be
        decoded, the error is printed and initialisation is skipped.
        """
        try:
            with urllib3.HTTPSConnectionPool(
                'host.docker.internal',
                server_hostname=TRAEFIK_INSTANCE,
            ) as pool:
                runtime_conf = pool.request(
                    'GET',
                    '/api/rawdata',
                    headers={'Host': TRAEFIK_INSTANCE},
                    assert_same_host=False,
                ).json()
        except Exception as exc:
            print(f"Initial sync skipped, could not query Traefik: {exc!r}")
            return

        # Reshape the flat rawdata answer into the nested layout that
        # update() / parse_raw_data() expect.
        dynamic_conf = {
            'http': {
                'middlewares': runtime_conf.get('middlewares') or {},
                'routers': runtime_conf.get('routers') or {},
                'services': runtime_conf.get('services') or {},
            },
            'tcp': {
                'middlewares': runtime_conf.get('tcpMiddlewares') or {},
                'routers': runtime_conf.get('tcpRouters') or {},
                'services': runtime_conf.get('tcpServices') or {},
            },
            'udp': {
                'routers': runtime_conf.get('udpRouters') or {},
                'services': runtime_conf.get('udpServices') or {},
            },
        }

        cls.update(dynamic_conf)
        print("Initialized!")

    @classmethod
    def _close(cls):
        """Shut the worker pool down, waiting for running tasks."""
        cls.executor.shutdown()

    @classmethod
    def update_internal(cls) -> list[Future]:
        """Upload the internal hosts file to every ``INTERNAL_DESTS`` host.

        Returns:
            One future per destination.
        """
        # Rendered once, at submission time; sorted for stable file content.
        hosts_string = '\n'.join(f'{TRAEFIK_HOST} {x}' for x in sorted(cls.internal))

        def _update(ip: str):
            # The context manager closes the SSH/SFTP session afterwards.
            with Connection(ip) as c:
                with c.sftp().open(f'/etc/hosts.d/00_{NODE_NAME}.hosts', 'w') as f:
                    f.write(hosts_string)
                c.run('/etc/init.d/dnsmasq reload')

        return [cls.executor.submit(_update, x) for x in INTERNAL_DESTS]

    @classmethod
    def update_external(cls) -> list[Future]:
        """Upload the nginx include file to every ``EXTERNAL_DESTS`` host.

        Returns:
            One future per destination.
        """
        # Rendered once, at submission time; sorted for stable file content.
        hosts_string = '\n'.join(f'{x} {NODE_NAME};' for x in sorted(cls.external))

        def _update(ip: str):
            # The context manager closes the SSH/SFTP session afterwards.
            with Connection(ip) as c:
                with c.sftp().open(f'/etc/nginx/services.d/{NODE_NAME}_extra.conf', 'w') as f:
                    f.write(hosts_string)
                c.run('systemctl reload nginx')

        return [cls.executor.submit(_update, x) for x in EXTERNAL_DESTS]

    @classmethod
    def update_cloudflare(cls):
        """Replace this node's Cloudflare DNS records with ``cls.external``.

        Records are recognised as belonging to this node by their comment
        being equal to ``NODE_NAME``. They are deleted and re-created in a
        single batch request.
        """
        url = f'zones/{CLOUDFLARE_ZONE_ID}/dns_records'
        # NOTE(review): only the records returned by this single GET are
        # considered; if the API paginates, records on later pages are
        # missed - confirm against the Cloudflare API documentation.
        to_delete = [
            {'id': record['id']}
            for record in cf_request('GET', url)
            if record.get('comment') == NODE_NAME
        ]

        if not cls.external and not to_delete:
            return

        cf_request('POST', url + '/batch', {
            'deletes': to_delete,
            'posts': [{
                'comment': NODE_NAME,
                'content': EXTERNAL_HOST,
                'type': 'A',
                'proxied': False,
                'name': x,
            } for x in sorted(cls.external)],
        })

    @classmethod
    def update(cls, data):
        """Parse *data* and start a background job that pushes the result.

        If a previous job is still running, this call blocks until it has
        finished before submitting the new one.

        Args:
            data: Configuration mapping; see ``parse_raw_data``.
        """
        def _job():
            try:
                done, _ = gather_futures([
                    *cls.update_external(),
                    *cls.update_internal(),
                    cls.executor.submit(cls.update_cloudflare),
                ])
                # wait() never re-raises, so report failures explicitly
                # instead of losing them.
                for task in done:
                    exc = task.exception()
                    if exc is not None:
                        print(f"Update task failed: {exc!r}")
            finally:
                cls.job = None

        cls.internal, cls.external = cls.parse_raw_data(data)
        print(f"{cls.internal=}, {cls.external=}")

        # Read cls.job once: the worker resets it to None when it finishes,
        # which could otherwise happen between the check and the wait.
        pending = cls.job
        if pending is not None:
            gather_futures([pending])
        cls.job = cls.executor.submit(_job)

    @classmethod
    def parse_raw_data(cls, data: dict) -> tuple[set[str], set[str]]:
        """Extract the internal and external hostnames from *data*.

        Args:
            data: Mapping whose ``http.routers`` values carry a ``rule``
                string and an ``entryPoints`` collection.

        Returns:
            ``(internal, external)``: the hostnames matched by ``PATTERN``
            in the rules of routers attached to ``INTERNAL_ENTRYPOINT`` and
            ``EXTERNAL_ENTRYPOINT`` respectively.
        """
        routers = ((data.get('http') or {}).get('routers') or {}).values()

        def hosts_for(entrypoint: str) -> set[str]:
            # NOTE(review): routers lacking 'entryPoints' or 'rule' are
            # skipped rather than raising KeyError - confirm that such
            # routers should not be published.
            return {
                host
                for router in routers
                if entrypoint in (router.get('entryPoints') or ())
                for host in PATTERN.findall(router.get('rule') or '')
            }

        return hosts_for(INTERNAL_ENTRYPOINT), hosts_for(EXTERNAL_ENTRYPOINT)


# Populate the state as soon as the module is loaded.
Observer._init()


@app.post("/callback")
def callback():
    """Accept a new configuration as JSON and trigger a synchronisation."""
    data = request.json
    if not data:
        return {"error": "missing data", "ok": False}, 400

    Observer.update(data)
    return {"ok": True}


if __name__ == "__main__":
    try:
        app.run(host=HOST, port=int(PORT))
    finally:
        Observer._close()