"""HTTP proxy that serves a filtered, EPG-aligned copy of an upstream M3U playlist.

Endpoints (see ``PlaylistHandler.do_GET``):

* ``/playlist.m3u8`` -- filtered version of the playlist at ``URL``.
* ``/alt.m3u8``      -- filtered version of the playlist at ``ALT_URL`` (if set).
* ``/epg.xml.gz``    -- the upstream EPG at ``EPG_URL``, streamed through as-is.

Configuration is read from the environment variables ``URL``, ``ALT_URL``,
``BASE_URL``, ``EPG_URL`` and ``PORT``.
"""

import os
import gzip
import re
import sys
import urllib.request
import xml.etree.ElementTree as ET
from collections.abc import Callable
from dataclasses import dataclass
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from http.client import HTTPException, HTTPResponse
from typing import Any

URL: str = os.getenv('URL', '')
ALT_URL: str = os.getenv('ALT_URL', '')
BASE_URL: str = os.getenv('BASE_URL', '')
EPG_URL: str = os.getenv('EPG_URL', '')
HOST: str = '0.0.0.0'
PORT: int = int(os.getenv('PORT', '8080'))

# Number of bytes read from the upstream EPG per write to the client.
EPG_CHUNK_SIZE: int = 1024 * 1024

TIMESHIFT_RE = re.compile(r'\(\+\d+\)')
TVG_ID_RE = re.compile(r'tvg-id="[^"]*"')
TVG_LOGO_RE = re.compile(r'\s*tvg-logo="[^"]*"')
URL_TVG_RE = re.compile(r'\b(?:url-tvg|x-tvg-url)="[^"]*"')
GROUP_TITLE_RE = re.compile(r'group-title="([^"]*)"')

# Building blocks shared by the name normalizers below.
BRACKETED_RE = re.compile(r'\[[^\]]*\]')
PARENTHESIZED_RE = re.compile(r'\([^)]*\)')
PUNCTUATION_RE = re.compile(r'[!.,:;_\-+/]+')
WHITESPACE_RE = re.compile(r'\s+')
QUALITY_TOKEN_RE = re.compile(r'\b(?:hd|fhd|uhd|4k|orig|50)\b')
NOISE_TOKEN_RE = re.compile(r'\b(?:hd|fhd|uhd|4k|orig|50|tv|тв|канал|channel)\b')

# The one SD/HD pair whose names do not follow the "<name>" / "<name> HD" rule.
FIRST_CHANNEL_NAME = 'Первый канал'
FIRST_CHANNEL_HD_NAME = 'Первый HD'

EXCLUDED_GROUPS: set[str] = {
    'Грузия',
    'Турция',
    'Армения',
    'Латвия',
    'Германия',
    'Литва',
    'Эстония',
    'Азербайджан',
    'Казахстан',
    'Молдова',
    'Узбекистан',
    'Израиль',
    'Польша',
    'Таджикистан',
    'Другие (тест)',
    'Саудовская Аравия',
}

# One EPG candidate: (channel id, display name as written in the EPG, icon URL).
EpgMatch = tuple[str, str, str | None]
# Normalized display name -> every EPG candidate that normalizes to it.
EpgNameIndex = dict[str, set[EpgMatch]]

if not URL:
    print("Missing upstream URL", file=sys.stderr)
    sys.exit(1)

if not BASE_URL:
    print("Missing base URL", file=sys.stderr)
    sys.exit(1)

if not EPG_URL:
    print("Missing EPG URL", file=sys.stderr)
    sys.exit(1)


@dataclass
class Channel:
    """One playlist entry: its ``#EXTINF`` line plus every line that follows it."""

    # Title taken from the #EXTINF line (text after the attribute list).
    name: str
    # Raw playlist lines of the entry, starting with the #EXTINF line.
    lines: list[str]
    # When True, rewrite_tvg_id() must not overwrite the entry's tvg-logo.
    keep_logo: bool = False


