Files
yottob/feed_parser.py
Ryan Chen acb2ec0654 Implement UI improvements and download management features
- Switch to light mode with black and white color scheme
- Simplify channel subscription to use channel ID only instead of RSS URL
- Add Downloads page to track all video download jobs
- Fix Flask-Login session management bug in user loader
- Always filter YouTube Shorts from feeds (case-insensitive)
- Fix download service video URL attribute error
- Fix watch page enum comparison for download status display

UI Changes:
- Update CSS to pure black/white/grayscale theme
- Remove colored text and buttons
- Use underlines for hover states instead of color changes
- Improve visual hierarchy with grayscale shades

Channel Subscription:
- Accept channel ID directly instead of full RSS URL
- Add validation for channel ID format (UC/UU prefix)
- Update help text and examples for easier onboarding

Downloads Page:
- New route at /downloads showing all video download jobs
- Display status, progress, and metadata for each download
- Sortable by status (downloading, pending, failed, completed)
- Actions to download, retry, or watch videos
- Responsive grid layout with thumbnails

Bug Fixes:
- Fix user loader to properly use database session context manager
- Fix download service accessing wrong attribute (link → video_url)
- Fix watch page template enum value comparisons
- Fix session detachment issues when accessing channel data

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-26 15:36:26 -05:00

201 lines
6.6 KiB
Python

"""YouTube RSS feed parser module.
This module handles fetching and parsing YouTube channel RSS feeds,
with filtering capabilities to exclude unwanted content like Shorts.
"""
from datetime import datetime
import feedparser
from typing import Dict, List, Optional
import re
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
from models import Channel, VideoEntry
class FeedEntry:
    """A single video entry parsed from a YouTube channel RSS feed."""

    def __init__(self, title: str, video_url: str, video_id: str,
                 published_at: datetime, thumbnail_url: Optional[str] = None,
                 description: Optional[str] = None):
        """Store the entry's metadata.

        Args:
            title: Video title as given by the feed.
            video_url: Full watch URL for the video.
            video_id: The 11-character YouTube video ID.
            published_at: Publication timestamp of the video.
            thumbnail_url: Thumbnail image URL, if the feed provided one.
            description: Video summary text, if the feed provided one.
        """
        self.title = title
        self.video_url = video_url
        self.video_id = video_id
        self.published_at = published_at
        self.thumbnail_url = thumbnail_url
        self.description = description

    def to_dict(self) -> Dict:
        """Serialize the entry as a plain dict (datetime becomes an ISO-8601 string)."""
        serialized = {
            "title": self.title,
            "video_url": self.video_url,
            "video_id": self.video_id,
            "published_at": self.published_at.isoformat(),
            "thumbnail_url": self.thumbnail_url,
            "description": self.description,
        }
        return serialized
