diff --git a/celery_app.py b/celery_app.py
index 69fcb2b..bf9be26 100644
--- a/celery_app.py
+++ b/celery_app.py
@@ -12,7 +12,7 @@ celery_app = Celery(
     "yottob",
     broker=CELERY_BROKER_URL,
     backend=CELERY_RESULT_BACKEND,
-    include=["download_service"]
+    include=["download_service", "scheduled_tasks"]
 )
 
 # Celery configuration
@@ -27,4 +27,21 @@ celery_app.conf.update(
     task_soft_time_limit=3300,  # 55 minutes soft limit
     worker_prefetch_multiplier=1,  # Process one task at a time
     worker_max_tasks_per_child=50,  # Restart worker after 50 tasks
+    beat_schedule={
+        "cleanup-old-videos-daily": {
+            "task": "download_service.cleanup_old_videos",
+            "schedule": 86400.0,  # Run every 24 hours (in seconds)
+        },
+    },
 )
+
+
+# Scheduled tasks
+from celery.schedules import crontab
+
+# NOTE: add to the existing beat_schedule instead of reassigning it, so the
+# "cleanup-old-videos-daily" entry configured above is not clobbered.
+celery_app.conf.beat_schedule["check-latest-videos-midnight"] = {
+    "task": "scheduled_tasks.check_and_download_latest_videos",
+    "schedule": crontab(minute=0, hour=0),  # Run at midnight UTC
+}
diff --git a/download_service.py b/download_service.py
index 5e64539..b367cbf 100644
--- a/download_service.py
+++ b/download_service.py
@@ -163,3 +163,112 @@ def download_videos_batch(video_ids: list[int]) -> dict:
         "total_queued": len(results),
         "tasks": results
     }
+
+
+@celery_app.task(base=DatabaseTask, bind=True)
+def delete_video_file(self, video_id: int) -> dict:
+    """Delete a downloaded video file and reset its download status.
+
+    Args:
+        video_id: Database ID of the VideoEntry
+
+    Returns:
+        Dictionary with deletion result information
+    """
+    session = self.session
+
+    # Get video entry from database
+    video = session.query(VideoEntry).filter_by(id=video_id).first()
+    if not video:
+        return {"error": f"Video ID {video_id} not found"}
+
+    # Check if video has a download path
+    if not video.download_path:
+        return {"error": "Video has no download path", "video_id": video_id}
+
+    # Delete the file if it exists
+    deleted = False
+    if os.path.exists(video.download_path):
+        try:
+            os.remove(video.download_path)
+            deleted = True
+        except OSError as e:
+            return {
+                "error": f"Failed to delete file: {str(e)}",
+                "video_id": video_id,
+                "path": video.download_path
+            }
+
+    # Reset download status and metadata
+    video.download_status = DownloadStatus.PENDING
+    video.download_path = None
+    video.download_completed_at = None
+    video.file_size = None
+    video.download_error = None
+    session.commit()
+
+    return {
+        "video_id": video_id,
+        "status": "deleted" if deleted else "reset",
+        "message": "File deleted and status reset" if deleted else "Status reset (file not found)"
+    }
+
+
+@celery_app.task(base=DatabaseTask, bind=True)
+def cleanup_old_videos(self) -> dict:
+    """Clean up videos older than 7 days.
+
+    Returns:
+        Dictionary with cleanup results
+    """
+    from datetime import timedelta
+
+    session = self.session
+
+    # Calculate cutoff date (7 days ago)
+    cutoff_date = datetime.utcnow() - timedelta(days=7)
+
+    # Query videos that are completed and older than 7 days
+    old_videos = session.query(VideoEntry).filter(
+        VideoEntry.download_status == DownloadStatus.COMPLETED,
+        VideoEntry.download_completed_at < cutoff_date
+    ).all()
+
+    deleted_count = 0
+    failed_count = 0
+    results = []
+
+    for video in old_videos:
+        if video.download_path and os.path.exists(video.download_path):
+            try:
+                os.remove(video.download_path)
+                # Reset download status
+                video.download_status = DownloadStatus.PENDING
+                video.download_path = None
+                video.download_completed_at = None
+                video.file_size = None
+                video.download_error = None
+                deleted_count += 1
+                results.append({
+                    "video_id": video.id,
+                    "title": video.title,
+                    "status": "deleted"
+                })
+            except OSError as e:
+                failed_count += 1
+                results.append({
+                    "video_id": video.id,
+                    "title": video.title,
+                    "status": "failed",
+                    "error": str(e)
+                })
+
+    session.commit()
+
+    return {
+        "total_processed": len(old_videos),
+        "deleted_count": deleted_count,
+        "failed_count": failed_count,
+        "cutoff_date": cutoff_date.isoformat(),
+        "results": results
+    }
diff --git a/main.py b/main.py
index 659f85f..5d2f5a2 100644
--- a/main.py
+++ b/main.py
@@ -6,7 +6,7 @@ from flask_login import LoginManager, login_user, logout_user, login_required, c
 from feed_parser import YouTubeFeedParser, fetch_single_video, save_single_video_to_db
 from database import init_db, get_db_session
 from models import Channel, VideoEntry, DownloadStatus, User
