Skip to content

Commit

Permalink
SAMZA-2763: Support worker JVM opts for Samza Beam portable mode
Browse files Browse the repository at this point in the history
  • Loading branch information
mynameborat committed Nov 1, 2023
1 parent 24e530d commit 51394f0
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 16 deletions.
15 changes: 15 additions & 0 deletions docs/learn/documentation/versioned/jobs/configuration-table.html
Original file line number Diff line number Diff line change
Expand Up @@ -2048,6 +2048,21 @@ <h1>Samza Configuration Reference</h1>
</th>
</tr>

<tr>
<td class="property" id="worker-opts">worker.opts</td>
<td class="default"></td>
<td class="description">
Any JVM options to include in the command line when executing worker process in portable execution of Samza using beam. For example,
this can be used to set the JVM heap size, to tune the garbage collector, or to enable
<a href="/learn/tutorials/{{site.version}}/remote-debugging-samza.html">remote debugging</a>.
Anything you put in <code>worker.opts</code> gets forwarded directly to the commandline of worker process as part of the JVM invocation.
<b>Note:</b> The configuration only applies for Samza Beam portable mode.
<dl>
<dt>Example: <code>worker.opts=-XX:+HeapDumpOnOutOfMemoryError -XX:+UseConcMarkSweepGC</code></dt>
</dl>
</td>
</tr>

