diff --git a/fivetran_provider/example_dags/example_fivetran.py b/fivetran_provider/example_dags/example_fivetran.py index 96ed72f..e0308d5 100644 --- a/fivetran_provider/example_dags/example_fivetran.py +++ b/fivetran_provider/example_dags/example_fivetran.py @@ -12,7 +12,7 @@ } dag = DAG( - dag_id='example_fivetran', + dag_id="example_fivetran", default_args=default_args, schedule_interval=timedelta(days=1), catchup=False, @@ -20,14 +20,14 @@ with dag: fivetran_sync_start = FivetranOperator( - task_id='fivetran-task', - fivetran_conn_id='fivetran_default', + task_id="fivetran-task", + fivetran_conn_id="fivetran_default", connector_id="{{ var.value.connector_id }}", ) fivetran_sync_wait = FivetranSensor( - task_id='fivetran-sensor', - fivetran_conn_id='fivetran_default', + task_id="fivetran-sensor", + fivetran_conn_id="fivetran_default", connector_id="{{ var.value.connector_id }}", poke_interval=5, ) diff --git a/fivetran_provider/example_dags/example_fivetran_bigquery.py b/fivetran_provider/example_dags/example_fivetran_bigquery.py index 8312cb7..eeb5345 100644 --- a/fivetran_provider/example_dags/example_fivetran_bigquery.py +++ b/fivetran_provider/example_dags/example_fivetran_bigquery.py @@ -7,23 +7,25 @@ from airflow.utils.dates import datetime -TABLE='forestfires' -DATASET='google_sheets' +TABLE = "forestfires" +DATASET = "google_sheets" # These args will get passed on to each operator # You can override them on a per-task basis during operator initialization default_args = { - 'owner': 'astronomer', - 'depends_on_past': False, - 'start_date': datetime(2021, 7, 7), - 'email': ['noreply@astronomer.io'], - 'email_on_failure': False + "owner": "astronomer", + "depends_on_past": False, + "start_date": datetime(2021, 7, 7), + "email": ["noreply@astronomer.io"], + "email_on_failure": False, } -with DAG('example_fivetran_bigquery', - default_args=default_args, - description='', - schedule_interval=None, - catchup=False) as dag: +with DAG( + "example_fivetran_bigquery", + default_args=default_args, + description="", + schedule_interval=None, + catchup=False, +) as dag: """ ### Simple EL Pipeline with Data Integrity and Quality Checks Before running the DAG, set the following in an Airflow or Environment Variables: @@ -44,16 +46,16 @@ The FivetranSensor monitors the status of the Fivetran data sync """ fivetran_sync_start = FivetranOperator( - task_id='fivetran-task', - fivetran_conn_id='fivetran_default', - connector_id='{{ var.value.connector_id }}' + task_id="fivetran-task", + fivetran_conn_id="fivetran_default", + connector_id="{{ var.value.connector_id }}", ) fivetran_sync_wait = FivetranSensor( - task_id='fivetran-sensor', - fivetran_conn_id='fivetran_default', - connector_id='{{ var.value.connector_id }}', - poke_interval=5 + task_id="fivetran-sensor", + fivetran_conn_id="fivetran_default", + connector_id="{{ var.value.connector_id }}", + poke_interval=5, ) """ @@ -62,10 +64,10 @@ exist. 
""" validate_bigquery = BigQueryTableExistenceSensor( - task_id='validate_bigquery', - project_id='{{ var.value.gcp_project_id }}', + task_id="validate_bigquery", + project_id="{{ var.value.gcp_project_id }}", dataset_id=DATASET, - table_id='forestfires', + table_id="forestfires", ) """ @@ -80,7 +82,7 @@ use_legacy_sql=False, ) - done = DummyOperator(task_id='done') + done = DummyOperator(task_id="done") fivetran_sync_start >> fivetran_sync_wait >> validate_bigquery validate_bigquery >> check_bq_row_count >> done diff --git a/fivetran_provider/example_dags/example_fivetran_bqml.py b/fivetran_provider/example_dags/example_fivetran_bqml.py index 61fcb1f..7bcd1cf 100644 --- a/fivetran_provider/example_dags/example_fivetran_bqml.py +++ b/fivetran_provider/example_dags/example_fivetran_bqml.py @@ -5,45 +5,56 @@ from fivetran_provider.operators.fivetran import FivetranOperator from fivetran_provider.sensors.fivetran import FivetranSensor from airflow.providers.google.cloud.hooks.compute_ssh import ComputeEngineSSHHook -from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator, BigQueryGetDataOperator +from airflow.providers.google.cloud.operators.bigquery import ( + BigQueryExecuteQueryOperator, + BigQueryGetDataOperator, +) from datetime import datetime, timedelta -#EDIT WITH YOUR PROJECT ID & DATASET NAME +# EDIT WITH YOUR PROJECT ID & DATASET NAME PROJECT_ID = "YOUR PROJECT ID" DATASET_NAME = "bqml" DESTINATION_TABLE = "dbt_ads_bqml_preds" -TRAINING_QUERY = "CREATE OR REPLACE MODEL bqml.dbt_ads_airflow_model " \ - "OPTIONS " \ - "(model_type = 'ARIMA_PLUS', " \ - "time_series_timestamp_col = 'parsed_date', " \ - "time_series_data_col = 'daily_impressions', " \ - "auto_arima = TRUE, " \ - "data_frequency = 'AUTO_FREQUENCY', " \ - "decompose_time_series = TRUE " \ - ") AS " \ - "SELECT " \ - "timestamp(date_day) as parsed_date, " \ - "SUM(impressions) as daily_impressions " \ - "FROM `" + PROJECT_ID + ".bqml.ad_reporting` " \ - "GROUP BY date_day;" - -SERVING_QUERY = "SELECT string(forecast_timestamp) as forecast_timestamp, " \ - "forecast_value, " \ - "standard_error, " \ - "confidence_level, " \ - "prediction_interval_lower_bound, " \ - "prediction_interval_upper_bound, " \ - "confidence_interval_lower_bound, " \ - "confidence_interval_upper_bound " \ - "FROM ML.FORECAST(MODEL `" + PROJECT_ID + ".bqml.dbt_ads_airflow_model`,STRUCT(30 AS horizon, 0.8 AS confidence_level));" - -def ml_branch(ds,**kwargs): - if 'train' in kwargs['params'] and kwargs['params']['train']: - return 'train_model' +TRAINING_QUERY = ( + "CREATE OR REPLACE MODEL bqml.dbt_ads_airflow_model " + "OPTIONS " + "(model_type = 'ARIMA_PLUS', " + "time_series_timestamp_col = 'parsed_date', " + "time_series_data_col = 'daily_impressions', " + "auto_arima = TRUE, " + "data_frequency = 'AUTO_FREQUENCY', " + "decompose_time_series = TRUE " + ") AS " + "SELECT " + "timestamp(date_day) as parsed_date, " + "SUM(impressions) as daily_impressions " + "FROM `" + PROJECT_ID + ".bqml.ad_reporting` " + "GROUP BY date_day;" +) + +SERVING_QUERY = ( + "SELECT string(forecast_timestamp) as forecast_timestamp, " + "forecast_value, " + "standard_error, " + "confidence_level, " + "prediction_interval_lower_bound, " + "prediction_interval_upper_bound, " + "confidence_interval_lower_bound, " + "confidence_interval_upper_bound " + "FROM ML.FORECAST(MODEL `" + + PROJECT_ID + + ".bqml.dbt_ads_airflow_model`,STRUCT(30 AS horizon, 0.8 AS confidence_level));" +) + + +def ml_branch(ds, **kwargs): + if "train" in 
kwargs["params"] and kwargs["params"]["train"]: + return "train_model" else: - return 'get_predictions' + return "get_predictions" + default_args = { "owner": "Airflow", @@ -51,7 +62,7 @@ def ml_branch(ds,**kwargs): } dag = DAG( - dag_id='example_fivetran_bqml', + dag_id="example_fivetran_bqml", default_args=default_args, schedule_interval=timedelta(days=1), catchup=False, @@ -59,71 +70,61 @@ def ml_branch(ds,**kwargs): with dag: linkedin_sync = FivetranOperator( - task_id='linkedin-sync', - fivetran_conn_id='fivetran_default', + task_id="linkedin-sync", + fivetran_conn_id="fivetran_default", connector_id="{{ var.value.linkedin_connector_id }}", ) linkedin_sensor = FivetranSensor( - task_id='linkedin-sensor', - fivetran_conn_id='fivetran_default', + task_id="linkedin-sensor", + fivetran_conn_id="fivetran_default", connector_id="{{ var.value.linkedin_connector_id }}", poke_interval=5, ) twitter_sync = FivetranOperator( - task_id='twitter-sync', - fivetran_conn_id='fivetran_default', + task_id="twitter-sync", + fivetran_conn_id="fivetran_default", connector_id="{{ var.value.twitter_connector_id }}", ) twitter_sensor = FivetranSensor( - task_id='twitter-sensor', - fivetran_conn_id='fivetran_default', + task_id="twitter-sensor", + fivetran_conn_id="fivetran_default", connector_id="{{ var.value.twitter_connector_id }}", poke_interval=5, ) - + dbt_run = SSHOperator( - task_id='dbt_ad_reporting', - command='cd dbt_ad_reporting ; ~/.local/bin/dbt run -m +ad_reporting', - ssh_conn_id='dbtvm' + task_id="dbt_ad_reporting", + command="cd dbt_ad_reporting ; ~/.local/bin/dbt run -m +ad_reporting", + ssh_conn_id="dbtvm", ) - ml_branch = BranchPythonOperator( - task_id='ml_branch', - python_callable=ml_branch, - provide_context=True + task_id="ml_branch", python_callable=ml_branch, provide_context=True ) - train_model = BigQueryExecuteQueryOperator( - task_id="train_model", - sql=TRAINING_QUERY, - use_legacy_sql=False + task_id="train_model", sql=TRAINING_QUERY, use_legacy_sql=False ) - get_preds = BigQueryExecuteQueryOperator( task_id="get_predictions", sql=SERVING_QUERY, use_legacy_sql=False, destination_dataset_table=DATASET_NAME + "." 
-        write_disposition="WRITE_APPEND"
+        write_disposition="WRITE_APPEND",
     )
-
     print_preds = BigQueryGetDataOperator(
-        task_id="print_predictions",
-        dataset_id=DATASET_NAME,
-        table_id=DESTINATION_TABLE
+        task_id="print_predictions", dataset_id=DATASET_NAME, table_id=DESTINATION_TABLE
     )

     linkedin_sync >> linkedin_sensor
     twitter_sync >> twitter_sensor
-    
+
     [linkedin_sensor, twitter_sensor] >> dbt_run

-    dbt_run >> ml_branch >> [train_model, get_preds] 
+    dbt_run >> ml_branch >> [train_model, get_preds]

     get_preds >> print_preds
diff --git a/fivetran_provider/example_dags/example_fivetran_dbt.py b/fivetran_provider/example_dags/example_fivetran_dbt.py
index af7a5ad..0c80aca 100644
--- a/fivetran_provider/example_dags/example_fivetran_dbt.py
+++ b/fivetran_provider/example_dags/example_fivetran_dbt.py
@@ -16,8 +16,8 @@
     dag_id="ad_reporting_dag",
     default_args=default_args,
     schedule_interval=timedelta(days=1),
-    catchup=False
-    ) as dag:
+    catchup=False,
+) as dag:

     linkedin_sync = FivetranOperator(
         task_id="linkedin-ads-sync",
diff --git a/fivetran_provider/example_dags/example_fivetran_xcom.py b/fivetran_provider/example_dags/example_fivetran_xcom.py
index 09b4f85..11e89e7 100644
--- a/fivetran_provider/example_dags/example_fivetran_xcom.py
+++ b/fivetran_provider/example_dags/example_fivetran_xcom.py
@@ -1,4 +1,4 @@
-import time 
+import time

 from airflow import DAG
 from fivetran_provider.operators.fivetran import FivetranOperator
@@ -11,11 +11,11 @@ default_args = {
     "owner": "Airflow",
     "start_date": datetime(2021, 4, 6),
-    "provide_context": True
+    "provide_context": True,
 }

 dag = DAG(
-    dag_id='example_fivetran_xcom',
+    dag_id="example_fivetran_xcom",
     default_args=default_args,
     schedule_interval=timedelta(days=1),
     catchup=False,
@@ -23,17 +23,18 @@

 with dag:
     fivetran_operator = FivetranOperator(
-        task_id='fivetran-operator',
-        fivetran_conn_id='fivetran_default',
+        task_id="fivetran-operator",
+        fivetran_conn_id="fivetran_default",
         connector_id="{{ var.value.connector_id }}",
     )

-    delay_task = PythonOperator(task_id="delay_python_task",
-                                python_callable=lambda: time.sleep(60))
+    delay_task = PythonOperator(
+        task_id="delay_python_task", python_callable=lambda: time.sleep(60)
+    )

     fivetran_sensor = FivetranSensor(
-        task_id='fivetran-sensor',
-        fivetran_conn_id='fivetran_default',
+        task_id="fivetran-sensor",
+        fivetran_conn_id="fivetran_default",
         connector_id="{{ var.value.connector_id }}",
         poke_interval=5,
         xcom="{{ task_instance.xcom_pull('fivetran-operator', key='return_value') }}",
diff --git a/fivetran_provider/hooks/fivetran.py b/fivetran_provider/hooks/fivetran.py
index 5dcbf53..6789ed4 100644
--- a/fivetran_provider/hooks/fivetran.py
+++ b/fivetran_provider/hooks/fivetran.py
@@ -27,39 +27,39 @@ class FivetranHook(BaseHook):
     :type retry_delay: float
     """

-    conn_name_attr = 'fivetran_conn_id'
-    default_conn_name = 'fivetran_default'
-    conn_type = 'fivetran'
-    hook_name = 'Fivetran'
-    api_user_agent = 'airflow_provider_fivetran/1.0.3'
-    api_protocol = 'https'
-    api_host = 'api.fivetran.com'
-    api_path_connectors = 'v1/connectors/'
+    conn_name_attr = "fivetran_conn_id"
+    default_conn_name = "fivetran_default"
+    conn_type = "fivetran"
+    hook_name = "Fivetran"
+    api_user_agent = "airflow_provider_fivetran/1.0.3"
+    api_protocol = "https"
+    api_host = "api.fivetran.com"
+    api_path_connectors = "v1/connectors/"

     @staticmethod
     def get_ui_field_behaviour() -> Dict:
         """Returns custom field behaviour"""
         return {
-            "hidden_fields": ['schema', 'port', 'extra', 'host'],
"hidden_fields": ["schema", "port", "extra", "host"], "relabeling": { - 'login': 'Fivetran API Key', - 'password': 'Fivetran API Secret', + "login": "Fivetran API Key", + "password": "Fivetran API Secret", }, "placeholders": { - 'login': 'api key', - 'password': 'api secret', + "login": "api key", + "password": "api secret", }, } def __init__( self, fivetran_conn_id: str = "fivetran", - fivetran_conn = None, + fivetran_conn=None, timeout_seconds: int = 180, retry_limit: int = 3, retry_delay: float = 1.0, ) -> None: - super().__init__(None) # Passing None fixes a runtime problem in Airflow 1 + super().__init__(None) # Passing None fixes a runtime problem in Airflow 1 self.conn_id = fivetran_conn_id self.fivetran_conn = fivetran_conn self.timeout_seconds = timeout_seconds @@ -86,9 +86,7 @@ def _do_api_call(self, endpoint_info, json=None): auth = (self.fivetran_conn.login, self.fivetran_conn.password) url = f"{self.api_protocol}://{self.api_host}/{endpoint}" - headers = { - "User-Agent": self.api_user_agent - } + headers = {"User-Agent": self.api_user_agent} if method == "GET": request_func = requests.get @@ -108,7 +106,7 @@ def _do_api_call(self, endpoint_info, json=None): data=json if method in ("POST", "PATCH") else None, params=json if method in ("GET") else None, auth=auth, - headers=headers + headers=headers, ) response.raise_for_status() return response.json() @@ -157,6 +155,7 @@ def get_connector(self, connector_id): :param connector_id: Fivetran connector_id, found in connector settings page in the Fivetran user interface. :type connector_id: str + :return: connector details :rtype: Dict """ if connector_id == "": @@ -165,7 +164,6 @@ def get_connector(self, connector_id): resp = self._do_api_call(("GET", endpoint)) return resp["data"] - def check_connector(self, connector_id): """ Ensures connector configuration has been completed successfully and is in @@ -199,18 +197,18 @@ def set_schedule_type(self, connector_id, schedule_type): :param connector_id: Fivetran connector_id, found in connector settings page in the Fivetran user interface. :type connector_id: str - :param schedule_type: Either "manual" (sync schedule only controlled via Airlow) or "auto" (sync schedule controlled via Fivetran) + :param schedule_type: "manual" (schedule controlled via Airlow) or "auto" (schedule controlled via Fivetran) :type schedule_type: str """ endpoint = self.api_path_connectors + connector_id return self._do_api_call( - ("PATCH", endpoint), - json.dumps({"schedule_type": schedule_type}) + ("PATCH", endpoint), json.dumps({"schedule_type": schedule_type}) ) def prep_connector(self, connector_id, schedule_type): """ - Prepare the connector to run in Airflow by checking that it exists and is a good state, then update connector sync schedule type if changed. + Prepare the connector to run in Airflow by checking that it exists and is a good state, + then update connector sync schedule type if changed. :param connector_id: Fivetran connector_id, found in connector settings page in the Fivetran user interface. :type connector_id: str @@ -229,6 +227,8 @@ def start_fivetran_sync(self, connector_id): :param connector_id: Fivetran connector_id, found in connector settings page in the Fivetran user interface. 
         :type connector_id: str
+        :return: Timestamp of previously completed sync
+        :rtype: str
         """
         endpoint = self.api_path_connectors + connector_id + "/force"
         self._do_api_call(("POST", endpoint))
@@ -236,7 +236,7 @@
         succeeded_at = connector_details["succeeded_at"]
         failed_at = connector_details["failed_at"]
         last_sync = succeeded_at if succeeded_at > failed_at else failed_at
-        
+
         return last_sync

     def get_last_sync(self, connector_id, xcom=""):
@@ -246,9 +246,13 @@
         :param connector_id: Fivetran connector_id, found in connector settings
             page in the Fivetran user interface.
         :type connector_id: str
+        :param xcom: Timestamp as string pulled from FivetranOperator via XCom
+        :type xcom: str
+        :return: Timestamp of last completed sync
+        :rtype: pendulum.DateTime
         """
         if xcom:
-            last_sync = self._parse_timestamp(xcom) 
+            last_sync = self._parse_timestamp(xcom)
         else:
             connector_details = self.get_connector(connector_id)
             succeeded_at = self._parse_timestamp(connector_details["succeeded_at"])
@@ -271,18 +275,8 @@ def get_sync_status(self, connector_id, previous_completed_at):
         connector_details = self.get_connector(connector_id)
         succeeded_at = self._parse_timestamp(connector_details["succeeded_at"])
         failed_at = self._parse_timestamp(connector_details["failed_at"])
-        current_completed_at = (
-            succeeded_at if succeeded_at > failed_at else failed_at
-        )
-
-        self.log.info('Connector prev: "{}"'.format(
-            previous_completed_at)
-        )
+        current_completed_at = succeeded_at if succeeded_at > failed_at else failed_at

-        self.log.info('Connector curr: "{}"'.format(
-            current_completed_at)
-        )
-
         # The only way to tell if a sync failed is to check if its latest
         # failed_at value is greater than then last known "sync completed at" value.
         if failed_at > previous_completed_at:
@@ -300,8 +294,10 @@
             # Check if sync started by FivetranOperator has finished
             # indicated by new 'succeeded_at' timestamp
             if current_completed_at > previous_completed_at:
-                self.log.info('Connector "{}": succeeded_at: {}'.format(
-                    connector_id, succeeded_at.to_iso8601_string())
+                self.log.info(
+                    'Connector "{}": succeeded_at: {}'.format(
+                        connector_id, succeeded_at.to_iso8601_string()
+                    )
                 )
                 return True
             else:
@@ -331,4 +327,3 @@
         or exception.response is not None
         and exception.response.status_code >= 500
     )
-
diff --git a/fivetran_provider/sensors/fivetran.py b/fivetran_provider/sensors/fivetran.py
index b16b616..50c2485 100644
--- a/fivetran_provider/sensors/fivetran.py
+++ b/fivetran_provider/sensors/fivetran.py
@@ -34,6 +34,9 @@ class FivetranSensor(BaseSensorOperator):
     :type fivetran_retry_limit: Optional[int]
     :param fivetran_retry_delay: Time to wait before retrying API request
     :type fivetran_retry_delay: int
+    :param xcom: If used, the FivetranSensor receives the timestamp of a previously
+        completed sync from the FivetranOperator via XCom
+    :type xcom: str
     """

     # Define which fields get jinjaified
@@ -59,6 +62,7 @@ def __init__(
         self.fivetran_retry_delay = fivetran_retry_delay
         self.hook = None
         self.xcom = xcom
+
     def _get_hook(self) -> FivetranHook:
         if self.hook is None:
             self.hook = FivetranHook(
@@ -71,5 +75,7 @@
     def poke(self, context):
         hook = self._get_hook()
         if self.previous_completed_at is None:
-            self.previous_completed_at = hook.get_last_sync(self.connector_id, self.xcom)
+            self.previous_completed_at = hook.get_last_sync(
+                self.connector_id, self.xcom
+            )
         return hook.get_sync_status(self.connector_id, self.previous_completed_at)
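
For reference, a minimal sketch of how the FivetranHook methods whose docstrings gain :return:/:rtype: annotations in this patch fit together, mirroring what FivetranOperator and FivetranSensor do between them. The connector id and poll interval below are placeholder assumptions, not values from this patch; the connection id assumes the "fivetran_default" connection used throughout the example DAGs.

import time

from fivetran_provider.hooks.fivetran import FivetranHook

# Hypothetical connector id; use the value from the connector's settings
# page in the Fivetran UI (the example DAGs store it as an Airflow Variable).
CONNECTOR_ID = "my_connector_id"

hook = FivetranHook(fivetran_conn_id="fivetran_default")

# Confirm the connector exists and is in a good state, and pin its
# schedule_type to "manual" so Airflow controls when syncs run.
hook.prep_connector(CONNECTOR_ID, "manual")

# Kick off a sync. Per the docstring added above, this returns the timestamp
# of the previously completed sync as a string -- the same value the
# FivetranOperator pushes to XCom for the FivetranSensor to consume.
previous_sync = hook.start_fivetran_sync(CONNECTOR_ID)

# Resolve the baseline to a pendulum.DateTime, then poll until a newer
# completion timestamp appears; get_sync_status() raises AirflowException
# on a failed sync. This mirrors what FivetranSensor.poke() does every
# poke_interval seconds.
previous_completed_at = hook.get_last_sync(CONNECTOR_ID, xcom=previous_sync)
while not hook.get_sync_status(CONNECTOR_ID, previous_completed_at):
    time.sleep(5)

In a DAG this flow is normally split across FivetranOperator (trigger the sync and push the returned timestamp to XCom) and FivetranSensor (poll for completion), as in example_fivetran_xcom.py above.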