Add SendBlue iMessage integration with admin-only access
- New imessage blueprint: webhook receives inbound iMessages, runs through LangChain agent, replies via SendBlue REST API - Admin-only: only users with lldap_admin group can use iMessage channel - Admin endpoints to link/unlink imessage_number on user accounts - Add imessage_number field to User model (needs aerich migration) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,203 @@
|
||||
import os
|
||||
import logging
|
||||
import time
|
||||
from collections import defaultdict
|
||||
|
||||
import httpx
|
||||
from quart import Blueprint, request, jsonify
|
||||
|
||||
from blueprints.users.models import User
|
||||
from blueprints.conversation.logic import (
|
||||
get_conversation_for_user,
|
||||
add_message_to_conversation,
|
||||
)
|
||||
from blueprints.conversation.agents import main_agent
|
||||
from blueprints.conversation.prompts import SIMBA_SYSTEM_PROMPT
|
||||
|
||||
imessage_blueprint = Blueprint("imessage_api", __name__, url_prefix="/api/imessage")
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Rate limiting: per-number message timestamps
|
||||
_rate_limit_store: dict[str, list[float]] = defaultdict(list)
|
||||
|
||||
RATE_LIMIT_MAX = int(os.getenv("IMESSAGE_RATE_LIMIT_MAX", "10"))
|
||||
RATE_LIMIT_WINDOW = int(os.getenv("IMESSAGE_RATE_LIMIT_WINDOW", "60"))
|
||||
|
||||
MAX_MESSAGE_LENGTH = 2000
|
||||
|
||||
SENDBLUE_API_BASE = "https://api.sendblue.co"
|
||||
|
||||
|
||||
def _get_sendblue_headers() -> dict[str, str]:
|
||||
return {
|
||||
"sb-api-key-id": os.getenv("SENDBLUE_API_KEY", ""),
|
||||
"sb-api-secret-key": os.getenv("SENDBLUE_API_SECRET", ""),
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
|
||||
def _check_rate_limit(phone_number: str) -> bool:
|
||||
"""Check if a phone number has exceeded the rate limit.
|
||||
|
||||
Returns True if the request is allowed, False if rate-limited.
|
||||
"""
|
||||
now = time.monotonic()
|
||||
cutoff = now - RATE_LIMIT_WINDOW
|
||||
|
||||
timestamps = _rate_limit_store[phone_number]
|
||||
_rate_limit_store[phone_number] = [t for t in timestamps if t > cutoff]
|
||||
|
||||
if len(_rate_limit_store[phone_number]) >= RATE_LIMIT_MAX:
|
||||
return False
|
||||
|
||||
_rate_limit_store[phone_number].append(now)
|
||||
return True
|
||||
|
||||
|
||||
async def send_imessage(to_number: str, content: str) -> dict:
|
||||
"""Send an iMessage via SendBlue API."""
|
||||
from_number = os.getenv("SENDBLUE_FROM_NUMBER", "")
|
||||
|
||||
async with httpx.AsyncClient() as client:
|
||||
resp = await client.post(
|
||||
f"{SENDBLUE_API_BASE}/api/send-message",
|
||||
headers=_get_sendblue_headers(),
|
||||
json={
|
||||
"number": to_number,
|
||||
"from_number": from_number,
|
||||
"content": content,
|
||||
},
|
||||
timeout=30,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
return resp.json()
|
||||
|
||||
|
||||
@imessage_blueprint.route("/webhook", methods=["POST"])
|
||||
async def webhook():
|
||||
"""Handle incoming iMessages from SendBlue."""
|
||||
data = await request.get_json()
|
||||
if not data:
|
||||
return jsonify({"error": "Invalid payload"}), 400
|
||||
|
||||
from_number = data.get("from_number")
|
||||
content = data.get("content")
|
||||
is_outbound = data.get("is_outbound", False)
|
||||
|
||||
# Ignore outbound messages (our own replies echoed back)
|
||||
if is_outbound:
|
||||
return jsonify({"status": "ignored"}), 200
|
||||
|
||||
if not from_number or not content:
|
||||
return jsonify({"error": "Missing from_number or content"}), 400
|
||||
|
||||
content = content.strip()
|
||||
if not content:
|
||||
await send_imessage(
|
||||
from_number, "I received an empty message. Please send some text!"
|
||||
)
|
||||
return jsonify({"status": "ok"}), 200
|
||||
|
||||
# Rate limiting
|
||||
if not _check_rate_limit(from_number):
|
||||
logger.warning(f"Rate limit exceeded for {from_number}")
|
||||
await send_imessage(
|
||||
from_number,
|
||||
"You're sending messages too quickly. Please wait a moment and try again.",
|
||||
)
|
||||
return jsonify({"status": "rate_limited"}), 200
|
||||
|
||||
# Truncate overly long messages
|
||||
if len(content) > MAX_MESSAGE_LENGTH:
|
||||
content = content[:MAX_MESSAGE_LENGTH]
|
||||
logger.info(
|
||||
f"Truncated long message from {from_number} to {MAX_MESSAGE_LENGTH} chars"
|
||||
)
|
||||
|
||||
logger.info(f"Received iMessage from {from_number}: {content[:100]}")
|
||||
|
||||
# Identify or create user
|
||||
user = await User.filter(imessage_number=from_number).first()
|
||||
|
||||
if not user:
|
||||
allowed_numbers = os.getenv("ALLOWED_IMESSAGE_NUMBERS", "").split(",")
|
||||
if from_number not in allowed_numbers and "*" not in allowed_numbers:
|
||||
await send_imessage(
|
||||
from_number, "Sorry, you are not authorized to use this service."
|
||||
)
|
||||
return jsonify({"status": "unauthorized"}), 200
|
||||
|
||||
username = f"im_{from_number.lstrip('+')}"
|
||||
try:
|
||||
user = await User.create(
|
||||
username=username,
|
||||
email=f"{username}@imessage.simbarag.local",
|
||||
imessage_number=from_number,
|
||||
auth_provider="imessage",
|
||||
)
|
||||
logger.info(f"Created new user for iMessage: {username}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create user for {from_number}: {e}")
|
||||
await send_imessage(
|
||||
from_number, "Sorry, something went wrong setting up your account."
|
||||
)
|
||||
return jsonify({"status": "error"}), 200
|
||||
|
||||
# iMessage is restricted to admins
|
||||
if not user.is_admin():
|
||||
logger.warning(f"Non-admin user {user.username} attempted iMessage access")
|
||||
await send_imessage(from_number, "Sorry, this feature is restricted to admins.")
|
||||
return jsonify({"status": "forbidden"}), 200
|
||||
|
||||
# Get or create conversation
|
||||
try:
|
||||
conversation = await get_conversation_for_user(user=user)
|
||||
await conversation.fetch_related("messages")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get conversation for user {user.username}: {e}")
|
||||
await send_imessage(
|
||||
from_number, "Sorry, something went wrong. Please try again later."
|
||||
)
|
||||
return jsonify({"status": "error"}), 200
|
||||
|
||||
# Add user message to conversation
|
||||
await add_message_to_conversation(
|
||||
conversation=conversation,
|
||||
message=content,
|
||||
speaker="user",
|
||||
user=user,
|
||||
)
|
||||
|
||||
# Build messages payload for LangChain agent
|
||||
try:
|
||||
messages = await conversation.messages.all()
|
||||
recent_messages = list(messages)[-10:]
|
||||
|
||||
messages_payload = [{"role": "system", "content": SIMBA_SYSTEM_PROMPT}]
|
||||
|
||||
for msg in recent_messages[:-1]:
|
||||
role = "user" if msg.speaker == "user" else "assistant"
|
||||
messages_payload.append({"role": role, "content": msg.text})
|
||||
|
||||
messages_payload.append({"role": "user", "content": content})
|
||||
|
||||
logger.info(f"Invoking LangChain agent with {len(messages_payload)} messages")
|
||||
response = await main_agent.ainvoke({"messages": messages_payload})
|
||||
response_text = response.get("messages", [])[-1].content
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error invoking agent: {e}")
|
||||
response_text = "Sorry, I'm having trouble thinking right now."
|
||||
|
||||
# Save and send response
|
||||
await add_message_to_conversation(
|
||||
conversation=conversation,
|
||||
message=response_text,
|
||||
speaker="simba",
|
||||
user=user,
|
||||
)
|
||||
|
||||
await send_imessage(from_number, response_text)
|
||||
|
||||
return jsonify({"status": "ok"}), 200
|
||||
Reference in New Issue
Block a user