1
0
Fork 0

Initial kilocode: Kilo.ai token provider with Selenium Firefox + Google OAuth

This commit is contained in:
Arthur K. 2026-03-05 21:22:47 +03:00
commit 061eefdb24
17 changed files with 1629 additions and 0 deletions

362
src/registration.py Normal file
View file

@ -0,0 +1,362 @@
import asyncio
import json
import logging
import os
import time as _time
from datetime import datetime
from pathlib import Path
from urllib.parse import urlparse
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.common.exceptions import (
TimeoutException,
NoSuchElementException,
WebDriverException,
)
from tokens import ProviderTokens
from proxy import get_proxy_url, rotate_proxy_ip
logger = logging.getLogger(__name__)
DATA_DIR = Path(os.environ.get("DATA_DIR", "./data"))
MAIL_JSON = Path(os.environ.get("MAIL_JSON", "./mail.json"))
FIREFOX_BINARY = os.environ.get("FIREFOX_BINARY", "firefox")
SIGN_IN_URL = "https://app.kilo.ai/users/sign_in?callbackPath=/profile"
PROFILE_URL = "https://app.kilo.ai/profile"
MAX_IP_ROTATIONS = 3
def _is_on_kilo(url: str) -> bool:
"""Check if URL's actual domain is kilo.ai (not just in query params)."""
hostname = urlparse(url).hostname or ""
return hostname.endswith("kilo.ai")
class AutomationError(Exception):
def __init__(self, step: str, message: str, driver: WebDriver | None = None):
self.step = step
self.message = message
self.driver = driver
super().__init__(f"[{step}] {message}")
def save_error_screenshot(driver: WebDriver | None, step: str) -> None:
if driver:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
screenshots_dir = DATA_DIR / "screenshots"
screenshots_dir.mkdir(parents=True, exist_ok=True)
filename = screenshots_dir / f"error_kilo_{step}_{timestamp}.png"
try:
driver.save_screenshot(str(filename))
logger.error("Screenshot saved: %s", filename)
except WebDriverException as e:
logger.warning("Failed to save screenshot at step %s: %s", step, e)
def load_google_accounts() -> list[dict[str, str]]:
"""Load Google accounts from mail.json."""
if not MAIL_JSON.exists():
logger.error("mail.json not found at %s", MAIL_JSON)
return []
try:
with open(MAIL_JSON) as f:
accounts = json.load(f)
if not isinstance(accounts, list):
logger.error("mail.json must contain a JSON array")
return []
return accounts
except (json.JSONDecodeError, OSError) as e:
logger.error("Failed to read mail.json: %s", e)
return []
def _create_firefox_driver() -> WebDriver:
"""Launch Firefox with fresh profile mimicking a real user setup."""
proxy_url = get_proxy_url()
options = Options()
os.environ["TZ"] = "America/New_York"
# Private browsing
options.add_argument("--private-window")
options.set_preference("browser.privatebrowsing.autostart", True)
# Dark theme
options.set_preference("ui.systemUsesDarkTheme", 1)
options.set_preference("browser.theme.content-theme", 0)
options.set_preference("browser.theme.toolbar-theme", 0)
# Enhanced Tracking Protection: Strict
options.set_preference("browser.contentblocking.category", "strict")
options.set_preference("privacy.trackingprotection.enabled", True)
options.set_preference("privacy.trackingprotection.socialtracking.enabled", True)
options.set_preference("privacy.trackingprotection.cryptomining.enabled", True)
options.set_preference("privacy.trackingprotection.fingerprinting.enabled", True)
options.set_preference("network.cookie.cookieBehavior", 5)
# Disable WebRTC IP leak
options.set_preference("media.peerconnection.enabled", False)
# Proxy
if proxy_url:
parsed = urlparse(proxy_url)
proxy_host = parsed.hostname or "localhost"
proxy_port = parsed.port or 80
options.set_preference("network.proxy.type", 1)
options.set_preference("network.proxy.http", proxy_host)
options.set_preference("network.proxy.http_port", proxy_port)
options.set_preference("network.proxy.ssl", proxy_host)
options.set_preference("network.proxy.ssl_port", proxy_port)
options.set_preference("network.proxy.no_proxies_on", "")
logger.info("Firefox proxy: %s:%s", proxy_host, proxy_port)
options.binary_location = FIREFOX_BINARY
driver = webdriver.Firefox(options=options)
driver.set_page_load_timeout(120)
# Install uBlock Origin
ublock_path = DATA_DIR / "extensions" / "ublock_origin.xpi"
if ublock_path.exists():
driver.install_addon(str(ublock_path))
logger.info("uBlock Origin installed")
else:
logger.warning("uBlock Origin xpi not found at %s", ublock_path)
logger.info("Firefox launched (private, dark theme, strict ETP, uBlock)")
return driver
def _google_sign_in(driver: WebDriver, email: str, password: str) -> bool:
"""Complete Google OAuth sign-in flow. Returns True on success."""
try:
wait = WebDriverWait(driver, 150)
# Enter email
email_input = wait.until(
EC.visibility_of_element_located((By.CSS_SELECTOR, 'input[type="email"]'))
)
email_input.clear()
email_input.send_keys(email)
# Click Next
next_btn = driver.find_element(By.CSS_SELECTOR, "#identifierNext")
next_btn.click()
_time.sleep(2)
# Enter password
password_input = WebDriverWait(driver, 150).until(
EC.visibility_of_element_located(
(By.CSS_SELECTOR, 'input[name="Passwd"], input[type="password"]')
)
)
logger.info("Password field found, filling...")
password_input.clear()
password_input.send_keys(password)
# Click Next
try:
password_next = driver.find_element(By.CSS_SELECTOR, "#passwordNext")
password_next.click()
except NoSuchElementException:
buttons = driver.find_elements(By.CSS_SELECTOR, "button")
for btn in buttons:
if "next" in btn.text.lower():
btn.click()
break
_time.sleep(3)
# Handle consent / TOS / speedbump screens
for _ in range(15):
if _is_on_kilo(driver.current_url):
return True
logger.info(
"Still on Google (%s), looking for buttons...", driver.current_url[:80]
)
all_buttons = driver.find_elements(By.CSS_SELECTOR, "button")
if all_buttons:
logger.info(
"Found %d buttons, clicking last (allow/continue)...",
len(all_buttons),
)
all_buttons[-1].click()
_time.sleep(3)
else:
_time.sleep(2)
return _is_on_kilo(driver.current_url)
except Exception as e:
logger.warning("Google sign-in error: %s", e)
return False
def _try_register_once_sync(
driver: WebDriver,
email: str,
password: str,
) -> str | None:
"""Attempt one full registration cycle via Google OAuth."""
try:
# Step 1: Navigate to sign-in
logger.info("[1/4] Navigating to Kilo sign-in page...")
driver.get(SIGN_IN_URL)
wait = WebDriverWait(driver, 150)
# Step 2: Click "Continue with Google"
logger.info("[2/4] Clicking 'Continue with Google'...")
google_btn = wait.until(
EC.element_to_be_clickable(
(By.XPATH, "//*[contains(text(), 'Continue with Google')]")
)
)
google_btn.click()
# Wait for Google
WebDriverWait(driver, 30).until(EC.url_contains("accounts.google.com"))
logger.info("[2/4] Google sign-in page loaded: %s", driver.current_url)
# Step 3: Google sign-in
logger.info("[3/4] Signing in with Google (%s)...", email)
success = _google_sign_in(driver, email, password)
if not success and not _is_on_kilo(driver.current_url):
raise AutomationError(
"google_auth", "Google sign-in did not redirect to Kilo", driver
)
# Wait for redirect to kilo.ai
logger.info("[3/4] Waiting for Kilo redirect...")
deadline = _time.time() + 120
while _time.time() < deadline:
if (
_is_on_kilo(driver.current_url)
and "/users/sign_in" not in driver.current_url
):
logger.info("[3/4] On kilo.ai: %s", driver.current_url)
break
_time.sleep(1)
else:
logger.warning("Redirect not detected, current: %s", driver.current_url)
# Handle educational account confirmation
try:
confirm_btn = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.CSS_SELECTOR, "input#confirm"))
)
logger.info("[3/4] Educational account page, clicking confirm...")
confirm_btn.click()
_time.sleep(3)
except TimeoutException:
logger.info("[3/4] No educational account page, continuing...")
# Wait for /get-started or /profile
deadline = _time.time() + 60
while _time.time() < deadline:
url = driver.current_url
if "/get-started" in url or "/profile" in url:
break
_time.sleep(1)
# Step 4: Get API key
logger.info("[4/4] Navigating to profile to get API key...")
driver.get(PROFILE_URL)
api_key_input = WebDriverWait(driver, 200).until(
EC.visibility_of_element_located((By.CSS_SELECTOR, "input#api-key"))
)
api_key = api_key_input.get_attribute("value")
if not api_key or not api_key.strip():
raise AutomationError("profile", "API key input is empty", driver)
api_key = api_key.strip()
logger.info("[4/4] API key obtained (length=%d)", len(api_key))
return api_key
except AutomationError as e:
logger.error("Error at step [%s]: %s", e.step, e.message)
save_error_screenshot(e.driver or driver, e.step)
return None
except Exception as e:
logger.error("Unexpected error during registration: %s", e)
save_error_screenshot(driver, "unexpected")
return None
async def register_kilo_account() -> ProviderTokens | None:
"""Register a new Kilo account via Google OAuth using Selenium Firefox.
Reads Google accounts from mail.json, tries each one.
Rotates proxy IP between attempts if needed.
Browser is NOT closed after registration.
"""
logger.info("=== Starting Kilo account registration (Google OAuth) ===")
accounts = load_google_accounts()
if not accounts:
logger.error("No Google accounts available in mail.json")
return None
driver: WebDriver | None = None
try:
driver = await asyncio.to_thread(_create_firefox_driver)
for ip_attempt in range(MAX_IP_ROTATIONS):
if ip_attempt > 0:
logger.info(
"Rotating proxy IP (attempt %d/%d)...",
ip_attempt + 1,
MAX_IP_ROTATIONS,
)
rotated = await rotate_proxy_ip()
if not rotated:
logger.warning("IP rotation failed, trying anyway")
for account in accounts:
email = account.get("email", "")
password = account.get("password", "")
if not email or not password:
logger.warning("Skipping account with missing credentials")
continue
logger.info(
"Trying Google account: %s (IP rotation %d/%d)",
email,
ip_attempt + 1,
MAX_IP_ROTATIONS,
)
api_key = await asyncio.to_thread(
_try_register_once_sync, driver, email, password
)
if api_key:
return ProviderTokens(
access_token=api_key,
refresh_token=None,
expires_at=0,
)
await asyncio.sleep(2)
logger.warning("All accounts failed for current IP")
logger.error("All registration attempts exhausted")
return None
except Exception as e:
logger.error("Fatal registration error: %s", e)
return None