@dataclass
class EpgChannel:
    """One ``<channel>`` element of the EPG document."""

    # Value of the element's ``id`` attribute, stripped.
    channel_id: str
    # De-duplicated, stripped ``<display-name>`` texts in document order.
    names: list[str]
    # ``src`` of the last ``<icon>`` seen inside the channel, if any.
    icon: str | None


def fetch(url: str) -> HTTPResponse:
    """Open *url* with a curl-like User-Agent and a 60 second timeout.

    The caller owns the returned response and must close it (it is a
    context manager).
    """
    request = urllib.request.Request(url, headers={'User-Agent': 'curl/8.23.9'})
    return urllib.request.urlopen(request, timeout=60)


def _split_extinf(line: str) -> tuple[str, str] | None:
    """Split an ``#EXTINF`` line into ``(attributes, title)``.

    The split happens at the first comma that is not inside double quotes,
    so commas in quoted attribute values and commas in the title are both
    preserved.  If the quotes are unbalanced and no such comma exists, the
    first comma is used.  Returns ``None`` when the line has no comma.
    """
    in_quotes = False
    for index, char in enumerate(line):
        if char == '"':
            in_quotes = not in_quotes
        elif char == ',' and not in_quotes:
            return line[:index], line[index + 1:]
    prefix, comma, title = line.partition(',')
    return (prefix, title) if comma else None


def parse_playlist(text: str) -> tuple[str, list[Channel]]:
    """Parse extended M3U *text* into its header line and channel entries.

    An entry starts at an ``#EXTINF:`` line and ends with the first following
    non-empty line that is not a ``#`` directive, or at the next ``#EXTINF:``
    line.  Lines before the first ``#EXTINF:`` are dropped.

    Raises:
        ValueError: if the first line does not start with ``#EXTM3U``.
    """
    # A UTF-8 byte order mark would otherwise hide the #EXTM3U marker.
    lines = text.removeprefix('\ufeff').splitlines()
    if not lines or not lines[0].startswith('#EXTM3U'):
        raise ValueError('Playlist must start with #EXTM3U')

    header = lines[0]
    channels: list[Channel] = []
    current: Channel | None = None

    for line in lines[1:]:
        if line.startswith('#EXTINF:'):
            if current is not None:
                channels.append(current)
            parts = _split_extinf(line)
            name = parts[1].strip() if parts else ''
            current = Channel(name=name, lines=[line])
            continue

        if current is None:
            continue

        current.lines.append(line)
        if line and not line.startswith('#'):
            # A non-directive, non-empty line closes the entry.
            channels.append(current)
            current = None

    if current is not None:
        channels.append(current)

    return header, channels


def channel_group(channel: Channel) -> str:
    """Return the ``group-title`` of *channel*, or ``''`` if it has none."""
    for line in channel.lines:
        if not line.startswith('#EXTINF:'):
            continue
        match = GROUP_TITLE_RE.search(line)
        if match:
            return match.group(1)
    return ''


def _is_allowed(channel: Channel) -> bool:
    """Tell whether *channel* is neither in an excluded group nor a timeshift copy."""
    return (
        channel_group(channel) not in EXCLUDED_GROUPS
        and not TIMESHIFT_RE.search(channel.name)
    )


def _hd_candidate(channel: Channel, by_name: dict[str, Channel]) -> Channel | None:
    """Return the playlist entry that is the HD twin of *channel*, if any."""
    candidate = by_name.get(f'{channel.name} HD')
    if candidate is None and channel.name == FIRST_CHANNEL_NAME:
        candidate = by_name.get(FIRST_CHANNEL_HD_NAME)
    return candidate


