"""HTTP proxy that exposes Telegram dialogs, folders and messages via FastAPI.

A single Pyrogram ``Client`` is started for the lifetime of the application.
Dialogs, dialog filters (folders), notification settings and message pages are
kept in module-level, in-process caches guarded by simple TTLs.
"""

import json
import logging
import os
import time
import traceback
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import Any, cast

from fastapi import FastAPI, HTTPException, Query, Request
from pydantic import BaseModel, Field
from pyrogram.client import Client
from pyrogram.raw.functions.account.get_notify_exceptions import GetNotifyExceptions
from pyrogram.raw.functions.account.get_notify_settings import GetNotifySettings
from pyrogram.raw.functions.messages.get_dialog_filters import GetDialogFilters
from pyrogram.raw.types.input_notify_broadcasts import InputNotifyBroadcasts
from pyrogram.raw.types.input_notify_chats import InputNotifyChats
from pyrogram.raw.types.input_notify_users import InputNotifyUsers
from pyrogram.types import Dialog
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import JSONResponse, PlainTextResponse

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
)
logger = logging.getLogger(__name__)
# Access logging is done by AccessLogMiddleware below, so quiet uvicorn's own.
logging.getLogger("uvicorn.access").setLevel(logging.WARNING)

API_ID = int(os.getenv("API_ID", "0"))
API_HASH = os.getenv("API_HASH", "")
MAX_LIMIT = 100
DIALOGS_CACHE_TTL = 300  # seconds
MESSAGES_CACHE_TTL = 1200  # seconds
HEAD_REFRESH_LIMIT = 20  # messages fetched to detect the newest message id

# Channel/supergroup chat ids are encoded as -(10**12 + channel_id).
_CHANNEL_ID_OFFSET = 1_000_000_000_000
_SNIPPET_MAX_LEN = 200
_REPLY_FETCH_BATCH = 200

# Which Attachment fields are populated for each media type.
_ATTACHMENT_FIELDS: dict[str, tuple[str, ...]] = {
    "photo": ("mime", "size"),
    "document": ("filename", "mime", "size"),
    "video": ("filename", "mime", "duration", "size"),
    "audio": ("filename", "mime", "duration", "size"),
    "voice": ("mime", "duration", "size"),
    "animation": ("filename", "mime", "duration", "size"),
    "sticker": ("filename", "mime", "size"),
    "video_note": ("duration", "size"),
}
# Attachment field name -> attribute name on the media object.
_ATTACHMENT_SOURCE_ATTRS: dict[str, str] = {
    "filename": "file_name",
    "mime": "mime_type",
    "duration": "duration",
    "size": "file_size",
}
_ATTACHMENT_INT_FIELDS = frozenset({"duration", "size"})

client = Client("tg_proxy", api_id=API_ID, api_hash=API_HASH, workdir="session")

_dialogs_cache: list[Dialog] = []
_dialogs_cache_time: float = 0
_dialog_filters_cache: list[Any] = []
_dialog_filters_cache_time: float = 0
_folder_membership_cache: dict[int, list[int]] = {}
_folder_membership_cache_time: float = 0
# "users" / "chats" / "broadcasts" -> default mute_until for that peer class.
_global_notify_settings: dict[str, int] = {}
# chat id -> mute_until override for that specific chat.
_notify_exceptions_cache: dict[int, int] = {}
# chat id -> message id -> (raw message, fetched_at)
_raw_message_cache: dict[int, dict[int, tuple[Any, float]]] = {}
# (chat id, limit, offset) -> (head message id, messages, fetched_at)
_history_page_cache: dict[
    tuple[int, int, int], tuple[int | None, list[Any], float]
] = {}
# (chat id, since ISO string) -> (head message id, messages, fetched_at)
_delta_cache: dict[tuple[int, str], tuple[int | None, list[Any], float]] = {}


class Chat(BaseModel):
    """API representation of a dialog.

    Several fields are emitted under two names (``type``/``chat_type`` holds
    raw vs. normalized type; ``is_pinned``/``pinned`` and ``is_muted``/``muted``
    carry the same value).
    """

    id: int
    title: str | None
    type: str
    chat_type: str
    username: str | None = None
    members_count: int | None = None
    is_pinned: bool = False
    pinned: bool = False
    last_message_date: datetime | None = None
    unread_count: int = 0
    is_muted: bool = False
    muted: bool = False
    archived: bool = False
    folder_id: int | None = None
    folder_ids: list[int] | None = None
    last_online_at: datetime | None = None


class DialogFolder(BaseModel):
    """API representation of a Telegram dialog filter (folder or chatlist)."""

    id: int
    title: str | None = None
    type: str
    icon_emoji: str | None = None
    pinned_chat_ids: list[int] = Field(default_factory=list)
    include_chat_ids: list[int] = Field(default_factory=list)
    exclude_chat_ids: list[int] = Field(default_factory=list)
    contacts: bool = False
    non_contacts: bool = False
    groups: bool = False
    broadcasts: bool = False
    bots: bool = False
    exclude_muted: bool = False
    exclude_read: bool = False
    exclude_archived: bool = False
    has_my_invites: bool | None = None


