Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAMZA-2449: Create an example job in samza-hello-samza for job coordinator split deployment #72

Open
wants to merge 7 commits into
base: latest
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 126 additions & 6 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,29 +36,52 @@ repositories {
maven { url "https://repository.apache.org/content/groups/public" }
}


// Add the Java main and test source roots to the IDEA module configuration.
idea.module {
    sourceDirs += file('src/main/java')
    testSourceDirs += file('src/test/java')
}

// a configuration for dependencies that need exploding into package
// NOTE: All of the "framework" pieces are for job coordinator dependency isolation. If you are
// just doing regular packaging for a Samza job, then those pieces can be ignored.

configurations {
    // dependencies whose archives need to be exploded into the package
    explode

    // runtime dependencies of the Samza framework API package
    frameworkApiRuntime {
        // log4j2 is used, so the log4j dependencies are excluded
        exclude(group: 'log4j', module: 'log4j')
        exclude(group: 'org.slf4j', module: 'slf4j-log4j12')
    }

    // dependencies whose classes are listed as Samza framework API classes
    frameworkApiClasses {
        // class names are generated only for the directly declared dependencies,
        // never for anything they pull in transitively
        setTransitive(false)
    }

    // runtime dependencies of the Samza framework infrastructure package
    frameworkInfrastructureRuntime {
        // log4j2 is used, so the log4j dependencies are excluded
        exclude(group: 'log4j', module: 'log4j')
        exclude(group: 'org.slf4j', module: 'slf4j-log4j12')
    }

    // archives holding the scripts for running Samza, extracted into the
    // framework infrastructure package
    frameworkInfrastructureExplode
}

