"""HTTP proxy that filters an upstream IPTV M3U playlist and remaps its EPG ids."""
import gzip
import os
import re
import sys
import urllib.request
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from http.client import HTTPResponse
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from typing import Any


# Upstream endpoints and server settings, supplied via environment variables.
URL: str = os.getenv('URL', '')  # primary upstream playlist URL (required)
ALT_URL: str = os.getenv('ALT_URL', '')  # optional alternative playlist URL
BASE_URL: str = os.getenv('BASE_URL', '')  # public base URL of this proxy (required)
EPG_URL: str = os.getenv('EPG_URL', '')  # upstream gzipped XMLTV EPG URL (required)
HOST: str = '0.0.0.0'
PORT: int = int(os.getenv('PORT', '8080'))

# Names like "Канал (+2)" mark timeshifted variants; such channels are dropped.
TIMESHIFT_RE = re.compile(r'\(\+\d+\)')
# tvg-id="..." attribute inside an #EXTINF line.
TVG_ID_RE = re.compile(r'tvg-id="[^"]*"')
# tvg-logo="..." attribute, including any leading whitespace before it.
TVG_LOGO_RE = re.compile(r'\s*tvg-logo="[^"]*"')
# EPG-source attributes found in the #EXTM3U header line.
URL_TVG_RE = re.compile(r'\b(?:url-tvg|x-tvg-url)="[^"]*"')

# group-title values removed entirely from the output playlist.
EXCLUDED_GROUPS: set[str] = {
    'Грузия',
    'Турция',
    'Армения',
    'Латвия',
    'Германия',
    'Литва',
    'Эстония',
    'Азербайджан',
    'Казахстан',
    'Молдова',
    'Узбекистан',
    'Израиль',
    'Польша',
    'Таджикистан',
    'Другие (тест)',
    'Саудовская Аравия',
}


# Fail fast when any required upstream setting is missing.
for _value, _message in (
    (URL, "Missing upstream URL"),
    (BASE_URL, "Missing base URL"),
    (EPG_URL, "Missing EPG URL"),
):
    if not _value:
        print(_message, file=sys.stderr)
        sys.exit(1)


@dataclass
class Channel:
    """One playlist entry: display name plus its raw M3U lines."""

    # Display name taken from the #EXTINF title (text after the first comma).
    name: str
    # Raw playlist lines for this entry: the #EXTINF line, any further
    # directives, and finally the stream URL.
    lines: list[str]
    # True when the entry's tvg-logo was merged in from a sibling channel and
    # must not be overwritten by the EPG icon in rewrite_tvg_id.
    keep_logo: bool = False


@dataclass
class EpgChannel:
    """One <channel> element parsed from the XMLTV EPG feed."""

    # Value of the channel's "id" attribute.
    channel_id: str
    # De-duplicated <display-name> texts, in document order.
    names: list[str]
    # "src" of the channel's <icon>, if one was present.
    icon: str | None


def fetch(url: str) -> HTTPResponse:
    """Open *url* with a curl-like User-Agent and a 60-second timeout."""
    headers = {'User-Agent': 'curl/8.23.9'}
    return urllib.request.urlopen(urllib.request.Request(url, headers=headers), timeout=60)


def parse_playlist(text: str) -> tuple[str, list[Channel]]:
    """Split raw M3U text into its header line and the list of channels.

    Raises ValueError when the first line is not an #EXTM3U header. A channel
    is closed by its first non-empty, non-comment line (the stream URL); a
    trailing channel that never got a URL is still appended. Lines appearing
    before any #EXTINF are discarded.
    """
    all_lines = text.splitlines()
    if not all_lines or not all_lines[0].startswith('#EXTM3U'):
        raise ValueError('Playlist must start with #EXTM3U')

    channels: list[Channel] = []
    open_channel: Channel | None = None

    for raw in all_lines[1:]:
        if raw.startswith('#EXTINF:'):
            # A new #EXTINF begins a channel; flush any channel still open.
            if open_channel is not None:
                channels.append(open_channel)
            title = raw.split(',', 1)[1].strip() if ',' in raw else ''
            open_channel = Channel(name=title, lines=[raw])
        elif open_channel is not None:
            open_channel.lines.append(raw)
            # A non-empty, non-comment line is the stream URL — entry complete.
            if raw and not raw.startswith('#'):
                channels.append(open_channel)
                open_channel = None

    if open_channel is not None:
        channels.append(open_channel)

    return all_lines[0], channels


