init: MVP

This commit is contained in:
Arthur K. 2025-01-31 17:05:07 +03:00
commit 46e870be42
Signed by: wzray
GPG key ID: B97F30FDC4636357
17 changed files with 1202 additions and 0 deletions

View file

149
codeforces/client/client.py Normal file
View file

@ -0,0 +1,149 @@
import asyncio
import re
import sys
from typing import Union
import aiohttp
from bs4 import BeautifulSoup
from .helpers import SB, random_string
from .types import Language
class CodeforcesClient:
def __init__(self, cookies: dict[str, str], csrf_token: str, ftaa: str, user_agent: str):
self.cookies = cookies
self.user_agent = user_agent
self._csrf_token = csrf_token
self._ftaa = ftaa
async def __aenter__(self):
self._session = aiohttp.ClientSession()
self._session.cookie_jar.update_cookies(self.cookies)
self._session.headers['User-Agent'] = self.user_agent
return self
async def __aexit__(self, *_):
await self.close()
@staticmethod
def auth(username: str, password: str): # TODO: error checking
with SB() as sb:
sb.uc_open_with_reconnect("https://codeforces.com/enter", 2)
sb.uc_gui_click_captcha()
sb.click('a[href="/enter?back=%2F"]')
sb.type("#handleOrEmail", username)
sb.type("#password", password)
sb.click("#remember")
sb.click('input[type="submit"]')
sb.wait_for_element("#jGrowl")
cookies = sb.get_cookies()
user_agent = sb.get_user_agent()
source = sb.get_page_source()
match = re.findall(r'<meta name="X-Csrf-Token" content="([a-f0-9]+)">', source)
assert match
csrf = match[0]
match = re.findall(r'window._ftaa = "(\w+)";', source)
assert match
ftaa = match[0]
cookies = {cookie['name']: cookie['value'] for cookie in cookies}
return CodeforcesClient(cookies, csrf, ftaa, user_agent)
@staticmethod
def find_error(body: str) -> Union[str, None]:
r = re.compile(r'error[a-zA-Z_\-\ ]*">(.*?)</span>')
m = re.findall(r, body)
return m[0] if m else None
async def submit(self, code: str, contest: int, task: str, lang: Language): # TODO: error checking
r = await self._session.post(f"https://codeforces.com/contest/{contest}/submit?csrf_token={self._csrf_token}", data = {
"csrf_token": self._csrf_token,
"ftaa": self._ftaa,
"bfaa": random_string(32, '[a-f0-9]'),
"action": "submitSolutionFormSubmitted",
"submittedProblemIndex": task, # TODO: idx saving
"programTypeId": lang.value,
"source": code,
"tabSize": "4",
"sourceFile": "",
"_tta": "239",
})
err = self.find_error(await r.text())
if err:
print(err, file=sys.stderr)
async def parse_tests(self, contest: int, tasks: list[str]):
futures = [self._session.get(f"https://codeforces.com/contest/{contest}/problem/{task}") for task in tasks]
requests = await asyncio.gather(*futures)
for idx, r in enumerate(requests):
soup = BeautifulSoup(await r.text(), 'html.parser')
test = soup.select_one('div.sample-test')
assert test
# funny line haha
inputs = ['\n'.join([el.text for el in input.select('div.test-example-line')] or [input.text]).strip() for input in test.select('div.input > pre')]
outputs = ['\n'.join([el.text for el in output.select('div.test-example-line')] or [output.text]).strip() for output in test.select('div.output > pre')]
yield {tasks[idx].lower(): [*zip(inputs, outputs)]}
async def parse_tasks(self, contest: int):
r = await self._session.get(f"https://codeforces.com/contest/{contest}")
soup = BeautifulSoup(await r.text(), 'html.parser')
table = soup.select_one('table.problems')
assert table
elements = table.select('tr')[1:]
out = []
for item in elements:
status = None
if 'class' in item.attrs:
status = item.attrs['class'][0]
inner_items = item.select('td')
assert inner_items
name_and_constraints = inner_items[1].select('div > div')
id = inner_items[0].text.strip()
out.append({
'id': id,
'name': name_and_constraints[0].text.strip(),
'status': status,
'constraints': [*name_and_constraints[1]][2].text.strip(),
})
return out
def to_dict(self):
return {
'ftaa': self._ftaa,
'cookies': self.cookies,
'user_agent': self.user_agent,
'csrf_token': self._csrf_token
}
async def close(self):
await self._session.close()

View file

@ -0,0 +1,17 @@
import random
from contextlib import _GeneratorContextManager
from seleniumbase import BaseCase, SB as _SB
def random_string(n: int, format = '[a-zA-Z0-9]') -> str:
format = format[1:-1]
chars = ''
for i in range(0, len(format), 3):
for j in range(ord(format[i]), ord(format[i+2]) + 1):
chars += chr(j)
return ''.join(random.choice(chars) for _ in range(n))
# make seleniumbase somewhat typed
def SB() -> _GeneratorContextManager[BaseCase]:
return _SB(uc = True)

View file

@ -0,0 +1,47 @@
from enum import Enum, auto
import time
import threading
class Spinner:
class State(Enum):
LOADING = auto()
SUCCESS = ''
FAILURE = ''
def __init__(self, text: str):
self.text = text
self.state = self.State.LOADING
def __enter__(self):
t = threading.Thread(target = self._thread_fn)
t.start()
self.thread = t
return self
def __exit__(self, *_):
if self.state == self.State.LOADING:
self.state = self.State.SUCCESS
if self.thread:
self.thread.join()
def fail(self):
if self.thread:
self.state = self.State.FAILURE
self.thread.join()
self.thread = None
def done(self):
if self.thread:
self.state = self.State.SUCCESS
self.thread.join()
self.thread = None
def _thread_fn(self):
interval = 80
spinner = ["", "", "", "", "", "", "", "", "", ""]
i = 0
while self.state == self.State.LOADING:
i = (i + 1) % len(spinner)
print(f"\r{spinner[i]} {self.text}", end=' ')
time.sleep(interval / 1000)
print(f"\r{self.state.value} {self.text}")

View file

@ -0,0 +1,12 @@
from enum import Enum
class Language(Enum):
CPP = 89
PYTHON = 31
UNKNOWN = 0
@staticmethod
def from_ext(s: str):
if s == 'py':
return Language.PYTHON
return Language.CPP