From 99bb9607e58c4ece1c37a54fa765481ed7ff816c Mon Sep 17 00:00:00 2001 From: "Arthur K." Date: Wed, 14 May 2025 15:27:39 +0300 Subject: [PATCH] refactor: simplify deadline filtering and message generation logic --- main.py | 127 +++++++++++++++++++++----------------------------------- 1 file changed, 48 insertions(+), 79 deletions(-) diff --git a/main.py b/main.py index 54b798d..f625cf7 100644 --- a/main.py +++ b/main.py @@ -7,6 +7,7 @@ from aiogram import Bot import datetime as dt import locale import urllib.parse +import re # Modify the links and data below: DEADLINES_URL = "https://m3104.nawinds.dev/DEADLINES.json" @@ -15,8 +16,8 @@ BOT_NAME = "Дединсайдер M3104" BOT_USERNAME = "m3104_deadliner_bot" # Environment variables that should be available: -TOKEN = os.getenv("TOKEN") -MAIN_GROUP_ID = int(os.getenv("MAIN_GROUP_ID")) +TOKEN: str = os.getenv("TOKEN") # type: ignore +MAIN_GROUP_ID = int(os.getenv("MAIN_GROUP_ID")) # type: ignore logging.basicConfig(level=logging.INFO) @@ -24,7 +25,7 @@ bot = Bot(TOKEN) NUMBER_EMOJIS = ['0.', '1️⃣', '2️⃣', '3️⃣', '4️⃣', '5️⃣', '6️⃣', '7️⃣', '8️⃣', '9️⃣', '🔟'] -async def get_current_time() -> str: +def get_current_time() -> str: current_time = dt.datetime.now() current_time_hour = current_time.hour if current_time.hour >= 10 else "0" + str(current_time.hour) current_time_minute = current_time.minute if current_time.minute >= 10 else "0" + str(current_time.minute) @@ -35,7 +36,7 @@ def get_dt_obj_from_string(time: str) -> dt.datetime: locale.setlocale(locale.LC_TIME, 'en_US.UTF-8') return dt.datetime.strptime(time, "%d %b %Y %H:%M:%S %z") -async def generate_link(event_name: str, event_time: str) -> str: +def generate_link(event_name: str, event_time: str) -> str: dt_obj = get_dt_obj_from_string(event_time) formatted_time = dt_obj.strftime("%Y%m%d T%H%M%S%z") description = f"Дедлайн добавлен ботом {BOT_NAME} (https://t.me/{BOT_USERNAME})" @@ -45,7 +46,7 @@ async def generate_link(event_name: str, event_time: str) -> str: f"color=6" return link -async def get_human_timedelta(time: str) -> str: +def get_human_timedelta(time: str) -> str: dt_obj = get_dt_obj_from_string(time) dt_now = dt.datetime.now(dt_obj.tzinfo) # Ensure timezones are consistent delta = dt_obj - dt_now @@ -64,7 +65,7 @@ async def get_human_timedelta(time: str) -> str: else: return f"{hours}ч {minutes}м" -async def get_human_time(time: str) -> str: +def get_human_time(time: str) -> str: dt_obj = get_dt_obj_from_string(time) locale.setlocale(locale.LC_TIME, 'ru_RU.UTF-8') formatted_date = dt_obj.strftime("%a, %d %B в %H:%M") @@ -78,20 +79,15 @@ def timestamp_func(a: dict) -> float: def relevant_filter_func(d: dict) -> bool: dt_obj = get_dt_obj_from_string(d["time"]) - if dt_obj < dt.datetime.now(dt_obj.tzinfo): - return False - return True + return not dt_obj < dt.datetime.now(dt_obj.tzinfo) -def deadline_type_filter_func(d: dict, dtype: str) -> bool: - if f"[{dtype.lower()}]" in d["name"].lower(): - return True - return False +def deadline_type_filter_func(d: dict, dtype: str = '') -> bool: + if not dtype: + return not re.match(r'^\[.*\]', d['name']) -def deadlines_filter_func(d: dict) -> bool: - return (not deadline_type_filter_func(d, "тест") and - not deadline_type_filter_func(d, "лекция")) + return f"[{dtype.lower()}]" in d["name"].lower() -async def get_message_text() -> str: +def get_message_text() -> str: try: response = requests.get(DEADLINES_URL).json() except Exception as e: @@ -99,89 +95,62 @@ async def get_message_text() -> str: return "" all_deadlines = response["deadlines"] - deadlines = list(filter(lambda d: deadlines_filter_func(d) and relevant_filter_func(d), all_deadlines)) - tests = list(filter(lambda t: deadline_type_filter_func(t, "тест") and relevant_filter_func(t), all_deadlines)) - lectures = list(filter(lambda t: deadline_type_filter_func(t, "лекция") and relevant_filter_func(t), all_deadlines)) + types = [ + ('', ''), # deadlines + ('🧑‍💻 Тесты', 'тест'), + ('🎓 Лекции', 'лекция'), + ('🛡 Защиты', 'защита') + ] - text = f"🔥️️ Дедлайны (Обновлено в {await get_current_time()} 🔄):\n\n" + assignments = [(sorted(filter(lambda t: deadline_type_filter_func(t, x[1]) and relevant_filter_func(t), all_deadlines), + key=lambda z: timestamp_func(z)), x[0], x[1]) for x in types] - deadlines = sorted(deadlines, key=lambda x: timestamp_func(x)) - tests = sorted(tests, key=lambda x: timestamp_func(x)) - lectures = sorted(lectures, key=lambda x: timestamp_func(x)) + text = f"🔥️️ Дедлайны (Обновлено в {get_current_time()} 🔄):\n\n" - if len(deadlines) == 0: + if len(assignments[0]) == 0: text += "Дедлайнов нет)\n\n" - for i in range(len(deadlines)): - no = i + 1 - if no < 11: - no = NUMBER_EMOJIS[no] + " " - else: - no += ". " - text += str(no) + "" + def add_items(items: list, category_name: str = '', replace_name: str = ''): + if len(items) == 0: + return - if deadlines[i].get("url"): - text += f"{deadlines[i]['name']}" - else: - text += deadlines[i]["name"] + nonlocal text + REPLACE_PATTERN = re.compile(rf'^\[{replace_name}\]', flags=re.IGNORECASE) - text += " — " - text += await get_human_timedelta(deadlines[i]["time"]) - text += f"\n(" - text += await get_human_time(deadlines[i]["time"]) + ")\n\n" + if category_name: + text += f"\n{category_name}:\n\n" - if len(tests) > 0: - text += f"\n🧑‍💻 Тесты:\n\n" - - for i in range(len(tests)): - test_name = tests[i]["name"].replace("[Тест] ", "").replace("[тест]", "") - test_url = tests[i].get("url") + for i, item in enumerate(items): no = i + 1 - if no < 11: + if no <= 10: no = NUMBER_EMOJIS[no] + " " else: - no += ". " - text += str(no) + "" + no = str(no) + ". " - if test_url: - text += f"{test_name}" + text += no + "" + + name = re.sub(REPLACE_PATTERN, '', item['name']) + url = item.get('url') + + if url: + text += f"{name}" else: - text += test_name + text += name text += " — " - text += await get_human_timedelta(tests[i]["time"]) - text += f"\n(" - text += await get_human_time(tests[i]["time"]) + ")\n\n" + text += get_human_timedelta(item["time"]) + text += f"\n(" + text += get_human_time(item["time"]) + ")\n\n" - if len(lectures) > 0: - text += f"\n👨‍🏫 Лекции:\n\n" - - for i in range(len(lectures)): - lecture_name = lectures[i]["name"].replace("[Лекция] ", "").replace("[лекция]", "") - lecture_url = lectures[i].get("url") - no = i + 1 - if no < 11: - no = NUMBER_EMOJIS[no] + " " - else: - no += ". " - text += str(no) + "" - - if lecture_url: - text += f"{lecture_name}" - else: - text += lecture_name - - text += " — " - text += await get_human_timedelta(lectures[i]["time"]) - text += f"\n(" - text += await get_human_time(lectures[i]["time"]) + ")\n\n" + for assignment_type in assignments: + add_items(*assignment_type) text += f"\n🆕 " \ f"Добавить дедлайн" return text async def send_deadlines(chat_id: int) -> None: - text = await get_message_text() + text = get_message_text() if text == "Дедлайнов нет)\n\n": return @@ -192,7 +161,7 @@ async def send_deadlines(chat_id: int) -> None: while dt.datetime.now() - started_updating < dt.timedelta(days=1): await asyncio.sleep(60) try: - new_text = await get_message_text() + new_text = get_message_text() if text != new_text and new_text != "": await msg.edit_text(new_text, parse_mode="HTML", disable_web_page_preview=True) text = new_text