forked from spark-jobserver/spark-jobserver
-
Notifications
You must be signed in to change notification settings - Fork 0
/
SparkJobBase.scala
84 lines (71 loc) · 2.78 KB
/
SparkJobBase.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package spark.jobserver.api
import java.io.{File, IOException}
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaSparkContext
import org.scalactic._
import spark.jobserver.{ContextLike, NamedObjects}
/**
 * Runtime information the job server supplies to a running job.
 *
 * An instance is passed to both `validate` and `runJob` (see [[SparkJobBase]]) so a job can
 * identify itself and reach shared services without depending on server internals.
 */
trait JobEnvironment {
  /** The unique identifier assigned to this job run by the job server. */
  def jobId: String
  /** Access to the [[spark.jobserver.NamedObjects]] registry — presumably for sharing
    * named objects (e.g. cached RDDs) across jobs in the same context; confirm against
    * the NamedObjects contract. */
  def namedObjects: NamedObjects
  /** The Typesafe configuration of the context this job is running in. */
  def contextConfig: Config
}
/**
* This trait provides data file access on local and cluster deployments of job manager.
*/
/**
 * This trait provides data file access on local and cluster deployments of the job manager.
 */
trait DataFileCache {
  /**
   * Returns the data file if it exists on this host, or a temporary copy if the job server
   * and the job manager are running on different hosts (cluster deployment).
   *
   * @param dataFile full path of the data file on the job server, as returned by the web API
   * @return the data file itself, or a temporary local copy on cluster deployments
   * @throws IOException if the file cannot be accessed or copied
   */
  @throws(classOf[IOException])
  def getDataFile(dataFile: String): File
}
/** Marker trait for problems reported by [[SparkJobBase.validate]]. */
trait ValidationProblem

/** A single validation failure described by a human-readable message. */
case class SingleProblem(problem: String) extends ValidationProblem
/**
* This trait is the main API for Spark jobs submitted to the Job Server.
*
* The idea is that validate() will translate the config into another type, and runJob will
* operate on that type.
*/
/**
 * This trait is the main API for Spark jobs submitted to the Job Server.
 *
 * The idea is that validate() will translate the config into another type (`JobData`), and
 * runJob will operate on that type, so that by the time the job runs its input is already
 * parsed and checked.
 */
trait SparkJobBase {
  /** The context type the job runs against; fixed by subtraits (e.g. `SparkContext` in [[SparkJob]]). */
  type C
  /** The parsed, validated job input: produced by `validate`, consumed by `runJob`. */
  type JobData
  /** The result type returned by `runJob`. */
  type JobOutput

  /**
   * This is the entry point for a Spark Job Server to execute Spark jobs.
   * This function should create or reuse RDDs and return the result at the end, which the
   * Job Server will cache or display.
   *
   * @param sc a SparkContext or similar for the job. May be reused across jobs.
   * @param runtime the JobEnvironment containing run time information pertaining to the job and context.
   * @param data the JobData returned by the validate method
   * @return the job result
   */
  def runJob(sc: C, runtime: JobEnvironment, data: JobData): JobOutput

  /**
   * This method is called by the job server to allow jobs to validate their input and reject
   * invalid job requests. If validation fails, the job server returns 400 to the user.
   * NOTE: this method should return very quickly. If it responds slowly then the job server may time out
   * trying to start this job.
   *
   * @param sc a SparkContext or similar for the job. May be reused across jobs.
   * @param runtime the JobEnvironment containing run time information pertaining to the job and context.
   * @param config the Typesafe Config object passed into the job request
   * @return either JobData, which is parsed from config, or a non-empty list of validation problems
   *         (Scalactic `Or`/`Every`).
   */
  def validate(sc: C, runtime: JobEnvironment, config: Config): JobData Or Every[ValidationProblem]
}
/** A [[SparkJobBase]] whose context is a plain [[org.apache.spark.SparkContext]]. */
trait SparkJob extends SparkJobBase {
  type C = SparkContext
}
/**
 * A [[SparkJobBase]] fixed to the untyped shape of the original job API:
 * raw Typesafe `Config` in, `Any` out, over a plain `SparkContext`.
 * NOTE(review): the "Old" prefix suggests this exists for legacy compatibility —
 * prefer [[SparkJob]] with concrete `JobData`/`JobOutput` types for new jobs; confirm
 * the intended deprecation status with the API owners.
 */
trait OldSparkJob extends SparkJobBase {
  type C = SparkContext
  type JobOutput = Any
  type JobData = Config
}