diff --git a/fivetran_provider/hooks/fivetran.py b/fivetran_provider/hooks/fivetran.py index 7ca52ff..0b357dc 100644 --- a/fivetran_provider/hooks/fivetran.py +++ b/fivetran_provider/hooks/fivetran.py @@ -356,7 +356,7 @@ def get_last_sync(self, connector_id, xcom=""): last_sync = succeeded_at if succeeded_at > failed_at else failed_at return last_sync - def get_sync_status(self, connector_id, previous_completed_at): + def get_sync_status(self, connector_id, previous_completed_at, reschedule_time=0): """ For sensor, return True if connector's 'succeeded_at' field has updated. :param connector_id: Fivetran connector_id, found in connector settings @@ -365,9 +365,10 @@ def get_sync_status(self, connector_id, previous_completed_at): :param previous_completed_at: The last time the connector ran, collected on Sensor initialization. :type previous_completed_at: pendulum.datetime.DateTime + :param reschedule_time: Optional, if connector is in reset state + number of seconds to wait before restarting, else Fivetran suggestion used + :type reschedule_time: int """ - # @todo Need logic here to tell if the sync is not running at all and not - # likely to run in the near future. connector_details = self.get_connector(connector_id) succeeded_at = self._parse_timestamp(connector_details["succeeded_at"]) failed_at = self._parse_timestamp(connector_details["failed_at"]) @@ -387,6 +388,19 @@ def get_sync_status(self, connector_id, previous_completed_at): sync_state = connector_details["status"]["sync_state"] self.log.info(f'Connector "{connector_id}": sync_state = {sync_state}') + # if sync in resheduled start, wait for time recommended by Fivetran + # or manually specified, then restart sync + if ( + sync_state == "rescheduled" + and connector_details["schedule_type"] == "manual" + ): + self.log.info( + f'Connector is in "rescheduled" state and needs to be manually restarted' + ) + self.pause_and_restart( + connector_details["status"]["rescheduled_for"], reschedule_time + ) + return False # Check if sync started by FivetranOperator has finished # indicated by new 'succeeded_at' timestamp if current_completed_at > previous_completed_at: @@ -399,6 +413,34 @@ def get_sync_status(self, connector_id, previous_completed_at): else: return False + def pause_and_restart(connector_id, reschedule_for, reschedule_time): + """ + While a connector is syncing, if it falls into a reschedule state, + wait for a time either specified by the user of recommended by Fivetran, + Then restart a sync + :param connector_id: Fivetran connector_id, found in connector settings + page in the Fivetran user interface. + :type connector_id: str + :param reschedule_for: From connector details, if schedule_type is manual, + then the connector expects triggering the event at the designated UTC time + :type reschedule_for: str + :param reschedule_time: Optional, if connector is in reset state + number of seconds to wait before restarting, else Fivetran suggestion used + :type reschedule_time: int + """ + if reschedule_time: + self.log.info(f'Starting connector again in "{reschedule_time}" seconds') + time.sleep(reschedule_time) + else: + wait_time = ( + _parse_timestamp(reschedule_for).add(minutes=1) - pendulum.now(tz="UTC") + ).seconds + self.log.info(f'Starting connector again in "{wait_time}" seconds') + time.sleep(wait_time) + + self.log.info("Restarting connector now") + return self.start_fivetran_sync(connector_id) + def _parse_timestamp(self, api_time): """ Returns either the pendulum-parsed actual timestamp or diff --git a/fivetran_provider/sensors/fivetran.py b/fivetran_provider/sensors/fivetran.py index 50c2485..780da71 100644 --- a/fivetran_provider/sensors/fivetran.py +++ b/fivetran_provider/sensors/fivetran.py @@ -37,6 +37,9 @@ class FivetranSensor(BaseSensorOperator): :param xcom: If used, FivetranSensor receives timestamp of previously completed sync from FivetranOperator via XCOM :type xcom: str + :param reschedule_time: Optional, if connector is in reset state + number of seconds to wait before restarting, else Fivetran suggestion used + :type reschedule_time: int """ # Define which fields get jinjaified @@ -51,6 +54,7 @@ def __init__( fivetran_retry_limit: int = 3, fivetran_retry_delay: int = 1, xcom: str = "", + reschedule_time: int = 0, **kwargs: Any ) -> None: super().__init__(**kwargs) @@ -62,6 +66,7 @@ def __init__( self.fivetran_retry_delay = fivetran_retry_delay self.hook = None self.xcom = xcom + self.reschedule_time = reschedule_time def _get_hook(self) -> FivetranHook: if self.hook is None: @@ -78,4 +83,4 @@ def poke(self, context): self.previous_completed_at = hook.get_last_sync( self.connector_id, self.xcom ) - return hook.get_sync_status(self.connector_id, self.previous_completed_at) + return hook.get_sync_status(self.connector_id, self.previous_completed_at, self.reschedule_time)