diff --git a/.devt b/.devt new file mode 100644 index 0000000..6a7faa7 --- /dev/null +++ b/.devt @@ -0,0 +1 @@ +MC4gTWFrZSBzdXJlIHlvdSBhcmUgaW4gdGhlIGRhdGFlbmdpbmVlci10ZXN0IGRpcmVjdG9yeSwgdGhlbiBkbyB0aGUgcmVzdCBvZiB0aGUgaW5zdHJ1Y3Rpb25zIGJlbG93LgoxLiBnaXQgY2xvbmUgLS1yZWN1cnNpdmUgaHR0cHM6Ly9naXRodWIuY29tL2xvdmFzb2EvVFBDSC1zcWxpdGUuZ2l0CjIuIGNkIFRQQ0gtc3FsaXRlICYmIFNDQUxFX0ZBQ1RPUj0wLjAxIG1ha2UgJiYgY2QgLi4KMy4gbWtkaXIgLXAgLi9kYXRhICYmIGZpbmQgLi9UUENILXNxbGl0ZS90cGNoLWRiZ2VuIC1yZWdleCAiXi4qXC50YmwiIC1leGVjIGNwIHt9IC4vZGF0YS8gXDsKCgojIFRoZXJlIHdpbGwgYmUgYSBjcmVhdGVkIGZvbGRlciBuYW1lZCAiZGF0YSIgdW5kZXIgdGhlIGRhdGFlbmdpbmVlci10ZXN0IHByb2plY3QgZGlyZWN0b3J5CiMgVGhpcyBmb2xkZXIgY29udGFpbnMgdGhlIGdlbmVyYXRlZCAqLnRibCBmaWxlcy4KIyBscyAuL2RhdGEvKi50YmwKIyBJIHVzZWQgdGhpcyBhcHByb2FjaCwgYnV0IHRoZSBpZGVhbCBwaXBlbGluZSAoYXNzdW1pbmcgd2UgaGF2ZSBhIGRpc3RyaWJ1dGVkIGZpbGUgc3lzdGVtKSBzaG91bGQgbG9vawojIHNvbWV0aGluZyBsaWtlOiAgIFtTb21lIERhdGEgU291cmNlXSAgLS0tPiAgTGFtYmRhICAtLS0+ICBTeW5jICAtLS0+ICBTMyAgLS0tPiAgQ09QWSAgLS0tPiBSZWRzaGlmdCBEVw== \ No newline at end of file diff --git a/.gitignore b/.gitignore index 85eb5bf..fd0d689 100644 --- a/.gitignore +++ b/.gitignore @@ -94,7 +94,6 @@ TPCH-sqlite # Environments -.env .venv env/ venv/ @@ -119,3 +118,10 @@ venv.bak/ .python-version example.db +# Airflow dev logs +/logs + +# Pycharm Configs +.idea/ + +data/ \ No newline at end of file diff --git a/ANSWERS.md b/ANSWERS.md new file mode 100644 index 0000000..254ba6a --- /dev/null +++ b/ANSWERS.md @@ -0,0 +1,136 @@ +## The small ETL project +#### Requirements 1 and 2 +1. Data was generated using the `bonus_etl_data_gen.txt` +2. There's an instruction in the README.md file to replicate and run this project on your machine. + +#### Requirements 3 and 4: Star Schema +Here, I removed the partsupp table entirely to simplify the model and to make it easier to declare the +grain for the fact table, +I also removed other candidates for dimension in the lineitem table that's not +relevant to our business use case. +![star-schema](docs/images/dw-star-schema.png) + +#### Bonus Points +1. I created a cluster attribute in the dim_customer table, +the logic was based on their percentile standing among other customers, +and are defined as follows: + * Bronze Customers – Those who are in the first 50 percentile. + * Silver Customers – Those who are between the 50th and the 75th percentile. + * Gold Customers – Those who are above the 75th percentile. + +2. The revenue was calculated using the ff. formula: +``` +revenue = (quantity * extended_price) - (discount * extended_price) +``` + +3. I haven't impelented my idea here, and chose to do it as part of future improvements. +But the idea is to shift the median/mean of the dates to be centered at a new center date. +The pseudocode can be: + 1. Convert the dates to unix timestamp + 2. Calculate the mean or median of those timestamps, depending on data knowledge, but we can use the + mean for now. + 3. Measure the distance between each dates versus the mean date ie. + * ```x_i - mean_x``` for all i in the dataset. + 4. In our use case we are interested with the dates from the + past 2 years, so we can use this to define our new center date which is approx. 2020-06-15 for our use + case. + 5. Use the distance and our defined center date to shift the old dates' distribution. + * ```old_timestamp + convert_to_timestamp(new_date)``` where old_timestamp can either be positive + (above the mean) or negative (below the mean) + 6. 
This will give us date values that fall both above and below the new center date, preserving the original spread.

### Scheduling the Process
* The project uses Airflow for orchestration, so we can set the frequency of pipeline runs there, e.g. with a cron expression.

### Mitigating Random Data Arrival/Order
* If we have a streaming use case, we can do two things:
  1. Design the pipeline so that it is idempotent and can safely reprocess data, e.g. by using distributed staging storage and/or pure Python functions. This way we can configure the pipeline to reprocess whatever is in the staging storage, for example with a Lambda Architecture, where the speed layer can consume late data and the batch layer catches up on the computation for it.
  2. Use a Spark watermark, i.e. define the lateness we want to tolerate. This works if we use window-based processing and have defined a watermark column on our aggregates.

  For example, suppose an event arrives at 10:15 and all other events related to it arrive only an hour later (at 11:15). If we defined a watermark of, say, 2 hours, the pipeline will still consider events that are not older than 9:15. That means data between 9:15 and 11:15 is treated as one micro-batch, so the 10:15 events are still considered on time.

### Dockerize Implementation
* The project uses Docker to containerize Airflow and our databases; please check the README.md file for more info.

### Data Reporting
For the data reporting part, I've created SQL queries inside the [dashboard_queries.sql](https://github.com/1byte-yoda/dataengineer_test-1/blob/develop/services/redash_analytics/scripts/dashboard_queries.sql) file. Please check it for more information.

### Data Profiling
* Data profiling tools will depend on the size of the data and the business requirements. For small to mid-size data I would use pandas data profiling, although it is limited to column profiling. For mid to large-scale data I would use Alteryx or Talend.
* For techniques, I'll be looking at the following:
  - Data anomalies - are there values that look suspicious?
  - Range of values - what is the acceptable range of values for column Y? For column N?
  - Frequency - what is the frequency distribution of column X? Of column Y? Do we have more data coming from Europe than from Asia?
  - Data / variable type - how can we classify column X? Is it a continuous or a categorical variable?
  - Missing values - how many missing values are acceptable in column X?
  - Cardinality - how many unique values do we have for categorical column Z? Do we have one-to-many relationships?
  - Distribution of data - is column X normally distributed? Log-normally? If we're using mathematical models that require normally distributed data, checking the distribution is useful.
* Data profiling helps us define our expectations of the data; as a best practice we should explore the data before building the pipeline.
  - For instance, if we identify an increasing amount of data coming from users located in Europe, we can configure our cloud stores / servers to be located near Europe.
  - Also, we can identify how often we need to validate our data, and in which layer of the pipeline, e.g.
We know that the data coming from Source_A + has a standard deviation of 1.5, therefore we can add a data validation just + near to that. + - In terms of security, we can identify which data should be protected / hashed. + +### Production Environment and Scalability + What would be your recommendations in terms of tools and process? + + * Depending on the requirements / SLAs, and I would declare some assumptions. + + + * In terms of tools, instead of using pandas, I'll use distributed processing frameworks + like spark, AWS EMR or AWS Lambda. This will help us scale our pipeline easily if we have that in our requirements. + We can add more nodes in our spark cluster easily in just few click, if we want to process n-times + amount of data. + + + * For the staging storage, instead of using MySQL database, I will use distributed storage like S3, + which is also horizontally scalable. This will avoid bottlenecks in our processing framework, for + eg. Spark, thus, utilization of distributed processing. + + + * If we have a streaming / event use case, we can use a streaming broker like Kafka or Kinesis. + Here, we can control the amount of partitions if we expect to have more data load. + It also allows decoupling of components in our pipeline, ie. we can avoid direct access to our + various data source and to only depend on our streaming broker for the data that we want to consume. + + + * For orchestration framework, I'll use Cloud watch to trigger the pipeline jobs for simplicity. + We can also use Airflow here, and I can talk about it if needed. + + + * For the data warehouse store, I'll pick a columnar database like Redshift which was + optimized for analytics and aggregation, also good performance for read-heavy workloads. + It can also tolerate late data, ie. if we have late data from a dimensional attribute in which our fact + table depends on. We can decide to proceed it to Redshift instead of isolating it for later processing, + this is + because redshift does not enforce constraints. Also, I used Postgres in my current build, so migrating + knowledge base as well as our resources won't be a painful process. + + + + \ No newline at end of file diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..e0256bd --- /dev/null +++ b/Makefile @@ -0,0 +1,60 @@ +# Variable Declaration +COVERAGE_FOLDER := retail_etl tests/ +KNOWN_TARGETS = cov_report +ARGS := $(filter-out $(KNOWN_TARGETS),$(MAKECMDGOALS)) + +# HTML Coverage Report +ifeq ($(ARGS), html) + COV_REPORT_TYPE := --cov-report html +endif + +# XML Coverage Report +ifeq ($(ARGS), xml) + COV_REPORT_TYPE := --cov-report xml +endif + +PYTHON=python3.9 + +# Check for Type Hint inconsistencies +.PHONY: typehint +typehint: + mypy $(COVERAGE_FOLDER) + +# Run all Test Suites under the tests folder +.PHONY: test +test: + pytest tests/ + +# Format the code into black formatting +.PHONY: black +black: + black -l 110 $(COVERAGE_FOLDER) + +# Check for Lint errors +.PHONY: lint +lint: + flake8 $(COVERAGE_FOLDER) + +# Check for Security Vulnerabilities +.PHONY: scan_security +scan_security: + bandit -r $(COVERAGE_FOLDER) --skip B101 + +# Clean up local development's cache data +.PHONY: clean +clean: + find . -type f -name "*.pyc" | xargs rm -fr + find . -type d -name __pycache__ | xargs rm -fr + find . 
-type d -name .pytest_cache | xargs rm -fr + +# Run all Pre-commit Checks +.PHONY: checklist +checklist: black lint scan_security test clean + +# Check Coverage Report +.DEFAULT: ;: do nothing + +.SUFFIXES: +.PHONY: cov_report +cov_report: + PYTHONPATH=. pytest --cov $(COVERAGE_FOLDER) $(COV_REPORT_TYPE) diff --git a/README.md b/README.md index d46218c..393786d 100644 --- a/README.md +++ b/README.md @@ -1,109 +1,175 @@ -# Data Engineer Interview Test +# Retail ETL -We are looking for a high quality data engineer which can deliver comprehensive solutions for our continuity and business growth. +[![Build Status](https://travis-ci.com/PHMark/gs-news-app.svg?branch=main)](https://travis-ci.com/1byte-yoda/gs-news-app) + + -The Analytics team drives the data culture, we want to change how we produce data from large batches to micro batching, from daily to near real-time/streaming processing, from tabular reports to insightful dashboards. + +## Table of Contents -You can be part of an amazing team which deals with data all the time using different process, tools and technologies. +* [About the Project](#about-the-project) + * [Technology Stack Used](#technology-stack-used) +* [Getting Started](#getting-started) + * [Prerequisites](#prerequisites) + * [Installation](#installation) +* [Common Errors](#common-errors) +* [Test Answers](https://github.com/1byte-yoda/dataengineer-test/blob/master/ANSWERS.md) -Following is a little treasure and challenge for those keen on joining this amazing company and team. -## Junior/Mid -For a Junior/Mid role we are expecting at least 2-3 tables to be loaded and an aggregated report done. - -## Senior -We are expecting the most from you. - - -# The Project -Build a small ETL process to digest a few set of files into a data warehouse like project. - -We are expecting an end-to-end ETL solution to deliver a simple star schema which an end user can easily slice and dice the data through a report or using basic ad-hoc query. - -### Tools and Technologies -We are a Python and SQL workshop, we would like to see this project using just those tools. - -However, we are open to other tools and technologies if we are able to easily replicate on our side. - -For the database, use a simple and light optimizer for your database, choose the one which can run a browser, but don't be limited to it. - -Please, avoid licensed products, we may not be able to proceed with this restriction on our own, if this is the case you may need to book a meeting to bring your tool and demo to us. - -How to do it? ------------------------ -Fork this repo, build your ETL process and commit the code with your answers. Open a Pull Request and send us a message highlighting the test is completed. - -#### Rules -* it must come with step by step instructions to run the code. -* please, be mindful that your code might be moved or deleted after we analyse the PR. -* use the best practices -* be able to explain from the ground up the whole process on face to face interview - -The small ETL project ---------- - -1. The data for this exercise can be found on the `data.zip` file. Can you describe the file format? - -**Super Bonus**: generate your own data through the instructions on the encoded file `bonus_etl_data_gen.txt`. -To get the bonus points, please encoded the file with the instructions were used to generate the files. - -2. Code you scripts to load the data into a database. - -3. Design a star schema model which the data should flow. - -4. 
Build your process to load the data into the star schema - -**Bonus** point: -- add a fields to classify the customer account balance in 3 groups -- add revenue per line item -- convert the dates to be distributed over the last 2 years - -5. How to schedule this process to run multiple times per day? - -**Bonus**: What to do if the data arrives in random order and times via streaming? - -6. How to deploy this code? - -**Bonus**: Can you make it to run on a container like process (Docker)? - -Data Reporting -------- -One of the most important aspects to build a DWH is to deliver insights to end-users. - -Can you using the designed star schema (or if you prefer the raw data), generate SQL statements to answer the following questions: - -1. What are the top 5 nations in terms of revenue? - -2. From the top 5 nations, what is the most common shipping mode? - -3. What are the top selling months? - -4. Who are the top customer in terms of revenue and/or quantity? - -5. Compare the sales revenue of on current period against previous period? - - -Data profilling ----- -Data profiling are bonus. - -What tools or techniques you would use to profile the data? - -What results of the data profiling can impact on your analysis and design? - - - -Architecture ------ -If this pipeline is to be build for a real live environment. -What would be your recommendations in terms of tools and process? - -Would be a problem if the data from the source system is growing at 6.1-12.7% rate a month? - - - -ERD --- -![alt text](erd.png "ERD") - -Author: adilsonmendonca + +### About The Project +This Batch ETL pipeline aims to parse semi-structured data like .tbl files, +stage it to a MySQL database, and then load it to a Star Schema +which resides on a PostgreSQL database. + +#### General Architecture +![General Architecture](docs/images/general-architecture.png) + +#### DAG Representation +![DAG Representation](docs/images/dag.png) + + +#### Technology Stack Used + +* [Python 3](https://www.python.org/) +* [Airflow](https://www.airflow.org/) +* [Docker](https://www.docker.com/) +* [PostgreSQL](https://www.postgresql.org/) +* [MySQL](https://www.mysql.com/) + + +#### Network Configurations Used +* Airflow: running on `localhost` at port `8282` +* PostgreSQL: running on `localhost` at port `5432` +* MySQL: running on `locahost` at port `3306` + + +## Getting Started + +### Prerequisites +For an easy setup, the installation of this project only requires docker and docker-compose which can be +downloaded from the following links: +* [Docker](https://docs.docker.com/get-docker/) +* [Docker Compose](https://docs.docker.com/compose/install/) + +Although, for development environment, like running tests & stuff, you will need to install +Python3 and the pacakges inside the `requirements-dev.txt` file. + +Just take note that this project was tested and developed using Python 3.9.1, please check if you have the +right version installed. +``` +$ python --version +Python 3.9.1 +``` + +### Installation +Once you have the requirements above, you can clone and spin up the project with ease. +1. Clone the repo +```sh +$ git clone https://github.com/1byte-yoda/dataengineer-test +```` + +2. This step is crucial, go to the project's root directory and follow the instructions inside the data_gen. + txt file. + +3. Make sure you've done the previous step, if so, you can spin up the docker containers with the + following command. + +``` +$ docker-compose up --build +``` + +4. This step is optional, and is required for development setup. 
+``` +$ pip install -r requirements-dev.txt +``` + +5. Accessing the Airflow Web UI. + * Airflow was set up to run at http://localhost:8282, kindly check this link, airflow must be up and + running. You'll be prompted to log-in. + * Default credentials for airflow: + * `username: airflow` + * `password: airflow` + * Although these information can be modified inside .env file. + +#### Airflow's Login Page +![Login Page](docs/images/airflow-homepage.png) + +6. This additional step was needed before you can run the project, + * Add the connection details / credentials for PostgreSQL and MySQL databases. + * Link to connection setup page http://localhost:8282/connection/list/, go to this link. + * On the top-left area of the UI, you'll find a + (plus sign), click it to add services / configurations. + +#### Airflow's Connection List Page +![Connection List](docs/images/airflow-connection-list.png) + +#### MySQL Configuration +![MySQL Configuration](docs/images/airflow-mysql-config.png) + +#### Postgres Configuration +![Postgres Configuration](docs/images/airflow-postgres-config.png) + +#### Important +Please use "*password*" as your password for Both MySQL and Postgres configs. + +### Running Useful Commands for Development +Running the test suites: +``` +# NOTE: This will download docker containers for Postgres and MySQL testing db. +# and might take some time to finish. +# +$ make test +``` + +Formatting the whole code base following the `black` code style: +``` +$ make black +``` + +Checking for lints / un-obvious code mistakes: +``` +$ make lint +``` + +Checking the test coverage of the project. +``` +# Prints the coverage output in the CLI. +$ make cov_report + +# Saves the coverage output in a readable, HTML format. +$ make cov_report html +``` + +Checking for Security Issues. +``` +$ make scan_security +``` + +For CI/CD Setup, I also created a checklist which +was a series of steps/checks that can raise failures +before we can deploy the project. +``` +$ make checklist +``` + +#### TODO: +* Add Travis CI + +#### Dev Notes: +* Each services have their own folders, you can configure your dev environment + there as needed. +* You can change the logging level by modifying the AIRFLOW__CORE__LOGGING_LEVEL variable +in the docker-compose.yml / .env file + + +### Common Errors +* Volume data was not found by docker-compose + * Make sure you already ran the data_gen.txt command. + * If that's not the case, run docker system prune -a to clean up docker images, then build the + containers again +* pytest throws docker.errors.APIError + * This is due to unstable network connection with the docker server, + try running the test again. 
+ * Make sure airflow and other dependent services are down by running: + `docker-compose down`, then run the test again + \ No newline at end of file diff --git a/bonus_etl_data_gen.txt b/bonus_etl_data_gen.txt deleted file mode 100644 index 8404309..0000000 --- a/bonus_etl_data_gen.txt +++ /dev/null @@ -1 +0,0 @@ -VXNlIHRoZSBpbnN0cnVjdGlvbiBvbiBodHRwczovL2dpdGh1Yi5jb20vbG92YXNvYS9UUENILXNxbGl0ZSB0byBnZW5lcmF0ZSB5b3VyIGRhdGEgZmlsZXMuClRoZSBkYXRhLnppcCBmaWxlIHdlcmUgZ2VuZXJhdGVkIHdpdGggc2NhbGUgZmFjdG9yIG9mIDAuMDEKUGxlYXNlLCBlbmNvZGUgeW91ciBmaWxlIHdpdGggdGhlIGluc3RydWN0aW9uIHlvdSB1c2VkIHRvIGdlbmVyYXRlIHRoZSBkYXRhIGZpbGVzLgo= \ No newline at end of file diff --git a/data.zip b/data.zip deleted file mode 100644 index 44a6834..0000000 Binary files a/data.zip and /dev/null differ diff --git a/data_gen.txt b/data_gen.txt new file mode 100644 index 0000000..d634d6a --- /dev/null +++ b/data_gen.txt @@ -0,0 +1 @@ +SnVzdCBraWRkaW5nIDpQIQoKWW91IGNhbiBmaW5kIHRoZSBpbnN0cnVjdGlvbnMgaW4gdGhlIC5kZXZ0IGZpbGUgaW5zaWRlIHRoZSBwcm9qZWN0J3Mgcm9vdCBkaXJlY3RvcnkgYXMgd2VsbC4KCllvdSBjYW4gcmV2aWV3IHRoZSBaZW4gb2YgUHl0aG9uIHByaW5jaXBsZXMgZm9yIG5vdywgaWYgeW91IHdhbnQ/CgpUaGUgWmVuIG9mIFB5dGhvbiwgYnkgVGltIFBldGVycwoKQmVhdXRpZnVsIGlzIGJldHRlciB0aGFuIHVnbHkuCkV4cGxpY2l0IGlzIGJldHRlciB0aGFuIGltcGxpY2l0LgpTaW1wbGUgaXMgYmV0dGVyIHRoYW4gY29tcGxleC4KQ29tcGxleCBpcyBiZXR0ZXIgdGhhbiBjb21wbGljYXRlZC4KRmxhdCBpcyBiZXR0ZXIgdGhhbiBuZXN0ZWQuClNwYXJzZSBpcyBiZXR0ZXIgdGhhbiBkZW5zZS4KUmVhZGFiaWxpdHkgY291bnRzLgpTcGVjaWFsIGNhc2VzIGFyZW4ndCBzcGVjaWFsIGVub3VnaCB0byBicmVhayB0aGUgcnVsZXMuCkFsdGhvdWdoIHByYWN0aWNhbGl0eSBiZWF0cyBwdXJpdHkuCkVycm9ycyBzaG91bGQgbmV2ZXIgcGFzcyBzaWxlbnRseS4KVW5sZXNzIGV4cGxpY2l0bHkgc2lsZW5jZWQuCkluIHRoZSBmYWNlIG9mIGFtYmlndWl0eSwgcmVmdXNlIHRoZSB0ZW1wdGF0aW9uIHRvIGd1ZXNzLgpUaGVyZSBzaG91bGQgYmUgb25lLS0gYW5kIHByZWZlcmFibHkgb25seSBvbmUgLS1vYnZpb3VzIHdheSB0byBkbyBpdC4KQWx0aG91Z2ggdGhhdCB3YXkgbWF5IG5vdCBiZSBvYnZpb3VzIGF0IGZpcnN0IHVubGVzcyB5b3UncmUgRHV0Y2guCk5vdyBpcyBiZXR0ZXIgdGhhbiBuZXZlci4KQWx0aG91Z2ggbmV2ZXIgaXMgb2Z0ZW4gYmV0dGVyIHRoYW4gKnJpZ2h0KiBub3cuCklmIHRoZSBpbXBsZW1lbnRhdGlvbiBpcyBoYXJkIHRvIGV4cGxhaW4sIGl0J3MgYSBiYWQgaWRlYS4KSWYgdGhlIGltcGxlbWVudGF0aW9uIGlzIGVhc3kgdG8gZXhwbGFpbiwgaXQgbWF5IGJlIGEgZ29vZCBpZGVhLgpOYW1lc3BhY2VzIGFyZSBvbmUgaG9ua2luZyBncmVhdCBpZGVhIC0tIGxldCdzIGRvIG1vcmUgb2YgdGhvc2Uh \ No newline at end of file diff --git a/ddl.sql b/ddl.sql deleted file mode 100644 index ca270f5..0000000 --- a/ddl.sql +++ /dev/null @@ -1,94 +0,0 @@ -CREATE TABLE NATION ( - N_NATIONKEY INTEGER PRIMARY KEY NOT NULL, - N_NAME TEXT NOT NULL, - N_REGIONKEY INTEGER NOT NULL, - N_COMMENT TEXT, - FOREIGN KEY (N_REGIONKEY) REFERENCES REGION(R_REGIONKEY) -); - -CREATE TABLE REGION ( - R_REGIONKEY INTEGER PRIMARY KEY NOT NULL, - R_NAME TEXT NOT NULL, - R_COMMENT TEXT -); - -CREATE TABLE PART ( - P_PARTKEY INTEGER PRIMARY KEY NOT NULL, - P_NAME TEXT NOT NULL, - P_MFGR TEXT NOT NULL, - P_BRAND TEXT NOT NULL, - P_TYPE TEXT NOT NULL, - P_SIZE INTEGER NOT NULL, - P_CONTAINER TEXT NOT NULL, - P_RETAILPRICE INTEGER NOT NULL, - P_COMMENT TEXT NOT NULL -); - -CREATE TABLE SUPPLIER ( - S_SUPPKEY INTEGER PRIMARY KEY NOT NULL, - S_NAME TEXT NOT NULL, - S_ADDRESS TEXT NOT NULL, - S_NATIONKEY INTEGER NOT NULL, - S_PHONE TEXT NOT NULL, - S_ACCTBAL INTEGER NOT NULL, - S_COMMENT TEXT NOT NULL, - FOREIGN KEY (S_NATIONKEY) REFERENCES NATION(N_NATIONKEY) -); - -CREATE TABLE PARTSUPP ( - PS_PARTKEY INTEGER NOT NULL, - PS_SUPPKEY INTEGER NOT NULL, - PS_AVAILQTY INTEGER NOT NULL, - PS_SUPPLYCOST INTEGER NOT NULL, - PS_COMMENT TEXT NOT NULL, - PRIMARY KEY 
(PS_PARTKEY, PS_SUPPKEY), - FOREIGN KEY (PS_SUPPKEY) REFERENCES SUPPLIER(S_SUPPKEY), - FOREIGN KEY (PS_PARTKEY) REFERENCES PART(P_PARTKEY) -); - -CREATE TABLE CUSTOMER ( - C_CUSTKEY INTEGER PRIMARY KEY NOT NULL, - C_NAME TEXT NOT NULL, - C_ADDRESS TEXT NOT NULL, - C_NATIONKEY INTEGER NOT NULL, - C_PHONE TEXT NOT NULL, - C_ACCTBAL INTEGER NOT NULL, - C_MKTSEGMENT TEXT NOT NULL, - C_COMMENT TEXT NOT NULL, - FOREIGN KEY (C_NATIONKEY) REFERENCES NATION(N_NATIONKEY) -); - -CREATE TABLE ORDERS ( - O_ORDERKEY INTEGER PRIMARY KEY NOT NULL, - O_CUSTKEY INTEGER NOT NULL, - O_ORDERSTATUS TEXT NOT NULL, - O_TOTALPRICE INTEGER NOT NULL, - O_ORDERDATE DATE NOT NULL, - O_ORDERPRIORITY TEXT NOT NULL, - O_CLERK TEXT NOT NULL, - O_SHIPPRIORITY INTEGER NOT NULL, - O_COMMENT TEXT NOT NULL, - FOREIGN KEY (O_CUSTKEY) REFERENCES CUSTOMER(C_CUSTKEY) -); - -CREATE TABLE LINEITEM ( - L_ORDERKEY INTEGER NOT NULL, - L_PARTKEY INTEGER NOT NULL, - L_SUPPKEY INTEGER NOT NULL, - L_LINENUMBER INTEGER NOT NULL, - L_QUANTITY INTEGER NOT NULL, - L_EXTENDEDPRICE INTEGER NOT NULL, - L_DISCOUNT INTEGER NOT NULL, - L_TAX INTEGER NOT NULL, - L_RETURNFLAG TEXT NOT NULL, - L_LINESTATUS TEXT NOT NULL, - L_SHIPDATE DATE NOT NULL, - L_COMMITDATE DATE NOT NULL, - L_RECEIPTDATE DATE NOT NULL, - L_SHIPINSTRUCT TEXT NOT NULL, - L_SHIPMODE TEXT NOT NULL, - L_COMMENT TEXT NOT NULL, - PRIMARY KEY (L_ORDERKEY, L_LINENUMBER), - FOREIGN KEY (L_ORDERKEY) REFERENCES ORDERS(O_ORDERKEY), - FOREIGN KEY (L_PARTKEY, L_SUPPKEY) REFERENCES PARTSUPP(PS_PARTKEY, PS_SUPPKEY) -); diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..3070311 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,107 @@ +version: "3" + +# ====================================== AIRFLOW ENVIRONMENT VARIABLES ======================================= +x-airflow-image: &airflow_image apache/airflow:2.1.0-python3.6 + +x-airflow-common: + &airflow-common + image: *airflow_image + env_file: + - ./services/airflow/.env + build: + context: ./services/airflow + args: + AIRFLOW_BASE_IMAGE: *airflow_image + volumes: + - ./retail_etl/dags:/opt/airflow/dags + - ./logs:/opt/airflow/logs + - ./retail_etl/plugins:/opt/airflow/plugins + - ./data:/opt/data + user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}" + depends_on: + &airflow-common-depends-on + airflow_db: + condition: service_healthy +# ====================================== /AIRFLOW ENVIRONMENT VARIABLES ====================================== + +services: + airflow_db: + image: postgres:13-alpine + env_file: + - ./services/airflow_db/.env + expose: + - "5432" + healthcheck: + test: [ "CMD", "pg_isready", "-U", "airflow" ] + interval: 5s + retries: 5 + restart: always + volumes: + - psql-db:/var/lib/psql-db + + mysql: + build: + context: ./services/mysql + env_file: + - ./services/mysql/.env + ports: + - '3306:3306' + healthcheck: + test: [ "CMD", "mysqladmin" ,"ping", "-h", "localhost" ] + timeout: 5s + retries: 5 + restart: always + volumes: + - mysql-db:/var/lib/mysql-db + + postgres: + build: + context: ./services/postgres_dw + env_file: + - ./services/postgres_dw/.env + expose: + - "5433" + ports: + - "5433:5432" + restart: always + volumes: + - psql-dw:/var/lib/psql-dw + + airflow-webserver: + <<: *airflow-common + command: webserver + ports: + - "8282:8080" + healthcheck: + test: [ "CMD", "curl", "--fail", "http://localhost:8080/health" ] + interval: 10s + timeout: 10s + retries: 5 + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: 
service_completed_successfully + + airflow-scheduler: + <<: *airflow-common + command: scheduler + healthcheck: + test: [ "CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"' ] + interval: 10s + timeout: 10s + retries: 5 + restart: always + depends_on: + <<: *airflow-common-depends-on + airflow-init: + condition: service_completed_successfully + + airflow-init: + <<: *airflow-common + command: version + +volumes: + psql-db: + mysql-db: + psql-dw: \ No newline at end of file diff --git a/docs/images/airflow-connection-list.png b/docs/images/airflow-connection-list.png new file mode 100644 index 0000000..3e73bbf Binary files /dev/null and b/docs/images/airflow-connection-list.png differ diff --git a/docs/images/airflow-homepage.png b/docs/images/airflow-homepage.png new file mode 100644 index 0000000..e1a5990 Binary files /dev/null and b/docs/images/airflow-homepage.png differ diff --git a/docs/images/airflow-mysql-config.png b/docs/images/airflow-mysql-config.png new file mode 100644 index 0000000..fd993c0 Binary files /dev/null and b/docs/images/airflow-mysql-config.png differ diff --git a/docs/images/airflow-postgres-config.png b/docs/images/airflow-postgres-config.png new file mode 100644 index 0000000..57da3be Binary files /dev/null and b/docs/images/airflow-postgres-config.png differ diff --git a/docs/images/dag.png b/docs/images/dag.png new file mode 100644 index 0000000..9eef338 Binary files /dev/null and b/docs/images/dag.png differ diff --git a/docs/images/dw-star-schema.png b/docs/images/dw-star-schema.png new file mode 100644 index 0000000..518b52f Binary files /dev/null and b/docs/images/dw-star-schema.png differ diff --git a/docs/images/general-architecture.png b/docs/images/general-architecture.png new file mode 100644 index 0000000..95bf45f Binary files /dev/null and b/docs/images/general-architecture.png differ diff --git a/erd.png b/erd.png deleted file mode 100644 index 59a592e..0000000 Binary files a/erd.png and /dev/null differ diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000..5515a4a --- /dev/null +++ b/mypy.ini @@ -0,0 +1,5 @@ +[mypy] +python_version = 3.9 +warn_return_any = True +ignore_missing_imports = True +exclude = "(data|logs|venv)/$" diff --git a/pytest.ini b/pytest.ini new file mode 100644 index 0000000..880aeba --- /dev/null +++ b/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +python_files = *_test.py +python_classes = *Test diff --git a/requirements-dev.txt b/requirements-dev.txt new file mode 100644 index 0000000..77dec31 --- /dev/null +++ b/requirements-dev.txt @@ -0,0 +1,35 @@ +# Testing +pytest==6.2.2 +pytest-cov==2.11.1 +pytest-docker-tools==3.1.0 +pytest-mock==3.6.1 + +# Data Wrangling +pandas==1.1.4 + +# Typehints +mypy==0.910 + +# Lints +flake8==3.8.4 + +# Black formatting +black==20.8b1 + +# Security Check +bandit==1.7.0 + +# Static Type Check +mypy==0.910 + +# Airflow +apache-airflow==2.1.0 +apache-airflow-providers-postgres==2.0.0 +apache-airflow-providers-mysql==2.0.0 + +# Kafka Python +kafka-python>=1.4.4 + +# MySQL +mysql-connector-python==8.0.11 +mysqlclient==2.0.3 \ No newline at end of file diff --git a/retail_etl/__init__.py b/retail_etl/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/retail_etl/dags/retail_dag.py b/retail_etl/dags/retail_dag.py new file mode 100644 index 0000000..ce4c228 --- /dev/null +++ b/retail_etl/dags/retail_dag.py @@ -0,0 +1,307 @@ +from datetime import datetime, timedelta +from pathlib import Path + +import airflow.utils.dates +from airflow.models import 
DAG +from airflow.operators.dummy import DummyOperator + +from stage_queries.helper import get_table_upsert_query +from dw_queries.helper import get_dw_table_upsert_query, get_dw_table_select_query +from custom_operators.tbl_to_staging.tbl_to_staging import TblToStageOperator +from custom_operators.postgres_dw_operator.postgres_dw_operator import PostgresDwOperator + + +# TODO: DRY this module + + +_SIMULATED_DATA_FOLDER = Path(__file__).parent.parent.parent / "data" +_NOW = datetime.now() +_DEFAULT_ARGS = { + "owner": "1byteyoda@makr.dev", + "depends_on_past": False, + "start_date": datetime(_NOW.year, _NOW.month, _NOW.day), + "email": ["dmc.markr@gmail.com"], + "email_on_failure": False, + "email_on_retry": False, + "retries": 1, + "retry_delay": timedelta(minutes=1), +} + + +dag = DAG( + dag_id="retail_dag", + description="This DAG parses data from a set of .tbl files, stage it to a MySQL DB, then store to a DW", + schedule_interval=timedelta(1), + start_date=airflow.utils.dates.days_ago(1), + catchup=False, + default_args=_DEFAULT_ARGS, +) + + +######################################### +# OPERATOR DEFINITIONS # +######################################### + +begin_execution = DummyOperator(task_id="begin_execution", dag=dag) + +region_tbl_to_staging_db = TblToStageOperator( + task_id="region_tbl_to_staging_db", + pandas_read_args={ + # TODO: Put in Airflow Vars. + "filepath_or_buffer": _SIMULATED_DATA_FOLDER / "region.tbl", + "chunksize": 10000, + "sep": "|", + "iterator": True, + "table_name": "region", + }, + data_load_args={ + "mysql_conn_id": "mysql_default", + "table_name": "region", + "upsert_query": get_table_upsert_query(table_name="region"), + "logger_name": "airflow.task", + }, + dag=dag, +) + +nation_tbl_to_staging_db = TblToStageOperator( + task_id="nation_tbl_to_staging_db", + pandas_read_args={ + # TODO: Put in Airflow Vars. + "filepath_or_buffer": _SIMULATED_DATA_FOLDER / "nation.tbl", + "chunksize": 10000, + "sep": "|", + "iterator": True, + "table_name": "nation", + }, + data_load_args={ + "mysql_conn_id": "mysql_default", + "table_name": "nation", + "upsert_query": get_table_upsert_query(table_name="nation"), + "logger_name": "airflow.task", + }, + dag=dag, +) + +part_tbl_to_staging_db = TblToStageOperator( + task_id="part_tbl_to_staging_db", + pandas_read_args={ + # TODO: Put in Airflow Vars. + "filepath_or_buffer": _SIMULATED_DATA_FOLDER / "part.tbl", + "chunksize": 10000, + "sep": "|", + "iterator": True, + "table_name": "part", + }, + data_load_args={ + "mysql_conn_id": "mysql_default", + "table_name": "part", + "upsert_query": get_table_upsert_query(table_name="part"), + "logger_name": "airflow.task", + }, + dag=dag, +) + +supplier_tbl_to_staging_db = TblToStageOperator( + task_id="supplier_tbl_to_staging_db", + pandas_read_args={ + # TODO: Put in Airflow Vars. + "filepath_or_buffer": _SIMULATED_DATA_FOLDER / "supplier.tbl", + "chunksize": 10000, + "sep": "|", + "iterator": True, + "table_name": "supplier", + }, + data_load_args={ + "mysql_conn_id": "mysql_default", + "table_name": "supplier", + "upsert_query": get_table_upsert_query(table_name="supplier"), + "logger_name": "airflow.task", + }, + dag=dag, +) + +partsupp_tbl_to_staging_db = TblToStageOperator( + task_id="partsupp_tbl_to_staging_db", + pandas_read_args={ + # TODO: Put in Airflow Vars. 
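        # The read args below are forwarded to pandas.read_csv by the operator's
        # helper: "chunksize" plus "iterator" make it return a TextFileReader, so
        # the .tbl file is streamed in 10,000-row batches instead of being loaded
        # into memory at once. "table_name" is only used to resolve the column
        # header model and is not passed on to pandas.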
+ "filepath_or_buffer": _SIMULATED_DATA_FOLDER / "partsupp.tbl", + "chunksize": 10000, + "sep": "|", + "iterator": True, + "table_name": "partsupp", + }, + data_load_args={ + "mysql_conn_id": "mysql_default", + "table_name": "partsupp", + "upsert_query": get_table_upsert_query(table_name="partsupp"), + "logger_name": "airflow.task", + }, + dag=dag, +) + +customer_tbl_to_staging_db = TblToStageOperator( + task_id="customer_tbl_to_staging_db", + pandas_read_args={ + # TODO: Put in Airflow Vars. + "filepath_or_buffer": _SIMULATED_DATA_FOLDER / "customer.tbl", + "chunksize": 10000, + "sep": "|", + "iterator": True, + "table_name": "customer", + }, + data_load_args={ + "mysql_conn_id": "mysql_default", + "table_name": "customer", + "upsert_query": get_table_upsert_query(table_name="customer"), + "logger_name": "airflow.task", + }, + dag=dag, +) + +orders_tbl_to_staging_db = TblToStageOperator( + task_id="orders_tbl_to_staging_db", + pandas_read_args={ + # TODO: Put in Airflow Vars. + "filepath_or_buffer": _SIMULATED_DATA_FOLDER / "orders.tbl", + "chunksize": 10000, + "sep": "|", + "iterator": True, + "table_name": "orders", + }, + data_load_args={ + "mysql_conn_id": "mysql_default", + "table_name": "orders", + "upsert_query": get_table_upsert_query(table_name="orders"), + "logger_name": "airflow.task", + }, + dag=dag, +) + +lineitem_tbl_to_staging_db = TblToStageOperator( + task_id="lineitem_tbl_to_staging_db", + pandas_read_args={ + # TODO: Put in Airflow Vars. + "filepath_or_buffer": _SIMULATED_DATA_FOLDER / "lineitem.tbl", + "chunksize": 10000, + "sep": "|", + "iterator": True, + "table_name": "lineitem", + }, + data_load_args={ + "mysql_conn_id": "mysql_default", + "table_name": "lineitem", + "upsert_query": get_table_upsert_query(table_name="lineitem"), + "logger_name": "airflow.task", + }, + dag=dag, +) + +dim_part_to_postgres_dw = PostgresDwOperator( + task_id="dim_part_to_postgres_dw", + pandas_read_config={ + "sql": get_dw_table_select_query(table_name="dim_part"), + "chunksize": 10000, + "mysql_conn_id": "mysql_default", + }, + postgres_load_config={ + "logger_name": "airflow.task", + "upsert_query": get_dw_table_upsert_query(table_name="dim_part"), + "table_name": "dim_part", + "postgres_conn_id": "postgres_default", + }, + dag=dag, +) + + +dim_supplier_to_postgres_dw = PostgresDwOperator( + task_id="dim_supplier_to_postgres_dw", + pandas_read_config={ + "sql": get_dw_table_select_query(table_name="dim_supplier"), + "chunksize": 10000, + "mysql_conn_id": "mysql_default", + }, + postgres_load_config={ + "logger_name": "airflow.task", + "upsert_query": get_dw_table_upsert_query(table_name="dim_supplier"), + "table_name": "dim_supplier", + "postgres_conn_id": "postgres_default", + }, + dag=dag, +) + + +dim_customer_to_postgres_dw = PostgresDwOperator( + task_id="dim_customer_to_postgres_dw", + pandas_read_config={ + "sql": get_dw_table_select_query(table_name="dim_customer"), + "chunksize": 10000, + "mysql_conn_id": "mysql_default", + }, + postgres_load_config={ + "logger_name": "airflow.task", + "upsert_query": get_dw_table_upsert_query(table_name="dim_customer"), + "table_name": "dim_customer", + "postgres_conn_id": "postgres_default", + }, + dag=dag, +) + + +dim_date_to_postgres_dw = PostgresDwOperator( + task_id="dim_date_to_postgres_dw", + pandas_read_config={ + "sql": get_dw_table_select_query(table_name="dim_date"), + "chunksize": 10000, + "mysql_conn_id": "mysql_default", + }, + postgres_load_config={ + "logger_name": "airflow.task", + "upsert_query": 
get_dw_table_upsert_query(table_name="dim_date"), + "table_name": "dim_date", + "postgres_conn_id": "postgres_default", + }, + dag=dag, +) + + +fact_lineitem_to_postgres_dw = PostgresDwOperator( + task_id="fact_lineitem_to_postgres_dw", + pandas_read_config={ + "sql": get_dw_table_select_query(table_name="fact_lineitem"), + "chunksize": 15000, + "mysql_conn_id": "mysql_default", + }, + postgres_load_config={ + "logger_name": "airflow.task", + "upsert_query": get_dw_table_upsert_query(table_name="fact_lineitem"), + "table_name": "fact_lineitem", + "postgres_conn_id": "postgres_default", + }, + dag=dag, +) + + +end_execution = DummyOperator(task_id="end_execution", dag=dag) + +######################################### +# NODE CONNECTIONS # +######################################### +begin_execution.set_downstream(region_tbl_to_staging_db) +region_tbl_to_staging_db.set_downstream(nation_tbl_to_staging_db) +nation_tbl_to_staging_db.set_downstream( + [customer_tbl_to_staging_db, supplier_tbl_to_staging_db, part_tbl_to_staging_db] +) +customer_tbl_to_staging_db.set_downstream(orders_tbl_to_staging_db) +supplier_tbl_to_staging_db.set_downstream(partsupp_tbl_to_staging_db) +part_tbl_to_staging_db.set_downstream(partsupp_tbl_to_staging_db) +orders_tbl_to_staging_db.set_downstream(lineitem_tbl_to_staging_db) +partsupp_tbl_to_staging_db.set_downstream(lineitem_tbl_to_staging_db) +lineitem_tbl_to_staging_db.set_downstream(dim_part_to_postgres_dw) +lineitem_tbl_to_staging_db.set_downstream(dim_supplier_to_postgres_dw) +lineitem_tbl_to_staging_db.set_downstream(dim_customer_to_postgres_dw) +lineitem_tbl_to_staging_db.set_downstream(dim_date_to_postgres_dw) +dim_part_to_postgres_dw.set_downstream(fact_lineitem_to_postgres_dw) +dim_supplier_to_postgres_dw.set_downstream(fact_lineitem_to_postgres_dw) +dim_customer_to_postgres_dw.set_downstream(fact_lineitem_to_postgres_dw) +dim_date_to_postgres_dw.set_downstream(fact_lineitem_to_postgres_dw) +fact_lineitem_to_postgres_dw >> end_execution diff --git a/retail_etl/plugins/__init__.py b/retail_etl/plugins/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/retail_etl/plugins/custom_operators/__init__.py b/retail_etl/plugins/custom_operators/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/retail_etl/plugins/custom_operators/postgres_dw_operator/__init__.py b/retail_etl/plugins/custom_operators/postgres_dw_operator/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/retail_etl/plugins/custom_operators/postgres_dw_operator/helper.py b/retail_etl/plugins/custom_operators/postgres_dw_operator/helper.py new file mode 100644 index 0000000..7514a91 --- /dev/null +++ b/retail_etl/plugins/custom_operators/postgres_dw_operator/helper.py @@ -0,0 +1,78 @@ +import traceback +from datetime import timedelta +from typing import Dict, Optional, Iterator +import logging + +import pandas as pd +import pendulum +from airflow.providers.mysql.hooks.mysql import MySqlHook +from airflow.providers.postgres.hooks.postgres import PostgresHook + + +def _get_logger(logger_name: Optional[str]) -> logging.Logger: + logger = logging.getLogger(logger_name if logger_name else "airflow.task") + return logger + + +def _create_mysql_connection(mysql_conn_id: str): + mysql_hook = MySqlHook(mysql_conn_id=mysql_conn_id) + mysql_conn = mysql_hook.get_conn() + return mysql_conn + + +def _create_postgres_connection(postgres_conn_id: str): + postgres_hook = PostgresHook(postgres_conn_id=postgres_conn_id) + postgres_conn = postgres_hook.get_conn() + 
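    # get_conn() returns a raw DBAPI (psycopg2) connection; callers are expected
    # to manage the cursor, commit/rollback, and closing of this connection.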
return postgres_conn + + +def get_dataframe(execution_ts: pendulum.DateTime, **pandas_read_args: Dict) -> Iterator[pd.DataFrame]: + """Performs Select query to get a DataFrame from MySQL DB.""" + connection = _create_mysql_connection(mysql_conn_id=pandas_read_args.pop("mysql_conn_id")) + query = pandas_read_args.pop("sql").format( + today=execution_ts + timedelta(minutes=15), yesterday=execution_ts - timedelta(days=1) + ) + df_batches = pd.read_sql(**pandas_read_args, sql=query, con=connection) + return df_batches + + +def load_to_postgres_dw( + df_batches: Iterator[pd.DataFrame], execution_ts: pendulum.DateTime, **data_load_args: Dict +) -> int: + """ + Loads batches of data to Postgres Data Warehouse. + + :param df_batches: An iterator like object + which contains the batches of DataFrames to be loaded. + :param data_load_args: The arguments needed to connect + and perform insert query into the Database. + :param execution_ts: The timestamp when this function was called. + + :return: Total number of rows successfully loaded. + """ + logger = _get_logger(logger_name=data_load_args.get("logger_name")) + postgres_conn = _create_postgres_connection( + postgres_conn_id=data_load_args.get("postgres_default", "postgres_default") + ) + postgres_cursor = postgres_conn.cursor() + table_name = data_load_args.get("table_name") + total_inserted_rows = 0 + for idx, _df in enumerate(df_batches): # type: int, pd.DataFrame + nrows = len(_df) + logger.info(f"Inserting {nrows} rows in the {table_name} table...") + try: + rows = (tuple(row) for row in _df.itertuples(index=False)) + upsert_query = data_load_args.get("upsert_query", "") + postgres_cursor.executemany(upsert_query, rows) + postgres_conn.commit() + except Exception: + postgres_conn.rollback() + logger.info("The PostgresDwOperator process has failed", exc_info=traceback.format_exc()) + finally: + postgres_cursor.execute( + f"SELECT COUNT(CASE WHEN updated_at >= '{execution_ts}' THEN 1 ELSE NULL END) " + f"FROM {table_name};" + ) + total_inserted_rows = postgres_cursor.fetchone()[0] + postgres_conn.close() + return total_inserted_rows diff --git a/retail_etl/plugins/custom_operators/postgres_dw_operator/postgres_dw_operator.py b/retail_etl/plugins/custom_operators/postgres_dw_operator/postgres_dw_operator.py new file mode 100644 index 0000000..0c4837f --- /dev/null +++ b/retail_etl/plugins/custom_operators/postgres_dw_operator/postgres_dw_operator.py @@ -0,0 +1,23 @@ +from typing import Dict + +from airflow.models import BaseOperator + +from custom_operators.postgres_dw_operator import helper + + +class PostgresDwOperator(BaseOperator): + def __init__(self, pandas_read_config: Dict, postgres_load_config: Dict, *args, **kwargs): + super().__init__(*args, **kwargs) + self._pandas_read_config = pandas_read_config + self._postgres_load_config = postgres_load_config + + def execute(self, context: Dict): + self.log.info("PostgresDwOperator Starting...") + execution_ts = context.get("execution_date") + df_batches = helper.get_dataframe(**self._pandas_read_config, execution_ts=execution_ts) + total_inserted_rows = helper.load_to_postgres_dw( + **self._postgres_load_config, df_batches=df_batches, execution_ts=execution_ts + ) + table_name = self._postgres_load_config.get("table_name") + + self.log.info(f"Finished Loading {total_inserted_rows} rows in the {table_name} table.") diff --git a/retail_etl/plugins/custom_operators/tbl_to_staging/__init__.py b/retail_etl/plugins/custom_operators/tbl_to_staging/__init__.py new file mode 100644 index 
0000000..e69de29 diff --git a/retail_etl/plugins/custom_operators/tbl_to_staging/helper.py b/retail_etl/plugins/custom_operators/tbl_to_staging/helper.py new file mode 100644 index 0000000..28b36a0 --- /dev/null +++ b/retail_etl/plugins/custom_operators/tbl_to_staging/helper.py @@ -0,0 +1,96 @@ +import traceback +from typing import Dict, Union, Optional +import logging + +import pandas as pd +import pendulum +from airflow.models import Connection +from airflow.providers.mysql.hooks.mysql import MySqlHook +from pandas.io.parsers import TextFileReader + +from custom_operators.tbl_to_staging import model + + +_HEADER_MAPPING = { + "region": model.RegionHeader(), + "nation": model.NationHeader(), + "part": model.PartHeader(), + "customer": model.CustomerHeader(), + "supplier": model.SupplierHeader(), + "orders": model.OrderHeader(), + "partsupp": model.PartSuppHeader(), + "lineitem": model.LineItemHeader(), +} + + +def _get_logger(logger_name: Optional[str]) -> logging.Logger: + logger = logging.getLogger(logger_name if logger_name else "airflow.task") + return logger + + +def _get_header( + *, table_name: str +) -> Union[ + model.RegionHeader, + model.NationHeader, + model.PartHeader, + model.CustomerHeader, + model.SupplierHeader, + model.OrderHeader, + model.PartSuppHeader, + model.LineItemHeader, + None, +]: + """Map a table name into it's header counterpart.""" + return _HEADER_MAPPING.get(table_name) + + +def _create_mysql_connection(mysql_conn_id: Optional[str]) -> Connection: + mysql_hook = MySqlHook(mysql_conn_id=mysql_conn_id if mysql_conn_id else "mysql_default") + mysql_conn = mysql_hook.get_conn() + return mysql_conn + + +def get_dataframe(table_name: str, **pandas_read_args: Dict) -> TextFileReader: + """Get the corresponding DataFrame of a given file.""" + header = _get_header(table_name=table_name) + columns = header.to_list() if header else None + return pd.read_csv(**pandas_read_args, names=columns, usecols=columns) + + +def load_to_mysql_db( + df_batches: TextFileReader, execution_ts: pendulum.DateTime, **data_load_args: Dict +) -> int: + """ + Loads batches of data to MySQL Database. + + :param df_batches: An iterator like object + which contains the batches of DataFrames to be loaded. + :param data_load_args: The arguments needed to connect + and perform insert query into the Database. + :param: execution_ts: The timestamp when this function was called. + + :return: Total number of rows successfully loaded. 
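
    .. note:: Each batch is committed in the ``finally`` block, and the returned
        count is derived from rows whose ``updated_at`` is at or after
        ``execution_ts``, not from ``cursor.rowcount``.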
+ """ + logger = _get_logger(logger_name=data_load_args.get("logger_name")) + mysql_conn = _create_mysql_connection(mysql_conn_id=data_load_args.get("mysql_conn_id")) + mysql_cursor = mysql_conn.cursor() + table_name = data_load_args.get("table_name") + total_inserted_rows = 0 + for idx, _df in enumerate(df_batches): # type: int, pd.DataFrame + nrows = len(_df) + logger.debug(f"Inserting {nrows} rows in the {table_name} table...") + try: + rows = (tuple(row) for row in _df.itertuples(index=False)) + mysql_cursor.executemany(data_load_args.get("upsert_query"), rows) + except Exception: + mysql_conn.rollback() + logger.error("The TblToStageOperator process has failed", exc_info=traceback.format_exc()) + finally: + mysql_conn.commit() + mysql_cursor.execute( + f"SELECT COUNT(IF(updated_at >= '{execution_ts}', 1, NULL)) FROM {table_name};" + ) + total_inserted_rows = mysql_cursor.fetchone()[0] + mysql_conn.close() + return total_inserted_rows diff --git a/retail_etl/plugins/custom_operators/tbl_to_staging/model.py b/retail_etl/plugins/custom_operators/tbl_to_staging/model.py new file mode 100644 index 0000000..368d978 --- /dev/null +++ b/retail_etl/plugins/custom_operators/tbl_to_staging/model.py @@ -0,0 +1,109 @@ +from datetime import date as date + +from dataclasses import dataclass +from typing import Optional, Text + + +@dataclass(frozen=True) +class Column: + name: str + data_type: object + + +@dataclass(frozen=True) +class Header: + def to_list(self): + return list(self.__annotations__) + + +@dataclass(init=False, frozen=True) +class RegionHeader(Header): + r_regionkey: Column = Column(name="r_region_key", data_type=int) + r_name: Column = Column(name="r_name", data_type=Text) + r_comment: Column = Column(name="r_comment", data_type=Optional[Text]) + + +@dataclass(init=False, frozen=True) +class NationHeader(Header): + n_nationkey: Column = Column(name="n_nationkey", data_type=int) + n_name: Column = Column(name="n_name", data_type=Text) + n_regionkey: Column = Column(name="n_regionkey", data_type=int) + n_comment: Column = Column(name="n_comment", data_type=Optional[Text]) + + +@dataclass(init=False, frozen=True) +class PartHeader(Header): + p_partkey: Column = Column(name="p_partkey", data_type=int) + p_name: Column = Column(name="p_name", data_type=Text) + p_mfgr: Column = Column(name="p_mfgr", data_type=Text) + p_brand: Column = Column(name="p_brand", data_type=Text) + p_type: Column = Column(name="p_type", data_type=Text) + p_size: Column = Column(name="p_size", data_type=int) + p_container: Column = Column(name="p_container", data_type=Text) + p_retailprice: Column = Column(name="p_retailprice", data_type=int) + p_comment: Column = Column(name="p_comment", data_type=Text) + + +@dataclass(init=False, frozen=True) +class SupplierHeader(Header): + s_suppkey: Column = Column(name="s_suppkey", data_type=int) + s_name: Column = Column(name="s_name", data_type=Text) + s_address: Column = Column(name="s_address", data_type=Text) + s_nationkey: Column = Column(name="s_nationkey", data_type=int) + s_phone: Column = Column(name="s_phone", data_type=Text) + s_acctbal: Column = Column(name="s_acctbal", data_type=int) + s_comment: Column = Column(name="s_comment", data_type=Text) + + +@dataclass(init=False, frozen=True) +class PartSuppHeader(Header): + ps_partkey: Column = Column(name="p_partkey", data_type=int) + ps_suppkey: Column = Column(name="p_name", data_type=int) + ps_availqty: Column = Column(name="p_mfgr", data_type=int) + ps_supplycost: Column = Column(name="p_brand", data_type=int) + 
ps_comment: Column = Column(name="p_type", data_type=Text) + + +@dataclass(init=False, frozen=True) +class CustomerHeader(Header): + c_custkey: Column = Column(name="c_custkey", data_type=int) + c_name: Column = Column(name="c_name", data_type=Text) + c_address: Column = Column(name="c_address", data_type=Text) + c_nationkey: Column = Column(name="c_nationkey", data_type=int) + c_phone: Column = Column(name="c_phone", data_type=Text) + c_acctbal: Column = Column(name="c_acctbal", data_type=int) + c_mktsegment: Column = Column(name="c_mktsegment", data_type=Text) + c_comment: Column = Column(name="c_comment", data_type=Text) + + +@dataclass(init=False, frozen=True) +class OrderHeader(Header): + o_ordkey: Column = Column(name="o_ordkey", data_type=int) + o_custkey: Column = Column(name="o_custkey", data_type=int) + o_orderstatus: Column = Column(name="o_orderstatus", data_type=Text) + o_totalprice: Column = Column(name="o_totalprice", data_type=int) + o_orderdate: Column = Column(name="o_orderdate", data_type=date) + o_orderpriority: Column = Column(name="o_orderpriority", data_type=Text) + o_clerk: Column = Column(name="o_clerk", data_type=Text) + o_shippriority: Column = Column(name="o_shippriority", data_type=int) + o_comment: Column = Column(name="o_comment", data_type=Text) + + +@dataclass(init=False, frozen=True) +class LineItemHeader(Header): + l_orderkey: Column = Column(name="l_orderkey", data_type=int) + l_partkey: Column = Column(name="l_partkey", data_type=int) + l_suppkey: Column = Column(name="l_suppkey", data_type=Text) + l_linenumber: Column = Column(name="l_linenumber", data_type=int) + l_quantity: Column = Column(name="l_quantity", data_type=int) + l_extendedprice: Column = Column(name="l_extendedprice", data_type=int) + l_discount: Column = Column(name="l_discount", data_type=int) + l_tax: Column = Column(name="l_tax", data_type=int) + l_returnflag: Column = Column(name="l_returnflag", data_type=Text) + l_linestatus: Column = Column(name="l_linestatus", data_type=Text) + l_shipdate: Column = Column(name="l_shipdate", data_type=date) + l_commitdate: Column = Column(name="l_commitdate", data_type=date) + l_receiptdate: Column = Column(name="l_receiptdate", data_type=date) + l_shipinstruct: Column = Column(name="l_shipinstruct", data_type=Text) + l_shipmode: Column = Column(name="l_shipmode", data_type=Text) + l_comment: Column = Column(name="l_comment", data_type=Text) diff --git a/retail_etl/plugins/custom_operators/tbl_to_staging/tbl_to_staging.py b/retail_etl/plugins/custom_operators/tbl_to_staging/tbl_to_staging.py new file mode 100644 index 0000000..494a502 --- /dev/null +++ b/retail_etl/plugins/custom_operators/tbl_to_staging/tbl_to_staging.py @@ -0,0 +1,31 @@ +from typing import Dict, Optional + +from airflow.models import BaseOperator + +from custom_operators.tbl_to_staging import helper + + +class TblToStageOperator(BaseOperator): + def __init__(self, pandas_read_args: Dict, data_load_args: Dict, *args, **kwargs): + """ + Operator to import a .tbl file to a MySQL database. + + :param pandas_read_args: The arguments that will be used for reading the .tbl files + :param data_load_args: A key/value pair with the args that will be used for loading a batch of data. + :param args: Any additional args that BaseOperator can use. + :param kwargs: Any additional keyword args that BaseOperator can use. 
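
        Note that ``pandas_read_args`` is expected to contain a ``table_name`` entry
        (used to resolve the column header model) in addition to keyword arguments
        accepted by ``pandas.read_csv`` such as ``filepath_or_buffer``, ``sep``,
        ``chunksize`` and ``iterator``.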
+ """ + super().__init__(*args, **kwargs) + self._data_load_args = data_load_args + self._pandas_read_args = pandas_read_args + + def execute(self, context: Optional[Dict]): + self.log.info("TblToStageOperator Starting...") + + df_batches = helper.get_dataframe(**self._pandas_read_args) + total_inserted_rows = helper.load_to_mysql_db( + **self._data_load_args, df_batches=df_batches, execution_ts=context.get("execution_date") + ) + table_name = self._data_load_args.get("table_name") + + self.log.info(f"Finished Loading {total_inserted_rows} rows in the {table_name} table.") diff --git a/retail_etl/plugins/dw_queries/__init__.py b/retail_etl/plugins/dw_queries/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/retail_etl/plugins/dw_queries/dim_customer.py b/retail_etl/plugins/dw_queries/dim_customer.py new file mode 100644 index 0000000..40ff5bd --- /dev/null +++ b/retail_etl/plugins/dw_queries/dim_customer.py @@ -0,0 +1,51 @@ +def get_upsert_query(): + return """ + INSERT INTO dim_customer( + c_id, c_name, c_address, c_nation, c_region, c_phone, c_mktsegment, c_cluster + ) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (c_id) + DO UPDATE SET + (c_name, c_address, c_nation, c_region, c_phone, c_mktsegment, c_cluster) + = ( + EXCLUDED.c_name, + EXCLUDED.c_address, + EXCLUDED.c_nation, + EXCLUDED.c_region, + EXCLUDED.c_phone, + EXCLUDED.c_mktsegment, + EXCLUDED.c_cluster + ); + """ + + +def get_select_query_for_insert(): + return """ + SELECT DISTINCT + c.c_custkey AS c_id, + c.c_name, + c.c_address, + n.n_name AS c_nation, + r.r_name AS c_region, + c.c_phone, + c.c_mktsegment, + CASE + WHEN ranking BETWEEN 0 AND 0.5 + THEN "Bronze Customer" + WHEN ranking BETWEEN 0.50 AND 0.75 + THEN "Silver Customer" + WHEN ranking BETWEEN 0.75 AND 1 + THEN "Gold Customer" + END AS c_cluster + FROM customer AS c + JOIN nation AS n ON n.n_nationkey = c.c_nationkey + JOIN region AS r ON r.r_regionkey = n.n_regionkey + JOIN ( + SELECT + c_custkey, + c_acctbal, + PERCENT_RANK() OVER(ORDER BY c_acctbal) AS ranking + FROM customer ORDER BY c_acctbal + ) AS ranking ON ranking.c_custkey = c.c_custkey + WHERE c.updated_at BETWEEN '{yesterday}' AND '{today}' + """ diff --git a/retail_etl/plugins/dw_queries/dim_date.py b/retail_etl/plugins/dw_queries/dim_date.py new file mode 100644 index 0000000..114373a --- /dev/null +++ b/retail_etl/plugins/dw_queries/dim_date.py @@ -0,0 +1,93 @@ +def get_upsert_query(): + return """ + INSERT INTO dim_date( + d_id, + d_date, + d_dayofweek, + d_month, + d_year, + d_monthname, + d_yearweek, + d_yearmonth, + d_quarter, + d_yearquarter + ) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (d_id) + DO UPDATE SET + ( + d_date, + d_dayofweek, + d_month, + d_year, + d_monthname, + d_yearweek, + d_yearmonth, + d_quarter, + d_yearquarter + ) + = ( + EXCLUDED.d_date, + EXCLUDED.d_dayofweek, + EXCLUDED.d_month, + EXCLUDED.d_year, + EXCLUDED.d_monthname, + EXCLUDED.d_yearweek, + EXCLUDED.d_yearmonth, + EXCLUDED.d_quarter, + EXCLUDED.d_yearquarter + ); + """ + + +def get_select_query_for_insert(): + return f""" + SELECT + DATE_FORMAT(o.o_orderdate, "%Y%m%d") AS d_id, + o.o_orderdate AS d_date, + DAYOFWEEK(o.o_orderdate) AS d_dayofweek, + MONTH(o.o_orderdate) AS d_month, + YEAR(o.o_orderdate) AS d_year, + MONTHNAME(o.o_orderdate) AS d_monthname, + YEARWEEK(o.o_orderdate) AS d_yearweek, + DATE_FORMAT(o.o_orderdate, '%Y-%m') AS d_yearmonth, + CONCAT('Q', QUARTER(o.o_orderdate)) AS d_quarter, + CONCAT(YEAR(o.o_orderdate), '-Q', QUARTER(o.o_orderdate)) AS 
d_yearquarter + FROM orders AS o + JOIN lineitem AS l ON l.l_orderkey = o.o_orderkey + GROUP BY d_date + + UNION + + SELECT + DATE_FORMAT(l.l_commitdate, "%Y%m%d") AS d_id, + l.l_commitdate AS d_date, + DAYOFWEEK(l.l_commitdate) AS d_dayofweek, + MONTH(l.l_commitdate) AS d_month, + YEAR(l.l_commitdate) AS d_year, + MONTHNAME(l.l_commitdate) AS d_monthname, + YEARWEEK(l.l_commitdate) AS d_yearweek, + DATE_FORMAT(l.l_commitdate, '%Y-%m') AS d_yearmonth, + CONCAT('Q', QUARTER(l.l_commitdate)) AS d_quarter, + CONCAT(YEAR(l.l_commitdate), '-Q', QUARTER(l.l_commitdate)) AS d_yearquarter + FROM orders AS o + JOIN lineitem AS l ON l.l_orderkey = o.o_orderkey + GROUP BY d_date + + UNION + + SELECT + DATE_FORMAT(l.l_receiptdate, "%Y%m%d") AS d_id, + l.l_receiptdate AS d_date, + DAYOFWEEK(l.l_receiptdate) AS d_dayofweek, + MONTH(l.l_receiptdate) AS d_month, + YEAR(l.l_receiptdate) AS d_year, + MONTHNAME(l.l_receiptdate) AS d_monthname, + YEARWEEK(l.l_receiptdate) AS d_yearweek, + DATE_FORMAT(l.l_receiptdate, '%Y-%m') AS d_yearmonth, + CONCAT('Q', QUARTER(l.l_receiptdate)) AS d_quarter, + CONCAT(YEAR(l.l_receiptdate), '-Q', QUARTER(l.l_receiptdate)) AS d_yearquarter + FROM orders AS o + JOIN lineitem AS l ON l.l_orderkey = o.o_orderkey + GROUP BY d_date + """ diff --git a/retail_etl/plugins/dw_queries/dim_part.py b/retail_etl/plugins/dw_queries/dim_part.py new file mode 100644 index 0000000..59c6842 --- /dev/null +++ b/retail_etl/plugins/dw_queries/dim_part.py @@ -0,0 +1,35 @@ +def get_upsert_query(): + return """ + INSERT INTO dim_part( + p_id, p_name, p_mfgr, p_brand, p_type, p_size, p_container + ) + VALUES (%s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (p_id) + DO UPDATE SET + (p_name, p_mfgr, p_brand, p_type, p_size, p_container) + = ( + EXCLUDED.p_name, + EXCLUDED.p_mfgr, + EXCLUDED.p_brand, + EXCLUDED.p_type, + EXCLUDED.p_size, + EXCLUDED.p_container + ); + """ + + +def get_select_query_for_insert(): + return """ + SELECT DISTINCT + p.p_partkey AS p_id, + p.p_name, + p.p_mfgr, + p.p_brand, + p.p_type, + p.p_size, + p.p_container + FROM part AS p + WHERE p.updated_at + BETWEEN '{yesterday}' + AND '{today}' + """ diff --git a/retail_etl/plugins/dw_queries/dim_supplier.py b/retail_etl/plugins/dw_queries/dim_supplier.py new file mode 100644 index 0000000..9f3c4f5 --- /dev/null +++ b/retail_etl/plugins/dw_queries/dim_supplier.py @@ -0,0 +1,33 @@ +def get_upsert_query(): + return """ + INSERT INTO dim_supplier( + s_id, s_name, s_address, s_nation, s_region, s_phone + ) + VALUES (%s, %s, %s, %s, %s, %s) + ON CONFLICT (s_id) + DO UPDATE SET + (s_name, s_address, s_nation, s_region, s_phone) + = ( + EXCLUDED.s_name, + EXCLUDED.s_address, + EXCLUDED.s_nation, + EXCLUDED.s_region, + EXCLUDED.s_phone + ); + """ + + +def get_select_query_for_insert(): + return """ + SELECT DISTINCT + s.s_suppkey AS s_id, + s.s_name, + s.s_address, + n.n_name AS s_nation, + r.r_name AS s_region, + s.s_phone + FROM supplier AS s + JOIN nation AS n ON n.n_nationkey = s.s_nationkey + JOIN region AS r ON r.r_regionkey = n.n_regionkey + WHERE s.updated_at BETWEEN '{yesterday}' AND '{today}' + """ diff --git a/retail_etl/plugins/dw_queries/fact_lineitem.py b/retail_etl/plugins/dw_queries/fact_lineitem.py new file mode 100644 index 0000000..0b44d02 --- /dev/null +++ b/retail_etl/plugins/dw_queries/fact_lineitem.py @@ -0,0 +1,79 @@ +def get_upsert_query(): + return """ + INSERT INTO fact_lineitem( + l_linenumber, + l_orderkey, + l_partkey, + l_suppkey, + l_custkey, + l_orderdatekey, + l_commitdatekey, + l_receiptdatekey, + l_shipmode, + 
l_quantity, + l_extendedprice, + l_discount, + l_revenue, + l_tax + ) + VALUES ( + %s, + %s, + (SELECT p_partkey from dim_part WHERE p_id=%s), + (SELECT s_suppkey from dim_supplier WHERE s_id=%s), + (SELECT c_custkey from dim_customer WHERE c_id=%s), + (SELECT d_datekey from dim_date WHERE d_id=%s), + (SELECT d_datekey from dim_date WHERE d_id=%s), + (SELECT d_datekey from dim_date WHERE d_id=%s), + %s, %s, %s, %s, %s, %s + ) + ON CONFLICT ( + l_partkey, + l_suppkey, + l_custkey, + l_orderdatekey, + l_commitdatekey, + l_receiptdatekey + ) + DO UPDATE SET + ( + l_shipmode, + l_quantity, + l_extendedprice, + l_discount, + l_revenue, + l_tax + ) + = ( + EXCLUDED.l_shipmode, + EXCLUDED.l_quantity, + EXCLUDED.l_extendedprice, + EXCLUDED.l_discount, + EXCLUDED.l_revenue, + EXCLUDED.l_tax + ); + """ + + +def get_select_query_for_insert(): + return """ + SELECT + l.l_linenumber, + l.l_orderkey, + l.l_partkey, + l.l_suppkey, + c.c_custkey AS l_custkey, + DATE_FORMAT(o.o_orderdate, '%Y%m%d') AS l_orderdatekey, + DATE_FORMAT(l.l_commitdate, '%Y%m%d') AS l_commitdatekey, + DATE_FORMAT(l.l_receiptdate, '%Y%m%d') AS l_receiptdatekey, + l.l_shipmode, + l.l_quantity, + l.l_extendedprice, + l.l_discount, + (l.l_quantity * l.l_extendedprice) - (l.l_extendedprice * l.l_discount) AS l_revenue, + l.l_tax + FROM orders AS o + JOIN lineitem AS l ON l.l_orderkey = o.o_orderkey + JOIN customer AS c ON c.c_custkey = o.o_custkey + WHERE l.updated_at BETWEEN '{yesterday}' AND '{today}' + """ diff --git a/retail_etl/plugins/dw_queries/helper.py b/retail_etl/plugins/dw_queries/helper.py new file mode 100644 index 0000000..2bb989b --- /dev/null +++ b/retail_etl/plugins/dw_queries/helper.py @@ -0,0 +1,50 @@ +from typing import Optional + +from dw_queries.dim_part import ( + get_upsert_query as _get_dim_part_upsert_query, + get_select_query_for_insert as _get_dim_part_select_query_for_insert, +) +from dw_queries.dim_supplier import ( + get_upsert_query as _get_dim_supplier_upsert_query, + get_select_query_for_insert as _get_dim_supplier_select_query_for_insert, +) +from dw_queries.dim_customer import ( + get_upsert_query as _get_dim_customer_upsert_query, + get_select_query_for_insert as _get_dim_customer_select_query_for_insert, +) +from dw_queries.dim_date import ( + get_upsert_query as _get_dim_date_upsert_query, + get_select_query_for_insert as _get_dim_date_select_query_for_insert, +) +from dw_queries.fact_lineitem import ( + get_upsert_query as _get_fact_lineitem_upsert_query, + get_select_query_for_insert as _get_fact_lineitem_select_query_for_insert, +) + + +_DIM_UPSERT_QUERIES = { + "dim_part": _get_dim_part_upsert_query(), + "dim_supplier": _get_dim_supplier_upsert_query(), + "dim_customer": _get_dim_customer_upsert_query(), + "dim_date": _get_dim_date_upsert_query(), + "fact_lineitem": _get_fact_lineitem_upsert_query(), +} + + +_DIM_SELECT_QUERIES = { + "dim_part": _get_dim_part_select_query_for_insert(), + "dim_supplier": _get_dim_supplier_select_query_for_insert(), + "dim_customer": _get_dim_customer_select_query_for_insert(), + "dim_date": _get_dim_date_select_query_for_insert(), + "fact_lineitem": _get_fact_lineitem_select_query_for_insert(), +} + + +def get_dw_table_upsert_query(table_name: str) -> Optional[str]: + """Get the Upsert SQL Query for a given table name for the Data Warehouse.""" + return _DIM_UPSERT_QUERIES.get(table_name) + + +def get_dw_table_select_query(table_name: str) -> Optional[str]: + """Get the Select Query to be used on inserting data to the Data Warehouse.""" + return 
_DIM_SELECT_QUERIES.get(table_name) diff --git a/retail_etl/plugins/stage_queries/__init__.py b/retail_etl/plugins/stage_queries/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/retail_etl/plugins/stage_queries/customer.py b/retail_etl/plugins/stage_queries/customer.py new file mode 100644 index 0000000..7841773 --- /dev/null +++ b/retail_etl/plugins/stage_queries/customer.py @@ -0,0 +1,17 @@ +def get_upsert_query(): + return """ + INSERT INTO customer( + c_custkey, c_name, c_address, c_nationkey, c_phone, c_acctbal, c_mktsegment, c_comment + ) + VALUES ( + %s, %s, %s, %s, %s, %s, %s, %s + ) + ON DUPLICATE KEY UPDATE + c_custkey = VALUES(c_custkey), + c_name = VALUES(c_name), + c_address = VALUES(c_address), + c_phone = VALUES(c_phone), + c_acctbal = VALUES(c_acctbal), + c_mktsegment = VALUES(c_mktsegment), + c_comment = VALUES(c_comment) + """ diff --git a/retail_etl/plugins/stage_queries/helper.py b/retail_etl/plugins/stage_queries/helper.py new file mode 100644 index 0000000..db25eca --- /dev/null +++ b/retail_etl/plugins/stage_queries/helper.py @@ -0,0 +1,26 @@ +from typing import Optional + +from stage_queries.region import get_upsert_query as _get_region_upsert_query +from stage_queries.customer import get_upsert_query as _get_customer_upsert_query +from stage_queries.order import get_upsert_query as _get_order_upsert_query +from stage_queries.lineitem import get_upsert_query as _get_lineitem_upsert_query +from stage_queries.nation import get_upsert_query as _get_nation_upsert_query +from stage_queries.part import get_upsert_query as _get_part_upsert_query +from stage_queries.supplier import get_upsert_query as _get_supplier_upsert_query +from stage_queries.partsupp import get_upsert_query as _get_partsupp_upsert_query + +_UPSERT_QUERIES = { + "region": _get_region_upsert_query(), + "customer": _get_customer_upsert_query(), + "orders": _get_order_upsert_query(), + "lineitem": _get_lineitem_upsert_query(), + "nation": _get_nation_upsert_query(), + "part": _get_part_upsert_query(), + "supplier": _get_supplier_upsert_query(), + "partsupp": _get_partsupp_upsert_query(), +} + + +def get_table_upsert_query(table_name: str) -> Optional[str]: + """Get the Upsert SQL Query for a given table name.""" + return _UPSERT_QUERIES.get(table_name) diff --git a/retail_etl/plugins/stage_queries/lineitem.py b/retail_etl/plugins/stage_queries/lineitem.py new file mode 100644 index 0000000..3216435 --- /dev/null +++ b/retail_etl/plugins/stage_queries/lineitem.py @@ -0,0 +1,38 @@ +def get_upsert_query(): + return """ + INSERT INTO lineitem( + l_orderkey, + l_partkey, + L_suppkey, + l_linenumber, + l_quantity, + l_extendedprice, + l_discount, + l_tax, + l_returnflag, + l_linestatus, + l_shipdate, + l_commitdate, + l_receiptdate, + l_shipinstruct, + l_shipmode, + l_comment + ) + VALUES ( + %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s + ) + ON DUPLICATE KEY UPDATE + l_linenumber = VALUES(l_linenumber), + l_quantity = VALUES(l_quantity), + l_extendedprice = VALUES(l_extendedprice), + l_discount = VALUES(l_discount), + l_tax = VALUES(l_tax), + l_returnflag = VALUES(l_returnflag), + l_linestatus = VALUES(l_linestatus), + l_shipdate = VALUES(l_shipdate), + l_commitdate = VALUES(l_commitdate), + l_receiptdate = VALUES(l_receiptdate), + l_shipinstruct = VALUES(l_shipinstruct), + l_shipmode = VALUES(l_shipmode), + l_comment = VALUES(l_comment) + """ diff --git a/retail_etl/plugins/stage_queries/nation.py b/retail_etl/plugins/stage_queries/nation.py new file mode 100644 index 
0000000..ffc809b --- /dev/null +++ b/retail_etl/plugins/stage_queries/nation.py @@ -0,0 +1,17 @@ +def get_upsert_query(): + return """ + INSERT INTO nation( + n_nationkey, + n_name, + n_regionkey, + n_comment + ) + VALUES ( + %s, %s, %s, %s + ) + ON DUPLICATE KEY UPDATE + n_nationkey = VALUES(n_nationkey), + n_name = VALUES(n_name), + n_regionkey = VALUES(n_regionkey), + n_comment = VALUES(n_comment) + """ diff --git a/retail_etl/plugins/stage_queries/order.py b/retail_etl/plugins/stage_queries/order.py new file mode 100644 index 0000000..2f71aa2 --- /dev/null +++ b/retail_etl/plugins/stage_queries/order.py @@ -0,0 +1,27 @@ +def get_upsert_query(): + return """ + INSERT INTO `orders`( + o_orderkey, + o_custkey, + o_orderstatus, + o_totalprice, + o_orderdate, + o_orderpriority, + o_clerk, + o_shippriority, + o_comment + ) + VALUES ( + %s, %s, %s, %s, %s, %s, %s, %s, %s + ) + ON DUPLICATE KEY UPDATE + o_orderkey = VALUES(o_orderkey), + o_custkey = VALUES(o_custkey), + o_orderstatus = VALUES(o_orderstatus), + o_totalprice = VALUES(o_totalprice), + o_orderdate = VALUES(o_orderdate), + o_orderpriority = VALUES(o_orderpriority), + o_clerk = VALUES(o_clerk), + o_shippriority = VALUES(o_shippriority), + o_comment = VALUES(o_comment) + """ diff --git a/retail_etl/plugins/stage_queries/part.py b/retail_etl/plugins/stage_queries/part.py new file mode 100644 index 0000000..f55fff1 --- /dev/null +++ b/retail_etl/plugins/stage_queries/part.py @@ -0,0 +1,27 @@ +def get_upsert_query(): + return """ + INSERT INTO part( + p_partkey, + p_name, + p_mfgr, + p_brand, + p_type, + p_size, + p_container, + p_retailprice, + p_comment + ) + VALUES ( + %s, %s, %s, %s, %s, %s, %s, %s, %s + ) + ON DUPLICATE KEY UPDATE + p_partkey = VALUES(p_partkey), + p_name = VALUES(p_name), + p_mfgr = VALUES(p_mfgr), + p_brand = VALUES(p_brand), + p_type = VALUES(p_type), + p_size = VALUES(p_size), + p_container = VALUES(p_container), + p_retailprice = VALUES(p_retailprice), + p_comment = VALUES(p_comment) + """ diff --git a/retail_etl/plugins/stage_queries/partsupp.py b/retail_etl/plugins/stage_queries/partsupp.py new file mode 100644 index 0000000..596d1a8 --- /dev/null +++ b/retail_etl/plugins/stage_queries/partsupp.py @@ -0,0 +1,17 @@ +def get_upsert_query(): + return """ + INSERT INTO partsupp( + ps_partkey, + ps_suppkey, + ps_availqty, + ps_supplycost, + ps_comment + ) + VALUES ( + %s, %s, %s, %s, %s + ) + ON DUPLICATE KEY UPDATE + ps_availqty = VALUES(ps_availqty), + ps_supplycost = VALUES(ps_supplycost), + ps_comment = VALUES(ps_comment) + """ diff --git a/retail_etl/plugins/stage_queries/region.py b/retail_etl/plugins/stage_queries/region.py new file mode 100644 index 0000000..767d27d --- /dev/null +++ b/retail_etl/plugins/stage_queries/region.py @@ -0,0 +1,11 @@ +def get_upsert_query(): + return """ + INSERT INTO region( + r_regionkey, r_name, r_comment + ) + VALUES ( + %s, %s, %s + ) + ON DUPLICATE KEY UPDATE + r_regionkey = VALUES(r_regionkey), r_name = VALUES(r_name), r_comment = VALUES(r_comment) + """ diff --git a/retail_etl/plugins/stage_queries/supplier.py b/retail_etl/plugins/stage_queries/supplier.py new file mode 100644 index 0000000..c9bbcba --- /dev/null +++ b/retail_etl/plugins/stage_queries/supplier.py @@ -0,0 +1,23 @@ +def get_upsert_query(): + return """ + INSERT INTO supplier( + s_suppkey, + s_name, + s_address, + s_nationkey, + s_phone, + s_acctbal, + s_comment + ) + VALUES ( + %s, %s, %s, %s, %s, %s, %s + ) + ON DUPLICATE KEY UPDATE + s_suppkey = VALUES(s_suppkey), + s_name = VALUES(s_name), + s_address 
= VALUES(s_address), + s_nationkey = VALUES(s_nationkey), + s_phone = VALUES(s_phone), + s_acctbal = VALUES(s_acctbal), + s_comment = VALUES(s_comment) + """ diff --git a/services/airflow/.env b/services/airflow/.env new file mode 100644 index 0000000..a8d3767 --- /dev/null +++ b/services/airflow/.env @@ -0,0 +1,17 @@ +# Airflow's Common Environment Configs +AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION="true" +AIRFLOW__CORE__EXECUTOR=LocalExecutor +AIRFLOW__CORE__FERNET_KEY=hCRoPUYBO27QiEg1MRu5hSjLG7yNd8y8XKlm-8kRlkQ= +AIRFLOW__CORE__LOAD_DEFAULT_CONNECTIONS="false" +AIRFLOW__CORE__LOAD_EXAMPLES="false" +AIRFLOW__CORE__SQL_ALCHEMY_CONN=postgresql+psycopg2://airflow:airflow@airflow_db:5432/airflow +AIRFLOW__CORE__STORE_DAG_CODE="true" +AIRFLOW__CORE__STORE_SERIALIZED_DAGS="true" +AIRFLOW__LOGGING__LOGGING_LEVEL="INFO" +_PIP_ADDITIONAL_REQUIREMENTS="" + +# Airflow's Backend / Credential Configs (Feel free to modify) +_AIRFLOW_DB_UPGRADE="true" +_AIRFLOW_WWW_USER_CREATE="true" +_AIRFLOW_WWW_USER_USERNAME=airflow +_AIRFLOW_WWW_USER_PASSWORD=airflow \ No newline at end of file diff --git a/services/airflow/Dockerfile b/services/airflow/Dockerfile new file mode 100644 index 0000000..fa82558 --- /dev/null +++ b/services/airflow/Dockerfile @@ -0,0 +1,22 @@ +ARG AIRFLOW_BASE_IMAGE="apache/airflow:2.1.0-python3.6" +FROM ${AIRFLOW_BASE_IMAGE} + +USER root + +RUN apt update && \ + apt-get install -y openjdk-11-jdk && \ + apt-get install -y ant && \ + apt-get clean; \ + apt-get -y install default-libmysqlclient-dev + +ENV JAVA_HOME /usr/lib/jvm/java-11-openjdk-amd64/ +RUN export JAVA_HOME + +USER airflow + +RUN pip install --user --no-cache-dir \ + pandas==1.1.4 \ + apache-airflow-providers-postgres==2.0.0 \ + apache-airflow-providers-mysql==2.0.0 \ + mysql-connector-python==8.0.11 \ + mysqlclient==2.0.3 diff --git a/services/airflow_db/.env b/services/airflow_db/.env new file mode 100644 index 0000000..d3fb5b6 --- /dev/null +++ b/services/airflow_db/.env @@ -0,0 +1,3 @@ +POSTGRES_USER=airflow +POSTGRES_PASSWORD=airflow +POSTGRES_DB=airflow \ No newline at end of file diff --git a/services/mysql/.env b/services/mysql/.env new file mode 100644 index 0000000..096eb80 --- /dev/null +++ b/services/mysql/.env @@ -0,0 +1 @@ +MYSQL_ROOT_PASSWORD=password \ No newline at end of file diff --git a/services/mysql/Dockerfile b/services/mysql/Dockerfile new file mode 100644 index 0000000..b20eb1b --- /dev/null +++ b/services/mysql/Dockerfile @@ -0,0 +1,3 @@ +FROM mysql:8.0.26 + +ADD ./scripts/ddl.sql /docker-entrypoint-initdb.d \ No newline at end of file diff --git a/services/mysql/scripts/ddl.sql b/services/mysql/scripts/ddl.sql new file mode 100644 index 0000000..bf9ab9d --- /dev/null +++ b/services/mysql/scripts/ddl.sql @@ -0,0 +1,114 @@ +CREATE DATABASE IF NOT EXISTS retail; + +USE retail; + +CREATE TABLE IF NOT EXISTS region ( + r_regionkey INTEGER PRIMARY KEY NOT NULL, + r_name TEXT NOT NULL, + r_comment TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP +); + +CREATE TABLE IF NOT EXISTS nation ( + n_nationkey INTEGER PRIMARY KEY NOT NULL, + n_name TEXT NOT NULL, + n_regionkey INTEGER NOT NULL, + n_comment TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + FOREIGN KEY (n_regionkey) REFERENCES region(r_regionkey) +); + +CREATE TABLE IF NOT EXISTS part ( + p_partkey INTEGER PRIMARY KEY NOT NULL, + p_name VARCHAR(255) NOT NULL, + p_mfgr VARCHAR(255) 
NOT NULL, + p_brand VARCHAR(255) NOT NULL, + p_type VARCHAR(255) NOT NULL, + p_size INTEGER NOT NULL, + p_container VARCHAR(255) NOT NULL, + p_retailprice DECIMAL(7, 2) NOT NULL, + p_comment TEXT NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP +); + +CREATE TABLE IF NOT EXISTS supplier ( + s_suppkey INTEGER PRIMARY KEY NOT NULL, + s_name VARCHAR(255) NOT NULL, + s_address TEXT NOT NULL, + s_nationkey INTEGER NOT NULL, + s_phone VARCHAR(255) NOT NULL, + s_acctbal DECIMAL(10, 2) NOT NULL, + s_comment TEXT NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + FOREIGN KEY (s_nationkey) REFERENCES nation(n_nationkey) +); + +CREATE TABLE IF NOT EXISTS partsupp ( + ps_partkey INTEGER NOT NULL, + ps_suppkey INTEGER NOT NULL, + ps_availqty INTEGER NOT NULL, + ps_supplycost DECIMAL(10, 2) NOT NULL, + ps_comment TEXT NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (ps_partkey, ps_suppkey), + FOREIGN KEY (ps_suppkey) REFERENCES supplier(s_suppkey), + FOREIGN KEY (ps_partkey) REFERENCES part(p_partkey) +); + +CREATE TABLE IF NOT EXISTS customer ( + c_custkey INTEGER PRIMARY KEY NOT NULL, + c_name VARCHAR(255) NOT NULL, + c_address TEXT NOT NULL, + c_nationkey INTEGER NOT NULL, + c_phone VARCHAR(255) NOT NULL, + c_acctbal DECIMAL(7, 2) NOT NULL, + c_mktsegment VARCHAR(255) NOT NULL, + c_comment TEXT NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + FOREIGN KEY (c_nationkey) REFERENCES nation(n_nationkey) +); + +CREATE TABLE IF NOT EXISTS orders ( + o_orderkey INTEGER PRIMARY KEY NOT NULL, + o_custkey INTEGER NOT NULL, + o_orderstatus VARCHAR(16) NOT NULL, + o_totalprice DECIMAL(10, 2) NOT NULL, + o_orderdate DATE NOT NULL, + o_orderpriority VARCHAR(128) NOT NULL, + o_clerk VARCHAR(128) NOT NULL, + o_shippriority VARCHAR(128) NOT NULL, + o_comment TEXT NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + FOREIGN KEY (o_custkey) REFERENCES customer(c_custkey) +); + +CREATE TABLE IF NOT EXISTS lineitem ( + l_orderkey INTEGER NOT NULL, + l_partkey INTEGER NOT NULL, + l_suppkey INTEGER NOT NULL, + l_linenumber INTEGER NOT NULL, + l_quantity INTEGER NOT NULL, + l_extendedprice DECIMAL(10, 2) NOT NULL, + l_discount DECIMAL(5, 2) NOT NULL, + l_tax DECIMAL(5, 2) NOT NULL, + l_returnflag VARCHAR(128) NOT NULL, + l_linestatus VARCHAR(128) NOT NULL, + l_shipdate DATE NOT NULL, + l_commitdate DATE NOT NULL, + l_receiptdate DATE NOT NULL, + l_shipinstruct VARCHAR(255) NOT NULL, + l_shipmode VARCHAR(255) NOT NULL, + l_comment TEXT NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + PRIMARY KEY (l_orderkey, l_linenumber), + FOREIGN KEY (l_orderkey) REFERENCES `orders`(o_orderkey), + FOREIGN KEY (l_partkey, l_suppkey) REFERENCES partsupp(ps_partkey, ps_suppkey) +); diff --git a/services/postgres_dw/.env b/services/postgres_dw/.env new file mode 100644 index 0000000..7cf8d05 --- /dev/null +++ b/services/postgres_dw/.env @@ -0,0 +1,3 @@ +POSTGRES_USER=user +POSTGRES_PASSWORD=password +POSTGRES_DB=retail_etl_dw \ No newline at end of file diff --git a/services/postgres_dw/Dockerfile 
b/services/postgres_dw/Dockerfile new file mode 100644 index 0000000..d5305c5 --- /dev/null +++ b/services/postgres_dw/Dockerfile @@ -0,0 +1,3 @@ +FROM postgres:13-alpine + +ADD ./scripts/table_ddl.sql /docker-entrypoint-initdb.d \ No newline at end of file diff --git a/services/postgres_dw/scripts/table_ddl.sql b/services/postgres_dw/scripts/table_ddl.sql new file mode 100644 index 0000000..4cf73b6 --- /dev/null +++ b/services/postgres_dw/scripts/table_ddl.sql @@ -0,0 +1,136 @@ +\c retail_etl_dw; + + +/***********************************************/ +/************* TABLE DEFINITIONS *************/ +/***********************************************/ +CREATE TABLE IF NOT EXISTS dim_part ( + p_partkey SERIAL PRIMARY KEY NOT NULL, + p_id INTEGER NOT NULL, + p_name VARCHAR(255) NOT NULL, + p_mfgr VARCHAR(255) NOT NULL, + p_brand VARCHAR(255) NOT NULL, + p_type VARCHAR(255) NOT NULL, + p_size INTEGER NOT NULL, + p_container VARCHAR(255) NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT dim_part_secondary_key UNIQUE (p_id) +); + +CREATE TABLE IF NOT EXISTS dim_supplier ( + s_suppkey SERIAL PRIMARY KEY NOT NULL, + s_id INTEGER NOT NULL, + s_name VARCHAR(255) NOT NULL, + s_address TEXT NOT NULL, + s_nation TEXT NOT NULL, + s_region TEXT NOT NULL, + s_phone VARCHAR(255) NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT dim_supplier_secondary_key UNIQUE (s_id) +); + +CREATE TABLE IF NOT EXISTS dim_customer ( + c_custkey SERIAL PRIMARY KEY NOT NULL, + c_id INTEGER NOT NULL, + c_name VARCHAR(255) NOT NULL, + c_address TEXT NOT NULL, + c_nation TEXT NOT NULL, + c_region TEXT NOT NULL, + c_phone VARCHAR(255) NOT NULL, + c_mktsegment VARCHAR(255) NOT NULL, + c_cluster VARCHAR(255) NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT dim_customer_secondary_key UNIQUE (c_id) +); + +CREATE TABLE IF NOT EXISTS dim_date ( + d_datekey SERIAL PRIMARY KEY NOT NULL, + d_id INTEGER NOT NULL, + d_date DATE NOT NULL, + d_dayofweek VARCHAR(255) NOT NULL, + d_month INTEGER NOT NULL, + d_year INTEGER NOT NULL, + d_monthname VARCHAR(255) NOT NULL, + d_yearweek INTEGER NOT NULL, + d_yearmonth VARCHAR(255) NOT NULL, + d_quarter VARCHAR(255) NOT NULL, + d_yearquarter VARCHAR(255) NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + CONSTRAINT dim_date_seconday_key UNIQUE (d_id) +); + +CREATE TABLE IF NOT EXISTS fact_lineitem ( + l_linenumber INTEGER NOT NULL, + l_orderkey INTEGER NOT NULL, + l_partkey INTEGER NOT NULL, + l_suppkey INTEGER NOT NULL, + l_custkey INTEGER NOT NULL, + l_orderdatekey INTEGER NOT NULL, + l_commitdatekey INTEGER NOT NULL, + l_receiptdatekey INTEGER NOT NULL, + l_shipmode VARCHAR(255), + l_quantity INTEGER NOT NULL, + l_extendedprice DECIMAL(10, 2) NOT NULL, + l_discount DECIMAL(5, 2) NOT NULL, + l_revenue DECIMAL(10, 2) NOT NULL, + l_tax DECIMAL(5, 2) NOT NULL, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + PRIMARY KEY (l_partkey, l_suppkey, l_custkey, l_orderdatekey, l_commitdatekey, l_receiptdatekey), + FOREIGN KEY (l_partkey) REFERENCES dim_part(p_partkey), + FOREIGN KEY (l_custkey) REFERENCES dim_customer(c_custkey), + FOREIGN KEY (l_orderdatekey) REFERENCES dim_date(d_datekey), + FOREIGN KEY (l_commitdatekey) REFERENCES dim_date(d_datekey), + FOREIGN KEY 
(l_receiptdatekey) REFERENCES dim_date(d_datekey), + CONSTRAINT fact_lineitem_idx UNIQUE (l_orderkey, l_linenumber) +); + + +/**************************************************/ +/************* FUNCTION DEFINITIONS *************/ +/**************************************************/ +CREATE OR REPLACE FUNCTION update_updated_at_column() +RETURNS TRIGGER AS $$ +BEGIN + NEW.updated_at = now(); + RETURN NEW; +END; +$$ language 'plpgsql'; + + +/**************************************************/ +/************** TRIGGER DEFINITIONS *************/ +/**************************************************/ +CREATE TRIGGER update_dim_date_updated_at + BEFORE UPDATE + ON dim_date + FOR EACH ROW + EXECUTE PROCEDURE update_updated_at_column(); + +CREATE TRIGGER update_dim_part_updated_at + BEFORE UPDATE + ON dim_part + FOR EACH ROW + EXECUTE PROCEDURE update_updated_at_column(); + +CREATE TRIGGER update_dim_supplier_updated_at + BEFORE UPDATE + ON dim_supplier + FOR EACH ROW + EXECUTE PROCEDURE update_updated_at_column(); + +CREATE TRIGGER update_dim_customer_updated_at + BEFORE UPDATE + ON dim_customer + FOR EACH ROW + EXECUTE PROCEDURE update_updated_at_column(); + +CREATE TRIGGER update_fact_lineitem_updated_at + BEFORE UPDATE + ON fact_lineitem + FOR EACH ROW + EXECUTE PROCEDURE update_updated_at_column(); diff --git a/services/redash_analytics/scripts/dashboard_queries.sql b/services/redash_analytics/scripts/dashboard_queries.sql new file mode 100644 index 0000000..18b8d18 --- /dev/null +++ b/services/redash_analytics/scripts/dashboard_queries.sql @@ -0,0 +1,125 @@ +------------------------------------------------- +-- TOP 5 NATIONS BY REVENUE -- +------------------------------------------------- +SELECT + c.c_nation, + ROUND(AVG(l.l_revenue), 2) AS avg_revenue +FROM fact_lineitem AS l +JOIN dim_customer AS c ON c.c_custkey = l.l_custkey +GROUP BY c.c_nation +ORDER BY avg_revenue DESC +LIMIT 5; +------------------------------------------------- + + + +------------------------------------------------- +-- MOST USED SHIPPING MODE AMONG TOP 5 NATIONS -- +------------------------------------------------- +WITH top_5_nation AS ( + SELECT + c.c_nation, + ROUND(AVG(l.l_revenue), 2) AS avg_revenue + FROM fact_lineitem AS l + JOIN dim_customer AS c ON c.c_custkey = l.l_custkey + GROUP BY c.c_nation + ORDER BY avg_revenue DESC + LIMIT 5 +) +SELECT + l.l_shipmode, + COUNT(1) AS frequency +FROM fact_lineitem AS l +JOIN dim_customer c ON c.c_custkey = l.l_custkey +WHERE c.c_nation IN (SELECT c_nation FROM top_5_nation) +GROUP BY l.l_shipmode +LIMIT 3; +------------------------------------------------- + + + +------------------------------------------------- +-- TOP 5 SELLING MONTHS -- +------------------------------------------------- +SELECT + d.d_monthname, + ROUND(AVG(l.l_revenue), 2) AS avg_revenue +FROM fact_lineitem AS l +JOIN dim_date AS d ON d.d_datekey = l.l_orderdatekey +GROUP BY d_monthname +ORDER BY avg_revenue DESC +LIMIT 5; +------------------------------------------------- + + + +------------------------------------------------- +-- TOP 5 CUSTOMERS (REVENUE) -- +------------------------------------------------- +SELECT + c.c_custkey, + c.c_name AS customer_name, + ROUND(AVG(l.l_revenue), 2) AS avg_revenue +FROM fact_lineitem AS l +JOIN dim_customer AS c ON c.c_custkey = l.l_custkey +GROUP BY c.c_custkey +ORDER BY avg_revenue DESC +LIMIT 5; +------------------------------------------------- + + + + +------------------------------------------------- +-- TOP 5 CUSTOMERS (QUANTITY) -- 
+------------------------------------------------- +SELECT + c.c_custkey, + c.c_name AS customer_name, + ROUND(AVG(l.l_quantity), 2) AS avg_quantity +FROM fact_lineitem AS l +JOIN dim_customer AS c ON c.c_custkey = l.l_custkey +GROUP BY c.c_custkey +ORDER BY avg_quantity DESC +LIMIT 5; +------------------------------------------------- + + + +------------------------------------------------- +-- PREVIOUS VS. CURRENT PERIOD REVENUE -- +------------------------------------------------- +SELECT + CASE + WHEN current_period.d_yearmonth IS NULL + THEN 'N/A' + ELSE current_period.d_yearmonth + END AS current_period_d_yearmonth, + CASE + WHEN current_period.avg_revenue IS NULL + THEN 0 + ELSE current_period.avg_revenue + END AS current_period_avg_revenue, + prev_period.d_yearmonth AS prev_period_d_yearmonth, + prev_period.avg_revenue AS prev_period_avg_revenue +FROM ( + SELECT + ROUND(AVG(l_revenue), 2) AS avg_revenue, + d.d_month, + d_yearmonth + FROM fact_lineitem AS l + JOIN dim_date AS d ON d.d_datekey = l.l_receiptdatekey + WHERE d_year = 1997 + GROUP BY d_month, d_yearmonth +) AS prev_period +LEFT JOIN ( + SELECT + ROUND(AVG(l_revenue), 2) AS avg_revenue, + d.d_month, + d.d_yearmonth + FROM fact_lineitem AS l + JOIN dim_date AS d ON d.d_datekey = l.l_receiptdatekey + WHERE d_year = 1998 + GROUP BY d_month, d_yearmonth +) AS current_period ON current_period.d_month = prev_period.d_month +ORDER BY current_period_d_yearmonth, prev_period_d_yearmonth; \ No newline at end of file diff --git a/test-instructions.md b/test-instructions.md new file mode 100644 index 0000000..d46218c --- /dev/null +++ b/test-instructions.md @@ -0,0 +1,109 @@ +# Data Engineer Interview Test + +We are looking for a high quality data engineer which can deliver comprehensive solutions for our continuity and business growth. + +The Analytics team drives the data culture, we want to change how we produce data from large batches to micro batching, from daily to near real-time/streaming processing, from tabular reports to insightful dashboards. + +You can be part of an amazing team which deals with data all the time using different process, tools and technologies. + +Following is a little treasure and challenge for those keen on joining this amazing company and team. + +## Junior/Mid +For a Junior/Mid role we are expecting at least 2-3 tables to be loaded and an aggregated report done. + +## Senior +We are expecting the most from you. + + +# The Project +Build a small ETL process to digest a few set of files into a data warehouse like project. + +We are expecting an end-to-end ETL solution to deliver a simple star schema which an end user can easily slice and dice the data through a report or using basic ad-hoc query. + +### Tools and Technologies +We are a Python and SQL workshop, we would like to see this project using just those tools. + +However, we are open to other tools and technologies if we are able to easily replicate on our side. + +For the database, use a simple and light optimizer for your database, choose the one which can run a browser, but don't be limited to it. + +Please, avoid licensed products, we may not be able to proceed with this restriction on our own, if this is the case you may need to book a meeting to bring your tool and demo to us. + +How to do it? +----------------------- +Fork this repo, build your ETL process and commit the code with your answers. Open a Pull Request and send us a message highlighting the test is completed. 
+ +#### Rules +* It must come with step-by-step instructions to run the code. +* Please be mindful that your code might be moved or deleted after we analyse the PR. +* Use best practices. +* Be able to explain the whole process from the ground up in a face-to-face interview. + +The small ETL project +--------- + +1. The data for this exercise can be found in the `data.zip` file. Can you describe the file format? + +**Super Bonus**: generate your own data through the instructions in the encoded file `bonus_etl_data_gen.txt`. +To get the bonus points, please encode the file with the instructions that were used to generate the files. + +2. Code your scripts to load the data into a database. + +3. Design a star schema model into which the data should flow. + +4. Build your process to load the data into the star schema. + +**Bonus** points: +- add a field to classify the customer account balance into 3 groups +- add revenue per line item +- convert the dates to be distributed over the last 2 years + +5. How would you schedule this process to run multiple times per day? + +**Bonus**: What would you do if the data arrives in random order and at random times via streaming? + +6. How would you deploy this code? + +**Bonus**: Can you make it run in a container-like process (Docker)? + +Data Reporting +------- +One of the most important aspects of building a DWH is delivering insights to end users. + +Using the designed star schema (or the raw data, if you prefer), can you generate SQL statements to answer the following questions: + +1. What are the top 5 nations in terms of revenue? + +2. From the top 5 nations, what is the most common shipping mode? + +3. What are the top selling months? + +4. Who are the top customers in terms of revenue and/or quantity? + +5. Compare the sales revenue of the current period against the previous period. + + +Data profiling +---- +Data profiling is a bonus. + +What tools or techniques would you use to profile the data? + +What results of the data profiling could impact your analysis and design? + + + +Architecture +----- +If this pipeline were to be built for a real live environment, +what would be your recommendations in terms of tools and process? + +Would it be a problem if the data from the source system grew at a rate of 6.1-12.7% a month? + + + +ERD +-- +![alt text](erd.png "ERD") + +Author: adilsonmendonca diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/retail_etl/__init__.py b/tests/retail_etl/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/retail_etl/dags/__init__.py b/tests/retail_etl/dags/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/retail_etl/dags/dag_integrity_test.py b/tests/retail_etl/dags/dag_integrity_test.py new file mode 100644 index 0000000..8b1f77d --- /dev/null +++ b/tests/retail_etl/dags/dag_integrity_test.py @@ -0,0 +1,42 @@ +import sys + +from pathlib import Path + +import pytest +from airflow.models import DAG +from airflow.utils.dag_cycle_tester import test_cycle as _test_cycle + +from tests.retail_etl.dags.helper import dag_files, import_dag_file + +# NOTE: This is required to replicate how Airflow manages its modules under the plugins folder.
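+# (Airflow normally puts the plugins folder on sys.path at runtime, so appending the project's
+# plugins directory here lets the DAG modules resolve their plugin imports when loaded by pytest.)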
+sys.path.append(f"{Path(__file__).parent.parent.parent.parent}/retail_etl/plugins") + + +@pytest.mark.parametrize("dag_file", dag_files) +class DagIntegrityTest: + @staticmethod + def test_dag_cycle(dag_file: str): + + module = import_dag_file(dag_file=dag_file) + dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)] + + for dag in dag_objects: + # Test cycles + _test_cycle(dag=dag) + + assert dag_objects + + @staticmethod + def test_dag_default_configs(dag_file: str): + module = import_dag_file(dag_file=dag_file) + dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)] + + assert len(dag_objects) == 1 + + for dag in dag_objects: + emails = dag.default_args.get("email", []) + num_retries = dag.default_args.get("retries", None) + retry_delay_sec = dag.default_args.get("retry_delay", None) + assert emails == ["dmc.markr@gmail.com"] + assert num_retries is not None + assert retry_delay_sec is not None diff --git a/tests/retail_etl/dags/helper.py b/tests/retail_etl/dags/helper.py new file mode 100644 index 0000000..8a6b1a5 --- /dev/null +++ b/tests/retail_etl/dags/helper.py @@ -0,0 +1,81 @@ +import importlib.util +import os +from pathlib import Path +from types import ModuleType +from typing import List, Dict + +_dag_path = Path(__file__).parent.parent.parent.parent / "retail_etl" / "dags" +dag_files = list(_dag_path.glob("**/retail_dag.py")) + + +def get_retail_dag_task_hierarchy() -> List[Dict]: + return [ + { + "task": "begin_execution", + "expected_upstream": [], + "expected_downstream": ["region_tbl_to_staging_db"], + }, + { + "task": "region_tbl_to_staging_db", + "expected_upstream": ["begin_execution"], + "expected_downstream": ["nation_tbl_to_staging_db"], + }, + { + "task": "nation_tbl_to_staging_db", + "expected_upstream": ["region_tbl_to_staging_db"], + "expected_downstream": [ + "customer_tbl_to_staging_db", + "supplier_tbl_to_staging_db", + "part_tbl_to_staging_db", + ], + }, + { + "task": "customer_tbl_to_staging_db", + "expected_upstream": ["nation_tbl_to_staging_db"], + "expected_downstream": ["orders_tbl_to_staging_db"], + }, + { + "task": "supplier_tbl_to_staging_db", + "expected_upstream": ["nation_tbl_to_staging_db"], + "expected_downstream": ["partsupp_tbl_to_staging_db"], + }, + { + "task": "part_tbl_to_staging_db", + "expected_upstream": ["nation_tbl_to_staging_db"], + "expected_downstream": ["partsupp_tbl_to_staging_db"], + }, + { + "task": "partsupp_tbl_to_staging_db", + "expected_upstream": ["supplier_tbl_to_staging_db", "part_tbl_to_staging_db"], + "expected_downstream": ["lineitem_tbl_to_staging_db"], + }, + { + "task": "orders_tbl_to_staging_db", + "expected_upstream": ["customer_tbl_to_staging_db"], + "expected_downstream": ["lineitem_tbl_to_staging_db"], + }, + { + "task": "lineitem_tbl_to_staging_db", + "expected_upstream": ["orders_tbl_to_staging_db", "partsupp_tbl_to_staging_db"], + "expected_downstream": [ + "dim_part_to_postgres_dw", + "dim_supplier_to_postgres_dw", + "dim_customer_to_postgres_dw", + "dim_date_to_postgres_dw", + ], + }, + { + "task": "end_execution", + "expected_upstream": ["fact_lineitem_to_postgres_dw"], + "expected_downstream": [], + }, + ] + + +def import_dag_file(dag_file: str) -> ModuleType: + module_name, _ = os.path.splitext(dag_file) + module_path = os.path.join(_dag_path, dag_file) + mod_spec = importlib.util.spec_from_file_location(module_name, module_path) + module = importlib.util.module_from_spec(mod_spec) + mod_spec.loader.exec_module(module) + return module diff --git 
a/tests/retail_etl/dags/retail_dag_tasks_definition_test.py b/tests/retail_etl/dags/retail_dag_tasks_definition_test.py new file mode 100644 index 0000000..15ad1d9 --- /dev/null +++ b/tests/retail_etl/dags/retail_dag_tasks_definition_test.py @@ -0,0 +1,78 @@ +from collections import Counter +from typing import Dict + +import pytest +from airflow.models import DAG + +from tests.retail_etl.dags.helper import dag_files, import_dag_file, get_retail_dag_task_hierarchy + + +class RetailDagTaskDefTest: + EXPECTED_TASKS_COUNT = 15 + DAG_ID = "retail_dag" + EXPECTED_TASKS = [ + "begin_execution", + "region_tbl_to_staging_db", + "nation_tbl_to_staging_db", + "part_tbl_to_staging_db", + "supplier_tbl_to_staging_db", + "partsupp_tbl_to_staging_db", + "customer_tbl_to_staging_db", + "orders_tbl_to_staging_db", + "lineitem_tbl_to_staging_db", + "dim_part_to_postgres_dw", + "dim_supplier_to_postgres_dw", + "dim_customer_to_postgres_dw", + "dim_date_to_postgres_dw", + "fact_lineitem_to_postgres_dw", + "end_execution", + ] + + @staticmethod + def test_dag_task_count_is_correct(retail_dag: DAG): + tasks_count = len(retail_dag.tasks) + msg = f"Wrong number of tasks, got {tasks_count}" + assert tasks_count == RetailDagTaskDefTest.EXPECTED_TASKS_COUNT, msg + + @staticmethod + def test_dag_contains_valid_tasks(retail_dag: DAG): + task_ids = list(map(lambda task: task.task_id, retail_dag.tasks)) + assert RetailDagTaskDefTest._compare_tasks(task_ids, RetailDagTaskDefTest.EXPECTED_TASKS) + + @pytest.mark.parametrize("task", get_retail_dag_task_hierarchy()) + def test_dependencies_of_tasks(self, retail_dag: DAG, task: Dict): + expected_upstream = task.get("expected_upstream") + expected_downstream = task.get("expected_downstream") + task_name = task.get("task") + dag_task = retail_dag.get_task(task_name) + + upstream_msg = f"The task {task} doesn't have the expected " "upstream dependencies." + downstream_msg = f"The task {task} doesn't have the expected " "downstream dependencies." 
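+        # _compare_tasks compares collections.Counter objects, so upstream/downstream task ids are
+        # checked as unordered multisets and their ordering does not matter here.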
+ assert self._compare_tasks(task_a=dag_task.upstream_task_ids, task_b=expected_upstream), upstream_msg + assert self._compare_tasks( + task_a=dag_task.downstream_task_ids, task_b=expected_downstream + ), downstream_msg + + @staticmethod + def test_dag_will_not_perform_catchup(retail_dag: DAG): + catchup = retail_dag.catchup + assert not catchup + + @staticmethod + def test_dag_have_same_start_date_for_each_tasks(retail_dag: DAG): + tasks = retail_dag.tasks + start_dates = list(map(lambda task: task.start_date, tasks)) + assert len(set(start_dates)) == 1 + + @classmethod + @pytest.fixture(scope="class") + def retail_dag(cls): + for dag_file in dag_files: + module = import_dag_file(dag_file=dag_file) + dag_objects = [var for var in vars(module).values() if isinstance(var, DAG)] + dag = list(filter(lambda _dag: _dag.dag_id == cls.DAG_ID, dag_objects))[0] + return dag + + @staticmethod + def _compare_tasks(task_a, task_b): + return Counter(task_a) == Counter(task_b) diff --git a/tests/retail_etl/plugins/__init__.py b/tests/retail_etl/plugins/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/retail_etl/plugins/custom_operators/__init__.py b/tests/retail_etl/plugins/custom_operators/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/retail_etl/plugins/custom_operators/postgres_dw_operator/__init__.py b/tests/retail_etl/plugins/custom_operators/postgres_dw_operator/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/retail_etl/plugins/custom_operators/postgres_dw_operator/postgres_dw_operator_test.py b/tests/retail_etl/plugins/custom_operators/postgres_dw_operator/postgres_dw_operator_test.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/retail_etl/plugins/custom_operators/tbl_to_staging/__init__.py b/tests/retail_etl/plugins/custom_operators/tbl_to_staging/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/retail_etl/plugins/custom_operators/tbl_to_staging/helper.py b/tests/retail_etl/plugins/custom_operators/tbl_to_staging/helper.py new file mode 100644 index 0000000..9c2419c --- /dev/null +++ b/tests/retail_etl/plugins/custom_operators/tbl_to_staging/helper.py @@ -0,0 +1,60 @@ +import datetime + +from pathlib import Path + +from airflow.models import DAG + +from retail_etl.plugins.custom_operators.tbl_to_staging.tbl_to_staging import TblToStageOperator +from retail_etl.plugins.stage_queries import region as region_tbl_queries + + +def test_dag(): + return DAG( + dag_id="test_dag", + default_args={"owner": "makr.dev", "start_date": datetime.datetime(2015, 1, 1)}, + schedule_interval="@daily", + ) + + +def get_valid_region_tbl_to_staging_db(use_valid_query: bool = True) -> TblToStageOperator: + upsert_query = region_tbl_queries.get_upsert_query() if use_valid_query else "INSERT INTO ... ..." + return TblToStageOperator( + task_id="region_tbl_to_staging_db", + pandas_read_args={ + # TODO: Put in Airflow Vars. + "filepath_or_buffer": Path(__file__).parent / "test_data" / "region.tbl", + "chunksize": 10000, + "sep": "|", + "iterator": True, + "table_name": "region", + }, + data_load_args={ + "mysql_conn_id": "mysql", + "table_name": "region", + "upsert_query": upsert_query, + "logger_name": "airflow.task", + }, + dag=test_dag(), + ) + + +def get_invalid_region_tbl_to_stage_operator() -> TblToStageOperator: + upsert_query = region_tbl_queries.get_upsert_query() + return TblToStageOperator( + task_id="region_tbl_to_staging_db", + pandas_read_args={ + # TODO: Put in Airflow Vars. 
+ "filepath_or_buffer": Path(__file__).parent / "test_data" / "invalid_format_region.tbl", + "chunksize": 10000, + "sep": "|", + "iterator": True, + "table_name": "region", + }, + data_load_args={ + "mysql_conn_id": "mysql", + "table_name": "region", + "upsert_query": upsert_query, + "logger_name": "airflow.task", + }, + dag=test_dag(), + ) diff --git a/tests/retail_etl/plugins/custom_operators/tbl_to_staging/tbl_to_staging_test.py b/tests/retail_etl/plugins/custom_operators/tbl_to_staging/tbl_to_staging_test.py new file mode 100644 index 0000000..4dc3fc6 --- /dev/null +++ b/tests/retail_etl/plugins/custom_operators/tbl_to_staging/tbl_to_staging_test.py @@ -0,0 +1,127 @@ +import datetime +import logging +import os +from collections import namedtuple + +from typing import Callable, Any + +import MySQLdb +import pytest +from _pytest.logging import LogCaptureFixture +from MySQLdb._exceptions import ProgrammingError +from pytest_docker_tools import container, fetch +from pytest_docker_tools.wrappers import Container +from pytest_mock import MockFixture +from airflow.providers.mysql.hooks.mysql import MySqlHook + +from tests.retail_etl.plugins.custom_operators.tbl_to_staging import helper + + +mysql_image = fetch(repository="mysql:latest") +mysql_container = container( + image="{mysql_image.id}", + environment={ + "MYSQL_DATABASE": "retail_test", + "MYSQL_USER": "{mysql_credentials.username}", + "MYSQL_PASSWORD": "{mysql_credentials.password}", + "MYSQL_ROOT_PASSWORD": "{mysql_credentials.password}", + }, + ports={"3306/tcp": None}, + volumes={ + os.path.join(os.path.dirname(__file__), "test_data/ddl_test.sql"): { + "bind": "/docker-entrypoint-initdb.d/ddl_test.sql" + } + }, +) + + +@pytest.fixture(scope="module") +def mysql_credentials(): + MySQLCredentials = namedtuple("MySQLCredentials", ["username", "password"]) + return MySQLCredentials("testuser", "testpass") + + +@pytest.fixture +def mysql_connection(mysql_credentials: Any, mysql_container: Container) -> MySQLdb.Connection: + yield lambda: MySQLdb.connect( + user=mysql_credentials.username, + password=mysql_credentials.password, + host="127.0.0.1", + database="retail_test", + port=mysql_container.ports["3306/tcp"][0], + ) + + +class TblToStagingTest: + @staticmethod + def test_operator_can_read_data_and_load_to_mysql(mocker: MockFixture, mysql_connection: Callable): + # GIVEN + connection = mysql_connection() + mocker.patch.object(target=MySqlHook, attribute="get_conn", return_value=connection) + task = helper.get_valid_region_tbl_to_staging_db() + + # WHEN + task.execute(context={"execution_date": datetime.datetime.now().isoformat()}) + + # THEN + connection = mysql_connection() + cursor = connection.cursor() + cursor.execute("SELECT COUNT(*) FROM region;") + result = cursor.fetchone()[0] + assert result == 5 + + @staticmethod + def test_operator_is_resilient_to_invalid_data_format( + mocker: MockFixture, caplog: LogCaptureFixture, mysql_connection: Callable + ): + # GIVEN + connection = mysql_connection() + mocker.patch.object(target=MySqlHook, attribute="get_conn", return_value=connection) + task = helper.get_invalid_region_tbl_to_stage_operator() + + # WHEN + logging.getLogger("airflow.task").propagate = True + with caplog.at_level(logging.ERROR): + task.execute(context={"execution_date": datetime.datetime.now().isoformat()}) + # THEN + assert caplog.records[0].message == "The TblToStageOperator process has failed" + assert isinstance(caplog.records[0].exc_info[1], ProgrammingError) + assert str(caplog.records[0].exc_info[1]) == "nan 
can not be used with MySQL" + + @staticmethod + def test_operator_is_resilient_to_invalid_sql_query( + mocker: MockFixture, caplog: LogCaptureFixture, mysql_connection: Callable + ): + # GIVEN + connection = mysql_connection() + mocker.patch.object(target=MySqlHook, attribute="get_conn", return_value=connection) + task = helper.get_valid_region_tbl_to_staging_db(use_valid_query=False) + + # WHEN + logging.getLogger("airflow.task").propagate = True + with caplog.at_level(logging.ERROR): + task.execute(context={"execution_date": datetime.datetime.now().isoformat()}) + # THEN + assert caplog.records[0].message == "The TblToStageOperator process has failed" + assert isinstance(caplog.records[0].exc_info[1], ProgrammingError) + assert str(caplog.records[0].exc_info[1]) == "not all arguments converted during bytes formatting" + + @staticmethod + def test_operator_is_idempotent(mocker: MockFixture, mysql_connection: Callable): + # GIVEN + connection = mysql_connection() + mocker.patch.object(target=MySqlHook, attribute="get_conn", return_value=connection) + task = helper.get_valid_region_tbl_to_staging_db() + + # WHEN + task.execute(context={"execution_date": datetime.datetime.now().isoformat()}) + connection = mysql_connection() + mocker.patch.object(target=MySqlHook, attribute="get_conn", return_value=connection) + task.execute(context={"execution_date": datetime.datetime.now().isoformat()}) + + # THEN + connection = mysql_connection() + cursor = connection.cursor() + cursor.execute("SELECT COUNT(*) FROM region;") + result = cursor.fetchone()[0] + assert result == 5 diff --git a/tests/retail_etl/plugins/custom_operators/tbl_to_staging/test_data/ddl_test.sql b/tests/retail_etl/plugins/custom_operators/tbl_to_staging/test_data/ddl_test.sql new file mode 100644 index 0000000..9bb921c --- /dev/null +++ b/tests/retail_etl/plugins/custom_operators/tbl_to_staging/test_data/ddl_test.sql @@ -0,0 +1,11 @@ +CREATE DATABASE IF NOT EXISTS retail_test; + +USE retail_test; + +CREATE TABLE IF NOT EXISTS region ( + r_regionkey INTEGER PRIMARY KEY NOT NULL, + r_name TEXT NOT NULL, + r_comment TEXT, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP +); diff --git a/tests/retail_etl/plugins/custom_operators/tbl_to_staging/test_data/invalid_format_region.tbl b/tests/retail_etl/plugins/custom_operators/tbl_to_staging/test_data/invalid_format_region.tbl new file mode 100644 index 0000000..7b438bc --- /dev/null +++ b/tests/retail_etl/plugins/custom_operators/tbl_to_staging/test_data/invalid_format_region.tbl @@ -0,0 +1,5 @@ +0|AFRICA| +1|AMERICA| +2|ASIA| +3|EUROPE| +4|MIDDLE EAST| diff --git a/tests/retail_etl/plugins/custom_operators/tbl_to_staging/test_data/region.tbl b/tests/retail_etl/plugins/custom_operators/tbl_to_staging/test_data/region.tbl new file mode 100644 index 0000000..c5ebb63 --- /dev/null +++ b/tests/retail_etl/plugins/custom_operators/tbl_to_staging/test_data/region.tbl @@ -0,0 +1,5 @@ +0|AFRICA|lar deposits. blithely final packages cajole. regular waters are final requests. regular accounts are according to | +1|AMERICA|hs use ironic, even requests. s| +2|ASIA|ges. thinly even pinto beans ca| +3|EUROPE|ly final courts cajole furiously final excuse| +4|MIDDLE EAST|uickly special accounts cajole carefully blithely close requests. 
carefully final asymptotes haggle furiousl| diff --git a/tox.ini b/tox.ini new file mode 100644 index 0000000..7dbff01 --- /dev/null +++ b/tox.ini @@ -0,0 +1,39 @@ +[flake8] +exclude= + # Python cache + *.pyc, + __pycache__, + + # Virtual Environment + venv, + .venv, + + # Local Config Variables + conf, + + # Static Data Local Storage + data, + + logs + + # Code coverage + .coverage, + htmlcov, + + # Git + .gitignore, + README.md, + LICENSE, + + # Requirements + requirements.txt, + + # Other + Dockerfile, + Makefile + +filename=*.py +max-line-length=110 +statistics=True +inline-quotes=" +multiline-quotes="
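+
+# NOTE: flake8 automatically picks up this [flake8] section when run from the project root
+# (e.g. `python -m flake8`); the inline-quotes / multiline-quotes options assume the
+# flake8-quotes plugin is installed.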