1
0
Fork 0
iptv/main.py
2026-05-03 14:23:08 +03:00

463 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import gzip
import re
import sys
import urllib.request
import xml.etree.ElementTree as ET
from dataclasses import dataclass
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from http.client import HTTPResponse
from typing import Any
# --- Runtime configuration, read once from the environment at import time ---
# Primary upstream playlist (served at /playlist.m3u8).
URL: str = os.environ.get('URL', '')
# Optional secondary upstream playlist (served at /alt.m3u8 when set).
ALT_URL: str = os.environ.get('ALT_URL', '')
# Public base URL of this service; used to build the EPG link in the playlist header.
BASE_URL: str = os.environ.get('BASE_URL', '')
# Upstream gzip-compressed EPG document.
EPG_URL: str = os.environ.get('EPG_URL', '')

# Listen address and port of the HTTP server.
HOST: str = '0.0.0.0'
PORT: int = int(os.environ.get('PORT', '8080'))

# --- Patterns applied to playlist lines and channel names ---
# "(+N)" marker in a channel name; channels carrying it are filtered out.
TIMESHIFT_RE = re.compile(r'\(\+\d+\)')
# tvg-id="..." attribute of an #EXTINF line.
TVG_ID_RE = re.compile(r'tvg-id="[^"]*"')
# tvg-logo="..." attribute, including the whitespace in front of it.
TVG_LOGO_RE = re.compile(r'\s*tvg-logo="[^"]*"')
# EPG link attributes of the #EXTM3U header line.
URL_TVG_RE = re.compile(r'\b(?:url-tvg|x-tvg-url)="[^"]*"')
# group-title values whose channels are dropped from the served playlist.
EXCLUDED_GROUPS: set[str] = {
    'Грузия', 'Турция', 'Армения', 'Латвия',
    'Германия', 'Литва', 'Эстония', 'Азербайджан',
    'Казахстан', 'Молдова', 'Узбекистан', 'Израиль',
    'Польша', 'Таджикистан', 'Другие (тест)', 'Саудовская Аравия',
}
# Refuse to start when a mandatory setting is missing. The first missing one
# (checked in this order) is reported on stderr and the process exits with
# status 1. ALT_URL is optional and therefore not checked.
for _value, _message in (
    (URL, "Missing upstream URL"),
    (BASE_URL, "Missing base URL"),
    (EPG_URL, "Missing EPG URL"),
):
    if not _value:
        print(_message, file=sys.stderr)
        sys.exit(1)
@dataclass
class Channel:
    """A single playlist entry: its display name plus the raw lines it spans."""

    # Display name taken from the title part of the entry's #EXTINF line.
    name: str
    # Raw playlist lines of the entry, beginning with its #EXTINF line.
    lines: list[str]
    # When True, the logo already on the #EXTINF line is kept instead of
    # being replaced by the icon of the matched EPG channel.
    keep_logo: bool = False
@dataclass
class EpgChannel:
channel_id: str
names: list[str]
icon: str | None
def fetch(url: str) -> HTTPResponse:
    """Open *url* and return the response object.

    The request is sent with a curl-like User-Agent header and a 60 second
    timeout. The caller is responsible for closing the returned response
    (it can be used as a context manager).
    """
    headers = {'User-Agent': 'curl/8.23.9'}
    return urllib.request.urlopen(
        urllib.request.Request(url, headers=headers),
        timeout=60,
    )
def parse_playlist(text: str) -> tuple[str, list[Channel]]:
    """Split an extended M3U playlist into its header line and its channels.

    Every ``#EXTINF:`` line starts a new channel. The lines that follow are
    attached to that channel up to and including the first non-empty line
    that is not a ``#`` directive (the stream URL). Lines that appear before
    the first ``#EXTINF:`` line are ignored.

    Args:
        text: The decoded playlist.

    Returns:
        A ``(header, channels)`` tuple, where ``header`` is the first line.

    Raises:
        ValueError: If the playlist is empty or its first line does not
            start with ``#EXTM3U``.
    """
    # str.decode('utf-8') keeps a leading byte-order mark as U+FEFF, which
    # would otherwise make the #EXTM3U check below fail.
    lines = text.removeprefix('\ufeff').splitlines()
    if not lines or not lines[0].startswith('#EXTM3U'):
        raise ValueError('Playlist must start with #EXTM3U')
    header = lines[0]
    channels: list[Channel] = []
    current: Channel | None = None
    for line in lines[1:]:
        if line.startswith('#EXTINF:'):
            # A previous entry without a URL line is still emitted.
            if current is not None:
                channels.append(current)
            # The title starts after the first comma that is NOT inside a
            # double-quoted attribute value (e.g. group-title="A, B").
            title = re.match(r'((?:[^",]|"[^"]*")*),(.*)', line, re.DOTALL)
            if title:
                name = title.group(2).strip()
            else:
                # Unbalanced quotes or no comma: fall back to a plain split.
                name = line.split(',', 1)[1].strip() if ',' in line else ''
            current = Channel(name=name, lines=[line])
            continue
        if current is None:
            continue
        current.lines.append(line)
        # A non-empty, non-directive line is the stream URL and ends the entry.
        if line and not line.startswith('#'):
            channels.append(current)
            current = None
    if current is not None:
        channels.append(current)
    return header, channels
