1
0
Fork 0

fix: revert old oauth behavior and new default email provider

This commit is contained in:
Arthur K. 2026-03-02 19:10:50 +03:00
parent 8b5449b1fd
commit 307ca38ecc
Signed by: wzray
GPG key ID: B97F30FDC4636357
7 changed files with 439 additions and 23 deletions

View file

@ -1,5 +1,11 @@
from .base import BaseProvider
from .mail_tm import MailTmProvider
from .ten_minute_mail import TenMinuteMailProvider
from .temp_mail_org import TempMailOrgProvider
__all__ = ["BaseProvider", "TenMinuteMailProvider", "TempMailOrgProvider"]
__all__ = [
"BaseProvider",
"MailTmProvider",
"TenMinuteMailProvider",
"TempMailOrgProvider",
]

View file

@ -0,0 +1,232 @@
import asyncio
import logging
import os
import secrets
import string
from typing import Any
import aiohttp
from playwright.async_api import BrowserContext
from .base import BaseProvider
logger = logging.getLogger(__name__)
_API_BASE = os.environ.get("MAIL_TM_API_BASE", "https://api.mail.tm")
_TIMEOUT_SECONDS = 20
_FIRST_NAMES = [
"james",
"john",
"robert",
"michael",
"david",
"william",
"joseph",
"thomas",
"daniel",
"mark",
"paul",
"kevin",
]
_LAST_NAMES = [
"smith",
"johnson",
"williams",
"brown",
"jones",
"miller",
"davis",
"wilson",
"anderson",
"taylor",
"martin",
"thompson",
]
def _generate_local_part() -> str:
first = secrets.choice(_FIRST_NAMES)
last = secrets.choice(_LAST_NAMES)
digits = "".join(secrets.choice(string.digits) for _ in range(8))
return f"{first}{last}{digits}"
def _generate_password(length: int = 24) -> str:
alphabet = string.ascii_letters + string.digits
return "".join(secrets.choice(alphabet) for _ in range(length))
class MailTmProvider(BaseProvider):
def __init__(self, browser_session: BrowserContext):
super().__init__(browser_session)
self._address: str | None = None
self._password: str | None = None
self._token: str | None = None
async def _request(
self,
method: str,
path: str,
*,
token: str | None = None,
json_body: dict[str, Any] | None = None,
) -> tuple[int, dict[str, Any] | list[Any] | None]:
url = f"{_API_BASE.rstrip('/')}{path}"
headers: dict[str, str] = {}
if token:
headers["Authorization"] = f"Bearer {token}"
timeout = aiohttp.ClientTimeout(total=_TIMEOUT_SECONDS)
try:
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.request(
method,
url,
headers=headers,
json=json_body,
) as resp:
status = resp.status
try:
payload = await resp.json()
except aiohttp.ContentTypeError:
payload = None
return status, payload
except aiohttp.ClientError as e:
logger.warning("[mail.tm] request failed %s %s: %s", method, path, e)
return 0, None
async def _get_domains(self) -> list[str]:
status, payload = await self._request("GET", "/domains")
if status != 200 or not isinstance(payload, dict):
raise RuntimeError("mail.tm domains request failed")
members = payload.get("hydra:member")
if not isinstance(members, list):
raise RuntimeError("mail.tm domains response has unexpected format")
domains: list[str] = []
for item in members:
if not isinstance(item, dict):
continue
domain = item.get("domain")
is_active = bool(item.get("isActive", True))
if isinstance(domain, str) and domain and is_active:
domains.append(domain)
if not domains:
raise RuntimeError("mail.tm returned no active domains")
return domains
async def _create_account(self, address: str, password: str) -> bool:
status, _ = await self._request(
"POST",
"/accounts",
json_body={"address": address, "password": password},
)
if status in (200, 201):
return True
return False
async def _create_token(self, address: str, password: str) -> str | None:
status, payload = await self._request(
"POST",
"/token",
json_body={"address": address, "password": password},
)
if status != 200 or not isinstance(payload, dict):
return None
token = payload.get("token")
if isinstance(token, str) and token:
return token
return None
async def get_new_email(self) -> str:
domains = await self._get_domains()
for _ in range(8):
domain = secrets.choice(domains)
address = f"{_generate_local_part()}@{domain}"
password = _generate_password()
created = await self._create_account(address, password)
if not created:
continue
token = await self._create_token(address, password)
if not token:
continue
self._address = address
self._password = password
self._token = token
logger.info("[mail.tm] New mailbox acquired: %s", address)
return address
raise RuntimeError("mail.tm could not create account")
async def _list_messages(self) -> list[dict[str, Any]]:
if not self._token:
return []
status, payload = await self._request(
"GET",
"/messages",
token=self._token,
)
if status == 401 and self._address and self._password:
token = await self._create_token(self._address, self._password)
if token:
self._token = token
status, payload = await self._request(
"GET",
"/messages",
token=self._token,
)
if status != 200 or not isinstance(payload, dict):
return []
members = payload.get("hydra:member")
if not isinstance(members, list):
return []
return [item for item in members if isinstance(item, dict)]
async def _get_message_text(self, message_id: str) -> str | None:
if not self._token:
return None
status, payload = await self._request(
"GET",
f"/messages/{message_id}",
token=self._token,
)
if status != 200 or not isinstance(payload, dict):
return None
parts = [
payload.get("subject"),
payload.get("intro"),
payload.get("text"),
payload.get("html"),
]
text = "\n".join(str(part) for part in parts if part)
return text or None
async def get_latest_message(self, email: str) -> str | None:
del email
if not self._token:
raise RuntimeError("mail.tm provider is not initialized with mailbox token")
for _ in range(45):
messages = await self._list_messages()
if messages:
latest = messages[0]
message_id = latest.get("id")
if isinstance(message_id, str) and message_id:
full_message = await self._get_message_text(message_id)
if full_message:
logger.info("[mail.tm] Latest message received")
return full_message
await asyncio.sleep(2)
logger.warning("[mail.tm] No messages received within timeout")
return None