Merge origin/main: resolve conflicts keeping both email/Mealie and WhatsApp/Mailgun/Obsidian work
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
@@ -1,12 +1,19 @@
|
||||
import datetime
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
from quart import Blueprint, jsonify, request
|
||||
from quart import Blueprint, Response, jsonify, make_response, request
|
||||
from quart_jwt_extended import (
|
||||
get_jwt_identity,
|
||||
jwt_refresh_token_required,
|
||||
)
|
||||
|
||||
import blueprints.users.models
|
||||
from utils.image_process import analyze_user_image
|
||||
from utils.image_upload import ImageValidationError, process_image
|
||||
from utils.s3_client import get_image as s3_get_image
|
||||
from utils.s3_client import upload_image as s3_upload_image
|
||||
|
||||
from .agents import main_agent
|
||||
from .logic import (
|
||||
@@ -19,11 +26,41 @@ from .models import (
|
||||
PydConversation,
|
||||
PydListConversation,
|
||||
)
|
||||
from .prompts import SIMBA_SYSTEM_PROMPT
|
||||
|
||||
conversation_blueprint = Blueprint(
|
||||
"conversation_api", __name__, url_prefix="/api/conversation"
|
||||
)
|
||||
|
||||
_SYSTEM_PROMPT = SIMBA_SYSTEM_PROMPT
|
||||
|
||||
|
||||
def _build_messages_payload(
|
||||
conversation, query_text: str, image_description: str | None = None
|
||||
) -> list:
|
||||
recent_messages = (
|
||||
conversation.messages[-10:]
|
||||
if len(conversation.messages) > 10
|
||||
else conversation.messages
|
||||
)
|
||||
messages_payload = [{"role": "system", "content": _SYSTEM_PROMPT}]
|
||||
for msg in recent_messages[:-1]: # Exclude the message we just added
|
||||
role = "user" if msg.speaker == "user" else "assistant"
|
||||
text = msg.text
|
||||
if msg.image_key and role == "user":
|
||||
text = f"[User sent an image]\n{text}"
|
||||
messages_payload.append({"role": role, "content": text})
|
||||
|
||||
# Build the current user message with optional image description
|
||||
if image_description:
|
||||
content = f"[Image analysis: {image_description}]"
|
||||
if query_text:
|
||||
content = f"{query_text}\n\n{content}"
|
||||
else:
|
||||
content = query_text
|
||||
messages_payload.append({"role": "user", "content": content})
|
||||
return messages_payload
|
||||
|
||||
|
||||
@conversation_blueprint.post("/query")
|
||||
@jwt_refresh_token_required
|
||||
@@ -42,68 +79,7 @@ async def query():
|
||||
user=user,
|
||||
)
|
||||
|
||||
# Build conversation history from recent messages (last 10 for context)
|
||||
recent_messages = (
|
||||
conversation.messages[-10:]
|
||||
if len(conversation.messages) > 10
|
||||
else conversation.messages
|
||||
)
|
||||
|
||||
messages_payload = [
|
||||
{
|
||||
"role": "system",
|
||||
"content": """You are a helpful cat assistant named Simba that understands veterinary terms. When there are questions to you specifically, they are referring to Simba the cat. Answer the user in as if you were a cat named Simba. Don't act too catlike. Be assertive.
|
||||
|
||||
SIMBA FACTS (as of January 2026):
|
||||
- Name: Simba
|
||||
- Species: Feline (Domestic Short Hair / American Short Hair)
|
||||
- Sex: Male, Neutered
|
||||
- Date of Birth: August 8, 2016 (approximately 9 years 5 months old)
|
||||
- Color: Orange
|
||||
- Current Weight: 16 lbs (as of 1/8/2026)
|
||||
- Owner: Ryan Chen
|
||||
- Location: Long Island City, NY
|
||||
- Veterinarian: Court Square Animal Hospital
|
||||
|
||||
Medical Conditions:
|
||||
- Hypertrophic Cardiomyopathy (HCM): Diagnosed 12/11/2025. Concentric left ventricular hypertrophy with no left atrial dilation. Grade II-III/VI systolic heart murmur. No cardiac medications currently needed. Must avoid Domitor, acepromazine, and ketamine during anesthesia.
|
||||
- Dental Issues: Prior extraction of teeth 307 and 407 due to resorption. Tooth 107 extracted on 1/8/2026. Early resorption lesions present on teeth 207, 309, and 409.
|
||||
|
||||
Recent Medical Events:
|
||||
- 1/8/2026: Dental cleaning and tooth 107 extraction. Prescribed Onsior for 3 days. Oravet sealant applied.
|
||||
- 12/11/2025: Echocardiogram confirming HCM diagnosis. Pre-op bloodwork was normal.
|
||||
- 12/1/2025: Visited for decreased appetite/nausea. Received subcutaneous fluids and Cerenia.
|
||||
|
||||
Diet & Lifestyle:
|
||||
- Diet: Hill's I/D wet and dry food
|
||||
- Supplements: Plaque Off
|
||||
- Indoor only cat, only pet in the household
|
||||
|
||||
Upcoming Appointments:
|
||||
- Rabies Vaccine: Due 2/19/2026
|
||||
- Routine Examination: Due 6/1/2026
|
||||
- FVRCP-3yr Vaccine: Due 10/2/2026
|
||||
|
||||
IMPORTANT: When users ask factual questions about Simba's health, medical history, veterinary visits, medications, weight, or any information that would be in documents, you MUST use the simba_search tool to retrieve accurate information before answering. Do not rely on general knowledge - always search the documents for factual questions.
|
||||
|
||||
BUDGET & FINANCE (YNAB Integration):
|
||||
You have access to Ryan's budget data through YNAB (You Need A Budget). When users ask about financial matters, use the appropriate YNAB tools:
|
||||
- Use ynab_budget_summary for overall budget health and status questions
|
||||
- Use ynab_search_transactions to find specific purchases or spending at particular stores
|
||||
- Use ynab_category_spending to analyze spending by category for a month
|
||||
- Use ynab_insights to provide spending trends, patterns, and recommendations
|
||||
Always use these tools when asked about budgets, spending, transactions, or financial health.""",
|
||||
}
|
||||
]
|
||||
|
||||
# Add recent conversation history
|
||||
for msg in recent_messages[:-1]: # Exclude the message we just added
|
||||
role = "user" if msg.speaker == "user" else "assistant"
|
||||
messages_payload.append({"role": role, "content": msg.text})
|
||||
|
||||
# Add current query
|
||||
messages_payload.append({"role": "user", "content": query})
|
||||
|
||||
messages_payload = _build_messages_payload(conversation, query)
|
||||
payload = {"messages": messages_payload}
|
||||
|
||||
response = await main_agent.ainvoke(payload)
|
||||
@@ -117,6 +93,142 @@ Always use these tools when asked about budgets, spending, transactions, or fina
|
||||
return jsonify({"response": message})
|
||||
|
||||
|
||||
@conversation_blueprint.post("/upload-image")
|
||||
@jwt_refresh_token_required
|
||||
async def upload_image():
|
||||
current_user_uuid = get_jwt_identity()
|
||||
await blueprints.users.models.User.get(id=current_user_uuid)
|
||||
|
||||
files = await request.files
|
||||
form = await request.form
|
||||
file = files.get("file")
|
||||
conversation_id = form.get("conversation_id")
|
||||
|
||||
if not file or not conversation_id:
|
||||
return jsonify({"error": "file and conversation_id are required"}), 400
|
||||
|
||||
file_bytes = file.read()
|
||||
content_type = file.content_type or "image/jpeg"
|
||||
|
||||
try:
|
||||
processed_bytes, output_content_type = process_image(file_bytes, content_type)
|
||||
except ImageValidationError as e:
|
||||
return jsonify({"error": str(e)}), 400
|
||||
|
||||
ext = output_content_type.split("/")[-1]
|
||||
if ext == "jpeg":
|
||||
ext = "jpg"
|
||||
key = f"conversations/{conversation_id}/{uuid.uuid4()}.{ext}"
|
||||
|
||||
await s3_upload_image(processed_bytes, key, output_content_type)
|
||||
|
||||
return jsonify(
|
||||
{
|
||||
"image_key": key,
|
||||
"image_url": f"/api/conversation/image/{key}",
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
@conversation_blueprint.get("/image/<path:image_key>")
|
||||
@jwt_refresh_token_required
|
||||
async def serve_image(image_key: str):
|
||||
try:
|
||||
image_bytes, content_type = await s3_get_image(image_key)
|
||||
except Exception:
|
||||
return jsonify({"error": "Image not found"}), 404
|
||||
|
||||
return Response(
|
||||
image_bytes,
|
||||
content_type=content_type,
|
||||
headers={"Cache-Control": "private, max-age=3600"},
|
||||
)
|
||||
|
||||
|
||||
@conversation_blueprint.post("/stream-query")
|
||||
@jwt_refresh_token_required
|
||||
async def stream_query():
|
||||
current_user_uuid = get_jwt_identity()
|
||||
user = await blueprints.users.models.User.get(id=current_user_uuid)
|
||||
data = await request.get_json()
|
||||
query_text = data.get("query")
|
||||
conversation_id = data.get("conversation_id")
|
||||
image_key = data.get("image_key")
|
||||
conversation = await get_conversation_by_id(conversation_id)
|
||||
await conversation.fetch_related("messages")
|
||||
await add_message_to_conversation(
|
||||
conversation=conversation,
|
||||
message=query_text or "",
|
||||
speaker="user",
|
||||
user=user,
|
||||
image_key=image_key,
|
||||
)
|
||||
|
||||
# If an image was uploaded, analyze it with the vision model
|
||||
image_description = None
|
||||
if image_key:
|
||||
try:
|
||||
image_bytes, _ = await s3_get_image(image_key)
|
||||
image_description = await analyze_user_image(image_bytes)
|
||||
logging.info(f"Image analysis complete for {image_key}")
|
||||
except Exception as e:
|
||||
logging.error(f"Failed to analyze image: {e}")
|
||||
image_description = "[Image could not be analyzed]"
|
||||
|
||||
messages_payload = _build_messages_payload(
|
||||
conversation, query_text or "", image_description
|
||||
)
|
||||
payload = {"messages": messages_payload}
|
||||
|
||||
async def event_generator():
|
||||
final_message = None
|
||||
try:
|
||||
async for event in main_agent.astream_events(payload, version="v2"):
|
||||
event_type = event.get("event")
|
||||
|
||||
if event_type == "on_tool_start":
|
||||
yield f"data: {json.dumps({'type': 'tool_start', 'tool': event['name']})}\n\n"
|
||||
|
||||
elif event_type == "on_tool_end":
|
||||
yield f"data: {json.dumps({'type': 'tool_end', 'tool': event['name']})}\n\n"
|
||||
|
||||
elif event_type == "on_chain_end":
|
||||
output = event.get("data", {}).get("output")
|
||||
if isinstance(output, dict):
|
||||
msgs = output.get("messages", [])
|
||||
if msgs:
|
||||
last_msg = msgs[-1]
|
||||
content = getattr(last_msg, "content", None)
|
||||
if isinstance(content, str) and content:
|
||||
final_message = content
|
||||
|
||||
except Exception as e:
|
||||
yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"
|
||||
|
||||
if final_message:
|
||||
await add_message_to_conversation(
|
||||
conversation=conversation,
|
||||
message=final_message,
|
||||
speaker="simba",
|
||||
user=user,
|
||||
)
|
||||
yield f"data: {json.dumps({'type': 'response', 'message': final_message})}\n\n"
|
||||
else:
|
||||
yield f"data: {json.dumps({'type': 'error', 'message': 'No response generated'})}\n\n"
|
||||
|
||||
yield "data: [DONE]\n\n"
|
||||
|
||||
return await make_response(
|
||||
event_generator(),
|
||||
200,
|
||||
{
|
||||
"Content-Type": "text/event-stream",
|
||||
"Cache-Control": "no-cache",
|
||||
"X-Accel-Buffering": "no",
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@conversation_blueprint.route("/<conversation_id>")
|
||||
@jwt_refresh_token_required
|
||||
async def get_conversation(conversation_id: str):
|
||||
@@ -134,6 +246,7 @@ async def get_conversation(conversation_id: str):
|
||||
"text": msg.text,
|
||||
"speaker": msg.speaker.value,
|
||||
"created_at": msg.created_at.isoformat(),
|
||||
"image_key": msg.image_key,
|
||||
}
|
||||
)
|
||||
name = conversation.name
|
||||
|
||||
@@ -9,6 +9,7 @@ from langchain_openai import ChatOpenAI
|
||||
from tavily import AsyncTavilyClient
|
||||
|
||||
from blueprints.rag.logic import query_vector_store
|
||||
from utils.obsidian_service import ObsidianService
|
||||
from utils.ynab_service import YNABService
|
||||
|
||||
# Load environment variables
|
||||
@@ -40,6 +41,32 @@ except (ValueError, Exception) as e:
|
||||
print(f"YNAB service not initialized: {e}")
|
||||
ynab_enabled = False
|
||||
|
||||
# Initialize Obsidian service (will only work if OBSIDIAN_VAULT_PATH is set)
|
||||
try:
|
||||
obsidian_service = ObsidianService()
|
||||
obsidian_enabled = True
|
||||
except (ValueError, Exception) as e:
|
||||
print(f"Obsidian service not initialized: {e}")
|
||||
obsidian_enabled = False
|
||||
|
||||
|
||||
@tool
|
||||
def get_current_date() -> str:
|
||||
"""Get today's date in a human-readable format.
|
||||
|
||||
Use this tool when you need to:
|
||||
- Reference today's date in your response
|
||||
- Answer questions like "what is today's date"
|
||||
- Format dates in messages or documents
|
||||
- Calculate time periods relative to today
|
||||
|
||||
Returns:
|
||||
Today's date in YYYY-MM-DD format
|
||||
"""
|
||||
from datetime import date
|
||||
|
||||
return date.today().isoformat()
|
||||
|
||||
|
||||
@tool
|
||||
async def web_search(query: str) -> str:
|
||||
@@ -279,8 +306,291 @@ def ynab_insights(months_back: int = 3) -> str:
|
||||
return f"Error generating insights: {str(e)}"
|
||||
|
||||
|
||||
@tool
|
||||
async def obsidian_search_notes(query: str) -> str:
|
||||
"""Search through Obsidian vault notes for information.
|
||||
|
||||
Use this tool when you need to:
|
||||
- Find information in personal notes
|
||||
- Research past ideas or thoughts from your vault
|
||||
- Look up information stored in markdown files
|
||||
- Search for content that would be in your notes
|
||||
|
||||
Args:
|
||||
query: The search query to look up in your Obsidian vault
|
||||
|
||||
Returns:
|
||||
Relevant notes with their content and metadata
|
||||
"""
|
||||
if not obsidian_enabled:
|
||||
return "Obsidian integration is not configured. Please set OBSIDIAN_VAULT_PATH environment variable."
|
||||
|
||||
try:
|
||||
# Query ChromaDB for obsidian documents
|
||||
serialized, docs = await query_vector_store(query=query)
|
||||
return serialized
|
||||
|
||||
except Exception as e:
|
||||
return f"Error searching Obsidian notes: {str(e)}"
|
||||
|
||||
|
||||
@tool
|
||||
async def obsidian_read_note(relative_path: str) -> str:
|
||||
"""Read a specific note from your Obsidian vault.
|
||||
|
||||
Use this tool when you want to:
|
||||
- Read the full content of a specific note
|
||||
- Get detailed information from a particular markdown file
|
||||
- Access content from a known note path
|
||||
|
||||
Args:
|
||||
relative_path: Path to note relative to vault root (e.g., "notes/my-note.md")
|
||||
|
||||
Returns:
|
||||
Full content and metadata of the requested note
|
||||
"""
|
||||
if not obsidian_enabled:
|
||||
return "Obsidian integration is not configured. Please set OBSIDIAN_VAULT_PATH environment variable."
|
||||
|
||||
try:
|
||||
note = obsidian_service.read_note(relative_path)
|
||||
content_data = note["content"]
|
||||
|
||||
result = f"File: {note['path']}\n\n"
|
||||
result += f"Frontmatter:\n{content_data['metadata']}\n\n"
|
||||
result += f"Content:\n{content_data['content']}\n\n"
|
||||
result += f"Tags: {', '.join(content_data['tags'])}\n"
|
||||
result += f"Contains {len(content_data['wikilinks'])} wikilinks and {len(content_data['embeds'])} embeds"
|
||||
|
||||
return result
|
||||
|
||||
except FileNotFoundError:
|
||||
return f"Note not found at '{relative_path}'. Please check the path is correct."
|
||||
except Exception as e:
|
||||
return f"Error reading note: {str(e)}"
|
||||
|
||||
|
||||
@tool
|
||||
async def obsidian_create_note(
|
||||
title: str,
|
||||
content: str,
|
||||
folder: str = "notes",
|
||||
tags: str = "",
|
||||
) -> str:
|
||||
"""Create a new note in your Obsidian vault.
|
||||
|
||||
Use this tool when you want to:
|
||||
- Save research findings or ideas to your vault
|
||||
- Create a new document with a specific title
|
||||
- Write notes for future reference
|
||||
|
||||
Args:
|
||||
title: The title of the note (will be used as filename)
|
||||
content: The body content of the note
|
||||
folder: The folder where to create the note (default: "notes")
|
||||
tags: Comma-separated list of tags to add (default: "")
|
||||
|
||||
Returns:
|
||||
Path to the created note
|
||||
"""
|
||||
if not obsidian_enabled:
|
||||
return "Obsidian integration is not configured. Please set OBSIDIAN_VAULT_PATH environment variable."
|
||||
|
||||
try:
|
||||
# Parse tags from comma-separated string
|
||||
tag_list = [tag.strip() for tag in tags.split(",") if tag.strip()]
|
||||
|
||||
relative_path = obsidian_service.create_note(
|
||||
title=title,
|
||||
content=content,
|
||||
folder=folder,
|
||||
tags=tag_list,
|
||||
)
|
||||
|
||||
return f"Successfully created note: {relative_path}"
|
||||
|
||||
except Exception as e:
|
||||
return f"Error creating note: {str(e)}"
|
||||
|
||||
|
||||
@tool
|
||||
def journal_get_today() -> str:
|
||||
"""Get today's daily journal note, including all tasks and log entries.
|
||||
|
||||
Use this tool when the user asks about:
|
||||
- What's on their plate today
|
||||
- Today's tasks or to-do list
|
||||
- Today's journal entry
|
||||
- What they've logged today
|
||||
|
||||
Returns:
|
||||
The full content of today's daily note, or a message if it doesn't exist.
|
||||
"""
|
||||
if not obsidian_enabled:
|
||||
return "Obsidian integration is not configured."
|
||||
|
||||
try:
|
||||
note = obsidian_service.get_daily_note()
|
||||
if not note["found"]:
|
||||
return f"No daily note found for {note['date']}. Use journal_add_task to create one."
|
||||
return f"Daily note for {note['date']}:\n\n{note['content']}"
|
||||
except Exception as e:
|
||||
return f"Error reading daily note: {str(e)}"
|
||||
|
||||
|
||||
@tool
|
||||
def journal_get_tasks(date: str = "") -> str:
|
||||
"""Get tasks from a daily journal note.
|
||||
|
||||
Use this tool when the user asks about:
|
||||
- Open or pending tasks for a day
|
||||
- What tasks are done or not done
|
||||
- Task status for today or a specific date
|
||||
|
||||
Args:
|
||||
date: Date in YYYY-MM-DD format (optional, defaults to today)
|
||||
|
||||
Returns:
|
||||
List of tasks with their completion status.
|
||||
"""
|
||||
if not obsidian_enabled:
|
||||
return "Obsidian integration is not configured."
|
||||
|
||||
try:
|
||||
from datetime import datetime as dt
|
||||
|
||||
parsed_date = dt.strptime(date, "%Y-%m-%d") if date else None
|
||||
result = obsidian_service.get_daily_tasks(parsed_date)
|
||||
|
||||
if not result["found"]:
|
||||
return f"No daily note found for {result['date']}."
|
||||
|
||||
if not result["tasks"]:
|
||||
return f"No tasks found in the {result['date']} note."
|
||||
|
||||
lines = [f"Tasks for {result['date']}:"]
|
||||
for task in result["tasks"]:
|
||||
status = "[x]" if task["done"] else "[ ]"
|
||||
lines.append(f"- {status} {task['text']}")
|
||||
return "\n".join(lines)
|
||||
except Exception as e:
|
||||
return f"Error reading tasks: {str(e)}"
|
||||
|
||||
|
||||
@tool
|
||||
def journal_add_task(task: str, date: str = "") -> str:
|
||||
"""Add a task to a daily journal note.
|
||||
|
||||
Use this tool when the user wants to:
|
||||
- Add a task or to-do to today's note
|
||||
- Remind themselves to do something
|
||||
- Track a new item in their daily note
|
||||
|
||||
Args:
|
||||
task: The task description to add
|
||||
date: Date in YYYY-MM-DD format (optional, defaults to today)
|
||||
|
||||
Returns:
|
||||
Confirmation of the added task.
|
||||
"""
|
||||
if not obsidian_enabled:
|
||||
return "Obsidian integration is not configured."
|
||||
|
||||
try:
|
||||
from datetime import datetime as dt
|
||||
|
||||
parsed_date = dt.strptime(date, "%Y-%m-%d") if date else None
|
||||
result = obsidian_service.add_task_to_daily_note(task, parsed_date)
|
||||
|
||||
if result["success"]:
|
||||
note_date = date or dt.now().strftime("%Y-%m-%d")
|
||||
extra = " (created new note)" if result["created_note"] else ""
|
||||
return f"Added task '{task}' to {note_date}{extra}."
|
||||
return "Failed to add task."
|
||||
except Exception as e:
|
||||
return f"Error adding task: {str(e)}"
|
||||
|
||||
|
||||
@tool
|
||||
def journal_complete_task(task: str, date: str = "") -> str:
|
||||
"""Mark a task as complete in a daily journal note.
|
||||
|
||||
Use this tool when the user wants to:
|
||||
- Check off a task as done
|
||||
- Mark something as completed
|
||||
- Update task status in their daily note
|
||||
|
||||
Args:
|
||||
task: The task text to mark complete (exact or partial match)
|
||||
date: Date in YYYY-MM-DD format (optional, defaults to today)
|
||||
|
||||
Returns:
|
||||
Confirmation that the task was marked complete.
|
||||
"""
|
||||
if not obsidian_enabled:
|
||||
return "Obsidian integration is not configured."
|
||||
|
||||
try:
|
||||
from datetime import datetime as dt
|
||||
|
||||
parsed_date = dt.strptime(date, "%Y-%m-%d") if date else None
|
||||
result = obsidian_service.complete_task_in_daily_note(task, parsed_date)
|
||||
|
||||
if result["success"]:
|
||||
return f"Marked '{result['completed_task']}' as complete."
|
||||
return f"Could not complete task: {result.get('error', 'unknown error')}"
|
||||
except Exception as e:
|
||||
return f"Error completing task: {str(e)}"
|
||||
|
||||
|
||||
@tool
|
||||
async def obsidian_create_task(
|
||||
title: str,
|
||||
content: str = "",
|
||||
folder: str = "tasks",
|
||||
due_date: str = "",
|
||||
tags: str = "",
|
||||
) -> str:
|
||||
"""Create a new task note in your Obsidian vault.
|
||||
|
||||
Use this tool when you want to:
|
||||
- Create a task to remember to do something
|
||||
- Add a task with a due date
|
||||
- Track tasks in your vault
|
||||
|
||||
Args:
|
||||
title: The title of the task
|
||||
content: The description of the task (optional)
|
||||
folder: The folder to place the task (default: "tasks")
|
||||
due_date: Due date in YYYY-MM-DD format (optional)
|
||||
tags: Comma-separated list of tags to add (optional)
|
||||
|
||||
Returns:
|
||||
Path to the created task note
|
||||
"""
|
||||
if not obsidian_enabled:
|
||||
return "Obsidian integration is not configured. Please set OBSIDIAN_VAULT_PATH environment variable."
|
||||
|
||||
try:
|
||||
# Parse tags from comma-separated string
|
||||
tag_list = [tag.strip() for tag in tags.split(",") if tag.strip()]
|
||||
|
||||
relative_path = obsidian_service.create_task(
|
||||
title=title,
|
||||
content=content,
|
||||
folder=folder,
|
||||
due_date=due_date or None,
|
||||
tags=tag_list,
|
||||
)
|
||||
|
||||
return f"Successfully created task: {relative_path}"
|
||||
|
||||
except Exception as e:
|
||||
return f"Error creating task: {str(e)}"
|
||||
|
||||
|
||||
# Create tools list based on what's available
|
||||
tools = [simba_search, web_search]
|
||||
tools = [get_current_date, simba_search, web_search]
|
||||
if ynab_enabled:
|
||||
tools.extend(
|
||||
[
|
||||
@@ -290,6 +600,19 @@ if ynab_enabled:
|
||||
ynab_insights,
|
||||
]
|
||||
)
|
||||
if obsidian_enabled:
|
||||
tools.extend(
|
||||
[
|
||||
obsidian_search_notes,
|
||||
obsidian_read_note,
|
||||
obsidian_create_note,
|
||||
obsidian_create_task,
|
||||
journal_get_today,
|
||||
journal_get_tasks,
|
||||
journal_add_task,
|
||||
journal_complete_task,
|
||||
]
|
||||
)
|
||||
|
||||
# Llama 3.1 supports native function calling via OpenAI-compatible API
|
||||
main_agent = create_agent(model=model_with_fallback, tools=tools)
|
||||
|
||||
@@ -16,12 +16,14 @@ async def add_message_to_conversation(
|
||||
message: str,
|
||||
speaker: str,
|
||||
user: blueprints.users.models.User,
|
||||
image_key: str | None = None,
|
||||
) -> ConversationMessage:
|
||||
print(conversation, message, speaker)
|
||||
message = await ConversationMessage.create(
|
||||
text=message,
|
||||
speaker=speaker,
|
||||
conversation=conversation,
|
||||
image_key=image_key,
|
||||
)
|
||||
|
||||
return message
|
||||
|
||||
@@ -41,6 +41,7 @@ class ConversationMessage(Model):
|
||||
)
|
||||
created_at = fields.DatetimeField(auto_now_add=True)
|
||||
speaker = fields.CharEnumField(enum_type=Speaker, max_length=10)
|
||||
image_key = fields.CharField(max_length=512, null=True, default=None)
|
||||
|
||||
class Meta:
|
||||
table = "conversation_messages"
|
||||
|
||||
57
blueprints/conversation/prompts.py
Normal file
57
blueprints/conversation/prompts.py
Normal file
@@ -0,0 +1,57 @@
|
||||
SIMBA_SYSTEM_PROMPT = """You are a helpful cat assistant named Simba that understands veterinary terms. When there are questions to you specifically, they are referring to Simba the cat. Answer the user in as if you were a cat named Simba. Don't act too catlike. Be assertive.
|
||||
|
||||
SIMBA FACTS (as of January 2026):
|
||||
- Name: Simba
|
||||
- Species: Feline (Domestic Short Hair / American Short Hair)
|
||||
- Sex: Male, Neutered
|
||||
- Date of Birth: August 8, 2016 (approximately 9 years 5 months old)
|
||||
- Color: Orange
|
||||
- Current Weight: 16 lbs (as of 1/8/2026)
|
||||
- Owner: Ryan Chen
|
||||
- Location: Long Island City, NY
|
||||
- Veterinarian: Court Square Animal Hospital
|
||||
|
||||
Medical Conditions:
|
||||
- Hypertrophic Cardiomyopathy (HCM): Diagnosed 12/11/2025. Concentric left ventricular hypertrophy with no left atrial dilation. Grade II-III/VI systolic heart murmur. No cardiac medications currently needed. Must avoid Domitor, acepromazine, and ketamine during anesthesia.
|
||||
- Dental Issues: Prior extraction of teeth 307 and 407 due to resorption. Tooth 107 extracted on 1/8/2026. Early resorption lesions present on teeth 207, 309, and 409.
|
||||
|
||||
Recent Medical Events:
|
||||
- 1/8/2026: Dental cleaning and tooth 107 extraction. Prescribed Onsior for 3 days. Oravet sealant applied.
|
||||
- 12/11/2025: Echocardiogram confirming HCM diagnosis. Pre-op bloodwork was normal.
|
||||
- 12/1/2025: Visited for decreased appetite/nausea. Received subcutaneous fluids and Cerenia.
|
||||
|
||||
Diet & Lifestyle:
|
||||
- Diet: Hill's I/D wet and dry food
|
||||
- Supplements: Plaque Off
|
||||
- Indoor only cat, only pet in the household
|
||||
|
||||
Upcoming Appointments:
|
||||
- Rabies Vaccine: Due 2/19/2026
|
||||
- Routine Examination: Due 6/1/2026
|
||||
- FVRCP-3yr Vaccine: Due 10/2/2026
|
||||
|
||||
IMPORTANT: When users ask factual questions about Simba's health, medical history, veterinary visits, medications, weight, or any information that would be in documents, you MUST use the simba_search tool to retrieve accurate information before answering. Do not rely on general knowledge - always search the documents for factual questions.
|
||||
|
||||
BUDGET & FINANCE (YNAB Integration):
|
||||
You have access to Ryan's budget data through YNAB (You Need A Budget). When users ask about financial matters, use the appropriate YNAB tools:
|
||||
- Use ynab_budget_summary for overall budget health and status questions
|
||||
- Use ynab_search_transactions to find specific purchases or spending at particular stores
|
||||
- Use ynab_category_spending to analyze spending by category for a month
|
||||
- Use ynab_insights to provide spending trends, patterns, and recommendations
|
||||
Always use these tools when asked about budgets, spending, transactions, or financial health.
|
||||
|
||||
NOTES & RESEARCH (Obsidian Integration):
|
||||
You have access to Ryan's Obsidian vault through the Obsidian integration. When users ask about research, personal notes, or information that might be stored in markdown files, use the appropriate Obsidian tools:
|
||||
- Use obsidian_search_notes to search through your vault for relevant information
|
||||
- Use obsidian_read_note to read the full content of a specific note by path
|
||||
- Use obsidian_create_note to save new findings, ideas, or research to your vault
|
||||
- Use obsidian_create_task to create task notes with due dates
|
||||
Always use these tools when users ask about notes, research, ideas, tasks, or when you want to save information for future reference.
|
||||
|
||||
DAILY JOURNAL (Task Tracking):
|
||||
You have access to Ryan's daily journal notes. Each note lives at journal/YYYY/YYYY-MM-DD.md and has two sections: tasks and log.
|
||||
- Use journal_get_today to read today's full daily note (tasks + log)
|
||||
- Use journal_get_tasks to list tasks (done/pending) for today or a specific date
|
||||
- Use journal_add_task to add a new task to today's (or a given date's) note
|
||||
- Use journal_complete_task to check off a task as done
|
||||
Use these tools when Ryan asks about today's tasks, wants to add something to his list, or wants to mark a task complete."""
|
||||
@@ -1,16 +1,227 @@
|
||||
"""
|
||||
Email blueprint for IMAP email ingestion.
|
||||
import os
|
||||
import hmac
|
||||
import hashlib
|
||||
import logging
|
||||
import functools
|
||||
import time
|
||||
from collections import defaultdict
|
||||
|
||||
Provides API endpoints for managing email accounts and querying email content.
|
||||
Admin-only access enforced via lldap_admin group membership.
|
||||
"""
|
||||
import httpx
|
||||
from quart import Blueprint, request
|
||||
|
||||
from quart import Blueprint
|
||||
from blueprints.users.models import User
|
||||
from blueprints.conversation.logic import (
|
||||
get_conversation_for_user,
|
||||
add_message_to_conversation,
|
||||
)
|
||||
from blueprints.conversation.agents import main_agent
|
||||
from blueprints.conversation.prompts import SIMBA_SYSTEM_PROMPT
|
||||
from . import models # noqa: F401 — register Tortoise ORM models
|
||||
from .helpers import generate_email_token, get_user_email_address # noqa: F401
|
||||
|
||||
# Import models for Tortoise ORM registration
|
||||
from . import models # noqa: F401
|
||||
email_blueprint = Blueprint("email_api", __name__, url_prefix="/api/email")
|
||||
|
||||
# Create blueprint
|
||||
email_blueprint = Blueprint("email", __name__, url_prefix="/api/email")
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Routes will be added in Phase 2
|
||||
# Rate limiting: per-sender message timestamps
|
||||
_rate_limit_store: dict[str, list[float]] = defaultdict(list)
|
||||
|
||||
RATE_LIMIT_MAX = int(os.getenv("EMAIL_RATE_LIMIT_MAX", "5"))
|
||||
RATE_LIMIT_WINDOW = int(os.getenv("EMAIL_RATE_LIMIT_WINDOW", "300"))
|
||||
|
||||
MAX_MESSAGE_LENGTH = 2000
|
||||
|
||||
|
||||
# --- Mailgun signature validation ---
|
||||
|
||||
|
||||
def validate_mailgun_signature(f):
|
||||
"""Decorator to validate Mailgun webhook signatures."""
|
||||
|
||||
@functools.wraps(f)
|
||||
async def decorated_function(*args, **kwargs):
|
||||
if os.getenv("MAILGUN_SIGNATURE_VALIDATION", "true").lower() == "false":
|
||||
return await f(*args, **kwargs)
|
||||
|
||||
signing_key = os.getenv("MAILGUN_WEBHOOK_SIGNING_KEY")
|
||||
if not signing_key:
|
||||
logger.error("MAILGUN_WEBHOOK_SIGNING_KEY not set — rejecting request")
|
||||
return "", 406
|
||||
|
||||
form_data = await request.form
|
||||
timestamp = form_data.get("timestamp", "")
|
||||
token = form_data.get("token", "")
|
||||
signature = form_data.get("signature", "")
|
||||
|
||||
if not timestamp or not token or not signature:
|
||||
logger.warning("Missing Mailgun signature fields")
|
||||
return "", 406
|
||||
|
||||
expected = hmac.new(
|
||||
signing_key.encode(),
|
||||
f"{timestamp}{token}".encode(),
|
||||
hashlib.sha256,
|
||||
).hexdigest()
|
||||
|
||||
if not hmac.compare_digest(expected, signature):
|
||||
logger.warning("Invalid Mailgun signature")
|
||||
return "", 406
|
||||
|
||||
return await f(*args, **kwargs)
|
||||
|
||||
return decorated_function
|
||||
|
||||
|
||||
# --- Rate limiting ---
|
||||
|
||||
|
||||
def _check_rate_limit(sender: str) -> bool:
|
||||
"""Check if a sender has exceeded the rate limit.
|
||||
|
||||
Returns True if the request is allowed, False if rate-limited.
|
||||
"""
|
||||
now = time.monotonic()
|
||||
cutoff = now - RATE_LIMIT_WINDOW
|
||||
|
||||
timestamps = _rate_limit_store[sender]
|
||||
_rate_limit_store[sender] = [t for t in timestamps if t > cutoff]
|
||||
|
||||
if len(_rate_limit_store[sender]) >= RATE_LIMIT_MAX:
|
||||
return False
|
||||
|
||||
_rate_limit_store[sender].append(now)
|
||||
return True
|
||||
|
||||
|
||||
# --- Send reply via Mailgun API ---
|
||||
|
||||
|
||||
async def send_email_reply(
    to: str, subject: str, body: str, in_reply_to: str | None = None
):
    """Send a reply email via the Mailgun API.

    Args:
        to: Recipient address.
        subject: Subject of the email being answered; a "Re: " prefix is
            added unless one is already present (checked case-insensitively,
            so "re: foo" does not become "Re: re: foo").
        body: Plain-text reply body.
        in_reply_to: Optional Message-Id used to thread the reply.

    Failures (missing config, network errors, non-200 API responses) are
    logged but never raised, so callers in webhook handlers are unaffected
    by outbound delivery problems.
    """
    api_key = os.getenv("MAILGUN_API_KEY")
    domain = os.getenv("MAILGUN_DOMAIN")
    if not api_key or not domain:
        logger.error("MAILGUN_API_KEY or MAILGUN_DOMAIN not configured")
        return

    # Case-insensitive prefix check — mail clients use "Re:", "RE:", "re:".
    reply_subject = subject if subject.lower().startswith("re:") else f"Re: {subject}"

    data = {
        "from": f"Simba <simba@{domain}>",
        "to": to,
        "subject": reply_subject,
        "text": body,
    }
    if in_reply_to:
        # Thread the reply under the original message in the recipient's client.
        data["h:In-Reply-To"] = in_reply_to

    try:
        # Explicit timeout so a hung Mailgun API call cannot stall the caller.
        async with httpx.AsyncClient(timeout=30) as client:
            resp = await client.post(
                f"https://api.mailgun.net/v3/{domain}/messages",
                auth=("api", api_key),
                data=data,
            )
    except httpx.HTTPError as e:
        # Network-level failure (DNS, connect error, timeout).
        logger.error(f"Mailgun send failed: {e}")
        return

    if resp.status_code != 200:
        logger.error(f"Mailgun send failed ({resp.status_code}): {resp.text}")
    else:
        logger.info(f"Sent email reply to {to}")
|
||||
|
||||
|
||||
# --- Webhook route ---
|
||||
|
||||
|
||||
@email_blueprint.route("/webhook", methods=["POST"])
@validate_mailgun_signature
async def webhook():
    """Handle inbound emails forwarded by Mailgun.

    Pipeline: extract the routing token from the recipient address,
    look up the user, rate-limit, persist the inbound message, run the
    agent over recent history, persist the reply, and email it back.

    Every exit path returns an empty 200 — even on errors — so Mailgun
    does not retry delivery of a message we have decided to drop.
    """
    form_data = await request.form
    sender = form_data.get("sender", "")
    recipient = form_data.get("recipient", "")
    # "stripped-text" is the Mailgun field holding the body without quoted
    # reply history or signatures.
    body = form_data.get("stripped-text", "")
    subject = form_data.get("subject", "(no subject)")
    message_id = form_data.get("Message-Id", "")

    # Extract token from recipient: ask+<token>@domain
    local_part = recipient.split("@")[0] if "@" in recipient else ""
    if "+" not in local_part:
        logger.info(f"Ignoring email to {recipient} — no token in address")
        return "", 200

    token = local_part.split("+", 1)[1]

    # Lookup user by token — only users with the email channel enabled match.
    user = await User.filter(email_hmac_token=token, email_enabled=True).first()
    if not user:
        logger.info(f"No user found for email token {token}")
        return "", 200

    # Rate limit (keyed by the external sender address, not the user).
    if not _check_rate_limit(sender):
        logger.warning(f"Rate limit exceeded for email sender {sender}")
        return "", 200

    # Clean up body
    body = (body or "").strip()
    if not body:
        logger.info(f"Ignoring empty email from {sender}")
        return "", 200

    if len(body) > MAX_MESSAGE_LENGTH:
        body = body[:MAX_MESSAGE_LENGTH]
        logger.info(f"Truncated long email from {sender} to {MAX_MESSAGE_LENGTH} chars")

    logger.info(
        f"Processing email from {sender} for user {user.username}: {body[:100]}"
    )

    # Get or create conversation
    try:
        conversation = await get_conversation_for_user(user=user)
        await conversation.fetch_related("messages")
    except Exception as e:
        logger.error(f"Failed to get conversation for user {user.username}: {e}")
        return "", 200

    # Add user message
    await add_message_to_conversation(
        conversation=conversation,
        message=body,
        speaker="user",
        user=user,
    )

    # Build messages payload: system prompt + last 10 messages + current query.
    try:
        messages = await conversation.messages.all()
        recent_messages = list(messages)[-10:]

        messages_payload = [{"role": "system", "content": SIMBA_SYSTEM_PROMPT}]
        # [:-1] excludes the message persisted just above; it is re-added
        # below as the current user turn.
        for msg in recent_messages[:-1]:
            role = "user" if msg.speaker == "user" else "assistant"
            messages_payload.append({"role": role, "content": msg.text})
        messages_payload.append({"role": "user", "content": body})

        logger.info(f"Invoking LangChain agent with {len(messages_payload)} messages")
        response = await main_agent.ainvoke({"messages": messages_payload})
        response_text = response.get("messages", [])[-1].content
    except Exception as e:
        # Any agent failure falls back to a canned apology so the sender
        # always gets a reply.
        logger.error(f"Error invoking agent for email: {e}")
        response_text = "Sorry, I'm having trouble thinking right now."

    # Save response
    await add_message_to_conversation(
        conversation=conversation,
        message=response_text,
        speaker="simba",
        user=user,
    )

    # Send reply email, threaded under the original via Message-Id.
    await send_email_reply(
        to=sender,
        subject=subject,
        body=response_text,
        in_reply_to=message_id,
    )

    return "", 200
|
||||
|
||||
14
blueprints/email/helpers.py
Normal file
14
blueprints/email/helpers.py
Normal file
@@ -0,0 +1,14 @@
|
||||
import hmac
|
||||
import hashlib
|
||||
|
||||
|
||||
def generate_email_token(user_id: str, secret: str) -> str:
    """Generate a 16-char hex HMAC token for a user's email address."""
    digest = hmac.new(
        key=secret.encode(),
        msg=str(user_id).encode(),
        digestmod=hashlib.sha256,
    )
    # Truncate the 64-char hex digest to keep the address short.
    return digest.hexdigest()[:16]
|
||||
|
||||
|
||||
def get_user_email_address(token: str, domain: str) -> str:
    """Return the routable email address for a given token."""
    return "ask+{}@{}".format(token, domain)
|
||||
@@ -1,7 +1,7 @@
|
||||
from quart import Blueprint, jsonify
|
||||
from quart_jwt_extended import jwt_refresh_token_required
|
||||
|
||||
from .logic import get_vector_store_stats, index_documents, vector_store
|
||||
from .logic import fetch_obsidian_documents, get_vector_store_stats, index_documents, index_obsidian_documents, vector_store
|
||||
from blueprints.users.decorators import admin_required
|
||||
|
||||
rag_blueprint = Blueprint("rag_api", __name__, url_prefix="/api/rag")
|
||||
@@ -45,3 +45,15 @@ async def trigger_reindex():
|
||||
return jsonify({"status": "success", "stats": stats})
|
||||
except Exception as e:
|
||||
return jsonify({"status": "error", "message": str(e)}), 500
|
||||
|
||||
|
||||
@rag_blueprint.post("/index-obsidian")
@admin_required
async def trigger_obsidian_index():
    """Index all Obsidian markdown documents into vector store. Admin only."""
    try:
        outcome = await index_obsidian_documents()
        stats = get_vector_store_stats()
        return jsonify({"status": "success", "result": outcome, "stats": stats})
    except Exception as e:
        # Surface any indexing failure to the admin caller as a 500 payload.
        return jsonify({"status": "error", "message": str(e)}), 500
|
||||
|
||||
@@ -8,6 +8,7 @@ from langchain_openai import OpenAIEmbeddings
|
||||
from langchain_text_splitters import RecursiveCharacterTextSplitter
|
||||
|
||||
from .fetchers import PaperlessNGXService
|
||||
from utils.obsidian_service import ObsidianService
|
||||
|
||||
# Load environment variables
|
||||
load_dotenv()
|
||||
@@ -58,12 +59,75 @@ async def fetch_documents_from_paperless_ngx() -> list[Document]:
|
||||
|
||||
|
||||
async def index_documents():
    """Index Paperless-NGX documents into the vector store.

    Returns:
        dict: {"indexed": <number of source documents indexed>} — same
        shape as ``index_obsidian_documents`` for a consistent API.

    NOTE(review): unlike ``index_obsidian_documents`` this does not delete
    previously indexed chunks first, so re-running it appends duplicates —
    confirm whether deduplication is handled elsewhere.
    """
    documents = await fetch_documents_from_paperless_ngx()

    # Chunk documents before embedding so retrieval works at passage level.
    splits = text_splitter.split_documents(documents)
    await vector_store.aadd_documents(documents=splits)

    return {"indexed": len(documents)}
|
||||
|
||||
|
||||
async def fetch_obsidian_documents() -> list[Document]:
    """Fetch all markdown documents from Obsidian vault.

    Walks every markdown file in the vault, parses its front-matter
    metadata via ObsidianService, and wraps the content in LangChain
    Documents. Unreadable or unparsable files are skipped (logged via
    print) rather than aborting the whole fetch.

    Returns:
        List of LangChain Document objects with source='obsidian' metadata.
    """
    obsidian_service = ObsidianService()
    documents = []

    for md_path in obsidian_service.walk_vault():
        try:
            # Read markdown file
            with open(md_path, "r", encoding="utf-8") as f:
                content = f.read()

            # Parse metadata
            parsed = obsidian_service.parse_markdown(content, md_path)

            # Create LangChain Document with obsidian source
            document = Document(
                page_content=parsed["content"],
                metadata={
                    "source": "obsidian",
                    "filepath": parsed["filepath"],
                    "tags": parsed["tags"],
                    "created_at": parsed["metadata"].get("created_at"),
                    # Merged last, so any front-matter key named "source",
                    # "filepath" or "tags" would override the values above —
                    # NOTE(review): confirm that is intended.
                    **{k: v for k, v in parsed["metadata"].items() if k not in ["created_at", "created_by"]},
                },
            )
            documents.append(document)

        except Exception as e:
            # Best-effort: one bad file must not abort the vault walk.
            print(f"Error reading {md_path}: {e}")
            continue

    return documents
|
||||
|
||||
|
||||
async def index_obsidian_documents():
    """Index all Obsidian markdown documents into vector store.

    Deletes existing obsidian source chunks before re-indexing, so stale
    content is replaced rather than duplicated.

    Returns:
        dict: {"indexed": <number of source documents indexed>}
    """
    # (Removed an unused `ObsidianService()` instantiation — the fetch
    # helper constructs its own service.)
    documents = await fetch_obsidian_documents()

    if not documents:
        print("No Obsidian documents found to index")
        return {"indexed": 0}

    # Delete existing obsidian chunks so re-indexing replaces, not appends.
    existing_results = vector_store.get(where={"source": "obsidian"})
    if existing_results.get("ids"):
        await vector_store.adelete(existing_results["ids"])

    # Split and index documents
    splits = text_splitter.split_documents(documents)
    await vector_store.aadd_documents(documents=splits)

    return {"indexed": len(documents)}
|
||||
|
||||
|
||||
async def query_vector_store(query: str):
|
||||
retrieved_docs = await vector_store.asimilarity_search(query, k=2)
|
||||
serialized = "\n\n".join(
|
||||
|
||||
@@ -7,7 +7,9 @@ from quart_jwt_extended import (
|
||||
)
|
||||
from .models import User
|
||||
from .oidc_service import OIDCUserService
|
||||
from .decorators import admin_required
|
||||
from config.oidc_config import oidc_config
|
||||
import os
|
||||
import secrets
|
||||
import httpx
|
||||
from urllib.parse import urlencode
|
||||
@@ -131,6 +133,21 @@ async def oidc_callback():
|
||||
except Exception as e:
|
||||
return jsonify({"error": f"ID token verification failed: {str(e)}"}), 400
|
||||
|
||||
# Fetch userinfo to get groups (older Authelia versions only include groups there)
|
||||
userinfo_endpoint = discovery.get("userinfo_endpoint")
|
||||
if userinfo_endpoint:
|
||||
access_token_str = tokens.get("access_token")
|
||||
if access_token_str:
|
||||
async with httpx.AsyncClient() as client:
|
||||
userinfo_response = await client.get(
|
||||
userinfo_endpoint,
|
||||
headers={"Authorization": f"Bearer {access_token_str}"},
|
||||
)
|
||||
if userinfo_response.status_code == 200:
|
||||
userinfo = userinfo_response.json()
|
||||
if "groups" in userinfo and "groups" not in claims:
|
||||
claims["groups"] = userinfo["groups"]
|
||||
|
||||
# Get or create user from OIDC claims
|
||||
user = await OIDCUserService.get_or_create_user_from_oidc(claims)
|
||||
|
||||
@@ -186,3 +203,122 @@ async def login():
|
||||
refresh_token=refresh_token,
|
||||
user={"id": str(user.id), "username": user.username},
|
||||
)
|
||||
|
||||
|
||||
@user_blueprint.route("/me", methods=["GET"])
@jwt_refresh_token_required
async def me():
    """Return the authenticated user's own profile."""
    current = await User.get_or_none(id=get_jwt_identity())
    if current is None:
        return jsonify({"error": "User not found"}), 404

    payload = {
        "id": str(current.id),
        "username": current.username,
        "email": current.email,
        "is_admin": current.is_admin(),
    }
    return jsonify(payload)
|
||||
|
||||
|
||||
@user_blueprint.route("/admin/users", methods=["GET"])
@admin_required
async def list_users():
    """List every user with their channel linkage info. Admin only."""
    # Local import avoids a circular import between blueprints at load time.
    from blueprints.email.helpers import get_user_email_address

    mailgun_domain = os.getenv("MAILGUN_DOMAIN", "")
    users = await User.all().order_by("username")

    def _email_address(account):
        # Only expose a routable address when the email channel is active.
        if account.email_hmac_token and account.email_enabled:
            return get_user_email_address(account.email_hmac_token, mailgun_domain)
        return None

    payload = []
    for account in users:
        payload.append({
            "id": str(account.id),
            "username": account.username,
            "email": account.email,
            "whatsapp_number": account.whatsapp_number,
            "auth_provider": account.auth_provider,
            "email_enabled": account.email_enabled,
            "email_address": _email_address(account),
        })
    return jsonify(payload)
|
||||
|
||||
|
||||
@user_blueprint.route("/admin/users/<user_id>/whatsapp", methods=["PUT"])
@admin_required
async def set_whatsapp(user_id):
    """Link a WhatsApp number to a user. Admin only.

    Body: {"whatsapp_number": "<number>"}.

    Returns 400 when the number is missing/blank, 404 for an unknown
    user, 409 when the number is already linked to another account.
    """
    data = await request.get_json()
    # `or ""` guards against an explicit JSON null, which .get(..., "")
    # would pass through as None and crash on .strip() with a 500.
    number = ((data or {}).get("whatsapp_number") or "").strip()
    if not number:
        return jsonify({"error": "whatsapp_number is required"}), 400

    user = await User.get_or_none(id=user_id)
    if not user:
        return jsonify({"error": "User not found"}), 404

    # whatsapp_number is unique across users — reject if another account owns it.
    conflict = await User.filter(whatsapp_number=number).exclude(id=user_id).first()
    if conflict:
        return jsonify({"error": "That WhatsApp number is already linked to another account"}), 409

    user.whatsapp_number = number
    await user.save()
    return jsonify({
        "id": str(user.id),
        "username": user.username,
        "email": user.email,
        "whatsapp_number": user.whatsapp_number,
        "auth_provider": user.auth_provider,
    })
|
||||
|
||||
|
||||
@user_blueprint.route("/admin/users/<user_id>/whatsapp", methods=["DELETE"])
@admin_required
async def unlink_whatsapp(user_id):
    """Remove the WhatsApp number linked to a user. Admin only."""
    user = await User.get_or_none(id=user_id)
    if user is None:
        return jsonify({"error": "User not found"}), 404

    user.whatsapp_number = None
    await user.save()
    return jsonify({"ok": True})
|
||||
|
||||
|
||||
@user_blueprint.route("/admin/users/<user_id>/email", methods=["PUT"])
@admin_required
async def toggle_email(user_id):
    """Enable email channel for a user, generating an HMAC token."""
    # Local import avoids a circular import between blueprints at load time.
    from blueprints.email.helpers import generate_email_token, get_user_email_address

    user = await User.get_or_none(id=user_id)
    if user is None:
        return jsonify({"error": "User not found"}), 404

    email_secret = os.getenv("EMAIL_HMAC_SECRET")
    if not email_secret:
        # Without the secret no deterministic token can be derived.
        return jsonify({"error": "EMAIL_HMAC_SECRET not configured"}), 500

    mailgun_domain = os.getenv("MAILGUN_DOMAIN", "")

    # Keep an existing token stable; only mint one the first time.
    if not user.email_hmac_token:
        user.email_hmac_token = generate_email_token(user.id, email_secret)
    user.email_enabled = True
    await user.save()

    return jsonify({
        "id": str(user.id),
        "username": user.username,
        "email": user.email,
        "whatsapp_number": user.whatsapp_number,
        "auth_provider": user.auth_provider,
        "email_enabled": user.email_enabled,
        "email_address": get_user_email_address(user.email_hmac_token, mailgun_domain),
    })
|
||||
|
||||
|
||||
@user_blueprint.route("/admin/users/<user_id>/email", methods=["DELETE"])
@admin_required
async def disable_email(user_id):
    """Disable email channel and clear the token."""
    user = await User.get_or_none(id=user_id)
    if user is None:
        return jsonify({"error": "User not found"}), 404

    # Clearing the token invalidates the routable address immediately.
    user.email_enabled = False
    user.email_hmac_token = None
    await user.save()
    return jsonify({"ok": True})
|
||||
|
||||
@@ -10,6 +10,11 @@ class User(Model):
|
||||
username = fields.CharField(max_length=255)
|
||||
password = fields.BinaryField(null=True) # Hashed - nullable for OIDC users
|
||||
email = fields.CharField(max_length=100, unique=True)
|
||||
whatsapp_number = fields.CharField(max_length=30, unique=True, null=True, index=True)
|
||||
|
||||
# Email channel fields
|
||||
email_enabled = fields.BooleanField(default=False)
|
||||
email_hmac_token = fields.CharField(max_length=16, unique=True, null=True, index=True)
|
||||
|
||||
# OIDC fields
|
||||
oidc_subject = fields.CharField(
|
||||
|
||||
212
blueprints/whatsapp/__init__.py
Normal file
212
blueprints/whatsapp/__init__.py
Normal file
@@ -0,0 +1,212 @@
|
||||
import os
|
||||
import logging
|
||||
import asyncio
|
||||
import functools
|
||||
import time
|
||||
from collections import defaultdict
|
||||
from quart import Blueprint, request, jsonify, abort
|
||||
from twilio.request_validator import RequestValidator
|
||||
from twilio.twiml.messaging_response import MessagingResponse
|
||||
|
||||
from blueprints.users.models import User
|
||||
from blueprints.conversation.logic import (
|
||||
get_conversation_for_user,
|
||||
add_message_to_conversation,
|
||||
get_conversation_transcript,
|
||||
)
|
||||
from blueprints.conversation.agents import main_agent
|
||||
from blueprints.conversation.prompts import SIMBA_SYSTEM_PROMPT
|
||||
|
||||
whatsapp_blueprint = Blueprint("whatsapp_api", __name__, url_prefix="/api/whatsapp")

# Configure logging
logger = logging.getLogger(__name__)

# Rate limiting: per-number message timestamps
# Format: {phone_number: [timestamp1, timestamp2, ...]}
# NOTE(review): in-process store — counts reset on restart and are not
# shared across worker processes; confirm single-worker deployment.
_rate_limit_store: dict[str, list[float]] = defaultdict(list)

# Configurable via env: max messages per window (default: 10 per 60s)
RATE_LIMIT_MAX = int(os.getenv("WHATSAPP_RATE_LIMIT_MAX", "10"))
RATE_LIMIT_WINDOW = int(os.getenv("WHATSAPP_RATE_LIMIT_WINDOW", "60"))

# Max message length to process (WhatsApp max is 4096, but we cap for LLM sanity)
MAX_MESSAGE_LENGTH = 2000
|
||||
|
||||
|
||||
def _twiml_response(text: str) -> tuple[str, int]:
    """Wrap *text* in a TwiML MessagingResponse and return (body, 200)."""
    reply = MessagingResponse()
    reply.message(text)
    return str(reply), 200
|
||||
|
||||
|
||||
def _check_rate_limit(phone_number: str) -> bool:
    """Sliding-window rate limiter keyed by phone number.

    Returns True if the request is allowed, False if rate-limited.
    Expired timestamps are pruned from the store on every call.
    """
    current = time.monotonic()
    window_start = current - RATE_LIMIT_WINDOW

    # Keep only the timestamps still inside the window.
    recent = [stamp for stamp in _rate_limit_store[phone_number] if stamp > window_start]
    _rate_limit_store[phone_number] = recent

    if len(recent) >= RATE_LIMIT_MAX:
        return False

    # Record this request (mutates the list stored above).
    recent.append(current)
    return True
|
||||
|
||||
|
||||
def validate_twilio_request(f):
    """Decorator to validate that the request comes from Twilio.

    Validates the X-Twilio-Signature header using the TWILIO_AUTH_TOKEN.
    Set TWILIO_WEBHOOK_URL if behind a reverse proxy (e.g., ngrok, Caddy)
    so the validated URL matches what Twilio signed against.
    Set TWILIO_SIGNATURE_VALIDATION=false to disable in development.

    Invalid or unverifiable requests are aborted with 403.
    """
    @functools.wraps(f)
    async def decorated_function(*args, **kwargs):
        # Development escape hatch — skip validation entirely.
        if os.getenv("TWILIO_SIGNATURE_VALIDATION", "true").lower() == "false":
            return await f(*args, **kwargs)

        auth_token = os.getenv("TWILIO_AUTH_TOKEN")
        if not auth_token:
            # Fail closed: without the token nothing can be verified.
            logger.error("TWILIO_AUTH_TOKEN not set — rejecting request")
            abort(403)

        twilio_signature = request.headers.get("X-Twilio-Signature")
        if not twilio_signature:
            logger.warning("Missing X-Twilio-Signature header")
            abort(403)

        # Use configured webhook URL if behind a proxy, otherwise use request URL
        # (Twilio signs against the public URL, which may differ from the URL
        # the app sees behind a reverse proxy).
        url = os.getenv("TWILIO_WEBHOOK_URL") or request.url
        form_data = await request.form

        # RequestValidator recomputes Twilio's signature over URL + form params.
        validator = RequestValidator(auth_token)
        if not validator.validate(url, form_data, twilio_signature):
            logger.warning(f"Invalid Twilio signature for URL: {url}")
            abort(403)

        return await f(*args, **kwargs)
    return decorated_function
|
||||
|
||||
|
||||
@whatsapp_blueprint.route("/webhook", methods=["POST"])
@validate_twilio_request
async def webhook():
    """Handle incoming WhatsApp messages from Twilio.

    Flow: normalize and rate-limit the message, find or auto-provision the
    user, persist the inbound message, run the LangChain agent over recent
    history, persist the reply, and return it as TwiML.

    All user-visible failure paths still return TwiML (200) so Twilio does
    not retry; only a request with no sender at all gets a plain 400.
    """
    form_data = await request.form
    from_number = form_data.get("From")  # e.g., "whatsapp:+1234567890"
    body = form_data.get("Body")

    if not from_number or not body:
        # Known sender with no body gets a polite TwiML reply; a request
        # with no sender is malformed and gets a plain 400.
        if from_number:
            return _twiml_response("Invalid message received.")
        return "Missing From or Body", 400

    # Strip whitespace and check for empty body
    body = body.strip()
    if not body:
        return _twiml_response("I received an empty message. Please send some text!")

    # Rate limiting
    if not _check_rate_limit(from_number):
        logger.warning(f"Rate limit exceeded for {from_number}")
        return _twiml_response("You're sending messages too quickly. Please wait a moment and try again.")

    # Truncate overly long messages
    if len(body) > MAX_MESSAGE_LENGTH:
        body = body[:MAX_MESSAGE_LENGTH]
        logger.info(f"Truncated long message from {from_number} to {MAX_MESSAGE_LENGTH} chars")

    logger.info(f"Received WhatsApp message from {from_number}: {body[:100]}")

    # Identify or create user
    user = await User.filter(whatsapp_number=from_number).first()

    if not user:
        # Unknown numbers must be in the allowlist ("*" opens registration).
        allowed_numbers = os.getenv("ALLOWED_WHATSAPP_NUMBERS", "").split(",")
        if from_number not in allowed_numbers and "*" not in allowed_numbers:
            return _twiml_response("Sorry, you are not authorized to use this service.")

        # Create a new user for this WhatsApp number with a synthetic email
        # (email is unique/required on the User model).
        username = f"wa_{from_number.split(':')[-1]}"
        try:
            user = await User.create(
                username=username,
                email=f"{username}@whatsapp.simbarag.local",
                whatsapp_number=from_number,
                auth_provider="whatsapp"
            )
            logger.info(f"Created new user for WhatsApp: {username}")
        except Exception as e:
            logger.error(f"Failed to create user for {from_number}: {e}")
            return _twiml_response("Sorry, something went wrong setting up your account. Please try again later.")

    # Get or create a conversation for this user
    try:
        conversation = await get_conversation_for_user(user=user)
        await conversation.fetch_related("messages")
    except Exception as e:
        logger.error(f"Failed to get conversation for user {user.username}: {e}")
        return _twiml_response("Sorry, something went wrong. Please try again later.")

    # Add user message to conversation
    await add_message_to_conversation(
        conversation=conversation,
        message=body,
        speaker="user",
        user=user,
    )

    # (Removed a dead get_conversation_transcript() call: its result was
    # never used and cost an extra DB round-trip per message.)

    # Build messages payload for LangChain agent with system prompt and history
    try:
        # Get last 10 messages for conversation history
        messages = await conversation.messages.all()
        recent_messages = list(messages)[-10:]

        messages_payload = [{"role": "system", "content": SIMBA_SYSTEM_PROMPT}]

        # Add recent conversation history; [:-1] excludes the message we
        # just persisted — it is re-appended below as the current query.
        for msg in recent_messages[:-1]:
            role = "user" if msg.speaker == "user" else "assistant"
            messages_payload.append({"role": role, "content": msg.text})

        # Add current query
        messages_payload.append({"role": "user", "content": body})

        # Invoke LangChain agent
        logger.info(f"Invoking LangChain agent with {len(messages_payload)} messages")
        response = await main_agent.ainvoke({"messages": messages_payload})
        response_text = response.get("messages", [])[-1].content

        # Log YNAB availability
        if os.getenv("YNAB_ACCESS_TOKEN"):
            logger.info("YNAB integration is available for this conversation")
        else:
            logger.info("YNAB integration is not configured")

    except Exception as e:
        # Any agent failure falls back to a canned apology so the user
        # always gets a reply.
        logger.error(f"Error invoking agent: {e}")
        response_text = "Sorry, I'm having trouble thinking right now. 😿"

    # Add Simba's response to conversation
    await add_message_to_conversation(
        conversation=conversation,
        message=response_text,
        speaker="simba",
        user=user,
    )

    return _twiml_response(response_text)
|
||||
Reference in New Issue
Block a user