Skip to content

Commit

Permalink
feat: add sancao fornecedor
Browse files Browse the repository at this point in the history
  • Loading branch information
d116626 committed Feb 7, 2024
1 parent f822e9b commit 2fea94c
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 0 deletions.
44 changes: 44 additions & 0 deletions pipelines/sigma/dump_db_sancao_fornecedor/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
"""
Database dumping flows for SMFP SIGMA system.
"""
from copy import deepcopy

from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefeitura_rio.pipelines_templates.dump_db.flows import flow as dump_sql_flow
from prefeitura_rio.pipelines_utils.prefect import set_default_parameters
from prefeitura_rio.pipelines_utils.state_handlers import handler_inject_bd_credentials

from pipelines.constants import constants
from pipelines.sigma.dump_db_sancao_fornecedor.schedules import (
sigma_daily_update_schedule,
)

rj_smfp_dump_db_sigma_flow = deepcopy(dump_sql_flow)
rj_smfp_dump_db_sigma_flow.name = "SMFP: SIGMA - Sancao Fornecedor - Ingerir tabelas de banco SQL"
rj_smfp_dump_db_sigma_flow.state_handlers = [handler_inject_bd_credentials]
rj_smfp_dump_db_sigma_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)

rj_smfp_dump_db_sigma_flow.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
labels=[
constants.RJ_SMFP_AGENT_LABEL.value, # label do agente
],
)

rj_smfp_dump_db_sigma_default_parameters = {
"db_database": "CP01.SMF",
"db_host": "10.90.31.22",
"db_port": "1521",
"db_type": "oracle",
"dataset_id": "adm_orcamento_sigma",
"infisical_secret_path": "/db-sigma",
}

rj_smfp_dump_db_sigma_flow = set_default_parameters(
rj_smfp_dump_db_sigma_flow,
default_parameters=rj_smfp_dump_db_sigma_default_parameters,
)

rj_smfp_dump_db_sigma_flow.schedule = sigma_daily_update_schedule
59 changes: 59 additions & 0 deletions pipelines/sigma/dump_db_sancao_fornecedor/schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# -*- coding: utf-8 -*-
"""
Schedules for the SMFP SIGMA SANCAO ADMINISTRATIVA.
"""

from datetime import datetime, timedelta

import pytz
from prefect.schedules import Schedule
from prefeitura_rio.pipelines_utils.io import untuple_clocks as untuple
from prefeitura_rio.pipelines_utils.prefect import generate_dump_db_schedules

from pipelines.constants import constants

#####################################
#
# SMFP SIGMA Schedules
#
#####################################

_sigma_queries = {
"sancao_fornecedor": {
"biglake_table": True,
"materialize_after_dump": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": """
SELECT
CPF_CNPJ,
RAZAO_SOCIAL,
NR_ORDEM,
PROCESSO_ORIGEM,
PROCESSO_INSTRUTIVO,
PROCESSO_FATURA,
CD_SANCAO,
DS_SANCAO,
DT_SANCAO,
DT_EXTINCAO_SANCAO
FROM SIGMA.VW_SANCAO_ADMINISTRATIVA
""", # noqa
},
}

sigma_infra_clocks = generate_dump_db_schedules(
interval=timedelta(days=1),
start_date=datetime(2022, 3, 21, 1, 0, tzinfo=pytz.timezone("America/Sao_Paulo")),
labels=[
constants.RJ_SMFP_AGENT_LABEL.value,
],
db_database="CP01.SMF",
db_host="10.90.31.22",
db_port="1521",
db_type="oracle",
dataset_id="adm_orcamento_sigma",
infisical_secret_path="/db-sigma",
table_parameters=_sigma_queries,
)

sigma_daily_update_schedule = Schedule(clocks=untuple(sigma_infra_clocks))

0 comments on commit 2fea94c

Please sign in to comment.