From bc193743774f9e74e267e2e0b8ac21042a6e7f9d Mon Sep 17 00:00:00 2001 From: Nick Acosta Date: Tue, 14 Jun 2022 13:04:23 -0700 Subject: [PATCH] add xcom functionality and example --- .../example_dags/example_fivetran_xcom.py | 42 +++++++++++++++++++ fivetran_provider/hooks/fivetran.py | 31 +++++++++++--- fivetran_provider/sensors/fivetran.py | 7 ++-- 3 files changed, 71 insertions(+), 9 deletions(-) create mode 100644 fivetran_provider/example_dags/example_fivetran_xcom.py diff --git a/fivetran_provider/example_dags/example_fivetran_xcom.py b/fivetran_provider/example_dags/example_fivetran_xcom.py new file mode 100644 index 0000000..09b4f85 --- /dev/null +++ b/fivetran_provider/example_dags/example_fivetran_xcom.py @@ -0,0 +1,42 @@ +import time +from airflow import DAG + +from fivetran_provider.operators.fivetran import FivetranOperator +from fivetran_provider.sensors.fivetran import FivetranSensor +from airflow.operators.python import PythonOperator + +from datetime import datetime, timedelta + + +default_args = { + "owner": "Airflow", + "start_date": datetime(2021, 4, 6), + "provide_context": True +} + +dag = DAG( + dag_id='example_fivetran_xcom', + default_args=default_args, + schedule_interval=timedelta(days=1), + catchup=False, +) + +with dag: + fivetran_operator = FivetranOperator( + task_id='fivetran-operator', + fivetran_conn_id='fivetran_default', + connector_id="{{ var.value.connector_id }}", + ) + + delay_task = PythonOperator(task_id="delay_python_task", + python_callable=lambda: time.sleep(60)) + + fivetran_sensor = FivetranSensor( + task_id='fivetran-sensor', + fivetran_conn_id='fivetran_default', + connector_id="{{ var.value.connector_id }}", + poke_interval=5, + xcom="{{ task_instance.xcom_pull('fivetran-operator', key='return_value') }}", + ) + + fivetran_operator >> delay_task >> fivetran_sensor diff --git a/fivetran_provider/hooks/fivetran.py b/fivetran_provider/hooks/fivetran.py index 060a3cc..5dcbf53 100644 --- a/fivetran_provider/hooks/fivetran.py +++ b/fivetran_provider/hooks/fivetran.py @@ -231,9 +231,15 @@ def start_fivetran_sync(self, connector_id): :type connector_id: str """ endpoint = self.api_path_connectors + connector_id + "/force" - return self._do_api_call(("POST", endpoint)) + self._do_api_call(("POST", endpoint)) + connector_details = self.get_connector(connector_id) + succeeded_at = connector_details["succeeded_at"] + failed_at = connector_details["failed_at"] + last_sync = succeeded_at if succeeded_at > failed_at else failed_at + + return last_sync - def get_last_sync(self, connector_id): + def get_last_sync(self, connector_id, xcom=""): """ Get the last time Fivetran connector completed a sync. Used with FivetranSensor to monitor sync completion status. @@ -241,10 +247,14 @@ def get_last_sync(self, connector_id): page in the Fivetran user interface. :type connector_id: str """ - connector_details = self.get_connector(connector_id) - succeeded_at = self._parse_timestamp(connector_details["succeeded_at"]) - failed_at = self._parse_timestamp(connector_details["failed_at"]) - return succeeded_at if succeeded_at > failed_at else failed_at + if xcom: + last_sync = self._parse_timestamp(xcom) + else: + connector_details = self.get_connector(connector_id) + succeeded_at = self._parse_timestamp(connector_details["succeeded_at"]) + failed_at = self._parse_timestamp(connector_details["failed_at"]) + last_sync = succeeded_at if succeeded_at > failed_at else failed_at + return last_sync def get_sync_status(self, connector_id, previous_completed_at): """ @@ -264,7 +274,15 @@ def get_sync_status(self, connector_id, previous_completed_at): current_completed_at = ( succeeded_at if succeeded_at > failed_at else failed_at ) + + self.log.info('Connector prev: "{}"'.format( + previous_completed_at) + ) + self.log.info('Connector curr: "{}"'.format( + current_completed_at) + ) + # The only way to tell if a sync failed is to check if its latest # failed_at value is greater than then last known "sync completed at" value. if failed_at > previous_completed_at: @@ -313,3 +331,4 @@ def _retryable_error(exception) -> bool: or exception.response is not None and exception.response.status_code >= 500 ) + diff --git a/fivetran_provider/sensors/fivetran.py b/fivetran_provider/sensors/fivetran.py index 35e3481..b16b616 100644 --- a/fivetran_provider/sensors/fivetran.py +++ b/fivetran_provider/sensors/fivetran.py @@ -37,7 +37,7 @@ class FivetranSensor(BaseSensorOperator): """ # Define which fields get jinjaified - template_fields = ["connector_id"] + template_fields = ["connector_id", "xcom"] @apply_defaults def __init__( @@ -47,6 +47,7 @@ def __init__( poke_interval: int = 60, fivetran_retry_limit: int = 3, fivetran_retry_delay: int = 1, + xcom: str = "", **kwargs: Any ) -> None: super().__init__(**kwargs) @@ -57,7 +58,7 @@ def __init__( self.fivetran_retry_limit = fivetran_retry_limit self.fivetran_retry_delay = fivetran_retry_delay self.hook = None - + self.xcom = xcom def _get_hook(self) -> FivetranHook: if self.hook is None: self.hook = FivetranHook( @@ -70,5 +71,5 @@ def _get_hook(self) -> FivetranHook: def poke(self, context): hook = self._get_hook() if self.previous_completed_at is None: - self.previous_completed_at = hook.get_last_sync(self.connector_id) + self.previous_completed_at = hook.get_last_sync(self.connector_id, self.xcom) return hook.get_sync_status(self.connector_id, self.previous_completed_at)