"""
Video Converter Service

FFmpeg wrapper for HLS conversion with multiple qualities.
"""

import json
import shutil
import subprocess
from collections.abc import Callable
from dataclasses import dataclass
from pathlib import Path

from app.config import Settings, get_settings
from app.models.schemas import VideoMetadata


@dataclass
class QualityPreset:
    """Video quality preset for HLS."""

    name: str  # Preset label, also used as the output directory name
    height: int  # Target frame height in pixels; width is derived by FFmpeg
    bitrate: str  # e.g., "1000k"
    audio_bitrate: str  # e.g., "128k"


# Presets that can be requested by name in ``convert_to_hls``.
QUALITY_PRESETS: dict[str, QualityPreset] = {
    "360p": QualityPreset(name="360p", height=360, bitrate="800k", audio_bitrate="96k"),
    "720p": QualityPreset(
        name="720p", height=720, bitrate="2500k", audio_bitrate="128k"
    ),
    "1080p": QualityPreset(
        name="1080p", height=1080, bitrate="5000k", audio_bitrate="192k"
    ),
}


class ConverterService:
    """FFmpeg video converter for HLS streaming."""

    def __init__(self, settings: Settings | None = None):
        """Store the given settings, or load the defaults via ``get_settings()``."""
        self.settings = settings or get_settings()

    def is_ffmpeg_available(self) -> bool:
        """Check if FFmpeg is installed and accessible.

        Returns True only when ``ffmpeg -version`` runs and exits with code 0.
        """
        # get_ffmpeg_version() returns None exactly when the binary is missing,
        # the call fails/times out, or the exit code is non-zero.
        return self.get_ffmpeg_version() is not None

    def get_ffmpeg_version(self) -> str | None:
        """Get FFmpeg version string.

        Returns the first line of ``ffmpeg -version`` output, or None when the
        command cannot be run, times out, or exits with a non-zero code.
        """
        try:
            result = subprocess.run(
                ["ffmpeg", "-version"],
                check=False,
                capture_output=True,
                text=True,
                timeout=10,
            )
        except (subprocess.SubprocessError, FileNotFoundError):
            return None

        if result.returncode != 0:
            return None
        # First line contains version
        return result.stdout.split("\n")[0]

    def extract_metadata(self, input_path: Path) -> VideoMetadata:
        """Extract video metadata using ffprobe.

        Raises:
            RuntimeError: if ffprobe exits with a non-zero code or the file
                contains no video stream.
        """
        cmd = [
            "ffprobe",
            "-v",
            "quiet",
            "-print_format",
            "json",
            "-show_format",
            "-show_streams",
            str(input_path),
        ]

        result = subprocess.run(
            cmd, check=False, capture_output=True, text=True, timeout=60
        )

        if result.returncode != 0:
            raise RuntimeError(f"ffprobe failed: {result.stderr}")

        data = json.loads(result.stdout)

        # Find the first video stream
        video_stream = next(
            (
                stream
                for stream in data.get("streams", [])
                if stream.get("codec_type") == "video"
            ),
            None,
        )
        if not video_stream:
            raise RuntimeError("No video stream found")

        format_info = data.get("format", {})

        # Parse frame rate (e.g., "30/1" or "29.97")
        fps_str = video_stream.get("r_frame_rate", "0/1")
        if "/" in fps_str:
            num, den = fps_str.split("/")
            denominator = float(den)
            # A zero denominator is reported as 0 fps instead of dividing by zero.
            fps = float(num) / denominator if denominator > 0 else 0.0
        else:
            fps = float(fps_str)

        return VideoMetadata(
            duration_seconds=float(format_info.get("duration", 0)),
            width=int(video_stream.get("width", 0)),
            height=int(video_stream.get("height", 0)),
            fps=round(fps, 2),
            codec=video_stream.get("codec_name", ""),
            bitrate=int(format_info.get("bit_rate", 0)),
            file_size_bytes=int(format_info.get("size", 0)),
        )

    def has_audio_stream(self, input_path: Path) -> bool:
        """Check if video file has an audio stream.

        Returns False when ffprobe exits with a non-zero code or lists no
        audio streams.
        """
        cmd = [
            "ffprobe",
            "-v",
            "quiet",
            "-print_format",
            "json",
            "-show_streams",
            "-select_streams",
            "a",
            str(input_path),
        ]

        result = subprocess.run(
            cmd, check=False, capture_output=True, text=True, timeout=60
        )

        if result.returncode != 0:
            return False

        data = json.loads(result.stdout)
        return len(data.get("streams", [])) > 0

    def generate_thumbnail(
        self,
        input_path: Path,
        output_path: Path,
        time_offset: float = 5.0,
    ) -> bool:
        """Generate thumbnail at specified time offset.

        Returns True when FFmpeg exits with code 0.
        """
        cmd = [
            "ffmpeg",
            "-y",  # Overwrite
            "-ss",
            str(time_offset),
            "-i",
            str(input_path),
            "-vframes",
            "1",
            "-vf",
            "scale=640:-1",  # 640px width, maintain aspect
            "-q:v",
            "2",  # High quality JPEG
            str(output_path),
        ]

        result = subprocess.run(
            cmd, check=False, capture_output=True, text=True, timeout=60
        )

        return result.returncode == 0

    def _select_qualities(self, input_path: Path, qualities: list[str]) -> list[str]:
        """Return the requested presets that do not exceed the source height."""
        # Get source video metadata to determine which qualities to generate
        try:
            source_height = self.extract_metadata(input_path).height
        except Exception:
            source_height = 1080  # Assume HD if can't detect

        # Unknown preset names are dropped, as are presets taller than the source.
        selected = [
            name
            for name in qualities
            if name in QUALITY_PRESETS
            and QUALITY_PRESETS[name].height <= source_height
        ]

        # At minimum, include lowest quality
        return selected or ["360p"]

    def _build_hls_command(
        self,
        input_path: Path,
        hls_dir: Path,
        valid_qualities: list[str],
        has_audio: bool,
    ) -> list[str]:
        """Build the single FFmpeg invocation that emits every HLS variant."""
        cmd = [
            "ffmpeg",
            "-y",
            "-i",
            str(input_path),
            "-threads",
            str(self.settings.ffmpeg_threads),
        ]

        # Add output for each quality
        stream_maps = []
        for i, quality_name in enumerate(valid_qualities):
            preset = QUALITY_PRESETS[quality_name]

            # Video mapping and encoding. The explicit per-stream form
            # "-filter:v:N" is used (rather than the "-vf" alias with an index
            # appended) so each variant gets its own scale filter.
            cmd.extend(
                [
                    "-map",
                    "0:v:0",
                    f"-c:v:{i}",
                    "libx264",
                    f"-b:v:{i}",
                    preset.bitrate,
                    f"-filter:v:{i}",
                    f"scale=-2:{preset.height}",
                ]
            )

            # Audio mapping and encoding (only if audio stream exists)
            if has_audio:
                cmd.extend(
                    [
                        "-map",
                        "0:a:0",
                        f"-c:a:{i}",
                        "aac",
                        f"-b:a:{i}",
                        preset.audio_bitrate,
                    ]
                )
                stream_maps.append(f"v:{i},a:{i}")
            else:
                stream_maps.append(f"v:{i}")

        # HLS settings
        segment_duration = self.settings.hls_segment_duration

        # Use var_stream_map for multiple quality outputs
        var_stream_map = " ".join(stream_maps)

        cmd.extend(
            [
                "-f",
                "hls",
                "-hls_time",
                str(segment_duration),
                "-hls_playlist_type",
                "vod",
                "-hls_flags",
                "independent_segments",
                "-hls_segment_filename",
                str(hls_dir / "%v" / "segment_%03d.ts"),
                "-master_pl_name",
                "master.m3u8",
                "-var_stream_map",
                var_stream_map,
                str(hls_dir / "%v" / "playlist.m3u8"),
            ]
        )
        return cmd

    def convert_to_hls(
        self,
        input_path: Path,
        output_dir: Path,
        qualities: list[str] | None = None,
        progress_callback: Callable[[int], None] | None = None,
    ) -> dict:
        """
        Convert video to HLS with multiple quality levels.

        Args:
            input_path: Source video file.
            output_dir: Directory that will receive an ``hls`` subdirectory.
            qualities: Preset names to generate; defaults to
                ``settings.video_qualities_list``. Presets taller than the
                source, and unknown names, are skipped.
            progress_callback: Accepted for interface compatibility; it is
                not invoked by the current implementation.

        Returns dict with:
        - master_playlist: path to master.m3u8 (only on success)
        - qualities: list of generated quality levels
        - success: bool
        - error: optional error message
        """
        if qualities is None:
            qualities = self.settings.video_qualities_list

        output_dir.mkdir(parents=True, exist_ok=True)
        hls_dir = output_dir / "hls"
        hls_dir.mkdir(exist_ok=True)

        # Filter qualities based on source resolution
        valid_qualities = self._select_qualities(input_path, qualities)

        # Check if video has audio stream. A probe that cannot run at all
        # (missing binary, timeout, unparsable output) is reported through the
        # result dict like every other failure instead of escaping as an
        # exception.
        try:
            has_audio = self.has_audio_stream(input_path)
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "qualities": [],
            }

        for quality_name in valid_qualities:
            (hls_dir / quality_name).mkdir(exist_ok=True)

        # Build FFmpeg command for multi-quality HLS
        cmd = self._build_hls_command(input_path, hls_dir, valid_qualities, has_audio)

        # Execute conversion
        try:
            result = subprocess.run(
                cmd,
                check=False,
                capture_output=True,
                text=True,
                timeout=3600,  # 1 hour timeout
            )

            if result.returncode != 0:
                return {
                    "success": False,
                    # Only the tail of stderr is kept to bound the message size.
                    "error": result.stderr[-500:] if result.stderr else "Unknown error",
                    "qualities": [],
                }

            # FFmpeg expands "%v" to the variant index, so rename the
            # index-named directories to the preset names.
            for i, quality_name in enumerate(valid_qualities):
                src_dir = hls_dir / str(i)
                dst_dir = hls_dir / quality_name
                if src_dir.exists() and src_dir != dst_dir:
                    if dst_dir.exists():
                        shutil.rmtree(dst_dir)
                    src_dir.rename(dst_dir)

            # Fix master playlist to use correct quality names
            master_path = hls_dir / "master.m3u8"
            if master_path.exists():
                content = master_path.read_text()
                for i, quality_name in enumerate(valid_qualities):
                    content = content.replace(
                        f"{i}/playlist.m3u8", f"{quality_name}/playlist.m3u8"
                    )
                master_path.write_text(content)

            return {
                "success": True,
                "master_playlist": str(master_path),
                "qualities": valid_qualities,
            }

        except subprocess.TimeoutExpired:
            return {
                "success": False,
                "error": "Conversion timed out after 1 hour",
                "qualities": [],
            }
        except Exception as e:
            return {
                "success": False,
                "error": str(e),
                "qualities": [],
            }