| import os |
| from tqdm import tqdm |
| import numpy as np |
| from transformers import ViTModel, ViTFeatureExtractor, ViTImageProcessor |
| from PIL import Image |
| import re |
| from fpdf import FPDF |
| from datetime import datetime |
| import fitz |
| import joblib |
| import json |
|
|
# The encoder and its image processor are both loaded from the same pretrained
# checkpoint, once, when this module is imported.
_VIT_CHECKPOINT = 'google/vit-base-patch16-224-in21k'

model = ViTModel.from_pretrained(_VIT_CHECKPOINT)
processor = ViTImageProcessor.from_pretrained(_VIT_CHECKPOINT)
|
|
def create_pdf(input_text):
    """Render text into a single PDF under ``temp/PDFs`` and return its path.

    The text is written with ``multi_cell`` in 10pt Arial onto a freshly
    added page. The output directory is created if it does not exist.

    Args:
        input_text (str): Text to write into the PDF.

    Returns:
        str: Relative path of the PDF file that was written.
    """
    pdf = FPDF()
    pdf.add_page()
    pdf.set_font("Arial", size=10)

    # Width 0 lets the cell run to the right margin; 5 is the line height.
    pdf.multi_cell(0, 5, txt=input_text)

    # Microseconds are part of the name on purpose: with only second
    # resolution, two calls within the same second produced the same path and
    # the later PDF silently overwrote the earlier one.
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    file_name = f"temp/PDFs/{timestamp}.pdf"

    os.makedirs(os.path.dirname(file_name), exist_ok=True)
    pdf.output(file_name)

    return file_name
|
|
def pdf_to_image(pdf_path, zoom=2.0):
    """Rasterize every page of a PDF to a PNG file under ``temp/Images``.

    Each image is named ``<pdf basename>_page_<index>.png``, where the index
    is zero-based. The output directory is created if it does not exist.

    Args:
        pdf_path (str): Path of the PDF to render.
        zoom (float): Scale factor applied to both axes when rendering.

    Returns:
        list[str]: Paths of the PNG files written, in page order.
    """
    image_paths = []

    # The context manager guarantees the document handle is released even if
    # rendering or saving a page raises; previously it was never closed.
    with fitz.open(pdf_path) as pdf_document:
        os.makedirs("temp/Images", exist_ok=True)

        # The scaling matrix and the name prefix are identical for every page.
        mat = fitz.Matrix(zoom, zoom)
        pdf_name = os.path.basename(pdf_path)

        for page_num, page in enumerate(pdf_document):
            pix = page.get_pixmap(matrix=mat)
            image_file = f'temp/Images/{pdf_name}_page_{page_num}.png'
            pix.save(image_file)
            image_paths.append(image_file)

    return image_paths
|
|
def sanitize_text(text):
    """
    Normalize text to ASCII letters, digits and single spaces.

    Every character that is not ``a-z``, ``A-Z``, ``0-9`` or whitespace is
    dropped, runs of whitespace collapse to one space, and leading/trailing
    whitespace is removed. Values that are not ``str`` are returned unchanged.

    Args:
        text (str): Text to sanitize.
    Returns:
        str: Sanitized text.
    """
    if not isinstance(text, str):
        return text

    # First strip everything except alphanumerics and whitespace ...
    alphanumeric_only = re.sub(r'[^a-zA-Z0-9\s]', '', text)
    # ... then squeeze whitespace runs and trim the ends.
    collapsed = re.sub(r'\s+', ' ', alphanumeric_only)
    return collapsed.strip()
|
|
def text_to_images(text):
    """Sanitize text, render it to a PDF, and rasterize that PDF to images.

    Args:
        text (str): Raw text to render.

    Returns:
        The image paths returned by ``pdf_to_image`` for the generated PDF.
    """
    cleaned = sanitize_text(text)
    return pdf_to_image(create_pdf(cleaned))
|
|
def documents_to_images(path):
    """Render every regular file in a directory to page images.

    Files are processed in sorted name order so the position of each entry in
    the result is reproducible (``os.listdir`` alone gives arbitrary order).
    Sub-directories and other non-file entries are skipped.

    Args:
        path (str): Directory containing the text documents.

    Returns:
        list: One entry per file, each being the value returned by
        ``text_to_images`` for that file's content.
    """
    document_image_paths = []
    for filename in sorted(os.listdir(path)):
        file_path = os.path.join(path, filename)
        if not os.path.isfile(file_path):
            continue
        # Decode explicitly instead of relying on the platform default, and
        # never fail on stray bytes: text_to_images sanitizes the text down to
        # ASCII alphanumerics, so replacement characters are dropped anyway.
        with open(file_path, "r", encoding="utf-8", errors="replace") as f:
            content = f.read()
        document_image_paths.append(text_to_images(content))
    return document_image_paths
|
|
def single_unit_embedding(text):
    """Embed a text by rendering it to page images and averaging their vectors.

    The text is converted to images with ``text_to_images``; each image is
    embedded with ``single_image_embedding`` and the per-image vectors are
    averaged element-wise.

    Args:
        text (str): Text to embed.

    Returns:
        numpy.ndarray: Mean of the per-image vectors, with the same shape as
        a single image vector.
    """
    page_vectors = []
    for image_path in text_to_images(text):
        # Close the image file as soon as it has been embedded; a bare
        # Image.open() would keep the file handle open.
        with Image.open(image_path) as image:
            page_vectors.append(single_image_embedding(image))
    return np.mean(np.array(page_vectors), axis=0)
