Files
simbarag/blueprints/imessage/__init__.py
T
ryan 1e753bfaab Add SendBlue webhook signature validation
Validates sb-signing-secret header against SENDBLUE_WEBHOOK_SECRET env var.
Can be disabled with SENDBLUE_SIGNATURE_VALIDATION=false for development.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-06-03 19:28:35 -04:00

230 lines
7.6 KiB
Python

import os
import hmac
import logging
import functools
import time
from collections import defaultdict
import httpx
from quart import Blueprint, request, jsonify
from blueprints.users.models import User
from blueprints.conversation.logic import (
get_conversation_for_user,
add_message_to_conversation,
)
from blueprints.conversation.agents import main_agent
from blueprints.conversation.prompts import SIMBA_SYSTEM_PROMPT
imessage_blueprint = Blueprint("imessage_api", __name__, url_prefix="/api/imessage")
logger = logging.getLogger(__name__)
# Rate limiting: per-number message timestamps
_rate_limit_store: dict[str, list[float]] = defaultdict(list)
RATE_LIMIT_MAX = int(os.getenv("IMESSAGE_RATE_LIMIT_MAX", "10"))
RATE_LIMIT_WINDOW = int(os.getenv("IMESSAGE_RATE_LIMIT_WINDOW", "60"))
MAX_MESSAGE_LENGTH = 2000
SENDBLUE_API_BASE = "https://api.sendblue.co"
def _get_sendblue_headers() -> dict[str, str]:
return {
"sb-api-key-id": os.getenv("SENDBLUE_API_KEY", ""),
"sb-api-secret-key": os.getenv("SENDBLUE_API_SECRET", ""),
"Content-Type": "application/json",
}
def _check_rate_limit(phone_number: str) -> bool:
"""Check if a phone number has exceeded the rate limit.
Returns True if the request is allowed, False if rate-limited.
"""
now = time.monotonic()
cutoff = now - RATE_LIMIT_WINDOW
timestamps = _rate_limit_store[phone_number]
_rate_limit_store[phone_number] = [t for t in timestamps if t > cutoff]
if len(_rate_limit_store[phone_number]) >= RATE_LIMIT_MAX:
return False
_rate_limit_store[phone_number].append(now)
return True
async def send_imessage(to_number: str, content: str) -> dict:
"""Send an iMessage via SendBlue API."""
from_number = os.getenv("SENDBLUE_FROM_NUMBER", "")
async with httpx.AsyncClient() as client:
resp = await client.post(
f"{SENDBLUE_API_BASE}/api/send-message",
headers=_get_sendblue_headers(),
json={
"number": to_number,
"from_number": from_number,
"content": content,
},
timeout=30,
)
resp.raise_for_status()
return resp.json()
def validate_sendblue_signature(f):
"""Decorator to validate the SendBlue webhook signing secret."""
@functools.wraps(f)
async def decorated_function(*args, **kwargs):
if os.getenv("SENDBLUE_SIGNATURE_VALIDATION", "true").lower() == "false":
return await f(*args, **kwargs)
secret = os.getenv("SENDBLUE_WEBHOOK_SECRET")
if not secret:
logger.error("SENDBLUE_WEBHOOK_SECRET not set — rejecting request")
return jsonify({"error": "Server misconfigured"}), 500
sig = request.headers.get("sb-signing-secret", "")
if not hmac.compare_digest(sig, secret):
logger.warning("Invalid SendBlue signing secret")
return jsonify({"error": "Unauthorized"}), 403
return await f(*args, **kwargs)
return decorated_function
@imessage_blueprint.route("/webhook", methods=["POST"])
@validate_sendblue_signature
async def webhook():
"""Handle incoming iMessages from SendBlue."""
data = await request.get_json()
if not data:
return jsonify({"error": "Invalid payload"}), 400
from_number = data.get("from_number")
content = data.get("content")
is_outbound = data.get("is_outbound", False)
# Ignore outbound messages (our own replies echoed back)
if is_outbound:
return jsonify({"status": "ignored"}), 200
if not from_number or not content:
return jsonify({"error": "Missing from_number or content"}), 400
content = content.strip()
if not content:
await send_imessage(
from_number, "I received an empty message. Please send some text!"
)
return jsonify({"status": "ok"}), 200
# Rate limiting
if not _check_rate_limit(from_number):
logger.warning(f"Rate limit exceeded for {from_number}")
await send_imessage(
from_number,
"You're sending messages too quickly. Please wait a moment and try again.",
)
return jsonify({"status": "rate_limited"}), 200
# Truncate overly long messages
if len(content) > MAX_MESSAGE_LENGTH:
content = content[:MAX_MESSAGE_LENGTH]
logger.info(
f"Truncated long message from {from_number} to {MAX_MESSAGE_LENGTH} chars"
)
logger.info(f"Received iMessage from {from_number}: {content[:100]}")
# Identify or create user
user = await User.filter(imessage_number=from_number).first()
if not user:
allowed_numbers = os.getenv("ALLOWED_IMESSAGE_NUMBERS", "").split(",")
if from_number not in allowed_numbers and "*" not in allowed_numbers:
await send_imessage(
from_number, "Sorry, you are not authorized to use this service."
)
return jsonify({"status": "unauthorized"}), 200
username = f"im_{from_number.lstrip('+')}"
try:
user = await User.create(
username=username,
email=f"{username}@imessage.simbarag.local",
imessage_number=from_number,
auth_provider="imessage",
)
logger.info(f"Created new user for iMessage: {username}")
except Exception as e:
logger.error(f"Failed to create user for {from_number}: {e}")
await send_imessage(
from_number, "Sorry, something went wrong setting up your account."
)
return jsonify({"status": "error"}), 200
# iMessage is restricted to admins
if not user.is_admin():
logger.warning(f"Non-admin user {user.username} attempted iMessage access")
await send_imessage(from_number, "Sorry, this feature is restricted to admins.")
return jsonify({"status": "forbidden"}), 200
# Get or create conversation
try:
conversation = await get_conversation_for_user(user=user)
await conversation.fetch_related("messages")
except Exception as e:
logger.error(f"Failed to get conversation for user {user.username}: {e}")
await send_imessage(
from_number, "Sorry, something went wrong. Please try again later."
)
return jsonify({"status": "error"}), 200
# Add user message to conversation
await add_message_to_conversation(
conversation=conversation,
message=content,
speaker="user",
user=user,
)
# Build messages payload for LangChain agent
try:
messages = await conversation.messages.all()
recent_messages = list(messages)[-10:]
messages_payload = [{"role": "system", "content": SIMBA_SYSTEM_PROMPT}]
for msg in recent_messages[:-1]:
role = "user" if msg.speaker == "user" else "assistant"
messages_payload.append({"role": role, "content": msg.text})
messages_payload.append({"role": "user", "content": content})
logger.info(f"Invoking LangChain agent with {len(messages_payload)} messages")
response = await main_agent.ainvoke({"messages": messages_payload})
response_text = response.get("messages", [])[-1].content
except Exception as e:
logger.error(f"Error invoking agent: {e}")
response_text = "Sorry, I'm having trouble thinking right now."
# Save and send response
await add_message_to_conversation(
conversation=conversation,
message=response_text,
speaker="simba",
user=user,
)
await send_imessage(from_number, response_text)
return jsonify({"status": "ok"}), 200