diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_rdo/flows.py b/pipelines/rj_smtr/br_rj_riodejaneiro_rdo/flows.py
index c6a4a09eb..4bb16d9f2 100644
--- a/pipelines/rj_smtr/br_rj_riodejaneiro_rdo/flows.py
+++ b/pipelines/rj_smtr/br_rj_riodejaneiro_rdo/flows.py
@@ -5,7 +5,7 @@
 from prefect import Parameter, case
-# from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
+from prefect.tasks.prefect import create_flow_run  # , wait_for_flow_run
 from prefect.utilities.edges import unmapped
 from prefect.run_configs import KubernetesRun
 from prefect.storage import GCS
@@ -15,14 +15,13 @@
     check_files_for_download,
     download_and_save_local_from_ftp,
     pre_treatment_br_rj_riodejaneiro_rdo,
-    get_rdo_date_range,
+    # get_rdo_date_range,
     update_rdo_redis,
 )
 from pipelines.rj_smtr.constants import constants
 from pipelines.rj_smtr.tasks import (
     bq_upload,
     get_current_timestamp,
-    set_last_run_timestamp,
 )
 from pipelines.rj_smtr.schedules import every_day
@@ -49,6 +48,7 @@
     dataset_id = Parameter("dataset_id", default=constants.RDO_DATASET_ID.value)
     table_id = Parameter("table_id", default=constants.SPPO_RHO_TABLE_ID.value)
     rebuild = Parameter("rebuild", False)
+    partition_dates = Parameter("partition_dates", default=None)
 
     LABELS = get_current_flow_labels()
     MODE = get_current_flow_mode(LABELS)
@@ -59,37 +59,26 @@
     # dbt_client = get_local_dbt_client(host="localhost", port=3001)
 
     # Set specific run parameters #
-    date_range = get_rdo_date_range(dataset_id=dataset_id, table_id=table_id, mode=MODE)
+    # with case(bool(run_dates), False):
+    #     dates = get_rdo_date_range(dataset_id=dataset_id, table_id=table_id, mode=MODE)
+    # with case(bool(run_dates), True):
+    #     dates = run_dates
 
     # Run materialization #
     with case(rebuild, True):
