From 25c33b3155385f480ca7d62ceca6f5a3b32baa96 Mon Sep 17 00:00:00 2001
From: Simon Chow
Date: Mon, 7 Oct 2024 14:04:39 -0400
Subject: [PATCH] fix bugs

---
 airflow_variables_dev.json                          |  5 ++++-
 dags/generate_avro_files_dag.py                     |  1 -
 .../build_bq_generate_avro_job_task.py              | 10 ++++++----
 dags/stellar_etl_airflow/macros.py                  |  4 ++++
 4 files changed, 14 insertions(+), 6 deletions(-)

diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json
index 367e3228..78585f6a 100644
--- a/airflow_variables_dev.json
+++ b/airflow_variables_dev.json
@@ -1,5 +1,6 @@
 {
   "api_key_path": "/home/airflow/gcs/data/apiKey.json",
+  "avro_gcs_bucket": "test_dune_bucket_sdf",
   "bq_dataset": "test_crypto_stellar_internal",
   "bq_dataset_audit_log": "audit_log",
   "bq_project": "test-hubble-319619",
@@ -339,6 +340,7 @@
     "build_export_task": 840,
     "build_gcs_to_bq_task": 960,
     "build_time_task": 480,
+    "build_bq_generate_avro_job": 600,
     "cleanup_metadata": 60,
     "create_sandbox": 2400,
     "current_state": 720,
@@ -373,7 +375,8 @@
     "build_delete_data_task": 180,
     "build_export_task": 420,
     "build_gcs_to_bq_task": 300,
-    "build_time_task": 480
+    "build_time_task": 480,
+    "build_bq_generate_avro_job": 600
   },
   "txmeta_datastore_path": "sdf-ledger-close-meta/ledgers",
   "use_captive_core": "False",
diff --git a/dags/generate_avro_files_dag.py b/dags/generate_avro_files_dag.py
index 78ade559..a6aa95cf 100644
--- a/dags/generate_avro_files_dag.py
+++ b/dags/generate_avro_files_dag.py
@@ -25,7 +25,6 @@
     },
     max_active_runs=5,
     catchup=True,
-    tags=["dbt-enriched-base-tables"],
     sla_miss_callback=alert_sla_miss,
 )
 
diff --git a/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py b/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py
index f5d05242..007dd388 100644
--- a/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py
+++ b/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py
@@ -29,8 +29,10 @@ def build_bq_generate_avro_job(
     next_batch_run_date = (
         "{{ batch_run_date_as_datetime_string(dag, data_interval_end) }}"
     )
-    batch_run_time = f"{batch_run_date.hour}:{batch_run_date.minute}:{batch_run_date.second}":
-    uri = f"gs://{gcs_bucket}/avro/{table}/{batch_run_date.year}/{batch_run_date.month}/{batch_run_date.day}/{batch_run_time}/*.avro"
+    uri_datetime = (
+        "{{ batch_run_date_as_directory_string(dag, data_interval_start) }}"
+    )
+    uri = f"gs://{gcs_bucket}/avro/{table}/{uri_datetime}/*.avro"
     sql_params = {
         "project_id": project,
         "dataset_id": dataset,
@@ -51,14 +53,14 @@ def build_bq_generate_avro_job(
         task_id=f"generate_avro_{table}",
         execution_timeout=timedelta(
             seconds=Variable.get("task_timeout", deserialize_json=True)[
-                build_bq_insert_job.__name__
+                build_bq_generate_avro_job.__name__
             ]
         ),
         on_failure_callback=alert_after_max_retries,
         configuration=configuration,
         sla=timedelta(
             seconds=Variable.get("task_sla", deserialize_json=True)[
-                build_bq_insert_job.__name__
+                build_bq_generate_avro_job.__name__
             ]
         ),
     )
diff --git a/dags/stellar_etl_airflow/macros.py b/dags/stellar_etl_airflow/macros.py
index fdd23c89..55501d76 100644
--- a/dags/stellar_etl_airflow/macros.py
+++ b/dags/stellar_etl_airflow/macros.py
@@ -9,3 +9,7 @@ def batch_run_date_as_datetime_string(dag, start_time):
 
 def get_batch_id():
     return "{}-{}".format("{{ run_id }}", "{{ params.alias }}")
+
+def batch_run_date_as_directory_string(dag, start_time):
+    time = subtract_data_interval(dag, start_time)
+    return f"{time.year}/{time.month}/{time.day}/{time.hour}:{time.minute}:{time.second}"