| | """ |
| | Based upon ImageCaptionLoader in LangChain version: langchain/document_loaders/image_captions.py |
| | But accepts preloaded model to avoid slowness in use and CUDA forking issues |
| | |
| | Loader that uses H2O DocTR OCR models to extract text from images |
| | |
| | """ |
| | from typing import List, Union, Any, Tuple, Optional |
| |
|
| | import requests |
| | import torch |
| | from langchain.docstore.document import Document |
| | from langchain.document_loaders import ImageCaptionLoader |
| | import numpy as np |
| | from utils import get_device, clear_torch_cache, NullContext |
| | from doctr.utils.common_types import AbstractFile |
| |
|
| |
|
class H2OOCRLoader(ImageCaptionLoader):
    """Loader that extracts text from images using an H2O DocTR OCR model.

    Subclasses LangChain's ImageCaptionLoader so a preloaded OCR model can be
    reused across documents (see module docstring: avoids per-call model load
    slowness and CUDA forking issues).
    """

    def __init__(self, path_images: Union[str, List[str]] = None, layout_aware=False, gpu_id=None):
        """
        :param path_images: single image path or list of paths, forwarded to ImageCaptionLoader
        :param layout_aware: if True, page text is reconstructed with layout-preserving
            spacing via space_layout(); otherwise words are joined with single spaces
        :param gpu_id: CUDA device index; any non-int or negative value is coerced to 0
        """
        super().__init__(path_images)
        self._ocr_model = None  # built lazily by load_model()
        self.layout_aware = layout_aware
        # Coerce invalid ids to 0 so self.gpu_id is always a non-negative int.
        self.gpu_id = gpu_id if isinstance(gpu_id, int) and gpu_id >= 0 else 0

        self.device = 'cpu'
        self.set_context()

    def set_context(self):
        """Choose the torch device string for this loader: 'cuda:N' when CUDA
        is available, else 'cpu'."""
        if get_device() == 'cuda':
            import torch  # redundant with the module-level import, but harmless
            n_gpus = torch.cuda.device_count() if torch.cuda.is_available() else 0
            if n_gpus > 0:
                self.context_class = torch.device
                # NOTE(review): __init__ coerces gpu_id to a non-negative int, so
                # this condition is always true and the bare-'cuda' fallback below
                # is effectively dead code.
                if self.gpu_id is not None:
                    self.device = "cuda:%d" % self.gpu_id
                else:
                    self.device = 'cuda'
            else:
                self.device = 'cpu'
        else:
            self.device = 'cpu'

    def load_model(self):
        """Instantiate the DocTR OCR predictor (or move an existing one to the
        current device).

        Returns:
            self, so calls can be chained.
        Raises:
            ValueError: when the required packages are not importable.
        """
        try:
            # HTML is never used below; the import appears to serve only as an
            # installation check for weasyprint alongside doctr.
            from weasyprint import HTML
            from doctr.models.zoo import ocr_predictor
        except ImportError:
            raise ValueError(
                "`doctr` package not found, please install with "
                "`pip install git+https://github.com/h2oai/doctr.git`."
            )
        if self._ocr_model:
            # Model already built: just make sure it lives on the right device.
            self._ocr_model = self._ocr_model.to(self.device)
            return self
        self.set_context()
        self._ocr_model = ocr_predictor(det_arch="db_resnet50", reco_arch="crnn_efficientnetv2_mV2",
                                        pretrained=True).to(self.device)
        return self

    def unload_model(self):
        """Move the detection, recognition, and wrapper models to CPU,
        clearing the torch cache after each move to release GPU memory."""
        if self._ocr_model and hasattr(self._ocr_model.det_predictor.model, 'cpu'):
            self._ocr_model.det_predictor.model.cpu()
            clear_torch_cache()
        if self._ocr_model and hasattr(self._ocr_model.reco_predictor.model, 'cpu'):
            self._ocr_model.reco_predictor.model.cpu()
            clear_torch_cache()
        if self._ocr_model and hasattr(self._ocr_model, 'cpu'):
            self._ocr_model.cpu()
            clear_torch_cache()

    def set_document_paths(self, document_paths: Union[str, List[str]]):
        """
        Load from a list of image files

        A single string is wrapped into a one-element list.
        """
        if isinstance(document_paths, str):
            self.document_paths = [document_paths]
        else:
            self.document_paths = document_paths

    def load(self, prompt=None) -> List[Document]:
        """OCR every path in self.document_paths into one Document per file.

        :param prompt: unused; kept for interface compatibility with the base class
        :return: list of Documents whose page_content joins per-page captions with " \\n"
        """
        if self._ocr_model is None:
            self.load_model()
        # NOTE(review): the CUDA branch builds a context-manager *instance* while
        # the fallback passes the NullContext *class* un-instantiated — confirm
        # utils.NullContext supports `with` on the class itself.
        context_class = torch.cuda.device(self.gpu_id) if 'cuda' in str(self.device) else NullContext
        results = []
        with context_class:
            for document_path in self.document_paths:
                caption, metadata = self._get_captions_and_metadata(
                    model=self._ocr_model, document_path=document_path
                )
                doc = Document(page_content=" \n".join(caption), metadata=metadata)
                results.append(doc)

        return results

    @staticmethod
    def pad_resize_image(image):
        """Fit *image* into a 1024x1024 canvas.

        Smaller images are zero-padded (black borders) toward the center;
        larger images are shrunk preserving aspect ratio; mixed cases are
        resized then padded.  Returns the adjusted image array.
        """
        import cv2

        # Target canvas width (L) and height (H).
        L = 1024
        H = 1024

        Li, Hi = image.shape[1], image.shape[0]

        aspect_ratio_original = Li / Hi
        aspect_ratio_final = L / H

        if Li < L and Hi < H:
            # Image fits entirely: center it with black padding on all sides.
            # NOTE(review): integer // halving can leave the result 1px short
            # of 1024 on odd differences.
            padding_x = (L - Li) // 2
            padding_y = (H - Hi) // 2
            image = cv2.copyMakeBorder(image, padding_y, padding_y, padding_x, padding_x, cv2.BORDER_CONSTANT, value=[0, 0, 0])
        elif Li > L and Hi > H:
            # Image exceeds the canvas in both dimensions: shrink to fit,
            # keeping aspect ratio (no padding in this branch).
            if aspect_ratio_original < aspect_ratio_final:
                # Taller than target: height is the limiting dimension.
                new_height = H
                new_width = int(H * aspect_ratio_original)
            else:
                # Wider than target: width is the limiting dimension.
                new_width = L
                new_height = int(L / aspect_ratio_original)
            image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA)
        else:
            # Mixed case (one dimension larger, one smaller): resize to fit,
            # then pad the remaining space toward the center.
            if aspect_ratio_original < aspect_ratio_final:
                new_height = H
                new_width = int(H * aspect_ratio_original)
            else:
                new_width = L
                new_height = int(L / aspect_ratio_original)
            image = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA)
            padding_x = (L - new_width) // 2
            padding_y = (H - new_height) // 2
            image = cv2.copyMakeBorder(image, padding_y, padding_y, padding_x, padding_x, cv2.BORDER_CONSTANT, value=[0, 0, 0])
        return image

    def _get_captions_and_metadata(
            self, model: Any, document_path: str) -> Tuple[list, dict]:
        """
        Helper function for getting the captions and metadata of an image

        PDFs are rasterized page-by-page via read_pdf(); other files are read
        as images.  Returns (per-page text list, metadata dict with the
        before/after image shapes).
        """
        try:
            from doctr.io import DocumentFile
        except ImportError:
            raise ValueError(
                "`doctr` package not found, please install with "
                "`pip install git+https://github.com/h2oai/doctr.git`."
            )
        try:
            if document_path.lower().endswith(".pdf"):
                images = read_pdf(document_path)
            else:
                images = DocumentFile.from_images(document_path)
        except Exception:
            raise ValueError(f"Could not get image data for {document_path}")
        document_words = []
        shapes = []
        for image in images:
            shape0 = str(image.shape)  # shape before normalization, kept for metadata
            image = self.pad_resize_image(image)
            shape1 = str(image.shape)  # shape after normalization

            ocr_output = model([image])
            page_words = []
            page_boxes = []
            # Flatten the DocTR page hierarchy (blocks -> lines -> words),
            # skipping empty/whitespace-only word values.
            for block_num, block in enumerate(ocr_output.pages[0].blocks):
                for line_num, line in enumerate(block.lines):
                    for word_num, word in enumerate(line.words):
                        if not (word.value or "").strip():
                            continue
                        page_words.append(word.value)
                        page_boxes.append(
                            [word.geometry[0][0], word.geometry[0][1], word.geometry[1][0], word.geometry[1][1]])
            if self.layout_aware:
                # Re-order words top-to-bottom / left-to-right and render
                # layout-preserving spacing.
                ids = boxes_sort(page_boxes)
                texts = [page_words[i] for i in ids]
                text_boxes = [page_boxes[i] for i in ids]
                page_words = space_layout(texts=texts, boxes=text_boxes)
            else:
                page_words = " ".join(page_words)
            document_words.append(page_words)
            shapes.append(dict(shape0=shape0, shape1=shape1))
        metadata: dict = {"image_path": document_path, 'shape': str(shapes)}
        return document_words, metadata
