Commit
update queries and vars
chowbao committed Oct 8, 2024
1 parent 25c33b3 commit e40ddd5
Showing 12 changed files with 47 additions and 27 deletions.
5 changes: 4 additions & 1 deletion airflow_variables_prod.json
@@ -1,5 +1,6 @@
{
"api_key_path": "/home/airflow/gcs/data/apiKey.json",
"avro_gcs_bucket": "dune_bucket_sdf",
"bq_dataset": "crypto_stellar_internal_2",
"bq_dataset_audit_log": "audit_log",
"bq_project": "hubble-261722",
@@ -337,6 +338,7 @@
"build_export_task": 600,
"build_gcs_to_bq_task": 660,
"build_time_task": 300,
"build_bq_generate_avro_job": 600,
"cleanup_metadata": 60,
"create_sandbox": 1020,
"current_state": 1200,
@@ -371,7 +373,8 @@
"build_delete_data_task": 180,
"build_export_task": 300,
"build_gcs_to_bq_task": 300,
"build_time_task": 360
"build_time_task": 360,
"build_bq_generate_avro_job": 600
},
"txmeta_datastore_path": "sdf-ledger-close-meta/ledgers",
"use_captive_core": "False",
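The new avro_gcs_bucket value and the build_bq_generate_avro_job timeout are read from Airflow Variables. A minimal sketch of how a DAG could consume them, assuming the usual pattern of Jinja templating for string values and Variable.get for the per-task timeout map (the "task_timeout" variable name and helper names here are illustrative, not necessarily the repo's actual code):

# Sketch only: how the variables added in this commit might be looked up.
from airflow.models import Variable

# Jinja-templated lookup, resolved when the task renders (same pattern as the DAG diff below).
gcs_bucket = "{{ var.value.avro_gcs_bucket }}"

# Direct lookup of the per-task timeout map; the key is the one added in this commit.
task_timeouts = Variable.get("task_timeout", deserialize_json=True)  # assumed variable name
avro_job_timeout_seconds = task_timeouts["build_bq_generate_avro_job"]  # 600 in prod
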
32 changes: 20 additions & 12 deletions dags/generate_avro_files_dag.py
@@ -1,11 +1,12 @@
from datetime import datetime

from airflow import DAG
from kubernetes.client import models as k8s
from stellar_etl_airflow import macros
from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps
from stellar_etl_airflow.build_bq_generate_avro_job_task import (
build_bq_generate_avro_job,
)
from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps
from airflow.operators.dummy import DummyOperator
from stellar_etl_airflow.default import (
alert_sla_miss,
get_default_dag_args,
@@ -15,29 +16,34 @@
init_sentry()

dag = DAG(
"generate_avro_files",
"generate_avro",
default_args=get_default_dag_args(),
start_date=datetime(2024, 10, 1, 0, 0),
start_date=datetime(2024, 10, 1, 1, 0),
catchup=True,
description="This DAG generates AVRO files from BQ tables",
schedule_interval="0 * * * *", # Runs every hour
user_defined_filters={
"container_resources": lambda s: k8s.V1ResourceRequirements(requests=s),
schedule_interval="0 * * * *",
render_template_as_native_obj=True,
user_defined_macros={
"subtract_data_interval": macros.subtract_data_interval,
"batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string,
"batch_run_date_as_directory_string": macros.batch_run_date_as_directory_string,
},
max_active_runs=5,
catchup=True,
sla_miss_callback=alert_sla_miss,
)

public_project = "{{ var.value.public_project }}"
public_dataset = "{{ var.value.public_dataset }}"
gcs_bucket = "{{ var.value.avro_gcs_bucket }}"


# Wait on ingestion DAGs
wait_on_history_table = build_cross_deps(
dag, "wait_on_ledgers_txs", "history_table_export"
)
wait_on_state_table = build_cross_deps(dag, "wait_on_state_table", "state_table_export")

dummy_task = DummyOperator(task_id='dummy_task', dag=dag)

# Generate AVRO files
avro_tables = [
"accounts",
@@ -55,13 +61,15 @@
]

for table in avro_tables:
task = build_bq_generate_avro_job(
avro_task = build_bq_generate_avro_job(
dag=dag,
project=public_project,
dataset=public_dataset,
table=table,
gcs_bucket=gcs_bucket,
)

wait_on_history_table >> task
wait_on_state_table >> task
dummy_task >> avro_task
wait_on_history_table >> avro_task
wait_on_state_table >> avro_task

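The build_cross_deps calls make this DAG wait on the upstream ingestion DAGs before generating AVRO files. Its implementation is not part of this diff; a minimal sketch of the idea, using Airflow's ExternalTaskSensor and assuming the helper simply waits for the external DAG run with the same logical date (the real helper in stellar_etl_airflow may differ):

# Sketch only: an ExternalTaskSensor-based stand-in for build_cross_deps.
from airflow.sensors.external_task import ExternalTaskSensor


def build_cross_deps_sketch(dag, task_id, external_dag_id):
    # Wait for the whole external DAG run (external_task_id=None) that shares
    # this run's logical date before letting downstream tasks start.
    return ExternalTaskSensor(
        task_id=task_id,
        external_dag_id=external_dag_id,
        external_task_id=None,
        poke_interval=60,
        mode="reschedule",
        dag=dag,
    )
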
2 changes: 2 additions & 0 deletions dags/queries/generate_avro/accounts.sql
@@ -11,6 +11,8 @@ as (
sequence_ledger as account_sequence_last_modified_ledger
from {project_id}.{dataset_id}.accounts
where true
and batch_run_date >= '{batch_run_date}'
and batch_run_date < '{next_batch_run_date}'
and closed_at >= '{batch_run_date}'
and closed_at < '{next_batch_run_date}'
order by closed_at asc
2 changes: 2 additions & 0 deletions dags/queries/generate_avro/history_effects.sql
@@ -12,6 +12,8 @@ as (
except(predicate)
from {project_id}.{dataset_id}.history_effects
where true
and batch_run_date >= '{batch_run_date}'
and batch_run_date < '{next_batch_run_date}'
and closed_at >= '{batch_run_date}'
and closed_at < '{next_batch_run_date}'
order by closed_at asc
4 changes: 3 additions & 1 deletion dags/queries/generate_avro/history_operations.sql
@@ -10,9 +10,11 @@ as (
except(details, details_json, batch_id, batch_insert_ts, batch_run_date),
details.*
except(claimants, type),
details.type as details_type
details.type as soroban_operation_type
from {project_id}.{dataset_id}.history_operations
where true
and batch_run_date >= '{batch_run_date}'
and batch_run_date < '{next_batch_run_date}'
and closed_at >= '{batch_run_date}'
and closed_at < '{next_batch_run_date}'
order by closed_at asc
4 changes: 2 additions & 2 deletions dags/queries/generate_avro/history_trades.sql
@@ -11,7 +11,7 @@ as (
ledger_closed_at as closed_at
from {project_id}.{dataset_id}.history_trades
where true
and closed_at >= '{batch_run_date}'
and closed_at < '{next_batch_run_date}'
and ledger_closed_at >= '{batch_run_date}'
and ledger_closed_at < '{next_batch_run_date}'
order by closed_at asc
)
2 changes: 2 additions & 0 deletions dags/queries/generate_avro/history_transactions.sql
@@ -10,6 +10,8 @@ as (
except(batch_id, batch_insert_ts, batch_run_date)
from {project_id}.{dataset_id}.history_transactions
where true
and batch_run_date >= '{batch_run_date}'
and batch_run_date < '{next_batch_run_date}'
and closed_at >= '{batch_run_date}'
and closed_at < '{next_batch_run_date}'
order by closed_at asc
2 changes: 2 additions & 0 deletions dags/queries/generate_avro/liquidity_pools.sql
@@ -10,6 +10,8 @@ as (
except(batch_id, batch_insert_ts, batch_run_date)
from {project_id}.{dataset_id}.liquidity_pools
where true
and batch_run_date >= '{batch_run_date}'
and batch_run_date < '{next_batch_run_date}'
and closed_at >= '{batch_run_date}'
and closed_at < '{next_batch_run_date}'
order by closed_at asc
2 changes: 2 additions & 0 deletions dags/queries/generate_avro/offers.sql
@@ -10,6 +10,8 @@ as (
except(batch_id, batch_insert_ts, batch_run_date)
from {project_id}.{dataset_id}.offers
where true
and batch_run_date >= '{batch_run_date}'
and batch_run_date < '{next_batch_run_date}'
and closed_at >= '{batch_run_date}'
and closed_at < '{next_batch_run_date}'
order by closed_at asc
2 changes: 2 additions & 0 deletions dags/queries/generate_avro/trust_lines.sql
@@ -10,6 +10,8 @@ as (
except(batch_id, batch_insert_ts, batch_run_date)
from {project_id}.{dataset_id}.trust_lines
where true
and batch_run_date >= '{batch_run_date}'
and batch_run_date < '{next_batch_run_date}'
and closed_at >= '{batch_run_date}'
and closed_at < '{next_batch_run_date}'
order by closed_at asc
16 changes: 5 additions & 11 deletions dags/stellar_etl_airflow/build_bq_generate_avro_job_task.py
@@ -3,29 +3,24 @@

from airflow.models import Variable
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
from stellar_etl_airflow.build_bq_insert_job_task import (
get_query_filepath,
file_to_string,
)
from stellar_etl_airflow import macros
from stellar_etl_airflow.build_bq_insert_job_task import file_to_string
from stellar_etl_airflow.default import alert_after_max_retries


def get_query_filepath(query_name):
root = os.path.dirname(os.path.dirname(__file__))
return os.path.join(root, f"queries/generate_avro/{query_name}.sql")


def build_bq_generate_avro_job(
dag,
project,
dataset,
table,
gcs_bucket,
):
query_path = get_query_filepath(table)
query_path = get_query_filepath(f"generate_avro/{table}")
query = file_to_string(query_path)
batch_run_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}"
prev_batch_run_date = (
"{{ batch_run_date_as_datetime_string(dag, prev_data_interval_start_success) }}"
)
next_batch_run_date = (
"{{ batch_run_date_as_datetime_string(dag, data_interval_end) }}"
)
@@ -37,7 +32,6 @@ def build_bq_generate_avro_job(
"project_id": project,
"dataset_id": dataset,
"batch_run_date": batch_run_date,
"prev_batch_run_date": prev_batch_run_date,
"next_batch_run_date": next_batch_run_date,
"uri": uri,
}
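With these changes, each query file is loaded, filled in with sql_params, and submitted as a BigQuery job whose EXPORT DATA statement writes AVRO to the per-run GCS path. A rough sketch of how the tail of build_bq_generate_avro_job might look under that assumption (the exact operator arguments in the repo, such as task_id naming and retry settings, may differ):

    # Sketch only: how the rendered query might be submitted.
    # Assumes the SQL uses str.format-style placeholders such as {project_id}
    # and an EXPORT DATA ... OPTIONS(uri='{uri}') clause.
    sql = query.format(**sql_params)

    return BigQueryInsertJobOperator(
        task_id=f"generate_avro_{table}",
        configuration={
            "query": {
                "query": sql,
                "useLegacySql": False,
            }
        },
        on_failure_callback=alert_after_max_retries,
        dag=dag,
    )
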
1 change: 1 addition & 0 deletions dags/stellar_etl_airflow/macros.py
@@ -10,6 +10,7 @@ def batch_run_date_as_datetime_string(dag, start_time):
def get_batch_id():
return "{}-{}".format("{{ run_id }}", "{{ params.alias }}")


def batch_run_date_as_directory_string(dag, start_time):
time = subtract_data_interval(dag, start_time)
return f"{time.year}/{time.month}/{time.day}/{time.hour}:{time.minute}:{time.second}"
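The new directory macro builds the per-run GCS path segment directly from the adjusted run time; note that the components are not zero-padded. A small illustration, assuming subtract_data_interval returns the start of the hourly batch unchanged:

# Sketch only: what the directory macro produces for one hourly batch.
from datetime import datetime

time = datetime(2024, 10, 1, 1, 0, 0)  # assumed output of subtract_data_interval
path = f"{time.year}/{time.month}/{time.day}/{time.hour}:{time.minute}:{time.second}"
print(path)  # -> "2024/10/1/1:0:0" (no zero padding)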
