reorganization
This commit is contained in:
0
utils/__init__.py
Normal file
0
utils/__init__.py
Normal file
142
utils/chunker.py
Normal file
142
utils/chunker.py
Normal file
@@ -0,0 +1,142 @@
|
||||
import os
|
||||
from math import ceil
|
||||
import re
|
||||
from typing import Union
|
||||
from uuid import UUID, uuid4
|
||||
from ollama import Client
|
||||
from chromadb.utils.embedding_functions.openai_embedding_function import (
|
||||
OpenAIEmbeddingFunction,
|
||||
)
|
||||
from dotenv import load_dotenv
|
||||
from llm import LLMClient
|
||||
|
||||
|
||||
# Load environment variables (OLLAMA_HOST, OPENAI_API_KEY, ...) from a .env file.
load_dotenv()

# Shared Ollama client; host comes from OLLAMA_HOST or falls back to localhost.
# NOTE(review): timeout=1.0s is very aggressive for model calls — confirm intent.
ollama_client = Client(
    host=os.getenv("OLLAMA_HOST", "http://localhost:11434"), timeout=1.0
)
|
||||
|
||||
|
||||
def remove_headers_footers(text, header_patterns=None, footer_patterns=None):
    """Blank out header/footer lines matching the given regex patterns.

    Patterns are applied with re.MULTILINE so each one is matched
    line-by-line; defaults remove any line containing "Header"/"Footer".
    The result is stripped of surrounding whitespace.
    """
    if header_patterns is None:
        header_patterns = [r"^.*Header.*$"]
    if footer_patterns is None:
        footer_patterns = [r"^.*Footer.*$"]

    cleaned = text
    for regex in (*header_patterns, *footer_patterns):
        cleaned = re.sub(regex, "", cleaned, flags=re.MULTILINE)

    return cleaned.strip()
|
||||
|
||||
|
||||
def remove_special_characters(text, special_chars=None):
    """Delete characters outside the allowed set.

    By default keeps letters, digits, whitespace, and basic punctuation
    (. , ; : ' " ? ! -); *special_chars* may supply an alternate pattern
    of characters to remove.
    """
    pattern = (
        r"[^A-Za-z0-9\s\.,;:\'\"\?\!\-]" if special_chars is None else special_chars
    )
    return re.sub(pattern, "", text).strip()
|
||||
|
||||
|
||||
def remove_repeated_substrings(text, pattern=r"\.{2,}"):
    """Collapse runs matching *pattern* (default: two or more dots) to '.'."""
    return re.sub(pattern, ".", text).strip()
|
||||
|
||||
|
||||
def remove_extra_spaces(text):
    """Collapse all whitespace runs into single spaces.

    NOTE: the second substitution (`\\s+` -> " ") also collapses the
    "\\n\\n" the first pass preserves, so the final result is always a
    single-line string.
    """
    collapsed = re.sub(r"\n\s*\n", "\n\n", text)
    collapsed = re.sub(r"\s+", " ", collapsed)
    return collapsed.strip()
|
||||
|
||||
|
||||
def preprocess_text(text):
    """Run the full cleaning pipeline over *text*.

    Order: headers/footers -> special characters -> repeated dots ->
    whitespace collapsing. Returns the stripped result.
    """
    pipeline = (
        remove_headers_footers,
        remove_special_characters,
        remove_repeated_substrings,
        remove_extra_spaces,
    )
    for step in pipeline:
        text = step(text)

    # Additional cleaning steps can be appended to the pipeline above.
    return text.strip()
|
||||
|
||||
|
||||
class Chunk:
    """One contiguous piece of a source document, plus its embedding.

    A plain record type: text of the slice, its size, the parent
    document's UUID, the chunk's ordinal within that document, and the
    embedding vector produced for it.
    """

    def __init__(
        self,
        text: str,
        size: int,
        document_id: UUID,
        chunk_id: int,
        embedding,
    ):
        # Simple attribute record — no behavior beyond storage.
        (
            self.text,
            self.size,
            self.document_id,
            self.chunk_id,
            self.embedding,
        ) = (text, size, document_id, chunk_id, embedding)
