diff --git a/blueprints/conversation/agents.py b/blueprints/conversation/agents.py index 9027ab1..8b238b3 100644 --- a/blueprints/conversation/agents.py +++ b/blueprints/conversation/agents.py @@ -121,7 +121,7 @@ async def simba_search(query: str): Relevant information from Simba's documents """ print(f"[SIMBA SEARCH] Tool called with query: {query}") - serialized, docs = await query_vector_store(query=query) + serialized, docs = await query_vector_store(query=query, source="paperless") print(f"[SIMBA SEARCH] Found {len(docs)} documents") print(f"[SIMBA SEARCH] Serialized result length: {len(serialized)}") print(f"[SIMBA SEARCH] First 200 chars: {serialized[:200]}") @@ -329,8 +329,8 @@ async def obsidian_search_notes(query: str) -> str: return "Obsidian integration is not configured. Please set OBSIDIAN_VAULT_PATH environment variable." try: - # Query vector store for obsidian documents - serialized, docs = await query_vector_store(query=query) + # Query vector store filtered to obsidian source only + serialized, docs = await query_vector_store(query=query, source="obsidian") return serialized except Exception as e: diff --git a/blueprints/rag/logic.py b/blueprints/rag/logic.py index 26f8d0f..38cbb4b 100644 --- a/blueprints/rag/logic.py +++ b/blueprints/rag/logic.py @@ -3,12 +3,16 @@ import logging import os import re import time +from pathlib import Path from dotenv import load_dotenv from langchain_core.documents import Document from langchain_openai import OpenAIEmbeddings from langchain_postgres import PGVector -from langchain_text_splitters import RecursiveCharacterTextSplitter +from langchain_text_splitters import ( + MarkdownHeaderTextSplitter, + RecursiveCharacterTextSplitter, +) from sqlalchemy import create_engine, text from .fetchers import PaperlessNGXService @@ -69,6 +73,46 @@ text_splitter = RecursiveCharacterTextSplitter( add_start_index=True, # track index in original document ) +md_header_splitter = MarkdownHeaderTextSplitter( + headers_to_split_on=[("#", "h1"), ("##", "h2"), ("###", "h3")], + strip_headers=False, +) + +md_chunk_splitter = RecursiveCharacterTextSplitter( + chunk_size=1000, + chunk_overlap=200, + add_start_index=True, +) + + +def _split_markdown_document(doc: Document) -> list[Document]: + """Split a markdown document by headers first, then by size. + + Prepends the note filename to each chunk so chunks are self-contained. + """ + note_name = ( + Path(doc.metadata.get("filepath", "")).stem + if doc.metadata.get("filepath") + else "" + ) + + # Split by markdown headers + header_splits = md_header_splitter.split_text(doc.page_content) + + # Carry over original document metadata to each header split + for split in header_splits: + split.metadata.update(doc.metadata) + + # Then apply size-based splitting on large sections + sized_splits = md_chunk_splitter.split_documents(header_splits) + + # Prepend note name for self-contained context + if note_name: + for split in sized_splits: + split.page_content = f"[Note: {note_name}]\n{split.page_content}" + + return sized_splits + def _get_collection_id(): """Get the UUID of our collection from the langchain_pg_collection table.""" @@ -107,6 +151,7 @@ async def fetch_documents_from_paperless_ngx() -> list[Document]: documents = [] for doc in data: metadata = { + "source": "paperless", "created_date": date_to_epoch(doc["created_date"]), "filename": doc["original_file_name"], "document_type": doctypes.get(doc["document_type"], ""), @@ -188,6 +233,9 @@ async def fetch_obsidian_documents() -> list[Document]: metadata = { "source": "obsidian", "filepath": parsed["filepath"], + "folder": str(Path(parsed["filepath"]).parent) + if parsed["filepath"] + else "", "tags": parsed["tags"], "created_at": parsed["metadata"].get("created_at"), "indexed_at": time.time(), @@ -224,8 +272,10 @@ async def index_obsidian_documents(): # Delete existing obsidian chunks delete_documents_by_metadata("source", "obsidian") - # Split, sanitize, and index documents - splits = text_splitter.split_documents(documents) + # Split using markdown-aware chunking, sanitize, and index + splits = [] + for doc in documents: + splits.extend(_split_markdown_document(doc)) splits = _sanitize_documents(splits) vector_store = _get_vector_store() await vector_store.aadd_documents(documents=splits) @@ -315,6 +365,9 @@ async def sync_obsidian_documents() -> dict[str, int]: metadata = { "source": "obsidian", "filepath": parsed["filepath"], + "folder": str(Path(parsed["filepath"]).parent) + if parsed["filepath"] + else "", "tags": parsed["tags"], "created_at": parsed["metadata"].get("created_at"), "indexed_at": now, @@ -334,7 +387,9 @@ async def sync_obsidian_documents() -> dict[str, int]: continue if documents: - splits = text_splitter.split_documents(documents) + splits = [] + for doc in documents: + splits.extend(_split_markdown_document(doc)) splits = _sanitize_documents(splits) if splits: vector_store = _get_vector_store() @@ -350,9 +405,26 @@ async def sync_obsidian_documents() -> dict[str, int]: return {"added": added, "updated": updated, "deleted": deleted} -async def query_vector_store(query: str): +async def query_vector_store( + query: str, + source: str | None = None, + k: int = 8, +): + """Query the vector store with optional source filtering and MMR. + + Args: + query: Search query text + source: Filter by source metadata (e.g., "obsidian", "paperless") + k: Number of results to return + """ vector_store = _get_vector_store() - retrieved_docs = await vector_store.asimilarity_search(query, k=6) + filter_dict = {"source": source} if source else None + retrieved_docs = await vector_store.amax_marginal_relevance_search( + query, + k=k, + fetch_k=k * 3, + filter=filter_dict, + ) serialized = "\n\n".join( (f"Source: {doc.metadata}\nContent: {doc.page_content}") for doc in retrieved_docs