| """ |
| File: ocr.py |
| Description: Optical Character Recognition (OCR) using software 2.0 models |
| Author: Didier Guillevic |
| Date: 2025-04-06 |
| """ |
|
|
| import os |
| os.system("bash setup.sh") |
| import magic |
| import vlm |
|
|
| import uuid |
| import shutil |
| import threading |
| import time |
| import pathlib |
|
|
| import pdf2image |
| from pdf2image.exceptions import PDFPageCountError, PDFSyntaxError |
| import pypdf |
| import base64 |
| from contextlib import contextmanager |
| from typing import List, Optional, Tuple, Union |
|
|
| import logging |
|
|
class PDFScannerTempManager:
    """
    Hands out UUID-named scratch directories below a base directory and
    removes them again once they are no longer needed.
    """

    def __init__(self, base_temp_dir: str = 'tmp'):
        """
        Set up the manager and make sure the base directory exists.

        Args:
            base_temp_dir (str): Directory under which scratch directories
                are created; it is created if missing.
        """
        os.makedirs(base_temp_dir, exist_ok=True)

        self.base_temp_dir = base_temp_dir
        # Scratch directories that were created and not yet removed.
        self.active_temp_dirs: list[str] = []

        # Configures the root logger at INFO level as a side effect.
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    @contextmanager
    def temp_directory(self) -> str:
        """
        Context manager providing a fresh scratch directory.

        The directory is named after a random UUID, tracked in
        ``active_temp_dirs`` while in use, and cleaned up on exit from the
        ``with`` block, whether or not the body raised.

        Yields:
            str: Path to the scratch directory
        """
        scratch_path = os.path.join(self.base_temp_dir, str(uuid.uuid4()))

        try:
            # exist_ok=False: a name collision is reported, never reused.
            os.makedirs(scratch_path, exist_ok=False)
            self.active_temp_dirs.append(scratch_path)
            yield scratch_path
        finally:
            self._cleanup_directory(scratch_path)

    def _cleanup_directory(self, directory: str) -> None:
        """
        Delete a scratch directory and stop tracking it.

        Failures are logged rather than raised, so cleanup never masks the
        error that may have triggered it.

        Args:
            directory (str): Path to directory to remove
        """
        try:
            if os.path.exists(directory):
                shutil.rmtree(directory)

            # Untrack the path; it may already be absent from the list.
            try:
                self.active_temp_dirs.remove(directory)
            except ValueError:
                pass

        except Exception as err:
            self.logger.error(f"Error cleaning up directory {directory}: {err}")

    def cleanup_all(self) -> None:
        """
        Remove every scratch directory that is still being tracked.
        """
        # Iterate over a snapshot: _cleanup_directory mutates the list.
        for tracked in tuple(self.active_temp_dirs):
            self._cleanup_directory(tracked)
