Remove a possible deadlock on polling queue fill (#462)
* Remove a possible deadlock on polling queue fill

Adding new items to the receive queue of the PrefetchRecordsPublisher
while the queue was at capacity would deadlock retrievals, because the
publisher was already holding a lock on this.

The method addArrivedRecordsInput did not need to be synchronized on
this, as it does not change any of the protected state
(requestedResponses). The call to drainQueueForRequests immediately
after addArrivedRecordsInput ensures that newly arrived data is
dispatched.

This fixes #448
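
For illustration only, a minimal sketch of the deadlock pattern described
above, using simplified types and a hypothetical stand-in class; only the
method names addArrivedRecordsInput and drainQueueForRequests come from
the real PrefetchRecordsPublisher, whose diff appears below.

import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for PrefetchRecordsPublisher, reduced to the two methods involved.
class PublisherSketch {
    private final LinkedBlockingQueue<String> getRecordsResultQueue = new LinkedBlockingQueue<>(3);

    // Before the fix this method was synchronized: the producer thread held the
    // monitor on this while put() blocked on a full queue...
    void addArrivedRecordsInput(String input) throws InterruptedException {
        getRecordsResultQueue.put(input);
    }

    // ...while the consumer thread needed that same monitor to drain the queue
    // and satisfy outstanding requests, so neither thread could make progress.
    synchronized void drainQueueForRequests() {
        String input = getRecordsResultQueue.poll();
        // dispatch input to the subscriber and update requestedResponses here
    }
}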

* Small fix on the reasoning comment

* Adjust the test to act more like the ShardConsumer

The ShardConsumer, which is the principal user of the
PrefetchRecordsPublisher, uses RxJava to consume from the publisher. This
test uses RxJava to consume, and notifies the test thread once
MAX_SIZE * 3 have been received. This ensures that we cycle through
the queue at least 3 times.

* Removed the upper limit on the retrievals

The way RxJava manages requests makes it possible for more requests
to happen than we might expect.
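
For context, a hedged sketch of why the exact count is unpredictable:
observeOn prefetches a whole buffer of items at a time (8 in the test
added below), so the upstream can be asked for more items than the
subscriber ever consumes, which is why the test verifies with
atLeast(expectedItems) rather than an exact count. ObserveOnPrefetchSketch
and its generator below are made up for illustration and are not KCL code;
only the RxJava 2 operators match what the test uses.

import java.util.concurrent.atomic.AtomicInteger;

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

public class ObserveOnPrefetchSketch {
    public static void main(String[] args) {
        AtomicInteger upstreamRequests = new AtomicInteger();
        // Each generate() call models one upstream retrieval.
        Flowable.<Integer>generate(emitter -> emitter.onNext(upstreamRequests.incrementAndGet()))
                .observeOn(Schedulers.computation(), true, 8)
                .take(10)
                .blockingSubscribe(i -> Thread.sleep(5));
        // observeOn buffers ahead of the consumer, so this typically prints more than 10.
        System.out.println("upstream retrievals: " + upstreamRequests.get());
    }
}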
pfifer authored and sahilpalvia committed Nov 8, 2018
1 parent b83a32b commit f52f255
Showing 2 changed files with 95 additions and 5 deletions.
PrefetchRecordsPublisher.java
@@ -176,7 +176,7 @@ public void cancel() {
});
}

private synchronized void addArrivedRecordsInput(ProcessRecordsInput processRecordsInput) throws InterruptedException {
private void addArrivedRecordsInput(ProcessRecordsInput processRecordsInput) throws InterruptedException {
getRecordsResultQueue.put(processRecordsInput);
prefetchCounters.added(processRecordsInput);
}
PrefetchRecordsPublisherTest.java
@@ -15,38 +15,48 @@

package software.amazon.kinesis.retrieval.polling;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import lombok.extern.slf4j.Slf4j;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
@@ -222,10 +232,11 @@ public void testCallAfterShutdown() {
@Test
public void testExpiredIteratorException() {
log.info("Starting tests");
getRecordsCache.start(sequenceNumber, initialPosition);

when(getRecordsRetrievalStrategy.getRecords(MAX_RECORDS_PER_CALL)).thenThrow(ExpiredIteratorException.class)
.thenReturn(getRecordsResponse);

getRecordsCache.start(sequenceNumber, initialPosition);

doNothing().when(dataFetcher).restartIterator();

getRecordsCache.getNextResult();
@@ -235,6 +246,85 @@ public void testExpiredIteratorException() {
verify(dataFetcher).restartIterator();
}

@Test(timeout = 1000L)
public void testNoDeadlockOnFullQueue() {
//
// Fixes https://github.com/awslabs/amazon-kinesis-client/issues/448
//
// This test is to verify that the drain of a blocked queue no longer deadlocks.
// If the test times out before starting the subscriber it means something went wrong while filling the queue.
// After the subscriber is started one of the things that can trigger a timeout is a deadlock.
//
GetRecordsResponse response = GetRecordsResponse.builder().records(
Record.builder().data(SdkBytes.fromByteArray(new byte[] { 1, 2, 3 })).sequenceNumber("123").build())
.build();
when(getRecordsRetrievalStrategy.getRecords(anyInt())).thenReturn(response);

getRecordsCache.start(sequenceNumber, initialPosition);

//
// Wait for the queue to fill up, and the publisher to block on adding items to the queue.
//
log.info("Waiting for queue to fill up");
while (getRecordsCache.getRecordsResultQueue.size() < MAX_SIZE) {
Thread.yield();
}

log.info("Queue is currently at {} starting subscriber", getRecordsCache.getRecordsResultQueue.size());
AtomicInteger receivedItems = new AtomicInteger(0);
final int expectedItems = MAX_SIZE * 3;

Object lock = new Object();

Subscriber<ProcessRecordsInput> subscriber = new Subscriber<ProcessRecordsInput>() {
Subscription sub;

@Override
public void onSubscribe(Subscription s) {
sub = s;
s.request(1);
}

@Override
public void onNext(ProcessRecordsInput processRecordsInput) {
receivedItems.incrementAndGet();
if (receivedItems.get() >= expectedItems) {
synchronized (lock) {
log.info("Notifying waiters");
lock.notifyAll();
}
sub.cancel();
} else {
sub.request(1);
}
}

@Override
public void onError(Throwable t) {
log.error("Caught error", t);
throw new RuntimeException(t);
}

@Override
public void onComplete() {
fail("onComplete not expected in this test");
}
};

synchronized (lock) {
log.info("Awaiting notification");
Flowable.fromPublisher(getRecordsCache).subscribeOn(Schedulers.computation())
.observeOn(Schedulers.computation(), true, 8).subscribe(subscriber);
try {
lock.wait();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
verify(getRecordsRetrievalStrategy, atLeast(expectedItems)).getRecords(anyInt());
assertThat(receivedItems.get(), equalTo(expectedItems));
}

@After
public void shutdown() {
getRecordsCache.shutdown();
