Files
simbarag/blueprints/conversation/agents.py
2026-03-03 08:23:31 -05:00

619 lines
20 KiB
Python

import os
from typing import cast
from dotenv import load_dotenv
from langchain.agents import create_agent
from langchain.chat_models import BaseChatModel
from langchain.tools import tool
from langchain_openai import ChatOpenAI
from tavily import AsyncTavilyClient
from blueprints.rag.logic import query_vector_store
from utils.obsidian_service import ObsidianService
from utils.ynab_service import YNABService
# Load environment variables before any os.getenv() lookup below.
load_dotenv()

# Chat model selection: when LLAMA_SERVER_URL is set, talk to that
# OpenAI-compatible llama-server endpoint first and fall back to OpenAI;
# otherwise use the OpenAI model on its own.
llama_url = os.getenv("LLAMA_SERVER_URL")
llama_chat = (
    ChatOpenAI(
        base_url=llama_url,
        api_key="not-needed",
        model=os.getenv("LLAMA_MODEL_NAME", "llama-3.1-8b-instruct"),
    )
    if llama_url
    else None
)

openai_fallback = ChatOpenAI(model="gpt-5-mini")

# cast() only narrows the static type; with_fallbacks() returns a wrapper
# runnable rather than a plain chat model.
if llama_chat:
    model_with_fallback = cast(
        BaseChatModel, llama_chat.with_fallbacks([openai_fallback])
    )
else:
    model_with_fallback = cast(BaseChatModel, openai_fallback)

# Tavily client used by the web_search tool; an empty key is passed when
# TAVILY_API_KEY is unset.
client = AsyncTavilyClient(api_key=os.getenv("TAVILY_API_KEY", ""))
# Optional integrations. Each service constructor is allowed to fail (the
# original notes say they only work when YNAB_ACCESS_TOKEN /
# OBSIDIAN_VAULT_PATH are set); a failure disables the matching tools instead
# of aborting module import.
#
# `except Exception` already covers ValueError, so the former
# `(ValueError, Exception)` tuple was redundant. On failure the service name
# is bound to None so that it always exists at module level; callers must
# still check the *_enabled flag before using it.

# Initialize YNAB service (will only work if YNAB_ACCESS_TOKEN is set)
try:
    ynab_service = YNABService()
    ynab_enabled = True
except Exception as e:
    print(f"YNAB service not initialized: {e}")
    ynab_service = None
    ynab_enabled = False

# Initialize Obsidian service (will only work if OBSIDIAN_VAULT_PATH is set)
try:
    obsidian_service = ObsidianService()
    obsidian_enabled = True
except Exception as e:
    print(f"Obsidian service not initialized: {e}")
    obsidian_service = None
    obsidian_enabled = False
@tool
def get_current_date() -> str:
    """Get today's date in a human-readable format.

    Use this tool when you need to:
    - Reference today's date in your response
    - Answer questions like "what is today's date"
    - Format dates in messages or documents
    - Calculate time periods relative to today

    Returns:
        Today's date in YYYY-MM-DD format
    """
    # Function-scope import, matching the other date-handling tools here.
    import datetime

    today = datetime.date.today()
    return today.isoformat()
@tool
async def web_search(query: str) -> str:
    """Search the web for current information using Tavily.

    Use this tool when you need to:
    - Find current information not in the knowledge base
    - Look up recent events, news, or updates
    - Verify facts or get additional context
    - Search for information outside of Simba's documents

    Args:
        query: The search query to look up on the web

    Returns:
        Search results from the web with titles, content, and source URLs
    """
    # Report Tavily failures back to the agent as text instead of letting the
    # exception escape the tool, matching the error-handling style of the
    # YNAB and Obsidian tools in this module.
    try:
        response = await client.search(query=query, search_depth="basic")
    except Exception as e:
        return f"Error searching the web: {str(e)}"

    results = response.get("results", [])
    if not results:
        return "No results found for the query."

    # Only the first five hits are returned. Fields are read with .get() so a
    # result that lacks one of them does not abort the whole response.
    return "\n\n".join(
        f"**{result.get('title', 'Untitled')}**\n{result.get('content', '')}\nSource: {result.get('url', 'unknown')}"
        for result in results[:5]
    )
