1
0
Fork 0

i think that this works now

This commit is contained in:
Arthur K. 2026-03-08 09:44:33 +03:00
parent a3c843d63c
commit fc555244a8
Signed by: wzray
GPG key ID: B97F30FDC4636357
13 changed files with 715 additions and 505 deletions

View file

@ -1,4 +1,6 @@
from emails import pop_account, peek_accounts, remaining_count, _parse_line
import pytest
from emails import pop_account, has_accounts, mark_done, mark_failed, _parse_line
import emails as em
@ -25,57 +27,103 @@ def test_parse_line_malformed():
assert _parse_line("no-colon-here") is None
def test_peek_accounts(tmp_path, monkeypatch):
@pytest.mark.asyncio
async def test_has_accounts_true(tmp_path, monkeypatch):
f = tmp_path / "emails.txt"
f.write_text("a@b.com:pass1\nc@d.com:pass2\n")
monkeypatch.setattr(em, "EMAILS_FILE", f)
accounts = peek_accounts()
assert len(accounts) == 2
assert accounts[0].email == "a@b.com"
assert accounts[1].email == "c@d.com"
# peek doesn't consume
assert remaining_count() == 2
assert await has_accounts() is True
def test_pop_account(tmp_path, monkeypatch):
f = tmp_path / "emails.txt"
f.write_text("a@b.com:pass1\nc@d.com:pass2\ne@f.com:pass3\n")
monkeypatch.setattr(em, "EMAILS_FILE", f)
acc = pop_account()
assert acc is not None
assert acc.email == "a@b.com"
assert remaining_count() == 2
acc = pop_account()
assert acc is not None
assert acc.email == "c@d.com"
assert remaining_count() == 1
def test_pop_account_empty(tmp_path, monkeypatch):
@pytest.mark.asyncio
async def test_has_accounts_false(tmp_path, monkeypatch):
f = tmp_path / "emails.txt"
f.write_text("")
monkeypatch.setattr(em, "EMAILS_FILE", f)
assert pop_account() is None
assert await has_accounts() is False
def test_pop_account_missing_file(tmp_path, monkeypatch):
@pytest.mark.asyncio
async def test_has_accounts_missing_file(tmp_path, monkeypatch):
monkeypatch.setattr(em, "EMAILS_FILE", tmp_path / "nope.txt")
assert pop_account() is None
assert await has_accounts() is False
def test_pop_skips_comments(tmp_path, monkeypatch):
f = tmp_path / "emails.txt"
f.write_text("# first is comment\na@b.com:pass1\n")
monkeypatch.setattr(em, "EMAILS_FILE", f)
@pytest.mark.asyncio
async def test_pop_account_removes_from_file(tmp_path, monkeypatch):
emails_file = tmp_path / "emails.txt"
emails_file.write_text("a@b.com:pass1\nc@d.com:pass2\ne@f.com:pass3\n")
monkeypatch.setattr(em, "EMAILS_FILE", emails_file)
monkeypatch.setattr(em, "DATA_DIR", tmp_path)
monkeypatch.setattr(em, "DONE_FILE", tmp_path / "done.txt")
monkeypatch.setattr(em, "FAILED_FILE", tmp_path / "failed.txt")
acc = pop_account()
acc = await pop_account()
assert acc is not None
assert acc.email == "a@b.com"
# Comment line stays in file
remaining = f.read_text().strip()
assert remaining == "# first is comment"
remaining = emails_file.read_text()
assert "a@b.com" not in remaining
assert "c@d.com" in remaining
@pytest.mark.asyncio
async def test_pop_account_empty(tmp_path, monkeypatch):
f = tmp_path / "emails.txt"
f.write_text("")
monkeypatch.setattr(em, "EMAILS_FILE", f)
monkeypatch.setattr(em, "DATA_DIR", tmp_path)
monkeypatch.setattr(em, "DONE_FILE", tmp_path / "done.txt")
monkeypatch.setattr(em, "FAILED_FILE", tmp_path / "failed.txt")
assert await pop_account() is None
@pytest.mark.asyncio
async def test_pop_account_missing_file(tmp_path, monkeypatch):
monkeypatch.setattr(em, "EMAILS_FILE", tmp_path / "nope.txt")
assert await pop_account() is None
@pytest.mark.asyncio
async def test_mark_done(tmp_path, monkeypatch):
done_file = tmp_path / "done.txt"
monkeypatch.setattr(em, "DATA_DIR", tmp_path)
monkeypatch.setattr(em, "DONE_FILE", done_file)
await mark_done("test@example.com")
content = done_file.read_text()
assert "test@example.com" in content
@pytest.mark.asyncio
async def test_mark_failed(tmp_path, monkeypatch):
failed_file = tmp_path / "failed.txt"
monkeypatch.setattr(em, "DATA_DIR", tmp_path)
monkeypatch.setattr(em, "FAILED_FILE", failed_file)
await mark_failed("test@example.com")
content = failed_file.read_text()
assert "test@example.com" in content
@pytest.mark.asyncio
async def test_pop_all_accounts(tmp_path, monkeypatch):
f = tmp_path / "emails.txt"
f.write_text("a@b.com:pass1\n")
monkeypatch.setattr(em, "EMAILS_FILE", f)
monkeypatch.setattr(em, "DATA_DIR", tmp_path)
monkeypatch.setattr(em, "DONE_FILE", tmp_path / "done.txt")
monkeypatch.setattr(em, "FAILED_FILE", tmp_path / "failed.txt")
acc1 = await pop_account()
assert acc1.email == "a@b.com"
acc2 = await pop_account()
assert acc2 is None
assert await has_accounts() is False

