From c01f39eca267675a37a21407879769521227f031 Mon Sep 17 00:00:00 2001 From: Lianet Magrans Date: Fri, 22 Nov 2024 11:36:09 -0500 Subject: [PATCH 1/2] Avoid refresh event if no pattern --- .../consumer/internals/AsyncKafkaConsumer.java | 14 ++++++++------ .../events/ApplicationEventProcessor.java | 7 ++++--- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java index ba7eed19f11f..097c9a2cc6c5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java @@ -1765,12 +1765,14 @@ private void sendPrefetches(Timer timer) { @Override public boolean updateAssignmentMetadataIfNeeded(Timer timer) { offsetCommitCallbackInvoker.executeCallbacks(); - try { - applicationEventHandler.addAndGet(new UpdatePatternSubscriptionEvent(calculateDeadlineMs(timer))); - } catch (TimeoutException e) { - return false; - } finally { - timer.update(); + if (subscriptions.hasPatternSubscription()) { + try { + applicationEventHandler.addAndGet(new UpdatePatternSubscriptionEvent(calculateDeadlineMs(timer))); + } catch (TimeoutException e) { + return false; + } finally { + timer.update(); + } } processBackgroundEvents(); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java index 7bd2d1f28b7e..505f2cf887e5 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java @@ -320,11 +320,12 @@ private void process(final TopicPatternSubscriptionChangeEvent event) { * This will make the consumer send the updated subscription on the next poll. */ private void process(final UpdatePatternSubscriptionEvent event) { + if (!subscriptions.hasPatternSubscription()) { + return; + } if (this.metadataVersionSnapshot < metadata.updateVersion()) { this.metadataVersionSnapshot = metadata.updateVersion(); - if (subscriptions.hasPatternSubscription()) { - updatePatternSubscription(metadata.fetch()); - } + updatePatternSubscription(metadata.fetch()); } event.future().complete(null); } From 5f220b1871ab121e4d8d0f503e80f9a6fc8a8379 Mon Sep 17 00:00:00 2001 From: Lianet Magrans Date: Fri, 22 Nov 2024 11:36:15 -0500 Subject: [PATCH 2/2] tests --- .../internals/AsyncKafkaConsumerTest.java | 23 +++++++++++++++++++ .../events/ApplicationEventProcessorTest.java | 4 +--- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java index 8eb8ec4c85bd..0cdf3170214a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java @@ -53,6 +53,7 @@ import org.apache.kafka.clients.consumer.internals.events.TopicPatternSubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.TopicSubscriptionChangeEvent; import org.apache.kafka.clients.consumer.internals.events.UnsubscribeEvent; +import org.apache.kafka.clients.consumer.internals.events.UpdatePatternSubscriptionEvent; import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.Metric; import org.apache.kafka.common.TopicPartition; @@ -1829,6 +1830,27 @@ public void testSeekToEnd() { assertEquals(OffsetResetStrategy.LATEST, resetOffsetEvent.offsetResetStrategy()); } + @Test + public void testUpdatePatternSubscriptionEventGeneratedOnlyIfPatternUsed() { + consumer = newConsumer(); + doReturn(Fetch.empty()).when(fetchCollector).collectFetch(any(FetchBuffer.class)); + when(applicationEventHandler.addAndGet(any(CheckAndUpdatePositionsEvent.class))).thenReturn(true); + doReturn(LeaderAndEpoch.noLeaderOrEpoch()).when(metadata).currentLeader(any()); + completeAssignmentChangeEventSuccessfully(); + completeTopicPatternSubscriptionChangeEventSuccessfully(); + completeUnsubscribeApplicationEventSuccessfully(); + + consumer.assign(singleton(new TopicPartition("topic1", 0))); + consumer.poll(Duration.ZERO); + verify(applicationEventHandler, never()).addAndGet(any(UpdatePatternSubscriptionEvent.class)); + + consumer.unsubscribe(); + + consumer.subscribe(Pattern.compile("t*")); + consumer.poll(Duration.ZERO); + verify(applicationEventHandler).addAndGet(any(UpdatePatternSubscriptionEvent.class)); + } + private Map mockTopicPartitionOffset() { final TopicPartition t0 = new TopicPartition("t0", 2); final TopicPartition t1 = new TopicPartition("t0", 3); @@ -1913,6 +1935,7 @@ private void completeFetchedCommittedOffsetApplicationEventExceptionally(Excepti private void completeUnsubscribeApplicationEventSuccessfully() { doAnswer(invocation -> { UnsubscribeEvent event = invocation.getArgument(0); + consumer.subscriptions().unsubscribe(); event.future().complete(null); return null; }).when(applicationEventHandler).add(ArgumentMatchers.isA(UnsubscribeEvent.class)); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java index a16f9612c741..ea09fc2ae8be 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessorTest.java @@ -355,11 +355,10 @@ public void testUpdatePatternSubscriptionEventOnlyTakesEffectWhenMetadataHasNewV UpdatePatternSubscriptionEvent event1 = new UpdatePatternSubscriptionEvent(12345); setupProcessor(true); - + when(subscriptionState.hasPatternSubscription()).thenReturn(true); when(metadata.updateVersion()).thenReturn(0); processor.process(event1); - verify(subscriptionState, never()).hasPatternSubscription(); assertDoesNotThrow(() -> event1.future().get()); Cluster cluster = mock(Cluster.class); @@ -377,7 +376,6 @@ public void testUpdatePatternSubscriptionEventOnlyTakesEffectWhenMetadataHasNewV UpdatePatternSubscriptionEvent event2 = new UpdatePatternSubscriptionEvent(12345); processor.process(event2); verify(metadata).requestUpdateForNewTopics(); - verify(subscriptionState).hasPatternSubscription(); verify(subscriptionState).subscribeFromPattern(topics); assertEquals(1, processor.metadataVersionSnapshot()); verify(membershipManager).onSubscriptionUpdated();