|
|
|
|
class PDFScanner:
    """
    A class to perform OCR on PDF files with robust temp management.

    Each requested page is rasterized with pdf2image, saved as a PNG inside a
    per-call temporary directory and passed to the module-level
    ``process_image_file`` function for text extraction.
    """

    def __init__(
        self,
        dpi: int = 300,
        temp_manager: Optional["PDFScannerTempManager"] = None,
    ):
        """
        Initialize the PDFScanner.

        Args:
            dpi (int): DPI for PDF conversion
            temp_manager (PDFScannerTempManager, optional): Temp directory
                manager; a default one is created when omitted.
        """
        self.dpi = dpi
        self.temp_manager = temp_manager or PDFScannerTempManager()
        self.logger = logging.getLogger(__name__)

    def _validate_pdf(self, pdf_path: str) -> Tuple[bool, str, bool]:
        """
        Validate PDF file and check for encryption.

        Args:
            pdf_path (str): Path to the file to check

        Returns:
            Tuple[bool, str, bool]: (is_valid, error_message, is_encrypted).
            Problems are reported through the tuple; nothing is raised.
        """
        try:
            with open(pdf_path, 'rb') as file:
                # Cheap signature check before handing the file to pypdf.
                if not file.read(4) == b'%PDF':
                    return False, "Not a valid PDF file (missing PDF signature)", False

                # Rewind so pypdf parses the file from the beginning.
                file.seek(0)

                try:
                    pdf_reader = pypdf.PdfReader(file, strict=False)
                    is_encrypted = pdf_reader.is_encrypted

                    if is_encrypted:
                        return False, "PDF is encrypted and requires password", True

                    num_pages = len(pdf_reader.pages)
                    return True, f"Valid PDF with {num_pages} pages", False

                except pypdf.errors.PdfReadError as e:
                    return False, f"Invalid PDF structure: {str(e)}", False

        except Exception as e:
            return False, f"Error validating PDF: {str(e)}", False

    def _repair_pdf(self, pdf_path: str, temp_dir: str) -> str:
        """
        Attempt to repair a corrupted PDF file.

        Two strategies are tried in order: rewriting the pages with pypdf,
        then regenerating the document with the Ghostscript ``gs`` command.

        Args:
            pdf_path (str): Path to original PDF
            temp_dir (str): Temporary directory for repair

        Returns:
            str: Path to repaired PDF

        Raises:
            Exception: If both strategies fail (the Ghostscript error is
                logged and re-raised).
        """
        # subprocess is not among the file-level imports; without this import
        # the Ghostscript fallback below always failed with NameError.
        import subprocess

        repaired_pdf = os.path.join(temp_dir, 'repaired.pdf')

        # Strategy 1: copy every page into a fresh document with pypdf.
        try:
            with open(pdf_path, 'rb') as file:
                reader = pypdf.PdfReader(file, strict=False)
                writer = pypdf.PdfWriter()

                for page in reader.pages:
                    writer.add_page(page)

                with open(repaired_pdf, 'wb') as output_file:
                    writer.write(output_file)

            if os.path.exists(repaired_pdf):
                return repaired_pdf

        except Exception as e:
            # Best effort: fall through to the Ghostscript strategy.
            self.logger.warning(f"pypdf repair failed: {str(e)}")

        # Strategy 2: let Ghostscript rewrite the file.
        try:
            gs_command = [
                'gs',
                '-o', repaired_pdf,
                '-sDEVICE=pdfwrite',
                '-dPDFSETTINGS=/prepress',
                pdf_path
            ]

            process = subprocess.run(
                gs_command,
                capture_output=True,
                text=True
            )

            if process.returncode == 0 and os.path.exists(repaired_pdf):
                return repaired_pdf
            raise Exception(f"Ghostscript repair failed: {process.stderr}")

        except Exception as e:
            self.logger.error(f"PDF repair failed: {str(e)}")
            raise

    def _process_images(
        self,
        images: list,
        temp_dir: str,
        language: str
    ) -> list[str]:
        """
        Save each page image as a PNG and run OCR on it.

        Args:
            images (list): Page images returned by pdf2image
            temp_dir (str): Directory in which the PNG files are written
            language (str): Accepted for interface compatibility; it is not
                forwarded to the OCR call.

        Returns:
            list[str]: One entry per page. A page that fails is represented
            by an ``[ERROR ON PAGE n]`` placeholder instead of aborting.
        """
        extracted_text = []

        for page_number, image in enumerate(images, start=1):
            image_path = os.path.join(temp_dir, f'page_{page_number}.png')
            try:
                image.save(image_path, 'PNG', quality=100)

                text = process_image_file(image_path)
                extracted_text.append(text)

            except Exception as e:
                # Keep going: one bad page must not lose the whole document.
                self.logger.error(f"Error processing page {page_number}: {str(e)}")
                extracted_text.append(f"[ERROR ON PAGE {page_number}]")

        return extracted_text

    def pdf_to_text(
        self,
        pdf_path: str,
        language: str = 'eng',
        first_page: Optional[int] = None,
        last_page: Optional[int] = None,
        attempt_repair: bool = True
    ) -> list[str]:
        """
        Convert a PDF file to text using OCR with robust error handling.

        Args:
            pdf_path (str): Path to the PDF file
            language (str): Language for OCR (default: 'eng')
            first_page (int, optional): First page to process (1-based)
            last_page (int, optional): Last page to process
            attempt_repair (bool): Whether to attempt repairing corrupted PDFs

        Returns:
            list[str]: List of extracted text for each page; an empty list
            when the conversion succeeds but produces no page images.

        Raises:
            FileNotFoundError: If ``pdf_path`` does not exist.
            Exception: If the PDF is encrypted, or if every conversion
                method fails.
        """
        if not os.path.exists(pdf_path):
            raise FileNotFoundError(f"PDF file not found: {pdf_path}")

        with self.temp_manager.temp_directory() as temp_dir:
            is_valid, error_message, is_encrypted = self._validate_pdf(pdf_path)
            if not is_valid:
                self.logger.warning(f"PDF validation issue: {error_message}")

                if is_encrypted:
                    raise Exception("Cannot process encrypted PDF files")

                if attempt_repair:
                    try:
                        pdf_path = self._repair_pdf(pdf_path, temp_dir)
                        self.logger.info("Using repaired PDF file")
                    except Exception as e:
                        # Repair is optional; still try the original file.
                        self.logger.error(f"Repair failed: {str(e)}")

            # Tried in order until one yields images: both renderers at the
            # configured DPI, then again at higher resolutions.
            conversion_methods = [
                {'use_pdftocairo': True, 'strict': False},
                {'use_pdftocairo': False, 'strict': False},
                {'use_pdftocairo': True, 'strict': False, 'dpi': self.dpi * 2},
                {'use_pdftocairo': False, 'strict': False, 'dpi': self.dpi * 3}
            ]

            last_error = None
            for method in conversion_methods:
                try:
                    self.logger.info(f"Trying conversion method: {method}")
                    images = pdf2image.convert_from_path(
                        pdf_path,
                        dpi=method.get('dpi', self.dpi),
                        first_page=first_page,
                        last_page=last_page,
                        thread_count=4,
                        grayscale=True,
                        **{k: v for k, v in method.items() if k != 'dpi'}
                    )

                    if images:
                        return self._process_images(images, temp_dir, language)

                except Exception as e:
                    last_error = e
                    self.logger.warning(f"Method failed: {str(e)}")
                    continue

            if last_error:
                raise Exception(
                    f"All conversion methods failed. Last error: {str(last_error)}"
                ) from last_error

        # No method raised but none produced images: honour the list[str]
        # contract instead of implicitly returning None.
        return []