def filter_channels(channels: list[Channel]) -> list[Channel]:
    """Drop unwanted channels and collapse SD/HD pairs into one HD entry.

    * Channels in ``EXCLUDED_GROUPS`` and timeshift channels (``(+N)`` in the
      name) are removed.
    * When a channel has an allowed HD twin, the twin's stream replaces it at
      the SD channel's position, carrying the SD channel's logo, and the
      twin's own entry is removed.
    * When the HD twin exists but is not allowed, the SD channel is kept.

    The result does not depend on whether the HD twin is listed before or
    after its SD counterpart.
    """
    by_name = {channel.name: channel for channel in channels}

    # First pass: decide every replacement up front, so that an HD entry that
    # precedes its SD twin is still recognised as replaced.
    plan: list[tuple[Channel, Channel | None, bool]] = []
    replaced_hd_names: set[str] = set()
    for channel in channels:
        if not _is_allowed(channel):
            continue
        candidate = _hd_candidate(channel, by_name)
        usable = candidate is not None and _is_allowed(candidate)
        if candidate is not None and usable:
            replaced_hd_names.add(candidate.name)
        plan.append((channel, candidate, usable))

    # Second pass: emit the surviving entries in playlist order.
    kept: list[Channel] = []
    for channel, candidate, usable in plan:
        if candidate is None:
            if channel.name not in replaced_hd_names:
                kept.append(channel)
        elif usable:
            kept.append(Channel(
                name=candidate.name,
                lines=with_logo_from(candidate.lines, channel.lines),
                keep_logo=True,
            ))
        else:
            kept.append(channel)

    return kept


def extinf_logo(lines: list[str]) -> str | None:
    """Return the ``tvg-logo`` value of the first ``#EXTINF`` line that has one."""
    for line in lines:
        if not line.startswith('#EXTINF:'):
            continue
        match = TVG_LOGO_RE.search(line)
        if match:
            # Matched text looks like ` tvg-logo="<value>"`; cut out <value>.
            return match.group(0).split('="', 1)[1][:-1]
    return None


def replace_logo(line: str, logo_url: str) -> str:
    """Set the ``tvg-logo`` attribute of an ``#EXTINF`` *line* to *logo_url*.

    An existing attribute is replaced; otherwise the attribute is appended to
    the attribute list, just before the title.  A line without a comma (no
    title separator) is returned unchanged.
    """
    attribute = f' tvg-logo="{logo_url}"'
    if TVG_LOGO_RE.search(line):
        # Callable replacement: logo_url must be inserted literally, not
        # interpreted as a regex replacement template (backslashes, \g<...>).
        return TVG_LOGO_RE.sub(lambda _match: attribute, line, count=1)

    parts = _split_extinf(line)
    if parts is None:
        return line

    prefix, name = parts
    return f'{prefix}{attribute},{name}'


def with_logo_from(target_lines: list[str], source_lines: list[str]) -> list[str]:
    """Return a copy of *target_lines* carrying the logo found in *source_lines*.

    If *source_lines* has no logo the copy is returned unmodified.
    """
    logo = extinf_logo(source_lines)
    if not logo:
        return list(target_lines)

    return [
        replace_logo(line, logo) if line.startswith('#EXTINF:') else line
        for line in target_lines
    ]


def strip_hd_suffix(channel: Channel) -> Channel:
    """Return *channel* renamed without its trailing ``' HD'``.

    ``'Первый HD'`` is renamed to ``'Первый канал'``.  Channels whose name
    does not end in ``' HD'`` are returned as-is (same object).  Only the
    first ``#EXTINF`` line that has a title separator is rewritten.
    """
    if not channel.name.endswith(' HD'):
        return channel

    if channel.name == FIRST_CHANNEL_HD_NAME:
        new_name = FIRST_CHANNEL_NAME
    else:
        new_name = channel.name[:-3]

    new_lines = list(channel.lines)
    for index, line in enumerate(new_lines):
        if not line.startswith('#EXTINF:'):
            continue
        parts = _split_extinf(line)
        if parts is None:
            continue
        new_lines[index] = f'{parts[0]},{new_name}'
        break

    return Channel(name=new_name, lines=new_lines, keep_logo=channel.keep_logo)


def strip_hd_suffixes(channels: list[Channel]) -> list[Channel]:
    """Apply :func:`strip_hd_suffix` to every channel."""
    return [strip_hd_suffix(channel) for channel in channels]


