Apply initial centroids on Spark Kmeans workload. #187

Open · wants to merge 1 commit into master
@@ -20,9 +20,9 @@
*/
package org.apache.spark.examples.mllib;

import java.util.regex.Pattern;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.mahout.clustering.kmeans.Kluster;
import org.apache.mahout.math.VectorWritable;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
@@ -41,37 +41,25 @@
*/
public final class JavaKMeans {

private static class ParsePoint implements Function<String, Vector> {
private static final Pattern SPACE = Pattern.compile(" ");

@Override
public Vector call(String line) {
String[] tok = SPACE.split(line);
double[] point = new double[tok.length];
for (int i = 0; i < tok.length; ++i) {
point[i] = Double.parseDouble(tok[i]);
}
return Vectors.dense(point);
}
}

public static void main(String[] args) {
if (args.length < 3) {
if (args.length < 4) {
System.err.println(
"Usage: JavaKMeans <input_file> <k> <max_iterations> [<runs>]");
"Usage: JavaKMeans <input_file> <input_cluster> <k> <max_iterations> [<runs>]");
System.exit(1);
}
String inputFile = args[0];
int k = Integer.parseInt(args[1]);
int iterations = Integer.parseInt(args[2]);
String inputCluster = args[1];
int k = Integer.parseInt(args[2]);
int iterations = Integer.parseInt(args[3]);
int runs = 1;

if (args.length >= 4) {
runs = Integer.parseInt(args[3]);
if (args.length >= 5) {
runs = Integer.parseInt(args[4]);
}
SparkConf sparkConf = new SparkConf().setAppName("JavaKMeans");
JavaSparkContext sc = new JavaSparkContext(sparkConf);

// Load input points
JavaPairRDD<LongWritable, VectorWritable> data = sc.sequenceFile(inputFile,
LongWritable.class, VectorWritable.class);

@@ -87,7 +75,28 @@ public Vector call(Tuple2<LongWritable, VectorWritable> e) {
}
});

KMeansModel model = KMeans.train(points.rdd(), k, iterations, runs, KMeans.K_MEANS_PARALLEL());
// Load initial centroids
JavaPairRDD<Text, Kluster> clusters = sc.sequenceFile(inputCluster, Text.class, Kluster.class);
JavaRDD<Vector> centroids = clusters.map(new Function<Tuple2<Text, Kluster>, Vector>() {
@Override
public Vector call(Tuple2<Text, Kluster> e) {
org.apache.mahout.math.Vector centroid = e._2().getCenter();
double[] v = new double[centroid.size()];
for (int i = 0; i < centroid.size(); ++i) {
v[i] = centroid.get(i);
}
return Vectors.dense(v);
}
});

// Train model
KMeansModel initModel = new KMeansModel(centroids.collect());
KMeansModel model = new KMeans()
.setK(k)
.setMaxIterations(iterations)
.setRuns(runs)
.setInitialModel(initModel)

Why not use KMeans.RANDOM? What is the difference?

Contributor Author:
KMeans.RANDOM is from Spark MLlib, while KMeans.setInitialModel reuses the initial centroids generated by HiBench's GenKMeansDataset. To get a meaningful comparison, we have to pick one of the approaches and apply it to both the MapReduce and Spark benchmarks. In this PR we select the latter approach, letting HiBench GenKMeansDataset generate the random centroids from a normal (Gaussian) distribution.
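
For reference, the KMeans.RANDOM alternative discussed here would look roughly like the sketch below (not part of this PR); it lets Spark MLlib pick the initial centers itself instead of loading the HiBench-generated Kluster centroids:

KMeansModel model = new KMeans()
  .setK(k)
  .setMaxIterations(iterations)
  .setRuns(runs)
  .setInitializationMode(KMeans.RANDOM())  // Spark picks k initial centers itself
  .run(points.rdd());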


Yes, even if KMeans.RANDOM is used, Spark KMeans generates the random centroids with takeSample, not from a normal distribution. If getting a meaningful comparison between the MapReduce and Spark benchmarks is the core value of HiBench, I agree with your approach, though it does not seem sufficient for evaluating the performance of the Spark K-Means implementation by itself. @carsonwang
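
In other words, RANDOM initialization amounts to uniformly sampling k of the existing input points, roughly like this sketch (an illustration of the comment above, not the actual MLlib internals):

// uniformly sample k input points (without replacement) and use them as initial centers
List<Vector> sampled = points.takeSample(false, k, 42L);  // 42L is an arbitrary seed for illustration
KMeansModel randomInit = new KMeansModel(sampled);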


If k != the number of centroids in initModel, Spark KMeans will throw an exception:
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: mismatched cluster count
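
One way to surface this earlier, sketched under the assumption of the JavaKMeans code in this PR, would be to validate k against the loaded centroids before building the initial model:

List<Vector> centers = centroids.collect();
if (centers.size() != k) {
  // fail fast with a clearer message than "mismatched cluster count"
  throw new IllegalArgumentException("k=" + k + " does not match the "
      + centers.size() + " centroids generated by GenKMeansDataset");
}
KMeansModel initModel = new KMeansModel(centers);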

Contributor Author:

@hqzizania, this should be expected behavior. If Spark KMeans uses the HiBench-generated initial model, the parameter k must match the number of clusters defined in initModel.

Collaborator:

@pfxuan, please see 10-data-scale-profile.conf. The num_of_clusters used to generate the input data and the k might be different. This is one of the concerns with using the input centroids as the initial model. The MR version currently uses the input centroids and doesn't pass the k parameter. We'd prefer to use the k parameter, as this is the expected number of clusters.
As @hqzizania mentioned, the problem in the Java KMeans is that the RDD is not cached. Can you please check the Scala version, which supports both KMeans || and Random? It seems there is no huge computation cost when initializing the model, as the RDD is cached. If this is the case, we can fix the Java version and also pass -k to the MR version. This should make all of them comparable.
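
If caching is indeed the issue, a minimal change to the Java version (a sketch, not part of this PR) would be to persist the parsed points so the initialization pass and the training iterations reuse the in-memory RDD instead of re-reading the sequence file:

// cache the parsed points once (MEMORY_ONLY by default); both centroid
// initialization and the Lloyd iterations then read from memory
points.cache();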

Contributor Author:

I did a performance test on the Scala version of KMeans. The input data is about 100 GB across 4 nodes. The running time with Random and Parallel is almost the same: about 4 minutes for 3 iterations, including 1.3 minutes for centroid initialization. So there is about 48.1% overhead when using either the Spark Random or Parallel initialization. As a comparison, the implementation in this PR took only about 2.4 minutes for 3 iterations, with almost zero overhead for initialization.

Contributor Author:

In addition, the Mahout version of random initialization is a sequential rather than MapReduce-based implementation. I passed the -k 20 parameter to the MR benchmark, and it took 18.8 minutes to generate 20 initial centroids using only one CPU core. To make a reasonable comparison, I think it would be better to keep the original HiBench generator for all KMeans benchmarks.

Collaborator:

Hi @pfxuan, I just got a chance to run KMeans with Random initialization by passing --initMode Random in run.sh. In my test, the random initialization ran much faster and I saw far fewer stages. The Parallel initialization is slow, as there are many stages like collect and aggregate that run for several iterations. These stages are not expected when using Random initialization. Can you take a look at the Spark UI to see if there is any difference?

.run(points.rdd());

System.out.println("Cluster centers:");
for (Vector center : model.clusterCenters()) {
workloads/kmeans/spark/java/bin/run.sh (4 changes: 2 additions & 2 deletions)
@@ -27,7 +27,7 @@ rmr-hdfs $OUTPUT_HDFS || true
SIZE=`dir_size $INPUT_HDFS`
START_TIME=`timestamp`

run-spark-job org.apache.spark.examples.mllib.JavaKMeans $INPUT_HDFS/samples $K $MAX_ITERATION
run-spark-job org.apache.spark.examples.mllib.JavaKMeans $INPUT_HDFS/samples $INPUT_HDFS/cluster $K $MAX_ITERATION
END_TIME=`timestamp`

gen_report ${START_TIME} ${END_TIME} ${SIZE}
@@ -38,5 +38,5 @@ leave_bench

# run bench
#run-spark-job org.apache.spark.examples.mllib.JavaKMeans $INPUT_HDFS $K $MAX_ITERATION || exit 1
#$SPARK_HOME/bin/spark-submit --class org.apache.spark.examples.mllib.JavaKMeans --master ${SPARK_MASTER} ${SPARK_EXAMPLES_JAR} $INPUT_HDFS $K $MAX_ITERATION
#$SPARK_HOME/bin/spark-submit --class org.apache.spark.examples.mllib.JavaKMeans --master ${SPARK_MASTER} ${SPARK_EXAMPLES_JAR} $INPUT_HDFS/samples $INPUT_HDFS/cluster $K $MAX_ITERATION