class Attachment(BaseModel):
    """Metadata about a single media attachment of a message."""

    type: str
    filename: str | None = None
    mime: str | None = None
    duration: int | None = None
    size: int | None = None


class Message(BaseModel):
    """API representation of a message.

    ``from_me``/``is_outgoing`` and ``quoted_text``/``reply_snippet`` are
    pairs of aliases carrying the same value.
    """

    id: int
    date: datetime | None
    text: str | None
    from_user: str | None
    chat_id: int
    from_me: bool | None = None
    is_outgoing: bool | None = None
    reply_to_message_id: int | None = None
    quoted_text: str | None = None
    reply_snippet: str | None = None
    edited_at: datetime | None = None
    is_read: bool | None = None
    attachments: list[Attachment] | None = None


class PaginatedChats(BaseModel):
    """One page of chats plus pagination metadata."""

    items: list[Chat]
    limit: int
    offset: int
    has_more: bool
    remaining_count: int | None = None


class PaginatedMessages(BaseModel):
    """One page of messages plus pagination metadata."""

    chat: Chat | None = None
    items: list[Message]
    limit: int
    offset: int
    has_more: bool
    remaining_count: int | None = None


class AccessLogMiddleware(BaseHTTPMiddleware):
    """Log one line per completed request: client IP, method, path, status."""

    async def dispatch(self, request: Request, call_next):
        response = await call_next(request)
        forwarded_for = request.headers.get("x-forwarded-for", "")
        if forwarded_for:
            # First entry of X-Forwarded-For is taken as the client address.
            client_ip = forwarded_for.split(",")[0].strip()
        elif request.client:
            client_ip = request.client.host
        else:
            client_ip = "-"
        logger.info(
            '%s - "%s %s" %s',
            client_ip,
            request.method,
            request.url.path,
            response.status_code,
        )
        return response


class PrettyJSONResponse(JSONResponse):
    """JSON response rendered indented and without ASCII escaping."""

    def render(self, content) -> bytes:
        return json.dumps(
            content,
            ensure_ascii=False,
            allow_nan=False,
            indent=2,
        ).encode("utf-8")


def validate_limit(limit: int) -> int:
    """Return *limit* unchanged, or raise HTTP 400 if it exceeds MAX_LIMIT."""
    if limit > MAX_LIMIT:
        raise HTTPException(status_code=400, detail=f"Limit cannot exceed {MAX_LIMIT}")
    return limit


def normalize_chat_type(chat_type: str) -> str:
    """Collapse a raw chat type into ``direct``/``group``/``channel``.

    Unrecognized values are returned unchanged.
    """
    if chat_type in ("private", "bot", "direct"):
        return "direct"
    if chat_type in ("group", "supergroup", "forum"):
        return "group"
    if chat_type == "channel":
        return "channel"
    return chat_type


def matches_chat_type_filter(chat: Chat, chat_type_filter: str | None) -> bool:
    """Return whether *chat* passes the ``chat_type`` query filter.

    ``None`` matches everything; an unrecognized filter matches nothing.
    Note that ``direct`` here excludes bots, which have their own ``bot``
    filter value.
    """
    if chat_type_filter is None:
        return True
    accepted_types = {
        "direct": ("private", "direct"),
        "bot": ("bot",),
        "group": ("group", "supergroup", "forum"),
        "channel": ("channel",),
    }.get(chat_type_filter.strip().lower())
    if accepted_types is None:
        return False
    return chat.type in accepted_types


def normalize_int(value) -> int | None:
    """Convert *value* to ``int``; return ``None`` if missing or unconvertible."""
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError):
        return None


def normalize_datetime(value: datetime | None) -> datetime | None:
    """Return *value* as an aware UTC datetime (naive input is assumed UTC)."""
    if value is None:
        return None
    if value.tzinfo is None:
        return value.replace(tzinfo=timezone.utc)
    return value.astimezone(timezone.utc)


def raw_peer_to_chat_id(peer: Any) -> int | None:
    """Map a raw peer object to the signed chat id used by ``Dialog.chat.id``.

    Users keep their positive id, basic groups become ``-chat_id`` and
    channels become ``-(10**12 + channel_id)``. Wrapper objects exposing a
    nested ``peer`` attribute are unwrapped recursively. Returns ``None`` when
    no id can be derived.
    """
    if peer is None:
        return None
    if hasattr(peer, "user_id"):
        return normalize_int(getattr(peer, "user_id", None))
    if hasattr(peer, "chat_id"):
        chat_id = normalize_int(getattr(peer, "chat_id", None))
        return -chat_id if chat_id is not None else None
    if hasattr(peer, "channel_id"):
        channel_id = normalize_int(getattr(peer, "channel_id", None))
        if channel_id is None:
            return None
        # Arithmetic rather than prefixing the string "-100": the two only
        # agree for 10-digit channel ids.
        return -(_CHANNEL_ID_OFFSET + channel_id)
    nested_peer = getattr(peer, "peer", None)
    if nested_peer is not None and nested_peer is not peer:
        return raw_peer_to_chat_id(nested_peer)
    return None


