- Added yt-dlp, celery, and redis dependencies to pyproject.toml - Extended VideoEntry model with download tracking fields: - download_status (enum: pending, downloading, completed, failed) - download_path, download_started_at, download_completed_at - download_error, file_size - Created celery_app.py with Redis broker configuration - Created download_service.py with async download tasks: - download_video() task downloads as MP4 format - Configured yt-dlp for best MP4 quality with fallback - Automatic retries on failure (max 3 attempts) - Progress tracking and database updates - Added Flask API endpoints in main.py: - POST /api/download/<video_id> to trigger download - GET /api/download/status/<video_id> to check status - POST /api/download/batch for bulk downloads - Generated and applied Alembic migration for new fields - Created downloads/ directory for video storage - Updated .gitignore to exclude downloads/ directory - Updated CLAUDE.md with comprehensive documentation: - Redis and Celery setup instructions - Download workflow and architecture - yt-dlp configuration details - New API endpoint examples Videos are downloaded as MP4 files using Celery workers. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
166 lines
4.9 KiB
Python
166 lines
4.9 KiB
Python
"""Video download service using yt-dlp and Celery."""
|
|
|
|
import os
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
import yt_dlp
|
|
from celery import Task
|
|
|
|
from celery_app import celery_app
|
|
from database import SessionLocal
|
|
from models import VideoEntry, DownloadStatus
|
|
|
|
|
|
# Download configuration: directory, relative to the process working directory,
# that yt-dlp output templates in this module point at. It is created when the
# module is imported; exist_ok=True makes that a no-op if it already exists.
|
|
DOWNLOAD_DIR = Path("downloads")
|
|
DOWNLOAD_DIR.mkdir(exist_ok=True)
|
|
|
|
|
|
class DatabaseTask(Task):
    """Base task with lazily created database session management.

    The session is opened on first access to ``session`` and released in
    ``after_return`` once the task body has finished.
    """

    # Session cached for the task run in progress; None until first use.
    _session = None

    def after_return(self, *args, **kwargs):
        """Close the cached database session, if any, and forget it.

        The reference is dropped even if ``close()`` raises, so the next run
        on this task object opens a fresh session instead of reusing one that
        has already been closed.
        """
        if self._session is not None:
            try:
                self._session.close()
            finally:
                self._session = None

    @property
    def session(self):
        """Return the cached session, creating it with ``SessionLocal`` on first use."""
        if self._session is None:
            self._session = SessionLocal()
        return self._session
