dotfiles/.local/bin/scripts/my.itmo
Arthur Khachaturov 628baf3eea
minor updates
2024-09-19 04:25:07 +03:00

320 lines
10 KiB
Python
Executable file

#!/bin/sh
# shellcheck disable=all
"exec" "${HOME}/.local/share/venv/statusbar/bin/python3" "-u" "$0" "$@"
import datetime
import json
import os
import signal
import sys
import tempfile
import time
import traceback
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any, Literal, TypeVar
from urllib.parse import parse_qs, urlparse

import requests
from bs4 import BeautifulSoup, Tag
# Emoji used when rendering a status, keyed by its numeric status code.
EMOJI_BY_STATUS = {
    0: '🟡',
    1: '🟢',
    2: '🔴',
}

# Seconds between two polls of the API; also the minimum age of the ID list
# before IDsFilter re-reads CONFIG_FILE.
TIMEOUT = 30

# expanduser() uses $HOME when it is set and otherwise asks the password
# database, so a missing $HOME no longer aborts the import with KeyError.
_HOME = os.path.expanduser('~')

# $XDG_RUNTIME_DIR is absent outside of a login session (cron, plain ssh, ...).
# Fall back to the system temporary directory instead of raising KeyError.
_RUNTIME_DIR = os.environ.get('XDG_RUNTIME_DIR') or tempfile.gettempdir()

CONFIG_FILE = f"{_HOME}/.my.itmo"
CACHE_FILE = f"{_HOME}/.cache/my_itmo.cache"
SECRET_FILE = f"{_HOME}/.secrets/my_itmo.secret"
PIPE_FILE = f"{_RUNTIME_DIR}/my.itmo.pipe"

# Return type of the callable wrapped by run_until_successful().
T = TypeVar('T')
def run_forever(fn: Callable, *args, **kwargs):
    """Call ``fn(*args, **kwargs)`` over and over, never returning.

    Any ``Exception`` raised by *fn* is reported by printing its traceback to
    stdout, after which *fn* is called again immediately (there is no delay
    between attempts). Exceptions that do not derive from ``Exception``, such
    as ``SystemExit`` and ``KeyboardInterrupt``, propagate to the caller.
    """
    while True:
        try:
            fn(*args, **kwargs)
        except Exception:
            report = traceback.format_exc()
            print(report)
def run_until_successful(fn: Callable[..., T], *args, **kwargs) -> T:
    """Call ``fn(*args, **kwargs)`` until it returns, and return that result.

    Every ``Exception`` raised by *fn* is discarded and the call is repeated
    immediately, so this blocks for as long as *fn* keeps failing. Exceptions
    that do not derive from ``Exception`` (``KeyboardInterrupt``,
    ``SystemExit``) are not intercepted.
    """
    while True:
        try:
            return fn(*args, **kwargs)
        except Exception:
            # Deliberate best-effort retry: the failure is dropped and the
            # call is attempted again on the next iteration.
            continue
def send_message(chat_id: int, text: str, token: str, *, timeout: float = 10.0):
    """Post *text* to a Telegram chat via the Bot API ``sendMessage`` method.

    Args:
        chat_id: Identifier of the target chat.
        text: Message body; it is sent with ``parse_mode`` set to ``HTML``.
        token: Bot token that is embedded in the request URL.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout ``requests`` may wait indefinitely on a stalled
            connection.

    The HTTP response is not inspected; connection errors and timeouts
    surface as the exceptions raised by ``requests.post``.
    """
    requests.post(
        f"https://api.telegram.org/bot{token}/sendMessage",
        data={
            'chat_id': chat_id,
            'parse_mode': 'HTML',
            'text': text,
        },
        timeout=timeout,
    )