def extract_filter_title(raw_filter: Any) -> str | None:
    """Return the filter title as plain text.

    The title may be a ``str`` or an object carrying a ``text`` attribute;
    anything else is stringified.
    """
    title = getattr(raw_filter, "title", None)
    if title is None:
        return None
    if isinstance(title, str):
        return title
    text = getattr(title, "text", None)
    if isinstance(text, str):
        return text
    return str(title)


def collect_filter_chat_ids(peers: list[Any] | None) -> list[int]:
    """Return the sorted, de-duplicated chat ids resolvable from *peers*."""
    if not peers:
        return []
    resolved = (raw_peer_to_chat_id(peer) for peer in peers)
    return sorted({chat_id for chat_id in resolved if chat_id is not None})


def build_dialog_folder(raw_filter: Any) -> DialogFolder | None:
    """Convert a raw dialog filter into a ``DialogFolder``.

    Returns ``None`` for filters without an integer ``id`` and for the
    ``DialogFilterDefault`` placeholder.
    """
    folder_id = normalize_int(getattr(raw_filter, "id", None))
    if folder_id is None:
        return None
    filter_name = type(raw_filter).__name__
    if filter_name == "DialogFilterDefault":
        return None
    folder_type = "chatlist" if filter_name == "DialogFilterChatlist" else "folder"
    return DialogFolder(
        id=folder_id,
        title=extract_filter_title(raw_filter),
        type=folder_type,
        icon_emoji=getattr(raw_filter, "emoticon", None),
        pinned_chat_ids=collect_filter_chat_ids(
            getattr(raw_filter, "pinned_peers", None)
        ),
        include_chat_ids=collect_filter_chat_ids(
            getattr(raw_filter, "include_peers", None)
        ),
        exclude_chat_ids=collect_filter_chat_ids(
            getattr(raw_filter, "exclude_peers", None)
        ),
        contacts=bool(getattr(raw_filter, "contacts", False)),
        non_contacts=bool(getattr(raw_filter, "non_contacts", False)),
        groups=bool(getattr(raw_filter, "groups", False)),
        broadcasts=bool(getattr(raw_filter, "broadcasts", False)),
        bots=bool(getattr(raw_filter, "bots", False)),
        exclude_muted=bool(getattr(raw_filter, "exclude_muted", False)),
        exclude_read=bool(getattr(raw_filter, "exclude_read", False)),
        exclude_archived=bool(getattr(raw_filter, "exclude_archived", False)),
        has_my_invites=getattr(raw_filter, "has_my_invites", None),
    )


def dialog_is_read(dialog: Dialog) -> bool:
    """Return True when the dialog has no unread messages."""
    return (getattr(dialog, "unread_messages_count", 0) or 0) == 0


def _folder_accepts_chat(
    dialog: Dialog, chat: Chat, folder: DialogFolder, *, explicit_only: bool
) -> bool:
    """Apply an already-built folder's include/exclude rules to one chat.

    With ``explicit_only`` (chatlists) only the explicit pinned/include lists
    count; otherwise the category flags and ``exclude_*`` flags apply as well.
    """
    if chat.id in folder.exclude_chat_ids:
        return False
    matches_positive_rule = (
        chat.id in folder.pinned_chat_ids or chat.id in folder.include_chat_ids
    )
    if explicit_only:
        return matches_positive_rule

    chat_type = chat.type
    is_bot = chat_type == "bot" or bool(getattr(dialog.chat, "is_bot", False))
    is_contact = bool(getattr(dialog.chat, "is_contact", False))
    is_private = chat_type in ("private", "direct")
    category_hits = (
        folder.contacts and is_contact,
        folder.non_contacts and is_private and not is_contact and not is_bot,
        folder.groups and chat_type in ("group", "supergroup", "forum"),
        folder.broadcasts and chat_type == "channel",
        folder.bots and is_bot,
    )
    if not (matches_positive_rule or any(category_hits)):
        return False

    if folder.exclude_muted and chat.is_muted:
        return False
    if folder.exclude_read and dialog_is_read(dialog):
        return False
    if folder.exclude_archived and chat.archived:
        return False
    return True


def dialog_matches_folder(dialog: Dialog, chat: Chat, raw_filter: Any) -> bool:
    """Return whether *dialog*/*chat* belongs to the folder behind *raw_filter*."""
    filter_name = type(raw_filter).__name__
    if filter_name == "DialogFilterDefault":
        return False
    folder = build_dialog_folder(raw_filter)
    if folder is None:
        return False
    return _folder_accepts_chat(
        dialog,
        chat,
        folder,
        explicit_only=filter_name == "DialogFilterChatlist",
    )


