Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Project/mentoria spark #9

Open
wants to merge 2 commits into
base: project/mentoria-spark
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions app_name/jobs/audit_bronze_to_silver.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from app_name.configs.spark_helper import create_delta_lake_session
from pyspark.sql import functions as F


# variable path
bronze_audit_path = "s3://bronze/auditoria_municipal/6-siap-net-orgaos-municipais-autoridades-2016.csv"
silver_audit_path = "s3://silver/auditoria_municipal/"
Expand All @@ -9,12 +10,13 @@
# bronze to silver function
def audit_bronze_to_silver(bronze_path: str, silver_path: str):
spark = create_delta_lake_session('auditoria')
spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')

# Create df
auditoria_df = (spark.read.option("delimiter", ";")
.option("header", True)
.option('encoding', 'ISO-8859-1')
.csv(bronze_path))
.option("header", True)
.option('encoding', 'ISO-8859-1')
.csv(bronze_path))

# Change Schema
auditoria_df_01 = auditoria_df.select(F.col('CodigoMunicipio').cast('int')
Expand Down
2 changes: 2 additions & 0 deletions app_name/jobs/despesas_bronze_to_silver.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from app_name.configs.spark_helper import create_delta_lake_session
from pyspark.sql import functions as F


# variable path
bronze_despesas_path = "s3://bronze/despesas/despesas-2023.csv"
silver_despesas_path = "s3://silver/despesas/"
Expand All @@ -9,6 +10,7 @@
# bronze to silver function
def despesas_bronze_to_silver(bronze_path: str, silver_path: str):
spark = create_delta_lake_session('auditoria')
spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')

# Create df
despesas_df = (spark.read.option("delimiter", ";")
Expand Down
27 changes: 27 additions & 0 deletions app_name/jobs/general_etl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
from app_name.configs.spark_helper import create_delta_lake_session
from pyspark.sql import functions as F
from pyspark.sql.types import *


# bronze to silver function
# bronze to silver function
def bronze_to_silver(bronze_path: str, silver_path: str, schema: StructType, partition: str, file_type: str):
    """Read a raw file from the bronze layer and write it to silver as partitioned Parquet.

    Args:
        bronze_path: Source path of the raw file (e.g. an ``s3://`` CSV).
        silver_path: Destination directory for the Parquet output.
        schema: Explicit Spark schema applied on read (a ``StructType``,
            not a string — ``main.py`` passes a ``StructType``).
        partition: Column name used to partition the Parquet output.
        file_type: Input format; only ``'csv'`` is supported for now.

    Raises:
        ValueError: If ``file_type`` is not a supported format.
    """
    # Validate before creating a session so an unsupported format does not
    # leak a running SparkSession (the original printed and left it open).
    if file_type != 'csv':
        raise ValueError(f"wrong file type: {file_type!r}; expected 'csv'")

    spark = create_delta_lake_session('bronze_to_silver')
    # Only overwrite the partitions present in this batch, not the whole table.
    spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')

    try:
        # All read options can be passed directly as keyword arguments to
        # DataFrameReader.csv instead of chained .option() calls.
        df = spark.read.csv(
            bronze_path,
            sep=';',
            header=True,
            encoding='ISO-8859-1',
            dateFormat='dd-MM-yyyy',
            schema=schema,
        )

        # write df to parquet
        df.write.parquet(path=silver_path, mode="overwrite", partitionBy=partition)
    finally:
        # Always release the session, even if the read or write fails.
        spark.stop()
33 changes: 33 additions & 0 deletions app_name/jobs/main.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
from general_etl import bronze_to_silver
from pyspark.sql.types import *


# Job parameters for the municipal-audit bronze -> silver run.
bronze_audit_path = "s3://bronze/auditoria_municipal/6-siap-net-orgaos-municipais-autoridades-2016.csv"
silver_audit_path = "s3://silver/auditoria_municipal/"
partition = "AnoExercicio"
file_type = 'csv'

# Explicit schema for the audit CSV so Spark does not have to infer types.
audit_schema = StructType([
    StructField('CodigoMunicipio', IntegerType(), True),
    StructField('NomeMunicipio', StringType(), True),
    StructField('CodigoTipoOrgao', IntegerType(), True),
    StructField('NomeTipoOrgao', StringType(), True),
    StructField('AnoExercicio', IntegerType(), True),
    StructField('SequenciaOrgao', IntegerType(), True),
    StructField('NomeOrgao', StringType(), True),
    StructField('CodigoAutoridade', IntegerType(), True),
    # NOTE(review): the original declared two fields, 'Trata' and
    # 'mentoAutoridade' — almost certainly one column name split in two.
    # Merged into 'TratamentoAutoridade'; confirm against the CSV header.
    StructField('TratamentoAutoridade', StringType(), True),
    StructField('CargoAutoridade', StringType(), True),
    StructField('SequenciaAutoridade', IntegerType(), True),
    StructField('Nome', StringType(), True),
    StructField('Sexo', StringType(), True),
    StructField('InicioMandato', DateType(), True),
    StructField('FimMandato', DateType(), True),
])


if __name__ == '__main__':
    bronze_to_silver(bronze_audit_path, silver_audit_path, audit_schema, partition, file_type)
Loading