diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index 033b16ba7b6a..646087ee58d9 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -1766,12 +1766,14 @@ private void sendPrefetches(Timer timer) { @Override public boolean updateAssignmentMetadataIfNeeded(Timer timer) { offsetCommitCallbackInvoker.executeCallbacks(); - try { - applicationEventHandler.addAndGet(new UpdatePatternSubscriptionEvent(calculateDeadlineMs(timer))); - } catch (TimeoutException e) { - return false; - } finally { - timer.update(); + if (subscriptions.hasPatternSubscription()) { + try { + applicationEventHandler.addAndGet(new UpdatePatternSubscriptionEvent(calculateDeadlineMs(timer))); + } catch (TimeoutException e) { + return false; + } finally { + timer.update(); + } } processBackgroundEvents(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index d2e45370c66d..c9735617abbb 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -320,11 +320,12 @@ private void process(final TopicPatternSubscriptionChangeEvent event) { * This will make the consumer send the updated subscription on the next poll. */ private void process(final UpdatePatternSubscriptionEvent event) { + if (!subscriptions.hasPatternSubscription()) { + return; + } if (this.metadataVersionSnapshot < metadata.updateVersion()) { this.metadataVersionSnapshot = metadata.updateVersion(); - if (subscriptions.hasPatternSubscription()) { - updatePatternSubscription(metadata.fetch()); - } + updatePatternSubscription(metadata.fetch()); } event.future().complete(null); } diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 91e4cae0f98a..940d9cbaef3f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -54,6 +54,7 @@ import org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent; +import org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; import org.apache.kafka.common.TopicPartition; @@ -1830,6 +1831,27 @@ public void testSeekToEnd() { assertEquals(OffsetResetStrategy.LATEST, resetOffsetEvent.offsetResetStrategy()); } + @Test + public void testUpdatePatternSubscriptionEventGeneratedOnlyIfPatternUsed() { + consumer = newConsumer(); + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); + completeAssignmentChangeEventSuccessfully(); + completeTopicPatternSubscriptionChangeEventSuccessfully(); + completeUnsubscribeApplicationEventSuccessfully(); + + consumer.assign(singleton(new TopicPartition("topic1", 0))); + consumer.poll(Duration.ZERO); + verify(applicationEventHandler, never()).addAndGet(any(UpdatePatternSubscriptionEvent.class)); + + consumer.unsubscribe(); + + consumer.subscribe(Pattern.compile("t*")); + consumer.poll(Duration.ZERO); + verify(applicationEventHandler).addAndGet(any(UpdatePatternSubscriptionEvent.class)); + } + @Test public void testSubscribeToRe2JPatternValidation() { consumer = newConsumer(); @@ -1927,6 +1949,7 @@ private void completeFetchedCommittedOffsetApplicationEventExceptionally(Excepti private void completeUnsubscribeApplicationEventSuccessfully() { doAnswer(invocation -> { UnsubscribeEvent event = invocation.getArgument(0); + consumer.subscriptions().unsubscribe(); event.future().complete(null); return null; }).when(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class)); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java index a16f9612c741..ea09fc2ae8be 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java @@ -355,11 +355,10 @@ public void testUpdatePatternSubscriptionEventOnlyTakesEffectWhenMetadataHasNewV UpdatePatternSubscriptionEvent event1 = new UpdatePatternSubscriptionEvent(12345); setupProcessor(true); - + when(subscriptionState.hasPatternSubscription()).thenReturn(true); when(metadata.updateVersion()).thenReturn(0); processor.process(event1); - verify(subscriptionState, never()).hasPatternSubscription(); assertDoesNotThrow(() -> event1.future().get()); Cluster cluster = mock(Cluster.class); @@ -377,7 +376,6 @@ public void testUpdatePatternSubscriptionEventOnlyTakesEffectWhenMetadataHasNewV UpdatePatternSubscriptionEvent event2 = new UpdatePatternSubscriptionEvent(12345); processor.process(event2); verify(metadata).requestUpdateForNewTopics(); - verify(subscriptionState).hasPatternSubscription(); verify(subscriptionState).subscribeFromPattern(topics); assertEquals(1, processor.metadataVersionSnapshot()); verify(membershipManager).onSubscriptionUpdated();