def build_folder_membership_map(
    dialogs: list[Dialog], raw_filters: list[Any]
) -> dict[int, list[int]]:
    """Return ``chat id -> sorted folder ids`` for every dialog.

    Chats that match no folder map to an empty list.
    """
    # Build each folder once instead of once per (dialog, filter) pair.
    built_folders: list[tuple[DialogFolder, bool]] = []
    for raw_filter in raw_filters:
        folder = build_dialog_folder(raw_filter)
        if folder is not None:
            is_chatlist = type(raw_filter).__name__ == "DialogFilterChatlist"
            built_folders.append((folder, is_chatlist))

    memberships: dict[int, list[int]] = {}
    for dialog in dialogs:
        chat = build_chat(dialog)
        memberships[chat.id] = sorted(
            {
                folder.id
                for folder, is_chatlist in built_folders
                if _folder_accepts_chat(
                    dialog, chat, folder, explicit_only=is_chatlist
                )
            }
        )
    return memberships


def extract_message_snippet(message) -> str | None:
    """Return a short preview of *message* for reply quoting.

    Uses the first 200 characters of ``text`` or ``caption``; falls back to
    ``"[<media type>]"`` when only media is present; otherwise ``None``.
    """
    if not message:
        return None
    for attr in ("text", "caption"):
        value = getattr(message, attr, None)
        if value:
            return value[:_SNIPPET_MAX_LEN]
    media = getattr(message, "media", None)
    if media:
        media_value = getattr(media, "value", None) or str(media)
        return f"[{media_value}]"
    return None


def build_attachments(message) -> list[Attachment] | None:
    """Describe the media attached to *message*.

    Returns a one-element list for the supported media types (see
    ``_ATTACHMENT_FIELDS``) and ``None`` for anything else.
    """
    media_type = getattr(getattr(message, "media", None), "value", None)
    if not isinstance(media_type, str):
        return None
    field_names = _ATTACHMENT_FIELDS.get(media_type)
    if field_names is None:
        return None

    # The media object lives on the attribute named after the media type.
    media_obj = getattr(message, media_type, None)
    values: dict[str, Any] = {}
    for field_name in field_names:
        raw_value = getattr(media_obj, _ATTACHMENT_SOURCE_ATTRS[field_name], None)
        if field_name in _ATTACHMENT_INT_FIELDS:
            raw_value = normalize_int(raw_value)
        values[field_name] = raw_value
    return [Attachment(type=media_type, **values)]


async def _fetch_global_notify_settings() -> dict[str, int]:
    """Fetch default ``mute_until`` per peer class; best effort.

    On failure the settings gathered so far are returned and a warning is
    logged.
    """
    settings: dict[str, int] = {}
    try:
        for name, input_notify in (
            ("users", InputNotifyUsers()),
            ("chats", InputNotifyChats()),
            ("broadcasts", InputNotifyBroadcasts()),
        ):
            result = await client.invoke(GetNotifySettings(peer=input_notify))
            mute_until = getattr(result, "mute_until", 0) or 0
            settings[name] = mute_until
            logger.info("Global %s mute_until: %s", name, mute_until)
    except Exception as e:  # deliberate best effort: muting info is optional
        logger.warning("Failed to fetch global notify settings: %s", e)
    return settings


async def _fetch_notify_exceptions() -> dict[int, int]:
    """Fetch per-chat ``mute_until`` overrides keyed by chat id; best effort.

    On failure the overrides gathered so far are returned and a warning is
    logged.
    """
    exceptions: dict[int, int] = {}
    try:
        for input_notify in (
            InputNotifyUsers(),
            InputNotifyChats(),
            InputNotifyBroadcasts(),
        ):
            result = await client.invoke(GetNotifyExceptions(peer=input_notify))
            updates = cast(list[Any], getattr(result, "updates", []))
            for update in updates:
                notify_peer = getattr(update, "peer", None)
                notify_settings = getattr(update, "notify_settings", None)
                if notify_peer is None or notify_settings is None:
                    continue
                # Keys must use the same signed id scheme as Dialog.chat.id,
                # otherwise build_chat never finds group/channel overrides.
                chat_id = raw_peer_to_chat_id(notify_peer)
                if chat_id is not None:
                    exceptions[chat_id] = (
                        getattr(notify_settings, "mute_until", 0) or 0
                    )
    except Exception as e:  # deliberate best effort: muting info is optional
        logger.warning("Failed to fetch notify exceptions: %s", e)
    return exceptions


