diff --git a/data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java b/data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java index 18f34a7dca..33b904ad94 100644 --- a/data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java +++ b/data-plane/dispatcher-loom/src/main/java/dev/knative/eventing/kafka/broker/dispatcherloom/LoomKafkaConsumer.java @@ -45,13 +45,19 @@ public class LoomKafkaConsumer implements ReactiveKafkaConsumer { private final BlockingQueue taskQueue; private final AtomicBoolean isClosed; private final Thread taskRunnerThread; + private final Promise closePromise = Promise.promise(); public LoomKafkaConsumer(Vertx vertx, Consumer consumer) { this.consumer = consumer; this.taskQueue = new LinkedBlockingQueue<>(); this.isClosed = new AtomicBoolean(false); - this.taskRunnerThread = Thread.ofVirtual().start(this::processTaskQueue); + if (Boolean.parseBoolean(System.getenv("ENABLE_VIRTUAL_THREADS"))) { + this.taskRunnerThread = Thread.ofVirtual().start(this::processTaskQueue); + } else { + this.taskRunnerThread = new Thread(this::processTaskQueue); + this.taskRunnerThread.start(); + } } private void addTask(Runnable task, Promise promise) { @@ -92,19 +98,21 @@ public Future> commit(Map close() { + if (!this.isClosed.compareAndSet(false, true)) { + return closePromise.future(); + } - final Promise promise = Promise.promise(); taskQueue.add(() -> { try { logger.debug("Closing underlying Kafka consumer client"); + consumer.wakeup(); consumer.close(); } catch (Exception e) { - promise.tryFail(e); + closePromise.tryFail(e); } }); logger.debug("Closing consumer {}", keyValue("size", taskQueue.size())); - isClosed.set(true); Thread.ofVirtual().start(() -> { try { @@ -116,7 +124,7 @@ public Future close() { taskRunnerThread.interrupt(); taskRunnerThread.join(); - promise.tryComplete(); + closePromise.tryComplete(); logger.debug("Background thread completed"); @@ -126,11 +134,11 @@ public Future close() { "Interrupted while waiting for taskRunnerThread to finish {}", keyValue("taskQueueSize", size), e); - promise.tryFail(new InterruptedException("taskQueue.size = " + size + ". " + e.getMessage())); + closePromise.tryFail(new InterruptedException("taskQueue.size = " + size + ". " + e.getMessage())); } }); - return promise.future(); + return closePromise.future(); } @Override diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java index edd11e3eff..fe813f3115 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/OrderedConsumerVerticle.java @@ -38,6 +38,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.WakeupException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -143,7 +144,7 @@ private void poll() { .poll(POLLING_TIMEOUT) .onSuccess(records -> vertx.runOnContext(v -> this.recordsHandler(records))) .onFailure(t -> { - if (this.closed.get()) { + if (this.closed.get() || t instanceof WakeupException) { // The failure might have been caused by stopping the consumer, so we just ignore it return; } diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedConsumerVerticle.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedConsumerVerticle.java index 1b43eb79ce..937519468e 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedConsumerVerticle.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/UnorderedConsumerVerticle.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.errors.WakeupException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,7 +87,7 @@ private synchronized void poll() { return; } if (inFlightRecords.get() >= getConsumerVerticleContext().getMaxPollRecords()) { - logger.info( + logger.debug( "In flight records exceeds " + ConsumerConfig.MAX_POLL_RECORDS_CONFIG + " waiting for response from subscriber before polling for new records {} {} {}", keyValue( @@ -101,6 +102,10 @@ private synchronized void poll() { .poll(POLL_TIMEOUT) .onSuccess(records -> vertx.runOnContext(v -> this.handleRecords(records))) .onFailure(cause -> { + if (cause instanceof WakeupException) { + return; // Do nothing we're shutting down + } + isPollInFlight.set(false); logger.error( "Failed to poll messages {}", diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/Main.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/Main.java index 3b4dbb50c4..98143b4932 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/Main.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/main/Main.java @@ -54,7 +54,10 @@ public class Main { static { - System.setProperty("logback.configurationFile", "/etc/logging/config.xml"); + if (System.getProperty("logback.configurationFile") == null + || System.getProperty("logback.configurationFile").isEmpty()) { + System.setProperty("logback.configurationFile", "/etc/logging/config.xml"); + } } private static final Logger logger = LoggerFactory.getLogger(Main.class); diff --git a/data-plane/profiler/resources/config-logging.xml b/data-plane/profiler/resources/config-logging.xml index 19828c5f4c..55b5ca1e8e 100644 --- a/data-plane/profiler/resources/config-logging.xml +++ b/data-plane/profiler/resources/config-logging.xml @@ -19,7 +19,12 @@ + + + true + 1000 + - + diff --git a/data-plane/profiler/run.sh b/data-plane/profiler/run.sh index 46a61b3607..45722f774b 100755 --- a/data-plane/profiler/run.sh +++ b/data-plane/profiler/run.sh @@ -96,6 +96,8 @@ export METRICS_PUBLISH_QUANTILES="false" export EGRESSES_INITIAL_CAPACITY="1" export HTTP2_DISABLE="true" export WAIT_STARTUP_SECONDS="8" +export CONFIG_FEATURES_PATH="" +export ENABLE_VIRTUAL_THREADS="true" # Define receiver specific env variables. export SERVICE_NAME="kafka-broker-receiver" @@ -109,6 +111,8 @@ export INSTANCE_ID="receiver" java \ -XX:+UnlockDiagnosticVMOptions \ -XX:+DebugNonSafepoints \ + -XX:+EnableDynamicAgentLoading \ + -Djdk.tracePinnedThreads=full \ -Dlogback.configurationFile="${RESOURCES_DIR}"/config-logging.xml \ -jar "${PROJECT_ROOT_DIR}"/receiver-vertx/target/receiver-vertx-1.0-SNAPSHOT.jar >"${LOG_DIR}/receiver.log" & receiver_pid=$! @@ -125,6 +129,8 @@ export INSTANCE_ID="dispatcher" java \ -XX:+UnlockDiagnosticVMOptions \ -XX:+DebugNonSafepoints \ + -XX:+EnableDynamicAgentLoading \ + -Djdk.tracePinnedThreads=full \ -Dlogback.configurationFile="${RESOURCES_DIR}"/config-logging.xml \ -jar "${PROJECT_ROOT_DIR}"/dispatcher-vertx/target/dispatcher-vertx-1.0-SNAPSHOT.jar >"${LOG_DIR}/dispatcher.log" & dispatcher_pid=$! diff --git a/data-plane/receiver-loom/src/main/java/dev/knative/eventing/kafka/broker/receiverloom/LoomKafkaProducer.java b/data-plane/receiver-loom/src/main/java/dev/knative/eventing/kafka/broker/receiverloom/LoomKafkaProducer.java index 9d1ee7b2c6..402760854e 100644 --- a/data-plane/receiver-loom/src/main/java/dev/knative/eventing/kafka/broker/receiverloom/LoomKafkaProducer.java +++ b/data-plane/receiver-loom/src/main/java/dev/knative/eventing/kafka/broker/receiverloom/LoomKafkaProducer.java @@ -61,7 +61,12 @@ public LoomKafkaProducer(Vertx v, Producer producer) { this.tracer = null; } - sendFromQueueThread = Thread.ofVirtual().start(this::sendFromQueue); + if (Boolean.parseBoolean(System.getenv("ENABLE_VIRTUAL_THREADS"))) { + this.sendFromQueueThread = Thread.ofVirtual().start(this::sendFromQueue); + } else { + this.sendFromQueueThread = new Thread(this::sendFromQueue); + this.sendFromQueueThread.start(); + } } @Override diff --git a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java index 4d0f4cd902..932f71afb0 100644 --- a/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java +++ b/data-plane/receiver/src/main/java/dev/knative/eventing/kafka/broker/receiver/main/Main.java @@ -52,7 +52,10 @@ public class Main { static { - System.setProperty("logback.configurationFile", "/etc/logging/config.xml"); + if (System.getProperty("logback.configurationFile") == null + || System.getProperty("logback.configurationFile").isEmpty()) { + System.setProperty("logback.configurationFile", "/etc/logging/config.xml"); + } } private static final Logger logger = LoggerFactory.getLogger(Main.class); @@ -109,7 +112,7 @@ public static void start(final String[] args, final ReactiveProducerFactory kafk .toCompletionStage() .toCompletableFuture() .get(); - } catch (ExecutionException ex) { + } catch (Exception ex) { if (featuresConfig.isAuthenticationOIDC()) { logger.error("Could not load OIDC config while OIDC authentication feature is enabled."); throw ex;