| |
|
| |
|
def boxes_sort(boxes):
    """Return indices that order *boxes* top-to-bottom by their y1 coordinate.

    Ties keep their original relative order (stable).
    Params:
        boxes: [[x1, y1, x2, y2], [x1, y1, x2, y2], ...]
    """
    decorated = sorted((box[1], idx) for idx, box in enumerate(boxes))
    return [idx for _, idx in decorated]
| |
|
| |
|
def is_same_line(box1, box2):
    """Return True when the two boxes lie on the same visual line, i.e. each
    box's vertical midpoint falls strictly inside the other box's y-span.

    Params:
        box1: [x1, y1, x2, y2]
        box2: [x1, y1, x2, y2]
    """
    mid1 = (box1[1] + box1[3]) / 2
    mid2 = (box2[1] + box2[3]) / 2
    return box2[1] < mid1 < box2[3] and box1[1] < mid2 < box1[3]
| |
|
| |
|
def union_box(box1, box2):
    """Return the smallest axis-aligned box enclosing both inputs.

    Params:
        box1: [x1, y1, x2, y2]
        box2: [x1, y1, x2, y2]
    """
    return [
        min(box1[0], box2[0]),
        min(box1[1], box2[1]),
        max(box1[2], box2[2]),
        max(box1[3], box2[3]),
    ]