async def get_cached_dialogs() -> list[Dialog]:
    """Return all dialogs, refetching when the cache is empty or expired.

    A refetch also refreshes the global notification settings and per-chat
    notification exceptions used by ``build_chat``.
    """
    global \
        _dialogs_cache, \
        _dialogs_cache_time, \
        _global_notify_settings, \
        _notify_exceptions_cache

    now = time.time()
    if _dialogs_cache and (now - _dialogs_cache_time) < DIALOGS_CACHE_TTL:
        logger.info("Returning dialogs from cache")
        return _dialogs_cache

    logger.info("Fetching dialogs from Telegram...")
    dialogs = [dialog async for dialog in client.get_dialogs()]
    _dialogs_cache = dialogs
    _dialogs_cache_time = now
    logger.info("Cached %d dialogs", len(dialogs))

    logger.info("Fetching global notify settings...")
    _global_notify_settings = await _fetch_global_notify_settings()

    logger.info("Fetching notify exceptions...")
    _notify_exceptions_cache = await _fetch_notify_exceptions()
    logger.info("Cached %d notify exceptions", len(_notify_exceptions_cache))
    return dialogs


async def get_cached_dialog_filters() -> list[Any]:
    """Return raw dialog filters, refetching when the cache is empty or expired.

    A failed fetch returns an empty list and leaves the cache untouched.
    """
    global _dialog_filters_cache, _dialog_filters_cache_time

    now = time.time()
    if _dialog_filters_cache and (now - _dialog_filters_cache_time) < DIALOGS_CACHE_TTL:
        logger.info("Returning dialog filters from cache")
        return _dialog_filters_cache

    logger.info("Fetching dialog filters from Telegram...")
    try:
        result = await client.invoke(GetDialogFilters())
        filters = list(cast(list[Any], getattr(result, "filters", [])))
    except Exception as e:  # deliberate best effort: folders are optional
        logger.warning("Failed to fetch dialog filters: %s", e)
        return []

    _dialog_filters_cache = filters
    _dialog_filters_cache_time = now
    logger.info("Cached %d dialog filters", len(filters))
    return filters


async def get_cached_folder_memberships() -> dict[int, list[int]]:
    """Return ``chat id -> folder ids``, recomputing when empty or expired."""
    global _folder_membership_cache, _folder_membership_cache_time

    now = time.time()
    if (
        _folder_membership_cache
        and (now - _folder_membership_cache_time) < DIALOGS_CACHE_TTL
    ):
        logger.info("Returning folder memberships from cache")
        return _folder_membership_cache

    dialogs = await get_cached_dialogs()
    raw_filters = await get_cached_dialog_filters()
    memberships = build_folder_membership_map(dialogs, raw_filters)
    _folder_membership_cache = memberships
    _folder_membership_cache_time = now
    logger.info("Cached folder memberships for %d chats", len(memberships))
    return memberships


def gc_message_caches() -> None:
    """Drop entries older than MESSAGES_CACHE_TTL from all message caches."""
    now = time.time()

    # Snapshot the keys: empty per-chat caches are removed while iterating.
    for chat_id in list(_raw_message_cache):
        messages = _raw_message_cache[chat_id]
        stale_ids = [
            message_id
            for message_id, (_, fetched_at) in messages.items()
            if (now - fetched_at) >= MESSAGES_CACHE_TTL
        ]
        for message_id in stale_ids:
            del messages[message_id]
        if not messages:
            del _raw_message_cache[chat_id]

    for page_cache in (_history_page_cache, _delta_cache):
        stale_keys = [
            key
            for key, (_, _, fetched_at) in page_cache.items()
            if (now - fetched_at) >= MESSAGES_CACHE_TTL
        ]
        for key in stale_keys:
            del page_cache[key]


def cache_raw_messages(chat_id: int, messages: list[Any]) -> None:
    """Store *messages* in the per-chat raw cache, stamped with the current time."""
    if not messages:
        return
    now = time.time()
    chat_cache = _raw_message_cache.setdefault(chat_id, {})
    for message in messages:
        chat_cache[message.id] = (message, now)


def get_cached_raw_message(chat_id: int, message_id: int) -> Any | None:
    """Return a cached raw message, or ``None`` if absent or expired.

    Expired entries are evicted on access.
    """
    chat_cache = _raw_message_cache.get(chat_id)
    if not chat_cache:
        return None
    cached_entry = chat_cache.get(message_id)
    if not cached_entry:
        return None
    message, fetched_at = cached_entry
    if (time.time() - fetched_at) >= MESSAGES_CACHE_TTL:
        del chat_cache[message_id]
        if not chat_cache:
            del _raw_message_cache[chat_id]
        return None
    return message


async def fetch_raw_messages(chat_id: int, limit: int, offset: int = 0) -> list[Any]:
    """Fetch a slice of chat history from Telegram and add it to the raw cache."""
    messages = [
        message
        async for message in client.get_chat_history(
            chat_id, limit=limit, offset=offset
        )
    ]
    cache_raw_messages(chat_id, messages)
    return messages