def build_playlist(header: str, channels: list[Channel]) -> str:
    """Serialize *header* and *channels* back to newline-terminated M3U text."""
    lines = [header]
    for channel in channels:
        lines.extend(channel.lines)
    return '\n'.join(lines) + '\n'


def local_name(tag: str) -> str:
    """Return an ElementTree tag without its ``{namespace}`` prefix."""
    return tag.rsplit('}', 1)[-1]


def parse_epg_channels(url: str) -> list[EpgChannel]:
    """Download the gzip-compressed EPG at *url* and return its channels.

    The document is parsed incrementally and parsing stops at the first
    ``<programme>`` element, so only the part of the document before it is
    read.  Channels with an empty id or without any non-blank display name
    are skipped.
    """
    channels: list[EpgChannel] = []

    with fetch(url) as response, gzip.GzipFile(fileobj=response) as gz:
        current_id: str | None = None
        current_names: list[str] = []
        current_icon: str | None = None

        for event, elem in ET.iterparse(gz, events=('start', 'end')):
            tag = local_name(elem.tag)

            if event == 'start' and tag == 'programme':
                elem.clear()
                break

            if event == 'start' and tag == 'channel':
                current_id = elem.attrib.get('id', '').strip()
                current_names = []
                current_icon = None
                continue

            if event == 'start' and tag == 'icon' and current_id is not None:
                current_icon = elem.attrib.get('src') or current_icon
                continue

            if event == 'end' and tag == 'display-name' and current_id is not None:
                if elem.text and elem.text.strip():
                    current_names.append(elem.text.strip())
                elem.clear()
                continue

            if event == 'end' and tag == 'channel':
                # dict.fromkeys de-duplicates while keeping document order.
                names = list(dict.fromkeys(current_names))
                if current_id and names:
                    channels.append(EpgChannel(
                        channel_id=current_id,
                        names=names,
                        icon=current_icon,
                    ))
                current_id = None
                current_names = []
                current_icon = None
                elem.clear()
                continue

            if event == 'end':
                # Release the content of elements we do not care about.
                elem.clear()

    return channels


def _collapse_spaces(text: str) -> str:
    """Collapse every whitespace run to one space and trim both ends."""
    return WHITESPACE_RE.sub(' ', text).strip()


def normalize_exact_name(name: str) -> str:
    """Normalize *name* for strict comparison.

    Case-folds, maps ``ё`` to ``е``, removes ``[...]`` and ``(...)`` groups,
    turns punctuation into spaces and collapses whitespace.
    """
    name = name.casefold().replace('ё', 'е')
    name = BRACKETED_RE.sub(' ', name)
    name = PARENTHESIZED_RE.sub(' ', name)
    name = PUNCTUATION_RE.sub(' ', name)
    return _collapse_spaces(name)


def normalize_quality_name(name: str) -> str:
    """Like :func:`normalize_exact_name`, additionally dropping quality tokens."""
    return _collapse_spaces(QUALITY_TOKEN_RE.sub(' ', normalize_exact_name(name)))


def normalize_name(name: str) -> str:
    """Loosest normalization, used as the EPG index key.

    Like :func:`normalize_exact_name`, additionally dropping quality tokens
    and generic words (``tv``, ``тв``, ``канал``, ``channel``).
    """
    return _collapse_spaces(NOISE_TOKEN_RE.sub(' ', normalize_exact_name(name)))


def is_plain_display_name(name: str) -> bool:
    """Tell whether *name* contains no round or square brackets."""
    return not any(bracket in name for bracket in '()[]')


def has_quality_token(name: str) -> bool:
    """Tell whether *name* contains a quality token such as ``hd`` or ``4k``."""
    return QUALITY_TOKEN_RE.search(name.casefold()) is not None


