1
0
Fork 0

refactor!: change the entire purpose of this script

This commit is contained in:
Arthur K. 2026-03-01 19:32:10 +03:00
parent 217e176975
commit 71d1050adb
Signed by: wzray
GPG key ID: B97F30FDC4636357
20 changed files with 1124 additions and 872 deletions

View file

@ -0,0 +1,5 @@
from .base import BaseProvider
from .ten_minute_mail import TenMinuteMailProvider
from .temp_mail_org import TempMailOrgProvider
__all__ = ["BaseProvider", "TenMinuteMailProvider", "TempMailOrgProvider"]

View file

@ -0,0 +1,16 @@
from abc import ABC, abstractmethod
from playwright.async_api import BrowserContext
class BaseProvider(ABC):
def __init__(self, browser_session: BrowserContext):
self.browser_session = browser_session
@abstractmethod
async def get_new_email(self) -> str:
pass
@abstractmethod
async def get_latest_message(self, email: str) -> str | None:
pass

View file

@ -0,0 +1,125 @@
import asyncio
import logging
import re
from playwright.async_api import BrowserContext, Page
from .base import BaseProvider
logger = logging.getLogger(__name__)
class TempMailOrgProvider(BaseProvider):
def __init__(self, browser_session: BrowserContext):
super().__init__(browser_session)
self.page: Page | None = None
async def _ensure_page(self) -> Page:
if self.page is None or self.page.is_closed():
self.page = await self.browser_session.new_page()
return self.page
async def get_new_email(self) -> str:
page = await self._ensure_page()
logger.info("[temp-mail.org] Opening mailbox page")
await page.goto("https://temp-mail.org", wait_until="domcontentloaded")
await page.locator("input#mail, #mail, input[value*='@']").first.wait_for(
state="visible",
timeout=30000,
)
selectors = ["#mail", "input#mail", "input[value*='@']"]
end_at = asyncio.get_running_loop().time() + 60
while asyncio.get_running_loop().time() < end_at:
await page.bring_to_front()
for selector in selectors:
try:
field = page.locator(selector).first
if await field.is_visible(timeout=1000):
value = (await field.input_value()).strip()
if "@" in value:
logger.info(
"[temp-mail.org] selector matched: %s -> %s",
selector,
value,
)
return value
except Exception:
continue
try:
body = await page.inner_text("body")
found = extract_email(body)
if found:
logger.info("[temp-mail.org] email found by body scan: %s", found)
return found
except Exception:
pass
await asyncio.sleep(1)
raise RuntimeError("Could not get temp email from temp-mail.org")
async def get_latest_message(self, email: str) -> str | None:
page = await self._ensure_page()
logger.info("[temp-mail.org] Waiting for latest message for %s", email)
if page.is_closed():
raise RuntimeError("temp-mail.org tab was closed unexpectedly")
await page.bring_to_front()
items = page.locator("div.inbox-dataList ul li")
# temp-mail updates inbox via websocket; do not refresh/reload page.
for attempt in range(30):
try:
count = await items.count()
logger.info("[temp-mail.org] inbox items: %s", count)
except Exception:
count = 0
if count > 0:
for idx in reversed(range(count)):
try:
item = items.nth(idx)
if not await item.is_visible(timeout=1000):
continue
text = (await item.inner_text()).strip().replace("\n", " ")
logger.info("[temp-mail.org] item[%s]: %s", idx, text[:160])
except Exception:
continue
if text:
try:
await item.click()
logger.info("[temp-mail.org] opened item[%s]", idx)
except Exception:
pass
message_text = text
try:
content = await page.content()
if content and "Your ChatGPT code is" in content:
message_text = content
except Exception:
pass
try:
await page.go_back(
wait_until="domcontentloaded", timeout=5000
)
logger.info("[temp-mail.org] returned back to inbox")
except Exception:
pass
return message_text
await asyncio.sleep(2)
logger.warning("[temp-mail.org] No messages received within 60 seconds")
return None
def extract_email(text: str) -> str | None:
match = re.search(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}", text)
return match.group(0) if match else None

View file

@ -0,0 +1,100 @@
import asyncio
import logging
from playwright.async_api import BrowserContext, Page
from .base import BaseProvider
logger = logging.getLogger(__name__)
class TenMinuteMailProvider(BaseProvider):
def __init__(self, browser_session: BrowserContext):
super().__init__(browser_session)
self.page: Page | None = None
async def _ensure_page(self) -> Page:
if self.page is None or self.page.is_closed():
self.page = await self.browser_session.new_page()
return self.page
async def get_new_email(self) -> str:
page = await self._ensure_page()
logger.info("[10min] Opening https://10minutemail.com")
await page.goto("https://10minutemail.com", wait_until="domcontentloaded")
await page.wait_for_timeout(3000)
email_input = page.locator("#mail_address")
await email_input.first.wait_for(state="visible", timeout=60000)
email = (await email_input.first.input_value()).strip()
if not email or "@" not in email:
raise RuntimeError("10MinuteMail did not return a valid email")
logger.info("[10min] New email acquired: %s", email)
return email
async def get_latest_message(self, email: str) -> str | None:
page = await self._ensure_page()
logger.info("[10min] Waiting for latest message for %s", email)
seen_count = 0
for attempt in range(60):
try:
count = await page.evaluate(
"""
async () => {
const response = await fetch('/messages/messageCount', { credentials: 'include' });
const data = await response.json();
return Number(data.messageCount || 0);
}
"""
)
except Exception:
count = 0
if count > 0:
if count != seen_count:
logger.info("[10min] Inbox has %s message(s)", count)
seen_count = count
try:
messages = await page.evaluate(
"""
async () => {
const response = await fetch('/messages/messagesAfter/0', { credentials: 'include' });
const data = await response.json();
return Array.isArray(data) ? data : [];
}
"""
)
except Exception:
messages = []
text = ""
if messages:
latest = messages[-1]
subject = str(latest.get("subject") or "")
sender = str(latest.get("sender") or "")
body_plain = str(latest.get("bodyPlainText") or "")
body_html = str(latest.get("bodyHtmlContent") or "")
text = "\n".join(
part
for part in [subject, sender, body_plain, body_html]
if part
)
if text:
logger.info("[10min] Latest message received")
return text
if attempt % 3 == 0:
try:
await page.reload(wait_until="domcontentloaded", timeout=60000)
except Exception:
pass
await asyncio.sleep(2)
logger.warning("[10min] No messages received within timeout")
return None