async def refresh_chat_head(chat_id: int) -> tuple[int | None, list[Any]]:
    """Fetch the newest messages and return ``(head message id, messages)``.

    The head id is ``None`` when the chat history is empty.
    """
    fresh_messages = await fetch_raw_messages(chat_id, HEAD_REFRESH_LIMIT)
    head_id = fresh_messages[0].id if fresh_messages else None
    return head_id, fresh_messages


def _fresh_cached_messages(
    cached_entry: tuple[int | None, list[Any], float] | None,
    head_id: int | None,
) -> list[Any] | None:
    """Return the cached messages if the entry is still valid, else ``None``.

    An entry is valid while the chat head has not moved and the TTL has not
    elapsed.
    """
    if cached_entry is None:
        return None
    cached_head_id, cached_messages, fetched_at = cached_entry
    if cached_head_id != head_id:
        return None
    if (time.time() - fetched_at) >= MESSAGES_CACHE_TTL:
        return None
    return cached_messages


async def get_cached_or_fetch_history_page(
    chat_id: int,
    limit: int,
    offset: int,
) -> list[Any]:
    """Return up to ``limit + 1`` raw messages for a history page.

    The extra message lets the caller detect whether more pages exist. A cached
    page is reused only while the chat's newest message id is unchanged.
    """
    gc_message_caches()
    head_id, _ = await refresh_chat_head(chat_id)
    cache_key = (chat_id, limit, offset)

    cached_messages = _fresh_cached_messages(
        _history_page_cache.get(cache_key), head_id
    )
    if cached_messages is not None:
        # Re-stamp so reply lookups keep finding these messages in the raw cache.
        cache_raw_messages(chat_id, cached_messages)
        return cached_messages

    messages = await fetch_raw_messages(chat_id, limit + 1, offset)
    _history_page_cache[cache_key] = (head_id, messages, time.time())
    return messages


async def get_cached_or_fetch_delta_messages(
    chat_id: int, since: datetime
) -> list[Any]:
    """Return raw messages dated at or after *since*, oldest first.

    A cached result is reused only while the chat's newest message id is
    unchanged.
    """
    gc_message_caches()
    head_id, _ = await refresh_chat_head(chat_id)
    normalized_since = normalize_datetime(since)
    since_key = normalized_since.isoformat() if normalized_since else "none"
    cache_key = (chat_id, since_key)

    cached_messages = _fresh_cached_messages(_delta_cache.get(cache_key), head_id)
    if cached_messages is not None:
        cache_raw_messages(chat_id, cached_messages)
        return cached_messages

    messages: list[Any] = []
    # History is walked newest-first; stop at the first message older than
    # the cutoff.
    async for message in client.get_chat_history(chat_id):
        message_date = normalize_datetime(message.date)
        if message_date and normalized_since and message_date < normalized_since:
            break
        messages.append(message)
    messages.reverse()

    cache_raw_messages(chat_id, messages)
    _delta_cache[cache_key] = (head_id, messages, time.time())
    return messages


async def get_dialog_by_chat_id(chat_id: int) -> Dialog | None:
    """Return the cached dialog whose chat id equals *chat_id*, if any."""
    dialogs = await get_cached_dialogs()
    return next((dialog for dialog in dialogs if dialog.chat.id == chat_id), None)


def _resolve_mute_until(chat_id: int | None, chat_type: str) -> int:
    """Return the effective ``mute_until`` for a chat.

    A per-chat notification exception wins; otherwise the global default for
    the chat's peer class is used, and unknown types count as unmuted.
    """
    if chat_id in _notify_exceptions_cache:
        return _notify_exceptions_cache[chat_id]
    if chat_type in ("private", "bot", "direct"):
        return _global_notify_settings.get("users", 0)
    if chat_type in ("group", "supergroup", "forum"):
        return _global_notify_settings.get("chats", 0)
    if chat_type == "channel":
        return _global_notify_settings.get("broadcasts", 0)
    return 0


def build_chat(dialog: Dialog, folder_ids: list[int] | None = None) -> Chat:
    """Convert a Pyrogram dialog into the API ``Chat`` model.

    Muting comes from the cached notification settings. ``mute_until`` is
    treated as a Unix timestamp, so a chat counts as muted only while that
    time lies in the future.
    """
    chat = dialog.chat
    chat_type = str(chat.type.value) if chat.type else "unknown"
    # An expired timed mute (non-zero but in the past) is no longer a mute.
    is_muted = _resolve_mute_until(chat.id, chat_type) > time.time()
    is_pinned = dialog.is_pinned or False
    # NOTE(review): folder_id may be absent on some Dialog implementations;
    # read it defensively once and reuse it for both fields.
    folder_id = getattr(dialog, "folder_id", None)

    return Chat(
        id=chat.id or 0,
        title=chat.title or chat.first_name,
        type=chat_type,
        chat_type=normalize_chat_type(chat_type),
        username=chat.username,
        members_count=chat.members_count,
        is_pinned=is_pinned,
        pinned=is_pinned,
        last_message_date=dialog.top_message.date if dialog.top_message else None,
        unread_count=dialog.unread_messages_count or 0,
        is_muted=is_muted,
        muted=is_muted,
        archived=bool(folder_id),
        folder_id=folder_id,
        folder_ids=folder_ids,
        last_online_at=getattr(chat, "last_online_date", None),
    )


