|
|
|
@@ -1,6 +1,7 @@
|
|
|
|
|
import datetime
|
|
|
|
|
import logging
|
|
|
|
|
import os
|
|
|
|
|
import re
|
|
|
|
|
|
|
|
|
|
from dotenv import load_dotenv
|
|
|
|
|
from langchain_core.documents import Document
|
|
|
|
@@ -17,7 +18,18 @@ load_dotenv()
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)

# Embedding backend configuration, read once at import time.
#   EMBEDDING_SERVER_URL  - when set, embeddings are requested from this
#                           base URL instead of the client's default endpoint.
#   EMBEDDING_MODEL_NAME  - model identifier; defaults to
#                           "text-embedding-3-small".
_embedding_server_url = os.getenv("EMBEDDING_SERVER_URL")
_embedding_model = os.getenv("EMBEDDING_MODEL_NAME", "text-embedding-3-small")

# Build exactly one client. A previous unconditional
# OpenAIEmbeddings(model="text-embedding-3-small") assignment was removed:
# it was always overwritten by the branch below and ignored both
# environment variables.
if _embedding_server_url:
    embeddings = OpenAIEmbeddings(
        model=_embedding_model,
        base_url=_embedding_server_url,
        # Placeholder value; presumably the custom server does not validate
        # the key -- TODO confirm against the deployed server.
        api_key="not-needed",
        # NOTE(review): disables the client-side context-length handling;
        # presumably needed because the custom server's model is not an
        # OpenAI model -- confirm against the langchain-openai docs.
        check_embedding_ctx_length=False,
    )
else:
    embeddings = OpenAIEmbeddings(model=_embedding_model)
|
|
|
|
|
|
|
|
|
|
# Convert Tortoise-style postgres:// URL to SQLAlchemy-style postgresql+psycopg://
|
|
|
|
|
_db_url = os.getenv(
|
|
|
|
@@ -103,13 +115,43 @@ async def fetch_documents_from_paperless_ngx() -> list[Document]:
|
|
|
|
|
return documents
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _sanitize_text(text_content: str) -> str:
|
|
|
|
|
"""Strip non-printable and invalid characters that break embedding tokenizers."""
|
|
|
|
|
# Remove null bytes and control characters (keep newlines and tabs)
|
|
|
|
|
text_content = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]", "", text_content)
|
|
|
|
|
# Remove Unicode surrogates and other problematic Unicode
|
|
|
|
|
text_content = re.sub(r"[\ud800-\udfff\ufffe\uffff]", "", text_content)
|
|
|
|
|
# Remove replacement character clusters
|
|
|
|
|
text_content = text_content.replace("\ufffd", "")
|
|
|
|
|
# Collapse excessive whitespace
|
|
|
|
|
text_content = re.sub(r" {3,}", " ", text_content)
|
|
|
|
|
return text_content.strip()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _sanitize_documents(documents: list[Document]) -> list[Document]:
    """Clean every document's ``page_content`` and drop documents left empty.

    Each document is modified in place: its ``page_content`` is replaced by
    the result of ``_sanitize_text``. The returned list holds the same
    document objects, in their original order, minus those whose content is
    empty after cleaning.
    """
    kept = []
    for document in documents:
        document.page_content = _sanitize_text(document.page_content)
        # Only documents that still have content are returned.
        if document.page_content:
            kept.append(document)
    return kept
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def index_documents():
    """Index Paperless-NGX documents into the vector store.

    Fetches the documents, splits them into chunks, sanitizes the chunks and
    adds them to the vector store one chunk per call.

    Raises:
        Exception: Whatever ``aadd_documents`` raised for the first chunk
            that failed, re-raised unchanged after the failure is logged.
            Chunks added before the failure are not rolled back here.
    """
    documents = await fetch_documents_from_paperless_ngx()

    splits = text_splitter.split_documents(documents)
    splits = _sanitize_documents(splits)
    logger.info(f"Indexing {len(splits)} chunks from {len(documents)} documents")
    vector_store = _get_vector_store()

    # Chunks are added individually, and only here. A bulk
    # ``aadd_documents(documents=splits)`` call used to precede this loop,
    # which added every chunk twice and would raise before the per-chunk
    # diagnostics below could identify the offending chunk.
    for i, split in enumerate(splits):
        try:
            await vector_store.aadd_documents(documents=[split])
        except Exception as e:
            logger.error(
                f"Failed to embed chunk {i} from {split.metadata.get('filename', 'unknown')}: {e}"
            )
            # The preview is logged with repr() so invisible or unusual
            # characters in the failing chunk show up in the log.
            logger.debug(f"Chunk content preview: {split.page_content[:200]!r}")
            raise
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def fetch_obsidian_documents() -> list[Document]:
|
|
|
|
@@ -168,8 +210,9 @@ async def index_obsidian_documents():
|
|
|
|
|
# Delete existing obsidian chunks
|
|
|
|
|
delete_documents_by_metadata("source", "obsidian")
|
|
|
|
|
|
|
|
|
|
# Split and index documents
|
|
|
|
|
# Split, sanitize, and index documents
|
|
|
|
|
splits = text_splitter.split_documents(documents)
|
|
|
|
|
splits = _sanitize_documents(splits)
|
|
|
|
|
vector_store = _get_vector_store()
|
|
|
|
|
await vector_store.aadd_documents(documents=splits)
|
|
|
|
|
|
|
|
|
|