Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[rj-smtr] Materialização do RHO/RDO #316

Closed
wants to merge 120 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
120 commits
Select commit Hold shift + click to select a range
099ce17
add materialization
Hellcassius Dec 20, 2022
471fd7d
add conditional to flow
Hellcassius Dec 21, 2022
fa2fbe8
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Dec 23, 2022
b3986b5
change run_date to partition_date
Hellcassius Dec 23, 2022
6338a22
Merge branch 'staging/add_rho_materialize' of https://github.com/pref…
Hellcassius Dec 23, 2022
58d80cb
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Dec 23, 2022
50d6b31
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Dec 23, 2022
a705149
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Dec 23, 2022
6e97e68
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Dec 23, 2022
11bb66e
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Dec 23, 2022
1940174
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Dec 23, 2022
cbeb644
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Dec 23, 2022
3cc030a
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Dec 29, 2022
c1ffce9
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 5, 2023
1a98fb5
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 5, 2023
b4c1ccd
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 6, 2023
652b90f
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 6, 2023
dc2b4c8
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 6, 2023
1531105
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 9, 2023
416b467
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 9, 2023
3694e85
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 9, 2023
1b2c4c0
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 10, 2023
4ff59c2
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 10, 2023
683b513
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 11, 2023
ca0450c
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 12, 2023
007e0c5
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 12, 2023
bb3841d
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 12, 2023
67badcc
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 12, 2023
1e7f6a7
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 12, 2023
13084ec
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 12, 2023
f91ea04
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 16, 2023
3306122
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 16, 2023
c16aa03
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 27, 2023
17990a6
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 31, 2023
cc40b21
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Feb 1, 2023
8557fdb
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Feb 9, 2023
b1ef684
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Feb 9, 2023
a011a06
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Feb 13, 2023
fb82683
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Feb 13, 2023
0145d92
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Feb 13, 2023
6ac0395
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Feb 16, 2023
d8178a5
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Feb 16, 2023
e0886af
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Feb 16, 2023
c720bd0
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Feb 16, 2023
69f0618
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Feb 16, 2023
70d4fd4
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Feb 16, 2023
2d0b6e8
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Feb 16, 2023
e012b5f
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Feb 16, 2023
bae3dff
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Feb 23, 2023
5da99d6
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Mar 2, 2023
0ab3ec3
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Mar 2, 2023
ed6fe59
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Mar 3, 2023
58ef8b0
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Mar 4, 2023
483a007
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Mar 7, 2023
edd5216
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Mar 10, 2023
d944279
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Mar 10, 2023
fcd022e
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Mar 11, 2023
628b6a5
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Mar 13, 2023
d64f2fd
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Mar 14, 2023
4e79f6d
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Mar 14, 2023
554005d
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Mar 14, 2023
5bf5616
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Mar 14, 2023
bd62428
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Mar 15, 2023
22a63f5
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Mar 17, 2023
32722e2
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Mar 20, 2023
f6d6272
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Mar 20, 2023
5e903d4
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Mar 21, 2023
ae95828
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Mar 22, 2023
4796056
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Mar 23, 2023
cb2e28d
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Mar 28, 2023
a9d8388
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Apr 6, 2023
8277433
Merge branch 'master' into staging/add_rho_materialize
eng-rodrigocunha Sep 26, 2023
2655ac2
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Sep 26, 2023
a785461
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Sep 28, 2023
7cdfe24
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Sep 28, 2023
c9965db
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Sep 29, 2023
95a473c
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Sep 29, 2023
36f86c3
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Oct 23, 2023
71f9d6d
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Oct 23, 2023
70e2dbc
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Oct 23, 2023
942dcae
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Oct 24, 2023
ac60950
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Oct 24, 2023
6ec1402
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Oct 24, 2023
fa3696a
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Oct 26, 2023
aa21118
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Oct 27, 2023
882f277
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Oct 27, 2023
2a9e2a0
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Nov 13, 2023
b90da9a
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Nov 13, 2023
6a3c87e
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Nov 13, 2023
02761f7
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Nov 13, 2023
0e3b693
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Nov 13, 2023
bae7f90
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Nov 14, 2023
4d9fbc1
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Nov 14, 2023
0bf054e
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Nov 15, 2023
b2b50fd
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Nov 15, 2023
0e55968
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Nov 15, 2023
947e4d4
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Nov 19, 2023
d887e1a
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Nov 19, 2023
65b063d
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Nov 24, 2023
ca21e76
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Dec 11, 2023
53a948b
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Dec 12, 2023
adb119f
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Dec 12, 2023
a02dc2b
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Dec 12, 2023
53b25c3
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Dec 13, 2023
c413e09
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Dec 14, 2023
b7e3b63
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Dec 14, 2023
a1452f0
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Dec 18, 2023
2bd4b6a
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Dec 18, 2023
3601f06
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Dec 18, 2023
b73f1af
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Dec 18, 2023
790532f
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Dec 21, 2023
72e28a0
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 2, 2024
b99e2b4
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 8, 2024
ea71626
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 14, 2024
7c74587
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 15, 2024
819130a
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 15, 2024
c9e377a
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 16, 2024
cd705ee
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 16, 2024
8339934
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 18, 2024
adec4f5
Merge branch 'master' into staging/add_rho_materialize
mergify[bot] Jan 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 26 additions & 23 deletions pipelines/rj_smtr/br_rj_riodejaneiro_rdo/flows.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

