Skip to content

Commit

Permalink
Draft version of what true async function might look like and what as…
Browse files Browse the repository at this point in the history
…k_ml might look like
  • Loading branch information
brandon-groundlight committed Sep 19, 2023
1 parent 1c54591 commit 614fd54
Showing 1 changed file with 108 additions and 6 deletions.
114 changes: 108 additions & 6 deletions src/groundlight/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
import os
import time
from io import BufferedReader, BytesIO
from typing import Optional, Union
from typing import Optional, Union, List
import asyncio

from model import Detector, ImageQuery, PaginatedDetectorList, PaginatedImageQueryList
from openapi_client import Configuration
Expand Down Expand Up @@ -235,15 +236,43 @@ def ask_confident(
image: Union[str, bytes, Image.Image, BytesIO, BufferedReader, np.ndarray],
patience_time: Optional[float] = None,
) -> ImageQuery:
# Not yet differentiated from submit_image_query other than simplified parameter set
self.submit_image_query(detector, image, wait=patience_time, patience_time=patience_time)

def ask_ml(
self,
detector: Union[Detector, str],
image: Union[str, bytes, Image.Image, BytesIO, BufferedReader, np.ndarray],
wait: Optional[float] = None,
human_review: Optional[str] = None,
) -> ImageQuery:
self.submit_image_query(detector, image, wait=wait)
if wait is None:
wait = self.DEFAULT_WAIT
if patience_time is None:
patience_time = self.DEFAULT_PATIENCE
if wait > patience_time:
patience_time = wait

detector_id = detector.id if isinstance(detector, Detector) else detector

image_bytesio: ByteStreamWrapper = parse_supported_image_types(image)

params = {"detector_id": detector_id, "body": image_bytesio}
if patience_time == 0:
params["patience_time"] = self.DEFAULT_PATIENCE
else:
params["patience_time"] = patience_time

# still available to even within ask_ml
if human_review is not None:
params["human_review"] = human_review

raw_image_query = self.image_queries_api.submit_image_query(**params)
image_query = ImageQuery.parse_obj(raw_image_query.to_dict())

if wait:
image_query = self.wait_for_fast_ml_result(image_query, timeout_sec=wait)
return self._fixup_image_query(image_query)

def ask_async(
self,
Expand Down Expand Up @@ -276,19 +305,92 @@ def ask_async(
image_bytesio: ByteStreamWrapper = parse_supported_image_types(image)

params = {"detector_id": detector_id, "body": image_bytesio}
### This would require a corresponding backend change, but could save up to a couple seconds of time
### waiting for the server response
### alternatively, we could use the asyncio
params["async"] = True
if patience_time == 0:
params["patience_time"] = self.DEFAULT_PATIENCE
else:
params["patience_time"] = patience_time

# If no inspection_id is provided, we submit the image query using image_queries_api (autogenerated via OpenAPI)
# However, our autogenerated code does not currently support inspection_id, so if an inspection_id was
# provided, we use the private API client instead.
raw_image_query = self.image_queries_api.submit_image_query(**params) # best api call we have, still has delay
image_query = ImageQuery.parse_obj(raw_image_query.to_dict())
return self._fixup_image_query(image_query)

raw_image_query = self.image_queries_api.submit_image_query(**params)
async def ask_async_alternate(
self,
detector: Union[Detector, str],
image: Union[str, bytes, Image.Image, BytesIO, BufferedReader, np.ndarray],
patience_time: Optional[float] = None,
) -> ImageQuery:
if patience_time is None:
patience_time = self.DEFAULT_PATIENCE

detector_id = detector.id if isinstance(detector, Detector) else detector

image_bytesio: ByteStreamWrapper = parse_supported_image_types(image)

params = {"detector_id": detector_id, "body": image_bytesio}
if patience_time == 0:
params["patience_time"] = self.DEFAULT_PATIENCE
else:
params["patience_time"] = patience_time
### This would still benefit from a backend change, but uses true async
# params["async"] = True
raw_image_query = await self.image_queries_api.submit_image_query(
**params
) # best api call we have, still has delay
image_query = ImageQuery.parse_obj(raw_image_query.to_dict())
return self._fixup_image_query(image_query)

def ask_async_alternate_wrapper(
self,
detector: Union[Detector, str],
image_set: List[Union[str, bytes, Image.Image, BytesIO, BufferedReader, np.ndarray]],
patience_time: Optional[float] = None,
) -> List[ImageQuery]:
async def wrapper():
tasks = [
asyncio.create_task(self.ask_async_alternate(detector, image, patience_time)) for image in image_set
]
for task in tasks:
await task
# alternatively use asyncio.gather
# await asyncio.gather(*(self.ask_async_alternate(i) for i in image_set))

asyncio.run(wrapper())

def wait_for_fast_ml_result(
self,
image_query: Union[ImageQuery, str],
timeout_sec: float = 30.0,
) -> ImageQuery:
# Convert from image_query_id to ImageQuery if needed.
if isinstance(image_query, str):
image_query = self.get_image_query(image_query)

start_time = time.time()
next_delay = self.POLLING_INITIAL_DELAY
target_delay = 0.0
image_query = self._fixup_image_query(image_query)
while True:
patience_so_far = time.time() - start_time
if iq_has_answer(image_query): # Primary difference from wait_for_confident_result
logger.debug(f"Confident answer for {image_query} after {patience_so_far:.1f}s")
break
if patience_so_far >= timeout_sec:
logger.debug(f"Timeout after {timeout_sec:.0f}s waiting for {image_query}")
break
target_delay = min(patience_so_far + next_delay, timeout_sec)
sleep_time = max(target_delay - patience_so_far, 0)

time.sleep(sleep_time)
next_delay *= self.POLLING_EXPONENTIAL_BACKOFF
image_query = self.get_image_query(image_query.id)
image_query = self._fixup_image_query(image_query)
return image_query

def wait_for_confident_result(
self,
image_query: Union[ImageQuery, str],
Expand Down

0 comments on commit 614fd54

Please sign in to comment.