diff --git a/DONT DELETE.png b/DONT DELETE.png new file mode 100644 index 0000000..2c1215e Binary files /dev/null and b/DONT DELETE.png differ diff --git a/avatar_small.png b/avatar_small.png new file mode 100644 index 0000000..30e96ec Binary files /dev/null and b/avatar_small.png differ diff --git a/kms.cmd b/kms.cmd new file mode 100644 index 0000000..a04948b --- /dev/null +++ b/kms.cmd @@ -0,0 +1,6536 @@ + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/merge_pdfs.sh b/merge_pdfs.sh new file mode 100644 index 0000000..52f82fb --- /dev/null +++ b/merge_pdfs.sh @@ -0,0 +1,4 @@ +pdfunite -- *.pdf out.pdf + +/usr/bin/gs -dNOPAUSE -dBATCH -sDEVICE=pdfwrite \ + -sOutputFile=fixed.pdf out.pdf diff --git a/my.itmo b/my.itmo new file mode 100755 index 0000000..21ae178 --- /dev/null +++ b/my.itmo @@ -0,0 +1,326 @@ +#!/usr/bin/env python3 +# shellcheck disable=all +# "exec" "${HOME}/.local/share/venv/statusbar/bin/python3" "-u" "$0" "$@" + +import datetime +import json +import os +import signal +import sys +import time +import traceback + +from collections.abc import Callable +from dataclasses import dataclass +from typing import Literal, Any, TypeVar +from urllib.parse import urlparse + +import requests +from bs4 import BeautifulSoup, Tag + +EMOJI_BY_STATUS = { + 0: '🟡', + 1: '🟢', + 2: '🔴', +} +TIMEOUT = 30 +CONFIG_FILE = f"{os.environ['HOME']}/.my.itmo" +CACHE_FILE = f"{os.environ['HOME']}/.cache/my_itmo.cache" +SECRET_FILE = f"{os.environ['HOME']}/.secrets/my_itmo.secret" +PIPE_FILE = f"{os.environ['XDG_RUNTIME_DIR']}/my.itmo.pipe" + +T = TypeVar('T') + + +def run_forever(fn: Callable, *args, **kwargs): + while True: + try: + fn(*args, **kwargs) + except Exception: + print(traceback.format_exc()) + + +def run_until_successful(fn: Callable[..., T], *args, **kwargs) -> T: + while True: + try: + return fn(*args, **kwargs) + except Exception: + pass + + +def send_message(chat_id: int, text: str, token: str): + requests.post(f"https://api.telegram.org/bot{token}/sendMessage", data={ + 'chat_id': chat_id, + 'parse_mode': 'HTML', + 'text': text + }) + + +@dataclass +class StatusObject: + id: int + name: str + notice: str + status: Literal[0, 1, 2] + status_name: str + updated_at: datetime.datetime + created_at: datetime.datetime + + @staticmethod + def from_dict(data: dict[str, Any]): + data['updated_at'] = datetime.datetime.strptime(data['updated_at'].replace("+03:00", ''), '%Y-%m-%dT%H:%M:%S') + data['created_at'] = datetime.datetime.strptime(data['created_at'].replace("+03:00", ''), '%Y-%m-%dT%H:%M:%S') + + return StatusObject(**data) + + +class ApiException(Exception): + status_code: int + body: str + + def __init__(self, status_code: int, body: str): + super().__init__(status_code, body) + self.status_code = status_code + self.body = body + + def __str__(self): + return f'Status code: {self.status_code}\nBody: {self.body}' + + +class Api: + _session: requests.Session + _username: str + _password: str + _access_token: str + _refresh_token: str + _expires_in: int + _refresh_expires_in: int + + def __init__(self, username: str, password: str, *, access_token: str | None = None, + refresh_token: str | None = None, expires_in: int | None = None, + refresh_expires_in: int | None = None, cookies: Any | None = None): + self._session = requests.Session() + self._username = username + self._password = password + self._refresh_token = refresh_token if refresh_token else '' + self._expires_in = expires_in if expires_in else 0 + self._refresh_expires_in = refresh_expires_in if refresh_expires_in else 0 + if cookies: self._session.cookies.update(cookies) + self._access_token = access_token if access_token else '' + if access_token: self._session.headers.update({'Authorization': f'Bearer {access_token}'}) + self._ensure_authorized() + + + def _first_auth(self): + self._session.headers.clear() + self._session.cookies.clear() + code_request = run_until_successful(self._session.get, 'https://id.itmo.ru/auth/realms/itmo/protocol/openid-connect/auth', params={ + 'protocol': 'oauth2', + 'response_type': 'code', + 'client_id': 'student-personal-cabinet', + 'redirect_uri': 'https://my.itmo.ru/login/callback', + 'scope': 'openid profile', + }, timeout=2) + soup = BeautifulSoup(code_request.text, features='html.parser') + form = soup.find('form') + if not isinstance(form, Tag): + raise ApiException(code_request.status_code, code_request.text) + + url = form.get_attribute_list('action')[0] + auth_request = run_until_successful(self._session.post, url, data={'username': self._username, 'password': self._password}) + if auth_request.status_code != 200: + raise ApiException(auth_request.status_code, auth_request.text) + + parsed_url_params = {a.split('=')[0]: a.split('=')[1] for a in urlparse(auth_request.url).query.split('&')} + + self._get_and_save_tokens({ + 'code' : parsed_url_params['code'], + 'client_id': 'student-personal-cabinet', + 'redirect_uri': 'https://my.itmo.ru/login/callback', + 'audience': '', + 'response_type': 'code', + 'grant_type': 'authorization_code', + 'code_verifier': '' + }) + + + def _renew(self): + self._session.headers.clear() + self._session.cookies.clear() + self._get_and_save_tokens({ + 'refresh_token': self._refresh_token, + 'scopes': 'openid profile', + 'client_id': 'student-personal-cabinet', + 'grant_type': 'refresh_token' + }) + + + def _get_and_save_tokens(self, data: Any): + tokens_request = run_until_successful(self._session.post, 'https://id.itmo.ru/auth/realms/itmo/protocol/openid-connect/token', data=data, timeout=2) + if tokens_request.status_code != 200: + raise ApiException(tokens_request.status_code, tokens_request.text) + tokens = tokens_request.json() + self._access_token = tokens['access_token'] + self._expires_in = int(time.time()) + tokens['expires_in'] - 10 + self._refresh_expires_in = int(time.time()) + tokens['refresh_expires_in'] - 10 + self._refresh_token = tokens['refresh_token'] + self._session.headers.update({"Authorization": f"Bearer {tokens_request.json()['access_token']}"}) + + + def _ensure_authorized(self): + current_time = int(time.time()) + + if self._access_token and self._expires_in > current_time: + return + elif self._refresh_token and self._refresh_expires_in > current_time: + self._renew() + else: + self._first_auth() + + + def _make_request(self, method: Literal["GET", "POST"], endpoint: str): + self._ensure_authorized() + r = run_until_successful(self._session.request, method, f'https://my.itmo.ru/api/{endpoint}', timeout=2) + + if r.status_code == 403: + self._first_auth() # do full reauth if 403 after self._ensure_authorized() + r = run_until_successful(self._session.request, method, f'https://my.itmo.ru/api/{endpoint}', timeout=2) + + if r.status_code != 200 or r.json()['error_code'] != 0: + raise ApiException(r.status_code, r.text) + + return r.json() + + + def get_status_list(self): + return [StatusObject.from_dict(obj) for obj in self._make_request('GET', 'requests/my')['result']] + + + def to_dict(self) -> Any: + return { + 'username': self._username, + 'password': self._password, + 'access_token': self._access_token, + 'refresh_token': self._refresh_token, + 'expires_in': self._expires_in, + 'refresh_expires_in': self._refresh_expires_in, + 'cookies': self._session.cookies.get_dict() + } + + + @staticmethod + def from_dict(data: Any): + return Api( + data['username'], + data['password'], + access_token = data['access_token'], + refresh_token = data['refresh_token'], + expires_in = data['expires_in'], + refresh_expires_in = data['refresh_expires_in'], + cookies = data['cookies'], + ) + + +def listen_for_messages(api: Api, timeout=TIMEOUT, filter_func: Callable[[StatusObject], bool] | None = None): + prev_msg = None + while True: + msg = list(filter(filter_func, api.get_status_list())) + + if not msg or msg == prev_msg: + time.sleep(timeout) + continue + + prev_msg = msg + yield msg + time.sleep(timeout) + + +format_status = lambda status: f"{EMOJI_BY_STATUS[status.status]} {status.notice.split('.')[0].strip()}" +format_message = lambda status: f"{EMOJI_BY_STATUS[status.status]} {status.name}\n\n{status.notice}" + + +class IDsFilter: + _ids: list[str] + _update_time: float + + def __init__(self): + self._ids = [] + self._update_dict() + + + def __call__(self, status: StatusObject) -> bool: + if self._update_time + TIMEOUT < time.time(): + self._update_dict() + + return str(status.id) in self._ids + + def _update_dict(self): + self._update_time = time.time() + try: + with open(CONFIG_FILE) as file: + self._ids = file.read().strip().replace(' ', '').split(',') + except Exception: + self._ids = [] + + +class LastUpdateFilter: + _update_time: datetime.datetime + + def __init__(self, ignore_now = False) -> None: + self._update_time = datetime.datetime.fromtimestamp(0) if not ignore_now else datetime.datetime.now() + + def __call__(self, status: StatusObject): + return status.updated_at >= self._update_time + + def update(self): + self._update_time = datetime.datetime.now() + + +def main(): + api = None + + if os.path.isfile(CACHE_FILE): + with open(CACHE_FILE) as file: + api = Api.from_dict(json.load(file)) + + if os.path.isfile(SECRET_FILE): + with open(SECRET_FILE) as secret_file: + data = json.load(secret_file) + + owner_id = data['owner_id'] + bot_token = data['bot_token'] + + if not api: + api = Api(data['username'], data['password']) + else: + print("Missing secret file!", file=sys.stderr) + exit(1) + + def die(*_): + with open(CACHE_FILE, 'w') as file: + json.dump(api.to_dict(), file) + if os.path.isfile(PIPE_FILE): + os.remove(PIPE_FILE) + exit(0) + + signal.signal(signal.SIGTERM, die) + signal.signal(signal.SIGINT, die) + + for message in listen_for_messages(api, filter_func=IDsFilter()): + with open(PIPE_FILE, 'w') as file: + print('\n'.join(map(format_status, message))) + file.write(' '.join(map(format_status, message))) + + update_filter = LastUpdateFilter(ignore_now=True) + for message in listen_for_messages(api, filter_func=update_filter): + formatted_messages = list(map(format_message, message)) + print('\n---\n'.join(formatted_messages)) + for message in formatted_messages: + send_message(owner_id, message, bot_token) + update_filter.update() + + +if __name__ == "__main__": + run_forever(main) + +# vim: ft=python