320 lines
10 KiB
Python
Executable file
320 lines
10 KiB
Python
Executable file
#!/bin/sh
# shellcheck disable=all
# sh/Python polyglot header: /bin/sh executes the quoted line below, which
# replaces the shell with the virtualenv's python3 (run with -u) on this same
# file, forwarding the original arguments.  When Python then reads the file,
# that line is only a sequence of adjacent string literals, i.e. an expression
# statement with no effect.
"exec" "${HOME}/.local/share/venv/statusbar/bin/python3" "-u" "$0" "$@"
|
|
|
|
import datetime
|
|
import json
|
|
import os
|
|
import signal
|
|
import sys
|
|
import time
|
|
import traceback
|
|
|
|
from collections.abc import Callable
|
|
from dataclasses import dataclass
|
|
from typing import Literal, Any, TypeVar
|
|
from urllib.parse import urlparse
|
|
|
|
import requests
|
|
from bs4 import BeautifulSoup, Tag
|
|
|
|
# Emoji shown for each numeric value of ``StatusObject.status``.
EMOJI_BY_STATUS = {
    0: '🟡',
    1: '🟢',
    2: '🔴',
}

# Seconds: default polling interval of ``listen_for_messages`` and the
# interval after which ``IDsFilter`` re-reads its configuration file.
TIMEOUT = 30

# Resolve the base directories once.  ``expanduser`` returns $HOME when it is
# set and otherwise asks the password database, so a missing variable no
# longer aborts the import with a KeyError.
_HOME = os.path.expanduser('~')

# $XDG_RUNTIME_DIR may be unset (or empty) outside a login session; fall back
# to the user's own cache directory instead of raising KeyError.
_RUNTIME_DIR = os.environ.get('XDG_RUNTIME_DIR') or f"{_HOME}/.cache"

# Comma-separated list of request ids to display (read by ``IDsFilter``).
CONFIG_FILE = f"{_HOME}/.my.itmo"
# JSON dump of ``Api.to_dict()`` written when the script is terminated.
CACHE_FILE = f"{_HOME}/.cache/my_itmo.cache"
# JSON file holding the credentials read by ``main``.
SECRET_FILE = f"{_HOME}/.secrets/my_itmo.secret"
# File that receives the formatted status line.
PIPE_FILE = f"{_RUNTIME_DIR}/my.itmo.pipe"

# Generic return type of the callable passed to ``run_until_successful``.
T = TypeVar('T')
|
|
|
|
|
|
def run_forever(fn: Callable, *args, **kwargs):
    """Call ``fn(*args, **kwargs)`` again and again, surviving its failures.

    A normal return immediately triggers the next call.  Any ``Exception``
    raised by ``fn`` is reported by printing its traceback, after which the
    call is repeated.  Exceptions that do not derive from ``Exception``
    (``SystemExit``, ``KeyboardInterrupt``) are not caught and end the loop,
    so this function never returns normally.

    Consecutive failures are separated by a pause that starts at 0.1 s and
    doubles up to 30 s, so a persistent error (for example a broken
    configuration) does not become a busy loop that floods the output.
    """
    min_delay, max_delay = 0.1, 30.0
    delay = min_delay
    while True:
        started = time.monotonic()
        try:
            fn(*args, **kwargs)
        except Exception:
            print(traceback.format_exc())
        else:
            # Clean return: restart right away and forget earlier failures.
            delay = min_delay
            continue

        if time.monotonic() - started >= max_delay:
            # fn worked for a while before failing; treat it as a new incident.
            delay = min_delay
        time.sleep(delay)
        delay = min(delay * 2, max_delay)
|
|
|
|
|
|
def run_until_successful(fn: Callable[..., T], *args, **kwargs) -> T:
    """Call ``fn(*args, **kwargs)`` until it stops raising and return its result.

    Every ``Exception`` raised by ``fn`` is reported on stderr and followed by
    a short pause (0.05 s, doubling up to 5 s) before the next attempt, so an
    immediately failing call (e.g. no network) neither spins the CPU nor fails
    invisibly.  There is no attempt limit: the function only returns once a
    call succeeds.
    """
    delay = 0.05
    while True:
        try:
            return fn(*args, **kwargs)
        except Exception as error:
            name = getattr(fn, '__name__', repr(fn))
            print(f"{name} failed, retrying: {error!r}", file=sys.stderr)
        time.sleep(delay)
        delay = min(delay * 2, 5.0)