<tr>
<td class="property" id="yarn-package-path">yarn.package.path</td>
<td class="default"></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ public class JobConfig extends MapConfig {
public static final String JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB = JOB_AUTOSIZING_CONFIG_PREFIX + "container.maxheap.mb";
public static final String JOB_AUTOSIZING_CONTAINER_MEMORY_MB = JOB_AUTOSIZING_CONFIG_PREFIX + "container.memory.mb";
public static final String JOB_AUTOSIZING_CONTAINER_MAX_CORES = JOB_AUTOSIZING_CONFIG_PREFIX + "container.cpu.cores";
public static final String JOB_AUTOSIZING_WORKER_MAX_HEAP_MB = JOB_AUTOSIZING_CONFIG_PREFIX + "worker.maxheap.mb";

public static final String COORDINATOR_STREAM_FACTORY = "job.coordinatorstream.config.factory";
public static final String DEFAULT_COORDINATOR_STREAM_CONFIG_FACTORY = "org.apache.samza.util.DefaultCoordinatorStreamConfigFactory";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
*/
package org.apache.samza.config;

import com.google.common.annotations.VisibleForTesting;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;


public class ShellCommandConfig extends MapConfig {
Expand Down Expand Up @@ -77,6 +79,7 @@ public class ShellCommandConfig extends MapConfig {

public static final String COMMAND_SHELL_EXECUTE = "task.execute";
public static final String TASK_JVM_OPTS = "task.opts";
public static final String WORKER_JVM_OPTS = "worker.opts";
public static final String TASK_JAVA_HOME = "task.java.home";

/**
Expand All @@ -97,20 +100,16 @@ public String getCommand() {
}

public Optional<String> getTaskOpts() {
Optional<String> jvmOpts = Optional.ofNullable(get(ShellCommandConfig.TASK_JVM_OPTS));
Optional<String> maxHeapMbOptional = Optional.ofNullable(get(JobConfig.JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB));
if (new JobConfig(this).getAutosizingEnabled() && maxHeapMbOptional.isPresent()) {
String maxHeapMb = maxHeapMbOptional.get();
String xmxSetting = "-Xmx" + maxHeapMb + "m";
if (jvmOpts.isPresent() && jvmOpts.get().contains("-Xmx")) {
jvmOpts = Optional.of(jvmOpts.get().replaceAll("-Xmx\\S+", xmxSetting));
} else if (jvmOpts.isPresent()) {
jvmOpts = Optional.of(jvmOpts.get().concat(" " + xmxSetting));
} else {
jvmOpts = Optional.of(xmxSetting);
}
}
return jvmOpts;
String taskOpts = get(ShellCommandConfig.TASK_JVM_OPTS);
String autosizingContainerMaxHeap = get(JobConfig.JOB_AUTOSIZING_CONTAINER_MAX_HEAP_MB);

return Optional.ofNullable(getFinalJvmOptions(taskOpts, autosizingContainerMaxHeap));
}

public Optional<String> getWorkerOpts() {
String autosizingWorkerHeapMb = get(JobConfig.JOB_AUTOSIZING_WORKER_MAX_HEAP_MB);
String workerOpts = get(ShellCommandConfig.WORKER_JVM_OPTS);
return Optional.ofNullable(getFinalJvmOptions(workerOpts, autosizingWorkerHeapMb));
}

public Optional<String> getJavaHome() {
Expand All @@ -120,4 +119,23 @@ public Optional<String> getJavaHome() {
public Optional<String> getAdditionalClasspathDir() {
return Optional.ofNullable(get(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR));
}

@VisibleForTesting
String getFinalJvmOptions(String jvmOpts, String maxHeapOverride) {
String finalJvmOpts = jvmOpts;
if (new JobConfig(this).getAutosizingEnabled() && StringUtils.isNotEmpty(maxHeapOverride)) {
String xmxSetting = "-Xmx" + maxHeapOverride + "m";
if (StringUtils.isNotBlank(jvmOpts)) {
if (jvmOpts.contains("-Xmx")) {
finalJvmOpts = jvmOpts.replaceAll("-Xmx\\S+", xmxSetting);
} else {
finalJvmOpts = jvmOpts.concat(" " + xmxSetting);
}
} else {
finalJvmOpts = xmxSetting;
}
}

return finalJvmOpts;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public Map<String, String> buildEnvironment() {
envBuilder.put(ShellCommandConfig.ENV_CONTAINER_ID, this.id);
envBuilder.put(ShellCommandConfig.ENV_COORDINATOR_URL, this.url.toString());
envBuilder.put(ShellCommandConfig.ENV_JAVA_OPTS, shellCommandConfig.getTaskOpts().orElse(""));
envBuilder.put(ShellCommandConfig.WORKER_JVM_OPTS, shellCommandConfig.getWorkerOpts().orElse(""));
envBuilder.put(ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR,
shellCommandConfig.getAdditionalClasspathDir().orElse(""));
shellCommandConfig.getJavaHome().ifPresent(javaHome -> envBuilder.put(ShellCommandConfig.ENV_JAVA_HOME, javaHome));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
import com.google.common.collect.ImmutableMap;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.*;


public class TestShellCommandConfig {
Expand Down Expand Up @@ -81,6 +80,66 @@ public void testGetTaskOptsAutosizingEnabled() {
assertEquals(Optional.of("-Dproperty=value -Xmx1024m"), shellCommandConfig.getTaskOpts());
}

@Test
public void testGetWorkerOptsAutosizingDisabled() {
ShellCommandConfig shellCommandConfig = new ShellCommandConfig(new MapConfig(
ImmutableMap.of(JobConfig.JOB_AUTOSIZING_WORKER_MAX_HEAP_MB,
"1024", "worker.opts", "-Xmx10m -Dproperty=value")));

String workerOpts = shellCommandConfig.getWorkerOpts()
.orElse(null);
String expectedOpts = "-Xmx10m -Dproperty=value";

assertNotNull(workerOpts);
assertEquals(expectedOpts, workerOpts);
}

@Test
public void testGetWorkerOptsAutosizingEnabled() {
ShellCommandConfig shellCommandConfig = new ShellCommandConfig(new MapConfig(
ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "true", JobConfig.JOB_AUTOSIZING_WORKER_MAX_HEAP_MB,
"1024", "worker.opts", "-Xmx10m -Dproperty=value")));

String workerOpts = shellCommandConfig.getWorkerOpts()
.orElse(null);
String expectedOpts = "-Xmx1024m -Dproperty=value";

assertNotNull(workerOpts);
assertEquals(expectedOpts, workerOpts);
}

@Test
public void testGetFinalJvmOptionsAutosizingDisabled() {
ShellCommandConfig shellCommandConfig =
new ShellCommandConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "false")));
String jvmOptions = "";
String expectedJvmOptions = "";

// no override passed
assertEquals(expectedJvmOptions, shellCommandConfig.getFinalJvmOptions(jvmOptions, ""));

// ignore override since autosizing is disabled
assertEquals(expectedJvmOptions, shellCommandConfig.getFinalJvmOptions(jvmOptions, "2048"));
}

@Test
public void testGetFinalJvmOptionsAutosizingEnabled() {
ShellCommandConfig shellCommandConfig =
new ShellCommandConfig(new MapConfig(ImmutableMap.of(JobConfig.JOB_AUTOSIZING_ENABLED, "true")));
String jvmOptions = "-Xmx1024m";
String expectedJvmOptions = "-Xmx1024m";
assertEquals(expectedJvmOptions, shellCommandConfig.getFinalJvmOptions(jvmOptions, ""));

// override should take effect with autosizing enabled
expectedJvmOptions = "-Xmx2048m";
assertEquals(expectedJvmOptions, shellCommandConfig.getFinalJvmOptions(jvmOptions, "2048"));

// override should take effect even if xmx is not set
jvmOptions = "-Dproperty=value";
expectedJvmOptions = "-Dproperty=value -Xmx2048m";
assertEquals(expectedJvmOptions, shellCommandConfig.getFinalJvmOptions("", "2048"));
}

@Test
public void testGetJavaHome() {
ShellCommandConfig shellCommandConfig = new ShellCommandConfig(new MapConfig());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public void testBasicBuild() throws MalformedURLException {
ShellCommandConfig.ENV_CONTAINER_ID, "1",
ShellCommandConfig.ENV_COORDINATOR_URL, URL_STRING,
ShellCommandConfig.ENV_JAVA_OPTS, "",
ShellCommandConfig.WORKER_JVM_OPTS, "",
ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "");
// assertions when command path is not set
assertEquals("foo", shellCommandBuilder.buildCommand());
Expand All @@ -60,6 +61,7 @@ public void testBuildEnvironment() throws MalformedURLException {
Config config = new MapConfig(new ImmutableMap.Builder<String, String>()
.put(ShellCommandConfig.COMMAND_SHELL_EXECUTE, "foo")
.put(ShellCommandConfig.TASK_JVM_OPTS, "-Xmx4g")
.put(ShellCommandConfig.WORKER_JVM_OPTS, "-Xmx2g")
.put(ShellCommandConfig.ADDITIONAL_CLASSPATH_DIR, "/path/to/additional/classpath")
.put(ShellCommandConfig.TASK_JAVA_HOME, "/path/to/java/home")
.build());
Expand All @@ -71,6 +73,7 @@ public void testBuildEnvironment() throws MalformedURLException {
.put(ShellCommandConfig.ENV_CONTAINER_ID, "1")
.put(ShellCommandConfig.ENV_COORDINATOR_URL, URL_STRING)
.put(ShellCommandConfig.ENV_JAVA_OPTS, "-Xmx4g")
.put(ShellCommandConfig.WORKER_JVM_OPTS, "-Xmx2g")
.put(ShellCommandConfig.ENV_ADDITIONAL_CLASSPATH_DIR, "/path/to/additional/classpath")
.put(ShellCommandConfig.ENV_JAVA_HOME, "/path/to/java/home")
.build();
Expand Down

0 comments on commit 51394f0

Please sign in to comment.