1
0
Fork 0
This commit is contained in:
Arthur K. 2026-04-20 23:41:44 +03:00
parent 7cef56de15
commit ecb5f68e32
Signed by: wzray
GPG key ID: B97F30FDC4636357
17 changed files with 760 additions and 1626 deletions

View file

@ -1,163 +1,188 @@
# Plan: make local account state authoritative # Plan: rewrite token selection around a simple disk-first state model
## Problem ## Goal
Current `accounts.json` state is not acting like a real source of truth for `/token`: Throw away the current layered selection/cooldown/state model and replace it with a small implementation that:
- `active_account_id` is persisted but not actually used to drive selection - reads the main JSON file on every `/token` request
- `last_known_usage` is mostly treated as a sort hint, then immediately replaced by a live refresh - keeps only the minimum necessary account fields on disk
- `/token` live-refreshes every candidate it tries, so local limits are not truly trusted - decides from file state first
- the persisted snapshot contains duplicate derived fields (`used_percent`, `remaining_percent`, `exhausted`) in addition to canonical window and flag data - refreshes usage only when missing or stale
- as a result, the file is hard to reason about: some fields look authoritative but are only decorative or transient - validates the selected token before returning it
- moves invalid accounts to `failed.json`
- does not touch the helper scripts in this pass
## Desired behavior ## Required file model
1. `accounts.json` is the source of truth for `/token` selection. ### Main state file
2. If `active_account_id` exists and local state says the active account is usable, `/token` must try that account first.
3. Only if local state says the active account is blocked or unusable should `/token` fall back to choosing another account.
4. Fallback selection can keep the current ranking approach: most-used eligible account first.
5. `/usage` should continue to refresh all accounts live.
6. Persisted usage snapshots should store canonical data only, without duplicated derived fields.
## Recommended approach `accounts.json`
### 1. Split `/token` selection into local selection and live validation ```json
{
"active_account": "user@example.com",
"accounts": [
{
"email": "user@example.com",
"access_token": "...",
"refresh_token": "...",
"token_refresh_at": 1710000000,
"usage": {
"primary": {
"used_percent": 72,
"reset_at": 1710018000
},
"secondary": {
"used_percent": 18,
"reset_at": 1710600000
}
},
"usage_checked_at": 1710000000,
"disabled": false
}
]
}
```
In `src/gibby/manager.py`: Only these fields should exist for account state.
- Add `_find_active_account(state)` to resolve `state.active_account_id` to an `AccountRecord` ### Failed state file
- Add `_is_locally_blocked(account, current)` to decide from local file state only whether an account can be tried
- Add `_build_selection_order(state, current)` that:
- returns the active account first if it exists and is not locally blocked
- otherwise falls back to the remaining eligible accounts sorted by current saved-usage ranking
- never duplicates the active account in the fallback list
`_is_locally_blocked()` should use only persisted local state: `failed.json`
- blocked if `cooldown_until > now` ```json
- blocked if `last_known_usage` exists and local usage indicates exhaustion {
- otherwise not blocked "accounts": [
{
"email": "bad@example.com",
"access_token": "...",
"refresh_token": "...",
"token_refresh_at": 1710000000,
"usage": {
"primary": {
"used_percent": 100,
"reset_at": 1710018000
},
"secondary": {
"used_percent": 100,
"reset_at": 1710600000
}
},
"usage_checked_at": 1710000000,
"disabled": false
}
]
}
```
This gives the exact behavior the user requested: Top-level must contain only `accounts`.
- active account is mandatory first choice when nothing local blocks it ## Selection rules
- local file decides whether active is allowed before any network call
- live refresh remains only a validation step for the chosen candidate
### 2. Keep live refresh only for accounts actually attempted ### Active account first
In `src/gibby/manager.py`: For each `/token` request:
- keep the current live refresh path (`refresh_account_usage`) once an account has been selected for an attempt 1. Read `accounts.json` fresh from disk.
- if active account passes local checks but fails live validation, persist the updated state and continue to the next candidate 2. Resolve `active_account` by email.
- if active account is locally blocked, skip live refresh for it during that `/token` call 3. Evaluate active first.
`/usage` stays as-is and continues refreshing all accounts live. ### When an account is usable
### 3. Clean up the persisted usage snapshot schema An account is usable when:
In `src/gibby/models.py` and `src/gibby/store.py`: - `disabled == false`
- `secondary.used_percent < 100`
- `primary.used_percent < GIBBY_EXHAUSTED_USAGE_THRESHOLD`
- stop persisting derived snapshot fields: Default threshold remains `95`.
- `used_percent`
- `remaining_percent`
- `exhausted`
- keep only canonical persisted snapshot data:
- `checked_at`
- `primary_window`
- `secondary_window`
- `limit_reached`
- `allowed`
Implementation direction: ### Usage freshness
- keep `UsageSnapshot` as the in-memory model for now, but derive: Usage must be refreshed only when missing or stale.
- `used_percent`
- `remaining_percent`
- `exhausted`
from canonical fields when loading/parsing
- update `store._snapshot_to_dict()` to write only canonical fields
- update `store._snapshot_from_dict()` to reconstruct the full in-memory `UsageSnapshot` from canonical persisted fields
This keeps code churn smaller than a full model rewrite while making the file itself cleaner and more honest. Add env:
### 4. Keep cooldown as the persisted local block, but make local exhaustion matter too - `GIBBY_USAGE_STALE_SECONDS`, default `3600`
Local selection should not depend on a fresh API round-trip. Usage is stale when:
For `/token`: - `usage` is missing
- `usage_checked_at` is missing
- `now - usage_checked_at > GIBBY_USAGE_STALE_SECONDS`
- `cooldown_until` remains the strongest persisted block If active account usage is stale or missing, refresh usage for that account before deciding if it is usable.
- if cooldown is clear but local `last_known_usage` still says exhausted, treat the account as locally blocked too
- only accounts that pass local checks are eligible to be attempted live
This changes current behavior in an important way: ### Fallback selection
- today, an account with expired or missing cooldown can still be live-refreshed even if local snapshot says exhausted If active account cannot be used, choose the next account by:
- after the change, local state truly gates the initial decision
### 5. Preserve existing fallback ranking for non-active accounts - filtering to usable accounts
- sorting by highest primary `used_percent`
- using file order as the tie-breaker
After active account is rejected locally, keep the current fallback sort in `manager.py`: If a new account is chosen, write its email into `active_account` in `accounts.json`.
- primary window used percent descending ## Token flow
- secondary window used percent descending
That avoids a larger policy change in this pass and isolates the refactor to "trust local state first". For the chosen account:
## Files to modify 1. Ensure token is fresh enough.
2. If `token_refresh_at` says refresh is needed, refresh token and persist new values.
3. After selection decisions are finished and the token is ready, validate it by calling:
- `/home/wzray/AI/gibby/src/gibby/manager.py` `https://chatgpt.com/backend-api/codex/models`
- respect `active_account_id`
- add local-only eligibility predicate 4. Only return the token if validation returns `200`.
- change selection order to active-first-when-locally-usable
- `/home/wzray/AI/gibby/src/gibby/models.py` ## Invalid account handling
- keep canonical usage derivation helpers centralized
- support reconstructing derived values from canonical fields If refresh, usage auth, or final validation shows the token/account is invalid:
1. Read current main state.
2. Remove that full account object from `accounts.json`.
3. Append the same full account object to `failed.json.accounts`.
4. If it was the active account, clear `active_account` before reselection.
5. Persist both files atomically.
No `failed.txt` in the rewritten core flow.
## Files to rewrite
- `/home/wzray/AI/gibby/src/gibby/settings.py`
- keep only env needed for the new flow
- `/home/wzray/AI/gibby/src/gibby/store.py` - `/home/wzray/AI/gibby/src/gibby/store.py`
- write canonical snapshot shape only - rewrite as simple JSON read/write helpers for `accounts.json` and `failed.json`
- read canonical snapshot shape into full in-memory model - `/home/wzray/AI/gibby/src/gibby/client.py`
- keep only token refresh, usage fetch, and token validation calls
- `/home/wzray/AI/gibby/src/gibby/manager.py`
- rewrite into one small service for `/token`
- `/home/wzray/AI/gibby/src/gibby/app.py`
- keep thin FastAPI wiring for `/health` and `/token`
## Files to remove or stop using
- `/home/wzray/AI/gibby/src/gibby/models.py`
- `/home/wzray/AI/gibby/src/gibby/account_ops.py` - `/home/wzray/AI/gibby/src/gibby/account_ops.py`
- keep refresh path aligned with canonical snapshot handling
- reuse a local exhaustion predicate if helpful instead of duplicating logic
- `/home/wzray/AI/gibby/tests/test_core.py`
- add and update selection behavior tests
- `/home/wzray/AI/gibby/tests/test_account_ops.py`
- update snapshot persistence assumptions if needed
- `/home/wzray/AI/gibby/tests/test_app.py`
- adjust fixture shapes only if response expectations change
- `/home/wzray/AI/gibby/tests/test_refresh_limits.py`
- ensure live refresh still rewrites canonical local state correctly
- `/home/wzray/AI/gibby/tests/test_oauth_helper.py`
- ensure oauth helper stores canonical snapshot shape correctly
## Test plan Their logic should be folded into the new minimal data model and service flow instead of preserved.
### Selection behavior ## Out of scope for this pass
Add or update tests in `tests/test_core.py` for: - do not touch `scripts/oauth_helper.py`
- do not touch `scripts/refresh_limits.py`
- active account is used first when locally allowed, even if another account has higher saved usage - do not preserve old cooldown, failed.txt, dual-state, or derived snapshot machinery unless absolutely required to keep app booting during rewrite
- active account is skipped without live refresh when `cooldown_until` is still active
- active account is skipped without live refresh when local snapshot says exhausted
- active account passes local checks but fails live refresh, then fallback account is tried
- missing or stale `active_account_id` falls back cleanly to non-active selection logic
- fallback ordering still prefers higher saved primary usage and uses secondary as tie-breaker
### Snapshot/file behavior
Add or update tests to verify:
- `accounts.json` no longer writes `used_percent`, `remaining_percent`, or `exhausted`
- loading canonical persisted snapshots still reconstructs full in-memory `UsageSnapshot`
- `/usage`, `refresh_limits.py`, and `oauth_helper.py` still persist refreshed canonical state correctly
## Verification ## Verification
- `uv run pytest -q tests/test_core.py tests/test_account_ops.py tests/test_app.py tests/test_refresh_limits.py tests/test_oauth_helper.py` - `uv run pytest -q`
- inspect a real `accounts.json` after `/usage` or `refresh_limits.py` and confirm snapshot entries contain only canonical fields - API tests for:
- manually test `/token` with: - `/health` returns `ok`
- a locally usable active account - `/token` returns `503` when file has no usable accounts
- a locally blocked active account - `/token` prefers active account when usable
- a dangling `active_account_id` - `/token` rereads the file between requests
- verify that active account is not live-refreshed when local file state already blocks it - stale usage triggers a refresh before decision
- fresh usage skips refresh
- invalid token moves full account object to `failed.json`
- fallback chooses highest primary usage among usable non-disabled accounts
- direct file tests for exact `accounts.json` and `failed.json` schema

View file