-from download_service import download_video, download_videos_batch
+from download_service import download_video, download_videos_batch, delete_video_file
 from sqlalchemy import desc
 
 
@@ -157,7 +157,8 @@ def index():
     with get_db_session() as session:
         # Query videos for current user's channels, sorted by published date (newest first)
         videos = session.query(VideoEntry).join(Channel).filter(
-            Channel.user_id == current_user.id
+            Channel.user_id == current_user.id,
+            VideoEntry.download_status == DownloadStatus.COMPLETED
         ).order_by(desc(VideoEntry.published_at)).all()
 
         return render_template("dashboard.html", videos=videos)
@@ -695,7 +696,49 @@ def stream_video(video_id: int):
         return jsonify({"error": f"Failed to stream video: {str(e)}"}), 500
 
 
+@app.route("/api/videos/<int:video_id>/file", methods=["DELETE"])
+@login_required
+def delete_video(video_id: int):
+    """Delete a downloaded video file from disk.
+
+    Args:
+        video_id: Database ID of the VideoEntry
+
+    Returns:
+        JSON response indicating success or failure
+    """
+    try:
+        with get_db_session() as session:
+            # Only allow deleting videos from user's own channels
+            video = session.query(VideoEntry).join(Channel).filter(
+                VideoEntry.id == video_id,
+                Channel.user_id == current_user.id
+            ).first()
+
+            if not video:
+                return jsonify({"status": "error", "message": "Video not found"}), 404
+
+            if video.download_status != DownloadStatus.COMPLETED:
+                return jsonify({
+                    "status": "error",
+                    "message": "Video is not downloaded"
+                }), 400
+
+            # Queue deletion task
+            task = delete_video_file.delay(video_id)
+
+            return jsonify({
+                "status": "success",
+                "video_id": video_id,
+                "task_id": task.id,
+                "message": "Video deletion queued"
+            })
+    except Exception as e:
+        return jsonify({"status": "error", "message": f"Failed to delete video: {str(e)}"}), 500
+
+
 def main():
+    """CLI entry point for testing feed parser."""
     parser = YouTubeFeedParser(DEFAULT_CHANNEL_ID)
     result = parser.fetch_feed()
diff --git a/scheduled_tasks.py b/scheduled_tasks.py
new file mode 100644
index 0000000..cfc2bc7
--- /dev/null
+++ b/scheduled_tasks.py
@@ -0,0 +1,54 @@
+"""Scheduled tasks for Yottob."""
+
+import logging
+from celery.schedules import crontab
+from sqlalchemy import desc
+
+from celery_app import celery_app
+from database import SessionLocal
+from models import Channel, VideoEntry, DownloadStatus
+from feed_parser import YouTubeFeedParser
+from download_service import download_video
+
+logger = logging.getLogger(__name__)
+
+
+@celery_app.task
+def check_and_download_latest_videos():
+    """Check all channels for new videos and download the latest one if pending."""
+    session = SessionLocal()
+    try:
+        channels = session.query(Channel).all()
+        logger.info(f"Checking {len(channels)} channels for new videos")
+
+        for channel in channels:
+            try:
+                # Fetch latest feed
+                parser = YouTubeFeedParser(channel.channel_id)
+                feed_data = parser.fetch_feed()
+
+                if not feed_data:
+                    logger.warning(f"Failed to fetch feed for channel {channel.title} ({channel.channel_id})")
+                    continue
+
+                # Save to DB (updates channel and adds new videos)
+                parser.save_to_db(session, feed_data, channel.user_id)
+
+                # Get the latest video for this channel
+                latest_video = session.query(VideoEntry)\
+                    .filter_by(channel_id=channel.id)\
+                    .order_by(desc(VideoEntry.published_at))\
+                    .first()
+
+                if latest_video and latest_video.download_status == DownloadStatus.PENDING:
+                    logger.info(f"Queueing download for latest video: {latest_video.title} ({latest_video.video_id})")
+                    download_video.delay(latest_video.id)
+                elif latest_video:
+                    logger.info(f"Latest video {latest_video.title} status is {latest_video.download_status.value}")
+
+            except Exception as e:
+                logger.error(f"Error processing channel {channel.title}: {e}")
+                continue
+
+    finally:
+        session.close()
diff --git a/static/style.css b/static/style.css
index c1a8f14..cf658bb 100644
--- a/static/style.css
+++ b/static/style.css
@@ -275,6 +275,16 @@ body {
     border: 1px solid #000000;
 }
 
