Take Home Exam Solution #1

Open
wants to merge 121 commits into base: master
Changes from all commits · 121 commits
5fdfdde
Added idea folder in gitignore
Jul 28, 2021
0241e85
Deleted data.zip to save storage
Jul 28, 2021
4d17309
Development Checklist for Continuous Integration
Jul 28, 2021
f0f41d9
Folder blueprint for development
Jul 28, 2021
2734723
Docker compose setup
Jul 28, 2021
51b2978
Added Project Package Requirements
Jul 28, 2021
df53a47
Git ignore logs folder
Jul 28, 2021
b8b5eb8
Renamed README.md to instructions.md
Jul 28, 2021
a5bcb3c
Renamed README.md to instructions.md
Jul 28, 2021
8f090a1
Added the project's manual
Jul 28, 2021
1785d63
Renamed README.md to test-instructions.md
Jul 28, 2021
b1f9065
Flake8 and Pytest configuration
Jul 28, 2021
ae24fb0
Spark package where Spark Jobs will reside, and deleted airflow confi…
Jul 28, 2021
2190b52
Added Note for docker-compose Postgresql execution, airflow web serve…
Jul 28, 2021
5e5d8b9
Added pyspark in the project's package requirements
Jul 28, 2021
0d57403
Docker config for Spark with 3 executors
Jul 28, 2021
f8c9607
Added a note for future development
Jul 28, 2021
25170f5
Git Ignore Pycharm Config
Jul 28, 2021
c0015ed
Added TODO notes in README
Jul 28, 2021
33507d8
Fixed airflow scheduler failing when database creation is not yet …
Jul 29, 2021
fdeea40
Added Development Notes
Jul 29, 2021
331778f
Upgraded airflow version, and reverted back pyspark to 3.0.1
Jul 29, 2021
9cbaa96
Retail dag auto formatted by black
Jul 29, 2021
5740f82
Docker configuration for Spark added in airflow Dockerfile, since we …
Jul 29, 2021
7724086
Added example dag for spark job
Jul 29, 2021
0e48e1f
Added Kafka in the Docker registry
Jul 29, 2021
e35cb40
Added log4j config for Kafka Logs
Jul 29, 2021
2f1828a
Added Kafka Python library in the package requirements
Jul 29, 2021
0247765
Added data folder in git ignore file
Jul 29, 2021
948919c
Added few more development notes for the future
Jul 29, 2021
3f84053
Added mysqlclient and mysqlconnector to support airflow's mysql hook …
Jul 29, 2021
441f848
Data generation (?)
Jul 29, 2021
a49e46c
Initialized data sync operator to test the MySQLHook installed
Jul 29, 2021
d049b17
Separate Dockerfile for MySQL service.
Jul 29, 2021
e6e9f6e
Black Auto Formatting
Jul 29, 2021
a62df65
Added development env file
Jul 29, 2021
6cc637b
Fixed Checklist Lints
Jul 29, 2021
82036b2
Data Generation (???)
Jul 29, 2021
a5f12db
Added notes for documentation later on
Jul 30, 2021
505d612
Added clean up of cache objects in make cmd
Jul 30, 2021
749185f
Remove synchronization of data folder to docker container
Jul 30, 2021
e585df7
Tbl to Stage Operator setup
Jul 30, 2021
0f91bd2
Added pandas in package requirements
Jul 30, 2021
4e02172
Renamed ORDERS table to ORDER
Jul 30, 2021
9ab60f8
TblToStage Operator implementation
Jul 30, 2021
b739847
Black Autoformatting
Jul 30, 2021
ec6bb8e
Added Upsert Query for region table
Jul 30, 2021
880841c
Renamed table names into lowercase
Jul 30, 2021
628a9c8
Renamed SQL table names into lowercase
Jul 30, 2021
1ca3572
Removed Kafka and Upgraded postgres into v13
Jul 30, 2021
35aa39d
Configured automated security check and static type checking
Jul 31, 2021
4358d2e
Added a command for pruning docker containers
Jul 31, 2021
69448e0
Replaced the usage of pandas to_sql for inserting.
Jul 31, 2021
a498131
Modularization of TblToStageOperator class
Jul 31, 2021
f2e98e0
Correction on each of the items in the checklist configuration
Jul 31, 2021
c416dcc
Pytest plugin for dockerized testing
Jul 31, 2021
6b24f1a
Removed variables in conftest.py
Jul 31, 2021
f2bc149
Added unit tests for general rules for each DAGs
Jul 31, 2021
7f5bd13
Added unit tests for the retail_dag DAG
Jul 31, 2021
9ae3bc2
Removed unused variables in dag integrity test suite
Jul 31, 2021
3ec82ec
Black Autoformatting
Jul 31, 2021
59ca313
Sample dag for parallel spark job tasks
Jul 31, 2021
688b2c0
New Integration Tests for TblToStageOperator
Jul 31, 2021
6388788
Added pytest-docker in package requirements for dockerized testing
Jul 31, 2021
1009b23
Added documentation for common errors
Jul 31, 2021
b82ff26
Enhanced the static typing for TblToStageOperator.execute() method
Jul 31, 2021
4649e78
Removed Kafka Package Dependency
Jul 31, 2021
987057b
Added more notes for documentation purposes.
Jul 31, 2021
7db71f7
Adjustment on test for dag integrity due to changes in dag definition
Jul 31, 2021
8218ad5
Added a getter function to get upsert queries for a given table name.
Jul 31, 2021
6f78f01
Clean up retail_dag.py
Jul 31, 2021
bd81a43
Wrote an instruction to mitigate issues when running tests
Jul 31, 2021
4d03d41
Added a test for idempotency, also DRYed the code for cleaner tests.
Jul 31, 2021
dfa255a
Added INSERT queries for data insertion to MySQL
Aug 1, 2021
44ea6a2
Added execution time for query table filtering
Aug 1, 2021
18fef67
Modification of typo errors in SupplierHeader model
Aug 1, 2021
cc74896
Black Autoformatting
Aug 1, 2021
bb9a2fc
Improved the ddl operations by adding if not exists upon creation.
Aug 1, 2021
c9c532a
Restructured DAG connections
Aug 1, 2021
f5623bc
Black Autoformatting
Aug 1, 2021
ba99845
Fixed test errors for dag test suites.
Aug 1, 2021
7c18b63
Reduced verbosity of pytest
Aug 1, 2021
3818765
Fixed unit test errors for TblToStageOperator
Aug 1, 2021
38baba6
Merge pull request #1 from 1byte-yoda/mark/data-ingester-pipeline
1byte-yoda Aug 1, 2021
5107390
Merge pull request #1 from 1byte-yoda/mark/data-ingester-pipeline
1byte-yoda Aug 1, 2021
493e072
Modified the commit message for the merged branch.
Aug 1, 2021
9dde073
Removed PySpark from the project
Aug 1, 2021
12c4229
Removed spark volume binding in docker
Aug 1, 2021
f6096fd
Added PostgreSQL requirements for development
Aug 1, 2021
810700b
Health check for Postgres DW
Aug 1, 2021
70304ca
Optional return for get_upsert_query method
Aug 1, 2021
bd819f4
Removed unused files / irrelevant to project
Aug 2, 2021
6913d84
Added a Step-by-Step guide to run the project.
Aug 2, 2021
e0b8042
Postgres to Datawarehouse Operator Implementation
Aug 2, 2021
5723147
Clean up and added some TODO notes for later development use.
Aug 2, 2021
d46f64e
Added SQL Analytical Queries that will serve as an answer for the Repo…
Aug 2, 2021
dc3fdb4
Initial test setup for postgres_dw_operator
Aug 2, 2021
b30c878
Decoupled environment variables for docker-compose services
Aug 2, 2021
c46a3c8
README minimal typo error
Aug 2, 2021
b45cef1
Removed extra/unuseful texts in README.md
Aug 2, 2021
772d2c4
Cleanup TODO lists
Aug 2, 2021
aab3bfd
Added Answers for each exam questions
Aug 2, 2021
d1eaf54
Removed some unnecessary details in README and ANSWERS file
Aug 2, 2021
1b1b26f
Removed docker environment variables in git ignored files
Aug 2, 2021
cd554cb
Added a link in the ANSWERS.md file for SQL Queries for Dashboard Rep…
Aug 2, 2021
464be36
Format SQL query for better readability
Aug 2, 2021
f20024c
Fixed formatting issues in ANSWERS.md
Aug 2, 2021
423aca2
Fixed some line indentions
Aug 2, 2021
e146119
Fixed Checklist Errors
Aug 2, 2021
86c1b5e
Fixed typo error in dashboard queries
Aug 2, 2021
5223044
Fixed some typo issue in ANSWERS.md file
Aug 2, 2021
1ba1cbb
Added more make commands in README.md file
Aug 2, 2021
30dcb30
Improved SQL Query variable naming
Aug 2, 2021
ac36a35
Improvements in the Table DDL file formatting
Aug 2, 2021
1634b49
Added surrogate key for each dimension tables and also adjusted the f…
Aug 4, 2021
cb6eff7
Fixed flake8 lints
Aug 4, 2021
6ec0182
Fixed Checklist lints
Aug 4, 2021
1388bde
Improved star schema by adding data types
Aug 4, 2021
492e8f0
Added key notations for star schema for better readability
Aug 4, 2021
eefe524
Optimized dag batch run to 15,000 rows per chunks
Aug 4, 2021
6d098da
Fixed checklist errors and optimized sql queries for auto incremented…
Aug 4, 2021
1 change: 1 addition & 0 deletions .devt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
MC4gTWFrZSBzdXJlIHlvdSBhcmUgaW4gdGhlIGRhdGFlbmdpbmVlci10ZXN0IGRpcmVjdG9yeSwgdGhlbiBkbyB0aGUgcmVzdCBvZiB0aGUgaW5zdHJ1Y3Rpb25zIGJlbG93LgoxLiBnaXQgY2xvbmUgLS1yZWN1cnNpdmUgaHR0cHM6Ly9naXRodWIuY29tL2xvdmFzb2EvVFBDSC1zcWxpdGUuZ2l0CjIuIGNkIFRQQ0gtc3FsaXRlICYmIFNDQUxFX0ZBQ1RPUj0wLjAxIG1ha2UgJiYgY2QgLi4KMy4gbWtkaXIgLXAgLi9kYXRhICYmIGZpbmQgLi9UUENILXNxbGl0ZS90cGNoLWRiZ2VuIC1yZWdleCAiXi4qXC50YmwiIC1leGVjIGNwIHt9IC4vZGF0YS8gXDsKCgojIFRoZXJlIHdpbGwgYmUgYSBjcmVhdGVkIGZvbGRlciBuYW1lZCAiZGF0YSIgdW5kZXIgdGhlIGRhdGFlbmdpbmVlci10ZXN0IHByb2plY3QgZGlyZWN0b3J5CiMgVGhpcyBmb2xkZXIgY29udGFpbnMgdGhlIGdlbmVyYXRlZCAqLnRibCBmaWxlcy4KIyBscyAuL2RhdGEvKi50YmwKIyBJIHVzZWQgdGhpcyBhcHByb2FjaCwgYnV0IHRoZSBpZGVhbCBwaXBlbGluZSAoYXNzdW1pbmcgd2UgaGF2ZSBhIGRpc3RyaWJ1dGVkIGZpbGUgc3lzdGVtKSBzaG91bGQgbG9vawojIHNvbWV0aGluZyBsaWtlOiAgIFtTb21lIERhdGEgU291cmNlXSAgLS0tPiAgTGFtYmRhICAtLS0+ICBTeW5jICAtLS0+ICBTMyAgLS0tPiAgQ09QWSAgLS0tPiBSZWRzaGlmdCBEVw==
8 changes: 7 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ TPCH-sqlite


# Environments
.env
.venv
env/
venv/
Expand All @@ -119,3 +118,10 @@ venv.bak/
.python-version
example.db

# Airflow dev logs
/logs

# Pycharm Configs
.idea/

data/
136 changes: 136 additions & 0 deletions ANSWERS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
## The small ETL project
#### Requirements 1 and 2
1. Data was generated using the `bonus_etl_data_gen.txt` file.
2. There are instructions in the README.md file to replicate and run this project on your machine.

#### Requirements 3 and 4: Star Schema
Here, I removed the partsupp table entirely to simplify the model and to make it easier to declare
the grain for the fact table. I also removed other dimension candidates in the lineitem table that
are not relevant to our business use case.
![star-schema](docs/images/dw-star-schema.png)

#### Bonus Points
1. I created a cluster attribute in the dim_customer table. The logic is based on each customer's
   percentile standing among all customers, defined as follows (a minimal pandas sketch follows the list):
   * Bronze Customers – those at or below the 50th percentile.
   * Silver Customers – those between the 50th and 75th percentiles.
   * Gold Customers – those above the 75th percentile.
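A minimal pandas sketch of this clustering logic, assuming a per-customer revenue aggregate with
illustrative column names (`c_custkey`, `total_revenue`):

```python
import pandas as pd

# Hypothetical per-customer revenue aggregate; column names and values are assumptions.
customers = pd.DataFrame({
    "c_custkey": [1, 2, 3, 4],
    "total_revenue": [120.0, 560.0, 980.0, 310.0],
})

p50 = customers["total_revenue"].quantile(0.50)
p75 = customers["total_revenue"].quantile(0.75)


def cluster(revenue: float) -> str:
    if revenue > p75:
        return "Gold"
    if revenue > p50:
        return "Silver"
    return "Bronze"


customers["cluster"] = customers["total_revenue"].apply(cluster)
```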

2. The revenue was calculated using the following formula (a small pandas sketch follows it):
```
revenue = (quantity * extended_price) - (discount * extended_price)
```
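A pandas sketch of the same formula applied to a lineitem-like table; the column names
(`quantity`, `extended_price`, `discount`) and values are assumptions for illustration:

```python
import pandas as pd

# Illustrative lineitem-like frame; column names and values are assumptions.
lineitem = pd.DataFrame({
    "quantity": [5, 2],
    "extended_price": [100.0, 40.0],
    "discount": [0.05, 0.10],
})

# revenue = (quantity * extended_price) - (discount * extended_price)
lineitem["revenue"] = (
    lineitem["quantity"] * lineitem["extended_price"]
    - lineitem["discount"] * lineitem["extended_price"]
)
```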

3. I haven't implemented my idea here, and chose to do it as part of future improvements.
   But the idea is to shift the median/mean of the dates so they are centered at a new center date.
   The pseudocode is (a short pandas sketch follows the steps):
1. Convert the dates to unix timestamp
2. Calculate the mean or median of those timestamps, depending on data knowledge, but we can use the
mean for now.
3. Measure the distance between each date and the mean date, i.e.
   * ```x_i - mean_x``` for all i in the dataset.
4. In our use case we are interested in dates from the past 2 years, so we can use this to define our
   new center date, which is approximately 2020-06-15 for our use case.
5. Use the distances and our defined center date to shift the old dates' distribution:
   * ```convert_to_timestamp(new_center_date) + distance_i```, where distance_i can be positive
     (above the mean) or negative (below the mean).
6. This gives us shifted dates that fall both above and below the new center date.
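A short pandas sketch of the steps above; the input dates are illustrative, and only the 2020-06-15
center date comes from the description:

```python
import pandas as pd

# Illustrative historical dates; the real ones would come from the generated data.
dates = pd.to_datetime(pd.Series(["1994-01-10", "1996-07-03", "1998-11-21"]))

timestamps = dates.astype("int64") // 10**9                    # 1. unix timestamps (seconds)
mean_ts = timestamps.mean()                                    # 2. mean of the timestamps
distances = timestamps - mean_ts                               # 3. distance of each date from the mean
new_center_ts = pd.Timestamp("2020-06-15").timestamp()         # 4. new center date
shifted = pd.to_datetime(new_center_ts + distances, unit="s")  # 5./6. shift the distribution
```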

### Scheduling the Process
* The project uses Airflow for orchestration, where we can set the frequency of pipeline runs,
  e.g. with a cron expression (a minimal sketch follows).
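A minimal sketch of a cron-based schedule, assuming Airflow 2.x; the DAG id and cron expression are
illustrative, not the project's actual values:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator

with DAG(
    dag_id="retail_dag_example",       # illustrative id, not the project's DAG
    start_date=datetime(2021, 8, 1),
    schedule_interval="0 2 * * *",     # run daily at 02:00
    catchup=False,
) as dag:
    start = DummyOperator(task_id="start")
```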

### Mitigating Random Data Arrival/Order
* If we have a streaming use case, we can do two things:
  1. Design the pipeline so that reprocessing data is idempotent, e.g. by using distributed staging
     storage and pure Python functions. This way we can configure the pipeline to reprocess the data
     sitting in the staging storage, e.g. with the Lambda Architecture, where a speed layer can consume
     late data and a batch layer catches up on the computation for that late data.
  2. Use a Spark watermark, i.e. define the latency we want to tolerate. This works if we use
     window-based processing and define a watermark column on our aggregates (a PySpark sketch follows).

     For example, suppose an event arrives at 10:15 and all other events related to it arrive only an
     hour later (at 11:15). With a watermark of, say, 2 hours, the pipeline still considers events that
     are not older than 9:15. This means data between 9:15 and 11:15 is kept in the same aggregation
     state, so the 10:15 events are still treated as on time.
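A PySpark sketch of the watermark idea, assuming a Kafka source; the broker address, topic name, and
window sizes are illustrative:

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("watermark-example").getOrCreate()

# The Kafka source exposes a `timestamp` column that we treat as the event time.
events = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")  # assumed broker address
    .option("subscribe", "events")                         # assumed topic name
    .load()
    .selectExpr("CAST(value AS STRING) AS value", "timestamp AS event_time")
)

# Tolerate events arriving up to 2 hours late within 1-hour windows.
counts = (
    events.withWatermark("event_time", "2 hours")
    .groupBy(F.window("event_time", "1 hour"))
    .count()
)
```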

### Dockerized Implementation
* The project uses Docker to containerize Airflow and our databases; please check the README.md file
  for more info.


### Data Reporting
For the data reporting part, I've created SQL queries inside the
[dashboard_queries.sql](https://github.com/1byte-yoda/dataengineer_test-1/blob/develop/services/redash_analytics/scripts/dashboard_queries.sql) file.
Please check it for more information.

### Data Profiling
* Data profiling tools will depend on the size of the data and the business requirements.
For small to mid size data, I will use pandas data profiling, although it's limited to Column profiling.
For mid to large scale data, I will use Alteryx or Talend.
* For techniques, I'll be looking at the following (a pandas sketch follows this list):
  - Data anomalies – are there values that look suspicious?
  - Range of values – what is the acceptable range of values for column Y? For column N?
  - Frequency – what is the frequency distribution of column X? Of column Y? Do we have more data
    coming from Europe than from Asia?
  - Data / variable type – how can we classify column X? Is it a continuous or a categorical variable?
  - Missing values – how many missing values are acceptable in column X?
  - Cardinality – how many unique values do we have for categorical column Z? Do we have one-to-many
    relationships?
  - Distribution of data – is column X normally distributed? Is it log-normal? If we use mathematical
    models that require normally distributed data, checking the distribution is useful.
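A few of these checks sketched with pandas; the frame, column names, and thresholds are illustrative:

```python
import pandas as pd

# Illustrative frame; real data would come from the staged tables.
df = pd.DataFrame({
    "region": ["EUROPE", "ASIA", "EUROPE", None],
    "order_total": [120.0, 98.5, 15000.0, 42.0],
})

value_range = df["order_total"].agg(["min", "max"])     # range of values
frequency = df["region"].value_counts(dropna=False)     # frequency distribution
missing_ratio = df.isna().mean()                        # share of missing values per column
cardinality = df["region"].nunique()                    # unique values of a categorical column
anomalies = df[df["order_total"] > df["order_total"].quantile(0.99)]  # simple anomaly check
```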
* Data profiling helps us define our expectations for the data; as a best practice we should explore
  the data before building the data pipeline.
  - For instance, if we identify an increasing amount of data coming from users located in Europe,
    we can configure our cloud stores / servers to be located near Europe.
  - Also, we can identify how often we need to validate the data and in which layer of the pipeline,
    e.g. if we know that the data coming from Source_A has a standard deviation of 1.5, we can add a
    data validation step right after it.
  - In terms of security, we can identify which data should be protected / hashed.

### Production Environment and Scalability
What would be your recommendations in terms of tools and process?

* My recommendations depend on the requirements / SLAs, so I'll declare some assumptions.


* In terms of tools, instead of pandas I would use distributed processing frameworks such as Spark
  (e.g. on AWS EMR) or AWS Lambda. This helps us scale the pipeline easily if that's a requirement:
  we can add more nodes to the Spark cluster in just a few clicks if we need to process n times the
  amount of data.


* For staging storage, instead of a MySQL database I would use distributed storage like S3, which is
  horizontally scalable. This avoids bottlenecks in the processing framework (e.g. Spark) and lets it
  fully utilize distributed processing.


* If we have a streaming / event use case, we can use a streaming broker like Kafka or Kinesis.
  Here, we can increase the number of partitions if we expect a higher data load. It also decouples
  the components in our pipeline, i.e. we avoid direct access to the various data sources and depend
  only on the streaming broker for the data we want to consume.


* For the orchestration framework, I would use CloudWatch to trigger the pipeline jobs for simplicity.
  We can also use Airflow here, and I can elaborate on that if needed.


* For the data warehouse store, I would pick a columnar database like Redshift, which is optimized for
  analytics and aggregation and performs well for read-heavy workloads. It also tolerates late data:
  if a dimensional attribute that our fact table depends on arrives late, we can decide to load it into
  Redshift anyway instead of isolating it for later processing, because Redshift does not enforce
  constraints. Also, I used Postgres in my current build, so migrating our knowledge base as well as
  our resources won't be a painful process.




60 changes: 60 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Variable Declaration
COVERAGE_FOLDER := retail_etl tests/
KNOWN_TARGETS = cov_report
ARGS := $(filter-out $(KNOWN_TARGETS),$(MAKECMDGOALS))

# HTML Coverage Report
ifeq ($(ARGS), html)
COV_REPORT_TYPE := --cov-report html
endif

# XML Coverage Report
ifeq ($(ARGS), xml)
COV_REPORT_TYPE := --cov-report xml
endif

PYTHON=python3.9

# Check for Type Hint inconsistencies
.PHONY: typehint
typehint:
mypy $(COVERAGE_FOLDER)

# Run all Test Suites under the tests folder
.PHONY: test
test:
pytest tests/

# Format the code into black formatting
.PHONY: black
black:
black -l 110 $(COVERAGE_FOLDER)

# Check for Lint errors
.PHONY: lint
lint:
flake8 $(COVERAGE_FOLDER)

# Check for Security Vulnerabilities
.PHONY: scan_security
scan_security:
bandit -r $(COVERAGE_FOLDER) --skip B101

# Clean up local development's cache data
.PHONY: clean
clean:
find . -type f -name "*.pyc" | xargs rm -fr
find . -type d -name __pycache__ | xargs rm -fr
find . -type d -name .pytest_cache | xargs rm -fr

# Run all Pre-commit Checks
.PHONY: checklist
checklist: black lint scan_security test clean

# Check Coverage Report
.DEFAULT: ;: do nothing

.SUFFIXES:
.PHONY: cov_report
cov_report:
PYTHONPATH=. pytest --cov $(COVERAGE_FOLDER) $(COV_REPORT_TYPE)