Skip to content

Commit

Permalink
Updated formatting to match the linting standards as well as moved ta…
Browse files Browse the repository at this point in the history
…sk_id definition to resuable function
  • Loading branch information
harsha-stellar-data committed Jul 10, 2024
1 parent fb019a0 commit faeee88
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 12 deletions.
2 changes: 1 addition & 1 deletion dags/history_tables_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@
public_dataset,
)
del_ins_tasks[data_type] = create_del_ins_task(
dag, f"del_ins_{data_type}_task", task_vars, build_del_ins_from_gcs_to_bq_task
dag, task_vars, build_del_ins_from_gcs_to_bq_task
)

"""
Expand Down
11 changes: 1 addition & 10 deletions dags/state_table_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,41 +136,32 @@
source_object_suffix=source_object_suffix,
)
del_ins_tasks[data_type] = create_del_ins_task(
dag, f"del_ins_{data_type}_task", task_vars, build_del_ins_from_gcs_to_bq_task
dag, task_vars, build_del_ins_from_gcs_to_bq_task
)

# Set task dependencies
(date_task >> changes_task >> write_acc_stats >> del_ins_tasks["accounts"])

(date_task >> changes_task >> write_bal_stats >> del_ins_tasks["claimable_balances"])

(date_task >> changes_task >> write_off_stats >> del_ins_tasks["offers"])

(date_task >> changes_task >> write_pool_stats >> del_ins_tasks["liquidity_pools"])

(date_task >> changes_task >> write_sign_stats >> del_ins_tasks["signers"])

(date_task >> changes_task >> write_trust_stats >> del_ins_tasks["trustlines"])

(
date_task
>> changes_task
>> write_contract_data_stats
>> del_ins_tasks["contract_data"]
)

(
date_task
>> changes_task
>> write_contract_code_stats
>> del_ins_tasks["contract_code"]
)

(
date_task
>> changes_task
>> write_config_settings_stats
>> del_ins_tasks["config_settings"]
)

(date_task >> changes_task >> write_ttl_stats >> del_ins_tasks["ttl"])
5 changes: 4 additions & 1 deletion dags/stellar_etl_airflow/build_del_ins_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ def initialize_task_vars(
+ '\')["output"] }}'
+ source_object_suffix
]
task_id = f"del_ins_{data_type}_task"
return {
"task_id": task_id,
"project": public_project,
"dataset": public_dataset,
"table_name": table_name,
Expand All @@ -48,6 +50,7 @@ def initialize_task_vars(
"batch_id": batch_id,
"batch_date": batch_date,
"source_objects": source_objects,
"data_type": data_type,
}


Expand All @@ -65,7 +68,7 @@ def create_del_ins_task(dag, task_id, task_vars, del_ins_callable):
PythonOperator: The created PythonOperator.
"""
return PythonOperator(
task_id=task_id,
task_id=task_vars["task_id"],
python_callable=del_ins_callable,
op_kwargs=task_vars,
provide_context=True,
Expand Down

0 comments on commit faeee88

Please sign in to comment.