diff --git a/airflow_variables_prod.json b/airflow_variables_prod.json
index 53347235..d26cd16e 100644
--- a/airflow_variables_prod.json
+++ b/airflow_variables_prod.json
@@ -1,5 +1,6 @@
 {
   "api_key_path": "/home/airflow/gcs/data/apiKey.json",
+  "avro_gcs_bucket": "dune_bucket_sdf",
   "bq_dataset": "crypto_stellar_internal_2",
   "bq_dataset_audit_log": "audit_log",
   "bq_project": "hubble-261722",
@@ -337,6 +338,7 @@
     "build_export_task": 600,
     "build_gcs_to_bq_task": 660,
     "build_time_task": 300,
+    "build_bq_generate_avro_job": 600,
     "cleanup_metadata": 60,
     "create_sandbox": 1020,
     "current_state": 1200,
@@ -371,7 +373,8 @@
     "build_delete_data_task": 180,
     "build_export_task": 300,
     "build_gcs_to_bq_task": 300,
-    "build_time_task": 360
+    "build_time_task": 360,
+    "build_bq_generate_avro_job": 600
   },
   "txmeta_datastore_path": "sdf-ledger-close-meta/ledgers",
   "use_captive_core": "False",
diff --git a/dags/generate_avro_files_dag.py b/dags/generate_avro_files_dag.py
index a6aa95cf..b8914112 100644
--- a/dags/generate_avro_files_dag.py
+++ b/dags/generate_avro_files_dag.py
@@ -1,11 +1,12 @@
 from datetime import datetime
 
 from airflow import DAG
-from kubernetes.client import models as k8s
+from stellar_etl_airflow import macros
+from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps
 from stellar_etl_airflow.build_bq_generate_avro_job_task import (
     build_bq_generate_avro_job,
 )
-from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps
+from airflow.operators.dummy import DummyOperator
 from stellar_etl_airflow.default import (
     alert_sla_miss,
     get_default_dag_args,
@@ -15,16 +16,18 @@
 init_sentry()
 
 dag = DAG(
-    "generate_avro_files",
+    "generate_avro",
     default_args=get_default_dag_args(),
-    start_date=datetime(2024, 10, 1, 0, 0),
+    start_date=datetime(2024, 10, 1, 1, 0),
+    catchup=True,
     description="This DAG generates AVRO files from BQ tables",
-    schedule_interval="0 * * * *",  # Runs every hour
-    user_defined_filters={
-        "container_resources": lambda s: k8s.V1ResourceRequirements(requests=s),
+    schedule_interval="0 * * * *",
+    render_template_as_native_obj=True,
+    user_defined_macros={
+        "subtract_data_interval": macros.subtract_data_interval,
+        "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string,
+        "batch_run_date_as_directory_string": macros.batch_run_date_as_directory_string,
     },
-    max_active_runs=5,
-    catchup=True,
     sla_miss_callback=alert_sla_miss,
 )
 
@@ -32,12 +35,15 @@
 public_dataset = "{{ var.value.public_dataset }}"
 gcs_bucket = "{{ var.value.avro_gcs_bucket }}"
 
+# Wait on ingestion DAGs
 wait_on_history_table = build_cross_deps(
     dag, "wait_on_ledgers_txs", "history_table_export"
 )
 wait_on_state_table = build_cross_deps(dag, "wait_on_state_table", "state_table_export")
 
+dummy_task = DummyOperator(task_id="dummy_task", dag=dag)
+
 # Generate AVRO files
 avro_tables = [
     "accounts",
@@ -55,7 +61,7 @@
 ]
 
 for table in avro_tables:
-    task = build_bq_generate_avro_job(
+    avro_task = build_bq_generate_avro_job(
         dag=dag,
         project=public_project,
         dataset=public_dataset,
@@ -63,5 +69,7 @@
         gcs_bucket=gcs_bucket,
     )
 
-    wait_on_history_table >> task
-    wait_on_state_table >> task
+    dummy_task >> avro_task
+    wait_on_history_table >> avro_task
+    wait_on_state_table >> avro_task
+
diff --git a/dags/queries/generate_avro/accounts.sql b/dags/queries/generate_avro/accounts.sql
index ee25193c..5280a2bd 100644
--- a/dags/queries/generate_avro/accounts.sql
+++ b/dags/queries/generate_avro/accounts.sql
@@ -11,6 +11,8 @@ as (
         sequence_ledger as account_sequence_last_modified_ledger
     from {project_id}.{dataset_id}.accounts
     where true
+        and batch_run_date >= '{batch_run_date}'
+        and batch_run_date < '{next_batch_run_date}'
         and closed_at >= '{batch_run_date}'
         and closed_at < '{next_batch_run_date}'
     order by closed_at asc
diff --git a/dags/queries/generate_avro/history_effects.sql b/dags/queries/generate_avro/history_effects.sql
index 54309e0b..b2d53c1c 100644
--- a/dags/queries/generate_avro/history_effects.sql
+++ b/dags/queries/generate_avro/history_effects.sql
@@ -12,6 +12,8 @@ as (
         except(predicate)
     from {project_id}.{dataset_id}.history_effects
     where true
+        and batch_run_date >= '{batch_run_date}'
+        and batch_run_date < '{next_batch_run_date}'
         and closed_at >= '{batch_run_date}'
         and closed_at < '{next_batch_run_date}'
     order by closed_at asc
diff --git a/dags/queries/generate_avro/history_operations.sql b/dags/queries/generate_avro/history_operations.sql
index 1a22ccb4..bf2fd99d 100644
--- a/dags/queries/generate_avro/history_operations.sql
+++ b/dags/queries/generate_avro/history_operations.sql
@@ -10,9 +10,11 @@ as (
         except(details, details_json, batch_id, batch_insert_ts, batch_run_date),
         details.*
         except(claimants, type),
-        details.type as details_type
+        details.type as soroban_operation_type
     from {project_id}.{dataset_id}.history_operations
     where true
+        and batch_run_date >= '{batch_run_date}'
+        and batch_run_date < '{next_batch_run_date}'
         and closed_at >= '{batch_run_date}'
         and closed_at < '{next_batch_run_date}'
     order by closed_at asc
diff --git a/dags/queries/generate_avro/history_trades.sql b/dags/queries/generate_avro/history_trades.sql
index 5ddb6c1f..151192f9 100644
--- a/dags/queries/generate_avro/history_trades.sql
+++ b/dags/queries/generate_avro/history_trades.sql
@@ -11,7 +11,7 @@ as (
         ledger_closed_at as closed_at
     from {project_id}.{dataset_id}.history_trades
     where true
-        and closed_at >= '{batch_run_date}'
-        and closed_at < '{next_batch_run_date}'
+        and ledger_closed_at >= '{batch_run_date}'
+        and ledger_closed_at < '{next_batch_run_date}'
     order by closed_at asc
 )
diff --git a/dags/queries/generate_avro/history_transactions.sql b/dags/queries/generate_avro/history_transactions.sql
index 6c134c88..84e0d096 100644
--- a/dags/queries/generate_avro/history_transactions.sql
+++ b/dags/queries/generate_avro/history_transactions.sql
@@ -10,6 +10,8 @@ as (
         except(batch_id, batch_insert_ts, batch_run_date)
     from {project_id}.{dataset_id}.history_transactions
     where true
+        and batch_run_date >= '{batch_run_date}'
+        and batch_run_date < '{next_batch_run_date}'
         and closed_at >= '{batch_run_date}'
         and closed_at < '{next_batch_run_date}'
     order by closed_at asc
diff --git a/dags/queries/generate_avro/liquidity_pools.sql b/dags/queries/generate_avro/liquidity_pools.sql
index ac92c690..cb74e81f 100644
--- a/dags/queries/generate_avro/liquidity_pools.sql
+++ b/dags/queries/generate_avro/liquidity_pools.sql
@@ -10,6 +10,8 @@ as (
         except(batch_id, batch_insert_ts, batch_run_date)
     from {project_id}.{dataset_id}.liquidity_pools
     where true
+        and batch_run_date >= '{batch_run_date}'
+        and batch_run_date < '{next_batch_run_date}'
         and closed_at >= '{batch_run_date}'
         and closed_at < '{next_batch_run_date}'
     order by closed_at asc
diff --git a/dags/queries/generate_avro/offers.sql b/dags/queries/generate_avro/offers.sql
index bb3077d8..d6c6f4a2 100644
--- a/dags/queries/generate_avro/offers.sql
+++ b/dags/queries/generate_avro/offers.sql
@@ -10,6 +10,8 @@ as (
         except(batch_id, batch_insert_ts, batch_run_date)
     from {project_id}.{dataset_id}.offers
     where true
+        and batch_run_date >= '{batch_run_date}'
+        and batch_run_date < '{next_batch_run_date}'
         and closed_at >= '{batch_run_date}'
         and closed_at < '{next_batch_run_date}'
     order by closed_at asc
diff --git a/dags/queries/generate_avro/trust_lines.sql b/dags/queries/generate_avro/trust_lines.sql
index 32917d22..47c1a076 100644
--- a/dags/queries/generate_avro/trust_lines.sql
+++ b/dags/queries/generate_avro/trust_lines.sql
@@ -10,6 +10,8 @@ as (
         except(batch_id, batch_insert_ts, batch_run_date)
     from {project_id}.{dataset_id}.trust_lines
     where true
+        and batch_run_date >= '{batch_run_date}'
+        and batch_run_date < '{next_batch_run_date}'
         and closed_at >= '{batch_run_date}'
         and closed_at < '{next_batch_run_date}'
     order by closed_at asc
diff --git a/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py b/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py
index 007dd388..ebf81901 100644
--- a/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py
+++ b/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py
@@ -3,16 +3,14 @@
 
 from airflow.models import Variable
 from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
+from stellar_etl_airflow.build_bq_insert_job_task import (
+    get_query_filepath,
+    file_to_string,
+)
 from stellar_etl_airflow import macros
-from stellar_etl_airflow.build_bq_insert_job_task import file_to_string
 from stellar_etl_airflow.default import alert_after_max_retries
 
 
-def get_query_filepath(query_name):
-    root = os.path.dirname(os.path.dirname(__file__))
-    return os.path.join(root, f"queries/generate_avro/{query_name}.sql")
-
-
 def build_bq_generate_avro_job(
     dag,
     project,
@@ -20,12 +18,9 @@
     table,
     gcs_bucket,
 ):
-    query_path = get_query_filepath(table)
+    query_path = get_query_filepath(f"generate_avro/{table}")
     query = file_to_string(query_path)
     batch_run_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}"
-    prev_batch_run_date = (
-        "{{ batch_run_date_as_datetime_string(dag, prev_data_interval_start_success) }}"
-    )
     next_batch_run_date = (
         "{{ batch_run_date_as_datetime_string(dag, data_interval_end) }}"
     )
@@ -37,7 +32,6 @@
         "project_id": project,
         "dataset_id": dataset,
         "batch_run_date": batch_run_date,
-        "prev_batch_run_date": prev_batch_run_date,
         "next_batch_run_date": next_batch_run_date,
         "uri": uri,
     }
diff --git a/dags/stellar_etl_airflow/macros.py b/dags/stellar_etl_airflow/macros.py
index 55501d76..79ded807 100644
--- a/dags/stellar_etl_airflow/macros.py
+++ b/dags/stellar_etl_airflow/macros.py
@@ -10,6 +10,7 @@ def batch_run_date_as_datetime_string(dag, start_time):
 def get_batch_id():
     return "{}-{}".format("{{ run_id }}", "{{ params.alias }}")
 
+
 def batch_run_date_as_directory_string(dag, start_time):
     time = subtract_data_interval(dag, start_time)
     return f"{time.year}/{time.month}/{time.day}/{time.hour}:{time.minute}:{time.second}"
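Two notes on the query changes, for reviewers reading the hunks without the full files. First, every query hunk begins at "as (", so the head of each file sits outside the diff context; the {uri} parameter implies a BigQuery EXPORT DATA wrapper above it. A hypothetical head consistent with the parameters (EXPORT DATA is standard BigQuery syntax, but the concrete option values here are assumptions, not copied from the repo):

# Hypothetical head of dags/queries/generate_avro/<table>.sql, inferred from
# the {uri} sql_param; only the "as (" opener is visible in the diff.
QUERY_HEAD = """
export data
options (
    uri = '{uri}',
    format = 'avro',
    overwrite = true
)
as (
    select
        ...
"""

Second, the new batch_run_date predicates duplicate the closed_at bounds rather than replace them: filtering on batch_run_date presumably lets BigQuery prune partitions on the partitioning column, while closed_at still bounds the rows that land in each hourly batch. The history_trades change is different in kind: that table has no closed_at column (the select aliases ledger_closed_at as closed_at), so the where clause must filter on the real column.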
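The builder module appears only in part above. A minimal sketch of how build_bq_generate_avro_job plausibly wires the rendered query into a BigQueryInsertJobOperator after this change; the task_id format, the "task_timeout" variable name, and the URI layout are assumptions, while the imports, the sql_params keys, and the generate_avro/{table} lookup are confirmed by the diff:

from datetime import timedelta

from airflow.models import Variable
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

from stellar_etl_airflow.build_bq_insert_job_task import (
    file_to_string,
    get_query_filepath,
)
from stellar_etl_airflow.default import alert_after_max_retries


def build_bq_generate_avro_job(dag, project, dataset, table, gcs_bucket):
    # Confirmed by the diff: queries resolve through the shared helper now.
    query_path = get_query_filepath(f"generate_avro/{table}")
    query = file_to_string(query_path)

    # Jinja, rendered at runtime through the DAG's user_defined_macros.
    batch_run_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}"
    next_batch_run_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_end) }}"

    # Assumed URI layout; only batch_run_date_as_directory_string is confirmed.
    uri = (
        f"gs://{gcs_bucket}/avro/{table}/"
        "{{ batch_run_date_as_directory_string(dag, data_interval_start) }}/*.avro"
    )

    sql_params = {
        "project_id": project,
        "dataset_id": dataset,
        "batch_run_date": batch_run_date,
        "next_batch_run_date": next_batch_run_date,
        "uri": uri,
    }
    query = query.format(**sql_params)

    return BigQueryInsertJobOperator(
        task_id=f"generate_avro_{table}",  # assumed naming scheme
        dag=dag,
        execution_timeout=timedelta(
            # The 600s value added to airflow_variables_prod.json; the
            # "task_timeout" variable name is an assumption.
            seconds=Variable.get("task_timeout", deserialize_json=True)[
                "build_bq_generate_avro_job"
            ]
        ),
        on_failure_callback=alert_after_max_retries,
        configuration={"query": {"query": query, "useLegacySql": False}},
    )

This also shows why dropping prev_batch_run_date is safe: with catchup=True and an hourly schedule, data_interval_start and data_interval_end fully bound each batch, so no reference to the previous successful run is needed.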
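Finally, macros.py shows batch_run_date_as_directory_string in full, but the two sibling macros registered in user_defined_macros are off-screen. Sketches of their likely shape, assuming they mirror the directory variant; the bodies are inferred, only the names are confirmed by the DAG file:

# Inferred companions to batch_run_date_as_directory_string.
def subtract_data_interval(dag, time):
    # Assumption: shift a timestamp back by one data interval so the batch is
    # labeled by the window it covers rather than by when the run fired.
    interval = dag.timetable.infer_manual_data_interval(run_after=time)
    return time - (interval.end - interval.start)


def batch_run_date_as_datetime_string(dag, start_time):
    # Assumption: same subtraction, rendered as a datetime string for the
    # '{batch_run_date}' / '{next_batch_run_date}' predicates in the queries.
    return str(subtract_data_interval(dag, start_time))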