
Commit

Merge branch 'master' into timestamp-mgi
sydneynotthecity authored Nov 28, 2023
2 parents 811fa8f + 478dc2c commit 7398c91
Showing 27 changed files with 245 additions and 283 deletions.
9 changes: 0 additions & 9 deletions airflow_variables_dev.json
@@ -84,11 +84,6 @@
"config_settings": ["last_modified_ledger"],
"expiration": ["last_modified_ledger"]
},
"dbt_auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"dbt_auth_uri": "https://accounts.google.com/o/oauth2/auth",
"dbt_client_email": "[email protected]",
"dbt_client_id": 101879863915888261202,
"dbt_client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/803774498738-compute%40developer.gserviceaccount.com",
"dbt_dataset": "test_sdf",
"dbt_full_refresh_models": {
"partnership_assets__account_holders_activity_fact": true,
@@ -97,15 +92,11 @@
"dbt_image_name": "stellar/stellar-dbt:1e160fe",
"dbt_job_execution_timeout_seconds": 300,
"dbt_job_retries": 1,
"dbt_keyfile_profile": "",
"dbt_mart_dataset": "test_sdf_marts",
"dbt_maximum_bytes_billed": 250000000000,
"dbt_private_key": "dummy",
"dbt_private_key_id": "dummy",
"dbt_project": "test-hubble-319619",
"dbt_target": "dev_airflow",
"dbt_threads": 1,
"dbt_token_uri": "https://oauth2.googleapis.com/token",
"gcs_exported_data_bucket_name": "us-central1-hubble-1pt5-dev-7db0e004-bucket",
"gcs_exported_object_prefix": "dag-exported",
"image_name": "stellar/stellar-etl:08fd8de",
9 changes: 0 additions & 9 deletions airflow_variables_prod.json
@@ -97,11 +97,6 @@
"config_settings": ["last_modified_ledger"],
"expiration": ["last_modified_ledger"]
},
"dbt_auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"dbt_auth_uri": "https://accounts.google.com/o/oauth2/auth",
"dbt_client_email": "[email protected]",
"dbt_client_id": 102744410146098624313,
"dbt_client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/471979297599-compute%40developer.gserviceaccount.com",
"dbt_dataset": "sdf",
"dbt_full_refresh_models": {
"history_assets": false,
@@ -119,15 +114,11 @@
"dbt_image_name": "stellar/stellar-dbt:1e160fe",
"dbt_job_execution_timeout_seconds": 6000,
"dbt_job_retries": 1,
"dbt_keyfile_profile": "",
"dbt_mart_dataset": "sdf_marts",
"dbt_maximum_bytes_billed": 100000000000000,
"dbt_private_key": "",
"dbt_private_key_id": "",
"dbt_project": "hubble-261722",
"dbt_target": "prod",
"dbt_threads": 1,
"dbt_token_uri": "https://oauth2.googleapis.com/token",
"gcs_exported_data_bucket_name": "us-central1-hubble-2-d948d67b-bucket",
"gcs_exported_object_prefix": "dag-exported",
"image_name": "stellar/stellar-etl:08fd8de",
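Both variable files drop the inline dbt service-account fields (`dbt_auth_uri`, `dbt_client_email`, `dbt_private_key`, `dbt_keyfile_profile`, and so on), which suggests the dbt connection no longer assembles a keyfile out of Airflow Variables. As a hedged sketch of the pattern these deletions retire (the helper name is hypothetical and not from this repo), such Variables would have been stitched into a standard GCP service-account keyfile dict roughly like this:

```python
# Illustrative only: how dbt keyfile credentials could be assembled from the
# now-removed Airflow Variables. The helper name is hypothetical; with these
# fields gone, the dbt profile presumably authenticates some other way
# (e.g. the environment's default GCP credentials).
from airflow.models import Variable


def build_dbt_keyfile_json() -> dict:
    """Assemble a GCP service-account keyfile dict from Airflow Variables."""
    return {
        "type": "service_account",
        "project_id": Variable.get("dbt_project"),
        "private_key_id": Variable.get("dbt_private_key_id"),
        "private_key": Variable.get("dbt_private_key"),
        "client_email": Variable.get("dbt_client_email"),
        "client_id": Variable.get("dbt_client_id"),
        "auth_uri": Variable.get("dbt_auth_uri"),
        "token_uri": Variable.get("dbt_token_uri"),
        "auth_provider_x509_cert_url": Variable.get("dbt_auth_provider_x509_cert_url"),
        "client_x509_cert_url": Variable.get("dbt_client_x509_cert_url"),
    }
```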
4 changes: 2 additions & 2 deletions dags/asset_pricing_pipeline_dag.py
@@ -1,5 +1,5 @@
import datetime
import json
from json import loads

from airflow import DAG
from airflow.models.variable import Variable
@@ -16,7 +16,7 @@
description="This DAG runs dbt to calculate asset pricing based on stablecoin and XLM trades",
schedule_interval="0 2 * * *", # daily at 2am
params={},
user_defined_filters={"fromjson": lambda s: json.loads(s)},
user_defined_filters={"fromjson": lambda s: loads(s)},
user_defined_macros={
"subtract_data_interval": macros.subtract_data_interval,
"batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string,
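The change here is purely an import refactor (`json.loads` becomes a bare `loads`), but the `fromjson` entry it touches is worth a note: it registers a Jinja filter that parses a JSON string at render time, so templated fields can index into JSON-valued Airflow Variables. A minimal, self-contained sketch of the pattern (DAG id and task are illustrative; `output_file_names` is one of the JSON Variables used elsewhere in this commit):

```python
# Sketch of the user_defined_filters pattern: "fromjson" parses a JSON string
# when the task's templated fields are rendered. DAG id and task are illustrative.
from datetime import datetime
from json import loads

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="fromjson_filter_example",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    user_defined_filters={"fromjson": lambda s: loads(s)},
) as dag:
    # var.value.<name> returns the Variable's raw string; the filter turns it
    # into a dict that the template can index.
    echo_accounts_file = BashOperator(
        task_id="echo_accounts_file",
        bash_command="echo {{ (var.value.output_file_names | fromjson)['accounts'] }}",
    )
```

Airflow's built-in `var.json.<name>.<key>` accessor performs the same parsing, which is what the bucket_list_dag.py changes below switch to for output file names.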
8 changes: 4 additions & 4 deletions dags/audit_log_dag.py
@@ -1,8 +1,8 @@
"""
This DAG runs an audit log SQL to update the audit log dashboard.
"""
import datetime
import json
from datetime import datetime
from json import loads

from airflow import DAG
from airflow.models import Variable
@@ -16,12 +16,12 @@
"audit_log_dag",
default_args=get_default_dag_args(),
description="This DAG runs periodically to update the audit log dashboard.",
start_date=datetime.datetime(2023, 1, 1, 0, 0),
start_date=datetime(2023, 1, 1, 0, 0),
schedule_interval="10 9 * * *",
params={
"alias": "audit-log",
},
user_defined_filters={"fromjson": lambda s: json.loads(s)},
user_defined_filters={"fromjson": lambda s: loads(s)},
catchup=False,
)

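Aside from the same `fromjson` tweak, this file only swaps module imports for direct class and function imports; the constructor call is unchanged, as this standard-library check illustrates:

```python
# The two import styles build the same object; only the call-site spelling differs.
import datetime as datetime_module
from datetime import datetime

assert datetime(2023, 1, 1, 0, 0) == datetime_module.datetime(2023, 1, 1, 0, 0)
```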
60 changes: 34 additions & 26 deletions dags/bucket_list_dag.py
@@ -4,9 +4,9 @@
to stop exporting. This end ledger is determined by when the Airflow DAG is run. This DAG should be triggered manually
when initializing the tables in order to catch up to the current state in the network, but should not be scheduled to run constantly.
"""
import ast
import datetime
import json
from ast import literal_eval
from datetime import datetime
from json import loads

from airflow import DAG
from airflow.models import Variable
@@ -23,29 +23,31 @@
dag = DAG(
"bucket_list_export",
default_args=get_default_dag_args(),
start_date=datetime.datetime(2021, 10, 15),
end_date=datetime.datetime(2021, 10, 15),
start_date=datetime(2021, 10, 15),
end_date=datetime(2021, 10, 15),
description="This DAG loads a point forward view of state tables. Caution: Does not capture historical changes!",
schedule_interval="@daily",
params={
"alias": "bucket",
},
user_defined_filters={"fromjson": lambda s: json.loads(s)},
user_defined_filters={
"fromjson": lambda s: loads(s),
"literal_eval": lambda e: literal_eval(e),
},
user_defined_macros={
"subtract_data_interval": macros.subtract_data_interval,
"batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string,
},
)

file_names = Variable.get("output_file_names", deserialize_json=True)
table_names = Variable.get("table_ids", deserialize_json=True)
internal_project = Variable.get("bq_project")
internal_dataset = Variable.get("bq_dataset")
public_project = Variable.get("public_project")
public_dataset = Variable.get("public_dataset")
use_testnet = ast.literal_eval(Variable.get("use_testnet"))
use_futurenet = ast.literal_eval(Variable.get("use_futurenet"))

internal_project = "{{ var.value.bq_project }}"
internal_dataset = "{{ var.value.bq_dataset }}"
public_project = "{{ var.value.public_project }}"
public_dataset = "{{ var.value.public_dataset }}"
public_dataset_new = "{{ var.value.public_dataset_new }}"
use_testnet = "{{ var.value.use_testnet | literal_eval }}"
use_futurenet = "{{ var.value.use_futurenet | literal_eval }}"
"""
The time task reads in the execution time of the current run, as well as the next
execution time. It converts these two times into ledger ranges.
@@ -60,7 +62,7 @@
dag,
"bucket",
"export_accounts",
file_names["accounts"],
"{{ var.json.output_file_names.accounts }}",
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
@@ -69,7 +71,7 @@
dag,
"bucket",
"export_claimable_balances",
file_names["claimable_balances"],
"{{ var.json.output_file_names.claimable_balances }}",
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
@@ -78,7 +80,7 @@
dag,
"bucket",
"export_offers",
file_names["offers"],
"{{ var.json.output_file_names.offers }}",
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
@@ -87,7 +89,7 @@
dag,
"bucket",
"export_pools",
file_names["liquidity_pools"],
"{{ var.json.output_file_names.liquidity_pools }}",
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
@@ -96,7 +98,7 @@
dag,
"bucket",
"export_signers",
file_names["signers"],
"{{ var.json.output_file_names.signers }}",
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
@@ -105,7 +107,7 @@
dag,
"bucket",
"export_trustlines",
file_names["trustlines"],
"{{ var.json.output_file_names.trustlines }}",
use_testnet=use_testnet,
use_futurenet=use_futurenet,
use_gcs=True,
@@ -131,37 +133,37 @@
dag, internal_project, internal_dataset, table_names["accounts"]
)
delete_acc_pub_task = build_delete_data_task(
dag, public_project, public_dataset, table_names["accounts"]
dag, public_project, public_dataset, table_names["accounts"], "pub"
)
delete_bal_task = build_delete_data_task(
dag, internal_project, internal_dataset, table_names["claimable_balances"]
)
delete_bal_pub_task = build_delete_data_task(
dag, public_project, public_dataset, table_names["claimable_balances"]
dag, public_project, public_dataset, table_names["claimable_balances"], "pub"
)
delete_off_task = build_delete_data_task(
dag, internal_project, internal_dataset, table_names["offers"]
)
delete_off_pub_task = build_delete_data_task(
dag, public_project, public_dataset, table_names["offers"]
dag, public_project, public_dataset, table_names["offers"], "pub"
)
delete_pool_task = build_delete_data_task(
dag, internal_project, internal_dataset, table_names["liquidity_pools"]
)
delete_pool_pub_task = build_delete_data_task(
dag, public_project, public_dataset, table_names["liquidity_pools"]
dag, public_project, public_dataset, table_names["liquidity_pools"], "pub"
)
delete_sign_task = build_delete_data_task(
dag, internal_project, internal_dataset, table_names["signers"]
)
delete_sign_pub_task = build_delete_data_task(
dag, public_project, public_dataset, table_names["signers"]
dag, public_project, public_dataset, table_names["signers"], "pub"
)
delete_trust_task = build_delete_data_task(
dag, internal_project, internal_dataset, table_names["trustlines"]
)
delete_trust_pub_task = build_delete_data_task(
dag, public_project, public_dataset, table_names["trustlines"]
dag, public_project, public_dataset, table_names["trustlines"], "pub"
)

"""
@@ -244,6 +246,7 @@
"",
partition=True,
cluster=True,
dataset_type="pub",
)
send_bal_to_pub_task = build_gcs_to_bq_task(
dag,
@@ -254,6 +257,7 @@
"",
partition=True,
cluster=True,
dataset_type="pub",
)
send_off_to_pub_task = build_gcs_to_bq_task(
dag,
@@ -264,6 +268,7 @@
"",
partition=True,
cluster=True,
dataset_type="pub",
)
send_pool_to_pub_task = build_gcs_to_bq_task(
dag,
@@ -274,6 +279,7 @@
"",
partition=True,
cluster=True,
dataset_type="pub",
)
send_sign_to_pub_task = build_gcs_to_bq_task(
dag,
@@ -284,6 +290,7 @@
"",
partition=True,
cluster=True,
dataset_type="pub",
)
send_trust_to_pub_task = build_gcs_to_bq_task(
dag,
@@ -294,6 +301,7 @@
"",
partition=True,
cluster=True,
dataset_type="pub",
)

(
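The substantive change in bucket_list_dag.py is that most of the module-level `Variable.get(...)` calls, which hit the metadata database every time the scheduler parses the DAG file, are replaced with Jinja-templated strings such as `{{ var.value.bq_project }}` and `{{ var.json.output_file_names.accounts }}` that are only resolved when a task renders its fields; the new `literal_eval` filter turns the `"True"`/`"False"` strings stored in `use_testnet` and `use_futurenet` back into booleans at render time. The extra `"pub"` / `dataset_type="pub"` argument on the delete and GCS-to-BQ tasks appears to route the public-dataset copies through a dataset-type-aware helper whose signature lives outside this diff. A minimal sketch of the templated-variable pattern (DAG id, operator, and command are illustrative, not from this repo):

```python
# Sketch: resolve Airflow Variables at render time via Jinja templates instead
# of Variable.get() at parse time. DAG id, operator, and command are illustrative.
from ast import literal_eval
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="templated_variables_example",
    start_date=datetime(2021, 10, 15),
    schedule_interval="@daily",
    user_defined_filters={"literal_eval": lambda e: literal_eval(e)},
) as dag:
    # var.value.<name> yields the Variable's raw string; var.json.<name>.<key>
    # parses a JSON Variable and indexes into it. Both resolve at task run time.
    show_config = BashOperator(
        task_id="show_config",
        bash_command=(
            "echo project={{ var.value.bq_project }} "
            "accounts_file={{ var.json.output_file_names.accounts }} "
            "testnet={{ var.value.use_testnet | literal_eval }}"
        ),
    )
```

Keeping Variable access out of module level also avoids a metadata-database query on every DAG parse, which is the usual motivation for this kind of refactor.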
8 changes: 4 additions & 4 deletions dags/daily_euro_ohlc_dag.py
@@ -2,8 +2,8 @@
The daily_euro_ohlc_dag DAG updates the currency table in BigQuery every day.
"""

import datetime
import json
from datetime import datetime
from json import loads

from airflow import DAG
from airflow.decorators import dag
@@ -18,13 +18,13 @@

with DAG(
dag_id="daily_euro_ohlc_dag",
start_date=datetime.datetime(2023, 1, 1, 0, 0),
start_date=datetime(2023, 1, 1, 0, 0),
description="This DAG updates the currency tables in Bigquey every day",
schedule_interval="35 0 * * *",
params={
"alias": "euro",
},
user_defined_filters={"fromjson": lambda s: json.loads(s)},
user_defined_filters={"fromjson": lambda s: loads(s)},
catchup=False,
) as dag:
currency_ohlc = Variable.get("currency_ohlc", deserialize_json=True)
12 changes: 6 additions & 6 deletions dags/dataset_reset_dag.py
@@ -1,9 +1,9 @@
"""
When the Test net server is reset, the dataset reset DAG deletes all the datasets in the test Hubble.
"""
import ast
import datetime
import json
from ast import literal_eval
from datetime import datetime
from json import loads

from airflow import DAG
from airflow.models import Variable
@@ -20,13 +20,13 @@
"testnet_data_reset",
default_args=get_default_dag_args(),
description="This DAG runs after the Testnet data reset that occurs periodically.",
start_date=datetime.datetime(2023, 1, 1, 0, 0),
start_date=datetime(2023, 1, 1, 0, 0),
schedule_interval="10 9 * * *",
is_paused_upon_creation=ast.literal_eval(Variable.get("use_testnet")),
is_paused_upon_creation=literal_eval(Variable.get("use_testnet")),
params={
"alias": "testnet-reset",
},
user_defined_filters={"fromjson": lambda s: json.loads(s)},
user_defined_filters={"fromjson": lambda s: loads(s)},
)

internal_project = "test-hubble-319619"
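`is_paused_upon_creation` needs a real boolean, but `Variable.get("use_testnet")` returns a string, hence the `literal_eval` round-trip; a plain `bool()` cast would not do, because any non-empty string, including `"False"`, is truthy. A short standard-library illustration:

```python
from ast import literal_eval

# Airflow Variables come back as strings; literal_eval recovers the Python value.
assert literal_eval("True") is True
assert literal_eval("False") is False

# A naive bool() cast would silently misread "False" as truthy:
assert bool("False") is True
```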
5 changes: 2 additions & 3 deletions dags/enriched_tables_dag.py
@@ -1,7 +1,6 @@
import datetime
from datetime import datetime

from airflow import DAG
from airflow.models.variable import Variable
from stellar_etl_airflow.build_cross_dependency_task import build_cross_deps
from stellar_etl_airflow.build_dbt_task import build_dbt_task
from stellar_etl_airflow.default import get_default_dag_args, init_sentry
@@ -11,7 +10,7 @@
dag = DAG(
"enriched_tables",
default_args=get_default_dag_args(),
start_date=datetime.datetime(2023, 4, 12, 0, 0),
start_date=datetime(2023, 4, 12, 0, 0),
description="This DAG runs dbt to create the tables for the models in marts/enriched/.",
schedule_interval="*/30 * * * *", # Runs every 30 mins
params={},
