diff --git a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java index b45cf88988..f55a4a28a6 100644 --- a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java +++ b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java @@ -1,5 +1,6 @@ package org.zalando.nakadi.webservice.hila; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; @@ -37,6 +38,7 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import static com.jayway.restassured.RestAssured.given; @@ -56,6 +58,7 @@ import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createEventType; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createSubscription; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.getNumberOfAssignedStreams; +import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.publishBusinessEventWithUserDefinedPartition; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.publishEvent; import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.publishEvents; import static org.zalando.nakadi.webservice.utils.TestStreamingClient.SESSION_ID_UNKNOWN; @@ -100,7 +103,7 @@ public void whenStreamTimeoutReachedPossibleToCommit() throws Exception { @Test(timeout = 30000) public void whenEventTypeRepartitionedTheNewSubscriptionShouldHaveUpdatedPartition() throws Exception { final EventType eventType = NakadiTestUtils.createBusinessEventTypeWithPartitions(1); - NakadiTestUtils.publishBusinessEventWithUserDefinedPartition( + publishBusinessEventWithUserDefinedPartition( eventType.getName(), 1, x -> "{\"foo\":\"bar\"}", p -> "0"); NakadiTestUtils.repartitionEventType(eventType, 2); final Subscription subscription = createSubscription( @@ -111,7 +114,7 @@ public void whenEventTypeRepartitionedTheNewSubscriptionShouldHaveUpdatedPartiti final TestStreamingClient clientAfterRepartitioning = TestStreamingClient .create(URL, subscription.getId(), "") .start(); - NakadiTestUtils.publishBusinessEventWithUserDefinedPartition( + publishBusinessEventWithUserDefinedPartition( eventType.getName(), 1, x -> "{\"foo\":\"bar" + x + "\"}", p -> "1"); waitFor(() -> assertThat(clientAfterRepartitioning.getJsonBatches(), Matchers.hasSize(2))); Assert.assertTrue(clientAfterRepartitioning.getJsonBatches().stream() @@ -600,7 +603,7 @@ public void whenPatchThenCursorsAreInitializedAndPatched() throws Exception { @Test(timeout = 15000) public void whenCommitFailsThreeTimesAndSingleBatchEventFailsThreeTimesThenEventSkipped() throws IOException { - final Subscription subscription = createAutoDLQSubscription(); + final Subscription subscription = createAutoDLQSubscription(eventType); final TestStreamingClient client = TestStreamingClient .create(URL, subscription.getId(), "batch_limit=3&commit_timeout=1") .start(); @@ -641,7 +644,7 @@ public void whenCommitFailsThreeTimesAndSingleBatchEventFailsThreeTimesThenEvent @Test(timeout = 15000) public void whenIsLookingForDeadLetterAndCommitComesThenContinueLooking() throws IOException { - final Subscription subscription = createAutoDLQSubscription(); + final Subscription subscription = createAutoDLQSubscription(eventType); final TestStreamingClient client = TestStreamingClient .create(URL, subscription.getId(), "batch_limit=10&commit_timeout=1") .start(); @@ -707,7 +710,7 @@ public void whenIsLookingForDeadLetterAndCommitComesThenContinueLooking() throws @Test(timeout = 20_000) public void whenIsLookingForDeadLetterAndSendAllEventsOneByOneThenBackToNormalBatchSize() throws InterruptedException, IOException { - final Subscription subscription = createAutoDLQSubscription(); + final Subscription subscription = createAutoDLQSubscription(eventType); final TestStreamingClient client = TestStreamingClient .create(URL, subscription.getId(), "batch_limit=10&commit_timeout=1&stream_limit=20") .start(); @@ -746,13 +749,54 @@ public void whenIsLookingForDeadLetterAndSendAllEventsOneByOneThenBackToNormalBa waitFor(() -> Assert.assertFalse(client.isRunning()), 15_000); } + @Test(timeout = 20_000) + public void shouldSkipDeadLetterdAndConsumptionToBeContinued() throws IOException { + final EventType eventType = NakadiTestUtils.createBusinessEventTypeWithPartitions(4); + + publishBusinessEventWithUserDefinedPartition(eventType.getName(), + 50, i -> String.format("{\"foo\":\"bar%d\"}", i), i -> String.valueOf(i % 4)); + + final Subscription subscription = createAutoDLQSubscription(eventType); + + final AtomicReference cursorWithPoisonPill = new AtomicReference<>(); + while (true) { + final TestStreamingClient client = TestStreamingClient.create( + URL, subscription.getId(), "batch_limit=3&commit_timeout=1&stream_timeout=2"); + client.start(streamBatch -> { + if (streamBatch.getEvents().stream() + .anyMatch(event -> event.get("foo").equals("{\"foo\":\"bar10\"}"))) { + // skipp commit to introduce poison pill + cursorWithPoisonPill.set(streamBatch.getCursor()); + throw new RuntimeException(); + } else { + try { + NakadiTestUtils.commitCursors( + subscription.getId(), ImmutableList.of(streamBatch.getCursor()), client.getSessionId()); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + } + }); + + waitFor(() -> Assert.assertFalse(client.isRunning())); + + if (client.getJsonBatches().stream() + .filter(streamBatch -> streamBatch.getCursor().getPartition() + .equals(cursorWithPoisonPill.get().getPartition())) + .anyMatch(streamBatch -> streamBatch.getCursor().getOffset() + .compareTo(cursorWithPoisonPill.get().getOffset()) > 0)) { + return; + } + } + } + private static boolean isCommitTimeoutReached(final TestStreamingClient client) { return client.getJsonBatches().stream() .filter(batch -> batch.getMetadata() != null) .anyMatch(batch -> batch.getMetadata().getDebug().equals("Commit timeout reached")); } - private Subscription createAutoDLQSubscription() throws IOException { + private Subscription createAutoDLQSubscription(final EventType eventType) throws IOException { final SubscriptionBase subscription = RandomSubscriptionBuilder.builder() .withEventType(eventType.getName()) .withStartFrom(BEGIN) diff --git a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/utils/TestStreamingClient.java b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/utils/TestStreamingClient.java index 629655817d..640b9a1d4e 100644 --- a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/utils/TestStreamingClient.java +++ b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/utils/TestStreamingClient.java @@ -106,7 +106,15 @@ private TestStreamingClient startInternal(final boolean wait, public TestStreamingClient start() { try { - return startInternal(false, new JsonConsumer()); + return startInternal(false, new JsonConsumer((ignore) -> {})); + } catch (final InterruptedException ignore) { + throw new RuntimeException(ignore); + } + } + + public TestStreamingClient start(final Consumer onBatch) { + try { + return startInternal(false, new JsonConsumer(onBatch)); } catch (final InterruptedException ignore) { throw new RuntimeException(ignore); } @@ -123,7 +131,7 @@ public TestStreamingClient startBinary() { public TestStreamingClient startWithAutocommit(final Consumer> batchesListener) throws InterruptedException { this.batchesListener = batchesListener; - final TestStreamingClient client = startInternal(true, new JsonConsumer()); + final TestStreamingClient client = startInternal(true, new JsonConsumer((ignore)->{})); final Thread autocommitThread = new Thread(() -> { int oldIdx = 0; while (client.isRunning()) { @@ -252,6 +260,13 @@ public void run() { private class JsonConsumer extends ConsumerThread { + + private final Consumer onBatch; + + JsonConsumer(final Consumer onBatch) { + this.onBatch = onBatch; + } + @Override void addHeaders() { } @@ -270,6 +285,7 @@ void readBatches(final InputStream inputStream) throws IOException { synchronized (jsonBatches) { jsonBatches.add(streamBatch); } + onBatch.accept(streamBatch); } catch (final SocketTimeoutException ste) { LOG.info("No data in 10 ms, retrying read data"); } diff --git a/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/state/ClosingState.java b/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/state/ClosingState.java index 61d84d8249..e6777b9cb4 100644 --- a/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/state/ClosingState.java +++ b/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/state/ClosingState.java @@ -40,6 +40,12 @@ class ClosingState extends State { @Override public void onExit() { + try { + updateFailedCommitsCount(); + } catch (final RuntimeException re) { + LOG.error("Failed to update failed commits count", re); + } + try { getAutocommit().autocommit(); freePartitions(new HashSet<>(listeners.keySet())); @@ -57,6 +63,17 @@ public void onExit() { } } + private void updateFailedCommitsCount() { + if (getContext().getMaxEventSendCount() == null) { + return; + } + + getZk().updateTopology(topology -> Arrays.stream(topology.getPartitions()) + .filter(p -> uncommittedOffsets.containsKey(new EventTypePartition(p.getEventType(), p.getPartition()))) + .map(Partition::toIncFailedCommits) + .toArray(Partition[]::new)); + } + @Override public void onEnter() { final long timeToWaitMillis = getParameters().commitTimeoutMillis - @@ -64,8 +81,6 @@ public void onEnter() { uncommittedOffsets = uncommittedOffsetsSupplier.get(); if (!uncommittedOffsets.isEmpty() && timeToWaitMillis > 0) { scheduleTask(() -> { - // commit timeout will be reached for the partitions, lets update topology with number of failed commits - updateFailedCommitsCount(); switchState(new CleanupState()); }, timeToWaitMillis, TimeUnit.MILLISECONDS); @@ -76,27 +91,11 @@ public void onEnter() { return; } reactOnTopologyChange(); - } else if (!uncommittedOffsets.isEmpty()) { - // commit timeout reached for these partitions, lets update topology with number of failed commits - updateFailedCommitsCount(); - switchState(new CleanupState()); } else { switchState(new CleanupState()); } } - private void updateFailedCommitsCount() { - if (getContext().getMaxEventSendCount() == null) { - return; - } - - getZk().updateTopology(topology -> Arrays.stream(topology.getPartitions()) - .filter(p -> uncommittedOffsets.containsKey(new EventTypePartition(p.getEventType(), p.getPartition()))) - .map(Partition::toIncFailedCommits) - .toArray(Partition[]::new) - ); - } - private void onTopologyChanged() { if (topologyListener == null) { throw new IllegalStateException( diff --git a/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java b/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java index 0c223b307c..8465e403f3 100644 --- a/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java +++ b/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java @@ -514,7 +514,7 @@ void reactOnTopologyChange() { trackIdleness(topology); if (getContext().getMaxEventSendCount() != null) { - failedCommitPartitions = Arrays.stream(topology.getPartitions()) + failedCommitPartitions = Arrays.stream(assignedPartitions) .filter(p -> p.getFailedCommitsCount() > 0 || p.isLookingForDeadLetter()) .collect(Collectors.toMap( p -> new EventTypePartition(p.getEventType(), p.getPartition()),