def filter_channels(channels: list[Channel]) -> list[Channel]:
    """Drop excluded/timeshift channels and merge SD entries with their HD twins.

    For an SD channel "X" whose "X HD" twin also survives the filters, the HD
    entry is emitted in the SD channel's position (inheriting the SD logo via
    with_logo_from) and the standalone HD entry is dropped. "Первый канал" is
    special-cased to pair with "Первый HD" (group filter only, matching the
    original behavior).

    The set of replaced HD names is precomputed in a first pass so the result
    is independent of input order: the previous single-pass version emitted a
    duplicate HD entry whenever "X HD" appeared before its "X" twin, because
    the replaced-set check ran before the set had been populated.
    """
    by_name = {channel.name: channel for channel in channels}

    def passes_filters(channel: Channel) -> bool:
        # Shared eligibility: not in an excluded group, not a timeshift feed.
        return (channel_group(channel) not in EXCLUDED_GROUPS
                and not TIMESHIFT_RE.search(channel.name))

    # First pass: decide which HD entries will be merged into an SD slot.
    replaced_hd_names: set[str] = set()
    for channel in channels:
        if not passes_filters(channel):
            continue
        hd_name = f'{channel.name} HD'
        if hd_name in by_name:
            if passes_filters(by_name[hd_name]):
                replaced_hd_names.add(hd_name)
        elif channel.name == 'Первый канал' and 'Первый HD' in by_name:
            # Special pairing; only the group filter applies to "Первый HD".
            if channel_group(by_name['Первый HD']) not in EXCLUDED_GROUPS:
                replaced_hd_names.add('Первый HD')

    # Second pass: build the output in input order.
    kept: list[Channel] = []
    for channel in channels:
        if not passes_filters(channel):
            continue

        hd_name = f'{channel.name} HD'
        if hd_name not in by_name and channel.name == 'Первый канал' and 'Первый HD' in by_name:
            hd_name = 'Первый HD'

        if hd_name in by_name:
            if hd_name in replaced_hd_names:
                hd_channel = by_name[hd_name]
                kept.append(Channel(
                    name=hd_channel.name,
                    lines=with_logo_from(hd_channel.lines, channel.lines),
                    keep_logo=True,
                ))
            else:
                # HD twin exists but was filtered out; keep the SD entry.
                kept.append(channel)
            continue

        if channel.name in replaced_hd_names:
            # This HD entry is emitted at its SD twin's position instead.
            continue

        kept.append(channel)

    return kept


def channel_group(channel: Channel) -> str:
    """Return the group-title attribute from the channel's #EXTINF line, or ''."""
    extinf_lines = (line for line in channel.lines if line.startswith('#EXTINF:'))
    for extinf in extinf_lines:
        found = re.search(r'group-title="([^"]*)"', extinf)
        if found:
            return found.group(1)
    return ''


def extinf_logo(lines: list[str]) -> str | None:
    """Extract the tvg-logo URL from the first #EXTINF line that carries one."""
    candidates = (TVG_LOGO_RE.search(line) for line in lines if line.startswith('#EXTINF:'))
    for found in candidates:
        if found:
            # Matched text looks like ' tvg-logo="URL"'; return the quoted part.
            return found.group(0).split('="', 1)[1][:-1]
    return None


def replace_logo(line: str, logo_url: str) -> str:
    """Return *line* with its tvg-logo attribute set to *logo_url*.

    An existing tvg-logo attribute is replaced in place; otherwise the
    attribute is inserted just before the display-name comma.
    NOTE(review): insertion splits at the LAST comma — assumes the display
    name itself contains no comma; confirm against upstream data.
    Lines without any comma are returned untouched.
    """
    replacement = f' tvg-logo="{logo_url}"'
    if TVG_LOGO_RE.search(line):
        return TVG_LOGO_RE.sub(replacement, line, count=1)
    if ',' in line:
        attrs, display_name = line.rsplit(',', 1)
        return f'{attrs}{replacement},{display_name}'
    return line


def with_logo_from(target_lines: list[str], source_lines: list[str]) -> list[str]:
    """Copy of *target_lines* whose #EXTINF logo is taken from *source_lines*.

    When the source has no logo, an unmodified copy of the target is returned.
    """
    logo = extinf_logo(source_lines)
    if not logo:
        return list(target_lines)
    result: list[str] = []
    for line in target_lines:
        result.append(replace_logo(line, logo) if line.startswith('#EXTINF:') else line)
    return result


