1
0
Fork 0
gibidy/src/providers/chatgpt/registration.py

465 lines
15 KiB
Python

import asyncio
import base64
import hashlib
import logging
import random
import re
import secrets
import string
import time
from datetime import datetime
from pathlib import Path
import os
from typing import Callable
from urllib.parse import parse_qs, urlencode, urlparse
import aiohttp
from playwright.async_api import async_playwright, Page, BrowserContext
from browser import launch as launch_browser
from email_providers import BaseProvider
from providers.base import ProviderTokens
from .tokens import CLIENT_ID, save_tokens
logger = logging.getLogger(__name__)
DATA_DIR = Path(os.environ.get("DATA_DIR", "./data"))
AUTHORIZE_URL = "https://auth.openai.com/oauth/authorize"
TOKEN_URL = "https://auth.openai.com/oauth/token"
REDIRECT_URI = "http://localhost:1455/auth/callback"
SCOPE = "openid profile email offline_access"
class AutomationError(Exception):
def __init__(self, step: str, message: str, page: Page | None = None):
self.step = step
self.message = message
self.page = page
super().__init__(f"[{step}] {message}")
async def save_error_screenshot(page: Page | None, step: str):
if page:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
screenshots_dir = DATA_DIR / "screenshots"
screenshots_dir.mkdir(parents=True, exist_ok=True)
filename = screenshots_dir / f"error_{step}_{timestamp}.png"
try:
await page.screenshot(path=str(filename))
logger.error(f"Screenshot saved: {filename}")
except:
pass
def generate_password(length: int = 20) -> str:
alphabet = string.ascii_letters + string.digits
return "".join(random.choice(alphabet) for _ in range(length))
def generate_name() -> str:
first_names = [
"James",
"John",
"Robert",
"Michael",
"William",
"David",
"Richard",
"Joseph",
"Thomas",
"Charles",
"Christopher",
"Daniel",
"Matthew",
"Anthony",
"Mark",
"Donald",
"Steven",
"Paul",
"Andrew",
"Joshua",
"Kenneth",
"Kevin",
"Brian",
"George",
"Edward",
"Ronald",
"Timothy",
"Jason",
"Jeffrey",
"Ryan",
"Jacob",
"Gary",
"Nicholas",
"Eric",
"Jonathan",
"Stephen",
"Larry",
"Justin",
"Scott",
"Brandon",
"Benjamin",
"Samuel",
"Frank",
"Gregory",
"Raymond",
"Alexander",
"Patrick",
"Jack",
"Dennis",
"Jerry",
]
last_names = [
"Smith",
"Johnson",
"Williams",
"Brown",
"Jones",
"Garcia",
"Miller",
"Davis",
"Rodriguez",
"Martinez",
"Hernandez",
"Lopez",
"Gonzalez",
"Wilson",
"Anderson",
"Thomas",
"Taylor",
"Moore",
"Jackson",
"Martin",
"Lee",
"Perez",
"Thompson",
"White",
"Harris",
"Sanchez",
"Clark",
"Ramirez",
"Lewis",
"Robinson",
"Walker",
"Young",
"Allen",
"King",
"Wright",
"Scott",
"Torres",
"Nguyen",
"Hill",
"Flores",
"Green",
"Adams",
"Nelson",
"Baker",
"Hall",
"Rivera",
"Campbell",
"Mitchell",
"Carter",
"Roberts",
]
return f"{random.choice(first_names)} {random.choice(last_names)}"
def generate_birthdate_90s() -> tuple[str, str, str]:
year = random.randint(1990, 1999)
month = random.randint(1, 12)
day = random.randint(1, 28)
return f"{month:02d}", f"{day:02d}", str(year)
def extract_verification_code(message: str) -> str | None:
normalized = re.sub(r"\s+", " ", message)
preferred = re.search(
r"Your\s+ChatGPT\s+code\s+is\s*(\d{6})",
normalized,
re.IGNORECASE,
)
if preferred:
return preferred.group(1)
openai_otp = re.search(r"OpenAI\s+otp.*?(\d{6})", normalized, re.IGNORECASE)
if openai_otp:
return openai_otp.group(1)
all_codes = re.findall(r"\b(\d{6})\b", normalized)
if all_codes:
return all_codes[-1]
return None
def generate_pkce_pair() -> tuple[str, str]:
verifier = secrets.token_urlsafe(64)
digest = hashlib.sha256(verifier.encode("utf-8")).digest()
challenge = base64.urlsafe_b64encode(digest).decode("utf-8").rstrip("=")
return verifier, challenge
def generate_state() -> str:
return secrets.token_urlsafe(32)
def build_authorize_url(verifier: str, challenge: str, state: str) -> str:
del verifier
params = {
"response_type": "code",
"client_id": CLIENT_ID,
"redirect_uri": REDIRECT_URI,
"scope": SCOPE,
"code_challenge": challenge,
"code_challenge_method": "S256",
"id_token_add_organizations": "true",
"codex_cli_simplified_flow": "true",
"state": state,
"originator": "opencode",
}
return f"{AUTHORIZE_URL}?{urlencode(params)}"
async def exchange_code_for_tokens(code: str, verifier: str) -> ProviderTokens:
async with aiohttp.ClientSession() as session:
payload = {
"grant_type": "authorization_code",
"client_id": CLIENT_ID,
"code": code,
"code_verifier": verifier,
"redirect_uri": REDIRECT_URI,
}
async with session.post(TOKEN_URL, data=payload) as resp:
if not resp.ok:
text = await resp.text()
raise RuntimeError(f"Token exchange failed: {resp.status} {text}")
body = await resp.json()
expires_in = int(body["expires_in"])
return ProviderTokens(
access_token=body["access_token"],
refresh_token=body["refresh_token"],
expires_at=time.time() + expires_in,
)
async def get_latest_code(email_provider: BaseProvider, email: str) -> str | None:
message = await email_provider.get_latest_message(email)
if not message:
return None
return extract_verification_code(message)
async def fill_date_field(page: Page, month: str, day: str, year: str):
month_field = page.locator('[data-type="month"]').first
if await month_field.count() == 0:
raise AutomationError("profile", "Missing birthday month field", page)
await month_field.scroll_into_view_if_needed()
await month_field.click()
await page.wait_for_timeout(80)
await page.keyboard.type(f"{month}{day}{year}")
await page.wait_for_timeout(120)
async def click_continue(page: Page, timeout_ms: int = 10000):
btn = page.get_by_role("button", name="Continue", exact=True).first
await btn.wait_for(state="visible", timeout=timeout_ms)
await btn.click()
async def wait_for_signup_stabilization(
page: Page,
source_url: str,
timeout_seconds: int = 30,
):
end_at = asyncio.get_running_loop().time() + timeout_seconds
while asyncio.get_running_loop().time() < end_at:
current_url = page.url
if current_url != source_url:
logger.info("Signup redirect detected: %s -> %s", source_url, current_url)
return
await asyncio.sleep(0.5)
logger.warning("Signup redirect was not detected within %ss", timeout_seconds)
async def register_chatgpt_account(
email_provider_factory: Callable[[BrowserContext], BaseProvider] | None = None,
) -> bool:
logger.info("=== Starting ChatGPT account registration ===")
if email_provider_factory is None:
logger.error("No email provider factory configured")
return False
birth_month, birth_day, birth_year = generate_birthdate_90s()
current_page: Page | None = None
redirect_url_captured: str | None = None
managed = None
try:
async with async_playwright() as p:
managed = await launch_browser(p)
browser = managed.browser
context = (
browser.contexts[0] if browser.contexts else await browser.new_context()
)
email_provider = email_provider_factory(context)
logger.info("[1/5] Getting new email from configured provider...")
email = await email_provider.get_new_email()
if not email:
raise AutomationError(
"email_provider", "Email provider returned empty email"
)
password = generate_password()
full_name = generate_name()
verifier, challenge = generate_pkce_pair()
oauth_state = generate_state()
authorize_url = build_authorize_url(verifier, challenge, oauth_state)
logger.info("[2/5] Registering ChatGPT for %s", email)
chatgpt_page = await context.new_page()
current_page = chatgpt_page
await chatgpt_page.goto("https://chatgpt.com")
await chatgpt_page.wait_for_load_state("domcontentloaded")
await chatgpt_page.get_by_text("Sign up for free", exact=True).click()
await chatgpt_page.locator('input[type="email"]').first.wait_for(
state="visible", timeout=15000
)
await chatgpt_page.locator('input[type="email"]').fill(email)
await click_continue(chatgpt_page)
await chatgpt_page.locator('input[type="password"]').first.wait_for(
state="visible", timeout=15000
)
await chatgpt_page.locator('input[type="password"]').fill(password)
await click_continue(chatgpt_page)
await chatgpt_page.get_by_placeholder("Code").first.wait_for(
state="visible", timeout=30000
)
logger.info("[3/5] Getting verification message from email provider...")
code = await get_latest_code(email_provider, email)
if not code:
raise AutomationError(
"email_provider", "Email provider returned no verification message"
)
logger.info("[3/5] Verification code extracted: %s", code)
await chatgpt_page.bring_to_front()
code_input = chatgpt_page.get_by_placeholder("Code")
if await code_input.count() > 0:
await code_input.fill(code)
await click_continue(chatgpt_page)
logger.info("[4/5] Setting profile...")
name_input = chatgpt_page.get_by_placeholder("Full name")
await name_input.first.wait_for(state="visible", timeout=20000)
if await name_input.count() > 0:
await name_input.fill(full_name)
await fill_date_field(chatgpt_page, birth_month, birth_day, birth_year)
profile_url = chatgpt_page.url
await click_continue(chatgpt_page)
logger.info("Account registered!")
await wait_for_signup_stabilization(chatgpt_page, source_url=profile_url)
logger.info("[5/5] Running OAuth flow to get tokens...")
oauth_page = await context.new_page()
current_page = oauth_page
def handle_request(request):
nonlocal redirect_url_captured
url = request.url
if "localhost:1455" in url and "code=" in url:
redirect_url_captured = url
logger.info("Captured OAuth redirect URL")
oauth_page.on("request", handle_request)
await oauth_page.goto(authorize_url, wait_until="domcontentloaded")
await oauth_page.locator(
'input[type="email"], input[name="email"]'
).first.wait_for(state="visible", timeout=20000)
email_input = oauth_page.locator('input[type="email"], input[name="email"]')
if await email_input.count() > 0:
await email_input.first.fill(email)
continue_button = oauth_page.get_by_role("button", name="Continue")
if await continue_button.count() > 0:
await continue_button.first.click()
await oauth_page.locator('input[type="password"]').first.wait_for(
state="visible", timeout=20000
)
password_input = oauth_page.locator('input[type="password"]')
if await password_input.count() > 0:
await password_input.first.fill(password)
continue_button = oauth_page.get_by_role("button", name="Continue")
if await continue_button.count() > 0:
await continue_button.first.click()
for label in ["Continue", "Allow", "Authorize"]:
button = oauth_page.get_by_role("button", name=label)
if await button.count() > 0:
try:
await button.first.click(timeout=5000)
await oauth_page.wait_for_timeout(500)
except Exception:
pass
if not redirect_url_captured:
try:
await oauth_page.wait_for_timeout(4000)
current_url = oauth_page.url
if "localhost:1455" in current_url and "code=" in current_url:
redirect_url_captured = current_url
logger.info("Captured OAuth redirect from page URL")
except Exception:
pass
if not redirect_url_captured:
raise AutomationError(
"oauth", "OAuth redirect with code was not captured", oauth_page
)
parsed = urlparse(redirect_url_captured)
params = parse_qs(parsed.query)
auth_code = params.get("code", [None])[0]
returned_state = params.get("state", [None])[0]
if not auth_code:
raise AutomationError(
"oauth", "OAuth code missing in redirect", oauth_page
)
if returned_state != oauth_state:
raise AutomationError("oauth", "OAuth state mismatch", oauth_page)
tokens = await exchange_code_for_tokens(auth_code, verifier)
save_tokens(tokens)
logger.info("OAuth tokens saved successfully")
return True
except AutomationError as e:
logger.error(f"Error at step [{e.step}]: {e.message}")
await save_error_screenshot(e.page, e.step)
return False
except Exception as e:
logger.error(f"Unexpected error: {e}")
await save_error_screenshot(current_page, "unexpected")
return False
finally:
if managed:
await asyncio.sleep(2)
await managed.close()