def filter_channels(channels: list[Channel]) -> list[Channel]:
    """Drop unwanted channels and substitute HD variants for SD channels.

    A channel is dropped when its group is in ``EXCLUDED_GROUPS`` or its name
    carries a timeshift marker. For every remaining channel ``X`` that has a
    usable ``X HD`` counterpart (or ``Первый HD`` for ``Первый канал`` when
    no ``Первый канал HD`` entry exists), the HD entry is emitted at the
    position of ``X``, carrying the logo of ``X`` and ``keep_logo=True``.
    The HD entry itself is then not emitted a second time.

    The set of substituted HD names is computed before the output is built,
    so the result does not depend on whether the HD entry appears before or
    after its SD counterpart in the playlist (previously an HD entry listed
    first was emitted twice).

    Args:
        channels: Channels in playlist order.

    Returns:
        The filtered channels, in playlist order.
    """
    # With duplicate names the last entry wins.
    by_name = {channel.name: channel for channel in channels}

    def is_visible(candidate: Channel) -> bool:
        """Return True when *candidate* is neither excluded nor timeshifted."""
        return (
            channel_group(candidate) not in EXCLUDED_GROUPS
            and not TIMESHIFT_RE.search(candidate.name)
        )

    def hd_replacement(channel: Channel) -> Channel | None:
        """Return the usable HD counterpart of *channel*, if there is one."""
        hd_channel = by_name.get(f'{channel.name} HD')
        # The alias is only consulted when no regular "<name> HD" entry exists.
        if hd_channel is None and channel.name == 'Первый канал':
            hd_channel = by_name.get('Первый HD')
        if hd_channel is not None and is_visible(hd_channel):
            return hd_channel
        return None

    pairs = [
        (channel, hd_replacement(channel))
        for channel in channels
        if is_visible(channel)
    ]
    replaced_hd_names = {hd.name for _, hd in pairs if hd is not None}

    kept = []
    for channel, hd_channel in pairs:
        if hd_channel is not None:
            kept.append(
                Channel(
                    name=hd_channel.name,
                    lines=with_logo_from(hd_channel.lines, channel.lines),
                    keep_logo=True,
                )
            )
        elif channel.name not in replaced_hd_names:
            kept.append(channel)
    return kept
def channel_group(channel: Channel) -> str:
    """Return the ``group-title`` value of the channel, or ``''`` if absent.

    Only ``#EXTINF:`` lines are inspected; the first one that carries a
    ``group-title="..."`` attribute wins.
    """
    extinf_lines = (line for line in channel.lines if line.startswith('#EXTINF:'))
    for extinf in extinf_lines:
        found = re.search(r'group-title="([^"]*)"', extinf)
        if found is not None:
            return found.group(1)
    return ''
def extinf_logo(lines: list[str]) -> str | None:
    """Return the ``tvg-logo`` value of the first ``#EXTINF:`` line that has one.

    Returns ``None`` when no ``#EXTINF:`` line carries the attribute. An
    empty attribute (``tvg-logo=""``) yields an empty string.
    """
    for candidate in lines:
        found = TVG_LOGO_RE.search(candidate) if candidate.startswith('#EXTINF:') else None
        if found:
            attribute = found.group(0)
            # Keep what sits between the opening '="' and the closing quote.
            return attribute.split('="', 1)[1][:-1]
    return None