@dataclass
class StatusObject:
    """One status record taken from the ``result`` list of the requests API.

    ``status`` is the numeric code (0, 1 or 2) that ``EMOJI_BY_STATUS`` maps
    to an emoji. Both timestamps are stored as naive ``datetime`` objects that
    keep the wall-clock time found in the payload.
    """

    id: int
    name: str
    notice: str
    status: Literal[0, 1, 2]
    status_name: str
    updated_at: datetime.datetime
    created_at: datetime.datetime

    @staticmethod
    def _parse_timestamp(value: str) -> datetime.datetime:
        """Parse an ISO-8601 timestamp and drop its UTC offset, if any.

        The offset is discarded (not converted) so the result stays naive and
        comparable with ``datetime.datetime.now()``, as the rest of the file
        expects.
        """
        return datetime.datetime.fromisoformat(value).replace(tzinfo=None)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> 'StatusObject':
        """Build a StatusObject from one decoded JSON object.

        Args:
            data: Mapping whose keys match the dataclass fields;
                ``updated_at`` and ``created_at`` are ISO-8601 strings.

        Returns:
            The populated StatusObject. *data* itself is left unmodified.

        Raises:
            KeyError: If a timestamp key is missing.
            ValueError: If a timestamp is not valid ISO-8601.
            TypeError: If *data* has missing or unexpected field names.
        """
        # Work on a shallow copy so the caller's dict is not rewritten.
        fields = dict(data)
        for key in ('updated_at', 'created_at'):
            fields[key] = cls._parse_timestamp(fields[key])
        return cls(**fields)
class ApiException(Exception):
    """Error carrying the HTTP status code and body of an unexpected response."""

    status_code: int
    body: str

    def __init__(self, status_code: int, body: str):
        # Keep both values as attributes and as the exception's ``args``.
        self.status_code, self.body = status_code, body
        super().__init__(status_code, body)

    def __str__(self):
        """Render the status code and the body on two separate lines."""
        return f'Status code: {self.status_code}\nBody: {self.body}'
