This repository has been archived by the owner on Jun 7, 2024. It is now read-only.

store failed commit on exit (#1556)
* store failed commit on exit

the fix ensures the closing state does not lose failed commits when the stream is closed before the commit timeout is reached
adyach authored Oct 11, 2023
1 parent 26c5662 commit f34c6f6
Showing 4 changed files with 86 additions and 27 deletions.
@@ -1,5 +1,6 @@
package org.zalando.nakadi.webservice.hila;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
@@ -37,6 +38,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

import static com.jayway.restassured.RestAssured.given;
@@ -56,6 +58,7 @@
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createEventType;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.createSubscription;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.getNumberOfAssignedStreams;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.publishBusinessEventWithUserDefinedPartition;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.publishEvent;
import static org.zalando.nakadi.webservice.utils.NakadiTestUtils.publishEvents;
import static org.zalando.nakadi.webservice.utils.TestStreamingClient.SESSION_ID_UNKNOWN;
@@ -100,7 +103,7 @@ public void whenStreamTimeoutReachedPossibleToCommit() throws Exception {
@Test(timeout = 30000)
public void whenEventTypeRepartitionedTheNewSubscriptionShouldHaveUpdatedPartition() throws Exception {
final EventType eventType = NakadiTestUtils.createBusinessEventTypeWithPartitions(1);
NakadiTestUtils.publishBusinessEventWithUserDefinedPartition(
publishBusinessEventWithUserDefinedPartition(
eventType.getName(), 1, x -> "{\"foo\":\"bar\"}", p -> "0");
NakadiTestUtils.repartitionEventType(eventType, 2);
final Subscription subscription = createSubscription(
@@ -111,7 +114,7 @@ public void whenEventTypeRepartitionedTheNewSubscriptionShouldHaveUpdatedPartition() throws Exception {
final TestStreamingClient clientAfterRepartitioning = TestStreamingClient
.create(URL, subscription.getId(), "")
.start();
NakadiTestUtils.publishBusinessEventWithUserDefinedPartition(
publishBusinessEventWithUserDefinedPartition(
eventType.getName(), 1, x -> "{\"foo\":\"bar" + x + "\"}", p -> "1");
waitFor(() -> assertThat(clientAfterRepartitioning.getJsonBatches(), Matchers.hasSize(2)));
Assert.assertTrue(clientAfterRepartitioning.getJsonBatches().stream()
@@ -600,7 +603,7 @@ public void whenPatchThenCursorsAreInitializedAndPatched() throws Exception {

@Test(timeout = 15000)
public void whenCommitFailsThreeTimesAndSingleBatchEventFailsThreeTimesThenEventSkipped() throws IOException {
final Subscription subscription = createAutoDLQSubscription();
final Subscription subscription = createAutoDLQSubscription(eventType);
final TestStreamingClient client = TestStreamingClient
.create(URL, subscription.getId(), "batch_limit=3&commit_timeout=1")
.start();
@@ -641,7 +644,7 @@ public void whenCommitFailsThreeTimesAndSingleBatchEventFailsThreeTimesThenEventSkipped() throws IOException {

@Test(timeout = 15000)
public void whenIsLookingForDeadLetterAndCommitComesThenContinueLooking() throws IOException {
final Subscription subscription = createAutoDLQSubscription();
final Subscription subscription = createAutoDLQSubscription(eventType);
final TestStreamingClient client = TestStreamingClient
.create(URL, subscription.getId(), "batch_limit=10&commit_timeout=1")
.start();
@@ -707,7 +710,7 @@ public void whenIsLookingForDeadLetterAndCommitComesThenContinueLooking() throws IOException {
@Test(timeout = 20_000)
public void whenIsLookingForDeadLetterAndSendAllEventsOneByOneThenBackToNormalBatchSize()
throws InterruptedException, IOException {
final Subscription subscription = createAutoDLQSubscription();
final Subscription subscription = createAutoDLQSubscription(eventType);
final TestStreamingClient client = TestStreamingClient
.create(URL, subscription.getId(), "batch_limit=10&commit_timeout=1&stream_limit=20")
.start();
@@ -746,13 +749,54 @@ public void whenIsLookingForDeadLetterAndSendAllEventsOneByOneThenBackToNormalBatchSize()
waitFor(() -> Assert.assertFalse(client.isRunning()), 15_000);
}

@Test(timeout = 20_000)
public void shouldSkipDeadLetterAndContinueConsumption() throws IOException {
final EventType eventType = NakadiTestUtils.createBusinessEventTypeWithPartitions(4);

publishBusinessEventWithUserDefinedPartition(eventType.getName(),
50, i -> String.format("{\"foo\":\"bar%d\"}", i), i -> String.valueOf(i % 4));

final Subscription subscription = createAutoDLQSubscription(eventType);

final AtomicReference<SubscriptionCursor> cursorWithPoisonPill = new AtomicReference<>();
while (true) {
final TestStreamingClient client = TestStreamingClient.create(
URL, subscription.getId(), "batch_limit=3&commit_timeout=1&stream_timeout=2");
client.start(streamBatch -> {
if (streamBatch.getEvents().stream()
.anyMatch(event -> event.get("foo").equals("{\"foo\":\"bar10\"}"))) {
// skip commit to introduce a poison pill
cursorWithPoisonPill.set(streamBatch.getCursor());
throw new RuntimeException();
} else {
try {
NakadiTestUtils.commitCursors(
subscription.getId(), ImmutableList.of(streamBatch.getCursor()), client.getSessionId());
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
});

waitFor(() -> Assert.assertFalse(client.isRunning()));

if (client.getJsonBatches().stream()
.filter(streamBatch -> streamBatch.getCursor().getPartition()
.equals(cursorWithPoisonPill.get().getPartition()))
.anyMatch(streamBatch -> streamBatch.getCursor().getOffset()
.compareTo(cursorWithPoisonPill.get().getOffset()) > 0)) {
return;
}
}
}
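The test above deliberately never commits the batch containing the poison-pill event, restarting a fresh client on each iteration; it only returns once a batch on the poisoned partition carries an offset greater than the stored cursor, i.e. once the event has been skipped and consumption has continued past it.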

private static boolean isCommitTimeoutReached(final TestStreamingClient client) {
return client.getJsonBatches().stream()
.filter(batch -> batch.getMetadata() != null)
.anyMatch(batch -> batch.getMetadata().getDebug().equals("Commit timeout reached"));
}

private Subscription createAutoDLQSubscription() throws IOException {
private Subscription createAutoDLQSubscription(final EventType eventType) throws IOException {
final SubscriptionBase subscription = RandomSubscriptionBuilder.builder()
.withEventType(eventType.getName())
.withStartFrom(BEGIN)
@@ -106,7 +106,15 @@ private TestStreamingClient startInternal(final boolean wait,

public TestStreamingClient start() {
try {
return startInternal(false, new JsonConsumer());
return startInternal(false, new JsonConsumer((ignore) -> {}));
} catch (final InterruptedException ignore) {
throw new RuntimeException(ignore);
}
}

public TestStreamingClient start(final Consumer<StreamBatch> onBatch) {
try {
return startInternal(false, new JsonConsumer(onBatch));
} catch (final InterruptedException ignore) {
throw new RuntimeException(ignore);
}
@@ -123,7 +131,7 @@ public TestStreamingClient startBinary() {
public TestStreamingClient startWithAutocommit(final Consumer<List<StreamBatch>> batchesListener)
throws InterruptedException {
this.batchesListener = batchesListener;
final TestStreamingClient client = startInternal(true, new JsonConsumer());
final TestStreamingClient client = startInternal(true, new JsonConsumer((ignore) -> {}));
final Thread autocommitThread = new Thread(() -> {
int oldIdx = 0;
while (client.isRunning()) {
@@ -252,6 +260,13 @@ public void run() {

private class JsonConsumer extends ConsumerThread {

private final Consumer<StreamBatch> onBatch;

JsonConsumer(final Consumer<StreamBatch> onBatch) {
this.onBatch = onBatch;
}

@Override
void addHeaders() {
}
@@ -270,6 +285,7 @@ void readBatches(final InputStream inputStream) throws IOException {
synchronized (jsonBatches) {
jsonBatches.add(streamBatch);
}
onBatch.accept(streamBatch);
} catch (final SocketTimeoutException ste) {
LOG.info("No data in 10 ms, retrying read data");
}
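For context, the new start(Consumer<StreamBatch>) overload above is what the poison-pill test uses to decide per batch whether to commit. A hypothetical usage sketch, assuming a subscription and URL as in the tests (the parameters and the commit-every-batch policy are illustrative, not part of this commit):

    // Hypothetical caller of the new overload; subscription/URL come from test context.
    final TestStreamingClient client = TestStreamingClient.create(
            URL, subscription.getId(), "batch_limit=5&commit_timeout=1");
    client.start(batch -> {
        try {
            // commit every batch as soon as it is received
            NakadiTestUtils.commitCursors(
                    subscription.getId(), ImmutableList.of(batch.getCursor()), client.getSessionId());
        } catch (final JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    });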
@@ -40,6 +40,12 @@ class ClosingState extends State {

@Override
public void onExit() {
try {
updateFailedCommitsCount();
} catch (final RuntimeException re) {
LOG.error("Failed to update failed commits count", re);
}

try {
getAutocommit().autocommit();
freePartitions(new HashSet<>(listeners.keySet()));
@@ -57,15 +63,24 @@ public void onExit() {
}
}

private void updateFailedCommitsCount() {
if (getContext().getMaxEventSendCount() == null) {
return;
}

getZk().updateTopology(topology -> Arrays.stream(topology.getPartitions())
.filter(p -> uncommittedOffsets.containsKey(new EventTypePartition(p.getEventType(), p.getPartition())))
.map(Partition::toIncFailedCommits)
.toArray(Partition[]::new));
}

@Override
public void onEnter() {
final long timeToWaitMillis = getParameters().commitTimeoutMillis -
(System.currentTimeMillis() - lastCommitSupplier.getAsLong());
uncommittedOffsets = uncommittedOffsetsSupplier.get();
if (!uncommittedOffsets.isEmpty() && timeToWaitMillis > 0) {
scheduleTask(() -> {
// commit timeout will be reached for these partitions, let's update the topology with the number of failed commits
updateFailedCommitsCount();
switchState(new CleanupState());
}, timeToWaitMillis, TimeUnit.MILLISECONDS);

@@ -76,27 +91,11 @@ public void onEnter() {
return;
}
reactOnTopologyChange();
} else if (!uncommittedOffsets.isEmpty()) {
// commit timeout reached for these partitions, lets update topology with number of failed commits
updateFailedCommitsCount();
switchState(new CleanupState());
} else {
switchState(new CleanupState());
}
}

private void updateFailedCommitsCount() {
if (getContext().getMaxEventSendCount() == null) {
return;
}

getZk().updateTopology(topology -> Arrays.stream(topology.getPartitions())
.filter(p -> uncommittedOffsets.containsKey(new EventTypePartition(p.getEventType(), p.getPartition())))
.map(Partition::toIncFailedCommits)
.toArray(Partition[]::new)
);
}

private void onTopologyChanged() {
if (topologyListener == null) {
throw new IllegalStateException(
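The essence of the ClosingState change: updateFailedCommitsCount() used to run only inside the scheduled commit-timeout task (and in a now-removed else-if branch for an already-expired timeout), so a stream torn down before the timeout fired reached onExit() without ever persisting the counters. Moving the call to the top of onExit() makes the bookkeeping unconditional. A minimal, self-contained illustration of that pattern, using placeholder names rather than Nakadi's types:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Placeholder sketch: "persist bookkeeping on exit, not only on timeout".
    public class PersistOnExitSketch {
        private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        private volatile boolean persisted = false;

        void onEnter() {
            // before the fix: counters were persisted only if this task ever ran
            scheduler.schedule(this::persistFailedCommits, 60, TimeUnit.SECONDS);
        }

        void onExit() {
            persistFailedCommits();  // the fix: always persist before cleanup
            scheduler.shutdownNow(); // cancels the pending timeout task
        }

        void persistFailedCommits() {
            if (!persisted) {
                persisted = true;
                System.out.println("failed-commit counters stored");
            }
        }

        public static void main(final String[] args) {
            final PersistOnExitSketch state = new PersistOnExitSketch();
            state.onEnter();
            state.onExit(); // without the call in onExit, shutdownNow() would silently drop the update
        }
    }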
@@ -514,7 +514,7 @@ void reactOnTopologyChange() {
trackIdleness(topology);

if (getContext().getMaxEventSendCount() != null) {
failedCommitPartitions = Arrays.stream(topology.getPartitions())
failedCommitPartitions = Arrays.stream(assignedPartitions)
.filter(p -> p.getFailedCommitsCount() > 0 || p.isLookingForDeadLetter())
.collect(Collectors.toMap(
p -> new EventTypePartition(p.getEventType(), p.getPartition()),
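A related one-line fix: failedCommitPartitions is now computed from the session's assignedPartitions rather than from all of topology.getPartitions(), so the failed-commit and dead-letter bookkeeping a session reacts to is limited to the partitions actually assigned to it.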
