1
0
Fork 0
pwnbars/main.py

337 lines
11 KiB
Python

import asyncio
import base64
import hashlib
import hmac
import json as _json
import os
import random
import subprocess
import time
import uuid
from typing import Literal
from datetime import datetime as Dt, timedelta as Td
import urllib.parse as up
import requests
FETCH_DELAY = 50
UPDATE_DELAY = 30
type PrimitiveType = int | str | bool | float | bytes | None
type ValidData = PrimitiveType | list[ValidData] | dict[str, ValidData]
jq = lambda x: subprocess.run('jq', input=_json.dumps(x).encode())
def print_headers(r: requests.Response):
for k, v in r.request.headers.items():
print(f'{k}: {v}')
print(r.request.body)
def print_request(r: requests.Response):
print(r.url)
print_headers(r)
if str(r.headers.get('content-type')).startswith('application/json'):
print(r.status_code)
jq(r.json())
else:
print(r.status_code, r.text)
r.raise_for_status()
class PacerClient:
SECRET_UUID = b'B7A4DB15-D69A-4C8A-BA68-39E0AA208DB8'
BASE_URL = 'https://api.pacer.cc/'
def __init__(self, email: str, password: str):
self._session = requests.Session()
self._device_id = self.random_device_id()
self._authorize(email, password)
@staticmethod
def random_device_id() -> str:
return ''.join(random.choice('0123456789abcdef') for _ in range(16))
def request(
self,
method: Literal['GET', 'POST', 'PUT'],
path: str,
account_id: int,
access_token: str | None,
params: dict[str, PrimitiveType] | None = None,
data: dict[str, PrimitiveType] | None = None,
json: ValidData | None = None,
):
data_hash = params_url = content_type = None
if params:
for k, v in params.items():
if isinstance(v, bool):
params[k] = str(v).lower()
params_url = up.urlencode(params) if params else None
if data:
content_type = 'application/x-www-form-urlencoded; charset=UTF-8'
data_hash = self._md5(up.urlencode(data))
if json:
content_type = 'application/json; charset=UTF-8'
data_hash = self._md5(_json.dumps(json, separators=(',', ':')))
nonce = str(random.randrange(1000000000))
timestamp = str(int(time.time()))
auth_header = 'Pacer ' + self._generate_signature(
access_token=access_token,
body_hash=data_hash,
current_time=timestamp,
nonce=nonce,
params=params_url,
path='/' + path
)
r = requests.request(
method=method,
params=params,
url=up.urljoin(self.BASE_URL, path),
headers={
'Authorization': auth_header,
'X-Pacer-Nonce': nonce,
'X-Pacer-Time': timestamp,
'User-Agent': 'okhttp/4.10.0',
'X-Pacer-Access-Token': access_token or '',
'X-Pacer-Request-Token': 'true' if not access_token else None,
'Content-Type': content_type,
'X-Pacer-Device-Id': self._device_id,
'X-Pacer-Account-Id': str(account_id),
'X-Pacer-OS': 'android',
'X-Pacer-Product': 'pacer',
'X-Pacer-Client-Id': 'pacer_android',
'X-Pacer-Language': 'en',
'X-Pacer-Locale': 'en_US',
'X-Pacer-Timezone': 'Europe/Moscow',
'X-Pacer-Version': 'p12.10.1',
'X-Pacer-Timezone-Offset': '180',
},
data=_json.dumps(json, separators=(',', ':')) if json else data,
)
r.raise_for_status()
return r
@staticmethod
def _md5(s: str) -> str:
md5 = hashlib.md5()
md5.update(s.encode())
digest = md5.digest()
return ''.join(f'{b:02x}' for b in digest)
@classmethod
def _generate_signature(
cls,
path: str,
current_time: str,
nonce: str,
access_token: str | None = None,
params: str | None = None,
body_hash: str | None = None
) -> str:
mac = hmac.new(cls.SECRET_UUID, digestmod=hashlib.sha1)
mac.update(current_time.encode())
mac.update(nonce.encode())
if access_token:
mac.update(access_token.encode())
if params:
mac.update(params.encode())
if body_hash:
mac.update(body_hash.encode())
if path:
mac.update(path.encode())
digest = mac.digest()
b64 = base64.b64encode(digest).decode()
return up.quote(b64, safe='').replace('%0A', '')
def _authorize(self, email: str, password: str):
r = self.request('POST', 'api/v2.0/accounts', 0, None)
data = r.json()['data']
access_token = data['access_token']
account_id = data['id']
r = self.request(
method='POST',
path='api/v2.0/login',
account_id=account_id,
access_token=access_token,
json={
'email': email,
'password': self._md5(password),
}
)
data = r.json()['data']
self.access_token: str = data['access_token']
self.account_id: int = data['id']
def make_run(self, distance: float, duration: int, calories: float, steps: int) -> str:
current_time = Dt.now()
current_timestamp = int(current_time.timestamp())
start_time = current_time - Td(seconds=duration)
client_hash = f"PID{self.account_id}--{uuid.uuid4()!s}"
r = self.request(
method='PUT',
path=f'api/v2.0/accounts/me/activities/daily_summaries/{current_time.strftime("%y%m%d")}',
account_id=self.account_id,
access_token=self.access_token,
params={'post_note': 'false'},
json={
"calories": 1,
"client_hash": "auto",
"client_timezone": "Europe/Moscow",
"client_timezone_offset": 180,
"client_unixtime": current_timestamp,
"data_version": 1,
"distance_value": 1.0,
"duration_in_seconds": 1,
"floors": 0,
"is_background": False,
"last_foreground_unixtime": current_timestamp,
"pedometer_mode": "111",
"recorded_by": "phone",
"recorded_for_datetime_iso8601": start_time.strftime("%Y-%m-%dT00:00:00.000+03:00"),
"sessions": [{
"calories": calories,
"client_hash": client_hash,
"client_payload": "{\"is_normal_data\":false,\"storageType\":\"trackTable\",\"trackId\":1,\"trackLogType\":\"Gps_Normally_Finished\"}",
"client_timezone": "Europe/Moscow",
"client_timezone_offset": 180,
"client_unixtime": current_timestamp,
"data_version": 1,
"deleted": False,
"distance_value": distance,
"duration_in_seconds": duration,
"end_for_unixtime": current_timestamp,
"google_fit_sync_state": "unsync",
"partner_session_type": "",
"partner_sync_hash": 0,
"partner_sync_state": "unsync",
"recorded_by": "phone",
"recorded_for_datetime_iso8601": start_time.strftime("%Y-%m-%dT%H:%M:%S.%f%z"),
"steps": steps,
"type": 1002
}],
"steps": 0,
"type": 0,
}
)
r = self.request(
method='POST',
path='pacer/android/api/v18/track/upload',
account_id=self.account_id,
access_token=self.access_token,
params={'post_note': 'false'},
data={
'metadata': _json.dumps({
"visible": "public",
"account_id": str(self.account_id),
"activity_data": {
"calories": calories,
"distance": distance,
"duration": duration,
"source": "pacer_android",
"steps": steps,
},
"client_hash": client_hash,
"description": "",
"hide_map": False,
"share_url": "",
"start_time": int(start_time.timestamp()),
"title": "bebra",
"track_data": "",
"track_id": "",
"type": "run",
"visible": "global",
}, separators=(',', ':'))
}
)
return r.json()['data']['share_url']
async def update_runs(delay: float | int = 5):
import pyrogram
c = pyrogram.client.Client('user')
await c.start()
m = await c.send_message('@KronbarsRunningBot', '/profile')
id: int = m.chat.id # type: ignore
await asyncio.sleep(delay)
h = [x async for x in c.get_chat_history('@KronbarsRunningBot', limit=1)][0]
mid = h.id
await c.request_callback_answer(id, mid, 'trackers')
await asyncio.sleep(delay)
await c.request_callback_answer(id, mid, 'tracker-profile_pacer')
await asyncio.sleep(delay)
await c.request_callback_answer(id, mid, 'update-tracker-activities_pacer')
await asyncio.sleep(delay)
async def main():
email = os.getenv('EMAIL')
password = os.getenv('PASSWORD')
if not (email and password):
raise ValueError('Missing email or password!')
print('-- New run: ', Dt.now().strftime('%a %m/%d %I:%M %P'), '--')
calories = float(os.getenv('CALORIES') or 0) or 383.0 + (random.random() * 50)
distance = float(os.getenv('DISTANCE') or 0) or 5239.0 + (random.random() * 30)
duration = int(os.getenv('DURATION') or '0') or 2039 + int(random.random() * 30)
steps = int(os.getenv('STEPS') or '0') or 6239 + int(random.random() * 130)
url = PacerClient(email, password).make_run(
calories=calories,
distance=distance,
duration=duration,
steps=steps,
)
print(f'Calories: {calories}')
print(f'Distance: {distance}')
print(f'Duration: {duration}')
print(f'Steps: {steps}')
print(f'Share: {url}')
if not os.getenv("TELEGRAM_SKIP"):
print(f'-- Sleeping for {FETCH_DELAY} minutes before fetching runs... --')
await asyncio.sleep(FETCH_DELAY * 60)
print("-- Fetching runs --")
try:
await update_runs(UPDATE_DELAY)
except Exception as e:
print(f'Failed to fetch runs: {e}')
print('-- Done! --')
if __name__ == '__main__':
asyncio.run(main())