Skip to content
This repository has been archived by the owner on Jun 19, 2023. It is now read-only.

Commit

Permalink
black formatting and documentation
Browse files Browse the repository at this point in the history
  • Loading branch information
PubChimps committed Jun 14, 2022
1 parent bc19374 commit 849ba88
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 140 deletions.
10 changes: 5 additions & 5 deletions fivetran_provider/example_dags/example_fivetran.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,22 @@
}

dag = DAG(
dag_id='example_fivetran',
dag_id="example_fivetran",
default_args=default_args,
schedule_interval=timedelta(days=1),
catchup=False,
)

with dag:
fivetran_sync_start = FivetranOperator(
task_id='fivetran-task',
fivetran_conn_id='fivetran_default',
task_id="fivetran-task",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.connector_id }}",
)

fivetran_sync_wait = FivetranSensor(
task_id='fivetran-sensor',
fivetran_conn_id='fivetran_default',
task_id="fivetran-sensor",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.connector_id }}",
poke_interval=5,
)
Expand Down
48 changes: 25 additions & 23 deletions fivetran_provider/example_dags/example_fivetran_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,23 +7,25 @@
from airflow.utils.dates import datetime


TABLE='forestfires'
DATASET='google_sheets'
TABLE = "forestfires"
DATASET = "google_sheets"
# These args will get passed on to each operator
# You can override them on a per-task basis during operator initialization
default_args = {
'owner': 'astronomer',
'depends_on_past': False,
'start_date': datetime(2021, 7, 7),
'email': ['[email protected]'],
'email_on_failure': False
"owner": "astronomer",
"depends_on_past": False,
"start_date": datetime(2021, 7, 7),
"email": ["[email protected]"],
"email_on_failure": False,
}

with DAG('example_fivetran_bigquery',
default_args=default_args,
description='',
schedule_interval=None,
catchup=False) as dag:
with DAG(
"example_fivetran_bigquery",
default_args=default_args,
description="",
schedule_interval=None,
catchup=False,
) as dag:
"""
### Simple EL Pipeline with Data Integrity and Quality Checks
Before running the DAG, set the following in an Airflow or Environment Variables:
Expand All @@ -44,16 +46,16 @@
The FivetranSensor monitors the status of the Fivetran data sync
"""
fivetran_sync_start = FivetranOperator(
task_id='fivetran-task',
fivetran_conn_id='fivetran_default',
connector_id='{{ var.value.connector_id }}'
task_id="fivetran-task",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.connector_id }}",
)

fivetran_sync_wait = FivetranSensor(
task_id='fivetran-sensor',
fivetran_conn_id='fivetran_default',
connector_id='{{ var.value.connector_id }}',
poke_interval=5
task_id="fivetran-sensor",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.connector_id }}",
poke_interval=5,
)

"""
Expand All @@ -62,10 +64,10 @@
exist.
"""
validate_bigquery = BigQueryTableExistenceSensor(
task_id='validate_bigquery',
project_id='{{ var.value.gcp_project_id }}',
task_id="validate_bigquery",
project_id="{{ var.value.gcp_project_id }}",
dataset_id=DATASET,
table_id='forestfires',
table_id="forestfires",
)

