diff --git a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator index d4fceadeea2f0..3fad0896339db 100644 --- a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator +++ b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Estimator @@ -36,6 +36,7 @@ org.apache.spark.ml.regression.GBTRegressor # clustering org.apache.spark.ml.clustering.KMeans org.apache.spark.ml.clustering.BisectingKMeans +org.apache.spark.ml.clustering.GaussianMixture # recommendation diff --git a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer index 5bbc18191e5c7..d2ba928af47ae 100644 --- a/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer +++ b/mllib/src/main/resources/META-INF/services/org.apache.spark.ml.Transformer @@ -42,6 +42,7 @@ org.apache.spark.ml.regression.GBTRegressionModel # clustering org.apache.spark.ml.clustering.KMeansModel org.apache.spark.ml.clustering.BisectingKMeansModel +org.apache.spark.ml.clustering.GaussianMixtureModel # recommendation org.apache.spark.ml.recommendation.ALSModel diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala index d0db5dcba87b5..ad1533cd37a9e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala @@ -93,6 +93,9 @@ class GaussianMixtureModel private[ml] ( extends Model[GaussianMixtureModel] with GaussianMixtureParams with MLWritable with HasTrainingSummary[GaussianMixtureSummary] { + private[ml] def this() = this(Identifiable.randomUID("gmm"), + Array.emptyDoubleArray, Array.empty) + @Since("3.0.0") lazy val numFeatures: Int = gaussians.head.mean.size diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py index 8a518dac380c6..6cd508a9e950b 100644 --- a/python/pyspark/ml/clustering.py +++ b/python/pyspark/ml/clustering.py @@ -241,6 +241,7 @@ def gaussians(self) -> List[MultivariateGaussian]: @property @since("2.0.0") + @try_remote_attribute_relation def gaussiansDF(self) -> DataFrame: """ Retrieve Gaussian distributions as a DataFrame. @@ -542,6 +543,7 @@ def probabilityCol(self) -> str: @property @since("2.1.0") + @try_remote_attribute_relation def probability(self) -> DataFrame: """ DataFrame of probabilities of each cluster for each training data point. diff --git a/python/pyspark/ml/tests/test_clustering.py b/python/pyspark/ml/tests/test_clustering.py index 98b7a4f57c1dc..a6685914eab80 100644 --- a/python/pyspark/ml/tests/test_clustering.py +++ b/python/pyspark/ml/tests/test_clustering.py @@ -29,13 +29,15 @@ BisectingKMeans, BisectingKMeansModel, BisectingKMeansSummary, + GaussianMixture, + GaussianMixtureModel, + GaussianMixtureSummary, ) class ClusteringTestsMixin: - @property - def df(self): - return ( + def test_kmeans(self): + df = ( self.spark.createDataFrame( [ (1, 1.0, Vectors.dense([-0.1, -0.05])), @@ -49,11 +51,9 @@ def df(self): ) .coalesce(1) .sortWithinPartitions("index") + .select("weight", "features") ) - def test_kmeans(self): - df = self.df.select("weight", "features") - km = KMeans( k=2, maxIter=2, @@ -68,11 +68,7 @@ def test_kmeans(self): # self.assertEqual(model.numFeatures, 2) output = model.transform(df) - expected_cols = [ - "weight", - "features", - "prediction", - ] + expected_cols = ["weight", "features", "prediction"] self.assertEqual(output.columns, expected_cols) self.assertEqual(output.count(), 6) @@ -107,7 +103,22 @@ def test_kmeans(self): self.assertEqual(str(model), str(model2)) def test_bisecting_kmeans(self): - df = self.df.select("weight", "features") + df = ( + self.spark.createDataFrame( + [ + (1, 1.0, Vectors.dense([-0.1, -0.05])), + (2, 2.0, Vectors.dense([-0.01, -0.1])), + (3, 3.0, Vectors.dense([0.9, 0.8])), + (4, 1.0, Vectors.dense([0.75, 0.935])), + (5, 1.0, Vectors.dense([-0.83, -0.68])), + (6, 1.0, Vectors.dense([-0.91, -0.76])), + ], + ["index", "weight", "features"], + ) + .coalesce(1) + .sortWithinPartitions("index") + .select("weight", "features") + ) bkm = BisectingKMeans( k=2, @@ -125,11 +136,7 @@ def test_bisecting_kmeans(self): # self.assertEqual(model.numFeatures, 2) output = model.transform(df) - expected_cols = [ - "weight", - "features", - "prediction", - ] + expected_cols = ["weight", "features", "prediction"] self.assertEqual(output.columns, expected_cols) self.assertEqual(output.count(), 6) @@ -166,6 +173,94 @@ def test_bisecting_kmeans(self): model2 = BisectingKMeansModel.load(d) self.assertEqual(str(model), str(model2)) + def test_gaussian_mixture(self): + df = ( + self.spark.createDataFrame( + [ + (1, 1.0, Vectors.dense([-0.1, -0.05])), + (2, 2.0, Vectors.dense([-0.01, -0.1])), + (3, 3.0, Vectors.dense([0.9, 0.8])), + (4, 1.0, Vectors.dense([0.75, 0.935])), + (5, 1.0, Vectors.dense([-0.83, -0.68])), + (6, 1.0, Vectors.dense([-0.91, -0.76])), + ], + ["index", "weight", "features"], + ) + .coalesce(1) + .sortWithinPartitions("index") + .select("weight", "features") + ) + + gmm = GaussianMixture( + k=2, + maxIter=2, + weightCol="weight", + seed=1, + ) + self.assertEqual(gmm.getK(), 2) + self.assertEqual(gmm.getMaxIter(), 2) + self.assertEqual(gmm.getWeightCol(), "weight") + self.assertEqual(gmm.getSeed(), 1) + + model = gmm.fit(df) + # TODO: support GMM.numFeatures in Python + # self.assertEqual(model.numFeatures, 2) + self.assertEqual(len(model.weights), 2) + self.assertTrue( + np.allclose(model.weights, [0.541014115744985, 0.4589858842550149], atol=1e-4), + model.weights, + ) + # TODO: support GMM.gaussians on connect + # self.assertEqual(model.gaussians, xxx) + self.assertEqual(model.gaussiansDF.columns, ["mean", "cov"]) + self.assertEqual(model.gaussiansDF.count(), 2) + + vec = Vectors.dense(0.0, 5.0) + pred = model.predict(vec) + self.assertTrue(np.allclose(pred, 0, atol=1e-4), pred) + pred = model.predictProbability(vec) + self.assertTrue(np.allclose(pred.toArray(), [0.5, 0.5], atol=1e-4), pred) + + output = model.transform(df) + expected_cols = ["weight", "features", "probability", "prediction"] + self.assertEqual(output.columns, expected_cols) + self.assertEqual(output.count(), 6) + + # Model summary + self.assertTrue(model.hasSummary) + summary = model.summary + self.assertTrue(isinstance(summary, GaussianMixtureSummary)) + self.assertEqual(summary.k, 2) + self.assertEqual(summary.numIter, 2) + self.assertEqual(len(summary.clusterSizes), 2) + self.assertEqual(summary.clusterSizes, [3, 3]) + ll = summary.logLikelihood + self.assertTrue(ll < 0, ll) + self.assertTrue(np.allclose(ll, -1.311264553744033, atol=1e-4), ll) + + self.assertEqual(summary.featuresCol, "features") + self.assertEqual(summary.predictionCol, "prediction") + self.assertEqual(summary.probabilityCol, "probability") + + self.assertEqual(summary.cluster.columns, ["prediction"]) + self.assertEqual(summary.cluster.count(), 6) + + self.assertEqual(summary.predictions.columns, expected_cols) + self.assertEqual(summary.predictions.count(), 6) + + self.assertEqual(summary.probability.columns, ["probability"]) + self.assertEqual(summary.predictions.count(), 6) + + # save & load + with tempfile.TemporaryDirectory(prefix="gaussian_mixture") as d: + gmm.write().overwrite().save(d) + gmm2 = GaussianMixture.load(d) + self.assertEqual(str(gmm), str(gmm2)) + + model.write().overwrite().save(d) + model2 = GaussianMixtureModel.load(d) + self.assertEqual(str(model), str(model2)) + class ClusteringTests(ClusteringTestsMixin, unittest.TestCase): def setUp(self) -> None: diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala index 926fc23621634..162ab69e41919 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/ml/MLUtils.scala @@ -564,6 +564,10 @@ private[ml] object MLUtils { classOf[BisectingKMeansModel], Set("predict", "numFeatures", "clusterCenters", "computeCost")), (classOf[BisectingKMeansSummary], Set("trainingCost")), + ( + classOf[GaussianMixtureModel], + Set("predict", "numFeatures", "weights", "gaussians", "predictProbability", "gaussiansDF")), + (classOf[GaussianMixtureSummary], Set("probability", "probabilityCol", "logLikelihood")), // Recommendation Models (