From 3816e3745d012f30e4852332537a550ca05d6261 Mon Sep 17 00:00:00 2001 From: 10sharmashivam <10sharmashivam@gmail.com> Date: Thu, 31 Oct 2024 12:12:45 +0530 Subject: [PATCH 01/10] Basic structure of Plugin Signed-off-by: 10sharmashivam <10sharmashivam@gmail.com> --- plugins/flytekit-soda.io/README.md | 16 ++++ .../flytekitplugins/soda/__init__.py | 14 ++++ .../flytekitplugins/soda/task.py | 74 +++++++++++++++++++ plugins/flytekit-soda.io/setup.py | 32 ++++++++ 4 files changed, 136 insertions(+) create mode 100644 plugins/flytekit-soda.io/README.md create mode 100644 plugins/flytekit-soda.io/flytekitplugins/soda/__init__.py create mode 100644 plugins/flytekit-soda.io/flytekitplugins/soda/task.py create mode 100644 plugins/flytekit-soda.io/setup.py diff --git a/plugins/flytekit-soda.io/README.md b/plugins/flytekit-soda.io/README.md new file mode 100644 index 0000000000..a7e752f0d4 --- /dev/null +++ b/plugins/flytekit-soda.io/README.md @@ -0,0 +1,16 @@ +# Soda.io Flyte Plugin + +This plugin integrates Flyte with [Soda.io](https://soda.io/), enabling users to run data quality checks as part of their Flyte workflows. With this plugin, data engineers and analysts can embed data monitoring tasks directly into Flyte, improving data reliability and visibility. + +## Features + +- **Soda.io Integration**: Leverages Soda.io for running data quality scans within Flyte tasks. +- **Configurable Scan Definitions**: Users can define custom scan configurations to monitor data quality based on Soda.io's definitions. +- **API Integration**: Enables configuration of Soda.io credentials and other Soda-specific settings for secure and dynamic data quality checks. + +## Installation + +This plugin is intended to be installed as part of the Flyte ecosystem. To install it, ensure you have Flytekit installed: + +```bash +pip install flytekit \ No newline at end of file diff --git a/plugins/flytekit-soda.io/flytekitplugins/soda/__init__.py b/plugins/flytekit-soda.io/flytekitplugins/soda/__init__.py new file mode 100644 index 0000000000..c1c48d32b0 --- /dev/null +++ b/plugins/flytekit-soda.io/flytekitplugins/soda/__init__.py @@ -0,0 +1,14 @@ +""" +.. currentmodule:: flytekitplugins.kfsoda + +This package contains the Soda.io plugin for Flytekit, allowing Flyte users to run data quality scans. + +.. autosummary:: + :template: custom.rst + :toctree: generated/ + + SodaCheckConfig + SodaCheckTask +""" + +from .task import SodaCheckConfig, SodaCheckTask \ No newline at end of file diff --git a/plugins/flytekit-soda.io/flytekitplugins/soda/task.py b/plugins/flytekit-soda.io/flytekitplugins/soda/task.py new file mode 100644 index 0000000000..5e7341eedc --- /dev/null +++ b/plugins/flytekit-soda.io/flytekitplugins/soda/task.py @@ -0,0 +1,74 @@ +""" +This Plugin integrates Soda.io data quality checks with Flyte, allowing users to run and manage +data quality scans as part of Flyte workflows. It leverages Soda.io's scanning capabilities to monitor +data quality by defining customizable scan configurations. + +This plugin allows setting various parameters like scan definition files, data sources, and Soda Cloud +API credentials to run these scans in an automated fashion within Flyte. +""" +from dataclasses import dataclass +from flytekit import PythonFunctionTask +from typing import Any, Dict, Callable, Optional + +# This would be the main task configuration class for Soda.io +@dataclass +class SodaCheckConfig: + """ + Configuration class for a Soda.io data quality scan task. + + Attributes: + scan_definition (str): Path to the Soda.io scan definition YAML file. + soda_cloud_api_key (Optional[str]): API key for Soda Cloud access, if applicable. + data_source (Optional[str]): Name of the data source in Soda.io to use for the scan. + scan_name (Optional[str]): Name of the scan job for organizational purposes. + """ + scan_definition: str + soda_cloud_api_key: Optional[str] = None + data_source: Optional[str] = None # Name of the data source in Soda.io + scan_name: Optional[str] = "default_scan" # Name for the scan job + +class SodaCheckTask(PythonFunctionTask[SodaCheckConfig]): + """ + A Flyte task that runs a Soda.io data quality scan as defined in the provided configuration. + + This task allows users to execute data quality checks by leveraging the Soda.io API. The scan + configuration includes options for specifying the scan definition file, Soda.io Cloud API key, + and data source. + + Attributes: + _TASK_TYPE (str): The task type identifier for Soda.io checks within Flyte. + """ + _TASK_TYPE = "soda_check_task" + + def __init__(self, task_config: SodaCheckConfig, task_function: Callable, **kwargs): + """ + Initializes the SodaCheckTask with the provided configuration. + + Args: + task_config (SodaCheckConfig): The configuration for the Soda.io scan. + task_function (Callable): The function representing the task logic. + kwargs: Additional keyword arguments. + """ + super().__init__( + task_type=self._TASK_TYPE, + task_config=task_config, + task_function=task_function, + task_type_version=1, + **kwargs, + ) + + def execute(self, **kwargs) -> Dict[str, Any]: + """ + Executes the Soda.io scan using the configuration provided in task_config. + + Returns: + dict: A dictionary containing the results of the Soda.io scan. + """ + # Example code to invoke Soda.io data scan + scan_definition = self.task_config.scan_definition + # Integrate with Soda.io here, such as by calling a scan API with scan_definition + + # You would actually integrate with the Soda API here, using soda-cloud-api-key and other configurations + result = {} # Placeholder for API result + + return {"scan_result": result} \ No newline at end of file diff --git a/plugins/flytekit-soda.io/setup.py b/plugins/flytekit-soda.io/setup.py new file mode 100644 index 0000000000..36331c0744 --- /dev/null +++ b/plugins/flytekit-soda.io/setup.py @@ -0,0 +1,32 @@ +from setuptools import setup + +PLUGIN_NAME = "kfsoda" +microlib_name = f"flytekitplugins-{PLUGIN_NAME}" +plugin_requires = ["flytekit>=1.6.1", "soda-spark"] # Update as per Soda.io requirements +__version__ = "0.0.0+develop" + +setup( + name=microlib_name, + version=__version__, + author="Your Name", + author_email="your.email@example.com", + description="Soda.io data quality plugin for Flyte", + namespace_packages=["flytekitplugins"], + packages=[f"flytekitplugins.{PLUGIN_NAME}"], + install_requires=plugin_requires, + license="apache2", + python_requires=">=3.8", + classifiers=[ + "Intended Audience :: Science/Research", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Topic :: Scientific/Engineering", + "Topic :: Scientific/Engineering :: Data Quality", + "Topic :: Software Development", + "Topic :: Software Development :: Libraries", + "Topic :: Software Development :: Libraries :: Python Modules", + ], +) \ No newline at end of file From a25c8d58e9356cc186e1f670deba0ce13995202f Mon Sep 17 00:00:00 2001 From: 10sharmashivam <10sharmashivam@gmail.com> Date: Thu, 31 Oct 2024 15:48:52 +0530 Subject: [PATCH 02/10] Basic Integration with soda.io Signed-off-by: 10sharmashivam <10sharmashivam@gmail.com> --- .../flytekitplugins/soda/task.py | 44 +++++++++++-- plugins/flytekit-soda.io/setup.py | 2 +- plugins/flytekit-soda.io/tests/__init__.py | 0 .../flytekit-soda.io/tests/test_soda_task.py | 63 +++++++++++++++++++ 4 files changed, 104 insertions(+), 5 deletions(-) create mode 100644 plugins/flytekit-soda.io/tests/__init__.py create mode 100644 plugins/flytekit-soda.io/tests/test_soda_task.py diff --git a/plugins/flytekit-soda.io/flytekitplugins/soda/task.py b/plugins/flytekit-soda.io/flytekitplugins/soda/task.py index 5e7341eedc..a2f27090db 100644 --- a/plugins/flytekit-soda.io/flytekitplugins/soda/task.py +++ b/plugins/flytekit-soda.io/flytekitplugins/soda/task.py @@ -9,6 +9,10 @@ from dataclasses import dataclass from flytekit import PythonFunctionTask from typing import Any, Dict, Callable, Optional +from flytekit.configuration import SecretsManager +import requests +import os +import subprocess # This would be the main task configuration class for Soda.io @dataclass @@ -64,11 +68,43 @@ def execute(self, **kwargs) -> Dict[str, Any]: Returns: dict: A dictionary containing the results of the Soda.io scan. """ - # Example code to invoke Soda.io data scan + # Retrieve the Soda Cloud API key from environment or Flyte's SecretsManager + api_key = ( + self.task_config.soda_cloud_api_key + or os.getenv("SODA_CLOUD_API_KEY") + or SecretsManager.get_secrets("soda", "api_key") + ) + + if not api_key: + raise ValueError("Soda Cloud API key is required but not provided.") + scan_definition = self.task_config.scan_definition - # Integrate with Soda.io here, such as by calling a scan API with scan_definition + data_source = self.task_config.data_source + scan_name = self.task_config.scan_name + + # Placeholder for API request to Soda.io + url = "https://api.soda.io/v1/scan" # Replace with actual Soda.io API endpoint + + # Prepare the request payload + payload = { + "scan_definition": scan_definition, + "data_source": data_source, + "scan_name": scan_name, + "api_key": api_key + } + + # Placeholder for API result + result = {} + + # Make the API call (using POST method as an example) + try: + response = requests.post(url, json=payload) + response.raise_for_status() # Raise an error for bad responses (4xx or 5xx) + + # Assuming the API returns a JSON response + result = response.json() - # You would actually integrate with the Soda API here, using soda-cloud-api-key and other configurations - result = {} # Placeholder for API result + except requests.exceptions.RequestException as e: + raise RuntimeError(f"API call failed: {e}") return {"scan_result": result} \ No newline at end of file diff --git a/plugins/flytekit-soda.io/setup.py b/plugins/flytekit-soda.io/setup.py index 36331c0744..094d1d594f 100644 --- a/plugins/flytekit-soda.io/setup.py +++ b/plugins/flytekit-soda.io/setup.py @@ -2,7 +2,7 @@ PLUGIN_NAME = "kfsoda" microlib_name = f"flytekitplugins-{PLUGIN_NAME}" -plugin_requires = ["flytekit>=1.6.1", "soda-spark"] # Update as per Soda.io requirements +plugin_requires = ["flytekit>=1.6.1", "soda-spark", "requests>=2.25.1"] # Update as per Soda.io requirements __version__ = "0.0.0+develop" setup( diff --git a/plugins/flytekit-soda.io/tests/__init__.py b/plugins/flytekit-soda.io/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/plugins/flytekit-soda.io/tests/test_soda_task.py b/plugins/flytekit-soda.io/tests/test_soda_task.py new file mode 100644 index 0000000000..a2a36c9fe1 --- /dev/null +++ b/plugins/flytekit-soda.io/tests/test_soda_task.py @@ -0,0 +1,63 @@ +import unittest +from unittest.mock import patch, MagicMock +from typing import NamedTuple, Dict + +from flytekit import task, workflow +from flytekitplugins.soda import SodaTask + +# Define a NamedTuple to represent the expected output from the SodaTask +SodaTaskOutput = NamedTuple("SodaTaskOutput", [("scan_result", Dict[str, any])]) + +# Mock configurations for the SodaTask +MOCK_SCAN_DEFINITION = "mock_scan_definition.yaml" +MOCK_DATA_SOURCE = "mock_data_source" +MOCK_SCAN_NAME = "mock_scan_name" +MOCK_API_KEY = "mock_api_key" +MOCK_RESPONSE = {"scan_result": {"status": "success", "findings": []}} # Example response structure + +# Define a Flyte task to initialize the SodaTask and execute it +@task +def setup_soda_task() -> SodaTaskOutput: + # Initialize the SodaTask with mock parameters + soda_task = SodaTask( + scan_definition=MOCK_SCAN_DEFINITION, + data_source=MOCK_DATA_SOURCE, + scan_name=MOCK_SCAN_NAME, + soda_cloud_api_key=MOCK_API_KEY + ) + + # Execute the task and return the mock response + return soda_task.execute() + +# Define a Flyte workflow to test the setup task +@workflow +def test_soda_workflow() -> SodaTaskOutput: + return setup_soda_task() + +# Define the test class for the SodaTask plugin +class TestSodaTask(unittest.TestCase): + @patch("requests.post") + def test_soda_task_execution(self, mock_post): + # Mock the response to simulate the Soda.io API call + mock_response = MagicMock() + mock_response.json.return_value = MOCK_RESPONSE + mock_response.raise_for_status = MagicMock() + mock_post.return_value = mock_response + + # Run the workflow + result = test_soda_workflow() + + # Assertions to verify expected results + self.assertEqual(result.scan_result, MOCK_RESPONSE["scan_result"]) + mock_post.assert_called_once_with( + "https://api.soda.io/v1/scan", # Replace with actual endpoint + json={ + "scan_definition": MOCK_SCAN_DEFINITION, + "data_source": MOCK_DATA_SOURCE, + "scan_name": MOCK_SCAN_NAME, + "api_key": MOCK_API_KEY + } + ) + +if __name__ == "__main__": + unittest.main() \ No newline at end of file From 5243774fe1d6dd21084a8e4ddade0002bc7ae6bf Mon Sep 17 00:00:00 2001 From: 10sharmashivam <10sharmashivam@gmail.com> Date: Tue, 12 Nov 2024 22:49:40 +0530 Subject: [PATCH 03/10] set up the new plugins/community folder structure Signed-off-by: 10sharmashivam <10sharmashivam@gmail.com> --- plugins/community/flytekit-soda.io/README.md | 16 +++ .../flytekitplugins/soda/__init__.py | 14 +++ .../flytekitplugins/soda/task.py | 110 ++++++++++++++++++ plugins/community/flytekit-soda.io/setup.py | 32 +++++ .../flytekit-soda.io/tests/__init__.py | 0 .../flytekit-soda.io/tests/test_soda_task.py | 63 ++++++++++ 6 files changed, 235 insertions(+) create mode 100644 plugins/community/flytekit-soda.io/README.md create mode 100644 plugins/community/flytekit-soda.io/flytekitplugins/soda/__init__.py create mode 100644 plugins/community/flytekit-soda.io/flytekitplugins/soda/task.py create mode 100644 plugins/community/flytekit-soda.io/setup.py create mode 100644 plugins/community/flytekit-soda.io/tests/__init__.py create mode 100644 plugins/community/flytekit-soda.io/tests/test_soda_task.py diff --git a/plugins/community/flytekit-soda.io/README.md b/plugins/community/flytekit-soda.io/README.md new file mode 100644 index 0000000000..a7e752f0d4 --- /dev/null +++ b/plugins/community/flytekit-soda.io/README.md @@ -0,0 +1,16 @@ +# Soda.io Flyte Plugin + +This plugin integrates Flyte with [Soda.io](https://soda.io/), enabling users to run data quality checks as part of their Flyte workflows. With this plugin, data engineers and analysts can embed data monitoring tasks directly into Flyte, improving data reliability and visibility. + +## Features + +- **Soda.io Integration**: Leverages Soda.io for running data quality scans within Flyte tasks. +- **Configurable Scan Definitions**: Users can define custom scan configurations to monitor data quality based on Soda.io's definitions. +- **API Integration**: Enables configuration of Soda.io credentials and other Soda-specific settings for secure and dynamic data quality checks. + +## Installation + +This plugin is intended to be installed as part of the Flyte ecosystem. To install it, ensure you have Flytekit installed: + +```bash +pip install flytekit \ No newline at end of file diff --git a/plugins/community/flytekit-soda.io/flytekitplugins/soda/__init__.py b/plugins/community/flytekit-soda.io/flytekitplugins/soda/__init__.py new file mode 100644 index 0000000000..c1c48d32b0 --- /dev/null +++ b/plugins/community/flytekit-soda.io/flytekitplugins/soda/__init__.py @@ -0,0 +1,14 @@ +""" +.. currentmodule:: flytekitplugins.kfsoda + +This package contains the Soda.io plugin for Flytekit, allowing Flyte users to run data quality scans. + +.. autosummary:: + :template: custom.rst + :toctree: generated/ + + SodaCheckConfig + SodaCheckTask +""" + +from .task import SodaCheckConfig, SodaCheckTask \ No newline at end of file diff --git a/plugins/community/flytekit-soda.io/flytekitplugins/soda/task.py b/plugins/community/flytekit-soda.io/flytekitplugins/soda/task.py new file mode 100644 index 0000000000..a2f27090db --- /dev/null +++ b/plugins/community/flytekit-soda.io/flytekitplugins/soda/task.py @@ -0,0 +1,110 @@ +""" +This Plugin integrates Soda.io data quality checks with Flyte, allowing users to run and manage +data quality scans as part of Flyte workflows. It leverages Soda.io's scanning capabilities to monitor +data quality by defining customizable scan configurations. + +This plugin allows setting various parameters like scan definition files, data sources, and Soda Cloud +API credentials to run these scans in an automated fashion within Flyte. +""" +from dataclasses import dataclass +from flytekit import PythonFunctionTask +from typing import Any, Dict, Callable, Optional +from flytekit.configuration import SecretsManager +import requests +import os +import subprocess + +# This would be the main task configuration class for Soda.io +@dataclass +class SodaCheckConfig: + """ + Configuration class for a Soda.io data quality scan task. + + Attributes: + scan_definition (str): Path to the Soda.io scan definition YAML file. + soda_cloud_api_key (Optional[str]): API key for Soda Cloud access, if applicable. + data_source (Optional[str]): Name of the data source in Soda.io to use for the scan. + scan_name (Optional[str]): Name of the scan job for organizational purposes. + """ + scan_definition: str + soda_cloud_api_key: Optional[str] = None + data_source: Optional[str] = None # Name of the data source in Soda.io + scan_name: Optional[str] = "default_scan" # Name for the scan job + +class SodaCheckTask(PythonFunctionTask[SodaCheckConfig]): + """ + A Flyte task that runs a Soda.io data quality scan as defined in the provided configuration. + + This task allows users to execute data quality checks by leveraging the Soda.io API. The scan + configuration includes options for specifying the scan definition file, Soda.io Cloud API key, + and data source. + + Attributes: + _TASK_TYPE (str): The task type identifier for Soda.io checks within Flyte. + """ + _TASK_TYPE = "soda_check_task" + + def __init__(self, task_config: SodaCheckConfig, task_function: Callable, **kwargs): + """ + Initializes the SodaCheckTask with the provided configuration. + + Args: + task_config (SodaCheckConfig): The configuration for the Soda.io scan. + task_function (Callable): The function representing the task logic. + kwargs: Additional keyword arguments. + """ + super().__init__( + task_type=self._TASK_TYPE, + task_config=task_config, + task_function=task_function, + task_type_version=1, + **kwargs, + ) + + def execute(self, **kwargs) -> Dict[str, Any]: + """ + Executes the Soda.io scan using the configuration provided in task_config. + + Returns: + dict: A dictionary containing the results of the Soda.io scan. + """ + # Retrieve the Soda Cloud API key from environment or Flyte's SecretsManager + api_key = ( + self.task_config.soda_cloud_api_key + or os.getenv("SODA_CLOUD_API_KEY") + or SecretsManager.get_secrets("soda", "api_key") + ) + + if not api_key: + raise ValueError("Soda Cloud API key is required but not provided.") + + scan_definition = self.task_config.scan_definition + data_source = self.task_config.data_source + scan_name = self.task_config.scan_name + + # Placeholder for API request to Soda.io + url = "https://api.soda.io/v1/scan" # Replace with actual Soda.io API endpoint + + # Prepare the request payload + payload = { + "scan_definition": scan_definition, + "data_source": data_source, + "scan_name": scan_name, + "api_key": api_key + } + + # Placeholder for API result + result = {} + + # Make the API call (using POST method as an example) + try: + response = requests.post(url, json=payload) + response.raise_for_status() # Raise an error for bad responses (4xx or 5xx) + + # Assuming the API returns a JSON response + result = response.json() + + except requests.exceptions.RequestException as e: + raise RuntimeError(f"API call failed: {e}") + + return {"scan_result": result} \ No newline at end of file diff --git a/plugins/community/flytekit-soda.io/setup.py b/plugins/community/flytekit-soda.io/setup.py new file mode 100644 index 0000000000..094d1d594f --- /dev/null +++ b/plugins/community/flytekit-soda.io/setup.py @@ -0,0 +1,32 @@ +from setuptools import setup + +PLUGIN_NAME = "kfsoda" +microlib_name = f"flytekitplugins-{PLUGIN_NAME}" +plugin_requires = ["flytekit>=1.6.1", "soda-spark", "requests>=2.25.1"] # Update as per Soda.io requirements +__version__ = "0.0.0+develop" + +setup( + name=microlib_name, + version=__version__, + author="Your Name", + author_email="your.email@example.com", + description="Soda.io data quality plugin for Flyte", + namespace_packages=["flytekitplugins"], + packages=[f"flytekitplugins.{PLUGIN_NAME}"], + install_requires=plugin_requires, + license="apache2", + python_requires=">=3.8", + classifiers=[ + "Intended Audience :: Science/Research", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Topic :: Scientific/Engineering", + "Topic :: Scientific/Engineering :: Data Quality", + "Topic :: Software Development", + "Topic :: Software Development :: Libraries", + "Topic :: Software Development :: Libraries :: Python Modules", + ], +) \ No newline at end of file diff --git a/plugins/community/flytekit-soda.io/tests/__init__.py b/plugins/community/flytekit-soda.io/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/plugins/community/flytekit-soda.io/tests/test_soda_task.py b/plugins/community/flytekit-soda.io/tests/test_soda_task.py new file mode 100644 index 0000000000..a2a36c9fe1 --- /dev/null +++ b/plugins/community/flytekit-soda.io/tests/test_soda_task.py @@ -0,0 +1,63 @@ +import unittest +from unittest.mock import patch, MagicMock +from typing import NamedTuple, Dict + +from flytekit import task, workflow +from flytekitplugins.soda import SodaTask + +# Define a NamedTuple to represent the expected output from the SodaTask +SodaTaskOutput = NamedTuple("SodaTaskOutput", [("scan_result", Dict[str, any])]) + +# Mock configurations for the SodaTask +MOCK_SCAN_DEFINITION = "mock_scan_definition.yaml" +MOCK_DATA_SOURCE = "mock_data_source" +MOCK_SCAN_NAME = "mock_scan_name" +MOCK_API_KEY = "mock_api_key" +MOCK_RESPONSE = {"scan_result": {"status": "success", "findings": []}} # Example response structure + +# Define a Flyte task to initialize the SodaTask and execute it +@task +def setup_soda_task() -> SodaTaskOutput: + # Initialize the SodaTask with mock parameters + soda_task = SodaTask( + scan_definition=MOCK_SCAN_DEFINITION, + data_source=MOCK_DATA_SOURCE, + scan_name=MOCK_SCAN_NAME, + soda_cloud_api_key=MOCK_API_KEY + ) + + # Execute the task and return the mock response + return soda_task.execute() + +# Define a Flyte workflow to test the setup task +@workflow +def test_soda_workflow() -> SodaTaskOutput: + return setup_soda_task() + +# Define the test class for the SodaTask plugin +class TestSodaTask(unittest.TestCase): + @patch("requests.post") + def test_soda_task_execution(self, mock_post): + # Mock the response to simulate the Soda.io API call + mock_response = MagicMock() + mock_response.json.return_value = MOCK_RESPONSE + mock_response.raise_for_status = MagicMock() + mock_post.return_value = mock_response + + # Run the workflow + result = test_soda_workflow() + + # Assertions to verify expected results + self.assertEqual(result.scan_result, MOCK_RESPONSE["scan_result"]) + mock_post.assert_called_once_with( + "https://api.soda.io/v1/scan", # Replace with actual endpoint + json={ + "scan_definition": MOCK_SCAN_DEFINITION, + "data_source": MOCK_DATA_SOURCE, + "scan_name": MOCK_SCAN_NAME, + "api_key": MOCK_API_KEY + } + ) + +if __name__ == "__main__": + unittest.main() \ No newline at end of file From 527fdf892239db2fb7522c9bbf2cbc0f8101518e Mon Sep 17 00:00:00 2001 From: Shivam Sharma <66767992+10sharmashivam@users.noreply.github.com> Date: Tue, 12 Nov 2024 22:57:42 +0530 Subject: [PATCH 04/10] Delete plugins/flytekit-soda.io/README.md Signed-off-by: 10sharmashivam <10sharmashivam@gmail.com> --- plugins/flytekit-soda.io/README.md | 16 ---------------- 1 file changed, 16 deletions(-) delete mode 100644 plugins/flytekit-soda.io/README.md diff --git a/plugins/flytekit-soda.io/README.md b/plugins/flytekit-soda.io/README.md deleted file mode 100644 index a7e752f0d4..0000000000 --- a/plugins/flytekit-soda.io/README.md +++ /dev/null @@ -1,16 +0,0 @@ -# Soda.io Flyte Plugin - -This plugin integrates Flyte with [Soda.io](https://soda.io/), enabling users to run data quality checks as part of their Flyte workflows. With this plugin, data engineers and analysts can embed data monitoring tasks directly into Flyte, improving data reliability and visibility. - -## Features - -- **Soda.io Integration**: Leverages Soda.io for running data quality scans within Flyte tasks. -- **Configurable Scan Definitions**: Users can define custom scan configurations to monitor data quality based on Soda.io's definitions. -- **API Integration**: Enables configuration of Soda.io credentials and other Soda-specific settings for secure and dynamic data quality checks. - -## Installation - -This plugin is intended to be installed as part of the Flyte ecosystem. To install it, ensure you have Flytekit installed: - -```bash -pip install flytekit \ No newline at end of file From 2f17dec35ea15c25334ff3ee099d0b769c00b6a7 Mon Sep 17 00:00:00 2001 From: Shivam Sharma <66767992+10sharmashivam@users.noreply.github.com> Date: Tue, 12 Nov 2024 22:58:22 +0530 Subject: [PATCH 05/10] Delete plugins/flytekit-soda.io/flytekitplugins/soda/__init__.py Signed-off-by: 10sharmashivam <10sharmashivam@gmail.com> --- .../flytekitplugins/soda/__init__.py | 14 -------------- 1 file changed, 14 deletions(-) delete mode 100644 plugins/flytekit-soda.io/flytekitplugins/soda/__init__.py diff --git a/plugins/flytekit-soda.io/flytekitplugins/soda/__init__.py b/plugins/flytekit-soda.io/flytekitplugins/soda/__init__.py deleted file mode 100644 index c1c48d32b0..0000000000 --- a/plugins/flytekit-soda.io/flytekitplugins/soda/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -""" -.. currentmodule:: flytekitplugins.kfsoda - -This package contains the Soda.io plugin for Flytekit, allowing Flyte users to run data quality scans. - -.. autosummary:: - :template: custom.rst - :toctree: generated/ - - SodaCheckConfig - SodaCheckTask -""" - -from .task import SodaCheckConfig, SodaCheckTask \ No newline at end of file From bc6ae1f48457fd1d31d1992373640bd01ad2ed5c Mon Sep 17 00:00:00 2001 From: Shivam Sharma <66767992+10sharmashivam@users.noreply.github.com> Date: Tue, 12 Nov 2024 22:58:43 +0530 Subject: [PATCH 06/10] Delete plugins/flytekit-soda.io/tests/test_soda_task.py Signed-off-by: 10sharmashivam <10sharmashivam@gmail.com> --- .../flytekit-soda.io/tests/test_soda_task.py | 63 ------------------- 1 file changed, 63 deletions(-) delete mode 100644 plugins/flytekit-soda.io/tests/test_soda_task.py diff --git a/plugins/flytekit-soda.io/tests/test_soda_task.py b/plugins/flytekit-soda.io/tests/test_soda_task.py deleted file mode 100644 index a2a36c9fe1..0000000000 --- a/plugins/flytekit-soda.io/tests/test_soda_task.py +++ /dev/null @@ -1,63 +0,0 @@ -import unittest -from unittest.mock import patch, MagicMock -from typing import NamedTuple, Dict - -from flytekit import task, workflow -from flytekitplugins.soda import SodaTask - -# Define a NamedTuple to represent the expected output from the SodaTask -SodaTaskOutput = NamedTuple("SodaTaskOutput", [("scan_result", Dict[str, any])]) - -# Mock configurations for the SodaTask -MOCK_SCAN_DEFINITION = "mock_scan_definition.yaml" -MOCK_DATA_SOURCE = "mock_data_source" -MOCK_SCAN_NAME = "mock_scan_name" -MOCK_API_KEY = "mock_api_key" -MOCK_RESPONSE = {"scan_result": {"status": "success", "findings": []}} # Example response structure - -# Define a Flyte task to initialize the SodaTask and execute it -@task -def setup_soda_task() -> SodaTaskOutput: - # Initialize the SodaTask with mock parameters - soda_task = SodaTask( - scan_definition=MOCK_SCAN_DEFINITION, - data_source=MOCK_DATA_SOURCE, - scan_name=MOCK_SCAN_NAME, - soda_cloud_api_key=MOCK_API_KEY - ) - - # Execute the task and return the mock response - return soda_task.execute() - -# Define a Flyte workflow to test the setup task -@workflow -def test_soda_workflow() -> SodaTaskOutput: - return setup_soda_task() - -# Define the test class for the SodaTask plugin -class TestSodaTask(unittest.TestCase): - @patch("requests.post") - def test_soda_task_execution(self, mock_post): - # Mock the response to simulate the Soda.io API call - mock_response = MagicMock() - mock_response.json.return_value = MOCK_RESPONSE - mock_response.raise_for_status = MagicMock() - mock_post.return_value = mock_response - - # Run the workflow - result = test_soda_workflow() - - # Assertions to verify expected results - self.assertEqual(result.scan_result, MOCK_RESPONSE["scan_result"]) - mock_post.assert_called_once_with( - "https://api.soda.io/v1/scan", # Replace with actual endpoint - json={ - "scan_definition": MOCK_SCAN_DEFINITION, - "data_source": MOCK_DATA_SOURCE, - "scan_name": MOCK_SCAN_NAME, - "api_key": MOCK_API_KEY - } - ) - -if __name__ == "__main__": - unittest.main() \ No newline at end of file From e2df2f2cea77fa738a71c78bf65dbbeb2cf056e1 Mon Sep 17 00:00:00 2001 From: Shivam Sharma <66767992+10sharmashivam@users.noreply.github.com> Date: Tue, 12 Nov 2024 22:59:03 +0530 Subject: [PATCH 07/10] Delete plugins/flytekit-soda.io/tests/__init__.py Signed-off-by: 10sharmashivam <10sharmashivam@gmail.com> --- plugins/flytekit-soda.io/tests/__init__.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 plugins/flytekit-soda.io/tests/__init__.py diff --git a/plugins/flytekit-soda.io/tests/__init__.py b/plugins/flytekit-soda.io/tests/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 From 9812ad3d12028be82aac056333a4a94de65d5127 Mon Sep 17 00:00:00 2001 From: Shivam Sharma <66767992+10sharmashivam@users.noreply.github.com> Date: Tue, 12 Nov 2024 22:59:30 +0530 Subject: [PATCH 08/10] Delete plugins/flytekit-soda.io/setup.py Signed-off-by: 10sharmashivam <10sharmashivam@gmail.com> --- plugins/flytekit-soda.io/setup.py | 32 ------------------------------- 1 file changed, 32 deletions(-) delete mode 100644 plugins/flytekit-soda.io/setup.py diff --git a/plugins/flytekit-soda.io/setup.py b/plugins/flytekit-soda.io/setup.py deleted file mode 100644 index 094d1d594f..0000000000 --- a/plugins/flytekit-soda.io/setup.py +++ /dev/null @@ -1,32 +0,0 @@ -from setuptools import setup - -PLUGIN_NAME = "kfsoda" -microlib_name = f"flytekitplugins-{PLUGIN_NAME}" -plugin_requires = ["flytekit>=1.6.1", "soda-spark", "requests>=2.25.1"] # Update as per Soda.io requirements -__version__ = "0.0.0+develop" - -setup( - name=microlib_name, - version=__version__, - author="Your Name", - author_email="your.email@example.com", - description="Soda.io data quality plugin for Flyte", - namespace_packages=["flytekitplugins"], - packages=[f"flytekitplugins.{PLUGIN_NAME}"], - install_requires=plugin_requires, - license="apache2", - python_requires=">=3.8", - classifiers=[ - "Intended Audience :: Science/Research", - "Intended Audience :: Developers", - "License :: OSI Approved :: Apache Software License", - "Programming Language :: Python :: 3.8", - "Programming Language :: Python :: 3.9", - "Programming Language :: Python :: 3.10", - "Topic :: Scientific/Engineering", - "Topic :: Scientific/Engineering :: Data Quality", - "Topic :: Software Development", - "Topic :: Software Development :: Libraries", - "Topic :: Software Development :: Libraries :: Python Modules", - ], -) \ No newline at end of file From 9a2d2d46eb72371d734577519aee025e0586f14c Mon Sep 17 00:00:00 2001 From: Shivam Sharma <66767992+10sharmashivam@users.noreply.github.com> Date: Tue, 12 Nov 2024 22:59:52 +0530 Subject: [PATCH 09/10] Delete plugins/flytekit-soda.io/flytekitplugins/soda/task.py Signed-off-by: 10sharmashivam <10sharmashivam@gmail.com> --- .../flytekitplugins/soda/task.py | 110 ------------------ 1 file changed, 110 deletions(-) delete mode 100644 plugins/flytekit-soda.io/flytekitplugins/soda/task.py diff --git a/plugins/flytekit-soda.io/flytekitplugins/soda/task.py b/plugins/flytekit-soda.io/flytekitplugins/soda/task.py deleted file mode 100644 index a2f27090db..0000000000 --- a/plugins/flytekit-soda.io/flytekitplugins/soda/task.py +++ /dev/null @@ -1,110 +0,0 @@ -""" -This Plugin integrates Soda.io data quality checks with Flyte, allowing users to run and manage -data quality scans as part of Flyte workflows. It leverages Soda.io's scanning capabilities to monitor -data quality by defining customizable scan configurations. - -This plugin allows setting various parameters like scan definition files, data sources, and Soda Cloud -API credentials to run these scans in an automated fashion within Flyte. -""" -from dataclasses import dataclass -from flytekit import PythonFunctionTask -from typing import Any, Dict, Callable, Optional -from flytekit.configuration import SecretsManager -import requests -import os -import subprocess - -# This would be the main task configuration class for Soda.io -@dataclass -class SodaCheckConfig: - """ - Configuration class for a Soda.io data quality scan task. - - Attributes: - scan_definition (str): Path to the Soda.io scan definition YAML file. - soda_cloud_api_key (Optional[str]): API key for Soda Cloud access, if applicable. - data_source (Optional[str]): Name of the data source in Soda.io to use for the scan. - scan_name (Optional[str]): Name of the scan job for organizational purposes. - """ - scan_definition: str - soda_cloud_api_key: Optional[str] = None - data_source: Optional[str] = None # Name of the data source in Soda.io - scan_name: Optional[str] = "default_scan" # Name for the scan job - -class SodaCheckTask(PythonFunctionTask[SodaCheckConfig]): - """ - A Flyte task that runs a Soda.io data quality scan as defined in the provided configuration. - - This task allows users to execute data quality checks by leveraging the Soda.io API. The scan - configuration includes options for specifying the scan definition file, Soda.io Cloud API key, - and data source. - - Attributes: - _TASK_TYPE (str): The task type identifier for Soda.io checks within Flyte. - """ - _TASK_TYPE = "soda_check_task" - - def __init__(self, task_config: SodaCheckConfig, task_function: Callable, **kwargs): - """ - Initializes the SodaCheckTask with the provided configuration. - - Args: - task_config (SodaCheckConfig): The configuration for the Soda.io scan. - task_function (Callable): The function representing the task logic. - kwargs: Additional keyword arguments. - """ - super().__init__( - task_type=self._TASK_TYPE, - task_config=task_config, - task_function=task_function, - task_type_version=1, - **kwargs, - ) - - def execute(self, **kwargs) -> Dict[str, Any]: - """ - Executes the Soda.io scan using the configuration provided in task_config. - - Returns: - dict: A dictionary containing the results of the Soda.io scan. - """ - # Retrieve the Soda Cloud API key from environment or Flyte's SecretsManager - api_key = ( - self.task_config.soda_cloud_api_key - or os.getenv("SODA_CLOUD_API_KEY") - or SecretsManager.get_secrets("soda", "api_key") - ) - - if not api_key: - raise ValueError("Soda Cloud API key is required but not provided.") - - scan_definition = self.task_config.scan_definition - data_source = self.task_config.data_source - scan_name = self.task_config.scan_name - - # Placeholder for API request to Soda.io - url = "https://api.soda.io/v1/scan" # Replace with actual Soda.io API endpoint - - # Prepare the request payload - payload = { - "scan_definition": scan_definition, - "data_source": data_source, - "scan_name": scan_name, - "api_key": api_key - } - - # Placeholder for API result - result = {} - - # Make the API call (using POST method as an example) - try: - response = requests.post(url, json=payload) - response.raise_for_status() # Raise an error for bad responses (4xx or 5xx) - - # Assuming the API returns a JSON response - result = response.json() - - except requests.exceptions.RequestException as e: - raise RuntimeError(f"API call failed: {e}") - - return {"scan_result": result} \ No newline at end of file From 41f4f586aed0edac6c191ad2de345bc32b340092 Mon Sep 17 00:00:00 2001 From: 10sharmashivam <10sharmashivam@gmail.com> Date: Tue, 12 Nov 2024 23:09:13 +0530 Subject: [PATCH 10/10] Linting issues corrected Signed-off-by: 10sharmashivam <10sharmashivam@gmail.com> --- .../flytekitplugins/soda/__init__.py | 2 +- .../flytekitplugins/soda/task.py | 23 ++++++++++++------- plugins/community/flytekit-soda.io/setup.py | 8 +++++-- .../flytekit-soda.io/tests/test_soda_task.py | 23 ++++++++++++------- 4 files changed, 37 insertions(+), 19 deletions(-) diff --git a/plugins/community/flytekit-soda.io/flytekitplugins/soda/__init__.py b/plugins/community/flytekit-soda.io/flytekitplugins/soda/__init__.py index c1c48d32b0..0aa38a555e 100644 --- a/plugins/community/flytekit-soda.io/flytekitplugins/soda/__init__.py +++ b/plugins/community/flytekit-soda.io/flytekitplugins/soda/__init__.py @@ -11,4 +11,4 @@ SodaCheckTask """ -from .task import SodaCheckConfig, SodaCheckTask \ No newline at end of file +from .task import SodaCheckConfig, SodaCheckTask diff --git a/plugins/community/flytekit-soda.io/flytekitplugins/soda/task.py b/plugins/community/flytekit-soda.io/flytekitplugins/soda/task.py index a2f27090db..bd593ca9e8 100644 --- a/plugins/community/flytekit-soda.io/flytekitplugins/soda/task.py +++ b/plugins/community/flytekit-soda.io/flytekitplugins/soda/task.py @@ -6,13 +6,17 @@ This plugin allows setting various parameters like scan definition files, data sources, and Soda Cloud API credentials to run these scans in an automated fashion within Flyte. """ + +import os +import subprocess from dataclasses import dataclass +from typing import Any, Callable, Dict, Optional + +import requests + from flytekit import PythonFunctionTask -from typing import Any, Dict, Callable, Optional from flytekit.configuration import SecretsManager -import requests -import os -import subprocess + # This would be the main task configuration class for Soda.io @dataclass @@ -26,11 +30,13 @@ class SodaCheckConfig: data_source (Optional[str]): Name of the data source in Soda.io to use for the scan. scan_name (Optional[str]): Name of the scan job for organizational purposes. """ + scan_definition: str soda_cloud_api_key: Optional[str] = None data_source: Optional[str] = None # Name of the data source in Soda.io scan_name: Optional[str] = "default_scan" # Name for the scan job + class SodaCheckTask(PythonFunctionTask[SodaCheckConfig]): """ A Flyte task that runs a Soda.io data quality scan as defined in the provided configuration. @@ -42,6 +48,7 @@ class SodaCheckTask(PythonFunctionTask[SodaCheckConfig]): Attributes: _TASK_TYPE (str): The task type identifier for Soda.io checks within Flyte. """ + _TASK_TYPE = "soda_check_task" def __init__(self, task_config: SodaCheckConfig, task_function: Callable, **kwargs): @@ -90,9 +97,9 @@ def execute(self, **kwargs) -> Dict[str, Any]: "scan_definition": scan_definition, "data_source": data_source, "scan_name": scan_name, - "api_key": api_key + "api_key": api_key, } - + # Placeholder for API result result = {} @@ -100,11 +107,11 @@ def execute(self, **kwargs) -> Dict[str, Any]: try: response = requests.post(url, json=payload) response.raise_for_status() # Raise an error for bad responses (4xx or 5xx) - + # Assuming the API returns a JSON response result = response.json() except requests.exceptions.RequestException as e: raise RuntimeError(f"API call failed: {e}") - return {"scan_result": result} \ No newline at end of file + return {"scan_result": result} diff --git a/plugins/community/flytekit-soda.io/setup.py b/plugins/community/flytekit-soda.io/setup.py index 094d1d594f..c36b66c94b 100644 --- a/plugins/community/flytekit-soda.io/setup.py +++ b/plugins/community/flytekit-soda.io/setup.py @@ -2,7 +2,11 @@ PLUGIN_NAME = "kfsoda" microlib_name = f"flytekitplugins-{PLUGIN_NAME}" -plugin_requires = ["flytekit>=1.6.1", "soda-spark", "requests>=2.25.1"] # Update as per Soda.io requirements +plugin_requires = [ + "flytekit>=1.6.1", + "soda-spark", + "requests>=2.25.1", +] # Update as per Soda.io requirements __version__ = "0.0.0+develop" setup( @@ -29,4 +33,4 @@ "Topic :: Software Development :: Libraries", "Topic :: Software Development :: Libraries :: Python Modules", ], -) \ No newline at end of file +) diff --git a/plugins/community/flytekit-soda.io/tests/test_soda_task.py b/plugins/community/flytekit-soda.io/tests/test_soda_task.py index a2a36c9fe1..c3365e49e8 100644 --- a/plugins/community/flytekit-soda.io/tests/test_soda_task.py +++ b/plugins/community/flytekit-soda.io/tests/test_soda_task.py @@ -1,10 +1,11 @@ import unittest -from unittest.mock import patch, MagicMock -from typing import NamedTuple, Dict +from typing import Dict, NamedTuple +from unittest.mock import MagicMock, patch -from flytekit import task, workflow from flytekitplugins.soda import SodaTask +from flytekit import task, workflow + # Define a NamedTuple to represent the expected output from the SodaTask SodaTaskOutput = NamedTuple("SodaTaskOutput", [("scan_result", Dict[str, any])]) @@ -13,7 +14,10 @@ MOCK_DATA_SOURCE = "mock_data_source" MOCK_SCAN_NAME = "mock_scan_name" MOCK_API_KEY = "mock_api_key" -MOCK_RESPONSE = {"scan_result": {"status": "success", "findings": []}} # Example response structure +MOCK_RESPONSE = { + "scan_result": {"status": "success", "findings": []} +} # Example response structure + # Define a Flyte task to initialize the SodaTask and execute it @task @@ -23,17 +27,19 @@ def setup_soda_task() -> SodaTaskOutput: scan_definition=MOCK_SCAN_DEFINITION, data_source=MOCK_DATA_SOURCE, scan_name=MOCK_SCAN_NAME, - soda_cloud_api_key=MOCK_API_KEY + soda_cloud_api_key=MOCK_API_KEY, ) # Execute the task and return the mock response return soda_task.execute() + # Define a Flyte workflow to test the setup task @workflow def test_soda_workflow() -> SodaTaskOutput: return setup_soda_task() + # Define the test class for the SodaTask plugin class TestSodaTask(unittest.TestCase): @patch("requests.post") @@ -55,9 +61,10 @@ def test_soda_task_execution(self, mock_post): "scan_definition": MOCK_SCAN_DEFINITION, "data_source": MOCK_DATA_SOURCE, "scan_name": MOCK_SCAN_NAME, - "api_key": MOCK_API_KEY - } + "api_key": MOCK_API_KEY, + }, ) + if __name__ == "__main__": - unittest.main() \ No newline at end of file + unittest.main()