Skip to content

Commit

Permalink
Merge pull request #26 from prefeitura-rio/staging/ergon_hci
Browse files Browse the repository at this point in the history
feat: add table funcionario, remove duplicity from provimento
  • Loading branch information
d116626 authored Nov 5, 2024
2 parents 4368474 + df84dac commit 7dedd0b
Show file tree
Hide file tree
Showing 4 changed files with 271 additions and 126 deletions.
9 changes: 7 additions & 2 deletions pipelines/ergon/materializa_funcionarios_saude/schedules.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,23 @@

from pipelines.constants import constants

tables = [{"table_id": "funcionarios_ativos"}, {"table_id": "funcionarios"}]


smfp_funcionarios_saude_clocks = [
IntervalClock(
interval=timedelta(days=1),
start_date=datetime(2021, 11, 23, 1, 0, tzinfo=pytz.timezone("America/Sao_Paulo")),
start_date=datetime(2021, 11, 23, 1, 0, tzinfo=pytz.timezone("America/Sao_Paulo"))
+ timedelta(minutes=5 * count),
labels=[
constants.RJ_SMFP_AGENT_LABEL.value,
],
parameter_defaults={
"dataset_id": "recursos_humanos_ergon_saude",
"table_id": "funcionarios_ativos",
"table_id": parameter["table_id"],
},
)
for count, parameter in enumerate(tables)
]
smfp_funcionarios_saude_daily_update_schedule = Schedule(
clocks=untuple(smfp_funcionarios_saude_clocks)
Expand Down
2 changes: 1 addition & 1 deletion pipelines/templates/run_dbt_model/flows.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
"""
MATERIALIZA MODELOS DO DBT...................
MATERIALIZA MODELOS DO DBT....................
"""

from copy import deepcopy
Expand Down
141 changes: 141 additions & 0 deletions queries/models/recursos_humanos_ergon_saude/funcionarios.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@

{{
config(
materialized="table",
partition_by={
"field": "cpf_particao",
"data_type": "int64",
"range": {"start": 0, "end": 100000000000, "interval": 34722222},
},
)
}}

with
funcionarios as (
select
f.id_vinculo as id_funcionario,
lpad(f.id_cpf, 11, '0') as cpf, -- Adiciona zero à esquerda caso o CPF tenha menos de 11 dígitos
f.nome
from `rj-smfp.recursos_humanos_ergon.funcionario` f
where f.id_cpf is not null -- and lpad(f.id_cpf, 11, '0') in ('')
),

provimento as (
select
p.id_funcionario,
p.id_vinculo,
p.data_inicio as provimento_inicio,
p.data_fim as provimento_fim,
p.id_setor,
p.id_cargo,
p.empresa_vinculo as id_empresa
from `rj-smfp.recursos_humanos_ergon.provimento` p
-- get the most recent id_vinculo
qualify
row_number() over (
partition by id_funcionario order by id_vinculo desc, data_inicio desc
)
= 1
),

setor as (
select
id_setor,
data_inicio as setor_inicio,
data_fim as setor_fim,
id_setor_pai,
nome as setor_nome,
sigla as setor_sigla,
id_secretaria
from `rj-smfp.recursos_humanos_ergon.setor`
-- get the most recent of the setor
qualify row_number() over (partition by id_setor order by data_inicio desc) = 1
),

cargo as (
select
id_cargo,
nome as cargo_nome,
categoria as cargo_categoria,
subcategoria as cargo_subcategoria,
from `rj-smfp.recursos_humanos_ergon.cargo`
),

vacancia_vinculo as (
select id_funcionario, id_vinculo, data_vacancia
from `rj-smfp.recursos_humanos_ergon.vinculo`
-- get the most recent id_vinculo
-- qualify
-- row_number() over (partition by id_funcionario order by id_vinculo desc) = 1
),

empresa as (
select
id_empresa,
nome_empresa as empresa_nome,
sigla as empresa_sigla,
cnpj as empresa_cnpj
from `rj-smfp.recursos_humanos_ergon.empresas`
),

secretaria as (
select
id_unidade_administrativa as id_secretaria,
sigla_unidade_administrativa as secretaria_sigla,
nome_unidade_administrativa as secretaria_nome
from `rj-iplanrio.unidades_administrativas.orgaos`
),

funcionarios_saude as (
select
f.cpf,
array_agg(
struct(
f.id_funcionario,
f.nome,
case
when (p.provimento_fim is null) and (vv.data_vacancia is null)
then true
else false
end as status_ativo,
p.provimento_inicio,
p.provimento_fim,
vv.data_vacancia,
s.id_secretaria,
sec.secretaria_sigla,
sec.secretaria_nome,
p.id_empresa,
s.setor_nome,
s.setor_sigla,
s.setor_inicio,
s.setor_fim,
c.cargo_nome,
c.cargo_categoria,
c.cargo_subcategoria,
emp.empresa_nome,
emp.empresa_sigla,
emp.empresa_cnpj
)
) as dados,
safe_cast(f.cpf as int64) as cpf_particao
from funcionarios f
left join provimento p on f.id_funcionario = p.id_funcionario
left join setor s on p.id_setor = s.id_setor
left join cargo c on p.id_cargo = c.id_cargo
left join
vacancia_vinculo vv
on f.id_funcionario = vv.id_funcionario
and p.id_vinculo = vv.id_vinculo
left join empresa emp on p.id_empresa = emp.id_empresa
left join secretaria sec on s.id_secretaria = sec.id_secretaria

where
s.id_secretaria in ('1800')
or p.id_empresa
in ('32', '80', '81', '82', '83', '84', '85', '86', '87', '88', '89', '97')

group by f.cpf
)

select *
from funcionarios_saude
Loading

0 comments on commit 7dedd0b

Please sign in to comment.