from prefect import Parameter, case

# from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect.tasks.prefect import create_flow_run # , wait_for_flow_run
from prefect.utilities.edges import unmapped
from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
Expand All @@ -15,14 +15,13 @@
check_files_for_download,
download_and_save_local_from_ftp,
pre_treatment_br_rj_riodejaneiro_rdo,
get_rdo_date_range,
# get_rdo_date_range,
update_rdo_redis,
)
from pipelines.rj_smtr.constants import constants
from pipelines.rj_smtr.tasks import (
bq_upload,
get_current_timestamp,
set_last_run_timestamp,
)
from pipelines.rj_smtr.schedules import every_day

Expand All @@ -49,6 +48,7 @@
dataset_id = Parameter("dataset_id", default=constants.RDO_DATASET_ID.value)
table_id = Parameter("table_id", default=constants.SPPO_RHO_TABLE_ID.value)
rebuild = Parameter("rebuild", False)
partition_dates = Parameter("partition_dates", default=None)

LABELS = get_current_flow_labels()
MODE = get_current_flow_mode(LABELS)
Expand All @@ -59,37 +59,26 @@
# dbt_client = get_local_dbt_client(host="localhost", port=3001)

# Set specific run parameters #
date_range = get_rdo_date_range(dataset_id=dataset_id, table_id=table_id, mode=MODE)
# with case(bool(run_dates), False):
# dates = get_rdo_date_range(dataset_id=dataset_id, table_id=table_id, mode=MODE)
# with case(bool(run_dates), True):
# dates = run_dates
# Run materialization #
with case(rebuild, True):
RUN = run_dbt_model(
RUN = run_dbt_model.map(
dbt_client=dbt_client,
dataset_id=dataset_id,
table_id=table_id,
upstream=True,
_vars=[date_range],
_vars=[partition_dates],
flags="--full-refresh",
)
set_last_run_timestamp(
dataset_id=dataset_id,
table_id=table_id,
timestamp=date_range["date_range_end"],
wait=RUN,
mode=MODE,
)
with case(rebuild, False):
RUN = run_dbt_model(
RUN = run_dbt_model.map(
dbt_client=dbt_client,
dataset_id=dataset_id,
table_id=table_id,
_vars=[date_range],
)
set_last_run_timestamp(
dataset_id=dataset_id,
table_id=table_id,
timestamp=date_range["date_range_end"],
wait=RUN,
mode=MODE,
_vars=[partition_dates],
)

sppo_rho_materialize.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
Expand All @@ -98,6 +87,7 @@
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)


with Flow(
"SMTR: RHO - Captura",
code_owners=["caio", "fernanda", "boris", "rodrigo"],
Expand All @@ -109,11 +99,15 @@
table_id = Parameter("table_id", constants.SPPO_RHO_TABLE_ID.value)
materialize = Parameter("materialize", False)

# SETUP
rename_run = rename_current_flow_run_now_time(
prefix=f"{captura_sppo_rho.name} FTP - {transport_mode.run()}-{report_type.run()} ",
now_time=get_current_timestamp(),
wait=None,
)
LABELS = get_current_flow_labels()
MODE = get_current_flow_mode(LABELS)

# EXTRACT
files = get_file_paths_from_ftp(
transport_mode=transport_mode, report_type=report_type, dump=dump
Expand All @@ -135,9 +129,18 @@
partitions=partitions,
status=status,
)
set_redis = update_rdo_redis(
run_dates = update_rdo_redis(
download_files=download_files, table_id=table_id, errors=errors
)
with case(bool(run_dates) and materialize, True):
run_materialize = create_flow_run(
flow_name=sppo_rho_materialize.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
parameters={"run_dates": run_dates},
labels=LABELS,
run_name=sppo_rho_materialize.name,
)
captura_ftp.set_dependencies(run_materialize, [run_dates])

captura_sppo_rho.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
captura_sppo_rho.run_config = KubernetesRun(
Expand Down
27 changes: 22 additions & 5 deletions pipelines/rj_smtr/br_rj_riodejaneiro_rdo/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ def get_file_paths_from_ftp(
"filename": filename.split(".")[0],
"ftp_path": transport_mode + "/" + filename,
"partitions": f"ano={date[:4]}/mes={date[4:6]}/dia={date[6:]}",
"date": date,
"modified": file_mtime,
"error": None,
}
# log(f"Create file info: {file_info}")
Expand All @@ -74,24 +76,30 @@ def get_file_paths_from_ftp(


@task
def check_files_for_download(files: list, dataset_id: str, table_id: str):
def check_files_for_download(
files: list, dataset_id: str, table_id: str, mode: str = "prod"
):
"""Check redis for files already downloaded from the FTP

Args:
files (list): file information gathered from FTP
dataset_id (str): dataset_id on BigQuery
table_id (str): table_id on BigQuery
mode (str, Optional): mode to run the flow. Accepts
`prod` or `dev`. Defaults to `prod`

Returns:
list: Containing the info on the files to download
"""
redis_client = get_redis_client()
key = f"{dataset_id}.{table_id}"
if mode == "dev":
key = f"{mode}.{key}"

try:
exclude_files = redis_client.get(f"{dataset_id}.{table_id}")["files"]
exclude_files = redis_client.get(key)["files"]
except (TypeError, KeyError):
set_redis_rdo_files(redis_client, dataset_id, table_id)
exclude_files = redis_client.get(f"{dataset_id}.{table_id}")["files"]

log(f"There are {len(exclude_files)} already downloaded")
download_files = [
Expand Down Expand Up @@ -235,11 +243,12 @@ def pre_treatment_br_rj_riodejaneiro_rdo(


@task
def update_rdo_redis(
def update_rdo_redis( # pylint: disable=R0913
download_files: list,
table_id: str,
dataset_id: str = constants.RDO_DATASET_ID.value,
errors=None,
mode: str = "prod",
wait=None, # pylint: disable=W0613
):
"""
Expand All @@ -257,6 +266,8 @@ def update_rdo_redis(
bool: if redis key was set
"""
key = f"{dataset_id}.{table_id}"
if mode == "dev":
key = f"{mode}.{key}"
redis_client = get_redis_client()
content = redis_client.get(key) # get current redis state
if errors:
Expand All @@ -274,7 +285,13 @@ def update_rdo_redis(
)
content["files"].extend(insert_content) # generate updated dict to set
log(f"After appending, content has {len(content['files'])} files registered")
return redis_client.set(key, content)
redis_client.set(key, content)
run_dates = [
{"partition_date": file_info["date"]}
for file_info in download_files
if not file_info["error"]
]
return run_dates


@task
Expand Down
1 change: 1 addition & 0 deletions pipelines/rj_smtr/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -1279,6 +1279,7 @@ def set_last_run_timestamp(
key = dataset_id + "." + table_id
if mode == "dev":
key = f"{mode}.{key}"

content = redis_client.get(key)
if not content:
content = {}
Expand Down
Loading