|
||||
|
||||
|
||||
class Chunker:
    """Splits documents into fixed-size chunks, embeds them, and stores
    them in the given Chroma collection."""

    def __init__(self, collection) -> None:
        self.collection = collection
        self.llm_client = LLMClient()

    def embedding_fx(self, inputs):
        """Embed *inputs* (a list of strings) with OpenAI's
        text-embedding-3-small model; returns the embedding vectors."""
        openai_embedding_fx = OpenAIEmbeddingFunction(
            api_key=os.getenv("OPENAI_API_KEY"),
            model_name="text-embedding-3-small",
        )
        return openai_embedding_fx(inputs)

    def chunk_document(
        self,
        document: str,
        chunk_size: int = 1000,
        metadata: Union[dict[str, Union[str, float]], None] = None,
    ) -> list[Chunk]:
        """Split *document* into ~chunk_size-character chunks, embed and
        store each one, and return the created Chunk objects.

        Args:
            document: Raw document text.
            chunk_size: Target chunk length in characters (capped at the
                document length; floor of 1 for empty input).
            metadata: Optional metadata attached to every stored chunk.
        """
        # BUG FIX: was a mutable default argument ({}), shared across calls.
        if metadata is None:
            metadata = {}

        doc_uuid = uuid4()

        # Never exceed the document length; `or 1` avoids a zero divisor
        # for an empty document.
        chunk_size = min(chunk_size, len(document)) or 1

        chunks = []
        document_length = len(document)
        num_chunks = ceil(document_length / chunk_size)

        for i in range(num_chunks):
            # BUG FIX: offset must advance by chunk_size per iteration
            # (was `i * num_chunks`, producing wrong/overlapping slices).
            curr_pos = i * chunk_size
            to_pos = min(curr_pos + chunk_size, document_length)
            text_chunk = self.clean_document(document[curr_pos:to_pos])

            embedding = self.embedding_fx([text_chunk])
            self.collection.add(
                ids=[str(doc_uuid) + ":" + str(i)],
                documents=[text_chunk],
                embeddings=embedding,
                metadatas=[metadata],
            )
            # BUG FIX: chunks were never collected, so the function always
            # returned an empty list despite its list[Chunk] annotation.
            chunks.append(
                Chunk(
                    text=text_chunk,
                    size=len(text_chunk),
                    document_id=doc_uuid,
                    chunk_id=i,
                    embedding=embedding,
                )
            )

        return chunks

    def clean_document(self, document: str) -> str:
        """Remove information that is noise or already known.

        Example: everything stored here is Simba-related, so phrases like
        "Summary of Simba's visit" add no signal.
        """
        document = document.replace("\\n", "")
        document = document.strip()

        return preprocess_text(document)
|
||||
165
utils/cleaner.py
Normal file
165
utils/cleaner.py
Normal file
@@ -0,0 +1,165 @@
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
|
||||
import argparse
|
||||
from dotenv import load_dotenv
|
||||
import ollama
|
||||
from PIL import Image
|
||||
import fitz
|
||||
|
||||
from .request import PaperlessNGXService
|
||||
|
||||
# Load BASE_URL / PAPERLESS_TOKEN / OLLAMA_URL settings from a .env file.
load_dotenv()

# Configure ollama client with URL from environment or default to localhost.
# NOTE(review): other modules read OLLAMA_HOST; confirm which variable is intended.
ollama_client = ollama.Client(host=os.getenv("OLLAMA_URL", "http://localhost:11434"))

parser = argparse.ArgumentParser(description="use llm to clean documents")
# BUG FIX: help text was copy-pasted from another script
# ("questions about simba's health"); describe the actual argument.
parser.add_argument(
    "document_id", type=str, help="ID of the Paperless-ngx document to OCR and clean"
)
|
||||
|
||||
|
||||
def pdf_to_image(filepath: str, dpi=300) -> list[str]:
    """Render each page of a PDF to its own temporary PNG file.

    Returns the filepaths to the created images. On failure, any images
    already written are removed and an empty list is returned.
    """
    created_paths = []
    try:
        pdf_document = fitz.open(filepath)
        print(f"\nConverting '{os.path.basename(filepath)}' to temporary images...")

        # 72 dpi is the PDF base resolution; scale pages up to the requested dpi.
        scale = dpi / 72
        matrix = fitz.Matrix(scale, scale)

        for page_num in range(len(pdf_document)):
            pix = pdf_document.load_page(page_num).get_pixmap(matrix=matrix)

            # Create a temporary file for the image. delete=False is crucial:
            # the caller consumes (and is responsible for) these files.
            with tempfile.NamedTemporaryFile(
                delete=False,
                suffix=".png",
                prefix=f"pdf_page_{page_num + 1}_",
            ) as temp_image_file:
                temp_image_path = temp_image_file.name

            # Save the pixel data to the temporary file
            pix.save(temp_image_path)
            created_paths.append(temp_image_path)
            print(
                f" -> Saved page {page_num + 1} to temporary file: '{temp_image_path}'"
            )

        print("\nConversion successful! ✨")
        return created_paths

    except Exception as e:
        print(f"An error occurred during PDF conversion: {e}", file=sys.stderr)
        # Clean up any image files that were created before the error
        for path in created_paths:
            os.remove(path)
        return []