|
|
| |
| |
| |
# Module-level scanner used by process_pdf_file. It is built with the default
# settings (300 DPI and its own PDFScannerTempManager rooted at 'tmp'), so the
# 'tmp' directory is created when this module is imported.
pdf_scanner = PDFScanner()
|
|
|
|
| |
| |
| |
def process_file(input_file: str):
    """Run OCR on a file, choosing the handler from its detected type.

    Args:
        input_file: Path of the file to process.

    Returns:
        The OCR result of the matching handler for images and PDFs, or a
        fixed "unsupported" message for every other file type.
    """
    # Map each supported type label from get_file_type to its OCR handler.
    handlers = {
        "Image": process_image_file,
        "PDF": process_pdf_file,
    }

    handler = handlers.get(get_file_type(input_file))
    if handler is None:
        return "Unsupported file type. Please upload a PDF, or an image file."
    return handler(input_file)
|
|
|
|
def process_image_file(input_file: str):
    """Process image file with OCR.

    The image is base64-encoded with ``encode_image`` and sent, together with
    an OCR instruction, as a single user message to ``vlm.get_response``.

    Args:
        input_file: Path to the image file.

    Returns:
        Whatever ``vlm.get_response`` returns for the request.
    """
    # Imported locally: mimetypes is not among the file-level imports.
    import mimetypes

    # Label the data URL with the image's actual type. PDF pages arrive here
    # as PNG files, and png/gif uploads are accepted too, so a hard-coded
    # image/jpeg label was wrong for them. image/jpeg remains the fallback
    # when the type cannot be inferred from the file name.
    guessed_type, _ = mimetypes.guess_type(str(input_file))
    if guessed_type and guessed_type.startswith("image/"):
        mime_type = guessed_type
    else:
        mime_type = "image/jpeg"

    messages = [
        {
            "role": "user",
            "content": [
                {
                    "type": "text",
                    "text": (
                        "Could you perform optical character recognition (OCR) on the image? "
                        "Simply return the text without any additional comments. "
                        "The exception would be if the image represents an ID card. "
                        "In such a case, please return the information in a structured format. "
                    )
                },
                {
                    "type": "image_url",
                    "image_url": f"data:{mime_type};base64,{encode_image(input_file)}"
                }
            ]
        }
    ]
    return vlm.get_response(messages)
