Skip to content

Commit

Permalink
Merge pull request #1 from prefeitura-rio/staging/migrate_smfp_flows
Browse files Browse the repository at this point in the history
[FEAT] Migra flows da SMFP para nova infra
  • Loading branch information
d116626 authored Jan 12, 2024
2 parents 5fddd4a + 3ef6d70 commit b28036d
Show file tree
Hide file tree
Showing 107 changed files with 8,440 additions and 57 deletions.
6 changes: 6 additions & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ RUN apt-get update && \
# Start Python image
FROM python:${PYTHON_VERSION}

# Install git
RUN apt-get update && \

Check failure on line 21 in Dockerfile

View workflow job for this annotation

GitHub Actions / Lint

DL3008 warning: Pin versions in apt get install. Instead of `apt-get install <package>` use `apt-get install <package>=<version>`

Check failure on line 21 in Dockerfile

View workflow job for this annotation

GitHub Actions / Lint

DL3015 info: Avoid additional packages by specifying `--no-install-recommends`
apt-get install -y git && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*

# Setting environment with prefect version
ARG PREFECT_VERSION=1.4.1
ENV PREFECT_VERSION $PREFECT_VERSION
Expand Down
2 changes: 1 addition & 1 deletion pipelines/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ class constants(Enum):
######################################
# Agent labels
######################################
# EXAMPLE_AGENT_LABEL = "example_agent"
RJ_SMFP_AGENT_LABEL = "smfp"

######################################
# Other constants
Expand Down
4 changes: 4 additions & 0 deletions pipelines/egpweb_metas/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# -*- coding: utf-8 -*-
from pipelines.egpweb_metas.dump_db_metas.flows import * # noqa
from pipelines.egpweb_metas.dump_url_metas.flows import * # noqa
from pipelines.egpweb_metas.goals_dashboard_dbt.flows import * # noqa
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
Empty file.
2 changes: 2 additions & 0 deletions pipelines/ergon/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# -*- coding: utf-8 -*-
from pipelines.ergon.dump_db_ergon.flows import * # noqa
40 changes: 40 additions & 0 deletions pipelines/ergon/dump_db_ergon/flows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
"""
Database dumping flows for segovi project.
"""

from copy import deepcopy

from prefect.run_configs import KubernetesRun
from prefect.storage import GCS
from prefeitura_rio.pipelines_templates.dump_db.flows import flow as dump_sql_flow
from prefeitura_rio.pipelines_utils.prefect import set_default_parameters
from prefeitura_rio.pipelines_utils.state_handlers import handler_inject_bd_credentials

from pipelines.constants import constants
from pipelines.ergon.dump_db_ergon.schedules import ergon_monthly_update_schedule

dump_sql_ergon_flow = deepcopy(dump_sql_flow)
dump_sql_ergon_flow.state_handlers = [handler_inject_bd_credentials]
dump_sql_ergon_flow.name = "SMFP: ergon - Ingerir tabelas de banco SQL"
dump_sql_ergon_flow.storage = GCS(constants.GCS_FLOWS_BUCKET.value)
dump_sql_ergon_flow.run_config = KubernetesRun(
image=constants.DOCKER_IMAGE.value,
labels=[
constants.RJ_SMFP_AGENT_LABEL.value,
],
)

ergon_default_parameters = {
"db_database": "P01.PCRJ",
"db_host": "10.70.6.21",
"db_port": "1526",
"db_type": "oracle",
"infisical_secret_path": "/db-ergon-prod",
"dataset_id": "recursos_humanos_ergon",
}
dump_sql_ergon_flow = set_default_parameters(
dump_sql_ergon_flow, default_parameters=ergon_default_parameters
)

dump_sql_ergon_flow.schedule = ergon_monthly_update_schedule
328 changes: 328 additions & 0 deletions pipelines/ergon/dump_db_ergon/schedules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,328 @@
# -*- coding: utf-8 -*-
# flake8: noqa: E501
"""
Schedules for the database dump pipeline
"""

from datetime import datetime, timedelta

import pytz
from prefect.schedules import Schedule
from prefeitura_rio.pipelines_utils.io import untuple_clocks as untuple
from prefeitura_rio.pipelines_utils.prefect import generate_dump_db_schedules

from pipelines.constants import constants

#####################################
#
# Ergon Schedules
#
#####################################

