From 8933a01db4d9c926673274aebb41692b2a8fc955 Mon Sep 17 00:00:00 2001 From: Cayo Dias <80067418+cayod@users.noreply.github.com> Date: Mon, 27 Nov 2023 13:29:47 -0300 Subject: [PATCH 1/3] Add dim_dates table to mgi_transform dag (#253) --- dags/mgi_transforms_dag.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dags/mgi_transforms_dag.py b/dags/mgi_transforms_dag.py index 6b82eb04..0a77028b 100644 --- a/dags/mgi_transforms_dag.py +++ b/dags/mgi_transforms_dag.py @@ -39,6 +39,9 @@ # tasks for dim wallets dim_mgi_wallets = build_dbt_task(dag, "dim_mgi_wallets") +# task for dim dates +dim_dates = build_dbt_task(dag, "dim_dates") + # tasks for network stats enriched_history_mgi_operations = build_dbt_task(dag, "enriched_history_mgi_operations") mgi_network_stats_agg = build_dbt_task(dag, "mgi_network_stats_agg") @@ -52,6 +55,7 @@ >> stg_country_code >> int_mgi_transactions_transformed >> int_mgi_transactions_null_id + >> dim_dates >> fct_mgi_cashflow ) ( From 7e3f88396d55f4c8d59e0adda391f47630412eaf Mon Sep 17 00:00:00 2001 From: lucas zanotelli Date: Mon, 27 Nov 2023 17:48:35 -0300 Subject: [PATCH 2/3] Change `dbt` authentication method (#254) * replace authentication method * remove variables related to old auth method * correct variable name --- airflow_variables_dev.json | 9 ------ airflow_variables_prod.json | 9 ------ dags/stellar_etl_airflow/build_dbt_task.py | 34 ++-------------------- 3 files changed, 2 insertions(+), 50 deletions(-) diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json index d748ea1b..219c6b8f 100644 --- a/airflow_variables_dev.json +++ b/airflow_variables_dev.json @@ -84,11 +84,6 @@ "config_settings": ["last_modified_ledger"], "expiration": ["last_modified_ledger"] }, - "dbt_auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", - "dbt_auth_uri": "https://accounts.google.com/o/oauth2/auth", - "dbt_client_email": "803774498738-compute@developer.gserviceaccount.com", - "dbt_client_id": 101879863915888261202, - "dbt_client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/803774498738-compute%40developer.gserviceaccount.com", "dbt_dataset": "test_sdf", "dbt_full_refresh_models": { "partnership_assets__account_holders_activity_fact": true, @@ -97,15 +92,11 @@ "dbt_image_name": "stellar/stellar-dbt:1e160fe", "dbt_job_execution_timeout_seconds": 300, "dbt_job_retries": 1, - "dbt_keyfile_profile": "", "dbt_mart_dataset": "test_sdf_marts", "dbt_maximum_bytes_billed": 250000000000, - "dbt_private_key": "dummy", - "dbt_private_key_id": "dummy", "dbt_project": "test-hubble-319619", "dbt_target": "dev_airflow", "dbt_threads": 1, - "dbt_token_uri": "https://oauth2.googleapis.com/token", "gcs_exported_data_bucket_name": "us-central1-hubble-1pt5-dev-7db0e004-bucket", "gcs_exported_object_prefix": "dag-exported", "image_name": "stellar/stellar-etl:08fd8de", diff --git a/airflow_variables_prod.json b/airflow_variables_prod.json index 1ca335b1..fda96be8 100644 --- a/airflow_variables_prod.json +++ b/airflow_variables_prod.json @@ -97,11 +97,6 @@ "config_settings": ["last_modified_ledger"], "expiration": ["last_modified_ledger"] }, - "dbt_auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", - "dbt_auth_uri": "https://accounts.google.com/o/oauth2/auth", - "dbt_client_email": "471979297599-compute@developer.gserviceaccount.com", - "dbt_client_id": 102744410146098624313, - "dbt_client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/471979297599-compute%40developer.gserviceaccount.com", 
"dbt_dataset": "sdf", "dbt_full_refresh_models": { "history_assets": false, @@ -119,15 +114,11 @@ "dbt_image_name": "stellar/stellar-dbt:1e160fe", "dbt_job_execution_timeout_seconds": 6000, "dbt_job_retries": 1, - "dbt_keyfile_profile": "", "dbt_mart_dataset": "sdf_marts", "dbt_maximum_bytes_billed": 100000000000000, - "dbt_private_key": "", - "dbt_private_key_id": "", "dbt_project": "hubble-261722", "dbt_target": "prod", "dbt_threads": 1, - "dbt_token_uri": "https://oauth2.googleapis.com/token", "gcs_exported_data_bucket_name": "us-central1-hubble-2-d948d67b-bucket", "gcs_exported_object_prefix": "dag-exported", "image_name": "stellar/stellar-etl:08fd8de", diff --git a/dags/stellar_etl_airflow/build_dbt_task.py b/dags/stellar_etl_airflow/build_dbt_task.py index 62b9b75e..827f362d 100644 --- a/dags/stellar_etl_airflow/build_dbt_task.py +++ b/dags/stellar_etl_airflow/build_dbt_task.py @@ -19,14 +19,6 @@ def create_dbt_profile(project="prod"): dbt_job_retries = Variable.get("dbt_job_retries") dbt_project = Variable.get("dbt_project") dbt_threads = Variable.get("dbt_threads") - dbt_private_key_id = Variable.get("dbt_private_key_id") - dbt_private_key = Variable.get("dbt_private_key") - dbt_client_email = Variable.get("dbt_client_email") - dbt_client_id = Variable.get("dbt_client_id") - dbt_auth_uri = Variable.get("dbt_auth_uri") - dbt_token_uri = Variable.get("dbt_token_uri") - dbt_auth_provider_x509_cert_url = Variable.get("dbt_auth_provider_x509_cert_url") - dbt_client_x509_cert_url = Variable.get("dbt_client_x509_cert_url") if project == "pub": dbt_project = Variable.get("public_project") dbt_dataset = Variable.get("public_dataset") @@ -41,21 +33,10 @@ def create_dbt_profile(project="prod"): job_execution_timeout_seconds: {dbt_job_execution_timeout_seconds} job_retries: {dbt_job_retries} location: us - method: service-account-json + method: oauth project: "{dbt_project}" threads: {dbt_threads} type: bigquery - keyfile_json: - type: "service_account" - project_id: "{dbt_project}" - private_key_id: "{dbt_private_key_id}" - private_key: "{dbt_private_key}" - client_email: "{dbt_client_email}" - client_id: "{dbt_client_id}" - auth_uri: "{dbt_auth_uri}" - token_uri: "{dbt_token_uri}" - auth_provider_x509_cert_url: "{dbt_auth_provider_x509_cert_url}" - client_x509_cert_url: "{dbt_client_x509_cert_url}" elementary: outputs: default: @@ -64,21 +45,10 @@ def create_dbt_profile(project="prod"): job_execution_timeout_seconds: {dbt_job_execution_timeout_seconds} job_retries: {dbt_job_retries} location: us - method: service-account-json + method: oauth project: "{dbt_project}" threads: {dbt_threads} type: bigquery - keyfile_json: - type: "service_account" - project_id: "{dbt_project}" - private_key_id: "{dbt_private_key_id}" - private_key: "{dbt_private_key}" - client_email: "{dbt_client_email}" - client_id: "{dbt_client_id}" - auth_uri: "{dbt_auth_uri}" - token_uri: "{dbt_token_uri}" - auth_provider_x509_cert_url: "{dbt_auth_provider_x509_cert_url}" - client_x509_cert_url: "{dbt_client_x509_cert_url}" """ create_dbt_profile_cmd = f"echo '{profiles_yml}' > profiles.yml;" From 478dc2c85f2fb90f7f36e5f6e85d251730e2a1e4 Mon Sep 17 00:00:00 2001 From: lucas zanotelli Date: Tue, 28 Nov 2023 17:15:39 -0300 Subject: [PATCH 3/3] Optimized DAGs parse time (#258) * replace `Variable.get()` when field is templatable * streamline imports * remove `Variable.get()` dependency from non templatable field * only set `config_file` when in default namespace --- dags/asset_pricing_pipeline_dag.py | 4 +- 
dags/audit_log_dag.py | 8 +-- dags/bucket_list_dag.py | 60 +++++++++------- dags/daily_euro_ohlc_dag.py | 8 +-- dags/dataset_reset_dag.py | 12 ++-- dags/enriched_tables_dag.py | 5 +- dags/history_archive_with_captive_core_dag.py | 63 +++++++++------- ...istory_archive_without_captive_core_dag.py | 42 ++++++----- dags/ledger_current_state_dag.py | 4 +- dags/marts_tables_dag.py | 5 +- dags/mgi_transforms_dag.py | 5 +- dags/partner_pipeline_dag.py | 21 ++---- dags/partnership_assets_dag.py | 5 +- dags/public_marts_tables_dag.py | 6 +- dags/sandbox_create_dag.py | 8 +-- dags/sandbox_update_dag.py | 8 +-- dags/state_table_dag.py | 72 +++++++++++-------- .../build_apply_gcs_changes_to_bq_task.py | 12 ++-- .../build_bq_insert_job_task.py | 7 +- .../build_cross_dependency_task.py | 4 +- dags/stellar_etl_airflow/build_dbt_task.py | 49 +++++++------ .../build_delete_data_task.py | 8 +-- dags/stellar_etl_airflow/build_export_task.py | 22 +++--- .../build_gcs_to_bq_task.py | 12 +--- dags/stellar_etl_airflow/build_time_task.py | 22 +++--- 25 files changed, 239 insertions(+), 233 deletions(-) diff --git a/dags/asset_pricing_pipeline_dag.py b/dags/asset_pricing_pipeline_dag.py index 7606d253..231281f5 100644 --- a/dags/asset_pricing_pipeline_dag.py +++ b/dags/asset_pricing_pipeline_dag.py @@ -1,5 +1,5 @@ import datetime -import json +from json import loads from airflow import DAG from airflow.models.variable import Variable @@ -16,7 +16,7 @@ description="This DAG runs dbt to calculate asset pricing based on stablecoin and XLM trades", schedule_interval="0 2 * * *", # daily at 2am params={}, - user_defined_filters={"fromjson": lambda s: json.loads(s)}, + user_defined_filters={"fromjson": lambda s: loads(s)}, user_defined_macros={ "subtract_data_interval": macros.subtract_data_interval, "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string, diff --git a/dags/audit_log_dag.py b/dags/audit_log_dag.py index fa6f058f..ffa75a60 100644 --- a/dags/audit_log_dag.py +++ b/dags/audit_log_dag.py @@ -1,8 +1,8 @@ """ This DAG runs an audit log SQL to update the audit log dashboard. """ -import datetime -import json +from datetime import datetime +from json import loads from airflow import DAG from airflow.models import Variable @@ -16,12 +16,12 @@ "audit_log_dag", default_args=get_default_dag_args(), description="This DAG runs periodically to update the audit log dashboard.", - start_date=datetime.datetime(2023, 1, 1, 0, 0), + start_date=datetime(2023, 1, 1, 0, 0), schedule_interval="10 9 * * *", params={ "alias": "audit-log", }, - user_defined_filters={"fromjson": lambda s: json.loads(s)}, + user_defined_filters={"fromjson": lambda s: loads(s)}, catchup=False, ) diff --git a/dags/bucket_list_dag.py b/dags/bucket_list_dag.py index 2ccfc08a..d2b3f6a6 100644 --- a/dags/bucket_list_dag.py +++ b/dags/bucket_list_dag.py @@ -4,9 +4,9 @@ to stop exporting. This end ledger is determined by when the Airflow DAG is run. This DAG should be triggered manually when initializing the tables in order to catch up to the current state in the network, but should not be scheduled to run constantly. 
""" -import ast -import datetime -import json +from ast import literal_eval +from datetime import datetime +from json import loads from airflow import DAG from airflow.models import Variable @@ -23,29 +23,31 @@ dag = DAG( "bucket_list_export", default_args=get_default_dag_args(), - start_date=datetime.datetime(2021, 10, 15), - end_date=datetime.datetime(2021, 10, 15), + start_date=datetime(2021, 10, 15), + end_date=datetime(2021, 10, 15), description="This DAG loads a point forward view of state tables. Caution: Does not capture historical changes!", schedule_interval="@daily", params={ "alias": "bucket", }, - user_defined_filters={"fromjson": lambda s: json.loads(s)}, + user_defined_filters={ + "fromjson": lambda s: loads(s), + "literal_eval": lambda e: literal_eval(e), + }, user_defined_macros={ "subtract_data_interval": macros.subtract_data_interval, "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string, }, ) -file_names = Variable.get("output_file_names", deserialize_json=True) table_names = Variable.get("table_ids", deserialize_json=True) -internal_project = Variable.get("bq_project") -internal_dataset = Variable.get("bq_dataset") -public_project = Variable.get("public_project") -public_dataset = Variable.get("public_dataset") -use_testnet = ast.literal_eval(Variable.get("use_testnet")) -use_futurenet = ast.literal_eval(Variable.get("use_futurenet")) - +internal_project = "{{ var.value.bq_project }}" +internal_dataset = "{{ var.value.bq_dataset }}" +public_project = "{{ var.value.public_project }}" +public_dataset = "{{ var.value.public_dataset }}" +public_dataset_new = "{{ var.value.public_dataset_new }}" +use_testnet = "{{ var.value.use_testnet | literal_eval }}" +use_futurenet = "{{ var.value.use_futurenet | literal_eval }}" """ The time task reads in the execution time of the current run, as well as the next execution time. It converts these two times into ledger ranges. 
@@ -60,7 +62,7 @@ dag, "bucket", "export_accounts", - file_names["accounts"], + "{{ var.json.output_file_names.accounts }}", use_testnet=use_testnet, use_futurenet=use_futurenet, use_gcs=True, @@ -69,7 +71,7 @@ dag, "bucket", "export_claimable_balances", - file_names["claimable_balances"], + "{{ var.json.output_file_names.claimable_balances }}", use_testnet=use_testnet, use_futurenet=use_futurenet, use_gcs=True, @@ -78,7 +80,7 @@ dag, "bucket", "export_offers", - file_names["offers"], + "{{ var.json.output_file_names.offers }}", use_testnet=use_testnet, use_futurenet=use_futurenet, use_gcs=True, @@ -87,7 +89,7 @@ dag, "bucket", "export_pools", - file_names["liquidity_pools"], + "{{ var.json.output_file_names.liquidity_pools }}", use_testnet=use_testnet, use_futurenet=use_futurenet, use_gcs=True, @@ -96,7 +98,7 @@ dag, "bucket", "export_signers", - file_names["signers"], + "{{ var.json.output_file_names.signers }}", use_testnet=use_testnet, use_futurenet=use_futurenet, use_gcs=True, @@ -105,7 +107,7 @@ dag, "bucket", "export_trustlines", - file_names["trustlines"], + "{{ var.json.output_file_names.trustlines }}", use_testnet=use_testnet, use_futurenet=use_futurenet, use_gcs=True, @@ -131,37 +133,37 @@ dag, internal_project, internal_dataset, table_names["accounts"] ) delete_acc_pub_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["accounts"] + dag, public_project, public_dataset, table_names["accounts"], "pub" ) delete_bal_task = build_delete_data_task( dag, internal_project, internal_dataset, table_names["claimable_balances"] ) delete_bal_pub_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["claimable_balances"] + dag, public_project, public_dataset, table_names["claimable_balances"], "pub" ) delete_off_task = build_delete_data_task( dag, internal_project, internal_dataset, table_names["offers"] ) delete_off_pub_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["offers"] + dag, public_project, public_dataset, table_names["offers"], "pub" ) delete_pool_task = build_delete_data_task( dag, internal_project, internal_dataset, table_names["liquidity_pools"] ) delete_pool_pub_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["liquidity_pools"] + dag, public_project, public_dataset, table_names["liquidity_pools"], "pub" ) delete_sign_task = build_delete_data_task( dag, internal_project, internal_dataset, table_names["signers"] ) delete_sign_pub_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["signers"] + dag, public_project, public_dataset, table_names["signers"], "pub" ) delete_trust_task = build_delete_data_task( dag, internal_project, internal_dataset, table_names["trustlines"] ) delete_trust_pub_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["trustlines"] + dag, public_project, public_dataset, table_names["trustlines"], "pub" ) """ @@ -244,6 +246,7 @@ "", partition=True, cluster=True, + dataset_type="pub", ) send_bal_to_pub_task = build_gcs_to_bq_task( dag, @@ -254,6 +257,7 @@ "", partition=True, cluster=True, + dataset_type="pub", ) send_off_to_pub_task = build_gcs_to_bq_task( dag, @@ -264,6 +268,7 @@ "", partition=True, cluster=True, + dataset_type="pub", ) send_pool_to_pub_task = build_gcs_to_bq_task( dag, @@ -274,6 +279,7 @@ "", partition=True, cluster=True, + dataset_type="pub", ) send_sign_to_pub_task = build_gcs_to_bq_task( dag, @@ -284,6 +290,7 @@ "", partition=True, cluster=True, + 
dataset_type="pub", ) send_trust_to_pub_task = build_gcs_to_bq_task( dag, @@ -294,6 +301,7 @@ "", partition=True, cluster=True, + dataset_type="pub", ) ( diff --git a/dags/daily_euro_ohlc_dag.py b/dags/daily_euro_ohlc_dag.py index 369e70de..20213934 100644 --- a/dags/daily_euro_ohlc_dag.py +++ b/dags/daily_euro_ohlc_dag.py @@ -2,8 +2,8 @@ The daily_euro_ohlc_dag DAG updates the currency table in Bigquey every day. """ -import datetime -import json +from datetime import datetime +from json import loads from airflow import DAG from airflow.decorators import dag @@ -18,13 +18,13 @@ with DAG( dag_id="daily_euro_ohlc_dag", - start_date=datetime.datetime(2023, 1, 1, 0, 0), + start_date=datetime(2023, 1, 1, 0, 0), description="This DAG updates the currency tables in Bigquey every day", schedule_interval="35 0 * * *", params={ "alias": "euro", }, - user_defined_filters={"fromjson": lambda s: json.loads(s)}, + user_defined_filters={"fromjson": lambda s: loads(s)}, catchup=False, ) as dag: currency_ohlc = Variable.get("currency_ohlc", deserialize_json=True) diff --git a/dags/dataset_reset_dag.py b/dags/dataset_reset_dag.py index e368f47e..8e1e2fcc 100644 --- a/dags/dataset_reset_dag.py +++ b/dags/dataset_reset_dag.py @@ -1,9 +1,9 @@ """ When the Test net server is reset, the dataset reset DAG deletes all the datasets in the test Hubble. """ -import ast -import datetime -import json +from ast import literal_eval +from datetime import datetime +from json import loads from airflow import DAG from airflow.models import Variable @@ -20,13 +20,13 @@ "testnet_data_reset", default_args=get_default_dag_args(), description="This DAG runs after the Testnet data reset that occurs periodically.", - start_date=datetime.datetime(2023, 1, 1, 0, 0), + start_date=datetime(2023, 1, 1, 0, 0), schedule_interval="10 9 * * *", - is_paused_upon_creation=ast.literal_eval(Variable.get("use_testnet")), + is_paused_upon_creation=literal_eval(Variable.get("use_testnet")), params={ "alias": "testnet-reset", }, - user_defined_filters={"fromjson": lambda s: json.loads(s)}, + user_defined_filters={"fromjson": lambda s: loads(s)}, ) internal_project = "test-hubble-319619" diff --git a/dags/enriched_tables_dag.py b/dags/enriched_tables_dag.py index 2ede0783..f0f25c41 100644 --- a/dags/enriched_tables_dag.py +++ b/dags/enriched_tables_dag.py @@ -1,7 +1,6 @@ -import datetime +from datetime import datetime from airflow import DAG -from airflow.models.variable import Variable from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps from stellar_etl_airflow.build_dbt_task import build_dbt_task from stellar_etl_airflow.default import get_default_dag_args, init_sentry @@ -11,7 +10,7 @@ dag = DAG( "enriched_tables", default_args=get_default_dag_args(), - start_date=datetime.datetime(2023, 4, 12, 0, 0), + start_date=datetime(2023, 4, 12, 0, 0), description="This DAG runs dbt to create the tables for the models in marts/enriched/.", schedule_interval="*/30 * * * *", # Runs every 30 mins params={}, diff --git a/dags/history_archive_with_captive_core_dag.py b/dags/history_archive_with_captive_core_dag.py index f337dbe1..c068097d 100644 --- a/dags/history_archive_with_captive_core_dag.py +++ b/dags/history_archive_with_captive_core_dag.py @@ -2,9 +2,9 @@ The history_archive_export DAG exports operations and trades from the history archives. It is scheduled to export information to BigQuery at regular intervals. 
""" -import ast -import datetime -import json +from ast import literal_eval +from datetime import datetime +from json import loads from airflow import DAG from airflow.models.variable import Variable @@ -23,29 +23,31 @@ dag = DAG( "history_archive_with_captive_core", default_args=get_default_dag_args(), - start_date=datetime.datetime(2023, 9, 20, 15, 0), + start_date=datetime(2023, 9, 20, 15, 0), catchup=True, description="This DAG exports trades and operations from the history archive using CaptiveCore. This supports parsing sponsorship and AMMs.", schedule_interval="*/30 * * * *", params={ "alias": "cc", }, - user_defined_filters={"fromjson": lambda s: json.loads(s)}, + user_defined_filters={ + "fromjson": lambda s: loads(s), + "literal_eval": lambda e: literal_eval(e), + }, user_defined_macros={ "subtract_data_interval": macros.subtract_data_interval, "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string, }, ) -file_names = Variable.get("output_file_names", deserialize_json=True) table_names = Variable.get("table_ids", deserialize_json=True) -internal_project = Variable.get("bq_project") -internal_dataset = Variable.get("bq_dataset") -public_project = Variable.get("public_project") -public_dataset = Variable.get("public_dataset") -public_dataset_new = Variable.get("public_dataset_new") -use_testnet = ast.literal_eval(Variable.get("use_testnet")) -use_futurenet = ast.literal_eval(Variable.get("use_futurenet")) +internal_project = "{{ var.value.bq_project }}" +internal_dataset = "{{ var.value.bq_dataset }}" +public_project = "{{ var.value.public_project }}" +public_dataset = "{{ var.value.public_dataset }}" +public_dataset_new = "{{ var.value.public_dataset_new }}" +use_testnet = "{{ var.value.use_testnet | literal_eval }}" +use_futurenet = "{{ var.value.use_futurenet | literal_eval }}" """ The time task reads in the execution time of the current run, as well as the next @@ -77,7 +79,7 @@ dag, "archive", "export_operations", - file_names["operations"], + "{{ var.json.output_file_names.operations }}", use_testnet=use_testnet, use_futurenet=use_futurenet, use_gcs=True, @@ -87,7 +89,7 @@ dag, "archive", "export_trades", - file_names["trades"], + "{{ var.json.output_file_names.trades }}", use_testnet=use_testnet, use_futurenet=use_futurenet, use_gcs=True, @@ -107,7 +109,7 @@ dag, "archive", "export_transactions", - file_names["transactions"], + "{{ var.json.output_file_names.transactions }}", use_testnet=use_testnet, use_futurenet=use_futurenet, use_gcs=True, @@ -117,7 +119,7 @@ dag, "archive", "export_diagnostic_events", - file_names["diagnostic_events"], + "{{ var.json.output_file_names.diagnostic_events }}", use_testnet=use_testnet, use_futurenet=use_futurenet, use_gcs=True, @@ -132,28 +134,28 @@ dag, internal_project, internal_dataset, table_names["operations"] ) delete_old_op_pub_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["operations"] + dag, public_project, public_dataset, table_names["operations"], "pub" ) delete_old_op_pub_new_task = build_delete_data_task( - dag, public_project, public_dataset_new, table_names["operations"] + dag, public_project, public_dataset_new, table_names["operations"], "pub_new" ) delete_old_trade_task = build_delete_data_task( dag, internal_project, internal_dataset, table_names["trades"] ) delete_old_trade_pub_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["trades"] + dag, public_project, public_dataset, table_names["trades"], "pub" ) 
delete_old_trade_pub_new_task = build_delete_data_task( - dag, public_project, public_dataset_new, table_names["trades"] + dag, public_project, public_dataset_new, table_names["trades"], "pub_new" ) delete_enrich_op_task = build_delete_data_task( dag, internal_project, internal_dataset, "enriched_history_operations" ) delete_enrich_op_pub_task = build_delete_data_task( - dag, public_project, public_dataset, "enriched_history_operations" + dag, public_project, public_dataset, "enriched_history_operations", "pub" ) delete_enrich_op_pub_new_task = build_delete_data_task( - dag, public_project, public_dataset_new, "enriched_history_operations" + dag, public_project, public_dataset_new, "enriched_history_operations", "pub_new" ) delete_enrich_ma_op_task = build_delete_data_task( dag, internal_project, internal_dataset, "enriched_meaningful_history_operations" @@ -162,16 +164,16 @@ dag, internal_project, internal_dataset, table_names["effects"] ) delete_old_effects_pub_new_task = build_delete_data_task( - dag, public_project, public_dataset_new, table_names["effects"] + dag, public_project, public_dataset_new, table_names["effects"], "pub_new" ) delete_old_tx_task = build_delete_data_task( dag, internal_project, internal_dataset, table_names["transactions"] ) delete_old_tx_pub_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["transactions"] + dag, public_project, public_dataset, table_names["transactions"], "pub" ) delete_old_tx_pub_new_task = build_delete_data_task( - dag, public_project, public_dataset_new, table_names["transactions"] + dag, public_project, public_dataset_new, table_names["transactions"], "pub_new" ) """ @@ -232,6 +234,7 @@ "", partition=True, cluster=True, + dataset_type="pub", ) send_trades_to_pub_task = build_gcs_to_bq_task( dag, @@ -242,6 +245,7 @@ "", partition=True, cluster=True, + dataset_type="pub", ) send_txs_to_pub_task = build_gcs_to_bq_task( dag, @@ -252,6 +256,7 @@ "", partition=True, cluster=True, + dataset_type="pub", ) """ @@ -266,6 +271,7 @@ "", partition=True, cluster=True, + dataset_type="pub_new", ) send_trades_to_pub_new_task = build_gcs_to_bq_task( dag, @@ -276,6 +282,7 @@ "", partition=True, cluster=True, + dataset_type="pub_new", ) send_effects_to_pub_new_task = build_gcs_to_bq_task( dag, @@ -286,6 +293,7 @@ "", partition=True, cluster=True, + dataset_type="pub_new", ) send_txs_to_pub_new_task = build_gcs_to_bq_task( dag, @@ -296,6 +304,7 @@ "", partition=True, cluster=True, + dataset_type="pub_new", ) """ @@ -321,6 +330,7 @@ "enriched_history_operations", partition=True, cluster=True, + dataset_type="pub", ) insert_enriched_hist_pub_new_task = build_bq_insert_job( dag, @@ -329,6 +339,7 @@ "enriched_history_operations", partition=True, cluster=True, + dataset_type="pub_new", ) insert_enriched_ma_hist_task = build_bq_insert_job( dag, diff --git a/dags/history_archive_without_captive_core_dag.py b/dags/history_archive_without_captive_core_dag.py index 68d15eee..d33c1abd 100644 --- a/dags/history_archive_without_captive_core_dag.py +++ b/dags/history_archive_without_captive_core_dag.py @@ -2,9 +2,9 @@ The history_archive_export DAG exports ledgers and transactions from the history archives. It is scheduled to export information to BigQuery at regular intervals. 
""" -import ast -import datetime -import json +from ast import literal_eval +from datetime import datetime +from json import loads from airflow import DAG from airflow.models import Variable @@ -22,29 +22,31 @@ dag = DAG( "history_archive_without_captive_core", default_args=get_default_dag_args(), - start_date=datetime.datetime(2023, 9, 20, 15, 0), + start_date=datetime(2023, 9, 20, 15, 0), catchup=True, description="This DAG exports ledgers, transactions, and assets from the history archive to BigQuery. Incremental Loads", schedule_interval="*/15 * * * *", params={ "alias": "archive", }, - user_defined_filters={"fromjson": lambda s: json.loads(s)}, + user_defined_filters={ + "fromjson": lambda s: loads(s), + "literal_eval": lambda e: literal_eval(e), + }, user_defined_macros={ "subtract_data_interval": macros.subtract_data_interval, "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string, }, ) -file_names = Variable.get("output_file_names", deserialize_json=True) table_names = Variable.get("table_ids", deserialize_json=True) -internal_project = Variable.get("bq_project") -internal_dataset = Variable.get("bq_dataset") -public_project = Variable.get("public_project") -public_dataset = Variable.get("public_dataset") -public_dataset_new = Variable.get("public_dataset_new") -use_testnet = ast.literal_eval(Variable.get("use_testnet")) -use_futurenet = ast.literal_eval(Variable.get("use_futurenet")) +internal_project = "{{ var.value.bq_project }}" +internal_dataset = "{{ var.value.bq_dataset }}" +public_project = "{{ var.value.public_project }}" +public_dataset = "{{ var.value.public_dataset }}" +public_dataset_new = "{{ var.value.public_dataset_new }}" +use_testnet = "{{ var.value.use_testnet | literal_eval }}" +use_futurenet = "{{ var.value.use_futurenet | literal_eval }}" """ The time task reads in the execution time of the current run, as well as the next @@ -73,7 +75,7 @@ dag, "archive", "export_ledgers", - file_names["ledgers"], + "{{ var.json.output_file_names.ledgers }}", use_testnet=use_testnet, use_futurenet=use_futurenet, use_gcs=True, @@ -82,7 +84,7 @@ dag, "archive", "export_assets", - file_names["assets"], + "{{ var.json.output_file_names.assets }}", use_testnet=use_testnet, use_futurenet=use_futurenet, use_gcs=True, @@ -96,16 +98,16 @@ dag, internal_project, internal_dataset, table_names["ledgers"] ) delete_old_ledger_pub_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["ledgers"] + dag, public_project, public_dataset, table_names["ledgers"], "pub" ) delete_old_ledger_pub_new_task = build_delete_data_task( - dag, public_project, public_dataset_new, table_names["ledgers"] + dag, public_project, public_dataset_new, table_names["ledgers"], "pub_new" ) delete_old_asset_task = build_delete_data_task( dag, internal_project, internal_dataset, table_names["assets"] ) delete_old_asset_pub_new_task = build_delete_data_task( - dag, public_project, public_dataset_new, table_names["assets"] + dag, public_project, public_dataset_new, table_names["assets"], "pub_new" ) """ @@ -146,6 +148,7 @@ "", partition=True, cluster=True, + dataset_type="pub_new", ) send_ledgers_to_pub_task = build_gcs_to_bq_task( dag, @@ -156,6 +159,7 @@ "", partition=True, cluster=True, + dataset_type="pub", ) send_assets_to_pub_new_task = build_gcs_to_bq_task( dag, @@ -166,6 +170,7 @@ "", partition=True, cluster=True, + dataset_type="pub_new", ) """ @@ -189,6 +194,7 @@ partition=True, cluster=True, create=True, + dataset_type="pub_new", ) ( diff --git 
a/dags/ledger_current_state_dag.py b/dags/ledger_current_state_dag.py index fe9bf0db..d2be158e 100644 --- a/dags/ledger_current_state_dag.py +++ b/dags/ledger_current_state_dag.py @@ -1,4 +1,4 @@ -import datetime +from datetime import datetime from airflow import DAG from airflow.models.variable import Variable @@ -11,7 +11,7 @@ dag = DAG( "ledger_current_state", default_args=get_default_dag_args(), - start_date=datetime.datetime(2023, 4, 4, 0, 0), + start_date=datetime(2023, 4, 4, 0, 0), description="This DAG runs dbt to create the tables for the models in marts/ledger_current_state/", schedule_interval="0 */1 * * *", # Runs hourly; NOTE: This can be changed to daily if execution time is too slow params={}, diff --git a/dags/marts_tables_dag.py b/dags/marts_tables_dag.py index 1d99d2a4..552631b3 100644 --- a/dags/marts_tables_dag.py +++ b/dags/marts_tables_dag.py @@ -1,7 +1,6 @@ -import datetime +from datetime import datetime from airflow import DAG -from airflow.models.variable import Variable from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps from stellar_etl_airflow.build_dbt_task import build_dbt_task from stellar_etl_airflow.default import get_default_dag_args, init_sentry @@ -11,7 +10,7 @@ dag = DAG( "marts_tables", default_args=get_default_dag_args(), - start_date=datetime.datetime(2015, 9, 30), + start_date=datetime(2015, 9, 30), description="This DAG runs dbt to create the tables for the models in marts/ but not any marts subdirectories.", schedule_interval="0 17 * * *", # Daily 11 AM UTC params={}, diff --git a/dags/mgi_transforms_dag.py b/dags/mgi_transforms_dag.py index 0a77028b..ed536966 100644 --- a/dags/mgi_transforms_dag.py +++ b/dags/mgi_transforms_dag.py @@ -1,7 +1,6 @@ -import datetime +from datetime import datetime from airflow import DAG -from airflow.models.variable import Variable from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps from stellar_etl_airflow.build_dbt_task import build_dbt_task from stellar_etl_airflow.default import get_default_dag_args, init_sentry @@ -10,7 +9,7 @@ dag = DAG( "mgi_transforms", default_args=get_default_dag_args(), - start_date=datetime.datetime(2023, 5, 22, 0, 0), + start_date=datetime(2023, 5, 22, 0, 0), description="This DAG runs dbt to create the mgi cash in and cash out fact and dimension tables.", schedule_interval="30 15 * * *", # Daily 15:30 UTC after MGI pipeline params={}, diff --git a/dags/partner_pipeline_dag.py b/dags/partner_pipeline_dag.py index df47ecf9..6f88ae0a 100644 --- a/dags/partner_pipeline_dag.py +++ b/dags/partner_pipeline_dag.py @@ -1,10 +1,4 @@ -""" -The partner_pipeline_dag DAG updates the partners table in Bigquey every day. 
-""" - -import datetime -import json -from datetime import timedelta +from datetime import datetime, timedelta from airflow import DAG from airflow.models import Variable @@ -27,21 +21,20 @@ with DAG( "partner_pipeline_dag", default_args=get_default_dag_args(), - start_date=datetime.datetime(2023, 1, 1, 0, 0), - description="This DAG updates the partner tables in Bigquey every day", + start_date=datetime(2023, 1, 1, 0, 0), + description="This DAG automates daily updates to partner tables in BigQuery.", schedule_interval="20 15 * * *", params={ "alias": "partner", }, render_template_as_native_obj=True, - user_defined_filters={"fromjson": lambda s: json.loads(s)}, catchup=False, ) as dag: - PROJECT = Variable.get("bq_project") - DATASET = Variable.get("bq_dataset") - BUCKET_NAME = Variable.get("partners_bucket") + PROJECT = "{{ var.value.bq_project }}" + DATASET = "{{ var.value.bq_dataset }}" + BUCKET_NAME = "{{ var.value.partners_bucket }}" PARTNERS = Variable.get("partners_data", deserialize_json=True) - TODAY = "{{ next_ds_nodash }}" + TODAY = "{{ data_interval_end | ds }}" start_tables_task = EmptyOperator(task_id="start_update_task") diff --git a/dags/partnership_assets_dag.py b/dags/partnership_assets_dag.py index 4b8f515a..8712f48a 100644 --- a/dags/partnership_assets_dag.py +++ b/dags/partnership_assets_dag.py @@ -1,7 +1,6 @@ -import datetime +from datetime import datetime from airflow import DAG -from airflow.models.variable import Variable from stellar_etl_airflow.build_dbt_task import build_dbt_task from stellar_etl_airflow.default import get_default_dag_args, init_sentry @@ -10,7 +9,7 @@ dag = DAG( "partnership_assets", default_args=get_default_dag_args(), - start_date=datetime.datetime(2022, 4, 1, 0, 0), + start_date=datetime(2022, 4, 1, 0, 0), description="This DAG runs dbt to create the partnership asset intermediate tables and aggregate tables.", schedule_interval="30 16 * * *", # Daily 16:30 UTC, after cloud function params={}, diff --git a/dags/public_marts_tables_dag.py b/dags/public_marts_tables_dag.py index 5e16d4fe..848f12ef 100644 --- a/dags/public_marts_tables_dag.py +++ b/dags/public_marts_tables_dag.py @@ -1,8 +1,6 @@ -import datetime +from datetime import datetime from airflow import DAG -from airflow.models.variable import Variable -from airflow.operators.dummy import DummyOperator from stellar_etl_airflow.build_dbt_task import build_dbt_task from stellar_etl_airflow.default import get_default_dag_args, init_sentry @@ -11,7 +9,7 @@ dag = DAG( "public_marts_tables", default_args=get_default_dag_args(), - start_date=datetime.datetime(2023, 4, 4, 0, 0), + start_date=datetime(2023, 4, 4, 0, 0), description="This DAG runs public dbt to create the tables for the models in marts/ but not any marts subdirectories.", schedule_interval="0 11 * * *", # Daily 11 AM UTC params={}, diff --git a/dags/sandbox_create_dag.py b/dags/sandbox_create_dag.py index 735ded9a..7d5bdd5f 100644 --- a/dags/sandbox_create_dag.py +++ b/dags/sandbox_create_dag.py @@ -1,8 +1,8 @@ """ This DAG creates the sandbox dataset with transactions tables, state tables with history and views. 
""" -import datetime -import json +from datetime import datetime +from json import loads from airflow import DAG from airflow.models.variable import Variable @@ -23,12 +23,12 @@ with DAG( "sandbox_create_dag", default_args=get_default_dag_args(), - start_date=datetime.datetime(2023, 1, 1), + start_date=datetime(2023, 1, 1), description="This DAG creates a sandbox", schedule_interval="@once", params={"alias": "sandbox_dataset"}, user_defined_filters={ - "fromjson": lambda s: json.loads(s), + "fromjson": lambda s: loads(s), }, catchup=False, ) as dag: diff --git a/dags/sandbox_update_dag.py b/dags/sandbox_update_dag.py index 5cdfe56d..4fb62c9e 100644 --- a/dags/sandbox_update_dag.py +++ b/dags/sandbox_update_dag.py @@ -1,8 +1,8 @@ """ This DAG update the Canvas sandbox dataset with transactions tables, state tables with history once a month. """ -import datetime -import json +from datetime import datetime +from json import loads from airflow import DAG from airflow.models.variable import Variable @@ -23,11 +23,11 @@ with DAG( "sandbox_update_dag", default_args=get_default_dag_args(), - start_date=datetime.datetime(2023, 1, 1), + start_date=datetime(2023, 1, 1), description="This DAG updates a sandbox", schedule_interval="@daily", params={"alias": "sandbox_dataset"}, - user_defined_filters={"fromjson": lambda s: json.loads(s)}, + user_defined_filters={"fromjson": lambda s: loads(s)}, catchup=False, ) as dag: TABLES_ID = Variable.get("table_ids", deserialize_json=True) diff --git a/dags/state_table_dag.py b/dags/state_table_dag.py index 62f83ea5..7cdf5406 100644 --- a/dags/state_table_dag.py +++ b/dags/state_table_dag.py @@ -2,10 +2,9 @@ The state_table_export DAG exports ledger entry changes (accounts, offers, and trustlines) within a bounded range using stellar-core. This DAG should be triggered manually if it is required to export entry changes within a specified time range. 
""" -import ast -import datetime -import json -import logging +from ast import literal_eval +from datetime import datetime +from json import loads from airflow import DAG from airflow.models import Variable @@ -19,20 +18,19 @@ init_sentry() -logging.basicConfig(format="%(message)s") -logger = logging.getLogger("airflow.task") -logger.setLevel(logging.INFO) - dag = DAG( "state_table_export", default_args=get_default_dag_args(), - start_date=datetime.datetime(2023, 9, 20, 15, 0), + start_date=datetime(2023, 9, 20, 15, 0), description="This DAG runs a bounded stellar-core instance, which allows it to export accounts, offers, liquidity pools, and trustlines to BigQuery.", schedule_interval="*/30 * * * *", params={ "alias": "state", }, - user_defined_filters={"fromjson": lambda s: json.loads(s)}, + user_defined_filters={ + "fromjson": lambda s: loads(s), + "literal_eval": lambda e: literal_eval(e), + }, user_defined_macros={ "subtract_data_interval": macros.subtract_data_interval, "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string, @@ -40,22 +38,21 @@ catchup=True, ) -file_names = Variable.get("output_file_names", deserialize_json=True) table_names = Variable.get("table_ids", deserialize_json=True) -internal_project = Variable.get("bq_project") -internal_dataset = Variable.get("bq_dataset") -public_project = Variable.get("public_project") -public_dataset = Variable.get("public_dataset") -public_dataset_new = Variable.get("public_dataset_new") -use_testnet = ast.literal_eval(Variable.get("use_testnet")) -use_futurenet = ast.literal_eval(Variable.get("use_futurenet")) +internal_project = "{{ var.value.bq_project }}" +internal_dataset = "{{ var.value.bq_dataset }}" +public_project = "{{ var.value.public_project }}" +public_dataset = "{{ var.value.public_dataset }}" +public_dataset_new = "{{ var.value.public_dataset_new }}" +use_testnet = "{{ var.value.use_testnet | literal_eval }}" +use_futurenet = "{{ var.value.use_futurenet | literal_eval }}" date_task = build_time_task(dag, use_testnet=use_testnet, use_futurenet=use_futurenet) changes_task = build_export_task( dag, "bounded-core", "export_ledger_entry_changes", - file_names["changes"], + "{{ var.json.output_file_names.changes }}", use_testnet=use_testnet, use_futurenet=use_futurenet, use_gcs=True, @@ -86,52 +83,56 @@ dag, internal_project, internal_dataset, table_names["accounts"] ) delete_acc_pub_new_task = build_delete_data_task( - dag, public_project, public_dataset_new, table_names["accounts"] + dag, public_project, public_dataset_new, table_names["accounts"], "pub_new" ) delete_bal_task = build_delete_data_task( dag, internal_project, internal_dataset, table_names["claimable_balances"] ) delete_bal_pub_task = build_delete_data_task( - dag, public_project, public_dataset, table_names["claimable_balances"] + dag, public_project, public_dataset, table_names["claimable_balances"], "pub" ) delete_bal_pub_new_task = build_delete_data_task( - dag, public_project, public_dataset_new, table_names["claimable_balances"] + dag, + public_project, + public_dataset_new, + table_names["claimable_balances"], + "pub_new", ) delete_off_task = build_delete_data_task( dag, internal_project, internal_dataset, table_names["offers"] ) delete_off_pub_new_task = build_delete_data_task( - dag, public_project, public_dataset_new, table_names["offers"] + dag, public_project, public_dataset_new, table_names["offers"], "pub_new" ) delete_pool_task = build_delete_data_task( dag, internal_project, internal_dataset, table_names["liquidity_pools"] 
) delete_pool_pub_new_task = build_delete_data_task( - dag, public_project, public_dataset_new, table_names["liquidity_pools"] + dag, public_project, public_dataset_new, table_names["liquidity_pools"], "pub_new" ) delete_sign_task = build_delete_data_task( dag, internal_project, internal_dataset, table_names["signers"] ) delete_sign_pub_new_task = build_delete_data_task( - dag, public_project, public_dataset_new, table_names["signers"] + dag, public_project, public_dataset_new, table_names["signers"], "pub_new" ) delete_trust_task = build_delete_data_task( dag, internal_project, internal_dataset, table_names["trustlines"] ) delete_trust_pub_new_task = build_delete_data_task( - dag, public_project, public_dataset_new, table_names["trustlines"] + dag, public_project, public_dataset_new, table_names["trustlines"], "pub_new" ) delete_contract_data_task = build_delete_data_task( - dag, public_project, public_dataset_new, table_names["contract_data"] + dag, public_project, public_dataset_new, table_names["contract_data"], "pub_new" ) delete_contract_code_task = build_delete_data_task( - dag, public_project, public_dataset_new, table_names["contract_code"] + dag, public_project, public_dataset_new, table_names["contract_code"], "pub_new" ) delete_config_settings_task = build_delete_data_task( - dag, public_project, public_dataset_new, table_names["config_settings"] + dag, public_project, public_dataset_new, table_names["config_settings"], "pub_new" ) delete_expiration_task = build_delete_data_task( - dag, public_project, public_dataset_new, table_names["expiration"] + dag, public_project, public_dataset_new, table_names["expiration"], "pub_new" ) """ @@ -214,6 +215,7 @@ "/*-claimable_balances.txt", partition=True, cluster=True, + dataset_type="pub", ) """ @@ -228,6 +230,7 @@ "/*-accounts.txt", partition=True, cluster=True, + dataset_type="pub_new", ) send_bal_to_pub_new_task = build_gcs_to_bq_task( dag, @@ -238,6 +241,7 @@ "/*-claimable_balances.txt", partition=True, cluster=True, + dataset_type="pub_new", ) send_off_to_pub_new_task = build_gcs_to_bq_task( dag, @@ -248,6 +252,7 @@ "/*-offers.txt", partition=True, cluster=True, + dataset_type="pub_new", ) send_pool_to_pub_new_task = build_gcs_to_bq_task( dag, @@ -258,6 +263,7 @@ "/*-liquidity_pools.txt", partition=True, cluster=True, + dataset_type="pub_new", ) send_sign_to_pub_new_task = build_gcs_to_bq_task( dag, @@ -268,6 +274,7 @@ "/*-signers.txt", partition=True, cluster=True, + dataset_type="pub_new", ) send_trust_to_pub_new_task = build_gcs_to_bq_task( dag, @@ -278,6 +285,7 @@ "/*-trustlines.txt", partition=True, cluster=True, + dataset_type="pub_new", ) send_contract_data_to_pub_task = build_gcs_to_bq_task( dag, @@ -288,6 +296,7 @@ "/*-contract_data.txt", partition=True, cluster=True, + dataset_type="pub_new", ) send_contract_code_to_pub_task = build_gcs_to_bq_task( dag, @@ -298,6 +307,7 @@ "/*-contract_code.txt", partition=True, cluster=True, + dataset_type="pub_new", ) send_config_settings_to_pub_task = build_gcs_to_bq_task( dag, @@ -308,6 +318,7 @@ "/*-config_settings.txt", partition=True, cluster=True, + dataset_type="pub_new", ) send_expiration_to_pub_task = build_gcs_to_bq_task( dag, @@ -318,6 +329,7 @@ "/*-expiration.txt", partition=True, cluster=True, + dataset_type="pub_new", ) date_task >> changes_task >> write_acc_stats >> delete_acc_task >> send_acc_to_bq_task diff --git a/dags/stellar_etl_airflow/build_apply_gcs_changes_to_bq_task.py b/dags/stellar_etl_airflow/build_apply_gcs_changes_to_bq_task.py index 78a9f102..59cdc751 
100644 --- a/dags/stellar_etl_airflow/build_apply_gcs_changes_to_bq_task.py +++ b/dags/stellar_etl_airflow/build_apply_gcs_changes_to_bq_task.py @@ -2,13 +2,11 @@ This file contains functions for creating Airflow tasks to merge data on ledger entry changes from a file in Google Cloud storage into a BigQuery table. """ -import json import logging -import os -from os.path import basename, splitext +from json import loads +from os.path import basename, join, splitext from airflow import AirflowException -from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook from airflow.models import Variable from airflow.operators.python_operator import PythonOperator from google.cloud import bigquery @@ -30,13 +28,11 @@ def read_local_schema(data_type): """ # since the dags folder is shared among airflow workers and the webservers, schemas are stored in the dags folder - schema_filepath = os.path.join( - Variable.get("schema_filepath"), f"{data_type}_schema.json" - ) + schema_filepath = join(Variable.get("schema_filepath"), f"{data_type}_schema.json") logging.info(f"Loading schema file at {schema_filepath}") with open(schema_filepath, "r") as schema_file: - schema_fields = json.loads(schema_file.read()) + schema_fields = loads(schema_file.read()) return schema_fields diff --git a/dags/stellar_etl_airflow/build_bq_insert_job_task.py b/dags/stellar_etl_airflow/build_bq_insert_job_task.py index 7ab3bdd6..f66871d8 100644 --- a/dags/stellar_etl_airflow/build_bq_insert_job_task.py +++ b/dags/stellar_etl_airflow/build_bq_insert_job_task.py @@ -32,13 +32,8 @@ def build_bq_insert_job( cluster=False, create=False, write_disposition="WRITE_APPEND", + dataset_type="bq", ): - if dataset == Variable.get("public_dataset"): - dataset_type = "pub" - elif dataset == Variable.get("public_dataset_new"): - dataset_type = "pub_new" - else: - dataset_type = "bq" query_path = get_query_filepath(table) query = file_to_string(query_path) batch_id = macros.get_batch_id() diff --git a/dags/stellar_etl_airflow/build_cross_dependency_task.py b/dags/stellar_etl_airflow/build_cross_dependency_task.py index 84013192..1038b259 100644 --- a/dags/stellar_etl_airflow/build_cross_dependency_task.py +++ b/dags/stellar_etl_airflow/build_cross_dependency_task.py @@ -1,4 +1,4 @@ -import datetime +from datetime import timedelta from airflow.sensors.external_task import ExternalTaskSensor from stellar_etl_airflow.default import alert_after_max_retries @@ -9,7 +9,7 @@ def build_cross_deps(dag, task, parent_dag, parent_task=None, time_delta=0): task_id=f"check_{task}_finish", external_dag_id=parent_dag, external_task_id=parent_task, # None means wait for the entire DAG to finish - execution_delta=datetime.timedelta(minutes=time_delta), + execution_delta=timedelta(minutes=time_delta), on_failure_callback=alert_after_max_retries, timeout=3600, allowed_states=["success"], diff --git a/dags/stellar_etl_airflow/build_dbt_task.py b/dags/stellar_etl_airflow/build_dbt_task.py index 827f362d..4179d63c 100644 --- a/dags/stellar_etl_airflow/build_dbt_task.py +++ b/dags/stellar_etl_airflow/build_dbt_task.py @@ -1,6 +1,7 @@ import logging from datetime import timedelta +from airflow.configuration import conf from airflow.models import Variable from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import ( KubernetesPodOperator, @@ -10,18 +11,18 @@ def create_dbt_profile(project="prod"): - dbt_target = Variable.get("dbt_target") - dbt_dataset = Variable.get("dbt_dataset") - dbt_maximum_bytes_billed = Variable.get("dbt_maximum_bytes_billed") - 
dbt_job_execution_timeout_seconds = Variable.get( - "dbt_job_execution_timeout_seconds" + dbt_target = "{{ var.value.dbt_target }}" + dbt_dataset = "{{ var.value.dbt_dataset }}" + dbt_maximum_bytes_billed = "{{ var.value.dbt_maximum_bytes_billed }}" + dbt_job_execution_timeout_seconds = ( + "{{ var.value.dbt_job_execution_timeout_seconds }}" ) - dbt_job_retries = Variable.get("dbt_job_retries") - dbt_project = Variable.get("dbt_project") - dbt_threads = Variable.get("dbt_threads") + dbt_job_retries = "{{ var.value.dbt_job_retries }}" + dbt_project = "{{ var.value.dbt_project }}" + dbt_threads = "{{ var.value.dbt_threads }}" if project == "pub": - dbt_project = Variable.get("public_project") - dbt_dataset = Variable.get("public_dataset") + dbt_project = "{{ var.value.public_project }}" + dbt_dataset = "{{ var.value.public_dataset }}" profiles_yml = f""" stellar_dbt: @@ -72,10 +73,7 @@ def build_dbt_task( """ dbt_full_refresh = "" - dbt_full_refresh_models = Variable.get( - "dbt_full_refresh_models", deserialize_json=True - ) - if dbt_full_refresh_models.get(model_name): + if "{{ var.json.get('dbt_full_refresh_models.' + model_name) }}": dbt_full_refresh = "--full-refresh" create_dbt_profile_cmd = create_dbt_profile(project) @@ -100,18 +98,19 @@ def build_dbt_task( ] logging.info(f"sh commands to run in pod: {args}") - config_file_location = Variable.get("kube_config_location") - in_cluster = False if config_file_location else True - resources_requests = ( - Variable.get("resources", deserialize_json=True) - .get(resource_cfg) - .get("requests") - ) + namespace = conf.get("kubernetes", "NAMESPACE") + if namespace == "default": + config_file_location = Variable.get("kube_config_location") + in_cluster = False + else: + config_file_location = None + in_cluster = True + resources_requests = "{{ var.json.get('resources.' 
+ resource_cfg + '.requests') }}" affinity = Variable.get("affinity", deserialize_json=True).get(resource_cfg) - dbt_image = Variable.get("dbt_image_name") + dbt_image = "{{ var.value.dbt_image_name }}" if project == "pub": - dbt_image = Variable.get("public_dbt_image_name") + dbt_image = "{{ var.value.public_dbt_image_name }}" return KubernetesPodOperator( task_id=f"{project}_{model_name}", @@ -121,8 +120,8 @@ def build_dbt_task( build_dbt_task.__name__ ] ), - namespace=Variable.get("k8s_namespace"), - service_account_name=Variable.get("k8s_service_account"), + namespace="{{ var.value.k8s_namespace }}", + service_account_name="{{ var.value.service_account_name }}", image=dbt_image, cmds=command, arguments=args, diff --git a/dags/stellar_etl_airflow/build_delete_data_task.py b/dags/stellar_etl_airflow/build_delete_data_task.py index 59b71873..b11d19f0 100644 --- a/dags/stellar_etl_airflow/build_delete_data_task.py +++ b/dags/stellar_etl_airflow/build_delete_data_task.py @@ -6,13 +6,7 @@ from stellar_etl_airflow.default import alert_after_max_retries -def build_delete_data_task(dag, project, dataset, table): - if dataset == Variable.get("public_dataset"): - dataset_type = "pub" - elif dataset == Variable.get("public_dataset_new"): - dataset_type = "pub_new" - else: - dataset_type = "bq" +def build_delete_data_task(dag, project, dataset, table, dataset_type="bq"): batch_id = macros.get_batch_id() batch_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}" diff --git a/dags/stellar_etl_airflow/build_export_task.py b/dags/stellar_etl_airflow/build_export_task.py index ec0a02c0..427fffe7 100644 --- a/dags/stellar_etl_airflow/build_export_task.py +++ b/dags/stellar_etl_airflow/build_export_task.py @@ -6,6 +6,7 @@ from datetime import datetime, timedelta from airflow import AirflowException +from airflow.configuration import conf from airflow.models import Variable from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import ( KubernetesPodOperator, @@ -174,13 +175,14 @@ def build_export_task( command, filename, cmd_type, use_gcs, use_testnet, use_futurenet ) etl_cmd_string = " ".join(etl_cmd) - config_file_location = Variable.get("kube_config_location") - in_cluster = False if config_file_location else True - resources_requests = ( - Variable.get("resources", deserialize_json=True) - .get(resource_cfg) - .get("requests") - ) + namespace = conf.get("kubernetes", "NAMESPACE") + if namespace == "default": + config_file_location = Variable.get("kube_config_location") + in_cluster = False + else: + config_file_location = None + in_cluster = True + resources_requests = "{{ var.json.get('resources.' 
+ resource_cfg + '.requests') }}" affinity = Variable.get("affinity", deserialize_json=True).get(resource_cfg) if command == "export_ledger_entry_changes": arguments = f"""{etl_cmd_string} && echo "{{\\"output\\": \\"{output_file}\\"}}" >> /airflow/xcom/return.json""" @@ -190,8 +192,8 @@ def build_export_task( \\"failed_transforms\\": `grep failed_transforms stderr.out | cut -d\\",\\" -f2 | cut -d\\":\\" -f2`}}" >> /airflow/xcom/return.json """ return KubernetesPodOperator( - service_account_name=Variable.get("k8s_service_account"), - namespace=Variable.get("k8s_namespace"), + service_account_name="{{ var.value.service_account_name }}", + namespace="{{ var.value.k8s_namespace }}", task_id=command + "_task", execution_timeout=timedelta( minutes=Variable.get("task_timeout", deserialize_json=True)[ @@ -199,7 +201,7 @@ def build_export_task( ] ), name=command + "_task", - image=Variable.get("image_name"), + image="{{ var.value.image_name }}", cmds=["bash", "-c"], arguments=[arguments], dag=dag, diff --git a/dags/stellar_etl_airflow/build_gcs_to_bq_task.py b/dags/stellar_etl_airflow/build_gcs_to_bq_task.py index d91858bb..e3726e43 100644 --- a/dags/stellar_etl_airflow/build_gcs_to_bq_task.py +++ b/dags/stellar_etl_airflow/build_gcs_to_bq_task.py @@ -54,6 +54,7 @@ def build_gcs_to_bq_task( source_object_suffix, partition, cluster, + dataset_type="bq", ): """ Creates a task to load a file from Google Cloud Storage into BigQuery. @@ -87,13 +88,6 @@ def build_gcs_to_bq_task( ) else: cluster_fields = None - project_name = project - if dataset == Variable.get("public_dataset"): - dataset_type = "pub" - elif dataset == Variable.get("public_dataset_new"): - dataset_type = "pub_new" - else: - dataset_type = "bq" dataset_name = dataset time_partition = {} if partition: @@ -128,7 +122,7 @@ def build_gcs_to_bq_task( + '\')["output"] }}' + source_object_suffix ], - destination_project_dataset_table=f"{project_name}.{dataset_name}.{data_type}{staging_table_suffix}", + destination_project_dataset_table=f"{project}.{dataset_name}.{data_type}{staging_table_suffix}", write_disposition="WRITE_APPEND", create_disposition="CREATE_IF_NEEDED", schema_update_option="ALLOW_FIELD_ADDITION", @@ -164,7 +158,7 @@ def build_gcs_to_bq_task( + '\')["output"] }}' + source_object_suffix ], - destination_project_dataset_table=f"{project_name}.{dataset_name}.{data_type}{staging_table_suffix}", + destination_project_dataset_table=f"{project}.{dataset_name}.{data_type}{staging_table_suffix}", write_disposition="WRITE_APPEND", create_disposition="CREATE_IF_NEEDED", max_bad_records=0, diff --git a/dags/stellar_etl_airflow/build_time_task.py b/dags/stellar_etl_airflow/build_time_task.py index 87f31242..2d62e33d 100644 --- a/dags/stellar_etl_airflow/build_time_task.py +++ b/dags/stellar_etl_airflow/build_time_task.py @@ -4,6 +4,7 @@ import logging from datetime import timedelta +from airflow.configuration import conf from airflow.models import Variable from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import ( KubernetesPodOperator, @@ -53,13 +54,14 @@ def build_time_task( args.append("--testnet") elif use_futurenet: args.append("--futurenet") - config_file_location = Variable.get("kube_config_location") - in_cluster = False if config_file_location else True - resources_requests = ( - Variable.get("resources", deserialize_json=True) - .get(resource_cfg) - .get("requests") - ) + namespace = conf.get("kubernetes", "NAMESPACE") + if namespace == "default": + config_file_location = Variable.get("kube_config_location") + 
in_cluster = False + else: + config_file_location = None + in_cluster = True + resources_requests = "{{ var.json.get('resources.' + resource_cfg + '.requests') }}" affinity = Variable.get("affinity", deserialize_json=True).get(resource_cfg) return KubernetesPodOperator( @@ -70,9 +72,9 @@ def build_time_task( build_time_task.__name__ ] ), - namespace=Variable.get("k8s_namespace"), - service_account_name=Variable.get("k8s_service_account"), - image=Variable.get("image_name"), + namespace="{{ var.value.k8s_namespace }}", + service_account_name="{{ var.value.service_account_name }}", + image="{{ var.value.image_name }}", cmds=command, arguments=args, dag=dag,
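
Note on PATCH 2/3: switching the BigQuery method from "service-account-json" to "oauth" lets dbt authenticate with the Application Default Credentials already available to the pod's service account, which is why the dbt_private_key*, dbt_client_*, and related Airflow variables can simply be deleted. The sketch below only shows the shape of the trimmed profile; the profile name, output name, and exact key set are illustrative assumptions, not a copy of the repository's full template.

def render_oauth_profile(project: str, dataset: str, threads: int = 1) -> str:
    # Sketch only: a profiles.yml target that relies on Application Default
    # Credentials (method: oauth) instead of an embedded service-account keyfile,
    # so no private key material has to live in Airflow Variables.
    return f"""
example_profile:
  target: default
  outputs:
    default:
      type: bigquery
      method: oauth
      project: "{project}"
      dataset: "{dataset}"
      threads: {threads}
      location: us
"""

On GKE this typically pairs with Workload Identity, so the pod's service account is granted BigQuery access directly instead of carrying a key.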
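
Note on PATCH 3/3: the parse-time improvement comes from replacing module-level Variable.get() calls with Jinja references such as "{{ var.value.bq_project }}" and "{{ var.json.output_file_names.accounts }}". A top-level Variable.get() queries the metadata database every time the scheduler parses the DAG file, whereas a template string is only resolved when a templated operator field is rendered at run time, and only templatable fields can accept it (hence the distinction drawn in the commit message). A minimal sketch of the pattern, assuming an Airflow 2.x deployment with a bq_project Variable defined; the DAG and task ids are made up:

from ast import literal_eval
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Resolved only when a task renders its templates, so parsing this file costs
# no Variable lookups.
internal_project = "{{ var.value.bq_project }}"

with DAG(
    "templated_variable_example",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
    # Registering literal_eval as a filter lets string Variables like "False"
    # be coerced to Python values inside templates, mirroring
    # "{{ var.value.use_testnet | literal_eval }}" in the patched DAGs.
    user_defined_filters={"literal_eval": lambda e: literal_eval(e)},
) as dag:
    BashOperator(
        task_id="echo_project",
        # bash_command is a templated field, so the Jinja expression above is
        # looked up against the Variable store only at execution time.
        bash_command=f"echo {internal_project}",
    )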
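
Also from PATCH 3/3: build_dbt_task, build_export_task, and build_time_task stop reading kube_config_location unconditionally and instead key off the namespace reported by Airflow's configuration, only passing a kubeconfig when running in the default namespace. A condensed sketch of that helper logic, using the same "kubernetes"/"NAMESPACE" section the diff reads; the function name is hypothetical:

from airflow.configuration import conf
from airflow.models import Variable


def kube_connection_settings():
    """Return (config_file, in_cluster) for a KubernetesPodOperator.

    In this deployment the "default" namespace means Airflow runs outside the
    target cluster, so an explicit kubeconfig is required; any other namespace
    implies in-cluster execution via the pod's service account.
    """
    namespace = conf.get("kubernetes", "NAMESPACE")
    if namespace == "default":
        return Variable.get("kube_config_location"), False
    return None, True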