import asyncio import logging from datetime import datetime, timezone from dateutil.relativedelta import relativedelta from .models import ScheduledMessage, MessageChannel, MessageStatus, Recurrence logger = logging.getLogger(__name__) POLL_INTERVAL = 15 RECURRENCE_DELTAS = { Recurrence.DAILY: relativedelta(days=1), Recurrence.WEEKLY: relativedelta(weeks=1), Recurrence.MONTHLY: relativedelta(months=1), } async def _run_agent(prompt: str) -> str: """Run a prompt through the LangChain agent and return the response text.""" from blueprints.conversation.agents import main_agent from blueprints.conversation.prompts import SIMBA_SYSTEM_PROMPT messages_payload = [ {"role": "system", "content": SIMBA_SYSTEM_PROMPT}, {"role": "user", "content": prompt}, ] response = await main_agent.ainvoke({"messages": messages_payload}) return response.get("messages", [])[-1].content async def _schedule_next_occurrence(msg: ScheduledMessage): """Create the next pending occurrence for a recurring message.""" delta = RECURRENCE_DELTAS.get(msg.recurrence) if not delta: return next_at = msg.scheduled_at + delta # If we missed several intervals, advance until we're in the future now = datetime.now(timezone.utc) while next_at <= now: next_at += delta await ScheduledMessage.create( recipient=msg.recipient, channel=msg.channel, content=msg.content, subject=msg.subject, scheduled_at=next_at, recurrence=msg.recurrence, use_agent=msg.use_agent, created_by_id=msg.created_by_id, ) logger.info( f"Scheduled next {msg.recurrence.value} occurrence for {msg.id} at {next_at.isoformat()}" ) async def scheduled_messages_loop(): """Background loop that polls for and sends due scheduled messages.""" logger.info(f"Scheduled messages loop started (interval={POLL_INTERVAL}s)") while True: try: now = datetime.now(timezone.utc) due = await ScheduledMessage.filter( status=MessageStatus.PENDING, scheduled_at__lte=now, ).all() for msg in due: try: send_content = msg.content if msg.use_agent: send_content = await _run_agent(msg.content) if msg.channel == MessageChannel.IMESSAGE: from blueprints.imessage import send_imessage from utils.strip_markdown import strip_markdown await send_imessage(msg.recipient, strip_markdown(send_content)) elif msg.channel == MessageChannel.EMAIL: from blueprints.email import send_email_reply await send_email_reply( to=msg.recipient, subject=msg.subject or "(no subject)", body=send_content, ) msg.status = MessageStatus.SENT msg.error_message = None await msg.save() logger.info( f"Sent scheduled {msg.channel.value} message {msg.id} to {msg.recipient}" ) # Schedule next occurrence for recurring messages if msg.recurrence != Recurrence.NONE: await _schedule_next_occurrence(msg) except Exception as e: msg.status = MessageStatus.FAILED msg.error_message = str(e) await msg.save() logger.error(f"Failed to send scheduled message {msg.id}: {e}") except Exception: logger.exception("Error in scheduled messages loop") await asyncio.sleep(POLL_INTERVAL)