Skip to content

Commit

Permalink
M 4504286101 wait time (#34)
Browse files Browse the repository at this point in the history
* M-4504286101: change polling wait time strategy

* Changes in the wait time increase condition

* adding wait time as a parameter in pipeline execution
  • Loading branch information
thiago-aixplain authored May 20, 2023
1 parent a7902c3 commit 5818c56
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 13 deletions.
18 changes: 11 additions & 7 deletions aixplain/modules/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,20 +92,22 @@ def to_dict(self) -> Dict:
clean_additional_info = {k: v for k, v in self.additional_info.items() if v is not None}
return {"id": self.id, "name": self.name, "supplier": self.supplier, "additional_info": clean_additional_info}

def __polling(self, poll_url: Text, name: Text = "model_process", wait_time: float = 1.0, timeout: float = 300) -> Dict:
def __polling(self, poll_url: Text, name: Text = "model_process", wait_time: float = 0.5, timeout: float = 300) -> Dict:
"""Keeps polling the platform to check whether an asynchronous call is done.
Args:
poll_url (Text): polling URL
name (Text, optional): ID given to a call. Defaults to "model_process".
wait_time (float, optional): wait time in seconds between polling calls. Defaults to 1.0.
wait_time (float, optional): wait time in seconds between polling calls. Defaults to 0.5.
timeout (float, optional): total polling time. Defaults to 300.
Returns:
Dict: response obtained by polling call
"""
logging.info(f"Polling for Model: Start polling for {name}")
start, end = time.time(), time.time()
# keep wait time as 0.2 seconds the minimum
wait_time = max(wait_time, 0.2)
completed = False
response_body = {"status": "FAILED", "completed": False}
while not completed and (end - start) < timeout:
Expand All @@ -114,9 +116,10 @@ def __polling(self, poll_url: Text, name: Text = "model_process", wait_time: flo
completed = response_body["completed"]

end = time.time()
time.sleep(wait_time)
if wait_time < 60:
wait_time *= 1.1
if completed is False:
time.sleep(wait_time)
if wait_time < 60:
wait_time *= 1.1
except Exception as e:
response_body = {"status": "ERROR", "completed": False, "error": "No response from the service."}
logging.error(f"Polling for Model: polling for {name}: {e}")
Expand Down Expand Up @@ -159,14 +162,15 @@ def poll(self, poll_url: Text, name: Text = "model_process") -> Dict:
logging.error(f"Single Poll for Model: Error of polling for {name}: {e}")
return resp

def run(self, data: Union[Text, Dict], name: Text = "model_process", timeout: float = 300, parameters: Dict = {}) -> Dict:
def run(self, data: Union[Text, Dict], name: Text = "model_process", timeout: float = 300, parameters: Dict = {}, wait_time: float = 0.5) -> Dict:
"""Runs a model call.
Args:
data (Union[Text, Dict]): link to the input data
name (Text, optional): ID given to a call. Defaults to "model_process".
timeout (float, optional): total polling time. Defaults to 300.
parameters (Dict, optional): optional parameters to the model. Defaults to "{}".
wait_time (float, optional): wait time in seconds between polling calls. Defaults to 0.5.
Returns:
Dict: parsed output from model
Expand All @@ -180,7 +184,7 @@ def run(self, data: Union[Text, Dict], name: Text = "model_process", timeout: fl
return response
poll_url = response["url"]
end = time.time()
response = self.__polling(poll_url, name=name, timeout=timeout)
response = self.__polling(poll_url, name=name, timeout=timeout, wait_time=wait_time)
return response
except Exception as e:
msg = f"Error in request for {name} - {traceback.format_exc()}"
Expand Down
14 changes: 8 additions & 6 deletions aixplain/modules/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def __polling(
Returns:
dict: response obtained by polling call
"""

# TO DO: wait_time = to the longest path of the pipeline * minimum waiting time
logging.debug(f"Polling for Pipeline: Start polling for {name} ")
start, end = time.time(), time.time()
completed = False
Expand All @@ -96,9 +96,10 @@ def __polling(
completed = response_body["completed"]

end = time.time()
time.sleep(wait_time)
if wait_time < 60:
wait_time *= 1.1
if completed is False:
time.sleep(wait_time)
if wait_time < 60:
wait_time *= 1.1
except Exception as e:
logging.error(f"Polling for Pipeline: polling for {name} : Continue")
if response_body and response_body["status"] == "SUCCESS":
Expand Down Expand Up @@ -132,13 +133,14 @@ def poll(self, poll_url: Text, name: Text = "pipeline_process") -> Dict:
resp = {"status": "FAILED"}
return resp

def run(self, data: Union[Text, Dict], name: Text = "pipeline_process", timeout: float = 20000.0) -> Dict:
def run(self, data: Union[Text, Dict], name: Text = "pipeline_process", timeout: float = 20000.0, wait_time: float = 1.0) -> Dict:
"""Runs a pipeline call.
Args:
data (Union[Text, Dict]): link to the input data
name (str, optional): ID given to a call. Defaults to "pipeline_process".
timeout (float, optional): total polling time. Defaults to 20000.0.
wait_time (float, optional): wait time in seconds between polling calls. Defaults to 1.0.
Returns:
Dict: parsed output from pipeline
Expand All @@ -152,7 +154,7 @@ def run(self, data: Union[Text, Dict], name: Text = "pipeline_process", timeout:
return response
poll_url = response["url"]
end = time.time()
response = self.__polling(poll_url, name=name, timeout=timeout)
response = self.__polling(poll_url, name=name, timeout=timeout, wait_time=wait_time)
return response
except Exception as e:
error_message = f"Error in request for {name} "
Expand Down

0 comments on commit 5818c56

Please sign in to comment.