Skip to content

Commit

Permalink
use subcommand
Browse files Browse the repository at this point in the history
fix
  • Loading branch information
amishas157 committed Oct 2, 2024
1 parent 9e357ff commit f14ef22
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 5 deletions.
2 changes: 1 addition & 1 deletion airflow_variables_dev.json
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,6 @@
"ttl": "ttl"
},
"task_sla": {
"all_sources": 720,
"asset_stats": 720,
"build_batch_stats": 840,
"build_bq_insert_job": 1080,
Expand All @@ -348,6 +347,7 @@
"enriched_history_operations": 780,
"enriched_history_operations_with_exclude": 780,
"fee_stats": 840,
"freshness": 720,
"history_assets": 720,
"liquidity_pool_trade_volume": 1140,
"liquidity_pools_value": 840,
Expand Down
4 changes: 2 additions & 2 deletions dags/dbt_source_data_freshness_test_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
# DBT tests to run
source_freshness_tests = dbt_task(
dag,
model_name="all_sources",
command_type="source freshness",
command_type="source",
sub_command="freshness",
flag=None,
tag=None,
resource_cfg="dbt",
Expand Down
11 changes: 9 additions & 2 deletions dags/stellar_etl_airflow/build_dbt_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def create_dbt_profile(project="prod"):
def dbt_task(
dag,
model_name=None,
sub_command=None,
tag=None,
flag="select",
operator="",
Expand All @@ -87,7 +88,12 @@ def dbt_task(

dbt_image = "{{ var.value.dbt_image_name }}"

args = [command_type, f"--{flag}"] if flag else [command_type]
args = [command_type]
if sub_command:
args.append(sub_command)

if flag:
args.append(f"--{flag}")

models = []
if tag:
Expand All @@ -96,6 +102,8 @@ def dbt_task(
if model_name:
task_name = model_name
models.append(f"{operator}{model_name}")
if sub_command:
task_name = sub_command
if len(models) > 1:
task_name = "multiple_models"
args.append(",".join(models))
Expand All @@ -117,7 +125,6 @@ def dbt_task(

logging.info(f"sh commands to run in pod: {args}")

command_type = command_type.replace(" ", "_")
return KubernetesPodOperator(
task_id=f"dbt_{command_type}_{task_name}",
name=f"dbt_{command_type}_{task_name}",
Expand Down

0 comments on commit f14ef22

Please sign in to comment.