diff --git a/app.py b/app.py index ba80f2a..6d4a8c1 100644 --- a/app.py +++ b/app.py @@ -15,6 +15,7 @@ import blueprints.rag import blueprints.users import blueprints.whatsapp import blueprints.imessage +import blueprints.scheduled_messages import blueprints.users.models from config.db import TORTOISE_CONFIG @@ -52,6 +53,7 @@ app.register_blueprint(blueprints.email.email_blueprint) app.register_blueprint(blueprints.rag.rag_blueprint) app.register_blueprint(blueprints.whatsapp.whatsapp_blueprint) app.register_blueprint(blueprints.imessage.imessage_blueprint) +app.register_blueprint(blueprints.scheduled_messages.scheduled_messages_blueprint) async def _obsidian_sync_loop(): @@ -86,8 +88,13 @@ async def lifespan(): if os.getenv("OBSIDIAN_CONTINUOUS_SYNC") == "true": watcher_task = asyncio.create_task(_obsidian_sync_loop()) + from blueprints.scheduled_messages.scheduler import scheduled_messages_loop + + scheduler_task = asyncio.create_task(scheduled_messages_loop()) + yield + scheduler_task.cancel() if watcher_task is not None: watcher_task.cancel() diff --git a/blueprints/imessage/__init__.py b/blueprints/imessage/__init__.py index f62706c..ee806f6 100644 --- a/blueprints/imessage/__init__.py +++ b/blueprints/imessage/__init__.py @@ -224,6 +224,8 @@ async def webhook(): user=user, ) - await send_imessage(from_number, response_text) + from utils.strip_markdown import strip_markdown + + await send_imessage(from_number, strip_markdown(response_text)) return jsonify({"status": "ok"}), 200 diff --git a/blueprints/scheduled_messages/__init__.py b/blueprints/scheduled_messages/__init__.py new file mode 100644 index 0000000..f8b9b99 --- /dev/null +++ b/blueprints/scheduled_messages/__init__.py @@ -0,0 +1,147 @@ +import logging +from datetime import datetime, timezone + +from quart import Blueprint, request, jsonify + +from blueprints.users.decorators import admin_required +from .models import ScheduledMessage, MessageChannel, MessageStatus + +scheduled_messages_blueprint = Blueprint( + "scheduled_messages_api", __name__, url_prefix="/api/scheduled-messages" +) + +logger = logging.getLogger(__name__) + + +def _serialize(msg: ScheduledMessage) -> dict: + return { + "id": str(msg.id), + "recipient": msg.recipient, + "channel": msg.channel.value, + "content": msg.content, + "subject": msg.subject, + "scheduled_at": msg.scheduled_at.isoformat(), + "status": msg.status.value, + "error_message": msg.error_message, + "created_at": msg.created_at.isoformat(), + "updated_at": msg.updated_at.isoformat(), + } + + +@scheduled_messages_blueprint.route("/", methods=["GET"]) +@admin_required +async def list_messages(): + messages = await ScheduledMessage.all().order_by("-scheduled_at") + return jsonify([_serialize(m) for m in messages]) + + +@scheduled_messages_blueprint.route("/", methods=["POST"]) +@admin_required +async def create_message(): + data = await request.get_json() + if not data: + return jsonify({"error": "Invalid payload"}), 400 + + recipient = (data.get("recipient") or "").strip() + channel = data.get("channel") + content = (data.get("content") or "").strip() + subject = (data.get("subject") or "").strip() or None + scheduled_at_str = data.get("scheduled_at") + + if not recipient or not channel or not content or not scheduled_at_str: + return jsonify( + {"error": "recipient, channel, content, and scheduled_at are required"} + ), 400 + + try: + channel_enum = MessageChannel(channel) + except ValueError: + return jsonify( + {"error": f"Invalid channel: {channel}. Must be 'imessage' or 'email'"} + ), 400 + + if channel_enum == MessageChannel.EMAIL and not subject: + return jsonify({"error": "subject is required for email messages"}), 400 + + try: + scheduled_at = datetime.fromisoformat(scheduled_at_str) + if scheduled_at.tzinfo is None: + scheduled_at = scheduled_at.replace(tzinfo=timezone.utc) + except ValueError: + return jsonify({"error": "Invalid scheduled_at format"}), 400 + + if scheduled_at <= datetime.now(timezone.utc): + return jsonify({"error": "scheduled_at must be in the future"}), 400 + + from quart_jwt_extended import get_jwt_identity + + user_id = get_jwt_identity() + + msg = await ScheduledMessage.create( + recipient=recipient, + channel=channel_enum, + content=content, + subject=subject, + scheduled_at=scheduled_at, + created_by_id=user_id, + ) + return jsonify(_serialize(msg)), 201 + + +@scheduled_messages_blueprint.route("/", methods=["PUT"]) +@admin_required +async def update_message(msg_id: str): + msg = await ScheduledMessage.get_or_none(id=msg_id) + if not msg: + return jsonify({"error": "Not found"}), 404 + + if msg.status != MessageStatus.PENDING: + return jsonify({"error": "Can only update pending messages"}), 400 + + data = await request.get_json() + if not data: + return jsonify({"error": "Invalid payload"}), 400 + + if "recipient" in data: + msg.recipient = data["recipient"].strip() + if "channel" in data: + try: + msg.channel = MessageChannel(data["channel"]) + except ValueError: + return jsonify({"error": f"Invalid channel: {data['channel']}"}), 400 + if "content" in data: + msg.content = data["content"].strip() + if "subject" in data: + msg.subject = data["subject"].strip() or None + if "scheduled_at" in data: + try: + scheduled_at = datetime.fromisoformat(data["scheduled_at"]) + if scheduled_at.tzinfo is None: + scheduled_at = scheduled_at.replace(tzinfo=timezone.utc) + if scheduled_at <= datetime.now(timezone.utc): + return jsonify({"error": "scheduled_at must be in the future"}), 400 + msg.scheduled_at = scheduled_at + except ValueError: + return jsonify({"error": "Invalid scheduled_at format"}), 400 + if "status" in data and data["status"] == "cancelled": + msg.status = MessageStatus.CANCELLED + + if msg.channel == MessageChannel.EMAIL and not msg.subject: + return jsonify({"error": "subject is required for email messages"}), 400 + + await msg.save() + return jsonify(_serialize(msg)) + + +@scheduled_messages_blueprint.route("/", methods=["DELETE"]) +@admin_required +async def delete_message(msg_id: str): + msg = await ScheduledMessage.get_or_none(id=msg_id) + if not msg: + return jsonify({"error": "Not found"}), 404 + + if msg.status not in (MessageStatus.PENDING, MessageStatus.CANCELLED): + return jsonify({"error": "Can only delete pending or cancelled messages"}), 400 + + await msg.delete() + return jsonify({"status": "deleted"}) diff --git a/blueprints/scheduled_messages/models.py b/blueprints/scheduled_messages/models.py new file mode 100644 index 0000000..11d3218 --- /dev/null +++ b/blueprints/scheduled_messages/models.py @@ -0,0 +1,37 @@ +import enum + +from tortoise import fields +from tortoise.models import Model + + +class MessageChannel(enum.Enum): + IMESSAGE = "imessage" + EMAIL = "email" + + +class MessageStatus(enum.Enum): + PENDING = "pending" + SENT = "sent" + FAILED = "failed" + CANCELLED = "cancelled" + + +class ScheduledMessage(Model): + id = fields.UUIDField(primary_key=True) + recipient = fields.CharField(max_length=255) + channel = fields.CharEnumField(enum_type=MessageChannel, max_length=10) + content = fields.TextField() + subject = fields.CharField(max_length=255, null=True) + scheduled_at = fields.DatetimeField() + status = fields.CharEnumField( + enum_type=MessageStatus, max_length=10, default=MessageStatus.PENDING + ) + error_message = fields.TextField(null=True) + created_by = fields.ForeignKeyField( + "models.User", related_name="scheduled_messages" + ) + created_at = fields.DatetimeField(auto_now_add=True) + updated_at = fields.DatetimeField(auto_now=True) + + class Meta: + table = "scheduled_messages" diff --git a/blueprints/scheduled_messages/scheduler.py b/blueprints/scheduled_messages/scheduler.py new file mode 100644 index 0000000..9ce428c --- /dev/null +++ b/blueprints/scheduled_messages/scheduler.py @@ -0,0 +1,57 @@ +import asyncio +import logging +from datetime import datetime, timezone + +from .models import ScheduledMessage, MessageChannel, MessageStatus + +logger = logging.getLogger(__name__) + +POLL_INTERVAL = 15 + + +async def scheduled_messages_loop(): + """Background loop that polls for and sends due scheduled messages.""" + logger.info(f"Scheduled messages loop started (interval={POLL_INTERVAL}s)") + + while True: + try: + now = datetime.now(timezone.utc) + due = await ScheduledMessage.filter( + status=MessageStatus.PENDING, + scheduled_at__lte=now, + ).all() + + for msg in due: + try: + if msg.channel == MessageChannel.IMESSAGE: + from blueprints.imessage import send_imessage + from utils.strip_markdown import strip_markdown + + await send_imessage(msg.recipient, strip_markdown(msg.content)) + + elif msg.channel == MessageChannel.EMAIL: + from blueprints.email import send_email_reply + + await send_email_reply( + to=msg.recipient, + subject=msg.subject or "(no subject)", + body=msg.content, + ) + + msg.status = MessageStatus.SENT + msg.error_message = None + await msg.save() + logger.info( + f"Sent scheduled {msg.channel.value} message {msg.id} to {msg.recipient}" + ) + + except Exception as e: + msg.status = MessageStatus.FAILED + msg.error_message = str(e) + await msg.save() + logger.error(f"Failed to send scheduled message {msg.id}: {e}") + + except Exception: + logger.exception("Error in scheduled messages loop") + + await asyncio.sleep(POLL_INTERVAL) diff --git a/config/db.py b/config/db.py index 0bab327..650010a 100644 --- a/config/db.py +++ b/config/db.py @@ -16,6 +16,7 @@ TORTOISE_CONFIG = { "blueprints.conversation.models", "blueprints.users.models", "blueprints.email.models", + "blueprints.scheduled_messages.models", "aerich.models", ], "default_connection": "default", diff --git a/raggr-frontend/src/api/scheduledMessageService.ts b/raggr-frontend/src/api/scheduledMessageService.ts new file mode 100644 index 0000000..b6ff803 --- /dev/null +++ b/raggr-frontend/src/api/scheduledMessageService.ts @@ -0,0 +1,68 @@ +import { userService } from "./userService"; + +export interface ScheduledMessage { + id: string; + recipient: string; + channel: "imessage" | "email"; + content: string; + subject: string | null; + scheduled_at: string; + status: "pending" | "sent" | "failed" | "cancelled"; + error_message: string | null; + created_at: string; + updated_at: string; +} + +export interface CreateScheduledMessage { + recipient: string; + channel: "imessage" | "email"; + content: string; + subject?: string; + scheduled_at: string; +} + +class ScheduledMessageService { + private baseUrl = "/api/scheduled-messages"; + + async list(): Promise { + const response = await userService.fetchWithRefreshToken(`${this.baseUrl}/`); + if (!response.ok) throw new Error("Failed to list scheduled messages"); + return response.json(); + } + + async create(data: CreateScheduledMessage): Promise { + const response = await userService.fetchWithRefreshToken(`${this.baseUrl}/`, { + method: "POST", + body: JSON.stringify(data), + }); + if (!response.ok) { + const err = await response.json(); + throw new Error(err.error ?? "Failed to create scheduled message"); + } + return response.json(); + } + + async update(id: string, data: Partial & { status?: string }): Promise { + const response = await userService.fetchWithRefreshToken(`${this.baseUrl}/${id}`, { + method: "PUT", + body: JSON.stringify(data), + }); + if (!response.ok) { + const err = await response.json(); + throw new Error(err.error ?? "Failed to update scheduled message"); + } + return response.json(); + } + + async remove(id: string): Promise { + const response = await userService.fetchWithRefreshToken(`${this.baseUrl}/${id}`, { + method: "DELETE", + }); + if (!response.ok) { + const err = await response.json(); + throw new Error(err.error ?? "Failed to delete scheduled message"); + } + } +} + +export const scheduledMessageService = new ScheduledMessageService(); diff --git a/raggr-frontend/src/components/ChatScreen.tsx b/raggr-frontend/src/components/ChatScreen.tsx index 6970da8..8d992f8 100644 --- a/raggr-frontend/src/components/ChatScreen.tsx +++ b/raggr-frontend/src/components/ChatScreen.tsx @@ -1,11 +1,12 @@ import { useCallback, useState, useRef } from "react"; -import { LogOut, Shield, PanelLeftClose, PanelLeftOpen, Menu, X } from "lucide-react"; +import { LogOut, Shield, Clock, PanelLeftClose, PanelLeftOpen, Menu, X } from "lucide-react"; import { QuestionBubble } from "./QuestionBubble"; import { AnswerBubble } from "./AnswerBubble"; import { ToolBubble } from "./ToolBubble"; import { MessageInput } from "./MessageInput"; import { ConversationList } from "./ConversationList"; import { AdminPanel } from "./AdminPanel"; +import { ScheduledMessagesPanel } from "./ScheduledMessagesPanel"; import { cn } from "../lib/utils"; import { useConversations } from "../hooks/useConversations"; import { useChat } from "../hooks/useChat"; @@ -22,6 +23,7 @@ export const ChatScreen = ({ setAuthenticated, isAdmin }: ChatScreenProps) => { const [showConversations, setShowConversations] = useState(false); const [sidebarCollapsed, setSidebarCollapsed] = useState(false); const [showAdminPanel, setShowAdminPanel] = useState(false); + const [showScheduler, setShowScheduler] = useState(false); const messagesEndRef = useRef(null); const isLoadingRef = useRef(false); @@ -157,13 +159,22 @@ export const ChatScreen = ({ setAuthenticated, isAdmin }: ChatScreenProps) => {
{isAdmin && ( - + <> + + + )} +
+ +
+ {/* Create form */} +
+
+ + +
+ +
+ setRecipient(e.target.value)} + placeholder={channel === "imessage" ? "+15551234567" : "user@example.com"} + className="flex-1" + /> + setScheduledAt(e.target.value)} + className="w-52" + /> +
+ + {channel === "email" && ( + setSubject(e.target.value)} + placeholder="Subject" + /> + )} + +