diff --git a/client/src/nv_ingest_client/nv_ingest_cli.py b/client/src/nv_ingest_client/nv_ingest_cli.py
index f3cb0b9a..b1d40f7f 100644
--- a/client/src/nv_ingest_client/nv_ingest_cli.py
+++ b/client/src/nv_ingest_client/nv_ingest_cli.py
@@ -116,7 +116,7 @@
Example:
--task 'split:{"split_by":"page", "split_length":10}'
--task 'extract:{"document_type":"pdf", "extract_text":true}'
- --task 'extract:{"document_type":"pdf", "extract_method":"doughnut"}'
+ --task 'extract:{"document_type":"pdf", "extract_method":"nemoretriever_parse"}'
--task 'extract:{"document_type":"pdf", "extract_method":"unstructured_io"}'
--task 'extract:{"document_type":"docx", "extract_text":true, "extract_images":true}'
--task 'store:{"content_type":"image", "store_method":"minio", "endpoint":"minio:9000"}'
diff --git a/client/src/nv_ingest_client/primitives/tasks/extract.py b/client/src/nv_ingest_client/primitives/tasks/extract.py
index 6d3722f5..78f065c7 100644
--- a/client/src/nv_ingest_client/primitives/tasks/extract.py
+++ b/client/src/nv_ingest_client/primitives/tasks/extract.py
@@ -19,10 +19,6 @@
logger = logging.getLogger(__name__)
-DOUGHNUT_TRITON_HOST = os.environ.get("DOUGHNUT_TRITON_HOST", "localhost")
-DOUGHNUT_TRITON_PORT = os.environ.get("DOUGHNUT_TRITON_PORT", "8001")
-DOUGHNUT_BATCH_SIZE = os.environ.get("DOUGHNUT_TRITON_PORT", "16")
-
UNSTRUCTURED_API_KEY = os.environ.get("UNSTRUCTURED_API_KEY", None)
UNSTRUCTURED_URL = os.environ.get("UNSTRUCTURED_URL", "https://api.unstructured.io/general/v0/general")
UNSTRUCTURED_STRATEGY = os.environ.get("UNSTRUCTURED_STRATEGY", "auto")
@@ -49,7 +45,7 @@
_Type_Extract_Method_PDF = Literal[
"adobe",
- "doughnut",
+ "nemoretriever_parse",
"haystack",
"llama_parse",
"pdfium",
@@ -74,7 +70,7 @@
"tiff": get_args(_Type_Extract_Method_Image),
}
-_Type_Extract_Tables_Method_PDF = Literal["yolox", "pdfium"]
+_Type_Extract_Tables_Method_PDF = Literal["yolox", "pdfium", "nemoretriever_parse"]
_Type_Extract_Tables_Method_DOCX = Literal["python_docx",]
@@ -238,13 +234,6 @@ def to_dict(self) -> Dict:
"unstructured_url": "", # TODO(Devin): Should be an environment variable
}
task_properties["params"].update(unstructured_properties)
- elif self._extract_method == "doughnut":
- doughnut_properties = {
- "doughnut_triton_host": os.environ.get("DOUGHNUT_TRITON_HOST", DOUGHNUT_TRITON_HOST),
- "doughnut_triton_port": os.environ.get("DOUGHNUT_TRITON_PORT", DOUGHNUT_TRITON_PORT),
- "doughnut_batch_size": os.environ.get("DOUGHNUT_BATCH_SIZE", DOUGHNUT_BATCH_SIZE),
- }
- task_properties["params"].update(doughnut_properties)
elif self._extract_method == "unstructured_io":
unstructured_properties = {
"unstructured_api_key": os.environ.get("UNSTRUCTURED_API_KEY", UNSTRUCTURED_API_KEY),
diff --git a/docker-compose.yaml b/docker-compose.yaml
index e265ebb2..a76454f3 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -125,6 +125,27 @@ services:
capabilities: [gpu]
runtime: nvidia
+ nemoretriever-parse:
+ image: ${NEMORETRIEVER_PARSE_IMAGE:-nvcr.io/nvidia/nemo-microservices/nemoretriever-parse}:${NEMORETRIEVER_PARSE_TAG:-1.2.0ea}
+ ports:
+ - "8015:8000"
+ - "8016:8001"
+ - "8017:8002"
+ user: root
+ environment:
+ - NIM_HTTP_API_PORT=8000
+ - NIM_TRITON_LOG_VERBOSE=1
+ - NGC_API_KEY=${NIM_NGC_API_KEY:-${NGC_API_KEY:-ngcapikey}}
+ - CUDA_VISIBLE_DEVICES=0
+ deploy:
+ resources:
+ reservations:
+ devices:
+ - driver: nvidia
+ device_ids: ["0"]
+ capabilities: [gpu]
+ runtime: nvidia
+
nv-ingest-ms-runtime:
image: nvcr.io/nvidia/nemo-microservices/nv-ingest:24.12
build:
@@ -155,7 +176,6 @@ services:
# build.nvidia.com hosted deplot
#- DEPLOT_HTTP_ENDPOINT=https://ai.api.nvidia.com/v1/vlm/google/deplot
- DEPLOT_INFER_PROTOCOL=http
- - DOUGHNUT_GRPC_TRITON=triton-doughnut:8001
- EMBEDDING_NIM_MODEL_NAME=${EMBEDDING_NIM_MODEL_NAME:-nvidia/nv-embedqa-e5-v5}
- INGEST_LOG_LEVEL=DEFAULT
# Message client for development
@@ -168,6 +188,10 @@ services:
- MESSAGE_CLIENT_TYPE=redis
- MINIO_BUCKET=${MINIO_BUCKET:-nv-ingest}
- MRC_IGNORE_NUMA_CHECK=1
+ # build.nvidia.com hosted nemoretriever-parse
+ #- NEMORETRIEVER_PARSE_HTTP_ENDPOINT=https://ai.api.nvidia.com/v1/vlm/nvidia/nemoretriever-parse
+ - NEMORETRIEVER_PARSE_HTTP_ENDPOINT=http://nemoretriever-parse:8000/v1/chat/completions
+ - NEMORETRIEVER_PARSE_INFER_PROTOCOL=http
- NGC_API_KEY=${NGC_API_KEY:-ngcapikey}
- NVIDIA_BUILD_API_KEY=${NVIDIA_BUILD_API_KEY:-${NGC_API_KEY:-ngcapikey}}
- OTEL_EXPORTER_OTLP_ENDPOINT=otel-collector:4317
diff --git a/docs/docs/assets/images/doughnut_batch_dize.png b/docs/docs/assets/images/doughnut_batch_dize.png
deleted file mode 100644
index b3ae9598..00000000
Binary files a/docs/docs/assets/images/doughnut_batch_dize.png and /dev/null differ
diff --git a/docs/docs/user-guide/developer-guide/deployment.md b/docs/docs/user-guide/developer-guide/deployment.md
index 1bbd5148..8d81eec2 100644
--- a/docs/docs/user-guide/developer-guide/deployment.md
+++ b/docs/docs/user-guide/developer-guide/deployment.md
@@ -25,10 +25,6 @@ docker compose up -d otel-collector prometheus grafana zipkin
# The `embed` task will not be functional without this service.
docker compose up -d embedding
-# Optional (Triton) See below for Triton setup we need Triton for any model inference
-# This is only needed for captioning or DOUGHNUT based extraction.
-docker compose up -d triton
-
# Ingest service
docker compose up -d nv-ingest-ms-runtime
```
diff --git a/docs/docs/user-guide/developer-guide/environment-config.md b/docs/docs/user-guide/developer-guide/environment-config.md
index a3dba36e..f208f3ad 100644
--- a/docs/docs/user-guide/developer-guide/environment-config.md
+++ b/docs/docs/user-guide/developer-guide/environment-config.md
@@ -7,8 +7,6 @@ The following are the environment configuration variables that you can specify i
|----------------------------------|--------------------------------|-----------------------------------------------------------------------|
| `CAPTION_CLASSIFIER_GRPC_TRITON` | - `triton:8001` | The endpoint where the caption classifier model is hosted using gRPC for communication. This is used to send requests for caption classification. You must specify only ONE of an http or gRPC endpoint. If both are specified gRPC will take precedence. |
| `CAPTION_CLASSIFIER_MODEL_NAME` | - `deberta_large` | The name of the caption classifier model. |
-| `DOUGHNUT_TRITON_HOST` | - `triton-doughnut` | The hostname or IP address of the DOUGHNUT model service. |
-| `DOUGHNUT_TRITON_PORT` | - `8001` | The port number on which the DOUGHNUT model service is listening. |
| `INGEST_LOG_LEVEL` | - `DEBUG`<br/>- `INFO`<br/>- `WARNING`<br/>- `ERROR`<br/>- `CRITICAL` | The log level for the ingest service, which controls the verbosity of the logging output. |
| `MESSAGE_CLIENT_HOST` | - `redis`<br/>- `localhost`<br/>- `192.168.1.10` | Specifies the hostname or IP address of the message broker used for communication between services. |
| `MESSAGE_CLIENT_PORT` | - `7670`<br/>- `6379` | Specifies the port number on which the message broker is listening. |
diff --git a/docs/docs/user-guide/developer-guide/nv-ingest_cli.md b/docs/docs/user-guide/developer-guide/nv-ingest_cli.md
index d4fe2c5c..b1cb0445 100644
--- a/docs/docs/user-guide/developer-guide/nv-ingest_cli.md
+++ b/docs/docs/user-guide/developer-guide/nv-ingest_cli.md
@@ -32,7 +32,7 @@ Options:
Example:
--task 'split:{"split_by":"page", "split_length":10}'
--task 'extract:{"document_type":"pdf", "extract_text":true}'
- --task 'extract:{"document_type":"pdf", "extract_method":"doughnut"}'
+ --task 'extract:{"document_type":"pdf", "extract_method":"nemoretriever_parse"}'
--task 'extract:{"document_type":"pdf", "extract_method":"unstructured_io"}'
--task 'extract:{"document_type":"docx", "extract_text":true, "extract_images":true}'
--task 'store:{"content_type":"image", "store_method":"minio", "endpoint":"minio:9000"}'
@@ -120,7 +120,7 @@ nv-ingest-cli \
Submit a PDF file with splitting and extraction tasks.
-**Note: (TODO)** This currently only works for pdfium, doughnut, and Unstructured.io; haystack, Adobe, and LlamaParse
+**Note: (TODO)** This currently only works for pdfium, nemoretriever_parse, and Unstructured.io; haystack, Adobe, and LlamaParse
have existing workflows but have not been fully converted to use our unified metadata schema.
```bash
diff --git a/src/nv_ingest/extraction_workflows/image/image_handlers.py b/src/nv_ingest/extraction_workflows/image/image_handlers.py
index 9accecf3..3e7a9bec 100644
--- a/src/nv_ingest/extraction_workflows/image/image_handlers.py
+++ b/src/nv_ingest/extraction_workflows/image/image_handlers.py
@@ -30,9 +30,9 @@
from wand.image import Image as WandImage
import nv_ingest.util.nim.yolox as yolox_utils
-from nv_ingest.extraction_workflows.pdf.doughnut_utils import crop_image
from nv_ingest.schemas.image_extractor_schema import ImageConfigSchema
from nv_ingest.schemas.metadata_schema import AccessLevelEnum
+from nv_ingest.util.image_processing.transforms import crop_image
from nv_ingest.util.image_processing.transforms import numpy_to_base64
from nv_ingest.util.nim.helpers import create_inference_client
from nv_ingest.util.pdf.metadata_aggregators import CroppedImageWithContent
@@ -160,7 +160,8 @@ def extract_table_and_chart_images(
*bbox, _ = bboxes
h1, w1, h2, w2 = np.array(bbox) * np.array([height, width, height, width])
- base64_img = crop_image(original_image, (int(h1), int(w1), int(h2), int(w2)))
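+        # Crop first, then encode; crop_image returns None for degenerate bboxes.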
+ cropped_img = crop_image(original_image, (int(h1), int(w1), int(h2), int(w2)))
+ base64_img = numpy_to_base64(cropped_img) if cropped_img is not None else None
table_data = CroppedImageWithContent(
content="",
diff --git a/src/nv_ingest/extraction_workflows/pdf/__init__.py b/src/nv_ingest/extraction_workflows/pdf/__init__.py
index ff752ef5..8db3bddf 100644
--- a/src/nv_ingest/extraction_workflows/pdf/__init__.py
+++ b/src/nv_ingest/extraction_workflows/pdf/__init__.py
@@ -4,7 +4,7 @@
from nv_ingest.extraction_workflows.pdf.adobe_helper import adobe
-from nv_ingest.extraction_workflows.pdf.doughnut_helper import doughnut
+from nv_ingest.extraction_workflows.pdf.nemoretriever_parse_helper import nemoretriever_parse
from nv_ingest.extraction_workflows.pdf.llama_parse_helper import llama_parse
from nv_ingest.extraction_workflows.pdf.pdfium_helper import pdfium_extractor as pdfium
from nv_ingest.extraction_workflows.pdf.tika_helper import tika
@@ -15,6 +15,6 @@
"pdfium",
"tika",
"unstructured_io",
- "doughnut",
+ "nemoretriever_parse",
"adobe",
]
diff --git a/src/nv_ingest/extraction_workflows/pdf/doughnut_helper.py b/src/nv_ingest/extraction_workflows/pdf/doughnut_helper.py
deleted file mode 100644
index 10435fef..00000000
--- a/src/nv_ingest/extraction_workflows/pdf/doughnut_helper.py
+++ /dev/null
@@ -1,365 +0,0 @@
-# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
-# All rights reserved.
-# SPDX-License-Identifier: Apache-2.0
-
-
-# Copyright (c) 2024, NVIDIA CORPORATION.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-import os
-import uuid
-from typing import Dict
-from typing import List
-from typing import Tuple
-
-import numpy as np
-import pypdfium2 as pdfium
-import tritonclient.grpc as grpcclient
-
-from nv_ingest.schemas.metadata_schema import AccessLevelEnum
-from nv_ingest.schemas.metadata_schema import ContentSubtypeEnum
-from nv_ingest.schemas.metadata_schema import ContentTypeEnum
-from nv_ingest.schemas.metadata_schema import StdContentDescEnum
-from nv_ingest.schemas.metadata_schema import TableFormatEnum
-from nv_ingest.schemas.metadata_schema import TextTypeEnum
-from nv_ingest.schemas.metadata_schema import validate_metadata
-from nv_ingest.util.exception_handlers.pdf import pdfium_exception_handler
-from nv_ingest.util.image_processing.transforms import crop_image
-from nv_ingest.util.image_processing.transforms import numpy_to_base64
-from nv_ingest.util.nim import doughnut as doughnut_utils
-from nv_ingest.util.pdf.metadata_aggregators import Base64Image
-from nv_ingest.util.pdf.metadata_aggregators import LatexTable
-from nv_ingest.util.pdf.metadata_aggregators import construct_image_metadata_from_pdf_image
-from nv_ingest.util.pdf.metadata_aggregators import construct_text_metadata
-from nv_ingest.util.pdf.metadata_aggregators import extract_pdf_metadata
-from nv_ingest.util.pdf.pdfium import pdfium_pages_to_numpy
-
-logger = logging.getLogger(__name__)
-
-DOUGHNUT_GRPC_TRITON = os.environ.get("DOUGHNUT_GRPC_TRITON", "triton:8001")
-DEFAULT_BATCH_SIZE = 16
-DEFAULT_RENDER_DPI = 300
-DEFAULT_MAX_WIDTH = 1024
-DEFAULT_MAX_HEIGHT = 1280
-
-
-# Define a helper function to use doughnut to extract text from a base64 encoded bytestram PDF
-def doughnut(pdf_stream, extract_text: bool, extract_images: bool, extract_tables: bool, **kwargs):
- """
- Helper function to use doughnut to extract text from a bytestream PDF.
-
- Parameters
- ----------
- pdf_stream : io.BytesIO
- A bytestream PDF.
- extract_text : bool
- Specifies whether to extract text.
- extract_images : bool
- Specifies whether to extract images.
- extract_tables : bool
- Specifies whether to extract tables.
- **kwargs
- The keyword arguments are used for additional extraction parameters.
-
- Returns
- -------
- str
- A string of extracted text.
- """
- logger.debug("Extracting PDF with doughnut backend.")
-
- doughnut_triton_url = kwargs.get("doughnut_grpc_triton", DOUGHNUT_GRPC_TRITON)
-
- batch_size = int(kwargs.get("doughnut_batch_size", DEFAULT_BATCH_SIZE))
-
- row_data = kwargs.get("row_data")
- # get source_id
- source_id = row_data["source_id"]
- # get text_depth
- text_depth = kwargs.get("text_depth", "page")
- text_depth = TextTypeEnum[text_depth.upper()]
-
- identify_nearby_objects = kwargs.get("identify_nearby_objects", True)
-
- # get base metadata
- metadata_col = kwargs.get("metadata_column", "metadata")
- base_unified_metadata = row_data[metadata_col] if metadata_col in row_data.index else {}
-
- # get base source_metadata
- base_source_metadata = base_unified_metadata.get("source_metadata", {})
- # get source_location
- source_location = base_source_metadata.get("source_location", "")
- # get collection_id (assuming coming in from source_metadata...)
- collection_id = base_source_metadata.get("collection_id", "")
- # get partition_id (assuming coming in from source_metadata...)
- partition_id = base_source_metadata.get("partition_id", -1)
- # get access_level (assuming coming in from source_metadata...)
- access_level = base_source_metadata.get("access_level", AccessLevelEnum.LEVEL_1)
-
- extracted_data = []
- doc = pdfium.PdfDocument(pdf_stream)
- pdf_metadata = extract_pdf_metadata(doc, source_id)
-
- source_metadata = {
- "source_name": pdf_metadata.filename,
- "source_id": source_id,
- "source_location": source_location,
- "source_type": pdf_metadata.source_type,
- "collection_id": collection_id,
- "date_created": pdf_metadata.date_created,
- "last_modified": pdf_metadata.last_modified,
- "summary": "",
- "partition_id": partition_id,
- "access_level": access_level,
- }
-
- pages = []
- page_sizes = []
- for page_idx in range(pdf_metadata.page_count):
- page = doc.get_page(page_idx)
- pages.append(page)
- page_width, page_height = doc.get_page_size(page_idx)
- page_sizes.append((page_width, page_height))
-
- # Split into batches.
- i = 0
- batches = []
- batch_page_offsets = []
- while i < len(pages):
- batches.append(pages[i : i + batch_size]) # noqa: E203
- batch_page_offsets.append(i)
- i += batch_size
-
- accumulated_text = []
- accumulated_tables = []
- accumulated_images = []
-
- triton_client = grpcclient.InferenceServerClient(url=doughnut_triton_url)
-
- for batch, batch_page_offset in zip(batches, batch_page_offsets):
- responses = preprocess_and_send_requests(triton_client, batch, batch_page_offset)
-
- for page_idx, raw_text, bbox_offset in responses:
- page_image = None
- page_width, page_height = page_sizes[page_idx]
-
- classes, bboxes, texts = doughnut_utils.extract_classes_bboxes(raw_text)
-
- page_nearby_blocks = {
- "text": {"content": [], "bbox": []},
- "images": {"content": [], "bbox": []},
- "structured": {"content": [], "bbox": []},
- }
-
- for cls, bbox, txt in zip(classes, bboxes, texts):
- if extract_text:
- txt = doughnut_utils.postprocess_text(txt, cls)
-
- if extract_images and identify_nearby_objects:
- bbox = doughnut_utils.reverse_transform_bbox(
- bbox=bbox,
- bbox_offset=bbox_offset,
- original_width=DEFAULT_MAX_WIDTH,
- original_height=DEFAULT_MAX_HEIGHT,
- )
- page_nearby_blocks["text"]["content"].append(txt)
- page_nearby_blocks["text"]["bbox"].append(bbox)
-
- accumulated_text.append(txt)
-
- elif extract_tables and (cls == "Table"):
- try:
- txt = txt.encode().decode("unicode_escape") # remove double backlashes
- except UnicodeDecodeError:
- pass
- bbox = doughnut_utils.reverse_transform_bbox(bbox, bbox_offset)
- table = LatexTable(latex=txt, bbox=bbox, max_width=page_width, max_height=page_height)
- accumulated_tables.append(table)
-
- elif extract_images and (cls == "Picture"):
- if page_image is None:
- scale_tuple = (DEFAULT_MAX_WIDTH, DEFAULT_MAX_HEIGHT)
- padding_tuple = (DEFAULT_MAX_WIDTH, DEFAULT_MAX_HEIGHT)
- page_image, *_ = pdfium_pages_to_numpy(
- [pages[page_idx]], scale_tuple=scale_tuple, padding_tuple=padding_tuple
- )
- page_image = page_image[0]
-
- img_numpy = crop_image(page_image, bbox)
- if img_numpy is not None:
- base64_img = numpy_to_base64(img_numpy)
- bbox = doughnut_utils.reverse_transform_bbox(bbox, bbox_offset)
- image = Base64Image(
- image=base64_img,
- bbox=bbox,
- width=img_numpy.shape[1],
- height=img_numpy.shape[0],
- max_width=page_width,
- max_height=page_height,
- )
- accumulated_images.append(image)
-
- # Construct tables
- if extract_tables:
- for table in accumulated_tables:
- extracted_data.append(
- _construct_table_metadata(
- table,
- page_idx,
- pdf_metadata.page_count,
- source_metadata,
- base_unified_metadata,
- )
- )
- accumulated_tables = []
-
- # Construct images
- if extract_images:
- for image in accumulated_images:
- extracted_data.append(
- construct_image_metadata_from_pdf_image(
- image,
- page_idx,
- pdf_metadata.page_count,
- source_metadata,
- base_unified_metadata,
- )
- )
- accumulated_images = []
-
- # Construct text - page
- if (extract_text) and (text_depth == TextTypeEnum.PAGE):
- extracted_data.append(
- construct_text_metadata(
- accumulated_text,
- pdf_metadata.keywords,
- page_idx,
- -1,
- -1,
- -1,
- pdf_metadata.page_count,
- text_depth,
- source_metadata,
- base_unified_metadata,
- )
- )
- accumulated_text = []
-
- # Construct text - document
- if (extract_text) and (text_depth == TextTypeEnum.DOCUMENT):
- text_extraction = construct_text_metadata(
- accumulated_text,
- pdf_metadata.keywords,
- -1,
- -1,
- -1,
- -1,
- pdf_metadata.page_count,
- text_depth,
- source_metadata,
- base_unified_metadata,
- )
-
- if len(text_extraction) > 0:
- extracted_data.append(text_extraction)
-
- triton_client.close()
-
- return extracted_data
-
-
-def preprocess_and_send_requests(
- triton_client,
- batch: List[pdfium.PdfPage],
- batch_offset: int,
-) -> List[Tuple[int, str]]:
- if not batch:
- return []
-
- render_dpi = DEFAULT_RENDER_DPI
- scale_tuple = (DEFAULT_MAX_WIDTH, DEFAULT_MAX_HEIGHT)
- padding_tuple = (DEFAULT_MAX_WIDTH, DEFAULT_MAX_HEIGHT)
-
- page_images, bbox_offsets = pdfium_pages_to_numpy(
- batch, render_dpi=render_dpi, scale_tuple=scale_tuple, padding_tuple=padding_tuple
- )
- page_numbers = [page_idx for page_idx in range(batch_offset, batch_offset + len(page_images))]
-
- batch = np.array(page_images)
-
- input_tensors = [grpcclient.InferInput("image", batch.shape, datatype="UINT8")]
- input_tensors[0].set_data_from_numpy(batch)
-
- outputs = [grpcclient.InferRequestedOutput("text")]
-
- query_response = triton_client.infer(
- model_name="doughnut",
- inputs=input_tensors,
- outputs=outputs,
- )
-
- text = query_response.as_numpy("text").tolist()
- text = [t.decode() for t in text]
-
- if len(text) != len(batch):
- return []
-
- return list(zip(page_numbers, text, bbox_offsets))
-
-
-@pdfium_exception_handler(descriptor="doughnut")
-def _construct_table_metadata(
- table: LatexTable,
- page_idx: int,
- page_count: int,
- source_metadata: Dict,
- base_unified_metadata: Dict,
-):
- content = table.latex
- table_format = TableFormatEnum.LATEX
- subtype = ContentSubtypeEnum.TABLE
- description = StdContentDescEnum.PDF_TABLE
-
- content_metadata = {
- "type": ContentTypeEnum.STRUCTURED,
- "description": description,
- "page_number": page_idx,
- "hierarchy": {
- "page_count": page_count,
- "page": page_idx,
- "line": -1,
- "span": -1,
- },
- "subtype": subtype,
- }
- table_metadata = {
- "caption": "",
- "table_format": table_format,
- "table_location": table.bbox,
- }
- ext_unified_metadata = base_unified_metadata.copy()
-
- ext_unified_metadata.update(
- {
- "content": content,
- "source_metadata": source_metadata,
- "content_metadata": content_metadata,
- "table_metadata": table_metadata,
- }
- )
-
- validated_unified_metadata = validate_metadata(ext_unified_metadata)
-
- return [ContentTypeEnum.STRUCTURED, validated_unified_metadata.model_dump(), str(uuid.uuid4())]
diff --git a/src/nv_ingest/extraction_workflows/pdf/doughnut_utils.py b/src/nv_ingest/extraction_workflows/pdf/doughnut_utils.py
deleted file mode 100644
index d1f7ac21..00000000
--- a/src/nv_ingest/extraction_workflows/pdf/doughnut_utils.py
+++ /dev/null
@@ -1,161 +0,0 @@
-# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
-# All rights reserved.
-# SPDX-License-Identifier: Apache-2.0
-
-import re
-from math import ceil
-from math import floor
-from typing import List
-from typing import Optional
-from typing import Tuple
-
-import numpy as np
-
-from nv_ingest.util.image_processing.transforms import numpy_to_base64
-
-DEFAULT_DPI = 300
-DEFAULT_MAX_WIDTH = 1024
-DEFAULT_MAX_HEIGHT = 1280
-
-ACCEPTED_CLASSES = set(
- [
- "Text",
- "Title",
- "Section-header",
- "List-item",
- "TOC",
- "Bibliography",
- "Formula",
- ]
-)
-IGNORED_CLASSES = set(
- [
- "Page-header",
- "Page-footer",
- "Caption",
- "Footnote",
- "Floating-text",
- ]
-)
-
-_re_extract_class_bbox = re.compile(
- r"(.*?)]+)>", re.MULTILINE | re.DOTALL
-)
-
-
-def extract_classes_bboxes(text: str) -> Tuple[List[str], List[Tuple[int, int, int, int]], List[str]]:
- classes: List[str] = []
- bboxes: List[Tuple[int, int, int, int]] = []
- texts: List[str] = []
- for m in _re_extract_class_bbox.finditer(text):
- x1, y1, text, x2, y2, cls = m.groups()
- classes.append(cls)
- bboxes.append((int(x1), int(y1), int(x2), int(y2)))
- texts.append(text)
-
- return classes, bboxes, texts
-
-
-def convert_mmd_to_plain_text_ours(mmd_text, remove_inline_math: bool = False):
- # Remove markdown links (e.g., [link](url))
- mmd_text = re.sub(r"\(\[https?://[^\s\]]+\]\((https?://[^\s\]]+)\)\)", r"(\1)", mmd_text)
-
- # Remove headers (e.g., ##)
- mmd_text = re.sub(r"#+\s", "", mmd_text)
-
- # Remove bold (e.g., **)
- mmd_text = mmd_text.replace("**", "")
- # Remove italic (e.g., *)
- mmd_text = re.sub(r"\*(.*?)\*", r"\1", mmd_text)
- # Remove emphasized text formatting (e.g., _)
- mmd_text = re.sub(r"(?", "", mmd_text)
-
- if remove_inline_math:
- # Remove formulas inside paragraphs (e.g., \(R_{ij}(P^{a})=0\))
- mmd_text = re.sub(r"\\\((.*?)\\\)", "", mmd_text)
- else:
- # Treat simple formulas inside paragraphs as plain text
- mmd_text = re.sub(r"\\\((.*?)\\\)", r"\1", mmd_text)
-
- # Remove asterisk in lists
- mmd_text = re.sub(r"^\*\s", "", mmd_text, flags=re.MULTILINE)
- # Remove tables
- mmd_text = re.sub(r"\\begin{table}(.*?)\\end{table}", "", mmd_text, flags=re.DOTALL)
- mmd_text = re.sub(r"\\begin{tabular}(.*?)\\end{tabular}", "", mmd_text, flags=re.DOTALL)
- # Remove code blocks (e.g., ```python ... ```)
- mmd_text = re.sub(r"```.*?```", "", mmd_text, flags=re.DOTALL)
- # Remove equations (e.g., \[ ... \])
- mmd_text = re.sub(r"\\\[(.*?)\\\]", "", mmd_text, flags=re.DOTALL)
- # Remove inline equations (e.g., $ ... $)
- mmd_text = re.sub(r"\$(.*?)\$", "", mmd_text)
- # Remove tables
- mmd_text = re.sub(r"\|.*?\|", "", mmd_text, flags=re.DOTALL)
-
- # Additional cleanup for special characters
- mmd_text = re.sub(r"\\", "", mmd_text)
-
- return mmd_text.strip()
-
-
-def crop_image(array: np.array, bbox: Tuple[int, int, int, int], format="PNG") -> Optional[str]:
- w1, h1, w2, h2 = bbox
- h1 = max(floor(h1), 0)
- h2 = min(ceil(h2), array.shape[0])
- w1 = max(floor(w1), 0)
- w2 = min(ceil(w2), array.shape[1])
- if (w2 - w1 <= 0) or (h2 - h1 <= 0):
- return None
- cropped = array[h1:h2, w1:w2]
- base64_img = numpy_to_base64(cropped)
-
- return base64_img
-
-
-def pad_image(
- array: np.array, target_width=DEFAULT_MAX_WIDTH, target_height=DEFAULT_MAX_HEIGHT
-) -> Tuple[np.array, Tuple[int, int]]:
- height, width = array.shape[:2]
- if (height > target_height) or (width > target_width):
- raise ValueError(
- f"Image array is too large. Dimensions must be width <= {target_width} and height <= {target_height}."
- )
-
- if height == target_height and width == target_width:
- return array, (0, 0)
-
- pad_height = (target_height - height) // 2
- pad_width = (target_width - width) // 2
- canvas = 255 * np.ones((target_height, target_width, 3), dtype=np.uint8)
- canvas[pad_height : pad_height + height, pad_width : pad_width + width] = array # noqa: E203
-
- return canvas, (pad_width, pad_height)
-
-
-def reverse_transform_bbox(
- bbox: Tuple[int, int, int, int],
- bbox_offset: Tuple[int, int],
- original_width: int = DEFAULT_MAX_WIDTH,
- original_height: int = DEFAULT_MAX_HEIGHT,
-) -> Tuple[int, int, int, int]:
- width_ratio = (original_width - 2 * bbox_offset[0]) / original_width
- height_ratio = (original_height - 2 * bbox_offset[1]) / original_height
- w1, h1, w2, h2 = bbox
- w1 = int((w1 - bbox_offset[0]) / width_ratio)
- h1 = int((h1 - bbox_offset[1]) / height_ratio)
- w2 = int((w2 - bbox_offset[0]) / width_ratio)
- h2 = int((h2 - bbox_offset[1]) / height_ratio)
-
- return (w1, h1, w2, h2)
-
-
-def postprocess_text(txt: str, cls: str):
- if cls in ACCEPTED_CLASSES:
- txt = txt.replace("", "").strip() # remove tokens (continued paragraphs)
- txt = convert_mmd_to_plain_text_ours(txt)
- else:
- txt = ""
-
- return txt
diff --git a/src/nv_ingest/extraction_workflows/pdf/nemoretriever_parse_helper.py b/src/nv_ingest/extraction_workflows/pdf/nemoretriever_parse_helper.py
new file mode 100644
index 00000000..8f02ec21
--- /dev/null
+++ b/src/nv_ingest/extraction_workflows/pdf/nemoretriever_parse_helper.py
@@ -0,0 +1,516 @@
+# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
+# All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+
+# Copyright (c) 2024, NVIDIA CORPORATION.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import math
+import traceback
+import uuid
+import concurrent.futures
+from typing import Any
+from typing import Dict
+from typing import Tuple
+from typing import Optional
+from typing import List
+
+import numpy as np
+import pypdfium2 as pdfium
+
+from nv_ingest.schemas.metadata_schema import AccessLevelEnum
+from nv_ingest.schemas.metadata_schema import ContentSubtypeEnum
+from nv_ingest.schemas.metadata_schema import ContentTypeEnum
+from nv_ingest.schemas.metadata_schema import StdContentDescEnum
+from nv_ingest.schemas.metadata_schema import TableFormatEnum
+from nv_ingest.schemas.metadata_schema import TextTypeEnum
+from nv_ingest.schemas.metadata_schema import validate_metadata
+from nv_ingest.schemas.pdf_extractor_schema import PDFiumConfigSchema
+from nv_ingest.schemas.pdf_extractor_schema import NemoRetrieverParseConfigSchema
+from nv_ingest.util.exception_handlers.pdf import pdfium_exception_handler
+from nv_ingest.util.image_processing.transforms import crop_image
+from nv_ingest.util.image_processing.transforms import numpy_to_base64
+from nv_ingest.util.nim import nemoretriever_parse as nemoretriever_parse_utils
+from nv_ingest.util.nim.helpers import create_inference_client
+from nv_ingest.util.pdf.metadata_aggregators import Base64Image
+from nv_ingest.util.pdf.metadata_aggregators import LatexTable
+from nv_ingest.util.pdf.metadata_aggregators import construct_image_metadata_from_pdf_image
+from nv_ingest.util.pdf.metadata_aggregators import construct_text_metadata
+from nv_ingest.util.pdf.metadata_aggregators import extract_pdf_metadata
+from nv_ingest.util.pdf.pdfium import pdfium_pages_to_numpy
+from nv_ingest.extraction_workflows.pdf.pdfium_helper import _extract_tables_and_charts
+from nv_ingest.extraction_workflows.pdf.pdfium_helper import YOLOX_MAX_BATCH_SIZE
+
+
+logger = logging.getLogger(__name__)
+
+NEMORETRIEVER_PARSE_RENDER_DPI = 300
+NEMORETRIEVER_PARSE_MAX_WIDTH = 1024
+NEMORETRIEVER_PARSE_MAX_HEIGHT = 1280
+NEMORETRIEVER_PARSE_MAX_BATCH_SIZE = 2
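+# Pages are rendered at 300 DPI, then scaled and padded to a fixed 1024x1280 canvas before inference.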
+
+
+# Define a helper function to use nemoretriever_parse to extract text from a bytestream PDF
+def nemoretriever_parse(
+ pdf_stream,
+ extract_text: bool,
+ extract_images: bool,
+ extract_tables: bool,
+ extract_charts: bool,
+ trace_info: Optional[List] = None,
+ **kwargs,
+):
+ """
+ Helper function to use nemoretriever_parse to extract text from a bytestream PDF.
+
+ Parameters
+ ----------
+ pdf_stream : io.BytesIO
+ A bytestream PDF.
+ extract_text : bool
+ Specifies whether to extract text.
+ extract_images : bool
+ Specifies whether to extract images.
+ extract_tables : bool
+        Specifies whether to extract tables.
+    extract_charts : bool
+        Specifies whether to extract charts.
+    trace_info : Optional[List], optional
+        Trace information to propagate to inference calls.
+    **kwargs
+        The keyword arguments are used for additional extraction parameters.
+
+    Returns
+    -------
+    list
+        A list of extracted data items with their associated metadata.
+ """
+ logger.debug("Extracting PDF with nemoretriever_parse backend.")
+
+ nemoretriever_parse_config = kwargs.get("nemoretriever_parse_config", {})
+ nemoretriever_parse_config = nemoretriever_parse_config if nemoretriever_parse_config is not None else {}
+
+ row_data = kwargs.get("row_data")
+ # get source_id
+ source_id = row_data["source_id"]
+ # get text_depth
+ text_depth = kwargs.get("text_depth", "page")
+ text_depth = TextTypeEnum[text_depth.upper()]
+
+ extract_tables_method = kwargs.get("extract_tables_method", "yolox")
+ identify_nearby_objects = kwargs.get("identify_nearby_objects", True)
+ paddle_output_format = kwargs.get("paddle_output_format", "pseudo_markdown")
+ paddle_output_format = TableFormatEnum[paddle_output_format.upper()]
+
+ pdfium_config = kwargs.get("pdfium_config", {})
+ if isinstance(pdfium_config, dict):
+ pdfium_config = PDFiumConfigSchema(**pdfium_config)
+    if isinstance(nemoretriever_parse_config, dict):
+        nemoretriever_parse_config = NemoRetrieverParseConfigSchema(**nemoretriever_parse_config)
+
+ # get base metadata
+ metadata_col = kwargs.get("metadata_column", "metadata")
+ base_unified_metadata = row_data[metadata_col] if metadata_col in row_data.index else {}
+
+ # get base source_metadata
+ base_source_metadata = base_unified_metadata.get("source_metadata", {})
+ # get source_location
+ source_location = base_source_metadata.get("source_location", "")
+ # get collection_id (assuming coming in from source_metadata...)
+ collection_id = base_source_metadata.get("collection_id", "")
+ # get partition_id (assuming coming in from source_metadata...)
+ partition_id = base_source_metadata.get("partition_id", -1)
+ # get access_level (assuming coming in from source_metadata...)
+ access_level = base_source_metadata.get("access_level", AccessLevelEnum.LEVEL_1)
+
+ extracted_data = []
+ doc = pdfium.PdfDocument(pdf_stream)
+ pdf_metadata = extract_pdf_metadata(doc, source_id)
+ page_count = pdf_metadata.page_count
+
+ source_metadata = {
+ "source_name": pdf_metadata.filename,
+ "source_id": source_id,
+ "source_location": source_location,
+ "source_type": pdf_metadata.source_type,
+ "collection_id": collection_id,
+ "date_created": pdf_metadata.date_created,
+ "last_modified": pdf_metadata.last_modified,
+ "summary": "",
+ "partition_id": partition_id,
+ "access_level": access_level,
+ }
+
+ accumulated_text = []
+ accumulated_tables = []
+ accumulated_images = []
+
+ pages_for_ocr = [] # We'll accumulate (page_idx, np_image) here
+ pages_for_tables = [] # We'll accumulate (page_idx, np_image) here
+ futures = [] # We'll keep track of all the Future objects for table/charts
+
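+    # Create the NimClient for the nemoretriever-parse endpoint (HTTP only; gRPC is not supported by this model interface).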
+ nemoretriever_parse_client = _create_clients(nemoretriever_parse_config)
+
+ max_workers = nemoretriever_parse_config.workers_per_progress_engine
+ with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
+
+ for page_idx in range(page_count):
+ page = doc.get_page(page_idx)
+
+ page_image = _convert_pdfium_page_to_numpy(page)
+ pages_for_ocr.append((page_idx, page_image))
+ pages_for_tables.append((page_idx, page_image))
+
+ page.close()
+
+            # Whenever pages_for_ocr hits NEMORETRIEVER_PARSE_MAX_BATCH_SIZE, submit a job
+ if len(pages_for_ocr) >= NEMORETRIEVER_PARSE_MAX_BATCH_SIZE:
+ future_parser = executor.submit(
+ lambda *args, **kwargs: ("parser", _extract_text_and_bounding_boxes(*args, **kwargs)),
+ pages_for_ocr[:], # pass a copy
+ nemoretriever_parse_client,
+ trace_info=trace_info,
+ )
+ futures.append(future_parser)
+ pages_for_ocr.clear()
+
+            # Whenever pages_for_tables hits YOLOX_MAX_BATCH_SIZE, submit a job
+ if (
+ (extract_tables_method == "yolox")
+ and (extract_tables or extract_charts)
+ and (len(pages_for_tables) >= YOLOX_MAX_BATCH_SIZE)
+ ):
+ future_yolox = executor.submit(
+ lambda *args, **kwargs: ("yolox", _extract_tables_and_charts(*args, **kwargs)),
+ pages_for_tables[:], # pass a copy
+ pdfium_config,
+ page_count,
+ source_metadata,
+ base_unified_metadata,
+ paddle_output_format,
+ trace_info=trace_info,
+ )
+ futures.append(future_yolox)
+ pages_for_tables.clear()
+
+        # After the page loop, submit one last job for any leftover pages_for_ocr
+ if pages_for_ocr:
+ future_parser = executor.submit(
+ lambda *args, **kwargs: ("parser", _extract_text_and_bounding_boxes(*args, **kwargs)),
+ pages_for_ocr[:], # pass a copy
+ nemoretriever_parse_client,
+ trace_info=trace_info,
+ )
+ futures.append(future_parser)
+ pages_for_ocr.clear()
+
+ if (extract_tables_method == "yolox") and (extract_tables or extract_charts) and pages_for_tables:
+ future_yolox = executor.submit(
+ lambda *args, **kwargs: ("yolox", _extract_tables_and_charts(*args, **kwargs)),
+ pages_for_tables[:],
+ pdfium_config,
+ page_count,
+ source_metadata,
+ base_unified_metadata,
+ paddle_output_format,
+ trace_info=trace_info,
+ )
+ futures.append(future_yolox)
+ pages_for_tables.clear()
+
+ parser_results = []
+ # Now wait for all futures to complete
+ for fut in concurrent.futures.as_completed(futures):
+ model_name, extracted_items = fut.result() # blocks until finished
+ if (model_name == "yolox") and (extract_tables or extract_charts):
+ extracted_data.extend(extracted_items)
+ elif model_name == "parser":
+ parser_results.extend(extracted_items)
+
+ for page_idx, parser_output in parser_results:
+ page = None
+ page_image = None
+ page_text = []
+
+ page_nearby_blocks = {
+ "text": {"content": [], "bbox": [], "type": []},
+ "images": {"content": [], "bbox": [], "type": []},
+ "structured": {"content": [], "bbox": [], "type": []},
+ }
+
+ for bbox_dict in parser_output:
+ cls = bbox_dict["type"]
+ bbox = bbox_dict["bbox"]
+ txt = bbox_dict["text"]
+
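+            # The model returns bboxes normalized to [0, 1]; scale them to the fixed render canvas.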
+ transformed_bbox = [
+ math.floor(bbox["xmin"] * NEMORETRIEVER_PARSE_MAX_WIDTH),
+ math.floor(bbox["ymin"] * NEMORETRIEVER_PARSE_MAX_HEIGHT),
+ math.ceil(bbox["xmax"] * NEMORETRIEVER_PARSE_MAX_WIDTH),
+ math.ceil(bbox["ymax"] * NEMORETRIEVER_PARSE_MAX_HEIGHT),
+ ]
+
+ if cls not in nemoretriever_parse_utils.ACCEPTED_CLASSES:
+ continue
+
+ if identify_nearby_objects:
+ _insert_page_nearby_blocks(page_nearby_blocks, cls, txt, transformed_bbox)
+
+ if extract_text:
+ page_text.append(txt)
+
+ if (extract_tables_method == "nemoretriever_parse") and (extract_tables) and (cls == "Table"):
+ table = LatexTable(
+ latex=txt,
+ bbox=transformed_bbox,
+ max_width=NEMORETRIEVER_PARSE_MAX_WIDTH,
+ max_height=NEMORETRIEVER_PARSE_MAX_HEIGHT,
+ )
+ accumulated_tables.append(table)
+
+ if extract_images and (cls == "Picture"):
+ if page is None:
+ page = doc.get_page(page_idx)
+ if page_image is None:
+ page_image = _convert_pdfium_page_to_numpy(page)
+
+ img_numpy = crop_image(page_image, transformed_bbox)
+
+ if img_numpy is not None:
+ base64_img = numpy_to_base64(img_numpy)
+ image = Base64Image(
+ image=base64_img,
+ bbox=transformed_bbox,
+ width=img_numpy.shape[1],
+ height=img_numpy.shape[0],
+ max_width=NEMORETRIEVER_PARSE_MAX_WIDTH,
+ max_height=NEMORETRIEVER_PARSE_MAX_HEIGHT,
+ )
+ accumulated_images.append(image)
+
+ # If NemoRetrieverParse fails to extract anything, fall back to using pdfium.
+ if not "".join(page_text).strip():
+ if page is None:
+ page = doc.get_page(page_idx)
+ page_text = [page.get_textpage().get_text_bounded()]
+
+ accumulated_text.extend(page_text)
+
+ # Construct tables
+ if extract_tables:
+ for table in accumulated_tables:
+ extracted_data.append(
+ _construct_table_metadata(
+ table,
+ page_idx,
+ page_count,
+ source_metadata,
+ base_unified_metadata,
+ )
+ )
+ accumulated_tables = []
+
+ # Construct images
+ if extract_images:
+ for image in accumulated_images:
+ extracted_data.append(
+ construct_image_metadata_from_pdf_image(
+ image,
+ page_idx,
+ page_count,
+ source_metadata,
+ base_unified_metadata,
+ )
+ )
+ accumulated_images = []
+
+ # Construct text - page
+ if (extract_text) and (text_depth == TextTypeEnum.PAGE):
+ extracted_data.append(
+ construct_text_metadata(
+ accumulated_text,
+ pdf_metadata.keywords,
+ page_idx,
+ -1,
+ -1,
+ -1,
+ page_count,
+ text_depth,
+ source_metadata,
+ base_unified_metadata,
+ delimiter="\n\n",
+ bbox_max_dimensions=(NEMORETRIEVER_PARSE_MAX_WIDTH, NEMORETRIEVER_PARSE_MAX_HEIGHT),
+ nearby_objects=page_nearby_blocks,
+ )
+ )
+ accumulated_text = []
+
+ # Construct text - document
+ if (extract_text) and (text_depth == TextTypeEnum.DOCUMENT):
+ text_extraction = construct_text_metadata(
+ accumulated_text,
+ pdf_metadata.keywords,
+ -1,
+ -1,
+ -1,
+ -1,
+ page_count,
+ text_depth,
+ source_metadata,
+ base_unified_metadata,
+ delimiter="\n\n",
+ )
+
+ if len(text_extraction) > 0:
+ extracted_data.append(text_extraction)
+
+ nemoretriever_parse_client.close()
+ doc.close()
+
+ return extracted_data
+
+
+def _extract_text_and_bounding_boxes(
+ pages: list,
+ nemoretriever_parse_client,
+ trace_info=None,
+) -> list:
+
+ # Collect all page indices and images in order.
+ image_page_indices = [page[0] for page in pages]
+ original_images = [page[1] for page in pages]
+
+ # Prepare the data payload with all images.
+ data = {"images": original_images}
+
+ # Perform inference using the NimClient.
+ inference_results = nemoretriever_parse_client.infer(
+ data=data,
+ model_name="nemoretriever_parse",
+ stage_name="pdf_content_extractor",
+ max_batch_size=NEMORETRIEVER_PARSE_MAX_BATCH_SIZE,
+ trace_info=trace_info,
+ )
+
+ return list(zip(image_page_indices, inference_results))
+
+
+def _create_clients(nemoretriever_parse_config):
+ model_interface = nemoretriever_parse_utils.NemoRetrieverParseModelInterface()
+ nemoretriever_parse_client = create_inference_client(
+ nemoretriever_parse_config.nemoretriever_parse_endpoints,
+ model_interface,
+ nemoretriever_parse_config.auth_token,
+ nemoretriever_parse_config.nemoretriever_parse_infer_protocol,
+ )
+
+ return nemoretriever_parse_client
+
+
+def _send_inference_request(
+ nemoretriever_parse_client,
+ image_array: np.ndarray,
+) -> Dict[str, Any]:
+
+ try:
+ # NIM only supports processing one page at a time (batch size = 1).
+ data = {"image": image_array}
+ response = nemoretriever_parse_client.infer(
+ data=data,
+ model_name="nemoretriever_parse",
+ )
+ except Exception as e:
+ logger.error(f"Unhandled error during NemoRetrieverParse inference: {e}")
+ traceback.print_exc()
+ raise e
+
+ return response
+
+
+def _convert_pdfium_page_to_numpy(
+ page: pdfium.PdfPage,
+ render_dpi: int = NEMORETRIEVER_PARSE_RENDER_DPI,
+ scale_tuple: Tuple[int, int] = (NEMORETRIEVER_PARSE_MAX_WIDTH, NEMORETRIEVER_PARSE_MAX_HEIGHT),
+ padding_tuple: Tuple[int, int] = (NEMORETRIEVER_PARSE_MAX_WIDTH, NEMORETRIEVER_PARSE_MAX_HEIGHT),
+) -> np.ndarray:
+ page_images, _ = pdfium_pages_to_numpy(
+ [page], render_dpi=render_dpi, scale_tuple=scale_tuple, padding_tuple=padding_tuple
+ )
+
+ return page_images[0]
+
+
+def _insert_page_nearby_blocks(
+ page_nearby_blocks: Dict[str, Any],
+ cls: str,
+ txt: str,
+    bbox: List[int],
+):
+ if cls in nemoretriever_parse_utils.ACCEPTED_TEXT_CLASSES:
+ nearby_blocks_key = "text"
+ elif cls in nemoretriever_parse_utils.ACCEPTED_TABLE_CLASSES:
+ nearby_blocks_key = "structured"
+ elif cls in nemoretriever_parse_utils.ACCEPTED_IMAGE_CLASSES:
+ nearby_blocks_key = "images"
+
+ page_nearby_blocks[nearby_blocks_key]["content"].append(txt)
+ page_nearby_blocks[nearby_blocks_key]["bbox"].append(bbox)
+ page_nearby_blocks[nearby_blocks_key]["type"].append(cls)
+
+
+@pdfium_exception_handler(descriptor="nemoretriever_parse")
+def _construct_table_metadata(
+ table: LatexTable,
+ page_idx: int,
+ page_count: int,
+ source_metadata: Dict,
+ base_unified_metadata: Dict,
+):
+ content = table.latex
+ table_format = TableFormatEnum.LATEX
+ subtype = ContentSubtypeEnum.TABLE
+ description = StdContentDescEnum.PDF_TABLE
+
+ content_metadata = {
+ "type": ContentTypeEnum.STRUCTURED,
+ "description": description,
+ "page_number": page_idx,
+ "hierarchy": {
+ "page_count": page_count,
+ "page": page_idx,
+ "line": -1,
+ "span": -1,
+ },
+ "subtype": subtype,
+ }
+ table_metadata = {
+ "caption": "",
+ "table_content": content,
+ "table_format": table_format,
+ "table_location": table.bbox,
+ "table_location_max_dimensions": (table.max_width, table.max_height),
+ }
+ ext_unified_metadata = base_unified_metadata.copy()
+
+ ext_unified_metadata.update(
+ {
+ "content": "",
+ "source_metadata": source_metadata,
+ "content_metadata": content_metadata,
+ "table_metadata": table_metadata,
+ }
+ )
+
+ validated_unified_metadata = validate_metadata(ext_unified_metadata)
+
+ return [ContentTypeEnum.STRUCTURED, validated_unified_metadata.model_dump(), str(uuid.uuid4())]
diff --git a/src/nv_ingest/schemas/metadata_schema.py b/src/nv_ingest/schemas/metadata_schema.py
index 9de9aba9..f794c516 100644
--- a/src/nv_ingest/schemas/metadata_schema.py
+++ b/src/nv_ingest/schemas/metadata_schema.py
@@ -209,6 +209,7 @@ class NearbyObjectsSubSchema(BaseModelNoExt):
content: List[str] = []
bbox: List[tuple] = []
+ type: List[str] = []
class NearbyObjectsSchema(BaseModelNoExt):
@@ -252,6 +253,7 @@ class TextMetadataSchema(BaseModelNoExt):
keywords: Union[str, List[str], Dict] = ""
language: LanguageEnum = "en" # default to Unknown? Maybe do some kind of heuristic check
text_location: tuple = (0, 0, 0, 0)
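+    # Dimensions of the coordinate space that text_location refers to (e.g., the rendered page canvas), when provided.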
+ text_location_max_dimensions: tuple = (0, 0, 0, 0)
class ImageMetadataSchema(BaseModelNoExt):
diff --git a/src/nv_ingest/schemas/pdf_extractor_schema.py b/src/nv_ingest/schemas/pdf_extractor_schema.py
index b0cc9457..e02bea93 100644
--- a/src/nv_ingest/schemas/pdf_extractor_schema.py
+++ b/src/nv_ingest/schemas/pdf_extractor_schema.py
@@ -71,17 +71,92 @@ def validate_endpoints(cls, values):
If both gRPC and HTTP services are empty for any endpoint.
"""
- def clean_service(service):
- """Set service to None if it's an empty string or contains only spaces or quotes."""
- if service is None or not service.strip() or service.strip(" \"'") == "":
- return None
- return service
-
for model_name in ["yolox"]:
endpoint_name = f"{model_name}_endpoints"
grpc_service, http_service = values.get(endpoint_name)
- grpc_service = clean_service(grpc_service)
- http_service = clean_service(http_service)
+ grpc_service = _clean_service(grpc_service)
+ http_service = _clean_service(http_service)
+
+ if not grpc_service and not http_service:
+ raise ValueError(f"Both gRPC and HTTP services cannot be empty for {endpoint_name}.")
+
+ values[endpoint_name] = (grpc_service, http_service)
+
+ protocol_name = f"{model_name}_infer_protocol"
+ protocol_value = values.get(protocol_name)
+ if not protocol_value:
+ protocol_value = "http" if http_service else "grpc" if grpc_service else ""
+ protocol_value = protocol_value.lower()
+ values[protocol_name] = protocol_value
+
+ return values
+
+ model_config = ConfigDict(extra="forbid")
+
+
+class NemoRetrieverParseConfigSchema(BaseModel):
+ """
+ Configuration schema for NemoRetrieverParse endpoints and options.
+
+ Parameters
+ ----------
+ auth_token : Optional[str], default=None
+ Authentication token required for secure services.
+
+ nemoretriever_parse_endpoints : Tuple[str, str]
+ A tuple containing the gRPC and HTTP services for the nemoretriever_parse endpoint.
+ Either the gRPC or HTTP service can be empty, but not both.
+
+ Methods
+ -------
+ validate_endpoints(values)
+ Validates that at least one of the gRPC or HTTP services is provided for each endpoint.
+
+ Raises
+ ------
+ ValueError
+ If both gRPC and HTTP services are empty for any endpoint.
+
+ Config
+ ------
+ extra : str
+ Pydantic config option to forbid extra fields.
+ """
+
+ auth_token: Optional[str] = None
+
+ nemoretriever_parse_endpoints: Tuple[Optional[str], Optional[str]] = (None, None)
+ nemoretriever_parse_infer_protocol: str = ""
+
+ workers_per_progress_engine: int = 2
+
+ @model_validator(mode="before")
+ @classmethod
+ def validate_endpoints(cls, values):
+ """
+ Validates the gRPC and HTTP services for all endpoints.
+
+ Parameters
+ ----------
+ values : dict
+ Dictionary containing the values of the attributes for the class.
+
+ Returns
+ -------
+ dict
+ The validated dictionary of values.
+
+ Raises
+ ------
+ ValueError
+ If both gRPC and HTTP services are empty for any endpoint.
+ """
+
+ for model_name in ["nemoretriever_parse"]:
+ endpoint_name = f"{model_name}_endpoints"
+ grpc_service, http_service = values.get(endpoint_name)
+ grpc_service = _clean_service(grpc_service)
+ http_service = _clean_service(http_service)
if not grpc_service and not http_service:
raise ValueError(f"Both gRPC and HTTP services cannot be empty for {endpoint_name}.")
@@ -124,4 +199,13 @@ class PDFExtractorSchema(BaseModel):
raise_on_failure: bool = False
pdfium_config: Optional[PDFiumConfigSchema] = None
+ nemoretriever_parse_config: Optional[NemoRetrieverParseConfigSchema] = None
+
model_config = ConfigDict(extra="forbid")
+
+
+def _clean_service(service):
+ """Set service to None if it's an empty string or contains only spaces or quotes."""
+ if service is None or not service.strip() or service.strip(" \"'") == "":
+ return None
+ return service
diff --git a/src/nv_ingest/stages/nim/table_extraction.py b/src/nv_ingest/stages/nim/table_extraction.py
index b980e847..fbc58077 100644
--- a/src/nv_ingest/stages/nim/table_extraction.py
+++ b/src/nv_ingest/stages/nim/table_extraction.py
@@ -7,13 +7,14 @@
from typing import Any, Dict, List, Optional, Tuple
import pandas as pd
-
from morpheus.config import Config
from nv_ingest.schemas.table_extractor_schema import TableExtractorSchema
from nv_ingest.stages.multiprocessing_stage import MultiProcessingBaseStage
from nv_ingest.util.image_processing.transforms import base64_to_numpy
-from nv_ingest.util.nim.helpers import create_inference_client, NimClient, get_version
+from nv_ingest.util.nim.helpers import NimClient
+from nv_ingest.util.nim.helpers import create_inference_client
+from nv_ingest.util.nim.helpers import get_version
from nv_ingest.util.nim.paddle import PaddleOCRModelInterface
logger = logging.getLogger(__name__)
diff --git a/src/nv_ingest/stages/pdf_extractor_stage.py b/src/nv_ingest/stages/pdf_extractor_stage.py
index 1cc60452..f8e4664d 100644
--- a/src/nv_ingest/stages/pdf_extractor_stage.py
+++ b/src/nv_ingest/stages/pdf_extractor_stage.py
@@ -88,6 +88,8 @@ def decode_and_extract(
if validated_config.pdfium_config is not None:
extract_params["pdfium_config"] = validated_config.pdfium_config
+ if validated_config.nemoretriever_parse_config is not None:
+ extract_params["nemoretriever_parse_config"] = validated_config.nemoretriever_parse_config
if trace_info is not None:
extract_params["trace_info"] = trace_info
diff --git a/src/nv_ingest/util/nim/doughnut.py b/src/nv_ingest/util/nim/doughnut.py
deleted file mode 100644
index 84cafe3b..00000000
--- a/src/nv_ingest/util/nim/doughnut.py
+++ /dev/null
@@ -1,165 +0,0 @@
-# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
-# All rights reserved.
-# SPDX-License-Identifier: Apache-2.0
-
-import logging
-import re
-from typing import List
-from typing import Tuple
-
-ACCEPTED_TEXT_CLASSES = set(
- [
- "Text",
- "Title",
- "Section-header",
- "List-item",
- "TOC",
- "Bibliography",
- "Formula",
- "Page-header",
- "Page-footer",
- "Caption",
- "Footnote",
- "Floating-text",
- ]
-)
-ACCEPTED_TABLE_CLASSES = set(
- [
- "Table",
- ]
-)
-ACCEPTED_IMAGE_CLASSES = set(
- [
- "Picture",
- ]
-)
-ACCEPTED_CLASSES = ACCEPTED_TEXT_CLASSES | ACCEPTED_TABLE_CLASSES | ACCEPTED_IMAGE_CLASSES
-
-_re_extract_class_bbox = re.compile(
- r"((?:|.(?:(?", # noqa: E501
- re.MULTILINE | re.DOTALL,
-)
-
-logger = logging.getLogger(__name__)
-
-
-def extract_classes_bboxes(text: str) -> Tuple[List[str], List[Tuple[int, int, int, int]], List[str]]:
- classes: List[str] = []
- bboxes: List[Tuple[int, int, int, int]] = []
- texts: List[str] = []
-
- last_end = 0
-
- for m in _re_extract_class_bbox.finditer(text):
- start, end = m.span()
-
- # [Bad box] Add the non-match chunk (text between the last match and the current match)
- if start > last_end:
- bad_text = text[last_end:start].strip()
- classes.append("Bad-box")
- bboxes.append((0, 0, 0, 0))
- texts.append(bad_text)
-
- last_end = end
-
- x1, y1, text, x2, y2, cls = m.groups()
-
- bbox = tuple(map(int, (x1, y1, x2, y2)))
-
- # [Bad box] check if the class is a valid class.
- if cls not in ACCEPTED_CLASSES:
- logger.debug(f"Dropped a bad box: invalid class {cls} at {bbox}.")
- classes.append("Bad-box")
- bboxes.append(bbox)
- texts.append(text)
- continue
-
- # Drop bad box: drop if the box is invalid.
- if (bbox[0] >= bbox[2]) or (bbox[1] >= bbox[3]):
- logger.debug(f"Dropped a bad box: invalid box {cls} at {bbox}.")
- classes.append("Bad-box")
- bboxes.append(bbox)
- texts.append(text)
- continue
-
- classes.append(cls)
- bboxes.append(bbox)
- texts.append(text)
-
- if last_end < len(text):
- bad_text = text[last_end:].strip()
- if len(bad_text) > 0:
- classes.append("Bad-box")
- bboxes.append((0, 0, 0, 0))
- texts.append(bad_text)
-
- return classes, bboxes, texts
-
-
-def _fix_dots(m):
- # Remove spaces between dots.
- s = m.group(0)
- return s.startswith(" ") * " " + min(5, s.count(".")) * "." + s.endswith(" ") * " "
-
-
-def strip_markdown_formatting(text):
- # Remove headers (e.g., # Header, ## Header, ### Header)
- text = re.sub(r"^(#+)\s*(.*)", r"\2", text, flags=re.MULTILINE)
-
- # Remove bold formatting (e.g., **bold text** or __bold text__)
- text = re.sub(r"\*\*(.*?)\*\*", r"\1", text)
- text = re.sub(r"__(.*?)__", r"\1", text)
-
- # Remove italic formatting (e.g., *italic text* or _italic text_)
- text = re.sub(r"\*(.*?)\*", r"\1", text)
- text = re.sub(r"_(.*?)_", r"\1", text)
-
- # Remove strikethrough formatting (e.g., ~~strikethrough~~)
- text = re.sub(r"~~(.*?)~~", r"\1", text)
-
- # Remove list items (e.g., - item, * item, 1. item)
- text = re.sub(r"^\s*([-*+]|[0-9]+\.)\s+", "", text, flags=re.MULTILINE)
-
- # Remove hyperlinks (e.g., [link text](http://example.com))
- text = re.sub(r"\[(.*?)\]\(.*?\)", r"\1", text)
-
- # Remove inline code (e.g., `code`)
- text = re.sub(r"`(.*?)`", r"\1", text)
-
- # Remove blockquotes (e.g., > quote)
- text = re.sub(r"^\s*>\s*(.*)", r"\1", text, flags=re.MULTILINE)
-
- # Remove multiple newlines
- text = re.sub(r"\n{3,}", "\n\n", text)
-
- # Limit dots sequences to max 5 dots
- text = re.sub(r"(?:\s*\.\s*){3,}", _fix_dots, text, flags=re.DOTALL)
-
- return text
-
-
-def reverse_transform_bbox(
- bbox: Tuple[int, int, int, int],
- bbox_offset: Tuple[int, int],
- original_width: int,
- original_height: int,
-) -> Tuple[int, int, int, int]:
- width_ratio = (original_width - 2 * bbox_offset[0]) / original_width
- height_ratio = (original_height - 2 * bbox_offset[1]) / original_height
- w1, h1, w2, h2 = bbox
- w1 = int((w1 - bbox_offset[0]) / width_ratio)
- h1 = int((h1 - bbox_offset[1]) / height_ratio)
- w2 = int((w2 - bbox_offset[0]) / width_ratio)
- h2 = int((h2 - bbox_offset[1]) / height_ratio)
-
- return (w1, h1, w2, h2)
-
-
-def postprocess_text(txt: str, cls: str):
- if cls in ACCEPTED_CLASSES:
- txt = txt.replace("", "").strip() # remove tokens (continued paragraphs)
- txt = strip_markdown_formatting(txt)
- else:
- txt = ""
-
- return txt
diff --git a/src/nv_ingest/util/nim/nemoretriever_parse.py b/src/nv_ingest/util/nim/nemoretriever_parse.py
new file mode 100644
index 00000000..1bca39f5
--- /dev/null
+++ b/src/nv_ingest/util/nim/nemoretriever_parse.py
@@ -0,0 +1,223 @@
+# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
+# All rights reserved.
+# SPDX-License-Identifier: Apache-2.0
+
+import json
+import logging
+from typing import Any
+from typing import Dict
+from typing import List
+from typing import Optional
+
+from nv_ingest.util.image_processing.transforms import numpy_to_base64
+from nv_ingest.util.nim.helpers import ModelInterface
+
+ACCEPTED_TEXT_CLASSES = set(
+ [
+ "Text",
+ "Title",
+ "Section-header",
+ "List-item",
+ "TOC",
+ "Bibliography",
+ "Formula",
+ "Page-header",
+ "Page-footer",
+ "Caption",
+ "Footnote",
+ "Floating-text",
+ ]
+)
+ACCEPTED_TABLE_CLASSES = set(
+ [
+ "Table",
+ ]
+)
+ACCEPTED_IMAGE_CLASSES = set(
+ [
+ "Picture",
+ ]
+)
+ACCEPTED_CLASSES = ACCEPTED_TEXT_CLASSES | ACCEPTED_TABLE_CLASSES | ACCEPTED_IMAGE_CLASSES
+
+logger = logging.getLogger(__name__)
+
+
+class NemoRetrieverParseModelInterface(ModelInterface):
+ """
+ An interface for handling inference with a NemoRetrieverParse model.
+ """
+
+ def name(self) -> str:
+ """
+ Get the name of the model interface.
+
+ Returns
+ -------
+ str
+ The name of the model interface.
+ """
+ return "nemoretriever_parse"
+
+ def prepare_data_for_inference(self, data: Dict[str, Any]) -> Dict[str, Any]:
+ """
+        Prepare input data for inference. The images are passed through unchanged; batching and payload construction happen in format_input.
+
+ Parameters
+ ----------
+ data : dict
+ The input data containing a list of images.
+
+ Returns
+ -------
+ dict
+            The input data, unchanged.
+ """
+
+ return data
+
+ def format_input(self, data: Dict[str, Any], protocol: str, max_batch_size: int, **kwargs) -> Any:
+ """
+ Format input data for the specified protocol.
+
+ Parameters
+ ----------
+ data : dict
+ The input data to format.
+ protocol : str
+ The protocol to use ("grpc" or "http").
+ **kwargs : dict
+ Additional parameters for HTTP payload formatting.
+
+ Returns
+ -------
+ Any
+ The formatted input data.
+
+ Raises
+ ------
+ ValueError
+ If an invalid protocol is specified.
+ """
+
+ # Helper function: chunk a list into sublists of length <= chunk_size.
+ def chunk_list(lst: list, chunk_size: int) -> List[list]:
+ return [lst[i : i + chunk_size] for i in range(0, len(lst), chunk_size)]
+
+ if protocol == "grpc":
+ raise ValueError("gRPC protocol is not supported for NemoRetrieverParse.")
+ elif protocol == "http":
+ logger.debug("Formatting input for HTTP NemoRetrieverParse model")
+ # Prepare payload for HTTP request
+
+ if "images" in data:
+ base64_list = [numpy_to_base64(img) for img in data["images"]]
+ else:
+ base64_list = [numpy_to_base64(data["image"])]
+
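+            # Build one chat-completions payload per chunk; each image becomes its own user message.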
+ payloads = []
+ for chunk in chunk_list(base64_list, max_batch_size):
+ payload = self._prepare_nemoretriever_parse_payload(chunk)
+ payloads.append(payload)
+ return payloads
+ else:
+ raise ValueError("Invalid protocol specified. Must be 'grpc' or 'http'.")
+
+    def parse_output(self, response: Any, protocol: str, data: Optional[Dict[str, Any]] = None, **kwargs) -> Any:
+        """
+        Parse the output from the model's inference response.
+
+        Parameters
+        ----------
+        response : Any
+            The response from the model inference.
+        protocol : str
+            The protocol used ("grpc" or "http").
+        data : dict, optional
+            Additional input data passed to the function.
+
+        Returns
+        -------
+        Any
+            The parsed output data.
+
+        Raises
+        ------
+        ValueError
+            If an invalid protocol is specified.
+        """
+
+        if protocol == "grpc":
+            raise ValueError("gRPC protocol is not supported for NemoRetrieverParse.")
+        elif protocol == "http":
+            logger.debug("Parsing output from HTTP NemoRetrieverParse model")
+            return self._extract_content_from_nemoretriever_parse_response(response)
+        else:
+            raise ValueError("Invalid protocol specified. Must be 'grpc' or 'http'.")
+
+    def process_inference_results(self, output: Any, **kwargs) -> Any:
+        """
+        Process inference results for the NemoRetrieverParse model. This is a
+        passthrough: the parsed output requires no further processing.
+
+        Parameters
+        ----------
+        output : Any
+            The raw output from the model.
+
+        Returns
+        -------
+        Any
+            The output, unchanged.
+        """
+
+        return output
+
+    def _prepare_nemoretriever_parse_payload(self, base64_list: List[str]) -> Dict[str, Any]:
+        """
+        Build an OpenAI-style chat payload from a list of base64-encoded images.
+        """
+        messages = []
+
+        for b64_img in base64_list:
+            messages.append(
+                {
+                    "role": "user",
+                    "content": [
+                        {
+                            "type": "image_url",
+                            "image_url": {
+                                "url": f"data:image/png;base64,{b64_img}",
+                            },
+                        }
+                    ],
+                }
+            )
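+        # Resulting payload shape (sketch; one user message per image):
+        #   {"model": "nvidia/nemoretriever-parse", "messages": [{"role": "user", "content": [...]}, ...]}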
+        payload = {
+            "model": "nvidia/nemoretriever-parse",
+            "messages": messages,
+        }
+
+        return payload
+
+    def _extract_content_from_nemoretriever_parse_response(self, json_response: Dict[str, Any]) -> Any:
+        """
+        Extract content from the JSON response of a NemoRetrieverParse HTTP API request.
+
+        Parameters
+        ----------
+        json_response : dict
+            The JSON response from the NemoRetrieverParse API.
+
+        Returns
+        -------
+        Any
+            The extracted content from the response.
+
+        Raises
+        ------
+        RuntimeError
+            If the response does not contain the expected "choices" key or if it is empty.
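+
+        Notes
+        -----
+        The response is expected to follow an OpenAI-style chat-completion shape,
+        with the parsed document carried as a JSON string in the first tool call
+        (a sketch; field values are illustrative only)::
+
+            {"choices": [{"message": {"tool_calls": [{"function": {"arguments": "..."}}]}}]}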
+ """
+
+ if "choices" not in json_response or not json_response["choices"]:
+ raise RuntimeError("Unexpected response format: 'choices' key is missing or empty.")
+
+ tool_call = json_response["choices"][0]["message"]["tool_calls"][0]
+ return json.loads(tool_call["function"]["arguments"])
diff --git a/src/nv_ingest/util/pdf/metadata_aggregators.py b/src/nv_ingest/util/pdf/metadata_aggregators.py
index 2ffa3a0a..3a1721db 100644
--- a/src/nv_ingest/util/pdf/metadata_aggregators.py
+++ b/src/nv_ingest/util/pdf/metadata_aggregators.py
@@ -11,6 +11,7 @@
 from typing import Any
 from typing import Dict
 from typing import List
+from typing import Optional
 from typing import Tuple
 import pandas as pd
@@ -21,6 +22,7 @@
 from nv_ingest.schemas.metadata_schema import ContentSubtypeEnum
 from nv_ingest.schemas.metadata_schema import ContentTypeEnum
 from nv_ingest.schemas.metadata_schema import ImageTypeEnum
+from nv_ingest.schemas.metadata_schema import NearbyObjectsSchema
 from nv_ingest.schemas.metadata_schema import StdContentDescEnum
 from nv_ingest.schemas.metadata_schema import TableFormatEnum
 from nv_ingest.schemas.metadata_schema import validate_metadata
@@ -145,8 +147,11 @@ def construct_text_metadata(
     text_depth,
     source_metadata,
     base_unified_metadata,
+    delimiter=" ",
+    bbox_max_dimensions: Tuple[int, int] = (-1, -1),
+    nearby_objects: Optional[Dict[str, Any]] = None,
 ):
-    extracted_text = " ".join(accumulated_text)
+    extracted_text = delimiter.join(accumulated_text)
     content_metadata = {
         "type": ContentTypeEnum.TEXT,
@@ -158,6 +163,7 @@ def construct_text_metadata(
"block": -1,
"line": -1,
"span": -1,
+ "nearby_objects": nearby_objects or NearbyObjectsSchema(),
},
}
@@ -172,6 +178,7 @@ def construct_text_metadata(
"keywords": keywords,
"language": language,
"text_location": bbox,
+ "text_location_max_dimensions": bbox_max_dimensions,
}
ext_unified_metadata = base_unified_metadata.copy()
diff --git a/src/nv_ingest/util/pipeline/pipeline_runners.py b/src/nv_ingest/util/pipeline/pipeline_runners.py
index 5d01c12d..5f3dba72 100644
--- a/src/nv_ingest/util/pipeline/pipeline_runners.py
+++ b/src/nv_ingest/util/pipeline/pipeline_runners.py
@@ -38,7 +38,10 @@ class PipelineCreationSchema(BaseModel):
     cached_infer_protocol: str = "http"
     deplot_http_endpoint: str = os.getenv("DEPLOT_HTTP_ENDPOINT", "https://ai.api.nvidia.com/v1/vlm/google/deplot")
     deplot_infer_protocol: str = "http"
-    doughnut_grpc_triton: str = "triton-doughnut:8001"
+    nemoretriever_parse_http_endpoint: str = os.getenv(
+        "NEMORETRIEVER_PARSE_HTTP_ENDPOINT", "https://ai.api.nvidia.com/v1/vlm/nvidia/nemoretriever-parse"
+    )
+    nemoretriever_parse_infer_protocol: str = "http"
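+    # Defaults to the hosted endpoint; set NEMORETRIEVER_PARSE_HTTP_ENDPOINT to target
+    # a self-hosted NIM instead (e.g., http://nemoretriever-parse:8000/v1/chat/completions).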
     embedding_nim_endpoint: str = os.getenv("EMBEDDING_NIM_ENDPOINT", "https://integrate.api.nvidia.com/v1")
     embedding_nim_model_name: str = os.getenv("EMBEDDING_NIM_MODEL_NAME", "nvidia/nv-embedqa-e5-v5")
     ingest_log_level: str = os.getenv("INGEST_LOG_LEVEL", "INFO")
diff --git a/src/nv_ingest/util/pipeline/stage_builders.py b/src/nv_ingest/util/pipeline/stage_builders.py
index 6824c431..c2e8758d 100644
--- a/src/nv_ingest/util/pipeline/stage_builders.py
+++ b/src/nv_ingest/util/pipeline/stage_builders.py
@@ -68,7 +68,7 @@ def get_caption_classifier_service():
     return triton_service_caption_classifier, triton_service_caption_classifier_name
-def get_table_detection_service(env_var_prefix):
+def get_nim_service(env_var_prefix):
     prefix = env_var_prefix.upper()
     grpc_endpoint = os.environ.get(
         f"{prefix}_GRPC_ENDPOINT",
@@ -85,6 +85,7 @@ def get_table_detection_service(env_var_prefix):
"NGC_API_KEY",
"",
)
+
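+    # Each NIM service is resolved from {PREFIX}_GRPC_ENDPOINT, {PREFIX}_HTTP_ENDPOINT,
+    # and {PREFIX}_INFER_PROTOCOL, e.g. NEMORETRIEVER_PARSE_INFER_PROTOCOL.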
     infer_protocol = os.environ.get(
         f"{prefix}_INFER_PROTOCOL",
         "http" if http_endpoint else "grpc" if grpc_endpoint else "",
@@ -179,7 +180,10 @@ def add_metadata_injector_stage(pipe, morpheus_pipeline_config):
 def add_pdf_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count):
-    yolox_grpc, yolox_http, yolox_auth, yolox_protocol = get_table_detection_service("yolox")
+    yolox_grpc, yolox_http, yolox_auth, yolox_protocol = get_nim_service("yolox")
+    nemoretriever_parse_grpc, nemoretriever_parse_http, nemoretriever_parse_auth, nemoretriever_parse_protocol = (
+        get_nim_service("nemoretriever_parse")
+    )
     pdf_content_extractor_config = ingest_config.get(
         "pdf_content_extraction_module",
         {
@@ -187,7 +191,12 @@ def add_pdf_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, defau
"yolox_endpoints": (yolox_grpc, yolox_http),
"yolox_infer_protocol": yolox_protocol,
"auth_token": yolox_auth, # All auth tokens are the same for the moment
- }
+ },
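+            # Consumed by the nemoretriever_parse extraction path; endpoints are
+            # resolved via get_nim_service above.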
+ "nemoretriever_parse_config": {
+ "nemoretriever_parse_endpoints": (nemoretriever_parse_grpc, nemoretriever_parse_http),
+ "nemoretriever_parse_infer_protocol": nemoretriever_parse_protocol,
+ "auth_token": nemoretriever_parse_auth, # All auth tokens are the same for the moment
+ },
},
)
pdf_extractor_stage = pipe.add_stage(
@@ -204,8 +213,8 @@ def add_pdf_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, defau
 def add_table_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count):
-    _, _, yolox_auth, _ = get_table_detection_service("yolox")
-    paddle_grpc, paddle_http, paddle_auth, paddle_protocol = get_table_detection_service("paddle")
+    _, _, yolox_auth, _ = get_nim_service("yolox")
+    paddle_grpc, paddle_http, paddle_auth, paddle_protocol = get_nim_service("paddle")
     table_content_extractor_config = ingest_config.get(
         "table_content_extraction_module",
         {
@@ -225,12 +234,12 @@ def add_table_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, def
 def add_chart_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count):
-    _, _, yolox_auth, _ = get_table_detection_service("yolox")
+    _, _, yolox_auth, _ = get_nim_service("yolox")
-    deplot_grpc, deplot_http, deplot_auth, deplot_protocol = get_table_detection_service("deplot")
-    cached_grpc, cached_http, cached_auth, cached_protocol = get_table_detection_service("cached")
+    deplot_grpc, deplot_http, deplot_auth, deplot_protocol = get_nim_service("deplot")
+    cached_grpc, cached_http, cached_auth, cached_protocol = get_nim_service("cached")
     # NOTE: Paddle isn't currently used directly by the chart extraction stage, but will be in the future.
-    paddle_grpc, paddle_http, paddle_auth, paddle_protocol = get_table_detection_service("paddle")
+    paddle_grpc, paddle_http, paddle_auth, paddle_protocol = get_nim_service("paddle")
     table_content_extractor_config = ingest_config.get(
         "table_content_extraction_module",
@@ -255,15 +264,14 @@ def add_chart_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, def
 def add_image_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count):
-    yolox_grpc, yolox_http, yolox_auth, yolox_protocol = get_table_detection_service("yolox")
+    yolox_grpc, yolox_http, yolox_auth, yolox_protocol = get_nim_service("yolox")
     image_extractor_config = ingest_config.get(
         "image_extraction_module",
         {
             "image_extraction_config": {
                 "yolox_endpoints": (yolox_grpc, yolox_http),
                 "yolox_infer_protocol": yolox_protocol,
-                "auth_token": yolox_auth,
-                # All auth tokens are the same for the moment
+                "auth_token": yolox_auth,  # All auth tokens are the same for the moment
             }
         },
     )
@@ -280,7 +288,7 @@ def add_image_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, def
 def add_docx_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count):
-    yolox_grpc, yolox_http, yolox_auth, yolox_protocol = get_table_detection_service("yolox")
+    yolox_grpc, yolox_http, yolox_auth, yolox_protocol = get_nim_service("yolox")
     docx_extractor_config = ingest_config.get(
         "docx_extraction_module",
         {
@@ -304,7 +312,7 @@ def add_docx_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, defa
 def add_pptx_extractor_stage(pipe, morpheus_pipeline_config, ingest_config, default_cpu_count):
-    yolox_grpc, yolox_http, yolox_auth, yolox_protocol = get_table_detection_service("yolox")
+    yolox_grpc, yolox_http, yolox_auth, yolox_protocol = get_nim_service("yolox")
     pptx_extractor_config = ingest_config.get(
         "pptx_extraction_module",
         {
diff --git a/tests/nv_ingest/extraction_workflows/pdf/test_eclair_helper.py b/tests/nv_ingest/extraction_workflows/pdf/test_eclair_helper.py
deleted file mode 100644
index e69de29b..00000000
diff --git a/tests/nv_ingest/extraction_workflows/pdf/test_nemoretriever_parse_helper.py b/tests/nv_ingest/extraction_workflows/pdf/test_nemoretriever_parse_helper.py
new file mode 100644
index 00000000..7853d505
--- /dev/null
+++ b/tests/nv_ingest/extraction_workflows/pdf/test_nemoretriever_parse_helper.py
@@ -0,0 +1,159 @@
+from io import BytesIO
+from unittest.mock import MagicMock
+from unittest.mock import patch
+
+import numpy as np
+import pandas as pd
+import pytest
+
+from nv_ingest.extraction_workflows.pdf.nemoretriever_parse_helper import _construct_table_metadata
+from nv_ingest.extraction_workflows.pdf.nemoretriever_parse_helper import nemoretriever_parse
+from nv_ingest.schemas.metadata_schema import AccessLevelEnum
+from nv_ingest.schemas.metadata_schema import TextTypeEnum
+from nv_ingest.util.nim import nemoretriever_parse as nemoretriever_parse_utils
+from nv_ingest.util.pdf.metadata_aggregators import Base64Image
+from nv_ingest.util.pdf.metadata_aggregators import LatexTable
+
+_MODULE_UNDER_TEST = "nv_ingest.extraction_workflows.pdf.nemoretriever_parse_helper"
+
+
+@pytest.fixture
+def document_df():
+ """Fixture to create a DataFrame for testing."""
+ return pd.DataFrame(
+ {
+ "source_id": ["source1"],
+ }
+ )
+
+
+@pytest.fixture
+def sample_pdf_stream():
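+    # Assumes the repository's data/test.pdf sample asset is present at runtime.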
+ with open("data/test.pdf", "rb") as f:
+ pdf_stream = BytesIO(f.read())
+ return pdf_stream
+
+
+@patch(f"{_MODULE_UNDER_TEST}.create_inference_client")
+def test_nemoretriever_parse_text_extraction(mock_client, sample_pdf_stream, document_df):
+    mock_client_instance = MagicMock()
+    mock_client.return_value = mock_client_instance
+    mock_client_instance.infer.return_value = [
+        {
+            "bbox": {"xmin": 0.16633729456384325, "ymin": 0.0969, "xmax": 0.3097820480404551, "ymax": 0.1102},
+            "text": "testing",
+            "type": "Text",
+        }
+    ]
+
+    result = nemoretriever_parse(
+        pdf_stream=sample_pdf_stream,
+        extract_text=True,
+        extract_images=False,
+        extract_tables=False,
+        row_data=document_df.iloc[0],
+        text_depth="page",
+        nemoretriever_parse_config=MagicMock(),
+    )
+
+    assert len(result) == 1
+    assert result[0][0].value == "text"
+    assert result[0][1]["content"] == "testing"
+    assert result[0][1]["source_metadata"]["source_id"] == "source1"
+
+
+@patch(f"{_MODULE_UNDER_TEST}.create_inference_client")
+def test_nemoretriever_parse_table_extraction(mock_client, sample_pdf_stream, document_df):
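+    # The mocked response uses normalized bbox coordinates; the helper is expected
+    # to scale them to the 1024x1280 grid reported in *_location_max_dimensions.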
+    mock_client_instance = MagicMock()
+    mock_client.return_value = mock_client_instance
+    mock_client_instance.infer.return_value = [
+        {
+            "bbox": {"xmin": 1 / 1024, "ymin": 2 / 1280, "xmax": 101 / 1024, "ymax": 102 / 1280},
+            "text": "table text",
+            "type": "Table",
+        }
+    ]
+
+    result = nemoretriever_parse(
+        pdf_stream=sample_pdf_stream,
+        extract_text=True,
+        extract_images=False,
+        extract_tables=True,
+        row_data=document_df.iloc[0],
+        text_depth="page",
+        nemoretriever_parse_config=MagicMock(),
+    )
+
+    assert len(result) == 2
+    assert result[0][0].value == "structured"
+    assert result[0][1]["table_metadata"]["table_content"] == "table text"
+    assert result[0][1]["table_metadata"]["table_location"] == (1, 2, 101, 102)
+    assert result[0][1]["table_metadata"]["table_location_max_dimensions"] == (1024, 1280)
+    assert result[1][0].value == "text"
+
+
+@patch(f"{_MODULE_UNDER_TEST}.create_inference_client")
+def test_nemoretriever_parse_image_extraction(mock_client, sample_pdf_stream, document_df):
+    mock_client_instance = MagicMock()
+    mock_client.return_value = mock_client_instance
+    mock_client_instance.infer.return_value = [
+        {
+            "bbox": {"xmin": 1 / 1024, "ymin": 2 / 1280, "xmax": 101 / 1024, "ymax": 102 / 1280},
+            "text": "",
+            "type": "Picture",
+        }
+    ]
+
+    result = nemoretriever_parse(
+        pdf_stream=sample_pdf_stream,
+        extract_text=True,
+        extract_images=True,
+        extract_tables=False,
+        row_data=document_df.iloc[0],
+        text_depth="page",
+        nemoretriever_parse_config=MagicMock(),
+    )
+
+    assert len(result) == 2
+    assert result[0][0].value == "image"
+    assert result[0][1]["content"][:10] == "iVBORw0KGg"  # PNG format header
+    assert result[0][1]["image_metadata"]["image_location"] == (1, 2, 101, 102)
+    assert result[0][1]["image_metadata"]["image_location_max_dimensions"] == (1024, 1280)
+    assert result[1][0].value == "text"
+
+
+@patch(f"{_MODULE_UNDER_TEST}.create_inference_client")
+def test_nemoretriever_parse_text_extraction_bboxes(mock_client, sample_pdf_stream, document_df):
+    mock_client_instance = MagicMock()
+    mock_client.return_value = mock_client_instance
+    mock_client_instance.infer.return_value = [
+        {
+            "bbox": {"xmin": 0.16633729456384325, "ymin": 0.0969, "xmax": 0.3097820480404551, "ymax": 0.1102},
+            "text": "testing0",
+            "type": "Title",
+        },
+        {
+            "bbox": {"xmin": 0.16633729456384325, "ymin": 0.0969, "xmax": 0.3097820480404551, "ymax": 0.1102},
+            "text": "testing1",
+            "type": "Text",
+        },
+    ]
+
+    result = nemoretriever_parse(
+        pdf_stream=sample_pdf_stream,
+        extract_text=True,
+        extract_images=False,
+        extract_tables=False,
+        row_data=document_df.iloc[0],
+        text_depth="page",
+        nemoretriever_parse_config=MagicMock(),
+    )
+
+    assert len(result) == 1
+    assert result[0][0].value == "text"
+    assert result[0][1]["content"] == "testing0\n\ntesting1"
+    assert result[0][1]["source_metadata"]["source_id"] == "source1"
+
+    blocks = result[0][1]["content_metadata"]["hierarchy"]["nearby_objects"]
+    assert blocks["text"]["content"] == ["testing0", "testing1"]
+    assert blocks["text"]["type"] == ["Title", "Text"]
diff --git a/tests/nv_ingest/util/nim/test_doughnut.py b/tests/nv_ingest/util/nim/test_doughnut.py
deleted file mode 100644
index 656acf8f..00000000
--- a/tests/nv_ingest/util/nim/test_doughnut.py
+++ /dev/null
@@ -1,143 +0,0 @@
-# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
-# All rights reserved.
-# SPDX-License-Identifier: Apache-2.0
-
-import pytest
-
-from nv_ingest.util.nim.doughnut import extract_classes_bboxes
-from nv_ingest.util.nim.doughnut import postprocess_text
-from nv_ingest.util.nim.doughnut import reverse_transform_bbox
-from nv_ingest.util.nim.doughnut import strip_markdown_formatting
-
-
-def test_reverse_transform_bbox_no_offset():
-    bbox = (10, 20, 30, 40)
-    bbox_offset = (0, 0)
-    expected_bbox = (10, 20, 30, 40)
-    transformed_bbox = reverse_transform_bbox(bbox, bbox_offset, 100, 100)
-
-    assert transformed_bbox == expected_bbox
-
-
-def test_reverse_transform_bbox_with_offset():
-    bbox = (20, 30, 40, 50)
-    bbox_offset = (10, 10)
-    expected_bbox = (12, 25, 37, 50)
-    transformed_bbox = reverse_transform_bbox(bbox, bbox_offset, 100, 100)
-
-    assert transformed_bbox == expected_bbox
-
-
-def test_reverse_transform_bbox_with_large_offset():
-    bbox = (60, 80, 90, 100)
-    bbox_offset = (20, 30)
-    width_ratio = (100 - 2 * bbox_offset[0]) / 100
-    height_ratio = (100 - 2 * bbox_offset[1]) / 100
-    expected_bbox = (
-        int((60 - bbox_offset[0]) / width_ratio),
-        int((80 - bbox_offset[1]) / height_ratio),
-        int((90 - bbox_offset[0]) / width_ratio),
-        int((100 - bbox_offset[1]) / height_ratio),
-    )
-    transformed_bbox = reverse_transform_bbox(bbox, bbox_offset, 100, 100)
-
-    assert transformed_bbox == expected_bbox
-
-
-def test_reverse_transform_bbox_custom_dimensions():
-    bbox = (15, 25, 35, 45)
-    bbox_offset = (5, 5)
-    original_width = 200
-    original_height = 200
-    width_ratio = (original_width - 2 * bbox_offset[0]) / original_width
-    height_ratio = (original_height - 2 * bbox_offset[1]) / original_height
-    expected_bbox = (
-        int((15 - bbox_offset[0]) / width_ratio),
-        int((25 - bbox_offset[1]) / height_ratio),
-        int((35 - bbox_offset[0]) / width_ratio),
-        int((45 - bbox_offset[1]) / height_ratio),
-    )
-    transformed_bbox = reverse_transform_bbox(bbox, bbox_offset, original_width, original_height)
-
-    assert transformed_bbox == expected_bbox
-
-
-def test_reverse_transform_bbox_zero_dimension():
-    bbox = (10, 10, 20, 20)
-    bbox_offset = (0, 0)
-    original_width = 0
-    original_height = 0
-    with pytest.raises(ZeroDivisionError):
-        reverse_transform_bbox(bbox, bbox_offset, original_width, original_height)
-
-
-def test_postprocess_text_with_unaccepted_class():
-    # Input text that should not be processed
-    txt = "This text should not be processed"
-    cls = "InvalidClass"  # Not in ACCEPTED_CLASSES
-
-    result = postprocess_text(txt, cls)
-
-    assert result == ""
-
-
-def test_postprocess_text_removes_tbc_and_processes_text():
-    # Input text containing "<tbc>"
-    txt = "<tbc>Some text"
-    cls = "Title"  # An accepted class
-
-    expected_output = "Some text"
-
-    result = postprocess_text(txt, cls)
-
-    assert result == expected_output
-
-
-def test_postprocess_text_no_tbc_but_accepted_class():
-    # Input text without "<tbc>"
-    txt = "This is a test **without** tbc"
-    cls = "Section-header"  # An accepted class
-
-    expected_output = "This is a test without tbc"
-
-    result = postprocess_text(txt, cls)
-
-    assert result == expected_output
-
-
-@pytest.mark.parametrize(
- "input_text, expected_classes, expected_bboxes, expected_texts",
- [
- ("Sample text", ["Text"], [(10, 20, 30, 40)], ["Sample text"]),
- (
- "Invalid text ",
- ["Bad-box", "Bad-box"],
- [(0, 0, 0, 0), (10, 20, 30, 40)],
- ["Invalid text", ""],
- ),
- ("Header content", ["Title"], [(15, 25, 35, 45)], ["Header content"]),
- ("Overlapping box", ["Bad-box"], [(5, 10, 5, 10)], ["Overlapping box"]),
- ],
-)
-def test_extract_classes_bboxes(input_text, expected_classes, expected_bboxes, expected_texts):
-    classes, bboxes, texts = extract_classes_bboxes(input_text)
-    assert classes == expected_classes
-    assert bboxes == expected_bboxes
-    assert texts == expected_texts
-
-
-# Test cases for strip_markdown_formatting
-@pytest.mark.parametrize(
- "input_text, expected_output",
- [
- ("# Header\n**Bold text**\n*Italic*", "Header\nBold text\nItalic"),
- ("~~Strikethrough~~", "Strikethrough"),
- ("[Link](http://example.com)", "Link"),
- ("`inline code`", "inline code"),
- ("> Blockquote", "Blockquote"),
- ("Normal text\n\n\nMultiple newlines", "Normal text\n\nMultiple newlines"),
- ("Dot sequence...... more text", "Dot sequence..... more text"),
- ],
-)
-def test_strip_markdown_formatting(input_text, expected_output):
-    assert strip_markdown_formatting(input_text) == expected_output