Skip to content

Commit

Permalink
Fix Avro generation bugs: correct task timeout/SLA lookups to use `build_bq_generate_avro_job`, and build the GCS URI via a new `batch_run_date_as_directory_string` macro
Browse files Browse the repository at this point in the history
  • Loading branch information
chowbao committed Oct 7, 2024
1 parent d38f6f8 commit 25c33b3
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 6 deletions.
5 changes: 4 additions & 1 deletion airflow_variables_dev.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
{
"api_key_path": "/home/airflow/gcs/data/apiKey.json",
"avro_gcs_bucket": "test_dune_bucket_sdf",
"bq_dataset": "test_crypto_stellar_internal",
"bq_dataset_audit_log": "audit_log",
"bq_project": "test-hubble-319619",
Expand Down Expand Up @@ -339,6 +340,7 @@
"build_export_task": 840,
"build_gcs_to_bq_task": 960,
"build_time_task": 480,
"build_bq_generate_avro_job": 600,
"cleanup_metadata": 60,
"create_sandbox": 2400,
"current_state": 720,
Expand Down Expand Up @@ -373,7 +375,8 @@
"build_delete_data_task": 180,
"build_export_task": 420,
"build_gcs_to_bq_task": 300,
"build_time_task": 480
"build_time_task": 480,
"build_bq_generate_avro_job": 600
},
"txmeta_datastore_path": "sdf-ledger-close-meta/ledgers",
"use_captive_core": "False",
Expand Down
1 change: 0 additions & 1 deletion dags/generate_avro_files_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
},
max_active_runs=5,
catchup=True,
tags=["dbt-enriched-base-tables"],
sla_miss_callback=alert_sla_miss,
)

Expand Down
10 changes: 6 additions & 4 deletions dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ def build_bq_generate_avro_job(
next_batch_run_date = (
"{{ batch_run_date_as_datetime_string(dag, data_interval_end) }}"
)
batch_run_time = f"{batch_run_date.hour}:{batch_run_date.minute}:{batch_run_date.second}":
uri = f"gs://{gcs_bucket}/avro/{table}/{batch_run_date.year}/{batch_run_date.month}/{batch_run_date.day}/{batch_run_time}/*.avro"
uri_datetime = (
"{{ batch_run_date_as_directory_string(dag, data_interval_start) }}"
)
uri = f"gs://{gcs_bucket}/avro/{table}/{uri_datetime}/*.avro"
sql_params = {
"project_id": project,
"dataset_id": dataset,
Expand All @@ -51,14 +53,14 @@ def build_bq_generate_avro_job(
task_id=f"generate_avro_{table}",
execution_timeout=timedelta(
seconds=Variable.get("task_timeout", deserialize_json=True)[
build_bq_insert_job.__name__
build_bq_generate_avro_job.__name__
]
),
on_failure_callback=alert_after_max_retries,
configuration=configuration,
sla=timedelta(
seconds=Variable.get("task_sla", deserialize_json=True)[
build_bq_insert_job.__name__
build_bq_generate_avro_job.__name__
]
),
)
4 changes: 4 additions & 0 deletions dags/stellar_etl_airflow/macros.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,7 @@ def batch_run_date_as_datetime_string(dag, start_time):

def get_batch_id():
    """Return the Jinja-templated batch identifier "<run_id>-<alias>".

    The "{{ run_id }}" and "{{ params.alias }}" placeholders are left
    verbatim here; Airflow renders them when the template is evaluated
    at task runtime.
    """
    template_parts = ["{{ run_id }}", "{{ params.alias }}"]
    return "-".join(template_parts)

def batch_run_date_as_directory_string(dag, start_time):
    """Render the batch run date as a directory-style path fragment.

    Shifts *start_time* back by the DAG's data interval via
    ``subtract_data_interval`` and formats the result as
    "year/month/day/hour:minute:second".

    NOTE(review): components are not zero-padded (e.g. "2024/7/9/3:5:0"),
    so lexicographic ordering of the resulting paths does not match
    chronological order — presumably this matches the existing GCS object
    layout; confirm before changing.
    """
    adjusted = subtract_data_interval(dag, start_time)
    date_path = "/".join(
        str(component)
        for component in (adjusted.year, adjusted.month, adjusted.day)
    )
    clock = f"{adjusted.hour}:{adjusted.minute}:{adjusted.second}"
    return f"{date_path}/{clock}"

0 comments on commit 25c33b3

Please sign in to comment.