1
0
Fork 0

fix: revert old oauth behavior and new default email provider

This commit is contained in:
Arthur K. 2026-03-02 19:10:50 +03:00
parent 0af7179596
commit 6dd26ad3d8
Signed by: wzray
GPG key ID: B97F30FDC4636357
7 changed files with 439 additions and 23 deletions

View file

@ -281,7 +281,34 @@ async def click_continue(page: Page, timeout_ms: int = 10000):
await btn.click()
async def click_any_visible_button(
async def oauth_needs_email_check(page: Page) -> bool:
marker = page.get_by_text("Check your inbox", exact=False)
return await marker.count() > 0
async def fill_oauth_code_if_present(page: Page, code: str) -> bool:
candidates = [
page.get_by_placeholder("Code"),
page.get_by_label("Code"),
page.locator(
'input[name*="code" i], input[id*="code" i], '
'input[autocomplete="one-time-code"], input[inputmode="numeric"]'
),
]
for locator in candidates:
if await locator.count() == 0:
continue
try:
await locator.first.wait_for(state="visible", timeout=1500)
await locator.first.fill(code)
return True
except PlaywrightError:
continue
return False
async def click_first_visible_button(
page: Page,
labels: list[str],
timeout_ms: int = 2000,
@ -415,44 +442,67 @@ async def register_chatgpt_account(
oauth_page.on("request", handle_request)
await oauth_page.goto(authorize_url, wait_until="domcontentloaded")
await oauth_page.locator(
'input[type="email"], input[name="email"]'
).first.wait_for(state="visible", timeout=20000)
email_input = oauth_page.locator('input[type="email"], input[name="email"]')
if await email_input.count() > 0:
await email_input.first.wait_for(state="visible", timeout=10000)
await email_input.first.fill(email)
await click_any_visible_button(
oauth_page, ["Continue"], timeout_ms=4000
continue_button = oauth_page.get_by_role("button", name="Continue")
if await continue_button.count() > 0:
await continue_button.first.click()
await oauth_page.locator('input[type="password"]').first.wait_for(
state="visible", timeout=20000
)
password_input = oauth_page.locator('input[type="password"]')
if await password_input.count() > 0:
await password_input.first.wait_for(state="visible", timeout=10000)
await password_input.first.fill(password)
await click_any_visible_button(
oauth_page, ["Continue"], timeout_ms=4000
)
continue_button = oauth_page.get_by_role("button", name="Continue")
if await continue_button.count() > 0:
await continue_button.first.click()
for _ in range(6):
last_oauth_email_code = code
oauth_deadline = asyncio.get_running_loop().time() + 60
while asyncio.get_running_loop().time() < oauth_deadline:
if redirect_url_captured:
break
clicked = await click_any_visible_button(
oauth_page,
["Continue", "Allow", "Authorize"],
timeout_ms=2000,
)
if clicked:
await asyncio.sleep(0.4)
else:
await asyncio.sleep(0.4)
if not redirect_url_captured:
if await oauth_needs_email_check(oauth_page):
logger.info("OAuth requested email confirmation code")
new_code = await get_latest_code(email_provider, email)
if new_code and new_code != last_oauth_email_code:
filled = await fill_oauth_code_if_present(oauth_page, new_code)
if filled:
last_oauth_email_code = new_code
logger.info("Filled OAuth email confirmation code")
else:
logger.warning(
"OAuth inbox challenge detected but code field not found"
)
try:
current_url = oauth_page.url
if "localhost:1455" in current_url and "code=" in current_url:
redirect_url_captured = current_url
logger.info("Captured OAuth redirect from page URL")
except PlaywrightError:
break
except Exception:
pass
clicked = await click_first_visible_button(
oauth_page,
["Continue", "Allow", "Authorize", "Verify"],
timeout_ms=2000,
)
if clicked:
await oauth_page.wait_for_timeout(500)
else:
await oauth_page.wait_for_timeout(1000)
if not redirect_url_captured:
raise AutomationError(
"oauth", "OAuth redirect with code was not captured", oauth_page