Skip to content

Commit

Permalink
Introduce an improved client library API with chainable verb methods (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
edknv authored Nov 7, 2024
1 parent a09ad66 commit ba8c6d5
Show file tree
Hide file tree
Showing 9 changed files with 1,001 additions and 31 deletions.
5 changes: 3 additions & 2 deletions client/src/nv_ingest_client/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0

from .client import NvIngestClient
from nv_ingest_client.client.client import NvIngestClient
from nv_ingest_client.client.interface import Ingestor

__all__ = ["NvIngestClient"]
__all__ = ["NvIngestClient", "Ingestor"]
19 changes: 11 additions & 8 deletions client/src/nv_ingest_client/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ def _fetch_job_result(self, job_index: str, timeout: float = 100, data_only: boo
"""

try:
job_state = self._get_and_check_job_state(job_index, required_state=[JobStateEnum.SUBMITTED])
job_state = self._get_and_check_job_state(job_index, required_state=[JobStateEnum.SUBMITTED, JobStateEnum.SUBMITTED_ASYNC])
response = self._message_client.fetch_message(job_state.job_id, timeout)

if response is not None:
Expand Down Expand Up @@ -345,12 +345,12 @@ def _fetch_job_result_wait(self, job_id: str, timeout: float = 60, data_only: bo
# Direct Python entry point for retrieving job results: it handles timeouts and retries
# internally instead of expecting the caller to manage them.
def fetch_job_result(
self,
job_ids: List[str],
timeout: float = 100,
max_retries: Optional[int] = None,
retry_delay: float = 1,
verbose: bool = False,
self,
job_ids: Union[str, List[str]],
timeout: float = 100,
max_retries: Optional[int] = None,
retry_delay: float = 1,
verbose: bool = False,
) -> List[Tuple[Optional[Dict], str]]:
"""
Fetches job results for multiple job IDs concurrently with individual timeouts and retry logic.
Expand All @@ -370,6 +370,9 @@ def fetch_job_result(
TimeoutError: If the fetch operation times out.
Exception: For all other unexpected issues.
"""
if isinstance(job_ids, str):
job_ids = [job_ids]

results = []

def fetch_with_retries(job_id: str):
Expand Down Expand Up @@ -405,7 +408,7 @@ def fetch_with_retries(job_id: str):
for future in as_completed(futures):
job_id = futures[future]
try:
result = handle_future_result(future, futures, timeout)
result = handle_future_result(future, timeout=timeout)
results.append(result.get("data"))
except concurrent.futures.TimeoutError:
logger.error(f"Timeout while fetching result for job ID {job_id}")
Expand Down
Loading

0 comments on commit ba8c6d5

Please sign in to comment.