
[LogisticRegression] Support standardization for dense vectors #565

Merged: 9 commits, Feb 17, 2024
48 changes: 43 additions & 5 deletions python/src/spark_rapids_ml/classification.py
@@ -660,7 +660,7 @@ def _param_mapping(cls) -> Dict[str, Optional[str]]:
"fitIntercept": "fit_intercept",
"threshold": None,
"thresholds": None,
"standardization": "", # Set to "" instead of None because cuml defaults to standardization = False
"standardization": "standardization",
"weightCol": None,
"aggregationDepth": None,
"family": "", # family can be 'auto', 'binomial' or 'multinomial', cuml automatically detects num_classes
@@ -680,6 +680,7 @@ def _param_value_mapping(
def _get_cuml_params_default(self) -> Dict[str, Any]:
return {
"fit_intercept": True,
"standardization": False,
"verbose": False,
"C": 1.0,
"penalty": "l2",
@@ -848,6 +849,10 @@ class LogisticRegression(
Note this is only supported in spark >= 3.4.
fitIntercept:
Whether to fit an intercept term.
standardization:
Whether to standardize the training data. If true, spark-rapids-ml sets enable_sparse_data_optim=False
to densify sparse vectors into dense vectors for fitting, since cuML does not yet support
standardization with sparse inputs (see the usage sketch below).
num_workers:
Number of cuML workers, where each cuML worker corresponds to one Spark task
running on one GPU. If not set, spark-rapids-ml tries to infer the number of
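
A usage sketch for the new parameter. The SparkSession setup, data, and column names are assumptions, and running it requires a GPU-enabled Spark environment with spark-rapids-ml installed:

```python
# Illustrative usage of the new standardization parameter; the DataFrame
# contents and column names are assumptions for this sketch.
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from spark_rapids_ml.classification import LogisticRegression

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [
        (Vectors.dense([1.0, 200.0]), 0.0),
        (Vectors.dense([2.0, 100.0]), 1.0),
    ],
    ["features", "label"],
)

# With standardization=True, sparse inputs would be densified first,
# per the docstring above.
lr = LogisticRegression(standardization=True, regParam=0.01)
model = lr.fit(df)
```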
@@ -909,6 +914,7 @@ def __init__(
elasticNetParam: float = 0.0,
tol: float = 1e-6,
fitIntercept: bool = True,
standardization: bool = True,
enable_sparse_data_optim: Optional[bool] = None,
num_workers: Optional[int] = None,
verbose: Union[int, bool] = False,
@@ -919,6 +925,7 @@
"This estimator does not support double precision inputs. Setting float32_inputs to False will be ignored."
)
self._input_kwargs.pop("float32_inputs")

super().__init__()
self._set_cuml_reg_params()
self._set_params(**self._input_kwargs)
@@ -936,6 +943,17 @@ def _get_cuml_fit_func(
]:
array_order = self._fit_array_order()

logger = get_logger(self.__class__)
if (
self.getStandardization() is True
and self.getOrDefault("enable_sparse_data_optim") is not False
):
logger.warning(
"When standardization is True, spark-rapids-ml densifies sparse vectors into dense vectors for training."
)

def _logistic_regression_fit(
dfs: FitInputType,
params: Dict[str, Any],
@@ -944,6 +962,7 @@ def _logistic_regression_fit(

X_list = [x for (x, _, _) in dfs]
y_list = [y for (_, y, _) in dfs]

if isinstance(X_list[0], pd.DataFrame):
concated = pd.concat(X_list)
concated_y = pd.concat(y_list)
@@ -952,8 +971,17 @@
concated = _concat_and_free(X_list, order=array_order)
concated_y = _concat_and_free(y_list, order=array_order)

is_sparse = isinstance(concated, scipy.sparse.csr_matrix) or isinstance(
concated, cupyx.scipy.sparse.csr_matrix
)

# densifying sparse vectors into dense to use standardization
if self.getStandardization() is True and is_sparse is True:
concated = concated.toarray()

pdesc = PartitionDescriptor.build(
- [concated.shape[0]], params[param_alias.num_cols]
+ [concated.shape[0]],
+ params[param_alias.num_cols],
)

def _single_fit(init_parameters: Dict[str, Any]) -> Dict[str, Any]:
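
A standalone sketch of the densify step above, using scipy only; the same `.toarray()` call exists on `cupyx.scipy.sparse.csr_matrix`:

```python
# Minimal illustration of the densify step: cuML's standardization path
# expects dense arrays, so CSR inputs are converted with .toarray().
import numpy as np
import scipy.sparse

X = scipy.sparse.csr_matrix(np.array([[0.0, 3.0], [1.0, 0.0]]))
standardization = True

if standardization and isinstance(X, scipy.sparse.csr_matrix):
    X = X.toarray()  # dense ndarray; row order is preserved

print(type(X), X.shape)  # <class 'numpy.ndarray'> (2, 2)
```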
@@ -985,11 +1013,21 @@ def _single_fit(init_parameters: Dict[str, Any]) -> Dict[str, Any]:
pdesc.rank,
)

intercept_array = logistic_regression.intercept_
# follow Spark to center the intercepts for multinomial classification
if (
init_parameters["fit_intercept"] is True
and len(intercept_array) > 1
):
intercept_mean = sum(intercept_array) / len(intercept_array)
Reviewer (Collaborator): Is there a 'mean' method that can be called? Also, how does this not change the model output?

Author: Yeah, revised the code to use np.mean and cp.mean.

intercept_array -= intercept_mean

n_cols = logistic_regression.n_cols
model = {
"coef_": logistic_regression.coef_.tolist(),
"intercept_": logistic_regression.intercept_.tolist(),
"intercept_": intercept_array.tolist(),
"classes_": logistic_regression.classes_.tolist(),
"n_cols": logistic_regression.n_cols,
"n_cols": n_cols,
"dtype": logistic_regression.dtype.name,
"num_iters": logistic_regression.solver_model.num_iters,
"objective": logistic_regression.solver_model.objective,
@@ -1016,7 +1054,7 @@ def _single_fit(init_parameters: Dict[str, Any]) -> Dict[str, Any]:
)

if init_parameters["fit_intercept"] is True:
model["coef_"] = [[0.0] * logistic_regression.n_cols]
model["coef_"] = [[0.0] * n_cols]
model["intercept_"] = [
float("inf") if class_val == 1.0 else float("-inf")
]
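
For the degenerate branch above (all training labels are a single class), zero coefficients plus a ±inf intercept make the sigmoid saturate, so the model deterministically predicts that class. A tiny numpy illustration with made-up feature values:

```python
# With zero coefficients, the margin is just the intercept; an intercept
# of +inf (or -inf) saturates the sigmoid to exactly 1.0 (or 0.0).
import numpy as np


def sigmoid(z: float) -> float:
    return float(1.0 / (1.0 + np.exp(-z)))


coef = np.zeros(3)
x = np.array([5.0, -2.0, 0.1])

print(sigmoid(coef @ x + np.inf))  # 1.0 -> always predicts class 1
print(sigmoid(coef @ x - np.inf))  # 0.0 -> always predicts the other class
```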