1
0
Fork 0
gibidy/src/providers/chatgpt/registration.py

533 lines
17 KiB
Python

import asyncio
import base64
import hashlib
import logging
import random
import re
import secrets
import time
from datetime import datetime
from pathlib import Path
import os
from typing import Callable
from urllib.parse import parse_qs, urlencode, urlparse
import aiohttp
from playwright.async_api import (
async_playwright,
Error as PlaywrightError,
Page,
BrowserContext,
)
from browser import launch as launch_browser
from email_providers import BaseProvider
from providers.base import ProviderTokens
from utils.randoms import generate_password
from .tokens import CLIENT_ID
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Base directory for runtime artifacts: read from the DATA_DIR environment
# variable, falling back to "./data" when it is unset.
DATA_DIR = Path(os.environ.get("DATA_DIR", "./data"))
# OAuth endpoints: the authorize URL is opened in the browser, the token URL
# receives the authorization-code exchange request.
AUTHORIZE_URL = "https://auth.openai.com/oauth/authorize"
TOKEN_URL = "https://auth.openai.com/oauth/token"
# Redirect target sent with both the authorize and the token request.
REDIRECT_URI = "http://localhost:1455/auth/callback"
# Space-separated OAuth scopes requested in the authorize URL.
SCOPE = "openid profile email offline_access"
class AutomationError(Exception):
    """Failure raised at a named step of the browser automation.

    The exception text is rendered as ``[step] message``.
    """

    def __init__(self, step: str, message: str, page: Page | None = None):
        rendered = f"[{step}] {message}"
        super().__init__(rendered)
        # Label of the step that failed.
        self.step = step
        # Description of the failure (without the step prefix).
        self.message = message
        # Page associated with the failure, or None when there is none.
        self.page = page
async def save_error_screenshot(page: Page | None, step: str):
    """Best-effort capture of a screenshot of ``page`` for a failed step.

    Does nothing when ``page`` is falsy. Otherwise writes
    ``error_<step>_<YYYYmmdd_HHMMSS>.png`` into ``DATA_DIR / "screenshots"``,
    creating that directory when it is missing.

    This helper is called from exception handlers, so it never raises for
    filesystem or Playwright failures: both are logged as warnings instead.

    Args:
        page: Page to capture, or None to skip the capture.
        step: Label of the failed step; becomes part of the file name.
    """
    if not page:
        return
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    screenshots_dir = DATA_DIR / "screenshots"
    filename = screenshots_dir / f"error_{step}_{timestamp}.png"
    try:
        # Directory creation is inside the guarded region so that an
        # unwritable data directory cannot mask the error being reported.
        screenshots_dir.mkdir(parents=True, exist_ok=True)
        await page.screenshot(path=str(filename))
    except (PlaywrightError, OSError) as e:
        logger.warning("Failed to save screenshot at step %s: %s", step, e)
    else:
        logger.error("Screenshot saved: %s", filename)
def generate_name() -> str:
    """Return a random ``"First Last"`` name.

    One entry is drawn from a fixed pool of 50 first names and then one from
    a fixed pool of 50 last names, using the ``random`` module.
    """
    given_pool = (
        "James", "John", "Robert", "Michael", "William",
        "David", "Richard", "Joseph", "Thomas", "Charles",
        "Christopher", "Daniel", "Matthew", "Anthony", "Mark",
        "Donald", "Steven", "Paul", "Andrew", "Joshua",
        "Kenneth", "Kevin", "Brian", "George", "Edward",
        "Ronald", "Timothy", "Jason", "Jeffrey", "Ryan",
        "Jacob", "Gary", "Nicholas", "Eric", "Jonathan",
        "Stephen", "Larry", "Justin", "Scott", "Brandon",
        "Benjamin", "Samuel", "Frank", "Gregory", "Raymond",
        "Alexander", "Patrick", "Jack", "Dennis", "Jerry",
    )
    family_pool = (
        "Smith", "Johnson", "Williams", "Brown", "Jones",
        "Garcia", "Miller", "Davis", "Rodriguez", "Martinez",
        "Hernandez", "Lopez", "Gonzalez", "Wilson", "Anderson",
        "Thomas", "Taylor", "Moore", "Jackson", "Martin",
        "Lee", "Perez", "Thompson", "White", "Harris",
        "Sanchez", "Clark", "Ramirez", "Lewis", "Robinson",
        "Walker", "Young", "Allen", "King", "Wright",
        "Scott", "Torres", "Nguyen", "Hill", "Flores",
        "Green", "Adams", "Nelson", "Baker", "Hall",
        "Rivera", "Campbell", "Mitchell", "Carter", "Roberts",
    )
    # The first name is drawn before the last name.
    given = random.choice(given_pool)
    family = random.choice(family_pool)
    return " ".join((given, family))
