1
0
Fork 0
a/poller.py
2025-11-07 19:35:10 +03:00

148 lines
5.3 KiB
Python

import json
import random
import time
import requests
from collections.abc import Callable, Generator
from dataclasses import dataclass
from enum import Enum
from typing import Any
class Poller:
@dataclass
class ChangedValue[T]:
value: T
@dataclass
class FoundValue:
value: str
class _LogTypes(Enum):
NOTHING = "Nothing found."
CHANGED = "The value has changed, but nothing was found."
FOUND = "Found!"
def __init__(
self,
filter: Callable[[requests.Response], str | None],
requester: Callable[[], requests.Response],
transformer: Callable[[requests.Response], Any] = lambda x: x.text,
base_delay: tuple[int, int] = (20 * 60, 45 * 60),
retry_delay: tuple[int, int] = (3 * 60, 10 * 60)
) -> None:
"""
Initialize a Poller instance for periodically polling an API endpoint
until a specific result is found.
This class repeatedly sends requests to an API and inspects the responses
using a filter function. If the filter detects the desired result,
polling stops. Otherwise, it continues after a delay. A transformer
function is used to normalize or clean the response content (e.g., removing
noise such as timestamps or metadata) to avoid unnecessary triggering of
the "changed" state.
:param filter: A callable that examines a ``requests.Response`` object and
returns a unique string (e.g., a key, ID, or result indicator)
if the target condition is met. Returns ``None`` if polling
should continue. This is the key signal for terminating the poll.
:type filter: Callable[[requests.Response], str | None]
:param requester: A callable that performs the actual API request and returns
a ``requests.Response`` object. This abstracts the HTTP
communication from the polling logic.
:type requester: Callable[[], requests.Response]
:param transformer: A callable used to clean or transform the response for
change detection purposes. It helps filter out inconsequential
differences like timestamps, headers, etc., that shouldn't
be interpreted as meaningful changes. Defaults to
``lambda x: x.text``.
:type transformer: Callable[[requests.Response], Any], optional
:param base_delay: A tuple representing the range of time in seconds to wait
between polling cycles when no change is detected. This helps
reduce the frequency of API calls when responses remain stable.
:type base_delay: tuple[int, int], optional
:param retry_delay: A tuple representing the range of time in seconds to wait
after a change is detected (but not a final result), often
to give the API time to settle or provide consistent data.
:type retry_delay: tuple[int, int], optional
:returns: None
:rtype: None
"""
self.prev_resp = None
self.filter = filter
self.transformer = transformer
self.requester = requester
self.base_delay = base_delay
self.retry_delay = retry_delay
@staticmethod
def _log(t: _LogTypes, r: Any):
print(f'[{time.strftime("%X %x %Z")}] {t.value} Response: {r}')
def _set_and_return[T](self, value: T) -> tuple[tuple[int, int], ChangedValue[T] | None]:
if self.prev_resp != value:
self.prev_resp = value
self._log(self._LogTypes.CHANGED, value)
return self.retry_delay, self.ChangedValue(value)
self._log(self._LogTypes.NOTHING, value)
return self.base_delay, None
def poll(self) -> Generator[ChangedValue | FoundValue | None]:
while True:
r = self.requester()
match self.filter(r):
case None:
delay, value = self._set_and_return(self.transformer(r))
yield value
case value:
self._log(self._LogTypes.FOUND, value)
yield self.FoundValue(value)
return
time.sleep(random.randint(*delay))
TOKEN = ""
ADMIN_ID = 0
TG_API_URL = f'https://api.telegram.org/bot{TOKEN}/sendMessage'
def send_message(prefix: str, body: Any):
prefix += '\n'
body = str(body)
entities = {
'type': 'pre',
'offset': len(prefix) + 1,
'length': len(body)
}
r = requests.post(TG_API_URL, data = {
'chat_id': ADMIN_ID,
'entities': json.dumps([entities]),
'text': prefix + body,
})
if r.status_code != 200:
print(r.text)
def make_request() -> requests.Response:
return requests.get("https://google.com")
def my_filter(r: requests.Response) -> str | None:
if r.status_code != 200:
return
return r.text
def main():
print('Running!')
poller = Poller(my_filter, make_request)
for resp in poller.poll():
match resp:
case Poller.ChangedValue(x):
send_message("🟡", x)
case Poller.FoundValue(x):
send_message("🟢", x)
if __name__ == "__main__":
main()