Generate avro files from bq tables
chowbao committed Oct 7, 2024
1 parent d40950b commit 540ef57
Showing 14 changed files with 337 additions and 0 deletions.
63 changes: 63 additions & 0 deletions dags/generate_avro_files_dag.py
@@ -0,0 +1,63 @@
from datetime import datetime

from airflow import DAG
from kubernetes.client import models as k8s
from stellar_etl_airflow import macros
from stellar_etl_airflow.build_bq_generate_avro_job_task import (
    build_bq_generate_avro_job,
)
from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps
from stellar_etl_airflow.default import (
    alert_sla_miss,
    get_default_dag_args,
    init_sentry,
)

init_sentry()

dag = DAG(
    "generate_avro_files",
    default_args=get_default_dag_args(),
    start_date=datetime(2024, 10, 1, 0, 0),
    description="This DAG generates AVRO files from BQ tables",
    schedule_interval="0 * * * *",  # runs every hour
    user_defined_filters={
        "container_resources": lambda s: k8s.V1ResourceRequirements(requests=s),
    },
    # The generate-avro tasks reference this macro in their Jinja templates,
    # so it must be registered on the DAG.
    user_defined_macros={
        "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string,
    },
    max_active_runs=5,
    catchup=True,
    tags=["dbt-enriched-base-tables"],
    sla_miss_callback=alert_sla_miss,
)

# Assumed to be configured as Airflow Variables; the Variable names here are
# illustrative, not taken from this commit.
public_project = "{{ var.value.public_project }}"
public_dataset = "{{ var.value.public_dataset }}"
gcs_bucket = "{{ var.value.avro_gcs_bucket }}"

# Wait on ingestion DAGs
wait_on_history_table = build_cross_deps(
    dag, "wait_on_ledgers_txs", "history_table_export"
)
wait_on_state_table = build_cross_deps(dag, "wait_on_state_table", "state_table_export")

# Generate AVRO files
avro_tables = [
    "accounts",
    "contract_data",
    "history_contract_events",
    "history_ledgers",
    "history_trades",
    "history_transactions",
    "liquidity_pools",
    "offers",
    "trust_lines",
    "ttl",
    # "history_effects",
    # "history_operations",
]

for table in avro_tables:
    task = build_bq_generate_avro_job(
        dag=dag,
        project=public_project,
        dataset=public_dataset,
        table=table,
        gcs_bucket=gcs_bucket,
    )

    wait_on_history_table >> task
    wait_on_state_table >> task

17 changes: 17 additions & 0 deletions dags/queries/generate_avro/accounts.sql
@@ -0,0 +1,17 @@
export data
options (
    uri = '{uri}',
    format = 'avro',
    overwrite = true
)
as (
    select
        * except (sequence_ledger, batch_id, batch_insert_ts, batch_run_date),
        sequence_ledger as account_sequence_last_modified_ledger
    from {project_id}.{dataset_id}.accounts
    where true
        and closed_at >= '{batch_run_date}'
        and closed_at < '{next_batch_run_date}'
    order by closed_at asc
)
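
For reference, a rendered version of this template might look like the
following; the project, dataset, bucket, and date bounds are illustrative
values, not anything from this commit:

export data
options (
    uri = 'gs://example-bucket/avro/accounts/2024/10/01/00:00:00/*.avro',
    format = 'avro',
    overwrite = true
)
as (
    select
        * except (sequence_ledger, batch_id, batch_insert_ts, batch_run_date),
        sequence_ledger as account_sequence_last_modified_ledger
    from example-project.crypto_stellar.accounts
    where true
        and closed_at >= '2024-10-01T00:00:00'
        and closed_at < '2024-10-01T01:00:00'
    order by closed_at asc
)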
17 changes: 17 additions & 0 deletions dags/queries/generate_avro/contract_data.sql
@@ -0,0 +1,17 @@
export data
options (
    uri = '{uri}',
    format = 'avro',
    overwrite = true
)
as (
    select
        * except (batch_id, batch_insert_ts, batch_run_date)
    from {project_id}.{dataset_id}.contract_data
    where true
        and closed_at >= '{batch_run_date}'
        and closed_at < '{next_batch_run_date}'
    order by closed_at asc
)

17 changes: 17 additions & 0 deletions dags/queries/generate_avro/history_contract_events.sql
@@ -0,0 +1,17 @@
export data
options (
    uri = '{uri}',
    format = 'avro',
    overwrite = true
)
as (
    select
        * except (batch_id, batch_insert_ts, batch_run_date)
    from {project_id}.{dataset_id}.history_contract_events
    where true
        and closed_at >= '{batch_run_date}'
        and closed_at < '{next_batch_run_date}'
    order by closed_at asc
)

19 changes: 19 additions & 0 deletions dags/queries/generate_avro/history_effects.sql
@@ -0,0 +1,19 @@
export data
options (
    uri = '{uri}',
    format = 'avro',
    overwrite = true
)
as (
    select
        * except (details, batch_id, batch_insert_ts, batch_run_date),
        details.* except (predicate)
    from {project_id}.{dataset_id}.history_effects
    where true
        and closed_at >= '{batch_run_date}'
        and closed_at < '{next_batch_run_date}'
    order by closed_at asc
)

17 changes: 17 additions & 0 deletions dags/queries/generate_avro/history_ledgers.sql
@@ -0,0 +1,17 @@
export data
options (
    uri = '{uri}',
    format = 'avro',
    overwrite = true
)
as (
    select
        * except (batch_id, batch_insert_ts, batch_run_date)
    from {project_id}.{dataset_id}.history_ledgers
    where true
        and closed_at >= '{batch_run_date}'
        and closed_at < '{next_batch_run_date}'
    order by closed_at asc
)

20 changes: 20 additions & 0 deletions dags/queries/generate_avro/history_operations.sql
@@ -0,0 +1,20 @@
export data
options (
    uri = '{uri}',
    format = 'avro',
    overwrite = true
)
as (
    select
        * except (details, details_json, batch_id, batch_insert_ts, batch_run_date),
        details.* except (claimants, type),
        details.type as details_type
    from {project_id}.{dataset_id}.history_operations
    where true
        and closed_at >= '{batch_run_date}'
        and closed_at < '{next_batch_run_date}'
    order by closed_at asc
)

18 changes: 18 additions & 0 deletions dags/queries/generate_avro/history_trades.sql
@@ -0,0 +1,18 @@
export data
options (
    uri = '{uri}',
    format = 'avro',
    overwrite = true
)
as (
    select
        * except (ledger_closed_at, batch_id, batch_insert_ts, batch_run_date),
        ledger_closed_at as closed_at
    from {project_id}.{dataset_id}.history_trades
    where true
        -- filter on the source column: select-list aliases are not visible in where
        and ledger_closed_at >= '{batch_run_date}'
        and ledger_closed_at < '{next_batch_run_date}'
    order by closed_at asc
)

17 changes: 17 additions & 0 deletions dags/queries/generate_avro/history_transactions.sql
@@ -0,0 +1,17 @@
export data
options (
    uri = '{uri}',
    format = 'avro',
    overwrite = true
)
as (
    select
        * except (batch_id, batch_insert_ts, batch_run_date)
    from {project_id}.{dataset_id}.history_transactions
    where true
        and closed_at >= '{batch_run_date}'
        and closed_at < '{next_batch_run_date}'
    order by closed_at asc
)

17 changes: 17 additions & 0 deletions dags/queries/generate_avro/liquidity_pools.sql
@@ -0,0 +1,17 @@
export data
options (
    uri = '{uri}',
    format = 'avro',
    overwrite = true
)
as (
    select
        * except (batch_id, batch_insert_ts, batch_run_date)
    from {project_id}.{dataset_id}.liquidity_pools
    where true
        and closed_at >= '{batch_run_date}'
        and closed_at < '{next_batch_run_date}'
    order by closed_at asc
)

17 changes: 17 additions & 0 deletions dags/queries/generate_avro/offers.sql
@@ -0,0 +1,17 @@
export data
options (
    uri = '{uri}',
    format = 'avro',
    overwrite = true
)
as (
    select
        * except (batch_id, batch_insert_ts, batch_run_date)
    from {project_id}.{dataset_id}.offers
    where true
        and closed_at >= '{batch_run_date}'
        and closed_at < '{next_batch_run_date}'
    order by closed_at asc
)

17 changes: 17 additions & 0 deletions dags/queries/generate_avro/trust_lines.sql
@@ -0,0 +1,17 @@
export data
options (
    uri = '{uri}',
    format = 'avro',
    overwrite = true
)
as (
    select
        * except (batch_id, batch_insert_ts, batch_run_date)
    from {project_id}.{dataset_id}.trust_lines
    where true
        and closed_at >= '{batch_run_date}'
        and closed_at < '{next_batch_run_date}'
    order by closed_at asc
)

17 changes: 17 additions & 0 deletions dags/queries/generate_avro/ttl.sql
@@ -0,0 +1,17 @@
export data
options (
    uri = '{uri}',
    format = 'avro',
    overwrite = true
)
as (
    select
        * except (batch_id, batch_insert_ts, batch_run_date)
    from {project_id}.{dataset_id}.ttl
    where true
        and closed_at >= '{batch_run_date}'
        and closed_at < '{next_batch_run_date}'
    order by closed_at asc
)

64 changes: 64 additions & 0 deletions dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py
@@ -0,0 +1,64 @@
import os
from datetime import timedelta

from airflow.models import Variable
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from stellar_etl_airflow.build_bq_insert_job_task import file_to_string
from stellar_etl_airflow.default import alert_after_max_retries


def get_query_filepath(query_name):
    root = os.path.dirname(os.path.dirname(__file__))
    return os.path.join(root, f"queries/generate_avro/{query_name}.sql")


def build_bq_generate_avro_job(
    dag,
    project,
    dataset,
    table,
    gcs_bucket,
):
    query_path = get_query_filepath(table)
    query = file_to_string(query_path)
    batch_run_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}"
    next_batch_run_date = (
        "{{ batch_run_date_as_datetime_string(dag, data_interval_end) }}"
    )
    # batch_run_date is a Jinja template string, not a datetime, so the
    # date/time components of the GCS path are also expressed in Jinja; they
    # render when the operator templates its configuration at runtime.
    batch_run_time = "{{ data_interval_start.strftime('%H:%M:%S') }}"
    uri = (
        f"gs://{gcs_bucket}/avro/{table}/"
        "{{ data_interval_start.strftime('%Y/%m/%d') }}/"
        f"{batch_run_time}/*.avro"
    )
    sql_params = {
        "project_id": project,
        "dataset_id": dataset,
        "batch_run_date": batch_run_date,
        "next_batch_run_date": next_batch_run_date,
        "uri": uri,
    }
    query = query.format(**sql_params)
    configuration = {
        "query": {
            "query": query,
            "useLegacySql": False,
        }
    }

    return BigQueryInsertJobOperator(
        dag=dag,
        task_id=f"generate_avro_{table}",
        execution_timeout=timedelta(
            seconds=Variable.get("task_timeout", deserialize_json=True)[
                build_bq_generate_avro_job.__name__
            ]
        ),
        on_failure_callback=alert_after_max_retries,
        configuration=configuration,
        sla=timedelta(
            seconds=Variable.get("task_sla", deserialize_json=True)[
                build_bq_generate_avro_job.__name__
            ]
        ),
    )
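
A note on the two-stage templating in this builder: str.format fills in the
SQL parameters at DAG-parse time, and the remaining {{ ... }} expressions are
rendered by Airflow's Jinja engine at runtime, since BigQueryInsertJobOperator
templates its configuration field. A minimal sketch of that flow, with
illustrative values:

# Stage 1: str.format substitutes the named placeholders; Jinja expressions
# passed in as *values* are left untouched for Airflow to render later.
template = (
    "select * from {project_id}.{dataset_id}.accounts "
    "where closed_at >= '{batch_run_date}'"
)
parsed = template.format(
    project_id="example-project",
    dataset_id="crypto_stellar",
    batch_run_date="{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}",
)

# Stage 2 (at runtime): Airflow renders the Jinja expression, yielding e.g.
# select * from example-project.crypto_stellar.accounts
#     where closed_at >= '2024-10-01T00:00:00'
print(parsed)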