class Api:
    """Authenticated client for the my.itmo.ru requests API.

    Authentication goes through the OpenID Connect endpoints on id.itmo.ru:
    either a full login with username and password, or a token refresh.
    Creating an instance already performs network I/O, because the constructor
    makes sure a usable access token exists.

    All HTTP calls are wrapped in ``run_until_successful``, so transport
    errors are retried without limit and a method may block for as long as
    the network is unavailable.
    """

    _AUTH_URL = 'https://id.itmo.ru/auth/realms/itmo/protocol/openid-connect/auth'
    _TOKEN_URL = 'https://id.itmo.ru/auth/realms/itmo/protocol/openid-connect/token'
    _STATUS_URL = 'https://my.itmo.ru/api/requests/my'
    _CLIENT_ID = 'student-personal-cabinet'
    _REDIRECT_URI = 'https://my.itmo.ru/login/callback'
    # Per-request timeout, in seconds, for the short API calls.
    _REQUEST_TIMEOUT = 2
    # The credential POST follows redirects, so it gets a longer limit.
    _LOGIN_TIMEOUT = 10

    _session: requests.Session
    _username: str
    _password: str
    _access_token: str
    _refresh_token: str
    # Despite the names, these two hold absolute Unix timestamps after which
    # the respective token is treated as expired.
    _expires_in: int
    _refresh_expires_in: int

    def __init__(self, username: str, password: str, *, access_token: str | None = None,
                 refresh_token: str | None = None, expires_in: int | None = None,
                 refresh_expires_in: int | None = None, cookies: Any | None = None):
        """Create the client and make sure it is authorized.

        Args:
            username: Login name for the identity provider.
            password: Password for the identity provider.
            access_token: Previously issued access token, if any.
            refresh_token: Previously issued refresh token, if any.
            expires_in: Unix time at which *access_token* expires.
            refresh_expires_in: Unix time at which *refresh_token* expires.
            cookies: Cookies to preload into the HTTP session.

        Raises:
            ApiException: If authentication is required and fails.
        """
        self._session = requests.Session()
        self._username = username
        self._password = password
        self._access_token = access_token or ''
        self._refresh_token = refresh_token or ''
        self._expires_in = expires_in or 0
        self._refresh_expires_in = refresh_expires_in or 0
        if cookies:
            self._session.cookies.update(cookies)
        if access_token:
            self._session.headers.update({'Authorization': f'Bearer {access_token}'})
        self._ensure_authorized()

    @staticmethod
    def _extract_auth_code(url: str) -> str | None:
        """Return the URL-decoded ``code`` query parameter of *url*, or None."""
        codes = parse_qs(urlparse(url).query).get('code')
        return codes[0] if codes else None

    def _first_auth(self):
        """Log in with username and password and store the resulting tokens.

        Raises:
            ApiException: If the login page has no usable form, the credential
                POST does not end with status 200, or the final URL carries no
                authorization code (for example after rejected credentials).
        """
        self._session.headers.clear()
        self._session.cookies.clear()
        login_page = run_until_successful(self._session.get, self._AUTH_URL, params={
            'protocol': 'oauth2',
            'response_type': 'code',
            'client_id': self._CLIENT_ID,
            'redirect_uri': self._REDIRECT_URI,
            'scope': 'openid profile',
        }, timeout=self._REQUEST_TIMEOUT)
        soup = BeautifulSoup(login_page.text, features='html.parser')
        form = soup.find('form')
        if not isinstance(form, Tag):
            raise ApiException(login_page.status_code, login_page.text)
        action_url = form.get_attribute_list('action')[0]
        if not action_url:
            # Posting to an empty URL would fail on every attempt and spin
            # forever inside run_until_successful, so stop here instead.
            raise ApiException(login_page.status_code, login_page.text)
        login_response = run_until_successful(
            self._session.post,
            action_url,
            data={'username': self._username, 'password': self._password},
            timeout=self._LOGIN_TIMEOUT,
        )
        if login_response.status_code != 200:
            raise ApiException(login_response.status_code, login_response.text)
        code = self._extract_auth_code(login_response.url)
        if code is None:
            raise ApiException(login_response.status_code, login_response.text)
        self._get_and_save_tokens({
            'code': code,
            'client_id': self._CLIENT_ID,
            'redirect_uri': self._REDIRECT_URI,
            'audience': '',
            'response_type': 'code',
            'grant_type': 'authorization_code',
            'code_verifier': ''
        })

    def _renew(self):
        """Exchange the stored refresh token for a new token set.

        Raises:
            ApiException: If the token endpoint does not answer with 200.
        """
        self._session.headers.clear()
        self._session.cookies.clear()
        self._get_and_save_tokens({
            'refresh_token': self._refresh_token,
            # NOTE(review): the login request uses 'scope' (singular); confirm
            # whether 'scopes' is intended here before changing it.
            'scopes': 'openid profile',
            'client_id': self._CLIENT_ID,
            'grant_type': 'refresh_token'
        })

    def _get_and_save_tokens(self, data: Any):
        """POST *data* to the token endpoint and store the returned tokens.

        Expiry times are stored as absolute Unix timestamps, reduced by ten
        seconds so a token is refreshed slightly before it actually expires.

        Raises:
            ApiException: If the token endpoint does not answer with 200.
        """
        response = run_until_successful(
            self._session.post, self._TOKEN_URL, data=data, timeout=self._REQUEST_TIMEOUT)
        if response.status_code != 200:
            raise ApiException(response.status_code, response.text)
        tokens = response.json()
        now = int(time.time())
        self._access_token = tokens['access_token']
        self._refresh_token = tokens['refresh_token']
        self._expires_in = now + tokens['expires_in'] - 10
        self._refresh_expires_in = now + tokens['refresh_expires_in'] - 10
        self._session.headers.update({"Authorization": f"Bearer {self._access_token}"})

    def _ensure_authorized(self):
        """Make sure a non-expired access token is installed on the session.

        Order of preference: keep the current access token, refresh it with
        the refresh token, perform a full login. A refresh that the server
        rejects falls back to a full login instead of failing.

        Raises:
            ApiException: If the full login fails.
        """
        now = int(time.time())
        if self._access_token and self._expires_in > now:
            return
        if self._refresh_token and self._refresh_expires_in > now:
            try:
                self._renew()
                return
            except ApiException:
                # The refresh token looked valid by the local clock but was
                # refused; only a fresh login can recover from that.
                pass
        self._first_auth()

    def get_status_list(self) -> list[StatusObject]:
        """Fetch the current statuses from the requests endpoint.

        Returns:
            One StatusObject per entry of the response's ``result`` list.

        Raises:
            ApiException: If the HTTP status is not 200 or the payload's
                ``error_code`` is not 0.
        """
        self._ensure_authorized()
        response = run_until_successful(
            self._session.get, self._STATUS_URL, timeout=self._REQUEST_TIMEOUT)
        if response.status_code != 200:
            raise ApiException(response.status_code, response.text)
        payload = response.json()
        if payload['error_code'] != 0:
            raise ApiException(response.status_code, response.text)
        return [StatusObject.from_dict(obj) for obj in payload['result']]

    def to_dict(self) -> Any:
        """Return the client state as a JSON-serializable dict.

        The result contains the password and both tokens in plain text.
        """
        return {
            'username': self._username,
            'password': self._password,
            'access_token': self._access_token,
            'refresh_token': self._refresh_token,
            'expires_in': self._expires_in,
            'refresh_expires_in': self._refresh_expires_in,
            'cookies': self._session.cookies.get_dict()
        }

    @classmethod
    def from_dict(cls, data: Any):
        """Recreate a client from a dict produced by ``to_dict``.

        Like the constructor, this may perform network I/O and raise
        ApiException; a missing key raises KeyError.
        """
        return cls(
            data['username'],
            data['password'],
            access_token=data['access_token'],
            refresh_token=data['refresh_token'],
            expires_in=data['expires_in'],
            refresh_expires_in=data['refresh_expires_in'],
            cookies=data['cookies'],
        )