ergon_queries = {
"cargo": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": "SELECT * FROM C_ERGON.VW_DLK_ERG_CARGOS_",
},
"categoria": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": "SELECT * FROM C_ERGON.VW_DLK_ERG_CATEGORIAS_",
},
"empresa": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": "SELECT * FROM C_ERGON.VW_DLK_ERG_EMPRESAS",
},
"matricula": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": "SELECT * FROM C_ERGON.VW_DLK_ERG_ERG_MATRICULAS",
},
"fita_banco": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"partition_columns": "MES_ANO",
"dump_mode": "append",
"lower_bound_date": "current_month",
"execute_query": "SELECT * FROM C_ERGON.VW_DLK_ERG_FITA_BANCO",
},
"folha_empresa": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"partition_columns": "MES_ANO",
"dump_mode": "append",
"lower_bound_date": "current_month",
"execute_query": "SELECT * FROM C_ERGON.VW_DLK_ERG_FOLHAS_EMP",
},
"forma_provimento": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": "SELECT * FROM C_ERGON.VW_DLK_ERG_FORMAS_PROV_",
},
"funcionario": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": "SELECT * FROM C_ERGON.VW_DLK_ERG_FUNCIONARIOS",
},
"horario_trabalho": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": "SELECT * FROM C_ERGON.VW_DLK_ERG_HORARIO_TRAB_",
},
"setor": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": "SELECT * FROM C_ERGON.VW_DLK_ERG_HSETOR_",
},
"jornada": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": "SELECT * FROM C_ERGON.VW_DLK_ERG_JORNADAS_",
},
"orgaos_externos": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": "SELECT * FROM C_ERGON.VW_DLK_ERG_ORGAOS_EXTERNOS",
},
"orgaos_regime_juridico": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": "SELECT * FROM C_ERGON.VW_DLK_ERG_ORGAOS_REGIMES_JUR_",
},
"provimento": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": "SELECT * FROM C_ERGON.VW_DLK_ERG_PROVIMENTOS_EV",
},
"regime_juridico": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": "SELECT * FROM C_ERGON.VW_DLK_ERG_REGIMES_JUR_",
},
"tipo_folha": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": "SELECT * FROM C_ERGON.VW_DLK_ERG_TIPO_FOLHA",
},
"tipo_orgao": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": "SELECT * FROM C_ERGON.VW_DLK_ERG_TIPO_ORGAO",
},
"tipo_vinculo": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": "SELECT * FROM C_ERGON.VW_DLK_ERG_TIPO_VINC_",
},
"vinculo": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": "SELECT * FROM C_ERGON.VW_DLK_ERG_VINCULOS",
},
"licenca_afastamento": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"partition_columns": "DTINI",
"dump_mode": "append",
"lower_bound_date": "current_month",
"execute_query": """
SELECT NUMFUNC,NUMVINC,DTINI,DTFIM,TIPOFREQ,CODFREQ,MOTIVO,DTPREVFIM,FLEX_CAMPO_01,
FLEX_CAMPO_02,EMP_CODIGO,FLEX_CAMPO_07
FROM ERGON.LIC_AFAST
""",
},
"frequencia": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"partition_columns": "DTINI",
"dump_mode": "append",
"lower_bound_date": "current_month",
"execute_query": """
SELECT NUMFUNC,NUMVINC,DTINI,DTFIM,TIPOFREQ,CODFREQ,OBS,EMP_CODIGO
FROM ERGON.FREQUENCIAS
""",
},
"vantagens": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"partition_columns": "DTINI",
"dump_mode": "append",
"lower_bound_date": "current_month",
"execute_query": """
SELECT NUMFUNC,NUMVINC,VANTAGEM,DTINI,DTFIM,VALOR,INFO,TIPO_INCORPORACAO,PERC_INC_FUNCAO,
INC_TABELAVENC,INC_REFERENCIA,OBS,VALOR2,INFO2,VALOR3,INFO3,VALOR4,INFO4,VALOR5,INFO5,
VALOR6,INFO6,FLEX_CAMPO_05,EMP_CODIGO,CHAVEVANT
FROM ERGON.VANTAGENS
""",
},
"total_contagem": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": """
SELECT CHAVE, NUMFUNC,NUMVINC,FINALIDADE,DIASTOT,DIASFPUB,DIASFPUBESP,TOTAL_PERIODOS,
TOTAL_ANOS,DATA_PROXIMO,NOME_PROXIMO,EMP_CODIGO
FROM ERGON.TOTAL_CONTA
""",
"interval": timedelta(days=15),
},
"pre_contagem": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": """
SELECT FINALIDADE,NUMFUNC,NUMVINC,PERIODOS,OFFSET,DTINI,EMP_CODIGO,FLEX_CAMPO_01
FROM ERGON.PRE_CONTA
""",
},
"averbacoes": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": """
SELECT NUMFUNC,NUMVINC,CHAVE,DTINI,DTFIM,INSTITUICAO,TIPOTEMPO,DATA_A_CONTAR,TOTAL_DIAS,
MOTIVO,SOBREPOE,EMP_CODIGO,OBS,REGPREV
FROM ERGON.AVERBACOES_CONTA
""",
},
"averbacoes_contagem": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": """
SELECT NUMFUNC,NUMVINC,CHAVEAVERB,FINALIDADE,DIAS,EMP_CODIGO
FROM ERGON.AVERB_OQUE_CONTA
""",
},
"frequencia_antigo": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": """
SELECT M9, SF_OCORRENCIA, SF_DT_OC_Y2
FROM C_ERGON.VW_SIMPA_SIRHU_FREQUENCIA_GBP
""",
},
"afastamento_antigo": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": """
SELECT M10, SA_DT_AFAS_Y2, SA_DT_PRER_Y2, SA_DT_RETR_Y2
FROM C_ERGON.VW_SIMPA_SIRHU_AFASTAMENTO_GBP
""",
},
"afastamento_antigo_nomes": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": """
SELECT
EMP_CODIGO, AFAST_COD, AFAST_DESCR
FROM SIMPA.SIRHU_DBTABELAS_AFASTAMENTO
""",
},
"tipo_tempo": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "overwrite",
"execute_query": """
SELECT SIGLA, NOME, APOSENTADORIA, FERIAS, DIAS_FER, ADICTSERV, LICESP, DIAS_LICESP,
ADICTCHEFIA, PROGRESSAO
FROM ERGON.TIPO_TEMPO
""",
},
"ficha_financeira": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "append",
"lower_bound_date": "current_month",
"partition_columns": "MES_ANO_FOLHA",
"execute_query": """
SELECT MES_ANO_FOLHA,NUM_FOLHA,LANCAMENTO,NUMFUNC,NUMVINC,NUMPENS,MES_ANO_DIREITO,
RUBRICA,TIPO_RUBRICA,DESC_VANT,COMPLEMENTO,VALOR,CORRECAO,EXECUCAO,EMP_CODIGO
FROM ERGON.FICHAS_FINANCEIRAS
""",
},
"ficha_financeira_contabil": {
"materialize_after_dump": True,
"biglake_table": True,
"materialization_mode": "prod",
"dump_mode": "append",
"lower_bound_date": "current_month",
"partition_columns": "MES_ANO_FOLHA",
"execute_query": """
SELECT MES_ANO_FOLHA,NUM_FOLHA,NUMFUNC,NUMVINC,NUMPENS,SETOR,SECRETARIA,TIPO_FUNC,
ATI_INAT_PENS,DETALHA,RUBRICA,TIPO_RUBRICA,MES_ANO_DIREITO,DESC_VANT,VALOR,COMPLEMENTO,
TIPO_CLASSIF,CLASSIFICACAO,TIPO_CLASSIF_FR,CLASSIF_FR,ELEMDESP,TIPORUB,EMP_CODIGO
FROM ERGON.IPL_PT_FICHAS
""",
},
}

ergon_clocks = generate_dump_db_schedules(
interval=timedelta(days=1),
start_date=datetime(2022, 11, 9, 22, 30, tzinfo=pytz.timezone("America/Sao_Paulo")),
labels=[
constants.RJ_SMFP_AGENT_LABEL.value,
],
db_database="P01.PCRJ",
db_host="10.70.6.21",
db_port="1526",
db_type="oracle",
dataset_id="recursos_humanos_ergon",
infisical_secret_path="/db-ergon-prod",
table_parameters=ergon_queries,
)

ergon_monthly_update_schedule = Schedule(clocks=untuple(ergon_clocks))
Loading

0 comments on commit b28036d

Please sign in to comment.