def replace_logo(line: str, logo_url: str) -> str:
    """Set the ``tvg-logo`` attribute of an ``#EXTINF`` line to *logo_url*.

    An existing attribute is replaced. Otherwise the attribute is inserted
    directly in front of the comma that separates the attributes from the
    channel title. A line without any comma is returned unchanged.

    Args:
        line: The ``#EXTINF`` line to rewrite.
        logo_url: The logo URL to store.

    Returns:
        The rewritten line.
    """
    attribute = f' tvg-logo="{logo_url}"'
    if TVG_LOGO_RE.search(line):
        # A callable replacement is inserted verbatim; a replacement string
        # would have its backslash escapes interpreted by re.sub.
        return TVG_LOGO_RE.sub(lambda _match: attribute, line, count=1)
    # Split at the first comma outside double quotes, so neither a comma in
    # an attribute value nor a comma in the title moves the insertion point.
    split = re.match(r'((?:[^",]|"[^"]*")*),(.*)', line, re.DOTALL)
    if split:
        prefix, name = split.groups()
    elif ',' in line:
        # Unbalanced quotes: fall back to the last comma.
        prefix, name = line.rsplit(',', 1)
    else:
        return line
    return f'{prefix}{attribute},{name}'
def with_logo_from(target_lines: list[str], source_lines: list[str]) -> list[str]:
    """Return a copy of *target_lines* carrying the logo found in *source_lines*.

    When the source has no (or an empty) logo, the target lines are copied
    unchanged. Otherwise every ``#EXTINF:`` line of the target gets the
    source logo; all other lines are copied as they are.
    """
    logo = extinf_logo(source_lines)
    if logo:
        return [
            replace_logo(entry, logo) if entry.startswith('#EXTINF:') else entry
            for entry in target_lines
        ]
    return list(target_lines)
def strip_hd_suffix(channel: Channel) -> Channel:
    """Return *channel* with a trailing `` HD`` removed from its name.

    ``Первый HD`` is renamed to ``Первый канал``; any other name just loses
    its last three characters. The title on the first ``#EXTINF:`` line that
    contains a comma is rewritten to match. Channels whose name does not end
    in `` HD`` are returned unchanged (same object).

    Args:
        channel: The channel to rename.

    Returns:
        The original channel, or a new ``Channel`` with the new name and
        lines and the same ``keep_logo`` flag.
    """
    if not channel.name.endswith(' HD'):
        return channel
    new_name = 'Первый канал' if channel.name == 'Первый HD' else channel.name[:-3]
    new_lines = list(channel.lines)
    for index, line in enumerate(new_lines):
        if not line.startswith('#EXTINF:'):
            continue
        # The title starts after the first comma outside double quotes;
        # splitting at the last comma would cut a title that itself contains
        # a comma and leave part of the old title behind.
        split = re.match(r'((?:[^",]|"[^"]*")*),(.*)', line, re.DOTALL)
        if split:
            prefix = split.group(1)
        elif ',' in line:
            # Unbalanced quotes: fall back to the last comma.
            prefix = line.rsplit(',', 1)[0]
        else:
            continue
        new_lines[index] = f'{prefix},{new_name}'
        break
    return Channel(name=new_name, lines=new_lines, keep_logo=channel.keep_logo)
def strip_hd_suffixes(channels: list[Channel]) -> list[Channel]:
    """Apply ``strip_hd_suffix`` to every channel, preserving order."""
    return list(map(strip_hd_suffix, channels))
def build_playlist(header: str, channels: list[Channel]) -> str:
    """Serialize the header and all channel lines into playlist text.

    Lines are joined with ``\\n`` and the result ends with a newline.
    """
    body = [entry for channel in channels for entry in channel.lines]
    return '\n'.join([header, *body]) + '\n'
def local_name(tag: str) -> str:
    """Return *tag* without a leading ``{namespace}`` part.

    Everything after the last ``}`` is returned; a tag without ``}`` is
    returned unchanged.
    """
    _, _, name = tag.rpartition('}')
    return name
