diff --git a/app_name/jobs/audit_bronze_to_silver.py b/app_name/jobs/audit_bronze_to_silver.py
index 9420247..5a0f619 100644
--- a/app_name/jobs/audit_bronze_to_silver.py
+++ b/app_name/jobs/audit_bronze_to_silver.py
@@ -1,6 +1,7 @@
 from app_name.configs.spark_helper import create_delta_lake_session
 from pyspark.sql import functions as F
 
+
 # variable path
 bronze_audit_path = "s3://bronze/auditoria_municipal/6-siap-net-orgaos-municipais-autoridades-2016.csv"
 silver_audit_path = "s3://silver/auditoria_municipal/"
@@ -9,12 +10,13 @@
 # bronze to silver function
 def audit_bronze_to_silver(bronze_path: str, silver_path: str):
     spark = create_delta_lake_session('auditoria')
+    spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
 
     # Create df
     auditoria_df = (spark.read.option("delimiter", ";")
-                    .option("header", True)
-                    .option('encoding', 'ISO-8859-1')
-                    .csv(bronze_path))
+                              .option("header", True)
+                              .option('encoding', 'ISO-8859-1')
+                              .csv(bronze_path))
 
     # Change Schema
     auditoria_df_01 = auditoria_df.select(F.col('CodigoMunicipio').cast('int')
diff --git a/app_name/jobs/despesas_bronze_to_silver.py b/app_name/jobs/despesas_bronze_to_silver.py
index 0523bd4..77513be 100644
--- a/app_name/jobs/despesas_bronze_to_silver.py
+++ b/app_name/jobs/despesas_bronze_to_silver.py
@@ -1,6 +1,7 @@
 from app_name.configs.spark_helper import create_delta_lake_session
 from pyspark.sql import functions as F
 
+
 # variable path
 bronze_despesas_path = "s3://bronze/despesas/despesas-2023.csv"
 silver_despesas_path = "s3://silver/despesas/"
@@ -9,6 +10,7 @@
 # bronze to silver function
 def despesas_bronze_to_silver(bronze_path: str, silver_path: str):
     spark = create_delta_lake_session('auditoria')
+    spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
 
     # Create df
     despesas_df = (spark.read.option("delimiter", ";")
diff --git a/app_name/jobs/general_etl.py b/app_name/jobs/general_etl.py
new file mode 100644
index 0000000..f3642e1
--- /dev/null
+++ b/app_name/jobs/general_etl.py
@@ -0,0 +1,27 @@
+from app_name.configs.spark_helper import create_delta_lake_session
+from pyspark.sql import functions as F
+from pyspark.sql.types import *
+
+
+# bronze to silver function
+def bronze_to_silver(bronze_path: str, silver_path: str, schema: StructType, partition: str, file_type: str):
+    spark = create_delta_lake_session('bronze_to_silver')
+    spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')
+
+    if file_type == 'csv':
+
+        # Create df
+        df = (spark.read.option("delimiter", ";")
+              .option("header", True)
+              .option('encoding', 'ISO-8859-1')
+              .option('dateFormat', 'dd-MM-yyyy')
+              .schema(schema)
+              .csv(bronze_path))
+
+        # write df to parquet
+        df.write.parquet(path=silver_path, mode="overwrite", partitionBy=partition)
+
+        spark.stop()
+
+    else:
+        print('Error: wrong file type!')
\ No newline at end of file
diff --git a/app_name/jobs/main.py b/app_name/jobs/main.py
new file mode 100644
index 0000000..c811ef2
--- /dev/null
+++ b/app_name/jobs/main.py
@@ -0,0 +1,32 @@
+from general_etl import bronze_to_silver
+from pyspark.sql.types import *
+
+
+# variable
+bronze_audit_path = "s3://bronze/auditoria_municipal/6-siap-net-orgaos-municipais-autoridades-2016.csv"
+silver_audit_path = "s3://silver/auditoria_municipal/"
+partition="AnoExercicio"
+file_type='csv'
+
+audit_schema=StructType([
+
+    StructField('CodigoMunicipio',IntegerType(), True)
+    ,StructField('NomeMunicipio',StringType(), True)
+    ,StructField('CodigoTipoOrgao',IntegerType(), True)
+    ,StructField('NomeTipoOrgao',StringType(), True)
+    ,StructField('AnoExercicio',IntegerType(), True)
+    ,StructField('SequenciaOrgao',IntegerType(), True)
+    ,StructField('NomeOrgao',StringType(), True)
+    ,StructField('CodigoAutoridade',IntegerType(), True)
+    ,StructField('TratamentoAutoridade',StringType(), True)
+    ,StructField('CargoAutoridade',StringType(), True)
+    ,StructField('SequenciaAutoridade',IntegerType(), True)
+    ,StructField('Nome',StringType(), True)
+    ,StructField('Sexo',StringType(), True)
+    ,StructField('InicioMandato',DateType(), True)
+    ,StructField('FimMandato',DateType(), True)
+])
+
+
+if __name__ == '__main__':
+    bronze_to_silver(bronze_audit_path, silver_audit_path, audit_schema, partition, file_type)
\ No newline at end of file
diff --git a/tests/SchemaTests.ipynb b/tests/SchemaTests.ipynb
new file mode 100644
index 0000000..e9c2c78
--- /dev/null
+++ b/tests/SchemaTests.ipynb
@@ -0,0 +1,372 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from pyspark.sql import SparkSession"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 2,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stderr",
+     "output_type": "stream",
+     "text": [
+      "SLF4J: Class path contains multiple SLF4J bindings.\n",
+      "SLF4J: Found binding in [jar:file:/home/glue_user/spark/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]\n",
+      "SLF4J: Found binding in [jar:file:/home/glue_user/spark/jars/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]\n",
+      "SLF4J: Found binding in [jar:file:/home/glue_user/aws-glue-libs/jars/log4j-slf4j-impl-2.17.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]\n",
+      "SLF4J: Found binding in [jar:file:/home/glue_user/aws-glue-libs/jars/slf4j-reload4j-1.7.36.jar!/org/slf4j/impl/StaticLoggerBinder.class]\n",
+      "SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.\n",
+      "SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]\n",
+      "Setting default log level to \"WARN\".\n",
+      "To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).\n"
+     ]
+    }
+   ],
+   "source": [
+    "spark = SparkSession.builder.getOrCreate()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 3,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "spark.conf.set('spark.sql.sources.partitionOverwriteMode', 'dynamic')"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 45,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import string\n",
+    "columns = 5\n",
+    "linhas = 10\n",
+    "df = spark.createDataFrame([[int(i*j/2) for i in range(columns)] for j in range(linhas)], [i for i in string.ascii_uppercase[:columns]])"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 46,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+---+---+---+---+---+\n",
+      "|  A|  B|  C|  D|  E|\n",
+      "+---+---+---+---+---+\n",
+      "|  0|  0|  0|  0|  0|\n",
+      "|  0|  0|  1|  1|  2|\n",
+      "|  0|  1|  2|  3|  4|\n",
+      "|  0|  1|  3|  4|  6|\n",
+      "|  0|  2|  4|  6|  8|\n",
+      "|  0|  2|  5|  7| 10|\n",
+      "|  0|  3|  6|  9| 12|\n",
+      "|  0|  3|  7| 10| 14|\n",
+      "|  0|  4|  8| 12| 16|\n",
+      "|  0|  4|  9| 13| 18|\n",
+      "+---+---+---+---+---+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "df.show()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 20,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "df = df.cache()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 22,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+---+-----+\n",
+      "|  B|count|\n",
+      "+---+-----+\n",
+      "|  0|    2|\n",
+      "|  1|    2|\n",
+      "|  2|    2|\n",
+      "|  3|    2|\n",
+      "|  4|    2|\n",
+      "+---+-----+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "df2 = df.groupBy('B').count()\n",
+    "df2.show()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 23,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+---+---+---+---+---+\n",
+      "|  A|  B|  C|  D|  E|\n",
+      "+---+---+---+---+---+\n",
+      "|  0|  0|  0|  0|  0|\n",
+      "|  0|  0|  1|  1|  2|\n",
+      "|  0|  1|  2|  3|  4|\n",
+      "|  0|  1|  3|  4|  6|\n",
+      "|  0|  2|  4|  6|  8|\n",
+      "|  0|  2|  5|  7| 10|\n",
+      "|  0|  3|  6|  9| 12|\n",
+      "|  0|  3|  7| 10| 14|\n",
+      "|  0|  4|  8| 12| 16|\n",
+      "|  0|  4|  9| 13| 18|\n",
+      "+---+---+---+---+---+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "df.show()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 26,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stderr",
+     "output_type": "stream",
+     "text": [
+      " \r"
+     ]
+    }
+   ],
+   "source": [
+    "df.drop('A').filter('B < 3').write.parquet('Table1', partitionBy='B')"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 29,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "df.filter('B >= 3').write.parquet('Table1', partitionBy='B', mode='overwrite')"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 33,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+---+---+---+----+---+\n",
+      "|  C|  D|  E|   A|  B|\n",
+      "+---+---+---+----+---+\n",
+      "|  1|  1|  2|null|  0|\n",
+      "|  0|  0|  0|null|  0|\n",
+      "|  2|  3|  4|null|  1|\n",
+      "|  3|  4|  6|null|  1|\n",
+      "|  5|  7| 10|null|  2|\n",
+      "|  4|  6|  8|null|  2|\n",
+      "|  7| 10| 14|   0|  3|\n",
+      "|  6|  9| 12|   0|  3|\n",
+      "|  9| 13| 18|   0|  4|\n",
+      "|  8| 12| 16|   0|  4|\n",
+      "+---+---+---+----+---+\n",
+      "\n"
+     ]
+    }
+   ],
+   "source": [
+    "df2 = spark.read.parquet('Table1', mergeSchema=True)\n",
+    "df2.orderBy('B').show()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 32,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "+---+---+---+---+\n",
+ "| A| C| D| E|\n", + "+---+---+---+---+\n", + "| 0| 8| 12| 16|\n", + "| 0| 9| 13| 18|\n", + "+---+---+---+---+\n", + "\n" + ] + } + ], + "source": [ + "spark.read.parquet('Table1/B=4/').show()" + ] + }, + { + "cell_type": "code", + "execution_count": 35, + "metadata": {}, + "outputs": [], + "source": [ + "df2.filter('B < 3').write.parquet('Table2')" + ] + }, + { + "cell_type": "code", + "execution_count": 37, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+---+---+----+---+\n", + "| C| D| E| A| B|\n", + "+---+---+---+----+---+\n", + "| 4| 6| 8|null| 2|\n", + "| 2| 3| 4|null| 1|\n", + "| 3| 4| 6|null| 1|\n", + "| 0| 0| 0|null| 0|\n", + "| 1| 1| 2|null| 0|\n", + "| 5| 7| 10|null| 2|\n", + "+---+---+---+----+---+\n", + "\n" + ] + } + ], + "source": [ + "spark.read.parquet('Table2').show()" + ] + }, + { + "cell_type": "code", + "execution_count": 38, + "metadata": {}, + "outputs": [], + "source": [ + "df3 = df.drop('A').filter('B < 3')" + ] + }, + { + "cell_type": "code", + "execution_count": 39, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "['B', 'C', 'D', 'E']" + ] + }, + "execution_count": 39, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df3.columns" + ] + }, + { + "cell_type": "code", + "execution_count": 40, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "['C', 'D', 'E', 'A', 'B']" + ] + }, + "execution_count": 40, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "df2.columns" + ] + }, + { + "cell_type": "code", + "execution_count": 42, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "False" + ] + }, + "execution_count": 42, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "set(df3.columns) == set(df2.columns)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "workspace-dqq3IVyd", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.2" + }, + "orig_nbformat": 4 + }, + "nbformat": 4, + "nbformat_minor": 2 +}