diff --git a/velox/dwio/common/ExecutorBarrier.cpp b/velox/dwio/common/ExecutorBarrier.cpp index 4d5d106f6dcc..f05e342a44f7 100644 --- a/velox/dwio/common/ExecutorBarrier.cpp +++ b/velox/dwio/common/ExecutorBarrier.cpp @@ -17,7 +17,6 @@ #include "velox/dwio/common/ExecutorBarrier.h" namespace facebook::velox::dwio::common { - namespace { class BarrierElement { @@ -72,11 +71,11 @@ auto ExecutorBarrier::wrapMethod(folly::Func f) { } void ExecutorBarrier::add(folly::Func f) { - executor_.add(wrapMethod(std::move(f))); + executor_->add(wrapMethod(std::move(f))); } void ExecutorBarrier::addWithPriority(folly::Func f, int8_t priority) { - executor_.addWithPriority(wrapMethod(std::move(f)), priority); + executor_->addWithPriority(wrapMethod(std::move(f)), priority); } } // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/ExecutorBarrier.h b/velox/dwio/common/ExecutorBarrier.h index c3004ea1e489..08ba1c583f67 100644 --- a/velox/dwio/common/ExecutorBarrier.h +++ b/velox/dwio/common/ExecutorBarrier.h @@ -26,11 +26,15 @@ namespace facebook::velox::dwio::common { class ExecutorBarrier : public folly::Executor { public: - explicit ExecutorBarrier(folly::Executor& executor) - : executor_{executor}, count_{0} {} + explicit ExecutorBarrier(folly::Executor::KeepAlive<> executor) + : executor_{std::move(executor)}, count_{0} {} + // Constructor version that holds ownership over the executor (holds a + // shared_ptr copy). explicit ExecutorBarrier(std::shared_ptr executor) - : owned_{std::move(executor)}, executor_{*owned_}, count_{0} {} + : owned_{std::move(executor)}, + executor_{folly::getKeepAliveToken(*owned_)}, + count_{0} {} ~ExecutorBarrier() override { // If this object gets destroyed while there are still tasks pending, those @@ -50,7 +54,7 @@ class ExecutorBarrier : public folly::Executor { void addWithPriority(folly::Func, int8_t priority) override; uint8_t getNumPriorities() const override { - return executor_.getNumPriorities(); + return executor_->getNumPriorities(); } void waitAll() { @@ -68,7 +72,7 @@ class ExecutorBarrier : public folly::Executor { auto wrapMethod(folly::Func f); std::shared_ptr owned_; - folly::Executor& executor_; + folly::Executor::KeepAlive<> executor_; size_t count_; std::mutex mutex_; std::condition_variable cv_; diff --git a/velox/dwio/common/ParallelFor.cpp b/velox/dwio/common/ParallelFor.cpp index 793b2f486acf..e20bf7c7f316 100644 --- a/velox/dwio/common/ParallelFor.cpp +++ b/velox/dwio/common/ParallelFor.cpp @@ -19,7 +19,6 @@ #include "velox/dwio/common/ExecutorBarrier.h" namespace facebook::velox::dwio::common { - namespace { std::vector> @@ -60,7 +59,7 @@ splitRange(size_t from, size_t to, size_t factor) { } // namespace ParallelFor::ParallelFor( - folly::Executor* executor, + folly::Executor::KeepAlive<> executor, size_t from, size_t to, size_t parallelismFactor) @@ -89,7 +88,7 @@ void ParallelFor::execute(std::function func) { VELOX_CHECK( executor_, "Executor wasn't provided so we shouldn't have more than 1 range"); - ExecutorBarrier barrier(*executor_); + ExecutorBarrier barrier(executor_); const size_t last = ranges_.size() - 1; // First N-1 ranges in executor threads for (size_t r = 0; r < last; ++r) { diff --git a/velox/dwio/common/ParallelFor.h b/velox/dwio/common/ParallelFor.h index f6debc9f14fd..4080f5cde5ea 100644 --- a/velox/dwio/common/ParallelFor.h +++ b/velox/dwio/common/ParallelFor.h @@ -36,7 +36,7 @@ namespace facebook::velox::dwio::common { class ParallelFor { public: ParallelFor( - folly::Executor* executor, + folly::Executor::KeepAlive<> executor, size_t from, // start index size_t to, // past end index // number of threads. @@ -53,7 +53,7 @@ class ParallelFor { private: std::shared_ptr owned_; - folly::Executor* executor_; + folly::Executor::KeepAlive<> executor_; std::vector> ranges_; }; diff --git a/velox/dwio/common/tests/ExecutorBarrierTest.cpp b/velox/dwio/common/tests/ExecutorBarrierTest.cpp index 0eb528c0bef9..1a9533b4a188 100644 --- a/velox/dwio/common/tests/ExecutorBarrierTest.cpp +++ b/velox/dwio/common/tests/ExecutorBarrierTest.cpp @@ -26,7 +26,7 @@ TEST(ExecutorBarrierTest, GetNumPriorities) { const uint8_t kNumPriorities = 5; auto executor = std::make_shared(10, kNumPriorities); - auto barrier = std::make_shared(*executor); + auto barrier = std::make_shared(executor); EXPECT_EQ(barrier->getNumPriorities(), kNumPriorities); } @@ -41,7 +41,7 @@ TEST(ExecutorBarrierTest, CanOwn) { TEST(ExecutorBarrierTest, CanAwaitMultipleTimes) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(*executor); + auto barrier = std::make_shared(executor); for (int time = 0, multipleTimes = 10; time < multipleTimes; ++time) { barrier->waitAll(); } @@ -49,7 +49,7 @@ TEST(ExecutorBarrierTest, CanAwaitMultipleTimes) { TEST(ExecutorBarrierTest, AddCanBeReused) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(*executor); + auto barrier = std::make_shared(executor); const int kCalls = 30; std::atomic count{0}; @@ -68,7 +68,7 @@ TEST(ExecutorBarrierTest, AddCanBeReused) { TEST(ExecutorBarrierTest, AddWithPriorityCanBeReused) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(*executor); + auto barrier = std::make_shared(executor); const int kCalls = 30; const int8_t kPriority = 4; @@ -88,7 +88,7 @@ TEST(ExecutorBarrierTest, AddWithPriorityCanBeReused) { TEST(ExecutorBarrierTest, AddCanBeReusedAfterException) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(*executor); + auto barrier = std::make_shared(executor); const int kCalls = 30; std::atomic count{0}; @@ -110,7 +110,7 @@ TEST(ExecutorBarrierTest, AddCanBeReusedAfterException) { TEST(ExecutorBarrierTest, AddWithPriorityCanBeReusedAfterException) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(*executor); + auto barrier = std::make_shared(executor); const int kCalls = 30; const int8_t kPriority = 4; @@ -135,7 +135,7 @@ TEST(ExecutorBarrierTest, AddWithPriorityCanBeReusedAfterException) { TEST(ExecutorBarrierTest, Add) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(*executor); + auto barrier = std::make_shared(executor); const int kCalls = 30; std::atomic count{0}; @@ -148,7 +148,7 @@ TEST(ExecutorBarrierTest, Add) { TEST(ExecutorBarrierTest, AddWithPriority) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(*executor); + auto barrier = std::make_shared(executor); const int kCalls = 30; const int8_t kPriority = 4; @@ -162,7 +162,7 @@ TEST(ExecutorBarrierTest, AddWithPriority) { TEST(ExecutorBarrierTest, AddCanIgnore) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(*executor); + auto barrier = std::make_shared(executor); const int kCalls = 30; for (int i = 0; i < kCalls; ++i) { @@ -173,7 +173,7 @@ TEST(ExecutorBarrierTest, AddCanIgnore) { TEST(ExecutorBarrierTest, AddWithPriorityCanIgnore) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(*executor); + auto barrier = std::make_shared(executor); const int kCalls = 30; for (int i = 0; i < kCalls; ++i) { @@ -187,7 +187,7 @@ TEST(ExecutorBarrierTest, DestructorDoesntThrow) { std::atomic count{0}; { auto executor = std::make_shared(10); - auto barrier = std::make_shared(*executor); + auto barrier = std::make_shared(executor); for (int i = 0; i < kCalls; ++i) { barrier->add([shouldThrow = (i == 0), &count]() { @@ -203,7 +203,7 @@ TEST(ExecutorBarrierTest, DestructorDoesntThrow) { TEST(ExecutorBarrierTest, AddException) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(*executor); + auto barrier = std::make_shared(executor); const int kCalls = 30; std::atomic count{0}; @@ -221,7 +221,7 @@ TEST(ExecutorBarrierTest, AddException) { TEST(ExecutorBarrierTest, AddWithPriorityException) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(*executor); + auto barrier = std::make_shared(executor); const int kCalls = 30; const int8_t kPriority = 4; @@ -242,7 +242,7 @@ TEST(ExecutorBarrierTest, AddWithPriorityException) { TEST(ExecutorBarrierTest, AddNonStdException) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(*executor); + auto barrier = std::make_shared(executor); const int kCalls = 30; std::atomic count{0}; @@ -261,7 +261,7 @@ TEST(ExecutorBarrierTest, AddNonStdException) { TEST(ExecutorBarrierTest, AddWithPriorityNonStdException) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(*executor); + auto barrier = std::make_shared(executor); const int kCalls = 30; const int8_t kPriority = 4; @@ -283,7 +283,7 @@ TEST(ExecutorBarrierTest, AddWithPriorityNonStdException) { TEST(ExecutorBarrierTest, AddExceptions) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(*executor); + auto barrier = std::make_shared(executor); const int kCalls = 30; std::atomic count{0}; @@ -299,7 +299,7 @@ TEST(ExecutorBarrierTest, AddExceptions) { TEST(ExecutorBarrierTest, AddWithPriorityExceptions) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(*executor); + auto barrier = std::make_shared(executor); const int kCalls = 30; const int8_t kPriority = 4; diff --git a/velox/dwio/dwrf/test/TestColumnReader.cpp b/velox/dwio/dwrf/test/TestColumnReader.cpp index ef055a29f2fb..c903d3a3f2b9 100644 --- a/velox/dwio/dwrf/test/TestColumnReader.cpp +++ b/velox/dwio/dwrf/test/TestColumnReader.cpp @@ -121,7 +121,7 @@ class ColumnReaderTestBase { common::ScanSpec* scanSpec = nullptr) { const std::shared_ptr& rowType = std::dynamic_pointer_cast(requestedType); - if (parallelDecoding()) { + if (parallelDecoding() && !executor_) { executor_ = std::make_unique(2); } ColumnSelector cs(rowType, nodes, true); @@ -221,12 +221,12 @@ class ColumnReaderTestBase { virtual bool returnFlatVector() const = 0; virtual bool parallelDecoding() const = 0; + std::unique_ptr executor_; MockStripeStreams streams_; memory::AllocationPool pool_; StreamLabels labels_; std::unique_ptr columnReader_; std::unique_ptr selectiveColumnReader_; - std::unique_ptr executor_; private: std::unique_ptr scanSpec_;