From 20576cabf388e1808d6227de251bd7e5f6fe793c Mon Sep 17 00:00:00 2001 From: Ryan Chen Date: Wed, 3 Jun 2026 19:22:15 -0400 Subject: [PATCH] Add SendBlue iMessage integration with admin-only access - New imessage blueprint: webhook receives inbound iMessages, runs through LangChain agent, replies via SendBlue REST API - Admin-only: only users with lldap_admin group can use iMessage channel - Admin endpoints to link/unlink imessage_number on user accounts - Add imessage_number field to User model (needs aerich migration) Co-Authored-By: Claude Opus 4.6 --- .env.example | 11 ++ app.py | 2 + blueprints/imessage/__init__.py | 203 ++++++++++++++++++++++++++++++++ blueprints/users/__init__.py | 132 +++++++++++++++------ blueprints/users/models.py | 11 +- 5 files changed, 322 insertions(+), 37 deletions(-) create mode 100644 blueprints/imessage/__init__.py diff --git a/.env.example b/.env.example index 2026ea0..e8965fd 100644 --- a/.env.example +++ b/.env.example @@ -93,6 +93,17 @@ EMAIL_HMAC_SECRET= # Set to false to disable Mailgun signature validation in development MAILGUN_SIGNATURE_VALIDATION=true +# SendBlue Configuration (iMessage) +SENDBLUE_API_KEY=your-sendblue-api-key +SENDBLUE_API_SECRET=your-sendblue-api-secret +SENDBLUE_FROM_NUMBER=+1XXXXXXXXXX +# Comma-separated list of iMessage numbers allowed to use the service (E.164 format) +# Use * to allow any number +ALLOWED_IMESSAGE_NUMBERS= +# Rate limiting: max messages per window (default: 10 messages per 60 seconds) +# IMESSAGE_RATE_LIMIT_MAX=10 +# IMESSAGE_RATE_LIMIT_WINDOW=60 + # Google Calendar Configuration (via gws CLI) GOOGLE_CALENDAR_ENABLED=true # Export credentials: gws auth export --unmasked > credentials.json diff --git a/app.py b/app.py index 67dfb89..ba80f2a 100644 --- a/app.py +++ b/app.py @@ -14,6 +14,7 @@ import blueprints.email import blueprints.rag import blueprints.users import blueprints.whatsapp +import blueprints.imessage import blueprints.users.models from config.db import TORTOISE_CONFIG @@ -50,6 +51,7 @@ app.register_blueprint(blueprints.conversation.conversation_blueprint) app.register_blueprint(blueprints.email.email_blueprint) app.register_blueprint(blueprints.rag.rag_blueprint) app.register_blueprint(blueprints.whatsapp.whatsapp_blueprint) +app.register_blueprint(blueprints.imessage.imessage_blueprint) async def _obsidian_sync_loop(): diff --git a/blueprints/imessage/__init__.py b/blueprints/imessage/__init__.py new file mode 100644 index 0000000..32a8204 --- /dev/null +++ b/blueprints/imessage/__init__.py @@ -0,0 +1,203 @@ +import os +import logging +import time +from collections import defaultdict + +import httpx +from quart import Blueprint, request, jsonify + +from blueprints.users.models import User +from blueprints.conversation.logic import ( + get_conversation_for_user, + add_message_to_conversation, +) +from blueprints.conversation.agents import main_agent +from blueprints.conversation.prompts import SIMBA_SYSTEM_PROMPT + +imessage_blueprint = Blueprint("imessage_api", __name__, url_prefix="/api/imessage") + +logger = logging.getLogger(__name__) + +# Rate limiting: per-number message timestamps +_rate_limit_store: dict[str, list[float]] = defaultdict(list) + +RATE_LIMIT_MAX = int(os.getenv("IMESSAGE_RATE_LIMIT_MAX", "10")) +RATE_LIMIT_WINDOW = int(os.getenv("IMESSAGE_RATE_LIMIT_WINDOW", "60")) + +MAX_MESSAGE_LENGTH = 2000 + +SENDBLUE_API_BASE = "https://api.sendblue.co" + + +def _get_sendblue_headers() -> dict[str, str]: + return { + "sb-api-key-id": os.getenv("SENDBLUE_API_KEY", ""), + "sb-api-secret-key": os.getenv("SENDBLUE_API_SECRET", ""), + "Content-Type": "application/json", + } + + +def _check_rate_limit(phone_number: str) -> bool: + """Check if a phone number has exceeded the rate limit. + + Returns True if the request is allowed, False if rate-limited. + """ + now = time.monotonic() + cutoff = now - RATE_LIMIT_WINDOW + + timestamps = _rate_limit_store[phone_number] + _rate_limit_store[phone_number] = [t for t in timestamps if t > cutoff] + + if len(_rate_limit_store[phone_number]) >= RATE_LIMIT_MAX: + return False + + _rate_limit_store[phone_number].append(now) + return True + + +async def send_imessage(to_number: str, content: str) -> dict: + """Send an iMessage via SendBlue API.""" + from_number = os.getenv("SENDBLUE_FROM_NUMBER", "") + + async with httpx.AsyncClient() as client: + resp = await client.post( + f"{SENDBLUE_API_BASE}/api/send-message", + headers=_get_sendblue_headers(), + json={ + "number": to_number, + "from_number": from_number, + "content": content, + }, + timeout=30, + ) + resp.raise_for_status() + return resp.json() + + +@imessage_blueprint.route("/webhook", methods=["POST"]) +async def webhook(): + """Handle incoming iMessages from SendBlue.""" + data = await request.get_json() + if not data: + return jsonify({"error": "Invalid payload"}), 400 + + from_number = data.get("from_number") + content = data.get("content") + is_outbound = data.get("is_outbound", False) + + # Ignore outbound messages (our own replies echoed back) + if is_outbound: + return jsonify({"status": "ignored"}), 200 + + if not from_number or not content: + return jsonify({"error": "Missing from_number or content"}), 400 + + content = content.strip() + if not content: + await send_imessage( + from_number, "I received an empty message. Please send some text!" + ) + return jsonify({"status": "ok"}), 200 + + # Rate limiting + if not _check_rate_limit(from_number): + logger.warning(f"Rate limit exceeded for {from_number}") + await send_imessage( + from_number, + "You're sending messages too quickly. Please wait a moment and try again.", + ) + return jsonify({"status": "rate_limited"}), 200 + + # Truncate overly long messages + if len(content) > MAX_MESSAGE_LENGTH: + content = content[:MAX_MESSAGE_LENGTH] + logger.info( + f"Truncated long message from {from_number} to {MAX_MESSAGE_LENGTH} chars" + ) + + logger.info(f"Received iMessage from {from_number}: {content[:100]}") + + # Identify or create user + user = await User.filter(imessage_number=from_number).first() + + if not user: + allowed_numbers = os.getenv("ALLOWED_IMESSAGE_NUMBERS", "").split(",") + if from_number not in allowed_numbers and "*" not in allowed_numbers: + await send_imessage( + from_number, "Sorry, you are not authorized to use this service." + ) + return jsonify({"status": "unauthorized"}), 200 + + username = f"im_{from_number.lstrip('+')}" + try: + user = await User.create( + username=username, + email=f"{username}@imessage.simbarag.local", + imessage_number=from_number, + auth_provider="imessage", + ) + logger.info(f"Created new user for iMessage: {username}") + except Exception as e: + logger.error(f"Failed to create user for {from_number}: {e}") + await send_imessage( + from_number, "Sorry, something went wrong setting up your account." + ) + return jsonify({"status": "error"}), 200 + + # iMessage is restricted to admins + if not user.is_admin(): + logger.warning(f"Non-admin user {user.username} attempted iMessage access") + await send_imessage(from_number, "Sorry, this feature is restricted to admins.") + return jsonify({"status": "forbidden"}), 200 + + # Get or create conversation + try: + conversation = await get_conversation_for_user(user=user) + await conversation.fetch_related("messages") + except Exception as e: + logger.error(f"Failed to get conversation for user {user.username}: {e}") + await send_imessage( + from_number, "Sorry, something went wrong. Please try again later." + ) + return jsonify({"status": "error"}), 200 + + # Add user message to conversation + await add_message_to_conversation( + conversation=conversation, + message=content, + speaker="user", + user=user, + ) + + # Build messages payload for LangChain agent + try: + messages = await conversation.messages.all() + recent_messages = list(messages)[-10:] + + messages_payload = [{"role": "system", "content": SIMBA_SYSTEM_PROMPT}] + + for msg in recent_messages[:-1]: + role = "user" if msg.speaker == "user" else "assistant" + messages_payload.append({"role": role, "content": msg.text}) + + messages_payload.append({"role": "user", "content": content}) + + logger.info(f"Invoking LangChain agent with {len(messages_payload)} messages") + response = await main_agent.ainvoke({"messages": messages_payload}) + response_text = response.get("messages", [])[-1].content + + except Exception as e: + logger.error(f"Error invoking agent: {e}") + response_text = "Sorry, I'm having trouble thinking right now." + + # Save and send response + await add_message_to_conversation( + conversation=conversation, + message=response_text, + speaker="simba", + user=user, + ) + + await send_imessage(from_number, response_text) + + return jsonify({"status": "ok"}), 200 diff --git a/blueprints/users/__init__.py b/blueprints/users/__init__.py index 58c9edf..ea9cfe0 100644 --- a/blueprints/users/__init__.py +++ b/blueprints/users/__init__.py @@ -212,32 +212,42 @@ async def me(): user = await User.get_or_none(id=user_id) if not user: return jsonify({"error": "User not found"}), 404 - return jsonify({ - "id": str(user.id), - "username": user.username, - "email": user.email, - "is_admin": user.is_admin(), - }) + return jsonify( + { + "id": str(user.id), + "username": user.username, + "email": user.email, + "is_admin": user.is_admin(), + } + ) @user_blueprint.route("/admin/users", methods=["GET"]) @admin_required async def list_users(): from blueprints.email.helpers import get_user_email_address + users = await User.all().order_by("username") mailgun_domain = os.getenv("MAILGUN_DOMAIN", "") - return jsonify([ - { - "id": str(u.id), - "username": u.username, - "email": u.email, - "whatsapp_number": u.whatsapp_number, - "auth_provider": u.auth_provider, - "email_enabled": u.email_enabled, - "email_address": get_user_email_address(u.email_hmac_token, mailgun_domain) if u.email_hmac_token and u.email_enabled else None, - } - for u in users - ]) + return jsonify( + [ + { + "id": str(u.id), + "username": u.username, + "email": u.email, + "whatsapp_number": u.whatsapp_number, + "imessage_number": u.imessage_number, + "auth_provider": u.auth_provider, + "email_enabled": u.email_enabled, + "email_address": get_user_email_address( + u.email_hmac_token, mailgun_domain + ) + if u.email_hmac_token and u.email_enabled + else None, + } + for u in users + ] + ) @user_blueprint.route("/admin/users//whatsapp", methods=["PUT"]) @@ -254,17 +264,21 @@ async def set_whatsapp(user_id): conflict = await User.filter(whatsapp_number=number).exclude(id=user_id).first() if conflict: - return jsonify({"error": "That WhatsApp number is already linked to another account"}), 409 + return jsonify( + {"error": "That WhatsApp number is already linked to another account"} + ), 409 user.whatsapp_number = number await user.save() - return jsonify({ - "id": str(user.id), - "username": user.username, - "email": user.email, - "whatsapp_number": user.whatsapp_number, - "auth_provider": user.auth_provider, - }) + return jsonify( + { + "id": str(user.id), + "username": user.username, + "email": user.email, + "whatsapp_number": user.whatsapp_number, + "auth_provider": user.auth_provider, + } + ) @user_blueprint.route("/admin/users//whatsapp", methods=["DELETE"]) @@ -279,11 +293,55 @@ async def unlink_whatsapp(user_id): return jsonify({"ok": True}) +@user_blueprint.route("/admin/users//imessage", methods=["PUT"]) +@admin_required +async def set_imessage(user_id): + data = await request.get_json() + number = (data or {}).get("imessage_number", "").strip() + if not number: + return jsonify({"error": "imessage_number is required"}), 400 + + user = await User.get_or_none(id=user_id) + if not user: + return jsonify({"error": "User not found"}), 404 + + conflict = await User.filter(imessage_number=number).exclude(id=user_id).first() + if conflict: + return jsonify( + {"error": "That iMessage number is already linked to another account"} + ), 409 + + user.imessage_number = number + await user.save() + return jsonify( + { + "id": str(user.id), + "username": user.username, + "email": user.email, + "imessage_number": user.imessage_number, + "auth_provider": user.auth_provider, + } + ) + + +@user_blueprint.route("/admin/users//imessage", methods=["DELETE"]) +@admin_required +async def unlink_imessage(user_id): + user = await User.get_or_none(id=user_id) + if not user: + return jsonify({"error": "User not found"}), 404 + + user.imessage_number = None + await user.save() + return jsonify({"ok": True}) + + @user_blueprint.route("/admin/users//email", methods=["PUT"]) @admin_required async def toggle_email(user_id): """Enable email channel for a user, generating an HMAC token.""" from blueprints.email.helpers import generate_email_token, get_user_email_address + user = await User.get_or_none(id=user_id) if not user: return jsonify({"error": "User not found"}), 404 @@ -299,15 +357,19 @@ async def toggle_email(user_id): user.email_enabled = True await user.save() - return jsonify({ - "id": str(user.id), - "username": user.username, - "email": user.email, - "whatsapp_number": user.whatsapp_number, - "auth_provider": user.auth_provider, - "email_enabled": user.email_enabled, - "email_address": get_user_email_address(user.email_hmac_token, mailgun_domain), - }) + return jsonify( + { + "id": str(user.id), + "username": user.username, + "email": user.email, + "whatsapp_number": user.whatsapp_number, + "auth_provider": user.auth_provider, + "email_enabled": user.email_enabled, + "email_address": get_user_email_address( + user.email_hmac_token, mailgun_domain + ), + } + ) @user_blueprint.route("/admin/users//email", methods=["DELETE"]) diff --git a/blueprints/users/models.py b/blueprints/users/models.py index 82ea099..f6b1902 100644 --- a/blueprints/users/models.py +++ b/blueprints/users/models.py @@ -10,11 +10,18 @@ class User(Model): username = fields.CharField(max_length=255) password = fields.BinaryField(null=True) # Hashed - nullable for OIDC users email = fields.CharField(max_length=100, unique=True) - whatsapp_number = fields.CharField(max_length=30, unique=True, null=True, index=True) + whatsapp_number = fields.CharField( + max_length=30, unique=True, null=True, index=True + ) + imessage_number = fields.CharField( + max_length=30, unique=True, null=True, index=True + ) # Email channel fields email_enabled = fields.BooleanField(default=False) - email_hmac_token = fields.CharField(max_length=16, unique=True, null=True, index=True) + email_hmac_token = fields.CharField( + max_length=16, unique=True, null=True, index=True + ) # OIDC fields oidc_subject = fields.CharField(