diff --git a/dags/external_data_dag.py b/dags/external_data_dag.py index 2d5300ec..ba251931 100644 --- a/dags/external_data_dag.py +++ b/dags/external_data_dag.py @@ -40,6 +40,9 @@ "alias": "external", }, render_template_as_native_obj=True, + user_defined_macros={ + "batch_run_date_as_datetime_string": macros.batch_run_date_as_datetime_string, + }, user_defined_filters={ "fromjson": lambda s: loads(s), "container_resources": lambda s: k8s.V1ResourceRequirements(requests=s), @@ -49,7 +52,7 @@ def stellar_etl_internal_task( - dag, task_name, command, cmd_args=[], resource_cfg="default" + dag, task_name, command, cmd_args=[], resource_cfg="default", output_file="" ): namespace = conf.get("kubernetes", "NAMESPACE") @@ -68,6 +71,12 @@ def stellar_etl_internal_task( image = "{{ var.value.stellar_etl_internal_image_name }}" + etl_cmd_string = " ".join(cmd_args) + arguments = f""" + {etl_cmd_string} 1>> stdout.out 2>> stderr.out && cat stdout.out && cat stderr.out && echo "{{\\"output\\": \\"{output_file}\\", + \\"failed_transforms\\": `grep failed_transforms stderr.out | cut -d\\",\\" -f2 | cut -d\\":\\" -f2`}}" >> /airflow/xcom/return.json + """ + return KubernetesPodOperator( task_id=task_name, name=task_name, @@ -79,8 +88,8 @@ def stellar_etl_internal_task( "RETOOL_API_KEY": "{{ var.value.retool_api_key }}", }, image=image, - cmds=[command], - arguments=cmd_args, + cmds=["bash", "-c"], + arguments=[command, arguments], dag=dag, is_delete_operator_pod=True, startup_timeout_seconds=720, @@ -104,7 +113,6 @@ def stellar_etl_internal_task( "retool-exported-entity.txt", ) - retool_export_task = stellar_etl_internal_task( dag, "export_retool_data", @@ -121,6 +129,7 @@ def stellar_etl_internal_task( "--output", retool_filepath, ], + output_file=retool_filepath, ) retool_table_name = "retool_entity_data"