1
0
Fork 0

init: whatever

This commit is contained in:
Arthur K. 2026-02-20 19:11:32 +03:00
commit e752a9003a
Signed by: wzray
GPG key ID: B97F30FDC4636357
9 changed files with 1053 additions and 0 deletions

2
.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
/data/
__pycache__/

11
compose.yml Normal file
View file

@ -0,0 +1,11 @@
services:
megapt:
build: src
restart: unless-stopped
environment:
USAGE_THRESHOLD: 85
CHECK_INTERVAL: 60
labels:
traefik.host: megapt
volumes:
- ./data:/data

32
src/Dockerfile Normal file
View file

@ -0,0 +1,32 @@
FROM python:3.14-slim
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends \
tzdata \
xvfb \
xauth \
ca-certificates \
curl \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r /app/requirements.txt
RUN python -m playwright install --with-deps chromium
COPY *.py /app/
ENV PYTHONUNBUFFERED=1
ENV PORT=8000
ENV DATA_DIR=/data
ENV PLAYWRIGHT_SKIP_BROWSER_DOWNLOAD=1
VOLUME ["/data"]
EXPOSE 8000
STOPSIGNAL SIGINT
COPY entrypoint.sh /entrypoint.sh
CMD ["/entrypoint.sh"]

61
src/codex_usage.py Normal file
View file

@ -0,0 +1,61 @@
import json
import socket
import urllib.error
import urllib.request
from typing import Any
def clamp_percent(value: Any) -> int:
try:
num = float(value)
except (TypeError, ValueError):
return 0
if num < 0:
return 0
if num > 100:
return 100
return int(round(num))
def get_usage_percent(access_token: str, timeout_ms: int = 10000) -> int:
headers = {
"Authorization": f"Bearer {access_token}",
"User-Agent": "CodexProxy",
"Accept": "application/json",
}
req = urllib.request.Request(
"https://chatgpt.com/backend-api/wham/usage",
headers=headers,
method="GET",
)
try:
with urllib.request.urlopen(req, timeout=timeout_ms / 1000) as res:
body = res.read().decode("utf-8", errors="replace")
except urllib.error.HTTPError as e:
return -1
except (urllib.error.URLError, socket.timeout):
return -1
try:
data = json.loads(body)
except json.JSONDecodeError:
return -1
primary = (data.get("rate_limit") or {}).get("primary_window") or {}
if primary:
return clamp_percent(primary.get("used_percent") or 0)
return -1
if __name__ == "__main__":
from tokens import load_tokens
tokens = load_tokens()
if tokens:
usage = get_usage_percent(tokens.access_token)
print(f"{usage}%")
else:
print("No tokens")

15
src/entrypoint.sh Executable file
View file

@ -0,0 +1,15 @@
#!/bin/sh
set -e
export DISPLAY=:99
Xvfb :99 -screen 0 1280x1024x24 -nolisten tcp -ac >/tmp/xvfb.log 2>&1 &
XVFB_PID=$!
cleanup() {
kill "$XVFB_PID" >/dev/null 2>&1 || true
}
trap cleanup EXIT INT TERM
exec python -u proxy.py

397
src/get_new_token.py Normal file
View file

