"""Video download service using yt-dlp and Celery.""" import os from datetime import datetime from pathlib import Path import yt_dlp from celery import Task from celery_app import celery_app from database import SessionLocal from models import VideoEntry, DownloadStatus # Download configuration DOWNLOAD_DIR = Path("downloads") DOWNLOAD_DIR.mkdir(exist_ok=True) class DatabaseTask(Task): """Base task with database session management.""" _session = None def after_return(self, *args, **kwargs): """Close database session after task completion.""" if self._session is not None: self._session.close() @property def session(self): """Get or create database session.""" if self._session is None: self._session = SessionLocal() return self._session @celery_app.task(base=DatabaseTask, bind=True, max_retries=3) def download_video(self, video_id: int) -> dict: """Download a video using yt-dlp. Args: video_id: Database ID of the VideoEntry to download Returns: Dictionary with download result information """ session = self.session # Get video entry from database video = session.query(VideoEntry).filter_by(id=video_id).first() if not video: return {"error": f"Video ID {video_id} not found"} # Update status to downloading video.download_status = DownloadStatus.DOWNLOADING video.download_started_at = datetime.utcnow() session.commit() try: # Get video URL from database youtube_url = video.video_url # Configure yt-dlp options for MP4 output ydl_opts = { 'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best', 'outtmpl': str(DOWNLOAD_DIR / f'{video_id}_%(title)s.%(ext)s'), 'merge_output_format': 'mp4', # Ensure output is MP4 'postprocessors': [{ 'key': 'FFmpegVideoConvertor', 'preferedformat': 'mp4', # Convert to MP4 if needed }], 'quiet': False, 'no_warnings': False, 'progress_hooks': [lambda d: _progress_hook(d, video_id, session)], } # Download the video with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(youtube_url, download=True) filename = ydl.prepare_filename(info) # Handle cases where extension might change if not filename.endswith('.mp4'): # Find the actual file with .mp4 extension base = os.path.splitext(filename)[0] mp4_file = f"{base}.mp4" if os.path.exists(mp4_file): filename = mp4_file # Get file size file_size = os.path.getsize(filename) if os.path.exists(filename) else None # Update video entry with success video.download_status = DownloadStatus.COMPLETED video.download_path = filename video.download_completed_at = datetime.utcnow() video.file_size = file_size video.download_error = None session.commit() return { "video_id": video_id, "status": "completed", "path": filename, "file_size": file_size } except Exception as e: # Update video entry with error video.download_status = DownloadStatus.FAILED video.download_error = str(e) video.download_completed_at = datetime.utcnow() session.commit() # Retry if we haven't exceeded max retries if self.request.retries < self.max_retries: raise self.retry(exc=e, countdown=60) # Retry after 60 seconds return { "video_id": video_id, "status": "failed", "error": str(e) } def _progress_hook(d: dict, video_id: int, session) -> None: """Progress hook for yt-dlp downloads. Args: d: Progress dictionary from yt-dlp video_id: Database ID of the video session: Database session """ if d['status'] == 'finished': print(f"Download finished for video {video_id}, now converting...") elif d['status'] == 'downloading': if 'total_bytes' in d: percent = d['downloaded_bytes'] / d['total_bytes'] * 100 print(f"Downloading video {video_id}: {percent:.1f}%") @celery_app.task def download_videos_batch(video_ids: list[int]) -> dict: """Download multiple videos in batch. Args: video_ids: List of VideoEntry IDs to download Returns: Dictionary with batch download results """ results = [] for video_id in video_ids: # Queue each download as a separate task task = download_video.delay(video_id) results.append({ "video_id": video_id, "task_id": task.id }) return { "total_queued": len(results), "tasks": results }