"""
Expand All @@ -80,7 +82,7 @@
use_legacy_sql=False,
)

done = DummyOperator(task_id='done')
done = DummyOperator(task_id="done")

fivetran_sync_start >> fivetran_sync_wait >> validate_bigquery
validate_bigquery >> check_bq_row_count >> done
121 changes: 61 additions & 60 deletions fivetran_provider/example_dags/example_fivetran_bqml.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,125 +5,126 @@
from fivetran_provider.operators.fivetran import FivetranOperator
from fivetran_provider.sensors.fivetran import FivetranSensor
from airflow.providers.google.cloud.hooks.compute_ssh import ComputeEngineSSHHook
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator, BigQueryGetDataOperator
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryExecuteQueryOperator,
BigQueryGetDataOperator,
)

from datetime import datetime, timedelta

#EDIT WITH YOUR PROJECT ID & DATASET NAME
# EDIT WITH YOUR PROJECT ID & DATASET NAME
PROJECT_ID = "YOUR PROJECT ID"
DATASET_NAME = "bqml"
DESTINATION_TABLE = "dbt_ads_bqml_preds"

TRAINING_QUERY = "CREATE OR REPLACE MODEL bqml.dbt_ads_airflow_model " \
"OPTIONS " \
"(model_type = 'ARIMA_PLUS', " \
"time_series_timestamp_col = 'parsed_date', " \
"time_series_data_col = 'daily_impressions', " \
"auto_arima = TRUE, " \
"data_frequency = 'AUTO_FREQUENCY', " \
"decompose_time_series = TRUE " \
") AS " \
"SELECT " \
"timestamp(date_day) as parsed_date, " \
"SUM(impressions) as daily_impressions " \
"FROM `" + PROJECT_ID + ".bqml.ad_reporting` " \
"GROUP BY date_day;"

SERVING_QUERY = "SELECT string(forecast_timestamp) as forecast_timestamp, " \
"forecast_value, " \
"standard_error, " \
"confidence_level, " \
"prediction_interval_lower_bound, " \
"prediction_interval_upper_bound, " \
"confidence_interval_lower_bound, " \
"confidence_interval_upper_bound " \
"FROM ML.FORECAST(MODEL `" + PROJECT_ID + ".bqml.dbt_ads_airflow_model`,STRUCT(30 AS horizon, 0.8 AS confidence_level));"

def ml_branch(ds,**kwargs):
if 'train' in kwargs['params'] and kwargs['params']['train']:
return 'train_model'
TRAINING_QUERY = (
"CREATE OR REPLACE MODEL bqml.dbt_ads_airflow_model "
"OPTIONS "
"(model_type = 'ARIMA_PLUS', "
"time_series_timestamp_col = 'parsed_date', "
"time_series_data_col = 'daily_impressions', "
"auto_arima = TRUE, "
"data_frequency = 'AUTO_FREQUENCY', "
"decompose_time_series = TRUE "
") AS "
"SELECT "
"timestamp(date_day) as parsed_date, "
"SUM(impressions) as daily_impressions "
"FROM `" + PROJECT_ID + ".bqml.ad_reporting` "
"GROUP BY date_day;"
)

SERVING_QUERY = (
"SELECT string(forecast_timestamp) as forecast_timestamp, "
"forecast_value, "
"standard_error, "
"confidence_level, "
"prediction_interval_lower_bound, "
"prediction_interval_upper_bound, "
"confidence_interval_lower_bound, "
"confidence_interval_upper_bound "
"FROM ML.FORECAST(MODEL `"
+ PROJECT_ID
+ ".bqml.dbt_ads_airflow_model`,STRUCT(30 AS horizon, 0.8 AS confidence_level));"
)


def ml_branch(ds, **kwargs):
if "train" in kwargs["params"] and kwargs["params"]["train"]:
return "train_model"
else:
return 'get_predictions'
return "get_predictions"


default_args = {
"owner": "Airflow",
"start_date": datetime(2021, 4, 6),
}

dag = DAG(
dag_id='example_fivetran_bqml',
dag_id="example_fivetran_bqml",
default_args=default_args,
schedule_interval=timedelta(days=1),
catchup=False,
)

with dag:
linkedin_sync = FivetranOperator(
task_id='linkedin-sync',
fivetran_conn_id='fivetran_default',
task_id="linkedin-sync",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.linkedin_connector_id }}",
)

linkedin_sensor = FivetranSensor(
task_id='linkedin-sensor',
fivetran_conn_id='fivetran_default',
task_id="linkedin-sensor",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.linkedin_connector_id }}",
poke_interval=5,
)

twitter_sync = FivetranOperator(
task_id='twitter-sync',
fivetran_conn_id='fivetran_default',
task_id="twitter-sync",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.twitter_connector_id }}",
)

twitter_sensor = FivetranSensor(
task_id='twitter-sensor',
fivetran_conn_id='fivetran_default',
task_id="twitter-sensor",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.twitter_connector_id }}",
poke_interval=5,
)

dbt_run = SSHOperator(
task_id='dbt_ad_reporting',
command='cd dbt_ad_reporting ; ~/.local/bin/dbt run -m +ad_reporting',
ssh_conn_id='dbtvm'
task_id="dbt_ad_reporting",
command="cd dbt_ad_reporting ; ~/.local/bin/dbt run -m +ad_reporting",
ssh_conn_id="dbtvm",
)


ml_branch = BranchPythonOperator(
task_id='ml_branch',
python_callable=ml_branch,
provide_context=True
task_id="ml_branch", python_callable=ml_branch, provide_context=True
)


train_model = BigQueryExecuteQueryOperator(
task_id="train_model",
sql=TRAINING_QUERY,
use_legacy_sql=False
task_id="train_model", sql=TRAINING_QUERY, use_legacy_sql=False
)


get_preds = BigQueryExecuteQueryOperator(
task_id="get_predictions",
sql=SERVING_QUERY,
use_legacy_sql=False,
destination_dataset_table=DATASET_NAME + "." + DESTINATION_TABLE,
write_disposition="WRITE_APPEND"
write_disposition="WRITE_APPEND",
)


print_preds = BigQueryGetDataOperator(
task_id="print_predictions",
dataset_id=DATASET_NAME,
table_id=DESTINATION_TABLE
task_id="print_predictions", dataset_id=DATASET_NAME, table_id=DESTINATION_TABLE
)

linkedin_sync >> linkedin_sensor
twitter_sync >> twitter_sensor

[linkedin_sensor, twitter_sensor] >> dbt_run

dbt_run >> ml_branch >> [train_model, get_preds]
dbt_run >> ml_branch >> [train_model, get_preds]
get_preds >> print_preds
4 changes: 2 additions & 2 deletions fivetran_provider/example_dags/example_fivetran_dbt.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
dag_id="ad_reporting_dag",
default_args=default_args,
schedule_interval=timedelta(days=1),
catchup=False
) as dag:
catchup=False,
) as dag:

linkedin_sync = FivetranOperator(
task_id="linkedin-ads-sync",
Expand Down
19 changes: 10 additions & 9 deletions fivetran_provider/example_dags/example_fivetran_xcom.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import time
import time
from airflow import DAG

from fivetran_provider.operators.fivetran import FivetranOperator
Expand All @@ -11,29 +11,30 @@
default_args = {
"owner": "Airflow",
"start_date": datetime(2021, 4, 6),
"provide_context": True
"provide_context": True,
}

dag = DAG(
dag_id='example_fivetran_xcom',
dag_id="example_fivetran_xcom",
default_args=default_args,
schedule_interval=timedelta(days=1),
catchup=False,
)

with dag:
fivetran_operator = FivetranOperator(
task_id='fivetran-operator',
fivetran_conn_id='fivetran_default',
task_id="fivetran-operator",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.connector_id }}",
)

delay_task = PythonOperator(task_id="delay_python_task",
python_callable=lambda: time.sleep(60))
delay_task = PythonOperator(
task_id="delay_python_task", python_callable=lambda: time.sleep(60)
)

fivetran_sensor = FivetranSensor(
task_id='fivetran-sensor',
fivetran_conn_id='fivetran_default',
task_id="fivetran-sensor",
fivetran_conn_id="fivetran_default",
connector_id="{{ var.value.connector_id }}",
poke_interval=5,
xcom="{{ task_instance.xcom_pull('fivetran-operator', key='return_value') }}",
Expand Down
Loading

0 comments on commit 849ba88

Please sign in to comment.