|
|
def single_image_embedding(image):
    """Embed one image with the module-level ViT model.

    The image is preprocessed with ``processor``, passed through ``model``,
    and ``last_hidden_state`` is averaged over dimension 1 (presumably the
    token/patch axis of the ViT output -- confirm against the model docs).

    Args:
        image: Image accepted by ``processor`` (e.g. a PIL image).

    Returns:
        numpy.ndarray: The averaged hidden state as a NumPy array.
    """
    # Local import keeps this block self-contained; torch is already required
    # at runtime because the processor is asked for "pt" tensors.
    import torch

    inputs = processor(images=image, return_tensors="pt")
    # This is inference only: disabling autograd avoids building a graph that
    # is never used, and makes the result directly convertible to NumPy.
    with torch.no_grad():
        outputs = model(**inputs)
        vector = outputs.last_hidden_state.mean(dim=1).numpy()
    return vector
|
|
def documents_to_vision_embeddings(documents):
    """Embed each document with ``single_unit_embedding``, showing progress.

    Args:
        documents: Iterable of document texts.

    Returns:
        list: One embedding per document, in input order.
    """
    return [single_unit_embedding(doc_text) for doc_text in tqdm(documents)]
|
|
def queries_to_vision_embeddings(queries):
    """Embed each query with ``single_unit_embedding``, showing progress.

    Args:
        queries: Iterable of query texts.

    Returns:
        list: One embedding per query, in input order.
    """
    return [single_unit_embedding(query_text) for query_text in tqdm(queries)]
|
|
def get_documents_from_scores(scores):
    """Extract the first element of every entry in ``scores``.

    Args:
        scores: Iterable of indexable entries, e.g. ``(index, score)`` pairs.

    Returns:
        list: The first element of each entry, in the same order.
    """
    return [entry[0] for entry in scores]
|
|
def cosine_similarity(v1, v2):
    """Return the cosine similarity of two vectors.

    Args:
        v1: Array-like vector.
        v2: Array-like vector with the same length as ``v1``.

    Returns:
        float: ``dot(v1, v2) / (||v1|| * ||v2||)``, or ``0.0`` when either
        vector has zero norm (the similarity is undefined there, and 0 keeps
        such vectors at a neutral score).
    """
    v1 = np.asarray(v1)
    v2 = np.asarray(v2)

    # Compute each norm once; they are needed for both the zero check and the
    # normalization.
    norm1 = np.linalg.norm(v1)
    norm2 = np.linalg.norm(v2)
    if norm1 == 0 or norm2 == 0:
        return 0.0

    return np.dot(v1, v2) / (norm1 * norm2)
|
|
def vision_rankings(query_embedding, document_embeddings, k):
    """Rank documents by cosine similarity to a query and keep the top ``k``.

    The first row (``[0]``) of the query embedding is compared against the
    first row of each document embedding.

    Args:
        query_embedding: Indexable whose element 0 is the query vector.
        document_embeddings: Iterable of indexables whose element 0 is the
            document vector.
        k: Number of best-scoring documents to keep.

    Returns:
        tuple: ``(rankings, scores)`` where ``scores`` is the list of
        ``(position, similarity)`` pairs sorted by descending similarity and
        cut to ``k`` entries, and ``rankings`` is the positions from it.
    """
    scored = [
        (position, cosine_similarity(query_embedding[0], doc_embedding[0]))
        for position, doc_embedding in enumerate(document_embeddings)
    ]
    # Stable descending sort: ties keep their original relative order.
    scored.sort(key=lambda pair: pair[1], reverse=True)
    top_scored = scored[:k]
    return get_documents_from_scores(top_scored), top_scored
|
|
|
|
def vision_pipeline(query, document_embeddings_path="Retrieval/savedModels/document-vision-embeddings.json", ids_path="Retrieval/savedModels/ids.pkl", k=100):
    """Return the ids of the ``k`` documents most similar to a query.

    Loads the stored ids and document embeddings, embeds the query with
    ``single_unit_embedding``, ranks the documents with ``vision_rankings``,
    and maps the ranked positions back to ids.

    Args:
        query (str): Query text.
        document_embeddings_path (str): JSON file holding the document
            embeddings as nested lists.
        ids_path (str): joblib file holding the document ids, indexable by
            document position.
        k (int): Number of results to return.

    Returns:
        list: Document ids ordered from most to least similar.
    """
    # NOTE(review): joblib.load unpickles its input; only point ids_path at
    # files from a trusted source.
    ids = joblib.load(ids_path)

    with open(document_embeddings_path, "r") as f:
        raw_embeddings = json.load(f)
    document_vision_embeddings = [np.array(raw) for raw in tqdm(raw_embeddings)]
    print("loaded embeddings")

    query_embedding = single_unit_embedding(query)
    ranked_positions, _ = vision_rankings(query_embedding, document_vision_embeddings, k)
    return [ids[position] for position in ranked_positions]