#!/bin/sh
# shellcheck disable=all
"exec" "${HOME}/.local/share/venv/statusbar/bin/python3" "-u" "$0" "$@"

# The three lines above form an sh/Python polyglot: sh re-executes this file
# with the interpreter from the statusbar virtualenv, while Python sees the
# third line as a harmless string expression.  Keep them first and unchanged.
#
# The script polls the my.itmo.ru API for the user's requests, writes a short
# summary of the selected ones to PIPE_FILE and prints it to stdout, and
# contains a Telegram notification loop (see the review note in main()).

import datetime
import json
import os
import signal
import sys
import time
import traceback
from collections.abc import Callable
from dataclasses import dataclass
from typing import Literal, Any, TypeVar
from urllib.parse import parse_qs, urlparse

import requests
from bs4 import BeautifulSoup, Tag

EMOJI_BY_STATUS = {
    0: '🟡',
    1: '🟢',
    2: '🔴',
}

# Seconds between two polls of the API; IDsFilter also uses it as the
# interval for re-reading CONFIG_FILE.
TIMEOUT = 30
# Per-request timeout (seconds) for the quick API / token calls.
REQUEST_TIMEOUT = 2
# Timeout (seconds) for requests that previously had none at all (the login
# form submission and the Telegram call).
SLOW_REQUEST_TIMEOUT = 10
# Pause (seconds) between retries so a persistent failure does not turn the
# retry loops into a busy loop.
RETRY_DELAY = 1

CONFIG_FILE = f"{os.environ['HOME']}/.my.itmo"
CACHE_FILE = f"{os.environ['HOME']}/.cache/my_itmo.cache"
SECRET_FILE = f"{os.environ['HOME']}/.secrets/my_itmo.secret"
PIPE_FILE = f"{os.environ['XDG_RUNTIME_DIR']}/my.itmo.pipe"

T = TypeVar('T')


def run_forever(fn: Callable, *args, **kwargs):
    """Call ``fn(*args, **kwargs)`` in an endless loop.

    Any ``Exception`` is printed as a traceback to stdout and the call is
    repeated after ``RETRY_DELAY`` seconds.  ``SystemExit`` and
    ``KeyboardInterrupt`` are not caught, so they still end the loop.
    """
    while True:
        try:
            fn(*args, **kwargs)
        except Exception:
            print(traceback.format_exc())
            # Avoid restarting in a tight loop when fn fails immediately.
            time.sleep(RETRY_DELAY)


def run_until_successful(fn: Callable[..., T], *args, **kwargs) -> T:
    """Call ``fn(*args, **kwargs)`` until it returns without raising.

    Each failure is reported on stderr and followed by a ``RETRY_DELAY``
    second pause before the next attempt.

    Returns:
        Whatever the first successful call returns.
    """
    while True:
        try:
            return fn(*args, **kwargs)
        except Exception as error:
            print(
                f'{getattr(fn, "__name__", fn)} failed, retrying: {error!r}',
                file=sys.stderr,
            )
            time.sleep(RETRY_DELAY)


def send_message(chat_id: int, text: str, token: str):
    """Send ``text`` to ``chat_id`` through the Telegram Bot API.

    The message is sent with ``parse_mode`` set to ``HTML``.  The HTTP
    response is not inspected; network errors propagate to the caller.
    """
    requests.post(
        f"https://api.telegram.org/bot{token}/sendMessage",
        data={
            'chat_id': chat_id,
            'parse_mode': 'HTML',
            'text': text
        },
        # Without a timeout requests may wait indefinitely.
        timeout=SLOW_REQUEST_TIMEOUT,
    )


@dataclass
class StatusObject:
    """One entry of the ``result`` list returned by the ``requests/my`` endpoint."""

    id: int
    name: str
    notice: str
    status: Literal[0, 1, 2]  # key into EMOJI_BY_STATUS
    status_name: str
    updated_at: datetime.datetime
    created_at: datetime.datetime

    @staticmethod
    def from_dict(data: dict[str, Any]):
        """Build a ``StatusObject`` from a decoded JSON object.

        ``updated_at`` and ``created_at`` are parsed from
        ``%Y-%m-%dT%H:%M:%S`` after removing a ``+03:00`` suffix, so the
        resulting datetimes are naive.  The input dict is left unmodified.
        """
        # NOTE(review): dropping the offset yields naive datetimes that
        # LastUpdateFilter compares with datetime.now(); that presumably
        # relies on the local clock being in UTC+3 — confirm.
        fields = dict(data)
        for key in ('updated_at', 'created_at'):
            fields[key] = datetime.datetime.strptime(
                fields[key].replace("+03:00", ''), '%Y-%m-%dT%H:%M:%S'
            )
        return StatusObject(**fields)


