[SPARK-50928][ML][PYTHON][CONNECT] Support GaussianMixture on Connect
### What changes were proposed in this pull request?
Support GaussianMixture on Connect
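The same `GaussianMixture` / `GaussianMixtureModel` API that works in Spark Classic should now work against a Spark Connect session. A minimal sketch of the user-facing behavior this enables (the Connect address, toy data, and parameter values below are illustrative assumptions, not part of this patch):

```python
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.clustering import GaussianMixture

# Assumption: a Spark Connect server is already running at this address.
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

df = spark.createDataFrame(
    [
        (1.0, Vectors.dense([-0.1, -0.05])),
        (2.0, Vectors.dense([-0.01, -0.1])),
        (3.0, Vectors.dense([0.9, 0.8])),
        (1.0, Vectors.dense([0.75, 0.935])),
    ],
    ["weight", "features"],
)

gmm = GaussianMixture(k=2, maxIter=2, weightCol="weight", seed=1)
model = gmm.fit(df)                  # estimator now runs server-side over Connect

print(model.weights)                 # mixture weights fetched from the remote model
model.gaussiansDF.show()             # per-component mean/cov as a DataFrame
model.transform(df).show()           # adds "probability" and "prediction" columns
print(model.predictProbability(Vectors.dense(0.0, 5.0)))
```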

### Why are the changes needed?
For feature parity

### Does this PR introduce _any_ user-facing change?
Yes, `GaussianMixture` and `GaussianMixtureModel` can now be used with Spark Connect.

### How was this patch tested?
Added `test_gaussian_mixture` to `python/pyspark/ml/tests/test_clustering.py`.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #49633 from zhengruifeng/ml_connect_gmm.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Ruifeng Zheng <[email protected]>
zhengruifeng committed Jan 24, 2025
1 parent 311a4e0 commit 560dd5e
Showing 6 changed files with 123 additions and 17 deletions.
@@ -36,6 +36,7 @@ org.apache.spark.ml.regression.GBTRegressor
# clustering
org.apache.spark.ml.clustering.KMeans
org.apache.spark.ml.clustering.BisectingKMeans
org.apache.spark.ml.clustering.GaussianMixture


# recommendation
@@ -42,6 +42,7 @@ org.apache.spark.ml.regression.GBTRegressionModel
# clustering
org.apache.spark.ml.clustering.KMeansModel
org.apache.spark.ml.clustering.BisectingKMeansModel
org.apache.spark.ml.clustering.GaussianMixtureModel

# recommendation
org.apache.spark.ml.recommendation.ALSModel
@@ -93,6 +93,9 @@ class GaussianMixtureModel private[ml] (
extends Model[GaussianMixtureModel] with GaussianMixtureParams with MLWritable
with HasTrainingSummary[GaussianMixtureSummary] {

private[ml] def this() = this(Identifiable.randomUID("gmm"),
Array.emptyDoubleArray, Array.empty)

@Since("3.0.0")
lazy val numFeatures: Int = gaussians.head.mean.size

2 changes: 2 additions & 0 deletions python/pyspark/ml/clustering.py
@@ -241,6 +241,7 @@ def gaussians(self) -> List[MultivariateGaussian]:

@property
@since("2.0.0")
@try_remote_attribute_relation
def gaussiansDF(self) -> DataFrame:
"""
Retrieve Gaussian distributions as a DataFrame.
@@ -542,6 +543,7 @@ def probabilityCol(self) -> str:

@property
@since("2.1.0")
@try_remote_attribute_relation
def probability(self) -> DataFrame:
"""
DataFrame of probabilities of each cluster for each training data point.
129 changes: 112 additions & 17 deletions python/pyspark/ml/tests/test_clustering.py
@@ -29,13 +29,15 @@
BisectingKMeans,
BisectingKMeansModel,
BisectingKMeansSummary,
GaussianMixture,
GaussianMixtureModel,
GaussianMixtureSummary,
)


class ClusteringTestsMixin:
@property
def df(self):
return (
def test_kmeans(self):
df = (
self.spark.createDataFrame(
[
(1, 1.0, Vectors.dense([-0.1, -0.05])),
@@ -49,11 +51,9 @@ def df(self):
)
.coalesce(1)
.sortWithinPartitions("index")
.select("weight", "features")
)

def test_kmeans(self):
df = self.df.select("weight", "features")

km = KMeans(
k=2,
maxIter=2,
@@ -68,11 +68,7 @@ def test_kmeans(self):
# self.assertEqual(model.numFeatures, 2)

output = model.transform(df)
expected_cols = [
"weight",
"features",
"prediction",
]
expected_cols = ["weight", "features", "prediction"]
self.assertEqual(output.columns, expected_cols)
self.assertEqual(output.count(), 6)

@@ -107,7 +103,22 @@ def test_kmeans(self)
self.assertEqual(str(model), str(model2))

def test_bisecting_kmeans(self):
df = self.df.select("weight", "features")
df = (
self.spark.createDataFrame(
[
(1, 1.0, Vectors.dense([-0.1, -0.05])),
(2, 2.0, Vectors.dense([-0.01, -0.1])),
(3, 3.0, Vectors.dense([0.9, 0.8])),
(4, 1.0, Vectors.dense([0.75, 0.935])),
(5, 1.0, Vectors.dense([-0.83, -0.68])),
(6, 1.0, Vectors.dense([-0.91, -0.76])),
],
["index", "weight", "features"],
)
.coalesce(1)
.sortWithinPartitions("index")
.select("weight", "features")
)

bkm = BisectingKMeans(
k=2,
@@ -125,11 +136,7 @@ def test_bisecting_kmeans(self):
# self.assertEqual(model.numFeatures, 2)

output = model.transform(df)
expected_cols = [
"weight",
"features",
"prediction",
]
expected_cols = ["weight", "features", "prediction"]
self.assertEqual(output.columns, expected_cols)
self.assertEqual(output.count(), 6)

@@ -166,6 +173,94 @@ def test_bisecting_kmeans(self):
model2 = BisectingKMeansModel.load(d)
self.assertEqual(str(model), str(model2))

def test_gaussian_mixture(self):
df = (
self.spark.createDataFrame(
[
(1, 1.0, Vectors.dense([-0.1, -0.05])),
(2, 2.0, Vectors.dense([-0.01, -0.1])),
(3, 3.0, Vectors.dense([0.9, 0.8])),
(4, 1.0, Vectors.dense([0.75, 0.935])),
(5, 1.0, Vectors.dense([-0.83, -0.68])),
(6, 1.0, Vectors.dense([-0.91, -0.76])),
],
["index", "weight", "features"],
)
.coalesce(1)
.sortWithinPartitions("index")
.select("weight", "features")
)

gmm = GaussianMixture(
k=2,
maxIter=2,
weightCol="weight",
seed=1,
)
self.assertEqual(gmm.getK(), 2)
self.assertEqual(gmm.getMaxIter(), 2)
self.assertEqual(gmm.getWeightCol(), "weight")
self.assertEqual(gmm.getSeed(), 1)

model = gmm.fit(df)
# TODO: support GMM.numFeatures in Python
# self.assertEqual(model.numFeatures, 2)
self.assertEqual(len(model.weights), 2)
self.assertTrue(
np.allclose(model.weights, [0.541014115744985, 0.4589858842550149], atol=1e-4),
model.weights,
)
# TODO: support GMM.gaussians on connect
# self.assertEqual(model.gaussians, xxx)
self.assertEqual(model.gaussiansDF.columns, ["mean", "cov"])
self.assertEqual(model.gaussiansDF.count(), 2)

vec = Vectors.dense(0.0, 5.0)
pred = model.predict(vec)
self.assertTrue(np.allclose(pred, 0, atol=1e-4), pred)
pred = model.predictProbability(vec)
self.assertTrue(np.allclose(pred.toArray(), [0.5, 0.5], atol=1e-4), pred)

output = model.transform(df)
expected_cols = ["weight", "features", "probability", "prediction"]
self.assertEqual(output.columns, expected_cols)
self.assertEqual(output.count(), 6)

# Model summary
self.assertTrue(model.hasSummary)
summary = model.summary
self.assertTrue(isinstance(summary, GaussianMixtureSummary))
self.assertEqual(summary.k, 2)
self.assertEqual(summary.numIter, 2)
self.assertEqual(len(summary.clusterSizes), 2)
self.assertEqual(summary.clusterSizes, [3, 3])
ll = summary.logLikelihood
self.assertTrue(ll < 0, ll)
self.assertTrue(np.allclose(ll, -1.311264553744033, atol=1e-4), ll)

self.assertEqual(summary.featuresCol, "features")
self.assertEqual(summary.predictionCol, "prediction")
self.assertEqual(summary.probabilityCol, "probability")

self.assertEqual(summary.cluster.columns, ["prediction"])
self.assertEqual(summary.cluster.count(), 6)

self.assertEqual(summary.predictions.columns, expected_cols)
self.assertEqual(summary.predictions.count(), 6)

self.assertEqual(summary.probability.columns, ["probability"])
self.assertEqual(summary.probability.count(), 6)

# save & load
with tempfile.TemporaryDirectory(prefix="gaussian_mixture") as d:
gmm.write().overwrite().save(d)
gmm2 = GaussianMixture.load(d)
self.assertEqual(str(gmm), str(gmm2))

model.write().overwrite().save(d)
model2 = GaussianMixtureModel.load(d)
self.assertEqual(str(model), str(model2))


class ClusteringTests(ClusteringTestsMixin, unittest.TestCase):
def setUp(self) -> None:
@@ -564,6 +564,10 @@ private[ml] object MLUtils {
classOf[BisectingKMeansModel],
Set("predict", "numFeatures", "clusterCenters", "computeCost")),
(classOf[BisectingKMeansSummary], Set("trainingCost")),
(
classOf[GaussianMixtureModel],
Set("predict", "numFeatures", "weights", "gaussians", "predictProbability", "gaussiansDF")),
(classOf[GaussianMixtureSummary], Set("probability", "probabilityCol", "logLikelihood")),

// Recommendation Models
(