def generate_birthdate_90s() -> tuple[str, str, str]:
    """Return a random birth date in 1990-1999 as ``(month, day, year)`` strings.

    Month and day are zero-padded to two digits; the year has four digits.
    The day is limited to 1-28, a range that exists in every month.
    """
    # Values are drawn in the order year, month, day.
    birth_year = random.randint(1990, 1999)
    birth_month = random.randint(1, 12)
    birth_day = random.randint(1, 28)
    return str(birth_month).zfill(2), str(birth_day).zfill(2), f"{birth_year}"
def extract_verification_code(message: str) -> str | None:
normalized = re.sub(r"\s+", " ", message)
preferred = re.search(
r"Your\s+ChatGPT\s+code\s+is\s*(\d{6})",
normalized,
re.IGNORECASE,
)
if preferred:
return preferred.group(1)
openai_otp = re.search(r"OpenAI\s+otp.*?(\d{6})", normalized, re.IGNORECASE)
if openai_otp:
return openai_otp.group(1)
all_codes = re.findall(r"\b(\d{6})\b", normalized)
if all_codes:
return all_codes[-1]
return None
def generate_pkce_pair() -> tuple[str, str]:
    """Create a PKCE ``(verifier, challenge)`` pair.

    The verifier is a random URL-safe token built from 64 random bytes. The
    challenge is the URL-safe base64 encoding of the verifier's SHA-256
    digest with the trailing ``=`` padding removed.
    """
    code_verifier = secrets.token_urlsafe(64)
    sha = hashlib.sha256()
    sha.update(code_verifier.encode("utf-8"))
    encoded = base64.urlsafe_b64encode(sha.digest())
    code_challenge = encoded.decode("utf-8").rstrip("=")
    return code_verifier, code_challenge
def generate_state() -> str:
    """Return a fresh random URL-safe token built from 32 random bytes."""
    state_token = secrets.token_urlsafe(32)
    return state_token
def build_authorize_url(challenge: str, state: str) -> str:
    """Build the OAuth authorize URL for an authorization-code + PKCE request.

    Args:
        challenge: PKCE code challenge, sent with method ``S256``.
        state: Opaque value sent as the ``state`` query parameter.

    Returns:
        ``AUTHORIZE_URL`` followed by the URL-encoded query string.
    """
    # Pairs are listed in the order they appear in the query string.
    query_pairs = (
        ("response_type", "code"),
        ("client_id", CLIENT_ID),
        ("redirect_uri", REDIRECT_URI),
        ("scope", SCOPE),
        ("code_challenge", challenge),
        ("code_challenge_method", "S256"),
        ("id_token_add_organizations", "true"),
        ("codex_cli_simplified_flow", "true"),
        ("state", state),
        ("originator", "opencode"),
    )
    return AUTHORIZE_URL + "?" + urlencode(query_pairs)
async def exchange_code_for_tokens(code: str, verifier: str) -> ProviderTokens:
    """Exchange an OAuth authorization code for tokens.

    Posts a form-encoded ``authorization_code`` grant to ``TOKEN_URL`` with a
    20-second total timeout and builds a ``ProviderTokens`` from the JSON
    response.

    Args:
        code: Authorization code taken from the OAuth redirect.
        verifier: PKCE code verifier matching the challenge that was sent
            with the authorize request.

    Returns:
        Tokens whose ``expires_at`` is the current epoch time plus the
        ``expires_in`` value from the response.

    Raises:
        RuntimeError: If the request fails or times out, the response status
            is not OK, the body is not valid JSON, or a required field is
            missing or malformed.
    """
    payload = {
        "grant_type": "authorization_code",
        "client_id": CLIENT_ID,
        "code": code,
        "code_verifier": verifier,
        "redirect_uri": REDIRECT_URI,
    }
    timeout = aiohttp.ClientTimeout(total=20)
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.post(TOKEN_URL, data=payload) as resp:
                if not resp.ok:
                    text = await resp.text()
                    raise RuntimeError(f"Token exchange failed: {resp.status} {text}")
                body = await resp.json()
    except (aiohttp.ClientError, asyncio.TimeoutError, TimeoutError) as e:
        # asyncio.TimeoutError only became an alias of the builtin
        # TimeoutError in Python 3.11; naming both keeps timeouts wrapped on
        # older interpreters as well.
        raise RuntimeError(f"Token exchange request error: {e}") from e
    except ValueError as e:
        # The body could not be decoded as JSON.
        raise RuntimeError(f"Token exchange response parse error: {e}") from e
    try:
        expires_in = int(body["expires_in"])
        return ProviderTokens(
            access_token=body["access_token"],
            refresh_token=body["refresh_token"],
            expires_at=time.time() + expires_in,
        )
    except (KeyError, TypeError, ValueError) as e:
        raise RuntimeError(f"Token exchange response parse error: {e}") from e
