
Commit

Merge pull request apache#413 from rxin/scaladoc
Adjusted visibility of various components and documentation for 0.9.0 release.
pwendell committed Jan 14, 2014
2 parents 0ca0d4d + 33022d6 commit 68641bc
Showing 21 changed files with 66 additions and 40 deletions.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -24,7 +24,7 @@ import scala.collection.generic.Growable
import org.apache.spark.serializer.JavaSerializer

/**
* A datatype that can be accumulated, i.e. has an commutative and associative "add" operation,
* A datatype that can be accumulated, ie has an commutative and associative "add" operation,
* but where the result type, `R`, may be different from the element type being added, `T`.
*
* You must define how to add data, and how to merge two of these together. For some datatypes,
@@ -185,7 +185,7 @@ class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Ser
}

/**
* A simpler value of [[org.apache.spark.Accumulable]] where the result type being accumulated is the same
* A simpler value of [[Accumulable]] where the result type being accumulated is the same
* as the types of elements being merged.
*
* @param initialValue initial value of accumulator
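For context on the Accumulable/Accumulator API whose docs are touched above, here is a minimal sketch of typical driver-side usage. It is illustrative only and not part of this diff; the object and app names are invented.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // implicit AccumulatorParam instances in this era of the API

object AccumulatorSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("accumulator-sketch"))
    // An Accumulator is an Accumulable whose result type equals the element type:
    // tasks may only add to it, and the driver reads the merged value after an action.
    val emptyLines = sc.accumulator(0)
    sc.parallelize(Seq("spark", "", "scaladoc", "")).foreach { line =>
      if (line.isEmpty) emptyLines += 1
    }
    println("empty lines: " + emptyLines.value)
    sc.stop()
  }
}
```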
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/FutureAction.scala
@@ -27,8 +27,8 @@ import org.apache.spark.rdd.RDD


/**
* A future for the result of an action. This is an extension of the Scala Future interface to
* support cancellation.
* A future for the result of an action to support cancellation. This is an extension of the
* Scala Future interface to support cancellation.
*/
trait FutureAction[T] extends Future[T] {
// Note that we redefine methods of the Future trait here explicitly so we can specify a different
@@ -86,7 +86,7 @@ trait FutureAction[T] extends Future[T] {


/**
* The future holding the result of an action that triggers a single job. Examples include
* A [[FutureAction]] holding the result of an action that triggers a single job. Examples include
* count, collect, reduce.
*/
class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: => T)
@@ -150,7 +150,7 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc:


/**
* A FutureAction for actions that could trigger multiple Spark jobs. Examples include take,
* A [[FutureAction]] for actions that could trigger multiple Spark jobs. Examples include take,
* takeSample. Cancellation works by setting the cancelled flag to true and interrupting the
* action thread if it is being blocked by a job.
*/
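As background for the FutureAction docs adjusted above, a rough sketch of an asynchronous action that returns one. This is not part of the commit; the object and app names are invented, and the implicit-conversion import reflects the 0.9-era API as I recall it.

```scala
import scala.concurrent.Await
import scala.concurrent.duration.Duration

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // implicit conversion that adds the *Async actions to RDDs

object FutureActionSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("future-action-sketch"))
    // countAsync submits the job and returns immediately with a FutureAction:
    // a Scala Future extended with cancellation (future.cancel() would kill the job).
    val future = sc.parallelize(1 to 1000000, 4).countAsync()
    println("count = " + Await.result(future, Duration.Inf))
    sc.stop()
  }
}
```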
@@ -19,7 +19,7 @@ package org.apache.spark

/**
* An iterator that wraps around an existing iterator to provide task killing functionality.
* It works by checking the interrupted flag in TaskContext.
* It works by checking the interrupted flag in [[TaskContext]].
*/
class InterruptibleIterator[+T](val context: TaskContext, val delegate: Iterator[T])
extends Iterator[T] {
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/Logging.scala
@@ -122,7 +122,7 @@ trait Logging {
}
}

object Logging {
private object Logging {
@volatile private var initialized = false
val initLock = new Object()
}
@@ -22,6 +22,7 @@ import java.util.concurrent.atomic.AtomicLong

import org.apache.spark._

private[spark]
abstract class Broadcast[T](private[spark] val id: Long) extends Serializable {
def value: T

@@ -25,7 +25,7 @@ import org.apache.spark.SparkConf
* BroadcastFactory implementation to instantiate a particular broadcast for the
* entire Spark job.
*/
private[spark] trait BroadcastFactory {
trait BroadcastFactory {
def initialize(isDriver: Boolean, conf: SparkConf): Unit
def newBroadcast[T](value: T, isLocal: Boolean, id: Long): Broadcast[T]
def stop(): Unit
@@ -63,7 +63,10 @@ private[spark] class HttpBroadcast[T](@transient var value_ : T, isLocal: Boolea
}
}

private[spark] class HttpBroadcastFactory extends BroadcastFactory {
/**
* A [[BroadcastFactory]] implementation that uses a HTTP server as the broadcast medium.
*/
class HttpBroadcastFactory extends BroadcastFactory {
def initialize(isDriver: Boolean, conf: SparkConf) { HttpBroadcast.initialize(isDriver, conf) }

def newBroadcast[T](value_ : T, isLocal: Boolean, id: Long) =
@@ -236,8 +236,10 @@ private[spark] case class TorrentInfo(
@transient var hasBlocks = 0
}

private[spark] class TorrentBroadcastFactory
extends BroadcastFactory {
/**
* A [[BroadcastFactory]] that creates a torrent-based implementation of broadcast.
*/
class TorrentBroadcastFactory extends BroadcastFactory {

def initialize(isDriver: Boolean, conf: SparkConf) { TorrentBroadcast.initialize(isDriver, conf) }

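The two factories made public above are normally selected through configuration rather than instantiated directly. A hedged sketch of choosing the torrent-based implementation follows; the config key and default reflect my recollection of the 0.9-era docs, and the object and app names are invented.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastFactorySketch {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("broadcast-factory-sketch")
      // Pick the torrent-based factory; the HTTP-based one is the usual default.
      .set("spark.broadcast.factory", "org.apache.spark.broadcast.TorrentBroadcastFactory")
    val sc = new SparkContext(conf)
    // Broadcast a read-only lookup table once per executor instead of once per task.
    val lookup = sc.broadcast(Map(1 -> "one", 2 -> "two"))
    val words = sc.parallelize(Seq(1, 2, 1)).map(n => lookup.value.getOrElse(n, "?")).collect()
    words.foreach(println)
    sc.stop()
  }
}
```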
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -29,13 +29,12 @@ import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.{DriverState, Master}
import org.apache.spark.util.{AkkaUtils, Utils}
import akka.actor.Actor.emptyBehavior
import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent}

/**
* Proxy that relays messages to the driver.
*/
class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with Logging {
private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends Actor with Logging {
var masterActor: ActorSelection = _
val timeout = AkkaUtils.askTimeout(conf)

@@ -10,8 +10,9 @@ import org.apache.spark.util.Utils
/**
** Utilities for running commands with the spark classpath.
*/
private[spark]
object CommandUtils extends Logging {
private[spark] def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String] = {
def buildCommandSeq(command: Command, memory: Int, sparkHome: String): Seq[String] = {
val runner = getEnv("JAVA_HOME", command).map(_ + "/bin/java").getOrElse("java")

// SPARK-698: do not call the run.cmd script, as process.destroy()
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/package.scala
@@ -29,6 +29,9 @@ package org.apache
* be saved as SequenceFiles. These operations are automatically available on any RDD of the right
* type (e.g. RDD[(Int, Int)] through implicit conversions when you
* `import org.apache.spark.SparkContext._`.
*
* Java programmers should reference the [[spark.api.java]] package
* for Spark programming APIs in Java.
*/
package object spark {
// For package docs only
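The package doc amended above mentions operations that become available on pair RDDs through implicit conversions. A minimal sketch of what that import enables is below; it is not part of the commit and the names are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._  // adds PairRDDFunctions (reduceByKey, groupByKey, ...) to RDD[(K, V)]

object ImplicitConversionSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("pair-rdd-sketch"))
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    // reduceByKey is not defined on RDD itself; the import above provides it
    // for RDDs of key/value pairs.
    pairs.reduceByKey(_ + _).collect().foreach(println)
    sc.stop()
  }
}
```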
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala
@@ -27,7 +27,6 @@ import scala.io.Source
import scala.reflect.ClassTag

import org.apache.spark.{SparkEnv, Partition, TaskContext}
import org.apache.spark.broadcast.Broadcast


/**
@@ -113,7 +112,7 @@ class PipedRDD[T: ClassTag](
}
}

object PipedRDD {
private object PipedRDD {
// Split a string into words using a standard StringTokenizer
def tokenize(command: String): Seq[String] = {
val buf = new ArrayBuffer[String]
@@ -19,15 +19,15 @@ package org.apache.spark.scheduler

import java.util.Properties
import org.apache.spark.util.{Utils, Distribution}
import org.apache.spark.{Logging, SparkContext, TaskEndReason}
import org.apache.spark.{Logging, TaskEndReason}
import org.apache.spark.executor.TaskMetrics

sealed trait SparkListenerEvents

case class SparkListenerStageSubmitted(stage: StageInfo, properties: Properties)
extends SparkListenerEvents

case class SparkListenerStageCompleted(val stage: StageInfo) extends SparkListenerEvents
case class SparkListenerStageCompleted(stage: StageInfo) extends SparkListenerEvents

case class SparkListenerTaskStart(task: Task[_], taskInfo: TaskInfo) extends SparkListenerEvents

@@ -46,6 +46,9 @@ case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
/** An event used in the listener to shutdown the listener daemon thread. */
private[scheduler] case object SparkListenerShutdown extends SparkListenerEvents

/**
* Interface for listening to events from the Spark scheduler.
*/
trait SparkListener {
/**
* Called when a stage is completed, with information on the completed stage
@@ -115,7 +118,7 @@ class StatsReportListener extends SparkListener with Logging {

}

object StatsReportListener extends Logging {
private[spark] object StatsReportListener extends Logging {

//for profiling, the extremes are more interesting
val percentiles = Array[Int](0,5,10,25,50,75,90,95,100)
@@ -202,9 +205,9 @@ object StatsReportListener extends Logging {
}
}

private case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)

case class RuntimePercentage(executorPct: Double, fetchPct: Option[Double], other: Double)
object RuntimePercentage {
private object RuntimePercentage {
def apply(totalTime: Long, metrics: TaskMetrics): RuntimePercentage = {
val denom = totalTime.toDouble
val fetchTime = metrics.shuffleReadMetrics.map{_.fetchWaitTime}
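To illustrate the SparkListener interface documented above, a rough sketch of a custom listener follows. It is illustrative only: the listener and app names are invented, and registration via SparkContext.addSparkListener reflects the API of this era.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

// Logs every completed stage; all other SparkListener callbacks keep their default no-op bodies.
class StageCompletionLogger extends SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
    println("Stage completed: " + stageCompleted.stage)
  }
}

object ListenerSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("listener-sketch"))
    sc.addSparkListener(new StageCompletionLogger)
    sc.parallelize(1 to 100).count()
    sc.stop()
  }
}
```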
@@ -32,7 +32,7 @@ import org.apache.spark.serializer.{SerializationStream, Serializer}
*
* This interface does not support concurrent writes.
*/
abstract class BlockObjectWriter(val blockId: BlockId) {
private[spark] abstract class BlockObjectWriter(val blockId: BlockId) {

def open(): BlockObjectWriter

@@ -69,7 +69,7 @@ abstract class BlockObjectWriter(val blockId: BlockId) {
}

/** BlockObjectWriter which writes directly to a file on disk. Appends to the given file. */
class DiskBlockObjectWriter(
private[spark] class DiskBlockObjectWriter(
blockId: BlockId,
file: File,
serializer: Serializer,
@@ -108,6 +108,10 @@ class StorageLevel private(
}


/**
* Various [[org.apache.spark.storage.StorageLevel]] defined and utility functions for creating
* new storage levels.
*/
object StorageLevel {
val NONE = new StorageLevel(false, false, false)
val DISK_ONLY = new StorageLevel(true, false, false)
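The StorageLevel companion object gaining docs above holds the predefined levels passed to persist. A brief sketch, not part of the diff, with invented names:

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

object StorageLevelSketch {
  def main(args: Array[String]) {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("storage-level-sketch"))
    val rdd = sc.parallelize(1 to 1000)
    // MEMORY_AND_DISK keeps partitions in memory and spills to disk when they do not fit.
    rdd.persist(StorageLevel.MEMORY_AND_DISK)
    println(rdd.count())  // first action materializes and caches the RDD
    println(rdd.count())  // subsequent actions reuse the persisted partitions
    sc.stop()
  }
}
```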
@@ -18,22 +18,23 @@
package org.apache.spark.util

/**
* Wrapper around an iterator which calls a completion method after it successfully iterates through all the elements
* Wrapper around an iterator which calls a completion method after it successfully iterates
* through all the elements.
*/
abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterator[A]{
def next = sub.next
private[spark] abstract class CompletionIterator[+A, +I <: Iterator[A]](sub: I) extends Iterator[A]{
def next() = sub.next()
def hasNext = {
val r = sub.hasNext
if (!r) {
completion
completion()
}
r
}

def completion()
}

object CompletionIterator {
private[spark] object CompletionIterator {
def apply[A, I <: Iterator[A]](sub: I, completionFunction: => Unit) : CompletionIterator[A,I] = {
new CompletionIterator[A,I](sub) {
def completion() = completionFunction
@@ -18,13 +18,13 @@
package org.apache.spark.util

import java.util.{TimerTask, Timer}
import org.apache.spark.{SparkConf, SparkContext, Logging}
import org.apache.spark.{SparkConf, Logging}


/**
* Runs a timer task to periodically clean up metadata (e.g. old files or hashtable entries)
*/
class MetadataCleaner(
private[spark] class MetadataCleaner(
cleanerType: MetadataCleanerType.MetadataCleanerType,
cleanupFunc: (Long) => Unit,
conf: SparkConf)
@@ -60,7 +60,7 @@ class MetadataCleaner(
}
}

object MetadataCleanerType extends Enumeration {
private[spark] object MetadataCleanerType extends Enumeration {

val MAP_OUTPUT_TRACKER, SPARK_CONTEXT, HTTP_BROADCAST, DAG_SCHEDULER, RESULT_TASK,
SHUFFLE_MAP_TASK, BLOCK_MANAGER, SHUFFLE_BLOCK_MANAGER, BROADCAST_VARS = Value
@@ -72,7 +72,7 @@ object MetadataCleanerType extends Enumeration {

// TODO: This mutates a Conf to set properties right now, which is kind of ugly when used in the
// initialization of StreamingContext. It's okay for users trying to configure stuff themselves.
object MetadataCleaner {
private[spark] object MetadataCleaner {
def getDelaySeconds(conf: SparkConf) = {
conf.getInt("spark.cleaner.ttl", -1)
}
7 changes: 7 additions & 0 deletions project/SparkBuild.scala
@@ -136,6 +136,13 @@ object SparkBuild extends Build {
javaOptions += "-Xmx3g",
// Show full stack trace and duration in test cases.
testOptions in Test += Tests.Argument("-oDF"),
// Remove certain packages from Scaladoc
scalacOptions in (Compile,doc) := Seq("-skip-packages", Seq(
"akka",
"org.apache.spark.network",
"org.apache.spark.deploy",
"org.apache.spark.util.collection"
).mkString(":")),

// Only allow one test at a time, even across projects, since they run in the same JVM
concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),
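For reference, the mkString(":") in the build setting above collapses the package list into one colon-separated value for Scaladoc's -skip-packages flag. A tiny sketch of the resulting string, for illustration only:

```scala
object SkipPackagesSketch extends App {
  // Reproduces the argument that the SparkBuild setting above hands to "-skip-packages".
  val skipped = Seq(
    "akka",
    "org.apache.spark.network",
    "org.apache.spark.deploy",
    "org.apache.spark.util.collection"
  ).mkString(":")
  println(skipped)  // akka:org.apache.spark.network:org.apache.spark.deploy:org.apache.spark.util.collection
}
```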
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.util
package org.apache.spark.streaming.util

import scala.annotation.tailrec

@@ -17,14 +17,17 @@

package org.apache.spark.streaming.util

import java.nio.ByteBuffer
import org.apache.spark.util.{RateLimitedOutputStream, IntParam}
import java.io.IOException
import java.net.ServerSocket
import org.apache.spark.{SparkConf, Logging}
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
import java.nio.ByteBuffer

import scala.io.Source
import java.io.IOException

import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream

import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.IntParam

/**
* A helper program that sends blocks of Kryo-serialized text strings out on a socket at a
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.util
package org.apache.spark.streaming.util

import org.scalatest.FunSuite
import java.io.ByteArrayOutputStream
