diff --git a/dags/generate_avro_files_dag.py b/dags/generate_avro_files_dag.py
new file mode 100644
index 00000000..5bf83e2b
--- /dev/null
+++ b/dags/generate_avro_files_dag.py
@@ -0,0 +1,76 @@
+from datetime import datetime
+
+from airflow import DAG
+from airflow.models import Variable
+from kubernetes.client import models as k8s
+from stellar_etl_airflow import macros
+from stellar_etl_airflow.build_bq_generate_avro_job_task import (
+    build_bq_generate_avro_job,
+)
+from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps
+from stellar_etl_airflow.default import (
+    alert_sla_miss,
+    get_default_dag_args,
+    init_sentry,
+)
+
+init_sentry()
+
+dag = DAG(
+    "generate_avro_files",
+    default_args=get_default_dag_args(),
+    start_date=datetime(2024, 10, 1, 0, 0),
+    description="This DAG generates AVRO files from BQ tables",
+    schedule_interval="0 * * * *",  # Runs every hour
+    user_defined_filters={
+        "container_resources": lambda s: k8s.V1ResourceRequirements(requests=s),
+    },
+    # Register the macro referenced by the templated queries in the job builder
+    user_defined_macros={
+        "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string,
+    },
+    max_active_runs=5,
+    catchup=True,
+    tags=["dbt-enriched-base-tables"],
+    sla_miss_callback=alert_sla_miss,
+)
+
+# Deployment-specific settings; these Variable keys are assumed and should
+# match the Airflow Variables configured for the deployment
+public_project = Variable.get("public_project")
+public_dataset = Variable.get("public_dataset")
+gcs_bucket = Variable.get("avro_gcs_bucket")
+
+# Wait on ingestion DAGs
+wait_on_history_table = build_cross_deps(
+    dag, "wait_on_ledgers_txs", "history_table_export"
+)
+wait_on_state_table = build_cross_deps(dag, "wait_on_state_table", "state_table_export")
+
+# Generate AVRO files
+avro_tables = [
+    "accounts",
+    "contract_data",
+    "history_contract_events",
+    "history_ledgers",
+    "history_trades",
+    "history_transactions",
+    "liquidity_pools",
+    "offers",
+    "trust_lines",
+    "ttl",
+    # "history_effects",
+    # "history_operations",
+]
+
+for table in avro_tables:
+    task = build_bq_generate_avro_job(
+        dag=dag,
+        project=public_project,
+        dataset=public_dataset,
+        table=table,
+        gcs_bucket=gcs_bucket,
+    )
+
+    wait_on_history_table >> task
+    wait_on_state_table >> task
diff --git a/dags/queries/generate_avro/accounts.sql b/dags/queries/generate_avro/accounts.sql
new file mode 100644
index 00000000..ee25193c
--- /dev/null
+++ b/dags/queries/generate_avro/accounts.sql
@@ -0,0 +1,17 @@
+export data
+  options (
+    uri = '{uri}',
+    format = 'avro',
+    overwrite=true
+  )
+as (
+  select
+    *
+    except(sequence_ledger, batch_id, batch_insert_ts, batch_run_date),
+    sequence_ledger as account_sequence_last_modified_ledger
+  from {project_id}.{dataset_id}.accounts
+  where true
+    and closed_at >= '{batch_run_date}'
+    and closed_at < '{next_batch_run_date}'
+  order by closed_at asc
+)
diff --git a/dags/queries/generate_avro/contract_data.sql b/dags/queries/generate_avro/contract_data.sql
new file mode 100644
index 00000000..4a1eb01e
--- /dev/null
+++ b/dags/queries/generate_avro/contract_data.sql
@@ -0,0 +1,17 @@
+export data
+  options (
+    uri = '{uri}',
+    format = 'avro',
+    overwrite=true
+  )
+as (
+  select
+    *
+    except(batch_id, batch_insert_ts, batch_run_date)
+  from {project_id}.{dataset_id}.contract_data
+  where true
+    and closed_at >= '{batch_run_date}'
+    and closed_at < '{next_batch_run_date}'
+  order by closed_at asc
+)
+
diff --git a/dags/queries/generate_avro/history_contract_events.sql b/dags/queries/generate_avro/history_contract_events.sql
new file mode 100644
index 00000000..68e9b3ba
--- /dev/null
+++ b/dags/queries/generate_avro/history_contract_events.sql
@@ -0,0 +1,17 @@
+export data
+  options (
+    uri = '{uri}',
+    format = 'avro',
+    overwrite=true
+  )
+as (
+  select
+    *
+    except(batch_id, batch_insert_ts, batch_run_date)
+  from {project_id}.{dataset_id}.history_contract_events
+  where true
+    and closed_at >= '{batch_run_date}'
+    and closed_at < '{next_batch_run_date}'
+  order by closed_at asc
+)
+
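All of the generate_avro query templates follow the same pattern: str.format fills {project_id}, {dataset_id}, {uri}, and the two closed_at bounds before the job is submitted, and the EXPORT DATA statement writes one batch to GCS. A minimal sketch of that rendering step, with illustrative values (the real ones come from Airflow Variables and the run's data interval):

# Sketch of the template rendering done by the job builder below.
# All names and timestamps here are illustrative, not from the PR.
template = """export data
  options (uri = '{uri}', format = 'avro', overwrite=true)
as (
  select * except(batch_id, batch_insert_ts, batch_run_date)
  from {project_id}.{dataset_id}.ttl
  where closed_at >= '{batch_run_date}' and closed_at < '{next_batch_run_date}'
)"""

rendered = template.format(
    uri="gs://example-bucket/avro/ttl/2024/10/01/05:00:00/*.avro",
    project_id="example-project",
    dataset_id="example_dataset",
    batch_run_date="2024-10-01T05:00:00",
    next_batch_run_date="2024-10-01T06:00:00",
)
print(rendered)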
diff --git a/dags/queries/generate_avro/history_effects.sql b/dags/queries/generate_avro/history_effects.sql
new file mode 100644
index 00000000..4d80ab45
--- /dev/null
+++ b/dags/queries/generate_avro/history_effects.sql
@@ -0,0 +1,19 @@
+export data
+  options (
+    uri = '{uri}',
+    format = 'avro',
+    overwrite=true
+  )
+as (
+  select
+    *
+    except(details, batch_id, batch_insert_ts, batch_run_date),
+    details.*
+    except(predicate)
+  from {project_id}.{dataset_id}.history_effects
+  where true
+    and closed_at >= '{batch_run_date}'
+    and closed_at < '{next_batch_run_date}'
+  order by closed_at asc
+)
+
diff --git a/dags/queries/generate_avro/history_ledgers.sql b/dags/queries/generate_avro/history_ledgers.sql
new file mode 100644
index 00000000..926e188a
--- /dev/null
+++ b/dags/queries/generate_avro/history_ledgers.sql
@@ -0,0 +1,17 @@
+export data
+  options (
+    uri = '{uri}',
+    format = 'avro',
+    overwrite=true
+  )
+as (
+  select
+    *
+    except(batch_id, batch_insert_ts, batch_run_date)
+  from {project_id}.{dataset_id}.history_ledgers
+  where true
+    and closed_at >= '{batch_run_date}'
+    and closed_at < '{next_batch_run_date}'
+  order by closed_at asc
+)
+
diff --git a/dags/queries/generate_avro/history_operations.sql b/dags/queries/generate_avro/history_operations.sql
new file mode 100644
index 00000000..908c710c
--- /dev/null
+++ b/dags/queries/generate_avro/history_operations.sql
@@ -0,0 +1,20 @@
+export data
+  options (
+    uri = '{uri}',
+    format = 'avro',
+    overwrite=true
+  )
+as (
+  select
+    *
+    except(details, details_json, batch_id, batch_insert_ts, batch_run_date),
+    details.*
+    except(claimants, type),
+    details.type as details_type
+  from {project_id}.{dataset_id}.history_operations
+  where true
+    and closed_at >= '{batch_run_date}'
+    and closed_at < '{next_batch_run_date}'
+  order by closed_at asc
+)
+
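The history_effects and history_operations templates (still commented out in the DAG's table list) also flatten the nested details struct into top-level columns with details.*; details.type is re-aliased so it cannot collide with the row's own type column. A toy sketch of that reshaping, with a simplified schema and illustrative field names:

# Toy illustration of what
#   select * except(details, ...), details.* except(claimants, type),
#          details.type as details_type
# does to one history_operations row. Schema and values are illustrative.
row = {
    "id": 1,
    "type": 0,
    "details": {"amount": "10.0", "type": "native", "claimants": ["GA..."]},
}

flat = {k: v for k, v in row.items() if k != "details"}  # * except(details)
flat.update(  # details.* except(claimants, type)
    {k: v for k, v in row["details"].items() if k not in ("claimants", "type")}
)
flat["details_type"] = row["details"]["type"]  # details.type as details_type

assert flat == {"id": 1, "type": 0, "amount": "10.0", "details_type": "native"}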
diff --git a/dags/queries/generate_avro/history_trades.sql b/dags/queries/generate_avro/history_trades.sql
new file mode 100644
index 00000000..52b84cc3
--- /dev/null
+++ b/dags/queries/generate_avro/history_trades.sql
@@ -0,0 +1,19 @@
+export data
+  options (
+    uri = '{uri}',
+    format = 'avro',
+    overwrite=true
+  )
+as (
+  select
+    *
+    except(ledger_closed_at, batch_id, batch_insert_ts, batch_run_date),
+    ledger_closed_at as closed_at
+  from {project_id}.{dataset_id}.history_trades
+  where true
+    -- filter on the base column; the where clause cannot see the select alias
+    and ledger_closed_at >= '{batch_run_date}'
+    and ledger_closed_at < '{next_batch_run_date}'
+  order by closed_at asc
+)
+
diff --git a/dags/queries/generate_avro/history_transactions.sql b/dags/queries/generate_avro/history_transactions.sql
new file mode 100644
index 00000000..9fe1be63
--- /dev/null
+++ b/dags/queries/generate_avro/history_transactions.sql
@@ -0,0 +1,17 @@
+export data
+  options (
+    uri = '{uri}',
+    format = 'avro',
+    overwrite=true
+  )
+as (
+  select
+    *
+    except(batch_id, batch_insert_ts, batch_run_date)
+  from {project_id}.{dataset_id}.history_transactions
+  where true
+    and closed_at >= '{batch_run_date}'
+    and closed_at < '{next_batch_run_date}'
+  order by closed_at asc
+)
+
diff --git a/dags/queries/generate_avro/liquidity_pools.sql b/dags/queries/generate_avro/liquidity_pools.sql
new file mode 100644
index 00000000..1bb4b622
--- /dev/null
+++ b/dags/queries/generate_avro/liquidity_pools.sql
@@ -0,0 +1,17 @@
+export data
+  options (
+    uri = '{uri}',
+    format = 'avro',
+    overwrite=true
+  )
+as (
+  select
+    *
+    except(batch_id, batch_insert_ts, batch_run_date)
+  from {project_id}.{dataset_id}.liquidity_pools
+  where true
+    and closed_at >= '{batch_run_date}'
+    and closed_at < '{next_batch_run_date}'
+  order by closed_at asc
+)
+
diff --git a/dags/queries/generate_avro/offers.sql b/dags/queries/generate_avro/offers.sql
new file mode 100644
index 00000000..e3eb8b74
--- /dev/null
+++ b/dags/queries/generate_avro/offers.sql
@@ -0,0 +1,17 @@
+export data
+  options (
+    uri = '{uri}',
+    format = 'avro',
+    overwrite=true
+  )
+as (
+  select
+    *
+    except(batch_id, batch_insert_ts, batch_run_date)
+  from {project_id}.{dataset_id}.offers
+  where true
+    and closed_at >= '{batch_run_date}'
+    and closed_at < '{next_batch_run_date}'
+  order by closed_at asc
+)
+
diff --git a/dags/queries/generate_avro/trust_lines.sql b/dags/queries/generate_avro/trust_lines.sql
new file mode 100644
index 00000000..6917e93f
--- /dev/null
+++ b/dags/queries/generate_avro/trust_lines.sql
@@ -0,0 +1,17 @@
+export data
+  options (
+    uri = '{uri}',
+    format = 'avro',
+    overwrite=true
+  )
+as (
+  select
+    *
+    except(batch_id, batch_insert_ts, batch_run_date)
+  from {project_id}.{dataset_id}.trust_lines
+  where true
+    and closed_at >= '{batch_run_date}'
+    and closed_at < '{next_batch_run_date}'
+  order by closed_at asc
+)
+
diff --git a/dags/queries/generate_avro/ttl.sql b/dags/queries/generate_avro/ttl.sql
new file mode 100644
index 00000000..fd0ef89c
--- /dev/null
+++ b/dags/queries/generate_avro/ttl.sql
@@ -0,0 +1,17 @@
+export data
+  options (
+    uri = '{uri}',
+    format = 'avro',
+    overwrite=true
+  )
+as (
+  select
+    *
+    except(batch_id, batch_insert_ts, batch_run_date)
+  from {project_id}.{dataset_id}.ttl
+  where true
+    and closed_at >= '{batch_run_date}'
+    and closed_at < '{next_batch_run_date}'
+  order by closed_at asc
+)
+
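Each export URI ends in a single `*` wildcard, which BigQuery expands into numbered shard files. A short sketch for inspecting what one batch actually wrote; the bucket and prefix are illustrative:

# Sketch: list the Avro shards that EXPORT DATA wrote for one batch.
# BigQuery replaces the single "*" wildcard with zero-padded shard
# numbers such as 000000000000.avro; names below are illustrative.
from google.cloud import storage

client = storage.Client(project="example-project")
blobs = client.list_blobs("example-bucket", prefix="avro/ttl/2024/10/01/05:00:00/")
for blob in blobs:
    print(blob.name, blob.size)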
diff --git a/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py b/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py
new file mode 100644
index 00000000..1a10a6c6
--- /dev/null
+++ b/dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py
@@ -0,0 +1,68 @@
+import os
+from datetime import timedelta
+
+from airflow.models import Variable
+from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
+from stellar_etl_airflow.build_bq_insert_job_task import file_to_string
+from stellar_etl_airflow.default import alert_after_max_retries
+
+
+def get_query_filepath(query_name):
+    root = os.path.dirname(os.path.dirname(__file__))
+    return os.path.join(root, f"queries/generate_avro/{query_name}.sql")
+
+
+def build_bq_generate_avro_job(
+    dag,
+    project,
+    dataset,
+    table,
+    gcs_bucket,
+):
+    query_path = get_query_filepath(table)
+    query = file_to_string(query_path)
+    batch_run_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}"
+    prev_batch_run_date = (
+        "{{ batch_run_date_as_datetime_string(dag, prev_data_interval_start_success) }}"
+    )
+    next_batch_run_date = (
+        "{{ batch_run_date_as_datetime_string(dag, data_interval_end) }}"
+    )
+    # batch_run_date is a Jinja template string, not a datetime, so the date
+    # and time path segments are built as Jinja expressions too; the operator
+    # renders them at run time because `configuration` is a templated field.
+    batch_run_day = "{{ data_interval_start.strftime('%Y/%m/%d') }}"
+    batch_run_time = "{{ data_interval_start.strftime('%H:%M:%S') }}"
+    uri = f"gs://{gcs_bucket}/avro/{table}/{batch_run_day}/{batch_run_time}/*.avro"
+    sql_params = {
+        "project_id": project,
+        "dataset_id": dataset,
+        "batch_run_date": batch_run_date,
+        "prev_batch_run_date": prev_batch_run_date,
+        "next_batch_run_date": next_batch_run_date,
+        "uri": uri,
+    }
+    query = query.format(**sql_params)
+    configuration = {
+        "query": {
+            "query": query,
+            "useLegacySql": False,
+        }
+    }
+
+    return BigQueryInsertJobOperator(
+        dag=dag,
+        task_id=f"generate_avro_{table}",
+        execution_timeout=timedelta(
+            seconds=Variable.get("task_timeout", deserialize_json=True)[
+                build_bq_generate_avro_job.__name__
+            ]
+        ),
+        on_failure_callback=alert_after_max_retries,
+        configuration=configuration,
+        sla=timedelta(
+            seconds=Variable.get("task_sla", deserialize_json=True)[
+                build_bq_generate_avro_job.__name__
+            ]
+        ),
+    )
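Because `configuration` is a templated field on BigQueryInsertJobOperator, the Jinja expressions embedded in the query and URI resolve at run time. A sketch that mimics that rendering outside Airflow for a run whose data interval starts at 2024-10-01 05:00 UTC (bucket and table names illustrative):

# Mimics how the Jinja in the URI resolves at run time; Airflow itself
# renders the operator's `configuration` field this way.
import pendulum
from jinja2 import Template

uri = (
    "gs://example-bucket/avro/ttl/"
    "{{ data_interval_start.strftime('%Y/%m/%d') }}/"
    "{{ data_interval_start.strftime('%H:%M:%S') }}/*.avro"
)
rendered = Template(uri).render(
    data_interval_start=pendulum.datetime(2024, 10, 1, 5, 0, 0)
)
print(rendered)  # gs://example-bucket/avro/ttl/2024/10/01/05:00:00/*.avro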