Skip to content

Commit

Permalink
SAMZA-2790: Cleanup RunLoop constructor explosion (apache#1680)
Browse files Browse the repository at this point in the history
Description:
RunLoop currently takes a lot of parameters, and its constructor has grown to the point where it is unmanageable, with multiple overloads. Introducing a new configuration requires a lot of updates to existing tests and components, even when the new parameter is not relevant to all of the usages.

With this PR, we should be able to decouple the different users of RunLoop and enable these components to have their own scoped config. For example, SideInputManager can now have its own set of RunLoop parameters without having to tie itself to TaskConfig.

Changes:
Introduce RunLoopConfig, a container object to hold all required parameters for runloop from Config.
Remove existing overloads of constructor
Simplify the constructor to take RunLoopConfig and initialize the necessary components and fields
Introduce SideInputManagerRunLoopConfig, an overload of RunLoopConfig to be used within SideInputManager
Modify RunLoopFactory create method signature
Clean up ApplicationUtil: move the isHighLevelApiJob method to ApplicationConfig and add unit tests

API Changes:
No external API change
  • Loading branch information
mynameborat authored Aug 29, 2023
1 parent fa4008e commit aeeaf0b
Show file tree
Hide file tree
Showing 10 changed files with 293 additions and 191 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,4 @@ public static SamzaApplication fromConfig(Config config) {
}
return new LegacyTaskApplication(taskClassOption.get());
}

/**
 * Checks whether the job described by the given config is a Samza high-level API job.
 *
 * @param config config to inspect
 * @return {@code true} if the application API type read from the config is
 *         {@link ApplicationApiType#HIGH_LEVEL}, {@code false} otherwise
 */
