diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json index 9770af95..367e3228 100644 --- a/airflow_variables_dev.json +++ b/airflow_variables_dev.json @@ -347,7 +347,6 @@ "enriched_history_operations": 780, "enriched_history_operations_with_exclude": 780, "fee_stats": 840, - "freshness": 720, "history_assets": 720, "liquidity_pool_trade_volume": 1140, "liquidity_pools_value": 840, diff --git a/dags/dbt_source_data_freshness_test_dag.py b/dags/dbt_source_data_freshness_test_dag.py deleted file mode 100644 index c191b811..00000000 --- a/dags/dbt_source_data_freshness_test_dag.py +++ /dev/null @@ -1,42 +0,0 @@ -from datetime import datetime - -from airflow import DAG -from airflow.operators.empty import EmptyOperator -from kubernetes.client import models as k8s -from stellar_etl_airflow.build_dbt_task import dbt_task -from stellar_etl_airflow.build_elementary_slack_alert_task import elementary_task -from stellar_etl_airflow.default import ( - alert_sla_miss, - get_default_dag_args, - init_sentry, -) - -init_sentry() - -with DAG( - "dbt_source_data_freshness_tests", - default_args=get_default_dag_args(), - start_date=datetime(2024, 9, 24, 0, 0), - description="This DAG runs source freshness tests at 30 min cadence", - schedule="*/30 * * * *", # Every day at 3:00 PM UTC / 9:00 AM CST - user_defined_filters={ - "container_resources": lambda s: k8s.V1ResourceRequirements(requests=s), - }, - max_active_runs=1, - catchup=False, - tags=["dbt-data-quality"], - # sla_miss_callback=alert_sla_miss, -) as dag: - - # DBT tests to run - source_freshness_tests = dbt_task( - dag, - command_type="source", - sub_command="freshness", - cmd_args=[ - "source:crypto_stellar.*", - "source:crypto_stellar_internal_2.*", - ], - tag=None, - resource_cfg="dbt", - ) diff --git a/dags/stellar_etl_airflow/build_dbt_task.py b/dags/stellar_etl_airflow/build_dbt_task.py index 34009bee..56a88158 100644 --- a/dags/stellar_etl_airflow/build_dbt_task.py +++ b/dags/stellar_etl_airflow/build_dbt_task.py @@ -61,11 +61,9 @@ def create_dbt_profile(project="prod"): def dbt_task( dag, model_name=None, - sub_command=None, tag=None, flag="select", operator="", - cmd_args=[], command_type="build", excluded=None, resource_cfg="default", @@ -89,15 +87,7 @@ def dbt_task( dbt_image = "{{ var.value.dbt_image_name }}" - args = [command_type] - if sub_command: - args.append(sub_command) - - if flag: - args.append(f"--{flag}") - - if len(cmd_args): - args = [*args, *cmd_args] + args = [command_type, f"--{flag}"] models = [] if tag: @@ -106,12 +96,10 @@ def dbt_task( if model_name: task_name = model_name models.append(f"{operator}{model_name}") - if sub_command: - task_name = sub_command if len(models) > 1: task_name = "multiple_models" args.append(",".join(models)) - elif len(models): + else: args.append(models[0]) # --exclude selector added for necessary use cases