@tool(response_format="content_and_artifact")
async def simba_search(query: str):
    """Search through Simba's medical records, veterinary documents, and personal information.

    Use this tool whenever the user asks questions about:
    - Simba's health history, medical records, or veterinary visits
    - Medications, treatments, or diagnoses
    - Weight, diet, or physical characteristics over time
    - Veterinary recommendations or advice
    - Ryan's (the owner's) information related to Simba
    - Any factual information that would be found in documents

    Args:
        query: The user's question or information need about Simba

    Returns:
        Relevant information from Simba's documents
    """
    print(f"[SIMBA SEARCH] Tool called with query: {query}")

    # The vector-store helper yields a (text, documents) pair, which is
    # exactly the (content, artifact) shape this tool is declared to return.
    content, artifacts = await query_vector_store(query=query)

    # Debug trace of what came back from the store.
    print(f"[SIMBA SEARCH] Found {len(artifacts)} documents")
    print(f"[SIMBA SEARCH] Serialized result length: {len(content)}")
    print(f"[SIMBA SEARCH] First 200 chars: {content[:200]}")

    return content, artifacts
@tool
def ynab_budget_summary() -> str:
    """Get overall budget summary and health status from YNAB.

    Use this tool when the user asks about:
    - Overall budget health or status
    - How much money is to be budgeted
    - Total budget amounts or spending
    - General budget overview questions

    Returns:
        Summary of budget health, to-be-budgeted amount, total budgeted,
        total activity, and available amounts.
    """
    if ynab_enabled:
        try:
            # The service returns a mapping; only its "summary" text is used.
            return ynab_service.get_budget_summary()["summary"]
        except Exception as exc:
            # Failures are handed back to the agent as plain text.
            return f"Error fetching budget summary: {exc!s}"

    return "YNAB integration is not configured. Please set YNAB_ACCESS_TOKEN environment variable."
@tool
def ynab_search_transactions(
    start_date: str = "",
    end_date: str = "",
    category_name: str = "",
    payee_name: str = "",
) -> str:
    """Search YNAB transactions by date range, category, or payee.

    Use this tool when the user asks about:
    - Specific transactions or purchases
    - Spending at a particular store or payee
    - Transactions in a specific category
    - What was spent during a time period

    Args:
        start_date: Start date in YYYY-MM-DD format (optional, defaults to 30 days ago)
        end_date: End date in YYYY-MM-DD format (optional, defaults to today)
        category_name: Filter by category name (optional, partial match)
        payee_name: Filter by payee/store name (optional, partial match)

    Returns:
        List of matching transactions with dates, amounts, categories, and payees.
    """
    if not ynab_enabled:
        return "YNAB integration is not configured. Please set YNAB_ACCESS_TOKEN environment variable."

    try:
        # Empty-string arguments mean "no filter" and are passed on as None.
        found = ynab_service.get_transactions(
            start_date=start_date or None,
            end_date=end_date or None,
            category_name=category_name or None,
            payee_name=payee_name or None,
        )

        if found["count"] == 0:
            return "No transactions found matching the specified criteria."

        # Itemise at most 10 transactions to keep the reply readable.
        shown = [
            f"- {txn['date']}: {txn['payee']} - ${abs(txn['amount']):.2f} ({txn['category'] or 'Uncategorized'})"
            for txn in found["transactions"][:10]
        ]

        header = (
            f"Found {found['count']} transactions from {found['start_date']} to {found['end_date']}. "
            f"Total: ${abs(found['total_amount']):.2f}\n\n"
        )

        # Tell the reader when the list above was truncated.
        footer = ""
        if found["count"] > 10:
            footer = f"\n\n(Showing first 10 of {found['count']} transactions)"

        return header + "\n".join(shown) + footer
    except Exception as exc:
        return f"Error searching transactions: {exc!s}"
