diff --git a/airflow_variables_dev.json b/airflow_variables_dev.json
index 4e0bad39..f138e4e8 100644
--- a/airflow_variables_dev.json
+++ b/airflow_variables_dev.json
@@ -55,7 +55,7 @@
   "dbt_image_name": "stellar/stellar-dbt:6ee7342",
   "dbt_job_execution_timeout_seconds": 300,
   "dbt_job_retries": 1,
-  "dbt_mart_dataset": "test_sdf_marts",
+  "dbt_mart_dataset": "test_crypto_stellar_dbt",
   "dbt_maximum_bytes_billed": 250000000000,
   "dbt_project": "test-hubble-319619",
   "dbt_target": "test",
@@ -235,8 +235,7 @@
     "contract_data": "contract_data",
     "contract_code": "contract_code",
     "config_settings": "config_settings",
-    "ttl": "ttl",
-    "diagnostic_events": "diagnostic_events"
+    "ttl": "ttl"
   },
   "task_timeout": {
     "build_batch_stats": 180,
@@ -251,9 +250,13 @@
   "dbt_tables": {
     "signers_current": "account_signers_current",
     "accounts_current": "accounts_current",
+    "config_settings_current": "config_settings_current",
+    "contract_code_current": "contract_code_current",
+    "contract_data_current": "contract_data_current",
     "liquidity_pools_current": "liquidity_pools_current",
     "offers_current": "offers_current",
-    "trustlines_current": "trust_lines_current"
+    "trustlines_current": "trust_lines_current",
+    "ttl_current": "ttl_current"
   },
   "use_testnet": "True",
   "sandbox_dataset": "crypto_stellar_internal_sandbox",
diff --git a/airflow_variables_prod.json b/airflow_variables_prod.json
index 6978487d..ce758ef0 100644
--- a/airflow_variables_prod.json
+++ b/airflow_variables_prod.json
@@ -60,7 +60,7 @@
   "dbt_image_name": "stellar/stellar-dbt:6ee7342",
   "dbt_job_execution_timeout_seconds": 1800,
   "dbt_job_retries": 1,
-  "dbt_mart_dataset": "sdf_marts",
+  "dbt_mart_dataset": "crypto_stellar_dbt",
   "dbt_maximum_bytes_billed": 100000000000000,
   "dbt_project": "hubble-261722",
   "dbt_target": "prod",
@@ -230,8 +230,7 @@
     "contract_data": "contract_data",
     "contract_code": "contract_code",
     "config_settings": "config_settings",
-    "ttl": "ttl",
-    "diagnostic_events": "diagnostic_events"
+    "ttl": "ttl"
   },
   "task_timeout": {
     "build_batch_stats": 180,
@@ -246,9 +245,13 @@
   "dbt_tables": {
     "signers_current": "account_signers_current",
     "accounts_current": "accounts_current",
+    "config_settings_current": "config_settings_current",
+    "contract_code_current": "contract_code_current",
+    "contract_data_current": "contract_data_current",
     "liquidity_pools_current": "liquidity_pools_current",
     "offers_current": "offers_current",
-    "trustlines_current": "trust_lines_current"
+    "trustlines_current": "trust_lines_current",
+    "ttl_current": "ttl_current"
   },
   "use_testnet": "False",
   "sandbox_dataset": "crypto_stellar_internal_sandbox",
diff --git a/dags/history_tables_dag.py b/dags/history_tables_dag.py
index 28e32ebd..97cf00e6 100644
--- a/dags/history_tables_dag.py
+++ b/dags/history_tables_dag.py
@@ -68,7 +68,7 @@
 write_trade_stats = build_batch_stats(dag, table_names["trades"])
 write_effects_stats = build_batch_stats(dag, table_names["effects"])
 write_tx_stats = build_batch_stats(dag, table_names["transactions"])
-write_diagnostic_events_stats = build_batch_stats(dag, table_names["diagnostic_events"])
+write_diagnostic_events_stats = build_batch_stats(dag, "diagnostic_events")
 write_ledger_stats = build_batch_stats(dag, table_names["ledgers"])
 write_asset_stats = build_batch_stats(dag, table_names["assets"])
 
diff --git a/dags/queries/create_table.sql b/dags/queries/create_table.sql
index aeb28291..3f5a8569 100644
--- a/dags/queries/create_table.sql
+++ b/dags/queries/create_table.sql
@@ -3,5 +3,7 @@
 partition by date_trunc(batch_run_date, day)
 options (partition_expiration_days = 180) as (
     select * from `{project_id}.{dataset_id}.{table_id}`
-    where batch_run_date >= date_sub(current_date(), interval 6 month)
+    where
+        batch_run_date >= date_trunc(date_sub(current_date(), interval 6 month), day)
+        and batch_run_date < date_trunc(current_date(), day)
 )
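A note on the create_table.sql hunk above: the old predicate compared batch_run_date against a raw six-month offset, so the current, still-filling day could be copied into the new table. The new predicate is a half-open window of whole days. A minimal sketch of that window logic, assuming python-dateutil is available for the month arithmetic; the sample dates are illustrative, not repo values:

```python
from datetime import date

from dateutil.relativedelta import relativedelta  # assumption: dateutil is installed

today = date(2024, 4, 30)                      # stand-in for current_date()
lower_bound = today - relativedelta(months=6)  # date_sub(current_date(), interval 6 month)

def in_window(batch_run_date: date) -> bool:
    """True when a day's partition falls inside the copied window."""
    # Half-open interval: six full months back, up to but excluding today.
    return lower_bound <= batch_run_date < today

assert in_window(date(2024, 4, 29))       # yesterday: copied
assert not in_window(today)               # today: excluded while still filling
assert not in_window(date(2023, 10, 29))  # older than six months: expired
```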
diff --git a/dags/queries/update_table.sql b/dags/queries/update_table.sql
index bfc3d297..9b1d6337 100644
--- a/dags/queries/update_table.sql
+++ b/dags/queries/update_table.sql
@@ -1,4 +1,4 @@
 insert into {project_id}.{target_dataset}.{table_id}
 select * from {project_id}.{dataset_id}.{table_id}
-where date_trunc(batch_run_date, day) = date_trunc(current_date() - interval 1 day, day)
+where date_trunc(batch_run_date, day) = date_trunc(cast('{batch_run_date}' as datetime), day)
 
diff --git a/dags/sandbox_create_dag.py b/dags/sandbox_create_dag.py
index 7d5bdd5f..0f52aab3 100644
--- a/dags/sandbox_create_dag.py
+++ b/dags/sandbox_create_dag.py
@@ -1,7 +1,6 @@
 """
 This DAG creates the sandbox dataset with transactions tables, state tables with history and views.
 """
-from datetime import datetime
 from json import loads
 
 from airflow import DAG
@@ -23,7 +22,6 @@
 with DAG(
     "sandbox_create_dag",
     default_args=get_default_dag_args(),
-    start_date=datetime(2023, 1, 1),
     description="This DAG creates a sandbox",
     schedule_interval="@once",
     params={"alias": "sandbox_dataset"},
@@ -32,8 +30,8 @@
     },
     catchup=False,
 ) as dag:
-    PROJECT = Variable.get("bq_project")
-    DATASET = Variable.get("bq_dataset")
+    PROJECT = Variable.get("public_project")
+    DATASET = Variable.get("public_dataset")
     SANDBOX_DATASET = Variable.get("sandbox_dataset")
     DBT_DATASET = Variable.get("dbt_mart_dataset")
     TABLES_ID = Variable.get("table_ids", deserialize_json=True)
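A note on the update_table.sql change: the insert no longer hardcodes current_date() - 1; it fills whatever day '{batch_run_date}' resolves to, which is what makes catchup runs reproducible. A minimal sketch of how the parameter flows through str.format() and survives until Airflow's Jinja render; the project, dataset, and table names below are placeholders, not repo values:

```python
# The template mirrors the new dags/queries/update_table.sql shown above.
UPDATE_TABLE_SQL = (
    "insert into {project_id}.{target_dataset}.{table_id}\n"
    "select * from {project_id}.{dataset_id}.{table_id}\n"
    "where date_trunc(batch_run_date, day)"
    " = date_trunc(cast('{batch_run_date}' as datetime), day)"
)

sql_params = {
    "project_id": "my-project",      # placeholder
    "dataset_id": "my_dataset",      # placeholder
    "table_id": "history_ledgers",   # placeholder
    "target_dataset": "my_sandbox",  # placeholder
    "batch_run_date": "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}",
}

# str.format() substitutes the Jinja expression verbatim; Airflow renders it
# later, once per DAG run, so each run inserts exactly one day's partition.
print(UPDATE_TABLE_SQL.format(**sql_params))
```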
diff --git a/dags/sandbox_update_dag.py b/dags/sandbox_update_dag.py
index 96d29276..1b5b8807 100644
--- a/dags/sandbox_update_dag.py
+++ b/dags/sandbox_update_dag.py
@@ -8,6 +8,7 @@
 from airflow.models.variable import Variable
 from airflow.operators.empty import EmptyOperator
 from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator
+from stellar_etl_airflow import macros
 from stellar_etl_airflow.build_bq_insert_job_task import (
     file_to_string,
     get_query_filepath,
@@ -24,21 +25,34 @@
 with DAG(
     "sandbox_update_dag",
     default_args=get_default_dag_args(),
-    start_date=datetime(2023, 1, 1),
+    start_date=datetime(2024, 4, 30),
     description="This DAG updates a sandbox",
     schedule_interval="0 1 * * *",
     params={"alias": "sandbox_dataset"},
     user_defined_filters={"fromjson": lambda s: loads(s)},
-    catchup=False,
+    render_template_as_native_obj=True,
+    catchup=True,
+    user_defined_macros={
+        "subtract_data_interval": macros.subtract_data_interval,
+        "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string,
+    },
 ) as dag:
     TABLES_ID = Variable.get("table_ids", deserialize_json=True)
-    PROJECT = Variable.get("bq_project")
-    BQ_DATASET = Variable.get("bq_dataset")
+    PROJECT = Variable.get("public_project")
+    BQ_DATASET = Variable.get("public_dataset")
     SANDBOX_DATASET = Variable.get("sandbox_dataset")
 
+    batch_run_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}"
+
     start_tables_task = EmptyOperator(task_id="start_tables_task")
 
-    wait_on_dag = build_cross_deps(dag, "wait_on_base_tables", "history_table_export")
+    wait_on_history_dag = build_cross_deps(
+        dag, "wait_on_base_tables", "history_table_export"
+    )
+
+    wait_on_state_dag = build_cross_deps(
+        dag, "wait_on_state_tables", "state_table_export"
+    )
 
     for table_id in TABLES_ID:
         if table_id == "diagnostic_events":
@@ -50,6 +64,7 @@
             "dataset_id": BQ_DATASET,
             "table_id": TABLES_ID[table_id],
             "target_dataset": SANDBOX_DATASET,
+            "batch_run_date": batch_run_date,
         }
         query = query.format(**sql_params)
         tables_update_task = BigQueryInsertJobOperator(
@@ -63,4 +78,8 @@
             on_failure_callback=alert_after_max_retries,
         )
 
-    start_tables_task >> wait_on_dag >> tables_update_task
+    (
+        start_tables_task
+        >> [wait_on_history_dag, wait_on_state_dag]
+        >> tables_update_task
+    )
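A note on the sandbox_update_dag.py changes as a whole: moving start_date to 2024-04-30 and enabling catchup=True makes the scheduler emit one run per missed day, and each run pins its own partition via data_interval_start rather than relying on current_date() - 1. A hedged sketch of that pattern; the function body below is an illustrative stand-in, not the actual implementation of stellar_etl_airflow.macros:

```python
from datetime import datetime, timedelta

def batch_run_date_as_datetime_string(data_interval_start: datetime) -> str:
    """Illustrative stand-in for the repo macro: format the run's interval
    start as a string that BigQuery can cast(... as datetime)."""
    return data_interval_start.strftime("%Y-%m-%dT%H:%M:%S")

# With start_date=datetime(2024, 4, 30), schedule "0 1 * * *", and
# catchup=True, the scheduler backfills one run per missed day; each run
# inserts exactly the partition matching its own data interval.
start = datetime(2024, 4, 30)
for day in range(3):
    print(batch_run_date_as_datetime_string(start + timedelta(days=day)))
```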