
Merge pull request amplab#35 from MartinWeindel/scala-2.10
Fixing inconsistencies and warnings on Scala 2.10 branch

- Scala 2.10 requires Java 1.6
- using newest Scala release: 2.10.3
- resolved maven-scala-plugin warning
- fixed various compiler warnings
mateiz committed Oct 9, 2013
2 parents 3fe12cc + e09f4a9 commit 7d50f9f
Showing 53 changed files with 132 additions and 114 deletions.
@@ -19,8 +19,6 @@

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
- import io.netty.channel.ChannelFuture;
- import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelOption;
import io.netty.channel.oio.OioEventLoopGroup;
import io.netty.channel.socket.oio.OioSocketChannel;
@@ -20,7 +20,6 @@
import java.net.InetSocketAddress;

import io.netty.bootstrap.ServerBootstrap;
- import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.oio.OioEventLoopGroup;
@@ -17,7 +17,6 @@

package org.apache.spark.api.java;

- import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;

@@ -18,8 +18,6 @@
package org.apache.spark.api.java.function;


- import scala.runtime.AbstractFunction1;
-
import java.io.Serializable;

/**
@@ -19,7 +19,6 @@

import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
- import scala.runtime.AbstractFunction1;

import java.io.Serializable;

@@ -19,7 +19,6 @@

import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
- import scala.runtime.AbstractFunction2;

import java.io.Serializable;

@@ -20,7 +20,6 @@
import scala.Tuple2;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
- import scala.runtime.AbstractFunction1;

import java.io.Serializable;

@@ -20,7 +20,6 @@
import scala.Tuple2;
import scala.reflect.ClassTag;
import scala.reflect.ClassTag$;
- import scala.runtime.AbstractFunction1;

import java.io.Serializable;

@@ -64,7 +64,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
startDaemon()
new Socket(daemonHost, daemonPort)
}
- case e => throw e
+ case e: Throwable => throw e
}
}
}
@@ -198,7 +198,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
}
}.start()
} catch {
- case e => {
+ case e: Throwable => {
stopDaemon()
throw e
}
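
The `case e =>` → `case e: Throwable =>` edits in this file (and in ClusterTaskSetManager and the test suites below) address a Scala 2.10 compiler warning: an untyped pattern in a catch block matches absolutely everything, including control-flow throwables such as scala.util.control.ControlThrowable. Annotating the pattern with `: Throwable` keeps the old behavior while making the catch-all explicit. A minimal sketch of the before/after shape (the example class is illustrative, not from this patch):

    object CatchAllExample {
      def parseOrZero(s: String): Int =
        try {
          s.toInt
        } catch {
          // Scala 2.9 accepted a bare `case e =>` here; 2.10 warns because the
          // untyped pattern also swallows control-flow throwables. The explicit
          // type silences the warning without changing behavior.
          case e: Throwable =>
            println("parse failed: " + e.getMessage)
            0
        }

      def main(args: Array[String]) {
        println(parseOrZero("42"))    // 42
        println(parseOrZero("oops"))  // 0, after the logged failure
      }
    }
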
@@ -18,7 +18,7 @@
package org.apache.spark.deploy

private[spark] object ExecutorState
extends Enumeration("LAUNCHING", "LOADING", "RUNNING", "KILLED", "FAILED", "LOST") {
extends Enumeration {

val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value

@@ -18,7 +18,7 @@
package org.apache.spark.deploy.master

private[spark] object ApplicationState
extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED") {
extends Enumeration {

type ApplicationState = Value

@@ -17,7 +17,7 @@

package org.apache.spark.deploy.master

- private[spark] object WorkerState extends Enumeration("ALIVE", "DEAD", "DECOMMISSIONED") {
+ private[spark] object WorkerState extends Enumeration {
type WorkerState = Value

val ALIVE, DEAD, DECOMMISSIONED = Value
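
These three Enumeration objects (ExecutorState, ApplicationState, WorkerState) all drop their name-string arguments because the `Enumeration(names: String*)` constructor they relied on is gone in Scala 2.10; `Value` now derives each member's name from the `val` it is assigned to, so the strings were redundant. A small sketch of the resulting pattern (hypothetical enum, not from this patch):

    object ConnState extends Enumeration {
      type ConnState = Value
      // No name strings are passed to the constructor: each Value picks up
      // the name of the val it is bound to, so toString still yields
      // "CONNECTING", "CONNECTED", "CLOSED" exactly as the old form did.
      val CONNECTING, CONNECTED, CLOSED = Value
    }

    // ConnState.CONNECTED.toString == "CONNECTED", matching the string-based form.
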
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -19,13 +19,14 @@ package org.apache.spark.rdd

import org.apache.spark.{SparkContext, SparkEnv, Partition, TaskContext}
import org.apache.spark.storage.BlockManager
+ import scala.reflect.ClassTag

private[spark] class BlockRDDPartition(val blockId: String, idx: Int) extends Partition {
val index = idx
}

private[spark]
- class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[String])
+ class BlockRDD[T: ClassTag](sc: SparkContext, @transient blockIds: Array[String])
extends RDD[T](sc, Nil) {

@transient lazy val locations_ = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
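
The long run of ClassManifest → ClassTag renames that starts here is the mechanical part of the 2.10 port: ClassManifest is deprecated, and scala.reflect.ClassTag is its replacement for carrying a type's runtime class past erasure, which RDDs need for things like array allocation. A minimal sketch of the context-bound pattern (illustrative class, not from this patch):

    import scala.reflect.ClassTag

    // `T: ClassTag` asks the compiler to pass an implicit ClassTag[T] along,
    // exactly as `T: ClassManifest` did on Scala 2.9.
    class FixedBuffer[T: ClassTag](size: Int) {
      // new Array[T] needs T's runtime class, which the ClassTag supplies.
      private val data = new Array[T](size)
      def get(i: Int): T = data(i)
    }

    object FixedBufferDemo {
      def main(args: Array[String]) {
        val buf = new FixedBuffer[Int](4)
        println(buf.get(0))  // 0, Int's default element value
      }
    }
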
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -22,6 +22,7 @@ import java.io.{ObjectOutputStream, IOException}
import scala.collection.mutable
import scala.Some
import scala.collection.mutable.ArrayBuffer
+ import scala.reflect.ClassTag

/**
* Class that captures a coalesced RDD by essentially keeping track of parent partitions
@@ -68,7 +69,7 @@ case class CoalescedRDDPartition(
* @param maxPartitions number of desired partitions in the coalesced RDD
* @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance
*/
- class CoalescedRDD[T: ClassManifest](
+ class CoalescedRDD[T: ClassTag](
@transient var prev: RDD[T],
maxPartitions: Int,
balanceSlack: Double = 0.10)
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/EmptyRDD.scala
@@ -18,12 +18,13 @@
package org.apache.spark.rdd

import org.apache.spark.{SparkContext, SparkEnv, Partition, TaskContext}
+ import scala.reflect.ClassTag


/**
* An RDD that is empty, i.e. has no element in it.
*/
- class EmptyRDD[T: ClassManifest](sc: SparkContext) extends RDD[T](sc, Nil) {
+ class EmptyRDD[T: ClassTag](sc: SparkContext) extends RDD[T](sc, Nil) {

override def getPartitions: Array[Partition] = Array.empty

3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/FilteredRDD.scala
@@ -18,8 +18,9 @@
package org.apache.spark.rdd

import org.apache.spark.{OneToOneDependency, Partition, TaskContext}
+ import scala.reflect.ClassTag

- private[spark] class FilteredRDD[T: ClassManifest](
+ private[spark] class FilteredRDD[T: ClassTag](
prev: RDD[T],
f: T => Boolean)
extends RDD[T](prev) {
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/FlatMappedRDD.scala
@@ -18,10 +18,11 @@
package org.apache.spark.rdd

import org.apache.spark.{Partition, TaskContext}
+ import scala.reflect.ClassTag


private[spark]
- class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
+ class FlatMappedRDD[U: ClassTag, T: ClassTag](
prev: RDD[T],
f: T => TraversableOnce[U])
extends RDD[U](prev) {
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/GlommedRDD.scala
@@ -18,8 +18,9 @@
package org.apache.spark.rdd

import org.apache.spark.{Partition, TaskContext}
+ import scala.reflect.ClassTag

- private[spark] class GlommedRDD[T: ClassManifest](prev: RDD[T])
+ private[spark] class GlommedRDD[T: ClassTag](prev: RDD[T])
extends RDD[Array[T]](prev) {

override def getPartitions: Array[Partition] = firstParent[T].partitions
@@ -18,10 +18,11 @@
package org.apache.spark.rdd

import org.apache.spark.{Partition, TaskContext}
+ import scala.reflect.ClassTag


private[spark]
- class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
+ class MapPartitionsRDD[U: ClassTag, T: ClassTag](
prev: RDD[T],
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false)
@@ -18,6 +18,7 @@
package org.apache.spark.rdd

import org.apache.spark.{Partition, TaskContext}
+ import scala.reflect.ClassTag


/**
@@ -26,7 +27,7 @@ import org.apache.spark.{Partition, TaskContext}
* information such as the number of tuples in a partition.
*/
private[spark]
- class MapPartitionsWithIndexRDD[U: ClassManifest, T: ClassManifest](
+ class MapPartitionsWithIndexRDD[U: ClassTag, T: ClassTag](
prev: RDD[T],
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/MappedRDD.scala
@@ -18,9 +18,10 @@
package org.apache.spark.rdd

import org.apache.spark.{Partition, TaskContext}
+ import scala.reflect.ClassTag

private[spark]
- class MappedRDD[U: ClassManifest, T: ClassManifest](prev: RDD[T], f: T => U)
+ class MappedRDD[U: ClassTag, T: ClassTag](prev: RDD[T], f: T => U)
extends RDD[U](prev) {

override def getPartitions: Array[Partition] = firstParent[T].partitions
@@ -18,16 +18,17 @@
package org.apache.spark.rdd

import org.apache.spark.{RangePartitioner, Logging}
+ import scala.reflect.ClassTag

/**
* Extra functions available on RDDs of (key, value) pairs where the key is sortable through
* an implicit conversion. Import `org.apache.spark.SparkContext._` at the top of your program to
* use these functions. They will work with any key type that has a `scala.math.Ordered`
* implementation.
*/
- class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest,
-     V: ClassManifest,
-     P <: Product2[K, V] : ClassManifest](
+ class OrderedRDDFunctions[K <% Ordered[K]: ClassTag,
+     V: ClassTag,
+     P <: Product2[K, V] : ClassTag](
self: RDD[P])
extends Logging with Serializable {

@@ -51,7 +51,7 @@ import org.apache.spark.Partitioner.defaultPartitioner
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
* Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
*/
class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
extends Logging
with SparkHadoopMapReduceUtil
with Serializable {
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/ShuffledRDD.scala
@@ -18,6 +18,7 @@
package org.apache.spark.rdd

import org.apache.spark.{Dependency, Partitioner, SparkEnv, ShuffleDependency, Partition, TaskContext}
+ import scala.reflect.ClassTag


private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
@@ -32,7 +33,7 @@ private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
* @tparam K the key class.
* @tparam V the value class.
*/
- class ShuffledRDD[K, V, P <: Product2[K, V] : ClassManifest](
+ class ShuffledRDD[K, V, P <: Product2[K, V] : ClassTag](
@transient var prev: RDD[P],
part: Partitioner)
extends RDD[P](prev.context, Nil) {
@@ -461,7 +461,7 @@ private[spark] class ClusterTaskSetManager(
case cnf: ClassNotFoundException =>
val loader = Thread.currentThread().getContextClassLoader
throw new SparkException("ClassNotFound with classloader: " + loader, cnf)
- case ex => throw ex
+ case ex: Throwable => throw ex
}
// Mark finished and stop if we've finished all the tasks
finished(index) = true
@@ -22,7 +22,7 @@ package org.apache.spark.scheduler.cluster
* to order tasks amongst a Schedulable's sub-queues
* "NONE" is used when the a Schedulable has no sub-queues.
*/
- object SchedulingMode extends Enumeration("FAIR", "FIFO", "NONE") {
+ object SchedulingMode extends Enumeration {

type SchedulingMode = Value
val FAIR,FIFO,NONE = Value
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster


private[spark] object TaskLocality
extends Enumeration("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")
extends Enumeration
{
// process local is expected to be used ONLY within tasksetmanager for now.
val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY = Value
@@ -313,7 +313,7 @@ class DistributedSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
Thread.sleep(200)
}
} catch {
- case _ => { Thread.sleep(10) }
+ case _: Throwable => { Thread.sleep(10) }
// Do nothing. We might see exceptions because block manager
// is racing this thread to remove entries from the driver.
}
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/UnpersistSuite.scala
@@ -37,7 +37,7 @@ class UnpersistSuite extends FunSuite with LocalSparkContext {
Thread.sleep(200)
}
} catch {
- case _ => { Thread.sleep(10) }
+ case _: Throwable => { Thread.sleep(10) }
// Do nothing. We might see exceptions because block manager
// is racing this thread to remove entries from the driver.
}
8 changes: 4 additions & 4 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -225,8 +225,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
// test that you get over 90% locality in each group
val minLocality = coalesced2.partitions
.map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
- .foldLeft(1.)((perc, loc) => math.min(perc,loc))
- assert(minLocality >= 0.90, "Expected 90% locality but got " + (minLocality*100.).toInt + "%")
+ .foldLeft(1.0)((perc, loc) => math.min(perc,loc))
+ assert(minLocality >= 0.90, "Expected 90% locality but got " + (minLocality*100.0).toInt + "%")

// test that the groups are load balanced with 100 +/- 20 elements in each
val maxImbalance = coalesced2.partitions
@@ -238,9 +238,9 @@
val coalesced3 = data3.coalesce(numMachines*2)
val minLocality2 = coalesced3.partitions
.map(part => part.asInstanceOf[CoalescedRDDPartition].localFraction)
- .foldLeft(1.)((perc, loc) => math.min(perc,loc))
+ .foldLeft(1.0)((perc, loc) => math.min(perc,loc))
assert(minLocality2 >= 0.90, "Expected 90% locality for derived RDD but got " +
- (minLocality2*100.).toInt + "%")
+ (minLocality2*100.0).toInt + "%")
}

test("zipped RDDs") {
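
The `1.` → `1.0` edits in this suite fix one more 2.10 deprecation: a floating-point literal ending in a bare dot (`1.`, `100.`) is deprecated syntax, so the fractional digit is now written out. A brief sketch (values are illustrative, not from this patch):

    object LiteralDemo {
      def main(args: Array[String]) {
        val fractions = Seq(0.95, 0.97, 0.92)
        // `foldLeft(1.)` draws a deprecation warning on Scala 2.10;
        // `foldLeft(1.0)` is the accepted spelling of the same value.
        val minFraction = fractions.foldLeft(1.0)((perc, loc) => math.min(perc, loc))
        println("locality: " + (minFraction * 100.0).toInt + "%")  // locality: 92%
      }
    }
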
@@ -123,7 +123,7 @@ public Stats call(Stats stats, Stats stats2) throws Exception {
});

List<Tuple2<Tuple3<String, String, String>, Stats>> output = counts.collect();
- for (Tuple2 t : output) {
+ for (Tuple2<?,?> t : output) {
System.out.println(t._1 + "\t" + t._2);
}
System.exit(0);
@@ -21,7 +21,6 @@
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
- import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
@@ -106,7 +105,7 @@ public Double call(Double sum) throws Exception {

// Collects all URL ranks and dump them to console.
List<Tuple2<String, Double>> output = ranks.collect();
- for (Tuple2 tuple : output) {
+ for (Tuple2<?,?> tuple : output) {
System.out.println(tuple._1 + " has rank: " + tuple._2 + ".");
}

@@ -58,7 +58,7 @@ public Integer call(Integer i1, Integer i2) {
});

List<Tuple2<String, Integer>> output = counts.collect();
- for (Tuple2 tuple : output) {
+ for (Tuple2<?,?> tuple : output) {
System.out.println(tuple._1 + ": " + tuple._2);
}
System.exit(0);
@@ -25,7 +25,6 @@
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;

- import java.io.Serializable;
import java.util.Arrays;
import java.util.StringTokenizer;

