diff --git a/blueprints/rag/logic.py b/blueprints/rag/logic.py index 8a90e5c..26f8d0f 100644 --- a/blueprints/rag/logic.py +++ b/blueprints/rag/logic.py @@ -10,7 +10,6 @@ from langchain_openai import OpenAIEmbeddings from langchain_postgres import PGVector from langchain_text_splitters import RecursiveCharacterTextSplitter from sqlalchemy import create_engine, text -from sqlalchemy.ext.asyncio import create_async_engine from .fetchers import PaperlessNGXService from utils.obsidian_service import ObsidianService @@ -64,13 +63,6 @@ def _get_engine(): return _get_engine._engine -def _get_async_engine(): - """Get an async SQLAlchemy engine for direct queries.""" - if not hasattr(_get_async_engine, "_engine"): - _get_async_engine._engine = create_async_engine(_pgvector_url) - return _get_async_engine._engine - - text_splitter = RecursiveCharacterTextSplitter( chunk_size=1000, # chunk size (characters) chunk_overlap=200, # chunk overlap (characters) @@ -241,14 +233,18 @@ async def index_obsidian_documents(): return {"indexed": len(documents)} -async def _get_obsidian_indexed_files() -> dict[str, float]: - """Return {filepath: indexed_at} for all obsidian chunks in pgvector.""" +# In-memory cache of indexed obsidian files: {filepath: indexed_at} +_obsidian_index_cache: dict[str, float] = {} + + +def _load_obsidian_index_cache() -> dict[str, float]: + """Return {filepath: indexed_at} read from the DB; the caller uses it to seed the cache.""" collection_id = _get_collection_id() if not collection_id: return {} - engine = _get_async_engine() - async with engine.connect() as conn: - result = await conn.execute( + engine = _get_engine() + with engine.connect() as conn: + result = conn.execute( text( "SELECT DISTINCT cmetadata->>'filepath' AS filepath, " "MAX((cmetadata->>'indexed_at')::float) AS indexed_at " @@ -270,8 +266,13 @@ async def sync_obsidian_documents() -> dict[str, int]: Returns: Dict with counts of added, updated, and deleted files. 
""" + global _obsidian_index_cache + obsidian_service = ObsidianService() - indexed_files = await _get_obsidian_indexed_files() + + # Load cache from DB whenever it is empty (first run, or nothing indexed yet) + if not _obsidian_index_cache: + _obsidian_index_cache = _load_obsidian_index_cache() # Build map of current vault files -> mtime vault_files: dict[str, float] = {} @@ -285,7 +286,7 @@ async def sync_obsidian_documents() -> dict[str, int]: # Find files to add or update files_to_index: list[str] = [] for filepath, mtime in vault_files.items(): - indexed_at = indexed_files.get(filepath) + indexed_at = _obsidian_index_cache.get(filepath) if indexed_at is None: files_to_index.append(filepath) added += 1 @@ -295,14 +296,16 @@ async def sync_obsidian_documents() -> dict[str, int]: files_to_index.append(filepath) updated += 1 - # Find deleted files (in DB but not on disk) - for filepath in indexed_files: + # Find deleted files (in cache but not on disk) + for filepath in list(_obsidian_index_cache): if filepath not in vault_files: delete_documents_by_metadata("filepath", filepath) + del _obsidian_index_cache[filepath] deleted += 1 # Index new/changed files if files_to_index: + now = time.time() documents = [] for filepath in files_to_index: try: @@ -314,7 +317,7 @@ async def sync_obsidian_documents() -> dict[str, int]: "filepath": parsed["filepath"], "tags": parsed["tags"], "created_at": parsed["metadata"].get("created_at"), - "indexed_at": time.time(), + "indexed_at": now, **{ k: v for k, v in parsed["metadata"].items() @@ -337,6 +340,10 @@ async def sync_obsidian_documents() -> dict[str, int]: vector_store = _get_vector_store() await vector_store.aadd_documents(documents=splits) + # Record every queued file as indexed (NOTE(review): files_to_index may include files that failed in the try above - confirm) + for filepath in files_to_index: + _obsidian_index_cache[filepath] = now + logger.info( f"Obsidian sync complete: {added} added, {updated} updated, {deleted} deleted" )