This commit is contained in:
ryan
2026-03-03 08:22:19 -05:00
parent 0e3684031b
commit 86cc269b3a
24 changed files with 1899 additions and 238 deletions

View File

@@ -1,6 +1,7 @@
import datetime
import json
from quart import Blueprint, jsonify, request
from quart import Blueprint, jsonify, make_response, request
from quart_jwt_extended import (
get_jwt_identity,
jwt_refresh_token_required,
@@ -24,35 +25,7 @@ conversation_blueprint = Blueprint(
"conversation_api", __name__, url_prefix="/api/conversation"
)
@conversation_blueprint.post("/query")
@jwt_refresh_token_required
async def query():
current_user_uuid = get_jwt_identity()
user = await blueprints.users.models.User.get(id=current_user_uuid)
data = await request.get_json()
query = data.get("query")
conversation_id = data.get("conversation_id")
conversation = await get_conversation_by_id(conversation_id)
await conversation.fetch_related("messages")
await add_message_to_conversation(
conversation=conversation,
message=query,
speaker="user",
user=user,
)
# Build conversation history from recent messages (last 10 for context)
recent_messages = (
conversation.messages[-10:]
if len(conversation.messages) > 10
else conversation.messages
)
messages_payload = [
{
"role": "system",
"content": """You are a helpful cat assistant named Simba that understands veterinary terms. When there are questions to you specifically, they are referring to Simba the cat. Answer the user in as if you were a cat named Simba. Don't act too catlike. Be assertive.
_SYSTEM_PROMPT = """You are a helpful cat assistant named Simba that understands veterinary terms. When there are questions to you specifically, they are referring to Simba the cat. Answer the user in as if you were a cat named Simba. Don't act too catlike. Be assertive.
SIMBA FACTS (as of January 2026):
- Name: Simba
@@ -92,18 +65,57 @@ You have access to Ryan's budget data through YNAB (You Need A Budget). When use
- Use ynab_search_transactions to find specific purchases or spending at particular stores
- Use ynab_category_spending to analyze spending by category for a month
- Use ynab_insights to provide spending trends, patterns, and recommendations
Always use these tools when asked about budgets, spending, transactions, or financial health.""",
}
]
Always use these tools when asked about budgets, spending, transactions, or financial health.
# Add recent conversation history
NOTES & RESEARCH (Obsidian Integration):
You have access to Ryan's Obsidian vault through the Obsidian integration. When users ask about research, personal notes, or information that might be stored in markdown files, use the appropriate Obsidian tools:
- Use obsidian_search_notes to search through your vault for relevant information
- Use obsidian_read_note to read the full content of a specific note by path
- Use obsidian_create_note to save new findings, ideas, or research to your vault
- Use obsidian_create_task to create task notes with due dates
Always use these tools when users ask about notes, research, ideas, tasks, or when you want to save information for future reference.
DAILY JOURNAL (Task Tracking):
You have access to Ryan's daily journal notes. Each note lives at journal/YYYY/YYYY-MM-DD.md and has two sections: tasks and log.
- Use journal_get_today to read today's full daily note (tasks + log)
- Use journal_get_tasks to list tasks (done/pending) for today or a specific date
- Use journal_add_task to add a new task to today's (or a given date's) note
- Use journal_complete_task to check off a task as done
Use these tools when Ryan asks about today's tasks, wants to add something to his list, or wants to mark a task complete."""
def _build_messages_payload(conversation, query_text: str) -> list:
recent_messages = (
conversation.messages[-10:]
if len(conversation.messages) > 10
else conversation.messages
)
messages_payload = [{"role": "system", "content": _SYSTEM_PROMPT}]
for msg in recent_messages[:-1]: # Exclude the message we just added
role = "user" if msg.speaker == "user" else "assistant"
messages_payload.append({"role": role, "content": msg.text})
messages_payload.append({"role": "user", "content": query_text})
return messages_payload
# Add current query
messages_payload.append({"role": "user", "content": query})
@conversation_blueprint.post("/query")
@jwt_refresh_token_required
async def query():
current_user_uuid = get_jwt_identity()
user = await blueprints.users.models.User.get(id=current_user_uuid)
data = await request.get_json()
query = data.get("query")
conversation_id = data.get("conversation_id")
conversation = await get_conversation_by_id(conversation_id)
await conversation.fetch_related("messages")
await add_message_to_conversation(
conversation=conversation,
message=query,
speaker="user",
user=user,
)
messages_payload = _build_messages_payload(conversation, query)
payload = {"messages": messages_payload}
response = await main_agent.ainvoke(payload)
@@ -117,6 +129,75 @@ Always use these tools when asked about budgets, spending, transactions, or fina
return jsonify({"response": message})
@conversation_blueprint.post("/stream-query")
@jwt_refresh_token_required
async def stream_query():
current_user_uuid = get_jwt_identity()
user = await blueprints.users.models.User.get(id=current_user_uuid)
data = await request.get_json()
query_text = data.get("query")
conversation_id = data.get("conversation_id")
conversation = await get_conversation_by_id(conversation_id)
await conversation.fetch_related("messages")
await add_message_to_conversation(
conversation=conversation,
message=query_text,
speaker="user",
user=user,
)
messages_payload = _build_messages_payload(conversation, query_text)
payload = {"messages": messages_payload}
async def event_generator():
final_message = None
try:
async for event in main_agent.astream_events(payload, version="v2"):
event_type = event.get("event")
if event_type == "on_tool_start":
yield f"data: {json.dumps({'type': 'tool_start', 'tool': event['name']})}\n\n"
elif event_type == "on_tool_end":
yield f"data: {json.dumps({'type': 'tool_end', 'tool': event['name']})}\n\n"
elif event_type == "on_chain_end":
output = event.get("data", {}).get("output")
if isinstance(output, dict):
msgs = output.get("messages", [])
if msgs:
last_msg = msgs[-1]
content = getattr(last_msg, "content", None)
if isinstance(content, str) and content:
final_message = content
except Exception as e:
yield f"data: {json.dumps({'type': 'error', 'message': str(e)})}\n\n"
if final_message:
await add_message_to_conversation(
conversation=conversation,
message=final_message,
speaker="simba",
user=user,
)
yield f"data: {json.dumps({'type': 'response', 'message': final_message})}\n\n"
else:
yield f"data: {json.dumps({'type': 'error', 'message': 'No response generated'})}\n\n"
yield "data: [DONE]\n\n"
return await make_response(
event_generator(),
200,
{
"Content-Type": "text/event-stream",
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no",
},
)
@conversation_blueprint.route("/<conversation_id>")
@jwt_refresh_token_required
async def get_conversation(conversation_id: str):

View File

@@ -9,6 +9,7 @@ from langchain_openai import ChatOpenAI
from tavily import AsyncTavilyClient
from blueprints.rag.logic import query_vector_store
from utils.obsidian_service import ObsidianService
from utils.ynab_service import YNABService
# Load environment variables
@@ -40,6 +41,32 @@ except (ValueError, Exception) as e:
print(f"YNAB service not initialized: {e}")
ynab_enabled = False
# Initialize Obsidian service (will only work if OBSIDIAN_VAULT_PATH is set)
try:
obsidian_service = ObsidianService()
obsidian_enabled = True
except (ValueError, Exception) as e:
print(f"Obsidian service not initialized: {e}")
obsidian_enabled = False
@tool
def get_current_date() -> str:
"""Get today's date in a human-readable format.
Use this tool when you need to:
- Reference today's date in your response
- Answer questions like "what is today's date"
- Format dates in messages or documents
- Calculate time periods relative to today
Returns:
Today's date in YYYY-MM-DD format
"""
from datetime import date
return date.today().isoformat()
@tool
async def web_search(query: str) -> str:
@@ -279,8 +306,291 @@ def ynab_insights(months_back: int = 3) -> str:
return f"Error generating insights: {str(e)}"
@tool
async def obsidian_search_notes(query: str) -> str:
"""Search through Obsidian vault notes for information.
Use this tool when you need to:
- Find information in personal notes
- Research past ideas or thoughts from your vault
- Look up information stored in markdown files
- Search for content that would be in your notes
Args:
query: The search query to look up in your Obsidian vault
Returns:
Relevant notes with their content and metadata
"""
if not obsidian_enabled:
return "Obsidian integration is not configured. Please set OBSIDIAN_VAULT_PATH environment variable."
try:
# Query ChromaDB for obsidian documents
serialized, docs = await query_vector_store(query=query)
return serialized
except Exception as e:
return f"Error searching Obsidian notes: {str(e)}"
@tool
async def obsidian_read_note(relative_path: str) -> str:
"""Read a specific note from your Obsidian vault.
Use this tool when you want to:
- Read the full content of a specific note
- Get detailed information from a particular markdown file
- Access content from a known note path
Args:
relative_path: Path to note relative to vault root (e.g., "notes/my-note.md")
Returns:
Full content and metadata of the requested note
"""
if not obsidian_enabled:
return "Obsidian integration is not configured. Please set OBSIDIAN_VAULT_PATH environment variable."
try:
note = obsidian_service.read_note(relative_path)
content_data = note["content"]
result = f"File: {note['path']}\n\n"
result += f"Frontmatter:\n{content_data['metadata']}\n\n"
result += f"Content:\n{content_data['content']}\n\n"
result += f"Tags: {', '.join(content_data['tags'])}\n"
result += f"Contains {len(content_data['wikilinks'])} wikilinks and {len(content_data['embeds'])} embeds"
return result
except FileNotFoundError:
return f"Note not found at '{relative_path}'. Please check the path is correct."
except Exception as e:
return f"Error reading note: {str(e)}"
@tool
async def obsidian_create_note(
title: str,
content: str,
folder: str = "notes",
tags: str = "",
) -> str:
"""Create a new note in your Obsidian vault.
Use this tool when you want to:
- Save research findings or ideas to your vault
- Create a new document with a specific title
- Write notes for future reference
Args:
title: The title of the note (will be used as filename)
content: The body content of the note
folder: The folder where to create the note (default: "notes")
tags: Comma-separated list of tags to add (default: "")
Returns:
Path to the created note
"""
if not obsidian_enabled:
return "Obsidian integration is not configured. Please set OBSIDIAN_VAULT_PATH environment variable."
try:
# Parse tags from comma-separated string
tag_list = [tag.strip() for tag in tags.split(",") if tag.strip()]
relative_path = obsidian_service.create_note(
title=title,
content=content,
folder=folder,
tags=tag_list,
)
return f"Successfully created note: {relative_path}"
except Exception as e:
return f"Error creating note: {str(e)}"
@tool
def journal_get_today() -> str:
"""Get today's daily journal note, including all tasks and log entries.
Use this tool when the user asks about:
- What's on their plate today
- Today's tasks or to-do list
- Today's journal entry
- What they've logged today
Returns:
The full content of today's daily note, or a message if it doesn't exist.
"""
if not obsidian_enabled:
return "Obsidian integration is not configured."
try:
note = obsidian_service.get_daily_note()
if not note["found"]:
return f"No daily note found for {note['date']}. Use journal_add_task to create one."
return f"Daily note for {note['date']}:\n\n{note['content']}"
except Exception as e:
return f"Error reading daily note: {str(e)}"
@tool
def journal_get_tasks(date: str = "") -> str:
"""Get tasks from a daily journal note.
Use this tool when the user asks about:
- Open or pending tasks for a day
- What tasks are done or not done
- Task status for today or a specific date
Args:
date: Date in YYYY-MM-DD format (optional, defaults to today)
Returns:
List of tasks with their completion status.
"""
if not obsidian_enabled:
return "Obsidian integration is not configured."
try:
from datetime import datetime as dt
parsed_date = dt.strptime(date, "%Y-%m-%d") if date else None
result = obsidian_service.get_daily_tasks(parsed_date)
if not result["found"]:
return f"No daily note found for {result['date']}."
if not result["tasks"]:
return f"No tasks found in the {result['date']} note."
lines = [f"Tasks for {result['date']}:"]
for task in result["tasks"]:
status = "[x]" if task["done"] else "[ ]"
lines.append(f"- {status} {task['text']}")
return "\n".join(lines)
except Exception as e:
return f"Error reading tasks: {str(e)}"
@tool
def journal_add_task(task: str, date: str = "") -> str:
"""Add a task to a daily journal note.
Use this tool when the user wants to:
- Add a task or to-do to today's note
- Remind themselves to do something
- Track a new item in their daily note
Args:
task: The task description to add
date: Date in YYYY-MM-DD format (optional, defaults to today)
Returns:
Confirmation of the added task.
"""
if not obsidian_enabled:
return "Obsidian integration is not configured."
try:
from datetime import datetime as dt
parsed_date = dt.strptime(date, "%Y-%m-%d") if date else None
result = obsidian_service.add_task_to_daily_note(task, parsed_date)
if result["success"]:
note_date = date or dt.now().strftime("%Y-%m-%d")
extra = " (created new note)" if result["created_note"] else ""
return f"Added task '{task}' to {note_date}{extra}."
return "Failed to add task."
except Exception as e:
return f"Error adding task: {str(e)}"
@tool
def journal_complete_task(task: str, date: str = "") -> str:
"""Mark a task as complete in a daily journal note.
Use this tool when the user wants to:
- Check off a task as done
- Mark something as completed
- Update task status in their daily note
Args:
task: The task text to mark complete (exact or partial match)
date: Date in YYYY-MM-DD format (optional, defaults to today)
Returns:
Confirmation that the task was marked complete.
"""
if not obsidian_enabled:
return "Obsidian integration is not configured."
try:
from datetime import datetime as dt
parsed_date = dt.strptime(date, "%Y-%m-%d") if date else None
result = obsidian_service.complete_task_in_daily_note(task, parsed_date)
if result["success"]:
return f"Marked '{result['completed_task']}' as complete."
return f"Could not complete task: {result.get('error', 'unknown error')}"
except Exception as e:
return f"Error completing task: {str(e)}"
@tool
async def obsidian_create_task(
title: str,
content: str = "",
folder: str = "tasks",
due_date: str = "",
tags: str = "",
) -> str:
"""Create a new task note in your Obsidian vault.
Use this tool when you want to:
- Create a task to remember to do something
- Add a task with a due date
- Track tasks in your vault
Args:
title: The title of the task
content: The description of the task (optional)
folder: The folder to place the task (default: "tasks")
due_date: Due date in YYYY-MM-DD format (optional)
tags: Comma-separated list of tags to add (optional)
Returns:
Path to the created task note
"""
if not obsidian_enabled:
return "Obsidian integration is not configured. Please set OBSIDIAN_VAULT_PATH environment variable."
try:
# Parse tags from comma-separated string
tag_list = [tag.strip() for tag in tags.split(",") if tag.strip()]
relative_path = obsidian_service.create_task(
title=title,
content=content,
folder=folder,
due_date=due_date or None,
tags=tag_list,
)
return f"Successfully created task: {relative_path}"
except Exception as e:
return f"Error creating task: {str(e)}"
# Create tools list based on what's available
tools = [simba_search, web_search]
tools = [get_current_date, simba_search, web_search]
if ynab_enabled:
tools.extend(
[
@@ -290,6 +600,19 @@ if ynab_enabled:
ynab_insights,
]
)
if obsidian_enabled:
tools.extend(
[
obsidian_search_notes,
obsidian_read_note,
obsidian_create_note,
obsidian_create_task,
journal_get_today,
journal_get_tasks,
journal_add_task,
journal_complete_task,
]
)
# Llama 3.1 supports native function calling via OpenAI-compatible API
main_agent = create_agent(model=model_with_fallback, tools=tools)