-        RUN = run_dbt_model(
+        RUN = run_dbt_model.map(
             dbt_client=dbt_client,
             dataset_id=dataset_id,
             table_id=table_id,
             upstream=True,
-            _vars=[date_range],
+            _vars=[partition_dates],
             flags="--full-refresh",
         )
-        set_last_run_timestamp(
-            dataset_id=dataset_id,
-            table_id=table_id,
-            timestamp=date_range["date_range_end"],
-            wait=RUN,
-            mode=MODE,
-        )
 
     with case(rebuild, False):
-        RUN = run_dbt_model(
+        RUN = run_dbt_model.map(
             dbt_client=dbt_client,
             dataset_id=dataset_id,
             table_id=table_id,
-            _vars=[date_range],
-        )
-        set_last_run_timestamp(
-            dataset_id=dataset_id,
-            table_id=table_id,
-            timestamp=date_range["date_range_end"],
-            wait=RUN,
-            mode=MODE,
+            _vars=[partition_dates],
         )
 
 sppo_rho_materialize.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
@@ -98,6 +87,7 @@
     labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
 )
 
+
 with Flow(
     "SMTR: RHO - Captura",
     code_owners=["caio", "fernanda", "boris", "rodrigo"],
@@ -109,11 +99,15 @@
     table_id = Parameter("table_id", constants.SPPO_RHO_TABLE_ID.value)
     materialize = Parameter("materialize", False)
 
+    # SETUP
     rename_run = rename_current_flow_run_now_time(
         prefix=f"{captura_sppo_rho.name} FTP - {transport_mode.run()}-{report_type.run()} ",
         now_time=get_current_timestamp(),
         wait=None,
     )
+    LABELS = get_current_flow_labels()
+    MODE = get_current_flow_mode(LABELS)
 
+    # EXTRACT
     files = get_file_paths_from_ftp(
         transport_mode=transport_mode, report_type=report_type, dump=dump
@@ -135,9 +129,18 @@
         partitions=partitions,
         status=status,
     )
-    set_redis = update_rdo_redis(
+    run_dates = update_rdo_redis(
         download_files=download_files, table_id=table_id, errors=errors
     )
+    with case(bool(run_dates) and materialize, True):
+        run_materialize = create_flow_run(
+            flow_name=sppo_rho_materialize.name,
+            project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
+            parameters={"partition_dates": run_dates},
+            labels=LABELS,
+            run_name=sppo_rho_materialize.name,
+        )
+    captura_sppo_rho.set_dependencies(run_materialize, [run_dates])
 
 captura_sppo_rho.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
 captura_sppo_rho.run_config = KubernetesRun(
diff --git a/pipelines/rj_smtr/br_rj_riodejaneiro_rdo/tasks.py b/pipelines/rj_smtr/br_rj_riodejaneiro_rdo/tasks.py
index af27523c0..b7992559c 100644
--- a/pipelines/rj_smtr/br_rj_riodejaneiro_rdo/tasks.py
+++ b/pipelines/rj_smtr/br_rj_riodejaneiro_rdo/tasks.py
@@ -63,6 +63,8 @@ def get_file_paths_from_ftp(
                 "filename": filename.split(".")[0],
                 "ftp_path": transport_mode + "/" + filename,
                 "partitions": f"ano={date[:4]}/mes={date[4:6]}/dia={date[6:]}",
+                "date": date,
+                "modified": file_mtime,
                 "error": None,
             }
             # log(f"Create file info: {file_info}")
@@ -74,24 +76,30 @@
 
 
 @task
-def check_files_for_download(files: list, dataset_id: str, table_id: str):
+def check_files_for_download(
+    files: list, dataset_id: str, table_id: str, mode: str = "prod"
+):
     """Check redis for files already downloaded from the FTP
 
     Args:
         files (list): file informations gathered from FTP
         dataset_id (str): dataset_id on BigQuery
         table_id (str): table_id on BigQuery
+        mode (str, optional): mode to run the flow. Accepts
+            `prod` or `dev`. Defaults to `prod`.
 
     Returns:
         list: Containing the info on the files to download
     """
     redis_client = get_redis_client()
+    key = f"{dataset_id}.{table_id}"
+    if mode == "dev":
+        key = f"{mode}.{key}"
     try:
-        exclude_files = redis_client.get(f"{dataset_id}.{table_id}")["files"]
+        exclude_files = redis_client.get(key)["files"]
     except (TypeError, KeyError):
         set_redis_rdo_files(redis_client, dataset_id, table_id)
-        exclude_files = redis_client.get(f"{dataset_id}.{table_id}")["files"]
+        exclude_files = redis_client.get(key)["files"]
 
     log(f"There are {len(exclude_files)} already downloaded")
     download_files = [
@@ -235,11 +243,12 @@ def pre_treatment_br_rj_riodejaneiro_rdo(
 
 
 @task
-def update_rdo_redis(
+def update_rdo_redis(  # pylint: disable=R0913
     download_files: list,
     table_id: str,
     dataset_id: str = constants.RDO_DATASET_ID.value,
     errors=None,
+    mode: str = "prod",
     wait=None,  # pylint: disable=W0613
 ):
     """
@@ -257,6 +266,8 @@
-        bool: if redis key was set
+        list: one {"partition_date": ...} dict per successfully captured file
     """
     key = f"{dataset_id}.{table_id}"
+    if mode == "dev":
+        key = f"{mode}.{key}"
     redis_client = get_redis_client()
     content = redis_client.get(key)  # get current redis state
     if errors:
@@ -274,7 +285,13 @@
     )
     content["files"].extend(insert_content)  # generate updated dict to set
     log(f"After appending, content has {len(content['files'])} files registered")
-    return redis_client.set(key, content)
+    redis_client.set(key, content)
+    run_dates = [
+        {"partition_date": file_info["date"]}
+        for file_info in download_files
+        if not file_info["error"]
+    ]
+    return run_dates
 
 
 @task
diff --git a/pipelines/rj_smtr/tasks.py b/pipelines/rj_smtr/tasks.py
index 5127d1f51..ea80a8990 100644
--- a/pipelines/rj_smtr/tasks.py
+++ b/pipelines/rj_smtr/tasks.py
@@ -1279,6 +1279,7 @@ def set_last_run_timestamp(
     key = dataset_id + "." + table_id
     if mode == "dev":
         key = f"{mode}.{key}"
+
     content = redis_client.get(key)
     if not content:
         content = {}