Initial commit - Video Service for Coolify

2025-12-17 10:07:44 +01:00
commit baa1675738
46 changed files with 2386 additions and 0 deletions

app/__init__.py Executable file

@@ -0,0 +1,2 @@
# Video-Service for Kurs-Booking Plugin
__version__ = "1.0.0"

app/api/__init__.py Executable file

@@ -0,0 +1 @@
# API Module

app/api/auth.py Executable file

@@ -0,0 +1,127 @@
"""
Authentication Module
- API Key for WordPress ↔ Video-Service communication
- JWT for streaming token validation
"""
import secrets
from datetime import UTC, datetime, timedelta
from typing import Annotated
import jwt
from fastapi import Depends, Header, HTTPException, Request, status
from pydantic import BaseModel
from app.config import Settings, get_settings
class TokenPayload(BaseModel):
"""JWT token payload for video streaming."""
video_id: str
buchung_id: int
ip: str
exp: datetime
class StreamToken(BaseModel):
"""Response model for stream token."""
token: str
expires_at: datetime
stream_url: str
def verify_api_key(
x_api_key: Annotated[str | None, Header()] = None,
settings: Settings = Depends(get_settings),
) -> bool:
"""Verify API key from WordPress."""
if not x_api_key:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Missing API key",
)
    # Constant-time comparison to avoid timing side channels
    if not secrets.compare_digest(x_api_key, settings.api_key):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid API key",
        )
return True
def create_stream_token(
    video_id: str,
    buchung_id: int,
    client_ip: str,
    settings: Settings | None = None,
) -> StreamToken:
    """Create a JWT token for video streaming with IP binding."""
    # Called directly (not as a FastAPI dependency), so resolve settings manually
    settings = settings or get_settings()
    expiry = datetime.now(UTC) + timedelta(hours=settings.jwt_expiry_hours)
payload = {
"video_id": video_id,
"buchung_id": buchung_id,
"ip": client_ip,
"exp": expiry,
"iat": datetime.now(UTC),
}
token = jwt.encode(payload, settings.jwt_secret, algorithm=settings.jwt_algorithm)
# Build stream URL using configured base_url (handles production/development)
stream_url = (
f"{settings.base_url}/api/v1/stream/{video_id}/master.m3u8?token={token}"
)
return StreamToken(
token=token,
expires_at=expiry,
stream_url=stream_url,
)
def verify_stream_token(
    token: str,
    video_id: str,
    request: Request,
    settings: Settings | None = None,
) -> TokenPayload:
    """Verify JWT token for streaming, including IP binding."""
    # Called directly from the streaming routes, so resolve settings manually
    settings = settings or get_settings()
    try:
payload = jwt.decode(
token,
settings.jwt_secret,
algorithms=[settings.jwt_algorithm],
)
except jwt.ExpiredSignatureError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token has expired",
) from e
except jwt.InvalidTokenError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Invalid token: {e}",
) from e
# Verify video_id matches
if payload.get("video_id") != video_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Token not valid for this video",
)
# Verify IP binding (optional in development)
client_ip = request.client.host if request.client else "unknown"
if settings.environment == "production" and payload.get("ip") != client_ip:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="IP address mismatch",
)
return TokenPayload(
video_id=payload["video_id"],
buchung_id=payload["buchung_id"],
ip=payload["ip"],
exp=datetime.fromtimestamp(payload["exp"], tz=UTC),
)

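A minimal sketch of the token round trip, assuming the package is importable; the video ID, booking ID, and client IP are illustrative values, not anything issued by this service:

# Sketch: issue a stream token outside the request cycle (illustrative values).
from app.api.auth import create_stream_token
from app.config import get_settings

settings = get_settings()
issued = create_stream_token(
    video_id="abc123def456",   # hypothetical video ID
    buchung_id=42,             # hypothetical booking ID
    client_ip="203.0.113.7",   # IP that gets bound into the token
    settings=settings,
)
print(issued.stream_url)  # <base_url>/api/v1/stream/abc123def456/master.m3u8?token=<JWT>
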
app/api/routes.py Executable file

@@ -0,0 +1,458 @@
"""
API Routes for Video-Service
"""
import logging
import re
from datetime import UTC, datetime
from pathlib import Path
import redis
from fastapi import (
APIRouter,
Depends,
File,
Form,
HTTPException,
Query,
Request,
UploadFile,
status,
)
from fastapi.responses import FileResponse, Response
from app import __version__
from app.api.auth import create_stream_token, verify_api_key, verify_stream_token
from app.config import Settings, get_settings
from app.models.schemas import (
HealthResponse,
StreamTokenRequest,
VideoListResponse,
VideoMetadata,
VideoQuality,
VideoStatus,
VideoStatusResponse,
)
from app.services.converter import ConverterService
from app.services.storage import StorageService
from app.tasks.video_tasks import process_video
logger = logging.getLogger(__name__)
router = APIRouter()
# Health Check (no auth required)
@router.get("/health", response_model=HealthResponse)
async def health_check(settings: Settings = Depends(get_settings)):
"""Health check endpoint."""
storage = StorageService(settings)
converter = ConverterService(settings)
# Check Redis connection
    redis_connected = False
    try:
        r = redis.from_url(settings.redis_url, socket_connect_timeout=2)
        r.ping()
        redis_connected = True
    except Exception as e:
        logger.warning(f"Redis health check failed: {e}")
return HealthResponse(
status="healthy" if redis_connected and storage.is_writable() else "degraded",
version=__version__,
environment=settings.environment,
redis_connected=redis_connected,
storage_writable=storage.is_writable(),
ffmpeg_available=converter.is_ffmpeg_available(),
)
# Video Upload
@router.post("/videos/upload", response_model=VideoStatusResponse)
async def upload_video(
file: UploadFile = File(...),
kurs_id: int = Form(...),
title: str = Form(...),
    description: str | None = Form(None),  # accepted but not yet persisted in metadata
_: bool = Depends(verify_api_key),
settings: Settings = Depends(get_settings),
):
"""
Upload a video file for processing.
Requires API key authentication.
"""
# Validate file type
allowed_types = [
"video/mp4",
"video/quicktime",
"video/x-msvideo",
"video/x-matroska",
]
if file.content_type not in allowed_types:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Invalid file type: {file.content_type}. Allowed: MP4, MOV, AVI, MKV",
)
    # File size is enforced incrementally while streaming to disk below
storage = StorageService(settings)
video_id = storage.generate_video_id()
# Determine safe filename
original_ext = Path(file.filename or "video.mp4").suffix.lower()
if original_ext not in [".mp4", ".mov", ".avi", ".mkv"]:
original_ext = ".mp4"
safe_filename = f"source{original_ext}"
# Save uploaded file
upload_path = storage.get_upload_path(video_id, safe_filename)
try:
# Stream file to disk
file_size = 0
with upload_path.open("wb") as f:
while chunk := await file.read(1024 * 1024): # 1MB chunks
file_size += len(chunk)
if file_size > settings.max_upload_size_bytes:
f.close()
upload_path.unlink()
raise HTTPException(
status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE,
detail=f"File too large. Max: {settings.max_upload_size_mb}MB",
)
f.write(chunk)
logger.info(f"Uploaded {file_size} bytes for video {video_id}")
# Save initial metadata
storage.save_metadata(
video_id=video_id,
kurs_id=kurs_id,
title=title,
status=VideoStatus.PENDING,
)
# Queue processing task
process_video.delay(video_id, safe_filename)
# Return status
return VideoStatusResponse(
video_id=video_id,
kurs_id=kurs_id,
title=title,
status=VideoStatus.PENDING,
progress=0,
created_at=datetime.now(UTC),
updated_at=datetime.now(UTC),
)
except HTTPException:
raise
except Exception as e:
logger.exception(f"Upload failed: {e}")
if upload_path.exists():
upload_path.unlink()
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"Upload failed: {e!s}",
) from e
# Video Status
@router.get("/videos/{video_id}/status", response_model=VideoStatusResponse)
async def get_video_status(
video_id: str,
_: bool = Depends(verify_api_key),
settings: Settings = Depends(get_settings),
):
"""Get video processing status."""
storage = StorageService(settings)
meta = storage.load_metadata(video_id)
if not meta:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Video not found",
)
# Parse metadata
video_metadata = None
if meta.get("metadata"):
video_metadata = VideoMetadata(**meta["metadata"])
# Build thumbnail URL using configured base_url
thumbnail_url = None
thumbnail_path = storage.get_thumbnail_path(video_id)
if thumbnail_path.exists():
thumbnail_url = f"{settings.base_url}/api/v1/videos/{video_id}/thumbnail"
# Determine available qualities
hls_path = storage.get_hls_path(video_id)
available_qualities = []
if hls_path.exists():
for quality in ["360p", "720p", "1080p"]:
if (hls_path / quality).exists():
available_qualities.append(VideoQuality(quality))
return VideoStatusResponse(
video_id=video_id,
kurs_id=meta.get("kurs_id", 0),
title=meta.get("title", ""),
status=VideoStatus(meta.get("status", "pending")),
progress=meta.get("progress", 0),
metadata=video_metadata,
thumbnail_url=thumbnail_url,
available_qualities=available_qualities,
created_at=datetime.fromisoformat(
meta.get("created_at", datetime.now().isoformat())
),
updated_at=datetime.fromisoformat(
meta.get("updated_at", datetime.now().isoformat())
),
error_message=meta.get("error_message"),
)
# List Videos
@router.get("/videos", response_model=VideoListResponse)
async def list_videos(
kurs_id: int | None = Query(None),
status_filter: VideoStatus | None = Query(None, alias="status"),
_: bool = Depends(verify_api_key),
settings: Settings = Depends(get_settings),
):
"""List all videos, optionally filtered by kurs_id or status."""
storage = StorageService(settings)
videos = []
for vid in storage.get_video_ids():
meta = storage.load_metadata(vid)
if not meta:
continue
# Apply filters
        if kurs_id is not None and meta.get("kurs_id") != kurs_id:
continue
if status_filter and meta.get("status") != status_filter.value:
continue
video_metadata = None
if meta.get("metadata"):
video_metadata = VideoMetadata(**meta["metadata"])
videos.append(
VideoStatusResponse(
video_id=vid,
kurs_id=meta.get("kurs_id", 0),
title=meta.get("title", ""),
status=VideoStatus(meta.get("status", "pending")),
progress=meta.get("progress", 0),
metadata=video_metadata,
created_at=datetime.fromisoformat(
meta.get("created_at", datetime.now().isoformat())
),
updated_at=datetime.fromisoformat(
meta.get("updated_at", datetime.now().isoformat())
),
error_message=meta.get("error_message"),
)
)
return VideoListResponse(videos=videos, total=len(videos))
# Delete Video
@router.delete("/videos/{video_id}")
async def delete_video(
video_id: str,
_: bool = Depends(verify_api_key),
settings: Settings = Depends(get_settings),
):
"""Delete a video and all associated files."""
storage = StorageService(settings)
if not storage.delete_video(video_id):
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Video not found",
)
return {"status": "deleted", "video_id": video_id}
# Thumbnail
@router.get("/videos/{video_id}/thumbnail")
async def get_thumbnail(
video_id: str,
settings: Settings = Depends(get_settings),
):
"""Get video thumbnail. No auth required for thumbnails."""
storage = StorageService(settings)
thumbnail_path = storage.get_thumbnail_path(video_id)
if not thumbnail_path.exists():
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Thumbnail not found",
)
return FileResponse(
thumbnail_path,
media_type="image/jpeg",
headers={"Cache-Control": "public, max-age=86400"},
)
# Stream Token
@router.post("/videos/{video_id}/token")
async def get_stream_token(
video_id: str,
request: Request,
data: StreamTokenRequest,
_: bool = Depends(verify_api_key),
settings: Settings = Depends(get_settings),
):
"""
Get a JWT token for video streaming.
Token is IP-bound and expires after configured time.
"""
storage = StorageService(settings)
meta = storage.load_metadata(video_id)
if not meta:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Video not found",
)
if meta.get("status") != VideoStatus.READY.value:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"Video not ready. Status: {meta.get('status')}",
)
client_ip = request.client.host if request.client else "unknown"
return create_stream_token(
video_id=video_id,
buchung_id=data.buchung_id,
client_ip=client_ip,
settings=settings,
)
# HLS Streaming
@router.get("/stream/{video_id}/master.m3u8")
async def stream_master_playlist(
    video_id: str,
    request: Request,
    token: str = Query(...),
    settings: Settings = Depends(get_settings),
):
"""Serve master HLS playlist. Requires valid JWT token."""
# Verify token
verify_stream_token(token, video_id, request, settings)
storage = StorageService(settings)
master_path = storage.get_hls_path(video_id) / "master.m3u8"
if not master_path.exists():
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Playlist not found",
)
# Modify playlist to include token in quality playlist URLs
content = master_path.read_text()
# Add token to each playlist URL
modified_content = re.sub(
r"(\d+p/playlist\.m3u8)",
rf"\1?token={token}",
content,
)
return Response(
content=modified_content,
media_type="application/vnd.apple.mpegurl",
headers={"Cache-Control": "no-cache"},
)
@router.get("/stream/{video_id}/{quality}/playlist.m3u8")
async def stream_quality_playlist(
    video_id: str,
    quality: str,
    request: Request,
    token: str = Query(...),
    settings: Settings = Depends(get_settings),
):
"""Serve quality-specific HLS playlist."""
verify_stream_token(token, video_id, request, settings)
storage = StorageService(settings)
playlist_path = storage.get_hls_path(video_id) / quality / "playlist.m3u8"
if not playlist_path.exists():
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Playlist not found",
)
# Modify segment URLs to include token
content = playlist_path.read_text()
modified_content = re.sub(
r"(segment_\d+\.ts)",
rf"\1?token={token}",
content,
)
return Response(
content=modified_content,
media_type="application/vnd.apple.mpegurl",
headers={"Cache-Control": "no-cache"},
)
@router.api_route("/stream/{video_id}/{quality}/{segment}", methods=["GET", "HEAD"])
async def stream_segment(
    video_id: str,
    quality: str,
    segment: str,
    request: Request,
    token: str = Query(...),
    settings: Settings = Depends(get_settings),
):
"""Serve HLS video segment. Supports HEAD for content-length checks."""
verify_stream_token(token, video_id, request, settings)
# Validate segment filename (prevent path traversal)
if not re.match(r"^segment_\d{3}\.ts$", segment):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid segment name",
)
storage = StorageService(settings)
segment_path = storage.get_hls_path(video_id) / quality / segment
if not segment_path.exists():
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Segment not found",
)
return FileResponse(
segment_path,
media_type="video/mp2t",
headers={
"Cache-Control": "public, max-age=31536000", # 1 year - segments don't change
},
)

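From the WordPress side, the upload-and-token flow looks roughly like this; a sketch using httpx, assuming the service runs at http://localhost:8500 (the development default in config.py) and that the header value matches the configured api_key:

import httpx

API = "http://localhost:8500/api/v1"
HEADERS = {"X-API-Key": "change-me-in-production"}  # must match settings.api_key

with httpx.Client(timeout=120.0) as client:
    # Upload a local file as multipart form data, as /videos/upload expects
    with open("lesson1.mp4", "rb") as f:
        resp = client.post(
            f"{API}/videos/upload",
            headers=HEADERS,
            files={"file": ("lesson1.mp4", f, "video/mp4")},
            data={"kurs_id": 7, "title": "Lesson 1"},  # illustrative values
        )
    video_id = resp.json()["video_id"]

    # Poll processing status until it reports "ready"
    info = client.get(f"{API}/videos/{video_id}/status", headers=HEADERS).json()

    # Once ready, request an IP-bound stream token for a booking
    token = client.post(
        f"{API}/videos/{video_id}/token",
        headers=HEADERS,
        json={"video_id": video_id, "buchung_id": 42},
    ).json()
    print(token["stream_url"])
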
app/celery_app.py Executable file

@@ -0,0 +1,34 @@
"""
Celery Application Configuration
"""
from celery import Celery
from app.config import get_settings
settings = get_settings()
celery_app = Celery(
"video_tasks",
broker=settings.redis_url,
backend=settings.redis_url,
include=["app.tasks.video_tasks"],
)
# Celery Configuration
celery_app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="Europe/Vienna",
enable_utc=True,
# Task settings
task_track_started=True,
task_time_limit=3600, # 1 hour hard limit
task_soft_time_limit=3300, # 55 min soft limit
# Worker settings
worker_prefetch_multiplier=1, # One task at a time for video processing
worker_concurrency=2,
# Result settings
result_expires=86400, # 24 hours
)

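The worker is normally started with the celery CLI (celery -A app.celery_app worker); for completeness, a sketch of launching it in-process with the same options:

# Sketch: start a worker programmatically (equivalent to the CLI invocation).
from app.celery_app import celery_app

if __name__ == "__main__":
    celery_app.worker_main(argv=["worker", "--loglevel=info", "--concurrency=2"])
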
app/config.py Executable file

@@ -0,0 +1,86 @@
"""
Video-Service Configuration
Using pydantic-settings for type-safe environment configuration
"""
from functools import lru_cache
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
"""Application settings loaded from environment variables."""
# Environment
environment: str = "development"
debug: bool = False
# API Security
api_key: str = "change-me-in-production"
jwt_secret: str = "change-me-in-production"
jwt_algorithm: str = "HS256"
jwt_expiry_hours: int = 1
# Redis
redis_url: str = "redis://localhost:6379/0"
# Storage
storage_path: str = "/app/storage"
# WordPress Integration
wordpress_webhook_url: str = ""
wordpress_api_key: str = ""
# CORS
allowed_origins: str = "http://localhost:8300"
# Upload Limits
max_upload_size_mb: int = 2048 # 2GB default
# FFmpeg Settings
ffmpeg_threads: int = 2
hls_segment_duration: int = 6
video_qualities: str = "360p,720p,1080p"
# Production Domain
video_domain: str = "videos.islandpferde-melanieworbs.de"
# Public URL for development (external access from browser)
# In production, https://{video_domain} is used instead
video_public_url: str = ""
@property
def base_url(self) -> str:
"""Get the base URL for video streaming (accessible from browser)."""
if self.environment == "production":
return f"https://{self.video_domain}"
elif self.video_public_url:
return self.video_public_url.rstrip("/")
else:
return "http://localhost:8500"
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        case_sensitive=False,
    )
@property
def allowed_origins_list(self) -> list[str]:
"""Parse comma-separated origins into list."""
return [origin.strip() for origin in self.allowed_origins.split(",")]
@property
def video_qualities_list(self) -> list[str]:
"""Parse comma-separated qualities into list."""
return [q.strip() for q in self.video_qualities.split(",")]
@property
def max_upload_size_bytes(self) -> int:
"""Convert MB to bytes."""
return self.max_upload_size_mb * 1024 * 1024
@lru_cache
def get_settings() -> Settings:
"""Cached settings instance."""
return Settings()

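Settings resolve from the process environment (or .env); a small sketch of how base_url switches between the production domain and a development override (the LAN address is a made-up example):

from app.config import Settings

# Development: an explicit public URL wins over the localhost fallback
dev = Settings(environment="development", video_public_url="http://192.168.1.10:8500/")
assert dev.base_url == "http://192.168.1.10:8500"

# Production: always the HTTPS video domain
prod = Settings(environment="production")
assert prod.base_url == "https://videos.islandpferde-melanieworbs.de"

# Derived helpers
assert Settings(max_upload_size_mb=1).max_upload_size_bytes == 1024 * 1024
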
app/main.py Executable file

@@ -0,0 +1,81 @@
"""
Video-Service for Kurs-Booking Plugin
FastAPI Application Entry Point
"""
import logging
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from app import __version__
from app.api.routes import router
from app.config import get_settings
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan handler."""
settings = get_settings()
logger.info(f"Starting Video-Service v{__version__}")
logger.info(f"Environment: {settings.environment}")
logger.info(f"Storage path: {settings.storage_path}")
logger.info(f"Redis URL: {settings.redis_url}")
yield
logger.info("Shutting down Video-Service")
# Create FastAPI app
settings = get_settings()
app = FastAPI(
title="Kurs-Booking Video-Service",
description="Video hosting and streaming service for Kurs-Booking WordPress plugin",
version=__version__,
docs_url="/docs" if settings.debug else None,
redoc_url="/redoc" if settings.debug else None,
openapi_url="/openapi.json" if settings.debug else None,
lifespan=lifespan,
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=settings.allowed_origins_list,
allow_credentials=True,
allow_methods=["GET", "POST", "DELETE"],
allow_headers=["*"],
expose_headers=["Content-Disposition"],
)
# Include API routes
app.include_router(router, prefix="/api/v1")
# Root endpoint
@app.get("/")
async def root():
"""Root endpoint with service info."""
return {
"service": "Kurs-Booking Video-Service",
"version": __version__,
"docs": "/docs" if settings.debug else "disabled",
"health": "/api/v1/health",
}
# Health check at root level (for Docker health checks)
@app.get("/health")
async def health():
"""Simple health check for Docker."""
return {"status": "ok"}

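For local development the app can be served with uvicorn; a sketch assuming port 8500, the development base_url default in config.py:

# Sketch: run the FastAPI app locally (dev only; use a proper ASGI setup in production).
import uvicorn

if __name__ == "__main__":
    uvicorn.run("app.main:app", host="0.0.0.0", port=8500, reload=True)
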
app/models/__init__.py Executable file

@@ -0,0 +1 @@
# Models Module

app/models/schemas.py Executable file

@@ -0,0 +1,109 @@
"""
Pydantic schemas for Video-Service API
"""
from datetime import datetime
from enum import Enum
from pydantic import BaseModel, Field
class VideoStatus(str, Enum):
"""Video processing status."""
PENDING = "pending"
UPLOADING = "uploading"
PROCESSING = "processing"
READY = "ready"
ERROR = "error"
class VideoQuality(str, Enum):
"""Available video qualities."""
Q360P = "360p"
Q720P = "720p"
Q1080P = "1080p"
class VideoMetadata(BaseModel):
"""Video metadata extracted from file."""
duration_seconds: float = 0
width: int = 0
height: int = 0
fps: float = 0
codec: str = ""
bitrate: int = 0
file_size_bytes: int = 0
class VideoUploadRequest(BaseModel):
"""Request to initiate video upload."""
kurs_id: int = Field(..., description="WordPress Kurs Post ID")
title: str = Field(..., min_length=1, max_length=255)
description: str | None = None
class VideoUploadResponse(BaseModel):
"""Response after initiating upload."""
video_id: str
upload_url: str
expires_at: datetime
class VideoStatusResponse(BaseModel):
"""Video status response."""
video_id: str
kurs_id: int
title: str
status: VideoStatus
progress: int = Field(0, ge=0, le=100)
metadata: VideoMetadata | None = None
thumbnail_url: str | None = None
available_qualities: list[VideoQuality] = []
created_at: datetime
updated_at: datetime
error_message: str | None = None
class VideoListResponse(BaseModel):
"""List of videos response."""
videos: list[VideoStatusResponse]
total: int
class StreamTokenRequest(BaseModel):
"""Request for stream token."""
video_id: str
buchung_id: int
class WebhookPayload(BaseModel):
"""Payload sent to WordPress webhook."""
event: str # video.ready, video.error, video.progress
video_id: str
kurs_id: int
status: VideoStatus
progress: int = 0
metadata: VideoMetadata | None = None
thumbnail_url: str | None = None
error_message: str | None = None
timestamp: datetime
class HealthResponse(BaseModel):
"""Health check response."""
status: str
version: str
environment: str
redis_connected: bool
storage_writable: bool
ffmpeg_available: bool

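The webhook payloads serialize to plain JSON via pydantic v2; a sketch of what WordPress would receive for a video.ready event, with illustrative values:

from datetime import UTC, datetime

from app.models.schemas import VideoMetadata, VideoStatus, WebhookPayload

payload = WebhookPayload(
    event="video.ready",
    video_id="abc123def456",  # hypothetical ID
    kurs_id=7,
    status=VideoStatus.READY,
    progress=100,
    metadata=VideoMetadata(duration_seconds=632.4, width=1920, height=1080),
    timestamp=datetime.now(UTC),
)
# e.g. {"event": "video.ready", ..., "status": "ready", "progress": 100, ...}
print(payload.model_dump(mode="json"))
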
app/services/__init__.py Executable file

@@ -0,0 +1 @@
# Services Module

app/services/converter.py Executable file

@@ -0,0 +1,347 @@
"""
Video Converter Service
FFmpeg wrapper for HLS conversion with multiple qualities
"""
import json
import shutil
import subprocess
from collections.abc import Callable
from dataclasses import dataclass
from pathlib import Path
from app.config import Settings, get_settings
from app.models.schemas import VideoMetadata
@dataclass
class QualityPreset:
"""Video quality preset for HLS."""
name: str
height: int
bitrate: str # e.g., "1000k"
audio_bitrate: str # e.g., "128k"
QUALITY_PRESETS: dict[str, QualityPreset] = {
"360p": QualityPreset(name="360p", height=360, bitrate="800k", audio_bitrate="96k"),
"720p": QualityPreset(
name="720p", height=720, bitrate="2500k", audio_bitrate="128k"
),
"1080p": QualityPreset(
name="1080p", height=1080, bitrate="5000k", audio_bitrate="192k"
),
}
class ConverterService:
"""FFmpeg video converter for HLS streaming."""
def __init__(self, settings: Settings | None = None):
self.settings = settings or get_settings()
def is_ffmpeg_available(self) -> bool:
"""Check if FFmpeg is installed and accessible."""
try:
result = subprocess.run(
["ffmpeg", "-version"],
check=False,
capture_output=True,
text=True,
timeout=10,
)
return result.returncode == 0
except (subprocess.SubprocessError, FileNotFoundError):
return False
def get_ffmpeg_version(self) -> str | None:
"""Get FFmpeg version string."""
try:
result = subprocess.run(
["ffmpeg", "-version"],
check=False,
capture_output=True,
text=True,
timeout=10,
)
if result.returncode == 0:
# First line contains version
return result.stdout.split("\n")[0]
return None
except (subprocess.SubprocessError, FileNotFoundError):
return None
def extract_metadata(self, input_path: Path) -> VideoMetadata:
"""Extract video metadata using ffprobe."""
cmd = [
"ffprobe",
"-v",
"quiet",
"-print_format",
"json",
"-show_format",
"-show_streams",
str(input_path),
]
result = subprocess.run(
cmd, check=False, capture_output=True, text=True, timeout=60
)
if result.returncode != 0:
raise RuntimeError(f"ffprobe failed: {result.stderr}")
data = json.loads(result.stdout)
# Find video stream
video_stream = None
for stream in data.get("streams", []):
if stream.get("codec_type") == "video":
video_stream = stream
break
if not video_stream:
raise RuntimeError("No video stream found")
format_info = data.get("format", {})
# Parse frame rate (e.g., "30/1" or "29.97")
fps_str = video_stream.get("r_frame_rate", "0/1")
if "/" in fps_str:
num, den = fps_str.split("/")
fps = float(num) / float(den) if float(den) > 0 else 0
else:
fps = float(fps_str)
return VideoMetadata(
duration_seconds=float(format_info.get("duration", 0)),
width=int(video_stream.get("width", 0)),
height=int(video_stream.get("height", 0)),
fps=round(fps, 2),
codec=video_stream.get("codec_name", ""),
bitrate=int(format_info.get("bit_rate", 0)),
file_size_bytes=int(format_info.get("size", 0)),
)
def has_audio_stream(self, input_path: Path) -> bool:
"""Check if video file has an audio stream."""
cmd = [
"ffprobe",
"-v",
"quiet",
"-print_format",
"json",
"-show_streams",
"-select_streams",
"a",
str(input_path),
]
result = subprocess.run(
cmd, check=False, capture_output=True, text=True, timeout=60
)
if result.returncode != 0:
return False
data = json.loads(result.stdout)
return len(data.get("streams", [])) > 0
def generate_thumbnail(
self,
input_path: Path,
output_path: Path,
time_offset: float = 5.0,
) -> bool:
"""Generate thumbnail at specified time offset."""
cmd = [
"ffmpeg",
"-y", # Overwrite
"-ss",
str(time_offset),
"-i",
str(input_path),
"-vframes",
"1",
"-vf",
"scale=640:-1", # 640px width, maintain aspect
"-q:v",
"2", # High quality JPEG
str(output_path),
]
result = subprocess.run(
cmd, check=False, capture_output=True, text=True, timeout=60
)
return result.returncode == 0
def convert_to_hls(
self,
input_path: Path,
output_dir: Path,
qualities: list[str] | None = None,
        progress_callback: Callable[[int], None] | None = None,  # reserved; not invoked yet
) -> dict:
"""
Convert video to HLS with multiple quality levels.
Returns dict with:
- master_playlist: path to master.m3u8
- qualities: list of generated quality levels
- success: bool
- error: optional error message
"""
if qualities is None:
qualities = self.settings.video_qualities_list
output_dir.mkdir(parents=True, exist_ok=True)
hls_dir = output_dir / "hls"
hls_dir.mkdir(exist_ok=True)
# Get source video metadata to determine which qualities to generate
try:
metadata = self.extract_metadata(input_path)
source_height = metadata.height
except Exception:
source_height = 1080 # Assume HD if can't detect
# Filter qualities based on source resolution
valid_qualities = []
for q in qualities:
preset = QUALITY_PRESETS.get(q)
if preset and preset.height <= source_height:
valid_qualities.append(q)
if not valid_qualities:
# At minimum, include lowest quality
valid_qualities = ["360p"]
# Check if video has audio stream
has_audio = self.has_audio_stream(input_path)
# Build FFmpeg command for multi-quality HLS
cmd = [
"ffmpeg",
"-y",
"-i",
str(input_path),
"-threads",
str(self.settings.ffmpeg_threads),
]
# Add output for each quality
stream_maps = []
for i, quality_name in enumerate(valid_qualities):
preset = QUALITY_PRESETS[quality_name]
quality_dir = hls_dir / quality_name
quality_dir.mkdir(exist_ok=True)
# Video mapping and encoding
cmd.extend(
[
"-map",
"0:v:0",
f"-c:v:{i}",
"libx264",
f"-b:v:{i}",
preset.bitrate,
f"-vf:v:{i}",
f"scale=-2:{preset.height}",
]
)
# Audio mapping and encoding (only if audio stream exists)
if has_audio:
cmd.extend(
[
"-map",
"0:a:0",
f"-c:a:{i}",
"aac",
f"-b:a:{i}",
preset.audio_bitrate,
]
)
stream_maps.append(f"v:{i},a:{i}")
else:
stream_maps.append(f"v:{i}")
# HLS settings
segment_duration = self.settings.hls_segment_duration
# Use var_stream_map for multiple quality outputs
var_stream_map = " ".join(stream_maps)
cmd.extend(
[
"-f",
"hls",
"-hls_time",
str(segment_duration),
"-hls_playlist_type",
"vod",
"-hls_flags",
"independent_segments",
"-hls_segment_filename",
str(hls_dir / "%v" / "segment_%03d.ts"),
"-master_pl_name",
"master.m3u8",
"-var_stream_map",
var_stream_map,
str(hls_dir / "%v" / "playlist.m3u8"),
]
)
# Execute conversion
try:
result = subprocess.run(
cmd,
check=False,
capture_output=True,
text=True,
timeout=3600, # 1 hour timeout
)
if result.returncode != 0:
return {
"success": False,
"error": result.stderr[-500:] if result.stderr else "Unknown error",
"qualities": [],
}
# Rename quality directories to proper names
for i, quality_name in enumerate(valid_qualities):
src_dir = hls_dir / str(i)
dst_dir = hls_dir / quality_name
if src_dir.exists() and src_dir != dst_dir:
                    if dst_dir.exists():
                        shutil.rmtree(dst_dir)
                    src_dir.rename(dst_dir)
# Fix master playlist to use correct quality names
master_path = hls_dir / "master.m3u8"
if master_path.exists():
content = master_path.read_text()
for i, quality_name in enumerate(valid_qualities):
content = content.replace(
f"{i}/playlist.m3u8", f"{quality_name}/playlist.m3u8"
)
master_path.write_text(content)
return {
"success": True,
"master_playlist": str(master_path),
"qualities": valid_qualities,
}
except subprocess.TimeoutExpired:
return {
"success": False,
"error": "Conversion timed out after 1 hour",
"qualities": [],
}
except Exception as e:
return {
"success": False,
"error": str(e),
"qualities": [],
}

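For illustration, roughly the argv this builder assembles for a 720p source with audio and qualities 360p/720p (paths shortened; 1080p is skipped because it exceeds the source height, ffmpeg expands %v to the variant index, and the code then renames 0/ and 1/ to quality names and patches master.m3u8):

# Sketch of the assembled command; mirrors the list built above.
cmd = [
    "ffmpeg", "-y", "-i", "uploads/<id>/source.mp4", "-threads", "2",
    "-map", "0:v:0", "-c:v:0", "libx264", "-b:v:0", "800k", "-filter:v:0", "scale=-2:360",
    "-map", "0:a:0", "-c:a:0", "aac", "-b:a:0", "96k",
    "-map", "0:v:0", "-c:v:1", "libx264", "-b:v:1", "2500k", "-filter:v:1", "scale=-2:720",
    "-map", "0:a:0", "-c:a:1", "aac", "-b:a:1", "128k",
    "-f", "hls", "-hls_time", "6", "-hls_playlist_type", "vod",
    "-hls_flags", "independent_segments",
    "-hls_segment_filename", "converted/<id>/hls/%v/segment_%03d.ts",
    "-master_pl_name", "master.m3u8",
    "-var_stream_map", "v:0,a:0 v:1,a:1",
    "converted/<id>/hls/%v/playlist.m3u8",
]
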
app/services/storage.py Executable file

@@ -0,0 +1,168 @@
"""
Storage Service
Handles file storage for uploaded and converted videos
"""
import json
import shutil
import uuid
from datetime import UTC, datetime
from pathlib import Path
from app.config import Settings, get_settings
from app.models.schemas import VideoMetadata, VideoStatus
class StorageService:
"""Manage video file storage."""
def __init__(self, settings: Settings | None = None):
self.settings = settings or get_settings()
self.base_path = Path(self.settings.storage_path)
self.uploads_path = self.base_path / "uploads"
self.converted_path = self.base_path / "converted"
# Ensure directories exist
self.uploads_path.mkdir(parents=True, exist_ok=True)
self.converted_path.mkdir(parents=True, exist_ok=True)
    def generate_video_id(self) -> str:
        """Generate a unique 12-character hex video ID."""
        return uuid.uuid4().hex[:12]
def get_upload_path(self, video_id: str, filename: str) -> Path:
"""Get path for uploaded video file."""
video_dir = self.uploads_path / video_id
video_dir.mkdir(parents=True, exist_ok=True)
return video_dir / filename
def get_converted_path(self, video_id: str) -> Path:
"""Get path for converted video directory."""
video_dir = self.converted_path / video_id
video_dir.mkdir(parents=True, exist_ok=True)
return video_dir
def get_hls_path(self, video_id: str) -> Path:
"""Get path for HLS output."""
return self.get_converted_path(video_id) / "hls"
def get_thumbnail_path(self, video_id: str) -> Path:
"""Get path for thumbnail."""
return self.get_converted_path(video_id) / "thumbnail.jpg"
def get_metadata_path(self, video_id: str) -> Path:
"""Get path for metadata JSON."""
return self.get_converted_path(video_id) / "metadata.json"
def save_metadata(
self,
video_id: str,
kurs_id: int,
title: str,
status: VideoStatus,
metadata: VideoMetadata | None = None,
error_message: str | None = None,
) -> dict:
"""Save video metadata to JSON file."""
meta_path = self.get_metadata_path(video_id)
data = {
"video_id": video_id,
"kurs_id": kurs_id,
"title": title,
"status": status.value,
"created_at": datetime.now(UTC).isoformat(),
"updated_at": datetime.now(UTC).isoformat(),
"error_message": error_message,
}
if metadata:
data["metadata"] = metadata.model_dump()
# Load existing data to preserve created_at
if meta_path.exists():
with meta_path.open() as f:
existing = json.load(f)
data["created_at"] = existing.get("created_at", data["created_at"])
with meta_path.open("w") as f:
json.dump(data, f, indent=2, default=str)
return data
def load_metadata(self, video_id: str) -> dict | None:
"""Load video metadata from JSON file."""
meta_path = self.get_metadata_path(video_id)
if not meta_path.exists():
return None
with meta_path.open() as f:
return json.load(f)
def update_status(
self,
video_id: str,
status: VideoStatus,
progress: int = 0,
error_message: str | None = None,
) -> dict | None:
"""Update video status in metadata."""
meta = self.load_metadata(video_id)
if not meta:
return None
meta["status"] = status.value
meta["progress"] = progress
meta["updated_at"] = datetime.now(UTC).isoformat()
if error_message:
meta["error_message"] = error_message
meta_path = self.get_metadata_path(video_id)
with meta_path.open("w") as f:
json.dump(meta, f, indent=2, default=str)
return meta
def delete_video(self, video_id: str) -> bool:
"""Delete all files for a video."""
upload_dir = self.uploads_path / video_id
converted_dir = self.converted_path / video_id
deleted = False
if upload_dir.exists():
shutil.rmtree(upload_dir)
deleted = True
if converted_dir.exists():
shutil.rmtree(converted_dir)
deleted = True
return deleted
def get_video_ids(self) -> list[str]:
"""Get all video IDs."""
ids = set()
if self.uploads_path.exists():
ids.update(d.name for d in self.uploads_path.iterdir() if d.is_dir())
if self.converted_path.exists():
ids.update(d.name for d in self.converted_path.iterdir() if d.is_dir())
return sorted(ids)
def is_writable(self) -> bool:
"""Check if storage is writable."""
test_file = self.base_path / ".write_test"
try:
test_file.write_text("test")
test_file.unlink()
return True
except (OSError, PermissionError):
return False
def get_disk_usage(self) -> dict:
"""Get disk usage statistics."""
total, used, free = shutil.disk_usage(self.base_path)
return {
"total_gb": round(total / (1024**3), 2),
"used_gb": round(used / (1024**3), 2),
"free_gb": round(free / (1024**3), 2),
"percent_used": round((used / total) * 100, 1),
}

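save_metadata writes a small JSON sidecar per video; a sketch of the round trip, with storage path and values illustrative:

from app.models.schemas import VideoStatus
from app.services.storage import StorageService

storage = StorageService()  # uses settings.storage_path, /app/storage by default
video_id = storage.generate_video_id()
meta = storage.save_metadata(
    video_id=video_id,
    kurs_id=7,          # hypothetical course ID
    title="Lesson 1",
    status=VideoStatus.PENDING,
)
# metadata.json now holds video_id, kurs_id, title, status, and
# created_at/updated_at ISO timestamps
assert storage.load_metadata(video_id)["status"] == "pending"
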
app/tasks/__init__.py Executable file

@@ -0,0 +1 @@
# Tasks Module

app/tasks/video_tasks.py Executable file

@@ -0,0 +1,219 @@
"""
Celery Tasks for Video Processing
"""
import logging
from datetime import UTC, datetime, timedelta
import httpx
from app.celery_app import celery_app
from app.config import get_settings
from app.models.schemas import VideoMetadata, VideoStatus, WebhookPayload
from app.services.converter import ConverterService
from app.services.storage import StorageService
logger = logging.getLogger(__name__)
def send_webhook(payload: WebhookPayload) -> bool:
"""Send webhook notification to WordPress."""
settings = get_settings()
if not settings.wordpress_webhook_url:
logger.warning("WordPress webhook URL not configured")
return False
headers = {"Content-Type": "application/json"}
if settings.wordpress_api_key:
headers["X-API-Key"] = settings.wordpress_api_key
try:
with httpx.Client(timeout=30.0) as client:
response = client.post(
settings.wordpress_webhook_url,
json=payload.model_dump(mode="json"),
headers=headers,
)
response.raise_for_status()
logger.info(f"Webhook sent successfully: {payload.event}")
return True
except httpx.HTTPError as e:
logger.error(f"Webhook failed: {e}")
return False
@celery_app.task(bind=True, max_retries=3)
def process_video(self, video_id: str, input_filename: str) -> dict:
"""
Process uploaded video:
1. Extract metadata
2. Generate thumbnail
3. Convert to HLS
4. Send webhook on completion
"""
storage = StorageService()
converter = ConverterService()
input_path = storage.get_upload_path(video_id, input_filename)
output_dir = storage.get_converted_path(video_id)
try:
# Update status to processing
meta = storage.load_metadata(video_id)
if not meta:
raise RuntimeError(f"Metadata not found for video {video_id}")
storage.update_status(video_id, VideoStatus.PROCESSING, progress=5)
# Send progress webhook
send_webhook(
WebhookPayload(
event="video.progress",
video_id=video_id,
kurs_id=meta["kurs_id"],
status=VideoStatus.PROCESSING,
progress=5,
timestamp=datetime.now(UTC),
)
)
# Step 1: Extract metadata (10%)
logger.info(f"Extracting metadata for {video_id}")
try:
video_metadata = converter.extract_metadata(input_path)
storage.update_status(video_id, VideoStatus.PROCESSING, progress=10)
except Exception as e:
logger.error(f"Metadata extraction failed: {e}")
video_metadata = VideoMetadata()
# Step 2: Generate thumbnail (20%)
logger.info(f"Generating thumbnail for {video_id}")
thumbnail_path = storage.get_thumbnail_path(video_id)
thumbnail_success = converter.generate_thumbnail(
input_path,
thumbnail_path,
time_offset=min(5.0, video_metadata.duration_seconds / 2),
)
storage.update_status(video_id, VideoStatus.PROCESSING, progress=20)
if thumbnail_success:
logger.info(f"Thumbnail generated: {thumbnail_path}")
else:
logger.warning(f"Thumbnail generation failed for {video_id}")
# Step 3: Convert to HLS (20% - 90%)
logger.info(f"Starting HLS conversion for {video_id}")
        # Progress callback for conversion (convert_to_hls does not invoke it yet;
        # ffmpeg currently runs as a single blocking subprocess)
def update_progress(progress: int):
# Map 0-100 to 20-90
mapped_progress = 20 + int(progress * 0.7)
storage.update_status(
video_id, VideoStatus.PROCESSING, progress=mapped_progress
)
result = converter.convert_to_hls(
input_path=input_path,
output_dir=output_dir,
progress_callback=update_progress,
)
if not result["success"]:
raise RuntimeError(f"HLS conversion failed: {result.get('error')}")
# Step 4: Finalize (100%)
logger.info(f"Conversion complete for {video_id}")
# Save final metadata
storage.save_metadata(
video_id=video_id,
kurs_id=meta["kurs_id"],
title=meta["title"],
status=VideoStatus.READY,
metadata=video_metadata,
)
storage.update_status(video_id, VideoStatus.READY, progress=100)
        # Build thumbnail URL using the configured base_url
        settings = get_settings()
        base_url = settings.base_url
thumbnail_url = (
f"{base_url}/api/v1/videos/{video_id}/thumbnail"
if thumbnail_success
else None
)
# Send completion webhook
send_webhook(
WebhookPayload(
event="video.ready",
video_id=video_id,
kurs_id=meta["kurs_id"],
status=VideoStatus.READY,
progress=100,
metadata=video_metadata,
thumbnail_url=thumbnail_url,
timestamp=datetime.now(UTC),
)
)
# Optionally delete source file to save space
# input_path.unlink()
return {
"success": True,
"video_id": video_id,
"qualities": result["qualities"],
"duration": video_metadata.duration_seconds,
}
except Exception as e:
logger.exception(f"Video processing failed for {video_id}")
# Update status to error
storage.update_status(video_id, VideoStatus.ERROR, error_message=str(e))
# Load metadata for webhook
meta = storage.load_metadata(video_id) or {}
# Send error webhook
send_webhook(
WebhookPayload(
event="video.error",
video_id=video_id,
kurs_id=meta.get("kurs_id", 0),
status=VideoStatus.ERROR,
error_message=str(e),
timestamp=datetime.now(UTC),
)
)
# Retry with exponential backoff
raise self.retry(exc=e, countdown=60 * (2**self.request.retries)) from e
@celery_app.task
def cleanup_old_uploads(days: int = 7) -> dict:
"""Clean up old upload files that were never processed."""
storage = StorageService()
deleted_count = 0
cutoff = datetime.now(UTC) - timedelta(days=days)
    for video_id in storage.get_video_ids():
        meta = storage.load_metadata(video_id)
        if not meta:
            continue
        # Only remove videos stuck in pending/uploading
        if meta.get("status") not in ["pending", "uploading"]:
            continue
        created_str = meta.get("created_at")
        if not created_str:
            continue
        created = datetime.fromisoformat(created_str)
        if created.replace(tzinfo=UTC) < cutoff:
            storage.delete_video(video_id)
            deleted_count += 1
            logger.info(f"Deleted stale video: {video_id}")
return {"deleted_count": deleted_count}