|
|
|
|
def process_pdf_file(input_file: str):
    """Process PDF file with OCR

    Args:
        input_file: the PDF file to process with OCR, given either as a
            path (str or os.PathLike) or as an object exposing the path
            through a ``name`` attribute (e.g. an open or uploaded file)

    Returns:
        the text OCR result, pages separated by a blank line

    Note:
        Each page of the PDF is processed as an image.
    """
    # process_file passes a plain path string, on which the former
    # unconditional ``input_file.name`` raised AttributeError. Paths are
    # tested first because pathlib paths also have a ``name`` attribute,
    # which holds only the final component.
    if isinstance(input_file, (str, os.PathLike)):
        pdf_path = os.fspath(input_file)
    else:
        pdf_path = input_file.name

    texts = pdf_scanner.pdf_to_text(pdf_path=pdf_path)
    # Tolerate a scanner that yields no pages at all.
    return '\n\n'.join(texts or [])
|
|
|
|
| |
| |
| |
def get_file_type(file_path):
    """Classify a file as 'PDF', 'Image', 'PowerPoint' or 'Other'.

    A file matches a category when either its extension or the MIME type
    reported by libmagic for its content matches. Categories are checked in
    the order PDF, Image, PowerPoint.

    Args:
        file_path: Path of the file to classify.

    Returns:
        One of the strings 'PDF', 'Image', 'PowerPoint' or 'Other'.
    """
    pptx_mime = 'application/vnd.openxmlformats-officedocument.presentationml.presentation'
    image_suffixes = ('.jpg', '.jpeg', '.png', '.gif')

    # Extension, compared case-insensitively.
    _, suffix = os.path.splitext(file_path)
    suffix = suffix.lower()

    # MIME type sniffed from the file content.
    detected_mime = magic.Magic(mime=True).from_file(file_path)

    if suffix == '.pdf' or detected_mime == 'application/pdf':
        return 'PDF'
    if suffix in image_suffixes or detected_mime.startswith('image/'):
        return 'Image'
    if suffix == '.pptx' or detected_mime == pptx_mime:
        return 'PowerPoint'
    return 'Other'
|
|
| |
| |
| |
def encode_image(image_path):
    """Return the file's content as a base64 text string.

    Args:
        image_path: Path of the file to encode.

    Returns:
        The base64 encoding of the file's bytes as a ``str``, or ``None``
        when the file is missing or cannot be read; in that case a message
        is printed instead of an exception being raised.
    """
    encoded = None
    try:
        with open(image_path, "rb") as handle:
            raw_bytes = handle.read()
            encoded = base64.b64encode(raw_bytes).decode('utf-8')
    except FileNotFoundError:
        print(f"Error: The file {image_path} was not found.")
    except Exception as err:
        print(f"Error: {err}")
    return encoded
|
|