From 8bc958f3465a414e78219785b27442245c5bd32c Mon Sep 17 00:00:00 2001 From: Jinfeng Date: Mon, 30 Oct 2023 13:01:55 -0700 Subject: [PATCH 1/9] add test_compat_standardization in progress Signed-off-by: Jinfeng --- python/tests/test_logistic_regression.py | 89 ++++++++++++++++++++++++ 1 file changed, 89 insertions(+) diff --git a/python/tests/test_logistic_regression.py b/python/tests/test_logistic_regression.py index 2b3fd967..81223556 100644 --- a/python/tests/test_logistic_regression.py +++ b/python/tests/test_logistic_regression.py @@ -561,6 +561,95 @@ def test_compat( assert blor_model.numFeatures == 2 +@pytest.mark.compat +@pytest.mark.parametrize("fit_intercept", [True, False]) +@pytest.mark.parametrize( + "lr_types", + [ + (SparkLogisticRegression, SparkLogisticRegressionModel), + (LogisticRegression, LogisticRegressionModel), + ], +) +def test_compat_standardization( + fit_intercept: bool, + lr_types: Tuple[LogisticRegressionType, LogisticRegressionModelType], +) -> None: + + _LogisticRegression, _LogisticRegressionModel = lr_types + + X = np.array( + [ + [1.0, 3000.0], + [1.0, 4000.0], + [2.0, 1000.0], + [2.0, 2000.0], + ] + ) + y = np.array( + [ + 1.0, + 1.0, + 0.0, + 0.0, + ] + ) + num_rows = len(X) + + weight = np.ones([num_rows]) + feature_cols = ["c0", "c1"] + schema = ["c0 float, c1 float, weight float, label float"] + + with CleanSparkSession() as spark: + np_array = np.concatenate( + (X, weight.reshape(num_rows, 1), y.reshape(num_rows, 1)), axis=1 + ) + + bdf = spark.createDataFrame( + np_array.tolist(), + ",".join(schema), + ) + + bdf = bdf.withColumn("features", array_to_vector(array(*feature_cols))).drop( + *feature_cols + ) + + blor = _LogisticRegression( + regParam=0.1, fitIntercept=fit_intercept, standardization=True + ) + + if isinstance(blor, SparkLogisticRegression): + blor.setWeightCol("weight") + + blor_model = blor.fit(bdf) + + blor_model.setFeaturesCol("features") + blor_model.setProbabilityCol("newProbability") + blor_model.setRawPredictionCol("newRawPrediction") + + array_equal( + blor_model.coefficients.toArray(), [-2.42377087, 2.42377087], tolerance + ) + assert array_equal( + blor_model.coefficientMatrix.toArray(), + np.array([[-2.42377087, 2.42377087]]), + tolerance, + ) + assert array_equal(blor_model.interceptVector.toArray(), [0.0]) + + output_df = blor_model.transform(bdf) + + if isinstance(blor_model, SparkLogisticRegressionModel): + assert array_equal( + output.newRawPrediction.toArray(), + Vectors.dense([-2.4238, 2.4238]).toArray(), + tolerance, + ) + else: + warnings.warn( + "transform of spark rapids ml currently does not support rawPredictionCol" + ) + + @pytest.mark.parametrize("feature_type", [feature_types.vector]) @pytest.mark.parametrize("data_type", [np.float32]) @pytest.mark.parametrize("data_shape", [(2000, 8)], ids=idfn) From f01f7a719ac520ffbba73f77cd45f2e9ea821263 Mon Sep 17 00:00:00 2001 From: Jinfeng Date: Sat, 25 Nov 2023 18:21:15 -0800 Subject: [PATCH 2/9] support standardization and add tests --- python/src/spark_rapids_ml/classification.py | 17 ++- python/tests/test_logistic_regression.py | 141 ++++--------------- 2 files changed, 43 insertions(+), 115 deletions(-) diff --git a/python/src/spark_rapids_ml/classification.py b/python/src/spark_rapids_ml/classification.py index 88d7bf92..d9fcf0a3 100644 --- a/python/src/spark_rapids_ml/classification.py +++ b/python/src/spark_rapids_ml/classification.py @@ -660,7 +660,7 @@ def _param_mapping(cls) -> Dict[str, Optional[str]]: "fitIntercept": "fit_intercept", "threshold": None, 
"thresholds": None, - "standardization": "", # Set to "" instead of None because cuml defaults to standardization = False + "standardization": "standardization", "weightCol": None, "aggregationDepth": None, "family": "", # family can be 'auto', 'binomial' or 'multinomial', cuml automatically detects num_classes @@ -680,6 +680,7 @@ def _param_value_mapping( def _get_cuml_params_default(self) -> Dict[str, Any]: return { "fit_intercept": True, + "standardization": False, "verbose": False, "C": 1.0, "penalty": "l2", @@ -848,6 +849,8 @@ class LogisticRegression( Note this is only supported in spark >= 3.4. fitIntercept: Whether to fit an intercept term. + standardization: + Whether to standardize the training data. num_workers: Number of cuML workers, where each cuML worker corresponds to one Spark task running on one GPU. If not set, spark-rapids-ml tries to infer the number of @@ -909,6 +912,7 @@ def __init__( elasticNetParam: float = 0.0, tol: float = 1e-6, fitIntercept: bool = True, + standardization: bool = True, enable_sparse_data_optim: Optional[bool] = None, num_workers: Optional[int] = None, verbose: Union[int, bool] = False, @@ -985,9 +989,18 @@ def _single_fit(init_parameters: Dict[str, Any]) -> Dict[str, Any]: pdesc.rank, ) + intercept_array = logistic_regression.intercept_ + # follow Spark to center the intercepts for multinomial classification + if ( + init_parameters["fit_intercept"] is True + and len(intercept_array) > 1 + ): + intercept_mean = sum(intercept_array) / len(intercept_array) + intercept_array -= intercept_mean + model = { "coef_": logistic_regression.coef_.tolist(), - "intercept_": logistic_regression.intercept_.tolist(), + "intercept_": intercept_array.tolist(), "classes_": logistic_regression.classes_.tolist(), "n_cols": logistic_regression.n_cols, "dtype": logistic_regression.dtype.name, diff --git a/python/tests/test_logistic_regression.py b/python/tests/test_logistic_regression.py index 81223556..0750387f 100644 --- a/python/tests/test_logistic_regression.py +++ b/python/tests/test_logistic_regression.py @@ -109,7 +109,9 @@ def test_toy_example(gpu_number: int) -> None: schema = features_col + " array, " + label_col + " float" df = spark.createDataFrame(data, schema=schema) - lr_estimator = LogisticRegression(regParam=1.0, num_workers=gpu_number) + lr_estimator = LogisticRegression( + standardization=False, regParam=1.0, num_workers=gpu_number + ) lr_estimator.setFeaturesCol(features_col) lr_estimator.setLabelCol(label_col) lr_estimator.setProbabilityCol(probability_col) @@ -169,6 +171,7 @@ def test_params(tmp_path: str, caplog: LogCaptureFixture) -> None: "elasticNetParam": 0.0, "tol": 1e-06, "fitIntercept": True, + "standardization": True, } default_cuml_params = { @@ -178,6 +181,7 @@ def test_params(tmp_path: str, caplog: LogCaptureFixture) -> None: "l1_ratio": 0.0, "tol": 1e-6, "fit_intercept": True, + "standardization": True, } default_lr = LogisticRegression() @@ -191,6 +195,7 @@ def test_params(tmp_path: str, caplog: LogCaptureFixture) -> None: "elasticNetParam": 0.0, "tol": 1e-2, "fitIntercept": False, + "standardization": False, } spark_lr = LogisticRegression(**spark_params) @@ -205,6 +210,7 @@ def test_params(tmp_path: str, caplog: LogCaptureFixture) -> None: "l1_ratio": 0.0, "tol": 1e-2, "fit_intercept": False, + "standardization": False, } ) assert_params(spark_lr, expected_spark_params, expected_cuml_params) @@ -216,6 +222,7 @@ def test_params(tmp_path: str, caplog: LogCaptureFixture) -> None: "elasticNetParam": 1.0, "tol": 1e-2, "fitIntercept": 
False, + "standardization": False, } spark_lr = LogisticRegression(**spark_params) @@ -230,6 +237,7 @@ def test_params(tmp_path: str, caplog: LogCaptureFixture) -> None: "l1_ratio": 1.0, "tol": 1e-2, "fit_intercept": False, + "standardization": False, } ) assert_params(spark_lr, expected_spark_params, expected_cuml_params) @@ -241,6 +249,7 @@ def test_params(tmp_path: str, caplog: LogCaptureFixture) -> None: "elasticNetParam": 0.3, "tol": 1e-2, "fitIntercept": False, + "standardization": True, } spark_lr = LogisticRegression(**spark_params) @@ -255,6 +264,7 @@ def test_params(tmp_path: str, caplog: LogCaptureFixture) -> None: "l1_ratio": 0.3, "tol": 1e-2, "fit_intercept": False, + "standardization": True, } ) assert_params(spark_lr, expected_spark_params, expected_cuml_params) @@ -291,8 +301,13 @@ def test_classifier( reg_param: float = 0.0, elasticNet_param: float = 0.0, tolerance: float = 0.001, + standardization: bool = False, convert_to_sparse: bool = False, ) -> LogisticRegression: + assert ( + standardization is False + ), "standardization=True is not supported due to testing with single-GPU LogisticRegression" + if convert_to_sparse is True: assert feature_type == "vector" @@ -340,6 +355,7 @@ def to_sparse_func(v: Union[SparseVector, DenseVector]) -> SparseVector: assert label_col is not None spark_lr = LogisticRegression( enable_sparse_data_optim=convert_to_sparse, + standardization=standardization, fitIntercept=fit_intercept, regParam=reg_param, elasticNetParam=elasticNet_param, @@ -463,13 +479,9 @@ def test_compat( ) assert _LogisticRegression().getRegParam() == 0.0 - if lr_types[0] is SparkLogisticRegression: - blor = _LogisticRegression( - regParam=0.1, fitIntercept=fit_intercept, standardization=False - ) - else: - warnings.warn("spark rapids ml does not accept standardization") - blor = _LogisticRegression(regParam=0.1, fitIntercept=fit_intercept) + blor = _LogisticRegression( + regParam=0.1, fitIntercept=fit_intercept, standardization=False + ) assert blor.getRegParam() == 0.1 @@ -561,95 +573,6 @@ def test_compat( assert blor_model.numFeatures == 2 -@pytest.mark.compat -@pytest.mark.parametrize("fit_intercept", [True, False]) -@pytest.mark.parametrize( - "lr_types", - [ - (SparkLogisticRegression, SparkLogisticRegressionModel), - (LogisticRegression, LogisticRegressionModel), - ], -) -def test_compat_standardization( - fit_intercept: bool, - lr_types: Tuple[LogisticRegressionType, LogisticRegressionModelType], -) -> None: - - _LogisticRegression, _LogisticRegressionModel = lr_types - - X = np.array( - [ - [1.0, 3000.0], - [1.0, 4000.0], - [2.0, 1000.0], - [2.0, 2000.0], - ] - ) - y = np.array( - [ - 1.0, - 1.0, - 0.0, - 0.0, - ] - ) - num_rows = len(X) - - weight = np.ones([num_rows]) - feature_cols = ["c0", "c1"] - schema = ["c0 float, c1 float, weight float, label float"] - - with CleanSparkSession() as spark: - np_array = np.concatenate( - (X, weight.reshape(num_rows, 1), y.reshape(num_rows, 1)), axis=1 - ) - - bdf = spark.createDataFrame( - np_array.tolist(), - ",".join(schema), - ) - - bdf = bdf.withColumn("features", array_to_vector(array(*feature_cols))).drop( - *feature_cols - ) - - blor = _LogisticRegression( - regParam=0.1, fitIntercept=fit_intercept, standardization=True - ) - - if isinstance(blor, SparkLogisticRegression): - blor.setWeightCol("weight") - - blor_model = blor.fit(bdf) - - blor_model.setFeaturesCol("features") - blor_model.setProbabilityCol("newProbability") - blor_model.setRawPredictionCol("newRawPrediction") - - array_equal( - 
blor_model.coefficients.toArray(), [-2.42377087, 2.42377087], tolerance - ) - assert array_equal( - blor_model.coefficientMatrix.toArray(), - np.array([[-2.42377087, 2.42377087]]), - tolerance, - ) - assert array_equal(blor_model.interceptVector.toArray(), [0.0]) - - output_df = blor_model.transform(bdf) - - if isinstance(blor_model, SparkLogisticRegressionModel): - assert array_equal( - output.newRawPrediction.toArray(), - Vectors.dense([-2.4238, 2.4238]).toArray(), - tolerance, - ) - else: - warnings.warn( - "transform of spark rapids ml currently does not support rawPredictionCol" - ) - - @pytest.mark.parametrize("feature_type", [feature_types.vector]) @pytest.mark.parametrize("data_type", [np.float32]) @pytest.mark.parametrize("data_shape", [(2000, 8)], ids=idfn) @@ -801,23 +724,15 @@ def test_compat_multinomial( ) assert _LogisticRegression().getRegParam() == 0.0 - if lr_types[0] is SparkLogisticRegression: - mlor = _LogisticRegression( - regParam=0.1, - elasticNetParam=0.2, - fitIntercept=fit_intercept, - family="multinomial", - standardization=False, - ) - else: - warnings.warn("spark rapids ml does not accept standardization") - mlor = _LogisticRegression( - regParam=0.1, - elasticNetParam=0.2, - fitIntercept=fit_intercept, - family="multinomial", - ) + mlor = _LogisticRegression( + regParam=0.1, + elasticNetParam=0.2, + fitIntercept=fit_intercept, + family="multinomial", + standardization=False, + ) + assert mlor.getStandardization() == False assert mlor.getRegParam() == 0.1 assert mlor.getElasticNetParam() == 0.2 From f3722d80c30d1c2dfcac78448fd32fe0a619352a Mon Sep 17 00:00:00 2001 From: Jinfeng Date: Thu, 18 Jan 2024 17:23:09 -0800 Subject: [PATCH 3/9] rebase latest --- python/tests/test_logistic_regression.py | 190 +++++++++++++++++++++++ 1 file changed, 190 insertions(+) diff --git a/python/tests/test_logistic_regression.py b/python/tests/test_logistic_regression.py index 0750387f..cd0a80f3 100644 --- a/python/tests/test_logistic_regression.py +++ b/python/tests/test_logistic_regression.py @@ -1667,3 +1667,193 @@ def test_sparse_crossvalidator_logistic_regression( data_shape=data_shape, convert_to_sparse=True, ) + + +@pytest.mark.compat +@pytest.mark.parametrize("fit_intercept", [True, False]) +@pytest.mark.parametrize("data_type", [np.float32]) +@pytest.mark.parametrize( + "lr_types", + [ + (SparkLogisticRegression, SparkLogisticRegressionModel), + (LogisticRegression, LogisticRegressionModel), + ], +) +def test_compat_standardization( + fit_intercept: bool, + data_type: np.dtype, + lr_types: Tuple[LogisticRegressionType, LogisticRegressionModelType], +) -> None: + _LogisticRegression, _LogisticRegressionModel = lr_types + tolerance = 1e-3 + + X, _, y, y_test = make_classification_dataset( + datatype=data_type, + nrows=10000, + ncols=2, + n_classes=2, + n_informative=2, + n_redundant=0, + n_repeated=0, + ) + + X[:, 0] *= 1000 # Scale up the first features by 1000 + X[:, 0] += 50 # Shift the first features by 50 + + num_rows = len(X) + weight = np.ones([num_rows]) + feature_cols = ["c0", "c1"] + schema = ["c0 float, c1 float, weight float, label float"] + + with CleanSparkSession() as spark: + np_array = np.concatenate( + (X, weight.reshape(num_rows, 1), y.reshape(num_rows, 1)), axis=1 + ) + + bdf = spark.createDataFrame( + np_array.tolist(), + ",".join(schema), + ) + + bdf = bdf.withColumn("features", array_to_vector(array(*feature_cols))).drop( + *feature_cols + ) + + blor = _LogisticRegression( + regParam=0.01, fitIntercept=fit_intercept, standardization=True + ) + + if 
isinstance(blor, SparkLogisticRegression): + blor.setWeightCol("weight") + + blor_model = blor.fit(bdf) + + blor_model.setFeaturesCol("features") + blor_model.setProbabilityCol("newProbability") + blor_model.setRawPredictionCol("newRawPrediction") + + if fit_intercept is False: + array_equal( + blor_model.coefficients.toArray(), + [-1.59550205e-04, 1.35555146e00], + tolerance, + ) + array_equal( + blor_model.coefficientMatrix.toArray(), + [-1.59550205e-04, 1.35555146e00], + tolerance, + ) + assert blor_model.intercept == 0.0 + assert blor_model.interceptVector.toArray() == [0.0] + else: + array_equal( + blor_model.coefficients.toArray(), + [-1.63432342e-04, 1.35951030e00], + tolerance, + ) + array_equal( + blor_model.coefficientMatrix.toArray(), + [-1.63432342e-04, 1.35951030e00], + tolerance, + ) + assert array_equal([blor_model.intercept], [-0.05060137], tolerance) + assert array_equal( + blor_model.interceptVector.toArray(), [-0.05060137], tolerance + ) + + +@pytest.mark.parametrize("fit_intercept", [True, False]) +@pytest.mark.parametrize( + "reg_factors", [(0.0, 0.0), (0.1, 0.0), (0.1, 1.0), (0.1, 0.2)] +) +@pytest.mark.parametrize("feature_type", ["vector"]) +@pytest.mark.parametrize("data_type", [np.float32]) +@pytest.mark.parametrize("max_record_batch", [20]) +@pytest.mark.parametrize("ncols_nclasses", [(2, 2), (4, 3), (4, 4)]) +@pytest.mark.slow +def test_standardization( + fit_intercept: bool, + reg_factors: Tuple[float, float], + feature_type: str, + data_type: np.dtype, + max_record_batch: int, + ncols_nclasses: Tuple[int, int], + gpu_number: int, +) -> None: + tolerance = 0.001 + reg_param = reg_factors[0] + elasticNet_param = reg_factors[1] + n_rows = 10000 + n_cols = ncols_nclasses[0] + n_classes = ncols_nclasses[1] + + X_train, X_test, y_train, y_test = make_classification_dataset( + datatype=data_type, + nrows=n_rows, + ncols=n_cols, + n_classes=n_classes, + n_informative=n_cols, + n_redundant=0, + n_repeated=0, + ) + X_train[:, 0] *= 1000 # Scale up the first features by 1000 + X_train[:, 0] += 50 # Shift the first features by 50 + + X_test[:, 0] *= 1000 # Scale up the first features by 1000 + X_test[:, 0] += 50 # Shift the first features by 50 + + conf = {"spark.sql.execution.arrow.maxRecordsPerBatch": str(max_record_batch)} + with CleanSparkSession(conf) as spark: + train_df, features_col, label_col = create_pyspark_dataframe( + spark, feature_type, data_type, X_train, y_train + ) + test_df, _, _ = create_pyspark_dataframe( + spark, feature_type, data_type, X_test, y_test + ) + + assert label_col is not None + + def train_model(EstimatorClass, ModelClass): # type: ignore + estimator = EstimatorClass( + standardization=True, + fitIntercept=fit_intercept, + regParam=reg_param, + elasticNetParam=elasticNet_param, + ) + estimator.setFeaturesCol(features_col) + estimator.setLabelCol(label_col) + model = estimator.fit(train_df) + + preds = model.transform(train_df).collect() + y_preds = [row[label_col] for row in preds] + from sklearn.metrics import accuracy_score + + train_acc = accuracy_score(y_train, y_preds) + + preds = model.transform(test_df).collect() + y_preds = [row[label_col] for row in preds] + test_acc = accuracy_score(y_test, y_preds) + + return (estimator, model, train_acc, test_acc) + + mg, mg_model, mg_train_acc, mg_test_acc = train_model( + LogisticRegression, LogisticRegressionModel + ) + mc, mc_model, mc_train_acc, mc_test_acc = train_model( + SparkLogisticRegression, SparkLogisticRegressionModel + ) + + assert array_equal( + 
mg_model.coefficientMatrix.toArray(), + mc_model.coefficientMatrix.toArray(), + tolerance, + ) + assert array_equal( + mg_model.interceptVector.toArray(), + mc_model.interceptVector.toArray(), + tolerance, + ) + assert ( + mg_train_acc > mc_train_acc or abs(mg_train_acc - mc_train_acc) < tolerance + ) + assert mg_test_acc > mc_test_acc or abs(mg_test_acc - mc_test_acc) < tolerance From 9afa6884259f1e81269368b9fc80ed9ae3c7d518 Mon Sep 17 00:00:00 2001 From: Jinfeng Date: Mon, 22 Jan 2024 15:06:48 -0800 Subject: [PATCH 4/9] test standardization on sparse example and nlp20news Signed-off-by: Jinfeng --- python/tests/test_logistic_regression.py | 131 +++++++++++++++++++++-- 1 file changed, 122 insertions(+), 9 deletions(-) diff --git a/python/tests/test_logistic_regression.py b/python/tests/test_logistic_regression.py index cd0a80f3..9e65b8b2 100644 --- a/python/tests/test_logistic_regression.py +++ b/python/tests/test_logistic_regression.py @@ -37,6 +37,9 @@ import warnings +import scipy +from scipy.sparse import csr_matrix + from spark_rapids_ml.classification import LogisticRegression, LogisticRegressionModel from spark_rapids_ml.core import _use_sparse_in_cuml, alias from spark_rapids_ml.tuning import CrossValidator @@ -1371,6 +1374,8 @@ def compare_model( cpu_model: SparkLogisticRegressionModel, df_test: DataFrame, unit_tol: float = 1e-4, + total_tol: float = 0.0, + accuracy_and_probability_only: bool = False, ) -> Tuple[LogisticRegressionModel, SparkLogisticRegressionModel]: gpu_res = gpu_model.transform(df_test).collect() @@ -1384,17 +1389,20 @@ def compare_model( gpu_acc = accuracy_score(ytest_true, gpu_pred) cpu_acc = accuracy_score(ytest_true, cpu_pred) - assert gpu_acc == cpu_acc or abs(gpu_acc - cpu_acc) < 1e-3 - - # compare rawPrediction column - gpu_rawpred = [row["rawPrediction"].toArray().tolist() for row in gpu_res] - cpu_rawpred = [row["rawPrediction"].toArray().tolist() for row in cpu_res] - assert array_equal(gpu_rawpred, cpu_rawpred) + assert gpu_acc >= cpu_acc or abs(gpu_acc - cpu_acc) < 1e-3 # compare probability column gpu_prob = [row["probability"].toArray().tolist() for row in gpu_res] cpu_prob = [row["probability"].toArray().tolist() for row in cpu_res] - assert array_equal(gpu_prob, cpu_prob) + assert array_equal(gpu_prob, cpu_prob, unit_tol, total_tol) + + if accuracy_and_probability_only: + return (gpu_model, cpu_model) + + # compare rawPrediction column + gpu_rawpred = [row["rawPrediction"].toArray().tolist() for row in gpu_res] + cpu_rawpred = [row["rawPrediction"].toArray().tolist() for row in cpu_res] + assert array_equal(gpu_rawpred, cpu_rawpred, unit_tol, total_tol) # compare coefficients assert array_equal( @@ -1521,9 +1529,11 @@ def test_compat_sparse_multinomial( @pytest.mark.parametrize("fit_intercept", [True, False]) +@pytest.mark.parametrize("standardization", [True, False]) @pytest.mark.slow def test_sparse_nlp20news( fit_intercept: bool, + standardization: bool, caplog: LogCaptureFixture, ) -> None: if version.parse(pyspark.__version__) < version.parse("3.4.0"): @@ -1577,7 +1587,7 @@ def test_sparse_nlp20news( verbose=6, regParam=reg_param, fitIntercept=fit_intercept, - standardization=False, + standardization=standardization, featuresCol="features", labelCol="label", ) @@ -1585,7 +1595,7 @@ def test_sparse_nlp20news( cpu_lr = SparkLogisticRegression( regParam=reg_param, fitIntercept=fit_intercept, - standardization=False, + standardization=standardization, featuresCol="features", labelCol="label", ) @@ -1601,6 +1611,16 @@ def test_sparse_nlp20news( 
assert "CUDA managed memory enabled." in caplog.text + if standardization is True: + compare_model( + gpu_model, + cpu_model, + df_train, + unit_tol=tolerance, + total_tol=tolerance, + accuracy_and_probability_only=True, + ) + @pytest.mark.parametrize("fit_intercept", [True, False]) @pytest.mark.parametrize( @@ -1857,3 +1877,96 @@ def train_model(EstimatorClass, ModelClass): # type: ignore mg_train_acc > mc_train_acc or abs(mg_train_acc - mc_train_acc) < tolerance ) assert mg_test_acc > mc_test_acc or abs(mg_test_acc - mc_test_acc) < tolerance + + +@pytest.mark.parametrize("fit_intercept", [True, False]) +@pytest.mark.parametrize( + "reg_factors", [(0.0, 0.0), (0.1, 0.0), (0.1, 1.0), (0.1, 0.2)] +) +def test_standardization_sparse_example( + fit_intercept: bool, + reg_factors: Tuple[float, float], + caplog: LogCaptureFixture, +) -> None: + if version.parse(pyspark.__version__) < version.parse("3.4.0"): + import logging + + err_msg = ( + "pyspark < 3.4 is detected. Cannot import pyspark `unwrap_udt` function. " + ) + "The test case will be skipped. Please install pyspark>=3.4." + logging.info(err_msg) + return + + tolerance = 0.001 + # Compare accuracy and probability only when regularizaiton is disabled. + # It is observed that no regularization leads to large absolute values of coefficients, and + # therefore large difference of GPU and CPU in raw Predictions (e.g. 23.1068 v.s. 27.6741) + # and in coefficients (e.g. -23.57752037 v.s. -28.48549335). + accuracy_and_probability_only = True if reg_factors[0] == 0.0 else False + + datatype = np.float32 + + est_params: Dict[str, Any] = { + "standardization": True, + "regParam": reg_factors[0], + "elasticNetParam": reg_factors[1], + "fitIntercept": fit_intercept, + "featuresCol": "features", + "labelCol": "label", + } + + def prepare_csr_matrix_and_y() -> Tuple[csr_matrix, List[float]]: + X_origin = np.array( + [ + [-1.1258, 0.0000, 0.0000, -0.4339, 0.0000], + [-1.5551, -0.3414, 0.0000, 0.0000, 0.0000], + [0.0000, 0.2660, 0.0000, 0.0000, 0.9463], + [-0.8437, 0.0000, 1.2590, 0.0000, 0.0000], + ], + datatype, + ) + + X_origin = np.ascontiguousarray(X_origin.T) + + X = csr_matrix(X_origin) + assert X.nnz == 8 and X.shape == (5, 4) + y = [0.0, 1.0, 2.0, 0.0, 1.0] + return X, y + + X, y = prepare_csr_matrix_and_y() + + conf = { + "spark.rapids.ml.uvm.enabled": True + } # enable memory management to run the test case on GPU with small memory (e.g. 
2G) + with CleanSparkSession(conf) as spark: + + def sparse_to_df(X: csr_matrix, y: List[float]) -> DataFrame: + assert X.shape[0] == len(y) + dimension = X.shape[1] + data = [ + Row( + features=SparseVector(dimension, X[i].indices, X[i].data), + label=y[i], + ) + for i in range(len(y)) + ] + df = spark.createDataFrame(data) + + return df + + df = sparse_to_df(X, y) + + gpu_lr = LogisticRegression(**est_params) + cpu_lr = SparkLogisticRegression(**est_params) + + gpu_model = gpu_lr.fit(df) + cpu_model = cpu_lr.fit(df) + + compare_model( + gpu_model, + cpu_model, + df, + tolerance, + accuracy_and_probability_only=accuracy_and_probability_only, + ) From 9910accf7ea83ba33721e816d74f808643048125 Mon Sep 17 00:00:00 2001 From: Jinfeng Date: Tue, 6 Feb 2024 14:33:52 -0800 Subject: [PATCH 5/9] try setting enable_sparse_data_optim in progress --- python/src/spark_rapids_ml/classification.py | 24 +++++++++++++++++++- python/tests/test_logistic_regression.py | 22 +++++++++++++++--- 2 files changed, 42 insertions(+), 4 deletions(-) diff --git a/python/src/spark_rapids_ml/classification.py b/python/src/spark_rapids_ml/classification.py index d9fcf0a3..34a2db80 100644 --- a/python/src/spark_rapids_ml/classification.py +++ b/python/src/spark_rapids_ml/classification.py @@ -791,6 +791,21 @@ def setRawPredictionCol( """ return self._set(rawPredictionCol=value) + def setStandardization( + self: "_LogisticRegressionCumlParams", value: bool + ) -> "_LogisticRegressionCumlParams": + """ + Sets the value of :py:attr:`standardization`. + """ + self._set_params(standardization=value) + if value is True and self.getOrDefault("enable_sparse_data_optim") is not False: + get_logger(self.__class__).warning( + ("when standardization is True, spark rapids ml forces densifying sparse vectors to dense vectors for training. " + "enable_sparse_data_optim is set to False") + ) + self._set_params(enable_sparse_data_optim=False) + return self + class LogisticRegression( LogisticRegressionClass, @@ -850,7 +865,9 @@ class LogisticRegression( fitIntercept: Whether to fit an intercept term. standardization: - Whether to standardize the training data. + Whether to standardize the training data. If true, spark rapids ml sets enable_sparse_data_optim=False + to densify sparse vectors into dense vectors for fitting. Currently there is no support for sparse vectors + standardization in cuml yet. num_workers: Number of cuML workers, where each cuML worker corresponds to one Spark task running on one GPU. If not set, spark-rapids-ml tries to infer the number of @@ -923,9 +940,11 @@ def __init__( "This estimator does not support double precision inputs. Setting float32_inputs to False will be ignored." 
) self._input_kwargs.pop("float32_inputs") + super().__init__() self._set_cuml_reg_params() self._set_params(**self._input_kwargs) + self.setStandardization(self.getStandardization()) def _fit_array_order(self) -> _ArrayOrder: return "C" @@ -940,6 +959,9 @@ def _get_cuml_fit_func( ]: array_order = self._fit_array_order() + if self.getStandardization() == True: + assert self.getOrDefault("enable_sparse_data_optim") is False + def _logistic_regression_fit( dfs: FitInputType, params: Dict[str, Any], diff --git a/python/tests/test_logistic_regression.py b/python/tests/test_logistic_regression.py index 9e65b8b2..c28ca9a4 100644 --- a/python/tests/test_logistic_regression.py +++ b/python/tests/test_logistic_regression.py @@ -1528,8 +1528,10 @@ def test_compat_sparse_multinomial( compare_model(gpu_model, cpu_model, mdf) -@pytest.mark.parametrize("fit_intercept", [True, False]) -@pytest.mark.parametrize("standardization", [True, False]) +#@pytest.mark.parametrize("fit_intercept", [True, False]) +@pytest.mark.parametrize("fit_intercept", [True]) +#@pytest.mark.parametrize("standardization", [True, False]) +@pytest.mark.parametrize("standardization", [False]) @pytest.mark.slow def test_sparse_nlp20news( fit_intercept: bool, @@ -1601,6 +1603,9 @@ def test_sparse_nlp20news( ) gpu_model = gpu_lr.fit(df_train) + + exit() + cpu_model = cpu_lr.fit(df_train) cpu_objective = cpu_model.summary.objectiveHistory[-1] @@ -1703,6 +1708,7 @@ def test_compat_standardization( fit_intercept: bool, data_type: np.dtype, lr_types: Tuple[LogisticRegressionType, LogisticRegressionModelType], + caplog: LogCaptureFixture, ) -> None: _LogisticRegression, _LogisticRegressionModel = lr_types tolerance = 1e-3 @@ -1748,6 +1754,11 @@ def test_compat_standardization( blor_model = blor.fit(bdf) + if isinstance(blor, LogisticRegression): + warning_log = ("when standardization is True, spark rapids ml forces densifying sparse vectors to dense vectors for training. " + "enable_sparse_data_optim is set to False") + assert warning_log in caplog.text + blor_model.setFeaturesCol("features") blor_model.setProbabilityCol("newProbability") blor_model.setRawPredictionCol("newRawPrediction") @@ -1881,7 +1892,8 @@ def train_model(EstimatorClass, ModelClass): # type: ignore @pytest.mark.parametrize("fit_intercept", [True, False]) @pytest.mark.parametrize( - "reg_factors", [(0.0, 0.0), (0.1, 0.0), (0.1, 1.0), (0.1, 0.2)] + "reg_factors", [(0.0, 0.0)] + #"reg_factors", [(0.0, 0.0), (0.1, 0.0), (0.1, 1.0), (0.1, 0.2)] ) def test_standardization_sparse_example( fit_intercept: bool, @@ -1961,6 +1973,10 @@ def sparse_to_df(X: csr_matrix, y: List[float]) -> DataFrame: cpu_lr = SparkLogisticRegression(**est_params) gpu_model = gpu_lr.fit(df) + warning_log = ("when standardization is True, spark rapids ml forces densifying sparse vectors to dense vectors for training. 
" + "enable_sparse_data_optim is set to False") + assert warning_log in caplog.text + cpu_model = cpu_lr.fit(df) compare_model( From 54920f3ad66c8f4b9720b1f74e87ff159ceebeaf Mon Sep 17 00:00:00 2001 From: Jinfeng Date: Mon, 12 Feb 2024 14:32:04 -0800 Subject: [PATCH 6/9] densification in progress with padding zeroes --- python/src/spark_rapids_ml/classification.py | 71 ++++++++++++++------ python/tests/test_logistic_regression.py | 45 ++++++------- 2 files changed, 70 insertions(+), 46 deletions(-) diff --git a/python/src/spark_rapids_ml/classification.py b/python/src/spark_rapids_ml/classification.py index 34a2db80..9d671d2d 100644 --- a/python/src/spark_rapids_ml/classification.py +++ b/python/src/spark_rapids_ml/classification.py @@ -791,21 +791,6 @@ def setRawPredictionCol( """ return self._set(rawPredictionCol=value) - def setStandardization( - self: "_LogisticRegressionCumlParams", value: bool - ) -> "_LogisticRegressionCumlParams": - """ - Sets the value of :py:attr:`standardization`. - """ - self._set_params(standardization=value) - if value is True and self.getOrDefault("enable_sparse_data_optim") is not False: - get_logger(self.__class__).warning( - ("when standardization is True, spark rapids ml forces densifying sparse vectors to dense vectors for training. " - "enable_sparse_data_optim is set to False") - ) - self._set_params(enable_sparse_data_optim=False) - return self - class LogisticRegression( LogisticRegressionClass, @@ -944,7 +929,6 @@ def __init__( super().__init__() self._set_cuml_reg_params() self._set_params(**self._input_kwargs) - self.setStandardization(self.getStandardization()) def _fit_array_order(self) -> _ArrayOrder: return "C" @@ -959,8 +943,11 @@ def _get_cuml_fit_func( ]: array_order = self._fit_array_order() - if self.getStandardization() == True: - assert self.getOrDefault("enable_sparse_data_optim") is False + logger = get_logger(self.__class__) + if self.getStandardization() is True and self.getOrDefault("enable_sparse_data_optim") is not False: + logger.warning( + ("when standardization is True, spark rapids ml forces densifying sparse vectors to dense vectors for training.") + ) def _logistic_regression_fit( dfs: FitInputType, @@ -970,6 +957,32 @@ def _logistic_regression_fit( X_list = [x for (x, _, _) in dfs] y_list = [y for (_, y, _) in dfs] + + # padding temporarily to bypass a cuda kernel error + dimension_origin = X_list[0].shape[1] + enable_padding = dimension_origin % 32 != 0 + #enable_padding = False + if enable_padding: + pad_num_cols = 32 - X_list[0].shape[1] % 32 + def convert_X(X): + print(f"debug before convert X: {X}") + new_shape = (X.shape[0], X.shape[1] + pad_num_cols) + X._shape = new_shape + print(f"debug after convert X: {X}") + return X + #def convert_X(X): + # assert isinstance(X, cupyx.scipy.sparse.csr_matrix) + # num_rows = X.shape[0] + # pad_cols = cupyx.scipy.sparse.csr_matrix((num_rows, pad_num_cols), dtype=X.dtype) + # X = cupyx.scipy.sparse.hstack([X, pad_cols]) + # assert X.shape[1] % 32 == 0 + # return X + if pad_num_cols > 0: + X_list = [convert_X(X) for X in X_list] + + for i in range(len(X_list)): + print(f"debug i: {i}, X[i].shape: {X_list[i].shape}") + if isinstance(X_list[0], pd.DataFrame): concated = pd.concat(X_list) concated_y = pd.concat(y_list) @@ -978,6 +991,13 @@ def _logistic_regression_fit( concated = _concat_and_free(X_list, order=array_order) concated_y = _concat_and_free(y_list, order=array_order) + is_sparse = isinstance(concated, scipy.sparse.csr_matrix) or isinstance(concated, 
cupyx.scipy.sparse.csr_matrix) + if self.getStandardization() is True and is_sparse is True: + print(f"debug concated.shape: {concated.shape}") + concated = concated.toarray() + + print(f"debug concated: {concated}") + pdesc = PartitionDescriptor.build( [concated.shape[0]], params[param_alias.num_cols] ) @@ -1020,11 +1040,18 @@ def _single_fit(init_parameters: Dict[str, Any]) -> Dict[str, Any]: intercept_mean = sum(intercept_array) / len(intercept_array) intercept_array -= intercept_mean + n_cols = logistic_regression.n_cols if enable_padding is False else dimension_origin + + print(f"debug n_cols: {n_cols}") + print(f"debug logistic_regression.coef_: {logistic_regression.coef_}") + print(f"debug logistic_regression.coef_[:, :n_cols]: {logistic_regression.coef_[:, :n_cols]}") + print(f"debug intercept_array: {intercept_array}") + print(f"debug intercept_array[:n_cols]: {intercept_array[:n_cols]}") model = { - "coef_": logistic_regression.coef_.tolist(), - "intercept_": intercept_array.tolist(), + "coef_": logistic_regression.coef_[:, :n_cols].tolist(), + "intercept_": intercept_array[:n_cols].tolist(), "classes_": logistic_regression.classes_.tolist(), - "n_cols": logistic_regression.n_cols, + "n_cols": n_cols, "dtype": logistic_regression.dtype.name, "num_iters": logistic_regression.solver_model.num_iters, "objective": logistic_regression.solver_model.objective, @@ -1051,7 +1078,7 @@ def _single_fit(init_parameters: Dict[str, Any]) -> Dict[str, Any]: ) if init_parameters["fit_intercept"] is True: - model["coef_"] = [[0.0] * logistic_regression.n_cols] + model["coef_"] = [[0.0] * n_cols] model["intercept_"] = [ float("inf") if class_val == 1.0 else float("-inf") ] diff --git a/python/tests/test_logistic_regression.py b/python/tests/test_logistic_regression.py index c28ca9a4..c4f77575 100644 --- a/python/tests/test_logistic_regression.py +++ b/python/tests/test_logistic_regression.py @@ -1531,7 +1531,7 @@ def test_compat_sparse_multinomial( #@pytest.mark.parametrize("fit_intercept", [True, False]) @pytest.mark.parametrize("fit_intercept", [True]) #@pytest.mark.parametrize("standardization", [True, False]) -@pytest.mark.parametrize("standardization", [False]) +@pytest.mark.parametrize("standardization", [True]) @pytest.mark.slow def test_sparse_nlp20news( fit_intercept: bool, @@ -1604,27 +1604,25 @@ def test_sparse_nlp20news( gpu_model = gpu_lr.fit(df_train) - exit() + #cpu_model = cpu_lr.fit(df_train) + #cpu_objective = cpu_model.summary.objectiveHistory[-1] - cpu_model = cpu_lr.fit(df_train) - cpu_objective = cpu_model.summary.objectiveHistory[-1] + #assert ( + # gpu_model.objective < cpu_objective + # or abs(gpu_model.objective - cpu_objective) < tolerance + #) - assert ( - gpu_model.objective < cpu_objective - or abs(gpu_model.objective - cpu_objective) < tolerance - ) + #assert "CUDA managed memory enabled." in caplog.text - assert "CUDA managed memory enabled." 
in caplog.text - - if standardization is True: - compare_model( - gpu_model, - cpu_model, - df_train, - unit_tol=tolerance, - total_tol=tolerance, - accuracy_and_probability_only=True, - ) + #if standardization is True: + # compare_model( + # gpu_model, + # cpu_model, + # df_train, + # unit_tol=tolerance, + # total_tol=tolerance, + # accuracy_and_probability_only=True, + # ) @pytest.mark.parametrize("fit_intercept", [True, False]) @@ -1755,8 +1753,7 @@ def test_compat_standardization( blor_model = blor.fit(bdf) if isinstance(blor, LogisticRegression): - warning_log = ("when standardization is True, spark rapids ml forces densifying sparse vectors to dense vectors for training. " - "enable_sparse_data_optim is set to False") + warning_log = ("when standardization is True, spark rapids ml forces densifying sparse vectors to dense vectors for training.") assert warning_log in caplog.text blor_model.setFeaturesCol("features") @@ -1890,7 +1887,8 @@ def train_model(EstimatorClass, ModelClass): # type: ignore assert mg_test_acc > mc_test_acc or abs(mg_test_acc - mc_test_acc) < tolerance -@pytest.mark.parametrize("fit_intercept", [True, False]) +#@pytest.mark.parametrize("fit_intercept", [True, False]) +@pytest.mark.parametrize("fit_intercept", [True]) @pytest.mark.parametrize( "reg_factors", [(0.0, 0.0)] #"reg_factors", [(0.0, 0.0), (0.1, 0.0), (0.1, 1.0), (0.1, 0.2)] @@ -1973,8 +1971,7 @@ def sparse_to_df(X: csr_matrix, y: List[float]) -> DataFrame: cpu_lr = SparkLogisticRegression(**est_params) gpu_model = gpu_lr.fit(df) - warning_log = ("when standardization is True, spark rapids ml forces densifying sparse vectors to dense vectors for training. " - "enable_sparse_data_optim is set to False") + warning_log = ("when standardization is True, spark rapids ml forces densifying sparse vectors to dense vectors for training.") assert warning_log in caplog.text cpu_model = cpu_lr.fit(df) From 3cee060504707431fee6b0fb1ed118ec16fe87bf Mon Sep 17 00:00:00 2001 From: Jinfeng Date: Wed, 14 Feb 2024 13:47:00 -0800 Subject: [PATCH 7/9] remove uvm for nlp20news gets padding 0 working, still need to get all tests working --- python/src/spark_rapids_ml/classification.py | 86 ++++++++++---------- python/tests/test_logistic_regression.py | 44 +++++----- 2 files changed, 65 insertions(+), 65 deletions(-) diff --git a/python/src/spark_rapids_ml/classification.py b/python/src/spark_rapids_ml/classification.py index 9d671d2d..00918b80 100644 --- a/python/src/spark_rapids_ml/classification.py +++ b/python/src/spark_rapids_ml/classification.py @@ -850,8 +850,8 @@ class LogisticRegression( fitIntercept: Whether to fit an intercept term. standardization: - Whether to standardize the training data. If true, spark rapids ml sets enable_sparse_data_optim=False - to densify sparse vectors into dense vectors for fitting. Currently there is no support for sparse vectors + Whether to standardize the training data. If true, spark rapids ml sets enable_sparse_data_optim=False + to densify sparse vectors into dense vectors for fitting. Currently there is no support for sparse vectors standardization in cuml yet. 
num_workers: Number of cuML workers, where each cuML worker corresponds to one Spark task @@ -944,10 +944,15 @@ def _get_cuml_fit_func( array_order = self._fit_array_order() logger = get_logger(self.__class__) - if self.getStandardization() is True and self.getOrDefault("enable_sparse_data_optim") is not False: - logger.warning( - ("when standardization is True, spark rapids ml forces densifying sparse vectors to dense vectors for training.") + if ( + self.getStandardization() is True + and self.getOrDefault("enable_sparse_data_optim") is not False + ): + logger.warning( + ( + "when standardization is True, spark rapids ml forces densifying sparse vectors to dense vectors for training." ) + ) def _logistic_regression_fit( dfs: FitInputType, @@ -958,30 +963,19 @@ def _logistic_regression_fit( X_list = [x for (x, _, _) in dfs] y_list = [y for (_, y, _) in dfs] - # padding temporarily to bypass a cuda kernel error - dimension_origin = X_list[0].shape[1] - enable_padding = dimension_origin % 32 != 0 - #enable_padding = False - if enable_padding: - pad_num_cols = 32 - X_list[0].shape[1] % 32 - def convert_X(X): - print(f"debug before convert X: {X}") - new_shape = (X.shape[0], X.shape[1] + pad_num_cols) - X._shape = new_shape - print(f"debug after convert X: {X}") - return X - #def convert_X(X): - # assert isinstance(X, cupyx.scipy.sparse.csr_matrix) - # num_rows = X.shape[0] - # pad_cols = cupyx.scipy.sparse.csr_matrix((num_rows, pad_num_cols), dtype=X.dtype) - # X = cupyx.scipy.sparse.hstack([X, pad_cols]) - # assert X.shape[1] % 32 == 0 - # return X - if pad_num_cols > 0: - X_list = [convert_X(X) for X in X_list] - - for i in range(len(X_list)): - print(f"debug i: {i}, X[i].shape: {X_list[i].shape}") + ## padding zero columns to bypass a cuda kernel bug in cuml 24.02 + #dimension_origin = X_list[0].shape[1] + #enable_padding = dimension_origin % 32 != 0 + #if enable_padding: + # pad_num_cols = 32 - X_list[0].shape[1] % 32 + + # def convert_X(X): + # new_shape = (X.shape[0], X.shape[1] + pad_num_cols) + # X._shape = new_shape + # return X + + # if pad_num_cols > 0: + # X_list = [convert_X(X) for X in X_list] if isinstance(X_list[0], pd.DataFrame): concated = pd.concat(X_list) @@ -991,15 +985,28 @@ def convert_X(X): concated = _concat_and_free(X_list, order=array_order) concated_y = _concat_and_free(y_list, order=array_order) - is_sparse = isinstance(concated, scipy.sparse.csr_matrix) or isinstance(concated, cupyx.scipy.sparse.csr_matrix) - if self.getStandardization() is True and is_sparse is True: - print(f"debug concated.shape: {concated.shape}") + is_sparse = isinstance(concated, scipy.sparse.csr_matrix) or isinstance( + concated, cupyx.scipy.sparse.csr_matrix + ) + + if self.getStandardization() is True and is_sparse is True: concated = concated.toarray() - print(f"debug concated: {concated}") + #enable_padding = False + #dimension_origin = concated.shape[1] + num_pad_cols = 0 + if self.getStandardization() is True and concated.shape[1] % 32 != 0: + num_pad_cols = concated.shape[1] % 32 + nrows = concated.shape[0] + if isinstance(concated, np.ndarray): + zeros = np.zeros((nrows, num_pad_cols), dtype=concated.dtype) + concated = np.concatenate([concated, zeros], axis=1) + else: + concated._shape = (nrows, concated.shape[1] + num_pad_cols) pdesc = PartitionDescriptor.build( - [concated.shape[0]], params[param_alias.num_cols] + [concated.shape[0]], + params[param_alias.num_cols] + num_pad_cols, ) def _single_fit(init_parameters: Dict[str, Any]) -> Dict[str, Any]: @@ -1040,16 +1047,11 @@ def 
_single_fit(init_parameters: Dict[str, Any]) -> Dict[str, Any]: intercept_mean = sum(intercept_array) / len(intercept_array) intercept_array -= intercept_mean - n_cols = logistic_regression.n_cols if enable_padding is False else dimension_origin - - print(f"debug n_cols: {n_cols}") - print(f"debug logistic_regression.coef_: {logistic_regression.coef_}") - print(f"debug logistic_regression.coef_[:, :n_cols]: {logistic_regression.coef_[:, :n_cols]}") - print(f"debug intercept_array: {intercept_array}") - print(f"debug intercept_array[:n_cols]: {intercept_array[:n_cols]}") + n_cols = logistic_regression.n_cols - num_pad_cols + model = { "coef_": logistic_regression.coef_[:, :n_cols].tolist(), - "intercept_": intercept_array[:n_cols].tolist(), + "intercept_": intercept_array.tolist(), "classes_": logistic_regression.classes_.tolist(), "n_cols": n_cols, "dtype": logistic_regression.dtype.name, diff --git a/python/tests/test_logistic_regression.py b/python/tests/test_logistic_regression.py index c4f77575..105337a3 100644 --- a/python/tests/test_logistic_regression.py +++ b/python/tests/test_logistic_regression.py @@ -1528,10 +1528,8 @@ def test_compat_sparse_multinomial( compare_model(gpu_model, cpu_model, mdf) -#@pytest.mark.parametrize("fit_intercept", [True, False]) -@pytest.mark.parametrize("fit_intercept", [True]) -#@pytest.mark.parametrize("standardization", [True, False]) -@pytest.mark.parametrize("standardization", [True]) +@pytest.mark.parametrize("fit_intercept", [True, False]) +@pytest.mark.parametrize("standardization", [True, False]) @pytest.mark.slow def test_sparse_nlp20news( fit_intercept: bool, @@ -1563,7 +1561,7 @@ def test_sparse_nlp20news( y = twenty_train.target.tolist() conf = { - "spark.rapids.ml.uvm.enabled": True + # "spark.rapids.ml.uvm.enabled": True # Commenting this out can resolve a cudaMemSet error } # enable memory management to run the test case on GPU with small memory (e.g. 2G) with CleanSparkSession(conf) as spark: data = [ @@ -1604,25 +1602,25 @@ def test_sparse_nlp20news( gpu_model = gpu_lr.fit(df_train) - #cpu_model = cpu_lr.fit(df_train) - #cpu_objective = cpu_model.summary.objectiveHistory[-1] + cpu_model = cpu_lr.fit(df_train) + cpu_objective = cpu_model.summary.objectiveHistory[-1] - #assert ( - # gpu_model.objective < cpu_objective - # or abs(gpu_model.objective - cpu_objective) < tolerance - #) + assert ( + gpu_model.objective < cpu_objective + or abs(gpu_model.objective - cpu_objective) < tolerance + ) - #assert "CUDA managed memory enabled." in caplog.text + # assert "CUDA managed memory enabled." in caplog.text - #if standardization is True: - # compare_model( - # gpu_model, - # cpu_model, - # df_train, - # unit_tol=tolerance, - # total_tol=tolerance, - # accuracy_and_probability_only=True, - # ) + if standardization is True: + compare_model( + gpu_model, + cpu_model, + df_train, + unit_tol=tolerance, + total_tol=tolerance, + accuracy_and_probability_only=True, + ) @pytest.mark.parametrize("fit_intercept", [True, False]) @@ -1753,7 +1751,7 @@ def test_compat_standardization( blor_model = blor.fit(bdf) if isinstance(blor, LogisticRegression): - warning_log = ("when standardization is True, spark rapids ml forces densifying sparse vectors to dense vectors for training.") + warning_log = "when standardization is True, spark rapids ml forces densifying sparse vectors to dense vectors for training." 
assert warning_log in caplog.text blor_model.setFeaturesCol("features") @@ -1971,7 +1969,7 @@ def sparse_to_df(X: csr_matrix, y: List[float]) -> DataFrame: cpu_lr = SparkLogisticRegression(**est_params) gpu_model = gpu_lr.fit(df) - warning_log = ("when standardization is True, spark rapids ml forces densifying sparse vectors to dense vectors for training.") + warning_log = "when standardization is True, spark rapids ml forces densifying sparse vectors to dense vectors for training." assert warning_log in caplog.text cpu_model = cpu_lr.fit(df) From b0d772f5aa2d3eee06494d2edec20e49864a99cf Mon Sep 17 00:00:00 2001 From: Jinfeng Date: Wed, 14 Feb 2024 15:31:01 -0800 Subject: [PATCH 8/9] confirms cudaMemSet error at vars is due to uvm instead of the out-of-memory access bug in sum kernal --- python/src/spark_rapids_ml/classification.py | 34 +++----------------- python/tests/test_logistic_regression.py | 8 ++--- 2 files changed, 7 insertions(+), 35 deletions(-) diff --git a/python/src/spark_rapids_ml/classification.py b/python/src/spark_rapids_ml/classification.py index 00918b80..36f1d8c3 100644 --- a/python/src/spark_rapids_ml/classification.py +++ b/python/src/spark_rapids_ml/classification.py @@ -963,20 +963,6 @@ def _logistic_regression_fit( X_list = [x for (x, _, _) in dfs] y_list = [y for (_, y, _) in dfs] - ## padding zero columns to bypass a cuda kernel bug in cuml 24.02 - #dimension_origin = X_list[0].shape[1] - #enable_padding = dimension_origin % 32 != 0 - #if enable_padding: - # pad_num_cols = 32 - X_list[0].shape[1] % 32 - - # def convert_X(X): - # new_shape = (X.shape[0], X.shape[1] + pad_num_cols) - # X._shape = new_shape - # return X - - # if pad_num_cols > 0: - # X_list = [convert_X(X) for X in X_list] - if isinstance(X_list[0], pd.DataFrame): concated = pd.concat(X_list) concated_y = pd.concat(y_list) @@ -989,24 +975,13 @@ def _logistic_regression_fit( concated, cupyx.scipy.sparse.csr_matrix ) + # densifying sparse vectors into dense to use standardization if self.getStandardization() is True and is_sparse is True: concated = concated.toarray() - #enable_padding = False - #dimension_origin = concated.shape[1] - num_pad_cols = 0 - if self.getStandardization() is True and concated.shape[1] % 32 != 0: - num_pad_cols = concated.shape[1] % 32 - nrows = concated.shape[0] - if isinstance(concated, np.ndarray): - zeros = np.zeros((nrows, num_pad_cols), dtype=concated.dtype) - concated = np.concatenate([concated, zeros], axis=1) - else: - concated._shape = (nrows, concated.shape[1] + num_pad_cols) - pdesc = PartitionDescriptor.build( [concated.shape[0]], - params[param_alias.num_cols] + num_pad_cols, + params[param_alias.num_cols], ) def _single_fit(init_parameters: Dict[str, Any]) -> Dict[str, Any]: @@ -1047,10 +1022,9 @@ def _single_fit(init_parameters: Dict[str, Any]) -> Dict[str, Any]: intercept_mean = sum(intercept_array) / len(intercept_array) intercept_array -= intercept_mean - n_cols = logistic_regression.n_cols - num_pad_cols - + n_cols = logistic_regression.n_cols model = { - "coef_": logistic_regression.coef_[:, :n_cols].tolist(), + "coef_": logistic_regression.coef_.tolist(), "intercept_": intercept_array.tolist(), "classes_": logistic_regression.classes_.tolist(), "n_cols": n_cols, diff --git a/python/tests/test_logistic_regression.py b/python/tests/test_logistic_regression.py index 105337a3..a791705f 100644 --- a/python/tests/test_logistic_regression.py +++ b/python/tests/test_logistic_regression.py @@ -1560,7 +1560,7 @@ def test_sparse_nlp20news( X = twenty_train.data y 
= twenty_train.target.tolist() - conf = { + conf: Dict[str, Any] = { # "spark.rapids.ml.uvm.enabled": True # Commenting this out can resolve a cudaMemSet error } # enable memory management to run the test case on GPU with small memory (e.g. 2G) with CleanSparkSession(conf) as spark: @@ -1885,11 +1885,9 @@ def train_model(EstimatorClass, ModelClass): # type: ignore assert mg_test_acc > mc_test_acc or abs(mg_test_acc - mc_test_acc) < tolerance -#@pytest.mark.parametrize("fit_intercept", [True, False]) -@pytest.mark.parametrize("fit_intercept", [True]) +@pytest.mark.parametrize("fit_intercept", [True, False]) @pytest.mark.parametrize( - "reg_factors", [(0.0, 0.0)] - #"reg_factors", [(0.0, 0.0), (0.1, 0.0), (0.1, 1.0), (0.1, 0.2)] + "reg_factors", [(0.0, 0.0), (0.1, 0.0), (0.1, 1.0), (0.1, 0.2)] ) def test_standardization_sparse_example( fit_intercept: bool, From 16e9e15f44fd5f3d0851f1c357d21b83e1251af3 Mon Sep 17 00:00:00 2001 From: Jinfeng Date: Fri, 16 Feb 2024 15:57:49 -0800 Subject: [PATCH 9/9] revise per comments --- python/src/spark_rapids_ml/classification.py | 8 ++++- python/tests/test_logistic_regression.py | 32 +++++++++++++------- 2 files changed, 28 insertions(+), 12 deletions(-) diff --git a/python/src/spark_rapids_ml/classification.py b/python/src/spark_rapids_ml/classification.py index 36f1d8c3..6ae0f004 100644 --- a/python/src/spark_rapids_ml/classification.py +++ b/python/src/spark_rapids_ml/classification.py @@ -1019,7 +1019,13 @@ def _single_fit(init_parameters: Dict[str, Any]) -> Dict[str, Any]: init_parameters["fit_intercept"] is True and len(intercept_array) > 1 ): - intercept_mean = sum(intercept_array) / len(intercept_array) + import cupy as cp + + intercept_mean = ( + np.mean(intercept_array) + if isinstance(intercept_array, np.ndarray) + else cp.mean(intercept_array) + ) intercept_array -= intercept_mean n_cols = logistic_regression.n_cols diff --git a/python/tests/test_logistic_regression.py b/python/tests/test_logistic_regression.py index a791705f..59bc5b87 100644 --- a/python/tests/test_logistic_regression.py +++ b/python/tests/test_logistic_regression.py @@ -304,12 +304,10 @@ def test_classifier( reg_param: float = 0.0, elasticNet_param: float = 0.0, tolerance: float = 0.001, - standardization: bool = False, convert_to_sparse: bool = False, ) -> LogisticRegression: - assert ( - standardization is False - ), "standardization=True is not supported due to testing with single-GPU LogisticRegression" + + standardization: bool = False if convert_to_sparse is True: assert feature_type == "vector" @@ -429,6 +427,7 @@ def to_sparse_func(v: Union[SparseVector, DenseVector]) -> SparseVector: @pytest.mark.compat @pytest.mark.parametrize("fit_intercept", [True, False]) +@pytest.mark.parametrize("standardization", [True, False]) @pytest.mark.parametrize( "lr_types", [ @@ -438,6 +437,7 @@ def to_sparse_func(v: Union[SparseVector, DenseVector]) -> SparseVector: ) def test_compat( fit_intercept: bool, + standardization: bool, lr_types: Tuple[LogisticRegressionType, LogisticRegressionModelType], tmp_path: str, ) -> None: @@ -483,7 +483,7 @@ def test_compat( assert _LogisticRegression().getRegParam() == 0.0 blor = _LogisticRegression( - regParam=0.1, fitIntercept=fit_intercept, standardization=False + regParam=0.1, fitIntercept=fit_intercept, standardization=standardization ) assert blor.getRegParam() == 0.1 @@ -513,15 +513,19 @@ def test_compat( assert blor_model.getProbabilityCol() == "newProbability" assert isinstance(blor_model.coefficients, DenseVector) - assert array_equal( - 
blor_model.coefficients.toArray(), [-2.42377087, 2.42377087], tolerance + + coef_gnd = ( + [-2.48197058, 2.48197058] + if standardization is True + else [-2.42377087, 2.42377087] ) + assert array_equal(blor_model.coefficients.toArray(), coef_gnd, tolerance) assert blor_model.intercept == pytest.approx(0, abs=1e-6) assert isinstance(blor_model.coefficientMatrix, DenseMatrix) assert array_equal( blor_model.coefficientMatrix.toArray(), - np.array([[-2.42377087, 2.42377087]]), + np.array([coef_gnd]), tolerance, ) assert isinstance(blor_model.interceptVector, DenseVector) @@ -546,15 +550,21 @@ def test_compat( output = output_df.head() assert output.prediction == 1.0 + prob_gnd = ( + [0.07713181, 0.92286819] if standardization is True else [0.0814, 0.9186] + ) assert array_equal( output.newProbability.toArray(), - Vectors.dense([0.0814, 0.9186]).toArray(), + Vectors.dense(prob_gnd).toArray(), tolerance, ) - array_equal( + rawPredict_gnd = ( + [-2.48197058, 2.48197058] if standardization is True else [-2.4238, 2.4238] + ) + assert array_equal( output.newRawPrediction.toArray(), - Vectors.dense([-2.4238, 2.4238]).toArray(), + Vectors.dense(rawPredict_gnd).toArray(), tolerance, )
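
For reference, below is a minimal end-to-end sketch of the standardization support this series adds, based on the estimator and DataFrame-construction patterns used in the tests above. The SparkSession setup, toy data, and column names are illustrative assumptions rather than part of the patches.

# Minimal usage sketch of spark-rapids-ml LogisticRegression with the new
# standardization flag, mirroring the tests in this series. The toy data and
# column names below are illustrative assumptions, not part of the patches.
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import array
from pyspark.ml.functions import array_to_vector
from spark_rapids_ml.classification import LogisticRegression

spark = SparkSession.builder.getOrCreate()

# Two features on very different scales, so standardization changes the fit.
X = np.array([[1.0, 3000.0], [1.0, 4000.0], [2.0, 1000.0], [2.0, 2000.0]])
y = np.array([1.0, 1.0, 0.0, 0.0])
rows = np.concatenate((X, y.reshape(-1, 1)), axis=1).tolist()
df = spark.createDataFrame(rows, "c0 float, c1 float, label float")
df = df.withColumn("features", array_to_vector(array("c0", "c1"))).drop("c0", "c1")

# standardization=True is now passed through to cuML; for sparse input the
# estimator densifies the vectors before fitting (see the warning added above).
lr = LogisticRegression(regParam=0.1, fitIntercept=True, standardization=True)
lr.setFeaturesCol("features")
lr.setLabelCol("label")
model = lr.fit(df)

print(model.coefficients, model.intercept)
model.transform(df).show()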