-
Notifications
You must be signed in to change notification settings - Fork 1
Running Scalding on TEZ
These are provisional instructions to run your Scalding application on TEZ. Depending on your distribution, your mileage may vary:
- Build a version of scalding 0.13.1 patched with https://github.com/twitter/scalding/pull/1220
- modify version.sbt to make it different. I have
version in ThisBuild := "0.13.1-cch-ffc2"
in mine - put the resulting jars somewhere maven and ivy (sbt) will find them
-
Get the TEZ 0.6.1 release, which should be imminent now (use 0.6.1rc0 if not yet released)
- Unpack the tez runtime library (a tarball you're supposed to put on your cluster)
-
remove guava from the runtime library's unpacked "lib" directory, put this https://github.com/cchepelov/dev-slash-random/raw/master/guava-18.0-nostopwatch.jar and that https://github.com/cchepelov/dev-slash-random/blob/master/guava-fix_2.11-1.0.2.jar
-
rebuild the runtime library tarball under a different name. I'll assume tez-0.6.1-guavafix.tar.gz
-
put this tarball on your cluster's HDFS. I'll assume /apps/tez-0.6.1/tez-0.6.1-guavafix.tar.gz
-
Add the following in your build.sbt:
// Near the top: val cascadingFabric = sys.props.getOrElse("CASCADING_FABRIC", "hadoop2-tez") // can be "hadoop", "hadoop2-mr1" or "hadoop2-tez" val cascadingVersion = "3.0.0-wip-115" // TODO: test wip-116, and keep updating until 3.0.0 val scaldingVersion = { if (cascadingFabric == "hadoop") { "0.13.1" } else { "0.13.1-cch-ffc2" // put your own private build version name here } } // Near the bottom: libraryDependencies ++= Seq( "com.twitter" %% "scalding-core" % scaldingVersion exclude("cascading", "cascading-core") exclude("cascading", "cascading-hadoop") exclude("cascading", "cascading-local"), "com.twitter" %% "scalding-args" % scaldingVersion exclude("cascading", "cascading-core") exclude("cascading", "cascading-hadoop") exclude("cascading", "cascading-local"), "com.twitter" %% "scalding-date" % scaldingVersion exclude("cascading", "cascading-core") exclude("cascading", "cascading-hadoop") exclude("cascading", "cascading-local"), "com.twitter" %% "scalding-commons" % scaldingVersion exclude("cascading", "cascading-core") exclude("cascading", "cascading-hadoop") exclude("cascading", "cascading-local") exclude("com.hadoop.gplcompression", "hadoop-lzo") // hadoop-lzo also pulled in by elephantbird // WATCH OUT: explicitly exclude("cascading", "cascading-hadoop") (and friends) on any dependency that pulls it in; use ivy2's target/resolution-cache/reports to figure out who pulls what ) libraryDependencies ++= Seq( "org.apache.thrift" % "libthrift" % "0.9.1", "cascading" % "cascading-core" % cascadingVersion, "cascading" % ("cascading-" + cascadingFabric) % cascadingVersion, "cascading" % "cascading-local" % cascadingVersion ) if (cascadingFabric != "hadoop") { libraryDependencies ++= Seq( "cascading" % "cascading-hadoop" % cascadingVersion % "test" ) } else { libraryDependencies ++= Nil }
-
Override your scalding Job #config() method to include:
override def config(): Map[AnyRef,AnyRef] = { super.config ++ Map( "tez.task.launch.cmd-opts" -> ( //"-XshowSettings -Xdiag -verbose:class " + //"-XX:+TraceClassLoading -XX:+TraceLoaderConstraints " + "-XX:+TraceClassResolution " + "-XX:+UnlockDiagnosticVMOptions -XX:NativeMemoryTracking=summary -XX:+PrintNMTStatistics " + "-XX:+AggressiveOpts " + "-XX:+PrintGCDetails -verbose:gc -XX:+PrintGCTimeStamps -XX:+UseNUMA -XX:+UseParallelGC " + " -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp"), "tez.generate.debug.artifacts" -> "true", // default false "tez.task.resource.memory.mb" -> (1024+512).toString, // default 1024. CRITICAL as the scalding stacks' uses up more native memory than usual "tez.container.max.java.heap.fraction" -> "0.7", // default 0.8 // goes with the previous line "tez.am.mode.session" -> "true", "tez.am.container.idle.release-timeout-min.millis" -> "10000", "tez.am.container.idle.release-timeout-max.millis" -> "60000", //"tez.task.resource.memory.mb" -> "2048", // default 1024 //"tez.container.max.java.heap.fraction" -> "0.8", // default 0.8 "tez.tez-ui.history-url.base" -> "http://localhost:9111", "cascading.flow.runtime.gather.partitions.num" -> args.getOrElse("tez-partitions","4"), "tez.runtime.intermediate-output.should-compress" -> "true", "tez.runtime.intermediate-input.is-compressed" -> "true", "tez.lib.uris" -> args.getOrElse("tez.lib.uris", "/apps/tez-0.6.1/tez-0.6.1-guavafix.tar.gz") /* you might want to add tez.history.logging.service.class / yarn.timeline-service.hostname etc. for ATS */ )
-
When running your jar, replace the --hdfs flag with --hadoop2-tez