Add an integration test for Artifact API
DharmitD committed Feb 4, 2025
1 parent 22529bb commit 67421a0
Showing 4 changed files with 603 additions and 1 deletion.
179 changes: 179 additions & 0 deletions tests/artifacts_test.go
@@ -0,0 +1,179 @@
//go:build test_integration

/*
Copyright 2024.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package integration

import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    "io"
    "log"
    "net/http"
    "net/url"
    "os/exec"
    "testing"
    "time"

    TestUtil "github.com/opendatahub-io/data-science-pipelines-operator/tests/util"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
)

// TestFetchArtifacts exercises the v2beta1 Artifact API end to end: it uploads
// a pipeline, runs it, lists the resulting artifacts, resolves a presigned
// download URL for each model artifact, and downloads it through a local
// port-forward.
func (suite *IntegrationTestSuite) TestFetchArtifacts() {
    suite.T().Run("Should successfully fetch and download artifacts", func(t *testing.T) {
        log.Printf("Starting TestFetchArtifacts in namespace: %s", suite.DSPANamespace)

        // Start port-forwarding for the API server.
        log.Println("Starting port-forwarding for artifact-service...")
        cmd := exec.CommandContext(context.Background(),
            "kubectl", "port-forward", "-n", suite.DSPANamespace, "svc/ml-pipeline", fmt.Sprintf("%d:8888", DefaultPortforwardLocalPort))
        err := cmd.Start()
        require.NoError(t, err, "Failed to start port-forwarding")

        // Start port-forwarding for the MinIO object store.
        minioServiceName := fmt.Sprintf("minio-%s", suite.DSPANamespace)

        cmd2 := exec.CommandContext(context.Background(),
            "kubectl", "port-forward", "-n", suite.DSPANamespace, fmt.Sprintf("svc/%s", minioServiceName), "9000:9000")
        err = cmd2.Start()
        require.NoError(t, err, "Failed to start port-forwarding")

        defer func() {
            log.Println("Terminating port-forwarding processes...")
            _ = cmd.Process.Kill()
            _ = cmd.Wait()
            _ = cmd2.Process.Kill()
            _ = cmd2.Wait()
        }()

        // Wait briefly to ensure port-forwarding is established.
        time.Sleep(15 * time.Second)
        log.Printf("Port-forwarding started on localhost:%d", DefaultPortforwardLocalPort)

        type ResponseArtifact struct {
            ArtifactID   string `json:"artifact_id"`
            DownloadUrl  string `json:"download_url"`
            ArtifactType string `json:"artifact_type"`
        }
        type ResponseArtifactData struct {
            Artifacts []ResponseArtifact `json:"artifacts"`
        }

        name := "Test Iris Pipeline"
        uploadUrl := fmt.Sprintf("%s/apis/v2beta1/pipelines/upload?name=%s", APIServerURL, url.QueryEscape(name))
        log.Printf("Uploading pipeline: %s to URL: %s", name, uploadUrl)

        vals := map[string]string{
            "uploadfile": "@resources/iris_pipeline_without_cache_compiled.yaml",
        }
        bodyUpload, contentTypeUpload := TestUtil.FormFromFile(t, vals)
        response, err := suite.Clientmgr.httpClient.Post(uploadUrl, contentTypeUpload, bodyUpload)
        require.NoError(t, err, "Failed to upload pipeline")
        responseData, err := io.ReadAll(response.Body)
        require.NoError(t, err, "Failed to read response data")
        assert.Equal(t, http.StatusOK, response.StatusCode, "Unexpected HTTP status code")
        log.Println("Pipeline uploaded successfully.")

        // Retrieve Pipeline ID
        log.Println("Retrieving Pipeline ID...")
        pipelineID, err := TestUtil.RetrievePipelineId(t, suite.Clientmgr.httpClient, APIServerURL, name)
        require.NoError(t, err, "Failed to retrieve Pipeline ID")
        log.Printf("Pipeline ID: %s", pipelineID)

        // Create a new run
        log.Println("Creating a new pipeline run...")
        runUrl := fmt.Sprintf("%s/apis/v2beta1/runs", APIServerURL)
        bodyRun := TestUtil.FormatRequestBody(t, pipelineID, name)
        contentTypeRun := "application/json"
        response, err = suite.Clientmgr.httpClient.Post(runUrl, contentTypeRun, bytes.NewReader(bodyRun))
        require.NoError(t, err, "Failed to create pipeline run")
        responseData, err = io.ReadAll(response.Body)
        require.NoError(t, err, "Failed to read run response data")
        require.Equal(t, http.StatusOK, response.StatusCode, "Unexpected HTTP status code")
        log.Println("Pipeline run created successfully.")

        err = TestUtil.WaitForPipelineRunCompletion(t, suite.Clientmgr.httpClient, APIServerURL)
        require.NoError(t, err, "Pipeline run did not complete successfully")

        // Fetch artifacts from API
        artifactsUrl := fmt.Sprintf("%s/apis/v2beta1/artifacts?namespace=%s", APIServerURL, suite.DSPANamespace)
        log.Printf("Fetching artifacts from URL: %s", artifactsUrl)
        response, err = suite.Clientmgr.httpClient.Get(artifactsUrl)
        require.NoError(t, err, "Failed to fetch artifacts")
        responseData, err = io.ReadAll(response.Body)
        require.NoError(t, err, "Failed to read artifacts response data")
        assert.Equal(t, http.StatusOK, response.StatusCode, "Unexpected HTTP status code")

        // Parse the artifact list, reusing the ResponseArtifactData type declared above.
        var responseArtifactsData ResponseArtifactData
        err = json.Unmarshal(responseData, &responseArtifactsData)
        require.NoError(t, err, "Failed to parse artifacts response JSON")

        hasDownloadError := false

        for _, artifact := range responseArtifactsData.Artifacts {
            if artifact.ArtifactType != "system.Model" {
                log.Printf("Skipping artifact ID: %s, as it is not a system.Model", artifact.ArtifactID)
                continue
            }

            // Fetch download URL
            artifactsByIdUrl := fmt.Sprintf("%s/apis/v2beta1/artifacts/%s?view=DOWNLOAD", APIServerURL, artifact.ArtifactID)
            log.Printf("Fetching download URL for artifact from: %s", artifactsByIdUrl)
            response, err = suite.Clientmgr.httpClient.Get(artifactsByIdUrl)
            require.NoError(t, err, "Failed to fetch download URL")
            responseData, err = io.ReadAll(response.Body)
            require.NoError(t, err, "Failed to read download URL response data")
            assert.Equal(t, http.StatusOK, response.StatusCode, "Unexpected HTTP status code")

            var artifactWithDownload ResponseArtifact
            err = json.Unmarshal(responseData, &artifactWithDownload)
            require.NoError(t, err, "Failed to parse download URL response JSON")

            // Point the presigned URL at the local MinIO port-forward.
            parsedURL, err := url.Parse(artifactWithDownload.DownloadUrl)
            require.NoError(t, err, "Failed to parse artifact download URL")

            originalHost := parsedURL.Host
            parsedURL.Host = "127.0.0.1:9000"

            log.Printf("Trying download URL: %s", parsedURL.String())

            // Send the request with the original Host header: the presigned URL's
            // signature covers the Host, so it must match what MinIO signed even
            // though the connection goes to the local port-forward.
            req, err := http.NewRequest(http.MethodGet, parsedURL.String(), nil)
            require.NoError(t, err, "Failed to create request")

            req.Host = originalHost

            downloadResp, err := suite.Clientmgr.httpClient.Do(req)
            require.NoError(t, err, "Failed to perform request")
            _ = downloadResp.Body.Close()
            if !assert.Equal(t, http.StatusOK, downloadResp.StatusCode, "Download failed") {
                hasDownloadError = true
                continue
            }

            log.Printf("Successfully downloaded artifact from %s", req.URL.String())
        }

        if hasDownloadError {
            t.Errorf("Error downloading artifacts. Check logs for details.")
        }

        log.Println("TestFetchArtifacts completed successfully.")
    })
}
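
For reference, a minimal standalone sketch of the two Artifact API calls the test drives, assuming the API server is already port-forwarded to localhost:8888. The endpoint paths are taken from the test above; the namespace value and program scaffolding are illustrative, not part of the commit.

package main

import (
    "encoding/json"
    "fmt"
    "io"
    "log"
    "net/http"
)

func main() {
    base := "http://localhost:8888"
    namespace := "dspa-example" // hypothetical namespace

    // List artifacts recorded for the namespace.
    resp, err := http.Get(fmt.Sprintf("%s/apis/v2beta1/artifacts?namespace=%s", base, namespace))
    if err != nil {
        log.Fatal(err)
    }
    body, err := io.ReadAll(resp.Body)
    _ = resp.Body.Close()
    if err != nil {
        log.Fatal(err)
    }

    var list struct {
        Artifacts []struct {
            ArtifactID   string `json:"artifact_id"`
            ArtifactType string `json:"artifact_type"`
        } `json:"artifacts"`
    }
    if err := json.Unmarshal(body, &list); err != nil {
        log.Fatal(err)
    }

    // For each artifact, the ?view=DOWNLOAD endpoint returns a presigned download_url.
    for _, a := range list.Artifacts {
        fmt.Printf("%s (%s): %s/apis/v2beta1/artifacts/%s?view=DOWNLOAD\n",
            a.ArtifactID, a.ArtifactType, base, a.ArtifactID)
    }
}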
159 changes: 159 additions & 0 deletions tests/resources/iris_pipeline_without_cache.py
@@ -0,0 +1,159 @@
from kfp import compiler, dsl
from kfp.dsl import ClassificationMetrics, Dataset, Input, Model, Output

common_base_image = (
    "registry.access.redhat.com/ubi8/python-39@sha256:3523b184212e1f2243e76d8094ab52b01ea3015471471290d011625e1763af61"
)
# common_base_image = "quay.io/opendatahub/ds-pipelines-sample-base:v1.0"
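# Note: every task below disables caching (set_caching_options(False)) so each
# run re-executes from scratch and emits fresh artifacts for the integration
# test above to list and download.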


@dsl.component(base_image=common_base_image, packages_to_install=["pandas==2.2.0"])
def create_dataset(iris_dataset: Output[Dataset]):
    from io import StringIO  # noqa: PLC0415

    import pandas as pd  # noqa: PLC0415

    data = """
5.1,3.5,1.4,0.2,Iris-setosa
4.9,3.0,1.4,0.2,Iris-setosa
4.7,3.2,1.3,0.2,Iris-setosa
4.6,3.1,1.5,0.2,Iris-setosa
5.0,3.6,1.4,0.2,Iris-setosa
5.7,3.8,1.7,0.3,Iris-setosa
5.1,3.8,1.5,0.3,Iris-setosa
5.4,3.4,1.7,0.2,Iris-setosa
5.1,3.7,1.5,0.4,Iris-setosa
5.1,3.4,1.5,0.2,Iris-setosa
5.0,3.5,1.3,0.3,Iris-setosa
4.5,2.3,1.3,0.3,Iris-setosa
4.4,3.2,1.3,0.2,Iris-setosa
5.0,3.5,1.6,0.6,Iris-setosa
5.1,3.8,1.9,0.4,Iris-setosa
4.8,3.0,1.4,0.3,Iris-setosa
5.1,3.8,1.6,0.2,Iris-setosa
4.6,3.2,1.4,0.2,Iris-setosa
5.3,3.7,1.5,0.2,Iris-setosa
5.0,3.3,1.4,0.2,Iris-setosa
7.0,3.2,4.7,1.4,Iris-versicolor
6.4,3.2,4.5,1.5,Iris-versicolor
6.9,3.1,4.9,1.5,Iris-versicolor
5.5,2.3,4.0,1.3,Iris-versicolor
6.5,2.8,4.6,1.5,Iris-versicolor
6.2,2.2,4.5,1.5,Iris-versicolor
5.6,2.5,3.9,1.1,Iris-versicolor
5.9,3.2,4.8,1.8,Iris-versicolor
6.1,2.8,4.0,1.3,Iris-versicolor
6.3,2.5,4.9,1.5,Iris-versicolor
6.1,2.8,4.7,1.2,Iris-versicolor
6.4,2.9,4.3,1.3,Iris-versicolor
6.6,3.0,4.4,1.4,Iris-versicolor
5.6,2.7,4.2,1.3,Iris-versicolor
5.7,3.0,4.2,1.2,Iris-versicolor
5.7,2.9,4.2,1.3,Iris-versicolor
6.2,2.9,4.3,1.3,Iris-versicolor
5.1,2.5,3.0,1.1,Iris-versicolor
5.7,2.8,4.1,1.3,Iris-versicolor
6.3,3.3,6.0,2.5,Iris-virginica
5.8,2.7,5.1,1.9,Iris-virginica
7.1,3.0,5.9,2.1,Iris-virginica
6.3,2.9,5.6,1.8,Iris-virginica
6.5,3.0,5.8,2.2,Iris-virginica
6.9,3.1,5.1,2.3,Iris-virginica
5.8,2.7,5.1,1.9,Iris-virginica
6.8,3.2,5.9,2.3,Iris-virginica
6.7,3.3,5.7,2.5,Iris-virginica
6.7,3.0,5.2,2.3,Iris-virginica
6.3,2.5,5.0,1.9,Iris-virginica
6.5,3.0,5.2,2.0,Iris-virginica
6.2,3.4,5.4,2.3,Iris-virginica
5.9,3.0,5.1,1.8,Iris-virginica
"""
col_names = ["Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Labels"]
df = pd.read_csv(StringIO(data), names=col_names)

with open(iris_dataset.path, "w") as f:
df.to_csv(f)


@dsl.component(
    base_image=common_base_image,
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0"],
)
def normalize_dataset(
    input_iris_dataset: Input[Dataset],
    normalized_iris_dataset: Output[Dataset],
    standard_scaler: bool,
):
    import pandas as pd  # noqa: PLC0415
    from sklearn.preprocessing import MinMaxScaler, StandardScaler  # noqa: PLC0415

    with open(input_iris_dataset.path) as f:
        df = pd.read_csv(f)
    labels = df.pop("Labels")

    scaler = StandardScaler() if standard_scaler else MinMaxScaler()

    df = pd.DataFrame(scaler.fit_transform(df))
    df["Labels"] = labels
    normalized_iris_dataset.metadata["state"] = "Normalized"
    with open(normalized_iris_dataset.path, "w") as f:
        df.to_csv(f)


@dsl.component(
    base_image=common_base_image,
    packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0"],
)
def train_model(
    normalized_iris_dataset: Input[Dataset],
    model: Output[Model],
    metrics: Output[ClassificationMetrics],
    n_neighbors: int,
):
    import pickle  # noqa: PLC0415

    import pandas as pd  # noqa: PLC0415
    from sklearn.metrics import confusion_matrix  # noqa: PLC0415
    from sklearn.model_selection import cross_val_predict, train_test_split  # noqa: PLC0415
    from sklearn.neighbors import KNeighborsClassifier  # noqa: PLC0415

    with open(normalized_iris_dataset.path) as f:
        df = pd.read_csv(f)

    y = df.pop("Labels")
    X = df

    X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)  # noqa: F841

    clf = KNeighborsClassifier(n_neighbors=n_neighbors)
    clf.fit(X_train, y_train)

    predictions = cross_val_predict(clf, X_train, y_train, cv=3)
    metrics.log_confusion_matrix(
        ["Iris-Setosa", "Iris-Versicolour", "Iris-Virginica"],
        confusion_matrix(y_train, predictions).tolist(),  # .tolist() converts the numpy array to a list.
    )

    model.metadata["framework"] = "scikit-learn"
    with open(model.path, "wb") as f:
        pickle.dump(clf, f)


@dsl.pipeline(name="iris-training-pipeline")
def my_pipeline(
    standard_scaler: bool = True,
    neighbors: int = 3,
):
    create_dataset_task = create_dataset().set_caching_options(False)

    normalize_dataset_task = normalize_dataset(
        input_iris_dataset=create_dataset_task.outputs["iris_dataset"], standard_scaler=standard_scaler
    ).set_caching_options(False)

    train_model(
        normalized_iris_dataset=normalize_dataset_task.outputs["normalized_iris_dataset"], n_neighbors=neighbors
    ).set_caching_options(False)


if __name__ == "__main__":
    compiler.Compiler().compile(my_pipeline, package_path=__file__.replace(".py", "_compiled.yaml"))
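
Running this module directly (python iris_pipeline_without_cache.py, assuming a kfp v2 install) writes iris_pipeline_without_cache_compiled.yaml next to the source file; that compiled YAML is the file the test above uploads as resources/iris_pipeline_without_cache_compiled.yaml.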