Use in-memory cache for obsidian indexed files instead of cross-engine DB query

The async/sync engine split caused visibility issues where newly indexed
files weren't found on the next cycle, triggering re-indexing of all 36
files every 60 seconds. Replace with a module-level dict that loads from
DB on cold start and stays in sync via cache updates after each indexing.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-05-31 07:37:39 -04:00
parent 00c9b44c0e
commit 9a149cdaa6
+25 -18
View File
@@ -10,7 +10,6 @@ from langchain_openai import OpenAIEmbeddings
from langchain_postgres import PGVector from langchain_postgres import PGVector
from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_text_splitters import RecursiveCharacterTextSplitter
from sqlalchemy import create_engine, text from sqlalchemy import create_engine, text
from sqlalchemy.ext.asyncio import create_async_engine
from .fetchers import PaperlessNGXService from .fetchers import PaperlessNGXService
from utils.obsidian_service import ObsidianService from utils.obsidian_service import ObsidianService
@@ -64,13 +63,6 @@ def _get_engine():
return _get_engine._engine return _get_engine._engine
def _get_async_engine():
"""Get an async SQLAlchemy engine for direct queries."""
if not hasattr(_get_async_engine, "_engine"):
_get_async_engine._engine = create_async_engine(_pgvector_url)
return _get_async_engine._engine
text_splitter = RecursiveCharacterTextSplitter( text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000, # chunk size (characters) chunk_size=1000, # chunk size (characters)
chunk_overlap=200, # chunk overlap (characters) chunk_overlap=200, # chunk overlap (characters)
@@ -241,14 +233,18 @@ async def index_obsidian_documents():
return {"indexed": len(documents)} return {"indexed": len(documents)}
async def _get_obsidian_indexed_files() -> dict[str, float]: # In-memory cache of indexed obsidian files: {filepath: indexed_at}
"""Return {filepath: indexed_at} for all obsidian chunks in pgvector.""" _obsidian_index_cache: dict[str, float] = {}
def _load_obsidian_index_cache() -> dict[str, float]:
"""Load indexed obsidian files from DB into cache (cold start only)."""
collection_id = _get_collection_id() collection_id = _get_collection_id()
if not collection_id: if not collection_id:
return {} return {}
engine = _get_async_engine() engine = _get_engine()
async with engine.connect() as conn: with engine.connect() as conn:
result = await conn.execute( result = conn.execute(
text( text(
"SELECT DISTINCT cmetadata->>'filepath' AS filepath, " "SELECT DISTINCT cmetadata->>'filepath' AS filepath, "
"MAX((cmetadata->>'indexed_at')::float) AS indexed_at " "MAX((cmetadata->>'indexed_at')::float) AS indexed_at "
@@ -270,8 +266,13 @@ async def sync_obsidian_documents() -> dict[str, int]:
Returns: Returns:
Dict with counts of added, updated, and deleted files. Dict with counts of added, updated, and deleted files.
""" """
global _obsidian_index_cache
obsidian_service = ObsidianService() obsidian_service = ObsidianService()
indexed_files = await _get_obsidian_indexed_files()
# Load cache from DB on first run
if not _obsidian_index_cache:
_obsidian_index_cache = _load_obsidian_index_cache()
# Build map of current vault files -> mtime # Build map of current vault files -> mtime
vault_files: dict[str, float] = {} vault_files: dict[str, float] = {}
@@ -285,7 +286,7 @@ async def sync_obsidian_documents() -> dict[str, int]:
# Find files to add or update # Find files to add or update
files_to_index: list[str] = [] files_to_index: list[str] = []
for filepath, mtime in vault_files.items(): for filepath, mtime in vault_files.items():
indexed_at = indexed_files.get(filepath) indexed_at = _obsidian_index_cache.get(filepath)
if indexed_at is None: if indexed_at is None:
files_to_index.append(filepath) files_to_index.append(filepath)
added += 1 added += 1
@@ -295,14 +296,16 @@ async def sync_obsidian_documents() -> dict[str, int]:
files_to_index.append(filepath) files_to_index.append(filepath)
updated += 1 updated += 1
# Find deleted files (in DB but not on disk) # Find deleted files (in cache but not on disk)
for filepath in indexed_files: for filepath in list(_obsidian_index_cache):
if filepath not in vault_files: if filepath not in vault_files:
delete_documents_by_metadata("filepath", filepath) delete_documents_by_metadata("filepath", filepath)
del _obsidian_index_cache[filepath]
deleted += 1 deleted += 1
# Index new/changed files # Index new/changed files
if files_to_index: if files_to_index:
now = time.time()
documents = [] documents = []
for filepath in files_to_index: for filepath in files_to_index:
try: try:
@@ -314,7 +317,7 @@ async def sync_obsidian_documents() -> dict[str, int]:
"filepath": parsed["filepath"], "filepath": parsed["filepath"],
"tags": parsed["tags"], "tags": parsed["tags"],
"created_at": parsed["metadata"].get("created_at"), "created_at": parsed["metadata"].get("created_at"),
"indexed_at": time.time(), "indexed_at": now,
**{ **{
k: v k: v
for k, v in parsed["metadata"].items() for k, v in parsed["metadata"].items()
@@ -337,6 +340,10 @@ async def sync_obsidian_documents() -> dict[str, int]:
vector_store = _get_vector_store() vector_store = _get_vector_store()
await vector_store.aadd_documents(documents=splits) await vector_store.aadd_documents(documents=splits)
# Update cache for successfully processed files
for filepath in files_to_index:
_obsidian_index_cache[filepath] = now
logger.info( logger.info(
f"Obsidian sync complete: {added} added, {updated} updated, {deleted} deleted" f"Obsidian sync complete: {added} added, {updated} updated, {deleted} deleted"
) )