|
|
|
|
|
|
def send_message(chat_id: int, text: str, token: str):
    """Send ``text`` to a Telegram chat through the Bot API ``sendMessage`` method.

    Args:
        chat_id: Identifier of the target chat.
        text: Message body; it is sent with ``parse_mode`` set to ``HTML``.
        token: Bot token inserted into the request URL.

    The HTTP response is not inspected.  A timeout is passed because
    ``requests`` otherwise waits indefinitely for an unresponsive server.
    """
    requests.post(
        f"https://api.telegram.org/bot{token}/sendMessage",
        data={
            'chat_id': chat_id,
            'parse_mode': 'HTML',
            'text': text,
        },
        timeout=TIMEOUT,
    )
|
|
|
|
|
|
@dataclass
class StatusObject:
    """One entry of the ``result`` list returned by ``Api.get_status_list``."""

    # Numeric identifier; ``IDsFilter`` compares its string form with the
    # ids listed in the configuration file.
    id: int
    # Title of the entry (rendered in bold by ``format_message``).
    name: str
    # Free-text note attached to the entry.
    notice: str
    # Status code, used as key into ``EMOJI_BY_STATUS``.
    status: Literal[0, 1, 2]
    # Textual form of the status as delivered by the API -- TODO confirm.
    status_name: str
    # Naive datetimes: the "+03:00" offset is dropped while parsing.
    updated_at: datetime.datetime
    created_at: datetime.datetime

    @staticmethod
    def from_dict(data: dict[str, Any]) -> 'StatusObject':
        """Build a ``StatusObject`` from one decoded JSON object.

        ``updated_at`` and ``created_at`` are expected as
        ``YYYY-MM-DDTHH:MM:SS`` strings, optionally carrying a ``+03:00``
        offset, and are converted to naive ``datetime`` objects.

        The input mapping is left untouched (the timestamps used to be
        replaced in place, which broke a second call on the same dict).

        Raises:
            KeyError: a timestamp key is missing.
            ValueError: a timestamp does not match the expected format.
            TypeError: ``data`` has keys that are not fields of the class,
                or lacks required ones.
        """
        fields = dict(data)  # shallow copy so the caller's dict stays intact
        for key in ('updated_at', 'created_at'):
            fields[key] = datetime.datetime.strptime(
                fields[key].replace("+03:00", ''),
                '%Y-%m-%dT%H:%M:%S',
            )
        return StatusObject(**fields)
|
|
|
|
|
|
class ApiException(Exception):
    """Error carrying the HTTP status code and body of an unexpected response."""

    status_code: int
    body: str

    def __init__(self, status_code: int, body: str):
        # Keep both values as attributes and as ``args`` of the exception.
        self.status_code, self.body = status_code, body
        super().__init__(status_code, body)

    def __str__(self):
        """Return a two-line description: status code, then response body."""
        return f'Status code: {self.status_code}\nBody: {self.body}'
