"""YouTube RSS feed parser module. This module handles fetching and parsing YouTube channel RSS feeds, with filtering capabilities to exclude unwanted content like Shorts. """ from datetime import datetime import feedparser from typing import Dict, List, Optional import re import yt_dlp from sqlalchemy.orm import Session from sqlalchemy.exc import IntegrityError from models import Channel, VideoEntry class FeedEntry: """Represents a single entry in a YouTube RSS feed.""" def __init__(self, title: str, video_url: str, video_id: str, published_at: datetime, thumbnail_url: Optional[str] = None, description: Optional[str] = None): self.title = title self.video_url = video_url self.video_id = video_id self.published_at = published_at self.thumbnail_url = thumbnail_url self.description = description def to_dict(self) -> Dict: """Convert entry to dictionary.""" return { "title": self.title, "video_url": self.video_url, "video_id": self.video_id, "published_at": self.published_at.isoformat(), "thumbnail_url": self.thumbnail_url, "description": self.description } class YouTubeFeedParser: """Parser for YouTube channel RSS feeds.""" BASE_URL = "https://www.youtube.com/feeds/videos.xml" def __init__(self, channel_id: str): """Initialize parser with a YouTube channel ID. Args: channel_id: The YouTube channel ID to fetch feeds from """ self.channel_id = channel_id self.url = f"{self.BASE_URL}?channel_id={channel_id}" def fetch_feed(self, filter_shorts: bool = True) -> Optional[Dict]: """Fetch and parse the RSS feed. Args: filter_shorts: If True, exclude YouTube Shorts from results Returns: Dictionary containing feed metadata and entries, or None if fetch fails """ feed = feedparser.parse(self.url) if feed.status != 200: return None entries = [] for entry in feed.entries: if filter_shorts and "shorts" in entry.link.lower(): continue # Extract video ID from URL video_id = self._extract_video_id(entry.link) if not video_id: continue # Get thumbnail URL (YouTube provides this in media:group) thumbnail_url = None if hasattr(entry, 'media_thumbnail') and entry.media_thumbnail: thumbnail_url = entry.media_thumbnail[0]['url'] # Get description description = None if hasattr(entry, 'summary'): description = entry.summary # Parse published date published_at = datetime(*entry.published_parsed[:6]) entries.append(FeedEntry( title=entry.title, video_url=entry.link, video_id=video_id, published_at=published_at, thumbnail_url=thumbnail_url, description=description )) return { "feed_title": feed.feed.title, "feed_link": feed.feed.link, "rss_url": self.url, "entries": [entry.to_dict() for entry in entries] } @staticmethod def _extract_video_id(url: str) -> Optional[str]: """Extract video ID from YouTube URL. Args: url: YouTube video URL Returns: Video ID or None if not found """ # Match patterns like: youtube.com/watch?v=VIDEO_ID match = re.search(r'[?&]v=([a-zA-Z0-9_-]{11})', url) if match: return match.group(1) # Match patterns like: youtu.be/VIDEO_ID match = re.search(r'youtu\.be/([a-zA-Z0-9_-]{11})', url) if match: return match.group(1) return None def save_to_db(self, db_session: Session, feed_data: Dict, user_id: int) -> Channel: """Save feed data to the database. Args: db_session: SQLAlchemy database session feed_data: Dictionary containing feed metadata and entries (from fetch_feed) user_id: ID of the user subscribing to this channel Returns: The Channel model instance This method uses upsert logic: - Updates existing channel if it exists for this user - Creates new channel if it doesn't exist - Only inserts new video entries (ignores duplicates based on video_id and channel_id) """ # Get or create channel for this user channel = db_session.query(Channel).filter_by( user_id=user_id, channel_id=self.channel_id ).first() if channel: # Update existing channel channel.title = feed_data["feed_title"] channel.link = feed_data["feed_link"] channel.rss_url = feed_data["rss_url"] channel.last_fetched_at = datetime.utcnow() else: # Create new channel channel = Channel( user_id=user_id, channel_id=self.channel_id, title=feed_data["feed_title"], link=feed_data["feed_link"], rss_url=feed_data["rss_url"], last_fetched_at=datetime.utcnow() ) db_session.add(channel) db_session.flush() # Get the channel ID # Add video entries (ignore duplicates) for entry_data in feed_data["entries"]: # Check if video already exists for this channel existing = db_session.query(VideoEntry).filter_by( channel_id=channel.id, video_id=entry_data["video_id"] ).first() if not existing: # Parse published_at if it's a string published_at = entry_data["published_at"] if isinstance(published_at, str): published_at = datetime.fromisoformat(published_at.replace('Z', '+00:00')) video = VideoEntry( channel_id=channel.id, video_id=entry_data["video_id"], title=entry_data["title"], video_url=entry_data["video_url"], thumbnail_url=entry_data.get("thumbnail_url"), description=entry_data.get("description"), published_at=published_at, created_at=datetime.utcnow() ) db_session.add(video) db_session.commit() return channel def fetch_single_video(video_url: str) -> Optional[Dict]: """Fetch metadata for a single YouTube video using yt-dlp. Args: video_url: YouTube video URL Returns: Dictionary with video metadata or None if fetch fails """ # Check if URL contains "shorts" - reject shorts if "shorts" in video_url.lower(): return None try: ydl_opts = { 'quiet': True, 'no_warnings': True, 'extract_flat': False, } with yt_dlp.YoutubeDL(ydl_opts) as ydl: info = ydl.extract_info(video_url, download=False) if not info: return None # Extract video ID from URL video_id = info.get('id') if not video_id: return None # Get channel info channel_id = info.get('channel_id') channel_name = info.get('channel') or info.get('uploader') channel_url = info.get('channel_url') or f"https://www.youtube.com/channel/{channel_id}" # Get thumbnail - prefer maxresdefault, fall back to other qualities thumbnail_url = None if info.get('thumbnails'): # Get highest quality thumbnail thumbnail_url = info['thumbnails'][-1].get('url') elif info.get('thumbnail'): thumbnail_url = info.get('thumbnail') # Parse upload date upload_date_str = info.get('upload_date') if upload_date_str: # Format: YYYYMMDD published_at = datetime.strptime(upload_date_str, '%Y%m%d') else: published_at = datetime.utcnow() return { 'video_id': video_id, 'title': info.get('title'), 'video_url': f"https://www.youtube.com/watch?v={video_id}", 'description': info.get('description'), 'thumbnail_url': thumbnail_url, 'published_at': published_at, 'channel_id': channel_id, 'channel_name': channel_name, 'channel_url': channel_url, } except Exception as e: print(f"Error fetching video metadata: {e}") return None def save_single_video_to_db(db_session: Session, video_data: Dict, user_id: int) -> VideoEntry: """Save a single video to the database. Args: db_session: SQLAlchemy database session video_data: Dictionary containing video metadata (from fetch_single_video) user_id: ID of the user adding this video Returns: The VideoEntry model instance This method: - Creates channel if it doesn't exist for this user - Creates video entry if it doesn't exist - Returns existing video if already in database """ # Get or create channel for this user channel = db_session.query(Channel).filter_by( user_id=user_id, channel_id=video_data['channel_id'] ).first() if not channel: # Create new channel channel = Channel( user_id=user_id, channel_id=video_data['channel_id'], title=video_data['channel_name'], link=video_data['channel_url'], rss_url=f"https://www.youtube.com/feeds/videos.xml?channel_id={video_data['channel_id']}", last_fetched_at=datetime.utcnow() ) db_session.add(channel) db_session.flush() # Get the channel ID # Check if video already exists for this channel existing_video = db_session.query(VideoEntry).filter_by( channel_id=channel.id, video_id=video_data['video_id'] ).first() if existing_video: return existing_video # Create new video entry video = VideoEntry( channel_id=channel.id, video_id=video_data['video_id'], title=video_data['title'], video_url=video_data['video_url'], thumbnail_url=video_data.get('thumbnail_url'), description=video_data.get('description'), published_at=video_data['published_at'], created_at=datetime.utcnow() ) db_session.add(video) db_session.commit() return video