Fix ParallelIterable deadlock #11781
base: main
Conversation
Force-pushed from ec139c4 to da723f0
Force-pushed from b072cc9 to cab3521
Force-pushed from b41baf9 to c94513b
rebased
Force-pushed from c94513b to 66ed87f
From a discussion I had with @sopel39 today: I think we can go forward with this solution, but I think it will basically re-introduce the memory usage issue that we saw previously (for some use cases). From our discussion I believe we have been working at this from the wrong direction.

Recap: The original implementation basically assumed that we could read from as many iterators as parallelism allowed. This led to essentially unbounded memory usage, which became an issue for systems using a lightweight coordinator, since the coordinator would need a huge memory footprint (essentially all the metadata entries for a table held in memory per query).

Next, to solve the memory footprint issue, we added what is essentially a buffered read-ahead. A queue is filled with elements from the files we are reading in parallel, and we check the queue depth every time we add elements in order to bound its size. Since the max queue size is checked on every insertion, we can never go more than "parallelism" items over the limit. Unfortunately, this leads to the current deadlock issue: we can potentially yield an iterator in the middle of a file and be left only with iterators for files that cannot yet be opened, because all file handles are owned by yielded iterators.

The current proposed solution is to switch from checking the queue size per element to checking only before opening a new file for the first time. This means that any file that is opened is read completely into the read-ahead queue. This fixes the deadlock issue, since we never yield in the middle of a file, but it possibly reintroduces the memory issue. In the worst case we would open up to "parallelism" files simultaneously and load them all into the queue before having a chance to check the queue size.

Where do we go from here: The current implementation is trying to solve a more general problem than the one we actually have. The key difference is that we know exactly how long each file is before we open it. Instead of simply opening files blindly, I think what we should do is something like the following (I haven't thought this part through too much; the synchronization primitives are just for representation, and I realize it won't work exactly like this).
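For illustration only, here is a hypothetical sketch of that direction; it is not from this PR. A shared Semaphore acts as an entry budget, and a file's known entry count is reserved up front, before the file is opened. All names here (BoundedReadAhead, readFile, poll, maxBufferedEntries) are invented for the sketch.

```java
import java.util.Queue;
import java.util.concurrent.Semaphore;

// Hypothetical sketch: a shared "entry budget" bounds how many entries may be
// buffered at once. A worker must reserve a file's full (known) entry count
// before opening it, so once a file is open it is always drained to completion
// and is never parked while holding a connection or file handle.
class BoundedReadAhead<T> {
  private final Semaphore budget;   // permits == max entries allowed in the queue
  private final Queue<T> queue;     // shared read-ahead queue

  BoundedReadAhead(int maxBufferedEntries, Queue<T> queue) {
    this.budget = new Semaphore(maxBufferedEntries);
    this.queue = queue;
  }

  // entryCount is assumed to be known from metadata before the file is opened;
  // iterating fileEntries is what actually opens the underlying file
  void readFile(int entryCount, Iterable<T> fileEntries) throws InterruptedException {
    budget.acquire(entryCount);     // block *before* opening, never mid-file
    int buffered = 0;
    try {
      for (T entry : fileEntries) { // drain the file completely once opened
        queue.add(entry);
        buffered++;
      }
    } finally {
      // permits for entries that never reached the queue would otherwise leak,
      // since only queued entries are released by the consumer below
      if (buffered < entryCount) {
        budget.release(entryCount - buffered);
      }
    }
  }

  // consumer side: every entry taken out of the queue frees one permit
  T poll() {
    T entry = queue.poll();
    if (entry != null) {
      budget.release(1);
    }
    return entry;
  }
}
```

The point of this shape is that back-pressure is applied before any connection or file handle is acquired, and memory is bounded by the permit count rather than by parallelism times file size. The budget would need to be at least as large as the largest single file, otherwise that file could never acquire enough permits.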
Force-pushed from 66ed87f to 3436e7f
@findepi I'm on board with this, but I want to make sure you are also happy with it. I'm unsure whether the ability to yield before files will really help memory pressure, so I'm slightly leaning toward reverting the yielding capabilities altogether and going back to the implementation from last year. I'm willing to go forward with either direction for now. In the future I think we really need a fake-filesystem benchmarking test that we can use to simulate how the algorithms we write here will behave, since we are mostly working blind.
As noted in my comment to Piotr, I think this fixes the deadlock, but it may be better to just remove the yielding behavior altogether until we have a better replacement. If folks have experience where file-level yielding would appropriately limit memory usage, I think we can go forward with this as an interim solution.
thanks @RussellSpitzer for the great summary.
This change looks fine.
I am also ok with reverting the yield change completely, as I feel this deadlock-avoidance change might make the yield solution ineffective anyway. The OOM scenario involved a large thread pool (like 184 threads) and large manifest files (hundreds of MBs or GBs).
@sopel39 I assume you tried this change and it avoided the deadlock problem. Has it been tested with the OOM scenario for large manifest files?
It was observed that in high-concurrency/high-workload scenarios the cluster deadlocks because manifest readers wait for connections from the S3 pool.
Specifically, ManifestGroup#plan creates a ManifestReader per ParallelIterable.Task. These readers effectively hold onto an S3 connection from the pool. When the ParallelIterable queue is full, the Task is put aside for later use.
Consider scenario:
S3 connection pool size=1
approximateMaxQueueSize=1
workerPoolSize=1
ParallelIterable1: starts TaskP1
ParallelIterable1: TaskP1 produces result, queue gets full, TaskP1 is put on hold (holds S3 connection)
ParallelIterable2: starts TaskP2, TaskP2 is scheduled on workerPool but is blocked on S3 connection pool
ParallelIterable1: result gets consumed, TaskP1 is scheduled again
ParallelIterable1: TaskP1 waits for workerPool to be free, but TaskP2 is waiting for TaskP1 to release connection
The fix makes sure a Task is finished once it is started. This way, limited resources like the connection pool are not put on hold. The queue size might exceed the strict limit, but it is still bounded.
Fixes #11768
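As a simplified sketch of the run-to-completion behavior described above (this is not the actual ParallelIterable code; the FileTask class and its fields are invented for illustration), the queue bound is consulted only before a file is opened, and an opened file is always drained completely:

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Simplified illustration: the queue bound is checked only *before* a file is
// opened. Once a file is open -- and therefore holding an S3 connection -- it
// is always drained to completion, so a suspended task never pins a
// connection from the pool.
class FileTask<T> implements Runnable {
  private final Iterable<T> file;                // iterating this acquires a pooled connection
  private final ConcurrentLinkedQueue<T> queue;  // shared read-ahead queue
  private final int approximateMaxQueueSize;

  FileTask(Iterable<T> file, ConcurrentLinkedQueue<T> queue, int approximateMaxQueueSize) {
    this.file = file;
    this.queue = queue;
    this.approximateMaxQueueSize = approximateMaxQueueSize;
  }

  @Override
  public void run() {
    if (queue.size() >= approximateMaxQueueSize) {
      return; // deferred before anything was opened; safe to reschedule later
    }
    for (T entry : file) {  // run to completion once started
      queue.add(entry);     // the queue may overshoot the limit, but only by the open files' contents
    }
  }
}
```

With this shape a deferred task holds no pooled resources, and the queue can overshoot the limit only by the contents of the files that were already open when the limit was reached.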