|
|
|
|
|
|
class Api:
    """Client for ``my.itmo.ru`` that logs in through ``id.itmo.ru``.

    The instance owns a ``requests.Session`` and keeps the OAuth tokens
    obtained from the ``openid-connect`` endpoints.  Authorization is checked
    (and refreshed or redone when needed) on construction and before every
    API call.  ``to_dict`` / ``from_dict`` allow the state to be cached
    between runs.
    """

    _AUTH_URL = 'https://id.itmo.ru/auth/realms/itmo/protocol/openid-connect/auth'
    _TOKEN_URL = 'https://id.itmo.ru/auth/realms/itmo/protocol/openid-connect/token'
    _STATUS_URL = 'https://my.itmo.ru/api/requests/my'
    _CLIENT_ID = 'student-personal-cabinet'
    _REDIRECT_URI = 'https://my.itmo.ru/login/callback'
    # Seconds allowed for the ordinary (retried) requests.
    _REQUEST_TIMEOUT = 2
    # Seconds allowed for the credential POST, which previously had no
    # timeout at all and could block forever; kept generous on purpose.
    _LOGIN_TIMEOUT = 30

    _session: requests.Session
    _username: str
    _password: str
    _access_token: str
    _refresh_token: str
    # Despite their names these two hold absolute Unix timestamps (already
    # reduced by a 10 s safety margin) after which the token is stale.
    _expires_in: int
    _refresh_expires_in: int

    def __init__(self, username: str, password: str, *, access_token: str | None = None,
                 refresh_token: str | None = None, expires_in: int | None = None,
                 refresh_expires_in: int | None = None, cookies: Any | None = None):
        """Create the client and make sure it is authorized.

        Args:
            username: Login used for the credential form.
            password: Password used for the credential form.
            access_token: Previously issued bearer token, if any.
            refresh_token: Previously issued refresh token, if any.
            expires_in: Unix timestamp until which ``access_token`` is valid.
            refresh_expires_in: Unix timestamp until which ``refresh_token``
                is valid.
            cookies: Cookies to preload into the session.

        Performs network requests unless the supplied access token is still
        valid.
        """
        self._session = requests.Session()
        self._username = username
        self._password = password
        self._access_token = access_token or ''
        self._refresh_token = refresh_token or ''
        self._expires_in = expires_in or 0
        self._refresh_expires_in = refresh_expires_in or 0
        if cookies:
            self._session.cookies.update(cookies)
        if access_token:
            self._session.headers.update({'Authorization': f'Bearer {access_token}'})
        self._ensure_authorized()

    def _first_auth(self):
        """Log in with username and password and store the resulting tokens.

        Raises:
            ApiException: the login page has no usable form, the credential
                POST does not end with status 200, or the final URL carries
                no ``code`` parameter (e.g. the credentials were rejected).
        """
        # Function-scope import: the module itself only imports ``urlparse``.
        from urllib.parse import parse_qs

        self._session.headers.clear()
        self._session.cookies.clear()
        login_page = run_until_successful(self._session.get, self._AUTH_URL, params={
            'protocol': 'oauth2',
            'response_type': 'code',
            'client_id': self._CLIENT_ID,
            'redirect_uri': self._REDIRECT_URI,
            'scope': 'openid profile',
        }, timeout=self._REQUEST_TIMEOUT)

        form = BeautifulSoup(login_page.text, features='html.parser').find('form')
        if not isinstance(form, Tag):
            raise ApiException(login_page.status_code, login_page.text)

        action = form.get_attribute_list('action')[0]
        if not action:
            # Posting to a missing URL would fail on every attempt and keep
            # run_until_successful retrying forever.
            raise ApiException(login_page.status_code, login_page.text)

        auth_response = run_until_successful(
            self._session.post, action,
            data={'username': self._username, 'password': self._password},
            timeout=self._LOGIN_TIMEOUT,
        )
        if auth_response.status_code != 200:
            raise ApiException(auth_response.status_code, auth_response.text)

        # parse_qs copes with missing "=", repeated keys and percent-encoding.
        codes = parse_qs(urlparse(auth_response.url).query).get('code')
        if not codes:
            raise ApiException(auth_response.status_code, auth_response.text)

        self._get_and_save_tokens({
            'code': codes[0],
            'client_id': self._CLIENT_ID,
            'redirect_uri': self._REDIRECT_URI,
            'audience': '',
            'response_type': 'code',
            'grant_type': 'authorization_code',
            'code_verifier': '',
        })

    def _renew(self):
        """Exchange the stored refresh token for a new set of tokens.

        Raises:
            ApiException: the token endpoint does not answer with status 200.
        """
        self._session.headers.clear()
        self._session.cookies.clear()
        self._get_and_save_tokens({
            'refresh_token': self._refresh_token,
            'scopes': 'openid profile',
            'client_id': self._CLIENT_ID,
            'grant_type': 'refresh_token',
        })

    def _get_and_save_tokens(self, data: Any):
        """POST ``data`` to the token endpoint and store the returned tokens.

        Updates the token attributes, converts the relative lifetimes into
        absolute timestamps (minus 10 s) and installs the ``Authorization``
        header on the session.

        Raises:
            ApiException: the endpoint does not answer with status 200.
        """
        response = run_until_successful(
            self._session.post, self._TOKEN_URL, data=data, timeout=self._REQUEST_TIMEOUT)
        if response.status_code != 200:
            raise ApiException(response.status_code, response.text)

        tokens = response.json()  # decode the body once
        now = int(time.time())
        self._access_token = tokens['access_token']
        self._expires_in = now + tokens['expires_in'] - 10
        self._refresh_expires_in = now + tokens['refresh_expires_in'] - 10
        self._refresh_token = tokens['refresh_token']
        self._session.headers.update({"Authorization": f"Bearer {self._access_token}"})

    def _ensure_authorized(self):
        """Make sure a currently valid access token is installed.

        Nothing happens while the access token is valid.  Otherwise the
        refresh token is used if it is still valid; when there is none, it
        has expired, or the server rejects it, a full login is performed.
        """
        now = int(time.time())
        if self._access_token and self._expires_in > now:
            return

        if self._refresh_token and self._refresh_expires_in > now:
            try:
                self._renew()
                return
            except ApiException:
                # The refresh token was refused before its recorded expiry;
                # fall through to a full login instead of failing every call.
                pass

        self._first_auth()

    def get_status_list(self) -> list[StatusObject]:
        """Fetch the user's requests and return them as ``StatusObject`` items.

        Raises:
            ApiException: the HTTP status is not 200 or the payload's
                ``error_code`` is not 0.
        """
        self._ensure_authorized()
        response = run_until_successful(
            self._session.get, self._STATUS_URL, timeout=self._REQUEST_TIMEOUT)
        if response.status_code != 200:
            raise ApiException(response.status_code, response.text)

        payload = response.json()  # decode the body once
        if payload['error_code'] != 0:
            raise ApiException(response.status_code, response.text)

        return [StatusObject.from_dict(obj) for obj in payload['result']]

    def to_dict(self) -> Any:
        """Return the state needed by ``from_dict`` as a JSON-serializable dict.

        NOTE: the result contains the password and the tokens in clear text.
        """
        return {
            'username': self._username,
            'password': self._password,
            'access_token': self._access_token,
            'refresh_token': self._refresh_token,
            'expires_in': self._expires_in,
            'refresh_expires_in': self._refresh_expires_in,
            'cookies': self._session.cookies.get_dict(),
        }

    @staticmethod
    def from_dict(data: Any) -> 'Api':
        """Recreate an ``Api`` from the mapping produced by ``to_dict``.

        Only ``username`` and ``password`` are mandatory; absent token or
        cookie entries are treated as "not available".
        """
        return Api(
            data['username'],
            data['password'],
            access_token=data.get('access_token'),
            refresh_token=data.get('refresh_token'),
            expires_in=data.get('expires_in'),
            refresh_expires_in=data.get('refresh_expires_in'),
            cookies=data.get('cookies'),
        )
