348 lines
11 KiB
Python
Executable File
348 lines
11 KiB
Python
Executable File
"""
Video Converter Service

FFmpeg wrapper for HLS conversion with multiple qualities
"""
|
|
|
|
import json
|
|
import subprocess
|
|
from collections.abc import Callable
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
|
|
from app.config import Settings, get_settings
|
|
from app.models.schemas import VideoMetadata
|
|
|
|
|
|
@dataclass
class QualityPreset:
    """One HLS rendition: a label, a target height and its bitrates."""

    # Label of the rendition, e.g. "720p".
    name: str
    # Target frame height in pixels (the width is derived by the scaler).
    height: int
    # Video bitrate in FFmpeg notation, e.g. "1000k".
    bitrate: str
    # Audio bitrate in FFmpeg notation, e.g. "128k".
    audio_bitrate: str
|
|
|
|
|
|
# Known renditions, keyed by their own ``name`` and listed from lowest to
# highest resolution.
QUALITY_PRESETS: dict[str, QualityPreset] = {
    preset.name: preset
    for preset in (
        QualityPreset(name="360p", height=360, bitrate="800k", audio_bitrate="96k"),
        QualityPreset(name="720p", height=720, bitrate="2500k", audio_bitrate="128k"),
        QualityPreset(name="1080p", height=1080, bitrate="5000k", audio_bitrate="192k"),
    )
}
|
|
|
|
|
|
class ConverterService:
    """FFmpeg video converter for HLS streaming.

    Wraps the ``ffmpeg`` and ``ffprobe`` command-line tools to probe inputs,
    grab thumbnails and produce multi-quality HLS output. Every external
    command is run without a shell, with captured output and a timeout.
    """

    def __init__(self, settings: Settings | None = None):
        """Store *settings*, falling back to ``get_settings()`` when omitted."""
        self.settings = settings or get_settings()

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _run(cmd: list[str], timeout: int) -> subprocess.CompletedProcess[str]:
        """Run *cmd* without a shell, capturing stdout/stderr as text.

        A non-zero exit status is reported through ``returncode`` rather than
        raised; ``subprocess.TimeoutExpired`` and ``OSError`` (for example a
        missing executable) still propagate to the caller.
        """
        return subprocess.run(
            cmd, check=False, capture_output=True, text=True, timeout=timeout
        )

    @staticmethod
    def _failure(message: str) -> dict:
        """Build the result dict that ``convert_to_hls`` returns on failure."""
        return {"success": False, "error": message, "qualities": []}

    @staticmethod
    def _to_float(value: str | int | float | None) -> float:
        """Convert a probed value to ``float``; unparsable values become 0.0."""
        try:
            return float(value)
        except (TypeError, ValueError):
            return 0.0

    @staticmethod
    def _to_int(value: str | int | float | None) -> int:
        """Convert a probed value to ``int``; unparsable values become 0."""
        try:
            return int(value)
        except (TypeError, ValueError):
            return 0

    @staticmethod
    def _parse_frame_rate(rate: str) -> float:
        """Parse a frame rate such as ``"30/1"`` or ``"29.97"``.

        Returns 0.0 for a non-positive denominator (e.g. ``"0/0"``) or for
        text that is not numeric.
        """
        numerator_text, slash, denominator_text = str(rate).partition("/")
        try:
            numerator = float(numerator_text)
            denominator = float(denominator_text) if slash else 1.0
        except ValueError:
            return 0.0
        return numerator / denominator if denominator > 0 else 0.0

    @staticmethod
    def _select_qualities(qualities: list[str], source_height: int) -> list[str]:
        """Pick the requested presets that do not exceed *source_height*.

        Unknown names are skipped and duplicates are dropped (first occurrence
        wins) because every variant gets its own output directory. Falls back
        to ``["360p"]`` when nothing qualifies.
        """
        selected = []
        for name in dict.fromkeys(qualities):
            preset = QUALITY_PRESETS.get(name)
            if preset is not None and preset.height <= source_height:
                selected.append(name)
        return selected or ["360p"]

    def _build_hls_command(
        self,
        input_path: Path,
        hls_dir: Path,
        quality_names: list[str],
        has_audio: bool,
    ) -> list[str]:
        """Assemble the single ffmpeg invocation that encodes every variant."""
        cmd = [
            "ffmpeg",
            "-y",
            "-i",
            str(input_path),
            "-threads",
            str(self.settings.ffmpeg_threads),
        ]

        stream_maps = []
        for index, name in enumerate(quality_names):
            preset = QUALITY_PRESETS[name]
            # The explicit ``-filter:v:<index>`` form is the documented way to
            # attach a filter to one output stream, so each variant is scaled
            # to its own height.
            # NOTE(review): the ``-vf`` shorthand was used here before; confirm
            # whether targeted ffmpeg builds honour a specifier on that alias.
            cmd += [
                "-map",
                "0:v:0",
                f"-c:v:{index}",
                "libx264",
                f"-b:v:{index}",
                preset.bitrate,
                f"-filter:v:{index}",
                f"scale=-2:{preset.height}",
            ]
            if has_audio:
                cmd += [
                    "-map",
                    "0:a:0",
                    f"-c:a:{index}",
                    "aac",
                    f"-b:a:{index}",
                    preset.audio_bitrate,
                ]
                stream_maps.append(f"v:{index},a:{index}")
            else:
                stream_maps.append(f"v:{index}")

        cmd += [
            "-f",
            "hls",
            "-hls_time",
            str(self.settings.hls_segment_duration),
            "-hls_playlist_type",
            "vod",
            "-hls_flags",
            "independent_segments",
            "-hls_segment_filename",
            str(hls_dir / "%v" / "segment_%03d.ts"),
            "-master_pl_name",
            "master.m3u8",
            "-var_stream_map",
            " ".join(stream_maps),
            str(hls_dir / "%v" / "playlist.m3u8"),
        ]
        return cmd

    @staticmethod
    def _rename_variant_dirs(hls_dir: Path, quality_names: list[str]) -> None:
        """Rename the index-named variant directories to their quality names.

        An existing directory with the target name is removed first.
        """
        import shutil

        for index, name in enumerate(quality_names):
            src_dir = hls_dir / str(index)
            dst_dir = hls_dir / name
            if not src_dir.exists() or src_dir == dst_dir:
                continue
            if dst_dir.exists():
                shutil.rmtree(dst_dir)
            src_dir.rename(dst_dir)

    @staticmethod
    def _rewrite_master_playlist(content: str, quality_names: list[str]) -> str:
        """Point the master playlist entries at the renamed directories."""
        # Highest index first, so that "0/playlist.m3u8" can never match the
        # tail of a longer index such as "10/playlist.m3u8".
        for index in reversed(range(len(quality_names))):
            content = content.replace(
                f"{index}/playlist.m3u8",
                f"{quality_names[index]}/playlist.m3u8",
            )
        return content

    def _grab_frame(
        self, input_path: Path, output_path: Path, time_offset: float
    ) -> bool:
        """Write one 640px-wide frame taken at *time_offset* to *output_path*.

        Returns True only if ffmpeg exited successfully and *output_path* is
        a non-empty file afterwards.
        """
        cmd = [
            "ffmpeg",
            "-y",  # Overwrite
            "-ss",
            str(time_offset),
            "-i",
            str(input_path),
            "-vframes",
            "1",
            "-vf",
            "scale=640:-1",  # 640px width, maintain aspect
            "-q:v",
            "2",  # High quality JPEG
            str(output_path),
        ]
        try:
            result = self._run(cmd, timeout=60)
        except (subprocess.SubprocessError, OSError):
            return False
        if result.returncode != 0:
            return False
        # The exit status alone does not prove that a frame was written
        # (e.g. when the offset lies beyond the end of the input).
        try:
            return output_path.stat().st_size > 0
        except OSError:
            return False

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def is_ffmpeg_available(self) -> bool:
        """Check if FFmpeg is installed and accessible."""
        return self.get_ffmpeg_version() is not None

    def get_ffmpeg_version(self) -> str | None:
        """Get FFmpeg version string.

        Returns the first line printed by ``ffmpeg -version``, or None when
        the command cannot be started, times out or exits non-zero.
        """
        try:
            result = self._run(["ffmpeg", "-version"], timeout=10)
        except (subprocess.SubprocessError, OSError):
            return None
        if result.returncode != 0:
            return None
        # First line contains version
        return result.stdout.split("\n")[0]

    def extract_metadata(self, input_path: Path) -> VideoMetadata:
        """Extract video metadata using ffprobe.

        Describes the first video stream together with container-level
        duration, bitrate and size. Missing or unparsable numeric fields are
        reported as 0.

        Raises:
            RuntimeError: if ffprobe exits non-zero or the file has no video
                stream.
            subprocess.TimeoutExpired: if ffprobe runs longer than 60 seconds.
            OSError: if ffprobe cannot be started.
        """
        cmd = [
            "ffprobe",
            "-v",
            "error",  # keep stderr so the failure message below is useful
            "-print_format",
            "json",
            "-show_format",
            "-show_streams",
            str(input_path),
        ]
        result = self._run(cmd, timeout=60)
        if result.returncode != 0:
            raise RuntimeError(f"ffprobe failed: {result.stderr}")

        data = json.loads(result.stdout)

        video_stream = next(
            (
                stream
                for stream in data.get("streams", [])
                if stream.get("codec_type") == "video"
            ),
            None,
        )
        if not video_stream:
            raise RuntimeError("No video stream found")

        format_info = data.get("format", {})
        fps = self._parse_frame_rate(video_stream.get("r_frame_rate", "0/1"))

        return VideoMetadata(
            duration_seconds=self._to_float(format_info.get("duration", 0)),
            width=self._to_int(video_stream.get("width", 0)),
            height=self._to_int(video_stream.get("height", 0)),
            fps=round(fps, 2),
            codec=video_stream.get("codec_name", ""),
            bitrate=self._to_int(format_info.get("bit_rate", 0)),
            file_size_bytes=self._to_int(format_info.get("size", 0)),
        )

    def has_audio_stream(self, input_path: Path) -> bool:
        """Check if video file has an audio stream.

        Returns False when ffprobe reports no audio stream, and also when the
        probe cannot be run, times out, exits non-zero or prints invalid JSON.
        """
        cmd = [
            "ffprobe",
            "-v",
            "quiet",
            "-print_format",
            "json",
            "-show_streams",
            "-select_streams",
            "a",
            str(input_path),
        ]
        try:
            result = self._run(cmd, timeout=60)
            if result.returncode != 0:
                return False
            streams = json.loads(result.stdout).get("streams", [])
        except (subprocess.SubprocessError, OSError, ValueError):
            return False
        return len(streams) > 0

    def generate_thumbnail(
        self,
        input_path: Path,
        output_path: Path,
        time_offset: float = 5.0,
    ) -> bool:
        """Generate thumbnail at specified time offset.

        If no frame can be produced at *time_offset*, one more attempt is made
        at the start of the input. Returns True only when a non-empty file
        exists at *output_path* afterwards; never raises for ffmpeg failures.
        """
        if self._grab_frame(input_path, output_path, time_offset):
            return True
        # Clips shorter than the requested offset have no frame there, so
        # fall back to the very first frame.
        return time_offset > 0 and self._grab_frame(input_path, output_path, 0.0)

    def convert_to_hls(
        self,
        input_path: Path,
        output_dir: Path,
        qualities: list[str] | None = None,
        progress_callback: Callable[[int], None] | None = None,
    ) -> dict:
        """
        Convert video to HLS with multiple quality levels.

        Output is written below ``output_dir / "hls"``: one directory per
        quality plus ``master.m3u8``. Qualities taller than the source are
        skipped; if none remain, ``"360p"`` is generated. When the source
        height cannot be probed, 1080 is assumed.

        ``qualities`` defaults to ``settings.video_qualities_list``.
        ``progress_callback`` is accepted for interface compatibility but is
        not invoked by this implementation.

        Returns dict with:
        - master_playlist: path to master.m3u8 (only on success)
        - qualities: list of generated quality levels
        - success: bool
        - error: optional error message (only on failure)
        """
        if qualities is None:
            qualities = self.settings.video_qualities_list

        try:
            output_dir.mkdir(parents=True, exist_ok=True)
            hls_dir = output_dir / "hls"
            hls_dir.mkdir(exist_ok=True)

            # Best effort: a failed probe must not block the conversion.
            try:
                source_height = self.extract_metadata(input_path).height
            except Exception:
                source_height = 1080  # Assume HD if can't detect

            valid_qualities = self._select_qualities(qualities, source_height)
            has_audio = self.has_audio_stream(input_path)

            # ffmpeg expands "%v" to the variant index; make sure those
            # directories exist before it starts writing segments.
            for index in range(len(valid_qualities)):
                (hls_dir / str(index)).mkdir(exist_ok=True)

            cmd = self._build_hls_command(
                input_path, hls_dir, valid_qualities, has_audio
            )
            result = self._run(cmd, timeout=3600)  # 1 hour timeout
            if result.returncode != 0:
                return self._failure(
                    result.stderr[-500:] if result.stderr else "Unknown error"
                )

            self._rename_variant_dirs(hls_dir, valid_qualities)

            master_path = hls_dir / "master.m3u8"
            if not master_path.exists():
                return self._failure("FFmpeg finished without writing master.m3u8")

            content = master_path.read_text(encoding="utf-8")
            master_path.write_text(
                self._rewrite_master_playlist(content, valid_qualities),
                encoding="utf-8",
            )

            return {
                "success": True,
                "master_playlist": str(master_path),
                "qualities": valid_qualities,
            }
        except subprocess.TimeoutExpired:
            return self._failure("Conversion timed out after 1 hour")
        except Exception as e:
            # Boundary handler: callers rely on the result dict, not exceptions.
            return self._failure(str(e))
|