def parse_epg_channels(url: str) -> list[EpgChannel]:
    """Download the gzip-compressed EPG at *url* and collect its channels.

    The XML is parsed incrementally. Parsing stops at the first
    ``programme`` element, so only ``channel`` elements located before it
    are seen. A channel is kept only when it has a non-empty ``id`` and at
    least one non-empty ``display-name``; duplicate names are removed while
    keeping their first-seen order.

    Returns:
        The collected channels in document order.
    """
    found: list[EpgChannel] = []
    with fetch(url) as response, gzip.GzipFile(fileobj=response) as gz:
        # State of the <channel> element currently being read; current_id is
        # None while the parser is outside of any <channel>.
        current_id: str | None = None
        current_names: list[str] = []
        current_icon: str | None = None
        for event, elem in ET.iterparse(gz, events=('start', 'end')):
            tag = local_name(elem.tag)

            if event == 'start':
                if tag == 'programme':
                    # The listings begin here; nothing more is read.
                    elem.clear()
                    break
                if tag == 'channel':
                    current_id = elem.attrib.get('id', '').strip()
                    current_names = []
                    current_icon = None
                elif tag == 'icon' and current_id is not None:
                    # An <icon> without "src" keeps the icon found so far.
                    current_icon = elem.attrib.get('src') or current_icon
                continue

            # From here on the event is 'end'.
            if tag == 'display-name' and current_id is not None:
                text = (elem.text or '').strip()
                if text:
                    current_names.append(text)
            elif tag == 'channel':
                unique_names = list(dict.fromkeys(current_names))
                if current_id and unique_names:
                    found.append(
                        EpgChannel(channel_id=current_id, names=unique_names, icon=current_icon)
                    )
                current_id = None
                current_names = []
                current_icon = None
            # Release the content of every finished element.
            elem.clear()
    return found
def normalize_name(name: str) -> str:
    """Return the loosest comparison key for a channel name.

    The name is case-folded, ``ё`` becomes ``е``, bracketed and parenthesized
    parts are removed, punctuation becomes whitespace, quality tokens and
    generic words (tv, тв, канал, channel) are removed, and whitespace is
    collapsed.
    """
    key = name.casefold().replace('ё', 'е')
    cleanup_patterns = (
        r'\[[^\]]*\]',
        r'\([^)]*\)',
        r'[!.,:;_\-+/]+',
        r'\b(?:hd|fhd|uhd|4k|orig|50|tv|тв|канал|channel)\b',
    )
    for pattern in cleanup_patterns:
        key = re.sub(pattern, ' ', key)
    return re.sub(r'\s+', ' ', key).strip()
def normalize_exact_name(name: str) -> str:
    """Return the strictest comparison key for a channel name.

    The name is case-folded, ``ё`` becomes ``е``, bracketed and parenthesized
    parts are removed, punctuation becomes whitespace and whitespace is
    collapsed. No words are removed.
    """
    key = name.casefold().replace('ё', 'е')
    for pattern in (r'\[[^\]]*\]', r'\([^)]*\)', r'[!.,:;_\-+/]+'):
        key = re.sub(pattern, ' ', key)
    return re.sub(r'\s+', ' ', key).strip()
def normalize_quality_name(name: str) -> str:
    """Return the exact-name key of *name* with quality tokens removed.

    Starts from ``normalize_exact_name`` and additionally drops the tokens
    hd, fhd, uhd, 4k, orig and 50, then collapses whitespace again.
    """
    without_quality = re.sub(
        r'\b(?:hd|fhd|uhd|4k|orig|50)\b', ' ', normalize_exact_name(name)
    )
    return re.sub(r'\s+', ' ', without_quality).strip()
def is_plain_display_name(name: str) -> bool:
    """Return True when *name* contains no round or square bracket."""
    return not any(bracket in name for bracket in '()[]')
def has_quality_token(name: str) -> bool:
    """Return True when *name* contains a standalone quality token.

    The tokens are hd, fhd, uhd, 4k, orig and 50, matched case-insensitively
    as whole words.
    """
    folded = name.casefold()
    return bool(re.search(r'\b(?:hd|fhd|uhd|4k|orig|50)\b', folded))
def build_target_id_index(channels: list[EpgChannel]) -> dict[str, set[tuple[str, str, str | None]]]:
    """Index EPG channels by the normalized form of each display name.

    Returns:
        A mapping from ``normalize_name(display_name)`` to the set of
        ``(channel_id, display_name, icon)`` tuples sharing that key. Names
        that normalize to an empty string are not indexed.
    """
    index: dict[str, set[tuple[str, str, str | None]]] = {}
    entries = (
        (normalize_name(display_name), (epg_channel.channel_id, display_name, epg_channel.icon))
        for epg_channel in channels
        for display_name in epg_channel.names
    )
    for key, entry in entries:
        if key:
            index.setdefault(key, set()).add(entry)
    return index
def build_epg_name_index(epg_url: str) -> dict[str, set[tuple[str, str, str | None]]]:
    """Download the EPG at *epg_url* and return its normalized-name index."""
    return build_target_id_index(parse_epg_channels(epg_url))
