This commit is contained in:
commit
c7c1ff32a0
4 changed files with 250 additions and 0 deletions
148
poller.py
Normal file
148
poller.py
Normal file
|
|
@ -0,0 +1,148 @@
|
|||
import json
|
||||
import random
|
||||
import time
|
||||
import requests
|
||||
from collections.abc import Callable, Generator
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
|
||||
class Poller:
|
||||
@dataclass
|
||||
class ChangedValue[T]:
|
||||
value: T
|
||||
|
||||
@dataclass
|
||||
class FoundValue:
|
||||
value: str
|
||||
|
||||
class _LogTypes(Enum):
|
||||
NOTHING = "Nothing found."
|
||||
CHANGED = "The value has changed, but nothing was found."
|
||||
FOUND = "Found!"
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
filter: Callable[[requests.Response], str | None],
|
||||
requester: Callable[[], requests.Response],
|
||||
transformer: Callable[[requests.Response], Any] = lambda x: x.text,
|
||||
base_delay: tuple[int, int] = (20 * 60, 45 * 60),
|
||||
retry_delay: tuple[int, int] = (3 * 60, 10 * 60)
|
||||
) -> None:
|
||||
"""
|
||||
Initialize a Poller instance for periodically polling an API endpoint
|
||||
until a specific result is found.
|
||||
This class repeatedly sends requests to an API and inspects the responses
|
||||
using a filter function. If the filter detects the desired result,
|
||||
polling stops. Otherwise, it continues after a delay. A transformer
|
||||
function is used to normalize or clean the response content (e.g., removing
|
||||
noise such as timestamps or metadata) to avoid unnecessary triggering of
|
||||
the "changed" state.
|
||||
:param filter: A callable that examines a ``requests.Response`` object and
|
||||
returns a unique string (e.g., a key, ID, or result indicator)
|
||||
if the target condition is met. Returns ``None`` if polling
|
||||
should continue. This is the key signal for terminating the poll.
|
||||
:type filter: Callable[[requests.Response], str | None]
|
||||
:param requester: A callable that performs the actual API request and returns
|
||||
a ``requests.Response`` object. This abstracts the HTTP
|
||||
communication from the polling logic.
|
||||
:type requester: Callable[[], requests.Response]
|
||||
:param transformer: A callable used to clean or transform the response for
|
||||
change detection purposes. It helps filter out inconsequential
|
||||
differences like timestamps, headers, etc., that shouldn't
|
||||
be interpreted as meaningful changes. Defaults to
|
||||
``lambda x: x.text``.
|
||||
:type transformer: Callable[[requests.Response], Any], optional
|
||||
:param base_delay: A tuple representing the range of time in seconds to wait
|
||||
between polling cycles when no change is detected. This helps
|
||||
reduce the frequency of API calls when responses remain stable.
|
||||
:type base_delay: tuple[int, int], optional
|
||||
:param retry_delay: A tuple representing the range of time in seconds to wait
|
||||
after a change is detected (but not a final result), often
|
||||
to give the API time to settle or provide consistent data.
|
||||
:type retry_delay: tuple[int, int], optional
|
||||
:returns: None
|
||||
:rtype: None
|
||||
"""
|
||||
self.prev_resp = None
|
||||
self.filter = filter
|
||||
self.transformer = transformer
|
||||
self.requester = requester
|
||||
self.base_delay = base_delay
|
||||
self.retry_delay = retry_delay
|
||||
|
||||
|
||||
@staticmethod
|
||||
def _log(t: _LogTypes, r: Any):
|
||||
print(f'[{time.strftime("%X %x %Z")}] {t.value} Response: {r}')
|
||||
|
||||
def _set_and_return[T](self, value: T) -> tuple[tuple[int, int], ChangedValue[T] | None]:
|
||||
if self.prev_resp != value:
|
||||
self.prev_resp = value
|
||||
self._log(self._LogTypes.CHANGED, value)
|
||||
return self.retry_delay, self.ChangedValue(value)
|
||||
self._log(self._LogTypes.NOTHING, value)
|
||||
return self.base_delay, None
|
||||
|
||||
def poll(self) -> Generator[ChangedValue | FoundValue | None]:
|
||||
while True:
|
||||
r = self.requester()
|
||||
|
||||
match self.filter(r):
|
||||
case None:
|
||||
delay, value = self._set_and_return(self.transformer(r))
|
||||
yield value
|
||||
case value:
|
||||
self._log(self._LogTypes.FOUND, value)
|
||||
yield self.FoundValue(value)
|
||||
return
|
||||
|
||||
time.sleep(random.randint(*delay))
|
||||
|
||||
|
||||
TOKEN = ""
|
||||
ADMIN_ID = 0
|
||||
TG_API_URL = f'https://api.telegram.org/bot{TOKEN}/sendMessage'
|
||||
|
||||
def send_message(prefix: str, body: Any):
|
||||
prefix += '\n'
|
||||
body = str(body)
|
||||
|
||||
entities = {
|
||||
'type': 'pre',
|
||||
'offset': len(prefix) + 1,
|
||||
'length': len(body)
|
||||
}
|
||||
|
||||
r = requests.post(TG_API_URL, data = {
|
||||
'chat_id': ADMIN_ID,
|
||||
'entities': json.dumps([entities]),
|
||||
'text': prefix + body,
|
||||
})
|
||||
if r.status_code != 200:
|
||||
print(r.text)
|
||||
|
||||
|
||||
def make_request() -> requests.Response:
|
||||
return requests.get("https://google.com")
|
||||
|
||||
|
||||
def my_filter(r: requests.Response) -> str | None:
|
||||
if r.status_code != 200:
|
||||
return
|
||||
return r.text
|
||||
|
||||
|
||||
def main():
|
||||
print('Running!')
|
||||
poller = Poller(my_filter, make_request)
|
||||
|
||||
for resp in poller.poll():
|
||||
match resp:
|
||||
case Poller.ChangedValue(x):
|
||||
send_message("🟡", x)
|
||||
case Poller.FoundValue(x):
|
||||
send_message("🟢", x)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Loading…
Add table
Add a link
Reference in a new issue