def build_message(message, chat_id: int, dialog: Dialog | None = None) -> Message:
    """Convert a raw message into the API ``Message`` model.

    ``is_read`` is derived from the dialog's read markers (outbox marker for
    outgoing messages, inbox marker otherwise) and is ``None`` when the dialog
    or the marker is unavailable. Reply fields are filled in later by
    ``enrich_reply_fields``.
    """
    from_me = getattr(message, "outgoing", None)
    is_read = None
    if dialog is not None:
        marker_attr = "read_outbox_max_id" if from_me else "read_inbox_max_id"
        read_max_id = getattr(dialog, marker_attr, None)
        if read_max_id is not None:
            is_read = message.id <= read_max_id

    return Message(
        id=message.id,
        date=message.date,
        text=message.text,
        from_user=message.from_user.first_name if message.from_user else None,
        chat_id=chat_id,
        from_me=from_me,
        is_outgoing=from_me,
        reply_to_message_id=getattr(message, "reply_to_message_id", None),
        quoted_text=None,
        reply_snippet=None,
        edited_at=getattr(message, "edit_date", None),
        is_read=is_read,
        attachments=build_attachments(message),
    )


async def enrich_reply_fields(
    messages: list[Message],
    chat_id: int,
    reply_cache: dict[int, str | None],
) -> list[Message]:
    """Fill ``quoted_text``/``reply_snippet`` on every message that is a reply.

    Snippets are resolved, in order, from the messages in this batch, from
    *reply_cache*, from the raw message cache, and finally by fetching the
    missing messages from Telegram in batches. *reply_cache* is updated in
    place and *messages* is mutated and returned.
    """
    messages_by_id: dict[int, Message] = {message.id: message for message in messages}

    # A dict keeps first-seen order while giving O(1) de-duplication.
    missing_reply_ids: dict[int, None] = {}
    for message in messages:
        reply_id = message.reply_to_message_id
        if not reply_id or reply_id in messages_by_id or reply_id in reply_cache:
            continue
        cached_reply = get_cached_raw_message(chat_id, reply_id)
        if cached_reply is not None:
            reply_cache[reply_id] = extract_message_snippet(cached_reply)
        else:
            missing_reply_ids[reply_id] = None

    pending_ids = list(missing_reply_ids)
    for start in range(0, len(pending_ids), _REPLY_FETCH_BATCH):
        batch_ids = pending_ids[start : start + _REPLY_FETCH_BATCH]
        fetched_replies = list(await client.get_messages(chat_id, batch_ids))
        cache_raw_messages(chat_id, fetched_replies)
        for reply in fetched_replies:
            reply_cache[reply.id] = extract_message_snippet(reply)
        # Remember ids that could not be resolved so they are not retried.
        for reply_id in batch_ids:
            reply_cache.setdefault(reply_id, None)

    for message in messages:
        reply_id = message.reply_to_message_id
        if not reply_id:
            continue
        reply_message = messages_by_id.get(reply_id)
        if reply_message is not None:
            reply_snippet = extract_message_snippet(reply_message)
        else:
            reply_snippet = reply_cache.get(reply_id)
        message.quoted_text = reply_snippet
        message.reply_snippet = reply_snippet
    return messages


@asynccontextmanager
async def lifespan(_: FastAPI):
    """Start the Telegram client for the app's lifetime and always stop it."""
    os.makedirs("session", exist_ok=True)
    logger.info("Starting Telegram client...")
    await client.start()
    logger.info("Telegram client started")
    try:
        yield
    finally:
        # Stop the client even when the application exits with an error.
        logger.info("Stopping Telegram client...")
        await client.stop()
        logger.info("Telegram client stopped")


app = FastAPI(lifespan=lifespan, default_response_class=PrettyJSONResponse)
app.add_middleware(AccessLogMiddleware)


@app.exception_handler(Exception)
async def unhandled_exception_handler(_: Request, exc: Exception):
    """Log any unhandled exception and return its traceback as plain text."""
    # NOTE(review): the full traceback is sent to the client, which exposes
    # internals; confirm this service is only reachable by trusted callers.
    details = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
    logger.exception("Unhandled exception")
    return PlainTextResponse(details, status_code=500)


@app.get(
    "/folders", response_model=list[DialogFolder], response_model_exclude_none=True
)
async def get_folders() -> list[DialogFolder]:
    """Return all user-defined folders sorted by id."""
    raw_filters = await get_cached_dialog_filters()
    built = (build_dialog_folder(raw_filter) for raw_filter in raw_filters)
    folders = [folder for folder in built if folder is not None]
    return sorted(folders, key=lambda folder: folder.id)