|
||||
|
||||
|
||||
def merge_images_vertically_to_tempfile(image_paths):
    """
    Merges a list of images vertically and saves the result to a temporary file.

    Args:
        image_paths (list): Filepaths of the images to stack, top to bottom.

    Returns:
        str: The filepath of the temporary merged image file, or None on error
        (empty input or a missing file).
    """
    if not image_paths:
        print("Error: The list of image paths is empty.")
        return None

    # Open all images and check for consistency
    try:
        images = [Image.open(path) for path in image_paths]
    except FileNotFoundError as e:
        print(f"Error: Could not find image file: {e}")
        return None

    widths, heights = zip(*(img.size for img in images))
    max_width = max(widths)

    # All images must have the same width
    if not all(width == max_width for width in widths):
        print("Warning: Images have different widths. They will be resized.")

        def _widened(img):
            # Scale height proportionally so the aspect ratio is preserved.
            w, h = img.size
            if w == max_width:
                return img
            return img.resize((max_width, int(h * (max_width / w))))

        images = [_widened(img) for img in images]
        heights = [img.size[1] for img in images]

    # Blank canvas tall enough for every image stacked end to end.
    merged_image = Image.new("RGB", (max_width, sum(heights)))

    # Paste each image onto the new blank image
    y_offset = 0
    for img in images:
        merged_image.paste(img, (0, y_offset))
        y_offset += img.height

    # Create a temporary file and save the image
    temp_file = tempfile.NamedTemporaryFile(suffix=".png", delete=False)
    temp_path = temp_file.name
    merged_image.save(temp_path)
    temp_file.close()

    print(f"Successfully merged {len(images)} images into temporary file: {temp_path}")
    return temp_path
|
||||
|
||||
|
||||
OCR_PROMPT = """
|
||||
You job is to extract text from the images I provide you. Extract every bit of the text in the image. Don't say anything just do your job. Text should be same as in the images. If there are multiple images, categorize the transcriptions by page.
|
||||
|
||||
Things to avoid:
|
||||
- Don't miss anything to extract from the images
|
||||
|
||||
Things to include:
|
||||
- Include everything, even anything inside [], (), {} or anything.
|
||||
- Include any repetitive things like "..." or anything
|
||||
- If you think there is any mistake in image just include it too
|
||||
|
||||
Someone will kill the innocent kittens if you don't extract the text exactly. So, make sure you extract every bit of the text. Only output the extracted text.
|
||||
"""
|
||||
|
||||
|
||||
def summarize_pdf_image(filepaths: list[str]):
    """Transcribe the given page images via the local gemma3:4b model.

    All images are sent in a single chat request together with
    OCR_PROMPT; returns the model's raw transcription text.
    """
    ocr_request = {
        "role": "user",
        "content": OCR_PROMPT,
        "images": filepaths,
    }
    res = ollama_client.chat(model="gemma3:4b", messages=[ocr_request])

    return res["message"]["content"]
|
||||
|
||||
|
||||
if __name__ == "__main__":
    args = parser.parse_args()
    ppngx = PaperlessNGXService()

    if args.document_id:
        doc_id = args.document_id
        # Fetch the document record and its original PDF.
        file = ppngx.get_doc_by_id(doc_id=doc_id)
        pdf_path = ppngx.download_pdf_from_id(doc_id)
        print(pdf_path)
        # Re-OCR: PDF -> per-page images -> LLM transcription.
        image_paths = pdf_to_image(filepath=pdf_path)
        summary = summarize_pdf_image(filepaths=image_paths)
        print(summary)
        # Replace the stored content with the cleaned transcription and
        # push the whole record back to the server.
        file["content"] = summary
        print(file)
        ppngx.upload_cleaned_content(doc_id, file)
