Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release/0.9 #97

Merged
merged 33 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
3deae1a
feature: add support for Spark Connect (#63)
mikita-sakalouski Oct 29, 2024
90e6462
refactor: change private attr and step getter (#82)
mikita-sakalouski Oct 30, 2024
b37a302
90-Bug-fix-file-encoding-box-integration (#96)
louis-paulvlx Nov 8, 2024
6d6ccbd
added documentation
dannymeijer Nov 8, 2024
a0aa8a2
fix for DeltaMergeBuilder, when the instance doesn't check out (#100)
dannymeijer Nov 11, 2024
6ca9f30
Bump version to rc1
dannymeijer Nov 11, 2024
76207e7
Fix/98 sparkexpectations bump version to 220 (#99)
dannymeijer Nov 11, 2024
1e645d8
quick fix
dannymeijer Nov 12, 2024
abb435b
quick fix #2
dannymeijer Nov 12, 2024
fb54aef
[FIX] Accidental duplication of logs (#105)
dannymeijer Nov 18, 2024
93f413e
version bump
dannymeijer Nov 18, 2024
5146f59
fix: adjust branch fetching (#106)
mikita-sakalouski Nov 18, 2024
9f32fcc
[FIX] broken import statements and updated hello-world.md (#107)
dannymeijer Nov 19, 2024
047506a
fix: test github (#109)
mikita-sakalouski Nov 21, 2024
79f32aa
[Fix] Add overwrite functionality to the BoxFileWriterClass (#103)
ToneVDB Nov 21, 2024
4596410
[FEATURE] Enable adding options to DeltaReader both streaming and wri…
mikita-sakalouski Nov 22, 2024
ac95f2d
chore: bump version
mikita-sakalouski Nov 22, 2024
602866b
[feature] Add support for HyperProcess parameters (#112)
maxim-mityutko Nov 22, 2024
c34abbe
[HOTFIX] Remove duplicated implementation (#116)
mikita-sakalouski Nov 24, 2024
7fb1b27
[FEATURE] Populate account from url if not provided in SnowflakeBaseM…
mikita-sakalouski Nov 24, 2024
b9c0299
hotfix: check url alias - sfURL
mikita-sakalouski Nov 24, 2024
7fe5920
test: add test for account population from sfURL in SnowflakeRunQuery…
mikita-sakalouski Nov 24, 2024
7c00d7f
chore: bump version to 0.9.0rc5
mikita-sakalouski Nov 24, 2024
6faa20d
refactor: replace RunQuery with SnowflakeRunQueryPython (#121)
mikita-sakalouski Nov 25, 2024
9496eb5
hotfix: snowflake python connector default config dir (#125)
mikita-sakalouski Nov 25, 2024
ea2d15e
version bump
dannymeijer Nov 26, 2024
c72f381
Fix/delta merge builder instance check for connect + util fix (#130)
dannymeijer Nov 26, 2024
a085947
Release/0.9 - final version bump and docs (#132)
dannymeijer Nov 27, 2024
50f1e69
[FEATURE] Make Transformations callable (#126)
dannymeijer Nov 27, 2024
de56d00
[BUG] small fix for Tableau Server path checking (#134)
dannymeijer Nov 28, 2024
a7d2997
[FEATURE] DataBricksSecret for getting secrets from DataBricks scope …
mikita-sakalouski Nov 29, 2024
1e21e37
[FIX] Remove mention of non-existent class type in docs (#138)
dannymeijer Nov 29, 2024
5298c2b
[FIX] unused SparkSession being import from pyspark.sql in several te…
dannymeijer Nov 29, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ on:
pull_request:
branches:
- main
- release/*
workflow_dispatch:
inputs:
logLevel:
Expand Down Expand Up @@ -40,8 +41,8 @@ jobs:
fetch-depth: 0
ref: ${{ github.event.pull_request.head.ref }}
repository: ${{ github.event.pull_request.head.repo.full_name }}
- name: Fetch main branch
run: git fetch origin main:main
- name: Fetch target branch
run: git fetch origin ${{ github.event.pull_request.head.ref || 'main'}}:${{ github.event.pull_request.base.ref || 'main'}}
- name: Check changes
id: check
run: |
Expand All @@ -61,7 +62,7 @@ jobs:

tests:
needs: check_changes
if: needs.check_changes.outputs.python_changed > 0 || needs.check_changes.outputs.toml_changed > 0 || github.event_name == 'workflow_dispatch'
if: needs.check_changes.outputs.python_changed > 0 || needs.check_changes.outputs.toml_changed > 0 || github.event_name == 'workflow_dispatch' || ${{ github.event.pull_request.head.repo.owner.login }} != ${{ github.event.pull_request.base.repo.owner.login }}
name: Python ${{ matrix.python-version }} with PySpark ${{ matrix.pyspark-version }} on ${{ startsWith(matrix.os, 'macos-') && 'macOS' || startsWith(matrix.os, 'windows-') && 'Windows' || 'Linux' }}
runs-on: ${{ matrix.os }}

Expand All @@ -71,10 +72,12 @@ jobs:
# os: [ubuntu-latest, windows-latest, macos-latest] # FIXME: Add Windows and macOS
os: [ubuntu-latest]
python-version: ['3.9', '3.10', '3.11', '3.12']
pyspark-version: ['33', '34', '35']
pyspark-version: ['33', '34', '35', '35r']
exclude:
- python-version: '3.9'
pyspark-version: '35'
- python-version: '3.9'
pyspark-version: '35r'
- python-version: '3.11'
pyspark-version: '33'
- python-version: '3.11'
Expand All @@ -100,7 +103,7 @@ jobs:
# hatch fmt --check --python=${{ matrix.python-version }}

- name: Run tests
run: hatch test --python=${{ matrix.python-version }} -i version=pyspark${{ matrix.pyspark-version }}
run: hatch test --python=${{ matrix.python-version }} -i version=pyspark${{ matrix.pyspark-version }} --verbose

# https://github.com/marketplace/actions/alls-green#why
final_check: # This job does nothing and is only used for the branch protection
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,4 @@ out/**

# DevContainer
.devcontainer
uv.lock
20 changes: 16 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -194,23 +194,35 @@ the `pyproject.toml` entry mentioned above or installing through pip.
### Integrations

- __Spark Expectations:__
Available through the `koheesio.steps.integration.spark.dq.spark_expectations` module; installable through the `se` extra.
Available through the `koheesio.integrations.spark.dq.spark_expectations` module; installable through the `se` extra.
- SE Provides Data Quality checks for Spark DataFrames.
- For more information, refer to the [Spark Expectations docs](https://engineering.nike.com/spark-expectations).

[//]: # (- **Brickflow:** Available through the `koheesio.steps.integration.workflow` module; installable through the `bf` extra.)
- __Spark Connect and Delta:__
Koheesio is ready to be used with Spark Connect. In case you are using Delta package in combination with a remote/connect session, you are getting full support in Databricks and partial support for Delta package in Apache Spark. Full support for Delta in Apache Spark is coming with the release of PySpark 4.0.
- The spark extra can be installed by adding `koheesio[spark]` to the `pyproject.toml` entry mentioned above.
- The spark module is available through the `koheesio.spark` module.
- The delta module is available through the `koheesio.spark.writers.delta` module.
- For more information, refer to the [Databricks documentation](https://docs.databricks.com/).
- For more information on Apache Spark, refer to the [Apache Spark documentation](https://spark.apache.org/docs/latest/).

[//]: # (- **Brickflow:** Available through the `koheesio.integrations.workflow` module; installable through the `bf` extra.)
[//]: # ( - Brickflow is a workflow orchestration tool that allows you to define and execute workflows in a declarative way.)
[//]: # ( - For more information, refer to the [Brickflow docs](https://engineering.nike.com/brickflow))

- __Box__:
Available through the `koheesio.integration.box` module; installable through the `box` extra.
Available through the `koheesio.integrations.box` module; installable through the `box` extra.
- [Box](https://www.box.com) is a cloud content management and file sharing service for businesses.

- __SFTP__:
Available through the `koheesio.integration.spark.sftp` module; installable through the `sftp` extra.
Available through the `koheesio.integrations.spark.sftp` module; installable through the `sftp` extra.
- SFTP is a network protocol used for secure file transfer over a secure shell.
- The SFTP integration of Koheesio relies on [paramiko](https://www.paramiko.org/)

- __Snowflake__:
Available through the `koheesio.integrations.snowflake` module; installable through the `snowflake` extra.
- [Snowflake](https://www.snowflake.com) is a cloud-based data warehousing platform.

[//]: # (TODO: add implementations)
[//]: # (## Implementations)
[//]: # (TODO: add async extra)
Expand Down
12 changes: 6 additions & 6 deletions docs/reference/spark/transformations.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ Here's an example of a `ColumnsTransformation`:

```python
from pyspark.sql import functions as f
from koheesio.steps.transformations import ColumnsTransformation
from koheesio.spark.transformations import ColumnsTransformation

class AddOne(ColumnsTransformation):
def execute(self):
Expand Down Expand Up @@ -109,7 +109,7 @@ Here's an example of a `ColumnsTransformationWithTarget`:

```python
from pyspark.sql import Column
from koheesio.steps.transformations import ColumnsTransformationWithTarget
from koheesio.spark.transformations import ColumnsTransformationWithTarget

class AddOneWithTarget(ColumnsTransformationWithTarget):
def func(self, col: Column):
Expand Down Expand Up @@ -167,7 +167,7 @@ examples:

```python
from pyspark.sql import SparkSession
from koheesio.steps.transformations import DataframeLookup, JoinMapping, TargetColumn, JoinType
from koheesio.spark.transformations.lookup import DataframeLookup, JoinMapping, TargetColumn, JoinType

spark = SparkSession.builder.getOrCreate()
left_df = spark.createDataFrame([(1, "A"), (2, "B")], ["id", "value"])
Expand All @@ -191,7 +191,7 @@ examples:

```python
from pyspark.sql import SparkSession
from koheesio.steps.transformations import HashUUID5
from koheesio.spark.transformations.uuid5 import HashUUID5

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, "A"), (2, "B")], ["id", "value"])
Expand Down Expand Up @@ -245,8 +245,8 @@ how to chain transformations:

```python
from pyspark.sql import SparkSession
from koheesio.steps.transformations import HashUUID5
from koheesio.steps.transformations import DataframeLookup, JoinMapping, TargetColumn, JoinType
from koheesio.spark.transformations.uuid5 import HashUUID5
from koheesio.spark.transformations.lookup import DataframeLookup, JoinMapping, TargetColumn, JoinType

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
Expand Down
6 changes: 3 additions & 3 deletions docs/tutorials/advanced-data-processing.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ Partitioning is a technique that divides your data into smaller, more manageable
allows you to specify the partitioning scheme for your data when writing it to a target.

```python
from koheesio.steps.writers.delta import DeltaTableWriter
from koheesio.tasks.etl_task import EtlTask
from koheesio.spark.writers.delta import DeltaTableWriter
from koheesio.spark.etl_task import EtlTask

class MyTask(EtlTask):
target = DeltaTableWriter(table="my_table", partitionBy=["column1", "column2"])
Expand All @@ -52,7 +52,7 @@ class MyTask(EtlTask):
[//]: # ()
[//]: # (```python)

[//]: # (from koheesio.steps.transformations.cache import CacheTransformation)
[//]: # (from koheesio.spark.transformations.cache import CacheTransformation)

[//]: # ()
[//]: # (class MyTask(EtlTask):)
Expand Down
17 changes: 0 additions & 17 deletions docs/tutorials/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,6 @@
```
</details>

<details>
<summary>poetry</summary>

If you're using Poetry, add the following entry to the `pyproject.toml` file:

```toml title="pyproject.toml"
[[tool.poetry.source]]
name = "nike"
url = "https://artifactory.nike.com/artifactory/api/pypi/python-virtual/simple"
secondary = true
```

```bash
poetry add koheesio
```
</details>

<details>
<summary>pip</summary>

Expand Down
38 changes: 28 additions & 10 deletions docs/tutorials/hello-world.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,23 @@
# Simple Examples

## Bring your own SparkSession

The Koheesio Spark module does not set up a SparkSession for you. You need to create a SparkSession before using
Koheesio spark classes. This is the entry point for any Spark functionality, allowing the step to interact with the
Spark cluster.

- Every `SparkStep` has a `spark` attribute, which is the active SparkSession.
- Koheesio supports both local and remote (connect) Spark Sessions
- The SparkSession you created can be explicitly passed to the `SparkStep` constructor (this is optional)

To create a simple SparkSession, you can use the following code:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
```

## Creating a Custom Step

This example demonstrates how to use the `SparkStep` class from the `koheesio` library to create a custom step named
Expand All @@ -8,7 +26,7 @@ This example demonstrates how to use the `SparkStep` class from the `koheesio` l
### Code

```python
from koheesio.steps.step import SparkStep
from koheesio.spark import SparkStep

class HelloWorldStep(SparkStep):
message: str
Expand All @@ -21,7 +39,7 @@ class HelloWorldStep(SparkStep):
### Usage

```python
hello_world_step = HelloWorldStep(message="Hello, World!")
hello_world_step = HelloWorldStep(message="Hello, World!", spark=spark) # optionally pass the spark session
hello_world_step.execute()

hello_world_step.output.df.show()
Expand All @@ -33,16 +51,15 @@ The `HelloWorldStep` class is a `SparkStep` in Koheesio, designed to generate a

- `HelloWorldStep` inherits from `SparkStep`, a fundamental building block in Koheesio for creating data processing steps with Apache Spark.
- It has a `message` attribute. When creating an instance of `HelloWorldStep`, you can pass a custom message that will be used in the DataFrame.
- `SparkStep` has a `spark` attribute, which is the active SparkSession. This is the entry point for any Spark functionality, allowing the step to interact with the Spark cluster.
- `SparkStep` also includes an `Output` class, used to store the output of the step. In this case, `Output` has a `df` attribute to store the output DataFrame.
- The `execute` method creates a DataFrame with the custom message and stores it in `output.df`. It doesn't return a value explicitly; instead, the output DataFrame can be accessed via `output.df`.
- Koheesio uses pydantic for automatic validation of the step's input and output, ensuring they are correctly defined and of the correct types.
- The `spark` attribute can be optionally passed to the constructor when creating an instance of `HelloWorldStep`. This allows you to use an existing SparkSession or create a new one specifically for the step.
- If no `SparkSession` is passed to a `SparkStep`, Koheesio will use the `SparkSession.getActiveSession()` method to attempt retrieving an active SparkSession. If no active session is found, your code will not work.

Note: Pydantic is a data validation library that provides a way to validate that the data (in this case, the input and output of the step) conforms to the expected format.


---

## Creating a Custom Task

This example demonstrates how to use the `EtlTask` from the `koheesio` library to create a custom task named `MyFavoriteMovieTask`.
Expand All @@ -51,9 +68,10 @@ This example demonstrates how to use the `EtlTask` from the `koheesio` library t

```python
from typing import Any
from pyspark.sql import DataFrame, functions as f
from koheesio.steps.transformations import Transform
from koheesio.tasks.etl_task import EtlTask
from pyspark.sql import functions as f
from koheesio.spark import DataFrame
from koheesio.spark.transformations.transform import Transform
from koheesio.spark.etl_task import EtlTask


def add_column(df: DataFrame, target_column: str, value: Any):
Expand Down Expand Up @@ -104,8 +122,8 @@ source:
```python
from pyspark.sql import SparkSession
from koheesio.context import Context
from koheesio.steps.readers import DummyReader
from koheesio.steps.writers.dummy import DummyWriter
from koheesio.spark.readers.dummy import DummyReader
from koheesio.spark.writers.dummy import DummyWriter

context = Context.from_yaml("sample.yaml")

Expand Down
32 changes: 17 additions & 15 deletions docs/tutorials/testing-koheesio-steps.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,15 @@ Unit testing involves testing individual components of the software in isolation
Here's an example of how to unit test a Koheesio task:

```python
from koheesio.tasks.etl_task import EtlTask
from koheesio.steps.readers import DummyReader
from koheesio.steps.writers.dummy import DummyWriter
from koheesio.steps.transformations import Transform
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

from koheesio.spark import DataFrame
from koheesio.spark.etl_task import EtlTask
from koheesio.spark.readers.dummy import DummyReader
from koheesio.spark.writers.dummy import DummyWriter
from koheesio.spark.transformations.transform import Transform


def filter_age(df: DataFrame) -> DataFrame:
return df.filter(col("Age") > 18)
Expand Down Expand Up @@ -62,12 +64,12 @@ Here's an example of how to write an integration test for this task:

```python
# my_module.py
from koheesio.tasks.etl_task import EtlTask
from koheesio.spark.readers.delta import DeltaReader
from koheesio.steps.writers.delta import DeltaWriter
from koheesio.steps.transformations import Transform
from koheesio.context import Context
from pyspark.sql.functions import col
from koheesio.spark.etl_task import EtlTask
from koheesio.spark.readers.delta import DeltaTableReader
from koheesio.spark.writers.delta import DeltaTableWriter
from koheesio.spark.transformations.transform import Transform
from koheesio.context import Context


def filter_age(df):
Expand All @@ -84,8 +86,8 @@ context = Context({
})

task = EtlTask(
source=DeltaReader(**context.reader_options),
target=DeltaWriter(**context.writer_options),
source=DeltaTableReader(**context.reader_options),
target=DeltaTableWriter(**context.writer_options),
transformations=[
Transform(filter_age)
]
Expand All @@ -97,11 +99,11 @@ Now, let's create a test for this task. We'll use pytest and unittest.mock to mo
```python
# test_my_module.py
import pytest
from unittest.mock import MagicMock, patch
from unittest.mock import patch
from pyspark.sql import SparkSession
from koheesio.context import Context
from koheesio.steps.readers import Reader
from koheesio.steps.writers import Writer
from koheesio.spark.readers import Reader
from koheesio.spark.writers import Writer

from my_module import task

Expand Down
12 changes: 9 additions & 3 deletions makefile
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ hatch-install:
fi
init: hatch-install

.PHONY: sync ## hatch - Update dependencies if you changed project dependencies in pyproject.toml
.PHONY: update ## hatch - alias for sync (if you are used to poetry, thi is similar to running `poetry update`)
sync:
@hatch run dev:uv sync --all-extras
update: sync

# Code Quality
.PHONY: black black-fmt ## code quality - Use black to (re)format the codebase
black-fmt:
Expand Down Expand Up @@ -105,16 +111,16 @@ coverage: cov
all-tests:
@echo "\033[1mRunning all tests:\033[0m\n\033[35m This will run the full test suite\033[0m"
@echo "\033[1;31mWARNING:\033[0;33m This may take upward of 20-30 minutes to complete!\033[0m"
@hatch test --no-header --no-summary
@hatch test --no-header
.PHONY: spark-tests ## testing - Run SPARK tests in ALL environments
spark-tests:
@echo "\033[1mRunning Spark tests:\033[0m\n\033[35m This will run the Spark test suite against all specified environments\033[0m"
@echo "\033[1;31mWARNING:\033[0;33m This may take upward of 20-30 minutes to complete!\033[0m"
@hatch test -m spark --no-header --no-summary
@hatch test -m spark --no-header
.PHONY: non-spark-tests ## testing - Run non-spark tests in ALL environments
non-spark-tests:
@echo "\033[1mRunning non-Spark tests:\033[0m\n\033[35m This will run the non-Spark test suite against all specified environments\033[0m"
@hatch test -m "not spark" --no-header --no-summary
@hatch test -m "not spark" --no-header

.PHONY: dev-test ## testing - Run pytest, with all tests in the dev environment
dev-test:
Expand Down
Loading