From 67421a04f424001dbddf689c80d9dc4c8da9825b Mon Sep 17 00:00:00 2001
From: ddalvi
Date: Wed, 15 Jan 2025 15:24:31 -0500
Subject: [PATCH] Add an integration test for Artifact API

---
 tests/artifacts_test.go                       | 179 ++++++++++++
 .../resources/iris_pipeline_without_cache.py  | 159 +++++++++++
 .../iris_pipeline_without_cache_compiled.yaml | 264 ++++++++++++++++++
 tests/util/rest.go                            |   2 +-
 4 files changed, 603 insertions(+), 1 deletion(-)
 create mode 100644 tests/artifacts_test.go
 create mode 100644 tests/resources/iris_pipeline_without_cache.py
 create mode 100644 tests/resources/iris_pipeline_without_cache_compiled.yaml

diff --git a/tests/artifacts_test.go b/tests/artifacts_test.go
new file mode 100644
index 000000000..f9997e556
--- /dev/null
+++ b/tests/artifacts_test.go
@@ -0,0 +1,179 @@
//go:build test_integration

/*
Copyright 2024.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package integration

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/url"
	"os/exec"
	"testing"
	"time"

	TestUtil "github.com/opendatahub-io/data-science-pipelines-operator/tests/util"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

func (suite *IntegrationTestSuite) TestFetchArtifacts() {
	suite.T().Run("Should successfully fetch and download artifacts", func(t *testing.T) {
		log.Printf("Starting TestFetchArtifacts in namespace: %s", suite.DSPANamespace)

		// Start port-forwarding for the ml-pipeline API service and MinIO
		log.Println("Starting port-forwarding for the ml-pipeline API service...")
		cmd := exec.CommandContext(context.Background(),
			"kubectl", "port-forward", "-n", suite.DSPANamespace, "svc/ml-pipeline", fmt.Sprintf("%d:8888", DefaultPortforwardLocalPort))
		err := cmd.Start()
		require.NoError(t, err, "Failed to start port-forwarding")

		minioServiceName := fmt.Sprintf("minio-%s", suite.DSPANamespace)

		cmd2 := exec.CommandContext(context.Background(),
			"kubectl", "port-forward", "-n", suite.DSPANamespace, fmt.Sprintf("svc/%s", minioServiceName), "9000:9000")
		err = cmd2.Start()
		require.NoError(t, err, "Failed to start port-forwarding")

		defer func() {
			log.Println("Terminating port-forwarding processes...")
			_ = cmd.Process.Kill()
			_ = cmd.Wait()
			_ = cmd2.Process.Kill()
			_ = cmd2.Wait()
		}()

		// Wait briefly to ensure port-forwarding is established
		time.Sleep(15 * time.Second)
		log.Printf("Port-forwarding started on localhost:%d", DefaultPortforwardLocalPort)
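		// NOTE: the fixed 15-second sleep above is a heuristic. A readiness poll
		// would be more robust; a minimal sketch (assuming the standard library
		// "net" package is imported), kept here as a comment and not part of
		// this patch:
		//
		//	for i := 0; i < 30; i++ {
		//		conn, dialErr := net.DialTimeout("tcp",
		//			fmt.Sprintf("127.0.0.1:%d", DefaultPortforwardLocalPort), time.Second)
		//		if dialErr == nil {
		//			_ = conn.Close()
		//			break
		//		}
		//		time.Sleep(time.Second)
		//	}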
		type ResponseArtifact struct {
			ArtifactID   string `json:"artifact_id"`
			DownloadUrl  string `json:"download_url"`
			ArtifactType string `json:"artifact_type"`
		}
		type ResponseArtifactData struct {
			Artifacts []ResponseArtifact `json:"artifacts"`
		}

		name := "Test Iris Pipeline"
		uploadUrl := fmt.Sprintf("%s/apis/v2beta1/pipelines/upload?name=%s", APIServerURL, url.QueryEscape(name))
		log.Printf("Uploading pipeline: %s to URL: %s", name, uploadUrl)

		vals := map[string]string{
			"uploadfile": "@resources/iris_pipeline_without_cache_compiled.yaml",
		}
		bodyUpload, contentTypeUpload := TestUtil.FormFromFile(t, vals)
		response, err := suite.Clientmgr.httpClient.Post(uploadUrl, contentTypeUpload, bodyUpload)
		require.NoError(t, err, "Failed to upload pipeline")
		responseData, err := io.ReadAll(response.Body)
		require.NoError(t, err, "Failed to read response data")
		assert.Equal(t, http.StatusOK, response.StatusCode, "Unexpected HTTP status code")
		log.Println("Pipeline uploaded successfully.")

		// Retrieve Pipeline ID
		log.Println("Retrieving Pipeline ID...")
		pipelineID, err := TestUtil.RetrievePipelineId(t, suite.Clientmgr.httpClient, APIServerURL, name)
		require.NoError(t, err, "Failed to retrieve Pipeline ID")
		log.Printf("Pipeline ID: %s", pipelineID)

		// Create a new run
		log.Println("Creating a new pipeline run...")
		runUrl := fmt.Sprintf("%s/apis/v2beta1/runs", APIServerURL)
		bodyRun := TestUtil.FormatRequestBody(t, pipelineID, name)
		contentTypeRun := "application/json"
		response, err = suite.Clientmgr.httpClient.Post(runUrl, contentTypeRun, bytes.NewReader(bodyRun))
		require.NoError(t, err, "Failed to create pipeline run")
		responseData, err = io.ReadAll(response.Body)
		require.NoError(t, err, "Failed to read run response data")
		require.Equal(t, http.StatusOK, response.StatusCode, "Unexpected HTTP status code")
		log.Println("Pipeline run created successfully.")

		// Wait for the run to complete
		err = TestUtil.WaitForPipelineRunCompletion(t, suite.Clientmgr.httpClient, APIServerURL)
		require.NoError(t, err, "Pipeline run did not complete successfully")

		// Fetch artifacts from API
		artifactsUrl := fmt.Sprintf("%s/apis/v2beta1/artifacts?namespace=%s", APIServerURL, suite.DSPANamespace)
		log.Printf("Fetching artifacts from URL: %s", artifactsUrl)
		response, err = suite.Clientmgr.httpClient.Get(artifactsUrl)
		require.NoError(t, err, "Failed to fetch artifacts")
		responseData, err = io.ReadAll(response.Body)
		require.NoError(t, err, "Failed to read artifacts response data")
		assert.Equal(t, http.StatusOK, response.StatusCode, "Unexpected HTTP status code")

		// Parse the artifact list, reusing the response type declared above
		var responseArtifactsData ResponseArtifactData
		err = json.Unmarshal(responseData, &responseArtifactsData)
		require.NoError(t, err, "Failed to parse artifacts response JSON")

		hasDownloadError := false

		for _, artifact := range responseArtifactsData.Artifacts {
			if artifact.ArtifactType != "system.Model" {
				log.Printf("Skipping artifact ID: %s, as it is not a system.Model", artifact.ArtifactID)
				continue
			}

			// Fetch download URL
			artifactsByIdUrl := fmt.Sprintf("%s/apis/v2beta1/artifacts/%s?view=DOWNLOAD", APIServerURL, artifact.ArtifactID)
			log.Printf("Fetching download URL for artifact from: %s", artifactsByIdUrl)
			response, err = suite.Clientmgr.httpClient.Get(artifactsByIdUrl)
			require.NoError(t, err, "Failed to fetch download URL")
			responseData, err = io.ReadAll(response.Body)
			require.NoError(t, err, "Failed to read download URL response data")
			assert.Equal(t, http.StatusOK, response.StatusCode, "Unexpected HTTP status code")

			var artifactWithDownload ResponseArtifact
			err = json.Unmarshal(responseData, &artifactWithDownload)
			require.NoError(t, err, "Failed to parse download URL response JSON")

			// Modify the URL so the download goes through the local MinIO port-forward
			parsedURL, err := url.Parse(artifactWithDownload.DownloadUrl)
			require.NoError(t, err, "Failed to parse artifact download URL")

			originalHost := parsedURL.Host
			parsedURL.Host = "127.0.0.1:9000"

			log.Printf("Trying download URL: %s", parsedURL.String())
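			// The presigned download URL points at the in-cluster MinIO host, which
			// is not resolvable from the test runner, so we dial the local
			// port-forward instead. The original Host header is preserved below
			// because an S3/MinIO presigned URL's SigV4 signature covers the Host
			// header; rewriting it would invalidate the signature. Rough curl
			// equivalent (hypothetical values, for illustration only):
			//
			//	curl -H "Host: minio-<namespace>:9000" \
			//	  "http://127.0.0.1:9000/<bucket>/<artifact-key>?X-Amz-Signature=..."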
%s", parsedURL.String()) + + // Create and send the request with correct Host header + req, err := http.NewRequest("GET", parsedURL.String(), nil) + require.NoError(t, err, "Failed to create request") + + req.Host = originalHost + + downloadResp, err := suite.Clientmgr.httpClient.Do(req) + require.NoError(t, err, "Failed to perform request") + assert.Equal(t, http.StatusOK, downloadResp.StatusCode, "Download failed") + + log.Printf("Successfully downloaded artifact from %s", req.URL.String()) + } + + if hasDownloadError { + t.Errorf("Error downloading artifacts. Check logs for details.") + } + + log.Println("TestFetchArtifacts completed successfully.") + }) +} diff --git a/tests/resources/iris_pipeline_without_cache.py b/tests/resources/iris_pipeline_without_cache.py new file mode 100644 index 000000000..614e75ad4 --- /dev/null +++ b/tests/resources/iris_pipeline_without_cache.py @@ -0,0 +1,159 @@ +from kfp import compiler, dsl +from kfp.dsl import ClassificationMetrics, Dataset, Input, Model, Output + +common_base_image = ( + "registry.access.redhat.com/ubi8/python-39@sha256:3523b184212e1f2243e76d8094ab52b01ea3015471471290d011625e1763af61" +) +# common_base_image = "quay.io/opendatahub/ds-pipelines-sample-base:v1.0" + + +@dsl.component(base_image=common_base_image, packages_to_install=["pandas==2.2.0"]) +def create_dataset(iris_dataset: Output[Dataset]): + from io import StringIO # noqa: PLC0415 + + import pandas as pd # noqa: PLC0415 + + data = """ + 5.1,3.5,1.4,0.2,Iris-setosa + 4.9,3.0,1.4,0.2,Iris-setosa + 4.7,3.2,1.3,0.2,Iris-setosa + 4.6,3.1,1.5,0.2,Iris-setosa + 5.0,3.6,1.4,0.2,Iris-setosa + 5.7,3.8,1.7,0.3,Iris-setosa + 5.1,3.8,1.5,0.3,Iris-setosa + 5.4,3.4,1.7,0.2,Iris-setosa + 5.1,3.7,1.5,0.4,Iris-setosa + 5.1,3.4,1.5,0.2,Iris-setosa + 5.0,3.5,1.3,0.3,Iris-setosa + 4.5,2.3,1.3,0.3,Iris-setosa + 4.4,3.2,1.3,0.2,Iris-setosa + 5.0,3.5,1.6,0.6,Iris-setosa + 5.1,3.8,1.9,0.4,Iris-setosa + 4.8,3.0,1.4,0.3,Iris-setosa + 5.1,3.8,1.6,0.2,Iris-setosa + 4.6,3.2,1.4,0.2,Iris-setosa + 5.3,3.7,1.5,0.2,Iris-setosa + 5.0,3.3,1.4,0.2,Iris-setosa + 7.0,3.2,4.7,1.4,Iris-versicolor + 6.4,3.2,4.5,1.5,Iris-versicolor + 6.9,3.1,4.9,1.5,Iris-versicolor + 5.5,2.3,4.0,1.3,Iris-versicolor + 6.5,2.8,4.6,1.5,Iris-versicolor + 6.2,2.2,4.5,1.5,Iris-versicolor + 5.6,2.5,3.9,1.1,Iris-versicolor + 5.9,3.2,4.8,1.8,Iris-versicolor + 6.1,2.8,4.0,1.3,Iris-versicolor + 6.3,2.5,4.9,1.5,Iris-versicolor + 6.1,2.8,4.7,1.2,Iris-versicolor + 6.4,2.9,4.3,1.3,Iris-versicolor + 6.6,3.0,4.4,1.4,Iris-versicolor + 5.6,2.7,4.2,1.3,Iris-versicolor + 5.7,3.0,4.2,1.2,Iris-versicolor + 5.7,2.9,4.2,1.3,Iris-versicolor + 6.2,2.9,4.3,1.3,Iris-versicolor + 5.1,2.5,3.0,1.1,Iris-versicolor + 5.7,2.8,4.1,1.3,Iris-versicolor + 6.3,3.3,6.0,2.5,Iris-virginica + 5.8,2.7,5.1,1.9,Iris-virginica + 7.1,3.0,5.9,2.1,Iris-virginica + 6.3,2.9,5.6,1.8,Iris-virginica + 6.5,3.0,5.8,2.2,Iris-virginica + 6.9,3.1,5.1,2.3,Iris-virginica + 5.8,2.7,5.1,1.9,Iris-virginica + 6.8,3.2,5.9,2.3,Iris-virginica + 6.7,3.3,5.7,2.5,Iris-virginica + 6.7,3.0,5.2,2.3,Iris-virginica + 6.3,2.5,5.0,1.9,Iris-virginica + 6.5,3.0,5.2,2.0,Iris-virginica + 6.2,3.4,5.4,2.3,Iris-virginica + 5.9,3.0,5.1,1.8,Iris-virginica + """ + col_names = ["Sepal_Length", "Sepal_Width", "Petal_Length", "Petal_Width", "Labels"] + df = pd.read_csv(StringIO(data), names=col_names) + + with open(iris_dataset.path, "w") as f: + df.to_csv(f) + + +@dsl.component( + base_image=common_base_image, + packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0"], +) +def normalize_dataset( + input_iris_dataset: 
Input[Dataset], + normalized_iris_dataset: Output[Dataset], + standard_scaler: bool, +): + import pandas as pd # noqa: PLC0415 + from sklearn.preprocessing import MinMaxScaler, StandardScaler # noqa: PLC0415 + + with open(input_iris_dataset.path) as f: + df = pd.read_csv(f) + labels = df.pop("Labels") + + scaler = StandardScaler() if standard_scaler else MinMaxScaler() + + df = pd.DataFrame(scaler.fit_transform(df)) + df["Labels"] = labels + normalized_iris_dataset.metadata["state"] = "Normalized" + with open(normalized_iris_dataset.path, "w") as f: + df.to_csv(f) + + +@dsl.component( + base_image=common_base_image, + packages_to_install=["pandas==2.2.0", "scikit-learn==1.4.0"], +) +def train_model( + normalized_iris_dataset: Input[Dataset], + model: Output[Model], + metrics: Output[ClassificationMetrics], + n_neighbors: int, +): + import pickle # noqa: PLC0415 + + import pandas as pd # noqa: PLC0415 + from sklearn.metrics import confusion_matrix # noqa: PLC0415 + from sklearn.model_selection import cross_val_predict, train_test_split # noqa: PLC0415 + from sklearn.neighbors import KNeighborsClassifier # noqa: PLC0415 + + with open(normalized_iris_dataset.path) as f: + df = pd.read_csv(f) + + y = df.pop("Labels") + X = df + + X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0) # noqa: F841 + + clf = KNeighborsClassifier(n_neighbors=n_neighbors) + clf.fit(X_train, y_train) + + predictions = cross_val_predict(clf, X_train, y_train, cv=3) + metrics.log_confusion_matrix( + ["Iris-Setosa", "Iris-Versicolour", "Iris-Virginica"], + confusion_matrix(y_train, predictions).tolist(), # .tolist() to convert np array to list. + ) + + model.metadata["framework"] = "scikit-learn" + with open(model.path, "wb") as f: + pickle.dump(clf, f) + + +@dsl.pipeline(name="iris-training-pipeline") +def my_pipeline( + standard_scaler: bool = True, + neighbors: int = 3, +): + create_dataset_task = create_dataset().set_caching_options(False) + + normalize_dataset_task = normalize_dataset( + input_iris_dataset=create_dataset_task.outputs["iris_dataset"], standard_scaler=standard_scaler + ).set_caching_options(False) + + train_model( + normalized_iris_dataset=normalize_dataset_task.outputs["normalized_iris_dataset"], n_neighbors=neighbors + ).set_caching_options(False) + + +if __name__ == "__main__": + compiler.Compiler().compile(my_pipeline, package_path=__file__.replace(".py", "_compiled.yaml")) diff --git a/tests/resources/iris_pipeline_without_cache_compiled.yaml b/tests/resources/iris_pipeline_without_cache_compiled.yaml new file mode 100644 index 000000000..b2eb4b1b0 --- /dev/null +++ b/tests/resources/iris_pipeline_without_cache_compiled.yaml @@ -0,0 +1,264 @@ +# PIPELINE DEFINITION +# Name: iris-training-pipeline +# Inputs: +# neighbors: int [Default: 3.0] +# standard_scaler: bool [Default: True] +# Outputs: +# train-model-metrics: system.ClassificationMetrics +components: + comp-create-dataset: + executorLabel: exec-create-dataset + outputDefinitions: + artifacts: + iris_dataset: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + comp-normalize-dataset: + executorLabel: exec-normalize-dataset + inputDefinitions: + artifacts: + input_iris_dataset: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + parameters: + standard_scaler: + parameterType: BOOLEAN + outputDefinitions: + artifacts: + normalized_iris_dataset: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + comp-train-model: + executorLabel: exec-train-model + inputDefinitions: 
+ artifacts: + normalized_iris_dataset: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + parameters: + n_neighbors: + parameterType: NUMBER_INTEGER + outputDefinitions: + artifacts: + metrics: + artifactType: + schemaTitle: system.ClassificationMetrics + schemaVersion: 0.0.1 + model: + artifactType: + schemaTitle: system.Model + schemaVersion: 0.0.1 +deploymentSpec: + executors: + exec-create-dataset: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - create_dataset + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.9.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'pandas==2.2.0'\ + \ && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef create_dataset(iris_dataset: Output[Dataset]):\n from io import\ + \ StringIO # noqa: PLC0415\n\n import pandas as pd # noqa: PLC0415\n\ + \n data = \"\"\"\n 5.1,3.5,1.4,0.2,Iris-setosa\n 4.9,3.0,1.4,0.2,Iris-setosa\n\ + \ 4.7,3.2,1.3,0.2,Iris-setosa\n 4.6,3.1,1.5,0.2,Iris-setosa\n 5.0,3.6,1.4,0.2,Iris-setosa\n\ + \ 5.7,3.8,1.7,0.3,Iris-setosa\n 5.1,3.8,1.5,0.3,Iris-setosa\n 5.4,3.4,1.7,0.2,Iris-setosa\n\ + \ 5.1,3.7,1.5,0.4,Iris-setosa\n 5.1,3.4,1.5,0.2,Iris-setosa\n 5.0,3.5,1.3,0.3,Iris-setosa\n\ + \ 4.5,2.3,1.3,0.3,Iris-setosa\n 4.4,3.2,1.3,0.2,Iris-setosa\n 5.0,3.5,1.6,0.6,Iris-setosa\n\ + \ 5.1,3.8,1.9,0.4,Iris-setosa\n 4.8,3.0,1.4,0.3,Iris-setosa\n 5.1,3.8,1.6,0.2,Iris-setosa\n\ + \ 4.6,3.2,1.4,0.2,Iris-setosa\n 5.3,3.7,1.5,0.2,Iris-setosa\n 5.0,3.3,1.4,0.2,Iris-setosa\n\ + \ 7.0,3.2,4.7,1.4,Iris-versicolor\n 6.4,3.2,4.5,1.5,Iris-versicolor\n\ + \ 6.9,3.1,4.9,1.5,Iris-versicolor\n 5.5,2.3,4.0,1.3,Iris-versicolor\n\ + \ 6.5,2.8,4.6,1.5,Iris-versicolor\n 6.2,2.2,4.5,1.5,Iris-versicolor\n\ + \ 5.6,2.5,3.9,1.1,Iris-versicolor\n 5.9,3.2,4.8,1.8,Iris-versicolor\n\ + \ 6.1,2.8,4.0,1.3,Iris-versicolor\n 6.3,2.5,4.9,1.5,Iris-versicolor\n\ + \ 6.1,2.8,4.7,1.2,Iris-versicolor\n 6.4,2.9,4.3,1.3,Iris-versicolor\n\ + \ 6.6,3.0,4.4,1.4,Iris-versicolor\n 5.6,2.7,4.2,1.3,Iris-versicolor\n\ + \ 5.7,3.0,4.2,1.2,Iris-versicolor\n 5.7,2.9,4.2,1.3,Iris-versicolor\n\ + \ 6.2,2.9,4.3,1.3,Iris-versicolor\n 5.1,2.5,3.0,1.1,Iris-versicolor\n\ + \ 5.7,2.8,4.1,1.3,Iris-versicolor\n 6.3,3.3,6.0,2.5,Iris-virginica\n\ + \ 5.8,2.7,5.1,1.9,Iris-virginica\n 7.1,3.0,5.9,2.1,Iris-virginica\n\ + \ 6.3,2.9,5.6,1.8,Iris-virginica\n 6.5,3.0,5.8,2.2,Iris-virginica\n\ + \ 6.9,3.1,5.1,2.3,Iris-virginica\n 5.8,2.7,5.1,1.9,Iris-virginica\n\ + \ 6.8,3.2,5.9,2.3,Iris-virginica\n 6.7,3.3,5.7,2.5,Iris-virginica\n\ + \ 6.7,3.0,5.2,2.3,Iris-virginica\n 6.3,2.5,5.0,1.9,Iris-virginica\n\ + \ 6.5,3.0,5.2,2.0,Iris-virginica\n 6.2,3.4,5.4,2.3,Iris-virginica\n\ + \ 5.9,3.0,5.1,1.8,Iris-virginica\n \"\"\"\n col_names = [\"Sepal_Length\"\ + , \"Sepal_Width\", \"Petal_Length\", \"Petal_Width\", \"Labels\"]\n df\ + \ = pd.read_csv(StringIO(data), names=col_names)\n\n with open(iris_dataset.path,\ + \ \"w\") as f:\n df.to_csv(f)\n\n" + image: 
registry.access.redhat.com/ubi8/python-39@sha256:3523b184212e1f2243e76d8094ab52b01ea3015471471290d011625e1763af61 + exec-normalize-dataset: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - normalize_dataset + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.9.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'pandas==2.2.0'\ + \ 'scikit-learn==1.4.0' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef normalize_dataset(\n input_iris_dataset: Input[Dataset],\n\ + \ normalized_iris_dataset: Output[Dataset],\n standard_scaler: bool,\n\ + ):\n import pandas as pd # noqa: PLC0415\n from sklearn.preprocessing\ + \ import MinMaxScaler, StandardScaler # noqa: PLC0415\n\n with open(input_iris_dataset.path)\ + \ as f:\n df = pd.read_csv(f)\n labels = df.pop(\"Labels\")\n\n\ + \ scaler = StandardScaler() if standard_scaler else MinMaxScaler()\n\n\ + \ df = pd.DataFrame(scaler.fit_transform(df))\n df[\"Labels\"] = labels\n\ + \ normalized_iris_dataset.metadata[\"state\"] = \"Normalized\"\n with\ + \ open(normalized_iris_dataset.path, \"w\") as f:\n df.to_csv(f)\n\ + \n" + image: registry.access.redhat.com/ubi8/python-39@sha256:3523b184212e1f2243e76d8094ab52b01ea3015471471290d011625e1763af61 + exec-train-model: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - train_model + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'kfp==2.9.0'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' &&\ + \ python3 -m pip install --quiet --no-warn-script-location 'pandas==2.2.0'\ + \ 'scikit-learn==1.4.0' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef train_model(\n normalized_iris_dataset: Input[Dataset],\n\ + \ model: Output[Model],\n metrics: Output[ClassificationMetrics],\n\ + \ n_neighbors: int,\n):\n import pickle # noqa: PLC0415\n\n import\ + \ pandas as pd # noqa: PLC0415\n from sklearn.metrics import confusion_matrix\ + \ # noqa: PLC0415\n from sklearn.model_selection import cross_val_predict,\ + \ train_test_split # noqa: PLC0415\n from sklearn.neighbors import KNeighborsClassifier\ + \ # noqa: PLC0415\n\n with open(normalized_iris_dataset.path) as f:\n\ + \ df = pd.read_csv(f)\n\n y = df.pop(\"Labels\")\n X = df\n\ + \n X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=0)\ + \ # noqa: F841\n\n clf = KNeighborsClassifier(n_neighbors=n_neighbors)\n\ + \ clf.fit(X_train, y_train)\n\n predictions = cross_val_predict(clf,\ + \ X_train, y_train, cv=3)\n metrics.log_confusion_matrix(\n [\"\ + Iris-Setosa\", \"Iris-Versicolour\", \"Iris-Virginica\"],\n confusion_matrix(y_train,\ + \ predictions).tolist(), # .tolist() to convert np array to list.\n \ + \ )\n\n model.metadata[\"framework\"] = \"scikit-learn\"\n with open(model.path,\ + \ \"wb\") as f:\n pickle.dump(clf, f)\n\n" + image: registry.access.redhat.com/ubi8/python-39@sha256:3523b184212e1f2243e76d8094ab52b01ea3015471471290d011625e1763af61 +pipelineInfo: + name: iris-training-pipeline +root: + dag: + outputs: + artifacts: + train-model-metrics: + artifactSelectors: + - outputArtifactKey: metrics + producerSubtask: train-model + tasks: + create-dataset: + cachingOptions: {} + componentRef: + name: comp-create-dataset + taskInfo: + name: create-dataset + normalize-dataset: + cachingOptions: {} + componentRef: + name: comp-normalize-dataset + dependentTasks: + - create-dataset + inputs: + artifacts: + input_iris_dataset: + taskOutputArtifact: + outputArtifactKey: iris_dataset + producerTask: create-dataset + parameters: + standard_scaler: + componentInputParameter: standard_scaler + taskInfo: + name: normalize-dataset + train-model: + cachingOptions: {} + componentRef: + name: comp-train-model + dependentTasks: + - normalize-dataset + inputs: + artifacts: + normalized_iris_dataset: + taskOutputArtifact: + outputArtifactKey: normalized_iris_dataset + producerTask: normalize-dataset + parameters: + n_neighbors: + componentInputParameter: neighbors + taskInfo: + name: train-model + inputDefinitions: + parameters: + neighbors: + defaultValue: 3.0 + isOptional: true + parameterType: NUMBER_INTEGER + standard_scaler: + defaultValue: true + isOptional: true + parameterType: BOOLEAN + outputDefinitions: + artifacts: + train-model-metrics: + artifactType: + schemaTitle: system.ClassificationMetrics + schemaVersion: 0.0.1 +schemaVersion: 2.1.0 +sdkVersion: kfp-2.9.0 diff --git a/tests/util/rest.go 
b/tests/util/rest.go
index 4baad3bb2..9fcf2994b 100644
--- a/tests/util/rest.go
+++ b/tests/util/rest.go
@@ -119,7 +119,7 @@ func FormatRequestBody(t *testing.T, pipelineID string, PipelineDisplayName stri
 }
 
 func WaitForPipelineRunCompletion(t *testing.T, httpClient http.Client, APIServerURL string) error {
-	timeout := time.After(6 * time.Minute)
+	timeout := time.After(10 * time.Minute)
 	ticker := time.NewTicker(6 * time.Second)
 	defer ticker.Stop()
 	for {