178 lines
5.4 KiB
Python
178 lines
5.4 KiB
Python
import argparse
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
|
|
from tabulate import tabulate
|
|
|
|
from codeforces.client.client import CodeforcesClient
|
|
from codeforces.client.spinner import Spinner
|
|
from codeforces.client.types import Language
|
|
from codeforces.ui.config import config_menu
|
|
from codeforces.ui.utils import CONFIG_FILE, SESSION_FILE
|
|
|
|
|
|
async def parse(client: CodeforcesClient, contest_id: int):
|
|
with Spinner('Fetching task list...'):
|
|
tasks = [task['id'] for task in await client.parse_tasks(contest_id)]
|
|
|
|
tests = {}
|
|
text = lambda: f"Pulling tests ({len(tests)}/{len(tasks)})"
|
|
with Spinner(text()) as sp:
|
|
async for test in client.parse_tests(contest_id, tasks):
|
|
tests.update(test)
|
|
sp.text = text()
|
|
|
|
with open("./.cf.json", 'w') as f:
|
|
json.dump({'contest': contest_id, 'tests': tests}, f, separators=(',', ':'))
|
|
|
|
|
|
async def submit(client: CodeforcesClient, filename: str):
|
|
with open('./.cf.json') as f:
|
|
data = json.load(f)
|
|
|
|
with open(filename) as f:
|
|
fn = filename.split('.')
|
|
await client.submit(f.read(), data['contest'], fn[0], Language.from_ext(fn[-1]))
|
|
|
|
|
|
async def list_tasks(client: CodeforcesClient):
|
|
with open("./.cf.json") as f:
|
|
contest_id = json.load(f)['contest']
|
|
|
|
tasks = await client.parse_tasks(contest_id)
|
|
headers = [*tasks[0].keys()]
|
|
t2 = [[*t.values()] for t in tasks]
|
|
|
|
print(tabulate(t2, headers))
|
|
|
|
|
|
def generate(task: str):
|
|
filename = task + '.cpp'
|
|
|
|
with open(CONFIG_FILE) as f:
|
|
config = json.load(f)
|
|
|
|
template = '' if 'template' not in config else config['template']
|
|
|
|
if os.path.isfile(filename):
|
|
print('File already exists!', file=sys.stderr)
|
|
return
|
|
|
|
with open(task + '.cpp', 'w') as f:
|
|
f.write(template)
|
|
|
|
|
|
async def test(filename: str):
|
|
with open('./.cf.json') as f:
|
|
data = json.load(f)
|
|
|
|
task_name = filename.split('.')[0].lower()
|
|
tests = data['tests'][task_name]
|
|
|
|
with Spinner("Compiling...") as sp:
|
|
ps = subprocess.Popen(f'g++ -Wall -Wextra -Wconversion -static -fno-stack-limit -O2 -std=c++20 -fdiagnostics-color=always -o {task_name} ./{filename}'.split(),
|
|
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
|
|
|
|
ret = ps.wait()
|
|
if ret != 0:
|
|
sp.fail()
|
|
assert ps.stdout
|
|
print(ps.stdout.read().decode("utf-8"), file=sys.stderr)
|
|
exit(ret)
|
|
|
|
try:
|
|
for test in tests:
|
|
test[0] += '\n'
|
|
ps = subprocess.Popen(f"./{task_name}", stdin=subprocess.PIPE, stdout=subprocess.PIPE)
|
|
assert ps.stdin
|
|
ps.stdin.write(test[0].encode('utf-8'))
|
|
ps.stdin.flush()
|
|
ps.wait()
|
|
assert ps.stdout
|
|
output = ps.stdout.read()
|
|
is_ok = output.decode('utf-8').strip() == test[1]
|
|
print("Success" if is_ok else output)
|
|
finally:
|
|
os.remove(task_name)
|
|
|
|
|
|
async def eval_args(client: CodeforcesClient, args: argparse.Namespace):
|
|
try:
|
|
if args.action == 'parse':
|
|
await parse(client, args.contest_id)
|
|
elif args.action == 'submit':
|
|
await submit(client, args.filename)
|
|
elif args.action == 'test':
|
|
await test(args.filename)
|
|
elif args.action == 'list':
|
|
await list_tasks(client)
|
|
|
|
except Exception:
|
|
raise # TODO: exceptions for 403
|
|
await eval_args(client, args)
|
|
|
|
|
|
async def main():
|
|
CONFIG_FILE.parent.mkdir(exist_ok=True, parents=True)
|
|
if not os.path.isfile(CONFIG_FILE):
|
|
with open(CONFIG_FILE, 'w') as f:
|
|
f.write("{}")
|
|
|
|
SESSION_FILE.parent.mkdir(exist_ok=True, parents=True)
|
|
|
|
parser = argparse.ArgumentParser('cf')
|
|
subparsers = parser.add_subparsers(required = True, dest = 'action')
|
|
|
|
subparsers.add_parser('list', help = "List problems from a contest.")
|
|
subparsers.add_parser('config', help = "Configure the cf-tool.")
|
|
|
|
parse_parser = subparsers.add_parser('parse', help = "Parse and download problem statements.")
|
|
parse_parser.add_argument('contest_id', type = int)
|
|
|
|
gen_parser = subparsers.add_parser('gen', help = "Generate a template for a problem.")
|
|
gen_parser.add_argument('task')
|
|
|
|
submit_parser = subparsers.add_parser('submit', help = "Submit a solution to Codeforces.")
|
|
submit_parser.add_argument('filename')
|
|
|
|
test_parser = subparsers.add_parser('test', help = "Run local tests on a solution.")
|
|
test_parser.add_argument('filename')
|
|
|
|
args = parser.parse_args()
|
|
|
|
if args.action == 'config':
|
|
config_menu()
|
|
return
|
|
elif args.action == 'gen':
|
|
generate(args.task)
|
|
return
|
|
|
|
if not os.path.isfile(SESSION_FILE):
|
|
print('Missing session file! Run "cf config" first', file=sys.stderr)
|
|
exit(1)
|
|
|
|
with open(SESSION_FILE) as f:
|
|
auth_data = json.load(f)
|
|
|
|
if 'session' in auth_data:
|
|
session = auth_data['session']
|
|
client = CodeforcesClient(session['cookies'], session['csrf_token'], session['ftaa'], session['user_agent'])
|
|
else:
|
|
client = CodeforcesClient.auth(auth_data['login'], auth_data['password'])
|
|
|
|
with open(SESSION_FILE, 'w') as file:
|
|
auth_data.update({
|
|
'session': client.to_dict()
|
|
})
|
|
json.dump(auth_data, file)
|
|
|
|
async with client:
|
|
await eval_args(client, args)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
loop = asyncio.new_event_loop()
|
|
loop.run_until_complete(main())
|