-
Notifications
You must be signed in to change notification settings - Fork 19
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HUBBLE 444 - Refactor Elementary monitoring to run every 30 min #379
Changes from 5 commits
a7333e5
8b1c2fc
9abcd5e
79d85a0
9568161
b88170e
64b38ef
38cb2ea
2454286
058ba13
19055df
8107c2c
a6ebaad
4952a23
df06729
81add38
376f3d8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,37 @@ | ||
from datetime import datetime | ||
|
||
from airflow import DAG | ||
from airflow.operators.empty import EmptyOperator | ||
from kubernetes.client import models as k8s | ||
from stellar_etl_airflow.build_dbt_task import dbt_task | ||
from stellar_etl_airflow.build_elementary_slack_alert_task import elementary_task | ||
from stellar_etl_airflow.default import get_default_dag_args, init_sentry | ||
|
||
init_sentry() | ||
|
||
# Custom Jinja filters registered on the DAG. ``container_resources`` wraps
# the value it is applied to as the ``requests`` of a Kubernetes
# V1ResourceRequirements object.
_user_defined_filters = {
    "container_resources": lambda s: k8s.V1ResourceRequirements(requests=s),
}

# DAG definition for the dbt data-quality checks and Elementary alerts.
# The cron expression below fires at minutes 0 and 30 of every hour; at most
# one run is active at a time and missed intervals are not backfilled.
# NOTE(review): the PR discussion asks for this DAG to run in between the dbt
# DAGs instead. If that is still wanted, "15,45 * * * *" expresses it; the
# suggested "*/15,*/45 * * * *" would fire every 15 minutes. Confirm against
# the other DAGs' schedules before changing.
dag = DAG(
    dag_id="dbt_data_quality_alerts",
    description="This DAG runs dbt tests and Elementary alerts at a half-hourly cadence",
    default_args=get_default_dag_args(),
    start_date=datetime(2024, 6, 11, 0, 0),
    schedule_interval="*/30 * * * *",
    catchup=False,
    max_active_runs=1,
    user_defined_filters=_user_defined_filters,
    tags=["dbt-data-quality", "dbt-elementary-alerts"],
)
|
||
|
||
# dbt tests to run: a dbt ``test`` command restricted to the "singular_test"
# tag (how the tag is turned into a selector is up to ``dbt_task``).
dbt_unit_tests = dbt_task(
    dag,
    command_type="test",
    tag="singular_test",
)

# Elementary alerting task, built by the project helper for this DAG.
unit_tests_elementary_alerts = elementary_task(dag, "dbt_data_quality")

# Entry-point marker task. ``dag`` is passed explicitly so the operator is
# bound to the DAG at construction time (and therefore picks up the DAG's
# default_args, like the tasks above) instead of being attached implicitly
# as a side effect of the ``>>`` wiring below.
start_tests = EmptyOperator(task_id="start_tests_task", dag=dag)

# DAG task graph: marker -> dbt tests -> Elementary alerts
start_tests >> dbt_unit_tests >> unit_tests_elementary_alerts
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -64,6 +64,7 @@ def dbt_task( | |
flag="select", | ||
operator="", | ||
command_type="build", | ||
excluded=None, | ||
resource_cfg="default", | ||
): | ||
namespace = conf.get("kubernetes", "NAMESPACE") | ||
|
@@ -97,6 +98,15 @@ def dbt_task( | |
args.append(",".join(models)) | ||
else: | ||
args.append(models[0]) | ||
# --exclude selector added for necessary use cases | ||
# Argument should be string or list of strings | ||
if excluded: | ||
task_name = f"{task_name}_with_exclude" | ||
args.append("--exclude") | ||
if isinstance(excluded, list): | ||
args.append(",".join(excluded)) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is this supposed to be a comma or a space? I think the comma means it is the intersection of the items in the excluded list, whereas a space means both are excluded. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. You're right, it should be space-separated to provide the union of the arguments; I'm fixing it in the next commit. |
||
else: | ||
args.append(excluded) | ||
|
||
if Variable.get("dbt_full_refresh_models", deserialize_json=True).get(task_name): | ||
args.append("--full-refresh") | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
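Can we change the interval to `*/15,*/45 * * * *`?
[Editorial note: in cron syntax `*/15,*/45` expands to minutes 0, 15, 30 and 45, i.e. every 15 minutes; `15,45 * * * *` runs only at :15 and :45.]
There was a problem hiding this comment.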
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, it makes much more sense to run in-between the dbt dags.