diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java index d40f64844797..7acab8762fb8 100644 --- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java +++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java @@ -86,6 +86,7 @@ static class ParallelIterator implements CloseableIterator { private final CompletableFuture>>[] taskFutures; private final ConcurrentLinkedQueue queue = new ConcurrentLinkedQueue<>(); private final AtomicBoolean closed = new AtomicBoolean(false); + private final int maxQueueSize; private ParallelIterator( Iterable> iterables, ExecutorService workerPool, int maxQueueSize) { @@ -97,6 +98,7 @@ private ParallelIterator( this.workerPool = workerPool; // submit 2 tasks per worker at a time this.taskFutures = new CompletableFuture[2 * ThreadPools.WORKER_THREAD_POOL_SIZE]; + this.maxQueueSize = maxQueueSize; } @Override @@ -153,6 +155,7 @@ private synchronized boolean checkTasks() { try { Optional> continuation = taskFutures[i].get(); continuation.ifPresent(yieldedTasks::addLast); + taskFutures[i] = null; } catch (ExecutionException e) { if (e.getCause() instanceof RuntimeException) { // rethrow a runtime exception @@ -165,7 +168,10 @@ private synchronized boolean checkTasks() { } } - taskFutures[i] = submitNextTask(); + // submit a new task if there is space in the queue + if (queue.size() < maxQueueSize) { + taskFutures[i] = submitNextTask(); + } } if (taskFutures[i] != null) { @@ -257,17 +263,24 @@ private static class Task implements Supplier>>, Closeable { @Override public Optional> get() { try { + if (queue.size() >= approximateMaxQueueSize) { + // Yield when queue is over the size limit. Task will be resubmitted later and continue + // the work. + // + // Tasks might hold references (via iterator) to constrained resources + // (e.g. pooled connections). Hence, tasks should yield only when + // iterator is not instantiated. Otherwise, there could be + // a deadlock when yielded tasks are waiting to be executed while + // currently executed tasks are waiting for the resources that are held + // by the yielded tasks. + return Optional.of(this); + } + if (iterator == null) { iterator = input.iterator(); } while (iterator.hasNext()) { - if (queue.size() >= approximateMaxQueueSize) { - // Yield when queue is over the size limit. Task will be resubmitted later and continue - // the work. - return Optional.of(this); - } - T next = iterator.next(); if (closed.get()) { break; diff --git a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java index 410e33058d0c..a1e14a22a74d 100644 --- a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java +++ b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -39,6 +40,7 @@ import org.apache.iceberg.util.ParallelIterable.ParallelIterator; import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; public class TestParallelIterable { @Test @@ -143,7 +145,7 @@ public CloseableIterator iterator() { @Test public void limitQueueSize() { - ExecutorService executor = Executors.newCachedThreadPool(); + ExecutorService executor = Executors.newSingleThreadExecutor(); try { List> iterables = ImmutableList.of( @@ -167,7 +169,7 @@ public void limitQueueSize() { while (iterator.hasNext()) { assertThat(iterator.queueSize()) .as("iterator internal queue size") - .isLessThanOrEqualTo(maxQueueSize + iterables.size()); + .isLessThanOrEqualTo(100); actualValues.add(iterator.next()); } @@ -182,41 +184,61 @@ public void limitQueueSize() { } @Test - public void queueSizeOne() { - ExecutorService executor = Executors.newCachedThreadPool(); + @Timeout(10) + public void noDeadlock() { + // This test simulates a scenario where iterators use a constrained resource + // (e.g. an S3 connection pool that has a limit on the number of connections). + // In this case, the constrained resource shouldn't cause a deadlock when queue + // is full and the iterator is waiting for the queue to be drained. + ExecutorService executor = Executors.newFixedThreadPool(1); try { - List> iterables = - ImmutableList.of( - () -> IntStream.range(0, 100).iterator(), - () -> IntStream.range(0, 100).iterator(), - () -> IntStream.range(0, 100).iterator()); + Semaphore semaphore = new Semaphore(1); - Multiset expectedValues = - IntStream.range(0, 100) - .boxed() - .flatMap(i -> Stream.of(i, i, i)) - .collect(ImmutableMultiset.toImmutableMultiset()); + List> iterablesA = + ImmutableList.of( + testIterable( + semaphore::acquire, semaphore::release, IntStream.range(0, 100).iterator())); + List> iterablesB = + ImmutableList.of( + testIterable( + semaphore::acquire, semaphore::release, IntStream.range(200, 300).iterator())); - ParallelIterable parallelIterable = new ParallelIterable<>(iterables, executor, 1); - ParallelIterator iterator = (ParallelIterator) parallelIterable.iterator(); + ParallelIterable parallelIterableA = new ParallelIterable<>(iterablesA, executor, 1); + ParallelIterable parallelIterableB = new ParallelIterable<>(iterablesB, executor, 1); - Multiset actualValues = HashMultiset.create(); + parallelIterableA.iterator().next(); + parallelIterableB.iterator().next(); + } finally { + executor.shutdownNow(); + } + } - while (iterator.hasNext()) { - assertThat(iterator.queueSize()) - .as("iterator internal queue size") - .isLessThanOrEqualTo(1 + iterables.size()); - actualValues.add(iterator.next()); + private CloseableIterable testIterable( + RunnableWithException open, RunnableWithException close, Iterator iterator) { + return new CloseableIterable() { + @Override + public void close() { + try { + close.run(); + } catch (Exception e) { + throw new RuntimeException(e); + } } - assertThat(actualValues) - .as("multiset of values returned by the iterator") - .isEqualTo(expectedValues); + @Override + public CloseableIterator iterator() { + try { + open.run(); + return CloseableIterator.withClose(iterator); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }; + } - iterator.close(); - } finally { - executor.shutdown(); - } + private interface RunnableWithException { + void run() throws Exception; } private void queueHasElements(ParallelIterator iterator) {