|
|
|
|
|
|
def listen_for_messages(api: Api, timeout=TIMEOUT, filter_func: Callable[[StatusObject], bool] | None = None):
    """Poll ``api`` forever and yield the filtered status list when it changes.

    Each round fetches ``api.get_status_list()``, keeps the items accepted by
    ``filter_func`` (with ``None``, the truthy items) and yields the list if
    it is non-empty and differs from the previously yielded one.  Every
    round ends with a ``timeout``-second sleep.
    """
    last_yielded = None
    while True:
        statuses = api.get_status_list()
        if filter_func is None:
            current = [status for status in statuses if status]
        else:
            current = [status for status in statuses if filter_func(status)]

        if current and current != last_yielded:
            last_yielded = current
            yield current

        time.sleep(timeout)
|
|
|
|
|
|
def format_status(status):
    """Return the status emoji followed by the notice text up to its first '.'."""
    return f"{EMOJI_BY_STATUS[status.status]} {status.notice.split('.')[0].strip()}"


def format_message(status):
    """Return the emoji, the name in ``<b>`` tags, a blank line and the full notice.

    NOTE(review): name and notice are inserted without HTML escaping.
    """
    return f"{EMOJI_BY_STATUS[status.status]} <b>{status.name}</b>\n\n{status.notice}"
|
|
|
|
|
|
class IDsFilter:
    """Predicate that accepts statuses whose id is listed in ``CONFIG_FILE``.

    The file is expected to hold a comma-separated list of ids.  It is read
    on construction and re-read when a call finds the last read to be more
    than ``TIMEOUT`` seconds old.
    """

    _ids: set[str]
    _update_time: float

    def __init__(self):
        self._ids = set()
        self._update_dict()

    def __call__(self, status: StatusObject) -> bool:
        """Return True when ``status.id`` (as a string) is a configured id."""
        if self._update_time + TIMEOUT < time.time():
            self._update_dict()

        return str(status.id) in self._ids

    def _update_dict(self):
        """Reload the id set; an unreadable file yields an empty set."""
        self._update_time = time.time()
        try:
            with open(CONFIG_FILE) as file:
                raw = file.read()
        except (OSError, ValueError):
            # Missing/unreadable file or undecodable content: accept nothing.
            self._ids = set()
            return

        # Drop every kind of whitespace (not only spaces), so ids may also be
        # separated by newlines or tabs around the commas.
        compact = ''.join(raw.split())
        self._ids = {token for token in compact.split(',') if token}
