Add async video downloads with yt-dlp and Celery

- Added yt-dlp, celery, and redis dependencies to pyproject.toml
- Extended VideoEntry model with download tracking fields:
  - download_status (enum: pending, downloading, completed, failed)
  - download_path, download_started_at, download_completed_at
  - download_error, file_size
- Created celery_app.py with Redis broker configuration
- Created download_service.py with async download tasks:
  - download_video() task downloads as MP4 format
  - Configured yt-dlp for best MP4 quality with fallback
  - Automatic retries on failure (max 3 attempts)
  - Progress tracking and database updates
- Added Flask API endpoints in main.py:
  - POST /api/download/<video_id> to trigger download
  - GET /api/download/status/<video_id> to check status
  - POST /api/download/batch for bulk downloads
- Generated and applied Alembic migration for new fields
- Created downloads/ directory for video storage
- Updated .gitignore to exclude downloads/ directory
- Updated CLAUDE.md with comprehensive documentation:
  - Redis and Celery setup instructions
  - Download workflow and architecture
  - yt-dlp configuration details
  - New API endpoint examples

Videos are downloaded as MP4 files using Celery workers.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-11-26 14:04:30 -05:00
parent 4892bec986
commit 2305dfddb1
9 changed files with 674 additions and 13 deletions

165
download_service.py Normal file
View File

@@ -0,0 +1,165 @@
"""Video download service using yt-dlp and Celery."""
import os
from datetime import datetime
from pathlib import Path
import yt_dlp
from celery import Task
from celery_app import celery_app
from database import SessionLocal
from models import VideoEntry, DownloadStatus
# Download configuration
DOWNLOAD_DIR = Path("downloads")
DOWNLOAD_DIR.mkdir(exist_ok=True)
class DatabaseTask(Task):
"""Base task with database session management."""
_session = None
def after_return(self, *args, **kwargs):
"""Close database session after task completion."""
if self._session is not None:
self._session.close()
@property
def session(self):
"""Get or create database session."""
if self._session is None:
self._session = SessionLocal()
return self._session
@celery_app.task(base=DatabaseTask, bind=True, max_retries=3)
def download_video(self, video_id: int) -> dict:
"""Download a video using yt-dlp.
Args:
video_id: Database ID of the VideoEntry to download
Returns:
Dictionary with download result information
"""
session = self.session
# Get video entry from database
video = session.query(VideoEntry).filter_by(id=video_id).first()
if not video:
return {"error": f"Video ID {video_id} not found"}
# Update status to downloading
video.download_status = DownloadStatus.DOWNLOADING
video.download_started_at = datetime.utcnow()
session.commit()
try:
# Extract video ID from YouTube URL
youtube_url = video.link
# Configure yt-dlp options for MP4 output
ydl_opts = {
'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
'outtmpl': str(DOWNLOAD_DIR / f'{video_id}_%(title)s.%(ext)s'),
'merge_output_format': 'mp4', # Ensure output is MP4
'postprocessors': [{
'key': 'FFmpegVideoConvertor',
'preferedformat': 'mp4', # Convert to MP4 if needed
}],
'quiet': False,
'no_warnings': False,
'progress_hooks': [lambda d: _progress_hook(d, video_id, session)],
}
# Download the video
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(youtube_url, download=True)
filename = ydl.prepare_filename(info)
# Handle cases where extension might change
if not filename.endswith('.mp4'):
# Find the actual file with .mp4 extension
base = os.path.splitext(filename)[0]
mp4_file = f"{base}.mp4"
if os.path.exists(mp4_file):
filename = mp4_file
# Get file size
file_size = os.path.getsize(filename) if os.path.exists(filename) else None
# Update video entry with success
video.download_status = DownloadStatus.COMPLETED
video.download_path = filename
video.download_completed_at = datetime.utcnow()
video.file_size = file_size
video.download_error = None
session.commit()
return {
"video_id": video_id,
"status": "completed",
"path": filename,
"file_size": file_size
}
except Exception as e:
# Update video entry with error
video.download_status = DownloadStatus.FAILED
video.download_error = str(e)
video.download_completed_at = datetime.utcnow()
session.commit()
# Retry if we haven't exceeded max retries
if self.request.retries < self.max_retries:
raise self.retry(exc=e, countdown=60) # Retry after 60 seconds
return {
"video_id": video_id,
"status": "failed",
"error": str(e)
}
def _progress_hook(d: dict, video_id: int, session) -> None:
"""Progress hook for yt-dlp downloads.
Args:
d: Progress dictionary from yt-dlp
video_id: Database ID of the video
session: Database session
"""
if d['status'] == 'finished':
print(f"Download finished for video {video_id}, now converting...")
elif d['status'] == 'downloading':
if 'total_bytes' in d:
percent = d['downloaded_bytes'] / d['total_bytes'] * 100
print(f"Downloading video {video_id}: {percent:.1f}%")
@celery_app.task
def download_videos_batch(video_ids: list[int]) -> dict:
"""Download multiple videos in batch.
Args:
video_ids: List of VideoEntry IDs to download
Returns:
Dictionary with batch download results
"""
results = []
for video_id in video_ids:
# Queue each download as a separate task
task = download_video.delay(video_id)
results.append({
"video_id": video_id,
"task_id": task.id
})
return {
"total_queued": len(results),
"tasks": results
}