commit 3c583aac15c77d89409506c1139151bab41e7ebf Author: Arthur K. Date: Sun May 3 14:23:08 2026 +0300 init diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..7f568fe --- /dev/null +++ b/Dockerfile @@ -0,0 +1,9 @@ +FROM python:3.14 + +WORKDIR /app +COPY main.py . + +STOPSIGNAL SIGINT +ENV PORT=80 +EXPOSE 80 +CMD ["python3", "-u", "main.py"] diff --git a/main.py b/main.py new file mode 100644 index 0000000..1508a27 --- /dev/null +++ b/main.py @@ -0,0 +1,463 @@ +import os +import gzip +import re +import sys +import urllib.request +import xml.etree.ElementTree as ET +from dataclasses import dataclass +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer +from http.client import HTTPResponse +from typing import Any + + +URL: str = os.getenv('URL', '') +ALT_URL: str = os.getenv('ALT_URL', '') +BASE_URL: str = os.getenv('BASE_URL', '') +EPG_URL: str = os.getenv('EPG_URL', '') +HOST: str = '0.0.0.0' +PORT: int = int(os.getenv('PORT', '8080')) +TIMESHIFT_RE = re.compile(r'\(\+\d+\)') +TVG_ID_RE = re.compile(r'tvg-id="[^"]*"') +TVG_LOGO_RE = re.compile(r'\s*tvg-logo="[^"]*"') +URL_TVG_RE = re.compile(r'\b(?:url-tvg|x-tvg-url)="[^"]*"') +EXCLUDED_GROUPS: set[str] = { + 'Грузия', + 'Турция', + 'Армения', + 'Латвия', + 'Германия', + 'Литва', + 'Эстония', + 'Азербайджан', + 'Казахстан', + 'Молдова', + 'Узбекистан', + 'Израиль', + 'Польша', + 'Таджикистан', + 'Другие (тест)', + 'Саудовская Аравия', +} + +if not URL: + print("Missing upstream URL", file=sys.stderr) + sys.exit(1) + +if not BASE_URL: + print("Missing base URL", file=sys.stderr) + sys.exit(1) + +if not EPG_URL: + print("Missing EPG URL", file=sys.stderr) + sys.exit(1) + + +@dataclass +class Channel: + name: str + lines: list[str] + keep_logo: bool = False + + +@dataclass +class EpgChannel: + channel_id: str + names: list[str] + icon: str | None + + +def fetch(url: str) -> HTTPResponse: + request = urllib.request.Request(url, headers={'User-Agent': 'curl/8.23.9'}) + return urllib.request.urlopen(request, timeout=60) + + +def parse_playlist(text: str) -> tuple[str, list[Channel]]: + lines = text.splitlines() + if not lines or not lines[0].startswith('#EXTM3U'): + raise ValueError('Playlist must start with #EXTM3U') + + header = lines[0] + channels = [] + current = None + + for line in lines[1:]: + if line.startswith('#EXTINF:'): + if current is not None: + channels.append(current) + name = line.split(',', 1)[1].strip() if ',' in line else '' + current = Channel(name=name, lines=[line]) + continue + + if current is None: + continue + + current.lines.append(line) + if line and not line.startswith('#'): + channels.append(current) + current = None + + if current is not None: + channels.append(current) + + return header, channels + + +def filter_channels(channels: list[Channel]) -> list[Channel]: + by_name = {channel.name: channel for channel in channels} + replaced_hd_names: set[str] = set() + kept = [] + + for channel in channels: + if channel_group(channel) in EXCLUDED_GROUPS: + continue + + if TIMESHIFT_RE.search(channel.name): + continue + + hd_name = f'{channel.name} HD' + if hd_name in by_name: + hd_channel = by_name[hd_name] + if not TIMESHIFT_RE.search(hd_channel.name) and channel_group(hd_channel) not in EXCLUDED_GROUPS: + kept.append(Channel(name=hd_channel.name, lines=with_logo_from(hd_channel.lines, channel.lines), keep_logo=True)) + replaced_hd_names.add(hd_name) + else: + kept.append(channel) + continue + + if channel.name == 'Первый канал' and 'Первый HD' in by_name: + hd_channel = by_name['Первый HD'] + if channel_group(hd_channel) not in EXCLUDED_GROUPS: + kept.append(Channel(name=hd_channel.name, lines=with_logo_from(hd_channel.lines, channel.lines), keep_logo=True)) + replaced_hd_names.add('Первый HD') + else: + kept.append(channel) + continue + + if channel.name in replaced_hd_names: + continue + + kept.append(channel) + + return kept + + +def channel_group(channel: Channel) -> str: + for line in channel.lines: + if not line.startswith('#EXTINF:'): + continue + + match = re.search(r'group-title="([^"]*)"', line) + if match: + return match.group(1) + + return '' + + +def extinf_logo(lines: list[str]) -> str | None: + for line in lines: + if not line.startswith('#EXTINF:'): + continue + + match = TVG_LOGO_RE.search(line) + if match: + return match.group(0).split('="', 1)[1][:-1] + + return None + + +def replace_logo(line: str, logo_url: str) -> str: + if TVG_LOGO_RE.search(line): + return TVG_LOGO_RE.sub(f' tvg-logo="{logo_url}"', line, count=1) + + if ',' not in line: + return line + + prefix, name = line.rsplit(',', 1) + return f'{prefix} tvg-logo="{logo_url}",{name}' + + +def with_logo_from(target_lines: list[str], source_lines: list[str]) -> list[str]: + logo = extinf_logo(source_lines) + if not logo: + return list(target_lines) + + return [replace_logo(line, logo) if line.startswith('#EXTINF:') else line for line in target_lines] + + +def strip_hd_suffix(channel: Channel) -> Channel: + if not channel.name.endswith(' HD'): + return channel + + new_name = 'Первый канал' if channel.name == 'Первый HD' else channel.name[:-3] + new_lines = list(channel.lines) + for index, line in enumerate(new_lines): + if line.startswith('#EXTINF:') and ',' in line: + prefix, _ = line.rsplit(',', 1) + new_lines[index] = f'{prefix},{new_name}' + break + + return Channel(name=new_name, lines=new_lines, keep_logo=channel.keep_logo) + + +def strip_hd_suffixes(channels: list[Channel]) -> list[Channel]: + return [strip_hd_suffix(channel) for channel in channels] + + +def build_playlist(header: str, channels: list[Channel]) -> str: + lines = [header] + for channel in channels: + lines.extend(channel.lines) + return '\n'.join(lines) + '\n' + + +def local_name(tag: str) -> str: + return tag.rsplit('}', 1)[-1] + + +def parse_epg_channels(url: str) -> list[EpgChannel]: + channels: list[EpgChannel] = [] + with fetch(url) as response: + with gzip.GzipFile(fileobj=response) as gz: + current_id: str | None = None + current_names: list[str] = [] + current_icon: str | None = None + + for event, elem in ET.iterparse(gz, events=('start', 'end')): + tag = local_name(elem.tag) + + if event == 'start' and tag == 'programme': + elem.clear() + break + + if event == 'start' and tag == 'channel': + current_id = elem.attrib.get('id', '').strip() + current_names = [] + current_icon = None + continue + + if event == 'start' and tag == 'icon' and current_id is not None: + current_icon = elem.attrib.get('src') or current_icon + continue + + if event == 'end' and tag == 'display-name' and current_id is not None: + if elem.text and elem.text.strip(): + current_names.append(elem.text.strip()) + elem.clear() + continue + + if event == 'end' and tag == 'channel': + names = list(dict.fromkeys(current_names)) + if current_id and names: + channels.append(EpgChannel(channel_id=current_id, names=names, icon=current_icon)) + current_id = None + current_names = [] + current_icon = None + elem.clear() + continue + + if event == 'end': + elem.clear() + + return channels + + +def normalize_name(name: str) -> str: + name = name.casefold().replace('ё', 'е') + name = re.sub(r'\[[^\]]*\]', ' ', name) + name = re.sub(r'\([^)]*\)', ' ', name) + name = re.sub(r'[!.,:;_\-+/]+', ' ', name) + name = re.sub(r'\b(?:hd|fhd|uhd|4k|orig|50|tv|тв|канал|channel)\b', ' ', name) + name = re.sub(r'\s+', ' ', name).strip() + return name + + +def normalize_exact_name(name: str) -> str: + name = name.casefold().replace('ё', 'е') + name = re.sub(r'\[[^\]]*\]', ' ', name) + name = re.sub(r'\([^)]*\)', ' ', name) + name = re.sub(r'[!.,:;_\-+/]+', ' ', name) + name = re.sub(r'\s+', ' ', name).strip() + return name + + +def normalize_quality_name(name: str) -> str: + name = normalize_exact_name(name) + name = re.sub(r'\b(?:hd|fhd|uhd|4k|orig|50)\b', ' ', name) + name = re.sub(r'\s+', ' ', name).strip() + return name + + +def is_plain_display_name(name: str) -> bool: + return '(' not in name and ')' not in name and '[' not in name and ']' not in name + + +def has_quality_token(name: str) -> bool: + return re.search(r'\b(?:hd|fhd|uhd|4k|orig|50)\b', name.casefold()) is not None + + +def build_target_id_index(channels: list[EpgChannel]) -> dict[str, set[tuple[str, str, str | None]]]: + index: dict[str, set[tuple[str, str, str | None]]] = {} + for channel in channels: + for name in channel.names: + normalized = normalize_name(name) + if normalized: + index.setdefault(normalized, set()).add((channel.channel_id, name, channel.icon)) + return index + + +def build_epg_name_index(epg_url: str) -> dict[str, set[tuple[str, str, str | None]]]: + target_channels = parse_epg_channels(epg_url) + index = build_target_id_index(target_channels) + return index + + +def rewrite_header(header: str) -> str: + header = URL_TVG_RE.sub('', header) + header = re.sub(r'\s+', ' ', header).strip() + epg_url = f'{BASE_URL.rstrip("/")}/epg.xml.gz' + return f'{header} url-tvg="{epg_url}" x-tvg-url="{epg_url}"' + + +def rewrite_tvg_id(line: str, channel: Channel, epg_name_index: dict[str, set[tuple[str, str, str | None]]]) -> tuple[str, bool]: + match = TVG_ID_RE.search(line) + if not match: + return line, False + + old_id = match.group(0)[8:-1] + channel_name = channel.name + matches = epg_name_index.get(normalize_name(channel_name), set()) + exact_channel_name = normalize_exact_name(channel_name) + exact_matches = {(target_id, target_name, icon) for target_id, target_name, icon in matches if normalize_exact_name(target_name) == exact_channel_name} + if exact_matches: + matches = exact_matches + + plain_matches = {(target_id, target_name, icon) for target_id, target_name, icon in matches if is_plain_display_name(target_name)} + if plain_matches: + matches = plain_matches + + quality_channel_name = normalize_quality_name(channel_name) + quality_matches = {(target_id, target_name, icon) for target_id, target_name, icon in matches if normalize_quality_name(target_name) == quality_channel_name} + if quality_matches: + matches = quality_matches + + no_quality_matches = {(target_id, target_name, icon) for target_id, target_name, icon in matches if not has_quality_token(target_name)} + if no_quality_matches: + matches = no_quality_matches + + target_ids = {target_id for target_id, _, _ in matches} + if len(target_ids) != 1: + return line, False + + new_id = next(iter(target_ids)) + line = TVG_ID_RE.sub(f'tvg-id="{new_id}"', line, count=1) + + icon = next((icon for target_id, _, icon in sorted(matches) if target_id == new_id and icon), None) + if icon and not channel.keep_logo: + line = replace_logo(line, icon) + + return line, True + + +def rewrite_channel_ids(channels: list[Channel], epg_name_index: dict[str, set[tuple[str, str, str | None]]]) -> list[Channel]: + rewritten: list[Channel] = [] + matched = 0 + for channel in channels: + lines = [] + for line in channel.lines: + if line.startswith('#EXTINF:'): + line, was_matched = rewrite_tvg_id(line, channel, epg_name_index) + if was_matched: + matched += 1 + lines.append(line) + rewritten.append(Channel(name=channel.name, lines=lines, keep_logo=channel.keep_logo)) + + print(f'epg matched {matched}/{len(channels)} channels', file=sys.stderr, flush=True) + return rewritten + + +def filter_playlist(url: str) -> str: + with fetch(url) as response: + text = response.read().decode('utf-8') + header, channels = parse_playlist(text) + epg_name_index = build_epg_name_index(EPG_URL) + header = rewrite_header(header) + channels = filter_channels(channels) + channels = strip_hd_suffixes(channels) + channels = rewrite_channel_ids(channels, epg_name_index) + return build_playlist(header, channels) + + +def proxy_epg(handler: BaseHTTPRequestHandler) -> None: + with fetch(EPG_URL) as response: + handler.send_response(200) + handler.send_header('Content-Type', response.headers.get('Content-Type', 'application/gzip')) + + for header in ('Content-Length', 'Last-Modified', 'ETag'): + value = response.headers.get(header) + if value: + handler.send_header(header, value) + + handler.send_header('Connection', 'close') + handler.end_headers() + + while True: + chunk = response.read(1024 * 1024) + if not chunk: + break + handler.wfile.write(chunk) + + +def playlist_url_for_path(path: str) -> str | None: + if path == '/playlist.m3u8': + return URL + + if path == '/alt.m3u8': + return ALT_URL or None + + return None + + +class PlaylistHandler(BaseHTTPRequestHandler): + def do_GET(self) -> None: + if self.path == '/epg.xml.gz': + try: + proxy_epg(self) + except Exception as error: + print(f'failed to proxy epg: {error}', file=sys.stderr, flush=True) + self.send_error(502, 'Failed to proxy upstream EPG') + return + + url = playlist_url_for_path(self.path) + if not url: + self.send_error(404, 'Not Found') + return + + try: + playlist = filter_playlist(url) + except Exception as error: + print(f'failed to build playlist: {error}', file=sys.stderr, flush=True) + self.send_error(502, 'Failed to fetch or filter upstream playlist') + return + + body = playlist.encode('utf-8') + self.send_response(200) + self.send_header('Content-Type', 'application/vnd.apple.mpegurl; charset=utf-8') + self.send_header('Content-Length', str(len(body))) + self.end_headers() + self.wfile.write(body) + + def log_message(self, format: str, *args: Any) -> None: + print(f'{self.address_string()} - {format % args}', file=sys.stderr) + + +def main() -> None: + server = ThreadingHTTPServer((HOST, PORT), PlaylistHandler) + print(f'Serving at http://{HOST}:{PORT}') + print(f'Upstream: {URL}') + if ALT_URL: + print(f'Alternative: {ALT_URL}') + server.serve_forever() + + +if __name__ == '__main__': + main()