From 65bf4ae058d4d674cc63c3f172f3f348a4513640 Mon Sep 17 00:00:00 2001
From: Bharath Kumarasubramanian
Date: Tue, 29 Aug 2023 14:49:50 -0700
Subject: [PATCH] SAMZA-2791: Introduce callback timeout specific to watermark messages (#1681)

Description:
Currently, a watermark is implemented as a special message within Samza. In terms of processing semantics, however, it behaves like a normal message processed by the task: task.callback.timeout.ms, the configuration that bounds how long the run loop waits for a message to be processed, applies to both watermarks and normal messages. This coupling forces watermark processing to fit within the time bound set for normal messages.

For Beam on Samza, we use the watermark as a trigger to execute event timers, which can take a long time depending on the number of timers accumulated. In particular, after the application has been down, the number of accumulated timers can be very large, and users have to tune task.callback.timeout.ms, which also affects fault-tolerance behavior in case of failures or delays while processing normal messages.

Changes:
- Introduce callback timeout configuration specific to watermark
- Update configuration documentation
- Consolidate overload methods for TaskCallbackManager
- Always use watermark specific timeout even when run loop is in draining mode

API Changes:
- Internal change to constructor

Upgrade Instructions: None

Usage Instructions:
- Users can configure the timeout for watermark messages using task.callback.watermark.timeout.ms
- Refer to the configuration documentation for more details and defaults.
--- .../versioned/jobs/configuration-table.html | 12 +++ .../versioned/jobs/samza-configurations.md | 1 + .../samza/config/ApplicationConfig.java | 1 + .../apache/samza/config/RunLoopConfig.java | 4 + .../org/apache/samza/config/TaskConfig.java | 6 ++ .../org/apache/samza/container/RunLoop.java | 36 ++++++-- .../samza/task/TaskCallbackManager.java | 13 +-- .../samza/config/TestRunLoopConfig.java | 50 +++++++++++ .../apache/samza/container/TestRunLoop.java | 90 ++++++++++++++++++- .../samza/task/TestTaskCallbackManager.java | 4 +- 10 files changed, 196 insertions(+), 21 deletions(-) create mode 100644 samza-core/src/test/java/org/apache/samza/config/TestRunLoopConfig.java diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html index 18a782f497..f4c8d4d7ba 100644 --- a/docs/learn/documentation/versioned/jobs/configuration-table.html +++ b/docs/learn/documentation/versioned/jobs/configuration-table.html @@ -863,6 +863,18 @@

Samza Configuration Reference

+ + task.callback.watermark.timeout.ms + task.callback.watermark.timeout.ms + + Defines the upper bound on the time a task may take to process a watermark message. + When the timeout expires, a TaskCallbackTimeoutException is thrown and the container shuts down. + Defaults to the value of task.callback.timeout.ms. Note: when draining, it is recommended + to set task.callback.drain.timeout.ms to the same value as task.callback.watermark.timeout.ms + so that drain does not terminate prematurely because of higher watermark processing latency. + + + task.consumer.batch.size 1 diff --git a/docs/learn/documentation/versioned/jobs/samza-configurations.md b/docs/learn/documentation/versioned/jobs/samza-configurations.md index 60d8ae8b59..6188e90b47 100644 --- a/docs/learn/documentation/versioned/jobs/samza-configurations.md +++ b/docs/learn/documentation/versioned/jobs/samza-configurations.md @@ -85,6 +85,7 @@ These are the basic properties for setting up a Samza application. |job.systemstreampartition.
input.expansion.enabled|true|When enabled, this allows stateful jobs to expand or contract their partition count by a multiple of the previous count so that events from an input stream partition are processed on the same task as before. This will prevent erroneous results. This feature is disabled if the configuration is set to false or if the job is stateless. See [SEP-5](https://cwiki.apache.org/confluence/display/SAMZA/SEP-5%3A+Enable+partition+expansion+of+input+streams) for more details.| |job.security.manager.
factory|(none)|This is the factory class used to create the proper SecurityManager to handle security for Samza containers when running in a secure environment, such as Yarn with Kerberos eanbled. Samza ships with one security manager by default:

`org.apache.samza.job.yarn.SamzaYarnSecurityManagerFactory`
Supports Samza containers to run properly in a Kerberos enabled Yarn cluster. Each Samza container, once started, will create a SamzaContainerSecurityManager. SamzaContainerSecurityManager runs on its separate thread and update user's delegation tokens at the interval specified by yarn.token.renewal.interval.seconds. See Yarn Security for details.| |task.callback.timeout.ms|-1(no timeout)|For an AsyncStreamTask, this defines the max allowed time for a processAsync callback to complete. For a StreamTask, this is the max allowed time for a process call to complete. When the timeout happens,the container is shutdown. Default is no timeout.| +|task.callback.watermark.timeout.ms|task.callback.timeout.ms|Defines the upper bound on the time a task may take to process a watermark message. When the timeout expires, a TaskCallbackTimeoutException is thrown and the container shuts down. When draining, it is recommended to set `task.callback.drain.timeout.ms` to the same value as `task.callback.watermark.timeout.ms` so that drain does not terminate prematurely because of higher watermark processing latency.| |task.chooser.class|`org.apache.samza.`
`system.chooser.`
`RoundRobinChooserFactory`|This property can be optionally set to override the default [message chooser](../container/streams.html#messagechooser), which determines the order in which messages from multiple input streams are processed. The value of this property is the fully-qualified name of a Java class that implements [MessageChooserFactory](../api/javadocs/org/apache/samza/system/chooser/MessageChooserFactory.html).| |task.command.class|`org.apache.samza.job.`
`ShellCommandBuilder`|The fully-qualified name of the Java class which determines the command line and environment variables for a [container](../container/samza-container.html). It must be a subclass of [CommandBuilder](../api/javadocs/org/apache/samza/job/CommandBuilder.html). This defaults to task.command.class=`org.apache.samza.job.ShellCommandBuilder`.| |task.drop.deserialization.errors|false|This property is to define how the system deals with deserialization failure situation. If set to true, the system will skip the error messages and keep running. If set to false, the system with throw exceptions and fail the container. | diff --git a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java index 2363bc7536..da5402de9d 100644 --- a/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/ApplicationConfig.java @@ -114,6 +114,7 @@ public String getAppRunnerClass() { public ApplicationApiType getAppApiType() { return ApplicationApiType.valueOf(get(APP_API_TYPE, ApplicationApiType.HIGH_LEVEL.name()).toUpperCase()); } + public boolean isHighLevelApiJob() { return getAppApiType() == ApplicationApiType.HIGH_LEVEL; } diff --git a/samza-core/src/main/java/org/apache/samza/config/RunLoopConfig.java b/samza-core/src/main/java/org/apache/samza/config/RunLoopConfig.java index ee1d7f71f0..2c95c51413 100644 --- a/samza-core/src/main/java/org/apache/samza/config/RunLoopConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/RunLoopConfig.java @@ -50,6 +50,10 @@ public long getDrainCallbackTimeoutMs() { return taskConfig.getDrainCallbackTimeoutMs(); } + public long getWatermarkCallbackTimeoutMs() { + return taskConfig.getWatermarkCallbackTimeoutMs(); + } + public boolean asyncCommitEnabled() { return taskConfig.getAsyncCommit(); } diff --git 
a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java index 4d7847a91a..b1a51e5220 100644 --- a/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfig.java @@ -97,6 +97,8 @@ public class TaskConfig extends MapConfig { // default timeout for triggering a callback during drain static final long DEFAULT_DRAIN_CALLBACK_TIMEOUT_MS = -1L; + public static final String WATERMARK_CALLBACK_TIMEOUT_MS = "task.callback.watermark.timeout.ms"; + // enable async commit public static final String ASYNC_COMMIT = "task.async.commit"; // maximum time to wait for a task worker to complete when there are no new messages to handle @@ -235,6 +237,10 @@ public long getDrainCallbackTimeoutMs() { return getLong(DRAIN_CALLBACK_TIMEOUT_MS, DEFAULT_DRAIN_CALLBACK_TIMEOUT_MS); } + public long getWatermarkCallbackTimeoutMs() { + return getLong(WATERMARK_CALLBACK_TIMEOUT_MS, getCallbackTimeoutMs()); + } + public boolean getAsyncCommit() { return getBoolean(ASYNC_COMMIT, false); } diff --git a/samza-core/src/main/java/org/apache/samza/container/RunLoop.java b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java index 25c924f005..e96f986dd7 100644 --- a/samza-core/src/main/java/org/apache/samza/container/RunLoop.java +++ b/samza-core/src/main/java/org/apache/samza/container/RunLoop.java @@ -78,8 +78,10 @@ public class RunLoop implements Runnable, Throttleable { private final int maxConcurrency; private final long windowMs; private final long commitMs; - private final long callbackTimeoutMs; + private final long messageCallbackTimeoutMs; private final long drainCallbackTimeoutMs; + + private final long watermarkCallbackTimeoutMs; private final long maxIdleMs; private final SamzaContainerMetrics containerMetrics; private final ScheduledExecutorService workerTimer; @@ -121,12 +123,15 @@ public RunLoop(Map runLoopTasks, 
this.maxConcurrency = config.getMaxConcurrency(); log.info("Got task concurrency: {}.", maxConcurrency); - this.callbackTimeoutMs = config.getTaskCallbackTimeoutMs(); - log.info("Got callback timeout for task in milliseconds: {}.", callbackTimeoutMs); + this.messageCallbackTimeoutMs = config.getTaskCallbackTimeoutMs(); + log.info("Got default callback timeout for task in milliseconds: {}.", messageCallbackTimeoutMs); this.drainCallbackTimeoutMs = config.getDrainCallbackTimeoutMs(); log.info("Got callback timeout for drain in milliseconds: {}.", drainCallbackTimeoutMs); + this.watermarkCallbackTimeoutMs = config.getWatermarkCallbackTimeoutMs(); + log.info("Got callback timeout for watermark in milliseconds: {}.", watermarkCallbackTimeoutMs); + this.maxIdleMs = config.getMaxIdleMs(); log.info("Got max idle in milliseconds: {}.", maxIdleMs); @@ -152,7 +157,7 @@ public RunLoop(Map runLoopTasks, this.latch = new Object(); this.workerTimer = Executors.newSingleThreadScheduledExecutor(); - this.callbackTimer = (callbackTimeoutMs > 0) ? Executors.newSingleThreadScheduledExecutor() : null; + this.callbackTimer = (messageCallbackTimeoutMs > 0) ? 
Executors.newSingleThreadScheduledExecutor() : null; this.callbackExecutor = new ThrottlingScheduler(config.getMaxThrottlingDelayMs()); this.coordinatorRequests = new CoordinatorRequests(runLoopTasks.keySet()); @@ -492,7 +497,7 @@ private class AsyncTaskWorker implements TaskCallbackListener { AsyncTaskWorker(RunLoopTask task) { this.task = task; - this.callbackManager = new TaskCallbackManager(this, callbackTimer, callbackTimeoutMs, maxConcurrency, clock); + this.callbackManager = new TaskCallbackManager(this, callbackTimer, messageCallbackTimeoutMs, maxConcurrency, clock); Set sspSet = getWorkingSSPSet(task); this.state = new AsyncTaskState(task.taskName(), task.metrics(), sspSet, !task.intermediateStreams().isEmpty(), isHighLevelApiJob, runId); @@ -635,9 +640,24 @@ public TaskCallback createCallback() { containerMetrics.processes().inc(); // report 1 whenever the contaienr is running. Can be used to calculate the number of containers not running containerMetrics.containerRunning().set(1L); - return isDraining && (envelope.isDrain() || envelope.isWatermark()) - ? callbackManager.createCallbackForDrain(task.taskName(), envelope, coordinator, drainCallbackTimeoutMs) - : callbackManager.createCallback(task.taskName(), envelope, coordinator); + + /* + * Timeout used in the task callback. The value is determined based on the following logic + * 1. If run loop is in draining mode and the envelope is drain, use drainCallbackTimeoutMs + * 2. If the envelope is watermark, use watermarkCallbackTimeoutMs regardless of the modes. Setting a lower + * watermark callback timeout during draining mode can cause drain to be unsuccessful prematurely and + * vice-versa. + * 3. 
Use messageCallbackTimeoutMs otherwise + */ + long timeout = messageCallbackTimeoutMs; + + if (envelope.isWatermark()) { + timeout = watermarkCallbackTimeoutMs; + } else if (isDraining && envelope.isDrain()) { + timeout = drainCallbackTimeoutMs; + } + + return callbackManager.createCallback(task.taskName(), envelope, coordinator, timeout); } }; diff --git a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java index d435615d28..8576c79e30 100644 --- a/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java +++ b/samza-core/src/main/java/org/apache/samza/task/TaskCallbackManager.java @@ -109,16 +109,9 @@ public TaskCallbackImpl createCallback(TaskName taskName, * @param taskName task name * @param envelope incoming envelope * @param coordinator coordinator - * @param drainTimeout timeout for processing drain messages. + * @param callbackTimeout timeout to expire the callback * */ - public TaskCallbackImpl createCallbackForDrain(TaskName taskName, - IncomingMessageEnvelope envelope, - ReadableCoordinator coordinator, - long drainTimeout) { - return createCallback(taskName, envelope, coordinator, drainTimeout); - } - - private TaskCallbackImpl createCallback(TaskName taskName, + public TaskCallbackImpl createCallback(TaskName taskName, IncomingMessageEnvelope envelope, ReadableCoordinator coordinator, long callbackTimeout) { @@ -129,7 +122,7 @@ private TaskCallbackImpl createCallback(TaskName taskName, @Override public void run() { ThreadUtil.logThreadDump("Thread dump at task callback timeout"); - String msg = "Callback for task {} " + callback.taskName + " timed out after " + timeout + " ms."; + String msg = "Callback for task " + callback.taskName + " timed out after " + callbackTimeout + " ms."; callback.failure(new SamzaException(msg)); } }; diff --git a/samza-core/src/test/java/org/apache/samza/config/TestRunLoopConfig.java 
b/samza-core/src/test/java/org/apache/samza/config/TestRunLoopConfig.java new file mode 100644 index 0000000000..e0e737366e --- /dev/null +++ b/samza-core/src/test/java/org/apache/samza/config/TestRunLoopConfig.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.config; + +import com.google.common.collect.ImmutableMap; +import org.junit.Test; + +import static org.junit.Assert.*; + + +public class TestRunLoopConfig { + + @Test + public void testWatermarkCallbackTimeoutDefaultsToTaskCallbackTimeout() { + long taskCallbackTimeout = 10L; + Config config = new MapConfig(ImmutableMap.of(TaskConfig.CALLBACK_TIMEOUT_MS, Long.toString(taskCallbackTimeout))); + RunLoopConfig runLoopConfig = new RunLoopConfig(config); + assertEquals("Watermark callback timeout should default to task callback timeout", + taskCallbackTimeout, runLoopConfig.getWatermarkCallbackTimeoutMs()); + } + + @Test + public void testWatermarkCallbackTimeout() { + long taskCallbackTimeout = 10L; + long watermarkCallbackTimeout = 20L; + Config config = new MapConfig(ImmutableMap.of( + TaskConfig.CALLBACK_TIMEOUT_MS, Long.toString(taskCallbackTimeout), + TaskConfig.WATERMARK_CALLBACK_TIMEOUT_MS, Long.toString(watermarkCallbackTimeout))); + + RunLoopConfig runLoopConfig = new RunLoopConfig(config); + assertEquals("Mismatch in watermark callback timeout", + watermarkCallbackTimeout, runLoopConfig.getWatermarkCallbackTimeoutMs()); + } +} diff --git a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java index c3a030a9db..5afb82abc3 100644 --- a/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java +++ b/samza-core/src/test/java/org/apache/samza/container/TestRunLoop.java @@ -27,6 +27,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.samza.Partition; import org.apache.samza.SamzaException; @@ -48,7 +49,7 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; -import static org.junit.Assert.assertEquals; +import static org.junit.Assert.*; 
import static org.mockito.Mockito.*; @@ -90,6 +91,8 @@ public class TestRunLoop { private final IncomingMessageEnvelope sspB0Drain = IncomingMessageEnvelope.buildDrainMessage(sspB0, runId); private final IncomingMessageEnvelope sspB1Drain = IncomingMessageEnvelope.buildDrainMessage(sspB1, runId); + private final IncomingMessageEnvelope watermarkA0 = IncomingMessageEnvelope.buildWatermarkEnvelope(sspA0, 1L); + @Rule public Timeout maxTestDurationInSeconds = Timeout.seconds(120); @@ -736,6 +739,91 @@ public void testExceptionIsPropagated() { runLoop.run(); } + @Test + public void testWatermarkCallbackTimeout() throws InterruptedException { + final CountDownLatch watermarkProcessLatch = new CountDownLatch(1); + + when(mockRunLoopConfig.getTaskCallbackTimeoutMs()).thenReturn(5L); + when(mockRunLoopConfig.getWatermarkCallbackTimeoutMs()).thenReturn(15L); + + SystemConsumers consumers = mock(SystemConsumers.class); + + RunLoopTask task0 = getMockRunLoopTask(taskName0, sspA0); + doAnswer(invocation -> { + TaskCallbackFactory callbackFactory = invocation.getArgumentAt(2, TaskCallbackFactory.class); + TaskCallback callback = callbackFactory.createCallback(); + Thread.sleep(10); + callback.complete(); + return null; + }).when(task0).process(eq(watermarkA0), any(), any()); + + doAnswer(invocation -> { + TaskCallbackFactory callbackFactory = invocation.getArgumentAt(2, TaskCallbackFactory.class); + callbackFactory.createCallback().complete(); + return null; + }).when(task0).process(eq(envelopeA00), any(), any()); + + doAnswer(invocation -> { + TaskCallbackFactory callbackFactory = invocation.getArgumentAt(2, TaskCallbackFactory.class); + watermarkProcessLatch.countDown(); + callbackFactory.createCallback().complete(); + return null; + }).when(task0).process(eq(envelopeA01), any(), any()); + + Map tasks = ImmutableMap.of(taskName0, task0); + + RunLoop runLoop = new RunLoop(tasks, executor, consumers, containerMetrics, () -> 0L, mockRunLoopConfig); + + 
when(consumers.choose(false)) + .thenReturn(envelopeA00) + .thenReturn(watermarkA0) + .thenReturn(envelopeA01) + .thenReturn(sspA0EndOfStream) + .thenReturn(null); + + runLoop.run(); + assertTrue(watermarkProcessLatch.await(15L, TimeUnit.MILLISECONDS)); + } + + @Test + public void testWatermarkCallbackTimeoutThrowsException() { + when(mockRunLoopConfig.getTaskCallbackTimeoutMs()).thenReturn(10L); + when(mockRunLoopConfig.getWatermarkCallbackTimeoutMs()).thenReturn(1L); + + SystemConsumers consumers = mock(SystemConsumers.class); + + RunLoopTask task0 = getMockRunLoopTask(taskName0, sspA0); + doAnswer(invocation -> { + TaskCallbackFactory callbackFactory = invocation.getArgumentAt(2, TaskCallbackFactory.class); + TaskCallback callback = callbackFactory.createCallback(); + Thread.sleep(5); + callback.complete(); + return null; + }).when(task0).process(eq(watermarkA0), any(), any()); + + doAnswer(invocation -> { + TaskCallbackFactory callbackFactory = invocation.getArgumentAt(2, TaskCallbackFactory.class); + callbackFactory.createCallback().complete(); + return null; + }).when(task0).process(eq(envelopeA00), any(), any()); + + Map tasks = ImmutableMap.of(taskName0, task0); + + RunLoop runLoop = new RunLoop(tasks, executor, consumers, containerMetrics, () -> 0L, mockRunLoopConfig); + + when(consumers.choose(false)) + .thenReturn(envelopeA00) + .thenReturn(watermarkA0) + .thenReturn(null); + + try { + runLoop.run(); + fail("Watermark callback should have timed out and failed run loop"); + } catch (SamzaException e) { + + } + } + private RunLoopTask getMockRunLoopTask(TaskName taskName, SystemStreamPartition ... 
ssps) { RunLoopTask task0 = mock(RunLoopTask.class); when(task0.systemStreamPartitions()).thenReturn(new HashSet<>(Arrays.asList(ssps))); diff --git a/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java b/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java index de418c0141..972f8d3b80 100644 --- a/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java +++ b/samza-core/src/test/java/org/apache/samza/task/TestTaskCallbackManager.java @@ -67,10 +67,10 @@ public void testCreateCallback() { @Test public void testCreateDrainCallback() { - TaskCallbackImpl callback = callbackManager.createCallbackForDrain(new TaskName("Partition 0"), mock(IncomingMessageEnvelope.class), null, -1); + TaskCallbackImpl callback = callbackManager.createCallback(new TaskName("Partition 0"), mock(IncomingMessageEnvelope.class), null, -1); assertTrue(callback.matchSeqNum(0)); - callback = callbackManager.createCallbackForDrain(new TaskName("Partition 0"), mock(IncomingMessageEnvelope.class), null, -1); + callback = callbackManager.createCallback(new TaskName("Partition 0"), mock(IncomingMessageEnvelope.class), null, -1); assertTrue(callback.matchSeqNum(1)); }
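
Note (illustrative, not part of the patch): the timeout resolution introduced above can be summarized in a small standalone sketch. It uses a plain `Map<String, String>` in place of Samza's `Config`, and the class `WatermarkTimeoutSketch`, the `Kind` enum, and the helper names are hypothetical stand-ins chosen for illustration. Only the three configuration keys, the `-1` defaults, and the precedence (watermark first, then drain while draining, then the message timeout) are taken from the diff.

```java
import java.util.Map;

// Hypothetical, self-contained sketch of the timeout selection in this patch.
// It does not use any Samza classes.
public class WatermarkTimeoutSketch {
  static final String CALLBACK_TIMEOUT_MS = "task.callback.timeout.ms";
  static final String DRAIN_CALLBACK_TIMEOUT_MS = "task.callback.drain.timeout.ms";
  static final String WATERMARK_CALLBACK_TIMEOUT_MS = "task.callback.watermark.timeout.ms";
  static final long NO_TIMEOUT = -1L; // default for message and drain timeouts

  // Stand-in for IncomingMessageEnvelope.isWatermark() / isDrain().
  enum Kind { MESSAGE, WATERMARK, DRAIN }

  static long getLong(Map<String, String> config, String key, long defaultValue) {
    String value = config.get(key);
    return value == null ? defaultValue : Long.parseLong(value);
  }

  // The watermark timeout falls back to the message timeout when unset.
  static long watermarkTimeoutMs(Map<String, String> config) {
    return getLong(config, WATERMARK_CALLBACK_TIMEOUT_MS,
        getLong(config, CALLBACK_TIMEOUT_MS, NO_TIMEOUT));
  }

  // Mirrors the precedence in RunLoop: a watermark always uses its own timeout,
  // a drain message uses the drain timeout only while draining, and everything
  // else uses the message timeout.
  static long selectTimeoutMs(Map<String, String> config, Kind kind, boolean isDraining) {
    if (kind == Kind.WATERMARK) {
      return watermarkTimeoutMs(config);
    }
    if (isDraining && kind == Kind.DRAIN) {
      return getLong(config, DRAIN_CALLBACK_TIMEOUT_MS, NO_TIMEOUT);
    }
    return getLong(config, CALLBACK_TIMEOUT_MS, NO_TIMEOUT);
  }
}
```

With only `task.callback.timeout.ms=10` set, a watermark resolves to 10 ms in this sketch; adding `task.callback.watermark.timeout.ms=20` changes only the watermark case, whether or not the run loop is draining.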