Commit eb19853
Merge branch 'main' of https://github.com/NVIDIA/nv-ingest into expand-examples

ChrisJar committed Nov 22, 2024
2 parents 655fc25 + 64f3db3
Showing 16 changed files with 466 additions and 389 deletions.
1 change: 1 addition & 0 deletions client/client_examples/examples/python_client_usage.ipynb
@@ -630,6 +630,7 @@
"job_spec.add_task(extract_task)\n",
"job_spec.add_task(dedup_task)\n",
"job_spec.add_task(filter_task)\n",
"job_spec.add_task(split_task)\n",
"job_spec.add_task(store_task)\n",
"job_spec.add_task(embed_task)\n",
"job_spec.add_task(vdb_upload_task)"
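The notebook now adds a split task to the job spec alongside the other tasks. A minimal sketch of how that task might be constructed — the parameter names below are assumptions about SplitTask's signature, not values taken from this commit:

    from nv_ingest_client.primitives.tasks import SplitTask

    # Hypothetical chunking options; verify against SplitTask's actual signature.
    split_task = SplitTask(
        split_by="word",      # split on word boundaries
        split_length=300,     # target chunk size
        split_overlap=10,     # overlap between adjacent chunks
    )
    job_spec.add_task(split_task)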
2 changes: 1 addition & 1 deletion client/src/nv_ingest_client/cli/util/processing.py
@@ -652,7 +652,7 @@ def create_and_process_jobs(
failed_jobs.append(f"{job_id}::{source_name}")
except RuntimeError as e:
source_name = job_id_map[job_id]
logger.error(f"Error while processing {job_id}({source_name}) {e}")
logger.error(f"Error while processing '{job_id}' - ({source_name}):\n{e}")
failed_jobs.append(f"{job_id}::{source_name}")
except Exception as e:
traceback.print_exc()
39 changes: 25 additions & 14 deletions client/src/nv_ingest_client/client/client.py
@@ -88,6 +88,7 @@ def __init__(

self._current_message_id = 0
self._job_states = {}
self._job_index_to_job_spec = {}
self._message_client_hostname = message_client_hostname or "localhost"
self._message_client_port = message_client_port or 7670
self._message_counter_id = msg_counter_id or "nv-ingest-message-id"
@@ -177,16 +178,19 @@ def _add_single_job(self, job_spec: JobSpec) -> str:

return job_index

def add_job(self, job_spec: Union[BatchJobSpec, JobSpec]) -> str:
def add_job(self, job_spec: Union[BatchJobSpec, JobSpec]) -> Union[str, List[str]]:
if isinstance(job_spec, JobSpec):
job_index = self._add_single_job(job_spec)
self._job_index_to_job_spec[job_index] = job_spec

return job_index
elif isinstance(job_spec, BatchJobSpec):
job_indexes = []
for _, job_specs in job_spec.job_specs.items():
for job in job_specs:
job_index = self._add_single_job(job)
job_indexes.append(job_index)
self._job_index_to_job_spec[job_index] = job
return job_indexes
else:
raise ValueError(f"Unexpected type: {type(job_spec)}")
@@ -241,7 +245,8 @@ def create_job(
extended_options=extended_options,
)

return self.add_job(job_spec)
job_id = self.add_job(job_spec)
return job_id

def add_task(self, job_index: str, task: Task) -> None:
job_state = self._get_and_check_job_state(job_index, required_state=JobStateEnum.PENDING)
@@ -295,7 +300,8 @@ def _fetch_job_result(self, job_index: str, timeout: float = 100, data_only: boo
"""

try:
job_state = self._get_and_check_job_state(job_index, required_state=[JobStateEnum.SUBMITTED, JobStateEnum.SUBMITTED_ASYNC])
job_state = self._get_and_check_job_state(job_index, required_state=[JobStateEnum.SUBMITTED,
JobStateEnum.SUBMITTED_ASYNC])
response = self._message_client.fetch_message(job_state.job_id, timeout)

if response is not None:
@@ -345,12 +351,12 @@ def _fetch_job_result_wait(self, job_id: str, timeout: float = 60, data_only: bo
# This is the direct Python approach function for retrieving jobs which handles the timeouts directly
# in the function itself instead of expecting the user to handle it themselves
def fetch_job_result(
self,
job_ids: Union[str, List[str]],
timeout: float = 100,
max_retries: Optional[int] = None,
retry_delay: float = 1,
verbose: bool = False,
self,
job_ids: Union[str, List[str]],
timeout: float = 100,
max_retries: Optional[int] = None,
retry_delay: float = 1,
verbose: bool = False,
) -> List[Tuple[Optional[Dict], str]]:
"""
Fetches job results for multiple job IDs concurrently with individual timeouts and retry logic.
@@ -410,14 +416,19 @@ def fetch_with_retries(job_id: str):
try:
result = handle_future_result(future, timeout=timeout)
results.append(result.get("data"))
del self._job_index_to_job_spec[job_id]
except concurrent.futures.TimeoutError:
logger.error(f"Timeout while fetching result for job ID {job_id}")
logger.error(
f"Timeout while fetching result for job ID {job_id}: {self._job_index_to_job_spec[job_id].source_id}")
except json.JSONDecodeError as e:
logger.error(f"Decoding while processing job ID {job_id}: {e}")
logger.error(
f"Decoding while processing job ID {job_id}: {self._job_index_to_job_spec[job_id].source_id}\n{e}")
except RuntimeError as e:
logger.error(f"Error while processing job ID {job_id}: {e}")
logger.error(
f"Error while processing job ID {job_id}: {self._job_index_to_job_spec[job_id].source_id}\n{e}")
except Exception as e:
logger.error(f"Error while fetching result for job ID {job_id}: {e}")
logger.error(
f"Error while fetching result for job ID {job_id}: {self._job_index_to_job_spec[job_id].source_id}\n{e}")

return results

@@ -585,7 +596,7 @@ def submit_job_async(self, job_indices: Union[str, List[str]], job_queue_id: str

return future_to_job_index

def create_jobs_for_batch(self, files_batch: List[str], tasks: Dict[str, Any]) -> List[JobSpec]:
def create_jobs_for_batch(self, files_batch: List[str], tasks: Dict[str, Any]) -> List[str]:
"""
Create and submit job specifications (JobSpecs) for a batch of files, returning the job IDs.
This function takes a batch of files, processes each file to extract its content and type,
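Taken together, the client changes make add_job return either a single job index or a list of them, and fetch_job_result now logs the originating source_id (via the new _job_index_to_job_spec map) when a job times out or fails. A rough usage sketch — the import path, hostname, port, and queue name are illustrative assumptions:

    from nv_ingest_client.client import NvIngestClient

    client = NvIngestClient(
        message_client_hostname="localhost",  # assumed endpoint
        message_client_port=7670,
    )

    job_index = client.add_job(job_spec)                     # single JobSpec -> single index
    client.submit_job_async(job_index, "ingest_task_queue")  # queue name is an assumption

    # Per-job timeout and retry handling; failures are logged with the job's source_id.
    results = client.fetch_job_result([job_index], timeout=100, max_retries=3, verbose=True)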
24 changes: 14 additions & 10 deletions client/src/nv_ingest_client/client/interface.py
@@ -65,11 +65,11 @@ class Ingestor:
"""

def __init__(
self,
documents: Optional[List[str]] = None,
client: Optional[NvIngestClient] = None,
job_queue_id: str = DEFAULT_JOB_QUEUE_ID,
**kwargs,
self,
documents: Optional[List[str]] = None,
client: Optional[NvIngestClient] = None,
job_queue_id: str = DEFAULT_JOB_QUEUE_ID,
**kwargs,
):
self._documents = documents or []
self._client = client
@@ -83,6 +83,7 @@ def __init__(
self._job_specs = None
self._job_ids = None
self._job_states = None
self._job_id_to_source_id = {}

if self._check_files_local():
self._job_specs = BatchJobSpec(self._documents)
@@ -242,6 +243,7 @@ def ingest_async(self, **kwargs: Any) -> Future:
self._prepare_ingest_run()

self._job_ids = self._client.add_job(self._job_specs)

future_to_job_id = self._client.submit_job_async(self._job_ids, self._job_queue_id, **kwargs)
self._job_states = {job_id: self._client._get_and_check_job_state(job_id) for job_id in self._job_ids}

@@ -300,8 +302,8 @@ def all_tasks(self) -> "Ingestor":
.filter() \
.split() \
.embed()
# .store() \
# .vdb_upload()
# .store() \
# .vdb_upload()
# fmt: on
return self

@@ -360,11 +362,13 @@ def extract(self, **kwargs: Any) -> "Ingestor":
Ingestor
Returns self for chaining.
"""
extract_tables = kwargs.get("extract_tables", False)
extract_charts = kwargs.get("extract_charts", False)
extract_tables = kwargs.pop("extract_tables", True)
extract_charts = kwargs.pop("extract_charts", True)

for document_type in self._job_specs.file_types:
extract_task = ExtractTask(document_type, **kwargs)
extract_task = ExtractTask(
document_type, extract_tables=extract_tables, extract_charts=extract_charts, **kwargs
)
self._job_specs.add_task(extract_task, document_type=document_type)

if extract_tables is True:
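With extract_tables and extract_charts now defaulting to True (and popped from kwargs so they are not passed through twice), a typical fluent pipeline built on these methods looks roughly like this — the document path and the final ingest() call are illustrative assumptions:

    from nv_ingest_client.client import Ingestor

    ingestor = (
        Ingestor(documents=["data/example.pdf"])  # hypothetical input file
        .extract()   # tables and charts are now extracted by default
        .dedup()
        .filter()
        .split()
        .embed()
    )
    results = ingestor.ingest()  # assumed blocking counterpart of ingest_async()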
4 changes: 4 additions & 0 deletions client/src/nv_ingest_client/primitives/tasks/__init__.py
@@ -3,12 +3,14 @@
# SPDX-License-Identifier: Apache-2.0

from .caption import CaptionTask
from .chart_extraction import ChartExtractionTask
from .dedup import DedupTask
from .embed import EmbedTask
from .extract import ExtractTask
from .filter import FilterTask
from .split import SplitTask
from .store import StoreTask
from .table_extraction import TableExtractionTask
from .task_base import Task
from .task_base import TaskType
from .task_base import is_valid_task_type
@@ -17,10 +19,12 @@

__all__ = [
"CaptionTask",
"ChartExtractionTask",
"ExtractTask",
"is_valid_task_type",
"SplitTask",
"StoreTask",
"TableExtractionTask",
"Task",
"task_factory",
"TaskType",
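Exporting ChartExtractionTask and TableExtractionTask lets callers attach them to a job spec explicitly instead of relying on Ingestor.extract() to add them. A sketch, assuming both constructors can be called without arguments:

    from nv_ingest_client.primitives.tasks import ChartExtractionTask, TableExtractionTask

    # Assumed no-argument constructors; check each task's signature for tunable options.
    job_spec.add_task(TableExtractionTask())
    job_spec.add_task(ChartExtractionTask())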
16 changes: 13 additions & 3 deletions client/src/nv_ingest_client/util/util.py
@@ -24,7 +24,6 @@

logger = logging.getLogger(__name__)


# pylint: disable=invalid-name
# pylint: disable=missing-class-docstring
# pylint: disable=logging-fstring-interpolation
@@ -257,14 +256,25 @@ def check_ingest_result(json_payload: Dict) -> typing.Tuple[bool, str]:
)

is_failed = json_payload.get("status", "") in "failed"
description = json_payload.get("description", "")
description = ""
if (is_failed):
try:
source_id = json_payload.get("data", [])[0].get("metadata", {}).get("source_metadata", {}).get(
"source_name",
"")
except Exception as e:
source_id = ""

description = f"[{source_id}]: {json_payload.get('status', '')}\n"

description += (json_payload.get("description", ""))

# Look to see if we have any failure annotations to augment the description
if is_failed and "annotations" in json_payload:
for annot_id, value in json_payload["annotations"].items():
if "task_result" in value and value["task_result"] == "FAILURE":
message = value.get("message", "Unknown")
description = f"\n↪ Event that caused this failure: {annot_id} -> {message}"
description += f"\n↪ Event that caused this failure: {annot_id} -> {message}"
break

return is_failed, description
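The reworked failure path can be exercised with a hand-built payload; the field names below mirror the keys the function reads, while the values themselves are invented for illustration:

    from nv_ingest_client.util.util import check_ingest_result

    # Hypothetical failed payload shaped like the fields check_ingest_result inspects.
    payload = {
        "status": "failed",
        "description": "extraction failed",
        "data": [{"metadata": {"source_metadata": {"source_name": "report.pdf"}}}],
        "annotations": {
            "annot-1": {"task_result": "FAILURE", "message": "pdfium render error"},
        },
    }

    is_failed, description = check_ingest_result(payload)
    # is_failed -> True; description starts with "[report.pdf]: failed" and then
    # appends the payload description plus the failing annotation's message.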
15 changes: 14 additions & 1 deletion config/otel-collector-config.yaml
@@ -19,6 +19,19 @@ exporters:

processors:
batch:
tail_sampling:
policies: [
{
name: filter_http_url,
type: string_attribute,
string_attribute: {
key: http.route,
values: [ "/health/ready" ],
enabled_regex_matching: true,
invert_match: true
}
}
]

extensions:
health_check:
@@ -32,7 +45,7 @@
pipelines:
traces:
receivers: [otlp]
processors: [batch]
processors: [batch, tail_sampling]
exporters: [zipkin, logging]
metrics:
receivers: [otlp]
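The new tail_sampling policy appears intended to drop traces for the /health/ready probe: with invert_match enabled, only traces whose http.route does not match that pattern are kept, so periodic health checks no longer flood the Zipkin trace view.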
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,4 +1,4 @@
aiohttp==3.9.4
aiohttp==3.10.11
charset-normalizer
click
opencv-python
4 changes: 3 additions & 1 deletion src/nv_ingest/api/main.py
@@ -13,6 +13,7 @@

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
@@ -22,7 +23,8 @@
from .v1.ingest import router as IngestApiRouter

# Set up the tracer provider and add a processor for exporting traces
trace.set_tracer_provider(TracerProvider())
resource = Resource(attributes={"service.name": "nv-ingest"})
trace.set_tracer_provider(TracerProvider(resource=resource))
tracer = trace.get_tracer(__name__)

otel_endpoint = os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT", "otel-collector:4317")
22 changes: 15 additions & 7 deletions src/nv_ingest/extraction_workflows/pdf/doughnut_helper.py
@@ -28,7 +28,6 @@
import pypdfium2 as pdfium
import tritonclient.grpc as grpcclient

from nv_ingest.extraction_workflows.pdf import doughnut_utils
from nv_ingest.schemas.metadata_schema import AccessLevelEnum
from nv_ingest.schemas.metadata_schema import ContentSubtypeEnum
from nv_ingest.schemas.metadata_schema import ContentTypeEnum
@@ -39,6 +38,7 @@
from nv_ingest.util.exception_handlers.pdf import pdfium_exception_handler
from nv_ingest.util.image_processing.transforms import crop_image
from nv_ingest.util.image_processing.transforms import numpy_to_base64
from nv_ingest.util.nim import doughnut as doughnut_utils
from nv_ingest.util.pdf.metadata_aggregators import Base64Image
from nv_ingest.util.pdf.metadata_aggregators import LatexTable
from nv_ingest.util.pdf.metadata_aggregators import construct_image_metadata_from_pdf_image
@@ -50,6 +50,9 @@

DOUGHNUT_GRPC_TRITON = os.environ.get("DOUGHNUT_GRPC_TRITON", "triton:8001")
DEFAULT_BATCH_SIZE = 16
DEFAULT_RENDER_DPI = 300
DEFAULT_MAX_WIDTH = 1024
DEFAULT_MAX_HEIGHT = 1280


# Define a helper function to use doughnut to extract text from a base64 encoded bytestream PDF
@@ -165,7 +168,12 @@ def doughnut(pdf_stream, extract_text: bool, extract_images: bool, extract_table
txt = doughnut_utils.postprocess_text(txt, cls)

if extract_images and identify_nearby_objects:
bbox = doughnut_utils.reverse_transform_bbox(bbox, bbox_offset)
bbox = doughnut_utils.reverse_transform_bbox(
bbox=bbox,
bbox_offset=bbox_offset,
original_width=DEFAULT_MAX_WIDTH,
original_height=DEFAULT_MAX_HEIGHT,
)
page_nearby_blocks["text"]["content"].append(txt)
page_nearby_blocks["text"]["bbox"].append(bbox)

@@ -182,8 +190,8 @@

elif extract_images and (cls == "Picture"):
if page_image is None:
scale_tuple = (doughnut_utils.DEFAULT_MAX_WIDTH, doughnut_utils.DEFAULT_MAX_HEIGHT)
padding_tuple = (doughnut_utils.DEFAULT_MAX_WIDTH, doughnut_utils.DEFAULT_MAX_HEIGHT)
scale_tuple = (DEFAULT_MAX_WIDTH, DEFAULT_MAX_HEIGHT)
padding_tuple = (DEFAULT_MAX_WIDTH, DEFAULT_MAX_HEIGHT)
page_image, *_ = pdfium_pages_to_numpy(
[pages[page_idx]], scale_tuple=scale_tuple, padding_tuple=padding_tuple
)
@@ -280,9 +288,9 @@ def preprocess_and_send_requests(
if not batch:
return []

render_dpi = 300
scale_tuple = (doughnut_utils.DEFAULT_MAX_WIDTH, doughnut_utils.DEFAULT_MAX_HEIGHT)
padding_tuple = (doughnut_utils.DEFAULT_MAX_WIDTH, doughnut_utils.DEFAULT_MAX_HEIGHT)
render_dpi = DEFAULT_RENDER_DPI
scale_tuple = (DEFAULT_MAX_WIDTH, DEFAULT_MAX_HEIGHT)
padding_tuple = (DEFAULT_MAX_WIDTH, DEFAULT_MAX_HEIGHT)

page_images, bbox_offsets = pdfium_pages_to_numpy(
batch, render_dpi=render_dpi, scale_tuple=scale_tuple, padding_tuple=padding_tuple
19 changes: 11 additions & 8 deletions src/nv_ingest/extraction_workflows/pdf/pdfium_helper.py
@@ -471,15 +471,18 @@ def pdfium_extractor(
pdfium_config,
trace_info=trace_info,
):
extracted_data.append(
construct_table_and_chart_metadata(
table_and_charts,
page_idx,
pdf_metadata.page_count,
source_metadata,
base_unified_metadata,
if (extract_tables and (table_and_charts.type_string == "table")) or (
extract_charts and (table_and_charts.type_string == "chart")
):
extracted_data.append(
construct_table_and_chart_metadata(
table_and_charts,
page_idx,
pdf_metadata.page_count,
source_metadata,
base_unified_metadata,
)
)
)

logger.debug(f"Extracted {len(extracted_data)} items from PDF.")

2 changes: 1 addition & 1 deletion src/nv_ingest/modules/telemetry/otel_tracer.py
@@ -83,7 +83,7 @@ def collect_timestamps(message):
is_remote=True,
trace_flags=TraceFlags(0x01),
)
parent_ctx = trace.set_span_in_context(NonRecordingSpan(span_context))
parent_ctx = trace.set_span_in_context(span_context)
parent_span = tracer.start_span(job_id, context=parent_ctx, start_time=start_time)

create_span_with_timestamps(tracer, parent_span, message)