diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 93cbd16b..2495befe 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -26,7 +26,7 @@ jobs:
           python-version: 3.8
       - id: file_changes
-        uses: trilom/file-changes-action@v1.2.3
+        uses: trilom/file-changes-action@v1.2.4
         with:
           output: " "
diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json
index 04c9cb2d..bdf80319 100644
--- a/airflow_variables_dev.json
+++ b/airflow_variables_dev.json
@@ -342,9 +342,9 @@
     "create_sandbox": 2400,
     "current_state": 720,
     "default": 60,
-    "elementary_dbt_enriched_base_tables": 1080,
-    "elementary_dbt_stellar_marts": 1620,
+    "elementary_dbt_data_quality": 1620,
     "enriched_history_operations": 780,
+    "enriched_history_operations_with_exclude": 780,
     "fee_stats": 840,
     "history_assets": 720,
     "liquidity_pool_trade_volume": 1140,
@@ -356,6 +356,7 @@
     "ohlc": 720,
     "partnership_assets": 660,
     "relevant_asset_trades": 1200,
+    "singular_test": 600,
     "snapshot_state": 600,
     "soroban": 720,
     "trade_agg": 720,
diff --git a/airflow_variables_prod.json b/airflow_variables_prod.json
index 5c831fab..cee5b226 100644
--- a/airflow_variables_prod.json
+++ b/airflow_variables_prod.json
@@ -340,9 +340,9 @@
     "create_sandbox": 1020,
     "current_state": 1200,
     "default": 60,
-    "elementary_dbt_enriched_base_tables": 2100,
-    "elementary_dbt_stellar_marts": 1560,
+    "elementary_dbt_data_quality": 2100,
     "enriched_history_operations": 1800,
+    "enriched_history_operations_with_exclude": 1800,
     "fee_stats": 360,
     "history_assets": 360,
     "liquidity_pool_trade_volume": 1200,
@@ -354,6 +354,7 @@
     "ohlc": 960,
     "partnership_assets": 1380,
     "relevant_asset_trades": 1800,
+    "singular_test": 840,
     "snapshot_state": 840,
     "soroban": 420,
     "trade_agg": 1020,
diff --git a/dags/dbt_data_quality_alerts_dag.py b/dags/dbt_data_quality_alerts_dag.py
new file mode 100644
index 00000000..1d3a195c
--- /dev/null
+++ b/dags/dbt_data_quality_alerts_dag.py
@@ -0,0 +1,41 @@
+from datetime import datetime
+
+from airflow import DAG
+from airflow.operators.empty import EmptyOperator
+from kubernetes.client import models as k8s
+from stellar_etl_airflow.build_dbt_task import dbt_task
+from stellar_etl_airflow.build_elementary_slack_alert_task import elementary_task
+from stellar_etl_airflow.default import (
+    alert_sla_miss,
+    get_default_dag_args,
+    init_sentry,
+)
+
+init_sentry()
+
+with DAG(
+    "dbt_data_quality_alerts",
+    default_args=get_default_dag_args(),
+    start_date=datetime(2024, 6, 25, 0, 0),
+    description="This DAG runs dbt tests and Elementary alerts at a half-hourly cadence",
+    schedule="*/15,*/45 * * * *",  # Runs every 15th minute and every 45th minute
+    user_defined_filters={
+        "container_resources": lambda s: k8s.V1ResourceRequirements(requests=s),
+    },
+    max_active_runs=1,
+    catchup=False,
+    tags=["dbt-data-quality", "dbt-elementary-alerts"],
+    # sla_miss_callback=alert_sla_miss,
+) as dag:
+
+    # DBT tests to run
+    singular_tests = dbt_task(
+        dag,
+        command_type="test",
+        tag="singular_test",
+    )
+    singular_tests_elementary_alerts = elementary_task(dag, "dbt_data_quality")
+    start_tests = EmptyOperator(task_id="start_tests_task")
+
+    # DAG task graph
+    start_tests >> singular_tests >> singular_tests_elementary_alerts
diff --git a/dags/dbt_enriched_base_tables_dag.py b/dags/dbt_enriched_base_tables_dag.py
index 1414002e..1b4a07c6 100644
--- a/dags/dbt_enriched_base_tables_dag.py
+++ b/dags/dbt_enriched_base_tables_dag.py
@@ -4,7 +4,6 @@
 from kubernetes.client import models as k8s
 from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps
 from stellar_etl_airflow.build_dbt_task import dbt_task
-from stellar_etl_airflow.build_elementary_slack_alert_task import elementary_task
 from stellar_etl_airflow.default import (
     alert_sla_miss,
     get_default_dag_args,
@@ -35,16 +34,12 @@ wait_on_state_table = build_cross_deps(
     dag, "wait_on_state_table", "state_table_export"
 )
 
 # DBT models to run
-enriched_history_operations_task = dbt_task(dag, tag="enriched_history_operations")
+enriched_history_operations_task = dbt_task(
+    dag, tag="enriched_history_operations", excluded="singular_test"
+)
 current_state_task = dbt_task(dag, tag="current_state")
 
-elementary = elementary_task(dag, "dbt_enriched_base_tables")
-
 # DAG task graph
 wait_on_history_table >> enriched_history_operations_task
 wait_on_state_table >> current_state_task
-
-enriched_history_operations_task >> elementary
-
-current_state_task >> elementary
diff --git a/dags/dbt_stellar_marts_dag.py b/dags/dbt_stellar_marts_dag.py
index 7cc7bc2e..47920366 100644
--- a/dags/dbt_stellar_marts_dag.py
+++ b/dags/dbt_stellar_marts_dag.py
@@ -4,7 +4,6 @@
 from kubernetes.client import models as k8s
 from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps
 from stellar_etl_airflow.build_dbt_task import dbt_task
-from stellar_etl_airflow.build_elementary_slack_alert_task import elementary_task
 from stellar_etl_airflow.default import (
     alert_sla_miss,
     get_default_dag_args,
@@ -55,8 +54,6 @@
 snapshot_state = dbt_task(dag, tag="snapshot_state")
 relevant_asset_trades = dbt_task(dag, tag="relevant_asset_trades")
 
-elementary = elementary_task(dag, "dbt_stellar_marts")
-
 # DAG task graph
 
 wait_on_dbt_enriched_base_tables >> ohlc_task >> liquidity_pool_trade_volume_task
@@ -75,18 +72,3 @@
 wait_on_dbt_enriched_base_tables >> soroban
 wait_on_dbt_enriched_base_tables >> snapshot_state
 wait_on_dbt_enriched_base_tables >> relevant_asset_trades
-
-mgi_task >> elementary
-liquidity_providers_task >> elementary
-liquidity_pools_values_task >> elementary
-liquidity_pools_value_history_task >> elementary
-trade_agg_task >> elementary
-fee_stats_agg_task >> elementary
-asset_stats_agg_task >> elementary
-network_stats_agg_task >> elementary
-partnership_assets_task >> elementary
-history_assets >> elementary
-soroban >> elementary
-liquidity_pool_trade_volume_task >> elementary
-snapshot_state >> elementary
-relevant_asset_trades >> elementary
diff --git a/dags/stellar_etl_airflow/build_dbt_task.py b/dags/stellar_etl_airflow/build_dbt_task.py
index 6c0d4a0d..df34f292 100644
--- a/dags/stellar_etl_airflow/build_dbt_task.py
+++ b/dags/stellar_etl_airflow/build_dbt_task.py
@@ -64,6 +64,7 @@ def dbt_task(
     flag="select",
     operator="",
     command_type="build",
+    excluded=None,
     resource_cfg="default",
 ):
     namespace = conf.get("kubernetes", "NAMESPACE")
@@ -97,6 +98,15 @@ def dbt_task(
             args.append(",".join(models))
         else:
             args.append(models[0])
+    # --exclude selector added for necessary use cases
+    # Argument should be string or list of strings
+    if excluded:
+        task_name = f"{task_name}_with_exclude"
+        args.append("--exclude")
+        if isinstance(excluded, list):
+            args.append(" ".join(excluded))
+        else:
+            args.append(excluded)
 
     if Variable.get("dbt_full_refresh_models", deserialize_json=True).get(task_name):
         args.append("--full-refresh")
diff --git a/dags/stellar_etl_airflow/build_elementary_slack_alert_task.py b/dags/stellar_etl_airflow/build_elementary_slack_alert_task.py
index 86b0727c..abb93ebb 100644
--- a/dags/stellar_etl_airflow/build_elementary_slack_alert_task.py
+++ b/dags/stellar_etl_airflow/build_elementary_slack_alert_task.py
@@ -4,9 +4,7 @@
 from airflow.configuration import conf
 from airflow.models import Variable
-from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
-    KubernetesPodOperator,
-)
+from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
 from kubernetes import client, config
 from kubernetes.client import models as k8s
 from stellar_etl_airflow.default import alert_after_max_retries
@@ -93,4 +91,5 @@ def elementary_task(
                 f"elementary_{task_name}"
             ]
         ),
+        trigger_rule="all_done",
     )
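
Note on the new excluded argument to dbt_task (see the build_dbt_task.py hunk above): the sketch below is an illustrative, simplified stand-in, not the repository's implementation. The helper name build_dbt_args and the --select handling are assumptions made for this example; only the --exclude handling mirrors the added block, which accepts either a single selector string or a list of selectors.

# Illustrative sketch only; the real logic lives in
# dags/stellar_etl_airflow/build_dbt_task.py.
from typing import List, Optional, Union


def build_dbt_args(
    command_type: str = "build",
    select: Optional[str] = None,
    excluded: Optional[Union[str, List[str]]] = None,
) -> List[str]:
    args: List[str] = [command_type]
    if select:
        args += ["--select", select]
    if excluded:
        # Mirrors the added block: a plain string is passed through, a list
        # is joined with spaces so dbt receives multiple exclude selectors.
        args.append("--exclude")
        args.append(" ".join(excluded) if isinstance(excluded, list) else excluded)
    return args


# Example: the enriched_history_operations run now skips the singular tests,
# which the dbt_data_quality_alerts DAG executes separately.
print(build_dbt_args(select="enriched_history_operations", excluded="singular_test"))
# ['build', '--select', 'enriched_history_operations', '--exclude', 'singular_test']

With this split, dbt_enriched_base_tables_dag.py builds the enriched_history_operations models while excluding the singular_test selection, and the new dbt_data_quality_alerts DAG runs those singular tests on its own schedule, followed by an Elementary alert task. The trigger_rule="all_done" added to the Elementary operator is Airflow's rule for running a task once all upstream tasks have finished regardless of their state, so alerts can presumably still be produced when the dbt test task itself fails.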