|
|
|
|
|
|
@celery_app.task(base=DatabaseTask, bind=True, max_retries=3)
def download_video(self, video_id: int) -> dict:
    """Download the video behind a VideoEntry as MP4 using yt-dlp.

    Marks the entry DOWNLOADING, runs yt-dlp against the entry's link, then
    records either the resulting file path and size (COMPLETED) or the error
    text (FAILED). A failed attempt is retried after 60 seconds until
    ``max_retries`` is exhausted.

    Args:
        video_id: Database ID of the VideoEntry to download

    Returns:
        Dictionary with download result information:
        ``video_id``/``status``/``path``/``file_size`` on success,
        ``video_id``/``status``/``error`` after the final failed attempt,
        or only ``error`` when no entry has the given ID.
    """
    session = self.session

    # Get video entry from database
    video = session.query(VideoEntry).filter_by(id=video_id).first()
    if not video:
        return {"error": f"Video ID {video_id} not found"}

    # Update status to downloading
    video.download_status = DownloadStatus.DOWNLOADING
    video.download_started_at = datetime.utcnow()
    session.commit()

    try:
        # The entry's link is the URL handed to yt-dlp
        youtube_url = video.link

        # Configure yt-dlp options for MP4 output
        ydl_opts = {
            # Prefer separate MP4 video + M4A audio, then a single MP4 file,
            # then whatever is best
            'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
            # Prefix with the database ID so files map back to their entry
            'outtmpl': str(DOWNLOAD_DIR / f'{video_id}_%(title)s.%(ext)s'),
            'merge_output_format': 'mp4',  # Ensure output is MP4
            'postprocessors': [{
                'key': 'FFmpegVideoConvertor',
                # Key name is spelled this way by yt-dlp; do not "correct" it
                'preferedformat': 'mp4',  # Convert to MP4 if needed
            }],
            'quiet': False,
            'no_warnings': False,
            'progress_hooks': [lambda d: _progress_hook(d, video_id, session)],
        }

        # Download the video
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(youtube_url, download=True)
            filename = ydl.prepare_filename(info)

        # prepare_filename() may report the pre-conversion extension; prefer
        # the sibling .mp4 file when it exists on disk
        if not filename.endswith('.mp4'):
            base = os.path.splitext(filename)[0]
            mp4_file = f"{base}.mp4"
            if os.path.exists(mp4_file):
                filename = mp4_file

        # Get file size (None when the expected file is not on disk)
        file_size = os.path.getsize(filename) if os.path.exists(filename) else None

        # Update video entry with success
        video.download_status = DownloadStatus.COMPLETED
        video.download_path = filename
        video.download_completed_at = datetime.utcnow()
        video.file_size = file_size
        video.download_error = None
        session.commit()

        return {
            "video_id": video_id,
            "status": "completed",
            "path": filename,
            "file_size": file_size
        }

    except Exception as e:
        # If the failure came from the commit above, the session must be
        # rolled back before it can be used again; otherwise this only ends
        # the current transaction, which has nothing pending.
        session.rollback()

        error_message = str(e)

        # Update video entry with error. The entry is recorded as FAILED even
        # when a retry follows; the next attempt sets it back to DOWNLOADING.
        video.download_status = DownloadStatus.FAILED
        video.download_error = error_message
        video.download_completed_at = datetime.utcnow()
        session.commit()

        # Retry if we haven't exceeded max retries
        if self.request.retries < self.max_retries:
            raise self.retry(exc=e, countdown=60)  # Retry after 60 seconds

        return {
            "video_id": video_id,
            "status": "failed",
            "error": error_message
        }
|
|
|
|
|
|
def _progress_hook(d: dict, video_id: int, session) -> None:
    """Progress hook for yt-dlp downloads.

    Prints a message when the download phase finishes and a percentage line
    while it is running. The percentage is skipped when the total size is
    unknown, because an exception raised here would propagate into yt-dlp.

    Args:
        d: Progress dictionary from yt-dlp
        video_id: Database ID of the video
        session: Database session (accepted for the caller's convenience;
            not used here)
    """
    status = d.get('status')

    if status == 'finished':
        print(f"Download finished for video {video_id}, now converting...")
    elif status == 'downloading':
        # total_bytes may be absent, None or 0 when the size is unknown;
        # fall back to yt-dlp's estimate before giving up on a percentage.
        total = d.get('total_bytes') or d.get('total_bytes_estimate')
        downloaded = d.get('downloaded_bytes')
        if total and downloaded is not None:
            percent = downloaded / total * 100
            print(f"Downloading video {video_id}: {percent:.1f}%")
|
|
|
|
|
|
@celery_app.task
def download_videos_batch(video_ids: list[int]) -> dict:
    """Queue one ``download_video`` task per requested video.

    Each ID is dispatched as its own task, in the order given; this function
    does not wait for any of the downloads to run.

    Args:
        video_ids: List of VideoEntry IDs to download

    Returns:
        Dictionary with ``total_queued`` (number of tasks dispatched) and
        ``tasks`` (one ``{"video_id", "task_id"}`` mapping per queued task).
    """
    # Dispatching happens while each entry is built, so queue order follows
    # the order of video_ids.
    queued = [
        {"video_id": vid, "task_id": download_video.delay(vid).id}
        for vid in video_ids
    ]
    return {"total_queued": len(queued), "tasks": queued}
|