spark utils

spark-utils is a simple framework, developed over a few years of writing Spark applications, that has so far helped me start new projects and create applications quickly and relatively easily.

The main ideas behind building a new Spark application are logic, configuration and execution.

Logic

Any Spark application, or for that matter any application, needs to do something, otherwise it is not very useful. That something can be as simple as counting words or as complex as needed.

The logic is implemented through the SparkRunnable trait, which comes down to implementing a single function, run().

trait SparkRunnable[Context, Result]  {
    def run(implicit spark: SparkSession, context: Context): Result
}

We can see that the run() function takes two parameters: an active SparkSession and a context, which represents the application context. The application context can be anything, from IO configurations to specific application configuration parameters. The most popular choice is a case class.

Let's examine the simplest example, available in RecordsCount01.

  override def run(implicit spark: SparkSession, context: LinesCount01Context): Unit =
    println(s"There are ${spark.source(context.input).read.count} records in the ${context.input.path} file.")

The Result itself can be anything or nothing, leaving it to the developer to define. It is very useful, however, to have a meaningful return type for unit and integration tests. In the example above, we might return the actual count, so it will be easy to test the run() function itself rather than its side effects.
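
For instance, a more testable variant of the example above might return the actual count instead of printing it. A minimal sketch, reusing the same context and the spark.source() call shown earlier:

  override def run(implicit spark: SparkSession, context: LinesCount01Context): Long =
    spark.source(context.input).read.count

A unit test can then simply assert on the returned value, without having to capture console output.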

At this point there are a few reasonable questions:

1) Where is the Spark session coming from?

Normally the developer does not implement the SparkRunnable trait directly, but implements it through the SparkApp trait, which, in turn, extends SparkRunnable.

In broad strokes, SparkApp looks like the following:

trait SparkApp[Context, Result] extends SparkRunnable[Context, Result] with Logging {
    def appName: String = getClass.getSimpleName.replaceAll("\\$", "")
    def createContext(config: Config): Context
    def main(implicit args: Array[String]): Unit = {...}
}

We will cover these functions in more detail in the next sections.

The main() function is the one responsible for creating the Spark session. We will find more details in the Execution section.

2) What about that configuration?

We will find out more about the createContext() and appName() functions in the Configuration section.

3) What is the spark.source() function and where is it coming from?

The spark.source() function is provided by the DataSource framework and together with the corresponding configurations it can make reading input files really easy.

Configuration

In order to perform the logic, we usually need at least an input and an output. Some logic-specific parameters might be necessary as well. We need to be able to pass all these parameters to our logic as easily and transparently as possible.

By default, the SparkApp will use the simple class name as the application name. This can be useful when running the application in Yarn, as it gives some indication of what is actually running, rather than some random name. The developer is free to override the appName() function to get a better name for the application.
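
For example, a hypothetical application could expose a friendlier name with a one-line override:

  // Sketch: replace the class-derived default with an explicit application name
  override def appName: String = "daily-lines-count"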

The more challenging part is defining the actual application configuration. Currently, the SparkApp requires the implementation of the createContext() function, which creates an application context instance out of a given Typesafe Config instance.

Why use the Typesafe Config framework? It provides a much more flexible way of dealing with application configuration, going beyond the popular Java properties approach by adding both JSON and HOCON support. Other choices, like YAML, might be considered in the future. For now, being able to express a variety of types in the configuration, including sequences and maps, in a structured way, is more than powerful enough to reach our configuration goal. The principle of least power comes to mind.
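
As a quick illustration of that flexibility, the following sketch (the keys are made up) parses a small HOCON fragment containing a sequence and a nested object, using the plain Typesafe Config API:

import com.typesafe.config.ConfigFactory
import scala.collection.JavaConverters._

val config = ConfigFactory.parseString("""
  app.tags = [ "daily", "batch" ]             # a sequence
  app.options { header = true, sep = "," }    # a nested object (map)
""")
println(config.getStringList("app.tags").asScala)         // Buffer(daily, batch)
println(config.getConfig("app.options").getString("sep")) // ,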

Ok, but how do we create the application configuration out of the Typesafe Config instance? The developer can always go and write some custom code to do it, but there is already a configuration framework in place that can help a lot.

An example of an application that needs an input, an output and some string parameter in the context configuration is shown in the code sample below:

import org.tupol.spark.io.{ FileSinkConfiguration, FileSourceConfiguration }
import org.tupol.utils.config.Configurator

case class SomeApplicationContext(input: FileSourceConfiguration, output: FileSinkConfiguration, wordFilter: String)
object SomeApplicationContext extends Configurator[SomeApplicationContext] {
  import com.typesafe.config.Config
  import org.tupol.utils.config._
  import scalaz.ValidationNel
  import scalaz.syntax.applicative._

  override def validationNel(config: Config): ValidationNel[Throwable, SomeApplicationContext] = {
    config.extract[FileSourceConfiguration]("input") |@|
    config.extract[FileSinkConfiguration]("output") |@|
    config.extract[String]("wordFilter") apply
    SomeApplicationContext.apply
  }
}
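
With such a Configurator in place, and assuming its apply(config) returns a Try as described in scala-utils, the createContext() function of the corresponding SparkApp can typically be reduced to a one-liner:

  // Sketch, assuming SomeApplicationContext(config) yields a Try[SomeApplicationContext]
  override def createContext(config: Config): SomeApplicationContext =
    SomeApplicationContext(config).get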

An in-depth description of the configuration framework used is available in the scala-utils project.
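
To make the mapping concrete, here is a sketch of the kind of configuration such a context could be built from, parsed here with the plain Typesafe Config API. The exact keys accepted by FileSourceConfiguration and FileSinkConfiguration (assumed below to be format and path) are described by the DataSource framework documentation.

import com.typesafe.config.ConfigFactory

val config = ConfigFactory.parseString("""
  input  { format: text, path: "/tmp/words.txt" }
  output { format: parquet, path: "/tmp/words" }
  wordFilter: "spark"
""")
// SomeApplicationContext(config) would then yield either the validated context
// or the accumulated configuration errors.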

Execution

In order to get the Spark application going we need to create the Spark context or session.

The spark-utils framework provides the execution support out of the box. It helps with creating the application configuration and focuses the actual development effort on creating the application logic. The framework provides configuration tools for inputs and outputs, leaving it to the developer to create the other configuration parameters and assemble them into a case class.

The setup is done through the main() function, which makes any application implementing SparkApp an actual Spark application. The following steps will be performed:

  1. Configuration initialization and application context creation
  2. Spark session creation
  3. Execution of the run() function
  4. Logging the steps and the success or failure of the application
  5. Return the result and exit
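
Putting it all together, a minimal application could look like the sketch below. LinesCountApp, LinesCountContext and the input.path key are made up for illustration, the SparkApp import assumes the org.tupol.spark package suggested by the io imports above, and the context is built with plain Typesafe Config calls rather than the configuration framework:

import com.typesafe.config.Config
import org.apache.spark.sql.SparkSession
import org.tupol.spark.SparkApp

case class LinesCountContext(inputPath: String)

object LinesCountApp extends SparkApp[LinesCountContext, Long] {
  // Build the application context from the resolved configuration
  override def createContext(config: Config): LinesCountContext =
    LinesCountContext(config.getString("input.path"))
  // The actual logic: count the lines of the input text file
  override def run(implicit spark: SparkSession, context: LinesCountContext): Long =
    spark.read.textFile(context.inputPath).count
}

Submitting this with spark-submit and providing input.path either as an application parameter or through application.conf is all that is needed to run it.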

The external configuration is passed to the main() function in the following ways, in the order specified below:

  1. Application parameters; they are passed in a properties style, separated by whitespaces, like app.name.param1=param1value app.name.param2=param2value.
  2. Configuration file; passed as an argument to spark-submit --files=..../application.conf
  3. Configuration file application.conf, if available in the classpath.
  4. Reference configuration file; it can be in the application jar itself as reference.conf.

The order is important, because a parameter defined in the application parameters overwrites the parameter with the same name defined in application.conf, which in turn overwrites the parameter with the same name from reference.conf.

Failure[T] is Always an Option[T]

Almost all the functions exposed by the spark-utils API can fail, so write the code accordingly. In the past, all of these functions used to return a Try[T], but the API looks much cleaner this way and gives more power and more responsibility to the developer.
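
For example, a call that might fail can still be guarded explicitly. A small sketch, assuming spark and context are in scope as in the earlier examples:

import scala.util.Try

// spark.source(...).read may now throw instead of returning a Try[T]
val records: Try[Long] = Try(spark.source(context.input).read.count)
records.foreach(count => println(s"Read $count records from ${context.input.path}"))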

First Application

Starting up the first time, there are many things to consider besides understanding the technology itself. An sbt g8 template is available in the spark-apps.seed.g8 project. Using this template, one can set up a project as simply as running

g8 tupol/spark-apps.seed.g8

What Next?