1
0
Fork 0
gibidy/src/email_providers/temp_mail_org.py

125 lines
4.5 KiB
Python

import asyncio
import logging
import re
from playwright.async_api import BrowserContext, Page
from .base import BaseProvider
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class TempMailOrgProvider(BaseProvider):
    """Temporary-mailbox provider that drives the temp-mail.org web UI.

    A single browser tab is kept in ``self.page`` and reused:
    ``get_new_email`` loads the site in it and reads the generated address,
    and ``get_latest_message`` polls the inbox list shown in that same tab.
    """

    def __init__(self, browser_session: BrowserContext):
        """Hand the browser context to the base class; no tab is opened yet."""
        super().__init__(browser_session)
        # Tab used for temp-mail.org; created on demand by _ensure_page().
        self.page: Page | None = None

    async def _ensure_page(self) -> Page:
        """Return the provider's tab, opening a new one if missing or closed."""
        if self.page is None or self.page.is_closed():
            self.page = await self.browser_session.new_page()
        return self.page

    async def get_new_email(self) -> str:
        """Open temp-mail.org and return the mailbox address it shows.

        After navigation the method waits up to 30 s for an address input to
        become visible, then polls for up to 60 s. Each poll first tries the
        known input selectors and accepts the first value containing ``@``;
        if none match, it falls back to scanning the page's body text with
        ``extract_email``.

        Returns:
            The e-mail address read from the page.

        Raises:
            RuntimeError: If no address could be read within the polling
                window. Errors from the initial navigation / visibility wait
                are not caught here and propagate to the caller.
        """
        page = await self._ensure_page()
        logger.info("[temp-mail.org] Opening mailbox page")
        await page.goto("https://temp-mail.org", wait_until="domcontentloaded")
        await page.locator("input#mail, #mail, input[value*='@']").first.wait_for(
            state="visible",
            timeout=30000,
        )

        selectors = ("#mail", "input#mail", "input[value*='@']")
        loop = asyncio.get_running_loop()
        deadline = loop.time() + 60
        while loop.time() < deadline:
            await page.bring_to_front()

            for selector in selectors:
                try:
                    field = page.locator(selector).first
                    if not await field.is_visible(timeout=1000):
                        continue
                    value = (await field.input_value()).strip()
                except Exception:
                    # Best effort: a selector that fails is simply skipped.
                    logger.debug(
                        "[temp-mail.org] selector %s failed",
                        selector,
                        exc_info=True,
                    )
                    continue
                # The input can be visible before the address is filled in,
                # so only a value that looks like an address is accepted.
                if "@" in value:
                    logger.info(
                        "[temp-mail.org] selector matched: %s -> %s",
                        selector,
                        value,
                    )
                    return value

            # Fallback: look for anything address-shaped in the page text.
            try:
                body = await page.inner_text("body")
            except Exception:
                logger.debug("[temp-mail.org] body scan failed", exc_info=True)
            else:
                found = extract_email(body)
                if found:
                    logger.info(
                        "[temp-mail.org] email found by body scan: %s", found
                    )
                    return found

            await asyncio.sleep(1)

        raise RuntimeError("Could not get temp email from temp-mail.org")

    async def get_latest_message(self, email: str) -> str | None:
        """Poll the open inbox tab and return the text of a received message.

        The inbox list is checked up to 30 times, 2 s apart. On each check
        the list items are walked from last to first; the first visible item
        with non-empty text is opened and its content returned.

        Args:
            email: Address being waited on. It is only used for logging; the
                inbox that is read is whatever the provider's tab shows.

        Returns:
            The full page HTML of the opened message when it contains
            ``"Your ChatGPT code is"``, otherwise the list item's own text.
            ``None`` if no message showed up within the polling window.

        Raises:
            RuntimeError: If the mailbox tab was never opened or has been
                closed.
        """
        logger.info("[temp-mail.org] Waiting for latest message for %s", email)

        # Deliberately NOT _ensure_page(): that would silently replace a
        # closed tab with a blank one, and this method would then poll an
        # empty page until it timed out instead of reporting the problem.
        page = self.page
        if page is None:
            raise RuntimeError(
                "temp-mail.org mailbox is not open; call get_new_email() first"
            )
        if page.is_closed():
            raise RuntimeError("temp-mail.org tab was closed unexpectedly")

        await page.bring_to_front()
        items = page.locator("div.inbox-dataList ul li")

        # temp-mail updates inbox via websocket; do not refresh/reload page.
        for _ in range(30):
            try:
                count = await items.count()
                logger.info("[temp-mail.org] inbox items: %s", count)
            except Exception:
                logger.debug(
                    "[temp-mail.org] could not count inbox items", exc_info=True
                )
                count = 0

            for idx in reversed(range(count)):
                try:
                    item = items.nth(idx)
                    if not await item.is_visible(timeout=1000):
                        continue
                    text = (await item.inner_text()).strip().replace("\n", " ")
                    logger.info("[temp-mail.org] item[%s]: %s", idx, text[:160])
                except Exception:
                    logger.debug(
                        "[temp-mail.org] could not read item[%s]",
                        idx,
                        exc_info=True,
                    )
                    continue
                if text:
                    return await self._open_and_read(page, item, idx, text)

            await asyncio.sleep(2)

        logger.warning("[temp-mail.org] No messages received within 60 seconds")
        return None

    async def _open_and_read(self, page: Page, item, idx: int, preview: str) -> str:
        """Open one inbox item, grab its content, and go back to the inbox.

        Every step is best effort: a failure is logged at debug level and the
        method falls back to ``preview`` (the list item's text).
        """
        opened = False
        try:
            await item.click()
        except Exception:
            logger.debug(
                "[temp-mail.org] could not open item[%s]", idx, exc_info=True
            )
        else:
            opened = True
            logger.info("[temp-mail.org] opened item[%s]", idx)

        message_text = preview
        try:
            content = await page.content()
        except Exception:
            logger.debug(
                "[temp-mail.org] could not read page content", exc_info=True
            )
        else:
            if content and "Your ChatGPT code is" in content:
                message_text = content

        # Only navigate back when the click went through. If it failed, the
        # tab is still on the inbox and going back would leave temp-mail.org.
        # NOTE(review): assumes a successful click adds a history entry.
        if opened:
            try:
                await page.go_back(wait_until="domcontentloaded", timeout=5000)
            except Exception:
                logger.debug(
                    "[temp-mail.org] could not return to inbox", exc_info=True
                )
            else:
                logger.info("[temp-mail.org] returned back to inbox")

        return message_text
def extract_email(text: str) -> str | None:
match = re.search(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}", text)
return match.group(0) if match else None