diff --git a/.env.example b/.env.example index 769137f..00d9430 100644 --- a/.env.example +++ b/.env.example @@ -1,17 +1,26 @@ # HTTP server port -PORT=80 +PORT=8080 -# HTTPS proxy URL (will be used by Firefox directly) -https_proxy=http://user:pass@host:port +# Target pool size (number of tokens to keep ready) +TARGET_SIZE=5 + +# Poll interval for checking new accounts when pool incomplete (seconds) +POLL_INTERVAL=30 + +# HTTPS proxy URL (used by Firefox and balance API) +HTTPS_PROXY=http://user:pass@host:port # Path to emails.txt (email:password per line) EMAILS_FILE=/data/emails.txt -# Custom Firefox binary path (default: firefox-esr in Docker) +# Firefox binary path FIREFOX_BINARY=firefox-esr -# Number of hot tokens to keep in pool -POOL_SIZE=10 +# Geckodriver path +GECKODRIVER_PATH=/usr/local/bin/geckodriver -# Persistent data directory (tokens, screenshots, extensions) +# Extensions directory (dark-reader.xpi, ublock_origin.xpi) +EXTRAS_DIR=/app/extras + +# Persistent data directory (tokens.txt, used.txt, screenshots) DATA_DIR=/data diff --git a/Dockerfile b/Dockerfile index a300046..ba2d493 100644 --- a/Dockerfile +++ b/Dockerfile @@ -19,16 +19,16 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ libglu1-mesa \ zip && \ rm -rf /var/lib/apt/lists/* && \ - fc-cache -fv && \ + fc-cache -fv # Install geckodriver -RUN GECKO_VERSION=$(curl -s https://api.github.com/repos/mozilla/geckodriver/releases/latest | grep tag_name | cut -d'"' -f4) && \ - curl -fsSL "https://github.com/mozilla/geckodriver/releases/download/${GECKO_VERSION}/geckodriver-${GECKO_VERSION}-linux64.tar.gz" | tar -xzf - -C /usr/local/bin && \ - chmod +x /usr/local/bin/geckodriver +ARG GECKO_VERSION=v0.36.0 +RUN curl -fsSL "https://github.com/mozilla/geckodriver/releases/download/${GECKO_VERSION}/geckodriver-${GECKO_VERSION}-linux64.tar.gz" | \ + tar -xzf - -C /usr/local/bin && chmod +x /usr/local/bin/geckodriver # Download uBlock Origin (latest) -RUN UBLOCK_VERSION=$(curl -s https://api.github.com/repos/gorhill/uBlock/releases/latest | grep tag_name | cut -d'"' -f4) && \ - mkdir -p /extras/extensions && \ +ARG UBLOCK_VERSION=1.69.0 +RUN mkdir -p /extras/extensions && \ curl -fsSL -o /extras/extensions/ublock_origin.xpi \ "https://github.com/gorhill/uBlock/releases/download/${UBLOCK_VERSION}/uBlock0_${UBLOCK_VERSION}.firefox.signed.xpi" @@ -47,6 +47,8 @@ RUN cd /tmp/extension && zip -r /extras/extensions/dark-reader.xpi . && rm -rf / ENV PYTHONUNBUFFERED=1 ENV PORT=80 +ENV TARGET_SIZE=5 +ENV POLL_INTERVAL=30 ENV DATA_DIR=/data ENV EXTRAS_DIR=/extras ENV EMAILS_FILE=/data/emails.txt diff --git a/pyproject.toml b/pyproject.toml index 4e499bb..59b98ae 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,4 +18,11 @@ package = false [dependency-groups] dev = [ "pytest>=9.0.2", + "pytest-asyncio>=0.25.0", + "pytest-aiohttp>=1.1.0", ] + +[tool.pytest.ini_options] +asyncio_mode = "auto" +asyncio_default_fixture_loop_scope = "function" +pythonpath = ["src"] diff --git a/src/emails.py b/src/emails.py index 4d24b1b..e8e548c 100644 --- a/src/emails.py +++ b/src/emails.py @@ -1,9 +1,11 @@ """Email account provider. -Reads accounts from a text file (one per line, email:password format). -When an account is consumed, it is removed from the file. +Reads accounts from emails.txt (one per line, email:password format). +When an account is consumed, it is removed from emails.txt. +Successful registrations go to done.txt, failed to failed.txt. """ +import asyncio import logging import os from dataclasses import dataclass @@ -11,7 +13,12 @@ from pathlib import Path logger = logging.getLogger(__name__) +DATA_DIR = Path(os.environ.get("DATA_DIR", "./data")) EMAILS_FILE = Path(os.environ.get("EMAILS_FILE", "./emails.txt")) +DONE_FILE = DATA_DIR / "done.txt" +FAILED_FILE = DATA_DIR / "failed.txt" + +_file_lock = asyncio.Lock() @dataclass @@ -34,56 +41,68 @@ def _parse_line(line: str) -> EmailAccount | None: return EmailAccount(email=email, password=password) -def peek_accounts(path: Path | None = None) -> list[EmailAccount]: - """Read all accounts without consuming them.""" - path = path or EMAILS_FILE - if not path.exists(): - return [] - lines = path.read_text().strip().splitlines() - accounts = [] - for line in lines: - acc = _parse_line(line) - if acc: - accounts.append(acc) - return accounts +async def has_accounts() -> bool: + """Check if there are any accounts left in emails.txt.""" + if not EMAILS_FILE.exists(): + return False + async with _file_lock: + content = EMAILS_FILE.read_text() + lines = content.splitlines() + for line in lines: + if _parse_line(line): + return True + return False -def pop_account(path: Path | None = None) -> EmailAccount | None: - """Read and remove the first account from the file. +async def pop_account() -> EmailAccount | None: + """Read and remove the first account from emails.txt. Returns the account, or None if the file is empty. """ - path = path or EMAILS_FILE - if not path.exists(): - logger.error("Emails file not found: %s", path) + if not EMAILS_FILE.exists(): + logger.error("Emails file not found: %s", EMAILS_FILE) return None - lines = path.read_text().strip().splitlines() + async with _file_lock: + content = EMAILS_FILE.read_text() + lines = content.splitlines() - account = None - remaining = [] - for line in lines: - if account is None: + account = None + account_idx = -1 + for i, line in enumerate(lines): parsed = _parse_line(line) if parsed: account = parsed - continue - remaining.append(line) + account_idx = i + break - if account is None: - logger.error("No accounts left in %s", path) - return None + if account is None: + logger.error("No accounts left in %s", EMAILS_FILE) + return None - # TODO: Write remaining lines back - # path.write_text("\n".join(remaining) + ("\n" if remaining else "")) - logger.info( - "Popped account %s, %d remaining", - account.email, - len([r for r in remaining if _parse_line(r)]), - ) + remaining_lines = lines[:account_idx] + lines[account_idx + 1 :] + EMAILS_FILE.write_text( + "\n".join(remaining_lines) + ("\n" if remaining_lines else "") + ) + + remaining_count = sum(1 for line in remaining_lines if _parse_line(line)) + logger.info("Popped account %s, %d remaining", account.email, remaining_count) return account -def remaining_count(path: Path | None = None) -> int: - """Count how many accounts are left.""" - return len(peek_accounts(path)) +async def mark_done(email: str) -> None: + """Append email to done.txt after successful registration.""" + DATA_DIR.mkdir(parents=True, exist_ok=True) + async with _file_lock: + with open(DONE_FILE, "a") as f: + f.write(email + "\n") + logger.info("Marked done: %s", email) + + +async def mark_failed(email: str) -> None: + """Append email to failed.txt after failed registration.""" + DATA_DIR.mkdir(parents=True, exist_ok=True) + async with _file_lock: + with open(FAILED_FILE, "a") as f: + f.write(email + "\n") + logger.warning("Marked failed: %s", email) diff --git a/src/pool.py b/src/pool.py index 25b887f..6de925f 100644 --- a/src/pool.py +++ b/src/pool.py @@ -1,218 +1,177 @@ -"""Hot token pool. +"""Token pool with automatic refill. -Maintains a pool of valid tokens ready to be served. -Tokens are validated (non-zero balance) before entering the pool. -A background task keeps the pool filled to the target size. +Maintains a pool of tokens in tokens.txt (one per line). +Automatically refills when tokens are consumed. +Watches for new accounts when pool is incomplete. """ import asyncio -import json import logging import os -from dataclasses import dataclass, field from pathlib import Path -from typing import Any - -from usage import get_balance -from registration import register_kilo_account -from proxy import rotate_proxy_ip -from emails import remaining_count logger = logging.getLogger(__name__) -POOL_SIZE = int(os.environ.get("POOL_SIZE", "10")) DATA_DIR = Path(os.environ.get("DATA_DIR", "./data")) -POOL_FILE = DATA_DIR / "pool.json" -REGISTRATION_MAX_ATTEMPTS = 4 +TOKENS_FILE = DATA_DIR / "tokens.txt" +TARGET_SIZE = int(os.environ.get("TARGET_SIZE", "5")) +POLL_INTERVAL = int(os.environ.get("POLL_INTERVAL", "30")) + +_file_lock = asyncio.Lock() +_registration_lock = asyncio.Lock() +_refill_task: asyncio.Task | None = None -@dataclass -class PoolToken: - api_key: str - balance: dict[str, Any] = field(default_factory=dict) +def _ensure_files() -> None: + DATA_DIR.mkdir(parents=True, exist_ok=True) + if not TOKENS_FILE.exists(): + TOKENS_FILE.touch() -class TokenPool: - """Hot pool of pre-registered, validated tokens. +async def get_first_token() -> str | None: + """Peek at first token without removing it.""" + _ensure_files() + async with _file_lock: + content = TOKENS_FILE.read_text().strip() + if not content: + return None + tokens = content.split() + return tokens[0] if tokens else None - Pool is persisted to disk so tokens survive container restarts. + +async def pop_token() -> str | None: + """Remove and return first token.""" + _ensure_files() + async with _file_lock: + content = TOKENS_FILE.read_text().strip() + if not content: + return None + tokens = content.split() + if not tokens: + return None + first = tokens[0] + rest = tokens[1:] + TOKENS_FILE.write_text("\n".join(rest) + ("\n" if rest else "")) + return first + + +async def pool_size() -> int: + """Current pool size.""" + _ensure_files() + async with _file_lock: + content = TOKENS_FILE.read_text().strip() + return len(content.split()) if content else 0 + + +async def append_token(token: str) -> None: + """Add token to pool.""" + _ensure_files() + async with _file_lock: + with open(TOKENS_FILE, "a") as f: + f.write(token.strip() + "\n") + + +async def trigger_refill() -> None: + """Start refill worker if not already running.""" + global _refill_task + if _refill_task is None or _refill_task.done(): + _refill_task = asyncio.create_task(_refill_worker()) + + +async def _refill_worker() -> None: + """Background worker that refills pool to TARGET_SIZE. + + Watches for new accounts when pool is incomplete. + If no accounts available, polls every POLL_INTERVAL seconds. """ + logger.info( + "Refill worker started (target: %d, poll: %ds)", TARGET_SIZE, POLL_INTERVAL + ) - def __init__(self, target_size: int = POOL_SIZE): - self.target_size = target_size - self._tokens: list[PoolToken] = [] - self._lock = asyncio.Lock() - self._fill_task: asyncio.Task | None = None - self._load_from_disk() + while True: + async with _file_lock: + content = TOKENS_FILE.read_text().strip() + size = len(content.split()) if content else 0 - def _load_from_disk(self) -> None: - """Load saved pool from disk.""" - if not POOL_FILE.exists(): + if size >= TARGET_SIZE: + logger.info("Pool full: %d/%d tokens", size, TARGET_SIZE) return - try: - with open(POOL_FILE) as f: - data = json.load(f) - if not isinstance(data, list): - return - for item in data: - if isinstance(item, dict) and "api_key" in item: - self._tokens.append( - PoolToken( - api_key=item["api_key"], - balance=item.get("balance", {}), - ) - ) - if self._tokens: - logger.info("Loaded %d tokens from disk", len(self._tokens)) - except (json.JSONDecodeError, OSError) as e: - logger.warning("Failed to load pool from disk: %s", e) - def _save_to_disk(self) -> None: - """Persist current pool to disk.""" - POOL_FILE.parent.mkdir(parents=True, exist_ok=True) - data = [{"api_key": t.api_key, "balance": t.balance} for t in self._tokens] - try: - with open(POOL_FILE, "w") as f: - json.dump(data, f, indent=2) - except OSError as e: - logger.warning("Failed to save pool to disk: %s", e) - - @property - def size(self) -> int: - return len(self._tokens) - - @property - def is_full(self) -> bool: - return self.size >= self.target_size - - async def get_token(self) -> PoolToken | None: - """Get a token from the pool. Returns None if pool is empty.""" - async with self._lock: - if not self._tokens: - return None - token = self._tokens.pop(0) - self._save_to_disk() - - # Trigger background refill - self._ensure_filling() - return token - - async def _validate_token(self, api_key: str) -> dict[str, Any] | None: - """Validate that a token has non-zero balance. Returns balance data or None.""" - balance_data = await get_balance(api_key) - if balance_data is None: - logger.warning("Token validation failed: could not fetch balance") - return None - - # Check for non-zero balance - balance_value = balance_data.get("balance", balance_data.get("remaining", 0)) - if not balance_value or balance_value <= 0: - logger.warning("Token has zero balance: %s", balance_data) - return None - - logger.info("Token validated, balance: %s", balance_data) - return balance_data - - async def _register_one(self) -> PoolToken | None: - """Register a single new account and validate it.""" - for attempt in range(1, REGISTRATION_MAX_ATTEMPTS + 1): + has = await _has_accounts_internal() + if not has: logger.info( - "Pool registration attempt %d/%d", attempt, REGISTRATION_MAX_ATTEMPTS + "No accounts, waiting for new ones (pool: %d/%d, checking every %ds)", + size, + TARGET_SIZE, + POLL_INTERVAL, ) + await asyncio.sleep(POLL_INTERVAL) + continue - result = await register_kilo_account() - if not result: - logger.warning("Registration attempt %d failed", attempt) - await asyncio.sleep(1.5 * attempt) + logger.info("Pool: %d/%d tokens, registering new account...", size, TARGET_SIZE) + + async with _registration_lock: + async with _file_lock: + content = TOKENS_FILE.read_text().strip() + size = len(content.split()) if content else 0 + + if size >= TARGET_SIZE: + logger.info("Pool full: %d/%d tokens", size, TARGET_SIZE) + return + + if not await _has_accounts_internal(): + logger.info( + "No accounts, waiting for new ones (pool: %d/%d)", + size, + TARGET_SIZE, + ) continue - balance = await self._validate_token(result.access_token) - if balance is not None: - return PoolToken(api_key=result.access_token, balance=balance) - - logger.warning("Registered token invalid (zero balance), rotating IP") - await rotate_proxy_ip() - await asyncio.sleep(1.5 * attempt) - - return None - - async def _fill_pool(self) -> None: - """Fill the pool up to target size.""" - while self.size < self.target_size: - emails_left = remaining_count() - if emails_left == 0: + ok = await _register_one() + if not ok: logger.warning( - "No email accounts left, cannot fill pool further (pool size: %d/%d)", - self.size, - self.target_size, + "Registration failed, will retry (pool: %d/%d)", + size, + TARGET_SIZE, ) - break + await asyncio.sleep(5) + continue - logger.info( - "Pool fill: %d/%d, registering new token...", - self.size, - self.target_size, - ) + logger.info("Registration successful, checking pool...") - token = await self._register_one() - if token: - async with self._lock: - self._tokens.append(token) - self._save_to_disk() - logger.info( - "Pool fill: added token, now %d/%d", self.size, self.target_size - ) - else: - logger.error("Pool fill: failed to register, stopping fill cycle") - break - def _ensure_filling(self) -> None: - """Start background fill task if not already running.""" - if self._fill_task and not self._fill_task.done(): - return - self._fill_task = asyncio.create_task(self._safe_fill()) +async def _has_accounts_internal() -> bool: + """Internal check without lock (caller must hold appropriate locks).""" + from emails import EMAILS_FILE, _parse_line - async def _safe_fill(self) -> None: - try: - await self._fill_pool() - except Exception: - logger.exception("Pool fill error") + if not EMAILS_FILE.exists(): + return False + content = EMAILS_FILE.read_text() + for line in content.splitlines(): + if _parse_line(line): + return True + return False - async def startup_fill(self) -> None: - """Initial fill on startup. Revalidates loaded tokens, then fills to target.""" - if self._tokens: - logger.info( - "Pool startup: %d tokens loaded from disk, revalidating...", self.size - ) - valid = [] - for token in self._tokens: - balance = await self._validate_token(token.api_key) - if balance is not None: - token.balance = balance - valid.append(token) - else: - logger.warning("Pool startup: discarding invalid token") - async with self._lock: - self._tokens = valid - self._save_to_disk() - logger.info("Pool startup: %d tokens valid after revalidation", self.size) - logger.info("Pool startup: filling to %d tokens...", self.target_size) - await self._fill_pool() - logger.info("Pool startup: %d tokens ready", self.size) +async def _register_one() -> bool: + """Register one account. Returns True on success.""" + from registration import register_kilo_account - async def shutdown(self) -> None: - """Cancel background fill task.""" - if self._fill_task and not self._fill_task.done(): - self._fill_task.cancel() - try: - await self._fill_task - except asyncio.CancelledError: - pass + return await register_kilo_account() - def status(self) -> dict[str, Any]: - return { - "pool_size": self.size, - "target_size": self.target_size, - "is_full": self.is_full, - } + +async def wait_for_token() -> str | None: + """Wait for a token to be available. Returns None if no accounts left.""" + async with _registration_lock: + token = await get_first_token() + if token: + return token + + if not await _has_accounts_internal(): + return None + + ok = await _register_one() + if not ok: + return None + + return await get_first_token() diff --git a/src/registration.py b/src/registration.py index cdb6931..31e2ef8 100644 --- a/src/registration.py +++ b/src/registration.py @@ -21,9 +21,8 @@ from selenium.common.exceptions import ( WebDriverException, ) -from tokens import ProviderTokens from proxy import HTTPS_PROXY, rotate_proxy_ip -from emails import pop_account +from emails import pop_account, mark_done, mark_failed logger = logging.getLogger(__name__) @@ -329,7 +328,7 @@ def _try_register_once_sync( # Wait for Google WebDriverWait(driver, 30).until(EC.url_contains("accounts.google.com")) - logger.info("[4/6] Google sign-in page loaded: %s", driver.current_url) + logger.info("[4/6] Google sign-in page loaded: %s", driver.current_url.split('?', 1)[0]) # Step 5: Google sign-in logger.info("[5/6] Signing in with Google (%s)...", email) @@ -399,18 +398,20 @@ def _try_register_once_sync( return None -async def register_kilo_account() -> ProviderTokens | None: +async def register_kilo_account() -> bool: """Register a new Kilo account via Google OAuth using Selenium Firefox. Pops one email account from emails.txt and attempts registration. + On success, appends token to pool, marks email done, returns True. + On failure, marks email failed, returns False. Rotates proxy IP between attempts if needed. """ logger.info("=== Starting Kilo account registration (Google OAuth) ===") - account = pop_account() + account = await pop_account() if not account: logger.error("No email accounts available") - return None + return False driver: WebDriver | None = None @@ -418,8 +419,6 @@ async def register_kilo_account() -> ProviderTokens | None: driver = await asyncio.to_thread(_create_firefox_driver) for ip_attempt in range(MAX_IP_ROTATIONS): - # driver.get("http://localhost:8005/") - # await asyncio.sleep(100000000000000000) # for debugging if ip_attempt > 0: logger.info( "Rotating proxy IP (attempt %d/%d)...", @@ -442,17 +441,26 @@ async def register_kilo_account() -> ProviderTokens | None: ) if api_key: - return ProviderTokens( - access_token=api_key, - refresh_token=None, - expires_at=0, - ) + from pool import append_token + + await append_token(api_key) + await mark_done(account.email) + logger.info("Token added to pool: %s...", api_key[:10]) + return True await asyncio.sleep(2) + await mark_failed(account.email) logger.error("All registration attempts exhausted for %s", account.email) - return None + return False except Exception as e: + await mark_failed(account.email) logger.error("Fatal registration error: %s", e) - return None + return False + finally: + if driver: + try: + driver.quit() + except Exception: + pass diff --git a/src/server.py b/src/server.py index f32a942..4dd2d0e 100644 --- a/src/server.py +++ b/src/server.py @@ -1,38 +1,70 @@ -import asyncio import logging import os from aiohttp import web -from pool import TokenPool +from pool import get_first_token, pop_token, pool_size, trigger_refill, wait_for_token +from usage import get_balance +from emails import has_accounts logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) PORT = int(os.environ.get("PORT", 8080)) -pool = TokenPool() + +async def on_startup(app: web.Application): + del app + size = await pool_size() + has = await has_accounts() + logger.info("Startup: pool=%d, accounts=%s", size, "yes" if has else "no") + + if size == 0 and has: + logger.info("Pool empty, starting initial fill...") + elif size > 0: + logger.info("Pool has %d tokens, triggering background fill check", size) + + await trigger_refill() async def token_handler(request: web.Request) -> web.Response: del request - token = await pool.get_token() - if not token: - return web.json_response( - {"error": "No tokens available", "pool": pool.status()}, - status=503, + while True: + token = await get_first_token() + + if token is None: + token = await wait_for_token() + if token is None: + logger.error("Token pool is empty and no accounts left") + return web.json_response({"token": "public"}) + + current_size = await pool_size() + logger.info("token: %s pool: %d balance: -", token[:5], current_size) + return web.json_response({"token": token}) + + balance_data = await get_balance(token) + + if balance_data is None: + await pop_token() + await trigger_refill() + continue + + balance = balance_data.get("balance", balance_data.get("remaining", 0)) + if balance is None or balance <= 0: + await pop_token() + await trigger_refill() + continue + + current_size = await pool_size() + calculated_balance = balance + (current_size - 1) * 5 + logger.info( + "token: %s pool: %d balance: %.2f", + token[:5], + current_size, + calculated_balance, ) - - logger.info("Token issued (pool: %d/%d)", pool.size, pool.target_size) - - return web.json_response( - { - "token": token.api_key, - "balance": token.balance, - "pool": pool.status(), - } - ) + return web.json_response({"token": token}) async def health_handler(request: web.Request) -> web.Response: @@ -42,24 +74,20 @@ async def health_handler(request: web.Request) -> web.Response: async def status_handler(request: web.Request) -> web.Response: del request - return web.json_response(pool.status()) - - -async def on_startup(app: web.Application): - del app - logger.info("Startup: filling token pool...") - await pool.startup_fill() - - -async def on_cleanup(app: web.Application): - del app - await pool.shutdown() + size = await pool_size() + has = await has_accounts() + return web.json_response( + { + "pool_size": size, + "target_size": int(os.environ.get("TARGET_SIZE", 5)), + "has_accounts": has, + } + ) def create_app() -> web.Application: app = web.Application() app.on_startup.append(on_startup) - app.on_cleanup.append(on_cleanup) app.router.add_get("/health", health_handler) app.router.add_get("/token", token_handler) app.router.add_get("/status", status_handler) @@ -67,9 +95,7 @@ def create_app() -> web.Application: def main(): - logger.info( - "Starting kilocode on port %s (pool target: %d)", PORT, pool.target_size - ) + logger.info("Starting server on port %s", PORT) app = create_app() web.run_app(app, host="0.0.0.0", port=PORT) diff --git a/src/tokens.py b/src/tokens.py index 2bf51eb..7a6f042 100644 --- a/src/tokens.py +++ b/src/tokens.py @@ -1,16 +1,6 @@ -import json -import logging -import os -import tempfile from dataclasses import dataclass -from pathlib import Path from typing import Any -logger = logging.getLogger(__name__) - -DATA_DIR = Path(os.environ.get("DATA_DIR", "./data")) -TOKENS_FILE = DATA_DIR / "kilo_tokens.json" - @dataclass class ProviderTokens: @@ -18,111 +8,3 @@ class ProviderTokens: refresh_token: str | None expires_at: float metadata: dict[str, Any] | None = None - - -def _tokens_to_dict(tokens: ProviderTokens) -> dict[str, Any]: - return { - "access_token": tokens.access_token, - "refresh_token": tokens.refresh_token, - "expires_at": tokens.expires_at, - } - - -def _dict_to_tokens(data: dict[str, Any] | None) -> ProviderTokens | None: - if not isinstance(data, dict): - return None - try: - return ProviderTokens( - access_token=data["access_token"], - refresh_token=data.get("refresh_token"), - expires_at=data.get("expires_at", 0), - ) - except KeyError, TypeError: - return None - - -def _load_raw() -> dict[str, Any] | None: - if not TOKENS_FILE.exists(): - return None - try: - with open(TOKENS_FILE) as f: - data = json.load(f) - if isinstance(data, dict): - return data - return None - except json.JSONDecodeError: - return None - - -def _save_raw(data: dict[str, Any]) -> None: - TOKENS_FILE.parent.mkdir(parents=True, exist_ok=True) - fd, tmp_path = tempfile.mkstemp( - prefix=f"{TOKENS_FILE.name}.", - suffix=".tmp", - dir=str(TOKENS_FILE.parent), - ) - try: - with os.fdopen(fd, "w") as f: - json.dump(data, f, indent=2) - f.flush() - os.fsync(f.fileno()) - os.replace(tmp_path, TOKENS_FILE) - finally: - if os.path.exists(tmp_path): - os.unlink(tmp_path) - - -def _normalize_state(data: dict[str, Any] | None) -> dict[str, Any]: - if not data: - return {"active": None, "next_account": None} - if "active" in data or "next_account" in data: - return { - "active": data.get("active"), - "next_account": data.get("next_account"), - } - return {"active": data, "next_account": None} - - -def load_state() -> tuple[ProviderTokens | None, ProviderTokens | None]: - normalized = _normalize_state(_load_raw()) - active = _dict_to_tokens(normalized.get("active")) - next_account = _dict_to_tokens(normalized.get("next_account")) - return active, next_account - - -def save_state( - active: ProviderTokens | None, next_account: ProviderTokens | None -) -> None: - payload = { - "active": _tokens_to_dict(active) if active else None, - "next_account": _tokens_to_dict(next_account) if next_account else None, - } - _save_raw(payload) - - -def load_tokens() -> ProviderTokens | None: - active, _ = load_state() - return active - - -def load_next_tokens() -> ProviderTokens | None: - _, next_account = load_state() - return next_account - - -def save_tokens(tokens: ProviderTokens) -> None: - _, next_account = load_state() - save_state(tokens, next_account) - - -def promote_next_tokens() -> bool: - _, next_account = load_state() - if not next_account: - return False - save_state(next_account, None) - return True - - -def clear_next_tokens() -> None: - active, _ = load_state() - save_state(active, None) diff --git a/tests/test_emails.py b/tests/test_emails.py index 423fc93..bbfd82a 100644 --- a/tests/test_emails.py +++ b/tests/test_emails.py @@ -1,4 +1,6 @@ -from emails import pop_account, peek_accounts, remaining_count, _parse_line +import pytest + +from emails import pop_account, has_accounts, mark_done, mark_failed, _parse_line import emails as em @@ -25,57 +27,103 @@ def test_parse_line_malformed(): assert _parse_line("no-colon-here") is None -def test_peek_accounts(tmp_path, monkeypatch): +@pytest.mark.asyncio +async def test_has_accounts_true(tmp_path, monkeypatch): f = tmp_path / "emails.txt" f.write_text("a@b.com:pass1\nc@d.com:pass2\n") monkeypatch.setattr(em, "EMAILS_FILE", f) - accounts = peek_accounts() - assert len(accounts) == 2 - assert accounts[0].email == "a@b.com" - assert accounts[1].email == "c@d.com" - - # peek doesn't consume - assert remaining_count() == 2 + assert await has_accounts() is True -def test_pop_account(tmp_path, monkeypatch): - f = tmp_path / "emails.txt" - f.write_text("a@b.com:pass1\nc@d.com:pass2\ne@f.com:pass3\n") - monkeypatch.setattr(em, "EMAILS_FILE", f) - - acc = pop_account() - assert acc is not None - assert acc.email == "a@b.com" - assert remaining_count() == 2 - - acc = pop_account() - assert acc is not None - assert acc.email == "c@d.com" - assert remaining_count() == 1 - - -def test_pop_account_empty(tmp_path, monkeypatch): +@pytest.mark.asyncio +async def test_has_accounts_false(tmp_path, monkeypatch): f = tmp_path / "emails.txt" f.write_text("") monkeypatch.setattr(em, "EMAILS_FILE", f) - assert pop_account() is None + assert await has_accounts() is False -def test_pop_account_missing_file(tmp_path, monkeypatch): +@pytest.mark.asyncio +async def test_has_accounts_missing_file(tmp_path, monkeypatch): monkeypatch.setattr(em, "EMAILS_FILE", tmp_path / "nope.txt") - assert pop_account() is None + assert await has_accounts() is False -def test_pop_skips_comments(tmp_path, monkeypatch): - f = tmp_path / "emails.txt" - f.write_text("# first is comment\na@b.com:pass1\n") - monkeypatch.setattr(em, "EMAILS_FILE", f) +@pytest.mark.asyncio +async def test_pop_account_removes_from_file(tmp_path, monkeypatch): + emails_file = tmp_path / "emails.txt" + emails_file.write_text("a@b.com:pass1\nc@d.com:pass2\ne@f.com:pass3\n") + monkeypatch.setattr(em, "EMAILS_FILE", emails_file) + monkeypatch.setattr(em, "DATA_DIR", tmp_path) + monkeypatch.setattr(em, "DONE_FILE", tmp_path / "done.txt") + monkeypatch.setattr(em, "FAILED_FILE", tmp_path / "failed.txt") - acc = pop_account() + acc = await pop_account() assert acc is not None assert acc.email == "a@b.com" - # Comment line stays in file - remaining = f.read_text().strip() - assert remaining == "# first is comment" + + remaining = emails_file.read_text() + assert "a@b.com" not in remaining + assert "c@d.com" in remaining + + +@pytest.mark.asyncio +async def test_pop_account_empty(tmp_path, monkeypatch): + f = tmp_path / "emails.txt" + f.write_text("") + monkeypatch.setattr(em, "EMAILS_FILE", f) + monkeypatch.setattr(em, "DATA_DIR", tmp_path) + monkeypatch.setattr(em, "DONE_FILE", tmp_path / "done.txt") + monkeypatch.setattr(em, "FAILED_FILE", tmp_path / "failed.txt") + + assert await pop_account() is None + + +@pytest.mark.asyncio +async def test_pop_account_missing_file(tmp_path, monkeypatch): + monkeypatch.setattr(em, "EMAILS_FILE", tmp_path / "nope.txt") + assert await pop_account() is None + + +@pytest.mark.asyncio +async def test_mark_done(tmp_path, monkeypatch): + done_file = tmp_path / "done.txt" + monkeypatch.setattr(em, "DATA_DIR", tmp_path) + monkeypatch.setattr(em, "DONE_FILE", done_file) + + await mark_done("test@example.com") + + content = done_file.read_text() + assert "test@example.com" in content + + +@pytest.mark.asyncio +async def test_mark_failed(tmp_path, monkeypatch): + failed_file = tmp_path / "failed.txt" + monkeypatch.setattr(em, "DATA_DIR", tmp_path) + monkeypatch.setattr(em, "FAILED_FILE", failed_file) + + await mark_failed("test@example.com") + + content = failed_file.read_text() + assert "test@example.com" in content + + +@pytest.mark.asyncio +async def test_pop_all_accounts(tmp_path, monkeypatch): + f = tmp_path / "emails.txt" + f.write_text("a@b.com:pass1\n") + monkeypatch.setattr(em, "EMAILS_FILE", f) + monkeypatch.setattr(em, "DATA_DIR", tmp_path) + monkeypatch.setattr(em, "DONE_FILE", tmp_path / "done.txt") + monkeypatch.setattr(em, "FAILED_FILE", tmp_path / "failed.txt") + + acc1 = await pop_account() + assert acc1.email == "a@b.com" + + acc2 = await pop_account() + assert acc2 is None + + assert await has_accounts() is False diff --git a/tests/test_pool.py b/tests/test_pool.py new file mode 100644 index 0000000..bdeed6e --- /dev/null +++ b/tests/test_pool.py @@ -0,0 +1,126 @@ +import pytest + +from pool import ( + append_token, + pop_token, + pool_size, + get_first_token, + trigger_refill, + wait_for_token, +) +import pool as p + + +@pytest.fixture +def fresh_pool(tmp_path, monkeypatch): + tokens_file = tmp_path / "tokens.txt" + tokens_file.write_text("") + monkeypatch.setattr(p, "TOKENS_FILE", tokens_file) + monkeypatch.setattr(p, "DATA_DIR", tmp_path) + return tokens_file + + +@pytest.mark.asyncio +async def test_empty_pool(fresh_pool): + assert await get_first_token() is None + assert await pop_token() is None + assert await pool_size() == 0 + + +@pytest.mark.asyncio +async def test_append_token(fresh_pool): + await append_token("token1") + assert await pool_size() == 1 + assert await get_first_token() == "token1" + + +@pytest.mark.asyncio +async def test_append_multiple_tokens(fresh_pool): + await append_token("token1") + await append_token("token2") + await append_token("token3") + + assert await pool_size() == 3 + assert await get_first_token() == "token1" + + +@pytest.mark.asyncio +async def test_pop_token(fresh_pool): + await append_token("token1") + await append_token("token2") + + t = await pop_token() + assert t == "token1" + assert await pool_size() == 1 + assert await get_first_token() == "token2" + + +@pytest.mark.asyncio +async def test_pop_until_empty(fresh_pool): + await append_token("only_one") + + t = await pop_token() + assert t == "only_one" + + assert await pop_token() is None + assert await pool_size() == 0 + + +@pytest.mark.asyncio +async def test_get_first_token_peek(fresh_pool): + await append_token("token1") + + t = await get_first_token() + assert t == "token1" + + t2 = await get_first_token() + assert t2 == "token1" + + assert await pool_size() == 1 + + +@pytest.mark.asyncio +async def test_token_with_whitespace(fresh_pool): + await append_token(" token_with_spaces ") + t = await get_first_token() + assert t == "token_with_spaces" + + +@pytest.mark.asyncio +async def test_persist_to_file(fresh_pool): + await append_token("token1") + await append_token("token2") + + content = fresh_pool.read_text() + assert "token1" in content + assert "token2" in content + + +@pytest.mark.asyncio +async def test_wait_for_token_with_existing(fresh_pool): + await append_token("existing_token") + + token = await wait_for_token() + assert token == "existing_token" + + +@pytest.mark.asyncio +async def test_wait_for_token_empty_no_accounts(fresh_pool, tmp_path, monkeypatch): + import emails + + emails_file = tmp_path / "emails.txt" + emails_file.write_text("") + monkeypatch.setattr(emails, "EMAILS_FILE", emails_file) + + token = await wait_for_token() + assert token is None + + +@pytest.mark.asyncio +async def test_trigger_refill_does_not_block(fresh_pool): + import asyncio + + await trigger_refill() + await asyncio.sleep(0.01) + + assert True diff --git a/tests/test_server.py b/tests/test_server.py new file mode 100644 index 0000000..fbda93d --- /dev/null +++ b/tests/test_server.py @@ -0,0 +1,141 @@ +import pytest +from unittest.mock import AsyncMock, patch +from aiohttp import web +from aiohttp.test_utils import AioHTTPTestCase + +from server import create_app + + +class TestServer(AioHTTPTestCase): + async def get_application(self): + return create_app() + + async def test_health(self): + resp = await self.client.get("/health") + assert resp.status == 200 + text = await resp.text() + assert text == "ok" + + @patch("server.get_first_token", new_callable=AsyncMock) + @patch("server.wait_for_token", new_callable=AsyncMock) + async def test_token_empty_pool_no_accounts(self, mock_wait, mock_first): + mock_first.return_value = None + mock_wait.return_value = None + + resp = await self.client.get("/token") + assert resp.status == 200 + data = await resp.json() + assert data["token"] == "public" + + @patch("server.get_first_token", new_callable=AsyncMock) + @patch("server.wait_for_token", new_callable=AsyncMock) + async def test_token_empty_pool_waits_then_gets(self, mock_wait, mock_first): + mock_first.side_effect = [None, "new_token"] + mock_wait.return_value = "new_token" + + resp = await self.client.get("/token") + assert resp.status == 200 + data = await resp.json() + assert data["token"] == "new_token" + + @patch("server.get_first_token", new_callable=AsyncMock) + @patch("server.pop_token", new_callable=AsyncMock) + @patch("server.trigger_refill", new_callable=AsyncMock) + @patch("server.pool_size", new_callable=AsyncMock) + @patch("server.get_balance", new_callable=AsyncMock) + async def test_token_valid( + self, mock_balance, mock_size, mock_refill, mock_pop, mock_first + ): + mock_first.return_value = "test_token_12345" + mock_balance.return_value = {"balance": 10.0} + mock_size.return_value = 3 + + resp = await self.client.get("/token") + assert resp.status == 200 + data = await resp.json() + assert data["token"] == "test_token_12345" + mock_pop.assert_not_called() + + @patch("server.get_first_token", new_callable=AsyncMock) + @patch("server.pop_token", new_callable=AsyncMock) + @patch("server.trigger_refill", new_callable=AsyncMock) + @patch("server.pool_size", new_callable=AsyncMock) + @patch("server.get_balance", new_callable=AsyncMock) + async def test_token_zero_balance_removed( + self, mock_balance, mock_size, mock_refill, mock_pop, mock_first + ): + mock_first.side_effect = ["bad_token", "good_token"] + mock_balance.side_effect = [{"balance": 0}, {"balance": 15.0}] + mock_size.return_value = 2 + + resp = await self.client.get("/token") + assert resp.status == 200 + data = await resp.json() + assert data["token"] == "good_token" + assert mock_pop.call_count == 1 + + @patch("server.get_first_token", new_callable=AsyncMock) + @patch("server.pop_token", new_callable=AsyncMock) + @patch("server.trigger_refill", new_callable=AsyncMock) + @patch("server.pool_size", new_callable=AsyncMock) + @patch("server.get_balance", new_callable=AsyncMock) + async def test_token_balance_fetch_fails( + self, mock_balance, mock_size, mock_refill, mock_pop, mock_first + ): + mock_first.side_effect = ["bad_token", "good_token"] + mock_balance.side_effect = [None, {"balance": 10.0}] + mock_size.return_value = 2 + + resp = await self.client.get("/token") + assert resp.status == 200 + data = await resp.json() + assert data["token"] == "good_token" + mock_pop.assert_called() + + @patch("server.get_first_token", new_callable=AsyncMock) + @patch("server.pop_token", new_callable=AsyncMock) + @patch("server.trigger_refill", new_callable=AsyncMock) + @patch("server.pool_size", new_callable=AsyncMock) + @patch("server.get_balance", new_callable=AsyncMock) + async def test_token_balance_with_remaining_key( + self, mock_balance, mock_size, mock_refill, mock_pop, mock_first + ): + mock_first.return_value = "test_token" + mock_balance.return_value = {"remaining": 20.0} + mock_size.return_value = 1 + + resp = await self.client.get("/token") + assert resp.status == 200 + data = await resp.json() + assert data["token"] == "test_token" + + @patch("server.has_accounts", new_callable=AsyncMock) + @patch("server.pool_size", new_callable=AsyncMock) + async def test_status(self, mock_size, mock_has): + mock_size.return_value = 5 + mock_has.return_value = True + + resp = await self.client.get("/status") + assert resp.status == 200 + data = await resp.json() + assert data["pool_size"] == 5 + assert data["has_accounts"] is True + + @patch("server.get_first_token", new_callable=AsyncMock) + @patch("server.pop_token", new_callable=AsyncMock) + @patch("server.trigger_refill", new_callable=AsyncMock) + @patch("server.wait_for_token", new_callable=AsyncMock) + @patch("server.pool_size", new_callable=AsyncMock) + @patch("server.get_balance", new_callable=AsyncMock) + async def test_all_tokens_exhausted_then_public( + self, mock_balance, mock_size, mock_wait, mock_refill, mock_pop, mock_first + ): + mock_first.side_effect = ["token1", "token2", None] + mock_balance.side_effect = [{"balance": 0}, {"balance": -5}] + mock_wait.return_value = None + mock_size.return_value = 0 + + resp = await self.client.get("/token") + assert resp.status == 200 + data = await resp.json() + assert data["token"] == "public" diff --git a/tests/test_tokens.py b/tests/test_tokens.py index c2b34c3..16fd071 100644 --- a/tests/test_tokens.py +++ b/tests/test_tokens.py @@ -1,72 +1,23 @@ -import json - -from tokens import ( - ProviderTokens, - load_state, - save_state, - save_tokens, - promote_next_tokens, - clear_next_tokens, -) -import tokens as t +from tokens import ProviderTokens -def test_save_and_load_state(tmp_path, monkeypatch): - monkeypatch.setattr(t, "TOKENS_FILE", tmp_path / "tokens.json") - - active = ProviderTokens("key1", None, 0) - nxt = ProviderTokens("key2", None, 0) - save_state(active, nxt) - - a, n = load_state() - assert a is not None and a.access_token == "key1" - assert n is not None and n.access_token == "key2" +def test_provider_tokens_basic(): + t = ProviderTokens(access_token="abc123", refresh_token=None, expires_at=0) + assert t.access_token == "abc123" + assert t.refresh_token is None + assert t.expires_at == 0 -def test_promote_next_tokens(tmp_path, monkeypatch): - monkeypatch.setattr(t, "TOKENS_FILE", tmp_path / "tokens.json") - - save_state(ProviderTokens("key1", None, 0), ProviderTokens("key2", None, 0)) - assert promote_next_tokens() is True - - a, n = load_state() - assert a is not None and a.access_token == "key2" - assert n is None +def test_provider_tokens_with_metadata(): + t = ProviderTokens( + access_token="key", + refresh_token="refresh", + expires_at=12345.0, + metadata={"foo": "bar"}, + ) + assert t.metadata == {"foo": "bar"} -def test_clear_next_tokens(tmp_path, monkeypatch): - monkeypatch.setattr(t, "TOKENS_FILE", tmp_path / "tokens.json") - - save_state(ProviderTokens("key1", None, 0), ProviderTokens("key2", None, 0)) - clear_next_tokens() - - a, n = load_state() - assert a is not None and a.access_token == "key1" - assert n is None - - -def test_save_tokens_preserves_next(tmp_path, monkeypatch): - monkeypatch.setattr(t, "TOKENS_FILE", tmp_path / "tokens.json") - - save_state(ProviderTokens("key1", None, 0), ProviderTokens("key2", None, 0)) - save_tokens(ProviderTokens("key3", None, 0)) - - a, n = load_state() - assert a is not None and a.access_token == "key3" - assert n is not None and n.access_token == "key2" - - -def test_load_missing_file(tmp_path, monkeypatch): - monkeypatch.setattr(t, "TOKENS_FILE", tmp_path / "missing.json") - a, n = load_state() - assert a is None and n is None - - -def test_atomic_write(tmp_path, monkeypatch): - f = tmp_path / "tokens.json" - monkeypatch.setattr(t, "TOKENS_FILE", f) - - save_state(ProviderTokens("x", None, 0), None) - with open(f) as fp: - data = json.load(fp) - assert data["active"]["access_token"] == "x" +def test_provider_tokens_default_metadata(): + t = ProviderTokens(access_token="x", refresh_token=None, expires_at=0) + assert t.metadata is None diff --git a/uv.lock b/uv.lock index dc50029..56d5fd5 100644 --- a/uv.lock +++ b/uv.lock @@ -203,6 +203,8 @@ dev = [ [package.dev-dependencies] dev = [ { name = "pytest" }, + { name = "pytest-aiohttp" }, + { name = "pytest-asyncio" }, ] [package.metadata] @@ -214,7 +216,11 @@ requires-dist = [ provides-extras = ["dev"] [package.metadata.requires-dev] -dev = [{ name = "pytest", specifier = ">=9.0.2" }] +dev = [ + { name = "pytest", specifier = ">=9.0.2" }, + { name = "pytest-aiohttp", specifier = ">=1.1.0" }, + { name = "pytest-asyncio", specifier = ">=0.25.0" }, +] [[package]] name = "multidict" @@ -373,6 +379,32 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/3b/ab/b3226f0bd7cdcf710fbede2b3548584366da3b19b5021e74f5bde2a8fa3f/pytest-9.0.2-py3-none-any.whl", hash = "sha256:711ffd45bf766d5264d487b917733b453d917afd2b0ad65223959f59089f875b", size = 374801, upload-time = "2025-12-06T21:30:49.154Z" }, ] +[[package]] +name = "pytest-aiohttp" +version = "1.1.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "aiohttp" }, + { name = "pytest" }, + { name = "pytest-asyncio" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/72/4b/d326890c153f2c4ce1bf45d07683c08c10a1766058a22934620bc6ac6592/pytest_aiohttp-1.1.0.tar.gz", hash = "sha256:147de8cb164f3fc9d7196967f109ab3c0b93ea3463ab50631e56438eab7b5adc", size = 12842, upload-time = "2025-01-23T12:44:04.465Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ba/0f/e6af71c02e0f1098eaf7d2dbf3ffdf0a69fc1e0ef174f96af05cef161f1b/pytest_aiohttp-1.1.0-py3-none-any.whl", hash = "sha256:f39a11693a0dce08dd6c542d241e199dd8047a6e6596b2bcfa60d373f143456d", size = 8932, upload-time = "2025-01-23T12:44:03.27Z" }, +] + +[[package]] +name = "pytest-asyncio" +version = "1.3.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pytest" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/90/2c/8af215c0f776415f3590cac4f9086ccefd6fd463befeae41cd4d3f193e5a/pytest_asyncio-1.3.0.tar.gz", hash = "sha256:d7f52f36d231b80ee124cd216ffb19369aa168fc10095013c6b014a34d3ee9e5", size = 50087, upload-time = "2025-11-10T16:07:47.256Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e5/35/f8b19922b6a25bc0880171a2f1a003eaeb93657475193ab516fd87cac9da/pytest_asyncio-1.3.0-py3-none-any.whl", hash = "sha256:611e26147c7f77640e6d0a92a38ed17c3e9848063698d5c93d5aa7aa11cebff5", size = 15075, upload-time = "2025-11-10T16:07:45.537Z" }, +] + [[package]] name = "selenium" version = "4.41.0"