|
||||
83
utils/image_process.py
Normal file
83
utils/image_process.py
Normal file
@@ -0,0 +1,83 @@
|
||||
from ollama import Client
|
||||
import argparse
|
||||
import os
|
||||
import logging
|
||||
from PIL import Image, ExifTags
|
||||
from pillow_heif import register_heif_opener
|
||||
from pydantic import BaseModel
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# Load OLLAMA_HOST (and any other settings) from a local .env file.
load_dotenv()

# Allow PIL to open HEIC/HEIF images (typical iPhone photos).
register_heif_opener()

logging.basicConfig(level=logging.INFO)


# BUG FIX: description/epilog were literal argparse-docs placeholders
# ("What the program does" / "Text at the bottom of help").
parser = argparse.ArgumentParser(
    prog="SimbaImageProcessor",
    description="Describe an image of Simba using a local Ollama vision model",
)

# Path of the image file to analyze.
parser.add_argument("filepath")

client = Client(host=os.getenv("OLLAMA_HOST", "http://localhost:11434"))
|
||||
|
||||
|
||||
class SimbaImageDescription(BaseModel):
    """Structured-output schema the vision model must fill in."""

    # Date the photo was taken, inferred from EXIF/context.
    image_date: str
    # Bullet-point description of what Simba is doing.
    description: str
|
||||
|
||||
|
||||
def describe_simba_image(input):
    """Generate a structured description of a Simba photo via Ollama.

    HEIC/HEIF inputs are converted to JPEG first; EXIF metadata is
    extracted and embedded in the prompt. Returns a
    SimbaImageDescription parsed from the model's JSON output.

    NOTE(review): `input` shadows the builtin, but callers pass it by
    keyword (input=...), so renaming would break the interface.
    """
    logging.info("Opening image of Simba ...")
    if "heic" in input.lower() or "heif" in input.lower():
        # Convert to JPEG alongside the original (same stem, .jpg suffix).
        new_filepath = input.split(".")[0] + ".jpg"
        img = Image.open(input)
        img.save(new_filepath, "JPEG")
        logging.info("Extracting EXIF...")
        # Read EXIF from the original HEIC before switching to the JPEG —
        # presumably the saved copy does not carry the metadata; confirm.
        exif = {
            ExifTags.TAGS[k]: v for k, v in img.getexif().items() if k in ExifTags.TAGS
        }
        img = Image.open(new_filepath)
        # From here on, the model is shown the converted JPEG.
        input = new_filepath
    else:
        img = Image.open(input)

        logging.info("Extracting EXIF...")
        # Keep only EXIF tags that have a known human-readable name.
        exif = {
            ExifTags.TAGS[k]: v for k, v in img.getexif().items() if k in ExifTags.TAGS
        }

    # MakerNote is vendor-specific binary blob data; drop it so the prompt
    # stays readable and compact.
    if "MakerNote" in exif:
        exif.pop("MakerNote")

    logging.info(exif)

    prompt = f"Simba is an orange cat belonging to Ryan Chen. In 2025, they lived in New York. In 2024, they lived in California. Analyze the following image and tell me what Simba seems to be doing. Be extremely descriptive about Simba, things in the background, and the setting of the image. I will also include the EXIF data of the image, please use it to help you determine information about Simba. EXIF: {exif}. Put the notes in the description field and the date in the image_date field."

    logging.info("Sending info to Ollama ...")
    # format= constrains the model's reply to the SimbaImageDescription schema.
    response = client.chat(
        model="gemma3:4b",
        messages=[
            {
                "role": "system",
                "content": "you are a very shrewd and descriptive note taker. all of your responses will be formatted like notes in bullet points. be very descriptive. do not leave a single thing out.",
            },
            {"role": "user", "content": prompt, "images": [input]},
        ],
        format=SimbaImageDescription.model_json_schema(),
    )

    # Parse/validate the JSON reply into the pydantic model.
    result = SimbaImageDescription.model_validate_json(response["message"]["content"])

    return result