@ -10,20 +10,9 @@ from urllib.parse import parse_qs, urlparse
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src")) sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
from gibby.account_ops import (
PermanentAccountFailure,
failed_identifier,
refresh_account_usage,
window_used_percent,
)
from gibby.client import OpenAIAPIError, OpenAIClient from gibby.client import OpenAIAPIError, OpenAIClient
from gibby.models import AccountRecord, format_reset_in from gibby.models import AccountRecord, format_reset_in, parse_usage_payload
from gibby.oauth import ( from gibby.oauth import build_authorize_url, generate_pkce_pair, generate_state
build_authorize_url,
generate_pkce_pair,
generate_state,
make_account_id,
)
from gibby.settings import Settings from gibby.settings import Settings
from gibby.store import JsonStateStore from gibby.store import JsonStateStore
@ -55,12 +44,10 @@ async def wait_for_callback(
code, state = parse_redirect_url(parts[1]) code, state = parse_redirect_url(parts[1])
if not result.done(): if not result.done():
result.set_result((code, state)) result.set_result((code, state))
while True: while True:
line = await reader.readline() line = await reader.readline()
if not line or line == b"\r\n": if not line or line == b"\r\n":
break break
writer.write( writer.write(
b"HTTP/1.1 200 OK\r\n" b"HTTP/1.1 200 OK\r\n"
b"Content-Type: text/plain; charset=utf-8\r\n" b"Content-Type: text/plain; charset=utf-8\r\n"
@ -91,35 +78,34 @@ async def exchange_and_store_account(
verifier: str, verifier: str,
set_active: bool, set_active: bool,
) -> AccountRecord: ) -> AccountRecord:
access_token, refresh_token, expires_at = await client.exchange_code(code, verifier) access_token, refresh_token, token_refresh_at = await client.exchange_code(code, verifier)
account = AccountRecord( account = AccountRecord(
id=make_account_id(), email="",
access_token=access_token, access_token=access_token,
refresh_token=refresh_token, refresh_token=refresh_token,
expires_at=expires_at, token_refresh_at=token_refresh_at,
) )
try: try:
usage = await refresh_account_usage( payload = await client.fetch_usage_payload(account.access_token)
account, email = payload.get("email")
client, if isinstance(email, str) and email:
client.settings.exhausted_usage_threshold, account.email = email
) account.usage = parse_usage_payload(payload)
except PermanentAccountFailure: account.usage_checked_at = account.usage.checked_at
store.append_failed_identifier(failed_identifier(account)) except OpenAIAPIError:
if not account.email:
raise raise
except OpenAIAPIError as exc:
account.last_error = str(exc)
store.upsert_account(account, set_active=set_active) store.upsert_account(account, set_active=set_active)
print("Usage fetch failed, stored account without usage snapshot.") print("Usage fetch failed, stored account without usage snapshot.")
return account return account
store.upsert_account(account, set_active=set_active) store.upsert_account(account, set_active=set_active)
print( print(
f"token ready for {account.id}, " f"token ready for {account.email}, "
f"primary {window_used_percent(usage.primary_window)}% " f"primary {account.usage.primary_window.used_percent if account.usage and account.usage.primary_window else 0}% "
f"reset in {format_reset_in(usage.primary_window.reset_at if usage.primary_window else None)}, " f"reset in {format_reset_in(account.usage.primary_window.reset_at if account.usage and account.usage.primary_window else None)}, "
f"secondary {window_used_percent(usage.secondary_window)}% " f"secondary {account.usage.secondary_window.used_percent if account.usage and account.usage.secondary_window else 0}% "
f"reset in {format_reset_in(usage.secondary_window.reset_at if usage.secondary_window else None)}" f"reset in {format_reset_in(account.usage.secondary_window.reset_at if account.usage and account.usage.secondary_window else None)}"
) )
return account return account
@ -168,14 +154,12 @@ async def run(
store = JsonStateStore(settings.accounts_file, settings.failed_file) store = JsonStateStore(settings.accounts_file, settings.failed_file)
client = OpenAIClient(settings) client = OpenAIClient(settings)
try: try:
account = await exchange_and_store_account( account = await exchange_and_store_account(store, client, code, verifier, set_active)
store, client, code, verifier, set_active
)
finally: finally:
await client.aclose() await client.aclose()
print(f"Stored account: {account.id}") print(f"Stored account: {account.email}")
print(f"Access token expires at: {account.expires_at}") print(f"Access token refresh at: {account.token_refresh_at}")
def main() -> None: def main() -> None:
@ -192,7 +176,7 @@ def main() -> None:
print("Timed out waiting for OAuth callback.") print("Timed out waiting for OAuth callback.")
except ValueError as exc: except ValueError as exc:
print(str(exc)) print(str(exc))
except PermanentAccountFailure as exc: except OpenAIAPIError as exc:
print(str(exc)) print(str(exc))

View file

@ -7,14 +7,8 @@ from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src")) sys.path.insert(0, str(Path(__file__).resolve().parents[1] / "src"))
from gibby.account_ops import ( from gibby.client import OpenAIAPIError, OpenAIClient
PermanentAccountFailure, from gibby.models import format_reset_in, parse_usage_payload
handle_failed_account,
refresh_account_usage,
window_used_percent,
)
from gibby.client import OpenAIClient
from gibby.models import format_reset_in
from gibby.settings import Settings from gibby.settings import Settings
from gibby.store import JsonStateStore from gibby.store import JsonStateStore
@ -33,29 +27,23 @@ async def run(data_dir: Path | None = None) -> None:
client = OpenAIClient(settings) client = OpenAIClient(settings)
try: try:
for account in list(state.accounts): for account in list(state.accounts):
previous_id = account.id
try: try:
usage = await refresh_account_usage( payload = await client.fetch_usage_payload(account.access_token)
account, account.usage = parse_usage_payload(payload)
client, account.usage_checked_at = account.usage.checked_at
settings.exhausted_usage_threshold,
)
store.update_active_account_id(state, previous_id, account.id)
print( print(
f"token ready for {account.id}, " f"token ready for {account.email}, "
f"primary {window_used_percent(usage.primary_window)}% " f"primary {account.usage.primary_window.used_percent if account.usage.primary_window else 0}% "
f"reset in {format_reset_in(usage.primary_window.reset_at if usage.primary_window else None)}, " f"reset in {format_reset_in(account.usage.primary_window.reset_at if account.usage and account.usage.primary_window else None)}, "
f"secondary {window_used_percent(usage.secondary_window)}% " f"secondary {account.usage.secondary_window.used_percent if account.usage and account.usage.secondary_window else 0}% "
f"reset in {format_reset_in(usage.secondary_window.reset_at if usage.secondary_window else None)}" f"reset in {format_reset_in(account.usage.secondary_window.reset_at if account.usage and account.usage.secondary_window else None)}"
) )
except PermanentAccountFailure as exc: except OpenAIAPIError as exc:
account.last_error = str(exc) if exc.permanent:
handle_failed_account(store, account) store.move_to_failed(state, account.email)
store.remove_account(state, account.id) print(f"{account.email}: removed={exc}")
print(f"{account.id}: removed={exc}") else:
except Exception as exc: print(f"{account.email}: error={exc}")
account.last_error = str(exc)
print(f"{account.id}: error={exc}")
store.save(state) store.save(state)
finally: finally:
await client.aclose() await client.aclose()

View file

@ -1,99 +0,0 @@
from __future__ import annotations
from gibby.client import OpenAIAPIError, OpenAIClient
from gibby.models import (
AccountRecord,
UsageSnapshot,
UsageWindow,
compute_cooldown_until,
now_ts,
parse_usage_payload,
)
from gibby.store import JsonStateStore
class PermanentAccountFailure(RuntimeError):
def __init__(self, account: AccountRecord, reason: str):
super().__init__(reason)
self.account = account
def failed_identifier(account: AccountRecord) -> str:
return account.email or account.account_id or account.id
def sync_account_identity(account: AccountRecord) -> None:
if account.email:
account.id = account.email
def handle_failed_account(store: JsonStateStore, account: AccountRecord) -> None:
store.append_failed_identifier(failed_identifier(account))
def window_used_percent(window: UsageWindow | None) -> int:
if window is None:
return 0
return window.used_percent
def snapshot_is_exhausted(
snapshot: UsageSnapshot, exhausted_threshold: int = 100
) -> bool:
if snapshot.exhausted:
return True
if snapshot.limit_reached or not snapshot.allowed:
return True
pri = snapshot.primary_window
sec = snapshot.secondary_window
if pri is not None and pri.used_percent >= exhausted_threshold:
return True
if sec is not None and sec.used_percent >= exhausted_threshold:
return True
return False
async def ensure_fresh_access_token(
account: AccountRecord, client: OpenAIClient
) -> None:
if account.expires_at > now_ts() + 30:
return
if not account.refresh_token:
raise RuntimeError("Missing refresh token")
try:
access_token, refresh_token, expires_at = await client.refresh_access_token(
account.refresh_token
)
except OpenAIAPIError as exc:
if exc.permanent:
raise PermanentAccountFailure(account, str(exc)) from exc
raise
account.access_token = access_token
account.refresh_token = refresh_token
account.expires_at = expires_at
async def refresh_account_usage(
account: AccountRecord,
client: OpenAIClient,
exhausted_threshold: int = 100,
) -> UsageSnapshot:
await ensure_fresh_access_token(account, client)
try:
payload = await client.fetch_usage_payload(account.access_token)
except OpenAIAPIError as exc:
if exc.permanent:
raise PermanentAccountFailure(account, str(exc)) from exc
raise
account.email = payload.get("email") or account.email
account.account_id = payload.get("account_id") or account.account_id
sync_account_identity(account)
usage = parse_usage_payload(payload)
account.last_known_usage = usage
account.last_error = None
account.cooldown_until = (
compute_cooldown_until(usage, exhausted_threshold)
if snapshot_is_exhausted(usage, exhausted_threshold)
else None
)
return usage

View file

@ -49,10 +49,6 @@ def create_app(manager: AccountManager | None = None) -> FastAPI:
except NoUsableAccountError as exc: except NoUsableAccountError as exc:
return JSONResponse(status_code=503, content={"error": str(exc)}) return JSONResponse(status_code=503, content={"error": str(exc)})
@app.get("/usage", response_model=None)
async def usage() -> Any:
return await app.state.manager.get_usage_report()
return app return app

View file

@ -8,6 +8,8 @@ from gibby.models import UsageSnapshot, now_ts, parse_usage_payload
from gibby.oauth import TOKEN_URL, USAGE_URL from gibby.oauth import TOKEN_URL, USAGE_URL
from gibby.settings import Settings from gibby.settings import Settings
MODELS_URL = "https://chatgpt.com/backend-api/codex/models"
class OpenAIAPIError(RuntimeError): class OpenAIAPIError(RuntimeError):
def __init__( def __init__(
@ -73,8 +75,8 @@ class OpenAIClient:
response = await self.http_client.post(TOKEN_URL, data=payload) response = await self.http_client.post(TOKEN_URL, data=payload)
_raise_for_openai_error(response) _raise_for_openai_error(response)
body = response.json() body = response.json()
expires_at = now_ts() + int(body["expires_in"]) refresh_at = now_ts() + int(body["expires_in"])
return body["access_token"], body["refresh_token"], expires_at return body["access_token"], body["refresh_token"], refresh_at
async def refresh_access_token(self, refresh_token: str) -> tuple[str, str, int]: async def refresh_access_token(self, refresh_token: str) -> tuple[str, str, int]:
payload = { payload = {
@ -85,17 +87,11 @@ class OpenAIClient:
response = await self.http_client.post(TOKEN_URL, data=payload) response = await self.http_client.post(TOKEN_URL, data=payload)
_raise_for_openai_error(response) _raise_for_openai_error(response)
body = response.json() body = response.json()
expires_at = now_ts() + int(body["expires_in"]) refresh_at = now_ts() + int(body["expires_in"])
next_refresh = str(body.get("refresh_token") or refresh_token) return body["access_token"], str(body.get("refresh_token") or refresh_token), refresh_at
return body["access_token"], next_refresh, expires_at
async def fetch_usage_payload(self, access_token: str) -> dict: async def fetch_usage_payload(self, access_token: str) -> dict:
headers = { response = await self.http_client.get(USAGE_URL, headers=self._headers(access_token))
"Authorization": f"Bearer {access_token}",
"User-Agent": "CodexProxy",
"Accept": "application/json",
}
response = await self.http_client.get(USAGE_URL, headers=headers)
_raise_for_openai_error(response) _raise_for_openai_error(response)
payload = response.json() payload = response.json()
if not isinstance(payload, dict): if not isinstance(payload, dict):
@ -106,3 +102,15 @@ class OpenAIClient:
async def fetch_usage(self, access_token: str) -> UsageSnapshot: async def fetch_usage(self, access_token: str) -> UsageSnapshot:
return parse_usage_payload(await self.fetch_usage_payload(access_token)) return parse_usage_payload(await self.fetch_usage_payload(access_token))
async def validate_token(self, access_token: str) -> bool:
response = await self.http_client.get(MODELS_URL, headers=self._headers(access_token))
return response.status_code == 200
@staticmethod
def _headers(access_token: str) -> dict[str, str]:
return {
"Authorization": f"Bearer {access_token}",
"User-Agent": "CodexProxy",
"Accept": "application/json",
}

View file

@ -5,22 +5,8 @@ import logging
from dataclasses import asdict from dataclasses import asdict
from typing import Any from typing import Any
from gibby.account_ops import ( from gibby.client import OpenAIAPIError, OpenAIClient
PermanentAccountFailure, from gibby.models import AccountRecord, StateFile, UsageSnapshot, build_limit, now_ts, parse_usage_payload
failed_identifier,
handle_failed_account,
refresh_account_usage,
snapshot_is_exhausted,
window_used_percent,
)
from gibby.client import OpenAIClient
from gibby.models import (
AccountRecord,
StateFile,
build_limit,
format_reset_in,
now_ts,
)
from gibby.settings import Settings from gibby.settings import Settings
from gibby.store import JsonStateStore from gibby.store import JsonStateStore
@ -40,92 +26,34 @@ class AccountManager:
async def issue_token_response(self) -> dict[str, Any]: async def issue_token_response(self) -> dict[str, Any]:
async with self._lock: async with self._lock:
while True:
state = self.store.load() state = self.store.load()
if not state.accounts: if not state.accounts:
logger.error("No accounts configured")
raise NoUsableAccountError("No accounts configured")
current = now_ts()
for account in self._build_selection_order(state, current):
payload = await self._try_issue_token(state, account)
if payload is not None:
return payload
self.store.save(state)
logger.error(
"No usable account available: %s",
[
{
"id": account.id,
"cooldown_until": account.cooldown_until,
"last_error": account.last_error,
}
for account in state.accounts
],
)
raise NoUsableAccountError("No usable account available") raise NoUsableAccountError("No usable account available")
async def _try_issue_token( account = await self._select_account(state)
self, state: StateFile, account: AccountRecord if account is None:
) -> dict[str, Any] | None: raise NoUsableAccountError("No usable account available")
previous_id = account.id
try:
usage = await refresh_account_usage(
account,
self.client,
self.settings.exhausted_usage_threshold,
)
except PermanentAccountFailure as exc:
account.last_error = str(exc)
handle_failed_account(self.store, account)
self.store.remove_account(state, account.id)
self.store.save(state)
logger.error(
"Removed failed account %s (%s): %s",
account.id,
failed_identifier(account),
exc,
)
return None
except Exception as exc:
account.last_error = str(exc)
logger.exception(
"Account %s failed during refresh or usage check",
account.id,
)
self.store.update_active_account_id(state, previous_id, account.id)
self.store.save(state)
return None
self.store.update_active_account_id(state, previous_id, account.id) state = self.store.load()
account.last_known_usage = usage account = self._find_account(state, account.email)
account.last_error = None if account is None:
if snapshot_is_exhausted(usage, self.settings.exhausted_usage_threshold): continue
logger.warning(
"Account %s exhausted: primary=%s%% secondary=%s%% cooldown_until=%s",
account.id,
window_used_percent(usage.primary_window),
window_used_percent(usage.secondary_window),
account.cooldown_until,
)
self.store.save(state)
return None
account.cooldown_until = None if not await self._ensure_fresh_token(state, account):
state.active_account_id = account.id continue
self.store.save(state) self.store.save(state)
logger.info(
"token issued for %s, primary %s%% reset in %s, secondary %s%% reset in %s", if not await self.client.validate_token(account.access_token):
account.id, logger.warning("account %s failed token validation", account.email)
window_used_percent(usage.primary_window), self.store.move_to_failed(state, account.email)
format_reset_in( self.store.save(state)
usage.primary_window.reset_at if usage.primary_window else None continue
),
window_used_percent(usage.secondary_window), state.active_account = account.email
format_reset_in( self.store.save(state)
usage.secondary_window.reset_at if usage.secondary_window else None usage = account.usage or UsageSnapshot(checked_at=now_ts())
), logger.info("token issued for %s", account.email)
)
return { return {
"token": account.access_token, "token": account.access_token,
"limit": build_limit(usage), "limit": build_limit(usage),
@ -139,133 +67,103 @@ class AccountManager:
}, },
} }
async def get_usage_report(self) -> dict[str, Any]: async def _select_account(self, state: StateFile) -> AccountRecord | None:
async with self._lock: now = now_ts()
state = self.store.load() active = self._find_account(state, state.active_account)
accounts_report: list[dict[str, Any]] = [] if active is not None:
prepared = await self._prepare_account(state, active, now)
if prepared is not None:
return prepared
for account in list(state.accounts): candidates: list[AccountRecord] = []
previous_id = account.id
try:
usage = await refresh_account_usage(
account,
self.client,
)
except PermanentAccountFailure as exc:
account.last_error = str(exc)
identifier = failed_identifier(account)
handle_failed_account(self.store, account)
self.store.remove_account(state, account.id)
logger.error(
"Removed failed account %s (%s): %s",
account.id,
identifier,
exc,
)
accounts_report.append(
{
"id": account.id,
"email": account.email,
"status": "removed",
"error": str(exc),
}
)
continue
except Exception as exc:
account.last_error = str(exc)
logger.exception(
"Account %s failed during usage refresh", account.id
)
accounts_report.append(
{
"id": account.id,
"email": account.email,
"status": "error",
"error": str(exc),
"cooldown_until": account.cooldown_until,
}
)
continue
self.store.update_active_account_id(state, previous_id, account.id)
status = (
"depleted"
if snapshot_is_exhausted(
usage, self.settings.exhausted_usage_threshold
)
else "ok"
)
accounts_report.append(
{
"id": account.id,
"email": account.email,
"status": status,
"used_percent": usage.used_percent,
"remaining_percent": usage.remaining_percent,
"cooldown_until": account.cooldown_until,
"primary_window": asdict(usage.primary_window)
if usage.primary_window
else None,
"secondary_window": asdict(usage.secondary_window)
if usage.secondary_window
else None,
}
)
self.store.save(state)
return {
"accounts": accounts_report,
"active_account_id": state.active_account_id,
"count": len(accounts_report),
}
def _find_active_account(self, state: StateFile) -> AccountRecord | None:
if not state.active_account_id:
return None
for account in state.accounts: for account in state.accounts:
if account.id == state.active_account_id: if active is not None and account.email == active.email:
continue
prepared = await self._prepare_account(state, account, now)
if prepared is not None:
candidates.append(prepared)
if not candidates:
self.store.save(state)
return None
selected = max(candidates, key=lambda item: self._primary_used(item))
if state.active_account != selected.email:
state.active_account = selected.email
self.store.save(state)
return selected
async def _prepare_account(
self, state: StateFile, account: AccountRecord, now: int
) -> AccountRecord | None:
if account.disabled:
return None
if self._usage_is_stale(account, now):
try:
if not await self._refresh_usage(state, account):
return None
except OpenAIAPIError as exc:
if exc.permanent:
logger.warning("account %s failed usage refresh", account.email)
self.store.move_to_failed(state, account.email)
self.store.save(state)
return None
if self._is_usable(account):
return account return account
return None return None
def _is_locally_blocked(self, account: AccountRecord, current: int) -> bool: async def _refresh_usage(self, state: StateFile, account: AccountRecord) -> bool:
if account.cooldown_until and account.cooldown_until > current: if not await self._ensure_fresh_token(state, account):
return True
if account.last_known_usage and snapshot_is_exhausted(
account.last_known_usage, self.settings.exhausted_usage_threshold
):
return True
return False return False
payload = await self.client.fetch_usage_payload(account.access_token)
email = payload.get("email")
if isinstance(email, str) and email:
account.email = email
account.usage = parse_usage_payload(payload)
account.usage_checked_at = account.usage.checked_at
self.store.save(state)
return True
def _build_selection_order( async def _ensure_fresh_token(self, state: StateFile, account: AccountRecord) -> bool:
self, state: StateFile, current: int if account.token_refresh_at > now_ts() + self.settings.token_refresh_buffer_seconds:
) -> list[AccountRecord]: return True
active = self._find_active_account(state) try:
excluded_ids = {active.id} if active is not None else set() access_token, refresh_token, refresh_at = await self.client.refresh_access_token(
fallback = self._iter_candidates(state, current, excluded_ids) account.refresh_token
if active is None:
return fallback
if self._is_locally_blocked(active, current):
logger.info("active account %s is blocked by local state", active.id)
return fallback
return [active, *fallback]
def _iter_candidates(
self, state: StateFile, current: int, excluded_ids: set[str] | None = None
) -> list[AccountRecord]:
excluded_ids = excluded_ids or set()
available = [
account
for account in state.accounts
if account.id not in excluded_ids
and not self._is_locally_blocked(account, current)
]
return sorted(available, key=self._candidate_sort_key, reverse=True)
def _candidate_sort_key(self, account: AccountRecord) -> tuple[int, int]:
snapshot = account.last_known_usage
if snapshot is None:
return (0, 0)
return (
window_used_percent(snapshot.primary_window),
window_used_percent(snapshot.secondary_window),
) )
except OpenAIAPIError as exc:
if exc.permanent:
logger.warning("account %s failed token refresh", account.email)
self.store.move_to_failed(state, account.email)
self.store.save(state)
return False
raise
account.access_token = access_token
account.refresh_token = refresh_token
account.token_refresh_at = refresh_at
return True
def _find_account(self, state: StateFile, email: str | None) -> AccountRecord | None:
if not email:
return None
for account in state.accounts:
if account.email == email:
return account
return None
def _usage_is_stale(self, account: AccountRecord, now: int) -> bool:
if account.usage is None or account.usage_checked_at is None:
return True
return now - account.usage_checked_at > self.settings.usage_stale_seconds
def _is_usable(self, account: AccountRecord) -> bool:
if account.disabled or account.usage is None:
return False
primary = account.usage.primary_window.used_percent if account.usage.primary_window else 0
secondary = account.usage.secondary_window.used_percent if account.usage.secondary_window else 0
return primary < self.settings.exhausted_usage_threshold and secondary < 100
def _primary_used(self, account: AccountRecord) -> int:
if account.usage is None or account.usage.primary_window is None:
return 0
return account.usage.primary_window.used_percent

View file

@ -4,46 +4,51 @@ import time
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Any from typing import Any
UNKNOWN_EXHAUSTED_BACKOFF_SECONDS = 300
@dataclass(slots=True) @dataclass(slots=True)
class UsageWindow: class UsageWindow:
used_percent: int used_percent: int = 0
limit_window_seconds: int reset_at: int | None = None
reset_after_seconds: int
reset_at: int
@dataclass(slots=True) @dataclass(slots=True)
class UsageSnapshot: class UsageSnapshot:
checked_at: int checked_at: int
used_percent: int primary_window: UsageWindow | None = None
remaining_percent: int secondary_window: UsageWindow | None = None
exhausted: bool limit_reached: bool = False
primary_window: UsageWindow | None allowed: bool = True
secondary_window: UsageWindow | None
limit_reached: bool @property
allowed: bool def used_percent(self) -> int:
return max(
(window.used_percent for window in (self.primary_window, self.secondary_window) if window),
default=0,
)
@property
def remaining_percent(self) -> int:
return max(0, 100 - self.used_percent)
@property
def exhausted(self) -> bool:
return self.used_percent >= 100 or self.limit_reached or not self.allowed
@dataclass(slots=True) @dataclass(slots=True)
class AccountRecord: class AccountRecord:
id: str email: str
email: str | None = None
account_id: str | None = None
access_token: str = "" access_token: str = ""
refresh_token: str = "" refresh_token: str = ""
expires_at: int = 0 token_refresh_at: int = 0
cooldown_until: int | None = None usage: UsageSnapshot | None = None
last_known_usage: UsageSnapshot | None = None usage_checked_at: int | None = None
last_error: str | None = None disabled: bool = False
@dataclass(slots=True) @dataclass(slots=True)
class StateFile: class StateFile:
version: int = 1 active_account: str | None = None
active_account_id: str | None = None
accounts: list[AccountRecord] = field(default_factory=list) accounts: list[AccountRecord] = field(default_factory=list)
@ -51,13 +56,20 @@ def now_ts() -> int:
return int(time.time()) return int(time.time())
def clamp_percent(value: Any) -> int:
try:
number = int(round(float(value)))
except (TypeError, ValueError):
return 0
return max(0, min(100, number))
def format_reset_in(reset_at: int | None, current: int | None = None) -> str: def format_reset_in(reset_at: int | None, current: int | None = None) -> str:
if not reset_at: if not reset_at:
return "n/a" return "n/a"
remaining = reset_at - (current if current is not None else now_ts()) remaining = reset_at - (current if current is not None else now_ts())
if remaining <= 0: if remaining <= 0:
return "now" return "now"
days, remainder = divmod(remaining, 86400) days, remainder = divmod(remaining, 86400)
hours, remainder = divmod(remainder, 3600) hours, remainder = divmod(remainder, 3600)
minutes, seconds = divmod(remainder, 60) minutes, seconds = divmod(remainder, 60)
@ -73,70 +85,13 @@ def format_reset_in(reset_at: int | None, current: int | None = None) -> str:
return " ".join(parts) return " ".join(parts)
def clamp_percent(value: Any) -> int:
try:
num = float(value)
except TypeError, ValueError:
return 0
if num < 0:
return 0
if num > 100:
return 100
return int(round(num))
def parse_usage_window(window: dict[str, Any] | None) -> UsageWindow | None: def parse_usage_window(window: dict[str, Any] | None) -> UsageWindow | None:
if not isinstance(window, dict): if not isinstance(window, dict):
return None return None
reset_at = window.get("reset_at")
return UsageWindow( return UsageWindow(
used_percent=clamp_percent(window.get("used_percent") or 0), used_percent=clamp_percent(window.get("used_percent") or 0),
limit_window_seconds=int(window.get("limit_window_seconds") or 0), reset_at=int(reset_at) if reset_at else None,
reset_after_seconds=int(window.get("reset_after_seconds") or 0),
reset_at=int(window.get("reset_at") or 0),
)
def _effective_used_percent(
primary: UsageWindow | None, secondary: UsageWindow | None
) -> int:
return max(
(window.used_percent for window in (primary, secondary) if window), default=0
)
def _snapshot_is_exhausted(
primary: UsageWindow | None,
secondary: UsageWindow | None,
limit_reached: bool,
allowed: bool,
) -> bool:
return (
_effective_used_percent(primary, secondary) >= 100
or limit_reached
or not allowed
)
def build_usage_snapshot(
*,
checked_at: int,
primary_window: UsageWindow | None,
secondary_window: UsageWindow | None,
limit_reached: bool,
allowed: bool,
) -> UsageSnapshot:
used_percent = _effective_used_percent(primary_window, secondary_window)
return UsageSnapshot(
checked_at=checked_at,
used_percent=used_percent,
remaining_percent=max(0, 100 - used_percent),
exhausted=_snapshot_is_exhausted(
primary_window, secondary_window, limit_reached, allowed
),
primary_window=primary_window,
secondary_window=secondary_window,
limit_reached=limit_reached,
allowed=allowed,
) )
@ -145,16 +100,12 @@ def parse_usage_payload(
) -> UsageSnapshot: ) -> UsageSnapshot:
checked_at = checked_at or now_ts() checked_at = checked_at or now_ts()
rate_limit = payload.get("rate_limit") or {} rate_limit = payload.get("rate_limit") or {}
primary = parse_usage_window(rate_limit.get("primary_window")) return UsageSnapshot(
secondary = parse_usage_window(rate_limit.get("secondary_window"))
limit_reached = bool(rate_limit.get("limit_reached"))
allowed = bool(rate_limit.get("allowed", True))
return build_usage_snapshot(
checked_at=checked_at, checked_at=checked_at,
primary_window=primary, primary_window=parse_usage_window(rate_limit.get("primary_window")),
secondary_window=secondary, secondary_window=parse_usage_window(rate_limit.get("secondary_window")),
limit_reached=limit_reached, limit_reached=bool(rate_limit.get("limit_reached")),
allowed=allowed, allowed=bool(rate_limit.get("allowed", True)),
) )
@ -165,47 +116,3 @@ def build_limit(snapshot: UsageSnapshot) -> dict[str, int | bool]:
"exhausted": snapshot.exhausted, "exhausted": snapshot.exhausted,
"needs_prepare": False, "needs_prepare": False,
} }
def _window_reset_deadline(window: UsageWindow, checked_at: int) -> int | None:
if window.reset_at > checked_at:
return window.reset_at
if window.reset_after_seconds > 0:
return checked_at + window.reset_after_seconds
if window.limit_window_seconds > 0:
return checked_at + window.limit_window_seconds
return None
def compute_cooldown_until(
snapshot: UsageSnapshot, exhausted_threshold: int = 100
) -> int | None:
effective_used = _effective_used_percent(
snapshot.primary_window, snapshot.secondary_window
)
if (
effective_used < exhausted_threshold
and not snapshot.limit_reached
and snapshot.allowed
):
return None
blocking_windows = [
window
for window in (snapshot.primary_window, snapshot.secondary_window)
if window and window.used_percent >= exhausted_threshold
]
if not blocking_windows and (snapshot.limit_reached or not snapshot.allowed):
blocking_windows = [
window
for window in (snapshot.primary_window, snapshot.secondary_window)
if window
]
deadlines = [
deadline
for window in blocking_windows
if (deadline := _window_reset_deadline(window, snapshot.checked_at)) is not None
]
if deadlines:
return max(deadlines)
return snapshot.checked_at + UNKNOWN_EXHAUSTED_BACKOFF_SECONDS

View file

@ -13,15 +13,20 @@ class Settings:
data_dir: Path = Path(os.getenv("DATA_DIR", "data")) data_dir: Path = Path(os.getenv("DATA_DIR", "data"))
bind_host: str = os.getenv("GIBBY_BIND_HOST", "0.0.0.0") bind_host: str = os.getenv("GIBBY_BIND_HOST", "0.0.0.0")
port: int = int(os.getenv("GIBBY_PORT", "8080")) port: int = int(os.getenv("GIBBY_PORT", "8080"))
request_timeout: float = float(os.getenv("GIBBY_REQUEST_TIMEOUT", "40"))
usage_stale_seconds: int = int(os.getenv("GIBBY_USAGE_STALE_SECONDS", "3600"))
exhausted_usage_threshold: int = int( exhausted_usage_threshold: int = int(
os.getenv("GIBBY_EXHAUSTED_USAGE_THRESHOLD", "95") os.getenv("GIBBY_EXHAUSTED_USAGE_THRESHOLD", "95")
) )
token_refresh_buffer_seconds: int = int(
os.getenv("GIBBY_TOKEN_REFRESH_BUFFER_SECONDS", "30")
)
callback_host: str = os.getenv("GIBBY_CALLBACK_HOST", "localhost") callback_host: str = os.getenv("GIBBY_CALLBACK_HOST", "localhost")
callback_port: int = int(os.getenv("GIBBY_CALLBACK_PORT", "1455")) callback_port: int = int(os.getenv("GIBBY_CALLBACK_PORT", "1455"))
oauth_client_id: str = os.getenv("GIBBY_OAUTH_CLIENT_ID", DEFAULT_CLIENT_ID) oauth_client_id: str = os.getenv("GIBBY_OAUTH_CLIENT_ID", DEFAULT_CLIENT_ID)
oauth_scope: str = os.getenv("GIBBY_OAUTH_SCOPE", DEFAULT_SCOPE) oauth_scope: str = os.getenv("GIBBY_OAUTH_SCOPE", DEFAULT_SCOPE)
originator: str = os.getenv("GIBBY_ORIGINATOR", "opencode") originator: str = os.getenv("GIBBY_ORIGINATOR", "opencode")
request_timeout: float = float(os.getenv("GIBBY_REQUEST_TIMEOUT", "40"))
@property @property
def accounts_file(self) -> Path: def accounts_file(self) -> Path:
@ -29,7 +34,7 @@ class Settings:
@property @property
def failed_file(self) -> Path: def failed_file(self) -> Path:
return self.data_dir / "failed.txt" return self.data_dir / "failed.json"
@property @property
def redirect_uri(self) -> str: def redirect_uri(self) -> str:

View file

@ -2,170 +2,153 @@ from __future__ import annotations
import json import json
import os import os
from dataclasses import asdict
from pathlib import Path from pathlib import Path
from tempfile import NamedTemporaryFile from tempfile import NamedTemporaryFile
from typing import Any from typing import Any
from gibby.models import ( from gibby.models import AccountRecord, StateFile, UsageSnapshot, UsageWindow
AccountRecord,
StateFile,
UsageSnapshot,
UsageWindow,
build_usage_snapshot,
parse_usage_window,
)
class JsonStateStore: class JsonStateStore:
def __init__(self, path: Path, failed_path: Path | None = None): def __init__(self, path: Path, failed_path: Path | None = None):
self.path = path self.path = path
self.failed_path = failed_path or path.with_name("failed.txt") self.failed_path = failed_path or path.with_name("failed.json")
def load(self) -> StateFile: def load(self) -> StateFile:
try: raw = self._read_json(self.path, {"active_account": None, "accounts": []})
if not self.path.exists(): accounts = [self._account_from_dict(item) for item in raw.get("accounts", [])]
return StateFile() return StateFile(active_account=raw.get("active_account"), accounts=accounts)
raw = json.loads(self.path.read_text())
if not isinstance(raw, dict):
raise RuntimeError("accounts.json must contain a JSON object")
accounts = [
self._account_from_dict(item) for item in raw.get("accounts", [])
]
return StateFile(
version=int(raw.get("version", 1)),
active_account_id=raw.get("active_account_id"),
accounts=accounts,
)
except (OSError, json.JSONDecodeError, TypeError, ValueError, KeyError) as exc:
raise RuntimeError(f"Invalid accounts state file: {exc}") from exc
def save(self, state: StateFile) -> None: def save(self, state: StateFile) -> None:
self.path.parent.mkdir(parents=True, exist_ok=True)
payload = { payload = {
"version": state.version, "active_account": state.active_account,
"active_account_id": state.active_account_id,
"accounts": [self._account_to_dict(account) for account in state.accounts], "accounts": [self._account_to_dict(account) for account in state.accounts],
} }
with NamedTemporaryFile( self._write_json(self.path, payload)
"w", delete=False, dir=self.path.parent, encoding="utf-8"
) as tmp: def load_failed_accounts(self) -> list[AccountRecord]:
json.dump(payload, tmp, ensure_ascii=True, indent=2) raw = self._read_json(self.failed_path, {"accounts": []})
tmp.write("\n") return [self._account_from_dict(item) for item in raw.get("accounts", [])]
temp_name = tmp.name
os.replace(temp_name, self.path) def append_failed_account(self, account: AccountRecord) -> None:
payload = self._read_json(self.failed_path, {"accounts": []})
payload.setdefault("accounts", []).append(self._account_to_dict(account))
self._write_json(self.failed_path, {"accounts": payload["accounts"]})
def upsert_account( def upsert_account(
self, account: AccountRecord, set_active: bool = False self, account: AccountRecord, set_active: bool = False
) -> AccountRecord: ) -> AccountRecord:
state = self.load() state = self.load()
for index, existing in enumerate(state.accounts): for index, existing in enumerate(state.accounts):
if existing.email and account.email and existing.email == account.email: if existing.email == account.email:
if state.active_account_id == existing.id:
state.active_account_id = account.id
state.accounts[index] = account
break
if existing.id == account.id:
state.accounts[index] = account state.accounts[index] = account
break break
else: else:
state.accounts.append(account) state.accounts.append(account)
if set_active or not state.active_account_id: if set_active or not state.active_account:
state.active_account_id = account.id state.active_account = account.email
self.save(state) self.save(state)
return account return account
def remove_account(self, state: StateFile, account_id: str) -> None: def remove_account(self, state: StateFile, email: str) -> None:
state.accounts = [ state.accounts = [account for account in state.accounts if account.email != email]
account for account in state.accounts if account.id != account_id if state.active_account == email:
] state.active_account = None
if state.active_account_id == account_id:
state.active_account_id = state.accounts[0].id if state.accounts else None
@staticmethod def move_to_failed(self, state: StateFile, email: str) -> None:
def update_active_account_id( account = next((account for account in state.accounts if account.email == email), None)
state: StateFile, previous_id: str, current_id: str if account is None:
return
self.remove_account(state, email)
self.append_failed_account(account)
def update_active_account(
self, state: StateFile, previous_email: str, current_email: str
) -> None: ) -> None:
if state.active_account_id == previous_id: if state.active_account == previous_email:
state.active_account_id = current_id state.active_account = current_email
def append_failed_identifier(self, identifier: str) -> None: def _read_json(self, path: Path, default: dict[str, Any]) -> dict[str, Any]:
identifier = identifier.strip() if not path.exists():
if not identifier: return default
return try:
self.failed_path.parent.mkdir(parents=True, exist_ok=True) raw = json.loads(path.read_text())
existing = set() except (OSError, json.JSONDecodeError) as exc:
if self.failed_path.exists(): raise RuntimeError(f"Invalid state file {path}: {exc}") from exc
existing = { if not isinstance(raw, dict):
line.strip() raise RuntimeError(f"State file {path} must contain a JSON object")
for line in self.failed_path.read_text().splitlines() return raw
if line.strip()
} def _write_json(self, path: Path, payload: dict[str, Any]) -> None:
if identifier in existing: path.parent.mkdir(parents=True, exist_ok=True)
return with NamedTemporaryFile(
with self.failed_path.open("a", encoding="utf-8") as file: "w", delete=False, dir=path.parent, encoding="utf-8"
file.write(f"{identifier}\n") ) as tmp:
json.dump(payload, tmp, ensure_ascii=True, indent=2)
tmp.write("\n")
temp_name = tmp.name
os.replace(temp_name, path)
@staticmethod @staticmethod
def _window_to_dict(window: UsageWindow | None) -> dict[str, Any] | None: def _account_to_dict(account: AccountRecord) -> dict[str, Any]:
return asdict(window) if window else None return {
"email": account.email,
"access_token": account.access_token,
"refresh_token": account.refresh_token,
"token_refresh_at": account.token_refresh_at,
"usage": JsonStateStore._usage_to_dict(account.usage),
"usage_checked_at": account.usage_checked_at,
"disabled": account.disabled,
}
@classmethod @staticmethod
def _snapshot_to_dict(cls, snapshot: UsageSnapshot | None) -> dict[str, Any] | None: def _usage_to_dict(snapshot: UsageSnapshot | None) -> dict[str, Any] | None:
if snapshot is None: if snapshot is None:
return None return None
return { return {
"checked_at": snapshot.checked_at, "primary": JsonStateStore._window_to_dict(snapshot.primary_window),
"primary_window": cls._window_to_dict(snapshot.primary_window), "secondary": JsonStateStore._window_to_dict(snapshot.secondary_window),
"secondary_window": cls._window_to_dict(snapshot.secondary_window),
"limit_reached": snapshot.limit_reached,
"allowed": snapshot.allowed,
} }
@staticmethod @staticmethod
def _window_from_dict(window: dict[str, Any] | None) -> UsageWindow | None: def _window_to_dict(window: UsageWindow | None) -> dict[str, Any] | None:
return parse_usage_window(window) if window is None:
@classmethod
def _snapshot_from_dict(
cls, snapshot: dict[str, Any] | None
) -> UsageSnapshot | None:
if not isinstance(snapshot, dict):
return None return None
return build_usage_snapshot( return {
checked_at=int(snapshot.get("checked_at") or 0), "used_percent": window.used_percent,
primary_window=cls._window_from_dict(snapshot.get("primary_window")), "reset_at": window.reset_at,
secondary_window=cls._window_from_dict(snapshot.get("secondary_window")), }
limit_reached=bool(snapshot.get("limit_reached")),
allowed=bool(snapshot.get("allowed", True)),
)
@classmethod @staticmethod
def _account_from_dict(cls, payload: dict[str, Any]) -> AccountRecord: def _account_from_dict(payload: dict[str, Any]) -> AccountRecord:
usage_checked_at = payload.get("usage_checked_at")
usage = JsonStateStore._usage_from_dict(payload.get("usage"), usage_checked_at)
return AccountRecord( return AccountRecord(
id=str(payload["id"]), email=str(payload.get("email") or ""),
email=payload.get("email"),
account_id=payload.get("account_id"),
access_token=str(payload.get("access_token") or ""), access_token=str(payload.get("access_token") or ""),
refresh_token=str(payload.get("refresh_token") or ""), refresh_token=str(payload.get("refresh_token") or ""),
expires_at=int(payload.get("expires_at") or 0), token_refresh_at=int(payload.get("token_refresh_at") or 0),
cooldown_until=int(payload["cooldown_until"]) usage=usage,
if payload.get("cooldown_until") usage_checked_at=int(usage_checked_at) if usage_checked_at else None,
else None, disabled=bool(payload.get("disabled", False)),
last_known_usage=cls._snapshot_from_dict(payload.get("last_known_usage")),
last_error=payload.get("last_error"),
) )
@classmethod @staticmethod
def _account_to_dict(cls, account: AccountRecord) -> dict[str, Any]: def _usage_from_dict(payload: dict[str, Any] | None, checked_at: Any) -> UsageSnapshot | None:
return { if not isinstance(payload, dict):
"id": account.id, return None
"email": account.email, return UsageSnapshot(
"account_id": account.account_id, checked_at=int(checked_at or 0),
"access_token": account.access_token, primary_window=JsonStateStore._window_from_dict(payload.get("primary")),
"refresh_token": account.refresh_token, secondary_window=JsonStateStore._window_from_dict(payload.get("secondary")),
"expires_at": account.expires_at, )
"cooldown_until": account.cooldown_until,
"last_known_usage": cls._snapshot_to_dict(account.last_known_usage), @staticmethod
"last_error": account.last_error, def _window_from_dict(payload: dict[str, Any] | None) -> UsageWindow | None:
} if not isinstance(payload, dict):
return None
reset_at = payload.get("reset_at")
return UsageWindow(
used_percent=int(payload.get("used_percent") or 0),
reset_at=int(reset_at) if reset_at else None,
)

10
tests/conftest.py Normal file
View file

@ -0,0 +1,10 @@
from __future__ import annotations
import sys
from pathlib import Path
ROOT = Path(__file__).resolve().parents[1]
SRC = ROOT / "src"
if str(SRC) not in sys.path:
sys.path.insert(0, str(SRC))

View file

@ -1,233 +0,0 @@
from __future__ import annotations
import time
from typing import Any, cast
import pytest
from gibby.account_ops import (
PermanentAccountFailure,
refresh_account_usage,
snapshot_is_exhausted,
window_used_percent,
)
from gibby.client import OpenAIAPIError
from gibby.models import (
UNKNOWN_EXHAUSTED_BACKOFF_SECONDS,
AccountRecord,
UsageSnapshot,
UsageWindow,
compute_cooldown_until,
)
class FakeClient:
def __init__(self, usage: UsageSnapshot, *, permanent: bool = False):
self.usage = usage
self.permanent = permanent
self.refresh_calls: list[str] = []
async def refresh_access_token(self, refresh_token: str):
self.refresh_calls.append(refresh_token)
return ("new-token", "new-refresh", int(time.time()) + 600)
async def fetch_usage_payload(self, access_token: str) -> dict:
if self.permanent:
raise OpenAIAPIError("invalid_grant", permanent=True, status_code=401)
primary_window = self.usage.primary_window
assert primary_window is not None
secondary_window = (
{
"used_percent": self.usage.secondary_window.used_percent,
"limit_window_seconds": self.usage.secondary_window.limit_window_seconds,
"reset_after_seconds": self.usage.secondary_window.reset_after_seconds,
"reset_at": self.usage.secondary_window.reset_at,
}
if self.usage.secondary_window is not None
else None
)
return {
"email": "acc@example.com",
"account_id": "acc-1",
"rate_limit": {
"allowed": self.usage.allowed,
"limit_reached": self.usage.limit_reached,
"primary_window": {
"used_percent": primary_window.used_percent,
"limit_window_seconds": primary_window.limit_window_seconds,
"reset_after_seconds": primary_window.reset_after_seconds,
"reset_at": primary_window.reset_at,
},
"secondary_window": secondary_window,
},
}
def make_usage(
primary: int,
secondary: int | None = None,
*,
limit_reached: bool = False,
allowed: bool | None = None,
reset_after: int = 10,
checked_at: int | None = None,
primary_reset_at: int | None = None,
secondary_reset_at: int | None = None,
primary_limit_window_seconds: int = 18000,
secondary_limit_window_seconds: int = 604800,
) -> UsageSnapshot:
checked_at = checked_at or int(time.time())
exhausted = (
primary >= 100 or (secondary is not None and secondary >= 100) or limit_reached
)
if allowed is None:
allowed = not exhausted
return UsageSnapshot(
checked_at=checked_at,
used_percent=max(primary, secondary or 0),
remaining_percent=max(0, 100 - max(primary, secondary or 0)),
exhausted=exhausted or not allowed,
primary_window=UsageWindow(
primary,
primary_limit_window_seconds,
reset_after,
primary_reset_at
if primary_reset_at is not None
else checked_at + reset_after,
),
secondary_window=UsageWindow(
secondary,
secondary_limit_window_seconds,
reset_after,
secondary_reset_at
if secondary_reset_at is not None
else checked_at + reset_after,
)
if secondary is not None
else None,
limit_reached=limit_reached,
allowed=allowed,
)
def test_window_used_percent_defaults_to_zero() -> None:
assert window_used_percent(None) == 0
def test_snapshot_is_exhausted_checks_various_conditions() -> None:
assert snapshot_is_exhausted(make_usage(94, 95), 96) is False
assert snapshot_is_exhausted(make_usage(94), 95) is False
assert snapshot_is_exhausted(make_usage(95), 95) is True
assert snapshot_is_exhausted(make_usage(50, 95), 95) is True
assert snapshot_is_exhausted(make_usage(50, limit_reached=True), 95) is True
def test_compute_cooldown_until_uses_primary_blocking_window() -> None:
usage = make_usage(
95, 20, checked_at=1000, primary_reset_at=1100, secondary_reset_at=1300
)
assert compute_cooldown_until(usage, 95) == 1100
def test_compute_cooldown_until_uses_secondary_blocking_window() -> None:
usage = make_usage(
20, 95, checked_at=1000, primary_reset_at=1100, secondary_reset_at=1300
)
assert compute_cooldown_until(usage, 95) == 1300
def test_compute_cooldown_until_waits_for_all_blocking_windows() -> None:
usage = make_usage(
95, 95, checked_at=1000, primary_reset_at=1100, secondary_reset_at=1300
)
assert compute_cooldown_until(usage, 95) == 1300
def test_compute_cooldown_until_uses_latest_window_when_blocker_is_ambiguous() -> None:
usage = make_usage(
80,
40,
limit_reached=True,
allowed=False,
checked_at=1000,
primary_reset_at=1100,
secondary_reset_at=1300,
)
assert compute_cooldown_until(usage, 95) == 1300
def test_compute_cooldown_until_falls_back_to_limit_window_seconds() -> None:
usage = make_usage(
95,
checked_at=1000,
reset_after=0,
primary_reset_at=0,
primary_limit_window_seconds=600,
)
assert compute_cooldown_until(usage, 95) == 1600
def test_compute_cooldown_until_uses_backoff_when_no_reset_metadata_exists() -> None:
usage = make_usage(
95,
checked_at=1000,
reset_after=0,
primary_reset_at=0,
primary_limit_window_seconds=0,
)
assert compute_cooldown_until(usage, 95) == 1000 + UNKNOWN_EXHAUSTED_BACKOFF_SECONDS
@pytest.mark.asyncio
async def test_refresh_account_usage_populates_snapshot_and_cooldown() -> None:
current = int(time.time())
account = AccountRecord(
id="a1",
access_token="tok",
refresh_token="ref",
expires_at=int(time.time()) + 600,
)
usage = make_usage(
96,
1,
limit_reached=True,
allowed=False,
checked_at=current,
primary_reset_at=current + 100,
secondary_reset_at=current + 300,
)
client = FakeClient(usage)
result = await refresh_account_usage(account, cast(Any, client), 95)
assert result.used_percent == usage.used_percent
assert result.limit_reached is usage.limit_reached
assert result.allowed is usage.allowed
assert account.last_known_usage is not None
assert account.last_known_usage.used_percent == usage.used_percent
assert account.cooldown_until == current + 100
assert account.email == "acc@example.com"
assert account.id == "acc@example.com"
@pytest.mark.asyncio
async def test_refresh_account_usage_raises_permanent_failure() -> None:
account = AccountRecord(
id="a1",
access_token="tok",
refresh_token="ref",
expires_at=int(time.time()) + 600,
)
with pytest.raises(PermanentAccountFailure):
await refresh_account_usage(
account,
cast(Any, FakeClient(make_usage(1), permanent=True)),
95,
)

View file

@ -11,11 +11,9 @@ class StubManager(AccountManager):
def __init__( def __init__(
self, self,
response: dict[str, Any] | None = None, response: dict[str, Any] | None = None,
usage_response: dict[str, Any] | None = None,
error: Exception | None = None, error: Exception | None = None,
): ):
self.response = response self.response = response
self.usage_response = usage_response
self.error = error self.error = error
async def issue_token_response(self): async def issue_token_response(self):
@ -25,12 +23,6 @@ class StubManager(AccountManager):
return self.response return self.response
return {} return {}
async def get_usage_report(self):
if self.usage_response is not None:
return self.usage_response
return {"accounts": [], "active_account_id": None, "count": 0}
def test_health_ok() -> None: def test_health_ok() -> None:
client = TestClient(create_app(StubManager(response={}))) client = TestClient(create_app(StubManager(response={})))
response = client.get("/health") response = client.get("/health")
@ -64,26 +56,3 @@ def test_token_error_shape() -> None:
response = client.get("/token") response = client.get("/token")
assert response.status_code == 503 assert response.status_code == 503
assert response.json() == {"error": "No usable account available"} assert response.json() == {"error": "No usable account available"}
def test_usage_success_shape() -> None:
payload = {
"accounts": [
{
"id": "a1",
"email": "a1@example.com",
"status": "ok",
"used_percent": 12,
"remaining_percent": 88,
"cooldown_until": None,
"primary_window": {"used_percent": 12},
"secondary_window": {"used_percent": 1},
}
],
"active_account_id": "a1",
"count": 1,
}
client = TestClient(create_app(StubManager(usage_response=payload)))
response = client.get("/usage")
assert response.status_code == 200
assert response.json() == payload

View file

@ -1,5 +1,6 @@
from __future__ import annotations from __future__ import annotations
import json
import time import time
from pathlib import Path from pathlib import Path
@ -17,15 +18,17 @@ class FakeClient(OpenAIClient):
self, self,
usage_by_token=None, usage_by_token=None,
refresh_map=None, refresh_map=None,
failing_tokens=None, invalid_tokens=None,
permanent_refresh_tokens=None, permanent_refresh_tokens=None,
): ):
self.usage_by_token = usage_by_token or {} self.usage_by_token = usage_by_token or {}
self.refresh_map = refresh_map or {} self.refresh_map = refresh_map or {}
self.failing_tokens = set(failing_tokens or []) self.invalid_tokens = set(invalid_tokens or [])
self.permanent_refresh_tokens = set(permanent_refresh_tokens or []) self.permanent_refresh_tokens = set(permanent_refresh_tokens or [])
self.fetched_tokens: list[str] = [] self.fetched_usage_tokens: list[str] = []
self.validated_tokens: list[str] = []
self.refresh_calls: list[str] = [] self.refresh_calls: list[str] = []
self.settings = Settings(data_dir=Path("."))
async def refresh_access_token(self, refresh_token: str): async def refresh_access_token(self, refresh_token: str):
self.refresh_calls.append(refresh_token) self.refresh_calls.append(refresh_token)
@ -34,28 +37,21 @@ class FakeClient(OpenAIClient):
return self.refresh_map[refresh_token] return self.refresh_map[refresh_token]
async def fetch_usage_payload(self, access_token: str): async def fetch_usage_payload(self, access_token: str):
self.fetched_tokens.append(access_token) self.fetched_usage_tokens.append(access_token)
if access_token in self.failing_tokens:
raise RuntimeError("usage failed")
usage = self.usage_by_token[access_token] usage = self.usage_by_token[access_token]
return { return {
"email": f"{access_token}@example.com", "email": f"{access_token}@example.com",
"account_id": f"acct-{access_token}",
"rate_limit": { "rate_limit": {
"allowed": usage.allowed, "allowed": usage.allowed,
"limit_reached": usage.limit_reached, "limit_reached": usage.limit_reached,
"primary_window": { "primary_window": {
"used_percent": usage.primary_window.used_percent, "used_percent": usage.primary_window.used_percent,
"limit_window_seconds": usage.primary_window.limit_window_seconds,
"reset_after_seconds": usage.primary_window.reset_after_seconds,
"reset_at": usage.primary_window.reset_at, "reset_at": usage.primary_window.reset_at,
} }
if usage.primary_window if usage.primary_window
else None, else None,
"secondary_window": { "secondary_window": {
"used_percent": usage.secondary_window.used_percent, "used_percent": usage.secondary_window.used_percent,
"limit_window_seconds": usage.secondary_window.limit_window_seconds,
"reset_after_seconds": usage.secondary_window.reset_after_seconds,
"reset_at": usage.secondary_window.reset_at, "reset_at": usage.secondary_window.reset_at,
} }
if usage.secondary_window if usage.secondary_window
@ -63,53 +59,38 @@ class FakeClient(OpenAIClient):
}, },
} }
async def validate_token(self, access_token: str) -> bool:
self.validated_tokens.append(access_token)
return access_token not in self.invalid_tokens
def make_usage(
*, def make_usage(primary: int, secondary: int = 0, *, checked_at: int | None = None):
used: int,
secondary_used: int | None = None,
limit_reached: bool = False,
reset_after: int = 0,
) -> UsageSnapshot:
exhausted = (
used >= 100
or (secondary_used is not None and secondary_used >= 100)
or limit_reached
)
return UsageSnapshot( return UsageSnapshot(
checked_at=int(time.time()), checked_at=checked_at or int(time.time()),
used_percent=used, primary_window=UsageWindow(used_percent=primary, reset_at=int(time.time()) + 300),
remaining_percent=max(0, 100 - used),
exhausted=exhausted,
primary_window=UsageWindow(
used_percent=used,
limit_window_seconds=604800,
reset_after_seconds=reset_after,
reset_at=int(time.time()) + reset_after if reset_after else 0,
),
secondary_window=UsageWindow( secondary_window=UsageWindow(
used_percent=secondary_used, used_percent=secondary, reset_at=int(time.time()) + 300
limit_window_seconds=604800, ),
reset_after_seconds=reset_after,
reset_at=int(time.time()) + reset_after if reset_after else 0,
)
if secondary_used is not None
else None,
limit_reached=limit_reached,
allowed=not exhausted,
) )
def make_manager( def make_account(
store: JsonStateStore, email: str,
client: FakeClient,
*, *,
threshold: int = 95, token: str,
) -> AccountManager: refresh_token: str = "refresh",
return AccountManager( token_refresh_at: int | None = None,
store, usage: UsageSnapshot | None = None,
client, disabled: bool = False,
Settings(data_dir=store.path.parent, exhausted_usage_threshold=threshold), ) -> AccountRecord:
return AccountRecord(
email=email,
access_token=token,
refresh_token=refresh_token,
token_refresh_at=token_refresh_at or int(time.time()) + 600,
usage=usage,
usage_checked_at=usage.checked_at if usage is not None else None,
disabled=disabled,
) )
@ -119,452 +100,167 @@ def make_store(tmp_path: Path, state: StateFile) -> JsonStateStore:
return store return store
@pytest.mark.asyncio def make_manager(
async def test_prefers_active_account_when_locally_usable(tmp_path: Path) -> None: store: JsonStateStore,
active = AccountRecord( client: FakeClient,
id="a1", *,
access_token="tok-a1", threshold: int = 95,
refresh_token="ref-a1", stale_seconds: int = 3600,
expires_at=int(time.time()) + 600, ) -> AccountManager:
last_known_usage=make_usage(used=20), return AccountManager(
) store,
second = AccountRecord( client,
id="a2", Settings(
access_token="tok-a2", data_dir=store.path.parent,
refresh_token="ref-a2", exhausted_usage_threshold=threshold,
expires_at=int(time.time()) + 600, usage_stale_seconds=stale_seconds,
last_known_usage=make_usage(used=70),
)
store = make_store(
tmp_path, StateFile(active_account_id="a1", accounts=[active, second])
)
client = FakeClient(
usage_by_token={
"tok-a1": make_usage(used=21),
"tok-a2": make_usage(used=72),
}
)
payload = await make_manager(store, client).issue_token_response()
assert payload["token"] == "tok-a1"
assert client.fetched_tokens == ["tok-a1"]
assert store.load().active_account_id == "tok-a1@example.com"
@pytest.mark.asyncio
async def test_prefers_higher_primary_usage_from_saved_snapshot(tmp_path: Path) -> None:
first = AccountRecord(
id="a1",
access_token="tok-a1",
refresh_token="ref-a1",
expires_at=int(time.time()) + 600,
last_known_usage=make_usage(used=20),
)
second = AccountRecord(
id="a2",
access_token="tok-a2",
refresh_token="ref-a2",
expires_at=int(time.time()) + 600,
last_known_usage=make_usage(used=70),
)
store = make_store(tmp_path, StateFile(accounts=[first, second]))
client = FakeClient(usage_by_token={"tok-a2": make_usage(used=72)})
payload = await make_manager(store, client).issue_token_response()
assert payload["token"] == "tok-a2"
assert client.fetched_tokens == ["tok-a2"]
saved = store.load()
assert saved.active_account_id == "tok-a2@example.com"
@pytest.mark.asyncio
async def test_breaks_ties_with_secondary_usage(tmp_path: Path) -> None:
first = AccountRecord(
id="a1",
access_token="tok-a1",
refresh_token="ref-a1",
expires_at=int(time.time()) + 600,
last_known_usage=make_usage(used=60, secondary_used=10),
)
second = AccountRecord(
id="a2",
access_token="tok-a2",
refresh_token="ref-a2",
expires_at=int(time.time()) + 600,
last_known_usage=make_usage(used=60, secondary_used=40),
)
store = make_store(tmp_path, StateFile(accounts=[first, second]))
client = FakeClient(
usage_by_token={"tok-a2": make_usage(used=61, secondary_used=41)}
)
payload = await make_manager(store, client).issue_token_response()
assert payload["token"] == "tok-a2"
assert client.fetched_tokens == ["tok-a2"]
@pytest.mark.asyncio
async def test_treats_missing_secondary_as_zero(tmp_path: Path) -> None:
first = AccountRecord(
id="a1",
access_token="tok-a1",
refresh_token="ref-a1",
expires_at=int(time.time()) + 600,
last_known_usage=make_usage(used=60),
)
second = AccountRecord(
id="a2",
access_token="tok-a2",
refresh_token="ref-a2",
expires_at=int(time.time()) + 600,
last_known_usage=make_usage(used=60, secondary_used=1),
)
store = make_store(tmp_path, StateFile(accounts=[first, second]))
client = FakeClient(
usage_by_token={"tok-a2": make_usage(used=61, secondary_used=1)}
)
payload = await make_manager(store, client).issue_token_response()
assert payload["token"] == "tok-a2"
assert client.fetched_tokens == ["tok-a2"]
@pytest.mark.asyncio
async def test_skips_account_still_in_cooldown(tmp_path: Path) -> None:
active = AccountRecord(
id="a1",
access_token="tok-a1",
refresh_token="ref-a1",
expires_at=int(time.time()) + 600,
cooldown_until=int(time.time()) + 300,
last_known_usage=make_usage(used=80),
)
second = AccountRecord(
id="a2",
access_token="tok-a2",
refresh_token="ref-a2",
expires_at=int(time.time()) + 600,
last_known_usage=make_usage(used=20),
)
store = make_store(
tmp_path, StateFile(active_account_id="a1", accounts=[active, second])
)
client = FakeClient(usage_by_token={"tok-a2": make_usage(used=25)})
payload = await make_manager(store, client).issue_token_response()
assert payload["token"] == "tok-a2"
assert client.fetched_tokens == ["tok-a2"]
assert store.load().active_account_id == "tok-a2@example.com"
@pytest.mark.asyncio
async def test_skips_active_account_blocked_by_local_exhausted_snapshot(
tmp_path: Path,
) -> None:
active = AccountRecord(
id="a1",
access_token="tok-a1",
refresh_token="ref-a1",
expires_at=int(time.time()) + 600,
last_known_usage=make_usage(used=96),
)
second = AccountRecord(
id="a2",
access_token="tok-a2",
refresh_token="ref-a2",
expires_at=int(time.time()) + 600,
last_known_usage=make_usage(used=20),
)
store = make_store(
tmp_path, StateFile(active_account_id="a1", accounts=[active, second])
)
client = FakeClient(usage_by_token={"tok-a2": make_usage(used=25)})
payload = await make_manager(store, client).issue_token_response()
assert payload["token"] == "tok-a2"
assert client.fetched_tokens == ["tok-a2"]
assert store.load().active_account_id == "tok-a2@example.com"
@pytest.mark.asyncio
async def test_live_checks_depleted_and_moves_to_next(tmp_path: Path) -> None:
high = AccountRecord(
id="a1",
access_token="tok-a1",
refresh_token="ref-a1",
expires_at=int(time.time()) + 600,
last_known_usage=make_usage(used=94),
)
second = AccountRecord(
id="a2",
access_token="tok-a2",
refresh_token="ref-a2",
expires_at=int(time.time()) + 600,
last_known_usage=make_usage(used=50),
)
store = make_store(tmp_path, StateFile(accounts=[high, second]))
client = FakeClient(
usage_by_token={
"tok-a1": make_usage(used=96, reset_after=120),
"tok-a2": make_usage(used=52),
}
)
payload = await make_manager(store, client).issue_token_response()
assert payload["token"] == "tok-a2"
assert client.fetched_tokens == ["tok-a1", "tok-a2"]
@pytest.mark.asyncio
async def test_live_checks_secondary_depleted_and_moves_to_next(tmp_path: Path) -> None:
high = AccountRecord(
id="a1",
access_token="tok-a1",
refresh_token="ref-a1",
expires_at=int(time.time()) + 600,
last_known_usage=make_usage(used=30, secondary_used=94),
)
second = AccountRecord(
id="a2",
access_token="tok-a2",
refresh_token="ref-a2",
expires_at=int(time.time()) + 600,
last_known_usage=make_usage(used=20, secondary_used=10),
)
store = make_store(tmp_path, StateFile(accounts=[high, second]))
client = FakeClient(
usage_by_token={
"tok-a1": make_usage(used=30, secondary_used=100, reset_after=120),
"tok-a2": make_usage(used=22, secondary_used=10),
}
)
payload = await make_manager(store, client).issue_token_response()
assert payload["token"] == "tok-a2"
assert client.fetched_tokens == ["tok-a1", "tok-a2"]
@pytest.mark.asyncio
async def test_falls_through_when_live_usage_is_depleted(tmp_path: Path) -> None:
first = AccountRecord(
id="a1",
access_token="tok-a1",
refresh_token="ref-a1",
expires_at=int(time.time()) + 600,
last_known_usage=make_usage(used=80),
)
second = AccountRecord(
id="a2",
access_token="tok-a2",
refresh_token="ref-a2",
expires_at=int(time.time()) + 600,
last_known_usage=make_usage(used=70),
)
store = make_store(tmp_path, StateFile(accounts=[first, second]))
client = FakeClient(
usage_by_token={
"tok-a1": make_usage(used=95, reset_after=120),
"tok-a2": make_usage(used=71),
}
)
payload = await make_manager(store, client).issue_token_response()
saved = store.load()
assert payload["token"] == "tok-a2"
assert client.fetched_tokens == ["tok-a1", "tok-a2"]
depleted = next(
account for account in saved.accounts if account.id == "tok-a1@example.com"
)
assert depleted.cooldown_until is not None
assert depleted.last_known_usage is not None
assert depleted.last_known_usage.used_percent == 95
@pytest.mark.asyncio
async def test_keeps_account_out_until_blocking_window_resets(tmp_path: Path) -> None:
current = int(time.time())
blocked = AccountRecord(
id="a1",
access_token="tok-a1",
refresh_token="ref-a1",
expires_at=int(time.time()) + 600,
last_known_usage=make_usage(used=80, secondary_used=40),
)
second = AccountRecord(
id="a2",
access_token="tok-a2",
refresh_token="ref-a2",
expires_at=int(time.time()) + 600,
last_known_usage=make_usage(used=70),
)
store = make_store(tmp_path, StateFile(accounts=[blocked, second]))
blocked_usage = UsageSnapshot(
checked_at=current,
used_percent=80,
remaining_percent=20,
exhausted=True,
primary_window=UsageWindow(
used_percent=80,
limit_window_seconds=604800,
reset_after_seconds=60,
reset_at=current + 60,
), ),
secondary_window=UsageWindow(
used_percent=40,
limit_window_seconds=604800,
reset_after_seconds=240,
reset_at=current + 240,
),
limit_reached=True,
allowed=False,
) )
client = FakeClient(
usage_by_token={
"tok-a1": blocked_usage,
"tok-a2": make_usage(used=71),
}
)
payload = await make_manager(store, client).issue_token_response()
saved = store.load()
blocked_saved = next(
account for account in saved.accounts if account.id == "tok-a1@example.com"
)
assert payload["token"] == "tok-a2"
assert blocked_saved.cooldown_until == current + 240
assert client.fetched_tokens == ["tok-a1", "tok-a2"]
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_refreshes_expired_token_before_usage(tmp_path: Path) -> None: async def test_prefers_active_account_when_usable(tmp_path: Path) -> None:
active = AccountRecord( active = make_account("a@example.com", token="tok-a", usage=make_usage(20, 0))
id="a1", second = make_account("b@example.com", token="tok-b", usage=make_usage(80, 0))
access_token="old-token", store = make_store(
refresh_token="ref-a1", tmp_path,
expires_at=int(time.time()) - 1, StateFile(active_account="a@example.com", accounts=[active, second]),
last_known_usage=make_usage(used=20),
) )
store = make_store(tmp_path, StateFile(active_account_id="a1", accounts=[active])) client = FakeClient()
client = FakeClient(
usage_by_token={"new-token": make_usage(used=20)}, payload = await make_manager(store, client).issue_token_response()
refresh_map={"ref-a1": ("new-token", "new-refresh", int(time.time()) + 600)},
assert payload["token"] == "tok-a"
assert client.fetched_usage_tokens == []
assert client.validated_tokens == ["tok-a"]
@pytest.mark.asyncio
async def test_refreshes_stale_active_usage_before_deciding(tmp_path: Path) -> None:
stale = int(time.time()) - 7200
active = make_account("a@example.com", token="tok-a", usage=make_usage(20, 0, checked_at=stale))
second = make_account("b@example.com", token="tok-b", usage=make_usage(80, 0))
store = make_store(
tmp_path,
StateFile(active_account="a@example.com", accounts=[active, second]),
) )
client = FakeClient(usage_by_token={"tok-a": make_usage(21, 0)})
payload = await make_manager(store, client).issue_token_response()
assert payload["token"] == "tok-a"
assert client.fetched_usage_tokens == ["tok-a"]
@pytest.mark.asyncio
async def test_falls_back_to_highest_primary_usage_when_active_unusable(tmp_path: Path) -> None:
active = make_account("a@example.com", token="tok-a", usage=make_usage(95, 0))
low = make_account("b@example.com", token="tok-b", usage=make_usage(40, 0))
high = make_account("c@example.com", token="tok-c", usage=make_usage(70, 0))
store = make_store(
tmp_path,
StateFile(active_account="a@example.com", accounts=[active, low, high]),
)
client = FakeClient()
payload = await make_manager(store, client).issue_token_response()
state = store.load()
assert payload["token"] == "tok-c"
assert state.active_account == "c@example.com"
@pytest.mark.asyncio
async def test_skips_disabled_accounts(tmp_path: Path) -> None:
active = make_account("a@example.com", token="tok-a", usage=make_usage(20, 0), disabled=True)
second = make_account("b@example.com", token="tok-b", usage=make_usage(70, 0))
store = make_store(
tmp_path,
StateFile(active_account="a@example.com", accounts=[active, second]),
)
client = FakeClient()
payload = await make_manager(store, client).issue_token_response()
assert payload["token"] == "tok-b"
@pytest.mark.asyncio
async def test_secondary_100_makes_account_unusable(tmp_path: Path) -> None:
active = make_account("a@example.com", token="tok-a", usage=make_usage(20, 100))
second = make_account("b@example.com", token="tok-b", usage=make_usage(30, 0))
store = make_store(
tmp_path,
StateFile(active_account="a@example.com", accounts=[active, second]),
)
client = FakeClient()
payload = await make_manager(store, client).issue_token_response()
assert payload["token"] == "tok-b"
@pytest.mark.asyncio
async def test_refreshes_token_before_validation(tmp_path: Path) -> None:
account = make_account(
"a@example.com",
token="old-token",
refresh_token="ref-a",
token_refresh_at=int(time.time()) - 1,
usage=make_usage(20, 0),
)
store = make_store(tmp_path, StateFile(active_account="a@example.com", accounts=[account]))
client = FakeClient(refresh_map={"ref-a": ("new-token", "new-refresh", int(time.time()) + 600)})
payload = await make_manager(store, client).issue_token_response() payload = await make_manager(store, client).issue_token_response()
saved = store.load() saved = store.load()
assert payload["token"] == "new-token" assert payload["token"] == "new-token"
assert client.refresh_calls == ["ref-a1"] assert client.refresh_calls == ["ref-a"]
assert client.fetched_tokens == ["new-token"] assert client.validated_tokens == ["new-token"]
assert saved.accounts[0].id == "new-token@example.com"
assert saved.accounts[0].access_token == "new-token" assert saved.accounts[0].access_token == "new-token"
assert saved.accounts[0].refresh_token == "new-refresh" assert saved.accounts[0].refresh_token == "new-refresh"
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_raises_when_all_accounts_unusable(tmp_path: Path) -> None: async def test_invalid_token_moves_account_to_failed_json(tmp_path: Path) -> None:
active = AccountRecord( bad = make_account("bad@example.com", token="tok-bad", usage=make_usage(20, 0))
id="a1", good = make_account("good@example.com", token="tok-good", usage=make_usage(30, 0))
access_token="tok-a1", store = make_store(tmp_path, StateFile(active_account="bad@example.com", accounts=[bad, good]))
refresh_token="ref-a1", client = FakeClient(invalid_tokens={"tok-bad"})
expires_at=int(time.time()) + 600,
last_known_usage=make_usage(used=80), payload = await make_manager(store, client).issue_token_response()
) state = store.load()
second = AccountRecord( failed = json.loads((tmp_path / "failed.json").read_text())
id="a2",
access_token="tok-a2", assert payload["token"] == "tok-good"
refresh_token="ref-a2", assert [account.email for account in state.accounts] == ["good@example.com"]
expires_at=int(time.time()) + 600, assert failed["accounts"][0]["email"] == "bad@example.com"
last_known_usage=make_usage(used=70),
)
store = make_store( @pytest.mark.asyncio
tmp_path, StateFile(active_account_id="a1", accounts=[active, second]) async def test_rereads_disk_between_requests(tmp_path: Path) -> None:
) first = make_account("a@example.com", token="tok-a", usage=make_usage(20, 0))
client = FakeClient( store = make_store(tmp_path, StateFile(active_account="a@example.com", accounts=[first]))
usage_by_token={ client = FakeClient()
"tok-a1": make_usage(used=96, reset_after=120), manager = make_manager(store, client)
"tok-a2": make_usage(used=97, reset_after=120),
} first_payload = await manager.issue_token_response()
) assert first_payload["token"] == "tok-a"
replacement = make_account("b@example.com", token="tok-b", usage=make_usage(10, 0))
store.save(StateFile(active_account="b@example.com", accounts=[replacement]))
second_payload = await manager.issue_token_response()
assert second_payload["token"] == "tok-b"
@pytest.mark.asyncio
async def test_raises_when_no_usable_accounts(tmp_path: Path) -> None:
disabled = make_account("a@example.com", token="tok-a", usage=make_usage(10, 0), disabled=True)
exhausted = make_account("b@example.com", token="tok-b", usage=make_usage(95, 0))
store = make_store(tmp_path, StateFile(accounts=[disabled, exhausted]))
client = FakeClient()
with pytest.raises(NoUsableAccountError): with pytest.raises(NoUsableAccountError):
await make_manager(store, client).issue_token_response() await make_manager(store, client).issue_token_response()
saved = store.load()
assert all(account.cooldown_until is not None for account in saved.accounts)
@pytest.mark.asyncio
async def test_threshold_can_be_overridden_for_selection(tmp_path: Path) -> None:
active = AccountRecord(
id="a1",
access_token="tok-a1",
refresh_token="ref-a1",
expires_at=int(time.time()) + 600,
last_known_usage=make_usage(used=96),
)
second = AccountRecord(
id="a2",
access_token="tok-a2",
refresh_token="ref-a2",
expires_at=int(time.time()) + 600,
last_known_usage=make_usage(used=20),
)
store = make_store(
tmp_path, StateFile(active_account_id="a1", accounts=[active, second])
)
client = FakeClient(
usage_by_token={
"tok-a1": make_usage(used=96),
"tok-a2": make_usage(used=25),
}
)
payload = await make_manager(store, client, threshold=97).issue_token_response()
assert payload["token"] == "tok-a1"
assert client.fetched_tokens == ["tok-a1"]
@pytest.mark.asyncio
async def test_removes_account_and_records_failed_email_on_permanent_refresh_failure(
tmp_path: Path,
) -> None:
dead = AccountRecord(
id="a1",
email="dead@example.com",
access_token="tok-a1",
refresh_token="ref-a1",
expires_at=int(time.time()) - 1,
last_known_usage=make_usage(used=80),
)
alive = AccountRecord(
id="a2",
email="alive@example.com",
access_token="tok-a2",
refresh_token="ref-a2",
expires_at=int(time.time()) + 600,
last_known_usage=make_usage(used=70),
)
store = make_store(tmp_path, StateFile(accounts=[dead, alive]))
client = FakeClient(
usage_by_token={"tok-a2": make_usage(used=71)},
permanent_refresh_tokens={"ref-a1"},
)
payload = await make_manager(store, client).issue_token_response()
assert payload["token"] == "tok-a2"
saved = store.load()
assert [account.id for account in saved.accounts] == ["tok-a2@example.com"]
assert (tmp_path / "failed.txt").read_text().splitlines() == ["dead@example.com"]

