Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Acking after previously unacked message can be skipped #8434

Open
danielkec opened this issue Feb 28, 2024 · 0 comments
Open

Acking after previously unacked message can be skipped #8434

danielkec opened this issue Feb 28, 2024 · 0 comments
Assignees
Labels
4.x Version 4.x bug Something isn't working messaging Reactive Messaging P2

Comments

@danielkec
Copy link
Contributor

danielkec commented Feb 28, 2024

Environment Details

  • Helidon Version: 2,3,4

Problem Description

As reported by our user @Ayush-Kukreti, offset commit of Kafka messages acked after previously unacked ones in Kafka connector
can be skipped or delayed.

Steps to reproduce

diff --git a/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaSeTest.java b/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaSeTest.java
--- a/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaSeTest.java	(revision 500f77702de7ffa536ba0b6be9cc9e21245f508f)
+++ b/tests/integration/kafka/src/test/java/io/helidon/messaging/connectors/kafka/KafkaSeTest.java	(date 1709141955514)
@@ -29,7 +29,9 @@
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.function.Function;
 import java.util.logging.Handler;
 import java.util.logging.Level;
@@ -54,6 +56,7 @@
 import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
+import org.checkerframework.checker.units.qual.C;
 import org.eclipse.microprofile.reactive.messaging.Message;
 import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
 import org.junit.jupiter.api.AfterAll;
@@ -74,6 +77,7 @@
 
     private static final Duration TIMEOUT = Duration.of(45, ChronoUnit.SECONDS);
     private static final String TEST_DQL_TOPIC = "test-dlq-topic";
+    private static final String TEST_SE_TOPIC_01 = "special-se-topic-01";
     private static final String TEST_SE_TOPIC_1 = "special-se-topic-1";
     private static final String TEST_SE_TOPIC_2 = "special-se-topic-2";
     private static final String TEST_SE_TOPIC_3 = "special-se-topic-3";
@@ -114,6 +118,7 @@
         kafkaResource.startKafka();
 
         nackHandlerLogLogger.addHandler(testHandler);
+        kafkaResource.getKafkaTestUtils().createTopic(TEST_SE_TOPIC_01, 2, (short) 1);
         kafkaResource.getKafkaTestUtils().createTopic(TEST_SE_TOPIC_1, 4, (short) 1);
         kafkaResource.getKafkaTestUtils().createTopic(TEST_SE_TOPIC_2, 4, (short) 1);
         kafkaResource.getKafkaTestUtils().createTopic(TEST_SE_TOPIC_3, 4, (short) 1);
@@ -132,6 +137,54 @@
         kafkaResource.stopKafka();
     }
 
+    @Test
+    void offsetJump() throws InterruptedException, ExecutionException, TimeoutException {
+        Channel<String> fromKafka = Channel.<String>builder()
+                .name("from-kafka")
+                .publisherConfig(KafkaConnector.configBuilder()
+                                         .bootstrapServers(kafkaResource.getKafkaConnectString())
+                                         .groupId("test-group")
+                                         .topic(TEST_SE_TOPIC_01)
+                                         .autoOffsetReset(KafkaConfigBuilder.AutoOffsetReset.EARLIEST)
+                                         .enableAutoCommit(false)
+                                         .keyDeserializer(StringDeserializer.class)
+                                         .valueDeserializer(StringDeserializer.class)
+                                         .build()
+                )
+                .build();
+
+        KafkaConnector kafkaConnector = KafkaConnector.create();
+
+        CompletableFuture<KafkaMessage<String, String>> latch = new CompletableFuture<>();
+        Messaging messaging = Messaging.builder()
+                .connector(kafkaConnector)
+                .subscriber(fromKafka, ReactiveStreams.<KafkaMessage<String, String>>builder()
+                        .forEach(message -> {
+                            System.out.println("consuming: " + message.getPayload());
+                            if (message.getPayload().contains("accept")) {
+                                message.ack()
+                                        .thenAcceptAsync(x -> {
+                                            System.out.println("committed offset: " + message.getOffset().orElse(null));
+                                            latch.complete(message);
+                                        });
+                            }
+                        }))
+                .build();
+
+        try {
+            kafkaResource.produce("POCEvent1", "Sample kafka message:", TEST_SE_TOPIC_01);
+            kafkaResource.produce("POCEvent2", "Sample kafka message:", TEST_SE_TOPIC_01);
+            kafkaResource.produce("POCEvent3", "Sample kafka message:", TEST_SE_TOPIC_01);
+            kafkaResource.produce("POCEvent4", "Sample kafka message:", TEST_SE_TOPIC_01);
+            kafkaResource.produce("POCEvent5", "Sample kafka message: accept", TEST_SE_TOPIC_01);
+            messaging.start();
+            latch.get(15, TimeUnit.SECONDS);
+        } finally {
+            messaging.stop();
+        }
+
+    }
+
     @Test
     void sendToKafka() throws InterruptedException {
 
@danielkec danielkec added the messaging Reactive Messaging label Feb 28, 2024
@m0mus m0mus added 4.x Version 4.x bug Something isn't working P2 labels Feb 29, 2024
@m0mus m0mus added this to Backlog Aug 12, 2024
@m0mus m0mus moved this to High priority in Backlog Aug 12, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
4.x Version 4.x bug Something isn't working messaging Reactive Messaging P2
Projects
Status: High priority
Development

No branches or pull requests

2 participants