diff --git a/dags/external_data_dag.py b/dags/external_data_dag.py index 9393788a..f8e11b3d 100644 --- a/dags/external_data_dag.py +++ b/dags/external_data_dag.py @@ -3,9 +3,8 @@ It is scheduled to export information to BigQuery at regular intervals. """ -import os from ast import literal_eval -from datetime import datetime, timedelta +from datetime import datetime from json import loads from airflow import DAG @@ -17,15 +16,12 @@ from stellar_etl_airflow.build_del_ins_from_gcs_to_bq_task import ( build_del_ins_from_gcs_to_bq_task, ) -from stellar_etl_airflow.build_del_ins_operator import ( - create_del_ins_task, - initialize_task_vars, -) -from stellar_etl_airflow.default import ( - alert_after_max_retries, - get_default_dag_args, - init_sentry, +from stellar_etl_airflow.build_del_ins_operator import create_del_ins_task +from stellar_etl_airflow.build_internal_export_task import ( + build_export_task, + get_airflow_metadata, ) +from stellar_etl_airflow.default import get_default_dag_args, init_sentry init_sentry() @@ -52,123 +48,67 @@ ) -def stellar_etl_internal_task( - dag, task_name, command, cmd_args=[], resource_cfg="default", output_file="" -): - namespace = conf.get("kubernetes", "NAMESPACE") - - if namespace == "default": - config_file_location = Variable.get("kube_config_location") - in_cluster = False - else: - config_file_location = None - in_cluster = True - - requests = { - "cpu": f"{{{{ var.json.resources.{resource_cfg}.requests.cpu }}}}", - "memory": f"{{{{ var.json.resources.{resource_cfg}.requests.memory }}}}", - } - container_resources = k8s.V1ResourceRequirements(requests=requests) - - image = "{{ var.value.stellar_etl_internal_image_name }}" - - etl_cmd_string = " ".join(cmd_args) - arguments = f""" {command} {etl_cmd_string} 1>> stdout.out 2>> stderr.out && cat stdout.out && cat stderr.out && echo "{{\\"output\\": \\"{output_file}\\"}}" >> /airflow/xcom/return.json""" - - return KubernetesPodOperator( - task_id=task_name, - name=task_name, - namespace=Variable.get("k8s_namespace"), - service_account_name=Variable.get("k8s_service_account"), - env_vars={ - "EXECUTION_DATE": "{{ ds }}", - "AIRFLOW_START_TIMESTAMP": "{{ ti.start_date.strftime('%Y-%m-%dT%H:%M:%SZ') }}", - "RETOOL_API_KEY": "{{ var.value.retool_api_key }}", - }, - image=image, - cmds=["bash", "-c"], - arguments=[arguments], - do_xcom_push=True, - dag=dag, - is_delete_operator_pod=True, - startup_timeout_seconds=720, - in_cluster=in_cluster, - config_file=config_file_location, - container_resources=container_resources, - on_failure_callback=alert_after_max_retries, - image_pull_policy="IfNotPresent", - image_pull_secrets=[k8s.V1LocalObjectReference("private-docker-auth")], - sla=timedelta( - seconds=Variable.get("task_sla", deserialize_json=True)[task_name] - ), - trigger_rule="all_done", - ) - - -retool_run_id = "{{ run_id }}" -retool_filepath = os.path.join( - Variable.get("gcs_exported_object_prefix"), - retool_run_id, - "retool-exported-entity.txt", -) - -retool_table_name = "retool_entity_data" -retool_table_id = "test-hubble-319619.test_crypto_stellar_internal.retool_entity_data" -retool_public_project = "test-hubble-319619" -retool_public_dataset = "test_crypto_stellar_internal" -retool_batch_id = macros.get_batch_id() -retool_batch_date = "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}" -retool_export_task_id = "export_retool_data" -retool_source_object_suffix = "" -retool_source_objects = [ - "{{ task_instance.xcom_pull(task_ids='" - + retool_export_task_id - + '\')["output"] }}' - + retool_source_object_suffix -] -retool_batch_insert_ts = datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ") - -retool_start_time = "{{ subtract_data_interval(dag, data_interval_start).isoformat() }}" -retool_end_time = "{{ subtract_data_interval(dag, data_interval_end).isoformat() }}" - -retool_export_task = stellar_etl_internal_task( +retool_export_task = build_export_task( dag, "export_retool_data", - "export-retool", + command="export-retool", cmd_args=[ "--start-time", - retool_start_time, + "{{ subtract_data_interval(dag, data_interval_start).isoformat() }}", "--end-time", - retool_end_time, - "--cloud-storage-bucket", - Variable.get("gcs_exported_data_bucket_name"), - "--cloud-provider", - "gcp", - "--output", - retool_filepath, - "-u", - f"'batch_id={retool_batch_id},batch_run_date={retool_batch_date},batch_insert_ts={retool_batch_insert_ts}'", + "{{ subtract_data_interval(dag, data_interval_end).isoformat() }}", ], - output_file=retool_filepath, + use_gcs=True, + env_vars={"RETOOL_API_KEY": "{{ var.value.retool_api_key }}"}, ) -retool_task_vars = { - "task_id": f"del_ins_{retool_table_name}_task", - "project": retool_public_project, - "dataset": retool_public_dataset, - "table_name": retool_table_name, - "export_task_id": "export_retool_data", - "source_object_suffix": retool_source_object_suffix, - "partition": False, - "cluster": False, - "batch_id": retool_batch_id, - "batch_date": retool_batch_date, - "source_objects": retool_source_objects, - "table_id": retool_table_id, -} -retool_insert_to_bq_task = create_del_ins_task( - dag, retool_task_vars, build_del_ins_from_gcs_to_bq_task +def get_insert_to_bq_task( + table_name: str, + project: str, + dataset: str, + export_task_id: str, + source_object_suffix: str, + partition: bool, + cluster: bool, + table_id: str, +): + metadata = get_airflow_metadata() + source_objects = [ + "{{ task_instance.xcom_pull(task_ids='" + + export_task_id + + '\')["output"] }}' + + source_object_suffix + ] + task_vars = { + "task_id": f"del_ins_{table_name}_task", + "project": project, + "dataset": dataset, + "table_name": table_name, + "export_task_id": export_task_id, + "source_object_suffix": source_object_suffix, + "partition": partition, + "cluster": cluster, + "batch_id": metadata["batch_id"], + "batch_date": metadata["batch_date"], + "source_objects": source_objects, + "table_id": table_id, + } + insert_to_bq_task = create_del_ins_task( + dag, task_vars, build_del_ins_from_gcs_to_bq_task + ) + return insert_to_bq_task + + +retool_insert_to_bq_task = get_insert_to_bq_task( + table_name="retool_entity_data", + project="test-hubble-319619", + dataset="test_crypto_stellar_internal", + export_task_id="export_retool_data", + source_object_suffix="", + partition=False, + cluster=False, + table_id="test-hubble-319619.test_crypto_stellar_internal.retool_entity_data", ) retool_export_task >> retool_insert_to_bq_task diff --git a/dags/stellar_etl_airflow/build_internal_export_task.py b/dags/stellar_etl_airflow/build_internal_export_task.py new file mode 100644 index 00000000..f22c104e --- /dev/null +++ b/dags/stellar_etl_airflow/build_internal_export_task.py @@ -0,0 +1,107 @@ +""" +This file contains functions for creating Airflow tasks to run stellar-etl-internal export functions. +""" + +import os +from datetime import datetime, timedelta + +from airflow import DAG +from airflow.configuration import conf +from airflow.models.variable import Variable +from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator +from kubernetes.client import models as k8s +from stellar_etl_airflow import macros +from stellar_etl_airflow.default import alert_after_max_retries + + +def get_airflow_metadata(): + return { + "batch_insert_ts": datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ"), + "batch_date": "{{ batch_run_date_as_datetime_string(dag, data_interval_start) }}", + "batch_id": macros.get_batch_id(), + "run_id": "{{ run_id }}", + } + + +def build_export_task( + dag, + task_name, + command, + cmd_args=[], + env_vars={}, + use_gcs=False, + resource_cfg="default", +): + namespace = conf.get("kubernetes", "NAMESPACE") + + if namespace == "default": + config_file_location = Variable.get("kube_config_location") + in_cluster = False + else: + config_file_location = None + in_cluster = True + + requests = { + "cpu": f"{{{{ var.json.resources.{resource_cfg}.requests.cpu }}}}", + "memory": f"{{{{ var.json.resources.{resource_cfg}.requests.memory }}}}", + } + container_resources = k8s.V1ResourceRequirements(requests=requests) + + image = "{{ var.value.stellar_etl_internal_image_name }}" + + output_filepath = "" + if use_gcs: + metadata = get_airflow_metadata() + batch_insert_ts = metadata["batch_insert_ts"] + batch_date = metadata["batch_date"] + batch_id = metadata["batch_id"] + run_id = metadata["run_id"] + + output_filepath = os.path.join( + Variable.get("gcs_exported_object_prefix"), + run_id, + f"{task_name}-exported-entity.txt", + ) + + cmd_args = cmd_args + [ + "--cloud-storage-bucket", + Variable.get("gcs_exported_data_bucket_name"), + "--cloud-provider", + "gcp", + "--output", + output_filepath, + "-u", + f"'batch_id={batch_id},batch_run_date={batch_date},batch_insert_ts={batch_insert_ts}'", + ] + etl_cmd_string = " ".join(cmd_args) + arguments = f""" {command} {etl_cmd_string} 1>> stdout.out 2>> stderr.out && cat stdout.out && cat stderr.out && echo "{{\\"output\\": \\"{output_filepath}\\"}}" >> /airflow/xcom/return.json""" + + return KubernetesPodOperator( + task_id=task_name, + name=task_name, + namespace=Variable.get("k8s_namespace"), + service_account_name=Variable.get("k8s_service_account"), + env_vars=env_vars.update( + { + "EXECUTION_DATE": "{{ ds }}", + "AIRFLOW_START_TIMESTAMP": "{{ ti.start_date.strftime('%Y-%m-%dT%H:%M:%SZ') }}", + } + ), + image=image, + cmds=["bash", "-c"], + arguments=[arguments], + do_xcom_push=True, + dag=dag, + is_delete_operator_pod=True, + startup_timeout_seconds=720, + in_cluster=in_cluster, + config_file=config_file_location, + container_resources=container_resources, + on_failure_callback=alert_after_max_retries, + image_pull_policy="IfNotPresent", + image_pull_secrets=[k8s.V1LocalObjectReference("private-docker-auth")], + sla=timedelta( + seconds=Variable.get("task_sla", deserialize_json=True)[task_name] + ), + trigger_rule="all_done", + )