+.btn-danger {
+    background-color: #000000;
+    color: white;
+    border: 1px solid #000000;
+}
+
+.btn-danger:hover {
+    background-color: #333333;
+}
+
 .btn-download {
     background-color: transparent;
     color: #000000;
diff --git a/templates/watch.html b/templates/watch.html
index 187ef73..100c4d0 100644
--- a/templates/watch.html
+++ b/templates/watch.html
@@ -66,6 +66,7 @@
             <a href="https://www.youtube.com/watch?v={{ video.video_id }}" target="_blank" class="btn">Watch on YouTube</a>
             {% if video.download_status.value == 'completed' and video.download_path %}
             <a href="{{ url_for('stream_video', video_id=video.id) }}" class="btn btn-download" download>Download MP4</a>
+            <button onclick="deleteFromDisk()" class="btn btn-danger">Delete from Disk</button>
             {% endif %}
         </div>
     </div>
@@ -102,6 +103,26 @@
             startDownload();
         }
 
+        function deleteFromDisk() {
+            if (!confirm('Are you sure you want to delete this video from disk? This cannot be undone.')) return;
+
+            fetch('/api/videos/{{ video.id }}/file', {
+                method: 'DELETE'
+            })
+            .then(response => response.json())
+            .then(data => {
+                if (data.status === 'success') {
+                    alert('Video deleted from disk! This page will refresh automatically.');
+                    setTimeout(() => location.reload(), 2000);
+                } else {
+                    alert('Failed to delete video: ' + data.message);
+                }
+            })
+            .catch(error => {
+                alert('Error: ' + error);
+            });
+        }
+
         // Auto-refresh if video is downloading
         {% if video.download_status.value == 'downloading' %}
         setTimeout(() => location.reload(), 10000); // Refresh every 10 seconds
diff --git a/verify_schedule.py b/verify_schedule.py
new file mode 100644
index 0000000..b834bf2
--- /dev/null
+++ b/verify_schedule.py
@@ -0,0 +1,49 @@
+"""Verification script for midnight video downloads."""
+
+import sys
+import logging
+from scheduled_tasks import check_and_download_latest_videos
+from database import SessionLocal
+from models import Channel, VideoEntry, DownloadStatus
+
+# Configure logging
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+def verify_task():
+    logger.info("Starting verification...")
+
+    # Run the task synchronously
+    check_and_download_latest_videos()
+
+    logger.info("Task completed. Checking database...")
+
+    session = SessionLocal()
+    try:
+        channels = session.query(Channel).all()
+        for channel in channels:
+            logger.info(f"Checking channel: {channel.title}")
+
+            # Get latest video
+            latest_video = session.query(VideoEntry)\
+                .filter_by(channel_id=channel.id)\
+                .order_by(VideoEntry.published_at.desc())\
+                .first()
+
+            if latest_video:
+                logger.info(f"  Latest video: {latest_video.title}")
+                logger.info(f"  Status: {latest_video.download_status.value}")
+
+                # NOTE: the scheduled task only queues downloads via .delay();
+                # the status transition happens inside the download_video worker
+                # task. Without a running worker we can only confirm that new
+                # videos were fetched, not that a download was picked up.
+                pass
+            else:
+                logger.info("  No videos found.")
+
+    finally:
+        session.close()
+
+if __name__ == "__main__":
+    verify_task()