@tool
def ynab_category_spending(month: str = "") -> str:
    """Get spending breakdown by category for a specific month.

    Use this tool when the user asks about:
    - Spending by category
    - What categories were overspent
    - Monthly spending breakdown
    - Budget vs actual spending for a month

    Args:
        month: Month in YYYY-MM format (optional, defaults to current month)

    Returns:
        Spending breakdown by category with budgeted, spent, and available amounts.
    """
    if not ynab_enabled:
        return "YNAB integration is not configured. Please set YNAB_ACCESS_TOKEN environment variable."

    try:
        # An empty month string is passed on as None.
        data = ynab_service.get_category_spending(month=month or None)

        # Collect the report piece by piece; every piece carries its own
        # trailing newline, so the pieces are joined with no separator.
        pieces = [
            f"Budget spending for {data['month']}:\n",
            f"Total budgeted: ${data['total_budgeted']:.2f}\n",
            f"Total spent: ${data['total_spent']:.2f}\n",
            f"Total available: ${data['total_available']:.2f}\n",
        ]

        overspent = data["overspent_categories"]
        if overspent:
            pieces.append(f"\nOverspent categories ({len(overspent)}):\n")
            # At most five overspent categories are listed.
            pieces.extend(
                f"- {cat['name']}: Budgeted ${cat['budgeted']:.2f}, Spent ${cat['spent']:.2f}, Over by ${cat['overspent_by']:.2f}\n"
                for cat in overspent[:5]
            )

        # Add top spending categories: look at the first ten entries and keep
        # only those with negative activity (i.e. actual spending).
        pieces.append("\nTop spending categories:\n")
        pieces.extend(
            f"- {cat['category']}: ${abs(cat['activity']):.2f} (budgeted: ${cat['budgeted']:.2f}, available: ${cat['available']:.2f})\n"
            for cat in data["categories"][:10]
            if cat["activity"] < 0
        )

        return "".join(pieces)
    except Exception as exc:
        return f"Error fetching category spending: {exc!s}"
@tool
def ynab_insights(months_back: int = 3) -> str:
    """Generate insights about spending patterns and budget health over time.

    Use this tool when the user asks about:
    - Spending trends or patterns
    - Budget recommendations
    - Which categories are frequently overspent
    - How current spending compares to past months
    - Overall budget health analysis

    Args:
        months_back: Number of months to analyze (default 3, max 6)

    Returns:
        Insights about spending trends, frequently overspent categories,
        and personalized recommendations.
    """
    if not ynab_enabled:
        return "YNAB integration is not configured. Please set YNAB_ACCESS_TOKEN environment variable."

    try:
        # Clamp the requested window to 1..6 months.
        months_back = min(max(1, months_back), 6)
        insights = ynab_service.get_spending_insights(months_back=months_back)

        # The service signals a problem via an "error" key; relay it as-is.
        if "error" in insights:
            return insights["error"]

        # Each piece ends with its own newline, so join without a separator.
        pieces = [
            f"Spending insights for the last {months_back} months:\n\n",
            f"Average monthly spending: ${insights['average_monthly_spending']:.2f}\n",
            f"Current month spending: ${insights['current_month_spending']:.2f}\n",
            f"Spending trend: {insights['spending_trend']}\n",
        ]

        frequent = insights["frequently_overspent_categories"]
        if frequent:
            pieces.append("\nFrequently overspent categories:\n")
            # At most five categories are listed.
            pieces.extend(
                f"- {cat['category']}: overspent in {cat['months_overspent']} of {months_back} months\n"
                for cat in frequent[:5]
            )

        advice = insights["recommendations"]
        if advice:
            pieces.append("\nRecommendations:\n")
            pieces.extend(f"- {rec}\n" for rec in advice)

        return "".join(pieces)
    except Exception as exc:
        return f"Error generating insights: {exc!s}"
@tool
async def obsidian_search_notes(query: str) -> str:
    """Search through Obsidian vault notes for information.

    Use this tool when you need to:
    - Find information in personal notes
    - Research past ideas or thoughts from your vault
    - Look up information stored in markdown files
    - Search for content that would be in your notes

    Args:
        query: The search query to look up in your Obsidian vault

    Returns:
        Relevant notes with their content and metadata
    """
    if not obsidian_enabled:
        return "Obsidian integration is not configured. Please set OBSIDIAN_VAULT_PATH environment variable."
    try:
        # NOTE(review): this is the same unfiltered vector-store query that
        # simba_search issues; nothing here restricts results to Obsidian
        # documents. Confirm whether query_vector_store should receive a
        # source filter for this tool.
        # Only the serialized text is returned; the document list is unused.
        serialized, _ = await query_vector_store(query=query)
        return serialized
    except Exception as e:
        return f"Error searching Obsidian notes: {str(e)}"
