From 194959a9880228e342be902dd0b0c79178784435 Mon Sep 17 00:00:00 2001 From: Alekhya Kommasani Date: Mon, 25 Jul 2022 10:25:17 -0400 Subject: [PATCH] Replace the hook.py with recent version airflow-provider-fivetran==1.1.2 The link to the new hook.py can be found here https://github.com/fivetran/airflow-provider-fivetran/blob/main/fivetran_provider/hooks/fivetran.py. This is to catch the short syncs --- dags/operators/backport/fivetran/hook.py | 109 ++++++++++++++--------- 1 file changed, 67 insertions(+), 42 deletions(-) diff --git a/dags/operators/backport/fivetran/hook.py b/dags/operators/backport/fivetran/hook.py index 0e337fc02..012ce10f2 100644 --- a/dags/operators/backport/fivetran/hook.py +++ b/dags/operators/backport/fivetran/hook.py @@ -29,27 +29,27 @@ class FivetranHook(BaseHook): :type retry_delay: float """ - conn_name_attr = 'fivetran_conn_id' - default_conn_name = 'fivetran_default' - conn_type = 'fivetran' - hook_name = 'Fivetran' - api_user_agent = 'airflow_provider_fivetran/1.0.1' - api_protocol = 'https' - api_host = 'api.fivetran.com' - api_path_connectors = 'v1/connectors/' + conn_name_attr = "fivetran_conn_id" + default_conn_name = "fivetran_default" + conn_type = "fivetran" + hook_name = "Fivetran" + api_user_agent = "airflow_provider_fivetran/1.1.2" + api_protocol = "https" + api_host = "api.fivetran.com" + api_path_connectors = "v1/connectors/" @staticmethod def get_ui_field_behaviour() -> Dict: """Returns custom field behaviour""" return { - "hidden_fields": ['schema', 'port', 'extra', 'host'], + "hidden_fields": ["schema", "port", "extra", "host"], "relabeling": { - 'login': 'Fivetran API Key', - 'password': 'Fivetran API Secret', + "login": "Fivetran API Key", + "password": "Fivetran API Secret", }, "placeholders": { - 'login': 'api key', - 'password': 'api secret', + "login": "api key", + "password": "api secret", }, } @@ -61,7 +61,7 @@ def __init__( retry_limit: int = 3, retry_delay: float = 1.0, ) -> None: - super().__init__(None) # Passing None fixes a runtime problem in Airflow 1 + super().__init__(None) # Passing None fixes a runtime problem in Airflow 1 self.conn_id = fivetran_conn_id self.fivetran_conn = fivetran_conn self.timeout_seconds = timeout_seconds @@ -88,9 +88,7 @@ def _do_api_call(self, endpoint_info, json=None): auth = (self.fivetran_conn.login, self.fivetran_conn.password) url = f"{self.api_protocol}://{self.api_host}/{endpoint}" - headers = { - "User-Agent": self.api_user_agent - } + headers = {"User-Agent": self.api_user_agent} if method == "GET": request_func = requests.get @@ -110,7 +108,7 @@ def _do_api_call(self, endpoint_info, json=None): data=json if method in ("POST", "PATCH") else None, params=json if method in ("GET") else None, auth=auth, - headers=headers + headers=headers, ) response.raise_for_status() return response.json() @@ -159,6 +157,7 @@ def get_connector(self, connector_id): :param connector_id: Fivetran connector_id, found in connector settings page in the Fivetran user interface. :type connector_id: str + :return: connector details :rtype: Dict """ if connector_id == "": @@ -167,7 +166,6 @@ def get_connector(self, connector_id): resp = self._do_api_call(("GET", endpoint)) return resp["data"] - def check_connector(self, connector_id): """ Ensures connector configuration has been completed successfully and is in @@ -195,31 +193,35 @@ def check_connector(self, connector_id): ) return True - def set_manual_schedule(self, connector_id): + def set_schedule_type(self, connector_id, schedule_type): """ - Set connector to manual sync mode, required to force sync through the API. - Syncs will no longer be performed automatically and must be started - via the API. + Set connector sync mode to switch sync control between API and UI. :param connector_id: Fivetran connector_id, found in connector settings page in the Fivetran user interface. :type connector_id: str + :param schedule_type: "manual" (schedule controlled via Airlow) or "auto" (schedule controlled via Fivetran) + :type schedule_type: str """ endpoint = self.api_path_connectors + connector_id return self._do_api_call( - ("PATCH", endpoint), - json.dumps({"schedule_type": "manual"}) + ("PATCH", endpoint), json.dumps({"schedule_type": schedule_type}) ) - def prep_connector(self, connector_id): + def prep_connector(self, connector_id, schedule_type): """ - Prepare the connector to run in Airflow by checking that it exists and is a good state, then taking it off of Fivetran's schedule to be managed by Airflow's. + Prepare the connector to run in Airflow by checking that it exists and is a good state, + then update connector sync schedule type if changed. :param connector_id: Fivetran connector_id, found in connector settings page in the Fivetran user interface. :type connector_id: str + :param schedule_type: Fivetran connector schedule type + :type schedule_type: str """ self.check_connector(connector_id) - if self.get_connector(connector_id)['schedule_type'] != 'manual': - return self.set_manual_schedule(connector_id) + if schedule_type not in {"manual", "auto"}: + raise ValueError('schedule_type must be either "manual" or "auto"') + if self.get_connector(connector_id)["schedule_type"] != schedule_type: + return self.set_schedule_type(connector_id, schedule_type) return True def start_fivetran_sync(self, connector_id): @@ -227,22 +229,45 @@ def start_fivetran_sync(self, connector_id): :param connector_id: Fivetran connector_id, found in connector settings page in the Fivetran user interface. :type connector_id: str + :return: Timestamp of previously completed sync + :rtype: str """ - endpoint = self.api_path_connectors + connector_id + "/force" - return self._do_api_call(("POST", endpoint)) + connector_details = self.get_connector(connector_id) + succeeded_at = connector_details["succeeded_at"] + failed_at = connector_details["failed_at"] + endpoint = self.api_path_connectors + connector_id + if self._do_api_call(("GET", endpoint))['data']['paused'] == True: + self._do_api_call(("PATCH", endpoint),json.dumps({"paused": False})) + if succeeded_at == None and failed_at == None: + succeeded_at = str(pendulum.now()) + self._do_api_call(("POST", endpoint + "/force")) + last_sync = ( + succeeded_at + if self._parse_timestamp(succeeded_at) > self._parse_timestamp(failed_at) + else failed_at + ) + return last_sync - def get_last_sync(self, connector_id): + def get_last_sync(self, connector_id, xcom=""): """ Get the last time Fivetran connector completed a sync. Used with FivetranSensor to monitor sync completion status. :param connector_id: Fivetran connector_id, found in connector settings page in the Fivetran user interface. :type connector_id: str + :param xcom: Timestamp as string pull from FivetranOperator via XCOM + :type xcom: str + :return: Timestamp of last completed sync + :rtype: Pendulum.DateTime """ - connector_details = self.get_connector(connector_id) - succeeded_at = self._parse_timestamp(connector_details["succeeded_at"]) - failed_at = self._parse_timestamp(connector_details["failed_at"]) - return succeeded_at if succeeded_at > failed_at else failed_at + if xcom: + last_sync = self._parse_timestamp(xcom) + else: + connector_details = self.get_connector(connector_id) + succeeded_at = self._parse_timestamp(connector_details["succeeded_at"]) + failed_at = self._parse_timestamp(connector_details["failed_at"]) + last_sync = succeeded_at if succeeded_at > failed_at else failed_at + return last_sync def get_sync_status(self, connector_id, previous_completed_at): """ @@ -259,9 +284,7 @@ def get_sync_status(self, connector_id, previous_completed_at): connector_details = self.get_connector(connector_id) succeeded_at = self._parse_timestamp(connector_details["succeeded_at"]) failed_at = self._parse_timestamp(connector_details["failed_at"]) - current_completed_at = ( - succeeded_at if succeeded_at > failed_at else failed_at - ) + current_completed_at = succeeded_at if succeeded_at > failed_at else failed_at # The only way to tell if a sync failed is to check if its latest # failed_at value is greater than then last known "sync completed at" value. @@ -280,8 +303,10 @@ def get_sync_status(self, connector_id, previous_completed_at): # Check if sync started by FivetranOperator has finished # indicated by new 'succeeded_at' timestamp if current_completed_at > previous_completed_at: - self.log.info('Connector "{}": succeeded_at: {}'.format( - connector_id, succeeded_at.to_iso8601_string()) + self.log.info( + 'Connector "{}": succeeded_at: {}'.format( + connector_id, succeeded_at.to_iso8601_string() + ) ) return True else: @@ -310,4 +335,4 @@ def _retryable_error(exception) -> bool: ) or exception.response is not None and exception.response.status_code >= 500 - ) + ) \ No newline at end of file