def _chat_passes_filters(
    chat: Chat,
    archived: bool | None,
    chat_type: str | None,
    folder_id: int | None,
) -> bool:
    """Return whether *chat* satisfies every supplied ``/chats`` query filter."""
    if archived is not None and chat.archived != archived:
        return False
    if not matches_chat_type_filter(chat, chat_type):
        return False
    if folder_id is not None and folder_id not in (chat.folder_ids or []):
        return False
    return True


@app.get("/chats", response_model_exclude_none=True)
async def get_chats(
    limit: int = Query(default=50, ge=1),
    offset: int = Query(default=0, ge=0),
    archived: bool | None = Query(default=None),
    chat_type: str | None = Query(default=None),
    folder_id: int | None = Query(default=None),
) -> PaginatedChats:
    """Return one page of chats, optionally filtered.

    Raises HTTP 400 when *limit* exceeds MAX_LIMIT or *folder_id* is unknown.
    Folder memberships are only computed when filtering by folder.
    """
    validate_limit(limit)

    folder_memberships: dict[int, list[int]] = {}
    if folder_id is not None:
        # Reject unknown folders before doing the membership computation.
        valid_folder_ids = {folder.id for folder in await get_folders()}
        if folder_id not in valid_folder_ids:
            raise HTTPException(
                status_code=400, detail=f"Unknown folder_id: {folder_id}"
            )
        folder_memberships = await get_cached_folder_memberships()

    dialogs = await get_cached_dialogs()

    # Single pass: build each chat once and keep the ones that match.
    matching_chats: list[Chat] = []
    for dialog in dialogs:
        chat = build_chat(dialog, folder_memberships.get(dialog.chat.id or 0))
        if _chat_passes_filters(chat, archived, chat_type, folder_id):
            matching_chats.append(chat)

    page = matching_chats[offset : offset + limit]
    return PaginatedChats(
        items=page,
        limit=limit,
        offset=offset,
        has_more=len(matching_chats) > offset + limit,
        remaining_count=max(0, len(matching_chats) - offset - len(page)),
    )


@app.get("/chats/{chat_id}/messages", response_model_exclude_none=True)
async def get_chat_messages(
    chat_id: int,
    limit: int = Query(default=50, ge=1),
    offset: int = Query(default=0, ge=0),
) -> PaginatedMessages:
    """Return one page of a chat's history with reply snippets filled in.

    Raises HTTP 400 when *limit* exceeds MAX_LIMIT. ``chat`` is omitted when
    the chat id is not among the cached dialogs.
    """
    validate_limit(limit)
    dialog = await get_dialog_by_chat_id(chat_id)
    folder_memberships = await get_cached_folder_memberships()

    # One extra message is fetched so has_more can be derived below.
    raw_messages = await get_cached_or_fetch_history_page(chat_id, limit, offset)
    messages = [build_message(msg, chat_id, dialog) for msg in raw_messages]

    reply_cache: dict[int, str | None] = {}
    messages = await enrich_reply_fields(messages, chat_id, reply_cache)

    chat = build_chat(dialog, folder_memberships.get(chat_id)) if dialog else None
    return PaginatedMessages(
        chat=chat,
        items=messages[:limit],
        limit=limit,
        offset=offset,
        has_more=len(messages) > limit,
    )


@app.get("/chats/{chat_id}/delta", response_model_exclude_none=True)
async def get_chat_messages_delta(
    chat_id: int,
    since: datetime = Query(...),
    limit: int = Query(default=50, ge=0),
    offset: int = Query(default=0, ge=0),
) -> PaginatedMessages:
    """Return messages dated at or after *since*, oldest first, paginated.

    ``limit=0`` disables pagination and returns everything from *offset*.
    Raises HTTP 400 when *limit* exceeds MAX_LIMIT.
    """
    validate_limit(limit)
    dialog = await get_dialog_by_chat_id(chat_id)
    folder_memberships = await get_cached_folder_memberships()

    raw_messages = await get_cached_or_fetch_delta_messages(chat_id, since)
    messages = [build_message(msg, chat_id, dialog) for msg in raw_messages]

    reply_cache: dict[int, str | None] = {}
    messages = await enrich_reply_fields(messages, chat_id, reply_cache)

    if limit == 0:
        items = messages[offset:]
        has_more = False
        remaining_count = 0
    else:
        # Take one extra element to learn whether another page exists.
        window = messages[offset : offset + limit + 1]
        has_more = len(window) > limit
        items = window[:limit]
        remaining_count = max(0, len(messages) - offset - len(items))

    chat = build_chat(dialog, folder_memberships.get(chat_id)) if dialog else None
    return PaginatedMessages(
        chat=chat,
        items=items,
        limit=limit,
        offset=offset,
        has_more=has_more,
        remaining_count=remaining_count,
    )