126
tests/test_pool.py Normal file
View file

@ -0,0 +1,126 @@
import pytest
from pool import (
append_token,
pop_token,
pool_size,
get_first_token,
trigger_refill,
wait_for_token,
)
import pool as p
@pytest.fixture
def fresh_pool(tmp_path, monkeypatch):
tokens_file = tmp_path / "tokens.txt"
tokens_file.write_text("")
monkeypatch.setattr(p, "TOKENS_FILE", tokens_file)
monkeypatch.setattr(p, "DATA_DIR", tmp_path)
return tokens_file
@pytest.mark.asyncio
async def test_empty_pool(fresh_pool):
assert await get_first_token() is None
assert await pop_token() is None
assert await pool_size() == 0
@pytest.mark.asyncio
async def test_append_token(fresh_pool):
await append_token("token1")
assert await pool_size() == 1
assert await get_first_token() == "token1"
@pytest.mark.asyncio
async def test_append_multiple_tokens(fresh_pool):
await append_token("token1")
await append_token("token2")
await append_token("token3")
assert await pool_size() == 3
assert await get_first_token() == "token1"
@pytest.mark.asyncio
async def test_pop_token(fresh_pool):
await append_token("token1")
await append_token("token2")
t = await pop_token()
assert t == "token1"
assert await pool_size() == 1
assert await get_first_token() == "token2"
@pytest.mark.asyncio
async def test_pop_until_empty(fresh_pool):
await append_token("only_one")
t = await pop_token()
assert t == "only_one"
assert await pop_token() is None
assert await pool_size() == 0
@pytest.mark.asyncio
async def test_get_first_token_peek(fresh_pool):
await append_token("token1")
t = await get_first_token()
assert t == "token1"
t2 = await get_first_token()
assert t2 == "token1"
assert await pool_size() == 1
@pytest.mark.asyncio
async def test_token_with_whitespace(fresh_pool):
await append_token(" token_with_spaces ")
t = await get_first_token()
assert t == "token_with_spaces"
@pytest.mark.asyncio
async def test_persist_to_file(fresh_pool):
await append_token("token1")
await append_token("token2")
content = fresh_pool.read_text()
assert "token1" in content
assert "token2" in content
@pytest.mark.asyncio
async def test_wait_for_token_with_existing(fresh_pool):
await append_token("existing_token")
token = await wait_for_token()
assert token == "existing_token"
@pytest.mark.asyncio
async def test_wait_for_token_empty_no_accounts(fresh_pool, tmp_path, monkeypatch):
import emails
emails_file = tmp_path / "emails.txt"
emails_file.write_text("")
monkeypatch.setattr(emails, "EMAILS_FILE", emails_file)
token = await wait_for_token()
assert token is None
@pytest.mark.asyncio
async def test_trigger_refill_does_not_block(fresh_pool):
import asyncio
await trigger_refill()
await asyncio.sleep(0.01)
assert True