|
|
|
|
|
|
class LastUpdateFilter:
    """Predicate that accepts statuses updated at or after a stored moment."""

    _update_time: datetime.datetime

    def __init__(self, ignore_now = False) -> None:
        """Start from "now" when ``ignore_now`` is true, else from timestamp 0."""
        if ignore_now:
            threshold = datetime.datetime.now()
        else:
            threshold = datetime.datetime.fromtimestamp(0)
        self._update_time = threshold

    def __call__(self, status: StatusObject):
        """Return whether ``status.updated_at`` is not older than the threshold."""
        threshold = self._update_time
        return status.updated_at >= threshold

    def update(self):
        """Move the threshold to the current local time."""
        self._update_time = datetime.datetime.now()
|
|
|
|
|
|
def main():
    """Watch the configured requests and publish their status line.

    Reads the credentials from ``SECRET_FILE`` (exiting with status 1 when
    the file is missing), restores the ``Api`` from ``CACHE_FILE`` when
    possible and otherwise logs in from scratch.  SIGTERM/SIGINT handlers
    save the API state and remove ``PIPE_FILE`` before exiting with status 0.
    The function then loops forever: every change reported by
    ``listen_for_messages`` is printed and written to ``PIPE_FILE``.
    """
    if not os.path.isfile(SECRET_FILE):
        print("Missing secret file!", file=sys.stderr)
        sys.exit(1)

    with open(SECRET_FILE) as secret_file:
        data = json.load(secret_file)

    # Only consumed by the disabled Telegram mode at the end of this function;
    # still read so that an incomplete secret file is noticed early.
    owner_id = data['owner_id']
    bot_token = data['bot_token']

    api = None
    if os.path.isfile(CACHE_FILE):
        try:
            with open(CACHE_FILE) as file:
                api = Api.from_dict(json.load(file))
        except (OSError, ValueError, KeyError, TypeError, ApiException):
            # A damaged or stale cache must not cause an endless crash loop;
            # report it and log in from scratch below.
            print(traceback.format_exc(), file=sys.stderr)
            api = None

    if api is None:
        api = Api(data['username'], data['password'])

    def die(*_):
        """Signal handler: persist the session, drop the pipe file, exit 0."""
        try:
            os.makedirs(os.path.dirname(CACHE_FILE), exist_ok=True)
            # The cache contains the password and tokens: owner-only access.
            fd = os.open(CACHE_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
            with os.fdopen(fd, 'w') as file:
                os.fchmod(fd, 0o600)  # also tighten a pre-existing file
                json.dump(api.to_dict(), file)
            if os.path.isfile(PIPE_FILE):
                os.remove(PIPE_FILE)
        except Exception:
            # An exception escaping from here would be caught by run_forever
            # and the process would keep running despite the signal.
            traceback.print_exc()
        sys.exit(0)

    signal.signal(signal.SIGTERM, die)
    signal.signal(signal.SIGINT, die)

    for message in listen_for_messages(api, filter_func=IDsFilter()):
        lines = [format_status(status) for status in message]
        with open(PIPE_FILE, 'w') as file:
            print('\n'.join(lines))
            file.write(' '.join(lines))

    # Disabled alternative: forward updated requests to Telegram.
    # update_filter = LastUpdateFilter(ignore_now=True)
    # for message in listen_for_messages(api, filter_func=update_filter):
    #     formatted_messages = list(map(format_message, message))
    #     print('\n---\n'.join(formatted_messages))
    #     for message in formatted_messages:
    #         send_message(owner_id, message, bot_token)
    #     update_filter.update()
|
|
|
|
|
|
# Script entry point: run main() under run_forever, which calls it again
# whenever it raises an Exception (SystemExit from exit() is not caught).
if __name__ == "__main__":
    run_forever(main)
|
|
|
|
# vim: ft=python
|