Files
simbarag/blueprints/rag/logic.py
T
ryan 438399646f Replace ChromaDB with pgvector for vector storage
Consolidate onto PostgreSQL by using pgvector instead of a separate
ChromaDB instance. This removes a Docker volume, a large dependency,
and simplifies the stack without meaningful performance impact at
our document scale.

- Swap langchain-chroma for langchain-postgres (PGVector)
- Use pgvector/pgvector:pg16 Docker image with init script
- Lazy-initialize vector store to avoid eager DB connections
- Add SQL helpers for stats/delete/list (replacing _collection access)
- Remove legacy main.py, chunker, petmd scraper, and /api/query endpoint

Re-index required after deploy (POST /api/rag/index + /index-obsidian).

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-04-24 08:43:52 -04:00

261 lines
7.7 KiB
Python

import datetime
import logging
import os
from dotenv import load_dotenv
from langchain_core.documents import Document
from langchain_openai import OpenAIEmbeddings
from langchain_postgres import PGVector
from langchain_text_splitters import RecursiveCharacterTextSplitter
from sqlalchemy import create_engine, text
from .fetchers import PaperlessNGXService
from utils.obsidian_service import ObsidianService
# Load environment variables
load_dotenv()
logger = logging.getLogger(__name__)
embeddings = OpenAIEmbeddings(model="text-embedding-3-small")
# Convert Tortoise-style postgres:// URL to SQLAlchemy-style postgresql+psycopg://
_db_url = os.getenv(
"DATABASE_URL", "postgres://raggr:raggr_dev_password@localhost:5432/raggr"
)
_pgvector_url = _db_url.replace("postgres://", "postgresql+psycopg://")
# Lazy-initialized vector store (defers DB connection to first use)
_vector_store = None
def _get_vector_store() -> PGVector:
global _vector_store
if _vector_store is None:
_vector_store = PGVector(
embeddings=embeddings,
collection_name="simba_docs",
connection=_pgvector_url,
use_jsonb=True,
create_extension=False, # created by docker init script
)
return _vector_store
def _get_engine():
"""Get a SQLAlchemy engine for direct queries."""
if not hasattr(_get_engine, "_engine"):
_get_engine._engine = create_engine(_pgvector_url)
return _get_engine._engine
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000, # chunk size (characters)
chunk_overlap=200, # chunk overlap (characters)
add_start_index=True, # track index in original document
)
def _get_collection_id():
"""Get the UUID of our collection from the langchain_pg_collection table."""
engine = _get_engine()
with engine.connect() as conn:
result = conn.execute(
text("SELECT uuid FROM langchain_pg_collection WHERE name = :name"),
{"name": "simba_docs"},
)
row = result.fetchone()
return row[0] if row else None
def date_to_epoch(date_str: str) -> float:
split_date = date_str.split("-")
date = datetime.datetime(
int(split_date[0]),
int(split_date[1]),
int(split_date[2]),
0,
0,
0,
)
return date.timestamp()
async def fetch_documents_from_paperless_ngx() -> list[Document]:
ppngx = PaperlessNGXService()
data = ppngx.get_data()
doctypes = ppngx.get_doctypes()
documents = []
for doc in data:
metadata = {
"created_date": date_to_epoch(doc["created_date"]),
"filename": doc["original_file_name"],
"document_type": doctypes.get(doc["document_type"], ""),
}
documents.append(Document(page_content=doc["content"], metadata=metadata))
return documents
async def index_documents():
"""Index Paperless-NGX documents into vector store."""
documents = await fetch_documents_from_paperless_ngx()
splits = text_splitter.split_documents(documents)
vector_store = _get_vector_store()
await vector_store.aadd_documents(documents=splits)
async def fetch_obsidian_documents() -> list[Document]:
"""Fetch all markdown documents from Obsidian vault.
Returns:
List of LangChain Document objects with source='obsidian' metadata.
"""
obsidian_service = ObsidianService()
documents = []
for md_path in obsidian_service.walk_vault():
try:
# Read markdown file
with open(md_path, "r", encoding="utf-8") as f:
content = f.read()
# Parse metadata
parsed = obsidian_service.parse_markdown(content, md_path)
# Create LangChain Document with obsidian source
document = Document(
page_content=parsed["content"],
metadata={
"source": "obsidian",
"filepath": parsed["filepath"],
"tags": parsed["tags"],
"created_at": parsed["metadata"].get("created_at"),
**{
k: v
for k, v in parsed["metadata"].items()
if k not in ["created_at", "created_by"]
},
},
)
documents.append(document)
except Exception as e:
logger.warning(f"Error reading {md_path}: {e}")
continue
return documents
async def index_obsidian_documents():
"""Index all Obsidian markdown documents into vector store.
Deletes existing obsidian source chunks before re-indexing.
"""
documents = await fetch_obsidian_documents()
if not documents:
logger.info("No Obsidian documents found to index")
return {"indexed": 0}
# Delete existing obsidian chunks
delete_documents_by_metadata("source", "obsidian")
# Split and index documents
splits = text_splitter.split_documents(documents)
vector_store = _get_vector_store()
await vector_store.aadd_documents(documents=splits)
return {"indexed": len(documents)}
async def query_vector_store(query: str):
vector_store = _get_vector_store()
retrieved_docs = await vector_store.asimilarity_search(query, k=2)
serialized = "\n\n".join(
(f"Source: {doc.metadata}\nContent: {doc.page_content}")
for doc in retrieved_docs
)
return serialized, retrieved_docs
def delete_all_documents():
"""Delete all documents from the vector store collection."""
collection_id = _get_collection_id()
if not collection_id:
return
engine = _get_engine()
with engine.connect() as conn:
conn.execute(
text("DELETE FROM langchain_pg_embedding WHERE collection_id = :cid"),
{"cid": collection_id},
)
conn.commit()
def delete_documents_by_metadata(key: str, value: str):
"""Delete documents matching a metadata key/value pair."""
collection_id = _get_collection_id()
if not collection_id:
return
engine = _get_engine()
with engine.connect() as conn:
conn.execute(
text(
"DELETE FROM langchain_pg_embedding "
"WHERE collection_id = :cid AND cmetadata->>:key = :value"
),
{"cid": collection_id, "key": key, "value": value},
)
conn.commit()
def get_vector_store_stats():
"""Get statistics about the vector store."""
collection_id = _get_collection_id()
count = 0
if collection_id:
engine = _get_engine()
with engine.connect() as conn:
result = conn.execute(
text(
"SELECT COUNT(*) FROM langchain_pg_embedding WHERE collection_id = :cid"
),
{"cid": collection_id},
)
count = result.scalar()
return {
"total_documents": count,
"collection_name": "simba_docs",
}
def list_all_documents(limit: int = 10):
"""List documents in the vector store with their metadata."""
collection_id = _get_collection_id()
if not collection_id:
return []
engine = _get_engine()
with engine.connect() as conn:
result = conn.execute(
text(
"SELECT id, document, cmetadata FROM langchain_pg_embedding "
"WHERE collection_id = :cid LIMIT :limit"
),
{"cid": collection_id, "limit": limit},
)
documents = []
for row in result:
documents.append(
{
"id": str(row[0]),
"metadata": row[2],
"content_preview": row[1][:200] if row[1] else None,
}
)
return documents