Files
yottob/models.py
Ryan Chen 2305dfddb1 Add async video downloads with yt-dlp and Celery
- Added yt-dlp, celery, and redis dependencies to pyproject.toml
- Extended VideoEntry model with download tracking fields:
  - download_status (enum: pending, downloading, completed, failed)
  - download_path, download_started_at, download_completed_at
  - download_error, file_size
- Created celery_app.py with Redis broker configuration
- Created download_service.py with async download tasks:
  - download_video() task downloads videos in MP4 format
  - Configured yt-dlp for best MP4 quality with fallback
  - Automatic retries on failure (max 3 attempts)
  - Progress tracking and database updates
- Added Flask API endpoints in main.py:
  - POST /api/download/<video_id> to trigger download
  - GET /api/download/status/<video_id> to check status
  - POST /api/download/batch for bulk downloads
- Generated and applied Alembic migration for new fields
- Created downloads/ directory for video storage
- Updated .gitignore to exclude downloads/ directory
- Updated CLAUDE.md with comprehensive documentation:
  - Redis and Celery setup instructions
  - Download workflow and architecture
  - yt-dlp configuration details
  - New API endpoint examples

Videos are downloaded as MP4 files using Celery workers.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-26 14:04:30 -05:00

91 lines
3.5 KiB
Python

"""Database models for YouTube feed storage."""
from datetime import datetime, timezone
from enum import Enum as PyEnum
from typing import List, Optional

from sqlalchemy import String, DateTime, ForeignKey, Index, Enum, BigInteger
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
class DownloadStatus(PyEnum):
    """Download status enumeration.

    Each member's ``.value`` is the lowercase string form of the state;
    ``VideoEntry.to_dict`` and ``VideoEntry.__repr__`` expose that string.
    """

    # Default status assigned to ``VideoEntry.download_status``.
    PENDING = "pending"
    DOWNLOADING = "downloading"
    COMPLETED = "completed"
    FAILED = "failed"
class Base(DeclarativeBase):
    """Base class for all database models.

    Every model in this module subclasses ``Base``, so they all share its
    SQLAlchemy declarative metadata.
    """
class Channel(Base):
    """YouTube channel model.

    Maps to the ``channels`` table. A channel owns a collection of
    ``VideoEntry`` rows through the ``videos`` relationship.
    """

    __tablename__ = "channels"

    id: Mapped[int] = mapped_column(primary_key=True)
    # External channel identifier string; unique and indexed for lookups.
    channel_id: Mapped[str] = mapped_column(
        String(50), unique=True, nullable=False, index=True
    )
    title: Mapped[str] = mapped_column(String(200), nullable=False)
    link: Mapped[str] = mapped_column(String(500), nullable=False)
    # Naive UTC timestamp. ``datetime.utcnow`` is deprecated since Python
    # 3.12; this callable produces the same naive-UTC value, so stored data
    # and comparisons against existing rows are unaffected.
    last_fetched: Mapped[datetime] = mapped_column(
        DateTime,
        nullable=False,
        default=lambda: datetime.now(timezone.utc).replace(tzinfo=None),
    )

    # Relationship to video entries. The "all, delete-orphan" cascade means
    # videos are deleted along with their channel, and also when they are
    # removed from this collection.
    videos: Mapped[List["VideoEntry"]] = relationship(
        "VideoEntry", back_populates="channel", cascade="all, delete-orphan"
    )

    def __repr__(self) -> str:
        """Return a debug representation with id, channel_id and title."""
        return (
            f"<Channel(id={self.id}, channel_id='{self.channel_id}', "
            f"title='{self.title}')>"
        )
class VideoEntry(Base):
    """YouTube video entry model, including download tracking state.

    Maps to the ``video_entries`` table. Each entry belongs to one
    ``Channel`` (via the ``channel_id`` foreign key).

    Note: ``created_at`` and ``download_status`` are filled by column
    defaults, which SQLAlchemy applies at INSERT time. On an instance that
    has not been flushed yet they may still be ``None``; ``__repr__`` and
    ``to_dict`` tolerate that instead of raising ``AttributeError``.
    """

    __tablename__ = "video_entries"

    id: Mapped[int] = mapped_column(primary_key=True)
    # Foreign key to channels.id (the integer primary key, not the
    # string Channel.channel_id).
    channel_id: Mapped[int] = mapped_column(ForeignKey("channels.id"), nullable=False)
    title: Mapped[str] = mapped_column(String(500), nullable=False)
    link: Mapped[str] = mapped_column(String(500), unique=True, nullable=False)
    # Naive UTC timestamp. ``datetime.utcnow`` is deprecated since Python
    # 3.12; this callable yields the same naive-UTC value.
    created_at: Mapped[datetime] = mapped_column(
        DateTime,
        nullable=False,
        default=lambda: datetime.now(timezone.utc).replace(tzinfo=None),
    )

    # Download tracking fields
    download_status: Mapped[DownloadStatus] = mapped_column(
        Enum(DownloadStatus),
        nullable=False,
        default=DownloadStatus.PENDING,
    )
    download_path: Mapped[Optional[str]] = mapped_column(String(1000), nullable=True)
    download_started_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)
    download_completed_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)
    download_error: Mapped[Optional[str]] = mapped_column(String(2000), nullable=True)
    # BigInteger so large files do not overflow a 32-bit integer column.
    # NOTE(review): unit is presumably bytes -- confirm against the writer.
    file_size: Mapped[Optional[int]] = mapped_column(BigInteger, nullable=True)

    # Relationship to channel
    channel: Mapped["Channel"] = relationship("Channel", back_populates="videos")

    # Indexes for faster queries
    __table_args__ = (
        Index('idx_channel_created', 'channel_id', 'created_at'),
        Index('idx_download_status', 'download_status'),
    )

    @staticmethod
    def _iso(value: Optional[datetime]) -> Optional[str]:
        """Return ``value.isoformat()``, or ``None`` when ``value`` is ``None``."""
        return value.isoformat() if value is not None else None

    def _status_value(self) -> Optional[str]:
        """Return the status string, or ``None`` if the status is not set yet."""
        status = self.download_status
        return status.value if status is not None else None

    def __repr__(self) -> str:
        """Return a debug representation; never raises on unflushed instances."""
        return (
            f"<VideoEntry(id={self.id}, title='{self.title}', "
            f"link='{self.link}', status='{self._status_value()}')>"
        )

    def to_dict(self) -> dict:
        """Convert to dictionary for API responses.

        Returns:
            A dict with the entry's id, title, link and download tracking
            fields. Datetimes are rendered as ISO 8601 strings and the
            status as its string value; any of these that are unset are
            returned as ``None``.
        """
        return {
            "id": self.id,
            "title": self.title,
            "link": self.link,
            "created_at": self._iso(self.created_at),
            "download_status": self._status_value(),
            "download_path": self.download_path,
            "download_started_at": self._iso(self.download_started_at),
            "download_completed_at": self._iso(self.download_completed_at),
            "download_error": self.download_error,
            "file_size": self.file_size,
        }