diff --git a/.gitignore b/.gitignore
index 1c2f86e..f994b42 100644
--- a/.gitignore
+++ b/.gitignore
@@ -132,4 +132,7 @@ dmypy.json
 *_env/
-ci-test/
\ No newline at end of file
+ci-test/
+
+.DS_Store
+
diff --git a/docs/ChangeLog.md b/docs/ChangeLog.md
index 41a1bd5..39ea991 100644
--- a/docs/ChangeLog.md
+++ b/docs/ChangeLog.md
@@ -1,5 +1,15 @@
 # Change Log
 
+## Version 0.3.2 (dev)
+
+- Bugfixes:
+  - GitLab CI fix on the production plan step
+  - Fixes on the diff-2-module function
+  - Added a dependencies field to the generated module json file
+  - Fixed an issue where new files were not added to the created module
+- Added the spark scripts module
+
+
 ## Version 0.3.1
 
 - Bugfixes
diff --git a/docs/QuickStart.md b/docs/QuickStart.md
index 3d58da0..9100191 100644
--- a/docs/QuickStart.md
+++ b/docs/QuickStart.md
@@ -33,49 +33,30 @@ source _env/bin/activate
 pip install -r requirements.txt
 ```
 
-4) Rony has also some handy cli commands to build and run docker images locally. You can do
-
-```bash
-cd etl
-rony build :
-```
-
-to build an image and run it with
+## Implementation suggestions
 
-```bash
-rony run :
-```
+When you start a new `rony` project, it is created by default with `aws` as the `provider`, and you will be asked which modules you want to implement in your project.
 
-In this particular implementation, `run.py` has a simple etl code that accepts a parameter to filter the data based on the `Sex` column. To use that, you can do
+You can also implement modules after project creation by running the following command in your project's root folder.
 
 ```bash
-docker run : -s female
+rony add-module
 ```
 
-## Implementation suggestions
-
-When you start a new `rony` project, you will find
-
-- an `infrastructure` folder with terraform code creating on AWS:
-  - an S3 bucket
-  - a Lambda function
-  - a CloudWatch log group
-  - a ECR repository
-  - a AWS Glue Crawler
-  - IAM roles and policies for lambda and glue
+The available module names can be found in the module_templates folder of the rony project.
 
-- an `etl` folder with:
-  - a `Dockerfile` and a `run.py` example of ETL code
-  - a `lambda_function.py` with a "Hello World" example
+When you start a new `rony` project, you will find:
 
-- a `tests` folder with unit testing on the Lambda function
-- a `.github/workflow` folder with a Github Actions CI/CD pipeline suggestion. This pipeline
-  - Tests lambda function
-  - Builds and runs the docker image
-  - Sets AWS credentials
-  - Make a terraform plan (but not actually deploy anything)
+- an `infrastructure` folder with terraform code for the chosen provider, containing:
+  - a backend file
+    - For the aws provider, you will need to create a bucket manually and reference its name in the bucket variable.
+  - a provider file
+  - a variables file
+  - files for the modules implemented during project creation
-- a `dags` folder with some **Airflow** example code.f
+
+- a `CI` folder with a docker-compose file, CI scripts, and tests
+
+- a `Makefile` containing shortcut commands for the project
 
 You also have a `scripts` folder with a bash file that builds a lambda deploy package.
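As a concrete illustration of the `add-module` flow described in the QuickStart changes above (a hypothetical invocation; it assumes the command takes the module name as an argument and uses the `aws_kinesis_stream` template introduced later in this change):

```bash
# run from the project's root folder
rony add-module aws_kinesis_stream
```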
diff --git a/img/Rony-data-quality-classes-diagram.png b/img/Rony-data-quality-classes-diagram.png new file mode 100644 index 0000000..74393c3 Binary files /dev/null and b/img/Rony-data-quality-classes-diagram.png differ diff --git a/rony/__init__.py b/rony/__init__.py index 683640d..ad74eb0 100644 --- a/rony/__init__.py +++ b/rony/__init__.py @@ -1,3 +1,4 @@ from .cli import * from ._version import __version__ -__author__ = 'A3Data' \ No newline at end of file +from .data_quality import * +__author__ = 'A3Data' diff --git a/rony/_version.py b/rony/_version.py index e1424ed..17f5211 100644 --- a/rony/_version.py +++ b/rony/_version.py @@ -1 +1 @@ -__version__ = '0.3.1' +__version__ = '0.3.1.1000' diff --git a/rony/cli_aux.py b/rony/cli_aux.py index ad58acc..2370a01 100644 --- a/rony/cli_aux.py +++ b/rony/cli_aux.py @@ -48,6 +48,7 @@ def get_modules_to_add(command, opts, ctx): "aws_simple_storage_service", "aws_glue_crawler", "aws_lambda_function", + "aws_kinesis_stream" ] else: if click.confirm("Add S3 module?", default=True): @@ -56,6 +57,8 @@ def get_modules_to_add(command, opts, ctx): all_modules.append("aws_glue_crawler") if click.confirm("Add LAMBDA FUNCTION module?", default=True): all_modules.append("aws_lambda_function") + if click.confirm("Add KINESIS STREAM module?", default=True): + all_modules.append("aws_kinesis_stream") if opts["provider"] == "gcp": diff --git a/rony/data_quality/__init__.py b/rony/data_quality/__init__.py new file mode 100644 index 0000000..f7ba861 --- /dev/null +++ b/rony/data_quality/__init__.py @@ -0,0 +1,4 @@ +from .profiler import Profiler +from .analyzer import Analyzer +from .data_quality import DataQuality + diff --git a/rony/data_quality/analyzer.py b/rony/data_quality/analyzer.py new file mode 100644 index 0000000..d2254d7 --- /dev/null +++ b/rony/data_quality/analyzer.py @@ -0,0 +1,206 @@ +import yaml +from datetime import datetime + +from pyspark.sql.dataframe import DataFrame +from pyspark.sql import SparkSession, DataFrame +from pyspark.sql import functions as f + +from .data_quality import DataQuality + +from pydeequ.analyzers import ( + AnalysisRunner, AnalyzerContext, + ApproxCountDistinct, ApproxQuantile, ApproxQuantiles, + Completeness, Compliance, Correlation, CountDistinct, + DataType, Distinctness, Entropy, Histogram, KLLParameters, KLLSketch, + Maximum, MaxLength, Mean, Minimum, MinLength, MutualInformation, + PatternMatch, Size, StandardDeviation, Sum, Uniqueness, + UniqueValueRatio +) + + +class Analyzer(DataQuality): + """ + Class for building and running Analyzer jobs and output tables. + + Parameters + ---------- + spark: SparkSession + A SparkSession object + """ + + def __init__(self, spark: SparkSession) -> None: + super().__init__(spark) + + + def _analysis_job_builder(self, config_path: str) -> str: + """ + Build the Analyzer job code as a string expression to be evaluated. + + Parameters + ---------- + config_path: str + Path to a yaml config file. + Config file must have 2 major keys: columns and metrics. + + Columns major key must have dataframe column names as keys and lists of + analysis methods as values (as each method listed here works with only one column as input). + + Methods major key was built to deal with methods that take more than one column or parameter. + Methods major key must have methods as keys and lists of lists as values. 
+ + YAML Example + ------------ + + columns: + PassengerId: [Completeness] + Age: [Completeness, Mean, StandardDeviation, Minimum, Maximum, Sum, Entropy] + Sex: [Completeness, ApproxCountDistinct, Distinctness] + Fare: [Completeness, Mean, StandardDeviation] + Pclass: [DataType] + Survived: [Histogram] + Name: [MaxLength, MinLength] + metrics: + Correlation: + - [Fare, Age] + - [Fare, Survived] + Compliance: + - [Age, "Age>40.2"] + PatternMatch: + - [Name, "M(r|rs|iss)."] + ApproxQuantiles: + - [Age, '0.5', '0.25', '0.75'] + - [Fare, '0.5', '0.25', '0.75'] + Uniqueness: + - [PassengerId] + - [Name,Sex] + - [Ticket] + UniqueValueRatio: + - [PassengerId] + - [Name,Sex] + + Returns + ------- + str -> The AnalysisRunner expression as a str object + """ + with open(config_path, "r") as file: + configurations = yaml.full_load(file) + + columnsconfig = configurations["columns"] + metricsconfig = configurations["metrics"] + + expression = "AnalysisRunner(spark).onData(df).addAnalyzer(Size())" + + for col in columnsconfig.keys(): + for method in columnsconfig[col]: + expression += ".addAnalyzer(" + method + '("' + col + '"))' + + for method in metricsconfig.keys(): + + for params in metricsconfig[method]: + expression += ".addAnalyzer(" + method + '(' + + if method == "ApproxQuantiles": + expression += '"' + params[0] + '", [' + for i in range(1,len(params)): + expression += params[i] + ', ' + expression += ']' + + elif method == "ApproxQuantile": + expression += '"' + params[0] + '", ' + params[1] + + elif method == "Uniqueness" or method == "UniqueValueRatio": + expression += '[' + for i in range(len(params)): + expression += '"' + params[i] + '", ' + expression += ']' + + else: + for col in params: + expression += '"' + col + '", ' + expression += '))' + + expression += ".run()" + return expression + + + def run(self, df: DataFrame, config: str) -> DataFrame: + """ + Run the Analyzer job + + Parameters + ---------- + + df: DataFrame + A Spark DataFrame + + config: str + Path to a yaml config file or a str expression of AnalysisRunner to evaluate. + + If config is a path to a yaml file, config file must have 2 major keys: columns and metrics. + + Columns major key must have dataframe column names as keys and lists of + analysis methods as values (as each method listed here works with only one column as input). + + Methods major key was built to deal with methods that take more than one column or parameter. + Methods major key must have methods as keys and lists of lists as values. + + YAML Example + ------------ + + columns: + PassengerId: [Completeness] + Age: [Completeness, Mean, StandardDeviation, Minimum, Maximum, Sum, Entropy] + Sex: [Completeness, ApproxCountDistinct, Distinctness] + Fare: [Completeness, Mean, StandardDeviation] + Pclass: [DataType] + Survived: [Histogram] + Name: [MaxLength, MinLength] + metrics: + Correlation: + - [Fare, Age] + - [Fare, Survived] + Compliance: + - [Age, "Age>40.2"] + PatternMatch: + - [Name, "M(r|rs|iss)."] + ApproxQuantiles: + - [Age, '0.5', '0.25', '0.75'] + - [Fare, '0.5', '0.25', '0.75'] + Uniqueness: + - [PassengerId] + - [Name,Sex] + - [Ticket] + UniqueValueRatio: + - [PassengerId] + - [Name,Sex] + + Returns + ------- + + DataFrame -> A DataFrame with the results for Analyzer job. 
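+
+        Examples
+        --------
+        An illustrative call, assuming ``spark`` was created with the PyDeequ
+        jars (for instance via ``DataQuality.create_pydeequ_SparkSession()``)
+        and that ``analyzer_configs.yaml`` follows the layout above:
+
+        >>> analyzer = Analyzer(spark)
+        >>> results = analyzer.run(df, "analyzer_configs.yaml")
+        >>> results.show(truncate=False)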
+ """ + try: + expression = self._analysis_job_builder(config) + except: + expression_start = "AnalysisRunner(spark).onData(df)" + if not config.startswith(expression_start): + raise AttributeError("String expression should start with 'AnalysisRunner(spark).onData(df)'") + else: + expression = config + + spark = self.spark + analysisResult = eval(expression) + + analysisResult_df = ( + AnalyzerContext + .successMetricsAsDataFrame(self.spark, analysisResult) + ) + + analysisResult_df = ( + analysisResult_df + .orderBy("entity", "instance", "name") + .withColumn("dt_update", f.lit(datetime.now().strftime("%Y-%m-%d-%H-%M-%S"))) + ) + + return analysisResult_df + diff --git a/rony/data_quality/data_quality.py b/rony/data_quality/data_quality.py new file mode 100644 index 0000000..5a11c28 --- /dev/null +++ b/rony/data_quality/data_quality.py @@ -0,0 +1,162 @@ +from abc import ABC, abstractmethod +from pyspark.sql import SparkSession, DataFrame + + +class DQ(ABC): + """ + Abstract DataQuality Class + + Parameters + ---------- + spark: SparkSession + A SparkSession object to run DataQuality jobs. + SparkSession must have two configurations: + .config("spark.jars.packages", "com.amazon.deequ:deequ:2.0.1-spark-3.2") + .config("spark.jars.excludes", "net.sourceforge.f2j:arpack_combined_all") + """ + + @abstractmethod + def __init__(self, spark: SparkSession) -> None: + if isinstance(spark, SparkSession): + self.spark = spark + else: + raise TypeError("spark must be a valid SparkSession object.") + + + @abstractmethod + def run(self, df: DataFrame) -> DataFrame: + """ + Run the DataQuality process + + Parameters + ---------- + + df: DataFrame + A Spark DataFrame to run DataQuality jobs. + + Returns + ------- + + DataFrame -> A DataFrame with the results for DataQuality job. + """ + pass + + + @abstractmethod + def write_output(self, df: DataFrame, path: str, + delta: bool = True) -> None: + """ + Write output for DataQuality process. + + Parameters + ---------- + df: DataFrame + The DataFrame object to write to an external object storage + path: str + The path to write the results. Usually, an S3/GCS/Blob Storage + path + delta: bool + If True, write a delta table on specified path. If False, write a + simple parquet file. + """ + pass + + +class DataQuality(DQ): + """ + Base DataQuality Class + + Parameters + ---------- + spark: SparkSession + A SparkSession object to run DataQuality jobs. + SparkSession must have two configurations: + .config("spark.jars.packages", "com.amazon.deequ:deequ:2.0.1-spark-3.2") + .config("spark.jars.excludes", "net.sourceforge.f2j:arpack_combined_all") + """ + + def __init__(self, spark: SparkSession) -> None: + super().__init__(spark) + + + @classmethod + def create_pydeequ_SparkSession(cls, deequ_maven_package="com.amazon.deequ:deequ:2.0.1-spark-3.2") -> SparkSession: + """ + Creates a default SparkSession with PyDeequ jars. + + Parameters + ---------- + + deequ_maven_package: str + Maven package to use in Spark application. Defaults to "com.amazon.deequ:deequ:2.0.1-spark-3.2". + """ + return ( + SparkSession + .builder + .config("spark.jars.packages", deequ_maven_package) + .config("spark.jars.excludes", "net.sourceforge.f2j:arpack_combined_all") + .getOrCreate() + ) + + + + @classmethod + def create_deequ_delta_SparkSession(cls, + deequ_maven_package="com.amazon.deequ:deequ:2.0.1-spark-3.2", + delta_maven_package="io.delta:delta-core_2.12:1.2.1") -> SparkSession: + """ + Creates a SparkSession with PyDeequ and Delta jars + and necessary configurations. 
+ + + Parameters + ---------- + + deequ_maven_package: str + Deequ maven package to use in Spark application. Defaults to "com.amazon.deequ:deequ:2.0.1-spark-3.2". + + delta_maven_package: str + Delta maven package to use in Spark application. Defaults to "io.delta:delta-core_2.12:1.2.1" + """ + return ( + SparkSession + .builder + .config("spark.jars.packages", f"{deequ_maven_package},{delta_maven_package}") + .config("spark.jars.excludes", "net.sourceforge.f2j:arpack_combined_all") + .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") + .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") + .getOrCreate() + ) + + + def write_output(self, df: DataFrame, path: str, + mode: str = "append", delta: bool = True) -> None: + """ + Write output for DataQuality process. + + Parameters + ---------- + df: DataFrame + The DataFrame object to write to an external object storage + path: str + The path to write the results. Usually, an S3/GCS/Blob Storage + path + mode: str + Write mode for parquet or delta table. One of "overwrite", + "append", "error" or "ignore". Defaults to "append". + delta: bool + If True, write a delta table on specified path. If False, write a + simple parquet file. Defaults to True. + """ + if delta: + output_format = "delta" + else: + output_format = "parquet" + + ( + df.write + .mode(mode) + .format(output_format) + .save(path) + ) + diff --git a/rony/data_quality/examples/rony_analyzer_example.ipynb b/rony/data_quality/examples/rony_analyzer_example.ipynb new file mode 100644 index 0000000..6a2ce16 --- /dev/null +++ b/rony/data_quality/examples/rony_analyzer_example.ipynb @@ -0,0 +1,488 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "2e536555", + "metadata": {}, + "source": [ + "# Rony Data Quality - Analyzer example" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "b28f9afa", + "metadata": { + "scrolled": true + }, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "Please set env variable SPARK_VERSION\n" + ] + } + ], + "source": [ + "import pydeequ\n", + "\n", + "from pyspark import SparkFiles\n", + "from pyspark.sql import SparkSession\n", + "\n", + "from rony.data_quality import Analyzer, DataQuality" + ] + }, + { + "cell_type": "markdown", + "id": "74a7baab", + "metadata": {}, + "source": [ + "### Create a SparkSession with PyDeequ and Delta jars from maven" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "eb1b2484", + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "spark = DataQuality.create_deequ_delta_SparkSession()" + ] + }, + { + "cell_type": "markdown", + "id": "cca9fff8", + "metadata": {}, + "source": [ + "### Download *titanic* data from the web" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "d0a899d7", + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "url = \"https://raw.githubusercontent.com/neylsoncrepalde/titanic_data_with_semicolon/main/titanic.csv\"\n", + "spark.sparkContext.addFile(url)\n", + "df = (\n", + " spark\n", + " .read\n", + " .csv(\"file://\" + SparkFiles.get(\"titanic.csv\"), header=True, inferSchema=True, sep=\";\")\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "3d467c7a", + "metadata": { + "scrolled": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + 
"+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+\n", + "|PassengerId|Survived|Pclass| Name| Sex| Age|SibSp|Parch| Ticket| Fare|Cabin|Embarked|\n", + "+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+\n", + "| 1| 0| 3|Braund, Mr. Owen ...| male|22.0| 1| 0| A/5 21171| 7.25| null| S|\n", + "| 2| 1| 1|Cumings, Mrs. Joh...|female|38.0| 1| 0| PC 17599|71.2833| C85| C|\n", + "| 3| 1| 3|Heikkinen, Miss. ...|female|26.0| 0| 0|STON/O2. 3101282| 7.925| null| S|\n", + "| 4| 1| 1|Futrelle, Mrs. Ja...|female|35.0| 1| 0| 113803| 53.1| C123| S|\n", + "| 5| 0| 3|Allen, Mr. Willia...| male|35.0| 0| 0| 373450| 8.05| null| S|\n", + "| 6| 0| 3| Moran, Mr. James| male|null| 0| 0| 330877| 8.4583| null| Q|\n", + "| 7| 0| 1|McCarthy, Mr. Tim...| male|54.0| 0| 0| 17463|51.8625| E46| S|\n", + "| 8| 0| 3|Palsson, Master. ...| male| 2.0| 3| 1| 349909| 21.075| null| S|\n", + "| 9| 1| 3|Johnson, Mrs. Osc...|female|27.0| 0| 2| 347742|11.1333| null| S|\n", + "| 10| 1| 2|Nasser, Mrs. Nich...|female|14.0| 1| 0| 237736|30.0708| null| C|\n", + "| 11| 1| 3|Sandstrom, Miss. ...|female| 4.0| 1| 1| PP 9549| 16.7| G6| S|\n", + "| 12| 1| 1|Bonnell, Miss. El...|female|58.0| 0| 0| 113783| 26.55| C103| S|\n", + "| 13| 0| 3|Saundercock, Mr. ...| male|20.0| 0| 0| A/5. 2151| 8.05| null| S|\n", + "| 14| 0| 3|Andersson, Mr. An...| male|39.0| 1| 5| 347082| 31.275| null| S|\n", + "| 15| 0| 3|Vestrom, Miss. Hu...|female|14.0| 0| 0| 350406| 7.8542| null| S|\n", + "| 16| 1| 2|Hewlett, Mrs. (Ma...|female|55.0| 0| 0| 248706| 16.0| null| S|\n", + "| 17| 0| 3|Rice, Master. Eugene| male| 2.0| 4| 1| 382652| 29.125| null| Q|\n", + "| 18| 1| 2|Williams, Mr. Cha...| male|null| 0| 0| 244373| 13.0| null| S|\n", + "| 19| 0| 3|Vander Planke, Mr...|female|31.0| 1| 0| 345763| 18.0| null| S|\n", + "| 20| 1| 3|Masselmani, Mrs. 
...|female|null| 0| 0| 2649| 7.225| null| C|\n", + "+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+\n", + "only showing top 20 rows\n", + "\n" + ] + } + ], + "source": [ + "df.show()" + ] + }, + { + "cell_type": "markdown", + "id": "4e2abcb6", + "metadata": {}, + "source": [ + "### Write job configurations yaml file" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "2d8b2d50-847e-4dde-b5d6-bb8730848a48", + "metadata": { + "scrolled": true + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Writing analyzer_configs.yaml\n" + ] + } + ], + "source": [ + "%%writefile analyzer_configs.yaml\n", + "columns:\n", + " PassengerId: [Completeness]\n", + " Age: [Completeness, Mean, StandardDeviation, Minimum, Maximum, Sum, Entropy]\n", + " Sex: [Completeness, ApproxCountDistinct, Distinctness]\n", + " Fare: [Completeness, Mean, StandardDeviation]\n", + " Pclass: [DataType]\n", + " Survived: [Histogram]\n", + " Name: [MaxLength, MinLength]\n", + "metrics:\n", + " Correlation: \n", + " - [Fare, Age]\n", + " - [Fare, Survived]\n", + " Compliance: \n", + " - [Age, \"Age>40.2\"]\n", + " PatternMatch: \n", + " - [Name, \"M(r|rs|iss).\"]\n", + " ApproxQuantiles: \n", + " - [Age, '0.5', '0.25', '0.75']\n", + " - [Fare, '0.5', '0.25', '0.75']\n", + " Uniqueness:\n", + " - [PassengerId]\n", + " - [Name,Sex]\n", + " - [Ticket]\n", + " UniqueValueRatio:\n", + " - [PassengerId]\n", + " - [Name,Sex]" + ] + }, + { + "cell_type": "markdown", + "id": "af76cd09", + "metadata": {}, + "source": [ + "If you have experience with PyDeequ library, you can pass an *AnalysisRunner* statement as a string. For now, it must be a \"one liner\" expression:" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "aa28b1ee", + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "string_expression = 'AnalysisRunner(spark).onData(df).addAnalyzer(Size()).addAnalyzer(Completeness(\"Age\")).addAnalyzer(Mean(\"Age\")).addAnalyzer(StandardDeviation(\"Age\")).addAnalyzer(Minimum(\"Age\")).addAnalyzer(Maximum(\"Age\")).addAnalyzer(Completeness(\"Sex\")).addAnalyzer(ApproxCountDistinct(\"Sex\")).addAnalyzer(ApproxQuantile(\"Age\", 0.5)).addAnalyzer(UniqueValueRatio([\"PassengerId\", \"Sex\"])).run()'" + ] + }, + { + "cell_type": "markdown", + "id": "2db25e3d", + "metadata": {}, + "source": [ + "### Instantiate Analyzer Class" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "e5d4f770", + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "analyzer = Analyzer(spark)" + ] + }, + { + "cell_type": "markdown", + "id": "5f096737", + "metadata": {}, + "source": [ + "### Run the analyzer job and show results" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "2e7287d9", + "metadata": { + "scrolled": false + }, + "outputs": [], + "source": [ + "res_from_file = analyzer.run(df, \"analyzer_configs.yaml\")" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": "996b2891", + "metadata": { + "scrolled": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-----------+-------------+--------------------------+--------------------+-------------------+\n", + "|entity |instance |name |value |dt_update |\n", + "+-----------+-------------+--------------------------+--------------------+-------------------+\n", + "|Column |Age |ApproxQuantiles-0.25 |20.0 |2022-07-06-23-24-56|\n", + "|Column |Age 
|ApproxQuantiles-0.5 |28.0 |2022-07-06-23-24-56|\n", + "|Column |Age |ApproxQuantiles-0.75 |38.0 |2022-07-06-23-24-56|\n", + "|Column |Age |Completeness |0.8013468013468014 |2022-07-06-23-24-56|\n", + "|Column |Age |Compliance |0.16835016835016836 |2022-07-06-23-24-56|\n", + "|Column |Age |Entropy |4.045611490075093 |2022-07-06-23-24-56|\n", + "|Column |Age |Maximum |80.0 |2022-07-06-23-24-56|\n", + "|Column |Age |Mean |29.69911764705882 |2022-07-06-23-24-56|\n", + "|Column |Age |Minimum |0.42 |2022-07-06-23-24-56|\n", + "|Column |Age |StandardDeviation |14.51632115081731 |2022-07-06-23-24-56|\n", + "|Column |Age |Sum |21205.17 |2022-07-06-23-24-56|\n", + "|Column |Fare |ApproxQuantiles-0.25 |7.8958 |2022-07-06-23-24-56|\n", + "|Column |Fare |ApproxQuantiles-0.5 |14.4542 |2022-07-06-23-24-56|\n", + "|Column |Fare |ApproxQuantiles-0.75 |30.5 |2022-07-06-23-24-56|\n", + "|Column |Fare |Completeness |1.0 |2022-07-06-23-24-56|\n", + "|Column |Fare |Mean |32.2042079685746 |2022-07-06-23-24-56|\n", + "|Column |Fare |StandardDeviation |49.6655344447741 |2022-07-06-23-24-56|\n", + "|Column |Name |MaxLength |82.0 |2022-07-06-23-24-56|\n", + "|Column |Name |MinLength |12.0 |2022-07-06-23-24-56|\n", + "|Column |Name |PatternMatch |0.0 |2022-07-06-23-24-56|\n", + "|Column |PassengerId |Completeness |1.0 |2022-07-06-23-24-56|\n", + "|Column |PassengerId |UniqueValueRatio |1.0 |2022-07-06-23-24-56|\n", + "|Column |PassengerId |Uniqueness |1.0 |2022-07-06-23-24-56|\n", + "|Column |Pclass |Histogram.abs.Boolean |0.0 |2022-07-06-23-24-56|\n", + "|Column |Pclass |Histogram.abs.Fractional |0.0 |2022-07-06-23-24-56|\n", + "|Column |Pclass |Histogram.abs.Integral |891.0 |2022-07-06-23-24-56|\n", + "|Column |Pclass |Histogram.abs.String |0.0 |2022-07-06-23-24-56|\n", + "|Column |Pclass |Histogram.abs.Unknown |0.0 |2022-07-06-23-24-56|\n", + "|Column |Pclass |Histogram.bins |5.0 |2022-07-06-23-24-56|\n", + "|Column |Pclass |Histogram.ratio.Boolean |0.0 |2022-07-06-23-24-56|\n", + "|Column |Pclass |Histogram.ratio.Fractional|0.0 |2022-07-06-23-24-56|\n", + "|Column |Pclass |Histogram.ratio.Integral |1.0 |2022-07-06-23-24-56|\n", + "|Column |Pclass |Histogram.ratio.String |0.0 |2022-07-06-23-24-56|\n", + "|Column |Pclass |Histogram.ratio.Unknown |0.0 |2022-07-06-23-24-56|\n", + "|Column |Sex |ApproxCountDistinct |2.0 |2022-07-06-23-24-56|\n", + "|Column |Sex |Completeness |1.0 |2022-07-06-23-24-56|\n", + "|Column |Sex |Distinctness |0.002244668911335578|2022-07-06-23-24-56|\n", + "|Column |Survived |Histogram.abs.0 |549.0 |2022-07-06-23-24-56|\n", + "|Column |Survived |Histogram.abs.1 |342.0 |2022-07-06-23-24-56|\n", + "|Column |Survived |Histogram.bins |2.0 |2022-07-06-23-24-56|\n", + "|Column |Survived |Histogram.ratio.0 |0.6161616161616161 |2022-07-06-23-24-56|\n", + "|Column |Survived |Histogram.ratio.1 |0.3838383838383838 |2022-07-06-23-24-56|\n", + "|Column |Ticket |Uniqueness |0.6139169472502806 |2022-07-06-23-24-56|\n", + "|Dataset |* |Size |891.0 |2022-07-06-23-24-56|\n", + "|Mutlicolumn|Fare,Age |Correlation |0.09606669176903912 |2022-07-06-23-24-56|\n", + "|Mutlicolumn|Fare,Survived|Correlation |0.2573065223849626 |2022-07-06-23-24-56|\n", + "|Mutlicolumn|Name,Sex |UniqueValueRatio |1.0 |2022-07-06-23-24-56|\n", + "|Mutlicolumn|Name,Sex |Uniqueness |1.0 |2022-07-06-23-24-56|\n", + "+-----------+-------------+--------------------------+--------------------+-------------------+\n", + "\n" + ] + } + ], + "source": [ + "res_from_file.show(truncate=False, n=50)" + ] + }, + { + "cell_type": "markdown", + 
"id": "501cea3f", + "metadata": {}, + "source": [ + "### Run analyzer job from string expression" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "bb7fa942", + "metadata": {}, + "outputs": [], + "source": [ + "res_from_expression = analyzer.run(df, string_expression)" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "1a7ee91f", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-----------+---------------+-------------------+------------------+-------------------+\n", + "|entity |instance |name |value |dt_update |\n", + "+-----------+---------------+-------------------+------------------+-------------------+\n", + "|Column |Age |ApproxQuantile-0.5 |28.0 |2022-07-06-23-24-59|\n", + "|Column |Age |Completeness |0.8013468013468014|2022-07-06-23-24-59|\n", + "|Column |Age |Maximum |80.0 |2022-07-06-23-24-59|\n", + "|Column |Age |Mean |29.69911764705882 |2022-07-06-23-24-59|\n", + "|Column |Age |Minimum |0.42 |2022-07-06-23-24-59|\n", + "|Column |Age |StandardDeviation |14.51632115081731 |2022-07-06-23-24-59|\n", + "|Column |Sex |ApproxCountDistinct|2.0 |2022-07-06-23-24-59|\n", + "|Column |Sex |Completeness |1.0 |2022-07-06-23-24-59|\n", + "|Dataset |* |Size |891.0 |2022-07-06-23-24-59|\n", + "|Mutlicolumn|PassengerId,Sex|UniqueValueRatio |1.0 |2022-07-06-23-24-59|\n", + "+-----------+---------------+-------------------+------------------+-------------------+\n", + "\n" + ] + } + ], + "source": [ + "res_from_expression.show(truncate=False)" + ] + }, + { + "cell_type": "markdown", + "id": "4e63a03f", + "metadata": {}, + "source": [ + "### Write dataframe output as a Delta Table" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "ad2080c0", + "metadata": { + "scrolled": true + }, + "outputs": [], + "source": [ + "analyzer.write_output(res_from_file, \"./dataqualityoutput\", delta=True, mode=\"append\")" + ] + }, + { + "cell_type": "markdown", + "id": "c05cc09e", + "metadata": {}, + "source": [ + "Check if the results were correctly written." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "58673876", + "metadata": { + "scrolled": false + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-----------+-------------+--------------------+--------------------+-------------------+\n", + "| entity| instance| name| value| dt_update|\n", + "+-----------+-------------+--------------------+--------------------+-------------------+\n", + "| Column| Age|ApproxQuantiles-0.25| 20.0|2022-07-06-23-24-56|\n", + "| Column| Age| ApproxQuantiles-0.5| 28.0|2022-07-06-23-24-56|\n", + "| Column| Age|ApproxQuantiles-0.75| 38.0|2022-07-06-23-24-56|\n", + "| Column| Age| Completeness| 0.8013468013468014|2022-07-06-23-24-56|\n", + "| Column| Age| Compliance| 0.16835016835016836|2022-07-06-23-24-56|\n", + "| Column| Age| Entropy| 4.045611490075093|2022-07-06-23-24-56|\n", + "| Column| Age| Maximum| 80.0|2022-07-06-23-24-56|\n", + "| Column| Age| Mean| 29.69911764705882|2022-07-06-23-24-56|\n", + "| Column| Age| Minimum| 0.42|2022-07-06-23-24-56|\n", + "| Column| Age| StandardDeviation| 14.51632115081731|2022-07-06-23-24-56|\n", + "| Column| Age| Sum| 21205.17|2022-07-06-23-24-56|\n", + "| Column| Fare|ApproxQuantiles-0.25| 7.8958|2022-07-06-23-24-56|\n", + "| Column| Fare| ApproxQuantiles-0.5| 14.4542|2022-07-06-23-24-56|\n", + "| Column| Fare|ApproxQuantiles-0.75| 30.5|2022-07-06-23-24-56|\n", + "| Column| Fare| Completeness| 1.0|2022-07-06-23-24-56|\n", + "| Column| Fare| Mean| 32.2042079685746|2022-07-06-23-24-56|\n", + "| Column| Fare| StandardDeviation| 49.6655344447741|2022-07-06-23-24-56|\n", + "| Column| Name| MaxLength| 82.0|2022-07-06-23-24-56|\n", + "| Column| Name| MinLength| 12.0|2022-07-06-23-24-56|\n", + "| Column| Name| PatternMatch| 0.0|2022-07-06-23-24-56|\n", + "| Column| PassengerId| Completeness| 1.0|2022-07-06-23-24-56|\n", + "| Column| PassengerId| UniqueValueRatio| 1.0|2022-07-06-23-24-56|\n", + "| Column| PassengerId| Uniqueness| 1.0|2022-07-06-23-24-56|\n", + "| Column| Pclass|Histogram.abs.Boo...| 0.0|2022-07-06-23-24-56|\n", + "| Column| Pclass|Histogram.abs.Fra...| 0.0|2022-07-06-23-24-56|\n", + "| Column| Pclass|Histogram.abs.Int...| 891.0|2022-07-06-23-24-56|\n", + "| Column| Pclass|Histogram.abs.String| 0.0|2022-07-06-23-24-56|\n", + "| Column| Pclass|Histogram.abs.Unk...| 0.0|2022-07-06-23-24-56|\n", + "| Column| Pclass| Histogram.bins| 5.0|2022-07-06-23-24-56|\n", + "| Column| Pclass|Histogram.ratio.B...| 0.0|2022-07-06-23-24-56|\n", + "| Column| Pclass|Histogram.ratio.F...| 0.0|2022-07-06-23-24-56|\n", + "| Column| Pclass|Histogram.ratio.I...| 1.0|2022-07-06-23-24-56|\n", + "| Column| Pclass|Histogram.ratio.S...| 0.0|2022-07-06-23-24-56|\n", + "| Column| Pclass|Histogram.ratio.U...| 0.0|2022-07-06-23-24-56|\n", + "| Column| Sex| ApproxCountDistinct| 2.0|2022-07-06-23-24-56|\n", + "| Column| Sex| Completeness| 1.0|2022-07-06-23-24-56|\n", + "| Column| Sex| Distinctness|0.002244668911335578|2022-07-06-23-24-56|\n", + "| Column| Survived| Histogram.abs.0| 549.0|2022-07-06-23-24-56|\n", + "| Column| Survived| Histogram.abs.1| 342.0|2022-07-06-23-24-56|\n", + "| Column| Survived| Histogram.bins| 2.0|2022-07-06-23-24-56|\n", + "| Column| Survived| Histogram.ratio.0| 0.6161616161616161|2022-07-06-23-24-56|\n", + "| Column| Survived| Histogram.ratio.1| 0.3838383838383838|2022-07-06-23-24-56|\n", + "| Column| Ticket| Uniqueness| 0.6139169472502806|2022-07-06-23-24-56|\n", + "| Dataset| *| Size| 891.0|2022-07-06-23-24-56|\n", + "|Mutlicolumn| Fare,Age| Correlation| 
0.09606669176903912|2022-07-06-23-24-56|\n", + "|Mutlicolumn|Fare,Survived| Correlation| 0.2573065223849626|2022-07-06-23-24-56|\n", + "|Mutlicolumn| Name,Sex| UniqueValueRatio| 1.0|2022-07-06-23-24-56|\n", + "|Mutlicolumn| Name,Sex| Uniqueness| 1.0|2022-07-06-23-24-56|\n", + "| Column| Age|ApproxQuantiles-0.25| 20.0|2022-07-06-23-08-11|\n", + "| Column| Age| ApproxQuantiles-0.5| 28.0|2022-07-06-23-08-11|\n", + "+-----------+-------------+--------------------+--------------------+-------------------+\n", + "only showing top 50 rows\n", + "\n" + ] + } + ], + "source": [ + "spark.read.parquet(\"dataqualityoutput/\").show(50)" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.5" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/rony/data_quality/profiler.py b/rony/data_quality/profiler.py new file mode 100644 index 0000000..5cc0536 --- /dev/null +++ b/rony/data_quality/profiler.py @@ -0,0 +1,36 @@ +from pyspark.sql.dataframe import DataFrame +from .data_quality import DataQuality +from pyspark.sql import SparkSession, DataFrame + + +class Profiler(DataQuality): + """ + Abstract DataQuality Class + + Parameters + ---------- + spark: SparkSession + A SparkSession object to run DataQuality jobs. + """ + + def __init__(self, spark: SparkSession) -> None: + super().__init__(spark) + + + def run(self, df: DataFrame) -> DataFrame: + """ + Run the DataQuality job + + Parameters + ---------- + + df: DataFrame + A Spark DataFrame to run DataQuality jobs. + + Returns + ------- + + DataFrame -> A DataFrame with the results for DataQuality job. 
+ """ + pass + diff --git a/rony/module_templates/aws_glue_crawler/infrastructure/aws/crawler.tf b/rony/module_templates/aws_glue_crawler/infrastructure/aws/crawler.tf index 5b013ef..07aadc3 100644 --- a/rony/module_templates/aws_glue_crawler/infrastructure/aws/crawler.tf +++ b/rony/module_templates/aws_glue_crawler/infrastructure/aws/crawler.tf @@ -5,7 +5,7 @@ resource "aws_glue_crawler" "glue_crawler" { role = aws_iam_role.glue_role.arn s3_target { - path = var.bucket_paths[count.index] + path = "${local.prefix}-${var.bucket_paths[count.index]}-${var.account}" } tags = local.common_tags diff --git a/rony/module_templates/aws_glue_crawler/infrastructure/aws/variables.tf b/rony/module_templates/aws_glue_crawler/infrastructure/aws/variables.tf index 2c7d9ea..67daa47 100644 --- a/rony/module_templates/aws_glue_crawler/infrastructure/aws/variables.tf +++ b/rony/module_templates/aws_glue_crawler/infrastructure/aws/variables.tf @@ -13,8 +13,8 @@ variable "bucket_paths" { description = "Paths to S3 bucket used by the crawler" type = list(string) default = [ - "s3://landing-zone-123456789", - "s3://processing-zone-123456789", - "s3://delivery-zone-123456789" + "s3://landing-zone", + "s3://processing-zone", + "s3://delivery-zone" ] } \ No newline at end of file diff --git a/rony/module_templates/aws_kinesis_stream.json b/rony/module_templates/aws_kinesis_stream.json new file mode 100644 index 0000000..c71729d --- /dev/null +++ b/rony/module_templates/aws_kinesis_stream.json @@ -0,0 +1,21 @@ +{ + "info": "Module for deploying AWS Kinesis Stream", + "instructions": [ + "" + ], + "developers": [ + "massao.mitsunaga@a3data.com.br", + "hery.barbosa@a3data.com.br" + ], + "input_info": [ + [ + "project_start_date", + "23/08/2021", + "Initial development aws modules" + ] + ], + "version": "0.0.1", + "dependencies": [ + "__AWS_BASE__" + ] +} \ No newline at end of file diff --git a/rony/module_templates/aws_kinesis_stream/infrastructure/aws/kinesis_stream.tf b/rony/module_templates/aws_kinesis_stream/infrastructure/aws/kinesis_stream.tf new file mode 100644 index 0000000..aa18278 --- /dev/null +++ b/rony/module_templates/aws_kinesis_stream/infrastructure/aws/kinesis_stream.tf @@ -0,0 +1,13 @@ +resource "aws_kinesis_stream" "rony_kinesis_stream" { + name = "${local.prefix}-${var.kinesis_stream_name}-${var.account}" + shard_count = 1 + retention_period = 48 + + shard_level_metrics = [ + "IncomingBytes", + "OutgoingBytes", + ] + + tags = local.common_tags + +} \ No newline at end of file diff --git a/rony/module_templates/aws_kinesis_stream/infrastructure/aws/variables.tf b/rony/module_templates/aws_kinesis_stream/infrastructure/aws/variables.tf new file mode 100644 index 0000000..dc787da --- /dev/null +++ b/rony/module_templates/aws_kinesis_stream/infrastructure/aws/variables.tf @@ -0,0 +1,3 @@ +variable "kinesis_stream_name" { + default = "rony_kinesis_stream" +} \ No newline at end of file diff --git a/rony/module_templates/spark.json b/rony/module_templates/spark.json new file mode 100644 index 0000000..4f837ba --- /dev/null +++ b/rony/module_templates/spark.json @@ -0,0 +1,14 @@ +{ + "info": "Module for developing Spark classes", + "instructions": [ + "" + ], + "developers": [ + "pedro.toledo@a3data.com.br" + ], + "input_info": [], + "version": "0.0.1", + "dependencies": [ + "CI_workflows" + ] +} \ No newline at end of file diff --git a/rony/module_templates/spark/spark/data_source.py b/rony/module_templates/spark/spark/data_source.py new file mode 100644 index 0000000..851f263 --- /dev/null +++ 
b/rony/module_templates/spark/spark/data_source.py @@ -0,0 +1,75 @@ +from pyspark.sql.dataframe import DataFrame + + +class FlatFile: + """ + Class for reading files with Spark + """ + + def __init__(self, spark_session) -> None: + """ + Instantiate class + + Parameters + ---------- + spark_session : pyspark.sql.session.SparkSession + SparkSession used to read data + + Returns + ------- + self: + returns an instance of the object + """ + self.spark = spark_session + + def read_data(self, file_path, format, **kwargs) -> DataFrame: + """ + Read a Spark DataFrame + + Parameters + ---------- + file_path : str + File path + format: str + name of the file format, e.g. 'csv', 'parquet'. + **kwargs: + Additional reading options passed to `options()`. + Returns + ------- + pyspark.sql.DataFrame + """ + return self.spark.read.format(format).options(**kwargs).load(file_path) + + def write_data(self, df, save_path, format, mode, partitions=None) -> None: + """ + Save a Spark DataFrame + + Parameters + ---------- + save_path : str + Destination path + format : str + name of the destination file format, e.g. 'csv', 'parquet'. + mode : str + specify the mode of writing data, if data already exist in the designed path + * append: Append the contents of the DataFrame to the existing data + * overwrite: Overwrite existing data + * ignore: Silently ignores this operation + * error or errorifexists (default case): Raises an error + partitions : int + number of partitions desired in output files. If greater than `df`'s partitions, `rapartition()` will be used. + Else, `coalesce()` will be used. + + Returns + ------- + self: + returns an instance of the object + """ + if partitions: + df_partitions = df.rdd.getNumPartitions() + if df_partitions >= partitions: + df.coalesce(partitions).write.format(format).save(save_path, mode=mode) + else: + df.repartition(partitions).write.format(format).save(save_path, mode=mode) + else: + df.write.format(format).save(save_path, mode=mode) diff --git a/rony/module_templates/spark/spark/main.py b/rony/module_templates/spark/spark/main.py new file mode 100644 index 0000000..3495e42 --- /dev/null +++ b/rony/module_templates/spark/spark/main.py @@ -0,0 +1,13 @@ +from pyspark.sql import SparkSession +from processing import SparkCleaner + +if __name__ == "__main__": + spark = ( + SparkSession + .builder + .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .getOrCreate() + ) + cleaner = SparkCleaner(spark) + cleaner.clean() + spark.stop() \ No newline at end of file diff --git a/rony/module_templates/spark/spark/processing.py b/rony/module_templates/spark/spark/processing.py new file mode 100644 index 0000000..1ea254d --- /dev/null +++ b/rony/module_templates/spark/spark/processing.py @@ -0,0 +1,128 @@ +import pyspark.sql.functions as f +from pyspark.sql.window import Window +from data_source import FlatFile +from spark_utils import unidecode_udf, convert_decimal_udf + +FILE_PATHS = { + 'data': 'path/to/file' +} + +class SparkCleaner: + """ + Class used to clean perform basic cleaning with Spark + """ + + def __init__(self, spark_session) -> None: + """ + Instantiates class + + Parameters + ---------- + spark_session : pyspark.sql.session.SparkSession + SparkSession that is used to manipulate data. 
+ + Returns + ------- + self: + returns an instance of the object + """ + self.spark = spark_session + self.ff_source = FlatFile(spark_session) + self.read_options = {} + self.save_path = 'destination/path' + + def read_data(self, format) -> None: + """ + Reads raw data for cleaning. + + Parameters + ---------- + format : str + name of the destination file format, e.g. 'csv', 'parquet'. + + Returns + ------- + self: + returns an instance of the object + """ + self.df = self.ff_source.read_data(FILE_PATHS['data'], format, **self.read_options) + + def clean_types(self) -> None: + """ + Cleans DataFrame column types. + + Parameters + ---------- + None + + Returns + ------- + self: + returns an instance of the object + """ + str_cols = [] + for c in str_cols: + self.df = self.df.withColumn(c, unidecode_udf(f.initcap(f.trim(c)))) + + dbl_cols = [] + for c in dbl_cols: + self.df = self.df.withColumn(c, convert_decimal_udf(f.col(c))) + + date_cols = [] + for c in date_cols: + self.df = self.df.withColumn(c, f.to_date(c, 'dd/MM/yyyy')) + + def clean_specific(self) -> None: + """ + Performs cleaning operations specific to this DataFrame. + + Parameters + ---------- + None + + Returns + ------- + self: + returns an instance of the object + """ + self.df_cleaned = self.df + + def write_data(self, format, mode='error') -> None: + """ + Saves intermediate DataFrames generated in this process. + + Parameters + ---------- + format : str + name of the destination file format, e.g. 'csv', 'parquet'. + mode : str + specify the mode of writing data, if data already exist in the designed path + * append: Append the contents of the DataFrame to the existing data + * overwrite: Overwrite existing data + * ignore: Silently ignores this operation + * error or errorifexists (default case): Raises an error + + Returns + ------- + self: + returns an instance of the object + """ + self.ff_source.write_data(self.df_cleaned, self.save_path, format, mode) + + def clean(self) -> None: + """ + Wrapper for running cleaner. 
+ + Parameters + ---------- + None + + Returns + ------- + self: + returns an instance of the object + """ + self.read_data('csv') + self.clean_types() + self.clean_specific() + self.write_data('parquet') \ No newline at end of file diff --git a/rony/module_templates/spark/spark/spark_utils.py b/rony/module_templates/spark/spark/spark_utils.py new file mode 100644 index 0000000..52bfa4d --- /dev/null +++ b/rony/module_templates/spark/spark/spark_utils.py @@ -0,0 +1,98 @@ +import pyspark.sql.functions as f +import pyspark.sql.types as t +from pyspark.sql.dataframe import DataFrame +from unidecode import unidecode +import numpy as np +import os +import re + +def get_spark_versions() -> 'list[str]': + spark_home = os.environ['SPARK_HOME'] + spark_version = re.search('(?<=spark-).+(?=-bin)', spark_home).group(0) + hadoop_version = re.search('(?<=hadoop).+', spark_home).group(0) + return (spark_version, hadoop_version) + +spark_version, hadoop_version = get_spark_versions() + +if int(spark_version[0]) < 3: + def transform(self, f) -> DataFrame: + return f(self) + + DataFrame.transform = transform + +# UDFs + +@f.udf(returnType=t.StringType()) +def unidecode_udf(string): + if not string: + return None + else: + return unidecode(string) + +@f.udf(returnType=t.DoubleType()) +def convert_decimal_udf(string): + if string is None: + return None + else: + string = string.replace(",", ".") + return float(string.replace(".", "", string.count(".") - 1)) + +@f.udf(returnType=t.FloatType()) +def array_product_udf(array): + if not array: + return None + else: + array = [e for e in array if e is not None] + return float(np.prod(array)) + + +@f.udf(returnType=t.FloatType()) +def array_sum_udf(array): + if not array: + return None + else: + array = [e for e in array if e is not None] + return sum(array) + + +# Custom methods + +def df_from_struct(cols, extract_col, explode) -> DataFrame: + def _(df): + if explode: + df = df.withColumn(extract_col, f.explode(extract_col)) + struct_cols = df.select(f'{extract_col}.*').columns + renamed_cols = [] + for c in struct_cols: + col_ref = f.col(f'{extract_col}.' 
+ c) + if c in cols: + renamed_cols.append(col_ref.alias(c + '_struct')) + else: + renamed_cols.append(col_ref) + return df.select(*cols, *renamed_cols) + return _ + +def renamer(dict) -> DataFrame: + def _(df): + for c, n in dict.items(): + df = df.withColumnRenamed(c, n) + return df + return _ + + +def unpivot(*args, col_name="categorias", value_name="valor") -> DataFrame: + if not args[0]: + key_cols = [] + else: + key_cols = args[0] if type(args[0]) is list else args + + def _(df): + unpivot_cols = [c for c in df.columns if c not in key_cols] + groups_str = [f"'{i}', `{i}`" for i in unpivot_cols] + unpivot_string = ", ".join(groups_str) + unpivot_query = "stack({}, {}) as ({}, {})".format( + len(unpivot_cols), unpivot_string, col_name, value_name + ) + return df.selectExpr(*key_cols, unpivot_query) + + return _ \ No newline at end of file diff --git a/rony/module_writer.py b/rony/module_writer.py index 3bce338..7dbc151 100644 --- a/rony/module_writer.py +++ b/rony/module_writer.py @@ -212,13 +212,18 @@ def create_module_from_diff(module_name): patch_file = f".rony_{module_name}.patch" module_path = os.path.join(os.path.expanduser("~"), "MyRonyModules", module_name) - p = subprocess.call(["git", "diff", "--no-prefix"], stdout=open(patch_file, "w")) + git_command = "git add -N -A; git diff --no-prefix" + ret = subprocess.run( + git_command, + capture_output=True, + shell=True, + ) - patch_set = PatchSet.from_filename(patch_file) + patch_set = PatchSet(ret.stdout.decode()) for patched_file in patch_set: - file_path = patched_file.path + file_path = patched_file.target_file added_lines = [] for hunk in patched_file: for line in hunk: @@ -234,8 +239,6 @@ def create_module_from_diff(module_name): with open(module_file_path, "w") as f: f.write("".join(added_lines)) - os.remove(patch_file) - info = click.prompt("Please your modules description", default="") inst = click.prompt( "Please instructions to be displayed to the users after they add this module", @@ -252,6 +255,7 @@ def create_module_from_diff(module_name): "developers": [developer], "input_info": [], "version": "0.0.0", + "dependencies": [], }, sort_keys=True, indent=4,
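The transform-style helpers added in `spark_utils.py` (`renamer`, `unpivot`, `df_from_struct`) each return a function meant to be chained through `DataFrame.transform`, which the module patches onto DataFrames for Spark versions below 3. A minimal usage sketch for two of them, assuming Spark 3+, a standard `SPARK_HOME` layout (parsed by `get_spark_versions()` at import time), the `unidecode` and `numpy` dependencies installed, and a toy DataFrame with made-up column names:

```python
from pyspark.sql import SparkSession

# Hypothetical import path: assumes this snippet lives alongside the generated
# spark/ files so that spark_utils is importable as a plain module.
from spark_utils import renamer, unpivot

spark = SparkSession.builder.getOrCreate()

# Toy data: one key column plus two numeric columns of the same type,
# since the stack() expression built by unpivot expects compatible value types.
df = spark.createDataFrame(
    [("A", 1.0, 10.0), ("B", 2.0, 20.0)],
    ["id", "metric_one", "metric_two"],
)

result = (
    df
    # renamer takes an {old_name: new_name} mapping and renames those columns
    .transform(renamer({"id": "identifier"}))
    # unpivot keeps the key columns and stacks the remaining ones into
    # (metric, value) rows
    .transform(unpivot(["identifier"], col_name="metric", value_name="value"))
)
result.show()  # one row per (identifier, metric) pair
```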