@ -0,0 +1,397 @@
import asyncio
import os
import re
import random
import secrets
import json
import time
import logging
from datetime import datetime
import aiohttp
import pkce
from urllib.parse import urlencode, urlparse, parse_qs
from playwright.async_api import async_playwright, Page, Browser
from tokens import DATA_DIR, TOKENS_FILE
logger = logging.getLogger(__name__)
CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
AUTHORIZE_URL = "https://auth.openai.com/oauth/authorize"
TOKEN_URL = "https://auth.openai.com/oauth/token"
REDIRECT_URI = "http://localhost:1455/auth/callback"
SCOPE = "openid profile email offline_access"
class AutomationError(Exception):
def __init__(self, step: str, message: str, page: Page | None = None):
self.step = step
self.message = message
self.page = page
super().__init__(f"[{step}] {message}")
async def save_error_screenshot(page: Page | None, step: str):
if page:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
screenshots_dir = DATA_DIR / "screenshots"
screenshots_dir.mkdir(parents=True, exist_ok=True)
filename = screenshots_dir / f"error_{step}_{timestamp}.png"
try:
await page.screenshot(path=str(filename))
logger.error(f"Screenshot saved: {filename}")
except:
pass
def generate_pkce():
return pkce.generate_pkce_pair()
def generate_state():
return secrets.token_urlsafe(32)
def create_auth_url(verifier: str, challenge: str, state: str) -> str:
params = {
"response_type": "code",
"client_id": CLIENT_ID,
"redirect_uri": REDIRECT_URI,
"scope": SCOPE,
"code_challenge": challenge,
"code_challenge_method": "S256",
"state": state,
"id_token_add_organizations": "true",
"codex_cli_simplified_flow": "true",
"originator": "opencode",
}
return f"{AUTHORIZE_URL}?{urlencode(params)}"
async def get_temp_email(page: Page) -> str:
logger.info("Getting temp email...")
for i in range(30):
mail_input = page.locator("#mail")
if await mail_input.count() > 0:
val = await mail_input.input_value()
if val and "@" in val:
logger.info(f"Got email: {val}")
return val
await page.wait_for_timeout(1000)
raise AutomationError("get_email", "Failed to get email", page)
async def get_verification_code(page: Page, used_codes: list | None = None) -> str:
logger.info("Waiting for verification code...")
if used_codes is None:
used_codes = []
await page.wait_for_timeout(10000)
for attempt in range(20):
mail_items = page.locator(".inbox-dataList ul li")
count = await mail_items.count()
logger.debug(f"Attempt {attempt + 1}: {count} emails")
if count > 0:
codes = []
for i in range(count):
try:
item = mail_items.nth(i)
text = await item.inner_text()
match = re.search(
r"Your ChatGPT code is (\d{6})", text, re.IGNORECASE
)
if match:
code = match.group(1)
if code not in used_codes:
codes.append(code)
except:
pass
if codes:
logger.info(f"Got code: {codes[0]}")
return codes[0]
await page.wait_for_timeout(5000)
await page.reload(wait_until="domcontentloaded")
await page.wait_for_timeout(5000)
raise AutomationError("get_code", "Code not found", page)
async def fill_date_field(page: Page, month: str, day: str, year: str):
async def type_segment(segment_type: str, value: str):
field = page.locator(f'[data-type="{segment_type}"]')
if await field.count() == 0:
raise AutomationError(
"profile", f"Missing birthday segment: {segment_type}", page
)
target = field.first
await target.scroll_into_view_if_needed()
await target.focus()
await page.keyboard.press("Control+A")
await page.keyboard.press("Backspace")
await page.keyboard.type(value)
await page.wait_for_timeout(200)
await type_segment("month", month)
await type_segment("day", day)
await type_segment("year", year)
def generate_name():
first_names = [
"Alex",
"Jordan",
"Taylor",
"Morgan",
"Casey",
"Riley",
"Quinn",
"Avery",
"Parker",
"Blake",
]
last_names = [
"Smith",
"Johnson",
"Williams",
"Brown",
"Jones",
"Davis",
"Miller",
"Wilson",
"Moore",
"Clark",
]
return f"{random.choice(first_names)} {random.choice(last_names)}"
async def exchange_code_for_tokens(code: str, verifier: str) -> dict:
async with aiohttp.ClientSession() as session:
data = {
"grant_type": "authorization_code",
"client_id": CLIENT_ID,
"code": code,
"code_verifier": verifier,
"redirect_uri": REDIRECT_URI,
}
async with session.post(TOKEN_URL, data=data) as resp:
if not resp.ok:
text = await resp.text()
raise Exception(f"Token exchange failed: {resp.status} {text}")
json_resp = await resp.json()
return {
"access_token": json_resp["access_token"],
"refresh_token": json_resp["refresh_token"],
"expires_in": json_resp["expires_in"],
}
async def get_new_token(headless: bool = False) -> bool:
logger.info("=== Starting token generation ===")
password = "TempPass123!"
full_name = generate_name()
birth_month, birth_day, birth_year = "01", "15", "1995"
verifier, challenge = generate_pkce()
state = generate_state()
auth_url = create_auth_url(verifier, challenge, state)
redirect_url_captured = None
browser: Browser | None = None
current_page: Page | None = None
try:
async with async_playwright() as p:
chromium_path = os.environ.get("CHROMIUM_PATH")
if chromium_path:
browser = await p.chromium.launch(
headless=headless,
executable_path=chromium_path,
)
else:
browser = await p.chromium.launch(headless=headless)
context = await browser.new_context()
page = await context.new_page()
current_page = page
logger.info("[1/6] Getting email...")
await page.goto("https://temp-mail.org", wait_until="domcontentloaded")
email = await get_temp_email(page)
tempmail_page = page
logger.info("[2/6] Registering ChatGPT...")
chatgpt_page = await context.new_page()
current_page = chatgpt_page
await chatgpt_page.goto("https://chatgpt.com")
await chatgpt_page.wait_for_load_state("domcontentloaded")
await chatgpt_page.get_by_text("Sign up for free", exact=True).click()
await chatgpt_page.wait_for_timeout(2000)
await chatgpt_page.locator('input[type="email"]').fill(email)
await chatgpt_page.wait_for_timeout(500)
await chatgpt_page.get_by_role(
"button", name="Continue", exact=True
).click()
await chatgpt_page.wait_for_timeout(3000)
await chatgpt_page.locator('input[type="password"]').fill(password)
await chatgpt_page.wait_for_timeout(500)
await chatgpt_page.get_by_role(
"button", name="Continue", exact=True
).click()
await chatgpt_page.wait_for_timeout(5000)
logger.info("[3/6] Getting verification code...")
await tempmail_page.bring_to_front()
code = await get_verification_code(tempmail_page)
await chatgpt_page.bring_to_front()
code_input = chatgpt_page.get_by_placeholder("Code")
if await code_input.count() > 0:
await code_input.fill(code)
await chatgpt_page.wait_for_timeout(5000)
continue_btn = chatgpt_page.get_by_role(
"button", name="Continue", exact=True
)
if await continue_btn.count() > 0:
await continue_btn.click()
await chatgpt_page.wait_for_timeout(5000)
logger.info("[4/6] Setting profile...")
name_input = chatgpt_page.get_by_placeholder("Full name")
if await name_input.count() > 0:
await name_input.fill(full_name)
await chatgpt_page.wait_for_timeout(500)
await fill_date_field(chatgpt_page, birth_month, birth_day, birth_year)
await chatgpt_page.wait_for_timeout(1000)
continue_btn = chatgpt_page.get_by_role(
"button", name="Continue", exact=True
)
if await continue_btn.count() > 0:
await continue_btn.click()
logger.info("Account registered!")
await chatgpt_page.wait_for_timeout(10000)
await chatgpt_page.wait_for_load_state("networkidle", timeout=30000)
await chatgpt_page.wait_for_timeout(5000)
used_codes = [code]
logger.info("[5/6] OAuth flow...")
oauth_page = await context.new_page()
current_page = oauth_page
def handle_request(request):
nonlocal redirect_url_captured
url = request.url
if "localhost:1455" in url and "code=" in url:
logger.info("Redirect URL captured!")
redirect_url_captured = url
oauth_page.on("request", handle_request)
await oauth_page.goto(auth_url)
await oauth_page.wait_for_load_state("domcontentloaded")
await oauth_page.wait_for_timeout(3000)
await oauth_page.locator('input[type="email"], input[name="email"]').fill(
email
)
await oauth_page.wait_for_timeout(500)
await oauth_page.get_by_role("button", name="Continue", exact=True).click()
await oauth_page.wait_for_timeout(3000)
password_input = oauth_page.locator('input[type="password"]')
if await password_input.count() > 0:
await password_input.fill(password)
await oauth_page.wait_for_timeout(500)
await oauth_page.get_by_role(
"button", name="Continue", exact=True
).click()
await oauth_page.wait_for_timeout(5000)
await tempmail_page.bring_to_front()
await tempmail_page.reload(wait_until="domcontentloaded")
await tempmail_page.wait_for_timeout(3000)
try:
oauth_code = await get_verification_code(tempmail_page, used_codes)
except AutomationError:
logger.info("Reopening mail...")
tempmail_page = await context.new_page()
current_page = tempmail_page
await tempmail_page.goto(
"https://temp-mail.org", wait_until="domcontentloaded"
)
await tempmail_page.wait_for_timeout(10000)
oauth_code = await get_verification_code(tempmail_page, used_codes)
await oauth_page.bring_to_front()
code_input = oauth_page.get_by_placeholder("Code")
if await code_input.count() > 0:
await code_input.fill(oauth_code)
await oauth_page.wait_for_timeout(500)
await oauth_page.get_by_role(
"button", name="Continue", exact=True
).click()
await oauth_page.wait_for_timeout(5000)
for btn_text in ["Continue", "Allow", "Authorize"]:
btn = oauth_page.get_by_role("button", name=btn_text, exact=True)
if await btn.count() > 0:
await btn.click()
break
await oauth_page.wait_for_timeout(5000)
logger.info("[6/6] Exchanging code for tokens...")
if redirect_url_captured and "code=" in redirect_url_captured:
parsed = urlparse(redirect_url_captured)
params = parse_qs(parsed.query)
auth_code = params.get("code", [None])[0]
if auth_code:
tokens = await exchange_code_for_tokens(auth_code, verifier)
token_data = {
"access_token": tokens["access_token"],
"refresh_token": tokens["refresh_token"],
"expires_at": time.time() + tokens["expires_in"],
}
TOKENS_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(TOKENS_FILE, "w") as f:
json.dump(token_data, f, indent=2)
logger.info(f"Tokens saved to {TOKENS_FILE}")
return True
raise AutomationError("token_exchange", "Failed to get tokens", oauth_page)
except AutomationError as e:
logger.error(f"Error at step [{e.step}]: {e.message}")
await save_error_screenshot(e.page, e.step)
return False
except Exception as e:
logger.error(f"Unexpected error: {e}")
await save_error_screenshot(current_page, "unexpected")
return False
finally:
if browser:
await asyncio.sleep(2)
await browser.close()
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
success = asyncio.run(get_new_token())
exit(0 if success else 1)