def strip_hd_suffix(channel: Channel) -> Channel:
    """Return the channel renamed without its trailing ' HD' marker.

    'Первый HD' maps to its canonical name 'Первый канал'; any other name
    just loses the 3-character suffix. The first #EXTINF title is rewritten
    to match. Channels without the suffix are returned unchanged.
    """
    if not channel.name.endswith(' HD'):
        return channel

    if channel.name == 'Первый HD':
        new_name = 'Первый канал'
    else:
        new_name = channel.name[:-3]

    updated: list[str] = []
    renamed = False
    for line in channel.lines:
        if not renamed and line.startswith('#EXTINF:') and ',' in line:
            attrs, _ = line.rsplit(',', 1)
            line = f'{attrs},{new_name}'
            renamed = True
        updated.append(line)

    return Channel(name=new_name, lines=updated, keep_logo=channel.keep_logo)


def strip_hd_suffixes(channels: list[Channel]) -> list[Channel]:
    """Apply strip_hd_suffix to every channel, preserving order."""
    return list(map(strip_hd_suffix, channels))


def build_playlist(header: str, channels: list[Channel]) -> str:
    """Serialize the header and channel lines back into M3U text (trailing newline)."""
    chunks = [header]
    chunks.extend(line for channel in channels for line in channel.lines)
    return '\n'.join(chunks) + '\n'


def local_name(tag: str) -> str:
    """Strip an XML namespace prefix: '{uri}tag' -> 'tag' (no-op otherwise)."""
    _, _, bare = tag.rpartition('}')
    return bare


def parse_epg_channels(url: str) -> list[EpgChannel]:
    """Stream the gzipped XMLTV feed at *url* and collect its <channel> entries.

    Parsing stops at the first <programme> element: XMLTV lists all channels
    before any programmes, so by then everything needed has been seen and the
    (potentially huge) programme section is skipped entirely.

    Returns only channels with a non-empty id and at least one display name.
    """
    channels: list[EpgChannel] = []
    with fetch(url) as response:
        with gzip.GzipFile(fileobj=response) as gz:
            # State for the <channel> element currently being read.
            current_id: str | None = None
            current_names: list[str] = []
            current_icon: str | None = None

            for event, elem in ET.iterparse(gz, events=('start', 'end')):
                tag = local_name(elem.tag)

                # Channels precede programmes in XMLTV; stop at the first programme.
                if event == 'start' and tag == 'programme':
                    elem.clear()
                    break

                if event == 'start' and tag == 'channel':
                    current_id = elem.attrib.get('id', '').strip()
                    current_names = []
                    current_icon = None
                    continue

                # Within a channel, the last <icon> carrying a src wins.
                if event == 'start' and tag == 'icon' and current_id is not None:
                    current_icon = elem.attrib.get('src') or current_icon
                    continue

                if event == 'end' and tag == 'display-name' and current_id is not None:
                    if elem.text and elem.text.strip():
                        current_names.append(elem.text.strip())
                    elem.clear()
                    continue

                if event == 'end' and tag == 'channel':
                    # De-duplicate display names while preserving order.
                    names = list(dict.fromkeys(current_names))
                    if current_id and names:
                        channels.append(EpgChannel(channel_id=current_id, names=names, icon=current_icon))
                    current_id = None
                    current_names = []
                    current_icon = None
                    elem.clear()
                    continue

                # Free memory for any other completed element.
                if event == 'end':
                    elem.clear()

    return channels


def normalize_name(name: str) -> str:
    """Aggressively normalize a channel name for fuzzy matching.

    Casefolds (ё -> е), strips bracketed/parenthesised qualifiers and
    punctuation, removes quality and generic tokens (hd/fhd/uhd/4k/orig/50
    and tv/тв/канал/channel), and collapses whitespace.
    """
    cleaned = name.casefold().replace('ё', 'е')
    for junk in (r'\[[^\]]*\]', r'\([^)]*\)', r'[!.,:;_\-+/]+'):
        cleaned = re.sub(junk, ' ', cleaned)
    cleaned = re.sub(r'\b(?:hd|fhd|uhd|4k|orig|50|tv|тв|канал|channel)\b', ' ', cleaned)
    return re.sub(r'\s+', ' ', cleaned).strip()


