3 Commits

Author SHA1 Message Date
Ryan Chen
6ae36b51a0 ynab update 2026-01-31 22:47:43 -05:00
ryan
f0f72cce36 Merge pull request 'Replace Ollama with llama-server (OpenAI-compatible API)' (#14) from feature/llama-cpp-integration into main
Reviewed-on: #14
2026-01-31 21:41:19 -05:00
Ryan Chen
32020a6c60 Replace Ollama with llama-server (OpenAI-compatible API)
- Update llm.py to use OpenAI client with custom base_url for llama-server
- Update agents.py to use ChatOpenAI instead of ChatOllama
- Remove unused ollama imports from main.py, chunker.py, query.py
- Add LLAMA_SERVER_URL and LLAMA_MODEL_NAME env vars
- Remove ollama and langchain-ollama dependencies

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-31 21:39:23 -05:00
19 changed files with 680 additions and 79 deletions

View File

@@ -14,9 +14,10 @@ JWT_SECRET_KEY=your-secret-key-here
PAPERLESS_TOKEN=your-paperless-token PAPERLESS_TOKEN=your-paperless-token
BASE_URL=192.168.1.5:8000 BASE_URL=192.168.1.5:8000
# Ollama Configuration # llama-server Configuration (OpenAI-compatible API)
OLLAMA_URL=http://192.168.1.14:11434 # If set, uses llama-server as the primary LLM backend with OpenAI as fallback
OLLAMA_HOST=http://192.168.1.14:11434 LLAMA_SERVER_URL=http://192.168.1.213:8080/v1
LLAMA_MODEL_NAME=llama-3.1-8b-instruct
# ChromaDB Configuration # ChromaDB Configuration
# For Docker: This is automatically set to /app/data/chromadb # For Docker: This is automatically set to /app/data/chromadb
@@ -26,6 +27,9 @@ CHROMADB_PATH=./data/chromadb
# OpenAI Configuration # OpenAI Configuration
OPENAI_API_KEY=your-openai-api-key OPENAI_API_KEY=your-openai-api-key
# Tavily Configuration (for web search)
TAVILY_API_KEY=your-tavily-api-key
# Immich Configuration # Immich Configuration
IMMICH_URL=http://192.168.1.5:2283 IMMICH_URL=http://192.168.1.5:2283
IMMICH_API_KEY=your-immich-api-key IMMICH_API_KEY=your-immich-api-key
@@ -44,3 +48,9 @@ OIDC_USE_DISCOVERY=true
# OIDC_TOKEN_ENDPOINT=https://auth.example.com/api/oidc/token # OIDC_TOKEN_ENDPOINT=https://auth.example.com/api/oidc/token
# OIDC_USERINFO_ENDPOINT=https://auth.example.com/api/oidc/userinfo # OIDC_USERINFO_ENDPOINT=https://auth.example.com/api/oidc/userinfo
# OIDC_JWKS_URI=https://auth.example.com/api/oidc/jwks # OIDC_JWKS_URI=https://auth.example.com/api/oidc/jwks
# YNAB Configuration
# Get your Personal Access Token from https://app.ynab.com/settings/developer
YNAB_ACCESS_TOKEN=your-ynab-personal-access-token
# Optional: Specify a budget ID, or leave empty to use the default/first budget
YNAB_BUDGET_ID=

1
.envrc Normal file
View File

@@ -0,0 +1 @@
dotenv_if_exists

View File

@@ -1,4 +1,8 @@
import os import os
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
# Database configuration with environment variable support # Database configuration with environment variable support
# Use DATABASE_PATH for relative paths or DATABASE_URL for full connection strings # Use DATABASE_PATH for relative paths or DATABASE_URL for full connection strings

4
app.py
View File

@@ -1,5 +1,6 @@
import os import os
from dotenv import load_dotenv
from quart import Quart, jsonify, render_template, request, send_from_directory from quart import Quart, jsonify, render_template, request, send_from_directory
from quart_jwt_extended import JWTManager, get_jwt_identity, jwt_refresh_token_required from quart_jwt_extended import JWTManager, get_jwt_identity, jwt_refresh_token_required
from tortoise.contrib.quart import register_tortoise from tortoise.contrib.quart import register_tortoise
@@ -11,6 +12,9 @@ import blueprints.users
import blueprints.users.models import blueprints.users.models
from main import consult_simba_oracle from main import consult_simba_oracle
# Load environment variables
load_dotenv()
app = Quart( app = Quart(
__name__, __name__,
static_folder="raggr-frontend/dist/static", static_folder="raggr-frontend/dist/static",

View File

@@ -84,7 +84,15 @@ Upcoming Appointments:
- Routine Examination: Due 6/1/2026 - Routine Examination: Due 6/1/2026
- FVRCP-3yr Vaccine: Due 10/2/2026 - FVRCP-3yr Vaccine: Due 10/2/2026
IMPORTANT: When users ask factual questions about Simba's health, medical history, veterinary visits, medications, weight, or any information that would be in documents, you MUST use the simba_search tool to retrieve accurate information before answering. Do not rely on general knowledge - always search the documents for factual questions.""", IMPORTANT: When users ask factual questions about Simba's health, medical history, veterinary visits, medications, weight, or any information that would be in documents, you MUST use the simba_search tool to retrieve accurate information before answering. Do not rely on general knowledge - always search the documents for factual questions.
BUDGET & FINANCE (YNAB Integration):
You have access to Ryan's budget data through YNAB (You Need A Budget). When users ask about financial matters, use the appropriate YNAB tools:
- Use ynab_budget_summary for overall budget health and status questions
- Use ynab_search_transactions to find specific purchases or spending at particular stores
- Use ynab_category_spending to analyze spending by category for a month
- Use ynab_insights to provide spending trends, patterns, and recommendations
Always use these tools when asked about budgets, spending, transactions, or financial health.""",
} }
] ]

View File

@@ -1,21 +1,44 @@
import os import os
from typing import cast from typing import cast
from dotenv import load_dotenv
from langchain.agents import create_agent from langchain.agents import create_agent
from langchain.chat_models import BaseChatModel from langchain.chat_models import BaseChatModel
from langchain.tools import tool from langchain.tools import tool
from langchain_ollama import ChatOllama
from langchain_openai import ChatOpenAI from langchain_openai import ChatOpenAI
from tavily import AsyncTavilyClient from tavily import AsyncTavilyClient
from blueprints.rag.logic import query_vector_store from blueprints.rag.logic import query_vector_store
from utils.ynab_service import YNABService
openai_gpt_5_mini = ChatOpenAI(model="gpt-5-mini") # Load environment variables
ollama_deepseek = ChatOllama(model="llama3.1:8b", base_url=os.getenv("OLLAMA_URL")) load_dotenv()
# Configure LLM with llama-server or OpenAI fallback
llama_url = os.getenv("LLAMA_SERVER_URL")
if llama_url:
llama_chat = ChatOpenAI(
base_url=llama_url,
api_key="not-needed",
model=os.getenv("LLAMA_MODEL_NAME", "llama-3.1-8b-instruct"),
)
else:
llama_chat = None
openai_fallback = ChatOpenAI(model="gpt-5-mini")
model_with_fallback = cast( model_with_fallback = cast(
BaseChatModel, ollama_deepseek.with_fallbacks([openai_gpt_5_mini]) BaseChatModel,
llama_chat.with_fallbacks([openai_fallback]) if llama_chat else openai_fallback,
) )
client = AsyncTavilyClient(os.getenv("TAVILY_KEY"), "") client = AsyncTavilyClient(api_key=os.getenv("TAVILY_API_KEY", ""))
# Initialize YNAB service (will only work if YNAB_ACCESS_TOKEN is set)
try:
ynab_service = YNABService()
ynab_enabled = True
except (ValueError, Exception) as e:
print(f"YNAB service not initialized: {e}")
ynab_enabled = False
@tool @tool
@@ -75,4 +98,198 @@ async def simba_search(query: str):
return serialized, docs return serialized, docs
main_agent = create_agent(model=model_with_fallback, tools=[simba_search, web_search]) @tool
def ynab_budget_summary() -> str:
"""Get overall budget summary and health status from YNAB.
Use this tool when the user asks about:
- Overall budget health or status
- How much money is to be budgeted
- Total budget amounts or spending
- General budget overview questions
Returns:
Summary of budget health, to-be-budgeted amount, total budgeted,
total activity, and available amounts.
"""
if not ynab_enabled:
return "YNAB integration is not configured. Please set YNAB_ACCESS_TOKEN environment variable."
try:
summary = ynab_service.get_budget_summary()
return summary["summary"]
except Exception as e:
return f"Error fetching budget summary: {str(e)}"
@tool
def ynab_search_transactions(
start_date: str = "",
end_date: str = "",
category_name: str = "",
payee_name: str = "",
) -> str:
"""Search YNAB transactions by date range, category, or payee.
Use this tool when the user asks about:
- Specific transactions or purchases
- Spending at a particular store or payee
- Transactions in a specific category
- What was spent during a time period
Args:
start_date: Start date in YYYY-MM-DD format (optional, defaults to 30 days ago)
end_date: End date in YYYY-MM-DD format (optional, defaults to today)
category_name: Filter by category name (optional, partial match)
payee_name: Filter by payee/store name (optional, partial match)
Returns:
List of matching transactions with dates, amounts, categories, and payees.
"""
if not ynab_enabled:
return "YNAB integration is not configured. Please set YNAB_ACCESS_TOKEN environment variable."
try:
result = ynab_service.get_transactions(
start_date=start_date or None,
end_date=end_date or None,
category_name=category_name or None,
payee_name=payee_name or None,
)
if result["count"] == 0:
return "No transactions found matching the specified criteria."
# Format transactions for readability
txn_list = []
for txn in result["transactions"][:10]: # Limit to 10 for readability
txn_list.append(
f"- {txn['date']}: {txn['payee']} - ${abs(txn['amount']):.2f} ({txn['category'] or 'Uncategorized'})"
)
return (
f"Found {result['count']} transactions from {result['start_date']} to {result['end_date']}. "
f"Total: ${abs(result['total_amount']):.2f}\n\n"
+ "\n".join(txn_list)
+ (
f"\n\n(Showing first 10 of {result['count']} transactions)"
if result["count"] > 10
else ""
)
)
except Exception as e:
return f"Error searching transactions: {str(e)}"
@tool
def ynab_category_spending(month: str = "") -> str:
"""Get spending breakdown by category for a specific month.
Use this tool when the user asks about:
- Spending by category
- What categories were overspent
- Monthly spending breakdown
- Budget vs actual spending for a month
Args:
month: Month in YYYY-MM format (optional, defaults to current month)
Returns:
Spending breakdown by category with budgeted, spent, and available amounts.
"""
if not ynab_enabled:
return "YNAB integration is not configured. Please set YNAB_ACCESS_TOKEN environment variable."
try:
result = ynab_service.get_category_spending(month=month or None)
summary = (
f"Budget spending for {result['month']}:\n"
f"Total budgeted: ${result['total_budgeted']:.2f}\n"
f"Total spent: ${result['total_spent']:.2f}\n"
f"Total available: ${result['total_available']:.2f}\n"
)
if result["overspent_categories"]:
summary += (
f"\nOverspent categories ({len(result['overspent_categories'])}):\n"
)
for cat in result["overspent_categories"][:5]:
summary += f"- {cat['name']}: Budgeted ${cat['budgeted']:.2f}, Spent ${cat['spent']:.2f}, Over by ${cat['overspent_by']:.2f}\n"
# Add top spending categories
summary += "\nTop spending categories:\n"
for cat in result["categories"][:10]:
if cat["activity"] < 0: # Only show spending (negative activity)
summary += f"- {cat['category']}: ${abs(cat['activity']):.2f} (budgeted: ${cat['budgeted']:.2f}, available: ${cat['available']:.2f})\n"
return summary
except Exception as e:
return f"Error fetching category spending: {str(e)}"
@tool
def ynab_insights(months_back: int = 3) -> str:
"""Generate insights about spending patterns and budget health over time.
Use this tool when the user asks about:
- Spending trends or patterns
- Budget recommendations
- Which categories are frequently overspent
- How current spending compares to past months
- Overall budget health analysis
Args:
months_back: Number of months to analyze (default 3, max 6)
Returns:
Insights about spending trends, frequently overspent categories,
and personalized recommendations.
"""
if not ynab_enabled:
return "YNAB integration is not configured. Please set YNAB_ACCESS_TOKEN environment variable."
try:
# Limit to reasonable range
months_back = min(max(1, months_back), 6)
result = ynab_service.get_spending_insights(months_back=months_back)
if "error" in result:
return result["error"]
summary = (
f"Spending insights for the last {months_back} months:\n\n"
f"Average monthly spending: ${result['average_monthly_spending']:.2f}\n"
f"Current month spending: ${result['current_month_spending']:.2f}\n"
f"Spending trend: {result['spending_trend']}\n"
)
if result["frequently_overspent_categories"]:
summary += "\nFrequently overspent categories:\n"
for cat in result["frequently_overspent_categories"][:5]:
summary += f"- {cat['category']}: overspent in {cat['months_overspent']} of {months_back} months\n"
if result["recommendations"]:
summary += "\nRecommendations:\n"
for rec in result["recommendations"]:
summary += f"- {rec}\n"
return summary
except Exception as e:
return f"Error generating insights: {str(e)}"
# Create tools list based on what's available
tools = [simba_search, web_search]
if ynab_enabled:
tools.extend(
[
ynab_budget_summary,
ynab_search_transactions,
ynab_category_spending,
ynab_insights,
]
)
# Llama 3.1 supports native function calling via OpenAI-compatible API
main_agent = create_agent(model=model_with_fallback, tools=tools)

View File

@@ -1,8 +1,12 @@
import os import os
import tempfile import tempfile
from dotenv import load_dotenv
import httpx import httpx
# Load environment variables
load_dotenv()
class PaperlessNGXService: class PaperlessNGXService:
def __init__(self): def __init__(self):

View File

@@ -1,6 +1,7 @@
import datetime import datetime
import os import os
from dotenv import load_dotenv
from langchain_chroma import Chroma from langchain_chroma import Chroma
from langchain_core.documents import Document from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings from langchain_openai import OpenAIEmbeddings
@@ -8,6 +9,9 @@ from langchain_text_splitters import RecursiveCharacterTextSplitter
from .fetchers import PaperlessNGXService from .fetchers import PaperlessNGXService
# Load environment variables
load_dotenv()
embeddings = OpenAIEmbeddings(model="text-embedding-3-small") embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
vector_store = Chroma( vector_store = Chroma(

View File

@@ -7,6 +7,10 @@ from typing import Dict, Any
from authlib.jose import jwt from authlib.jose import jwt
from authlib.jose.errors import JoseError from authlib.jose.errors import JoseError
import httpx import httpx
from dotenv import load_dotenv
# Load environment variables
load_dotenv()
class OIDCConfig: class OIDCConfig:

View File

59
llm.py
View File

@@ -1,32 +1,25 @@
import os import os
from ollama import Client
from openai import OpenAI
import logging import logging
from openai import OpenAI
from dotenv import load_dotenv from dotenv import load_dotenv
load_dotenv() load_dotenv()
logging.basicConfig(level=logging.INFO) logging.basicConfig(level=logging.INFO)
TRY_OLLAMA = os.getenv("TRY_OLLAMA", False)
class LLMClient: class LLMClient:
def __init__(self): def __init__(self):
try: llama_url = os.getenv("LLAMA_SERVER_URL")
self.ollama_client = Client( if llama_url:
host=os.getenv("OLLAMA_URL", "http://localhost:11434"), timeout=1.0 self.client = OpenAI(base_url=llama_url, api_key="not-needed")
) self.model = os.getenv("LLAMA_MODEL_NAME", "llama-3.1-8b-instruct")
self.ollama_client.chat( self.PROVIDER = "llama_server"
model="gemma3:4b", messages=[{"role": "system", "content": "test"}] logging.info("Using llama_server as LLM backend")
) else:
self.PROVIDER = "ollama" self.client = OpenAI()
logging.info("Using Ollama as LLM backend") self.model = "gpt-4o-mini"
except Exception as e:
print(e)
self.openai_client = OpenAI()
self.PROVIDER = "openai" self.PROVIDER = "openai"
logging.info("Using OpenAI as LLM backend") logging.info("Using OpenAI as LLM backend")
@@ -35,11 +28,8 @@ class LLMClient:
prompt: str, prompt: str,
system_prompt: str, system_prompt: str,
): ):
# Instituting a fallback if my gaming PC is not on response = self.client.chat.completions.create(
if self.PROVIDER == "ollama": model=self.model,
try:
response = self.ollama_client.chat(
model="gemma3:4b",
messages=[ messages=[
{ {
"role": "system", "role": "system",
@@ -48,26 +38,9 @@ class LLMClient:
{"role": "user", "content": prompt}, {"role": "user", "content": prompt},
], ],
) )
output = response.message.content return response.choices[0].message.content
return output
except Exception as e:
logging.error(f"Could not connect to OLLAMA: {str(e)}")
response = self.openai_client.responses.create(
model="gpt-4o-mini",
input=[
{
"role": "system",
"content": system_prompt,
},
{"role": "user", "content": prompt},
],
)
output = response.output_text
return output
if __name__ == "__main__": if __name__ == "__main__":
client = Client() client = LLMClient()
client.chat(model="gemma3:4b", messages=[{"role": "system", "promp": "hack"}]) print(client.chat(prompt="Hello!", system_prompt="You are a helpful assistant."))

View File

@@ -5,7 +5,6 @@ import os
import sqlite3 import sqlite3
import time import time
import ollama
from dotenv import load_dotenv from dotenv import load_dotenv
import chromadb import chromadb
@@ -17,11 +16,6 @@ from utils.request import PaperlessNGXService
_dotenv_loaded = load_dotenv() _dotenv_loaded = load_dotenv()
# Configure ollama client with URL from environment or default to localhost
ollama_client = ollama.Client(
host=os.getenv("OLLAMA_URL", "http://localhost:11434"), timeout=10.0
)
client = chromadb.PersistentClient(path=os.getenv("CHROMADB_PATH", "")) client = chromadb.PersistentClient(path=os.getenv("CHROMADB_PATH", ""))
simba_docs = client.get_or_create_collection(name="simba_docs2") simba_docs = client.get_or_create_collection(name="simba_docs2")
feline_vet_lookup = client.get_or_create_collection(name="feline_vet_lookup") feline_vet_lookup = client.get_or_create_collection(name="feline_vet_lookup")

View File

@@ -9,7 +9,6 @@ dependencies = [
"python-dotenv>=1.0.0", "python-dotenv>=1.0.0",
"flask>=3.1.2", "flask>=3.1.2",
"httpx>=0.28.1", "httpx>=0.28.1",
"ollama>=0.6.0",
"openai>=2.0.1", "openai>=2.0.1",
"pydantic>=2.11.9", "pydantic>=2.11.9",
"pillow>=10.0.0", "pillow>=10.0.0",
@@ -34,8 +33,8 @@ dependencies = [
"langchain-chroma>=1.0.0", "langchain-chroma>=1.0.0",
"langchain-community>=0.4.1", "langchain-community>=0.4.1",
"jq>=1.10.0", "jq>=1.10.0",
"langchain-ollama>=1.0.1",
"tavily-python>=0.7.17", "tavily-python>=0.7.17",
"ynab>=1.3.0",
] ]
[tool.aerich] [tool.aerich]

View File

@@ -35,12 +35,14 @@ class ConversationService {
async sendQuery( async sendQuery(
query: string, query: string,
conversation_id: string, conversation_id: string,
signal?: AbortSignal,
): Promise<QueryResponse> { ): Promise<QueryResponse> {
const response = await userService.fetchWithRefreshToken( const response = await userService.fetchWithRefreshToken(
`${this.conversationBaseUrl}/query`, `${this.conversationBaseUrl}/query`,
{ {
method: "POST", method: "POST",
body: JSON.stringify({ query, conversation_id }), body: JSON.stringify({ query, conversation_id }),
signal,
}, },
); );

View File

@@ -43,12 +43,26 @@ export const ChatScreen = ({ setAuthenticated }: ChatScreenProps) => {
const [isLoading, setIsLoading] = useState<boolean>(false); const [isLoading, setIsLoading] = useState<boolean>(false);
const messagesEndRef = useRef<HTMLDivElement>(null); const messagesEndRef = useRef<HTMLDivElement>(null);
const isMountedRef = useRef<boolean>(true);
const abortControllerRef = useRef<AbortController | null>(null);
const simbaAnswers = ["meow.", "hiss...", "purrrrrr", "yowOWROWWowowr"]; const simbaAnswers = ["meow.", "hiss...", "purrrrrr", "yowOWROWWowowr"];
const scrollToBottom = () => { const scrollToBottom = () => {
messagesEndRef.current?.scrollIntoView({ behavior: "smooth" }); messagesEndRef.current?.scrollIntoView({ behavior: "smooth" });
}; };
// Cleanup effect to handle component unmounting
useEffect(() => {
isMountedRef.current = true;
return () => {
isMountedRef.current = false;
// Abort any pending requests when component unmounts
if (abortControllerRef.current) {
abortControllerRef.current.abort();
}
};
}, []);
const handleSelectConversation = (conversation: Conversation) => { const handleSelectConversation = (conversation: Conversation) => {
setShowConversations(false); setShowConversations(false);
setSelectedConversation(conversation); setSelectedConversation(conversation);
@@ -156,10 +170,15 @@ export const ChatScreen = ({ setAuthenticated }: ChatScreenProps) => {
return; return;
} }
// Create a new AbortController for this request
const abortController = new AbortController();
abortControllerRef.current = abortController;
try { try {
const result = await conversationService.sendQuery( const result = await conversationService.sendQuery(
query, query,
selectedConversation.id, selectedConversation.id,
abortController.signal,
); );
setQuestionsAnswers( setQuestionsAnswers(
questionsAnswers.concat([{ question: query, answer: result.response }]), questionsAnswers.concat([{ question: query, answer: result.response }]),
@@ -168,14 +187,24 @@ export const ChatScreen = ({ setAuthenticated }: ChatScreenProps) => {
currMessages.concat([{ text: result.response, speaker: "simba" }]), currMessages.concat([{ text: result.response, speaker: "simba" }]),
); );
} catch (error) { } catch (error) {
// Ignore abort errors (these are intentional cancellations)
if (error instanceof Error && error.name === "AbortError") {
console.log("Request was aborted");
} else {
console.error("Failed to send query:", error); console.error("Failed to send query:", error);
// If session expired, redirect to login // If session expired, redirect to login
if (error instanceof Error && error.message.includes("Session expired")) { if (error instanceof Error && error.message.includes("Session expired")) {
setAuthenticated(false); setAuthenticated(false);
} }
}
} finally { } finally {
// Only update loading state if component is still mounted
if (isMountedRef.current) {
setIsLoading(false); setIsLoading(false);
} }
// Clear the abort controller reference
abortControllerRef.current = null;
}
}; };
const handleQueryChange = (event: React.ChangeEvent<HTMLTextAreaElement>) => { const handleQueryChange = (event: React.ChangeEvent<HTMLTextAreaElement>) => {

View File

@@ -1,18 +1,11 @@
import json import json
import os
from typing import Literal from typing import Literal
import datetime import datetime
from ollama import Client
from openai import OpenAI from openai import OpenAI
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
# Configure ollama client with URL from environment or default to localhost
ollama_client = Client(
host=os.getenv("OLLAMA_URL", "http://localhost:11434"), timeout=10.0
)
# This uses inferred filters — which means using LLM to create the metadata filters # This uses inferred filters — which means using LLM to create the metadata filters

View File

@@ -3,7 +3,6 @@ from math import ceil
import re import re
from typing import Union from typing import Union
from uuid import UUID, uuid4 from uuid import UUID, uuid4
from ollama import Client
from chromadb.utils.embedding_functions.openai_embedding_function import ( from chromadb.utils.embedding_functions.openai_embedding_function import (
OpenAIEmbeddingFunction, OpenAIEmbeddingFunction,
) )
@@ -13,10 +12,6 @@ from llm import LLMClient
load_dotenv() load_dotenv()
ollama_client = Client(
host=os.getenv("OLLAMA_HOST", "http://localhost:11434"), timeout=1.0
)
def remove_headers_footers(text, header_patterns=None, footer_patterns=None): def remove_headers_footers(text, header_patterns=None, footer_patterns=None):
if header_patterns is None: if header_patterns is None:

342
utils/ynab_service.py Normal file
View File

@@ -0,0 +1,342 @@
"""YNAB API service for querying budget data."""
import os
from datetime import datetime, timedelta
from typing import Any, Optional
from dotenv import load_dotenv
import ynab
# Load environment variables
load_dotenv()
class YNABService:
"""Service for interacting with YNAB API."""
def __init__(self):
"""Initialize YNAB API client."""
self.access_token = os.getenv("YNAB_ACCESS_TOKEN", "")
self.budget_id = os.getenv("YNAB_BUDGET_ID", "")
if not self.access_token:
raise ValueError("YNAB_ACCESS_TOKEN environment variable is required")
# Configure API client
configuration = ynab.Configuration(access_token=self.access_token)
self.api_client = ynab.ApiClient(configuration)
# Initialize API endpoints
self.budgets_api = ynab.BudgetsApi(self.api_client)
self.transactions_api = ynab.TransactionsApi(self.api_client)
self.months_api = ynab.MonthsApi(self.api_client)
self.categories_api = ynab.CategoriesApi(self.api_client)
# Get budget ID if not provided
if not self.budget_id:
budgets_response = self.budgets_api.get_budgets()
if budgets_response.data and budgets_response.data.budgets:
self.budget_id = budgets_response.data.budgets[0].id
else:
raise ValueError("No YNAB budgets found")
def get_budget_summary(self) -> dict[str, Any]:
"""Get overall budget summary and health status.
Returns:
Dictionary containing budget summary with to-be-budgeted amount,
total budgeted, total activity, and overall budget health.
"""
budget_response = self.budgets_api.get_budget_by_id(self.budget_id)
budget_data = budget_response.data.budget
# Calculate totals from categories
to_be_budgeted = (
budget_data.months[0].to_be_budgeted / 1000 if budget_data.months else 0
)
total_budgeted = 0
total_activity = 0
total_available = 0
for category_group in budget_data.category_groups or []:
if category_group.deleted or category_group.hidden:
continue
for category in category_group.categories or []:
if category.deleted or category.hidden:
continue
total_budgeted += category.budgeted / 1000
total_activity += category.activity / 1000
total_available += category.balance / 1000
return {
"budget_name": budget_data.name,
"to_be_budgeted": round(to_be_budgeted, 2),
"total_budgeted": round(total_budgeted, 2),
"total_activity": round(total_activity, 2),
"total_available": round(total_available, 2),
"currency_format": budget_data.currency_format.iso_code
if budget_data.currency_format
else "USD",
"summary": f"Budget '{budget_data.name}' has ${abs(to_be_budgeted):.2f} {'to be budgeted' if to_be_budgeted > 0 else 'overbudgeted'}. "
f"Total budgeted: ${total_budgeted:.2f}, Total spent: ${abs(total_activity):.2f}, "
f"Total available: ${total_available:.2f}.",
}
def get_transactions(
self,
start_date: Optional[str] = None,
end_date: Optional[str] = None,
category_name: Optional[str] = None,
payee_name: Optional[str] = None,
limit: int = 50,
) -> dict[str, Any]:
"""Get transactions filtered by date range, category, or payee.
Args:
start_date: Start date in YYYY-MM-DD format (defaults to 30 days ago)
end_date: End date in YYYY-MM-DD format (defaults to today)
category_name: Filter by category name (case-insensitive partial match)
payee_name: Filter by payee name (case-insensitive partial match)
limit: Maximum number of transactions to return (default 50)
Returns:
Dictionary containing matching transactions and summary statistics.
"""
# Set default date range if not provided
if not start_date:
start_date = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d")
if not end_date:
end_date = datetime.now().strftime("%Y-%m-%d")
# Get transactions
transactions_response = self.transactions_api.get_transactions(
self.budget_id, since_date=start_date
)
transactions = transactions_response.data.transactions or []
# Filter by date range, category, and payee
filtered_transactions = []
total_amount = 0
for txn in transactions:
# Skip if deleted or before start date or after end date
if txn.deleted:
continue
txn_date = str(txn.date)
if txn_date < start_date or txn_date > end_date:
continue
# Filter by category if specified
if category_name and txn.category_name:
if category_name.lower() not in txn.category_name.lower():
continue
# Filter by payee if specified
if payee_name and txn.payee_name:
if payee_name.lower() not in txn.payee_name.lower():
continue
amount = txn.amount / 1000 # Convert milliunits to dollars
filtered_transactions.append(
{
"date": txn_date,
"payee": txn.payee_name,
"category": txn.category_name,
"memo": txn.memo,
"amount": round(amount, 2),
"approved": txn.approved,
}
)
total_amount += amount
# Sort by date (most recent first) and limit
filtered_transactions.sort(key=lambda x: x["date"], reverse=True)
filtered_transactions = filtered_transactions[:limit]
return {
"transactions": filtered_transactions,
"count": len(filtered_transactions),
"total_amount": round(total_amount, 2),
"start_date": start_date,
"end_date": end_date,
"filters": {"category": category_name, "payee": payee_name},
}
def get_category_spending(self, month: Optional[str] = None) -> dict[str, Any]:
"""Get spending breakdown by category for a specific month.
Args:
month: Month in YYYY-MM format (defaults to current month)
Returns:
Dictionary containing spending by category and summary.
"""
if not month:
month = datetime.now().strftime("%Y-%m-01")
else:
# Ensure format is YYYY-MM-01
if len(month) == 7: # YYYY-MM
month = f"{month}-01"
# Get budget month
month_response = self.months_api.get_budget_month(self.budget_id, month)
month_data = month_response.data.month
categories_spending = []
total_budgeted = 0
total_activity = 0
total_available = 0
overspent_categories = []
for category in month_data.categories or []:
if category.deleted or category.hidden:
continue
budgeted = category.budgeted / 1000
activity = category.activity / 1000
available = category.balance / 1000
total_budgeted += budgeted
total_activity += activity
total_available += available
# Track overspent categories
if available < 0:
overspent_categories.append(
{
"name": category.name,
"budgeted": round(budgeted, 2),
"spent": round(abs(activity), 2),
"overspent_by": round(abs(available), 2),
}
)
# Only include categories with activity
if activity != 0:
categories_spending.append(
{
"category": category.name,
"budgeted": round(budgeted, 2),
"activity": round(activity, 2),
"available": round(available, 2),
"goal_type": category.goal_type
if hasattr(category, "goal_type")
else None,
}
)
# Sort by absolute activity (highest spending first)
categories_spending.sort(key=lambda x: abs(x["activity"]), reverse=True)
return {
"month": month[:7], # Return YYYY-MM format
"categories": categories_spending,
"total_budgeted": round(total_budgeted, 2),
"total_spent": round(abs(total_activity), 2),
"total_available": round(total_available, 2),
"overspent_categories": overspent_categories,
"to_be_budgeted": round(month_data.to_be_budgeted / 1000, 2)
if month_data.to_be_budgeted
else 0,
}
def get_spending_insights(self, months_back: int = 3) -> dict[str, Any]:
"""Generate insights about spending patterns and budget health.
Args:
months_back: Number of months to analyze (default 3)
Returns:
Dictionary containing spending insights, trends, and recommendations.
"""
current_month = datetime.now()
monthly_data = []
# Collect data for the last N months
for i in range(months_back):
month_date = current_month - timedelta(days=30 * i)
month_str = month_date.strftime("%Y-%m-01")
try:
month_spending = self.get_category_spending(month_str)
monthly_data.append(
{
"month": month_str[:7],
"total_spent": month_spending["total_spent"],
"total_budgeted": month_spending["total_budgeted"],
"overspent_categories": month_spending["overspent_categories"],
}
)
except Exception:
# Skip months that don't have data yet
continue
if not monthly_data:
return {"error": "No spending data available for analysis"}
# Calculate average spending
avg_spending = sum(m["total_spent"] for m in monthly_data) / len(monthly_data)
current_spending = monthly_data[0]["total_spent"] if monthly_data else 0
# Identify spending trend
if len(monthly_data) >= 2:
recent_avg = sum(m["total_spent"] for m in monthly_data[:2]) / 2
older_avg = sum(m["total_spent"] for m in monthly_data[1:]) / (
len(monthly_data) - 1
)
trend = (
"increasing"
if recent_avg > older_avg * 1.1
else "decreasing"
if recent_avg < older_avg * 0.9
else "stable"
)
else:
trend = "insufficient data"
# Find frequently overspent categories
overspent_frequency = {}
for month in monthly_data:
for cat in month["overspent_categories"]:
cat_name = cat["name"]
if cat_name not in overspent_frequency:
overspent_frequency[cat_name] = 0
overspent_frequency[cat_name] += 1
frequently_overspent = [
{"category": cat, "months_overspent": count}
for cat, count in overspent_frequency.items()
if count > 1
]
frequently_overspent.sort(key=lambda x: x["months_overspent"], reverse=True)
# Generate recommendations
recommendations = []
if current_spending > avg_spending * 1.2:
recommendations.append(
f"Current month spending (${current_spending:.2f}) is significantly higher than your {months_back}-month average (${avg_spending:.2f})"
)
if frequently_overspent:
top_overspent = frequently_overspent[0]
recommendations.append(
f"'{top_overspent['category']}' has been overspent in {top_overspent['months_overspent']} of the last {months_back} months"
)
if trend == "increasing":
recommendations.append(
"Your spending trend is increasing. Consider reviewing your budget allocations."
)
return {
"analysis_period": f"Last {months_back} months",
"average_monthly_spending": round(avg_spending, 2),
"current_month_spending": round(current_spending, 2),
"spending_trend": trend,
"frequently_overspent_categories": frequently_overspent,
"recommendations": recommendations,
"monthly_breakdown": monthly_data,
}

18
uv.lock generated
View File

@@ -2551,6 +2551,7 @@ dependencies = [
{ name = "tomlkit" }, { name = "tomlkit" },
{ name = "tortoise-orm" }, { name = "tortoise-orm" },
{ name = "tortoise-orm-stubs" }, { name = "tortoise-orm-stubs" },
{ name = "ynab" },
] ]
[package.metadata] [package.metadata]
@@ -2586,6 +2587,7 @@ requires-dist = [
{ name = "tomlkit", specifier = ">=0.13.3" }, { name = "tomlkit", specifier = ">=0.13.3" },
{ name = "tortoise-orm", specifier = ">=0.25.1" }, { name = "tortoise-orm", specifier = ">=0.25.1" },
{ name = "tortoise-orm-stubs", specifier = ">=1.0.2" }, { name = "tortoise-orm-stubs", specifier = ">=1.0.2" },
{ name = "ynab", specifier = ">=1.3.0" },
] ]
[[package]] [[package]]
@@ -3385,6 +3387,22 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/73/ae/b48f95715333080afb75a4504487cbe142cae1268afc482d06692d605ae6/yarl-1.22.0-py3-none-any.whl", hash = "sha256:1380560bdba02b6b6c90de54133c81c9f2a453dee9912fe58c1dcced1edb7cff", size = 46814, upload-time = "2025-10-06T14:12:53.872Z" }, { url = "https://files.pythonhosted.org/packages/73/ae/b48f95715333080afb75a4504487cbe142cae1268afc482d06692d605ae6/yarl-1.22.0-py3-none-any.whl", hash = "sha256:1380560bdba02b6b6c90de54133c81c9f2a453dee9912fe58c1dcced1edb7cff", size = 46814, upload-time = "2025-10-06T14:12:53.872Z" },
] ]
[[package]]
name = "ynab"
version = "1.9.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "certifi" },
{ name = "pydantic" },
{ name = "python-dateutil" },
{ name = "typing-extensions" },
{ name = "urllib3" },
]
sdist = { url = "https://files.pythonhosted.org/packages/9a/3e/36599ae876db3e1d32e393ab0934547df75bab70373c14ca5805246f99bc/ynab-1.9.0.tar.gz", hash = "sha256:fa50bdff641b3a273661e9f6e8a210f5ad98991a998dc09dec0a8122d734d1c6", size = 64898, upload-time = "2025-10-06T19:14:32.707Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/b2/9c/0ccd11bcdf7522fcb2823fcd7ffbb48e3164d72caaf3f920c7b068347175/ynab-1.9.0-py3-none-any.whl", hash = "sha256:72ac0219605b4280149684ecd0fec3bd75d938772d65cdeea9b3e66a1b2f470d", size = 208674, upload-time = "2025-10-06T19:14:31.719Z" },
]
[[package]] [[package]]
name = "zipp" name = "zipp"
version = "3.23.0" version = "3.23.0"