148 lines
5.3 KiB
Python
148 lines
5.3 KiB
Python
import json
|
|
import random
|
|
import time
|
|
import requests
|
|
from collections.abc import Callable, Generator
|
|
from dataclasses import dataclass
|
|
from enum import Enum
|
|
from typing import Any
|
|
|
|
class Poller:
|
|
@dataclass
|
|
class ChangedValue[T]:
|
|
value: T
|
|
|
|
@dataclass
|
|
class FoundValue:
|
|
value: str
|
|
|
|
class _LogTypes(Enum):
|
|
NOTHING = "Nothing found."
|
|
CHANGED = "The value has changed, but nothing was found."
|
|
FOUND = "Found!"
|
|
|
|
def __init__(
|
|
self,
|
|
filter: Callable[[requests.Response], str | None],
|
|
requester: Callable[[], requests.Response],
|
|
transformer: Callable[[requests.Response], Any] = lambda x: x.text,
|
|
base_delay: tuple[int, int] = (20 * 60, 45 * 60),
|
|
retry_delay: tuple[int, int] = (3 * 60, 10 * 60)
|
|
) -> None:
|
|
"""
|
|
Initialize a Poller instance for periodically polling an API endpoint
|
|
until a specific result is found.
|
|
This class repeatedly sends requests to an API and inspects the responses
|
|
using a filter function. If the filter detects the desired result,
|
|
polling stops. Otherwise, it continues after a delay. A transformer
|
|
function is used to normalize or clean the response content (e.g., removing
|
|
noise such as timestamps or metadata) to avoid unnecessary triggering of
|
|
the "changed" state.
|
|
:param filter: A callable that examines a ``requests.Response`` object and
|
|
returns a unique string (e.g., a key, ID, or result indicator)
|
|
if the target condition is met. Returns ``None`` if polling
|
|
should continue. This is the key signal for terminating the poll.
|
|
:type filter: Callable[[requests.Response], str | None]
|
|
:param requester: A callable that performs the actual API request and returns
|
|
a ``requests.Response`` object. This abstracts the HTTP
|
|
communication from the polling logic.
|
|
:type requester: Callable[[], requests.Response]
|
|
:param transformer: A callable used to clean or transform the response for
|
|
change detection purposes. It helps filter out inconsequential
|
|
differences like timestamps, headers, etc., that shouldn't
|
|
be interpreted as meaningful changes. Defaults to
|
|
``lambda x: x.text``.
|
|
:type transformer: Callable[[requests.Response], Any], optional
|
|
:param base_delay: A tuple representing the range of time in seconds to wait
|
|
between polling cycles when no change is detected. This helps
|
|
reduce the frequency of API calls when responses remain stable.
|
|
:type base_delay: tuple[int, int], optional
|
|
:param retry_delay: A tuple representing the range of time in seconds to wait
|
|
after a change is detected (but not a final result), often
|
|
to give the API time to settle or provide consistent data.
|
|
:type retry_delay: tuple[int, int], optional
|
|
:returns: None
|
|
:rtype: None
|
|
"""
|
|
self.prev_resp = None
|
|
self.filter = filter
|
|
self.transformer = transformer
|
|
self.requester = requester
|
|
self.base_delay = base_delay
|
|
self.retry_delay = retry_delay
|
|
|
|
|
|
@staticmethod
|
|
def _log(t: _LogTypes, r: Any):
|
|
print(f'[{time.strftime("%X %x %Z")}] {t.value} Response: {r}')
|
|
|
|
def _set_and_return[T](self, value: T) -> tuple[tuple[int, int], ChangedValue[T] | None]:
|
|
if self.prev_resp != value:
|
|
self.prev_resp = value
|
|
self._log(self._LogTypes.CHANGED, value)
|
|
return self.retry_delay, self.ChangedValue(value)
|
|
self._log(self._LogTypes.NOTHING, value)
|
|
return self.base_delay, None
|
|
|
|
def poll(self) -> Generator[ChangedValue | FoundValue | None]:
|
|
while True:
|
|
r = self.requester()
|
|
|
|
match self.filter(r):
|
|
case None:
|
|
delay, value = self._set_and_return(self.transformer(r))
|
|
yield value
|
|
case value:
|
|
self._log(self._LogTypes.FOUND, value)
|
|
yield self.FoundValue(value)
|
|
return
|
|
|
|
time.sleep(random.randint(*delay))
|
|
|
|
|
|
TOKEN = ""
|
|
ADMIN_ID = 0
|
|
TG_API_URL = f'https://api.telegram.org/bot{TOKEN}/sendMessage'
|
|
|
|
def send_message(prefix: str, body: Any):
|
|
prefix += '\n'
|
|
body = str(body)
|
|
|
|
entities = {
|
|
'type': 'pre',
|
|
'offset': len(prefix) + 1,
|
|
'length': len(body)
|
|
}
|
|
|
|
r = requests.post(TG_API_URL, data = {
|
|
'chat_id': ADMIN_ID,
|
|
'entities': json.dumps([entities]),
|
|
'text': prefix + body,
|
|
})
|
|
if r.status_code != 200:
|
|
print(r.text)
|
|
|
|
|
|
def make_request() -> requests.Response:
|
|
return requests.get("https://google.com")
|
|
|
|
|
|
def my_filter(r: requests.Response) -> str | None:
|
|
if r.status_code != 200:
|
|
return
|
|
return r.text
|
|
|
|
|
|
def main():
|
|
print('Running!')
|
|
poller = Poller(my_filter, make_request)
|
|
|
|
for resp in poller.poll():
|
|
match resp:
|
|
case Poller.ChangedValue(x):
|
|
send_message("🟡", x)
|
|
case Poller.FoundValue(x):
|
|
send_message("🟢", x)
|
|
|
|
if __name__ == "__main__":
|
|
main()
|