"""Flask web application for YouTube RSS feed parsing.""" import os from flask import Flask, render_template, request, jsonify, redirect, url_for, flash, send_file from feed_parser import YouTubeFeedParser from database import init_db, get_db_session from models import Channel, VideoEntry, DownloadStatus from download_service import download_video, download_videos_batch from sqlalchemy import desc app = Flask(__name__) app.secret_key = os.getenv("SECRET_KEY", "dev-secret-key-change-in-production") # Default channel ID for demonstration DEFAULT_CHANNEL_ID = "UCtTWOND3uyl4tVc_FarDmpw" # Initialize database on app startup with app.app_context(): init_db() @app.route("/", methods=["GET"]) def index(): """Render the dashboard with all videos sorted by date.""" try: with get_db_session() as session: # Query all videos with their channels, sorted by published date (newest first) videos = session.query(VideoEntry).join(Channel).order_by( desc(VideoEntry.published_at) ).all() return render_template("dashboard.html", videos=videos) except Exception as e: flash(f"Error loading videos: {str(e)}", "error") return render_template("dashboard.html", videos=[]) @app.route("/channels", methods=["GET"]) def channels_page(): """Render the channels management page.""" try: with get_db_session() as session: channels = session.query(Channel).order_by(desc(Channel.last_fetched_at)).all() return render_template("channels.html", channels=channels) except Exception as e: flash(f"Error loading channels: {str(e)}", "error") return render_template("channels.html", channels=[]) @app.route("/add-channel", methods=["GET", "POST"]) def add_channel_page(): """Render the add channel page and handle channel subscription.""" if request.method == "GET": return render_template("add_channel.html") # Handle POST - add new channel rss_url = request.form.get("rss_url") if not rss_url: flash("RSS URL is required", "error") return redirect(url_for("add_channel_page")) # Extract channel_id from RSS URL # Format: https://www.youtube.com/feeds/videos.xml?channel_id=CHANNEL_ID try: if "channel_id=" in rss_url: channel_id = rss_url.split("channel_id=")[1].split("&")[0] else: flash("Invalid RSS URL format. Must contain channel_id parameter.", "error") return redirect(url_for("add_channel_page")) # Fetch feed using parser parser = YouTubeFeedParser(channel_id) result = parser.fetch_feed() if result is None: flash("Failed to fetch feed from YouTube", "error") return redirect(url_for("add_channel_page")) # Save to database with get_db_session() as session: channel = parser.save_to_db(session, result) video_count = len(result["entries"]) flash(f"Successfully subscribed to {channel.title}! Added {video_count} videos.", "success") return redirect(url_for("channels_page")) except Exception as e: flash(f"Error subscribing to channel: {str(e)}", "error") return redirect(url_for("add_channel_page")) @app.route("/watch/", methods=["GET"]) def watch_video(video_id: int): """Render the video watch page.""" try: with get_db_session() as session: video = session.query(VideoEntry).filter_by(id=video_id).first() if not video: flash("Video not found", "error") return redirect(url_for("index")) return render_template("watch.html", video=video) except Exception as e: flash(f"Error loading video: {str(e)}", "error") return redirect(url_for("index")) @app.route("/api/feed", methods=["GET"]) def get_feed(): """API endpoint to fetch YouTube channel feed and save to database. Query parameters: channel_id: YouTube channel ID (optional, uses default if not provided) filter_shorts: Whether to filter out Shorts (default: true) save: Whether to save to database (default: true) Returns: JSON response with feed data or error message """ channel_id = request.args.get("channel_id", DEFAULT_CHANNEL_ID) filter_shorts = request.args.get("filter_shorts", "true").lower() == "true" save_to_db = request.args.get("save", "true").lower() == "true" parser = YouTubeFeedParser(channel_id) result = parser.fetch_feed(filter_shorts=filter_shorts) if result is None: return jsonify({"error": "Failed to fetch feed"}), 500 # Save to database if requested if save_to_db: try: with get_db_session() as session: parser.save_to_db(session, result) except Exception as e: return jsonify({"error": f"Failed to save to database: {str(e)}"}), 500 return jsonify(result) @app.route("/api/channels", methods=["GET"]) def get_channels(): """API endpoint to list all tracked channels. Returns: JSON response with list of channels """ try: with get_db_session() as session: channels = session.query(Channel).all() return jsonify({ "channels": [ { "id": ch.id, "channel_id": ch.channel_id, "title": ch.title, "link": ch.link, "last_fetched": ch.last_fetched.isoformat(), "video_count": len(ch.videos) } for ch in channels ] }) except Exception as e: return jsonify({"error": f"Failed to fetch channels: {str(e)}"}), 500 @app.route("/api/history/", methods=["GET"]) def get_history(channel_id: str): """API endpoint to get video history for a specific channel. Args: channel_id: YouTube channel ID Query parameters: limit: Maximum number of videos to return (default: 50) Returns: JSON response with channel info and video history """ limit = request.args.get("limit", "50") try: limit = int(limit) except ValueError: limit = 50 try: with get_db_session() as session: channel = session.query(Channel).filter_by( channel_id=channel_id ).first() if not channel: return jsonify({"error": "Channel not found"}), 404 videos = session.query(VideoEntry).filter_by( channel_id=channel.id ).order_by(VideoEntry.created_at.desc()).limit(limit).all() return jsonify({ "channel": { "channel_id": channel.channel_id, "title": channel.title, "link": channel.link, "last_fetched": channel.last_fetched.isoformat() }, "videos": [video.to_dict() for video in videos], "total_videos": len(channel.videos) }) except Exception as e: return jsonify({"error": f"Failed to fetch history: {str(e)}"}), 500 @app.route("/api/download/", methods=["POST"]) def trigger_download(video_id: int): """Trigger video download for a specific video. Args: video_id: Database ID of the VideoEntry Returns: JSON response with task information """ try: with get_db_session() as session: video = session.query(VideoEntry).filter_by(id=video_id).first() if not video: return jsonify({"status": "error", "message": "Video not found"}), 404 # Check if already downloaded or downloading if video.download_status == DownloadStatus.COMPLETED: return jsonify({ "status": "success", "message": "Video already downloaded" }) if video.download_status == DownloadStatus.DOWNLOADING: return jsonify({ "status": "success", "message": "Video is already downloading" }) # Queue download task task = download_video.delay(video_id) return jsonify({ "status": "success", "video_id": video_id, "task_id": task.id, "message": "Download started" }) except Exception as e: return jsonify({"status": "error", "message": f"Failed to queue download: {str(e)}"}), 500 @app.route("/api/download/status/", methods=["GET"]) def get_download_status(video_id: int): """Get download status for a specific video. Args: video_id: Database ID of the VideoEntry Returns: JSON response with download status """ try: with get_db_session() as session: video = session.query(VideoEntry).filter_by(id=video_id).first() if not video: return jsonify({"error": "Video not found"}), 404 return jsonify({ "video_id": video_id, "title": video.title, "download_status": video.download_status.value, "download_path": video.download_path, "download_started_at": video.download_started_at.isoformat() if video.download_started_at else None, "download_completed_at": video.download_completed_at.isoformat() if video.download_completed_at else None, "download_error": video.download_error, "file_size": video.file_size }) except Exception as e: return jsonify({"error": f"Failed to fetch download status: {str(e)}"}), 500 @app.route("/api/download/batch", methods=["POST"]) def trigger_batch_download(): """Trigger batch download for multiple videos. Query parameters: channel_id: Download all pending videos for this channel (optional) status: Filter by download status (default: pending) Request body (alternative to query params): video_ids: List of video IDs to download Returns: JSON response with batch task information """ try: with get_db_session() as session: # Check if video_ids provided in request body data = request.get_json(silent=True) if data and 'video_ids' in data: video_ids = data['video_ids'] else: # Filter by channel and/or status channel_id = request.args.get("channel_id") status_str = request.args.get("status", "pending") try: status = DownloadStatus(status_str) except ValueError: return jsonify({"error": f"Invalid status: {status_str}"}), 400 query = session.query(VideoEntry).filter_by(download_status=status) if channel_id: channel = session.query(Channel).filter_by( channel_id=channel_id ).first() if not channel: return jsonify({"error": "Channel not found"}), 404 query = query.filter_by(channel_id=channel.id) videos = query.all() video_ids = [v.id for v in videos] if not video_ids: return jsonify({"message": "No videos to download", "total_queued": 0}) # Queue batch download task task = download_videos_batch.delay(video_ids) return jsonify({ "task_id": task.id, "total_queued": len(video_ids), "video_ids": video_ids, "message": "Batch download queued successfully" }) except Exception as e: return jsonify({"error": f"Failed to queue batch download: {str(e)}"}), 500 @app.route("/api/videos/refresh/", methods=["POST"]) def refresh_channel_videos(channel_id: int): """Refresh videos for a specific channel by fetching latest from RSS feed. Args: channel_id: Database ID of the Channel Returns: JSON response with number of new videos found """ try: with get_db_session() as session: channel = session.query(Channel).filter_by(id=channel_id).first() if not channel: return jsonify({"status": "error", "message": "Channel not found"}), 404 # Fetch latest feed parser = YouTubeFeedParser(channel.channel_id) result = parser.fetch_feed() if result is None: return jsonify({"status": "error", "message": "Failed to fetch feed"}), 500 # Count existing videos before save existing_count = len(channel.video_entries) # Save to database (upsert logic will handle duplicates) parser.save_to_db(session, result) # Count after save session.refresh(channel) new_count = len(channel.video_entries) - existing_count return jsonify({ "status": "success", "channel_id": channel_id, "new_videos": max(0, new_count), # Ensure non-negative "total_videos": len(channel.video_entries), "message": f"Found {max(0, new_count)} new videos" }) except Exception as e: return jsonify({"status": "error", "message": f"Failed to refresh channel: {str(e)}"}), 500 @app.route("/api/video/stream/", methods=["GET"]) def stream_video(video_id: int): """Stream a downloaded video file. Args: video_id: Database ID of the VideoEntry Query parameters: download: If set to 1, force download instead of streaming Returns: Video file stream or error """ try: with get_db_session() as session: video = session.query(VideoEntry).filter_by(id=video_id).first() if not video: return jsonify({"error": "Video not found"}), 404 if video.download_status != DownloadStatus.COMPLETED or not video.download_path: return jsonify({"error": "Video not downloaded yet"}), 404 # Check if file exists if not os.path.exists(video.download_path): return jsonify({"error": "Video file not found on disk"}), 404 # Check if download is requested as_attachment = request.args.get("download") == "1" return send_file( video.download_path, mimetype="video/mp4", as_attachment=as_attachment, download_name=f"{video.title}.mp4" if as_attachment else None ) except Exception as e: return jsonify({"error": f"Failed to stream video: {str(e)}"}), 500 def main(): """CLI entry point for testing feed parser.""" parser = YouTubeFeedParser(DEFAULT_CHANNEL_ID) result = parser.fetch_feed() if result is None: print("Failed to retrieve RSS feed") return print(f"Feed Title: {result['feed_title']}") print(f"Feed Link: {result['feed_link']}") for entry in result['entries']: print(f"\nEntry Title: {entry['title']}") print(f"Entry Link: {entry['link']}") if __name__ == "__main__": main()