async def get_latest_code(email_provider: BaseProvider) -> str | None:
    """Fetch the provider's latest message and extract a verification code.

    Returns:
        The extracted code, or None when the provider returns no message or
        the message contains no code.
    """
    latest = await email_provider.get_latest_message()
    return extract_verification_code(latest) if latest else None
async def fill_date_field(page: Page, month: str, day: str, year: str):
    """Type a birth date into the page's segmented date input.

    Clicks the first ``[data-type="month"]`` element and then types month,
    day and year back to back through the keyboard.
    NOTE(review): this presumably relies on the widget moving focus from one
    segment to the next on its own - confirm against the live form.

    Raises:
        AutomationError: If no month field is present on the page.
    """
    month_segment = page.locator('[data-type="month"]').first
    segment_count = await month_segment.count()
    if segment_count == 0:
        raise AutomationError("profile", "Missing birthday month field", page)
    await month_segment.scroll_into_view_if_needed()
    await month_segment.click()
    # Short pauses before and after typing.
    await page.wait_for_timeout(80)
    digits = "{}{}{}".format(month, day, year)
    await page.keyboard.type(digits)
    await page.wait_for_timeout(120)
async def click_continue(page: Page, timeout_ms: int = 10000):
    """Wait for the first button named exactly "Continue" and click it.

    Args:
        page: Page on which to look for the button.
        timeout_ms: Maximum time to wait for the button to become visible.
    """
    continue_button = page.get_by_role("button", name="Continue", exact=True).first
    await continue_button.wait_for(timeout=timeout_ms, state="visible")
    await continue_button.click()
async def oauth_needs_email_check(page: Page) -> bool:
    """Return True when the page contains text matching "Check your inbox"."""
    matches = await page.get_by_text("Check your inbox", exact=False).count()
    return matches > 0
async def fill_oauth_code_if_present(page: Page, code: str) -> bool:
    """Fill ``code`` into the first usable code input found on the page.

    Lookup strategies are tried in order: placeholder "Code", label "Code",
    then a CSS selector covering common code-input attributes. A strategy is
    skipped when it matches nothing, or when waiting for or filling its first
    match raises a Playwright error.

    Returns:
        True once a field has been filled, False when every strategy failed.
    """
    generic_selector = (
        'input[name*="code" i], input[id*="code" i], '
        'input[autocomplete="one-time-code"], input[inputmode="numeric"]'
    )
    strategies = (
        page.get_by_placeholder("Code"),
        page.get_by_label("Code"),
        page.locator(generic_selector),
    )
    for candidate in strategies:
        if await candidate.count() > 0:
            try:
                await candidate.first.wait_for(state="visible", timeout=1500)
                await candidate.first.fill(code)
            except PlaywrightError:
                # Not visible or not fillable: move on to the next strategy.
                continue
            return True
    return False
async def click_first_visible_button(
    page: Page,
    labels: list[str],
    timeout_ms: int = 2000,
) -> bool:
    """Click the first button whose accessible name matches one of ``labels``.

    Labels are tried in the given order. A label is skipped when no button
    matches it, or when waiting for or clicking the first match raises a
    Playwright error.

    Args:
        page: Page on which to look for buttons.
        labels: Button names to try, in priority order.
        timeout_ms: Timeout applied to both the visibility wait and the click.

    Returns:
        True when a button was clicked, False otherwise.
    """
    for caption in labels:
        matching = page.get_by_role("button", name=caption)
        if await matching.count() > 0:
            try:
                await matching.first.wait_for(state="visible", timeout=timeout_ms)
                await matching.first.click(timeout=timeout_ms)
            except PlaywrightError:
                # This label did not work out: try the next one.
                continue
            return True
    return False