141
tests/test_server.py Normal file
View file

@ -0,0 +1,141 @@
import pytest
from unittest.mock import AsyncMock, patch
from aiohttp import web
from aiohttp.test_utils import AioHTTPTestCase
from server import create_app
class TestServer(AioHTTPTestCase):
async def get_application(self):
return create_app()
async def test_health(self):
resp = await self.client.get("/health")
assert resp.status == 200
text = await resp.text()
assert text == "ok"
@patch("server.get_first_token", new_callable=AsyncMock)
@patch("server.wait_for_token", new_callable=AsyncMock)
async def test_token_empty_pool_no_accounts(self, mock_wait, mock_first):
mock_first.return_value = None
mock_wait.return_value = None
resp = await self.client.get("/token")
assert resp.status == 200
data = await resp.json()
assert data["token"] == "public"
@patch("server.get_first_token", new_callable=AsyncMock)
@patch("server.wait_for_token", new_callable=AsyncMock)
async def test_token_empty_pool_waits_then_gets(self, mock_wait, mock_first):
mock_first.side_effect = [None, "new_token"]
mock_wait.return_value = "new_token"
resp = await self.client.get("/token")
assert resp.status == 200
data = await resp.json()
assert data["token"] == "new_token"
@patch("server.get_first_token", new_callable=AsyncMock)
@patch("server.pop_token", new_callable=AsyncMock)
@patch("server.trigger_refill", new_callable=AsyncMock)
@patch("server.pool_size", new_callable=AsyncMock)
@patch("server.get_balance", new_callable=AsyncMock)
async def test_token_valid(
self, mock_balance, mock_size, mock_refill, mock_pop, mock_first
):
mock_first.return_value = "test_token_12345"
mock_balance.return_value = {"balance": 10.0}
mock_size.return_value = 3
resp = await self.client.get("/token")
assert resp.status == 200
data = await resp.json()
assert data["token"] == "test_token_12345"
mock_pop.assert_not_called()
@patch("server.get_first_token", new_callable=AsyncMock)
@patch("server.pop_token", new_callable=AsyncMock)
@patch("server.trigger_refill", new_callable=AsyncMock)
@patch("server.pool_size", new_callable=AsyncMock)
@patch("server.get_balance", new_callable=AsyncMock)
async def test_token_zero_balance_removed(
self, mock_balance, mock_size, mock_refill, mock_pop, mock_first
):
mock_first.side_effect = ["bad_token", "good_token"]
mock_balance.side_effect = [{"balance": 0}, {"balance": 15.0}]
mock_size.return_value = 2
resp = await self.client.get("/token")
assert resp.status == 200
data = await resp.json()
assert data["token"] == "good_token"
assert mock_pop.call_count == 1
@patch("server.get_first_token", new_callable=AsyncMock)
@patch("server.pop_token", new_callable=AsyncMock)
@patch("server.trigger_refill", new_callable=AsyncMock)
@patch("server.pool_size", new_callable=AsyncMock)
@patch("server.get_balance", new_callable=AsyncMock)
async def test_token_balance_fetch_fails(
self, mock_balance, mock_size, mock_refill, mock_pop, mock_first
):
mock_first.side_effect = ["bad_token", "good_token"]
mock_balance.side_effect = [None, {"balance": 10.0}]
mock_size.return_value = 2
resp = await self.client.get("/token")
assert resp.status == 200
data = await resp.json()
assert data["token"] == "good_token"
mock_pop.assert_called()
@patch("server.get_first_token", new_callable=AsyncMock)
@patch("server.pop_token", new_callable=AsyncMock)
@patch("server.trigger_refill", new_callable=AsyncMock)
@patch("server.pool_size", new_callable=AsyncMock)
@patch("server.get_balance", new_callable=AsyncMock)
async def test_token_balance_with_remaining_key(
self, mock_balance, mock_size, mock_refill, mock_pop, mock_first
):
mock_first.return_value = "test_token"
mock_balance.return_value = {"remaining": 20.0}
mock_size.return_value = 1
resp = await self.client.get("/token")
assert resp.status == 200
data = await resp.json()
assert data["token"] == "test_token"
@patch("server.has_accounts", new_callable=AsyncMock)
@patch("server.pool_size", new_callable=AsyncMock)
async def test_status(self, mock_size, mock_has):
mock_size.return_value = 5
mock_has.return_value = True
resp = await self.client.get("/status")
assert resp.status == 200
data = await resp.json()
assert data["pool_size"] == 5
assert data["has_accounts"] is True
@patch("server.get_first_token", new_callable=AsyncMock)
@patch("server.pop_token", new_callable=AsyncMock)
@patch("server.trigger_refill", new_callable=AsyncMock)
@patch("server.wait_for_token", new_callable=AsyncMock)
@patch("server.pool_size", new_callable=AsyncMock)
@patch("server.get_balance", new_callable=AsyncMock)
async def test_all_tokens_exhausted_then_public(
self, mock_balance, mock_size, mock_wait, mock_refill, mock_pop, mock_first
):
mock_first.side_effect = ["token1", "token2", None]
mock_balance.side_effect = [{"balance": 0}, {"balance": -5}]
mock_wait.return_value = None
mock_size.return_value = 0
resp = await self.client.get("/token")
assert resp.status == 200
data = await resp.json()
assert data["token"] == "public"