configurations.all {
configurations {
// using log4j1 to log4j2 bridge so need to exclude log4j1
exclude group: 'log4j', module: 'log4j'
runtime.exclude group: 'log4j', module: 'log4j'
// exclude all other slf4j bindings that are transitively pulled in
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
runtime.exclude group: 'org.slf4j', module: 'slf4j-log4j12'
}

dependencies {
compile(group: 'org.codehaus.jackson', name: 'jackson-jaxrs', version: '1.9.13')
compile(group: 'org.slf4j', name: 'slf4j-api', version: "$SLF4J_VERSION")
// slf4j-api 1.8.0-alpha2 uses ServiceProvider which requires additional set-up for split deployment, so pinning down the version
compile(group: 'org.slf4j', name: 'slf4j-api', version: "$SLF4J_VERSION") { force = true }
compile(group: 'org.schwering', name: 'irclib', version: '1.10')
compile(group: 'org.apache.samza', name: 'samza-api', version: "$SAMZA_VERSION")
compile(group: 'org.apache.samza', name: 'samza-azure_2.11', version: "$SAMZA_VERSION")
Expand All @@ -73,8 +96,26 @@ dependencies {
runtime(group: 'org.apache.samza', name: 'samza-yarn_2.11', version: "$SAMZA_VERSION")
runtime(group: 'org.apache.kafka', name: 'kafka_2.11', version: "$KAFKA_VERSION")
runtime(group: 'org.apache.hadoop', name: 'hadoop-hdfs', version: "$HADOOP_VERSION")

testCompile(group: 'org.apache.samza', name: 'samza-test_2.11', version: "$SAMZA_VERSION")
testCompile(group: 'junit', name: 'junit', version: "4.12")

// dependencies for framework API package
frameworkApiRuntime(group: 'org.apache.samza', name: 'samza-api', version: "$SAMZA_VERSION")
frameworkApiRuntime(group: 'org.apache.samza', name: 'samza-kv_2.11', version: "$SAMZA_VERSION")
frameworkApiRuntime(group: 'org.apache.samza', name: 'samza-log4j2_2.11', version: "$SAMZA_VERSION") // pulls in log4j2 dependencies
frameworkApiRuntime(group: 'org.slf4j', name: 'slf4j-api', version: "$SLF4J_VERSION") { force = true }
// need to specify the classes in these modules as framework API classes
frameworkApiClasses(group: 'org.apache.samza', name: 'samza-api', version: "$SAMZA_VERSION")
frameworkApiClasses(group: 'org.apache.samza', name: 'samza-kv_2.11', version: "$SAMZA_VERSION")

// dependencies for framework infrastructure package
frameworkInfrastructureRuntime(group: 'org.apache.samza', name: 'samza-core_2.11', version: "$SAMZA_VERSION")
frameworkInfrastructureRuntime(group: 'org.apache.samza', name: 'samza-kafka_2.11', version: "$SAMZA_VERSION")
frameworkInfrastructureRuntime(group: 'org.apache.samza', name: 'samza-log4j2_2.11', version: "$SAMZA_VERSION")
frameworkInfrastructureRuntime(group: 'org.apache.samza', name: 'samza-yarn_2.11', version: "$SAMZA_VERSION")
frameworkInfrastructureRuntime(group: 'org.slf4j', name: 'slf4j-api', version: "$SLF4J_VERSION") { force = true }
frameworkInfrastructureExplode(group: 'org.apache.samza', name: 'samza-shell', ext: 'tgz', classifier: 'dist', version: "$SAMZA_VERSION")
}

// make the samza distribution .tgz file
Expand All @@ -89,6 +130,7 @@ task distTar(dependsOn: build, type: Tar) {
include "wikipedia-parser.properties"
include "wikipedia-stats.properties"
include "wikipedia-application.properties"
include "wikipedia-application-with-framework.properties"

// expand the Maven tokens with Gradle equivalents. Also change 'target' (Maven) to 'build/distributions' (Gradle)
filter { String line ->
Expand All @@ -112,6 +154,84 @@ task distTar(dependsOn: build, type: Tar) {
}
}

//
// Job coordinator isolation tasks
//

// Name of the directory, under the project build directory, that holds the
// intermediate files of the framework packaging tasks.
def SAMZA_FRAMEWORK_TMP_DIR_NAME = "samzaFrameworkTmp"

// Writes the framework API class list file into the intermediate directory.
task classListGeneration(type: ClassListGenerationTask) {
    classListOutputDirectory = new File(project.buildDir, SAMZA_FRAMEWORK_TMP_DIR_NAME)
    // log4j2 classes (both api and core classes) are added as an explicit entry
    customClassListEntries = ["org.apache.logging.log4j.*"]
}

// build the framework API .tgz file
task frameworkApiDistTar(dependsOn: classListGeneration, type: Tar) {
    // archive naming and location
    baseName = "samza-hello-samza-frameworkApi"
    destinationDir = new File(project.buildDir, "/distributions")
    compression = Compression.GZIP
    classifier = 'dist'
    extension = 'tar.gz'

    // the class list file written by the classListGeneration task
    def classListDirectory = new File(project.buildDir, SAMZA_FRAMEWORK_TMP_DIR_NAME)
    def classListFile = new File(classListDirectory, ClassListGenerationTask.SAMZA_FRAMEWORK_API_CLASSES_FILE_NAME)

    // lib/ holds the framework API runtime dependencies plus the class list file
    into("lib") {
        from(configurations.frameworkApiRuntime)
        from(classListFile)
    }
}

/**
 * Generates the framework API class list file by looking at the structure of the JARs in the
 * frameworkApiClasses configuration.
 *
 * The output is a text file named SAMZA_FRAMEWORK_API_CLASSES_FILE_NAME inside
 * classListOutputDirectory. It contains one entry per line, sorted: every custom entry plus the
 * dotted name derived from the path of each ".class" file found in the JARs.
 */
class ClassListGenerationTask extends DefaultTask {
    static String SAMZA_FRAMEWORK_API_CLASSES_FILE_NAME = "samza-framework-api-classes.txt"

    // Directory the class list file is written to; it is created by the task action if missing.
    File classListOutputDirectory

    // Additional entries written to the class list as-is. Optional: defaults to no extra entries.
    List<String> customClassListEntries = []

    /**
     * Collects the class names and writes them, sorted and newline-separated, to the class list file.
     */
    @TaskAction
    def generateClassList() {
        def classExtension = ".class"
        // a Set, so that a class present in more than one JAR is listed only once
        def classNames = [] as Set
        // tolerate a task that never configured (or explicitly nulled) the custom entries
        classNames.addAll(customClassListEntries ?: [])

        project.configurations.frameworkApiClasses.files
            .findAll { it.name.endsWith(".jar") }
            .each { jarFile ->
                project.zipTree(jarFile)
                    .matching { pattern -> pattern.include("**/*" + classExtension) }
                    .visit(new FileVisitor() {
                        public void visitDir(FileVisitDetails fileVisitDetails) {
                            // we only care about class files, so no need to do anything for directories
                        }

                        public void visitFile(FileVisitDetails fileVisitDetails) {
                            // turn "a/b/C.class" into "a.b.C"
                            def path = fileVisitDetails.getPath()
                            classNames.add(path.substring(0, path.length() - classExtension.length()).replace("/", "."))
                        }
                    })
            }

        classListOutputDirectory.mkdirs()
        def classListOutputFile = new File(classListOutputDirectory, SAMZA_FRAMEWORK_API_CLASSES_FILE_NAME)
        classListOutputFile.text = classNames.sort().join("\n")
    }
}

// build the framework infrastructure .tgz file
task frameworkInfrastructureDistTar(type: Tar) {
    // archive naming and location
    baseName = "samza-hello-samza-frameworkInfrastructure"
    destinationDir = new File(project.buildDir, "/distributions")
    compression = Compression.GZIP
    classifier = 'dist'
    extension = 'tar.gz'

    // bin/ receives the contents of every archive in the frameworkInfrastructureExplode
    // configuration; the closure keeps the configuration from being resolved up front
    into("bin") {
        from({
            configurations.frameworkInfrastructureExplode.collect { archive -> tarTree(archive) }
        })
    }

    // lib/ receives the framework infrastructure runtime dependencies
    into("lib") {
        from(configurations.frameworkInfrastructureRuntime)
    }
}


// install everything
task installGrid(type: Exec) {
workingDir(project.projectDir)
Expand Down
4 changes: 2 additions & 2 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,5 @@ SAMZA_VERSION=1.4.0-SNAPSHOT
KAFKA_VERSION=0.11.0.2
HADOOP_VERSION=2.7.1

SLF4J_VERSION = 1.7.7

# slf4j-api 1.8.0-alpha2 uses ServiceProvider, which requires additional set-up for split deployment, so the version is pinned
SLF4J_VERSION=1.7.7
41 changes: 41 additions & 0 deletions src/main/config/wikipedia-application-with-framework.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Application / Job
app.class=samza.examples.wikipedia.application.WikipediaApplication
job.factory.class=org.apache.samza.job.yarn.YarnJobFactory
job.name=wikipedia-application

# YARN
yarn.package.path=file://${basedir}/target/${project.artifactId}-${pom.version}-dist.tar.gz

# Serializers
serializers.registry.string.class=org.apache.samza.serializers.StringSerdeFactory
serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory

# Key-value storage
stores.wikipedia-stats.factory=org.apache.samza.storage.kv.RocksDbKeyValueStorageEngineFactory
stores.wikipedia-stats.changelog=kafka.wikipedia-stats-changelog
stores.wikipedia-stats.key.serde=string
stores.wikipedia-stats.msg.serde=integer

# Enable the Samza framework for job coordinator dependency isolation (split deployment)
samza.cluster.based.job.coordinator.dependency.isolation.enabled=true
yarn.resources.__samzaFrameworkApi.path=file://${basedir}/target/samza-hello-samza-frameworkApi-${pom.version}-dist.tar.gz
yarn.resources.__samzaFrameworkApi.local.type=ARCHIVE
yarn.resources.__samzaFrameworkInfrastructure.path=file://${basedir}/target/samza-hello-samza-frameworkInfrastructure-${pom.version}-dist.tar.gz
yarn.resources.__samzaFrameworkInfrastructure.local.type=ARCHIVE