diff --git a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java index f55a4a28a..f6a63d277 100644 --- a/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java +++ b/acceptance-test/src/acceptance-test/java/org/zalando/nakadi/webservice/hila/HilaAT.java @@ -735,6 +735,7 @@ public void whenIsLookingForDeadLetterAndSendAllEventsOneByOneThenBackToNormalBa Assert.assertEquals(10, client.getJsonBatches().get(0).getEvents().size()); Assert.assertEquals("001-0001-000000000000000009", client.getJsonBatches().get(0).getCursor().getOffset()); + // receive a single event in a batch and commit it so that Nakadi sends the next batch with a single event client.startWithAutocommit(batches -> { // 12 because the last one is stream limit reached debug info Assert.assertEquals(12, batches.size()); @@ -767,7 +768,7 @@ public void shouldSkipDeadLetterdAndConsumptionToBeContinued() throws IOExceptio .anyMatch(event -> event.get("foo").equals("{\"foo\":\"bar10\"}"))) { // skipp commit to introduce poison pill cursorWithPoisonPill.set(streamBatch.getCursor()); - throw new RuntimeException(); + throw new RuntimeException("poison pill found"); } else { try { NakadiTestUtils.commitCursors( diff --git a/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java b/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java index 8465e403f..fd998e03c 100644 --- a/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java +++ b/api-consumption/src/main/java/org/zalando/nakadi/service/subscription/state/StreamingState.java @@ -234,11 +234,6 @@ private void rememberEvent(final ConsumedEvent event) { } private long getMessagesAllowedToSend() { - if (failedCommitPartitions.values().stream() - .anyMatch(Partition::isLookingForDeadLetter)) { - return 1; - } - final long unconfirmed = offsets.values().stream().mapToLong(PartitionData::getUnconfirmed).sum(); final long limit = getParameters().maxUncommittedMessages - unconfirmed; return getParameters().getMessagesAllowedToSend(limit, this.sentEvents); @@ -263,8 +258,8 @@ private void streamToOutput() { private void streamToOutput(final boolean streamTimeoutReached) { final long currentTimeMillis = System.currentTimeMillis(); - int messagesAllowedToSend = (int) getMessagesAllowedToSend(); final boolean wasCommitted = isEverythingCommitted(); + int messagesAllowedToSend = (int) getMessagesAllowedToSend(); boolean sentSomething = false; for (final Map.Entry e : offsets.entrySet()) { @@ -272,6 +267,9 @@ private void streamToOutput(final boolean streamTimeoutReached) { final PartitionData partitionData = e.getValue(); Partition partition = failedCommitPartitions.get(etp); + int messagesAllowedForPartition = + (partition != null && partition.isLookingForDeadLetter()) ? 1 : messagesAllowedToSend; + // loop sends all the events from partition, until max uncommitted reached or no more events while (true) { if (partition != null && partition.isLookingForDeadLetter()) { final NakadiCursor lastDeadLetterCursor = getContext().getCursorConverter().convert( @@ -283,14 +281,14 @@ private void streamToOutput(final boolean streamTimeoutReached) { .map(p -> p.toLastDeadLetterOffset(null)) .toArray(Partition[]::new)); failedCommitPartitions.remove(etp); - messagesAllowedToSend = (int) getMessagesAllowedToSend(); // fixme think partition = null; + messagesAllowedForPartition = messagesAllowedToSend; } } final List toSend = partitionData.takeEventsToStream( currentTimeMillis, - Math.min(getBatchLimitEvents(), messagesAllowedToSend), + Math.min(getBatchLimitEvents(), messagesAllowedForPartition), getParameters().batchTimeoutMillis, streamTimeoutReached); @@ -329,7 +327,9 @@ private void streamToOutput(final boolean streamTimeoutReached) { if (toSend.isEmpty()) { break; } + messagesAllowedToSend -= toSend.size(); + messagesAllowedForPartition -= toSend.size(); } }