diff --git a/aixplain/modules/model.py b/aixplain/modules/model.py index 1d3c9c81..298a93a6 100644 --- a/aixplain/modules/model.py +++ b/aixplain/modules/model.py @@ -92,13 +92,13 @@ def to_dict(self) -> Dict: clean_additional_info = {k: v for k, v in self.additional_info.items() if v is not None} return {"id": self.id, "name": self.name, "supplier": self.supplier, "additional_info": clean_additional_info} - def __polling(self, poll_url: Text, name: Text = "model_process", wait_time: float = 1.0, timeout: float = 300) -> Dict: + def __polling(self, poll_url: Text, name: Text = "model_process", wait_time: float = 0.5, timeout: float = 300) -> Dict: """Keeps polling the platform to check whether an asynchronous call is done. Args: poll_url (Text): polling URL name (Text, optional): ID given to a call. Defaults to "model_process". - wait_time (float, optional): wait time in seconds between polling calls. Defaults to 1.0. + wait_time (float, optional): wait time in seconds between polling calls. Defaults to 0.5. timeout (float, optional): total polling time. Defaults to 300. Returns: @@ -106,6 +106,8 @@ def __polling(self, poll_url: Text, name: Text = "model_process", wait_time: flo """ logging.info(f"Polling for Model: Start polling for {name}") start, end = time.time(), time.time() + # keep wait time as 0.2 seconds the minimum + wait_time = max(wait_time, 0.2) completed = False response_body = {"status": "FAILED", "completed": False} while not completed and (end - start) < timeout: @@ -114,9 +116,10 @@ def __polling(self, poll_url: Text, name: Text = "model_process", wait_time: flo completed = response_body["completed"] end = time.time() - time.sleep(wait_time) - if wait_time < 60: - wait_time *= 1.1 + if completed is False: + time.sleep(wait_time) + if wait_time < 60: + wait_time *= 1.1 except Exception as e: response_body = {"status": "ERROR", "completed": False, "error": "No response from the service."} logging.error(f"Polling for Model: polling for {name}: {e}") @@ -159,7 +162,7 @@ def poll(self, poll_url: Text, name: Text = "model_process") -> Dict: logging.error(f"Single Poll for Model: Error of polling for {name}: {e}") return resp - def run(self, data: Union[Text, Dict], name: Text = "model_process", timeout: float = 300, parameters: Dict = {}) -> Dict: + def run(self, data: Union[Text, Dict], name: Text = "model_process", timeout: float = 300, parameters: Dict = {}, wait_time: float = 0.5) -> Dict: """Runs a model call. Args: @@ -167,6 +170,7 @@ def run(self, data: Union[Text, Dict], name: Text = "model_process", timeout: fl name (Text, optional): ID given to a call. Defaults to "model_process". timeout (float, optional): total polling time. Defaults to 300. parameters (Dict, optional): optional parameters to the model. Defaults to "{}". + wait_time (float, optional): wait time in seconds between polling calls. Defaults to 0.5. Returns: Dict: parsed output from model @@ -180,7 +184,7 @@ def run(self, data: Union[Text, Dict], name: Text = "model_process", timeout: fl return response poll_url = response["url"] end = time.time() - response = self.__polling(poll_url, name=name, timeout=timeout) + response = self.__polling(poll_url, name=name, timeout=timeout, wait_time=wait_time) return response except Exception as e: msg = f"Error in request for {name} - {traceback.format_exc()}" diff --git a/aixplain/modules/pipeline.py b/aixplain/modules/pipeline.py index 595b3a4e..c4eb290e 100644 --- a/aixplain/modules/pipeline.py +++ b/aixplain/modules/pipeline.py @@ -84,7 +84,7 @@ def __polling( Returns: dict: response obtained by polling call """ - + # TO DO: wait_time = to the longest path of the pipeline * minimum waiting time logging.debug(f"Polling for Pipeline: Start polling for {name} ") start, end = time.time(), time.time() completed = False @@ -96,9 +96,10 @@ def __polling( completed = response_body["completed"] end = time.time() - time.sleep(wait_time) - if wait_time < 60: - wait_time *= 1.1 + if completed is False: + time.sleep(wait_time) + if wait_time < 60: + wait_time *= 1.1 except Exception as e: logging.error(f"Polling for Pipeline: polling for {name} : Continue") if response_body and response_body["status"] == "SUCCESS": @@ -132,13 +133,14 @@ def poll(self, poll_url: Text, name: Text = "pipeline_process") -> Dict: resp = {"status": "FAILED"} return resp - def run(self, data: Union[Text, Dict], name: Text = "pipeline_process", timeout: float = 20000.0) -> Dict: + def run(self, data: Union[Text, Dict], name: Text = "pipeline_process", timeout: float = 20000.0, wait_time: float = 1.0) -> Dict: """Runs a pipeline call. Args: data (Union[Text, Dict]): link to the input data name (str, optional): ID given to a call. Defaults to "pipeline_process". timeout (float, optional): total polling time. Defaults to 20000.0. + wait_time (float, optional): wait time in seconds between polling calls. Defaults to 1.0. Returns: Dict: parsed output from pipeline @@ -152,7 +154,7 @@ def run(self, data: Union[Text, Dict], name: Text = "pipeline_process", timeout: return response poll_url = response["url"] end = time.time() - response = self.__polling(poll_url, name=name, timeout=timeout) + response = self.__polling(poll_url, name=name, timeout=timeout, wait_time=wait_time) return response except Exception as e: error_message = f"Error in request for {name} "