From 869de1c2507e277eb7d7848c23c8ef8b9d8b2ec2 Mon Sep 17 00:00:00 2001 From: Ryan Chen Date: Sun, 31 May 2026 07:05:48 -0400 Subject: [PATCH] Add incremental Obsidian-to-pgvector sync with background watcher Replace full delete-and-reindex with mtime-based incremental sync that only re-indexes changed/new files and removes deleted ones. A background polling task keeps the vector store up-to-date automatically when OBSIDIAN_CONTINUOUS_SYNC=true. Co-Authored-By: Claude Opus 4.6 --- app.py | 31 +++++++++++ blueprints/rag/__init__.py | 6 +-- blueprints/rag/logic.py | 102 +++++++++++++++++++++++++++++++++++++ 3 files changed, 136 insertions(+), 3 deletions(-) diff --git a/app.py b/app.py index 9aaf19f..67dfb89 100644 --- a/app.py +++ b/app.py @@ -1,3 +1,4 @@ +import asyncio import logging import os from datetime import timedelta @@ -51,13 +52,43 @@ app.register_blueprint(blueprints.rag.rag_blueprint) app.register_blueprint(blueprints.whatsapp.whatsapp_blueprint) +async def _obsidian_sync_loop(): + """Background task that incrementally syncs Obsidian documents to pgvector.""" + from blueprints.rag.logic import sync_obsidian_documents + + interval = int(os.getenv("OBSIDIAN_SYNC_INTERVAL", "60")) + logger = logging.getLogger("obsidian_sync") + logger.info(f"Obsidian sync watcher started (interval={interval}s)") + + while True: + try: + result = await sync_obsidian_documents() + if result["added"] or result["updated"] or result["deleted"]: + logger.info( + f"Obsidian sync: {result['added']} added, " + f"{result['updated']} updated, {result['deleted']} deleted" + ) + except Exception: + logger.exception("Obsidian sync error") + await asyncio.sleep(interval) + + # Initialize Tortoise ORM with lifecycle hooks @app.while_serving async def lifespan(): logging.info("Initializing Tortoise ORM...") await Tortoise.init(config=TORTOISE_CONFIG) logging.info("Tortoise ORM initialized successfully") + + watcher_task = None + if os.getenv("OBSIDIAN_CONTINUOUS_SYNC") == "true": + watcher_task = asyncio.create_task(_obsidian_sync_loop()) + yield + + if watcher_task is not None: + watcher_task.cancel() + logging.info("Closing Tortoise ORM connections...") await Tortoise.close_connections() diff --git a/blueprints/rag/__init__.py b/blueprints/rag/__init__.py index bd0db38..196aea9 100644 --- a/blueprints/rag/__init__.py +++ b/blueprints/rag/__init__.py @@ -5,7 +5,7 @@ from .logic import ( delete_all_documents, get_vector_store_stats, index_documents, - index_obsidian_documents, + sync_obsidian_documents, ) from blueprints.users.decorators import admin_required @@ -48,9 +48,9 @@ async def trigger_reindex(): @rag_blueprint.post("/index-obsidian") @admin_required async def trigger_obsidian_index(): - """Index all Obsidian markdown documents into vector store. Admin only.""" + """Incrementally sync Obsidian documents into vector store. Admin only.""" try: - result = await index_obsidian_documents() + result = await sync_obsidian_documents() stats = get_vector_store_stats() return jsonify({"status": "success", "result": result, "stats": stats}) except Exception as e: diff --git a/blueprints/rag/logic.py b/blueprints/rag/logic.py index a581f60..28f2795 100644 --- a/blueprints/rag/logic.py +++ b/blueprints/rag/logic.py @@ -2,6 +2,7 @@ import datetime import logging import os import re +import time from dotenv import load_dotenv from langchain_core.documents import Document @@ -180,6 +181,7 @@ async def fetch_obsidian_documents() -> list[Document]: "filepath": parsed["filepath"], "tags": parsed["tags"], "created_at": parsed["metadata"].get("created_at"), + "indexed_at": time.time(), **{ k: v for k, v in parsed["metadata"].items() @@ -219,6 +221,106 @@ async def index_obsidian_documents(): return {"indexed": len(documents)} +def _get_obsidian_indexed_files() -> dict[str, float]: + """Return {filepath: indexed_at} for all obsidian chunks in pgvector.""" + collection_id = _get_collection_id() + if not collection_id: + return {} + engine = _get_engine() + with engine.connect() as conn: + result = conn.execute( + text( + "SELECT DISTINCT cmetadata->>'filepath' AS filepath, " + "MAX((cmetadata->>'indexed_at')::float) AS indexed_at " + "FROM langchain_pg_embedding " + "WHERE collection_id = :cid AND cmetadata->>'source' = 'obsidian' " + "GROUP BY cmetadata->>'filepath'" + ), + {"cid": collection_id}, + ) + return {row[0]: row[1] for row in result if row[0] is not None} + + +async def sync_obsidian_documents() -> dict[str, int]: + """Incrementally sync Obsidian documents to pgvector. + + Compares file mtimes against stored indexed_at timestamps to only + re-index changed/new files and remove deleted ones. + + Returns: + Dict with counts of added, updated, and deleted files. + """ + obsidian_service = ObsidianService() + indexed_files = _get_obsidian_indexed_files() + + # Build map of current vault files -> mtime + vault_files: dict[str, float] = {} + for md_path in obsidian_service.walk_vault(): + vault_files[str(md_path)] = md_path.stat().st_mtime + + added = 0 + updated = 0 + deleted = 0 + + # Find files to add or update + files_to_index: list[str] = [] + for filepath, mtime in vault_files.items(): + indexed_at = indexed_files.get(filepath) + if indexed_at is None: + files_to_index.append(filepath) + added += 1 + elif mtime > indexed_at: + # Delete old chunks first + delete_documents_by_metadata("filepath", filepath) + files_to_index.append(filepath) + updated += 1 + + # Find deleted files (in DB but not on disk) + for filepath in indexed_files: + if filepath not in vault_files: + delete_documents_by_metadata("filepath", filepath) + deleted += 1 + + # Index new/changed files + if files_to_index: + documents = [] + for filepath in files_to_index: + try: + with open(filepath, "r", encoding="utf-8") as f: + content = f.read() + parsed = obsidian_service.parse_markdown(content, filepath) + document = Document( + page_content=parsed["content"], + metadata={ + "source": "obsidian", + "filepath": parsed["filepath"], + "tags": parsed["tags"], + "created_at": parsed["metadata"].get("created_at"), + "indexed_at": time.time(), + **{ + k: v + for k, v in parsed["metadata"].items() + if k not in ["created_at", "created_by"] + }, + }, + ) + documents.append(document) + except Exception as e: + logger.warning(f"Error reading {filepath}: {e}") + continue + + if documents: + splits = text_splitter.split_documents(documents) + splits = _sanitize_documents(splits) + vector_store = _get_vector_store() + await vector_store.aadd_documents(documents=splits) + + logger.info( + f"Obsidian sync complete: {added} added, {updated} updated, {deleted} deleted" + ) + return {"added": added, "updated": updated, "deleted": deleted} + + async def query_vector_store(query: str): vector_store = _get_vector_store() retrieved_docs = await vector_store.asimilarity_search(query, k=2)