1
0
Fork 0
megacode/src/registration.py

458 lines
16 KiB
Python

import asyncio
import logging
import os
import random
import time as _time
from datetime import datetime
from pathlib import Path
from urllib.parse import urlparse
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.firefox.options import Options
from selenium.webdriver.firefox.service import Service
from selenium.webdriver.common.action_chains import ActionChains
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.support.ui import WebDriverWait
from selenium.common.exceptions import (
TimeoutException,
NoSuchElementException,
WebDriverException,
)
from tokens import ProviderTokens
from proxy import HTTPS_PROXY, rotate_proxy_ip
from emails import pop_account
logger = logging.getLogger(__name__)
DATA_DIR = Path(os.environ.get("DATA_DIR", "./data"))
EXTRAS_DIR = Path(os.environ.get("EXTRAS_DIR", "./extras"))
FIREFOX_BINARY = os.environ.get("FIREFOX_BINARY", "firefox")
GECKODRIVER_PATH = os.environ.get("GECKODRIVER_PATH", "/usr/local/bin/geckodriver")
KILO_HOME = "https://kilo.ai/"
PROFILE_URL = "https://app.kilo.ai/profile"
MAX_IP_ROTATIONS = 3
def human_delay():
_time.sleep(random.uniform(0.5, 1.35))
def human_type(element, text):
for char in text:
element.send_keys(char)
_time.sleep(random.uniform(0.05, 0.15))
def human_click(driver, element):
driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", element)
human_delay()
driver.execute_script("arguments[0].click();", element)
human_delay()
def _is_on_kilo(url: str) -> bool:
"""Check if URL's actual domain is kilo.ai (not just in query params)."""
hostname = urlparse(url).hostname or ""
return hostname.endswith("kilo.ai")
class AutomationError(Exception):
def __init__(self, step: str, message: str, driver: WebDriver | None = None):
self.step = step
self.message = message
self.driver = driver
super().__init__(f"[{step}] {message}")
def save_error_screenshot(driver: WebDriver | None, step: str) -> None:
if driver:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
screenshots_dir = DATA_DIR / "screenshots"
screenshots_dir.mkdir(parents=True, exist_ok=True)
filename = screenshots_dir / f"error_kilo_{step}_{timestamp}.png"
try:
driver.save_screenshot(str(filename))
logger.error("Screenshot saved: %s", filename)
except WebDriverException as e:
logger.warning("Failed to save screenshot at step %s: %s", step, e)
def _create_firefox_driver() -> WebDriver:
"""Launch Firefox with fresh profile mimicking a real user setup."""
proxy_url = HTTPS_PROXY
options = Options()
os.environ["TZ"] = "America/New_York"
# Private browsing mode with extensions allowed
options.add_argument("-private")
options.set_preference("extensions.privatebrowsing.autostart", True)
options.set_preference("extensions.allowPrivateBrowsingByDefault", True)
# Dark theme
options.set_preference("ui.systemUsesDarkTheme", 1)
options.set_preference("browser.theme.content-theme", 0)
options.set_preference("browser.theme.toolbar-theme", 0)
# Enhanced Tracking Protection: Strict
options.set_preference("browser.contentblocking.category", "strict")
options.set_preference("privacy.trackingprotection.enabled", True)
options.set_preference("privacy.trackingprotection.socialtracking.enabled", True)
options.set_preference("privacy.trackingprotection.cryptomining.enabled", True)
options.set_preference("privacy.trackingprotection.fingerprinting.enabled", True)
options.set_preference("network.cookie.cookieBehavior", 5)
# Disable WebRTC IP leak
options.set_preference("media.peerconnection.enabled", False)
# Anti-detection: hide webdriver
options.set_preference("dom.webdriver.enabled", False)
options.set_preference("useAutomationExtension", False)
# Enable WebGL (software rendering via Mesa)
options.set_preference("webgl.disabled", False)
options.set_preference("webgl.force-enabled", True)
options.set_preference("webgl.msaa-force", True)
options.set_preference("webgl.max-warnings-per-context", 0)
# Proxy
if proxy_url:
parsed = urlparse(proxy_url)
proxy_host = parsed.hostname or "localhost"
proxy_port = parsed.port or 80
options.set_preference("network.proxy.type", 1)
options.set_preference("network.proxy.http", proxy_host)
options.set_preference("network.proxy.http_port", proxy_port)
options.set_preference("network.proxy.ssl", proxy_host)
options.set_preference("network.proxy.ssl_port", proxy_port)
options.set_preference("network.proxy.no_proxies_on", "")
logger.info("Firefox proxy: %s:%s", proxy_host, proxy_port)
options.binary_location = FIREFOX_BINARY
service = Service(executable_path=GECKODRIVER_PATH)
driver = webdriver.Firefox(service=service, options=options) # type: ignore[reportCallIssue]
driver.set_page_load_timeout(120)
# Install Dark Reader extension (Selenium cleanup)
dark_reader_path = EXTRAS_DIR / "extensions" / "dark-reader.xpi"
if dark_reader_path.exists():
driver.install_addon(str(dark_reader_path), temporary=True)
logger.info("Dark Reader extension installed")
else:
logger.warning("Dark Reader xpi not found at %s", dark_reader_path)
# Install uBlock Origin
ublock_path = EXTRAS_DIR / "extensions" / "ublock_origin.xpi"
if ublock_path.exists():
driver.install_addon(str(ublock_path), temporary=True)
logger.info("uBlock Origin installed")
else:
logger.warning("uBlock Origin xpi not found at %s", ublock_path)
logger.info("Firefox launched (Dark Reader, uBlock, dark theme, strict ETP)")
return driver
def _google_sign_in(driver: WebDriver, email: str, password: str) -> bool:
"""Complete Google OAuth sign-in flow. Returns True on success."""
try:
wait = WebDriverWait(driver, 150)
# Enter email
email_input = wait.until(
EC.visibility_of_element_located((By.CSS_SELECTOR, 'input[type="email"]'))
)
human_delay()
email_input.clear()
human_delay()
human_type(email_input, email)
human_delay()
# Click Next
next_btn = driver.find_element(By.CSS_SELECTOR, "#identifierNext")
human_click(driver, next_btn)
# Enter password
password_input = WebDriverWait(driver, 150).until(
EC.visibility_of_element_located(
(By.CSS_SELECTOR, 'input[name="Passwd"], input[type="password"]')
)
)
logger.info("Password field found, filling...")
human_delay()
password_input.clear()
human_delay()
human_type(password_input, password)
human_delay()
# Click Next
try:
password_next = driver.find_element(By.CSS_SELECTOR, "#passwordNext")
human_click(driver, password_next)
except NoSuchElementException:
buttons = driver.find_elements(By.CSS_SELECTOR, "button")
for btn in buttons:
if "next" in btn.text.lower():
human_click(driver, btn)
break
human_delay()
# wait for the page to reload # TODO: wait for a proper event
_time.sleep(8)
# Handle consent / TOS / speedbump screens
for _ in range(15):
if _is_on_kilo(driver.current_url):
return True
logger.info(
"Still on Google (%s), looking for buttons...", driver.current_url[:80]
)
all_buttons = driver.find_elements(By.CSS_SELECTOR, "button")
if all_buttons:
btn_texts = [b.text.strip() for b in all_buttons]
logger.info("Found %d buttons: %s", len(all_buttons), btn_texts)
btn = all_buttons[-1]
driver.execute_script(
"arguments[0].scrollIntoView({block: 'center'});", btn
)
human_delay()
# Try ActionChains for more realistic click
try:
ActionChains(driver).move_to_element(btn).pause(
0.3
).click().perform()
except Exception:
btn.click()
human_delay()
# Check if URL changed
if _is_on_kilo(driver.current_url):
return True
else:
human_delay()
return _is_on_kilo(driver.current_url)
except Exception as e:
logger.warning("Google sign-in error: %s", e)
return False
def _try_register_once_sync(
driver: WebDriver,
email: str,
password: str,
) -> str | None:
"""Attempt one full registration cycle via Google OAuth."""
try:
# Step 1: Navigate to Kilo home
logger.info("[1/6] Navigating to Kilo home...")
driver.get(KILO_HOME)
human_delay()
wait = WebDriverWait(driver, 150)
# Step 2: Click Sign up (opens new tab)
logger.info("[2/6] Clicking 'Sign up'...")
handles_before = set(driver.window_handles)
signup_btn = wait.until(
EC.element_to_be_clickable(
(
By.XPATH,
"//a[contains(text(), 'Sign up') or contains(text(), 'sign up')]",
)
)
)
human_click(driver, signup_btn)
# Switch to new tab
WebDriverWait(driver, 30).until(
lambda d: len(d.window_handles) > len(handles_before)
)
new_handles = set(driver.window_handles) - handles_before
if new_handles:
driver.switch_to.window(new_handles.pop())
logger.info("[2/6] Switched to new tab: %s", driver.current_url)
else:
raise AutomationError(
"signup", "No new tab opened after clicking Sign up", driver
)
human_delay()
# Wait for page load
WebDriverWait(driver, 30).until(
lambda d: d.execute_script("return document.readyState") == "complete"
)
human_delay()
logger.info("[2/6] Page loaded: %s", driver.current_url)
# Step 3: Click "Sign in or Sign up"
logger.info("[3/6] Clicking 'Sign in or Sign up'...")
signin_signup_btn = wait.until(
EC.element_to_be_clickable(
(
By.XPATH,
"//a[contains(text(), 'Sign in') or contains(text(), 'sign in') or contains(text(), 'Sign up') or contains(text(), 'sign up')]",
)
)
)
human_click(driver, signin_signup_btn)
human_delay()
# Wait for page load
WebDriverWait(driver, 30).until(
lambda d: d.execute_script("return document.readyState") == "complete"
)
human_delay()
logger.info("[3/6] Redirected to: %s", driver.current_url)
# Step 4: Click "Sign in with Google"
logger.info("[4/6] Clicking 'Sign in with Google'...")
google_btn = wait.until(
EC.element_to_be_clickable(
(
By.XPATH,
"//*[contains(text(), 'Sign in with Google') or contains(text(), 'Continue with Google')]",
)
)
)
human_click(driver, google_btn)
# Wait for Google
WebDriverWait(driver, 30).until(EC.url_contains("accounts.google.com"))
logger.info("[4/6] Google sign-in page loaded: %s", driver.current_url)
# Step 5: Google sign-in
logger.info("[5/6] Signing in with Google (%s)...", email)
success = _google_sign_in(driver, email, password)
if not success and not _is_on_kilo(driver.current_url):
raise AutomationError(
"google_auth", "Google sign-in did not redirect to Kilo", driver
)
# Wait for redirect to kilo.ai
logger.info("[5/6] Waiting for Kilo redirect...")
deadline = _time.time() + 120
while _time.time() < deadline:
if (
_is_on_kilo(driver.current_url)
and "/users/sign_in" not in driver.current_url
):
logger.info("[5/6] On kilo.ai: %s", driver.current_url)
break
human_delay()
else:
logger.warning("Redirect not detected, current: %s", driver.current_url)
# Handle educational account confirmation
try:
confirm_btn = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((By.CSS_SELECTOR, "input#confirm"))
)
logger.info("[5/6] Educational account page, clicking confirm...")
human_click(driver, confirm_btn)
except TimeoutException:
logger.info("[5/6] No educational account page, continuing...")
# Wait for /get-started or /profile
deadline = _time.time() + 60
while _time.time() < deadline:
url = driver.current_url
if "/get-started" in url or "/profile" in url:
break
human_delay()
# Step 6: Get API key
logger.info("[6/6] Navigating to profile to get API key...")
driver.get(PROFILE_URL)
human_delay()
api_key_input = WebDriverWait(driver, 200).until(
EC.visibility_of_element_located((By.CSS_SELECTOR, "input#api-key"))
)
api_key = api_key_input.get_attribute("value")
if not api_key or not api_key.strip():
raise AutomationError("profile", "API key input is empty", driver)
api_key = api_key.strip()
logger.info("[6/6] API key obtained (length=%d)", len(api_key))
return api_key
except AutomationError as e:
logger.error("Error at step [%s]: %s", e.step, e.message)
save_error_screenshot(e.driver or driver, e.step)
return None
except Exception as e:
logger.error("Unexpected error during registration: %s", e)
save_error_screenshot(driver, "unexpected")
return None
async def register_kilo_account() -> ProviderTokens | None:
"""Register a new Kilo account via Google OAuth using Selenium Firefox.
Pops one email account from emails.txt and attempts registration.
Rotates proxy IP between attempts if needed.
"""
logger.info("=== Starting Kilo account registration (Google OAuth) ===")
account = pop_account()
if not account:
logger.error("No email accounts available")
return None
driver: WebDriver | None = None
try:
driver = await asyncio.to_thread(_create_firefox_driver)
for ip_attempt in range(MAX_IP_ROTATIONS):
# driver.get("http://localhost:8005/")
# await asyncio.sleep(100000000000000000) # for debugging
if ip_attempt > 0:
logger.info(
"Rotating proxy IP (attempt %d/%d)...",
ip_attempt + 1,
MAX_IP_ROTATIONS,
)
rotated = await rotate_proxy_ip()
if not rotated:
logger.warning("IP rotation failed, trying anyway")
logger.info(
"Trying Google account: %s (IP attempt %d/%d)",
account.email,
ip_attempt + 1,
MAX_IP_ROTATIONS,
)
api_key = await asyncio.to_thread(
_try_register_once_sync, driver, account.email, account.password
)
if api_key:
return ProviderTokens(
access_token=api_key,
refresh_token=None,
expires_at=0,
)
await asyncio.sleep(2)
logger.error("All registration attempts exhausted for %s", account.email)
return None
except Exception as e:
logger.error("Fatal registration error: %s", e)
return None