From a3d07130a6edf6965ca77f843a7bb081ba42eecd Mon Sep 17 00:00:00 2001
From: Mike
Date: Wed, 28 Aug 2013 17:55:46 -0700
Subject: [PATCH 1/2] In SparkLR, use fold() instead of reduce(), to avoid
 Vector instantiations. Also add an example of Kryo usage, and an optional
 "points" parameter.

---
 .../main/scala/spark/examples/SparkLR.scala | 30 +++++++++++++++++++++++++-----
 1 file changed, 25 insertions(+), 5 deletions(-)

diff --git a/examples/src/main/scala/spark/examples/SparkLR.scala b/examples/src/main/scala/spark/examples/SparkLR.scala
index 52a0d69744..6a635a085d 100644
--- a/examples/src/main/scala/spark/examples/SparkLR.scala
+++ b/examples/src/main/scala/spark/examples/SparkLR.scala
@@ -21,19 +21,33 @@ import java.util.Random
 import scala.math.exp
 import spark.util.Vector
 import spark._
+import com.esotericsoftware.kryo.Kryo
 
 /**
  * Logistic regression based classification.
  */
 object SparkLR {
-  val N = 10000  // Number of data points
+  case class DataPoint(x: Vector, y: Double)
+
+  class MyRegistrator extends KryoRegistrator {
+    override def registerClasses(kryo: Kryo) {
+      kryo.setRegistrationRequired(true)
+
+      kryo.register(classOf[scala.collection.mutable.WrappedArray.ofRef[_]])
+      kryo.register(classOf[java.lang.Class[_]])
+      kryo.register(classOf[DataPoint])
+      kryo.register(classOf[Array[DataPoint]])
+      kryo.register(classOf[Vector])
+      kryo.register(classOf[Array[Double]])
+    }
+  }
+
+  var N = 10000  // Number of data points
   val D = 10   // Number of dimensions
   val R = 0.7  // Scaling factor
   val ITERATIONS = 5
   val rand = new Random(42)
 
-  case class DataPoint(x: Vector, y: Double)
-
   def generateData = {
     def generatePoint(i: Int) = {
       val y = if(i % 2 == 0) -1 else 1
@@ -45,12 +59,17 @@ object SparkLR {
 
   def main(args: Array[String]) {
     if (args.length == 0) {
-      System.err.println("Usage: SparkLR <master> [<slices>]")
+      System.err.println("Usage: SparkLR <master> [<slices> [<points>]]")
       System.exit(1)
     }
+
+    System.setProperty("spark.serializer", "spark.KryoSerializer")
+    System.setProperty("spark.kryo.registrator", "spark.examples.SparkLR$MyRegistrator")
     val sc = new SparkContext(args(0), "SparkLR",
       System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+
     val numSlices = if (args.length > 1) args(1).toInt else 2
+    if (args.length > 2) N = args(2).toInt
     val points = sc.parallelize(generateData, numSlices).cache()
 
     // Initialize w to a random value
@@ -59,9 +78,10 @@ object SparkLR {
 
     for (i <- 1 to ITERATIONS) {
       println("On iteration " + i)
+      val zero = Vector.zeros(D)
       val gradient = points.map { p =>
         (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x
-      }.reduce(_ + _)
+      }.fold(zero)(_ += _)
       w -= gradient
     }
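
A note on the fold() change in this patch: reduce(_ + _) allocates a fresh
Vector for every pairwise '+', while fold(zero)(_ += _) reuses a mutable
accumulator (Spark serializes the zero value into each task's closure, so
every task mutates its own deserialized copy). The map step still allocates
one scaled Vector per record, which is what PATCH 2/2 then removes. Below is
a minimal local-collections sketch of the allocation difference; the Vec
class is a hypothetical stand-in for spark.util.Vector, not Spark code:

    // Hypothetical stand-in for spark.util.Vector, just to show the allocation pattern.
    class Vec(val elements: Array[Double]) {
      // '+' allocates a fresh result vector on every call, like Vector's '+'
      def + (other: Vec): Vec = {
        val out = new Array[Double](elements.length)
        var i = 0
        while (i < elements.length) { out(i) = elements(i) + other.elements(i); i += 1 }
        new Vec(out)
      }
      // '+=' accumulates in place and returns this, so no temporaries are created
      def += (other: Vec): Vec = {
        var i = 0
        while (i < elements.length) { elements(i) += other.elements(i); i += 1 }
        this
      }
    }

    object FoldVsReduce {
      def main(args: Array[String]) {
        val points = Seq.fill(1000)(new Vec(Array.fill(10)(1.0)))
        val viaReduce = points.reduce(_ + _)          // ~999 temporary Vec instances
        val zero = new Vec(new Array[Double](10))
        val viaFold = points.foldLeft(zero)(_ += _)   // one accumulator, mutated in place
        println(viaReduce.elements(0) + " " + viaFold.elements(0))  // prints: 1000.0 1000.0
      }
    }
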
From 90ede8e39607c4bd012d39e654894342653af802 Mon Sep 17 00:00:00 2001
From: Mike
Date: Mon, 2 Sep 2013 12:03:45 -0700
Subject: [PATCH 2/2] Add a saxpy method to Vector. Replace fold() with
 aggregate() in SparkLR, to avoid Vector instantiations.

---
 core/src/main/scala/spark/util/Vector.scala          | 16 ++++++++++++++++
 .../src/main/scala/spark/examples/SparkLR.scala      |  4 ++--
 2 files changed, 18 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/spark/util/Vector.scala b/core/src/main/scala/spark/util/Vector.scala
index a47cac3b96..c57f5e4dd5 100644
--- a/core/src/main/scala/spark/util/Vector.scala
+++ b/core/src/main/scala/spark/util/Vector.scala
@@ -83,6 +83,22 @@ class Vector(val elements: Array[Double]) extends Serializable {
 
   def addInPlace(other: Vector) = this +=other
 
+  /**
+   * Perform a saxpy operation: multiply the given vector by the given scalar and add the
+   * result to this vector, returning this vector.
+   */
+  def saxpy(a: Double, x: Vector) = {
+    if (length != x.length)
+      throw new IllegalArgumentException("Vectors of different length")
+
+    var i = 0
+    while (i < length) {
+      elements(i) += a * x(i)
+      i += 1
+    }
+    this
+  }
+
   def * (scale: Double): Vector = Vector(length, i => this(i) * scale)
 
   def multiply (d: Double) = this * d
diff --git a/examples/src/main/scala/spark/examples/SparkLR.scala b/examples/src/main/scala/spark/examples/SparkLR.scala
index 6a635a085d..2a1f7b16e1 100644
--- a/examples/src/main/scala/spark/examples/SparkLR.scala
+++ b/examples/src/main/scala/spark/examples/SparkLR.scala
@@ -80,8 +80,8 @@ object SparkLR {
       println("On iteration " + i)
       val zero = Vector.zeros(D)
       val gradient = points.map { p =>
-        (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y * p.x
-      }.fold(zero)(_ += _)
+        ((1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y, p.x)
+      }.aggregate(zero)((sum,v) => sum saxpy (v._1,v._2), _ += _)
       w -= gradient
     }
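
A note on the aggregate() change in this patch: the map side now emits a
(scalar, point) pair instead of a freshly scaled Vector, the seqOp folds each
pair into the running sum via saxpy, and the combOp (_ += _) merges the
per-partition sums in place, so the only per-task Vector allocation left is
the zero value. Below is a small standalone sketch of the same saxpy contract
on raw arrays; SaxpyDemo and its helper are illustrative, not part of Spark:

    object SaxpyDemo {
      // y += a * x in place, mirroring the contract of the saxpy method added above
      def saxpy(a: Double, x: Array[Double], y: Array[Double]): Array[Double] = {
        require(x.length == y.length, "Vectors of different length")
        var i = 0
        while (i < y.length) { y(i) += a * x(i); i += 1 }
        y
      }

      def main(args: Array[String]) {
        val zero = new Array[Double](3)
        val pairs = Seq((2.0, Array(1.0, 0.0, 1.0)), (0.5, Array(0.0, 4.0, 0.0)))
        // seqOp-style accumulation: fold (scalar, vector) pairs into one running sum
        val sum = pairs.foldLeft(zero) { case (acc, (a, x)) => saxpy(a, x, acc) }
        println(sum.mkString(", "))  // prints: 2.0, 2.0, 2.0
      }
    }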