class ApiException(Exception):
    """Raised when a server response is not the expected one."""

    status_code: int
    body: str

    def __init__(self, status_code: int, body: str):
        super().__init__(status_code, body)
        self.status_code = status_code
        self.body = body

    def __str__(self):
        return f'Status code: {self.status_code}\nBody: {self.body}'


class Api:
    """Authenticated client for ``https://my.itmo.ru/api/``.

    Tokens are obtained from the OpenID Connect endpoints on id.itmo.ru,
    either by submitting the login form with username and password or by
    using a refresh token.
    """

    _session: requests.Session
    _username: str
    _password: str
    _access_token: str
    _refresh_token: str
    # Despite their names these two hold absolute Unix timestamps after which
    # the respective token is treated as expired (see _get_and_save_tokens).
    _expires_in: int
    _refresh_expires_in: int

    def __init__(self, username: str, password: str, *,
                 access_token: str | None = None,
                 refresh_token: str | None = None,
                 expires_in: int | None = None,
                 refresh_expires_in: int | None = None,
                 cookies: Any | None = None):
        """Create the client and make sure it is authorized.

        The keyword arguments restore state previously produced by
        ``to_dict``; missing values fall back to empty strings / zero, which
        forces a fresh login.
        """
        self._session = requests.Session()
        self._username = username
        self._password = password
        self._refresh_token = refresh_token if refresh_token else ''
        self._expires_in = expires_in if expires_in else 0
        self._refresh_expires_in = refresh_expires_in if refresh_expires_in else 0
        if cookies:
            self._session.cookies.update(cookies)
        self._access_token = access_token if access_token else ''
        if access_token:
            self._session.headers.update({'Authorization': f'Bearer {access_token}'})
        self._ensure_authorized()

    def _first_auth(self):
        """Log in with username and password and store the resulting tokens.

        Raises:
            ApiException: if the login page has no form, the form submission
                does not answer with status 200, or the final URL carries no
                ``code`` query parameter.
        """
        self._session.headers.clear()
        self._session.cookies.clear()
        code_request = run_until_successful(
            self._session.get,
            'https://id.itmo.ru/auth/realms/itmo/protocol/openid-connect/auth',
            params={
                'protocol': 'oauth2',
                'response_type': 'code',
                'client_id': 'student-personal-cabinet',
                'redirect_uri': 'https://my.itmo.ru/login/callback',
                'scope': 'openid profile',
            },
            timeout=REQUEST_TIMEOUT,
        )
        soup = BeautifulSoup(code_request.text, features='html.parser')
        form = soup.find('form')
        if not isinstance(form, Tag):
            raise ApiException(code_request.status_code, code_request.text)
        url = form.get_attribute_list('action')[0]
        auth_request = run_until_successful(
            self._session.post,
            url,
            data={'username': self._username, 'password': self._password},
            timeout=SLOW_REQUEST_TIMEOUT,
        )
        if auth_request.status_code != 200:
            raise ApiException(auth_request.status_code, auth_request.text)
        # The authorization code is read from the query string of the URL the
        # session ended up on after the form submission.
        codes = parse_qs(urlparse(auth_request.url).query).get('code')
        if not codes:
            # No code in the final URL: report it as an API failure instead of
            # failing with an IndexError/KeyError.
            raise ApiException(auth_request.status_code, auth_request.text)
        self._get_and_save_tokens({
            'code': codes[0],
            'client_id': 'student-personal-cabinet',
            'redirect_uri': 'https://my.itmo.ru/login/callback',
            'audience': '',
            'response_type': 'code',
            'grant_type': 'authorization_code',
            'code_verifier': ''
        })

    def _renew(self):
        """Obtain new tokens using the stored refresh token."""
        self._session.headers.clear()
        self._session.cookies.clear()
        self._get_and_save_tokens({
            'refresh_token': self._refresh_token,
            'scopes': 'openid profile',
            'client_id': 'student-personal-cabinet',
            'grant_type': 'refresh_token'
        })

    def _get_and_save_tokens(self, data: Any):
        """POST ``data`` to the token endpoint and store the returned tokens.

        Raises:
            ApiException: if the endpoint does not answer with status 200.
        """
        tokens_request = run_until_successful(
            self._session.post,
            'https://id.itmo.ru/auth/realms/itmo/protocol/openid-connect/token',
            data=data,
            timeout=REQUEST_TIMEOUT,
        )
        if tokens_request.status_code != 200:
            raise ApiException(tokens_request.status_code, tokens_request.text)
        tokens = tokens_request.json()
        self._access_token = tokens['access_token']
        # Store absolute deadlines, 10 seconds early, so tokens are replaced
        # shortly before they actually expire.
        self._expires_in = int(time.time()) + tokens['expires_in'] - 10
        self._refresh_expires_in = int(time.time()) + tokens['refresh_expires_in'] - 10
        self._refresh_token = tokens['refresh_token']
        self._session.headers.update({"Authorization": f"Bearer {tokens['access_token']}"})

    def _ensure_authorized(self):
        """Make sure a non-expired access token is in place.

        Does nothing while the access token is valid, refreshes it while the
        refresh token is valid, and otherwise performs a full login.
        """
        current_time = int(time.time())
        if self._access_token and self._expires_in > current_time:
            return
        if self._refresh_token and self._refresh_expires_in > current_time:
            self._renew()
        else:
            self._first_auth()

    def _make_request(self, method: Literal["GET", "POST"], endpoint: str):
        """Call ``https://my.itmo.ru/api/<endpoint>`` and return the decoded JSON.

        Raises:
            ApiException: if the final response status is not 200 or the
                payload's ``error_code`` is not 0.
        """
        self._ensure_authorized()
        url = f'https://my.itmo.ru/api/{endpoint}'
        r = run_until_successful(self._session.request, method, url, timeout=REQUEST_TIMEOUT)
        if r.status_code == 403:
            self._first_auth()  # do full reauth if 403 after self._ensure_authorized()
            r = run_until_successful(self._session.request, method, url, timeout=REQUEST_TIMEOUT)
        if r.status_code != 200:
            raise ApiException(r.status_code, r.text)
        payload = r.json()
        if payload['error_code'] != 0:
            raise ApiException(r.status_code, r.text)
        return payload

    def get_status_list(self):
        """Return the entries of ``requests/my`` as ``StatusObject`` instances."""
        return [StatusObject.from_dict(obj) for obj in self._make_request('GET', 'requests/my')['result']]

    def to_dict(self) -> Any:
        """Return the client state in the form accepted by ``from_dict``.

        NOTE(review): the result contains the plaintext password and the
        tokens, and main() writes it to CACHE_FILE.
        """
        return {
            'username': self._username,
            'password': self._password,
            'access_token': self._access_token,
            'refresh_token': self._refresh_token,
            'expires_in': self._expires_in,
            'refresh_expires_in': self._refresh_expires_in,
            'cookies': self._session.cookies.get_dict()
        }

    @staticmethod
    def from_dict(data: Any):
        """Recreate a client from a dict produced by ``to_dict``."""
        return Api(
            data['username'],
            data['password'],
            access_token=data['access_token'],
            refresh_token=data['refresh_token'],
            expires_in=data['expires_in'],
            refresh_expires_in=data['refresh_expires_in'],
            cookies=data['cookies'],
        )