|
||||
|
||||
|
||||
if __name__ == "__main__":
    args = parser.parse_args()
    if args.filepath:
        # BUG FIX: was a bare `logging.info` attribute access (a no-op,
        # the function was never called); actually log what is happening.
        logging.info("Describing image at %s ...", args.filepath)
        describe_simba_image(input=args.filepath)
|
||||
86
utils/request.py
Normal file
86
utils/request.py
Normal file
@@ -0,0 +1,86 @@
|
||||
import os
|
||||
import tempfile
|
||||
import httpx
|
||||
import logging
|
||||
|
||||
from dotenv import load_dotenv
|
||||
|
||||
# Load BASE_URL / PAPERLESS_TOKEN from a local .env file.
load_dotenv()

logging.basicConfig(level=logging.INFO)
|
||||
|
||||
|
||||
class PaperlessNGXService:
    """Thin HTTP client for a Paperless-ngx server.

    Connection settings come from the BASE_URL and PAPERLESS_TOKEN
    environment variables, read at construction time.
    """

    def __init__(self):
        self.base_url = os.getenv("BASE_URL")
        self.token = os.getenv("PAPERLESS_TOKEN")
        # Default listing endpoint: documents carrying tag id 8.
        self.url = f"http://{os.getenv('BASE_URL')}/api/documents/?tags__id=8"
        self.headers = {"Authorization": f"Token {os.getenv('PAPERLESS_TOKEN')}"}

    def get_data(self):
        """Fetch every document on the default listing, following pagination."""
        print(f"Getting data from: {self.url}")
        page = httpx.get(self.url, headers=self.headers).json()
        results = page["results"]

        # Paperless paginates; "next" holds the URL of the following page.
        while page.get("next"):
            page = httpx.get(page["next"], headers=self.headers).json()
            results += page["results"]

        return results

    def get_doc_by_id(self, doc_id: int):
        """Return the JSON record for a single document."""
        endpoint = f"http://{os.getenv('BASE_URL')}/api/documents/{doc_id}/"
        return httpx.get(endpoint, headers=self.headers).json()

    def download_pdf_from_id(self, id: int) -> str:
        """Download a document's PDF; return the temp-file path it was saved to."""
        download_url = f"http://{os.getenv('BASE_URL')}/api/documents/{id}/download/"
        response = httpx.get(
            download_url, headers=self.headers, follow_redirects=True, timeout=30
        )
        response.raise_for_status()
        # Use a temporary file for the downloaded PDF; delete=False so the
        # caller can reopen it after the handle is closed.
        with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as temp_file:
            temp_file.write(response.content)
            return temp_file.name

    def upload_cleaned_content(self, document_id, data):
        """PUT an updated document record back to the server.

        NOTE(review): sends form-encoded data (httpx `data=`); confirm the
        endpoint doesn't require a JSON body.
        """
        PUTS_URL = f"http://{os.getenv('BASE_URL')}/api/documents/{document_id}/"
        httpx.put(PUTS_URL, headers=self.headers, data=data).raise_for_status()

    def upload_description(self, description_filepath, file, title, exif_date: str):
        """POST a new text document with fixed document-type/tag ids."""
        POST_URL = f"http://{os.getenv('BASE_URL')}/api/documents/post_document/"
        files = {"document": ("description_filepath", file, "application/txt")}
        data = {
            "title": title,
            "create": exif_date,
            "document_type": 3,
            "tags": [7],
        }

        httpx.post(POST_URL, headers=self.headers, data=data, files=files).raise_for_status()

    def get_tags(self):
        """Return a mapping of tag id -> tag name from the server."""
        GET_URL = f"http://{os.getenv('BASE_URL')}/api/tags/"
        data = httpx.get(GET_URL, headers=self.headers).json()
        return {tag["id"]: tag["name"] for tag in data["results"]}

    def get_doctypes(self):
        """Return a mapping of document-type id -> name from the server."""
        GET_URL = f"http://{os.getenv('BASE_URL')}/api/document_types/"
        data = httpx.get(GET_URL, headers=self.headers).json()
        return {doctype["id"]: doctype["name"] for doctype in data["results"]}
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Smoke test: fetch all tagged documents from the configured server.
    pp = PaperlessNGXService()
    pp.get_data()
|
||||
Reference in New Issue
Block a user