[auto-merge] branch-24.12 to branch-25.02 [skip ci] [bot] #806

Merged
merged 1 commit into branch-25.02 from branch-24.12 on Dec 12, 2024
60 changes: 0 additions & 60 deletions python/src/spark_rapids_ml/classification.py
@@ -986,60 +986,7 @@ def _logistic_regression_fit(
params[param_alias.num_cols],
)

# Use cupy to standardize dataset as a workaround to gain better numeric stability
standarization_with_cupy = standardization and not is_sparse
if standarization_with_cupy is True:
import cupy as cp

if isinstance(concated, np.ndarray):
concated = cp.array(concated)
elif isinstance(concated, pd.DataFrame):
concated = cp.array(concated.values)
else:
assert isinstance(
concated, cp.ndarray
), "only numpy array, cupy array, and pandas dataframe are supported when standardization_with_cupy is on"

mean_partial = concated.sum(axis=0) / pdesc.m

import json

from pyspark import BarrierTaskContext

context = BarrierTaskContext.get()

def all_gather_then_sum(
cp_array: cp.ndarray, dtype: Union[np.float32, np.float64]
) -> cp.ndarray:
msgs = context.allGather(json.dumps(cp_array.tolist()))
arrays = [json.loads(p) for p in msgs]
array_sum = np.sum(arrays, axis=0).astype(dtype)
return cp.array(array_sum)

mean = all_gather_then_sum(mean_partial, concated.dtype)
concated -= mean

l2 = cp.linalg.norm(concated, ord=2, axis=0)

var_partial = l2 * l2 / (pdesc.m - 1)
var = all_gather_then_sum(var_partial, concated.dtype)

assert cp.all(
var >= 0
), "numeric instable detected when calculating variance. Got negative variance"

stddev = cp.sqrt(var)

stddev_inv = cp.where(stddev != 0, 1.0 / stddev, 1.0)

if fit_intercept is False:
concated += mean

concated *= stddev_inv

def _single_fit(init_parameters: Dict[str, Any]) -> Dict[str, Any]:
if standarization_with_cupy is True:
init_parameters["standardization"] = False

if init_parameters["C"] == 0.0:
init_parameters["penalty"] = None
@@ -1071,13 +1018,6 @@ def _single_fit(init_parameters: Dict[str, Any]) -> Dict[str, Any]:

coef_ = logistic_regression.coef_
intercept_ = logistic_regression.intercept_
if standarization_with_cupy is True:
import cupy as cp

coef_ = cp.where(stddev > 0, coef_ / stddev, coef_)
if init_parameters["fit_intercept"] is True:
intercept_ = intercept_ - cp.dot(coef_, mean)

intercept_array = intercept_
# follow Spark to center the intercepts for multinomial classification
if (
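For context, the block removed above standardized the concatenated feature matrix with cuPy, all-gathering per-partition sums across workers to form a global mean and variance, fit with standardization disabled, and then mapped the fitted coefficients back to the original feature scale. A minimal standalone sketch of that rescaling step, using NumPy in place of cuPy and hypothetical names and values (not part of this PR):

import numpy as np

# Hypothetical results of a fit on standardized features x' = (x - mean) / stddev.
coef_std = np.array([[0.8, -1.2]])   # coefficients learned on standardized data
intercept_std = np.array([0.3])
mean = np.array([2.0, 5.0])          # per-feature mean of the original data
stddev = np.array([0.5, 4.0])        # per-feature standard deviation

# w'.(x - mean)/stddev + b'  ==  (w'/stddev).x + (b' - (w'/stddev).mean),
# so divide the coefficients by stddev, guarding zero-variance columns ...
coef = np.where(stddev > 0, coef_std / stddev, coef_std)
# ... and, when an intercept is fitted, shift it by the rescaled coefficients times the mean.
intercept = intercept_std - coef @ mean

print(coef)       # [[ 1.6 -0.3]]
print(intercept)  # [-1.4]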
20 changes: 10 additions & 10 deletions python/tests/test_logistic_regression.py
@@ -580,7 +580,7 @@ def test_compat(
else [-2.42377087, 2.42377087]
)
assert array_equal(blor_model.coefficients.toArray(), coef_gnd, tolerance)
assert blor_model.intercept == pytest.approx(0, abs=1e-4)
assert blor_model.intercept == pytest.approx(0, abs=tolerance)

assert isinstance(blor_model.coefficientMatrix, DenseMatrix)
assert array_equal(
@@ -589,7 +589,7 @@
tolerance,
)
assert isinstance(blor_model.interceptVector, DenseVector)
assert array_equal(blor_model.interceptVector.toArray(), [0.0])
assert array_equal(blor_model.interceptVector.toArray(), [0.0], tolerance)

example = bdf.head()
if example:
@@ -2238,10 +2238,10 @@ def test_sparse_all_zeroes(

with CleanSparkSession() as spark:
data = [
Row(label=1.0, weight=1.0, features=Vectors.sparse(2, {})),
Row(label=1.0, weight=1.0, features=Vectors.sparse(2, {})),
Row(label=0.0, weight=1.0, features=Vectors.sparse(2, {})),
Row(label=0.0, weight=1.0, features=Vectors.sparse(2, {})),
Row(label=1.0, features=Vectors.sparse(2, {})),
Row(label=1.0, features=Vectors.sparse(2, {})),
Row(label=0.0, features=Vectors.sparse(2, {})),
Row(label=0.0, features=Vectors.sparse(2, {})),
]

bdf = spark.createDataFrame(data)
@@ -2281,10 +2281,10 @@ def test_sparse_one_gpu_all_zeroes(

with CleanSparkSession() as spark:
data = [
Row(label=1.0, weight=1.0, features=Vectors.sparse(2, {0: 10.0, 1: 20.0})),
Row(label=1.0, weight=1.0, features=Vectors.sparse(2, {})),
Row(label=0.0, weight=1.0, features=Vectors.sparse(2, {})),
Row(label=0.0, weight=1.0, features=Vectors.sparse(2, {})),
Row(label=1.0, features=Vectors.sparse(2, {0: 10.0, 1: 20.0})),
Row(label=1.0, features=Vectors.sparse(2, {})),
Row(label=0.0, features=Vectors.sparse(2, {})),
Row(label=0.0, features=Vectors.sparse(2, {})),
]

bdf = spark.createDataFrame(data)