436
src/proxy.py Normal file
View file

@ -0,0 +1,436 @@
import os
import asyncio
import logging
import time
import secrets
import json
import base64
import uuid
from urllib.parse import urlencode
from aiohttp import web
import aiohttp
from tokens import get_valid_tokens, load_tokens, DATA_DIR
from codex_usage import get_usage_percent
from get_new_token import get_new_token
CODEX_BASE_URL = "https://chatgpt.com/backend-api"
PORT = int(os.environ.get("PORT", "8080"))
USAGE_THRESHOLD = int(os.environ.get("USAGE_THRESHOLD", "85"))
CHECK_INTERVAL = int(os.environ.get("CHECK_INTERVAL", "60"))
FAKE_EXPIRES_IN = 9999999999999
AUTH_FILE = DATA_DIR / "auth.json"
JWT_AUTH_CLAIM_PATH = "https://api.openai.com/auth"
JWT_PROFILE_CLAIM_PATH = "https://api.openai.com/profile"
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
refresh_in_progress = False
auth_codes: dict[str, dict] = {}
def _b64url(data: bytes) -> str:
return base64.urlsafe_b64encode(data).decode("utf-8").rstrip("=")
def _generate_jwt_like() -> str:
account_id = str(uuid.uuid4())
now = int(time.time())
header = {"alg": "HS256", "typ": "JWT"}
user_id = f"user-{secrets.token_urlsafe(18)}"
account_user_id = f"{user_id}__{account_id}"
payload = {
"aud": ["https://api.openai.com/v1"],
"client_id": "app_EMoamEEZ73f0CkXaXp7hrann",
"iss": "https://auth.openai.com",
"iat": now,
"nbf": now,
"exp": now + 315360000,
"jti": str(uuid.uuid4()),
"scp": ["openid", "profile", "email", "offline_access"],
"session_id": f"authsess_{secrets.token_urlsafe(24)}",
JWT_AUTH_CLAIM_PATH: {
"chatgpt_account_id": account_id,
"chatgpt_account_user_id": account_user_id,
"chatgpt_compute_residency": "no_constraint",
"chatgpt_plan_type": "plus",
"chatgpt_user_id": user_id,
"user_id": user_id,
},
JWT_PROFILE_CLAIM_PATH: {
"email": f"proxy-{secrets.token_hex(4)}@example.local",
"email_verified": True,
},
"sub": f"auth0|{secrets.token_urlsafe(20)}",
}
head = _b64url(json.dumps(header, separators=(",", ":")).encode("utf-8"))
body = _b64url(json.dumps(payload, separators=(",", ":")).encode("utf-8"))
sign = _b64url(secrets.token_bytes(32))
return f"{head}.{body}.{sign}"
def _generate_refresh_like() -> str:
return f"rt_{secrets.token_urlsafe(40)}.{secrets.token_urlsafe(32)}"
def _mask(value: str, head: int = 8, tail: int = 6) -> str:
if not value:
return "<empty>"
if len(value) <= head + tail:
return "<hidden>"
return f"{value[:head]}...{value[-tail:]}"
def load_or_create_auth() -> dict:
if AUTH_FILE.exists():
with open(AUTH_FILE) as f:
data = json.load(f)
if (
data.get("access_token")
and data.get("refresh_token")
and data.get("expires_at")
):
return data
DATA_DIR.mkdir(parents=True, exist_ok=True)
access_token = _generate_jwt_like()
data = {
"access_token": access_token,
"refresh_token": _generate_refresh_like(),
"expires_at": FAKE_EXPIRES_IN,
}
with open(AUTH_FILE, "w") as f:
json.dump(data, f, indent=2)
return data
@web.middleware
async def request_log_middleware(request: web.Request, handler):
started = time.perf_counter()
response = None
try:
response = await handler(request)
return response
finally:
elapsed_ms = int((time.perf_counter() - started) * 1000)
status = getattr(response, "status", "ERR")
logger.info(
"%s %s -> %s (%d ms)",
request.method,
request.path_qs,
status,
elapsed_ms,
)
def check_auth(request: web.Request) -> bool:
auth_data = load_or_create_auth()
expected_token = auth_data["access_token"]
auth = request.headers.get("Authorization", "")
if auth.lower().startswith("bearer "):
token = auth[7:].strip()
return token == expected_token
return False
async def oauth_authorize_handler(request: web.Request) -> web.Response:
params = request.rel_url.query
redirect_uri = params.get("redirect_uri")
state = params.get("state", "")
if not redirect_uri:
return web.json_response(
{"error": "invalid_request", "error_description": "Missing redirect_uri"},
status=400,
)
code = f"ac_{secrets.token_urlsafe(48)}"
auth_codes[code] = {
"state": state,
"created_at": time.time(),
}
query = urlencode(
{
"code": code,
"scope": "openid profile email offline_access",
"state": state,
}
)
location = f"{redirect_uri}?{query}"
logger.info("OAuth authorize: issued code")
raise web.HTTPFound(location=location)
async def oauth_token_handler(request: web.Request) -> web.Response:
auth_data = load_or_create_auth()
content_type = request.content_type or ""
grant_type = None
refresh_token = None
code = None
if content_type.startswith("application/json"):
body = await request.json()
grant_type = body.get("grant_type")
refresh_token = body.get("refresh_token")
code = body.get("code")
else:
form = await request.post()
grant_type = form.get("grant_type")
refresh_token = form.get("refresh_token")
code = form.get("code")
if grant_type == "authorization_code":
code = str(code) if code else ""
if not code or code not in auth_codes:
return web.json_response(
{
"error": "invalid_grant",
"error_description": "Invalid authorization code",
},
status=400,
)
created_at = auth_codes[code]["created_at"]
del auth_codes[code]
if time.time() - created_at > 300:
return web.json_response(
{
"error": "invalid_grant",
"error_description": "Authorization code expired",
},
status=400,
)
return web.json_response(
{
"access_token": auth_data["access_token"],
"refresh_token": auth_data["refresh_token"],
"token_type": "Bearer",
"expires_in": FAKE_EXPIRES_IN,
}
)
if grant_type == "refresh_token":
if refresh_token != auth_data["refresh_token"]:
return web.json_response(
{
"error": "invalid_grant",
"error_description": "Invalid refresh token",
},
status=400,
)
return web.json_response(
{
"access_token": auth_data["access_token"],
"refresh_token": auth_data["refresh_token"],
"token_type": "Bearer",
"expires_in": FAKE_EXPIRES_IN,
}
)
return web.json_response(
{
"error": "unsupported_grant_type",
"error_description": "Only authorization_code and refresh_token are supported",
},
status=400,
)
async def refresh_tokens_task():
global refresh_in_progress
if refresh_in_progress:
logger.info("Token refresh already in progress")
return
refresh_in_progress = True
logger.info("Starting token refresh...")
try:
success = await get_new_token(headless=False)
if success:
logger.info("Token refresh completed successfully")
else:
logger.error("Token refresh failed")
except Exception as e:
logger.error(f"Error during token refresh: {e}")
finally:
refresh_in_progress = False
async def usage_monitor():
while True:
for _ in range(1):
tokens = load_tokens()
if not tokens:
if not refresh_in_progress:
logger.warning("No tokens found, starting refresh...")
asyncio.create_task(refresh_tokens_task())
break
usage = get_usage_percent(tokens.access_token)
if usage < 0:
logger.warning("Failed to get usage, token may be invalid")
asyncio.create_task(refresh_tokens_task())
break
logger.info(f"Current usage: {usage}%")
if usage >= USAGE_THRESHOLD:
logger.info(
f"Usage {usage}% >= threshold {USAGE_THRESHOLD}%, starting refresh..."
)
asyncio.create_task(refresh_tokens_task())
break
await asyncio.sleep(CHECK_INTERVAL)
async def proxy_handler(request: web.Request) -> web.StreamResponse | web.Response:
if not check_auth(request):
auth = request.headers.get("Authorization", "")
auth_preview = auth[:24] + ("..." if len(auth) > 24 else "")
logger.warning(
"Auth failed: method=%s path=%s auth_present=%s auth_preview=%s ua=%s",
request.method,
request.path,
bool(auth),
auth_preview,
request.headers.get("User-Agent", ""),
)
return web.json_response({"error": "Unauthorized"}, status=401)
tokens = await get_valid_tokens()
if not tokens:
return web.json_response({"error": "No valid tokens"}, status=500)
path = request.path
target_url = f"{CODEX_BASE_URL}{path}"
logger.info(
"Proxying request: %s %s -> %s",
request.method,
request.path_qs,
target_url,
)
headers = {}
for key, value in request.headers.items():
if key.lower() not in ("host", "authorization", "content-length"):
headers[key] = value
headers["Authorization"] = f"Bearer {tokens.access_token}"
if request.method in ("POST", "PUT", "PATCH"):
body = await request.read()
else:
body = None
async with aiohttp.ClientSession() as session:
try:
async with session.request(
method=request.method,
url=target_url,
headers=headers,
data=body,
params=request.query,
) as resp:
content_type = resp.content_type or "application/json"
is_stream = (
content_type == "text/event-stream" or "stream" in content_type
)
if is_stream:
response = web.StreamResponse(
status=resp.status,
reason=resp.reason,
headers={
"Content-Type": content_type,
"Cache-Control": "no-cache",
"Connection": "keep-alive",
},
)
await response.prepare(request)
async for chunk in resp.content.iter_any():
await response.write(chunk)
await response.write_eof()
return response
else:
response_body = await resp.read()
if resp.status >= 400:
preview = response_body[:500].decode("utf-8", errors="replace")
logger.warning(
"Upstream error: status=%s path=%s body=%s",
resp.status,
request.path,
preview,
)
return web.Response(
status=resp.status,
body=response_body,
headers={"Content-Type": content_type},
)
except aiohttp.ClientError as e:
return web.json_response({"error": f"Proxy error: {e}"}, status=502)
async def health_handler(request: web.Request) -> web.Response:
tokens = await get_valid_tokens()
usage = -1
if tokens:
usage = get_usage_percent(tokens.access_token)
return web.json_response(
{
"status": "ok" if tokens else "no_tokens",
"has_tokens": tokens is not None,
"usage_percent": usage,
"refresh_in_progress": refresh_in_progress,
}
)
async def start_background_tasks(app: web.Application):
app["usage_monitor"] = asyncio.create_task(usage_monitor())
async def cleanup_background_tasks(app: web.Application):
app["usage_monitor"].cancel()
try:
await app["usage_monitor"]
except asyncio.CancelledError:
pass
def create_app() -> web.Application:
app = web.Application(middlewares=[request_log_middleware])
app.router.add_get("/oauth/authorize", oauth_authorize_handler)
app.router.add_post("/oauth/token", oauth_token_handler)
app.router.add_get("/health", health_handler)
app.router.add_route("*", "/{path:.*}", proxy_handler)
app.on_startup.append(start_background_tasks)
app.on_cleanup.append(cleanup_background_tasks)
return app
if __name__ == "__main__":
logger.info(f"Starting proxy on port {PORT}")
logger.info(f"Usage threshold: {USAGE_THRESHOLD}%")
logger.info(f"Check interval: {CHECK_INTERVAL}s")
auth_data = load_or_create_auth()
logger.info("Client access token: %s", _mask(auth_data["access_token"]))
logger.info("Client refresh token: %s", _mask(auth_data["refresh_token"]))
startup_tokens = load_tokens()
if startup_tokens:
logger.info("Upstream access token: %s", _mask(startup_tokens.access_token))
else:
logger.warning("No upstream token found at %s", DATA_DIR / "tokens.json")
app = create_app()
web.run_app(app, host="0.0.0.0", port=PORT)