def normalize_exact_name(name: str) -> str:
    """Casefolded name with bracketed qualifiers and punctuation collapsed.

    Unlike normalize_name, quality/generic tokens (hd, tv, канал, ...) are kept.
    """
    folded = name.casefold().replace('ё', 'е')
    for junk in (r'\[[^\]]*\]', r'\([^)]*\)', r'[!.,:;_\-+/]+'):
        folded = re.sub(junk, ' ', folded)
    return re.sub(r'\s+', ' ', folded).strip()


def normalize_quality_name(name: str) -> str:
    """Exact-normalized name with quality markers (hd/fhd/uhd/4k/orig/50) removed."""
    stripped = re.sub(r'\b(?:hd|fhd|uhd|4k|orig|50)\b', ' ', normalize_exact_name(name))
    return re.sub(r'\s+', ' ', stripped).strip()


def is_plain_display_name(name: str) -> bool:
    """True when the name carries no parenthesised or bracketed qualifier."""
    return not any(bracket in name for bracket in '()[]')


def has_quality_token(name: str) -> bool:
    """True when the casefolded name contains a quality marker (hd/fhd/uhd/4k/orig/50)."""
    return bool(re.search(r'\b(?:hd|fhd|uhd|4k|orig|50)\b', name.casefold()))


def build_target_id_index(channels: list[EpgChannel]) -> dict[str, set[tuple[str, str, str | None]]]:
    """Map each normalized display name to the (id, raw name, icon) triples bearing it."""
    index: dict[str, set[tuple[str, str, str | None]]] = {}
    for epg_channel in channels:
        for raw_name in epg_channel.names:
            key = normalize_name(raw_name)
            if not key:
                continue
            index.setdefault(key, set()).add((epg_channel.channel_id, raw_name, epg_channel.icon))
    return index


def build_epg_name_index(epg_url: str) -> dict[str, set[tuple[str, str, str | None]]]:
    """Download the EPG at *epg_url* and index its channels by normalized name."""
    return build_target_id_index(parse_epg_channels(epg_url))


def rewrite_header(header: str) -> str:
    """Point the playlist header's EPG attributes at this proxy's EPG endpoint."""
    cleaned = re.sub(r'\s+', ' ', URL_TVG_RE.sub('', header)).strip()
    proxied = f'{BASE_URL.rstrip("/")}/epg.xml.gz'
    return f'{cleaned} url-tvg="{proxied}" x-tvg-url="{proxied}"'


def _narrow_matches(matches: set[tuple[str, str, str | None]], predicate) -> set[tuple[str, str, str | None]]:
    """Keep only the matches satisfying *predicate*; if none do, keep them all."""
    preferred = {entry for entry in matches if predicate(entry)}
    return preferred or matches


def rewrite_tvg_id(line: str, channel: Channel, epg_name_index: dict[str, set[tuple[str, str, str | None]]]) -> tuple[str, bool]:
    """Rewrite the line's tvg-id (and possibly its logo) to the matching EPG channel.

    Candidate EPG channels are looked up by normalized name, then narrowed by
    a preference cascade (exact name > plain display name > same quality
    markers > no quality markers); each step keeps the previous candidates
    when it would eliminate everything. The rewrite is applied only when the
    surviving candidates agree on a single channel id.

    Returns the (possibly rewritten) line and whether a match was applied.
    """
    if TVG_ID_RE.search(line) is None:
        # No tvg-id attribute at all: nothing to rewrite.
        return line, False

    channel_name = channel.name
    matches = epg_name_index.get(normalize_name(channel_name), set())

    exact = normalize_exact_name(channel_name)
    matches = _narrow_matches(matches, lambda entry: normalize_exact_name(entry[1]) == exact)
    matches = _narrow_matches(matches, lambda entry: is_plain_display_name(entry[1]))
    quality = normalize_quality_name(channel_name)
    matches = _narrow_matches(matches, lambda entry: normalize_quality_name(entry[1]) == quality)
    matches = _narrow_matches(matches, lambda entry: not has_quality_token(entry[1]))

    target_ids = {target_id for target_id, _, _ in matches}
    if len(target_ids) != 1:
        # Ambiguous (or no) candidates: leave the upstream tvg-id untouched.
        return line, False

    new_id = next(iter(target_ids))
    line = TVG_ID_RE.sub(f'tvg-id="{new_id}"', line, count=1)

    # Pick a deterministic icon for the chosen id. The explicit sort key maps
    # a None icon to '' so sorting never compares None against str (plain
    # sorted(matches) raised TypeError when two entries shared id+name but
    # only one carried an icon).
    ordered = sorted(matches, key=lambda entry: (entry[0], entry[1], entry[2] or ''))
    icon = next((icon for target_id, _, icon in ordered if target_id == new_id and icon), None)
    if icon and not channel.keep_logo:
        line = replace_logo(line, icon)

    return line, True