| |
|
| |
|
def space_layout(texts, boxes, threshold_show_spaces=8, threshold_char_width=0.02):
    """Render OCR words as layout-preserving text.

    Groups *boxes* into visual lines (all boxes whose vertical midpoint falls
    inside the seed box's y-span), orders each line left-to-right, and joins
    lines with newlines.  Horizontal gaps are rendered as spaces; a gap wider
    than ``threshold_show_spaces`` estimated characters is rendered as an
    explicit " <N> " marker instead.

    Params:
        texts: list of word strings, parallel to *boxes*
        boxes: [[x1, y1, x2, y2], ...] word bounding boxes
        threshold_show_spaces: gap (in estimated characters) above which an
            explicit " <N> " marker is emitted instead of a single space
        threshold_char_width: lower clamp for the estimated character width;
            0.0 selects a degenerate mode where a zero width becomes 1
    Returns:
        single string with one text line per visual line ("" for empty input)
    """
    line_boxes = []
    line_texts = []
    max_line_char_num = 0
    line_width = 0
    boxes = np.array(boxes)
    texts = np.array(texts)
    while len(boxes) > 0:
        # Seed a line with the first remaining box; every box whose vertical
        # midpoint lies strictly within the seed's [y1, y2] span joins it.
        box = boxes[0]
        mid = (boxes[:, 3] + boxes[:, 1]) / 2
        inline_boxes = np.logical_and(mid > box[1], mid < box[3])
        sorted_xs = np.argsort(boxes[inline_boxes][:, 0], axis=0)
        line_box = boxes[inline_boxes][sorted_xs]
        line_text = texts[inline_boxes][sorted_xs]
        boxes = boxes[~inline_boxes]
        texts = texts[~inline_boxes]

        line_boxes.append(line_box.tolist())
        line_texts.append(line_text.tolist())
        # Track the widest line (in characters) to estimate character width.
        if len(" ".join(line_texts[-1])) > max_line_char_num:
            max_line_char_num = len(" ".join(line_texts[-1]))
            line_width = np.array(line_boxes[-1])
            line_width = line_width[:, 2].max() - line_width[:, 0].min()

    char_width = (line_width / max_line_char_num) if max_line_char_num > 0 else 0
    if threshold_char_width == 0.0:
        if char_width == 0:
            char_width = 1
    else:
        # BUG FIX: the clamp was hard-coded to 0.02, silently ignoring the
        # threshold_char_width parameter; clamp to the caller-supplied minimum.
        # Default value (0.02) preserves the previous behavior.
        if char_width <= threshold_char_width:
            char_width = threshold_char_width

    space_line_texts = []
    for i, line_box in enumerate(line_boxes):
        space_line_text = ""
        for j, box in enumerate(line_box):
            # Estimated character column of this word's left edge, minus what
            # has already been emitted; always at least one separating space.
            left_char_num = int(box[0] / char_width)
            left_char_num = max((left_char_num - len(space_line_text)), 1)
            if left_char_num > threshold_show_spaces:
                space_line_text += f" <{left_char_num}> "
            else:
                space_line_text += " "
            space_line_text += line_texts[i][j]
        space_line_texts.append(space_line_text + "\n")

    return "".join(space_line_texts)
| |
|
| |
|
def read_pdf(
        file: AbstractFile,
        scale: float = 300 / 72,
        rgb_mode: bool = True,
        password: Optional[str] = None,
        **kwargs: Any,
) -> List[np.ndarray]:
    """Read a PDF file and convert it into an image in numpy format

    >>> from doctr.documents import read_pdf
    >>> doc = read_pdf("path/to/your/doc.pdf")

    Args:
        file: the path to the PDF file
        scale: rendering scale (1 corresponds to 72dpi)
        rgb_mode: if True, the output will be RGB, otherwise BGR
        password: a password to unlock the document, if encrypted
        kwargs: additional parameters to :meth:`pypdfium2.PdfPage.render`

    Returns:
        the list of pages decoded as numpy ndarray of shape H x W x C
    """
    # Imported lazily so the module loads without pypdfium2 installed.
    import pypdfium2 as pdfium
    document = pdfium.PdfDocument(file, password=password, autoclose=True)
    pages = []
    for page in document:
        rendered = page.render(scale=scale, rev_byteorder=rgb_mode, **kwargs)
        pages.append(rendered.to_numpy())
    return pages
| |
|