View file

@ -1,72 +1,23 @@
import json
from tokens import (
ProviderTokens,
load_state,
save_state,
save_tokens,
promote_next_tokens,
clear_next_tokens,
)
import tokens as t
from tokens import ProviderTokens
def test_save_and_load_state(tmp_path, monkeypatch):
monkeypatch.setattr(t, "TOKENS_FILE", tmp_path / "tokens.json")
active = ProviderTokens("key1", None, 0)
nxt = ProviderTokens("key2", None, 0)
save_state(active, nxt)
a, n = load_state()
assert a is not None and a.access_token == "key1"
assert n is not None and n.access_token == "key2"
def test_provider_tokens_basic():
t = ProviderTokens(access_token="abc123", refresh_token=None, expires_at=0)
assert t.access_token == "abc123"
assert t.refresh_token is None
assert t.expires_at == 0
def test_promote_next_tokens(tmp_path, monkeypatch):
monkeypatch.setattr(t, "TOKENS_FILE", tmp_path / "tokens.json")
save_state(ProviderTokens("key1", None, 0), ProviderTokens("key2", None, 0))
assert promote_next_tokens() is True
a, n = load_state()
assert a is not None and a.access_token == "key2"
assert n is None
def test_provider_tokens_with_metadata():
t = ProviderTokens(
access_token="key",
refresh_token="refresh",
expires_at=12345.0,
metadata={"foo": "bar"},
)
assert t.metadata == {"foo": "bar"}
def test_clear_next_tokens(tmp_path, monkeypatch):
monkeypatch.setattr(t, "TOKENS_FILE", tmp_path / "tokens.json")
save_state(ProviderTokens("key1", None, 0), ProviderTokens("key2", None, 0))
clear_next_tokens()
a, n = load_state()
assert a is not None and a.access_token == "key1"
assert n is None
def test_save_tokens_preserves_next(tmp_path, monkeypatch):
monkeypatch.setattr(t, "TOKENS_FILE", tmp_path / "tokens.json")
save_state(ProviderTokens("key1", None, 0), ProviderTokens("key2", None, 0))
save_tokens(ProviderTokens("key3", None, 0))
a, n = load_state()
assert a is not None and a.access_token == "key3"
assert n is not None and n.access_token == "key2"
def test_load_missing_file(tmp_path, monkeypatch):
monkeypatch.setattr(t, "TOKENS_FILE", tmp_path / "missing.json")
a, n = load_state()
assert a is None and n is None
def test_atomic_write(tmp_path, monkeypatch):
f = tmp_path / "tokens.json"
monkeypatch.setattr(t, "TOKENS_FILE", f)
save_state(ProviderTokens("x", None, 0), None)
with open(f) as fp:
data = json.load(fp)
assert data["active"]["access_token"] == "x"
def test_provider_tokens_default_metadata():
t = ProviderTokens(access_token="x", refresh_token=None, expires_at=0)
assert t.metadata is None