def rewrite_header(header: str) -> str:
    """Point the playlist header at this service's EPG endpoint.

    Existing ``url-tvg`` / ``x-tvg-url`` attributes are removed, whitespace
    is collapsed, and both attributes are appended with the value
    ``<BASE_URL>/epg.xml.gz``.
    """
    without_links = URL_TVG_RE.sub('', header)
    collapsed = re.sub(r'\s+', ' ', without_links).strip()
    epg_url = BASE_URL.rstrip('/') + '/epg.xml.gz'
    return ' '.join((collapsed, f'url-tvg="{epg_url}"', f'x-tvg-url="{epg_url}"'))
def rewrite_tvg_id(line: str, channel: Channel, epg_name_index: dict[str, set[tuple[str, str, str | None]]]) -> tuple[str, bool]:
    """Replace the ``tvg-id`` of an ``#EXTINF`` line with the matching EPG id.

    Candidates are looked up by the normalized channel name and then
    narrowed by four successive filters. A filter is only applied when it
    leaves at least one candidate. The line is rewritten only when all
    remaining candidates agree on a single channel id.

    Args:
        line: The ``#EXTINF`` line to rewrite.
        channel: The channel the line belongs to.
        epg_name_index: Index as built by ``build_target_id_index``.

    Returns:
        ``(line, matched)``. ``matched`` is False, and the line unchanged,
        when the line has no ``tvg-id`` attribute or no unambiguous EPG
        channel was found. On a match the channel's EPG icon (if any) also
        replaces the logo, unless ``channel.keep_logo`` is set.
    """
    if not TVG_ID_RE.search(line):
        return line, False

    channel_name = channel.name
    matches = epg_name_index.get(normalize_name(channel_name), set())

    exact_channel_name = normalize_exact_name(channel_name)
    quality_channel_name = normalize_quality_name(channel_name)
    # Narrowing steps over the EPG display name, in order of application.
    narrowing_steps = (
        lambda target_name: normalize_exact_name(target_name) == exact_channel_name,
        is_plain_display_name,
        lambda target_name: normalize_quality_name(target_name) == quality_channel_name,
        lambda target_name: not has_quality_token(target_name),
    )
    for accept in narrowing_steps:
        narrowed = {candidate for candidate in matches if accept(candidate[1])}
        if narrowed:
            matches = narrowed

    target_ids = {target_id for target_id, _, _ in matches}
    if len(target_ids) != 1:
        return line, False
    new_id = next(iter(target_ids))

    # A callable replacement is inserted verbatim; a replacement string
    # would have backslashes in the id interpreted by re.sub.
    line = TVG_ID_RE.sub(lambda _match: f'tvg-id="{new_id}"', line, count=1)

    # All remaining candidates share new_id. Pick the icon of the candidate
    # with the smallest (name, icon) pair. Candidates without an icon are
    # dropped before comparing, because ordering None against str raises
    # TypeError.
    best = min(
        ((target_name, icon) for _, target_name, icon in matches if icon),
        default=None,
    )
    icon = best[1] if best is not None else None
    if icon and not channel.keep_logo:
        line = replace_logo(line, icon)
    return line, True
def rewrite_channel_ids(channels: list[Channel], epg_name_index: dict[str, set[tuple[str, str, str | None]]]) -> list[Channel]:
    """Rewrite the ``tvg-id`` of every channel from the EPG index.

    Each ``#EXTINF:`` line is passed through ``rewrite_tvg_id``; all other
    lines are copied unchanged. The number of successfully matched lines is
    reported on stderr.

    Returns:
        New ``Channel`` objects with the same names and ``keep_logo`` flags.
    """
    result: list[Channel] = []
    matched = 0
    for channel in channels:
        new_lines: list[str] = []
        for entry in channel.lines:
            if entry.startswith('#EXTINF:'):
                entry, hit = rewrite_tvg_id(entry, channel, epg_name_index)
                matched += 1 if hit else 0
            new_lines.append(entry)
        result.append(Channel(name=channel.name, lines=new_lines, keep_logo=channel.keep_logo))
    print(f'epg matched {matched}/{len(channels)} channels', file=sys.stderr, flush=True)
    return result