class YouTubeFeedParser:
    """Parser for YouTube channel RSS feeds.

    Builds the canonical feed URL for a channel, fetches and parses it with
    feedparser, and can persist the parsed results to the database via the
    project's SQLAlchemy models.
    """

    # YouTube's public RSS endpoint; the channel is selected via query string.
    BASE_URL = "https://www.youtube.com/feeds/videos.xml"

    def __init__(self, channel_id: str):
        """Initialize parser with a YouTube channel ID.

        Args:
            channel_id: The YouTube channel ID to fetch feeds from
        """
        self.channel_id = channel_id
        self.url = f"{self.BASE_URL}?channel_id={channel_id}"

    def fetch_feed(self, filter_shorts: bool = True) -> Optional[Dict]:
        """Fetch and parse the RSS feed.

        Args:
            filter_shorts: If True, exclude YouTube Shorts from results

        Returns:
            Dictionary containing feed metadata and entries, or None if the
            fetch fails (network error or non-200 HTTP status).
        """
        feed = feedparser.parse(self.url)
        # On network/DNS failures feedparser returns a result object with no
        # `status` attribute at all, so a bare `feed.status` would raise
        # AttributeError instead of returning None as documented. Treat a
        # missing or non-200 status uniformly as a failed fetch.
        if getattr(feed, "status", None) != 200:
            return None
        entries = []
        for entry in feed.entries:
            link = getattr(entry, "link", "")
            # Always filter Shorts when requested (case-insensitive match).
            if filter_shorts and "shorts" in link.lower():
                continue
            # Extract video ID from URL; skip entries without a parseable one.
            video_id = self._extract_video_id(link)
            if not video_id:
                continue
            # Get thumbnail URL (YouTube provides this in media:group).
            # Guard both a missing attribute and a missing 'url' key so one
            # malformed entry cannot abort the whole feed.
            thumbnail_url = None
            if getattr(entry, "media_thumbnail", None):
                thumbnail_url = entry.media_thumbnail[0].get("url")
            # Get description, when the feed supplies one.
            description = getattr(entry, "summary", None)
            # Parse published date. Entries lacking it cannot be ordered or
            # stored meaningfully, so skip them rather than raising TypeError
            # on the unpack below.
            published_parsed = getattr(entry, "published_parsed", None)
            if not published_parsed:
                continue
            published_at = datetime(*published_parsed[:6])
            entries.append(FeedEntry(
                title=entry.title,
                video_url=link,
                video_id=video_id,
                published_at=published_at,
                thumbnail_url=thumbnail_url,
                description=description
            ))
        return {
            "feed_title": feed.feed.title,
            "feed_link": feed.feed.link,
            "rss_url": self.url,
            "entries": [entry.to_dict() for entry in entries]
        }

    @staticmethod
    def _extract_video_id(url: str) -> Optional[str]:
        """Extract video ID from YouTube URL.

        Args:
            url: YouTube video URL

        Returns:
            Video ID (11-character string) or None if not found
        """
        # Match patterns like: youtube.com/watch?v=VIDEO_ID
        match = re.search(r'[?&]v=([a-zA-Z0-9_-]{11})', url)
        if match:
            return match.group(1)
        # Match patterns like: youtu.be/VIDEO_ID
        match = re.search(r'youtu\.be/([a-zA-Z0-9_-]{11})', url)
        if match:
            return match.group(1)
        return None

    def save_to_db(self, db_session: Session, feed_data: Dict, user_id: int) -> Channel:
        """Save feed data to the database.

        Args:
            db_session: SQLAlchemy database session
            feed_data: Dictionary containing feed metadata and entries (from fetch_feed)
            user_id: ID of the user subscribing to this channel

        Returns:
            The Channel model instance

        This method uses upsert logic:
        - Updates existing channel if it exists for this user
        - Creates new channel if it doesn't exist
        - Only inserts new video entries (ignores duplicates based on
          video_id and channel_id)
        """
        # Get or create channel for this user.
        channel = db_session.query(Channel).filter_by(
            user_id=user_id,
            channel_id=self.channel_id
        ).first()
        if channel:
            # Update existing channel metadata in place.
            channel.title = feed_data["feed_title"]
            channel.link = feed_data["feed_link"]
            channel.rss_url = feed_data["rss_url"]
            # NOTE: naive UTC timestamps are used consistently throughout
            # this module; keep utcnow() so stored values stay comparable.
            channel.last_fetched_at = datetime.utcnow()
        else:
            # Create new channel row.
            channel = Channel(
                user_id=user_id,
                channel_id=self.channel_id,
                title=feed_data["feed_title"],
                link=feed_data["feed_link"],
                rss_url=feed_data["rss_url"],
                last_fetched_at=datetime.utcnow()
            )
            db_session.add(channel)
            db_session.flush()  # Assigns channel.id before inserting videos.
        # Add video entries, skipping ones already stored for this channel.
        for entry_data in feed_data["entries"]:
            existing = db_session.query(VideoEntry).filter_by(
                channel_id=channel.id,
                video_id=entry_data["video_id"]
            ).first()
            if not existing:
                # published_at arrives as an ISO string when feed_data came
                # from fetch_feed()'s to_dict() output; parse it back.
                published_at = entry_data["published_at"]
                if isinstance(published_at, str):
                    published_at = datetime.fromisoformat(published_at.replace('Z', '+00:00'))
                video = VideoEntry(
                    channel_id=channel.id,
                    video_id=entry_data["video_id"],
                    title=entry_data["title"],
                    video_url=entry_data["video_url"],
                    thumbnail_url=entry_data.get("thumbnail_url"),
                    description=entry_data.get("description"),
                    published_at=published_at,
                    created_at=datetime.utcnow()
                )
                db_session.add(video)
        db_session.commit()
        return channel