Skip to content

Commit

Permalink
Introduce a ENABLE_VIRTUAL_THREADS env variable to enable virtual t…
Browse files Browse the repository at this point in the history
…hreads, default to real threads for now (knative-extensions#3882) (#1082)

* Fix profiler job



* Use async logger



* Use real threads for profiling job



* Info level logging for profiler job



* Format java



* Artifact log per event



* Enable virtual threads flag



* Use and handle consumer wakeup



* Allow closing consumer only once



---------

Signed-off-by: Pierangelo Di Pilato <[email protected]>
  • Loading branch information
pierDipi authored May 8, 2024
1 parent eadf637 commit f343021
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,19 @@ public class LoomKafkaConsumer<K, V> implements ReactiveKafkaConsumer<K, V> {
private final BlockingQueue<Runnable> taskQueue;
private final AtomicBoolean isClosed;
private final Thread taskRunnerThread;
private final Promise<Void> closePromise = Promise.promise();

public LoomKafkaConsumer(Vertx vertx, Consumer<K, V> consumer) {
this.consumer = consumer;
this.taskQueue = new LinkedBlockingQueue<>();
this.isClosed = new AtomicBoolean(false);

this.taskRunnerThread = Thread.ofVirtual().start(this::processTaskQueue);
if (Boolean.parseBoolean(System.getenv("ENABLE_VIRTUAL_THREADS"))) {
this.taskRunnerThread = Thread.ofVirtual().start(this::processTaskQueue);
} else {
this.taskRunnerThread = new Thread(this::processTaskQueue);
this.taskRunnerThread.start();
}
}

private void addTask(Runnable task, Promise<?> promise) {
Expand Down Expand Up @@ -92,19 +98,21 @@ public Future<Map<TopicPartition, OffsetAndMetadata>> commit(Map<TopicPartition,

@Override
public Future<Void> close() {
if (!this.isClosed.compareAndSet(false, true)) {
return closePromise.future();
}

final Promise<Void> promise = Promise.promise();
taskQueue.add(() -> {
try {
logger.debug("Closing underlying Kafka consumer client");
consumer.wakeup();
consumer.close();
} catch (Exception e) {
promise.tryFail(e);
closePromise.tryFail(e);
}
});

logger.debug("Closing consumer {}", keyValue("size", taskQueue.size()));
isClosed.set(true);

Thread.ofVirtual().start(() -> {
try {
Expand All @@ -116,7 +124,7 @@ public Future<Void> close() {

taskRunnerThread.interrupt();
taskRunnerThread.join();
promise.tryComplete();
closePromise.tryComplete();

logger.debug("Background thread completed");

Expand All @@ -126,11 +134,11 @@ public Future<Void> close() {
"Interrupted while waiting for taskRunnerThread to finish {}",
keyValue("taskQueueSize", size),
e);
promise.tryFail(new InterruptedException("taskQueue.size = " + size + ". " + e.getMessage()));
closePromise.tryFail(new InterruptedException("taskQueue.size = " + size + ". " + e.getMessage()));
}
});

return promise.future();
return closePromise.future();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -143,7 +144,7 @@ private void poll() {
.poll(POLLING_TIMEOUT)
.onSuccess(records -> vertx.runOnContext(v -> this.recordsHandler(records)))
.onFailure(t -> {
if (this.closed.get()) {
if (this.closed.get() || t instanceof WakeupException) {
// The failure might have been caused by stopping the consumer, so we just ignore it
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -86,7 +87,7 @@ private synchronized void poll() {
return;
}
if (inFlightRecords.get() >= getConsumerVerticleContext().getMaxPollRecords()) {
logger.info(
logger.debug(
"In flight records exceeds " + ConsumerConfig.MAX_POLL_RECORDS_CONFIG
+ " waiting for response from subscriber before polling for new records {} {} {}",
keyValue(
Expand All @@ -101,6 +102,10 @@ private synchronized void poll() {
.poll(POLL_TIMEOUT)
.onSuccess(records -> vertx.runOnContext(v -> this.handleRecords(records)))
.onFailure(cause -> {
if (cause instanceof WakeupException) {
return; // Do nothing we're shutting down
}

isPollInFlight.set(false);
logger.error(
"Failed to poll messages {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@
public class Main {

static {
System.setProperty("logback.configurationFile", "/etc/logging/config.xml");
if (System.getProperty("logback.configurationFile") == null
|| System.getProperty("logback.configurationFile").isEmpty()) {
System.setProperty("logback.configurationFile", "/etc/logging/config.xml");
}
}

private static final Logger logger = LoggerFactory.getLogger(Main.class);
Expand Down
7 changes: 6 additions & 1 deletion data-plane/profiler/resources/config-logging.xml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@
<appender name="jsonConsoleAppender" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="net.logstash.logback.encoder.LogstashEncoder"/>
</appender>
<appender name="async" class="ch.qos.logback.classic.AsyncAppender">
<appender-ref ref="jsonConsoleAppender" />
<neverBlock>true</neverBlock>
<maxFlushTime>1000</maxFlushTime>
</appender>
<root level="INFO">
<appender-ref ref="jsonConsoleAppender"/>
<appender-ref ref="async"/>
</root>
</configuration>
6 changes: 6 additions & 0 deletions data-plane/profiler/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ export METRICS_PUBLISH_QUANTILES="false"
export EGRESSES_INITIAL_CAPACITY="1"
export HTTP2_DISABLE="true"
export WAIT_STARTUP_SECONDS="8"
export CONFIG_FEATURES_PATH=""
export ENABLE_VIRTUAL_THREADS="true"

# Define receiver specific env variables.
export SERVICE_NAME="kafka-broker-receiver"
Expand All @@ -109,6 +111,8 @@ export INSTANCE_ID="receiver"
java \
-XX:+UnlockDiagnosticVMOptions \
-XX:+DebugNonSafepoints \
-XX:+EnableDynamicAgentLoading \
-Djdk.tracePinnedThreads=full \
-Dlogback.configurationFile="${RESOURCES_DIR}"/config-logging.xml \
-jar "${PROJECT_ROOT_DIR}"/receiver-vertx/target/receiver-vertx-1.0-SNAPSHOT.jar >"${LOG_DIR}/receiver.log" &
receiver_pid=$!
Expand All @@ -125,6 +129,8 @@ export INSTANCE_ID="dispatcher"
java \
-XX:+UnlockDiagnosticVMOptions \
-XX:+DebugNonSafepoints \
-XX:+EnableDynamicAgentLoading \
-Djdk.tracePinnedThreads=full \
-Dlogback.configurationFile="${RESOURCES_DIR}"/config-logging.xml \
-jar "${PROJECT_ROOT_DIR}"/dispatcher-vertx/target/dispatcher-vertx-1.0-SNAPSHOT.jar >"${LOG_DIR}/dispatcher.log" &
dispatcher_pid=$!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,12 @@ public LoomKafkaProducer(Vertx v, Producer<K, V> producer) {
this.tracer = null;
}

sendFromQueueThread = Thread.ofVirtual().start(this::sendFromQueue);
if (Boolean.parseBoolean(System.getenv("ENABLE_VIRTUAL_THREADS"))) {
this.sendFromQueueThread = Thread.ofVirtual().start(this::sendFromQueue);
} else {
this.sendFromQueueThread = new Thread(this::sendFromQueue);
this.sendFromQueueThread.start();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@
public class Main {

static {
System.setProperty("logback.configurationFile", "/etc/logging/config.xml");
if (System.getProperty("logback.configurationFile") == null
|| System.getProperty("logback.configurationFile").isEmpty()) {
System.setProperty("logback.configurationFile", "/etc/logging/config.xml");
}
}

private static final Logger logger = LoggerFactory.getLogger(Main.class);
Expand Down Expand Up @@ -109,7 +112,7 @@ public static void start(final String[] args, final ReactiveProducerFactory kafk
.toCompletionStage()
.toCompletableFuture()
.get();
} catch (ExecutionException ex) {
} catch (Exception ex) {
if (featuresConfig.isAuthenticationOIDC()) {
logger.error("Could not load OIDC config while OIDC authentication feature is enabled.");
throw ex;
Expand Down

0 comments on commit f343021

Please sign in to comment.