def rewrite_channel_ids(channels: list[Channel], epg_name_index: dict[str, set[tuple[str, str, str | None]]]) -> list[Channel]:
    """Rewrite every channel's #EXTINF tvg-id against the EPG index; log the hit count."""
    rewritten: list[Channel] = []
    matched = 0

    for channel in channels:
        new_lines: list[str] = []
        for line in channel.lines:
            if line.startswith('#EXTINF:'):
                line, hit = rewrite_tvg_id(line, channel, epg_name_index)
                matched += int(hit)
            new_lines.append(line)
        rewritten.append(Channel(name=channel.name, lines=new_lines, keep_logo=channel.keep_logo))

    print(f'epg matched {matched}/{len(channels)} channels', file=sys.stderr, flush=True)
    return rewritten


def filter_playlist(url: str) -> str:
    """Fetch the playlist at *url* and return the filtered, EPG-mapped M3U text."""
    with fetch(url) as response:
        raw = response.read().decode('utf-8')

    header, channels = parse_playlist(raw)
    epg_name_index = build_epg_name_index(EPG_URL)

    # Pipeline: drop/merge channels, strip ' HD' suffixes, remap tvg-ids.
    processed = rewrite_channel_ids(
        strip_hd_suffixes(filter_channels(channels)),
        epg_name_index,
    )
    return build_playlist(rewrite_header(header), processed)


def proxy_epg(handler: BaseHTTPRequestHandler) -> None:
    """Stream the upstream gzipped EPG through *handler* unchanged."""
    with fetch(EPG_URL) as response:
        handler.send_response(200)
        handler.send_header('Content-Type', response.headers.get('Content-Type', 'application/gzip'))

        # Forward size/caching metadata when the upstream provided it.
        for name in ('Content-Length', 'Last-Modified', 'ETag'):
            value = response.headers.get(name)
            if value:
                handler.send_header(name, value)

        handler.send_header('Connection', 'close')
        handler.end_headers()

        # Relay the body in 1 MiB chunks.
        while chunk := response.read(1024 * 1024):
            handler.wfile.write(chunk)


def playlist_url_for_path(path: str) -> str | None:
    """Map a request path to its upstream playlist URL (None when unknown)."""
    routes = {
        '/playlist.m3u8': URL,
        '/alt.m3u8': ALT_URL or None,  # ALT_URL is optional; '' means unset
    }
    return routes.get(path)


class PlaylistHandler(BaseHTTPRequestHandler):
    """Serves the filtered playlists and proxies the upstream EPG."""

    def do_GET(self) -> None:
        """Route GET requests: /epg.xml.gz, /playlist.m3u8, /alt.m3u8."""
        if self.path == '/epg.xml.gz':
            try:
                proxy_epg(self)
            except Exception as error:
                print(f'failed to proxy epg: {error}', file=sys.stderr, flush=True)
                self.send_error(502, 'Failed to proxy upstream EPG')
            return

        upstream = playlist_url_for_path(self.path)
        if not upstream:
            self.send_error(404, 'Not Found')
            return

        try:
            playlist = filter_playlist(upstream)
        except Exception as error:
            print(f'failed to build playlist: {error}', file=sys.stderr, flush=True)
            self.send_error(502, 'Failed to fetch or filter upstream playlist')
            return

        payload = playlist.encode('utf-8')
        self.send_response(200)
        self.send_header('Content-Type', 'application/vnd.apple.mpegurl; charset=utf-8')
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def log_message(self, format: str, *args: Any) -> None:
        """Write access-log lines to stderr (overrides the stdlib default)."""
        print(f'{self.address_string()} - {format % args}', file=sys.stderr)


def main() -> None:
    """Bind the threaded HTTP server and serve until interrupted."""
    server = ThreadingHTTPServer((HOST, PORT), PlaylistHandler)
    print(f'Serving at http://{HOST}:{PORT}')
    print(f'Upstream: {URL}')
    if ALT_URL:
        print(f'Alternative: {ALT_URL}')
    server.serve_forever()


# Run the server only when executed as a script (not on import).
if __name__ == '__main__':
    main()