Skip to content

Commit

Permalink
Merge pull request #228 from stellar/master
Browse files Browse the repository at this point in the history
[PRODUCTION] Update production Airflow environment
  • Loading branch information
sydneynotthecity authored Oct 17, 2023
2 parents 8f37b2f + 8cd1008 commit 2e11bcf
Show file tree
Hide file tree
Showing 18 changed files with 795 additions and 32 deletions.
41 changes: 36 additions & 5 deletions airflow_variables.json
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,11 @@
"transaction_id",
"account",
"type"
]
],
"contract_data": ["last_modified_ledger"],
"contract_code": ["last_modified_ledger"],
"config_settings": ["last_modified_ledger"],
"expiration": ["last_modified_ledger"]
},
"dbt_auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"dbt_auth_uri": "https://accounts.google.com/o/oauth2/auth",
Expand All @@ -112,7 +116,7 @@
"partnership_assets__asset_activity_fact": false,
"trade_agg": false
},
"dbt_image_name": "stellar/stellar-dbt:0643adc",
"dbt_image_name": "stellar/stellar-dbt:011d897",
"dbt_job_execution_timeout_seconds": 6000,
"dbt_job_retries": 1,
"dbt_keyfile_profile": "",
Expand All @@ -126,7 +130,7 @@
"dbt_token_uri": "https://oauth2.googleapis.com/token",
"gcs_exported_data_bucket_name": "us-central1-hubble-2-d948d67b-bucket",
"gcs_exported_object_prefix": "dag-exported",
"image_name": "stellar/stellar-etl:6211230",
"image_name": "stellar/stellar-etl:e9c803c",
"image_output_path": "/etl/exported_data/",
"image_pull_policy": "IfNotPresent",
"kube_config_location": "",
Expand All @@ -153,7 +157,12 @@
"signers": "account_signers.txt",
"trades": "trades.txt",
"transactions": "transactions.txt",
"trustlines": "trustlines.txt"
"trustlines": "trustlines.txt",
"contract_data": "contract_data.txt",
"contract_code": "contract_code.txt",
"config_settings": "config_settings.txt",
"expiration": "expiration.txt",
"diagnostic_events": "diagnostic_events.txt"
},
"output_path": "/home/airflow/gcs/data/",
"owner": "SDF",
Expand Down Expand Up @@ -213,6 +222,22 @@
"trust_lines": {
"type": "MONTH",
"field": "batch_run_date"
},
"contract_data": {
"field": "batch_run_date",
"type": "MONTH"
},
"contract_code": {
"field": "batch_run_date",
"type": "MONTH"
},
"config_settings": {
"field": "batch_run_date",
"type": "MONTH"
},
"expiration": {
"field": "batch_run_date",
"type": "MONTH"
}
},
"public_dataset": "crypto_stellar_2",
Expand Down Expand Up @@ -258,7 +283,12 @@
"trades": "history_trades",
"transactions": "history_transactions",
"trustlines": "trust_lines",
"enriched_history_operations": "enriched_history_operations"
"enriched_history_operations": "enriched_history_operations",
"contract_data": "contract_data",
"contract_code": "contract_code",
"config_settings": "config_settings",
"expiration": "expiration",
"diagnostic_events": "diagnostic_events"
},
"task_timeout": {
"build_batch_stats": 180,
Expand Down Expand Up @@ -289,6 +319,7 @@
}
},
"partners_bucket": "ext-partner-sftp",
"use_futurenet": "False",
"currency_ohlc": {
"currency": "euro_ohlc",
"table_name": "euro_usd_ohlc",
Expand Down
41 changes: 36 additions & 5 deletions airflow_variables_dev.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,11 @@
"transaction_id",
"account",
"type"
]
],
"contract_data": ["last_modified_ledger"],
"contract_code": ["last_modified_ledger"],
"config_settings": ["last_modified_ledger"],
"expiration": ["last_modified_ledger"]
},
"dbt_auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"dbt_auth_uri": "https://accounts.google.com/o/oauth2/auth",
Expand All @@ -90,7 +94,7 @@
"partnership_assets__account_holders_activity_fact": true,
"partnership_assets__asset_activity_fact": true
},
"dbt_image_name": "stellar/stellar-dbt:0643adc",
"dbt_image_name": "stellar/stellar-dbt:011d897",
"dbt_job_execution_timeout_seconds": 300,
"dbt_job_retries": 1,
"dbt_keyfile_profile": "",
Expand All @@ -104,7 +108,7 @@
"dbt_token_uri": "https://oauth2.googleapis.com/token",
"gcs_exported_data_bucket_name": "us-central1-hubble-1pt5-dev-7db0e004-bucket",
"gcs_exported_object_prefix": "dag-exported",
"image_name": "stellar/stellar-etl:6211230",
"image_name": "chowbao/stellar-etl:testnet-noUsdDb",
"image_output_path": "/etl/exported_data/",
"image_pull_policy": "IfNotPresent",
"kube_config_location": "",
Expand All @@ -130,7 +134,12 @@
"signers": "account_signers.txt",
"trades": "trades.txt",
"transactions": "transactions.txt",
"trustlines": "trustlines.txt"
"trustlines": "trustlines.txt",
"contract_data": "contract_data.txt",
"contract_code": "contract_code.txt",
"config_settings": "config_settings.txt",
"expiration": "expiration.txt",
"diagnostic_events": "diagnostic_events.txt"
},
"output_path": "/home/airflow/gcs/data/",
"owner": "SDF",
Expand Down Expand Up @@ -194,6 +203,22 @@
"mgi": {
"field": "tran_evnt_date",
"type": "MONTH"
},
"contract_data": {
"field": "batch_run_date",
"type": "MONTH"
},
"contract_code": {
"field": "batch_run_date",
"type": "MONTH"
},
"config_settings": {
"field": "batch_run_date",
"type": "MONTH"
},
"expiration": {
"field": "batch_run_date",
"type": "MONTH"
}
},
"partners_data": {
Expand Down Expand Up @@ -246,7 +271,12 @@
"trades": "history_trades",
"transactions": "history_transactions",
"trustlines": "trust_lines",
"enriched_history_operations": "enriched_history_operations"
"enriched_history_operations": "enriched_history_operations",
"contract_data": "contract_data",
"contract_code": "contract_code",
"config_settings": "config_settings",
"expiration": "expiration",
"diagnostic_events": "diagnostic_events"
},
"task_timeout": {
"build_batch_stats": 180,
Expand All @@ -269,6 +299,7 @@
"sandbox_dataset": "crypto_stellar_internal_sandbox",
"volume_config": {},
"volume_name": "etl-data",
"use_futurenet": "False",
"currency_ohlc": {
"currency": "euro_ohlc",
"table_name": "euro_usd_ohlc",
Expand Down
1 change: 1 addition & 0 deletions dags/asset_pricing_pipeline_dag.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
import json

from airflow import DAG
from airflow.models.variable import Variable
Expand Down
7 changes: 7 additions & 0 deletions dags/bucket_list_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
public_project = Variable.get("public_project")
public_dataset = Variable.get("public_dataset")
use_testnet = ast.literal_eval(Variable.get("use_testnet"))
use_futurenet = ast.literal_eval(Variable.get("use_futurenet"))

"""
The time task reads in the execution time of the current run, as well as the next
Expand All @@ -61,6 +62,7 @@
"export_accounts",
file_names["accounts"],
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
)
export_bal_task = build_export_task(
Expand All @@ -69,6 +71,7 @@
"export_claimable_balances",
file_names["claimable_balances"],
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
)
export_off_task = build_export_task(
Expand All @@ -77,6 +80,7 @@
"export_offers",
file_names["offers"],
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
)
export_pool_task = build_export_task(
Expand All @@ -85,6 +89,7 @@
"export_pools",
file_names["liquidity_pools"],
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
)
export_sign_task = build_export_task(
Expand All @@ -93,6 +98,7 @@
"export_signers",
file_names["signers"],
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
)
export_trust_task = build_export_task(
Expand All @@ -101,6 +107,7 @@
"export_trustlines",
file_names["trustlines"],
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
)

Expand Down
22 changes: 20 additions & 2 deletions dags/history_archive_with_captive_core_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
dag = DAG(
"history_archive_with_captive_core",
default_args=get_default_dag_args(),
start_date=datetime.datetime(2022, 3, 11, 18, 30),
start_date=datetime.datetime(2023, 9, 20, 15, 0),
catchup=True,
description="This DAG exports trades and operations from the history archive using CaptiveCore. This supports parsing sponsorship and AMMs.",
schedule_interval="*/30 * * * *",
params={
Expand All @@ -44,12 +45,13 @@
public_dataset = Variable.get("public_dataset")
public_dataset_new = Variable.get("public_dataset_new")
use_testnet = ast.literal_eval(Variable.get("use_testnet"))
use_futurenet = ast.literal_eval(Variable.get("use_futurenet"))

"""
The time task reads in the execution time of the current run, as well as the next
execution time. It converts these two times into ledger ranges.
"""
time_task = build_time_task(dag, use_testnet=use_testnet)
time_task = build_time_task(dag, use_testnet=use_testnet, use_futurenet=use_futurenet)

"""
The write batch stats task will take a snapshot of the DAG run_id, execution date,
Expand All @@ -60,6 +62,7 @@
write_trade_stats = build_batch_stats(dag, table_names["trades"])
write_effects_stats = build_batch_stats(dag, table_names["effects"])
write_tx_stats = build_batch_stats(dag, table_names["transactions"])
write_diagnostic_events_stats = build_batch_stats(dag, table_names["diagnostic_events"])

"""
The export tasks call export commands on the Stellar ETL using the ledger range from the time task.
Expand All @@ -76,6 +79,7 @@
"export_operations",
file_names["operations"],
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
resource_cfg="cc",
)
Expand All @@ -85,6 +89,7 @@
"export_trades",
file_names["trades"],
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
resource_cfg="cc",
)
Expand All @@ -94,6 +99,7 @@
"export_effects",
"effects.txt",
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
resource_cfg="cc",
)
Expand All @@ -103,6 +109,17 @@
"export_transactions",
file_names["transactions"],
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
resource_cfg="cc",
)
diagnostic_events_export_task = build_export_task(
dag,
"archive",
"export_diagnostic_events",
file_names["diagnostic_events"],
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
resource_cfg="cc",
)
Expand Down Expand Up @@ -380,3 +397,4 @@
)
tx_export_task >> delete_old_tx_pub_task >> send_txs_to_pub_task >> wait_on_dag
tx_export_task >> delete_old_tx_pub_new_task >> send_txs_to_pub_new_task >> wait_on_dag
(time_task >> write_diagnostic_events_stats >> diagnostic_events_export_task)
8 changes: 6 additions & 2 deletions dags/history_archive_without_captive_core_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@
dag = DAG(
"history_archive_without_captive_core",
default_args=get_default_dag_args(),
start_date=datetime.datetime(2022, 3, 11, 18, 30),
start_date=datetime.datetime(2023, 9, 20, 15, 0),
catchup=True,
description="This DAG exports ledgers, transactions, and assets from the history archive to BigQuery. Incremental Loads",
schedule_interval="*/15 * * * *",
params={
Expand All @@ -43,12 +44,13 @@
public_dataset = Variable.get("public_dataset")
public_dataset_new = Variable.get("public_dataset_new")
use_testnet = ast.literal_eval(Variable.get("use_testnet"))
use_futurenet = ast.literal_eval(Variable.get("use_futurenet"))

"""
The time task reads in the execution time of the current run, as well as the next
execution time. It converts these two times into ledger ranges.
"""
time_task = build_time_task(dag, use_testnet=use_testnet)
time_task = build_time_task(dag, use_testnet=use_testnet, use_futurenet=use_futurenet)

"""
The write batch stats task will take a snapshot of the DAG run_id, execution date,
Expand All @@ -73,6 +75,7 @@
"export_ledgers",
file_names["ledgers"],
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
)
asset_export_task = build_export_task(
Expand All @@ -81,6 +84,7 @@
"export_assets",
file_names["assets"],
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
)

Expand Down
Loading

0 comments on commit 2e11bcf

Please sign in to comment.