def filter_playlist(url: str) -> str:
    """Download the playlist at *url* and return its filtered, rewritten text.

    Steps: fetch and parse the playlist, download the EPG to build the name
    index, filter the channels, strip `` HD`` suffixes, rewrite ``tvg-id``
    values, and serialize everything with a header pointing at this
    service's EPG endpoint.
    """
    with fetch(url) as response:
        raw = response.read()
    header, channels = parse_playlist(raw.decode('utf-8'))
    # The EPG is only downloaded after the playlist has parsed successfully.
    epg_name_index = build_epg_name_index(EPG_URL)
    for step in (filter_channels, strip_hd_suffixes):
        channels = step(channels)
    channels = rewrite_channel_ids(channels, epg_name_index)
    return build_playlist(rewrite_header(header), channels)
def proxy_epg(handler: BaseHTTPRequestHandler) -> None:
    """Stream the upstream EPG to the client behind *handler*.

    The upstream ``Content-Type`` (default ``application/gzip``) and, when
    present, ``Content-Length``, ``Last-Modified`` and ``ETag`` are
    forwarded. The body is copied in 1 MiB chunks.

    Failures before the response headers are flushed (for example the
    upstream fetch itself) propagate to the caller, which can still answer
    with an error status. Failures while streaming the body are logged and
    swallowed: the 200 status line is already on the wire by then, so an
    error response written afterwards would only be appended to the
    partially sent body.

    Args:
        handler: The request handler whose response is written.
    """
    with fetch(EPG_URL) as response:
        handler.send_response(200)
        handler.send_header('Content-Type', response.headers.get('Content-Type', 'application/gzip'))
        for header in ('Content-Length', 'Last-Modified', 'ETag'):
            value = response.headers.get(header)
            if value:
                handler.send_header(header, value)
        handler.send_header('Connection', 'close')
        handler.end_headers()
        try:
            while chunk := response.read(1024 * 1024):
                handler.wfile.write(chunk)
        except Exception as error:
            # Too late for an error status; the connection is closed after
            # this request (Connection: close), so a client that received
            # Content-Length can detect the truncated body.
            print(f'epg stream aborted after headers were sent: {error}', file=sys.stderr, flush=True)
def playlist_url_for_path(path: str) -> str | None:
    """Map a request path to the upstream playlist URL it serves.

    ``/playlist.m3u8`` maps to ``URL``; ``/alt.m3u8`` maps to ``ALT_URL``
    when that is configured. Every other path, and ``/alt.m3u8`` without
    ``ALT_URL``, yields ``None``.
    """
    routes = {
        '/playlist.m3u8': URL,
        '/alt.m3u8': ALT_URL or None,
    }
    return routes.get(path)
class PlaylistHandler(BaseHTTPRequestHandler):
    """HTTP handler serving the filtered playlists and the proxied EPG."""

    def do_GET(self) -> None:
        """Dispatch a GET request by its exact path."""
        if self.path == '/epg.xml.gz':
            self._serve_epg()
        else:
            self._serve_playlist()

    def _serve_epg(self) -> None:
        """Proxy the upstream EPG; answer 502 when that fails."""
        try:
            proxy_epg(self)
        except Exception as error:
            print(f'failed to proxy epg: {error}', file=sys.stderr, flush=True)
            self.send_error(502, 'Failed to proxy upstream EPG')

    def _serve_playlist(self) -> None:
        """Serve the playlist mapped to the request path.

        Unknown paths get 404; a failure to fetch or filter the upstream
        playlist gets 502.
        """
        upstream = playlist_url_for_path(self.path)
        if not upstream:
            self.send_error(404, 'Not Found')
            return
        try:
            playlist = filter_playlist(upstream)
        except Exception as error:
            print(f'failed to build playlist: {error}', file=sys.stderr, flush=True)
            self.send_error(502, 'Failed to fetch or filter upstream playlist')
            return
        payload = playlist.encode('utf-8')
        self.send_response(200)
        self.send_header('Content-Type', 'application/vnd.apple.mpegurl; charset=utf-8')
        self.send_header('Content-Length', str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)

    def log_message(self, format: str, *args: Any) -> None:
        """Write the access-log line to stderr as ``<client> - <message>``."""
        message = format % args
        print(f'{self.address_string()} - {message}', file=sys.stderr)
def main() -> None:
    """Bind the HTTP server, print the startup banner and serve forever."""
    server = ThreadingHTTPServer((HOST, PORT), PlaylistHandler)
    banner = [f'Serving at http://{HOST}:{PORT}', f'Upstream: {URL}']
    if ALT_URL:
        banner.append(f'Alternative: {ALT_URL}')
    print('\n'.join(banner))
    server.serve_forever()


# Start the server only when the file is executed as a script.
if __name__ == '__main__':
    main()