def build_target_id_index(channels: list[EpgChannel]) -> EpgNameIndex:
    """Index EPG *channels* by the normalized form of each display name."""
    index: EpgNameIndex = {}
    for channel in channels:
        for name in channel.names:
            normalized = normalize_name(name)
            if normalized:
                index.setdefault(normalized, set()).add(
                    (channel.channel_id, name, channel.icon)
                )
    return index


def build_epg_name_index(epg_url: str) -> EpgNameIndex:
    """Download the EPG at *epg_url* and build its display-name index."""
    return build_target_id_index(parse_epg_channels(epg_url))


def rewrite_header(header: str) -> str:
    """Point the playlist header's EPG attributes at this proxy.

    Existing ``url-tvg`` / ``x-tvg-url`` attributes are removed and both are
    re-added with ``<BASE_URL>/epg.xml.gz``.
    """
    header = URL_TVG_RE.sub('', header)
    header = _collapse_spaces(header)
    epg_url = f'{BASE_URL.rstrip("/")}/epg.xml.gz'
    return f'{header} url-tvg="{epg_url}" x-tvg-url="{epg_url}"'


def _narrow(matches: set[EpgMatch], keep: Callable[[str], bool]) -> set[EpgMatch]:
    """Keep the candidates whose display name satisfies *keep*, unless none do."""
    narrowed = {match for match in matches if keep(match[1])}
    return narrowed or matches


def rewrite_tvg_id(line: str, channel: Channel, epg_name_index: EpgNameIndex) -> tuple[str, bool]:
    """Replace the ``tvg-id`` (and possibly logo) of an ``#EXTINF`` *line*.

    Candidates are looked up by the loosely normalized channel name and then
    narrowed by progressively stricter preferences; each preference is only
    applied if at least one candidate satisfies it.  The line is rewritten
    only when the remaining candidates agree on a single channel id.

    Returns:
        ``(line, matched)`` -- the possibly rewritten line and whether a
        unique EPG id was found.  Lines without a ``tvg-id`` attribute are
        returned unchanged with ``matched`` False.
    """
    if not TVG_ID_RE.search(line):
        return line, False

    channel_name = channel.name
    matches = epg_name_index.get(normalize_name(channel_name), set())

    exact_channel_name = normalize_exact_name(channel_name)
    quality_channel_name = normalize_quality_name(channel_name)
    preferences: tuple[Callable[[str], bool], ...] = (
        lambda name: normalize_exact_name(name) == exact_channel_name,
        is_plain_display_name,
        lambda name: normalize_quality_name(name) == quality_channel_name,
        lambda name: not has_quality_token(name),
    )
    for keep in preferences:
        matches = _narrow(matches, keep)

    target_ids = {target_id for target_id, _, _ in matches}
    if len(target_ids) != 1:
        return line, False

    new_id = next(iter(target_ids))
    # Callable replacement: the id comes from the EPG and must be inserted
    # literally rather than parsed as a regex replacement template.
    line = TVG_ID_RE.sub(lambda _match: f'tvg-id="{new_id}"', line, count=1)

    # Deterministic pick: smallest (display name, icon) among candidates that
    # have an icon.  Candidates without one are filtered out first, so None is
    # never compared with a string.
    icons = [(name, icon) for _, name, icon in matches if icon]
    if icons and not channel.keep_logo:
        line = replace_logo(line, min(icons)[1])

    return line, True


def rewrite_channel_ids(channels: list[Channel], epg_name_index: EpgNameIndex) -> list[Channel]:
    """Rewrite the ``tvg-id`` of every channel and log how many were matched."""
    rewritten: list[Channel] = []
    matched = 0

    for channel in channels:
        lines = []
        for line in channel.lines:
            if line.startswith('#EXTINF:'):
                line, was_matched = rewrite_tvg_id(line, channel, epg_name_index)
                if was_matched:
                    matched += 1
            lines.append(line)
        rewritten.append(Channel(name=channel.name, lines=lines, keep_logo=channel.keep_logo))

    print(f'epg matched {matched}/{len(channels)} channels', file=sys.stderr, flush=True)
    return rewritten