def listen_for_messages(api: Api, timeout=TIMEOUT,
                        filter_func: Callable[[StatusObject], bool] | None = None):
    """Poll ``api`` forever and yield the filtered status list when it changes.

    Every ``timeout`` seconds the status list is fetched and passed through
    ``filter(filter_func, ...)``.  A list is yielded only when it is non-empty
    and differs from the previously yielded one.  The generator never ends.
    """
    prev_msg = None
    while True:
        msg = list(filter(filter_func, api.get_status_list()))
        if msg and msg != prev_msg:
            prev_msg = msg
            yield msg
        time.sleep(timeout)


def format_status(status: StatusObject) -> str:
    """Return the status emoji followed by the notice text before its first '.'."""
    return f"{EMOJI_BY_STATUS[status.status]} {status.notice.split('.')[0].strip()}"


def format_message(status: StatusObject) -> str:
    """Return the status emoji and name, a blank line, then the full notice."""
    return f"{EMOJI_BY_STATUS[status.status]} {status.name}\n\n{status.notice}"


class IDsFilter:
    """Predicate accepting statuses whose id is listed in ``CONFIG_FILE``.

    The file is read as a comma-separated list (spaces are ignored) and is
    re-read once more than ``TIMEOUT`` seconds have passed since the last read.
    """

    _ids: list[str]
    _update_time: float

    def __init__(self):
        self._ids = []
        self._update_dict()

    def __call__(self, status: StatusObject) -> bool:
        if self._update_time + TIMEOUT < time.time():
            self._update_dict()
        return str(status.id) in self._ids

    def _update_dict(self):
        """Reload the id list; an unreadable file results in an empty list."""
        self._update_time = time.time()
        try:
            with open(CONFIG_FILE) as file:
                self._ids = file.read().strip().replace(' ', '').split(',')
        except (OSError, ValueError):
            # Best effort: a missing or undecodable config selects nothing.
            self._ids = []