def listen_for_messages(api: Api, timeout=TIMEOUT, filter_func: Callable[[StatusObject], bool] | None = None):
    """Poll *api* forever and yield the filtered status list when it changes.

    On every round the statuses returned by ``api.get_status_list()`` are
    passed through ``filter(filter_func, ...)``. The resulting list is yielded
    only when it is non-empty and not equal to the list yielded last time.
    The generator sleeps *timeout* seconds after every round, whether or not
    something was yielded.
    """
    last_yielded = None
    while True:
        current = list(filter(filter_func, api.get_status_list()))
        if current and current != last_yielded:
            last_yielded = current
            yield current
        time.sleep(timeout)
def format_status(status: 'StatusObject') -> str:
    """Return the status emoji followed by the first sentence of the notice.

    The "first sentence" is the text before the first ``.`` in the notice,
    with surrounding whitespace removed.
    """
    emoji = EMOJI_BY_STATUS[status.status]
    headline = status.notice.split('.')[0].strip()
    return f"{emoji} {headline}"


def format_message(status: 'StatusObject') -> str:
    """Return an HTML message: emoji, bold name, blank line, full notice."""
    emoji = EMOJI_BY_STATUS[status.status]
    return f"{emoji} <b>{status.name}</b>\n\n{status.notice}"
class IDsFilter:
    """Predicate accepting only statuses whose id is listed in CONFIG_FILE.

    The file holds comma-separated ids and may spread them over several
    lines. It is re-read when the loaded list is older than TIMEOUT seconds,
    so edits are picked up without restarting the script.
    """

    _ids: list[str]
    _update_time: float

    def __init__(self):
        self._ids = []
        self._update_dict()

    def __call__(self, status: StatusObject) -> bool:
        """Return True when the id of *status* is in the configured list."""
        if self._update_time + TIMEOUT < time.time():
            self._update_dict()
        return str(status.id) in self._ids

    @staticmethod
    def _parse_ids(text: str) -> list[str]:
        """Split the config text into id strings.

        Ids are separated by commas and/or line breaks. All whitespace inside
        a line is ignored and empty entries are dropped.
        """
        ids: list[str] = []
        for line in text.splitlines():
            compact = ''.join(line.split())
            ids.extend(token for token in compact.split(',') if token)
        return ids

    def _update_dict(self):
        """Reload the id list from CONFIG_FILE and record the reload time.

        A missing, unreadable or undecodable file results in an empty list,
        which makes the filter reject every status.
        """
        self._update_time = time.time()
        try:
            with open(CONFIG_FILE) as file:
                self._ids = self._parse_ids(file.read())
        except (OSError, ValueError):
            # ValueError covers UnicodeDecodeError raised while reading.
            self._ids = []