@tool
async def obsidian_read_note(relative_path: str) -> str:
    """Read a specific note from your Obsidian vault.

    Use this tool when you want to:
    - Read the full content of a specific note
    - Get detailed information from a particular markdown file
    - Access content from a known note path

    Args:
        relative_path: Path to note relative to vault root (e.g., "notes/my-note.md")

    Returns:
        Full content and metadata of the requested note
    """
    if not obsidian_enabled:
        return "Obsidian integration is not configured. Please set OBSIDIAN_VAULT_PATH environment variable."

    try:
        note = obsidian_service.read_note(relative_path)
        parsed = note["content"]

        # Assemble the report in one pass; each section supplies its own
        # trailing newlines, so the sections are joined with no separator.
        sections = [
            f"File: {note['path']}\n\n",
            f"Frontmatter:\n{parsed['metadata']}\n\n",
            f"Content:\n{parsed['content']}\n\n",
            f"Tags: {', '.join(parsed['tags'])}\n",
            f"Contains {len(parsed['wikilinks'])} wikilinks and {len(parsed['embeds'])} embeds",
        ]
        return "".join(sections)
    except FileNotFoundError:
        # A missing file gets a friendlier message than the generic handler.
        return f"Note not found at '{relative_path}'. Please check the path is correct."
    except Exception as exc:
        return f"Error reading note: {exc!s}"
@tool
async def obsidian_create_note(
    title: str,
    content: str,
    folder: str = "notes",
    tags: str = "",
) -> str:
    """Create a new note in your Obsidian vault.

    Use this tool when you want to:
    - Save research findings or ideas to your vault
    - Create a new document with a specific title
    - Write notes for future reference

    Args:
        title: The title of the note (will be used as filename)
        content: The body content of the note
        folder: The folder where to create the note (default: "notes")
        tags: Comma-separated list of tags to add (default: "")

    Returns:
        Path to the created note
    """
    if not obsidian_enabled:
        return "Obsidian integration is not configured. Please set OBSIDIAN_VAULT_PATH environment variable."

    try:
        # Split the comma-separated tags, trim whitespace, drop empty entries.
        trimmed = (piece.strip() for piece in tags.split(","))
        tag_list = [piece for piece in trimmed if piece]

        created_path = obsidian_service.create_note(
            title=title,
            content=content,
            folder=folder,
            tags=tag_list,
        )
        return f"Successfully created note: {created_path}"
    except Exception as exc:
        return f"Error creating note: {exc!s}"
@tool
def journal_get_today() -> str:
    """Get today's daily journal note, including all tasks and log entries.

    Use this tool when the user asks about:
    - What's on their plate today
    - Today's tasks or to-do list
    - Today's journal entry
    - What they've logged today

    Returns:
        The full content of today's daily note, or a message if it doesn't exist.
    """
    if not obsidian_enabled:
        return "Obsidian integration is not configured."

    try:
        daily = obsidian_service.get_daily_note()
        if daily["found"]:
            return f"Daily note for {daily['date']}:\n\n{daily['content']}"
        # No note yet: point the agent at the tool that creates one.
        return f"No daily note found for {daily['date']}. Use journal_add_task to create one."
    except Exception as exc:
        return f"Error reading daily note: {exc!s}"
@tool
def journal_get_tasks(date: str = "") -> str:
    """Get tasks from a daily journal note.

    Use this tool when the user asks about:
    - Open or pending tasks for a day
    - What tasks are done or not done
    - Task status for today or a specific date

    Args:
        date: Date in YYYY-MM-DD format (optional, defaults to today)

    Returns:
        List of tasks with their completion status.
    """
    if not obsidian_enabled:
        return "Obsidian integration is not configured."

    try:
        import datetime

        # An empty date is passed to the service as None; a malformed one
        # raises inside this try and is reported by the handler below.
        parsed_date = None
        if date:
            parsed_date = datetime.datetime.strptime(date, "%Y-%m-%d")

        outcome = obsidian_service.get_daily_tasks(parsed_date)
        if not outcome["found"]:
            return f"No daily note found for {outcome['date']}."
        if not outcome["tasks"]:
            return f"No tasks found in the {outcome['date']} note."

        # Header line followed by one markdown-style checkbox per task.
        report = [f"Tasks for {outcome['date']}:"]
        report.extend(
            f"- {'[x]' if item['done'] else '[ ]'} {item['text']}"
            for item in outcome["tasks"]
        )
        return "\n".join(report)
    except Exception as exc:
        return f"Error reading tasks: {exc!s}"
