From 48c8e632ee41f1c42fa26f851f14857dc0c09c62 Mon Sep 17 00:00:00 2001 From: Amisha Singla Date: Tue, 12 Nov 2024 16:01:50 -0600 Subject: [PATCH] Add dag to create elementary report (#506) * Add dag to create elementary report * lint update update * Use last invocation update * send args * concat * reduce days * update * update dbt image * get all invokes * update dbt image * update image * increase task sla threshold * limited data fetch * increase memory * Specific config for elementary report * Increase ephemeral storage * report for last 7 days * report for last 7 days * upgrade elementary * limit days * increase ephemeral storage * decrease execution limit * Actually use ephemeral storage * Actually use ephemeral storage * Actually use ephemeral storage * bring memory down back * Fix schedule, add vars in prod * lint * update dbt image and remove unnecessary vars --- airflow_variables_dev.json | 13 +++-- airflow_variables_prod.json | 13 +++-- dags/dbt_data_quality_alerts_dag.py | 4 +- dags/elementary_report_dag.py | 47 +++++++++++++++++++ .../build_elementary_slack_alert_task.py | 26 +++++----- 5 files changed, 82 insertions(+), 21 deletions(-) create mode 100644 dags/elementary_report_dag.py diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json index 957f269b..e2b876b9 100644 --- a/airflow_variables_dev.json +++ b/airflow_variables_dev.json @@ -124,7 +124,7 @@ "partnership_assets__account_holders_activity_fact": false, "partnership_assets__asset_activity_fact": false }, - "dbt_image_name": "stellar/stellar-dbt:53375b5f9", + "dbt_image_name": "stellar/stellar-dbt:420c216df", "dbt_internal_source_db": "test-hubble-319619", "dbt_internal_source_schema": "test_crypto_stellar_internal", "dbt_job_execution_timeout_seconds": 300, @@ -288,21 +288,25 @@ "dbt": { "requests": { "cpu": "1", - "ephemeral-storage": "500Mi", "memory": "600Mi" } }, "default": { "requests": { "cpu": "0.3", - "ephemeral-storage": "500Mi", "memory": "600Mi" } }, +
"elementaryreport": { + "requests": { + "cpu": "1", + "ephemeral_storage": "2Gi", + "memory": "4Gi" + } + }, "stellaretl": { "requests": { "cpu": "0.3", - "ephemeral-storage": "500Mi", "memory": "600Mi" } } @@ -346,6 +350,7 @@ "current_state": 720, "default": 60, "elementary_dbt_data_quality": 1620, + "elementary_generate_report": 1200, "enriched_history_operations": 780, "enriched_history_operations_with_exclude": 780, "fee_stats": 840, diff --git a/airflow_variables_prod.json b/airflow_variables_prod.json index 9376de30..4c02c54d 100644 --- a/airflow_variables_prod.json +++ b/airflow_variables_prod.json @@ -125,7 +125,7 @@ "partnership_assets__asset_activity_fact": false, "trade_agg": false }, - "dbt_image_name": "stellar/stellar-dbt:53375b5f9", + "dbt_image_name": "stellar/stellar-dbt:420c216df", "dbt_internal_source_db": "hubble-261722", "dbt_internal_source_schema": "crypto_stellar_internal_2", "dbt_job_execution_timeout_seconds": 2400, @@ -286,21 +286,25 @@ "dbt": { "requests": { "cpu": "1", - "ephemeral-storage": "1Gi", "memory": "1Gi" } }, "default": { "requests": { "cpu": "0.5", - "ephemeral-storage": "1Gi", "memory": "1Gi" } }, + "elementaryreport": { + "requests": { + "cpu": "1", + "ephemeral_storage": "2Gi", + "memory": "4Gi" + } + }, "stellaretl": { "requests": { "cpu": "0.5", - "ephemeral-storage": "1Gi", "memory": "1Gi" } } @@ -344,6 +348,7 @@ "current_state": 1200, "default": 60, "elementary_dbt_data_quality": 2100, + "elementary_generate_report": 1200, "enriched_history_operations": 1800, "enriched_history_operations_with_exclude": 1800, "fee_stats": 360, diff --git a/dags/dbt_data_quality_alerts_dag.py b/dags/dbt_data_quality_alerts_dag.py index 3a599218..e8f58b8a 100644 --- a/dags/dbt_data_quality_alerts_dag.py +++ b/dags/dbt_data_quality_alerts_dag.py @@ -29,6 +29,8 @@ ) as dag: # Trigger elementary - elementary_alerts = elementary_task(dag, "dbt_data_quality", resource_cfg="dbt") + elementary_alerts = elementary_task( + dag, "dbt_data_quality", 
"monitor", resource_cfg="dbt" + ) elementary_alerts diff --git a/dags/elementary_report_dag.py b/dags/elementary_report_dag.py new file mode 100644 index 00000000..ab9c606d --- /dev/null +++ b/dags/elementary_report_dag.py @@ -0,0 +1,47 @@ +from datetime import datetime + +from airflow import DAG +from airflow.operators.empty import EmptyOperator +from kubernetes.client import models as k8s +from stellar_etl_airflow.build_dbt_task import dbt_task +from stellar_etl_airflow.build_elementary_slack_alert_task import elementary_task +from stellar_etl_airflow.default import ( + alert_sla_miss, + get_default_dag_args, + init_sentry, +) + +init_sentry() + +with DAG( + "elementary_report", + default_args=get_default_dag_args(), + start_date=datetime(2024, 11, 11, 0, 0), + description="This DAG creates elementary report and send it to slack", + schedule="0 3 * * MON", # Runs every Monday + user_defined_filters={ + "container_resources": lambda s: k8s.V1ResourceRequirements(requests=s), + }, + max_active_runs=1, + catchup=False, +) as dag: + + # Trigger elementary + elementary_alerts = elementary_task( + dag, + "generate_report", + "send-report", + resource_cfg="elementaryreport", + cmd_args=[ + "--days-back", + "7", + "--profiles-dir", + ".", + "--executions-limit", + "120", + "--slack-file-name", + f"elementary_report_{datetime.today().date()}.html", + ], + ) + + elementary_alerts diff --git a/dags/stellar_etl_airflow/build_elementary_slack_alert_task.py b/dags/stellar_etl_airflow/build_elementary_slack_alert_task.py index 2b6b88e5..6bc76e26 100644 --- a/dags/stellar_etl_airflow/build_elementary_slack_alert_task.py +++ b/dags/stellar_etl_airflow/build_elementary_slack_alert_task.py @@ -19,11 +19,7 @@ def access_secret(secret_name, namespace): return secret -def elementary_task( - dag, - task_name, - resource_cfg="default", -): +def elementary_task(dag, task_name, command, cmd_args=[], resource_cfg="default"): namespace = conf.get("kubernetes", "NAMESPACE") if namespace == 
"default": @@ -33,25 +29,31 @@ def elementary_task( config_file_location = None in_cluster = True - container_resources = k8s.V1ResourceRequirements( - requests={ - "cpu": f"{{{{ var.json.resources.{resource_cfg}.requests.cpu }}}}", - "memory": f"{{{{ var.json.resources.{resource_cfg}.requests.memory }}}}", - } - ) + requests = { + "cpu": f"{{{{ var.json.resources.{resource_cfg}.requests.cpu }}}}", + "memory": f"{{{{ var.json.resources.{resource_cfg}.requests.memory }}}}", + } + if resource_cfg == "elementaryreport": + requests["ephemeral-storage"] = ( + f"{{{{ var.json.resources.{resource_cfg}.requests.ephemeral_storage }}}}" + ) + container_resources = k8s.V1ResourceRequirements(requests=requests) dbt_image = "{{ var.value.dbt_image_name }}" slack_secret_name = Variable.get("dbt_elementary_secret") secret = access_secret(slack_secret_name, "default") args = [ - "monitor", + f"{command}", "--slack-token", f"{secret}", "--slack-channel-name", "{{ var.value.dbt_slack_elementary_channel }}", ] + if len(cmd_args): + args = [*args, *cmd_args] + logging.info(f"sh commands to run in pod: {args}") return KubernetesPodOperator(