diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index b9298c7d..69060a9a 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -1,4 +1,4 @@ -name: release +name: Release on: push: @@ -83,8 +83,9 @@ jobs: with: name: python-artifacts path: dist + + - name: Install Hatch + uses: pypa/hatch@install - name: Publish package to PyPI - uses: pypa/gh-action-pypi-publish@v1.8.14 - with: - print-hash: true \ No newline at end of file + run: hatch publish --yes --no-prompt \ No newline at end of file diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 66af4fa1..240ccc5e 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -1,9 +1,23 @@ -name: test +name: Test on: pull_request: branches: - main + paths: + - '**.py' + - '**.toml' + workflow_dispatch: + inputs: + logLevel: + description: 'Log level' + required: true + default: 'warning' + type: choice + options: + - info + - warning + - debug concurrency: group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.sha }} @@ -18,6 +32,7 @@ jobs: tests: name: Python ${{ matrix.python-version }} with PySpark ${{ matrix.pyspark-version }} on ${{ startsWith(matrix.os, 'macos-') && 'macOS' || startsWith(matrix.os, 'windows-') && 'Windows' || 'Linux' }} runs-on: ${{ matrix.os }} + strategy: fail-fast: false matrix: diff --git a/README.md b/README.md index 3b476777..6e09e181 100644 --- a/README.md +++ b/README.md @@ -7,20 +7,18 @@ | | | |---------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| CI/CD | [![CI - Test](https://github.com/Nike-Inc/koheesio/actions/workflows/test.yml/badge.svg)](https://github.com/Nike-Inc/koheesio/actions/workflows/test.yml) [![CD - Build Koheesio](https://github.com/Nike-Inc/koheesio/actions/workflows/build_koheesio.yml/badge.svg)](https://github.com/Nike-Inc/koheesio/actions/workflows/release.yml) | +| CI/CD | [![CI - Test](https://github.com/Nike-Inc/koheesio/actions/workflows/test.yml/badge.svg)](https://github.com/Nike-Inc/koheesio/actions/workflows/test.yml) [![CD - Release Koheesio](https://github.com/Nike-Inc/koheesio/actions/workflows/release.yml/badge.svg)](https://github.com/Nike-Inc/koheesio/actions/workflows/release.yml) | | Package | [![PyPI - Version](https://img.shields.io/pypi/v/koheesio.svg?logo=pypi&label=PyPI&logoColor=gold)](https://pypi.org/project/koheesio/) [![PyPI - Python Version](https://img.shields.io/pypi/pyversions/koheesio.svg?logo=python&label=Python&logoColor=gold)](https://pypi.org/project/koheesio/) [![PyPI - Installs](https://img.shields.io/pypi/dm/koheesio.svg?color=blue&label=Installs&logo=pypi&logoColor=gold)](https://pypi.org/project/koheesio/) [![Release - Downloads](https://img.shields.io/github/downloads/Nike-Inc/koheesio/total?label=Downloads)](https://github.com/Nike-Inc/koheesio/releases) | -| Meta 
| [![Hatch project](https://img.shields.io/badge/%F0%9F%A5%9A-Hatch-4051b5.svg)](https://github.com/pypa/hatch) [![linting - Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff) [![types - Mypy](https://img.shields.io/badge/types-Mypy-blue.svg)](https://github.com/python/mypy) [![docstring - numpydoc](https://img.shields.io/badge/docstring-numpydoc-blue)](https://numpydoc.readthedocs.io/en/latest/format.html) [![code style - black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black) [![License - Apache 2.0](https://img.shields.io/badge/License-Apache_2.0-green.svg)](LICENSE.txt) | +| Meta | [![Hatch project](https://img.shields.io/badge/%F0%9F%A5%9A-Hatch-4051b5.svg)](https://github.com/pypa/hatch) [![linting - Ruff](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/astral-sh/ruff/main/assets/badge/v2.json)](https://github.com/astral-sh/ruff) [![types - Mypy](https://img.shields.io/badge/types-Mypy-blue.svg)](https://github.com/python/mypy) [![docstring - numpydoc](https://img.shields.io/badge/docstring-numpydoc-blue)](https://numpydoc.readthedocs.io/en/latest/format.html) [![code style - black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black) [![License - Apache 2.0](https://img.shields.io/github/license/Nike-Inc/koheesio)](LICENSE.txt) | +Koheesio, named after the Finnish word for cohesion, is a robust Python framework for building efficient data pipelines. +It promotes modularity and collaboration, enabling the creation of complex pipelines from simple, reusable components. - -Koheesio, named after the Finnish word for cohesion, is a robust Python framework for building efficient data pipelines. -It promotes modularity and collaboration, enabling the creation of complex pipelines from simple, reusable components. - -The framework is versatile, aiming to support multiple implementations and working seamlessly with various data +The framework is versatile, aiming to support multiple implementations and working seamlessly with various data processing libraries or frameworks. This ensures that Koheesio can handle any data processing task, regardless of the underlying technology or data scale. -Koheesio uses [Pydantic] for strong typing, data validation, and settings management, ensuring a high level of type +Koheesio uses [Pydantic] for strong typing, data validation, and settings management, ensuring a high level of type safety and structured configurations within pipeline components. [Pydantic]: docs/includes/glossary.md#pydantic @@ -29,25 +27,25 @@ Koheesio's goal is to ensure predictable pipeline execution through a solid foun set of features, making it an excellent choice for developers and organizations seeking to build robust and adaptable Data Pipelines. +## What sets Koheesio apart from other libraries?" -### What sets Koheesio apart from other libraries?" -Koheesio encapsulates years of data engineering expertise, fostering a collaborative and innovative community. While -similar libraries exist, Koheesio's focus on data pipelines, integration with PySpark, and specific design for tasks +Koheesio encapsulates years of data engineering expertise, fostering a collaborative and innovative community. 
While +similar libraries exist, Koheesio's focus on data pipelines, integration with PySpark, and specific design for tasks like data transformation, ETL jobs, data validation, and large-scale data processing sets it apart. Koheesio aims to provide a rich set of features including readers, writers, and transformations for any type of Data -processing. Koheesio is not in competition with other libraries. Its aim is to offer wide-ranging support and focus +processing. Koheesio is not in competition with other libraries. Its aim is to offer wide-ranging support and focus on utility in a multitude of scenarios. Our preference is for integration, not competition... We invite contributions from all, promoting collaboration and innovation in the data engineering community. - ## Koheesio Core Components Here are the key components included in Koheesio: - __Step__: This is the fundamental unit of work in Koheesio. It represents a single operation in a data pipeline, taking in inputs and producing outputs. + ```text ┌─────────┐ ┌──────────────────┐ ┌──────────┐ │ Input 1 │───────▶│ ├───────▶│ Output 1 │ @@ -61,11 +59,11 @@ Here are the key components included in Koheesio: │ Input 3 │───────▶│ ├───────▶│ Output 3 │ └─────────┘ └──────────────────┘ └──────────┘ ``` + - __Context__: This is a configuration class used to set up the environment for a Task. It can be used to share variables across tasks and adapt the behavior of a Task based on its environment. - __Logger__: This is a class for logging messages at different levels. - ## Installation You can install Koheesio using either pip or poetry. @@ -82,7 +80,11 @@ pip install koheesio If you're using Hatch for package management, you can add Koheesio to your project by simply adding koheesio to your `pyproject.toml`. - + + ```toml + [dependencies] + koheesio = "" + ``` ### Using Poetry @@ -92,34 +94,35 @@ If you're using poetry for package management, you can add Koheesio to your proj poetry add koheesio ``` -or add the following line to your `pyproject.toml` (under `[tool.poetry.dependencies]`), making sure to replace -`...` with the version you want to have installed: +or add the following line to your `pyproject.toml` (under `[tool.poetry.dependencies]`), making sure to replace `...` with the version you want to have installed: ```toml koheesio = {version = "..."} ``` -### Extras +### Features Koheesio also provides some additional features that can be useful in certain scenarios. These include: -- __Spark Expectations:__ Available through the `koheesio.steps.integration.spark.dq.spark_expectations` module; installable through the `se` extra. - - SE Provides Data Quality checks for Spark DataFrames. - - For more information, refer to the [Spark Expectations docs](https://engineering.nike.com/spark-expectations). +- __Spark Expectations__: Available through the `koheesio.steps.integration.spark.dq.spark_expectations` module; + - Installable through the `se` extra. + - SE Provides Data Quality checks for Spark DataFrames. For more information, refer to the [Spark Expectations docs](https://engineering.nike.com/spark-expectations). [//]: # (- **Brickflow:** Available through the `koheesio.steps.integration.workflow` module; installable through the `bf` extra.) [//]: # ( - Brickflow is a workflow orchestration tool that allows you to define and execute workflows in a declarative way.) 
[//]: # ( - For more information, refer to the [Brickflow docs](https://engineering.nike.com/brickflow)) -- __Box__: Available through the `koheesio.steps.integration.box` module; installable through the `box` extra. - - Box is a cloud content management and file sharing service for businesses. +- __Box__: Available through the `koheesio.steps.integration.box` module + - Installable through the `box` extra. + - Box is a cloud content management and file sharing service for businesses. -- __SFTP__: Available through the `koheesio.steps.integration.spark.sftp` module; installable through the `sftp` extra. - - SFTP is a network protocol used for secure file transfer over a secure shell. +- __SFTP__: Available through the `koheesio.steps.integration.spark.sftp` module; + - Installable through the `sftp` extra. + - SFTP is a network protocol used for secure file transfer over a secure shell. > __Note:__ -> Some of the steps require extra dependencies. See the [Extras](#extras) section for additional info. -> Extras can be added to Poetry by adding `extras=['name_of_the_extra']` to the toml entry mentioned above +> Some of the steps require extra dependencies. See the [Features](#features) section for additional info. +> Extras can be installed by adding `features=['name_of_the_extra']` to the toml entry mentioned above ## Contributing @@ -127,12 +130,9 @@ Koheesio also provides some additional features that can be useful in certain sc We welcome contributions to our project! Here's a brief overview of our development process: -- __Code Standards__: We use `pylint`, `black`, and `mypy` to maintain code standards. Please ensure your code passes - these checks by running `make check`. No errors or warnings should be reported by the linter before you submit a pull - request. +- __Code Standards__: We use `pylint`, `black`, and `mypy` to maintain code standards. Please ensure your code passes these checks by running `make check`. No errors or warnings should be reported by the linter before you submit a pull request. -- __Testing__: We use `pytest` for testing. Run the tests with `make test` and ensure all tests pass before submitting - a pull request. +- __Testing__: We use `pytest` for testing. Run the tests with `make test` and ensure all tests pass before submitting a pull request. - __Release Process__: We aim for frequent releases. Typically when we have a new feature or bugfix, a developer with admin rights will create a new release on GitHub and publish the new version to PyPI. 
diff --git a/pyproject.toml b/pyproject.toml index e13db33d..6abf886b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -58,9 +58,7 @@ async_http = [ box = ["boxsdk[jwt]==3.8.1"] pandas = ["pandas>=1.3", "setuptools"] pyspark = ["pyspark>=3.2.0", "pyarrow>13"] -# FIXME: loose versioning in spark_excpectations for pluggy module -# se = ["spark-expectations>=1.1.0"] -se = [] +se = ["spark-expectations>=2.1.0"] # SFTP dependencies in to_csv line_iterator sftp = ["paramiko>=2.6.0"] delta = ["delta-spark>=2.2"] @@ -249,7 +247,6 @@ features = [ "box", "pandas", "pyspark", - "se", "sftp", "delta", "dev", @@ -284,19 +281,27 @@ matrix.version.extra-dependencies = [ { value = "pyspark>=3.3,<3.4", if = [ "pyspark33", ] }, + { value = "spark-expectations>=2.1.0", if = [ + "pyspark33", + ] }, { value = "pyspark>=3.4,<3.5", if = [ "pyspark34", ] }, + { value = "spark-expectations>=2.1.0", if = [ + "pyspark34", + ] }, { value = "pyspark>=3.5,<3.6", if = [ "pyspark35", ] }, ] + name.".*".env-vars = [ - # set number of workes for parallel testing - { key = "PYTEST_XDIST_AUTO_NUM_WORKERS", value = "2" }, - # disables Koheesio logo being printed during testing - { key = "KOHEESIO__PRINT_LOGO", value = "False" }, + # set number of workes for parallel testing + { key = "PYTEST_XDIST_AUTO_NUM_WORKERS", value = "2" }, + # disables Koheesio logo being printed during testing + { key = "KOHEESIO__PRINT_LOGO", value = "False" }, ] + [tool.pytest.ini_options] addopts = "-q --color=yes --order-scope=module" log_level = "CRITICAL" @@ -331,11 +336,7 @@ koheesio = ["src/koheesio", "*/koheesio/src/koheesio"] tests = ["tests", "*/koheesio/tests"] [tool.coverage.report] -exclude_lines = [ - "no cov", - "if __name__ == .__main__.:", - "if TYPE_CHECKING:", -] +exclude_lines = ["no cov", "if __name__ == .__main__.:", "if TYPE_CHECKING:"] omit = ["tests/*"] ### ~~~~ ### @@ -650,4 +651,4 @@ enable = ["logging-not-lazy", "c-extension-no-member"] notes = ["FIXME", "TODO"] [tool.pylint.refactoring] -max-nested-blocks = 3 \ No newline at end of file +max-nested-blocks = 3 diff --git a/src/koheesio/__about__.py b/src/koheesio/__about__.py index 85ad77e9..4c00ef34 100644 --- a/src/koheesio/__about__.py +++ b/src/koheesio/__about__.py @@ -13,12 +13,15 @@ LICENSE_INFO = "Licensed as Apache 2.0" SOURCE = "https://github.com/Nike-Inc/koheesio" __version__ = "0.7.0" -__logo__ = 75, ( - b"\x1f\x8b\x08\x00TiGf\x02\xff}\x91\xbb\r\xc30\x0cD{Nq\x1bh\n\x01\x16R\xa4pK@\x8bh\xf8\xe8\xf8\x89\xe9\x04\xf0\x15" - b"\xc4\x91\x10\x9f(J`z\xbd4B\xea8J\xf2\xa01T\x02\x01,\x0b\x85Q\x92\x07\xe9\x9cK\x92\xd1,\xe0mRBL\x9c\xa6\x9b\xee" - b"\xeet)\x07Av\xc9/\x0b\x98\x93\xb4=\xd1v\xa4\xf5NG\xc6\xe5\xce\x93nk\x8d\x81\xf5\xed\x92\x80AmC\xbb\xde,.\x7f\x1fc" - b"\x0fU\xa79\x19\x82\x16]\x1248\x8f\xa5\x7f\x1c|\x92\xe2\xb8\xa59\xfd\xa5\x86\x8b.I\x9a\xf3\xd4W\x80\x8a\xd3\x9e" - b"\xfb\xba\\\xecm\x9f#\xee\xea\x92}M+\xffb\xb7\xb2\xc4\xc4K\x88Zui\xda\xedD\xfb\x00\xcfU6\xd3_\x02\x00\x00" +__logo__ = ( + 75, + ( + b"\x1f\x8b\x08\x00TiGf\x02\xff}\x91\xbb\r\xc30\x0cD{Nq\x1bh\n\x01\x16R\xa4pK@\x8bh\xf8\xe8\xf8\x89\xe9\x04\xf0\x15" + b"\xc4\x91\x10\x9f(J`z\xbd4B\xea8J\xf2\xa01T\x02\x01,\x0b\x85Q\x92\x07\xe9\x9cK\x92\xd1,\xe0mRBL\x9c\xa6\x9b\xee" + b"\xeet)\x07Av\xc9/\x0b\x98\x93\xb4=\xd1v\xa4\xf5NG\xc6\xe5\xce\x93nk\x8d\x81\xf5\xed\x92\x80AmC\xbb\xde,.\x7f\x1fc" + b"\x0fU\xa79\x19\x82\x16]\x1248\x8f\xa5\x7f\x1c|\x92\xe2\xb8\xa59\xfd\xa5\x86\x8b.I\x9a\xf3\xd4W\x80\x8a\xd3\x9e" + 
b"\xfb\xba\\\xecm\x9f#\xee\xea\x92}M+\xffb\xb7\xb2\xc4\xc4K\x88Zui\xda\xedD\xfb\x00\xcfU6\xd3_\x02\x00\x00" + ), ) __short_description__ = __doc__.split("\n", maxsplit=1)[0] __about__ = f"""Koheesio - v{__version__} diff --git a/src/koheesio/asyncio/__init__.py b/src/koheesio/asyncio/__init__.py index 5997ed98..a71a8182 100644 --- a/src/koheesio/asyncio/__init__.py +++ b/src/koheesio/asyncio/__init__.py @@ -67,7 +67,9 @@ def merge(self, other: Union[Dict, StepOutput]): -------- ```python step_output = StepOutput(foo="bar") - step_output.merge({"lorem": "ipsum"}) # step_output will now contain {'foo': 'bar', 'lorem': 'ipsum'} + step_output.merge( + {"lorem": "ipsum"} + ) # step_output will now contain {'foo': 'bar', 'lorem': 'ipsum'} ``` Functionally similar to adding two dicts together; like running `{**dict_a, **dict_b}`. diff --git a/src/koheesio/integrations/box.py b/src/koheesio/integrations/box.py index 5fa4a179..843601f9 100644 --- a/src/koheesio/integrations/box.py +++ b/src/koheesio/integrations/box.py @@ -610,12 +610,16 @@ class BoxFileWriter(BoxFolderBase): from koheesio.steps.integrations.box import BoxFileWriter auth_params = {...} - f1 = BoxFileWriter(**auth_params, path="/foo/bar", file="path/to/my/file.ext").execute() + f1 = BoxFileWriter( + **auth_params, path="/foo/bar", file="path/to/my/file.ext" + ).execute() # or import io b = io.BytesIO(b"my-sample-data") - f2 = BoxFileWriter(**auth_params, path="/foo/bar", file=b, name="file.ext").execute() + f2 = BoxFileWriter( + **auth_params, path="/foo/bar", file=b, name="file.ext" + ).execute() ``` """ diff --git a/src/koheesio/integrations/spark/dq/spark_expectations.py b/src/koheesio/integrations/spark/dq/spark_expectations.py index 6a01f91f..325ccaf5 100644 --- a/src/koheesio/integrations/spark/dq/spark_expectations.py +++ b/src/koheesio/integrations/spark/dq/spark_expectations.py @@ -4,19 +4,21 @@ from typing import Any, Dict, Optional, Union +import pyspark +from pydantic import Field +from pyspark.sql import DataFrame from spark_expectations.config.user_config import Constants as user_config from spark_expectations.core.expectations import ( SparkExpectations, WrappedDataFrameWriter, ) -from pydantic import Field - -from pyspark.sql import DataFrame - from koheesio.spark.transformations import Transformation from koheesio.spark.writers import BatchOutputMode +if pyspark.__version__.startswith("3.5"): + raise ImportError("Spark Expectations is not supported for Spark 3.5") + class SparkExpectationsTransformation(Transformation): """ diff --git a/src/koheesio/models/__init__.py b/src/koheesio/models/__init__.py index 33bc7d9b..77a45973 100644 --- a/src/koheesio/models/__init__.py +++ b/src/koheesio/models/__init__.py @@ -371,7 +371,9 @@ def __add__(self, other: Union[Dict, BaseModel]) -> BaseModel: ```python step_output_1 = StepOutput(foo="bar") step_output_2 = StepOutput(lorem="ipsum") - step_output_1 + step_output_2 # step_output_1 will now contain {'foo': 'bar', 'lorem': 'ipsum'} + ( + step_output_1 + step_output_2 + ) # step_output_1 will now contain {'foo': 'bar', 'lorem': 'ipsum'} ``` Parameters @@ -495,7 +497,9 @@ def merge(self, other: Union[Dict, BaseModel]): -------- ```python step_output = StepOutput(foo="bar") - step_output.merge({"lorem": "ipsum"}) # step_output will now contain {'foo': 'bar', 'lorem': 'ipsum'} + step_output.merge( + {"lorem": "ipsum"} + ) # step_output will now contain {'foo': 'bar', 'lorem': 'ipsum'} ``` Parameters diff --git a/src/koheesio/spark/readers/file_loader.py 
b/src/koheesio/spark/readers/file_loader.py index f1161966..a497a0a8 100644 --- a/src/koheesio/spark/readers/file_loader.py +++ b/src/koheesio/spark/readers/file_loader.py @@ -80,7 +80,9 @@ class FileLoader(Reader, ExtraParamsMixin): Example: ```python - reader = FileLoader(path="path/to/textfile.txt", format="text", header=True, lineSep="\n") + reader = FileLoader( + path="path/to/textfile.txt", format="text", header=True, lineSep="\n" + ) ``` For more information about the available options, see Spark's diff --git a/src/koheesio/spark/readers/rest_api.py b/src/koheesio/spark/readers/rest_api.py index a4f7bd0a..49de6dbf 100644 --- a/src/koheesio/spark/readers/rest_api.py +++ b/src/koheesio/spark/readers/rest_api.py @@ -67,7 +67,9 @@ class RestApiReader(Reader): pages=3, session=session, ) - task = RestApiReader(transport=transport, spark_schema="id: int, page:int, value: string") + task = RestApiReader( + transport=transport, spark_schema="id: int, page:int, value: string" + ) task.execute() all_data = [row.asDict() for row in task.output.df.collect()] ``` @@ -92,7 +94,9 @@ class RestApiReader(Reader): connector=connector, ) - task = RestApiReader(transport=transport, spark_schema="id: int, page:int, value: string") + task = RestApiReader( + transport=transport, spark_schema="id: int, page:int, value: string" + ) task.execute() all_data = [row.asDict() for row in task.output.df.collect()] ``` diff --git a/src/koheesio/spark/transformations/__init__.py b/src/koheesio/spark/transformations/__init__.py index 76e638ec..938032c7 100644 --- a/src/koheesio/spark/transformations/__init__.py +++ b/src/koheesio/spark/transformations/__init__.py @@ -58,7 +58,9 @@ class Transformation(SparkStep, ABC): class AddOne(Transformation): def execute(self): - self.output.df = self.df.withColumn("new_column", f.col("old_column") + 1) + self.output.df = self.df.withColumn( + "new_column", f.col("old_column") + 1 + ) ``` In the example above, the `execute` method is implemented to add 1 to the values of the `old_column` and store the diff --git a/src/koheesio/spark/transformations/camel_to_snake.py b/src/koheesio/spark/transformations/camel_to_snake.py index 6246c8a2..7a0b8ebb 100644 --- a/src/koheesio/spark/transformations/camel_to_snake.py +++ b/src/koheesio/spark/transformations/camel_to_snake.py @@ -48,7 +48,9 @@ class CamelToSnakeTransformation(ColumnsTransformation): | ... | ... 
| ```python - output_df = CamelToSnakeTransformation(column="camelCaseColumn").transform(input_df) + output_df = CamelToSnakeTransformation(column="camelCaseColumn").transform( + input_df + ) ``` __output_df:__ diff --git a/src/koheesio/spark/transformations/date_time/interval.py b/src/koheesio/spark/transformations/date_time/interval.py index 5b3679a0..576b757f 100644 --- a/src/koheesio/spark/transformations/date_time/interval.py +++ b/src/koheesio/spark/transformations/date_time/interval.py @@ -99,10 +99,14 @@ DateTimeAddInterval, ) -input_df = spark.createDataFrame([(1, "2022-01-01 00:00:00")], ["id", "my_column"]) +input_df = spark.createDataFrame( + [(1, "2022-01-01 00:00:00")], ["id", "my_column"] +) # add 1 day to my_column and store the result in a new column called 'one_day_later' -output_df = DateTimeAddInterval(column="my_column", target_column="one_day_later", interval="1 day").transform(input_df) +output_df = DateTimeAddInterval( + column="my_column", target_column="one_day_later", interval="1 day" +).transform(input_df) ``` __output_df__: diff --git a/src/koheesio/spark/transformations/lookup.py b/src/koheesio/spark/transformations/lookup.py index 960b9987..f1b5a9c2 100644 --- a/src/koheesio/spark/transformations/lookup.py +++ b/src/koheesio/spark/transformations/lookup.py @@ -102,7 +102,9 @@ class DataframeLookup(Transformation): df=left_df, other=right_df, on=JoinMapping(source_column="id", joined_column="id"), - targets=TargetColumn(target_column="value", target_column_alias="right_value"), + targets=TargetColumn( + target_column="value", target_column_alias="right_value" + ), how=JoinType.LEFT, ) diff --git a/src/koheesio/spark/transformations/strings/change_case.py b/src/koheesio/spark/transformations/strings/change_case.py index 7929c4e3..0957ca8a 100644 --- a/src/koheesio/spark/transformations/strings/change_case.py +++ b/src/koheesio/spark/transformations/strings/change_case.py @@ -51,7 +51,9 @@ class LowerCase(ColumnsTransformationWithTarget): | Beans| 1600| USA| ```python - output_df = LowerCase(column="product", target_column="product_lower").transform(df) + output_df = LowerCase( + column="product", target_column="product_lower" + ).transform(df) ``` __output_df:__ @@ -107,7 +109,9 @@ class UpperCase(LowerCase): | Beans| 1600| USA| ```python - output_df = UpperCase(column="product", target_column="product_upper").transform(df) + output_df = UpperCase( + column="product", target_column="product_upper" + ).transform(df) ``` __output_df:__ @@ -158,7 +162,9 @@ class TitleCase(LowerCase): | Beans| 1600| USA| ```python - output_df = TitleCase(column="product", target_column="product_title").transform(df) + output_df = TitleCase( + column="product", target_column="product_title" + ).transform(df) ``` __output_df:__ diff --git a/src/koheesio/spark/transformations/strings/split.py b/src/koheesio/spark/transformations/strings/split.py index 05ae30d0..a7ef90af 100644 --- a/src/koheesio/spark/transformations/strings/split.py +++ b/src/koheesio/spark/transformations/strings/split.py @@ -51,7 +51,9 @@ class SplitAll(ColumnsTransformationWithTarget): | Beans| 1600| USA| ```python - output_df = SplitColumn(column="product", target_column="split", split_pattern=" ").transform(input_df) + output_df = SplitColumn( + column="product", target_column="split", split_pattern=" " + ).transform(input_df) ``` __output_df:__ @@ -107,7 +109,9 @@ class SplitAtFirstMatch(SplitAll): | Beans| 1600| USA| ```python - output_df = SplitColumn(column="product", target_column="split_first", 
split_pattern="an").transform(input_df) + output_df = SplitColumn( + column="product", target_column="split_first", split_pattern="an" + ).transform(input_df) ``` __output_df:__ diff --git a/src/koheesio/spark/transformations/strings/trim.py b/src/koheesio/spark/transformations/strings/trim.py index 57b7c4e8..655df98e 100644 --- a/src/koheesio/spark/transformations/strings/trim.py +++ b/src/koheesio/spark/transformations/strings/trim.py @@ -57,7 +57,9 @@ class Trim(ColumnsTransformationWithTarget): ### Trim whitespace from the beginning of a string ```python - output_df = Trim(column="column", target_column="trimmed_column", direction="left").transform(input_df) + output_df = Trim( + column="column", target_column="trimmed_column", direction="left" + ).transform(input_df) ``` __output_df:__ @@ -84,7 +86,9 @@ class Trim(ColumnsTransformationWithTarget): ### Trim whitespace from the end of a string ```python - output_df = Trim(column="column", target_column="trimmed_column", direction="right").transform(input_df) + output_df = Trim( + column="column", target_column="trimmed_column", direction="right" + ).transform(input_df) ``` __output_df:__ diff --git a/src/koheesio/spark/transformations/uuid5.py b/src/koheesio/spark/transformations/uuid5.py index 478bdb97..ec735329 100644 --- a/src/koheesio/spark/transformations/uuid5.py +++ b/src/koheesio/spark/transformations/uuid5.py @@ -110,7 +110,9 @@ class HashUUID5(Transformation): In code: ```python - HashUUID5(source_columns=["id", "string"], target_column="uuid5").transform(input_df) + HashUUID5(source_columns=["id", "string"], target_column="uuid5").transform( + input_df + ) ``` In this example, the `id` and `string` columns are concatenated and hashed using the UUID5 algorithm. The result is diff --git a/tests/spark/integrations/dq/test_spark_expectations.py b/tests/spark/integrations/dq/test_spark_expectations.py index 0ff6bf2d..a8ef6fbf 100644 --- a/tests/spark/integrations/dq/test_spark_expectations.py +++ b/tests/spark/integrations/dq/test_spark_expectations.py @@ -1,15 +1,16 @@ from typing import List, Union +import pyspark import pytest - -from pyspark.sql import SparkSession - from koheesio.utils import get_project_root +from pyspark.sql import SparkSession PROJECT_ROOT = get_project_root() pytestmark = pytest.mark.spark -pytestmark = pytest.mark.skip(reason="Skipping all tests in this module due to the spark expectation package issues") + +if pyspark.__version__.startswith("3.5"): + pytestmark = pytest.mark.skip("Spark Expectations is not supported for Spark 3.5") class TestSparkExpectationsTransform: diff --git a/tests/spark/writers/delta/test_delta_writer.py b/tests/spark/writers/delta/test_delta_writer.py index 78b0479b..21d02725 100644 --- a/tests/spark/writers/delta/test_delta_writer.py +++ b/tests/spark/writers/delta/test_delta_writer.py @@ -4,35 +4,35 @@ import pytest from conftest import await_job_completion from delta import DeltaTable - -from pydantic import ValidationError - -from pyspark.sql import functions as F - from koheesio.spark import AnalysisException from koheesio.spark.delta import DeltaTableStep from koheesio.spark.writers import BatchOutputMode, StreamingOutputMode from koheesio.spark.writers.delta import DeltaTableStreamWriter, DeltaTableWriter from koheesio.spark.writers.delta.utils import log_clauses from koheesio.spark.writers.stream import Trigger +from pydantic import ValidationError +from pyspark.sql import functions as F pytestmark = pytest.mark.spark -@pytest.mark.parametrize( - 
"output_mode,expected_count", - [ - (BatchOutputMode.APPEND, 1), - (BatchOutputMode.APPEND, 2), - (BatchOutputMode.OVERWRITE, 1), - (BatchOutputMode.IGNORE, 1), - ], -) -def test_delta_table_writer(output_mode, expected_count, dummy_df, spark): +def test_delta_table_writer(dummy_df, spark): table_name = "test_table" - DeltaTableWriter(table=table_name, output_mode=output_mode, df=dummy_df).execute() + writer = DeltaTableWriter(table=table_name, output_mode=BatchOutputMode.APPEND, df=dummy_df) + writer.execute() + actual_count = spark.read.table(table_name).count() + assert actual_count == 1 + writer.execute() + actual_count = spark.read.table(table_name).count() + assert actual_count == 2 + writer.output_mode = BatchOutputMode.OVERWRITE + writer.execute() + actual_count = spark.read.table(table_name).count() + assert actual_count == 1 + writer.output_mode = BatchOutputMode.IGNORE + writer.execute() actual_count = spark.read.table(table_name).count() - assert actual_count == expected_count + assert actual_count == 1 def test_delta_partitioning(spark, sample_df_to_partition): diff --git a/tests/spark/writers/test_sftp.py b/tests/spark/writers/test_sftp.py index e21790a0..7119edda 100644 --- a/tests/spark/writers/test_sftp.py +++ b/tests/spark/writers/test_sftp.py @@ -4,13 +4,13 @@ import pytest from paramiko import SSHException -from koheesio.spark.writers.buffer import PandasCsvBufferWriter -from koheesio.spark.writers.sftp import ( +from koheesio.integrations.spark.sftp import ( SendCsvToSftp, SendJsonToSftp, SFTPWriteMode, SFTPWriter, ) +from koheesio.spark.writers.buffer import PandasCsvBufferWriter pytestmark = pytest.mark.spark