Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: Fix possible deadlock in ParallelIterable #11781

Merged
merged 2 commits into from
Jan 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ static class ParallelIterator<T> implements CloseableIterator<T> {
private final CompletableFuture<Optional<Task<T>>>[] taskFutures;
private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
private final AtomicBoolean closed = new AtomicBoolean(false);
private final int maxQueueSize;

private ParallelIterator(
Iterable<? extends Iterable<T>> iterables, ExecutorService workerPool, int maxQueueSize) {
Expand All @@ -97,6 +98,7 @@ private ParallelIterator(
this.workerPool = workerPool;
// submit 2 tasks per worker at a time
this.taskFutures = new CompletableFuture[2 * ThreadPools.WORKER_THREAD_POOL_SIZE];
this.maxQueueSize = maxQueueSize;
}

@Override
Expand Down Expand Up @@ -153,6 +155,7 @@ private synchronized boolean checkTasks() {
try {
Optional<Task<T>> continuation = taskFutures[i].get();
continuation.ifPresent(yieldedTasks::addLast);
taskFutures[i] = null;
sopel39 marked this conversation as resolved.
Show resolved Hide resolved
} catch (ExecutionException e) {
if (e.getCause() instanceof RuntimeException) {
// rethrow a runtime exception
Expand All @@ -165,7 +168,10 @@ private synchronized boolean checkTasks() {
}
}

taskFutures[i] = submitNextTask();
// submit a new task if there is space in the queue
if (queue.size() < maxQueueSize) {
taskFutures[i] = submitNextTask();
sopel39 marked this conversation as resolved.
Show resolved Hide resolved
}
}

if (taskFutures[i] != null) {
Expand Down Expand Up @@ -257,17 +263,24 @@ private static class Task<T> implements Supplier<Optional<Task<T>>>, Closeable {
@Override
public Optional<Task<T>> get() {
try {
if (queue.size() >= approximateMaxQueueSize) {
sopel39 marked this conversation as resolved.
Show resolved Hide resolved
// Yield when queue is over the size limit. Task will be resubmitted later and continue
// the work.
//
// Tasks might hold references (via iterator) to constrained resources
sopel39 marked this conversation as resolved.
Show resolved Hide resolved
// (e.g. pooled connections). Hence, tasks should yield only when
// iterator is not instantiated. Otherwise, there could be
// a deadlock when yielded tasks are waiting to be executed while
// currently executed tasks are waiting for the resources that are held
// by the yielded tasks.
return Optional.of(this);
sopel39 marked this conversation as resolved.
Show resolved Hide resolved
}

if (iterator == null) {
iterator = input.iterator();
}

while (iterator.hasNext()) {
if (queue.size() >= approximateMaxQueueSize) {
sopel39 marked this conversation as resolved.
Show resolved Hide resolved
// Yield when queue is over the size limit. Task will be resubmitted later and continue
// the work.
return Optional.of(this);
}

T next = iterator.next();
if (closed.get()) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand All @@ -39,6 +40,7 @@
import org.apache.iceberg.util.ParallelIterable.ParallelIterator;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public class TestParallelIterable {
@Test
Expand Down Expand Up @@ -143,7 +145,7 @@ public CloseableIterator<Integer> iterator() {

@Test
public void limitQueueSize() {
ExecutorService executor = Executors.newCachedThreadPool();
ExecutorService executor = Executors.newSingleThreadExecutor();
sopel39 marked this conversation as resolved.
Show resolved Hide resolved
try {
List<Iterable<Integer>> iterables =
ImmutableList.of(
Expand All @@ -167,7 +169,7 @@ public void limitQueueSize() {
while (iterator.hasNext()) {
assertThat(iterator.queueSize())
.as("iterator internal queue size")
.isLessThanOrEqualTo(maxQueueSize + iterables.size());
.isLessThanOrEqualTo(100);
actualValues.add(iterator.next());
}

Expand All @@ -182,41 +184,61 @@ public void limitQueueSize() {
}

@Test
public void queueSizeOne() {
sopel39 marked this conversation as resolved.
Show resolved Hide resolved
ExecutorService executor = Executors.newCachedThreadPool();
@Timeout(10)
public void noDeadlock() {
sopel39 marked this conversation as resolved.
Show resolved Hide resolved
// This test simulates a scenario where iterators use a constrained resource
// (e.g. an S3 connection pool that has a limit on the number of connections).
// In this case, the constrained resource shouldn't cause a deadlock when queue
// is full and the iterator is waiting for the queue to be drained.
ExecutorService executor = Executors.newFixedThreadPool(1);
try {
List<Iterable<Integer>> iterables =
ImmutableList.of(
() -> IntStream.range(0, 100).iterator(),
() -> IntStream.range(0, 100).iterator(),
() -> IntStream.range(0, 100).iterator());
Semaphore semaphore = new Semaphore(1);

Multiset<Integer> expectedValues =
IntStream.range(0, 100)
.boxed()
.flatMap(i -> Stream.of(i, i, i))
.collect(ImmutableMultiset.toImmutableMultiset());
List<Iterable<Integer>> iterablesA =
ImmutableList.of(
testIterable(
semaphore::acquire, semaphore::release, IntStream.range(0, 100).iterator()));
List<Iterable<Integer>> iterablesB =
ImmutableList.of(
testIterable(
semaphore::acquire, semaphore::release, IntStream.range(200, 300).iterator()));

ParallelIterable<Integer> parallelIterable = new ParallelIterable<>(iterables, executor, 1);
ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) parallelIterable.iterator();
ParallelIterable<Integer> parallelIterableA = new ParallelIterable<>(iterablesA, executor, 1);
ParallelIterable<Integer> parallelIterableB = new ParallelIterable<>(iterablesB, executor, 1);

Multiset<Integer> actualValues = HashMultiset.create();
parallelIterableA.iterator().next();
parallelIterableB.iterator().next();
} finally {
executor.shutdownNow();
}
}

while (iterator.hasNext()) {
assertThat(iterator.queueSize())
.as("iterator internal queue size")
.isLessThanOrEqualTo(1 + iterables.size());
actualValues.add(iterator.next());
private <T> CloseableIterable<T> testIterable(
RunnableWithException open, RunnableWithException close, Iterator<T> iterator) {
return new CloseableIterable<T>() {
@Override
public void close() {
try {
close.run();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

assertThat(actualValues)
.as("multiset of values returned by the iterator")
.isEqualTo(expectedValues);
@Override
public CloseableIterator<T> iterator() {
try {
open.run();
return CloseableIterator.withClose(iterator);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
}

iterator.close();
} finally {
executor.shutdown();
}
private interface RunnableWithException {
void run() throws Exception;
}

private void queueHasElements(ParallelIterator<Integer> iterator) {
Expand Down