View file

@ -49,8 +49,6 @@ class FakeClient:
secondary_window = ( secondary_window = (
{ {
"used_percent": self.usage.secondary_window.used_percent, "used_percent": self.usage.secondary_window.used_percent,
"limit_window_seconds": self.usage.secondary_window.limit_window_seconds,
"reset_after_seconds": self.usage.secondary_window.reset_after_seconds,
"reset_at": self.usage.secondary_window.reset_at, "reset_at": self.usage.secondary_window.reset_at,
} }
if self.usage.secondary_window is not None if self.usage.secondary_window is not None
@ -64,8 +62,6 @@ class FakeClient:
"limit_reached": self.usage.limit_reached, "limit_reached": self.usage.limit_reached,
"primary_window": { "primary_window": {
"used_percent": primary_window.used_percent, "used_percent": primary_window.used_percent,
"limit_window_seconds": primary_window.limit_window_seconds,
"reset_after_seconds": primary_window.reset_after_seconds,
"reset_at": primary_window.reset_at, "reset_at": primary_window.reset_at,
}, },
"secondary_window": secondary_window, "secondary_window": secondary_window,
@ -76,15 +72,10 @@ class FakeClient:
def make_usage(primary: int, secondary: int | None = None) -> UsageSnapshot: def make_usage(primary: int, secondary: int | None = None) -> UsageSnapshot:
return UsageSnapshot( return UsageSnapshot(
checked_at=1775000000, checked_at=1775000000,
used_percent=max(primary, secondary or 0), primary_window=UsageWindow(primary, 1775000100),
remaining_percent=max(0, 100 - max(primary, secondary or 0)), secondary_window=UsageWindow(secondary or 0, 1775000100)
exhausted=False,
primary_window=UsageWindow(primary, 18000, 100, 1775000100),
secondary_window=UsageWindow(secondary, 604800, 100, 1775000100)
if secondary is not None if secondary is not None
else None, else None,
limit_reached=False,
allowed=True,
) )
@ -147,23 +138,24 @@ async def test_exchange_and_store_account_populates_usage_snapshot(
False, False,
) )
assert account.last_known_usage is not None assert account.usage is not None
assert account.id == "oauth@example.com" assert account.email == "oauth@example.com"
assert account.last_known_usage.primary_window is not None assert account.usage.primary_window is not None
assert account.last_known_usage.primary_window.used_percent == 12 assert account.usage.primary_window.used_percent == 12
assert account.last_known_usage.secondary_window is not None assert account.usage.secondary_window is not None
assert account.last_known_usage.secondary_window.used_percent == 3 assert account.usage.secondary_window.used_percent == 3
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_exchange_and_store_account_keeps_tokens_on_transient_usage_failure( async def test_exchange_and_store_account_raises_on_transient_usage_failure(
tmp_path: Path, tmp_path: Path,
) -> None: ) -> None:
store = JsonStateStore(tmp_path / "accounts.json") store = JsonStateStore(tmp_path / "accounts.json")
settings = Settings(data_dir=tmp_path) settings = Settings(data_dir=tmp_path)
client = FakeClient(settings, make_usage(12, 3), transient_usage_failure=True) client = FakeClient(settings, make_usage(12, 3), transient_usage_failure=True)
account = await exchange_and_store_account( with pytest.raises(OpenAIAPIError):
await exchange_and_store_account(
store, store,
cast(Any, client), cast(Any, client),
"code", "code",
@ -171,7 +163,4 @@ async def test_exchange_and_store_account_keeps_tokens_on_transient_usage_failur
False, False,
) )
saved = store.load() assert store.load().accounts == []
assert account.last_known_usage is None
assert saved.accounts[0].access_token == "access-token"
assert saved.accounts[0].last_error is not None

View file

@ -17,15 +17,10 @@ import refresh_limits # type: ignore[import-not-found]
def make_usage(primary: int, secondary: int | None = None) -> UsageSnapshot: def make_usage(primary: int, secondary: int | None = None) -> UsageSnapshot:
return UsageSnapshot( return UsageSnapshot(
checked_at=int(time.time()), checked_at=int(time.time()),
used_percent=max(primary, secondary or 0), primary_window=UsageWindow(primary, int(time.time()) + 10),
remaining_percent=max(0, 100 - max(primary, secondary or 0)), secondary_window=UsageWindow(secondary or 0, int(time.time()) + 10)
exhausted=False,
primary_window=UsageWindow(primary, 18000, 10, int(time.time()) + 10),
secondary_window=UsageWindow(secondary, 604800, 10, int(time.time()) + 10)
if secondary is not None if secondary is not None
else None, else None,
limit_reached=False,
allowed=True,
) )
@ -49,8 +44,6 @@ class FakeClient:
secondary_window = ( secondary_window = (
{ {
"used_percent": usage.secondary_window.used_percent, "used_percent": usage.secondary_window.used_percent,
"limit_window_seconds": usage.secondary_window.limit_window_seconds,
"reset_after_seconds": usage.secondary_window.reset_after_seconds,
"reset_at": usage.secondary_window.reset_at, "reset_at": usage.secondary_window.reset_at,
} }
if usage.secondary_window is not None if usage.secondary_window is not None
@ -64,8 +57,6 @@ class FakeClient:
"limit_reached": usage.limit_reached, "limit_reached": usage.limit_reached,
"primary_window": { "primary_window": {
"used_percent": primary_window.used_percent, "used_percent": primary_window.used_percent,
"limit_window_seconds": primary_window.limit_window_seconds,
"reset_after_seconds": primary_window.reset_after_seconds,
"reset_at": primary_window.reset_at, "reset_at": primary_window.reset_at,
}, },
"secondary_window": secondary_window, "secondary_window": secondary_window,
@ -82,11 +73,10 @@ async def test_refresh_limits_updates_all_accounts(
StateFile( StateFile(
accounts=[ accounts=[
AccountRecord( AccountRecord(
id="acc@example.com",
email="acc@example.com", email="acc@example.com",
access_token="tok-a1", access_token="tok-a1",
refresh_token="ref-a1", refresh_token="ref-a1",
expires_at=int(time.time()) + 600, token_refresh_at=int(time.time()) + 600,
) )
] ]
) )
@ -96,9 +86,9 @@ async def test_refresh_limits_updates_all_accounts(
await refresh_limits.run(tmp_path) await refresh_limits.run(tmp_path)
state = store.load() state = store.load()
assert state.accounts[0].last_known_usage is not None assert state.accounts[0].usage is not None
assert state.accounts[0].last_known_usage.primary_window is not None assert state.accounts[0].usage.primary_window is not None
assert state.accounts[0].last_known_usage.primary_window.used_percent == 12 assert state.accounts[0].usage.primary_window.used_percent == 12
@pytest.mark.asyncio @pytest.mark.asyncio
@ -110,11 +100,10 @@ async def test_refresh_limits_removes_permanently_failed_account(
StateFile( StateFile(
accounts=[ accounts=[
AccountRecord( AccountRecord(
id="dead@example.com",
email="dead@example.com", email="dead@example.com",
access_token="tok-a1", access_token="tok-a1",
refresh_token="ref-a1", refresh_token="ref-a1",
expires_at=int(time.time()) + 600, token_refresh_at=int(time.time()) + 600,
) )
] ]
) )
@ -129,4 +118,5 @@ async def test_refresh_limits_removes_permanently_failed_account(
state = store.load() state = store.load()
assert state.accounts == [] assert state.accounts == []
assert (tmp_path / "failed.txt").read_text().splitlines() == ["dead@example.com"] failed = JsonStateStore(tmp_path / "accounts.json").load_failed_accounts()
assert [account.email for account in failed] == ["dead@example.com"]

View file

@ -6,79 +6,68 @@ from gibby.models import AccountRecord, StateFile, UsageSnapshot, UsageWindow
from gibby.store import JsonStateStore from gibby.store import JsonStateStore
def test_store_writes_canonical_usage_snapshot_shape(tmp_path) -> None: def test_store_writes_minimal_accounts_schema(tmp_path) -> None:
store = JsonStateStore(tmp_path / "accounts.json") store = JsonStateStore(tmp_path / "accounts.json")
snapshot = UsageSnapshot(
checked_at=1000,
used_percent=75,
remaining_percent=25,
exhausted=False,
primary_window=UsageWindow(75, 18000, 300, 1300),
secondary_window=UsageWindow(10, 604800, 3600, 4600),
limit_reached=False,
allowed=True,
)
store.save( store.save(
StateFile( StateFile(
active_account="acc@example.com",
accounts=[ accounts=[
AccountRecord( AccountRecord(
id="acc@example.com",
email="acc@example.com", email="acc@example.com",
access_token="tok", access_token="tok",
refresh_token="ref", refresh_token="ref",
expires_at=2000, token_refresh_at=2000,
last_known_usage=snapshot, usage=UsageSnapshot(
checked_at=1000,
primary_window=UsageWindow(used_percent=70, reset_at=1300),
secondary_window=UsageWindow(used_percent=20, reset_at=4600),
),
usage_checked_at=1000,
disabled=False,
) )
] ],
) )
) )
payload = json.loads((tmp_path / "accounts.json").read_text()) payload = json.loads((tmp_path / "accounts.json").read_text())
saved_snapshot = payload["accounts"][0]["last_known_usage"]
assert set(saved_snapshot) == { assert payload == {
"checked_at", "active_account": "acc@example.com",
"primary_window", "accounts": [
"secondary_window", {
"limit_reached", "email": "acc@example.com",
"allowed", "access_token": "tok",
"refresh_token": "ref",
"token_refresh_at": 2000,
"usage": {
"primary": {"used_percent": 70, "reset_at": 1300},
"secondary": {"used_percent": 20, "reset_at": 4600},
},
"usage_checked_at": 1000,
"disabled": False,
}
],
} }
def test_store_load_reconstructs_derived_usage_fields(tmp_path) -> None: def test_store_load_reconstructs_account_state(tmp_path) -> None:
path = tmp_path / "accounts.json" path = tmp_path / "accounts.json"
path.write_text( path.write_text(
json.dumps( json.dumps(
{ {
"version": 1, "active_account": "acc@example.com",
"active_account_id": "acc@example.com",
"accounts": [ "accounts": [
{ {
"id": "acc@example.com",
"email": "acc@example.com", "email": "acc@example.com",
"account_id": "acc-1",
"access_token": "tok", "access_token": "tok",
"refresh_token": "ref", "refresh_token": "ref",
"expires_at": 2000, "token_refresh_at": 2000,
"cooldown_until": None, "usage": {
"last_known_usage": { "primary": {"used_percent": 80, "reset_at": 1300},
"checked_at": 1000, "secondary": {"used_percent": 15, "reset_at": 4600},
"primary_window": {
"used_percent": 80,
"limit_window_seconds": 18000,
"reset_after_seconds": 300,
"reset_at": 1300,
}, },
"secondary_window": { "usage_checked_at": 1000,
"used_percent": 100, "disabled": True,
"limit_window_seconds": 604800,
"reset_after_seconds": 3600,
"reset_at": 4600,
},
"limit_reached": False,
"allowed": True,
},
"last_error": None,
} }
], ],
} }
@ -86,11 +75,40 @@ def test_store_load_reconstructs_derived_usage_fields(tmp_path) -> None:
) )
state = JsonStateStore(path).load() state = JsonStateStore(path).load()
snapshot = state.accounts[0].last_known_usage
assert snapshot is not None assert state.active_account == "acc@example.com"
assert snapshot.used_percent == 100 assert state.accounts[0].email == "acc@example.com"
assert snapshot.remaining_percent == 0 assert state.accounts[0].token_refresh_at == 2000
assert snapshot.exhausted is True assert state.accounts[0].usage is not None
assert snapshot.limit_reached is False assert state.accounts[0].usage.primary_window is not None
assert snapshot.allowed is True assert state.accounts[0].usage.primary_window.used_percent == 80
assert state.accounts[0].disabled is True
def test_append_failed_account_writes_failed_json_shape(tmp_path) -> None:
store = JsonStateStore(tmp_path / "accounts.json")
store.append_failed_account(
AccountRecord(
email="failed@example.com",
access_token="tok",
refresh_token="ref",
token_refresh_at=2000,
disabled=False,
)
)
payload = json.loads((tmp_path / "failed.json").read_text())
assert payload == {
"accounts": [
{
"email": "failed@example.com",
"access_token": "tok",
"refresh_token": "ref",
"token_refresh_at": 2000,
"usage": None,
"usage_checked_at": None,
"disabled": False,
}
]
}