public static boolean isHighLevelApiJob(Config config) {
  return ApplicationApiType.HIGH_LEVEL == new ApplicationConfig(config).getAppApiType();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,7 @@ public String getAppRunnerClass() {
/**
 * Resolves the application API type from the {@code APP_API_TYPE} config entry, using
 * {@link ApplicationApiType#HIGH_LEVEL} as the default value.
 *
 * @return the {@link ApplicationApiType} whose name matches the upper-cased config value
 * @throws IllegalArgumentException if the configured value does not name an {@link ApplicationApiType} constant
 */
public ApplicationApiType getAppApiType() {
  final String apiType = get(APP_API_TYPE, ApplicationApiType.HIGH_LEVEL.name());
  // Upper-case with a fixed locale: the default-locale variant maps 'i' to a dotted capital I
  // on Turkish-locale JVMs, which would make valueOf reject an otherwise valid "high_level".
  return ApplicationApiType.valueOf(apiType.toUpperCase(java.util.Locale.ROOT));
}
/**
 * Determines if the job is a Samza high-level API job.
 *
 * @return {@code true} if {@link #getAppApiType()} is {@link ApplicationApiType#HIGH_LEVEL}
 */
public boolean isHighLevelApiJob() {
  return ApplicationApiType.HIGH_LEVEL == getAppApiType();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.config;

import java.util.concurrent.TimeUnit;


/**
 * A container class to hold run loop related configurations to prevent constructor explosion
 * in {@link org.apache.samza.container.RunLoop}.
 *
 * <p>Every accessor except {@link #getMaxThrottlingDelayMs()} delegates to an
 * {@link ApplicationConfig}, {@link JobConfig} or {@link TaskConfig} view built over the same
 * underlying config that was passed to the constructor.
 */
public class RunLoopConfig extends MapConfig {
  private static final String CONTAINER_DISK_QUOTA_DELAY_MAX_MS = "container.disk.quota.delay.max.ms";
  // Default passed to getLong when reading CONTAINER_DISK_QUOTA_DELAY_MAX_MS: one second, in milliseconds.
  private static final long DEFAULT_CONTAINER_DISK_QUOTA_DELAY_MAX_MS = TimeUnit.SECONDS.toMillis(1);

  // Typed views over the wrapped config; assigned once in the constructor and never replaced.
  private final ApplicationConfig appConfig;
  private final JobConfig jobConfig;
  private final TaskConfig taskConfig;

  /**
   * Creates a run loop config backed by the given config.
   *
   * @param config config from which all run loop parameters are read
   */
  public RunLoopConfig(Config config) {
    super(config);
    this.appConfig = new ApplicationConfig(config);
    this.jobConfig = new JobConfig(config);
    this.taskConfig = new TaskConfig(config);
  }

  /**
   * @return the value of {@link TaskConfig#getMaxConcurrency()}
   */
  public int getMaxConcurrency() {
    return taskConfig.getMaxConcurrency();
  }

  /**
   * @return the task callback timeout in milliseconds, as given by {@link TaskConfig#getCallbackTimeoutMs()}
   */
  public long getTaskCallbackTimeoutMs() {
    return taskConfig.getCallbackTimeoutMs();
  }

  /**
   * @return the drain callback timeout in milliseconds, as given by {@link TaskConfig#getDrainCallbackTimeoutMs()}
   */
  public long getDrainCallbackTimeoutMs() {
    return taskConfig.getDrainCallbackTimeoutMs();
  }

  /**
   * @return the value of {@link TaskConfig#getAsyncCommit()}
   */
  public boolean asyncCommitEnabled() {
    return taskConfig.getAsyncCommit();
  }

  /**
   * @return the window interval in milliseconds, as given by {@link TaskConfig#getWindowMs()}
   */
  public long getWindowMs() {
    return taskConfig.getWindowMs();
  }

  /**
   * @return the commit interval in milliseconds, as given by {@link TaskConfig#getCommitMs()}
   */
  public long getCommitMs() {
    return taskConfig.getCommitMs();
  }

  /**
   * @return the max idle time in milliseconds, as given by {@link TaskConfig#getMaxIdleMs()}
   */
  public long getMaxIdleMs() {
    return taskConfig.getMaxIdleMs();
  }

  /**
   * Reads the max throttling delay directly from this config rather than from a delegate.
   *
   * @return the value configured under {@code container.disk.quota.delay.max.ms}, with a default of one second
   *         expressed in milliseconds
   */
  public long getMaxThrottlingDelayMs() {
    return getLong(CONTAINER_DISK_QUOTA_DELAY_MAX_MS, DEFAULT_CONTAINER_DISK_QUOTA_DELAY_MAX_MS);
  }

  /**
   * @return the run id, as given by {@link ApplicationConfig#getRunId()}
   */
  public String getRunId() {
    return appConfig.getRunId();
  }

  /**
   * @return the elasticity factor, as given by {@link JobConfig#getElasticityFactor()}
   */
  public int getElasticityFactor() {
    return jobConfig.getElasticityFactor();
  }

  /**
   * @return the value of {@link ApplicationConfig#isHighLevelApiJob()}
   */
  public boolean isHighLevelApiJob() {
    return appConfig.isHighLevelApiJob();
  }
}
92 changes: 49 additions & 43 deletions samza-core/src/main/java/org/apache/samza/container/RunLoop.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.samza.SamzaException;
import org.apache.samza.config.RunLoopConfig;
import org.apache.samza.system.DrainMessage;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.MessageType;
Expand Down Expand Up @@ -94,61 +95,66 @@ public class RunLoop implements Runnable, Throttleable {
private final boolean isHighLevelApiJob;
private boolean isDraining = false;

/*
* Order of initialization
* 1. Initialize fields with arguments passed to the constructor
* 2. Initialize fields that are constructed within the constructor
* 3. Initialize fields that are derived and constructed using the arguments passed to the constructor
*/
public RunLoop(Map<TaskName, RunLoopTask> runLoopTasks,
ExecutorService threadPool,
SystemConsumers consumerMultiplexer,
int maxConcurrency,
long windowMs,
long commitMs,
long callbackTimeoutMs,
long drainCallbackTimeoutMs,
long maxThrottlingDelayMs,
long maxIdleMs,
SamzaContainerMetrics containerMetrics,
HighResolutionClock clock,
boolean isAsyncCommitEnabled) {
this(runLoopTasks, threadPool, consumerMultiplexer, maxConcurrency, windowMs, commitMs, callbackTimeoutMs,
drainCallbackTimeoutMs, maxThrottlingDelayMs, maxIdleMs, containerMetrics, clock, isAsyncCommitEnabled, 1, null, false);
}

public RunLoop(Map<TaskName, RunLoopTask> runLoopTasks,
ExecutorService threadPool,
SystemConsumers consumerMultiplexer,
int maxConcurrency,
long windowMs,
long commitMs,
long callbackTimeoutMs,
long drainCallbackTimeoutMs,
long maxThrottlingDelayMs,
long maxIdleMs,
SamzaContainerMetrics containerMetrics,
HighResolutionClock clock,
boolean isAsyncCommitEnabled,
int elasticityFactor,
String runId,
boolean isHighLevelApiJob) {
RunLoopConfig config) {

this.threadPool = threadPool;
this.consumerMultiplexer = consumerMultiplexer;
this.containerMetrics = containerMetrics;
this.windowMs = windowMs;
this.commitMs = commitMs;
this.maxConcurrency = maxConcurrency;
this.callbackTimeoutMs = callbackTimeoutMs;
this.drainCallbackTimeoutMs = drainCallbackTimeoutMs;
this.maxIdleMs = maxIdleMs;
this.callbackTimer = (callbackTimeoutMs > 0) ? Executors.newSingleThreadScheduledExecutor() : null;
this.callbackExecutor = new ThrottlingScheduler(maxThrottlingDelayMs);
this.coordinatorRequests = new CoordinatorRequests(runLoopTasks.keySet());
this.latch = new Object();
this.workerTimer = Executors.newSingleThreadScheduledExecutor();

this.windowMs = config.getWindowMs();
log.info("Got window milliseconds: {}.", windowMs);

this.commitMs = config.getCommitMs();
log.info("Got commit milliseconds: {}.", commitMs);

this.maxConcurrency = config.getMaxConcurrency();
log.info("Got task concurrency: {}.", maxConcurrency);

this.callbackTimeoutMs = config.getTaskCallbackTimeoutMs();
log.info("Got callback timeout for task in milliseconds: {}.", callbackTimeoutMs);

this.drainCallbackTimeoutMs = config.getDrainCallbackTimeoutMs();
log.info("Got callback timeout for drain in milliseconds: {}.", drainCallbackTimeoutMs);

this.maxIdleMs = config.getMaxIdleMs();
log.info("Got max idle in milliseconds: {}.", maxIdleMs);

this.clock = clock;
// assign runId before creating workers. As the inner AsyncTaskWorker class is not static, it relies on
// the outer class fields to be init first
this.runId = runId;
this.isHighLevelApiJob = isHighLevelApiJob;
this.isAsyncCommitEnabled = isAsyncCommitEnabled;
this.elasticityFactor = elasticityFactor;
this.runId = config.getRunId();
log.info("Got current run Id: {}.", runId);

this.isHighLevelApiJob = config.isHighLevelApiJob();
if (isHighLevelApiJob) {
log.info("The application uses high-level API.");
} else {
log.info("The application doesn't use high-level API.");
}

this.isAsyncCommitEnabled = config.asyncCommitEnabled();
log.info("Got async commit enabled={}.", isAsyncCommitEnabled);

this.elasticityFactor = config.getElasticityFactor();
log.info("Got elasticity factor: {}.", elasticityFactor);

this.latch = new Object();
this.workerTimer = Executors.newSingleThreadScheduledExecutor();

this.callbackTimer = (callbackTimeoutMs > 0) ? Executors.newSingleThreadScheduledExecutor() : null;
this.callbackExecutor = new ThrottlingScheduler(config.getMaxThrottlingDelayMs());
this.coordinatorRequests = new CoordinatorRequests(runLoopTasks.keySet());

Map<TaskName, AsyncTaskWorker> workers = new HashMap<>();
for (RunLoopTask task : runLoopTasks.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@

package org.apache.samza.container;

import org.apache.samza.config.TaskConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.RunLoopConfig;
import org.apache.samza.system.SystemConsumers;
import org.apache.samza.util.HighResolutionClock;
import org.slf4j.Logger;
Expand All @@ -37,65 +38,18 @@ public class RunLoopFactory {
public static Runnable createRunLoop(scala.collection.immutable.Map<TaskName, RunLoopTask> taskInstances,
SystemConsumers consumerMultiplexer,
ExecutorService threadPool,
long maxThrottlingDelayMs,
SamzaContainerMetrics containerMetrics,
TaskConfig taskConfig,
HighResolutionClock clock,
int elasticityFactor,
String runId,
boolean isHighLevelApiJob) {

long taskWindowMs = taskConfig.getWindowMs();

log.info("Got window milliseconds: {}.", taskWindowMs);

long taskCommitMs = taskConfig.getCommitMs();

log.info("Got commit milliseconds: {}.", taskCommitMs);

int taskMaxConcurrency = taskConfig.getMaxConcurrency();
log.info("Got taskMaxConcurrency: {}.", taskMaxConcurrency);

boolean isAsyncCommitEnabled = taskConfig.getAsyncCommit();
log.info("Got asyncCommitEnabled: {}.", isAsyncCommitEnabled);

long callbackTimeout = taskConfig.getCallbackTimeoutMs();
log.info("Got callbackTimeout: {}.", callbackTimeout);

long drainCallbackTimeout = taskConfig.getDrainCallbackTimeoutMs();
log.info("Got callback timeout for drain: {}.", callbackTimeout);

long maxIdleMs = taskConfig.getMaxIdleMs();
log.info("Got maxIdleMs: {}.", maxIdleMs);

log.info("Got elasticity factor: {}.", elasticityFactor);

log.info("Got current run Id: {}.", runId);

if (isHighLevelApiJob) {
log.info("The application uses high-level API.");
} else {
log.info("The application doesn't use high-level API.");
}
Config config) {

log.info("Run loop in asynchronous mode.");

return new RunLoop(
JavaConverters.mapAsJavaMapConverter(taskInstances).asJava(),
threadPool,
consumerMultiplexer,
taskMaxConcurrency,
taskWindowMs,
taskCommitMs,
callbackTimeout,
drainCallbackTimeout,
maxThrottlingDelayMs,
maxIdleMs,
containerMetrics,
clock,
isAsyncCommitEnabled,
elasticityFactor,
runId,
isHighLevelApiJob);
new RunLoopConfig(config));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import java.util.function.Consumer
import java.util.{Base64, Optional}
import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.samza.SamzaException
import org.apache.samza.application.ApplicationUtil
import org.apache.samza.checkpoint.{Checkpoint, CheckpointListener, OffsetManager, OffsetManagerMetrics}
import org.apache.samza.clustermanager.StandbyTaskUtil
import org.apache.samza.config.{StreamConfig, _}
Expand Down Expand Up @@ -624,21 +623,13 @@ object SamzaContainer extends Logging {
(taskName, taskInstance)
}).toMap

val maxThrottlingDelayMs = config.getLong("container.disk.quota.delay.max.ms", TimeUnit.SECONDS.toMillis(1))

val isHighLevelApiJob = ApplicationUtil.isHighLevelApiJob(config)

val runLoop: Runnable = RunLoopFactory.createRunLoop(
taskInstances,
consumerMultiplexer,
taskThreadPool,
maxThrottlingDelayMs,
samzaContainerMetrics,
taskConfig,
clock,
jobConfig.getElasticityFactor,
appConfig.getRunId,
isHighLevelApiJob)
config)

val systemStatisticsMonitor : SystemStatisticsMonitor = new StatisticsMonitorImpl()
systemStatisticsMonitor.registerListener(
Expand Down
Loading

0 comments on commit aeeaf0b

Please sign in to comment.