Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-50928][ML][PYTHON][CONNECT] Support GaussianMixture on Connect #49633

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ org.apache.spark.ml.regression.GBTRegressor
# clustering
org.apache.spark.ml.clustering.KMeans
org.apache.spark.ml.clustering.BisectingKMeans
org.apache.spark.ml.clustering.GaussianMixture


# recommendation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ org.apache.spark.ml.regression.GBTRegressionModel
# clustering
org.apache.spark.ml.clustering.KMeansModel
org.apache.spark.ml.clustering.BisectingKMeansModel
org.apache.spark.ml.clustering.GaussianMixtureModel

# recommendation
org.apache.spark.ml.recommendation.ALSModel
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ class GaussianMixtureModel private[ml] (
extends Model[GaussianMixtureModel] with GaussianMixtureParams with MLWritable
with HasTrainingSummary[GaussianMixtureSummary] {

private[ml] def this() = this(Identifiable.randomUID("gmm"),
Array.emptyDoubleArray, Array.empty)

@Since("3.0.0")
lazy val numFeatures: Int = gaussians.head.mean.size

Expand Down
2 changes: 2 additions & 0 deletions python/pyspark/ml/clustering.py
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ def gaussians(self) -> List[MultivariateGaussian]:

@property
@since("2.0.0")
@try_remote_attribute_relation
def gaussiansDF(self) -> DataFrame:
"""
Retrieve Gaussian distributions as a DataFrame.
Expand Down Expand Up @@ -542,6 +543,7 @@ def probabilityCol(self) -> str:

@property
@since("2.1.0")
@try_remote_attribute_relation
def probability(self) -> DataFrame:
"""
DataFrame of probabilities of each cluster for each training data point.
Expand Down
129 changes: 112 additions & 17 deletions python/pyspark/ml/tests/test_clustering.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@
BisectingKMeans,
BisectingKMeansModel,
BisectingKMeansSummary,
GaussianMixture,
GaussianMixtureModel,
GaussianMixtureSummary,
)


class ClusteringTestsMixin:
@property
def df(self):
return (
def test_kmeans(self):
df = (
self.spark.createDataFrame(
[
(1, 1.0, Vectors.dense([-0.1, -0.05])),
Expand All @@ -49,11 +51,9 @@ def df(self):
)
.coalesce(1)
.sortWithinPartitions("index")
.select("weight", "features")
)

def test_kmeans(self):
df = self.df.select("weight", "features")

km = KMeans(
k=2,
maxIter=2,
Expand All @@ -68,11 +68,7 @@ def test_kmeans(self):
# self.assertEqual(model.numFeatures, 2)

output = model.transform(df)
expected_cols = [
"weight",
"features",
"prediction",
]
expected_cols = ["weight", "features", "prediction"]
self.assertEqual(output.columns, expected_cols)
self.assertEqual(output.count(), 6)

Expand Down Expand Up @@ -107,7 +103,22 @@ def test_kmeans(self):
self.assertEqual(str(model), str(model2))

def test_bisecting_kmeans(self):
df = self.df.select("weight", "features")
df = (
self.spark.createDataFrame(
[
(1, 1.0, Vectors.dense([-0.1, -0.05])),
(2, 2.0, Vectors.dense([-0.01, -0.1])),
(3, 3.0, Vectors.dense([0.9, 0.8])),
(4, 1.0, Vectors.dense([0.75, 0.935])),
(5, 1.0, Vectors.dense([-0.83, -0.68])),
(6, 1.0, Vectors.dense([-0.91, -0.76])),
],
["index", "weight", "features"],
)
.coalesce(1)
.sortWithinPartitions("index")
.select("weight", "features")
)

bkm = BisectingKMeans(
k=2,
Expand All @@ -125,11 +136,7 @@ def test_bisecting_kmeans(self):
# self.assertEqual(model.numFeatures, 2)

output = model.transform(df)
expected_cols = [
"weight",
"features",
"prediction",
]
expected_cols = ["weight", "features", "prediction"]
self.assertEqual(output.columns, expected_cols)
self.assertEqual(output.count(), 6)

Expand Down Expand Up @@ -166,6 +173,94 @@ def test_bisecting_kmeans(self):
model2 = BisectingKMeansModel.load(d)
self.assertEqual(str(model), str(model2))

def test_gaussian_mixture(self):
df = (
self.spark.createDataFrame(
[
(1, 1.0, Vectors.dense([-0.1, -0.05])),
(2, 2.0, Vectors.dense([-0.01, -0.1])),
(3, 3.0, Vectors.dense([0.9, 0.8])),
(4, 1.0, Vectors.dense([0.75, 0.935])),
(5, 1.0, Vectors.dense([-0.83, -0.68])),
(6, 1.0, Vectors.dense([-0.91, -0.76])),
],
["index", "weight", "features"],
)
.coalesce(1)
.sortWithinPartitions("index")
.select("weight", "features")
)

gmm = GaussianMixture(
k=2,
maxIter=2,
weightCol="weight",
seed=1,
)
self.assertEqual(gmm.getK(), 2)
self.assertEqual(gmm.getMaxIter(), 2)
self.assertEqual(gmm.getWeightCol(), "weight")
self.assertEqual(gmm.getSeed(), 1)

model = gmm.fit(df)
# TODO: support GMM.numFeatures in Python
# self.assertEqual(model.numFeatures, 2)
self.assertEqual(len(model.weights), 2)
self.assertTrue(
np.allclose(model.weights, [0.541014115744985, 0.4589858842550149], atol=1e-4),
model.weights,
)
# TODO: support GMM.gaussians on connect
# self.assertEqual(model.gaussians, xxx)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

file SPARK-50969 to track it

self.assertEqual(model.gaussiansDF.columns, ["mean", "cov"])
self.assertEqual(model.gaussiansDF.count(), 2)

vec = Vectors.dense(0.0, 5.0)
pred = model.predict(vec)
self.assertTrue(np.allclose(pred, 0, atol=1e-4), pred)
pred = model.predictProbability(vec)
self.assertTrue(np.allclose(pred.toArray(), [0.5, 0.5], atol=1e-4), pred)

output = model.transform(df)
expected_cols = ["weight", "features", "probability", "prediction"]
self.assertEqual(output.columns, expected_cols)
self.assertEqual(output.count(), 6)

# Model summary
self.assertTrue(model.hasSummary)
summary = model.summary
self.assertTrue(isinstance(summary, GaussianMixtureSummary))
self.assertEqual(summary.k, 2)
self.assertEqual(summary.numIter, 2)
self.assertEqual(len(summary.clusterSizes), 2)
self.assertEqual(summary.clusterSizes, [3, 3])
ll = summary.logLikelihood
self.assertTrue(ll < 0, ll)
self.assertTrue(np.allclose(ll, -1.311264553744033, atol=1e-4), ll)

self.assertEqual(summary.featuresCol, "features")
self.assertEqual(summary.predictionCol, "prediction")
self.assertEqual(summary.probabilityCol, "probability")

self.assertEqual(summary.cluster.columns, ["prediction"])
self.assertEqual(summary.cluster.count(), 6)

self.assertEqual(summary.predictions.columns, expected_cols)
self.assertEqual(summary.predictions.count(), 6)

self.assertEqual(summary.probability.columns, ["probability"])
self.assertEqual(summary.predictions.count(), 6)

# save & load
with tempfile.TemporaryDirectory(prefix="gaussian_mixture") as d:
gmm.write().overwrite().save(d)
gmm2 = GaussianMixture.load(d)
self.assertEqual(str(gmm), str(gmm2))

model.write().overwrite().save(d)
model2 = GaussianMixtureModel.load(d)
self.assertEqual(str(model), str(model2))


class ClusteringTests(ClusteringTestsMixin, unittest.TestCase):
def setUp(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,10 @@ private[ml] object MLUtils {
classOf[BisectingKMeansModel],
Set("predict", "numFeatures", "clusterCenters", "computeCost")),
(classOf[BisectingKMeansSummary], Set("trainingCost")),
(
classOf[GaussianMixtureModel],
Set("predict", "numFeatures", "weights", "gaussians", "predictProbability", "gaussiansDF")),
(classOf[GaussianMixtureSummary], Set("probability", "probabilityCol", "logLikelihood")),

// Recommendation Models
(
Expand Down
Loading