async def wait_for_signup_stabilization(
    page: Page,
    source_url: str,
    timeout_seconds: int = 30,
):
    """Wait until the page URL differs from ``source_url``.

    The URL is polled every 0.5 seconds. A change is logged at info level and
    ends the wait; when the timeout elapses without a change only a warning
    is logged - nothing is raised.

    Args:
        page: Page whose URL is polled.
        source_url: URL the page is expected to navigate away from.
        timeout_seconds: Maximum time to keep polling.
    """
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout_seconds
    while loop.time() < deadline:
        landed_url = page.url
        if landed_url == source_url:
            await asyncio.sleep(0.5)
            continue
        logger.info("Signup redirect detected: %s -> %s", source_url, landed_url)
        return
    logger.warning("Signup redirect was not detected within %ss", timeout_seconds)
def _is_oauth_callback(url: str) -> bool:
    """Return True when ``url`` targets the local callback and carries a code."""
    return "localhost:1455" in url and "code=" in url


async def register_chatgpt_account(
    email_provider_factory: Callable[[BrowserContext], BaseProvider] | None = None,
) -> ProviderTokens | None:
    """Run the browser-driven sign-up and OAuth flow and return the tokens.

    Steps, as logged:

    1. obtain an email address from the provider built by
       ``email_provider_factory``;
    2. submit email and password on the sign-up form;
    3. read the verification code from the email provider and submit it;
    4. fill in name and birth date;
    5. open the OAuth authorize URL in a second page, sign in, capture the
       redirect to the local callback and exchange its code for tokens.

    Failures are handled while the Playwright session is still open, so the
    error screenshot can actually be taken, and the managed browser is closed
    before Playwright shuts down.

    Args:
        email_provider_factory: Callable that builds the email provider from
            the browser context. When None, the function logs an error and
            returns None.

    Returns:
        The tokens on success, or None on any failure (details are logged).
    """
    logger.info("=== Starting ChatGPT account registration ===")
    if email_provider_factory is None:
        logger.error("No email provider factory configured")
        return None
    birth_month, birth_day, birth_year = generate_birthdate_90s()
    current_page: Page | None = None
    redirect_url_captured: str | None = None
    try:
        async with async_playwright() as p:
            managed = None
            try:
                managed = await launch_browser(p)
                browser = managed.browser
                context = (
                    browser.contexts[0]
                    if browser.contexts
                    else await browser.new_context()
                )
                email_provider = email_provider_factory(context)
                logger.info("[1/5] Getting new email from configured provider...")
                email = await email_provider.get_new_email()
                if not email:
                    raise AutomationError(
                        "email_provider", "Email provider returned empty email"
                    )
                password = generate_password()
                full_name = generate_name()
                verifier, challenge = generate_pkce_pair()
                oauth_state = generate_state()
                authorize_url = build_authorize_url(challenge, oauth_state)

                logger.info("[2/5] Registering ChatGPT for %s", email)
                chatgpt_page = await context.new_page()
                current_page = chatgpt_page
                await chatgpt_page.goto("https://chatgpt.com")
                await chatgpt_page.wait_for_load_state("domcontentloaded")
                await chatgpt_page.get_by_text("Sign up for free", exact=True).click()
                await chatgpt_page.locator('input[type="email"]').first.wait_for(
                    state="visible", timeout=15000
                )
                await chatgpt_page.locator('input[type="email"]').fill(email)
                await click_continue(chatgpt_page)
                await chatgpt_page.locator('input[type="password"]').first.wait_for(
                    state="visible", timeout=15000
                )
                await chatgpt_page.locator('input[type="password"]').fill(password)
                await click_continue(chatgpt_page)
                await chatgpt_page.get_by_placeholder("Code").first.wait_for(
                    state="visible", timeout=30000
                )

                logger.info("[3/5] Getting verification message from email provider...")
                code = await get_latest_code(email_provider)
                if not code:
                    raise AutomationError(
                        "email_provider",
                        "Email provider returned no verification message",
                    )
                logger.info("[3/5] Verification code extracted")
                await chatgpt_page.bring_to_front()
                code_input = chatgpt_page.get_by_placeholder("Code")
                await code_input.first.wait_for(state="visible", timeout=10000)
                await code_input.first.fill(code)
                await click_continue(chatgpt_page)

                logger.info("[4/5] Setting profile...")
                name_input = chatgpt_page.get_by_placeholder("Full name")
                await name_input.first.wait_for(state="visible", timeout=20000)
                await name_input.first.fill(full_name)
                await fill_date_field(chatgpt_page, birth_month, birth_day, birth_year)
                profile_url = chatgpt_page.url
                await click_continue(chatgpt_page)
                logger.info("Account registered!")
                await wait_for_signup_stabilization(
                    chatgpt_page, source_url=profile_url
                )

                logger.info("[5/5] Running OAuth flow to get tokens...")
                oauth_page = await context.new_page()
                current_page = oauth_page

                def handle_request(request):
                    # Record the first-seen callback request; the polling loop
                    # below picks the captured URL up.
                    nonlocal redirect_url_captured
                    url = request.url
                    if _is_oauth_callback(url):
                        redirect_url_captured = url
                        logger.info("Captured OAuth redirect URL")

                oauth_page.on("request", handle_request)
                await oauth_page.goto(authorize_url, wait_until="domcontentloaded")
                await oauth_page.locator(
                    'input[type="email"], input[name="email"]'
                ).first.wait_for(state="visible", timeout=20000)
                email_input = oauth_page.locator(
                    'input[type="email"], input[name="email"]'
                )
                if await email_input.count() > 0:
                    await email_input.first.fill(email)
                continue_button = oauth_page.get_by_role("button", name="Continue")
                if await continue_button.count() > 0:
                    await continue_button.first.click()
                await oauth_page.locator('input[type="password"]').first.wait_for(
                    state="visible", timeout=20000
                )
                password_input = oauth_page.locator('input[type="password"]')
                if await password_input.count() > 0:
                    await password_input.first.fill(password)
                continue_button = oauth_page.get_by_role("button", name="Continue")
                if await continue_button.count() > 0:
                    await continue_button.first.click()

                # Only a code different from the last one used is submitted.
                last_oauth_email_code = code
                loop = asyncio.get_running_loop()
                oauth_deadline = loop.time() + 60
                while loop.time() < oauth_deadline:
                    if redirect_url_captured:
                        break
                    if await oauth_needs_email_check(oauth_page):
                        logger.info("OAuth requested email confirmation code")
                        new_code = await get_latest_code(email_provider)
                        if new_code and new_code != last_oauth_email_code:
                            filled = await fill_oauth_code_if_present(
                                oauth_page, new_code
                            )
                            if filled:
                                last_oauth_email_code = new_code
                                logger.info("Filled OAuth email confirmation code")
                            else:
                                logger.warning(
                                    "OAuth inbox challenge detected but code field not found"
                                )
                    try:
                        current_url = oauth_page.url
                    except PlaywrightError as e:
                        # Best effort: the request listener may still capture
                        # the redirect, so keep polling.
                        logger.debug("Could not read OAuth page URL: %s", e)
                    else:
                        if _is_oauth_callback(current_url):
                            redirect_url_captured = current_url
                            logger.info("Captured OAuth redirect from page URL")
                            break
                    clicked = await click_first_visible_button(
                        oauth_page,
                        ["Continue", "Allow", "Authorize", "Verify"],
                        timeout_ms=2000,
                    )
                    if clicked:
                        await oauth_page.wait_for_timeout(500)
                    else:
                        await oauth_page.wait_for_timeout(1000)

                if not redirect_url_captured:
                    raise AutomationError(
                        "oauth", "OAuth redirect with code was not captured", oauth_page
                    )
                parsed = urlparse(redirect_url_captured)
                params = parse_qs(parsed.query)
                auth_code = params.get("code", [None])[0]
                returned_state = params.get("state", [None])[0]
                if not auth_code:
                    raise AutomationError(
                        "oauth", "OAuth code missing in redirect", oauth_page
                    )
                if returned_state != oauth_state:
                    raise AutomationError("oauth", "OAuth state mismatch", oauth_page)
                tokens = await exchange_code_for_tokens(auth_code, verifier)
                logger.info("OAuth tokens fetched successfully")
                return tokens
            except AutomationError as e:
                logger.error("Error at step [%s]: %s", e.step, e.message)
                await save_error_screenshot(e.page, e.step)
                return None
            except Exception as e:
                # Top-level boundary for the flow: keep the traceback in the log.
                logger.exception("Unexpected error: %s", e)
                await save_error_screenshot(current_page, "unexpected")
                return None
            finally:
                # Closed inside the Playwright session, i.e. before the
                # session itself is torn down.
                if managed:
                    await managed.close()
    except Exception as e:
        # Failures while starting or stopping Playwright, or while closing
        # the managed browser; no page is available for a screenshot here.
        logger.exception("Unexpected error: %s", e)
        return None