3
src/requirements.txt Normal file
View file

@ -0,0 +1,3 @@
playwright==1.58.0
aiohttp==3.13.3
pkce==1.0.3

96
src/tokens.py Normal file
View file

@ -0,0 +1,96 @@
import json
import time
import os
import base64
from pathlib import Path
from dataclasses import dataclass
import aiohttp
DATA_DIR = Path(os.environ.get("DATA_DIR", "./data"))
TOKENS_FILE = DATA_DIR / "tokens.json"
CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
TOKEN_URL = "https://auth.openai.com/oauth/token"
@dataclass
class Tokens:
access_token: str
refresh_token: str
expires_at: float # unix timestamp
@property
def is_expired(self) -> bool:
return time.time() >= self.expires_at - 10
def load_tokens() -> Tokens | None:
if not TOKENS_FILE.exists():
return None
try:
with open(TOKENS_FILE) as f:
data = json.load(f)
access_token = data["access_token"]
return Tokens(
access_token=access_token,
refresh_token=data["refresh_token"],
expires_at=data["expires_at"],
)
except (json.JSONDecodeError, KeyError):
return None
def save_tokens(tokens: Tokens):
TOKENS_FILE.parent.mkdir(parents=True, exist_ok=True)
with open(TOKENS_FILE, "w") as f:
json.dump(
{
"access_token": tokens.access_token,
"refresh_token": tokens.refresh_token,
"expires_at": tokens.expires_at,
},
f,
indent=2,
)
async def refresh_tokens(refresh_token: str) -> Tokens | None:
async with aiohttp.ClientSession() as session:
data = {
"grant_type": "refresh_token",
"refresh_token": refresh_token,
"client_id": CLIENT_ID,
}
async with session.post(TOKEN_URL, data=data) as resp:
if not resp.ok:
text = await resp.text()
print(f"Token refresh failed: {resp.status} {text}")
return None
json_resp = await resp.json()
expires_in = json_resp["expires_in"]
return Tokens(
access_token=json_resp["access_token"],
refresh_token=json_resp["refresh_token"],
expires_at=time.time() + expires_in,
)
async def get_valid_tokens() -> Tokens | None:
tokens = load_tokens()
if not tokens:
print("No tokens found")
return None
if tokens.is_expired:
print("Token expired, refreshing...")
if not tokens.refresh_token:
print("No refresh token available")
return None
new_tokens = await refresh_tokens(tokens.refresh_token)
if not new_tokens:
print("Failed to refresh token")
return None
save_tokens(new_tokens)
return new_tokens
return tokens