Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core feature] Integrate with Soda.io #2884

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
16 changes: 16 additions & 0 deletions plugins/community/flytekit-soda.io/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Soda.io Flyte Plugin

This plugin integrates Flyte with [Soda.io](https://soda.io/), enabling users to run data quality checks as part of their Flyte workflows. With this plugin, data engineers and analysts can embed data monitoring tasks directly into Flyte, improving data reliability and visibility.

## Features

- **Soda.io Integration**: Leverages Soda.io for running data quality scans within Flyte tasks.
- **Configurable Scan Definitions**: Users can define custom scan configurations to monitor data quality based on Soda.io's definitions.
- **API Integration**: Enables configuration of Soda.io credentials and other Soda-specific settings for secure and dynamic data quality checks.

## Installation

This plugin is intended to be installed as part of the Flyte ecosystem. To install it, ensure you have Flytekit installed:

```bash
pip install flytekit
```
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
"""
.. currentmodule:: flytekitplugins.soda

This package contains the Soda.io plugin for Flytekit, allowing Flyte users to run data quality scans.

.. autosummary::
:template: custom.rst
:toctree: generated/

SodaCheckConfig
SodaCheckTask
"""

from .task import SodaCheckConfig, SodaCheckTask
117 changes: 117 additions & 0 deletions plugins/community/flytekit-soda.io/flytekitplugins/soda/task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
"""
This plugin integrates Soda.io data quality checks with Flyte, allowing users to run and manage
data quality scans as part of Flyte workflows. It leverages Soda.io's scanning capabilities to monitor
data quality by defining customizable scan configurations.

This plugin allows setting various parameters like scan definition files, data sources, and Soda Cloud
API credentials to run these scans in an automated fashion within Flyte.
"""

import os
import subprocess
from dataclasses import dataclass
from typing import Any, Callable, Dict, Optional

import requests

from flytekit import PythonFunctionTask
from flytekit.configuration import SecretsManager


# This would be the main task configuration class for Soda.io
@dataclass
class SodaCheckConfig:
    """
    Settings that describe a single Soda.io data quality scan.

    Only ``scan_definition`` is mandatory; every other field has a default and
    may be omitted when constructing the configuration.
    """

    # Path to the Soda.io scan definition YAML file (required).
    scan_definition: str

    # API key for Soda Cloud access, if applicable; ``None`` means "not set here".
    soda_cloud_api_key: Optional[str] = None

    # Name of the data source in Soda.io to use for the scan.
    data_source: Optional[str] = None

    # Name of the scan job, used for organizational purposes.
    scan_name: Optional[str] = "default_scan"


class SodaCheckTask(PythonFunctionTask[SodaCheckConfig]):
    """
    A Flyte task that submits a Soda.io data quality scan described by a ``SodaCheckConfig``.

    The scan configuration supplies the scan definition file, the Soda Cloud API key,
    the data source and the scan name; ``execute`` posts them to the Soda.io endpoint
    and returns the decoded JSON response.

    Attributes:
        _TASK_TYPE (str): The task type identifier for Soda.io checks within Flyte.
        _SCAN_URL (str): Endpoint the scan request is posted to (currently a placeholder).
        _REQUEST_TIMEOUT_SECONDS (int): Timeout, in seconds, applied to the HTTP request.
    """

    _TASK_TYPE = "soda_check_task"

    # Placeholder for the API request to Soda.io; replace with the actual Soda.io API endpoint.
    _SCAN_URL = "https://api.soda.io/v1/scan"

    # ``requests`` applies no timeout by default, so without this a stalled
    # connection would block the task indefinitely.
    _REQUEST_TIMEOUT_SECONDS = 30

    def __init__(self, task_config: SodaCheckConfig, task_function: Callable, **kwargs):
        """
        Initializes the SodaCheckTask with the provided configuration.

        Args:
            task_config (SodaCheckConfig): The configuration for the Soda.io scan.
            task_function (Callable): The function representing the task logic.
            kwargs: Additional keyword arguments forwarded to ``PythonFunctionTask``.
        """
        super().__init__(
            task_type=self._TASK_TYPE,
            task_config=task_config,
            task_function=task_function,
            task_type_version=1,
            **kwargs,
        )

    def _resolve_api_key(self) -> Optional[str]:
        """
        Look up the Soda Cloud API key, returning ``None`` when no source provides one.

        Sources are consulted in order: the task configuration, the
        ``SODA_CLOUD_API_KEY`` environment variable, then the Flyte secret with
        group ``"soda"`` and key ``"api_key"``.
        """
        explicit_key = self.task_config.soda_cloud_api_key or os.getenv("SODA_CLOUD_API_KEY")
        if explicit_key:
            return explicit_key

        # Secrets are read through the current execution context. The import is
        # local so this class does not rely on any additional module-level import.
        from flytekit import current_context

        try:
            return current_context().secrets.get("soda", "api_key")
        except ValueError:
            # flytekit signals a missing secret with ValueError; report it as
            # "no key" so the caller raises one consistent error message.
            return None

    def execute(self, **kwargs) -> Dict[str, Any]:
        """
        Executes the Soda.io scan using the configuration provided in task_config.

        The wrapped ``task_function`` is not invoked by this method; the scan
        request is built solely from ``task_config``.

        Returns:
            dict: ``{"scan_result": <decoded JSON response body>}``.

        Raises:
            ValueError: If no Soda Cloud API key can be found in any source.
            RuntimeError: If the HTTP request fails or returns a 4xx/5xx status.
        """
        api_key = self._resolve_api_key()
        if not api_key:
            raise ValueError("Soda Cloud API key is required but not provided.")

        # Prepare the request payload
        payload = {
            "scan_definition": self.task_config.scan_definition,
            "data_source": self.task_config.data_source,
            "scan_name": self.task_config.scan_name,
            "api_key": api_key,
        }

        # Make the API call (using POST method as an example)
        try:
            response = requests.post(
                self._SCAN_URL,
                json=payload,
                timeout=self._REQUEST_TIMEOUT_SECONDS,
            )
            response.raise_for_status()  # Raise an error for bad responses (4xx or 5xx)

            # Assuming the API returns a JSON response
            result = response.json()
        except requests.exceptions.RequestException as e:
            # Chain the original exception so the underlying cause stays in the traceback.
            raise RuntimeError(f"API call failed: {e}") from e

        return {"scan_result": result}
36 changes: 36 additions & 0 deletions plugins/community/flytekit-soda.io/setup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from setuptools import setup

# Must match the package directory under ``flytekitplugins/``; the plugin code
# lives in ``flytekitplugins/soda``, so any other value makes ``packages`` below
# point at a directory that does not exist.
PLUGIN_NAME = "soda"
microlib_name = f"flytekitplugins-{PLUGIN_NAME}"
plugin_requires = [
    "flytekit>=1.6.1",
    "soda-spark",
    "requests>=2.25.1",
]  # Update as per Soda.io requirements
__version__ = "0.0.0+develop"

setup(
    name=microlib_name,
    version=__version__,
    # TODO(review): author and author_email are still placeholders.
    author="Your Name",
    author_email="[email protected]",
    description="Soda.io data quality plugin for Flyte",
    namespace_packages=["flytekitplugins"],
    packages=[f"flytekitplugins.{PLUGIN_NAME}"],
    install_requires=plugin_requires,
    license="apache2",
    python_requires=">=3.8",
    # Only registered trove classifiers are listed; package indexes reject
    # unknown ones, so the non-registered "Data Quality" topic was dropped.
    classifiers=[
        "Intended Audience :: Science/Research",
        "Intended Audience :: Developers",
        "License :: OSI Approved :: Apache Software License",
        "Programming Language :: Python :: 3.8",
        "Programming Language :: Python :: 3.9",
        "Programming Language :: Python :: 3.10",
        "Topic :: Scientific/Engineering",
        "Topic :: Software Development",
        "Topic :: Software Development :: Libraries",
        "Topic :: Software Development :: Libraries :: Python Modules",
    ],
)
Empty file.
70 changes: 70 additions & 0 deletions plugins/community/flytekit-soda.io/tests/test_soda_task.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import unittest
from typing import Any, Dict
from unittest.mock import MagicMock, patch

from flytekitplugins.soda import SodaCheckConfig, SodaCheckTask

# Mock configuration values for the task under test
MOCK_SCAN_DEFINITION = "mock_scan_definition.yaml"
MOCK_DATA_SOURCE = "mock_data_source"
MOCK_SCAN_NAME = "mock_scan_name"
MOCK_API_KEY = "mock_api_key"
MOCK_RESPONSE: Dict[str, Any] = {
    "scan_result": {"status": "success", "findings": []}
}  # Example response structure


def _noop_task_function() -> None:
    """Module-level stand-in for the task function required by the task constructor."""


def setup_soda_task() -> SodaCheckTask:
    """Build a SodaCheckTask configured entirely from the mock values above."""
    return SodaCheckTask(
        task_config=SodaCheckConfig(
            scan_definition=MOCK_SCAN_DEFINITION,
            data_source=MOCK_DATA_SOURCE,
            scan_name=MOCK_SCAN_NAME,
            soda_cloud_api_key=MOCK_API_KEY,
        ),
        task_function=_noop_task_function,
    )


# Define the test class for the SodaCheckTask plugin
class TestSodaTask(unittest.TestCase):
    @patch("requests.post")
    def test_soda_task_execution(self, mock_post):
        """execute() posts the configured payload and wraps the JSON body in ``scan_result``."""
        # Mock the response to simulate the Soda.io API call
        mock_response = MagicMock()
        mock_response.json.return_value = MOCK_RESPONSE
        mock_response.raise_for_status = MagicMock()
        mock_post.return_value = mock_response

        # Call execute() directly so no network access or Flyte backend is needed
        result = setup_soda_task().execute()

        # execute() nests the whole decoded response under the "scan_result" key
        self.assertEqual(result, {"scan_result": MOCK_RESPONSE})

        # Inspect the URL and JSON payload individually so the check does not
        # depend on any extra keyword arguments passed to requests.post
        mock_post.assert_called_once()
        args, kwargs = mock_post.call_args
        self.assertEqual(args[0], "https://api.soda.io/v1/scan")  # Replace with actual endpoint
        self.assertEqual(
            kwargs["json"],
            {
                "scan_definition": MOCK_SCAN_DEFINITION,
                "data_source": MOCK_DATA_SOURCE,
                "scan_name": MOCK_SCAN_NAME,
                "api_key": MOCK_API_KEY,
            },
        )


if __name__ == "__main__":
    unittest.main()