class LastUpdateFilter:
    """Predicate accepting statuses updated at or after a reference time."""

    _update_time: datetime.datetime

    def __init__(self, ignore_now=False) -> None:
        """Start from "now" when ``ignore_now`` is true, else from timestamp 0."""
        if ignore_now:
            self._update_time = datetime.datetime.now()
        else:
            self._update_time = datetime.datetime.fromtimestamp(0)

    def __call__(self, status: StatusObject):
        return status.updated_at >= self._update_time

    def update(self):
        """Move the reference time to the current local time."""
        self._update_time = datetime.datetime.now()


def main():
    """Restore or create the API client, install signal handlers and poll.

    Exits with status 1 when ``SECRET_FILE`` is missing.  On SIGTERM/SIGINT
    the client state is written to ``CACHE_FILE``, ``PIPE_FILE`` is removed if
    it is a regular file, and the process exits with status 0.
    """
    api = None
    if os.path.isfile(CACHE_FILE):
        try:
            with open(CACHE_FILE) as file:
                api = Api.from_dict(json.load(file))
        except (json.JSONDecodeError, KeyError, TypeError) as error:
            # A damaged cache must not block start-up; fall back to a fresh
            # login with the credentials from SECRET_FILE.
            print(f"Ignoring unusable cache file: {error!r}", file=sys.stderr)
            api = None

    if os.path.isfile(SECRET_FILE):
        with open(SECRET_FILE) as secret_file:
            data = json.load(secret_file)
        owner_id = data['owner_id']
        bot_token = data['bot_token']
        if not api:
            api = Api(data['username'], data['password'])
    else:
        print("Missing secret file!", file=sys.stderr)
        sys.exit(1)

    def die(*_):
        with open(CACHE_FILE, 'w') as file:
            json.dump(api.to_dict(), file)
        if os.path.isfile(PIPE_FILE):
            os.remove(PIPE_FILE)
        sys.exit(0)

    signal.signal(signal.SIGTERM, die)
    signal.signal(signal.SIGINT, die)

    for message in listen_for_messages(api, filter_func=IDsFilter()):
        with open(PIPE_FILE, 'w') as file:
            print('\n'.join(map(format_status, message)))
            file.write(' '.join(map(format_status, message)))

    # NOTE(review): listen_for_messages() never stops yielding, so as laid out
    # here the loop above does not finish and the Telegram loop below is not
    # reached.  The original nesting of these two loops could not be
    # recovered from the source — confirm the intended structure.
    update_filter = LastUpdateFilter(ignore_now=True)
    for message in listen_for_messages(api, filter_func=update_filter):
        formatted_messages = list(map(format_message, message))
        print('\n---\n'.join(formatted_messages))
        for message in formatted_messages:
            send_message(owner_id, message, bot_token)
        update_filter.update()


if __name__ == "__main__":
    run_forever(main)

# vim: ft=python