Merge pull request #340 from stellar/master
[PRODUCTION] Update production Airflow environment
sydneynotthecity authored Apr 18, 2024
2 parents 6edf786 + 89e38d0 commit 450970d
Showing 6 changed files with 26 additions and 88 deletions.
10 changes: 5 additions & 5 deletions .github/workflows/ci.yml
@@ -59,8 +59,8 @@ jobs:
- id: "get-credentials"
uses: "google-github-actions/get-gke-credentials@v2"
with:
cluster_name: "us-central1-hubble-1pt5-dev-7db0e004-gke"
location: "us-central1-c"
cluster_name: "us-central1-test-hubble-2-5f1f2dbf-gke"
location: "us-central1"

- name: Pytest
run: pytest dags/
@@ -98,20 +98,20 @@ jobs:
run: python dags/stellar_etl_airflow/add_files_to_composer.py --bucket $BUCKET
env:
GOOGLE_CLOUD_PROJECT: test-hubble-319619
- BUCKET: us-central1-hubble-1pt5-dev-7db0e004-bucket
+ BUCKET: us-central1-test-hubble-2-5f1f2dbf-bucket

- name: Update Airflow variables
uses: actions-hub/gcloud@master
env:
PROJECT_ID: test-hubble-319619
APPLICATION_CREDENTIALS: "${{ secrets.CREDS_TEST_HUBBLE }}"
- COMPOSER_ENVIRONMENT: hubble-1pt5-dev
+ COMPOSER_ENVIRONMENT: test-hubble-2
LOCATION: us-central1
with:
args: >
components install kubectl && gcloud composer environments run
$COMPOSER_ENVIRONMENT --location $LOCATION variables import
- -- gcsfuse/variables.json
+ -- gcsfuse/actual_mount_path/variables.json
promote-to-prod:
runs-on: ubuntu-latest
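For context, the deploy step above syncs the DAG files into the Composer environment's bucket before the variables are imported. A minimal sketch of that upload idea, assuming the standard google-cloud-storage client (this is not the actual add_files_to_composer.py script; paths and prefix are illustrative):

    # Sketch: copy local DAG files into the Composer bucket's dags/ prefix.
    import glob
    import os

    from google.cloud import storage

    def upload_dags(bucket_name: str, local_dir: str = "dags", prefix: str = "dags") -> None:
        client = storage.Client()
        bucket = client.bucket(bucket_name)
        for path in glob.glob(os.path.join(local_dir, "**", "*.py"), recursive=True):
            blob = bucket.blob(os.path.join(prefix, os.path.relpath(path, local_dir)))
            blob.upload_from_filename(path)  # overwrites the copy already in the bucket

    upload_dags("us-central1-test-hubble-2-5f1f2dbf-bucket")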
77 changes: 4 additions & 73 deletions airflow_variables_dev.json
@@ -1,74 +1,5 @@
{
"affinity": {
"default": {
"nodeAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [
{
"matchExpressions": [
{
"key": "cloud.google.com/gke-nodepool",
"operator": "In",
"values": ["default-pool"]
}
]
}
]
}
}
},
"cc": {
"nodeAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [
{
"matchExpressions": [
{
"key": "cloud.google.com/gke-nodepool",
"operator": "In",
"values": ["pool-2"]
}
]
}
]
}
}
},
"wocc": {
"nodeAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [
{
"matchExpressions": [
{
"key": "cloud.google.com/gke-nodepool",
"operator": "In",
"values": ["default-pool"]
}
]
}
]
}
}
},
"state": {
"nodeAffinity": {
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": [
{
"matchExpressions": [
{
"key": "cloud.google.com/gke-nodepool",
"operator": "In",
"values": ["default-pool"]
}
]
}
]
}
}
}
},
"affinity": {},
"api_key_path": "/home/airflow/gcs/data/apiKey.json",
"bq_dataset": "test_crypto_stellar_internal",
"bq_project": "test-hubble-319619",
@@ -121,17 +52,17 @@
"partnership_assets__account_holders_activity_fact": false,
"partnership_assets__asset_activity_fact": false
},
"dbt_image_name": "stellar/stellar-dbt:b621192",
"dbt_image_name": "stellar/stellar-dbt:8ae3438",
"dbt_job_execution_timeout_seconds": 300,
"dbt_job_retries": 1,
"dbt_mart_dataset": "test_sdf_marts",
"dbt_maximum_bytes_billed": 250000000000,
"dbt_project": "test-hubble-319619",
"dbt_target": "test",
"dbt_threads": 12,
"gcs_exported_data_bucket_name": "us-central1-hubble-1pt5-dev-7db0e004-bucket",
"gcs_exported_data_bucket_name": "us-central1-test-hubble-2-5f1f2dbf-bucket",
"gcs_exported_object_prefix": "dag-exported",
"image_name": "stellar/stellar-etl:75c9a9c",
"image_name": "stellar/stellar-etl:395f1f8",
"image_output_path": "/etl/exported_data/",
"image_pull_policy": "IfNotPresent",
"kube_config_location": "",
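The values in this file are read inside the DAGs through Airflow's Variable API once the CI job imports them. A minimal sketch using keys visible in this diff; with the per-pool blocks removed, the affinity lookup now yields an empty dict:

    # Sketch: reading the imported variables from DAG code.
    from airflow.models import Variable

    image_name = Variable.get("image_name")  # "stellar/stellar-etl:395f1f8"
    bucket = Variable.get("gcs_exported_data_bucket_name")
    affinity = Variable.get("affinity", deserialize_json=True, default_var={})
    pool_affinity = affinity.get("cc", {})  # {} now that the nodeAffinity blocks are gone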
4 changes: 2 additions & 2 deletions airflow_variables_prod.json
@@ -143,7 +143,7 @@
"partnership_assets__asset_activity_fact": false,
"trade_agg": false
},
"dbt_image_name": "stellar/stellar-dbt:b621192",
"dbt_image_name": "stellar/stellar-dbt:8ae3438",
"dbt_job_execution_timeout_seconds": 1800,
"dbt_job_retries": 1,
"dbt_mart_dataset": "sdf_marts",
@@ -153,7 +153,7 @@
"dbt_threads": 12,
"gcs_exported_data_bucket_name": "us-central1-hubble-2-d948d67b-bucket",
"gcs_exported_object_prefix": "dag-exported",
"image_name": "stellar/stellar-etl:75c9a9c",
"image_name": "stellar/stellar-etl:395f1f8",
"image_output_path": "/etl/exported_data/",
"image_pull_policy": "IfNotPresent",
"kube_config_location": "",
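The bumped stellar-dbt and stellar-etl tags take effect through the image argument of the Kubernetes pod operators that run these jobs. A rough sketch of that wiring (task_id, name, and namespace are illustrative, and the exact import path depends on the cncf.kubernetes provider version in use):

    # Sketch: the image tag from the Airflow variable becomes the pod image.
    from airflow.models import Variable
    from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

    export_pod = KubernetesPodOperator(
        task_id="export_ledgers",
        name="export-ledgers",
        namespace="default",
        image=Variable.get("image_name"),  # stellar/stellar-etl:395f1f8 after this change
        image_pull_policy=Variable.get("image_pull_policy"),  # "IfNotPresent"
        cmds=["stellar-etl"],
    )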
15 changes: 9 additions & 6 deletions dags/dbt_sdf_marts_dag.py
@@ -38,15 +38,16 @@

mgi_task = dbt_task(dag, tag="mgi")
liquidity_providers_task = dbt_task(dag, tag="liquidity_providers")
- # liquidity_pools_values_task = dbt_task(dag, tag="liquidity_pools_value")
- # liquidity_pools_value_history_task = dbt_task(dag, tag="liquidity_pools_value_history")
+ liquidity_pools_values_task = dbt_task(dag, tag="liquidity_pools_value")
+ liquidity_pools_value_history_task = dbt_task(dag, tag="liquidity_pools_value_history")
trade_agg_task = dbt_task(dag, tag="trade_agg")
fee_stats_agg_task = dbt_task(dag, tag="fee_stats")
asset_stats_agg_task = dbt_task(dag, tag="asset_stats")
network_stats_agg_task = dbt_task(dag, tag="network_stats")
partnership_assets_task = dbt_task(dag, tag="partnership_assets")
history_assets = dbt_task(dag, tag="history_assets")
soroban = dbt_task(dag, tag="soroban")
+ snapshot_state = dbt_task(dag, tag="snapshot_state")

elementary = elementary_task(dag, "dbt_sdf_marts")

@@ -57,20 +58,21 @@
wait_on_partner_pipeline_dag >> mgi_task

wait_on_dbt_enriched_base_tables >> liquidity_providers_task
- # wait_on_dbt_enriched_base_tables >> liquidity_pools_values_task
- # wait_on_dbt_enriched_base_tables >> liquidity_pools_value_history_task
+ wait_on_dbt_enriched_base_tables >> liquidity_pools_values_task
+ wait_on_dbt_enriched_base_tables >> liquidity_pools_value_history_task
wait_on_dbt_enriched_base_tables >> trade_agg_task
wait_on_dbt_enriched_base_tables >> fee_stats_agg_task
wait_on_dbt_enriched_base_tables >> asset_stats_agg_task
wait_on_dbt_enriched_base_tables >> network_stats_agg_task
wait_on_dbt_enriched_base_tables >> partnership_assets_task
wait_on_dbt_enriched_base_tables >> history_assets
wait_on_dbt_enriched_base_tables >> soroban
+ wait_on_dbt_enriched_base_tables >> snapshot_state

mgi_task >> elementary
liquidity_providers_task >> elementary
- # liquidity_pools_values_task >> elementary
- # liquidity_pools_value_history_task >> elementary
+ liquidity_pools_values_task >> elementary
+ liquidity_pools_value_history_task >> elementary
trade_agg_task >> elementary
fee_stats_agg_task >> elementary
asset_stats_agg_task >> elementary
@@ -79,3 +81,4 @@
history_assets >> elementary
soroban >> elementary
liquidity_pool_trade_volume_task >> elementary
+ snapshot_state >> elementary
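The new snapshot_state task follows the same pattern as the other marts: dbt_task creates one task per dbt tag, and the bit-shift operators wire it between the enriched-base-tables sensor and the elementary step. A hypothetical stand-in for that helper (the real one lives in stellar_etl_airflow and may build a pod operator instead; operator choice and command here are assumptions):

    # Hypothetical dbt_task helper: one task per dbt tag, selecting models by tag.
    from airflow.operators.bash import BashOperator

    def dbt_task(dag, tag: str) -> BashOperator:
        return BashOperator(
            task_id=f"dbt_{tag}",
            bash_command=f"dbt run --select tag:{tag}",
            dag=dag,
        )

    # e.g. snapshot_state = dbt_task(dag, tag="snapshot_state")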
3 changes: 1 addition & 2 deletions dags/stellar_etl_airflow/build_export_task.py
@@ -198,8 +198,7 @@ def build_export_task(
else:
arguments = f"""
{etl_cmd_string} 2>> stderr.out && echo "{{\\"output\\": \\"{output_file}\\",
\\"failed_transforms\\": `grep failed_transforms stderr.out | cut -d\\",\\" -f2 | cut -d\\":\\" -f2`\\",
\\"succcessful_transforms\\": `grep successful_transforms stderr.out | cut -d\\",\\" -f2 | cut -d\\":\\" -f2`}}" >> /airflow/xcom/return.json
\\"failed_transforms\\": `grep failed_transforms stderr.out | cut -d\\",\\" -f2 | cut -d\\":\\" -f2`}}" >> /airflow/xcom/return.json
"""
return KubernetesPodOperator(
service_account_name=Variable.get("k8s_service_account"),
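The trimmed template still writes /airflow/xcom/return.json, which the KubernetesPodOperator surfaces as the task's XCom when the XCom sidecar is enabled; after this change only output and failed_transforms are reported (the successful_transforms count is dropped). A downstream check could consume it roughly like this (the task id is illustrative):

    # Illustrative consumer of the JSON pushed via /airflow/xcom/return.json.
    def fail_on_failed_transforms(**context):
        result = context["ti"].xcom_pull(task_ids="export_task") or {}
        failed = int(result.get("failed_transforms") or 0)
        if failed > 0:
            raise ValueError(f"{failed} transforms failed during export")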
5 changes: 5 additions & 0 deletions schemas/history_operations_schema.json
@@ -1074,6 +1074,11 @@
"mode": "NULLABLE",
"name": "contract_code_hash",
"type": "STRING"
+ },
+ {
+ "mode": "REPEATED",
+ "name": "ledger_key_hash",
+ "type": "STRING"
}
],
"mode": "NULLABLE",
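For reference, the added REPEATED column maps to the following field definition when expressed with the google-cloud-bigquery client (shown purely as an illustration of the schema entry above):

    # The new history_operations column as a BigQuery SchemaField.
    from google.cloud import bigquery

    ledger_key_hash = bigquery.SchemaField("ledger_key_hash", "STRING", mode="REPEATED")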
