From c231bcd4508fe5c6f8bfbf62afc5a0e25db3670a Mon Sep 17 00:00:00 2001 From: Pierangelo Di Pilato Date: Thu, 12 Dec 2024 16:06:39 +0100 Subject: [PATCH] Complete Consumer verticle stop promise only after closing dependencies Completing the stop promise before the dependencies (dispatcher, client, etc) before will cause the WebClient to be closed. Signed-off-by: Pierangelo Di Pilato --- .../dispatcher/impl/consumer/ConsumerVerticle.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/ConsumerVerticle.java b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/ConsumerVerticle.java index ca36dc3005..8ba588419a 100644 --- a/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/ConsumerVerticle.java +++ b/data-plane/dispatcher/src/main/java/dev/knative/eventing/kafka/broker/dispatcher/impl/consumer/ConsumerVerticle.java @@ -65,14 +65,14 @@ public void start(Promise startPromise) { @Override public void stop(Promise stopPromise) { - logger.info("Stopping consumer {}", consumerVerticleContext.getLoggingKeyValue()); + logger.info("Stopping consumer verticle {}", consumerVerticleContext.getLoggingKeyValue()); AsyncCloseable.compose(this.recordDispatcher, this.closeable, this.consumer::close) .close() - .onComplete( - r -> logger.info("Consumer verticle closed {}", consumerVerticleContext.getLoggingKeyValue())); - - stopPromise.tryComplete(); + .onComplete(r -> { + stopPromise.tryComplete(); + logger.info("Consumer verticle closed {}", consumerVerticleContext.getLoggingKeyValue()); + }); } public void setConsumer(ReactiveKafkaConsumer consumer) {