restructure module structure
dannymeijer authored May 24, 2024
1 parent 8b32062 commit 89af932
Showing 143 changed files with 766 additions and 879 deletions.
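
Across the documentation hunks shown below, the commit applies one consistent rename: deep `koheesio.steps.<package>.<module>` import paths collapse into their parent package, and the Delta reader moves under the new `koheesio.spark` namespace. A sketch of the pattern, inferred only from the hunks visible here and not an exhaustive mapping:

```python
# Old import path (pre-commit)              ->  new import path (this commit)
# koheesio.steps.transformations.lookup     ->  koheesio.steps.transformations
# koheesio.steps.transformations.transform  ->  koheesio.steps.transformations
# koheesio.steps.transformations.cache      ->  koheesio.steps.transformations
# koheesio.steps.readers.dummy              ->  koheesio.steps.readers
# koheesio.steps.readers.delta              ->  koheesio.spark.readers.delta
from koheesio.steps.transformations import DataframeLookup, Transform
from koheesio.steps.readers import DummyReader
from koheesio.spark.readers.delta import DeltaReader
```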
docs/reference/concepts/transformations.md (6 changes: 3 additions & 3 deletions)
@@ -161,13 +161,13 @@ Koheesio provides a variety of `Transformation` subclasses for transforming data
examples:

- `DataframeLookup`: This transformation joins two dataframes together based on a list of join mappings. It allows you
to specify the join type and join hint, and it supports selecting specific target columns from the right dataframe.

Here's an example of how to use the `DataframeLookup` transformation:

```python
from pyspark.sql import SparkSession
-from koheesio.steps.transformations.lookup import DataframeLookup, JoinMapping, TargetColumn, JoinType
+from koheesio.steps.transformations import DataframeLookup, JoinMapping, TargetColumn, JoinType

spark = SparkSession.builder.getOrCreate()
left_df = spark.createDataFrame([(1, "A"), (2, "B")], ["id", "value"])
```

@@ -246,7 +246,7 @@ how to chain transformations:
```python
from pyspark.sql import SparkSession
from koheesio.steps.transformations import HashUUID5
-from koheesio.steps.transformations.lookup import DataframeLookup, JoinMapping, TargetColumn, JoinType
+from koheesio.steps.transformations import DataframeLookup, JoinMapping, TargetColumn, JoinType

# Create a SparkSession
spark = SparkSession.builder.getOrCreate()
```
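
Both code blocks above are cut off by the collapsed diff; for orientation, a minimal sketch of how a `DataframeLookup` is constructed under the new flat import. The second dataframe and the field names on `JoinMapping` and `TargetColumn` are assumptions, not the doc's literal continuation:

```python
# Hypothetical continuation of the truncated example above; field names
# on JoinMapping/TargetColumn are assumptions.
right_df = spark.createDataFrame([(1, "C"), (3, "D")], ["id", "value"])

lookup = DataframeLookup(
    df=left_df,
    other=right_df,
    on=JoinMapping(source_column="id", other_column="id"),
    targets=[TargetColumn(target_column="value", target_column_alias="right_value")],
    how=JoinType.LEFT,
)

# Assumes the transformation step exposes a transform(df) helper
# that returns the joined DataFrame.
result_df = lookup.transform(left_df)
```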
docs/tutorials/advanced-data-processing.md (7 changes: 5 additions & 2 deletions)
@@ -10,13 +10,15 @@ Here's an example of a custom transformation that normalizes a column in a DataFrame:

```python
from pyspark.sql import DataFrame
-from koheesio.steps.transformations.transform import Transform
+from koheesio.steps.transformations import Transform


def normalize_column(df: DataFrame, column: str) -> DataFrame:
max_value = df.agg({column: "max"}).collect()[0][0]
min_value = df.agg({column: "min"}).collect()[0][0]
return df.withColumn(column, (df[column] - min_value) / (max_value - min_value))


class NormalizeColumnTransform(Transform):
column: str

```

@@ -45,7 +47,8 @@
Caching is another technique that can improve performance by storing the result of a transformation so it
doesn't have to be recomputed each time it's used. You can use the cache method to cache the result of a transformation.

```python
-from koheesio.steps.transformations.cache import CacheTransformation
+from koheesio.steps.transformations import CacheTransformation


class MyTask(EtlTask):
transformations = [NormalizeColumnTransform(column="my_column"), CacheTransformation()]
```
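
The class body in the first hunk is truncated; one plausible completion, assuming `Transform` subclasses follow the usual Koheesio step contract of reading `self.df` and writing `self.output.df`:

```python
# Hypothetical completion of the truncated NormalizeColumnTransform above;
# the self.df / self.output.df contract is an assumption.
class NormalizeColumnTransform(Transform):
    column: str

    def execute(self) -> None:
        # Delegate to the plain-function implementation from the diff.
        self.output.df = normalize_column(self.df, self.column)
```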
docs/tutorials/hello-world.md (6 changes: 4 additions & 2 deletions)
@@ -52,12 +52,14 @@ This example demonstrates how to use the `EtlTask` from the `koheesio` library to …
```python
from typing import Any
from pyspark.sql import DataFrame, functions as f
-from koheesio.steps.transformations.transform import Transform
+from koheesio.steps.transformations import Transform
from koheesio.tasks.etl_task import EtlTask


def add_column(df: DataFrame, target_column: str, value: Any):
return df.withColumn(target_column, f.lit(value))


class MyFavoriteMovieTask(EtlTask):
my_favorite_movie: str

```

@@ -102,7 +104,7 @@ source:
```python
from pyspark.sql import SparkSession
from koheesio.context import Context
-from koheesio.steps.readers.dummy import DummyReader
+from koheesio.steps.readers import DummyReader
from koheesio.steps.writers.dummy import DummyWriter

context = Context.from_yaml("sample.yaml")
```
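
The snippet above ends at the truncation point; a minimal sketch of how it might continue, assuming the YAML's `source` section holds a `range` value for `DummyReader` and that reader/writer steps follow Koheesio's `read()`/`write(df)` conventions:

```python
# Hypothetical continuation; the context.source.range key is an assumption
# about sample.yaml, which is not shown in the diff.
spark = SparkSession.builder.getOrCreate()

df = DummyReader(range=context.source.range).read()
DummyWriter().write(df)
```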
docs/tutorials/testing-koheesio-tasks.md (12 changes: 8 additions & 4 deletions)
@@ -10,15 +10,17 @@ Here's an example of how to unit test a Koheesio task:

```python
from koheesio.tasks.etl_task import EtlTask
-from koheesio.steps.readers.dummy import DummyReader
+from koheesio.steps.readers import DummyReader
from koheesio.steps.writers.dummy import DummyWriter
-from koheesio.steps.transformations.transform import Transform
+from koheesio.steps.transformations import Transform
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col


def filter_age(df: DataFrame) -> DataFrame:
return df.filter(col("Age") > 18)


def test_etl_task():
# Initialize SparkSession
spark = SparkSession.builder.getOrCreate()
```

@@ -61,15 +63,17 @@ Here's an example of how to write an integration test for this task:
```python
# my_module.py
from koheesio.tasks.etl_task import EtlTask
-from koheesio.steps.readers.delta import DeltaReader
+from koheesio.spark.readers.delta import DeltaReader
from koheesio.steps.writers.delta import DeltaWriter
-from koheesio.steps.transformations.transform import Transform
+from koheesio.steps.transformations import Transform
from koheesio.context import Context
from pyspark.sql.functions import col


def filter_age(df):
return df.filter(col("Age") > 18)


context = Context({
"reader_options": {
"table": "input_table"
```
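
For orientation, a sketch of how `my_module.py` might assemble and run the task from the pieces above; the `DeltaWriter` configuration, the task wiring, and the `Transform(filter_age)` call are assumptions layered on the truncated diff:

```python
# Hypothetical assembly of the integration-test task; names and the
# DeltaWriter configuration below are illustrative assumptions.
task = EtlTask(
    source=DeltaReader(**context.reader_options),
    target=DeltaWriter(table="output_table"),
    transformations=[Transform(filter_age)],
)
task.execute()
```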
koheesio/steps/__init__.py (28 changes: 0 additions & 28 deletions)

This file was deleted.

koheesio/steps/transformations/rank_dedup.py (51 changes: 0 additions & 51 deletions)

This file was deleted.

koheesio/tasks/__init__.py (20 changes: 0 additions & 20 deletions)

This file was deleted.
