SAMZA-2791: Introduce callback timeout specific to watermark messages (apache#1681)

Description:
Currently, a watermark is implemented as a special message within Samza. In terms of processing semantics, however, it is treated like any normal message processed by the task: task.callback.timeout.ms, the configuration that bounds how long the run loop waits for a message to be processed, applies to both watermark and normal messages.

This coupling forces watermark processing to fit within the time bound for normal message processing. For Beam on Samza, we use the watermark as a trigger to execute event timers, which can take a long time depending on the number of timers accumulated. In particular, when the application has been down, the number of accumulated timers can be very large, and users have to raise this timeout, which also affects fault-tolerance behavior when normal message processing fails or is delayed.

Changes:
- Introduce callback timeout configuration specific to watermark
- Update configuration documentation
- Consolidate overload methods for TaskCallbackManager
- Always use watermark specific timeout even when run loop is in draining mode
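
The timeout selection described in the last bullet can be sketched as follows. This is a minimal, self-contained illustration rather than the RunLoop code itself: the class name `TimeoutSelection` and the method `selectTimeoutMs` are hypothetical, and the envelope is reduced to two boolean flags.

```java
public class TimeoutSelection {

  /**
   * Hypothetical helper that picks the callback timeout for one envelope.
   * Watermarks always get the watermark timeout, even while draining;
   * drain messages get the drain timeout only when the run loop is draining.
   */
  static long selectTimeoutMs(boolean isWatermark, boolean isDrainMessage, boolean runLoopDraining,
      long messageTimeoutMs, long drainTimeoutMs, long watermarkTimeoutMs) {
    if (isWatermark) {
      return watermarkTimeoutMs;
    }
    if (runLoopDraining && isDrainMessage) {
      return drainTimeoutMs;
    }
    return messageTimeoutMs;
  }

  public static void main(String[] args) {
    // Assumed example values: message = 5 ms, drain = 10 ms, watermark = 15 ms.
    System.out.println(selectTimeoutMs(true, false, true, 5L, 10L, 15L));   // 15: watermark while draining
    System.out.println(selectTimeoutMs(false, true, true, 5L, 10L, 15L));   // 10: drain message while draining
    System.out.println(selectTimeoutMs(false, false, false, 5L, 10L, 15L)); // 5: normal message
  }
}
```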

API Changes:
- Internal change to constructor

Upgrade Instructions: None

Usage Instructions:
- Users can configure the timeout for watermark messages using task.callback.watermark.timeout.ms
- Refer to the configuration documentation for more details and defaults.
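
As a rough illustration of how the default is resolved, the sketch below models the configuration as a plain `Map<String, String>`. It is not Samza's `TaskConfig`: the class `WatermarkTimeoutConfig` and its helper methods are hypothetical, and only the two property names and the `-1` (no timeout) default for task.callback.timeout.ms come from this change and its documentation.

```java
import java.util.HashMap;
import java.util.Map;

public class WatermarkTimeoutConfig {
  static final String CALLBACK_TIMEOUT_MS = "task.callback.timeout.ms";
  static final String WATERMARK_CALLBACK_TIMEOUT_MS = "task.callback.watermark.timeout.ms";
  // -1 means "no timeout", as documented for task.callback.timeout.ms.
  static final long DEFAULT_CALLBACK_TIMEOUT_MS = -1L;

  // Hypothetical stand-in for a typed config lookup with a default value.
  static long getLong(Map<String, String> config, String key, long defaultValue) {
    String value = config.get(key);
    return value == null ? defaultValue : Long.parseLong(value);
  }

  static long callbackTimeoutMs(Map<String, String> config) {
    return getLong(config, CALLBACK_TIMEOUT_MS, DEFAULT_CALLBACK_TIMEOUT_MS);
  }

  // Falls back to the general callback timeout when no watermark-specific value is set.
  static long watermarkCallbackTimeoutMs(Map<String, String> config) {
    return getLong(config, WATERMARK_CALLBACK_TIMEOUT_MS, callbackTimeoutMs(config));
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put(CALLBACK_TIMEOUT_MS, "10000");
    System.out.println(watermarkCallbackTimeoutMs(config)); // 10000: inherited from the general timeout

    config.put(WATERMARK_CALLBACK_TIMEOUT_MS, "60000");
    System.out.println(watermarkCallbackTimeoutMs(config)); // 60000: watermark-specific override
  }
}
```

The values 10000 and 60000 are arbitrary examples. With this fallback, jobs that never set the new property keep their previous behavior.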
mynameborat authored Aug 29, 2023
1 parent aeeaf0b commit 65bf4ae
Showing 10 changed files with 196 additions and 21 deletions.
12 changes: 12 additions & 0 deletions docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -863,6 +863,18 @@ <h1>Samza Configuration Reference</h1>
</td>
</tr>

<tr>
<td class="property" id="task-callback-watermark-timeout-ms">task.callback.watermark.timeout.ms</td>
<td class="default">task.callback.timeout.ms</td>
<td class="description">
Defines the upper bound on the time taken by the task to process a watermark message.
When the timeout expires, a TaskCallbackTimeoutException is thrown and the container shuts down.
Default is <i>task.callback.timeout.ms</i>. <b>Note:</b> When draining, it is recommended
to set <i>task.callback.drain.timeout.ms</i> to the same value as <i>task.callback.watermark.timeout.ms</i>
so that the drain is not terminated prematurely by slow watermark processing.
</td>
</tr>

<tr>
<td class="property" id="task-consumer-batch-size">task.consumer.batch.size</td>
<td class="default">1</td>
@@ -85,6 +85,7 @@ These are the basic properties for setting up a Samza application.
|job.systemstreampartition.<br>input.expansion.enabled|true|When enabled, this allows stateful jobs to expand or contract their partition count by a multiple of the previous count so that events from an input stream partition are processed on the same task as before. This will prevent erroneous results. This feature is disabled if the configuration is set to false or if the job is stateless. See [SEP-5](https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+Enable+partition+expansion+of+input+streams) for more details.|
|job.security.manager.<br>factory|(none)|This is the factory class used to create the proper SecurityManager to handle security for Samza containers when running in a secure environment, such as Yarn with Kerberos enabled. Samza ships with one security manager by default:<br><br>`org.apache.samza.job.yarn.SamzaYarnSecurityManagerFactory`<br>Supports Samza containers to run properly in a Kerberos enabled Yarn cluster. Each Samza container, once started, will create a SamzaContainerSecurityManager. SamzaContainerSecurityManager runs on its separate thread and updates the user's delegation tokens at the interval specified by yarn.token.renewal.interval.seconds. See Yarn Security for details.|
|task.callback.timeout.ms|-1(no timeout)|For an AsyncStreamTask, this defines the max allowed time for a processAsync callback to complete. For a StreamTask, this is the max allowed time for a process call to complete. When the timeout happens, the container is shut down. Default is no timeout.|
|task.callback.watermark.timeout.ms|task.callback.timeout.ms|Defines the upper bound on the time taken by the task to process a watermark message. When the timeout expires, a TaskCallbackTimeoutException is thrown and the container shuts down. When draining, it is recommended to set `task.callback.drain.timeout.ms` to the same value as `task.callback.watermark.timeout.ms` so that the drain is not terminated prematurely by slow watermark processing.|
|task.chooser.class|`org.apache.samza.`<br>`system.chooser.`<br>`RoundRobinChooserFactory`|This property can be optionally set to override the default [message chooser](../container/streams.html#messagechooser), which determines the order in which messages from multiple input streams are processed. The value of this property is the fully-qualified name of a Java class that implements [MessageChooserFactory](../api/javadocs/org/apache/samza/system/chooser/MessageChooserFactory.html).|
|task.command.class|`org.apache.samza.job.`<br>`ShellCommandBuilder`|The fully-qualified name of the Java class which determines the command line and environment variables for a [container](../container/samza-container.html). It must be a subclass of [CommandBuilder](../api/javadocs/org/apache/samza/job/CommandBuilder.html). This defaults to task.command.class=`org.apache.samza.job.ShellCommandBuilder`.|
|task.drop.deserialization.errors|false|This property defines how the system deals with deserialization failures. If set to true, the system will skip the messages that fail to deserialize and keep running. If set to false, the system will throw exceptions and fail the container. |
@@ -114,6 +114,7 @@ public String getAppRunnerClass() {
public ApplicationApiType getAppApiType() {
return ApplicationApiType.valueOf(get(APP_API_TYPE, ApplicationApiType.HIGH_LEVEL.name()).toUpperCase());
}

public boolean isHighLevelApiJob() {
return getAppApiType() == ApplicationApiType.HIGH_LEVEL;
}
@@ -50,6 +50,10 @@ public long getDrainCallbackTimeoutMs() {
return taskConfig.getDrainCallbackTimeoutMs();
}

public long getWatermarkCallbackTimeoutMs() {
return taskConfig.getWatermarkCallbackTimeoutMs();
}

public boolean asyncCommitEnabled() {
return taskConfig.getAsyncCommit();
}
@@ -97,6 +97,8 @@ public class TaskConfig extends MapConfig {
// default timeout for triggering a callback during drain
static final long DEFAULT_DRAIN_CALLBACK_TIMEOUT_MS = -1L;

public static final String WATERMARK_CALLBACK_TIMEOUT_MS = "task.callback.watermark.timeout.ms";

// enable async commit
public static final String ASYNC_COMMIT = "task.async.commit";
// maximum time to wait for a task worker to complete when there are no new messages to handle
@@ -235,6 +237,10 @@ public long getDrainCallbackTimeoutMs() {
return getLong(DRAIN_CALLBACK_TIMEOUT_MS, DEFAULT_DRAIN_CALLBACK_TIMEOUT_MS);
}

public long getWatermarkCallbackTimeoutMs() {
return getLong(WATERMARK_CALLBACK_TIMEOUT_MS, getCallbackTimeoutMs());
}

public boolean getAsyncCommit() {
return getBoolean(ASYNC_COMMIT, false);
}
36 changes: 28 additions & 8 deletions samza-core/src/main/java/org/apache/samza/container/RunLoop.java
@@ -78,8 +78,10 @@ public class RunLoop implements Runnable, Throttleable {
  private final int maxConcurrency;
  private final long windowMs;
  private final long commitMs;
- private final long callbackTimeoutMs;
+ private final long messageCallbackTimeoutMs;
  private final long drainCallbackTimeoutMs;
+
+ private final long watermarkCallbackTimeoutMs;
  private final long maxIdleMs;
  private final SamzaContainerMetrics containerMetrics;
  private final ScheduledExecutorService workerTimer;
@@ -121,12 +123,15 @@ public RunLoop(Map<TaskName, RunLoopTask> runLoopTasks,
  this.maxConcurrency = config.getMaxConcurrency();
  log.info("Got task concurrency: {}.", maxConcurrency);

- this.callbackTimeoutMs = config.getTaskCallbackTimeoutMs();
- log.info("Got callback timeout for task in milliseconds: {}.", callbackTimeoutMs);
+ this.messageCallbackTimeoutMs = config.getTaskCallbackTimeoutMs();
+ log.info("Got default callback timeout for task in milliseconds: {}.", messageCallbackTimeoutMs);

  this.drainCallbackTimeoutMs = config.getDrainCallbackTimeoutMs();
  log.info("Got callback timeout for drain in milliseconds: {}.", drainCallbackTimeoutMs);
+
+ this.watermarkCallbackTimeoutMs = config.getWatermarkCallbackTimeoutMs();
+ log.info("Got callback timeout for watermark in milliseconds: {}.", watermarkCallbackTimeoutMs);

  this.maxIdleMs = config.getMaxIdleMs();
  log.info("Got max idle in milliseconds: {}.", maxIdleMs);

@@ -152,7 +157,7 @@ public RunLoop(Map<TaskName, RunLoopTask> runLoopTasks,
  this.latch = new Object();
  this.workerTimer = Executors.newSingleThreadScheduledExecutor();

- this.callbackTimer = (callbackTimeoutMs > 0) ? Executors.newSingleThreadScheduledExecutor() : null;
+ this.callbackTimer = (messageCallbackTimeoutMs > 0) ? Executors.newSingleThreadScheduledExecutor() : null;
  this.callbackExecutor = new ThrottlingScheduler(config.getMaxThrottlingDelayMs());
  this.coordinatorRequests = new CoordinatorRequests(runLoopTasks.keySet());

@@ -492,7 +497,7 @@ private class AsyncTaskWorker implements TaskCallbackListener {

  AsyncTaskWorker(RunLoopTask task) {
  this.task = task;
- this.callbackManager = new TaskCallbackManager(this, callbackTimer, callbackTimeoutMs, maxConcurrency, clock);
+ this.callbackManager = new TaskCallbackManager(this, callbackTimer, messageCallbackTimeoutMs, maxConcurrency, clock);
  Set<SystemStreamPartition> sspSet = getWorkingSSPSet(task);
  this.state = new AsyncTaskState(task.taskName(), task.metrics(), sspSet, !task.intermediateStreams().isEmpty(), isHighLevelApiJob,
  runId);
@@ -635,9 +640,24 @@ public TaskCallback createCallback() {
  containerMetrics.processes().inc();
  // report 1 whenever the container is running. Can be used to calculate the number of containers not running
  containerMetrics.containerRunning().set(1L);
- return isDraining && (envelope.isDrain() || envelope.isWatermark())
- ? callbackManager.createCallbackForDrain(task.taskName(), envelope, coordinator, drainCallbackTimeoutMs)
- : callbackManager.createCallback(task.taskName(), envelope, coordinator);
+
+ /*
+ * Timeout used in the task callback. The value is determined based on the following logic:
+ * 1. If the run loop is in draining mode and the envelope is a drain message, use drainCallbackTimeoutMs
+ * 2. If the envelope is a watermark, use watermarkCallbackTimeoutMs regardless of the mode. Setting a lower
+ * watermark callback timeout during draining mode can cause drain to be unsuccessful prematurely and
+ * vice-versa.
+ * 3. Use messageCallbackTimeoutMs otherwise
+ */
+ long timeout = messageCallbackTimeoutMs;
+
+ if (envelope.isWatermark()) {
+ timeout = watermarkCallbackTimeoutMs;
+ } else if (isDraining && envelope.isDrain()) {
+ timeout = drainCallbackTimeoutMs;
+ }
+
+ return callbackManager.createCallback(task.taskName(), envelope, coordinator, timeout);
  }
  };

@@ -109,16 +109,9 @@ public TaskCallbackImpl createCallback(TaskName taskName,
  * @param taskName task name
  * @param envelope incoming envelope
  * @param coordinator coordinator
- * @param drainTimeout timeout for processing drain messages.
+ * @param callbackTimeout timeout to expire the callback
  * */
- public TaskCallbackImpl createCallbackForDrain(TaskName taskName,
- IncomingMessageEnvelope envelope,
- ReadableCoordinator coordinator,
- long drainTimeout) {
- return createCallback(taskName, envelope, coordinator, drainTimeout);
- }
-
- private TaskCallbackImpl createCallback(TaskName taskName,
+ public TaskCallbackImpl createCallback(TaskName taskName,
  IncomingMessageEnvelope envelope,
  ReadableCoordinator coordinator,
  long callbackTimeout) {
@@ -129,7 +122,7 @@ private TaskCallbackImpl createCallback(TaskName taskName,
  @Override
  public void run() {
  ThreadUtil.logThreadDump("Thread dump at task callback timeout");
- String msg = "Callback for task {} " + callback.taskName + " timed out after " + timeout + " ms.";
+ String msg = "Callback for task {} " + callback.taskName + " timed out after " + callbackTimeout + " ms.";
  callback.failure(new SamzaException(msg));
  }
  };
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.config;

import com.google.common.collect.ImmutableMap;
import org.junit.Test;

import static org.junit.Assert.*;


public class TestRunLoopConfig {

@Test
public void testWatermarkCallbackTimeoutDefaultsToTaskCallbackTimeout() {
long taskCallbackTimeout = 10L;
Config config = new MapConfig(ImmutableMap.of(TaskConfig.CALLBACK_TIMEOUT_MS, Long.toString(taskCallbackTimeout)));
RunLoopConfig runLoopConfig = new RunLoopConfig(config);
assertEquals("Watermark callback timeout should default to task callback timeout",
taskCallbackTimeout, runLoopConfig.getWatermarkCallbackTimeoutMs());
}

@Test
public void testWatermarkCallbackTimeout() {
long taskCallbackTimeout = 10L;
long watermarkCallbackTimeout = 20L;
Config config = new MapConfig(ImmutableMap.of(
TaskConfig.CALLBACK_TIMEOUT_MS, Long.toString(taskCallbackTimeout),
TaskConfig.WATERMARK_CALLBACK_TIMEOUT_MS, Long.toString(watermarkCallbackTimeout)));

RunLoopConfig runLoopConfig = new RunLoopConfig(config);
assertEquals("Mismatch in watermark callback timeout",
watermarkCallbackTimeout, runLoopConfig.getWatermarkCallbackTimeoutMs());
}
}
@@ -27,6 +27,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
@@ -48,7 +49,7 @@
  import org.mockito.Mock;
  import org.mockito.MockitoAnnotations;

- import static org.junit.Assert.assertEquals;
+ import static org.junit.Assert.*;
  import static org.mockito.Mockito.*;


@@ -90,6 +91,8 @@ public class TestRunLoop {
private final IncomingMessageEnvelope sspB0Drain = IncomingMessageEnvelope.buildDrainMessage(sspB0, runId);
private final IncomingMessageEnvelope sspB1Drain = IncomingMessageEnvelope.buildDrainMessage(sspB1, runId);

private final IncomingMessageEnvelope watermarkA0 = IncomingMessageEnvelope.buildWatermarkEnvelope(sspA0, 1L);

@Rule
public Timeout maxTestDurationInSeconds = Timeout.seconds(120);

@@ -736,6 +739,91 @@ public void testExceptionIsPropagated() {
runLoop.run();
}

@Test
public void testWatermarkCallbackTimeout() throws InterruptedException {
final CountDownLatch watermarkProcessLatch = new CountDownLatch(1);

when(mockRunLoopConfig.getTaskCallbackTimeoutMs()).thenReturn(5L);
when(mockRunLoopConfig.getWatermarkCallbackTimeoutMs()).thenReturn(15L);

SystemConsumers consumers = mock(SystemConsumers.class);

RunLoopTask task0 = getMockRunLoopTask(taskName0, sspA0);
doAnswer(invocation -> {
TaskCallbackFactory callbackFactory = invocation.getArgumentAt(2, TaskCallbackFactory.class);
TaskCallback callback = callbackFactory.createCallback();
Thread.sleep(10);
callback.complete();
return null;
}).when(task0).process(eq(watermarkA0), any(), any());

doAnswer(invocation -> {
TaskCallbackFactory callbackFactory = invocation.getArgumentAt(2, TaskCallbackFactory.class);
callbackFactory.createCallback().complete();
return null;
}).when(task0).process(eq(envelopeA00), any(), any());

doAnswer(invocation -> {
TaskCallbackFactory callbackFactory = invocation.getArgumentAt(2, TaskCallbackFactory.class);
watermarkProcessLatch.countDown();
callbackFactory.createCallback().complete();
return null;
}).when(task0).process(eq(envelopeA01), any(), any());

Map<TaskName, RunLoopTask> tasks = ImmutableMap.of(taskName0, task0);

RunLoop runLoop = new RunLoop(tasks, executor, consumers, containerMetrics, () -> 0L, mockRunLoopConfig);

when(consumers.choose(false))
.thenReturn(envelopeA00)
.thenReturn(watermarkA0)
.thenReturn(envelopeA01)
.thenReturn(sspA0EndOfStream)
.thenReturn(null);

runLoop.run();
assertTrue(watermarkProcessLatch.await(15L, TimeUnit.MILLISECONDS));
}

@Test
public void testWatermarkCallbackTimeoutThrowsException() {
when(mockRunLoopConfig.getTaskCallbackTimeoutMs()).thenReturn(10L);
when(mockRunLoopConfig.getWatermarkCallbackTimeoutMs()).thenReturn(1L);

SystemConsumers consumers = mock(SystemConsumers.class);

RunLoopTask task0 = getMockRunLoopTask(taskName0, sspA0);
doAnswer(invocation -> {
TaskCallbackFactory callbackFactory = invocation.getArgumentAt(2, TaskCallbackFactory.class);
TaskCallback callback = callbackFactory.createCallback();
Thread.sleep(5);
callback.complete();
return null;
}).when(task0).process(eq(watermarkA0), any(), any());

doAnswer(invocation -> {
TaskCallbackFactory callbackFactory = invocation.getArgumentAt(2, TaskCallbackFactory.class);
callbackFactory.createCallback().complete();
return null;
}).when(task0).process(eq(envelopeA00), any(), any());

Map<TaskName, RunLoopTask> tasks = ImmutableMap.of(taskName0, task0);

RunLoop runLoop = new RunLoop(tasks, executor, consumers, containerMetrics, () -> 0L, mockRunLoopConfig);

when(consumers.choose(false))
.thenReturn(envelopeA00)
.thenReturn(watermarkA0)
.thenReturn(null);

try {
runLoop.run();
fail("Watermark callback should have timed out and failed run loop");
} catch (SamzaException e) {
// expected: the watermark callback timed out and failed the run loop
}
}

private RunLoopTask getMockRunLoopTask(TaskName taskName, SystemStreamPartition ... ssps) {
RunLoopTask task0 = mock(RunLoopTask.class);
when(task0.systemStreamPartitions()).thenReturn(new HashSet<>(Arrays.asList(ssps)));
@@ -67,10 +67,10 @@ public void testCreateCallback() {

  @Test
  public void testCreateDrainCallback() {
- TaskCallbackImpl callback = callbackManager.createCallbackForDrain(new TaskName("Partition 0"), mock(IncomingMessageEnvelope.class), null, -1);
+ TaskCallbackImpl callback = callbackManager.createCallback(new TaskName("Partition 0"), mock(IncomingMessageEnvelope.class), null, -1);
  assertTrue(callback.matchSeqNum(0));

- callback = callbackManager.createCallbackForDrain(new TaskName("Partition 0"), mock(IncomingMessageEnvelope.class), null, -1);
+ callback = callbackManager.createCallback(new TaskName("Partition 0"), mock(IncomingMessageEnvelope.class), null, -1);
  assertTrue(callback.matchSeqNum(1));
  }
