diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java index e3c2cf40cf318..bec02e94c79ab 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherMultipleConsumers.java @@ -68,6 +68,10 @@ public SubType getType() { public abstract boolean isConsumerAvailable(Consumer consumer); + /** + * Cancel a possible pending read that is a Managed Cursor waiting to be notified for more entries. + * This won't cancel any other pending reads that are currently in progress. + */ protected void cancelPendingRead() {} /** diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java index b1cd186c31784..fa03a260e131e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java @@ -687,9 +687,8 @@ public synchronized CompletableFuture disconnectAllConsumers( @Override protected void cancelPendingRead() { - if ((havePendingRead || havePendingReplayRead) && cursor.cancelPendingReadRequest()) { + if (havePendingRead && cursor.cancelPendingReadRequest()) { havePendingRead = false; - havePendingReplayRead = false; } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java index 6ab7acfa56da8..910491e60b2cf 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumersClassic.java @@ -600,9 +600,8 @@ public synchronized CompletableFuture disconnectAllConsumers( @Override protected void cancelPendingRead() { - if ((havePendingRead || havePendingReplayRead) && cursor.cancelPendingReadRequest()) { + if (havePendingRead && cursor.cancelPendingReadRequest()) { havePendingRead = false; - havePendingReplayRead = false; } }