Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cria novo pull para a pipeline de recursos do SPPO #551

Merged
merged 103 commits into from
Nov 29, 2023
Merged
Show file tree
Hide file tree
Changes from 102 commits
Commits
Show all changes
103 commits
Select commit Hold shift + click to select a range
3c733ba
Realiza ajustes iniciais
lingsv Oct 27, 2023
b7bfdca
Merge branch 'master' into staging/smtr-subsidio-sppo-viagens-individ…
mergify[bot] Oct 27, 2023
a8e3d4c
update tasks principais
lingsv Oct 30, 2023
9ffbed6
update nome de função
lingsv Oct 30, 2023
778bcfc
retirei código sensível
lingsv Oct 30, 2023
664d5fd
atualiza parâmetro de função
lingsv Oct 30, 2023
a2207e7
criei arquivo de utils para separar funções
lingsv Oct 30, 2023
6686f70
Merge branch 'master' into staging/smtr-subsidio-sppo-viagens-individ…
mergify[bot] Oct 31, 2023
479f895
ajustes nos códigos
lingsv Oct 31, 2023
2232ab9
teste de extração
lingsv Oct 31, 2023
34c2fbb
alteração para testes no prefect
lingsv Oct 31, 2023
6d8d4ef
wip
lingsv Oct 31, 2023
1476240
wip
lingsv Oct 31, 2023
a999b49
Merge branch 'master' into staging/smtr-subsidio-sppo-viagens-individ…
mergify[bot] Nov 1, 2023
9da006a
configurei o fluxo de recurso para o fluxo default
lingsv Nov 1, 2023
70065be
update pipeline de recursos
lingsv Nov 6, 2023
8211741
wip teste de flow
lingsv Nov 6, 2023
57e0010
retirei função unmapped
lingsv Nov 6, 2023
88b3e20
tira map do flow, coloca top e skip como variável de função
lingsv Nov 7, 2023
184dccc
readequação de parâmetros
lingsv Nov 7, 2023
684eca5
alteração nome do flow
lingsv Nov 7, 2023
6ff0254
altera nome do flow
lingsv Nov 7, 2023
6dd66b4
altera nomes de arquivos antigos
lingsv Nov 7, 2023
7dbac76
excluí arquivos antigos
lingsv Nov 7, 2023
d90f5a6
update nome do flow
lingsv Nov 7, 2023
021b177
wip teste
lingsv Nov 7, 2023
73cd351
restaurando formato do flow
lingsv Nov 7, 2023
c5cb62c
alterei saída da função
lingsv Nov 7, 2023
3a38350
update constants
lingsv Nov 7, 2023
9a8cfeb
alterações no flow de captura
lingsv Nov 8, 2023
946d28c
retirado o unmapped
lingsv Nov 8, 2023
29891aa
retirei o map
lingsv Nov 8, 2023
f0ab1fc
removi logs de teste
lingsv Nov 8, 2023
9e7c7eb
Merge branch 'master' into staging/smtr-pipeline-sppo-recurso
mergify[bot] Nov 9, 2023
6a9a825
Merge branch 'master' into staging/smtr-pipeline-sppo-recurso
mergify[bot] Nov 9, 2023
0048a3f
Merge branch 'master' into staging/smtr-pipeline-sppo-recurso
mergify[bot] Nov 9, 2023
ba89313
estruturação inicial do flow de materialização
lingsv Nov 9, 2023
e5de057
Merge branch 'master' into staging/smtr-pipeline-sppo-recurso
mergify[bot] Nov 9, 2023
3974e1b
atualizei tempo de espera para extração
lingsv Nov 9, 2023
1d2e353
Merge branch 'staging/smtr-pipeline-sppo-recurso' of github.com:prefe…
lingsv Nov 9, 2023
9650837
alteração de sleep para evitar timeout
lingsv Nov 9, 2023
e8d4aaf
aumentei tempo de espera entre uma requisição e outra
lingsv Nov 9, 2023
072a15d
teste utils
lingsv Nov 10, 2023
9ac471d
retirado o log completo dos dados
lingsv Nov 10, 2023
4ec7a7a
Merge branch 'master' into staging/smtr-pipeline-sppo-recurso
mergify[bot] Nov 10, 2023
9827c22
atualização do formato do timestamp
lingsv Nov 10, 2023
77e2d31
Merge branch 'staging/smtr-pipeline-sppo-recurso' of github.com:prefe…
lingsv Nov 10, 2023
09f04ff
Merge branch 'master' into staging/smtr-pipeline-sppo-recurso
mergify[bot] Nov 13, 2023
6e904ad
Merge branch 'master' into staging/smtr-pipeline-sppo-recurso
mergify[bot] Nov 13, 2023
89eab75
Merge branch 'master' into staging/smtr-pipeline-sppo-recurso
mergify[bot] Nov 13, 2023
685889f
Merge branch 'master' into staging/smtr-pipeline-sppo-recurso
mergify[bot] Nov 13, 2023
c45b296
Merge branch 'master' into staging/smtr-pipeline-sppo-recurso
mergify[bot] Nov 13, 2023
1388b11
update de função de pré-tratamento
lingsv Nov 14, 2023
b1f05ab
Merge branch 'staging/smtr-pipeline-sppo-recurso' of github.com:prefe…
lingsv Nov 14, 2023
87b9c08
Merge branch 'master' into staging/smtr-pipeline-sppo-recurso
lingsv Nov 14, 2023
d482f13
ajuste de arquivo com conflitos
lingsv Nov 14, 2023
611c895
Merge branch 'master' into staging/smtr-pipeline-sppo-recurso
mergify[bot] Nov 15, 2023
ef06b16
Merge branch 'master' into staging/smtr-pipeline-sppo-recurso
mergify[bot] Nov 15, 2023
ec6a9a5
Merge branch 'master' into staging/smtr-pipeline-sppo-recurso
mergify[bot] Nov 15, 2023
96f9771
update função de dados aninhados
lingsv Nov 16, 2023
00312fc
Merge branch 'staging/smtr-pipeline-sppo-recurso' of github.com:prefe…
lingsv Nov 16, 2023
443bd8f
updates e teste flow de recursos
lingsv Nov 17, 2023
795ac78
altera função de dados aninhados
lingsv Nov 17, 2023
af4e58e
teste fluxo
lingsv Nov 17, 2023
bd5cd38
update timestamp no flow
lingsv Nov 17, 2023
42c7f98
inclusão do schedule
lingsv Nov 17, 2023
5bf8ef5
wip
lingsv Nov 17, 2023
d99f7fb
update parâmetros de extração
lingsv Nov 21, 2023
6e1526e
wip
lingsv Nov 21, 2023
9d22e87
Merge branch 'master' into staging/smtr-pipeline-sppo-recurso
lingsv Nov 21, 2023
fa9dfef
update parâmetro
lingsv Nov 22, 2023
0c579c5
Merge branch 'staging/smtr-pipeline-sppo-recurso' of github.com:prefe…
lingsv Nov 22, 2023
2d2b161
update nome do parâmetro
lingsv Nov 22, 2023
b052c3e
alteração da ordem dos parâmetros
lingsv Nov 22, 2023
e7a23bb
wip
lingsv Nov 22, 2023
35074e1
atualização da query
lingsv Nov 22, 2023
1489361
wip
lingsv Nov 22, 2023
c26ae64
wip
lingsv Nov 22, 2023
ba1c439
retorno ao ponto anterior
lingsv Nov 22, 2023
e9cb258
wip
lingsv Nov 23, 2023
8ee317a
update constants
lingsv Nov 23, 2023
9e30152
update constants
lingsv Nov 23, 2023
63d03cf
update
lingsv Nov 23, 2023
08a547c
adiona lógica para pular downloads antigos
lingsv Nov 23, 2023
3780778
update flow e tasks
lingsv Nov 23, 2023
795ad83
update code owners e nome do flow
lingsv Nov 24, 2023
c2374a3
update pipeline
lingsv Nov 27, 2023
1e75499
Merge branches 'master' and 'staging/smtr-pipeline-sppo-recurso' of g…
lingsv Nov 27, 2023
5682a94
update
lingsv Nov 27, 2023
182e641
wip
lingsv Nov 28, 2023
54fe97f
update dbt vars
lingsv Nov 28, 2023
cf15582
altera parâmetros de materialização
lingsv Nov 28, 2023
9244b7a
Merge branch 'master' into staging/smtr-pipeline-sppo-recurso
mergify[bot] Nov 28, 2023
2a68604
Merge branch 'master' into staging/smtr-pipeline-sppo-recurso
mergify[bot] Nov 28, 2023
c49879d
altera agents
lingsv Nov 28, 2023
e385c85
Merge branch 'staging/smtr-pipeline-sppo-recurso' of github.com:prefe…
lingsv Nov 28, 2023
a0d7632
retira id do igor das constants
lingsv Nov 28, 2023
45be073
simplifica parâmetrop default de materialização
lingsv Nov 28, 2023
5c8cfdb
update tasks
lingsv Nov 28, 2023
0a686dc
retira tasks não utilizadas de utils
lingsv Nov 28, 2023
1fdb0ee
teste de função
lingsv Nov 28, 2023
f32261a
altera agent e project name
lingsv Nov 28, 2023
ae1745a
remove comentários desnecessários
lingsv Nov 29, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ setup.py
.vscode/*
*.hdf
*.DS_Store

.idea/*

# Byte-compiled / optimized / DLL files
__pycache__/
Expand Down
1 change: 1 addition & 0 deletions pipelines/rj_smtr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from pipelines.rj_smtr.materialize_to_datario.flows import *
from pipelines.rj_smtr.registros_ocr_rir.flows import *
from pipelines.rj_smtr.projeto_subsidio_sppo.flows import *
from pipelines.rj_smtr.br_rj_riodejaneiro_recurso.flows import *
from pipelines.rj_smtr.veiculo.flows import *
from pipelines.rj_smtr.example.flows import *
from pipelines.rj_smtr.br_rj_riodejaneiro_bilhetagem.flows import *
Expand Down
Empty file.
181 changes: 181 additions & 0 deletions pipelines/rj_smtr/br_rj_riodejaneiro_recurso/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
# -*- coding: utf-8 -*-
"""
Flows for br_rj_riodejaneiro_recurso
"""
from copy import deepcopy

from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefect.tasks.prefect import create_flow_run, wait_for_flow_run
from prefect import Parameter, case, task
from prefect.tasks.control_flow import merge


# EMD Imports #

from pipelines.constants import constants as emd_constants
from pipelines.utils.utils import set_default_parameters
from pipelines.utils.decorators import Flow
from pipelines.utils.tasks import (
rename_current_flow_run_now_time,
get_current_flow_labels,
)

# SMTR Imports #

from pipelines.rj_smtr.constants import constants
from pipelines.rj_smtr.tasks import get_current_timestamp

from pipelines.rj_smtr.flows import default_capture_flow, default_materialization_flow
from pipelines.rj_smtr.schedules import every_day


# CAPTURA #

sppo_recurso_captura = deepcopy(default_capture_flow)
sppo_recurso_captura.name = "SMTR: Subsídio SPPO Recursos - Captura (subflow)"
sppo_recurso_captura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
sppo_recurso_captura.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)
sppo_recurso_captura = set_default_parameters(
flow=sppo_recurso_captura,
default_parameters=constants.SUBSIDIO_SPPO_RECURSO_CAPTURE_PARAMS.value,
)
# RECAPTURA #
sppo_recurso_recaptura = deepcopy(default_capture_flow)
sppo_recurso_recaptura.name = "SMTR: Subsídio SPPO Recursos - Recaptura (subflow)"
sppo_recurso_recaptura.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
sppo_recurso_recaptura.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)
sppo_recurso_recaptura = set_default_parameters(
flow=sppo_recurso_recaptura,
default_parameters=constants.SUBSIDIO_SPPO_RECURSO_CAPTURE_PARAMS.value
| {"recapture": True},
)

# MATERIALIZAÇÃO #

sppo_recurso_materializacao = deepcopy(default_materialization_flow)
sppo_recurso_materializacao.name = (
"SMTR: Subsídio SPPO Recursos - Materialização (subflow)"
)
sppo_recurso_materializacao.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
sppo_recurso_materializacao.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)

sppo_recurso_materializacao = set_default_parameters(
flow=sppo_recurso_materializacao,
default_parameters=constants.SUBSIDIO_SPPO_RECURSOS_MATERIALIZACAO_PARAMS.value,
)

with Flow(
"SMTR: Subsídio Recursos Viagens Individuais - Captura/Tratamento",
code_owners=["carolinagomes", "rafaelpinheiro"],
) as subsidio_sppo_recurso:
capture = Parameter("capture", default=True)
materialize = Parameter("materialize", default=True)
recapture = Parameter("recapture", default=True)
data_recurso = Parameter("data_recurso", default=None)
interval_minutes = Parameter("interval_minutes", default=1440)
timestamp = get_current_timestamp(data_recurso, return_str=True)

rename_flow_run = rename_current_flow_run_now_time(
prefix=subsidio_sppo_recurso.name + " ",
now_time=timestamp,
)

LABELS = get_current_flow_labels()

# Captura

with case(capture, True):
run_captura = create_flow_run(
flow_name=sppo_recurso_captura.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
parameters={"timestamp": timestamp},
labels=LABELS,
)

wait_captura_true = wait_for_flow_run(
run_captura,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

with case(capture, False):
wait_captura_false = task(
lambda: [None], checkpoint=False, name="assign_none_to_previous_runs"
)()

wait_captura = merge(wait_captura_true, wait_captura_false)

# Recaptura

with case(recapture, True):
run_recaptura = create_flow_run(
flow_name=sppo_recurso_recaptura.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
)
run_recaptura.set_upstream(wait_captura)

wait_recaptura_true = wait_for_flow_run(
run_recaptura,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

with case(recapture, False):
wait_recaptura_false = task(
lambda: [None], checkpoint=False, name="assign_none_to_previous_runs"
)()

wait_recaptura = merge(wait_recaptura_true, wait_recaptura_false)

# Materialização

with case(materialize, True):
run_materializacao = create_flow_run(
flow_name=sppo_recurso_materializacao.name,
project_name=emd_constants.PREFECT_DEFAULT_PROJECT.value,
labels=LABELS,
upstream_tasks=[wait_captura],
)

run_materializacao.set_upstream(wait_recaptura)

wait_materializacao_true = wait_for_flow_run(
run_materializacao,
stream_states=True,
stream_logs=True,
raise_final_state=True,
)

with case(materialize, False):
wait_materializacao_false = task(
lambda: [None], checkpoint=False, name="assign_none_to_previous_runs"
)()
wait_materializacao = merge(wait_materializacao_true, wait_materializacao_false)

subsidio_sppo_recurso.set_reference_tasks(
[wait_materializacao, wait_recaptura, wait_captura]
)

subsidio_sppo_recurso.storage = GCS(emd_constants.GCS_FLOWS_BUCKET.value)
subsidio_sppo_recurso.run_config = KubernetesRun(
image=emd_constants.DOCKER_IMAGE.value,
labels=[emd_constants.RJ_SMTR_AGENT_LABEL.value],
)

# schedule
subsidio_sppo_recurso.schedule = every_day

#####
37 changes: 37 additions & 0 deletions pipelines/rj_smtr/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,3 +527,40 @@ class constants(Enum): # pylint: disable=c0103
"version": {},
},
}

# SUBSÍDIO RECURSOS VIAGENS INDIVIDUAIS
SUBSIDIO_SPPO_RECURSOS_DATASET_ID = "br_rj_riodejaneiro_recurso"
SUBSIDIO_SPPO_RECURSO_API_BASE_URL = "https://api.movidesk.com/public/v1/tickets?"
SUBSIDIO_SPPO_RECURSO_API_SECRET_PATH = "sppo_subsidio_recursos_api"
SUBSIDIO_SPPO_RECURSO_SERVICE = "serviceFull eq 'SPPO'"
SUBSIDIO_SPPO_RECURSO_CAPTURE_PARAMS = {
"partition_date_only": True,
"table_id": "recurso_sppo",
"dataset_id": SUBSIDIO_SPPO_RECURSOS_DATASET_ID,
"extract_params": {
"token": "",
"$select": "id,protocol,createdDate",
"$filter": "{dates} and serviceFull/any(serviceFull: {service})",
"$expand": "customFieldValues,customFieldValues($expand=items)",
"$orderby": "createdDate asc",
},
"interval_minutes": 1440,
"source_type": "movidesk",
"primary_key": ["protocol"],
}

SUBSIDIO_SPPO_RECURSOS_MATERIALIZACAO_PARAMS = {
"dataset_id": SUBSIDIO_SPPO_RECURSOS_DATASET_ID,
"table_id": SUBSIDIO_SPPO_RECURSO_CAPTURE_PARAMS["table_id"],
"upstream": True,
"dbt_vars": {
"date_range": {
"table_run_datetime_column_name": "data_recurso",
"delay_hours": 0,
},
"version": {},
},
}
# 1. select, 2. filter, 3.expand, passar o service na função
lingsv marked this conversation as resolved.
Show resolved Hide resolved
# TIMEOUT = 10 # em segundos
# BACKOFF_FACTOR = 1.5
34 changes: 30 additions & 4 deletions pipelines/rj_smtr/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
get_raw_data_api,
get_raw_data_gcs,
get_raw_data_db,
get_raw_recursos,
upload_run_logs_to_bq,
get_datetime_range,
read_raw_data,
Expand Down Expand Up @@ -252,7 +253,7 @@ def create_dbt_run_vars(

###############
#
# Local file managment
# Local file management
#
###############

Expand Down Expand Up @@ -669,6 +670,25 @@ def create_request_params(
elif dataset_id == constants.GTFS_DATASET_ID.value:
request_params = extract_params["filename"]

elif dataset_id == constants.SUBSIDIO_SPPO_RECURSOS_DATASET_ID.value:
extract_params["token"] = get_vault_secret(
constants.SUBSIDIO_SPPO_RECURSO_API_SECRET_PATH.value
)["data"]["token"]
start = datetime.strftime(
timestamp - timedelta(minutes=interval_minutes), "%Y-%m-%dT%H:%M:%S.%MZ"
)
end = datetime.strftime(timestamp, "%Y-%m-%dT%H:%M:%S.%MZ")
log(f" Start date {start}, end date {end}")
recurso_params = {
"dates": f"createdDate ge {start} and createdDate le {end}",
"service": constants.SUBSIDIO_SPPO_RECURSO_SERVICE.value,
} # fazer um filtro para pegar só o pedaço necessário do banco
# fazer local a chamada dos dados totais
extract_params["$filter"] = extract_params["$filter"].format(**recurso_params)
request_params = extract_params

request_url = constants.SUBSIDIO_SPPO_RECURSO_API_BASE_URL.value

return request_params, request_url


Expand Down Expand Up @@ -726,6 +746,10 @@ def get_raw_from_sources(
error, data, filetype = get_raw_data_db(
host=source_path, secret_path=secret_path, **request_params
)
elif source_type == "movidesk":
error, data, filetype = get_raw_recursos(
request_url=source_path, request_params=request_params
)
else:
raise NotImplementedError(f"{source_type} not supported")

Expand Down Expand Up @@ -1275,15 +1299,17 @@ def transform_raw_to_nested_structure(
# Check empty dataframe
if data.empty:
log("Empty dataframe, skipping transformation...")

else:
log(f"Raw data:\n{data_info_str(data)}", level="info")

log("Adding captured timestamp column...", level="info")
data["timestamp_captura"] = timestamp

log("Striping string columns...", level="info")
for col in data.columns[data.dtypes == "object"].to_list():
data[col] = data[col].str.strip()
if "customFieldValues" not in data:
log("Striping string columns...", level="info")
for col in data.columns[data.dtypes == "object"].to_list():
data[col] = data[col].str.strip()

log(f"Finished cleaning! Data:\n{data_info_str(data)}", level="info")

Expand Down
58 changes: 58 additions & 0 deletions pipelines/rj_smtr/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,21 @@
import requests
import basedosdados as bd
from basedosdados import Table
from basedosdados import Storage
import math
import pandas as pd
from google.cloud.storage.blob import Blob
import pymysql
import psycopg2
import psycopg2.extras
import time


from prefect.schedules.clocks import IntervalClock

from pipelines.constants import constants as emd_constants


from pipelines.rj_smtr.implicit_ftp import ImplicitFtpTls
from pipelines.rj_smtr.constants import constants

Expand Down Expand Up @@ -828,3 +832,57 @@ def read_raw_data(filepath: str, csv_args: dict = None) -> tuple[str, pd.DataFra
log(f"[CATCHED] Task failed with error: \n{error}", level="error")

return error, data


def get_raw_recursos(request_url: str, request_params: dict) -> tuple[str, str, str]:
"""
Returns a dataframe with recursos data from movidesk api.
"""
all_records = False
top = 1000
skip = 0
error = None
filetype = "json"
data = []

while not all_records:
try:
request_params["$top"] = top
request_params["$skip"] = skip

log(f"Request url {request_url}")

response = requests.get(
request_url,
params=request_params,
timeout=constants.MAX_TIMEOUT_SECONDS.value,
)
response.raise_for_status()

paginated_data = response.json()

if isinstance(paginated_data, dict):
paginated_data = [paginated_data]

if len(paginated_data) == top:
skip += top
time.sleep(36)

if len(paginated_data) == 0:
log("Nenhum dado para tratar.")

else:
all_records = True
data += paginated_data

log(f"Dados (paginados): {len(data)}")

except Exception as error:
error = traceback.format_exc()
log(f"[CATCHED] Task failed with error: \n{error}", level="error")
data = []
break

log(f"Request concluído, tamanho dos dados: {len(data)}.")

return error, data, filetype
Loading