def filter_playlist(url: str) -> str:
    """Download the playlist at *url* and return its filtered, rewritten text.

    The EPG at ``EPG_URL`` is downloaded on every call to build the name
    index used for ``tvg-id`` rewriting.
    """
    with fetch(url) as response:
        text = response.read().decode('utf-8')

    header, channels = parse_playlist(text)
    epg_name_index = build_epg_name_index(EPG_URL)

    header = rewrite_header(header)
    channels = filter_channels(channels)
    channels = strip_hd_suffixes(channels)
    channels = rewrite_channel_ids(channels, epg_name_index)
    return build_playlist(header, channels)


def proxy_epg(handler: BaseHTTPRequestHandler) -> None:
    """Stream the upstream EPG to the client behind *handler*.

    ``Content-Type`` (default ``application/gzip``), ``Content-Length``,
    ``Last-Modified`` and ``ETag`` are forwarded when upstream provides them.
    Errors raised before the response headers are sent propagate to the
    caller; errors raised while streaming the body are logged here instead,
    because the caller can no longer send an error status at that point.
    """
    with fetch(EPG_URL) as response:
        handler.send_response(200)
        handler.send_header('Content-Type', response.headers.get('Content-Type', 'application/gzip'))
        for header in ('Content-Length', 'Last-Modified', 'ETag'):
            value = response.headers.get(header)
            if value:
                handler.send_header(header, value)
        handler.send_header('Connection', 'close')
        handler.end_headers()

        try:
            while chunk := response.read(EPG_CHUNK_SIZE):
                handler.wfile.write(chunk)
        except (OSError, HTTPException) as error:
            # The status line is already on the wire; the connection is
            # closed after this request, which is how the client learns
            # that the body was cut short.
            print(f'epg stream aborted: {error}', file=sys.stderr, flush=True)


def playlist_url_for_path(path: str) -> str | None:
    """Map a request *path* to its upstream playlist URL, or ``None``."""
    if path == '/playlist.m3u8':
        return URL
    if path == '/alt.m3u8':
        return ALT_URL or None
    return None


class PlaylistHandler(BaseHTTPRequestHandler):
    """Request handler serving the filtered playlists and the proxied EPG."""

    def do_GET(self) -> None:
        """Serve ``/epg.xml.gz``, ``/playlist.m3u8`` or ``/alt.m3u8``.

        Unknown paths get 404; upstream or processing failures get 502.
        """
        if self.path == '/epg.xml.gz':
            try:
                proxy_epg(self)
            except Exception as error:
                # Top-level request boundary: log and report, never crash.
                print(f'failed to proxy epg: {error}', file=sys.stderr, flush=True)
                self.send_error(502, 'Failed to proxy upstream EPG')
            return

        url = playlist_url_for_path(self.path)
        if not url:
            self.send_error(404, 'Not Found')
            return

        try:
            playlist = filter_playlist(url)
        except Exception as error:
            # Top-level request boundary: log and report, never crash.
            print(f'failed to build playlist: {error}', file=sys.stderr, flush=True)
            self.send_error(502, 'Failed to fetch or filter upstream playlist')
            return

        body = playlist.encode('utf-8')
        self.send_response(200)
        self.send_header('Content-Type', 'application/vnd.apple.mpegurl; charset=utf-8')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format: str, *args: Any) -> None:
        """Write the access log line to stderr, prefixed with the client address."""
        # `format` shadows the builtin, but the name is fixed by the base class.
        print(f'{self.address_string()} - {format % args}', file=sys.stderr)


def main() -> None:
    """Start the threaded HTTP server and serve until interrupted."""
    # The context manager closes the listening socket when serving stops.
    with ThreadingHTTPServer((HOST, PORT), PlaylistHandler) as server:
        print(f'Serving at http://{HOST}:{PORT}')
        print(f'Upstream: {URL}')
        if ALT_URL:
            print(f'Alternative: {ALT_URL}')
        server.serve_forever()


if __name__ == '__main__':
    main()