Feat/data quality #60

Open
wants to merge 30 commits into base: main
Commits
c57eccb
bugfix: Fixed diff-2-module
RodrigoATorres Aug 20, 2021
d5d4edf
Merge pull request #52 from RodrigoATorres/bugfix/diff-2-module
RodrigoATorres Aug 20, 2021
5535884
feat:added workspace & acc number to crawler path
Aug 23, 2021
ac05806
Merge branch 'main' of github.com:MassaoMitsunaga/rony into dev
Aug 23, 2021
60424ae
Merge branch 'dev' of github.com:MassaoMitsunaga/rony into dev
Aug 23, 2021
c541f5b
feat:added kinesis stream module
Aug 23, 2021
b756d13
feat:quickstart updated
Aug 24, 2021
a965a1e
add: initial framework for creating Spark processing classes and stru…
pltoledo Aug 30, 2021
6e33e8e
add: module template config file for spark module
pltoledo Aug 30, 2021
8b37858
fix: structure module and add dependencie in module .json
pltoledo Sep 2, 2021
4984fef
fix: structure module and add dependencie in module .json
pltoledo Sep 2, 2021
b3a9eb5
Merge pull request #53 from MassaoMitsunaga/dev
RodrigoATorres Oct 23, 2021
a7b42a3
Merge pull request #55 from pltoledo/main
RodrigoATorres Oct 23, 2021
5c9ac38
Update ChangeLog.md
RodrigoATorres Oct 23, 2021
29d9533
feat/data-quality - first commit
Nov 17, 2021
0c3bb42
first DataQuality abstract Class
Nov 23, 2021
4cb32da
insert run abstractmethod in DataQuality class
Nov 23, 2021
f318f10
build DataQuality as a module of the Rony library, not as a gener…
Nov 23, 2021
c2cb7f4
improve DataQuality abstract class
Nov 24, 2021
73207d4
hard coded initial Analyzer run
Nov 24, 2021
aec41bc
WIP analyzer hard coded example is working
Nov 24, 2021
b0c1cc5
separate DataQuality and DQ AbsClass. Create default SparkSession
Nov 24, 2021
78042a1
change version to dev .1000
Nov 24, 2021
0250a8e
update class diagram
Nov 24, 2021
2840cb6
Analyzer with config file or string expression working. Only one colu…
Nov 25, 2021
85f5e95
doc(dq) correct parameters on write_file
Dec 6, 2021
97ff295
change json to yaml
Mar 31, 2022
0b76951
refactor yaml to deal with metrics with 2+ columns
Mar 31, 2022
538cb70
feat/complete analyzer jobs with all PyDeequ Analyzer methods
Jul 7, 2022
1ea8286
doc/rony analyzer example
Jul 7, 2022
5 changes: 4 additions & 1 deletion .gitignore
@@ -132,4 +132,7 @@ dmypy.json

*_env/

ci-test/
ci-test/

.DS_Store

10 changes: 10 additions & 0 deletions docs/ChangeLog.md
@@ -1,5 +1,15 @@
# Change Log

## Version 0.3.2 (dev)

- Bugfixes:
- Gitlab CI fix on production plan step
- Fixes on diff-2-module function
- Added dependencies field to json file created
- Fixed issue that would not add new files to created module
- Added spark scripts module


## Version 0.3.1

- Bugfixes
49 changes: 15 additions & 34 deletions docs/QuickStart.md
@@ -33,49 +33,30 @@ source <project_name>_env/bin/activate
pip install -r requirements.txt
```

4) Rony also has some handy CLI commands to build and run docker images locally. You can do

```bash
cd etl
rony build <image_name>:<tag>
```

to build an image and run it with
## Implementation suggestions

```bash
rony run <image_name>:<tag>
```
When you start a new `rony` project, it will by default be created with `aws` as the `provider`, and you will be asked which modules you want to implement in your project.

In this particular implementation, `run.py` has a simple ETL script that accepts a parameter to filter the data based on the `Sex` column. To use it, you can do
You can also add modules after the project is created by running the following command in your project's root folder.

```bash
docker run <image_name>:<tag> -s female
rony add-module <module_name>
```

## Implementation suggestions

When you start a new `rony` project, you will find

- an `infrastructure` folder with terraform code that creates, on AWS:
- an S3 bucket
- a Lambda function
- a CloudWatch log group
- an ECR repository
- an AWS Glue Crawler
- IAM roles and policies for lambda and glue
The available `module_name` values can be found in the `module_templates` folder inside the `rony` project folder.

- an `etl` folder with:
- a `Dockerfile` and a `run.py` example of ETL code
- a `lambda_function.py` with a "Hello World" example
When you start a new `rony` project, you will find:

- a `tests` folder with unit testing on the Lambda function
- a `.github/workflow` folder with a GitHub Actions CI/CD pipeline suggestion. This pipeline
- Tests the lambda function
- Builds and runs the docker image
- Sets AWS credentials
- Makes a terraform plan (but does not actually deploy anything)
- an `infrastructure` folder with terraform code that creates, on the chosen provider:
- a backend file
- For the aws provider, you will need to create a bucket manually and reference its name in the bucket variable.
- a provider file
- a variables file
- files for the modules implemented during the project creation

- a `dags` folder with some **Airflow** example code.
- a `CI` folder with a docker-compose file, CI scripts, and tests

- a `Makefile` file containing shortcut commands for the project

You also have a `scripts` folder with a bash file that builds a lambda deploy package.

Binary file added img/Rony-data-quality-classes-diagram.png
3 changes: 2 additions & 1 deletion rony/__init__.py
@@ -1,3 +1,4 @@
from .cli import *
from ._version import __version__
__author__ = 'A3Data'
from .data_quality import *
__author__ = 'A3Data'
2 changes: 1 addition & 1 deletion rony/_version.py
@@ -1 +1 @@
__version__ = '0.3.1'
__version__ = '0.3.1.1000'
3 changes: 3 additions & 0 deletions rony/cli_aux.py
@@ -48,6 +48,7 @@ def get_modules_to_add(command, opts, ctx):
"aws_simple_storage_service",
"aws_glue_crawler",
"aws_lambda_function",
"aws_kinesis_stream"
]
else:
if click.confirm("Add S3 module?", default=True):
@@ -56,6 +57,8 @@ def get_modules_to_add(command, opts, ctx):
all_modules.append("aws_glue_crawler")
if click.confirm("Add LAMBDA FUNCTION module?", default=True):
all_modules.append("aws_lambda_function")
if click.confirm("Add KINESIS STREAM module?", default=True):
all_modules.append("aws_kinesis_stream")

if opts["provider"] == "gcp":

4 changes: 4 additions & 0 deletions rony/data_quality/__init__.py
@@ -0,0 +1,4 @@
from .profiler import Profiler
from .analyzer import Analyzer
from .data_quality import DataQuality

206 changes: 206 additions & 0 deletions rony/data_quality/analyzer.py
@@ -0,0 +1,206 @@
import yaml
from datetime import datetime

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import functions as f

from .data_quality import DataQuality

from pydeequ.analyzers import (
AnalysisRunner, AnalyzerContext,
ApproxCountDistinct, ApproxQuantile, ApproxQuantiles,
Completeness, Compliance, Correlation, CountDistinct,
DataType, Distinctness, Entropy, Histogram, KLLParameters, KLLSketch,
Maximum, MaxLength, Mean, Minimum, MinLength, MutualInformation,
PatternMatch, Size, StandardDeviation, Sum, Uniqueness,
UniqueValueRatio
)


class Analyzer(DataQuality):
"""
Class for building and running Analyzer jobs and output tables.

Parameters
----------
spark: SparkSession
A SparkSession object
"""

def __init__(self, spark: SparkSession) -> None:
super().__init__(spark)


def _analysis_job_builder(self, config_path: str) -> str:
"""
Build the Analyzer job code as a string expression to be evaluated.

Parameters
----------
config_path: str
Path to a yaml config file.
Config file must have 2 major keys: columns and metrics.

The columns major key must have dataframe column names as keys and lists of
analysis methods as values (each method listed here takes a single column as input).

The metrics major key was built to deal with methods that take more than one column or parameter.
It must have method names as keys and lists of parameter lists as values.

YAML Example
------------

columns:
PassengerId: [Completeness]
Age: [Completeness, Mean, StandardDeviation, Minimum, Maximum, Sum, Entropy]
Sex: [Completeness, ApproxCountDistinct, Distinctness]
Fare: [Completeness, Mean, StandardDeviation]
Pclass: [DataType]
Survived: [Histogram]
Name: [MaxLength, MinLength]
metrics:
Correlation:
- [Fare, Age]
- [Fare, Survived]
Compliance:
- [Age, "Age>40.2"]
PatternMatch:
- [Name, "M(r|rs|iss)."]
ApproxQuantiles:
- [Age, '0.5', '0.25', '0.75']
- [Fare, '0.5', '0.25', '0.75']
Uniqueness:
- [PassengerId]
- [Name,Sex]
- [Ticket]
UniqueValueRatio:
- [PassengerId]
- [Name,Sex]

Returns
-------
str -> The AnalysisRunner expression as a str object
"""
with open(config_path, "r") as file:
configurations = yaml.full_load(file)

columnsconfig = configurations["columns"]
metricsconfig = configurations["metrics"]

expression = "AnalysisRunner(spark).onData(df).addAnalyzer(Size())"

for col in columnsconfig.keys():
for method in columnsconfig[col]:
expression += ".addAnalyzer(" + method + '("' + col + '"))'

for method in metricsconfig.keys():

for params in metricsconfig[method]:
expression += ".addAnalyzer(" + method + '('

if method == "ApproxQuantiles":
expression += '"' + params[0] + '", ['
for i in range(1,len(params)):
expression += params[i] + ', '
expression += ']'

elif method == "ApproxQuantile":
expression += '"' + params[0] + '", ' + params[1]

elif method == "Uniqueness" or method == "UniqueValueRatio":
expression += '['
for i in range(len(params)):
expression += '"' + params[i] + '", '
expression += ']'

else:
for col in params:
expression += '"' + col + '", '
expression += '))'

expression += ".run()"
return expression
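# Worked example (a sketch, not part of the original module): for a minimal,
# hypothetical config file such as
#
#   columns:
#     Age: [Completeness]
#   metrics:
#     Correlation:
#       - [Fare, Age]
#
# this builder returns the single string
#
#   AnalysisRunner(spark).onData(df).addAnalyzer(Size())
#       .addAnalyzer(Completeness("Age"))
#       .addAnalyzer(Correlation("Fare", "Age", )).run()
#
# (shown here wrapped over three lines for readability).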


def run(self, df: DataFrame, config: str) -> DataFrame:
"""
Run the Analyzer job

Parameters
----------

df: DataFrame
A Spark DataFrame

config: str
Path to a yaml config file or a str expression of AnalysisRunner to evaluate.

If config is a path to a yaml file, config file must have 2 major keys: columns and metrics.

The columns major key must have dataframe column names as keys and lists of
analysis methods as values (each method listed here takes a single column as input).

The metrics major key was built to deal with methods that take more than one column or parameter.
It must have method names as keys and lists of parameter lists as values.

YAML Example
------------

columns:
PassengerId: [Completeness]
Age: [Completeness, Mean, StandardDeviation, Minimum, Maximum, Sum, Entropy]
Sex: [Completeness, ApproxCountDistinct, Distinctness]
Fare: [Completeness, Mean, StandardDeviation]
Pclass: [DataType]
Survived: [Histogram]
Name: [MaxLength, MinLength]
metrics:
Correlation:
- [Fare, Age]
- [Fare, Survived]
Compliance:
- [Age, "Age>40.2"]
PatternMatch:
- [Name, "M(r|rs|iss)."]
ApproxQuantiles:
- [Age, '0.5', '0.25', '0.75']
- [Fare, '0.5', '0.25', '0.75']
Uniqueness:
- [PassengerId]
- [Name,Sex]
- [Ticket]
UniqueValueRatio:
- [PassengerId]
- [Name,Sex]

Returns
-------

DataFrame -> A DataFrame with the results of the Analyzer job.
"""
try:
expression = self._analysis_job_builder(config)
except:
expression_start = "AnalysisRunner(spark).onData(df)"
if not config.startswith(expression_start):
raise AttributeError("String expression should start with 'AnalysisRunner(spark).onData(df)'")
else:
expression = config

spark = self.spark
analysisResult = eval(expression)

analysisResult_df = (
AnalyzerContext
.successMetricsAsDataFrame(self.spark, analysisResult)
)

analysisResult_df = (
analysisResult_df
.orderBy("entity", "instance", "name")
.withColumn("dt_update", f.lit(datetime.now().strftime("%Y-%m-%d-%H-%M-%S")))
)

return analysisResult_df
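# Example usage (a sketch; the config path and input CSV are hypothetical, and
# the SparkSession must be created with the Deequ jars available to PyDeequ):
#
#   from rony.data_quality import Analyzer
#
#   df = spark.read.csv("titanic.csv", header=True, inferSchema=True)
#   result = Analyzer(spark).run(df, "dq_config.yaml")
#   result.show()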