@tool
def journal_add_task(task: str, date: str = "") -> str:
    """Add a task to a daily journal note.

    Use this tool when the user wants to:
    - Add a task or to-do to today's note
    - Remind themselves to do something
    - Track a new item in their daily note

    Args:
        task: The task description to add
        date: Date in YYYY-MM-DD format (optional, defaults to today)

    Returns:
        Confirmation of the added task.
    """
    if not obsidian_enabled:
        return "Obsidian integration is not configured."

    try:
        import datetime

        # An empty date is passed to the service as None.
        parsed_date = None
        if date:
            parsed_date = datetime.datetime.strptime(date, "%Y-%m-%d")

        outcome = obsidian_service.add_task_to_daily_note(task, parsed_date)
        if not outcome["success"]:
            return "Failed to add task."

        # Echo the date the caller gave, or today's date when none was given.
        note_date = date or datetime.datetime.now().strftime("%Y-%m-%d")
        extra = ""
        if outcome["created_note"]:
            extra = " (created new note)"
        return f"Added task '{task}' to {note_date}{extra}."
    except Exception as exc:
        return f"Error adding task: {exc!s}"
@tool
def journal_complete_task(task: str, date: str = "") -> str:
    """Mark a task as complete in a daily journal note.

    Use this tool when the user wants to:
    - Check off a task as done
    - Mark something as completed
    - Update task status in their daily note

    Args:
        task: The task text to mark complete (exact or partial match)
        date: Date in YYYY-MM-DD format (optional, defaults to today)

    Returns:
        Confirmation that the task was marked complete.
    """
    if not obsidian_enabled:
        return "Obsidian integration is not configured."

    try:
        import datetime

        # An empty date is passed to the service as None.
        parsed_date = None
        if date:
            parsed_date = datetime.datetime.strptime(date, "%Y-%m-%d")

        outcome = obsidian_service.complete_task_in_daily_note(task, parsed_date)
        if not outcome["success"]:
            # Relay the service's own explanation when it provides one.
            return f"Could not complete task: {outcome.get('error', 'unknown error')}"
        return f"Marked '{outcome['completed_task']}' as complete."
    except Exception as exc:
        return f"Error completing task: {exc!s}"
@tool
async def obsidian_create_task(
    title: str,
    content: str = "",
    folder: str = "tasks",
    due_date: str = "",
    tags: str = "",
) -> str:
    """Create a new task note in your Obsidian vault.

    Use this tool when you want to:
    - Create a task to remember to do something
    - Add a task with a due date
    - Track tasks in your vault

    Args:
        title: The title of the task
        content: The description of the task (optional)
        folder: The folder to place the task (default: "tasks")
        due_date: Due date in YYYY-MM-DD format (optional)
        tags: Comma-separated list of tags to add (optional)

    Returns:
        Path to the created task note
    """
    if not obsidian_enabled:
        return "Obsidian integration is not configured. Please set OBSIDIAN_VAULT_PATH environment variable."

    try:
        # Split the comma-separated tags, trim whitespace, drop empty entries.
        trimmed = (piece.strip() for piece in tags.split(","))
        tag_list = [piece for piece in trimmed if piece]

        created_path = obsidian_service.create_task(
            title=title,
            content=content,
            folder=folder,
            # An empty due date is passed to the service as None.
            due_date=due_date or None,
            tags=tag_list,
        )
        return f"Successfully created task: {created_path}"
    except Exception as exc:
        return f"Error creating task: {exc!s}"
# Assemble the tool list: the date, document-search and web-search tools are
# always registered; the YNAB and Obsidian/journal tools are added only when
# their service initialised successfully.
tools = [get_current_date, simba_search, web_search]

if ynab_enabled:
    tools += [
        ynab_budget_summary,
        ynab_search_transactions,
        ynab_category_spending,
        ynab_insights,
    ]

if obsidian_enabled:
    tools += [
        obsidian_search_notes,
        obsidian_read_note,
        obsidian_create_note,
        obsidian_create_task,
        journal_get_today,
        journal_get_tasks,
        journal_add_task,
        journal_complete_task,
    ]

# Llama 3.1 supports native function calling via OpenAI-compatible API
main_agent = create_agent(model=model_with_fallback, tools=tools)