Users can now receive a unique email address (ask+<token>@domain) and interact with Simba by sending emails. Inbound emails hit a Mailgun webhook, are authenticated via HMAC token lookup, processed through the LangChain agent, and replied to via the Mailgun API. - Extract shared SIMBA_SYSTEM_PROMPT to blueprints/conversation/prompts.py - Add email_enabled and email_hmac_token fields to User model - Create blueprints/email with webhook, signature validation, rate limiting - Add admin endpoints to enable/disable email per user - Update AdminPanel with Email column, toggle, and copy-address button - Add Mailgun env vars to .env.example - Include database migration for new fields Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
218 lines
6.8 KiB
Python
218 lines
6.8 KiB
Python
import os
|
|
import hmac
|
|
import hashlib
|
|
import logging
|
|
import functools
|
|
import time
|
|
from collections import defaultdict
|
|
|
|
import httpx
|
|
from quart import Blueprint, request
|
|
|
|
from blueprints.users.models import User
|
|
from blueprints.conversation.logic import (
|
|
get_conversation_for_user,
|
|
add_message_to_conversation,
|
|
get_conversation_transcript,
|
|
)
|
|
from blueprints.conversation.agents import main_agent
|
|
from blueprints.conversation.prompts import SIMBA_SYSTEM_PROMPT
|
|
from .helpers import generate_email_token, get_user_email_address # noqa: F401
|
|
|
|
email_blueprint = Blueprint("email_api", __name__, url_prefix="/api/email")
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Rate limiting: per-sender message timestamps
|
|
_rate_limit_store: dict[str, list[float]] = defaultdict(list)
|
|
|
|
RATE_LIMIT_MAX = int(os.getenv("EMAIL_RATE_LIMIT_MAX", "5"))
|
|
RATE_LIMIT_WINDOW = int(os.getenv("EMAIL_RATE_LIMIT_WINDOW", "300"))
|
|
|
|
MAX_MESSAGE_LENGTH = 2000
|
|
|
|
|
|
# --- Mailgun signature validation ---
|
|
|
|
def validate_mailgun_signature(f):
|
|
"""Decorator to validate Mailgun webhook signatures."""
|
|
@functools.wraps(f)
|
|
async def decorated_function(*args, **kwargs):
|
|
if os.getenv("MAILGUN_SIGNATURE_VALIDATION", "true").lower() == "false":
|
|
return await f(*args, **kwargs)
|
|
|
|
signing_key = os.getenv("MAILGUN_WEBHOOK_SIGNING_KEY")
|
|
if not signing_key:
|
|
logger.error("MAILGUN_WEBHOOK_SIGNING_KEY not set — rejecting request")
|
|
return "", 406
|
|
|
|
form_data = await request.form
|
|
timestamp = form_data.get("timestamp", "")
|
|
token = form_data.get("token", "")
|
|
signature = form_data.get("signature", "")
|
|
|
|
if not timestamp or not token or not signature:
|
|
logger.warning("Missing Mailgun signature fields")
|
|
return "", 406
|
|
|
|
expected = hmac.new(
|
|
signing_key.encode(),
|
|
f"{timestamp}{token}".encode(),
|
|
hashlib.sha256,
|
|
).hexdigest()
|
|
|
|
if not hmac.compare_digest(expected, signature):
|
|
logger.warning("Invalid Mailgun signature")
|
|
return "", 406
|
|
|
|
return await f(*args, **kwargs)
|
|
return decorated_function
|
|
|
|
|
|
# --- Rate limiting ---
|
|
|
|
def _check_rate_limit(sender: str) -> bool:
|
|
"""Check if a sender has exceeded the rate limit.
|
|
|
|
Returns True if the request is allowed, False if rate-limited.
|
|
"""
|
|
now = time.monotonic()
|
|
cutoff = now - RATE_LIMIT_WINDOW
|
|
|
|
timestamps = _rate_limit_store[sender]
|
|
_rate_limit_store[sender] = [t for t in timestamps if t > cutoff]
|
|
|
|
if len(_rate_limit_store[sender]) >= RATE_LIMIT_MAX:
|
|
return False
|
|
|
|
_rate_limit_store[sender].append(now)
|
|
return True
|
|
|
|
|
|
# --- Send reply via Mailgun API ---
|
|
|
|
async def send_email_reply(to: str, subject: str, body: str, in_reply_to: str | None = None):
|
|
"""Send a reply email via the Mailgun API."""
|
|
api_key = os.getenv("MAILGUN_API_KEY")
|
|
domain = os.getenv("MAILGUN_DOMAIN")
|
|
if not api_key or not domain:
|
|
logger.error("MAILGUN_API_KEY or MAILGUN_DOMAIN not configured")
|
|
return
|
|
|
|
data = {
|
|
"from": f"Simba <simba@{domain}>",
|
|
"to": to,
|
|
"subject": f"Re: {subject}" if not subject.startswith("Re:") else subject,
|
|
"text": body,
|
|
}
|
|
if in_reply_to:
|
|
data["h:In-Reply-To"] = in_reply_to
|
|
|
|
async with httpx.AsyncClient() as client:
|
|
resp = await client.post(
|
|
f"https://api.mailgun.net/v3/{domain}/messages",
|
|
auth=("api", api_key),
|
|
data=data,
|
|
)
|
|
if resp.status_code != 200:
|
|
logger.error(f"Mailgun send failed ({resp.status_code}): {resp.text}")
|
|
else:
|
|
logger.info(f"Sent email reply to {to}")
|
|
|
|
|
|
# --- Webhook route ---
|
|
|
|
@email_blueprint.route("/webhook", methods=["POST"])
|
|
@validate_mailgun_signature
|
|
async def webhook():
|
|
"""Handle inbound emails forwarded by Mailgun."""
|
|
form_data = await request.form
|
|
sender = form_data.get("sender", "")
|
|
recipient = form_data.get("recipient", "")
|
|
body = form_data.get("stripped-text", "")
|
|
subject = form_data.get("subject", "(no subject)")
|
|
message_id = form_data.get("Message-Id", "")
|
|
|
|
# Extract token from recipient: ask+<token>@domain
|
|
local_part = recipient.split("@")[0] if "@" in recipient else ""
|
|
if "+" not in local_part:
|
|
logger.info(f"Ignoring email to {recipient} — no token in address")
|
|
return "", 200
|
|
|
|
token = local_part.split("+", 1)[1]
|
|
|
|
# Lookup user by token
|
|
user = await User.filter(email_hmac_token=token, email_enabled=True).first()
|
|
if not user:
|
|
logger.info(f"No user found for email token {token}")
|
|
return "", 200
|
|
|
|
# Rate limit
|
|
if not _check_rate_limit(sender):
|
|
logger.warning(f"Rate limit exceeded for email sender {sender}")
|
|
return "", 200
|
|
|
|
# Clean up body
|
|
body = (body or "").strip()
|
|
if not body:
|
|
logger.info(f"Ignoring empty email from {sender}")
|
|
return "", 200
|
|
|
|
if len(body) > MAX_MESSAGE_LENGTH:
|
|
body = body[:MAX_MESSAGE_LENGTH]
|
|
logger.info(f"Truncated long email from {sender} to {MAX_MESSAGE_LENGTH} chars")
|
|
|
|
logger.info(f"Processing email from {sender} for user {user.username}: {body[:100]}")
|
|
|
|
# Get or create conversation
|
|
try:
|
|
conversation = await get_conversation_for_user(user=user)
|
|
await conversation.fetch_related("messages")
|
|
except Exception as e:
|
|
logger.error(f"Failed to get conversation for user {user.username}: {e}")
|
|
return "", 200
|
|
|
|
# Add user message
|
|
await add_message_to_conversation(
|
|
conversation=conversation,
|
|
message=body,
|
|
speaker="user",
|
|
user=user,
|
|
)
|
|
|
|
# Build messages payload
|
|
try:
|
|
messages = await conversation.messages.all()
|
|
recent_messages = list(messages)[-10:]
|
|
|
|
messages_payload = [{"role": "system", "content": SIMBA_SYSTEM_PROMPT}]
|
|
for msg in recent_messages[:-1]:
|
|
role = "user" if msg.speaker == "user" else "assistant"
|
|
messages_payload.append({"role": role, "content": msg.text})
|
|
messages_payload.append({"role": "user", "content": body})
|
|
|
|
logger.info(f"Invoking LangChain agent with {len(messages_payload)} messages")
|
|
response = await main_agent.ainvoke({"messages": messages_payload})
|
|
response_text = response.get("messages", [])[-1].content
|
|
except Exception as e:
|
|
logger.error(f"Error invoking agent for email: {e}")
|
|
response_text = "Sorry, I'm having trouble thinking right now."
|
|
|
|
# Save response
|
|
await add_message_to_conversation(
|
|
conversation=conversation,
|
|
message=response_text,
|
|
speaker="simba",
|
|
user=user,
|
|
)
|
|
|
|
# Send reply email
|
|
await send_email_reply(
|
|
to=sender,
|
|
subject=subject,
|
|
body=response_text,
|
|
in_reply_to=message_id,
|
|
)
|
|
|
|
return "", 200
|