class LastUpdateFilter:
    """Predicate accepting statuses updated at or after a moving threshold."""

    _update_time: datetime.datetime

    def __init__(self, ignore_now = False) -> None:
        """Set the initial threshold.

        With ``ignore_now`` left False the threshold is timestamp 0, so every
        status passes. With ``ignore_now=True`` the threshold is the moment of
        construction.
        """
        if ignore_now:
            self._update_time = datetime.datetime.now()
        else:
            self._update_time = datetime.datetime.fromtimestamp(0)

    def __call__(self, status: StatusObject):
        """Return True when ``status.updated_at`` is not before the threshold."""
        threshold = self._update_time
        return status.updated_at >= threshold

    def update(self):
        """Move the threshold to the current local time."""
        self._update_time = datetime.datetime.now()
def main():
    """Poll the API and publish the filtered statuses.

    Steps:
      1. Load SECRET_FILE; exit with status 1 if it does not exist.
      2. Restore the API client from CACHE_FILE when possible, otherwise log
         in with the credentials from SECRET_FILE.
      3. Install SIGTERM/SIGINT handlers that save the client state to
         CACHE_FILE, remove PIPE_FILE and exit with status 0.
      4. For every change reported by ``listen_for_messages`` print the
         formatted statuses and write them to PIPE_FILE.
    """
    if not os.path.isfile(SECRET_FILE):
        print("Missing secret file!", file=sys.stderr)
        # sys.exit() instead of the site-provided exit(), which is meant for
        # interactive use and is not guaranteed to exist.
        sys.exit(1)

    with open(SECRET_FILE) as secret_file:
        data = json.load(secret_file)
    # Only the disabled Telegram mode at the end of this function uses these.
    owner_id = data['owner_id']
    bot_token = data['bot_token']

    api = None
    if os.path.isfile(CACHE_FILE):
        try:
            with open(CACHE_FILE) as file:
                api = Api.from_dict(json.load(file))
        except (OSError, ValueError, KeyError, ApiException):
            # A corrupt or stale cache must not be fatal: otherwise every
            # restart by run_forever() would fail in exactly the same way.
            print(traceback.format_exc(), file=sys.stderr)
            api = None
    if api is None:
        api = Api(data['username'], data['password'])

    def die(*_):
        """Signal handler: persist the session, clean up and exit."""
        # Failures are reported but never raised, so that the process really
        # terminates instead of being restarted by run_forever().
        try:
            with open(CACHE_FILE, 'w') as file:
                json.dump(api.to_dict(), file)
        except OSError:
            print(traceback.format_exc(), file=sys.stderr)
        try:
            if os.path.isfile(PIPE_FILE):
                os.remove(PIPE_FILE)
        except OSError:
            print(traceback.format_exc(), file=sys.stderr)
        sys.exit(0)

    signal.signal(signal.SIGTERM, die)
    signal.signal(signal.SIGINT, die)

    for message in listen_for_messages(api, filter_func=IDsFilter()):
        formatted = list(map(format_status, message))
        with open(PIPE_FILE, 'w') as file:
            print('\n'.join(formatted))
            file.write(' '.join(formatted))

    # Disabled alternative mode: forward every update to Telegram.
    # update_filter = LastUpdateFilter(ignore_now=True)
    # for message in listen_for_messages(api, filter_func=update_filter):
    #     formatted_messages = list(map(format_message, message))
    #     print('\n---\n'.join(formatted_messages))
    #     for message in formatted_messages:
    #         send_message(owner_id, message, bot_token)
    #     update_filter.update()


if __name__ == "__main__":
    # Restart main() whenever it fails with an ordinary exception.
    run_forever(main)
# vim: ft=python