diff --git a/velox/dwio/common/ExecutorBarrier.cpp b/velox/dwio/common/ExecutorBarrier.cpp index f05e342a44f7..4d5d106f6dcc 100644 --- a/velox/dwio/common/ExecutorBarrier.cpp +++ b/velox/dwio/common/ExecutorBarrier.cpp @@ -17,6 +17,7 @@ #include "velox/dwio/common/ExecutorBarrier.h" namespace facebook::velox::dwio::common { + namespace { class BarrierElement { @@ -71,11 +72,11 @@ auto ExecutorBarrier::wrapMethod(folly::Func f) { } void ExecutorBarrier::add(folly::Func f) { - executor_->add(wrapMethod(std::move(f))); + executor_.add(wrapMethod(std::move(f))); } void ExecutorBarrier::addWithPriority(folly::Func f, int8_t priority) { - executor_->addWithPriority(wrapMethod(std::move(f)), priority); + executor_.addWithPriority(wrapMethod(std::move(f)), priority); } } // namespace facebook::velox::dwio::common diff --git a/velox/dwio/common/ExecutorBarrier.h b/velox/dwio/common/ExecutorBarrier.h index 08ba1c583f67..c3004ea1e489 100644 --- a/velox/dwio/common/ExecutorBarrier.h +++ b/velox/dwio/common/ExecutorBarrier.h @@ -26,15 +26,11 @@ namespace facebook::velox::dwio::common { class ExecutorBarrier : public folly::Executor { public: - explicit ExecutorBarrier(folly::Executor::KeepAlive<> executor) - : executor_{std::move(executor)}, count_{0} {} + explicit ExecutorBarrier(folly::Executor& executor) + : executor_{executor}, count_{0} {} - // Constructor version that holds ownership over the executor (holds a - // shared_ptr copy). explicit ExecutorBarrier(std::shared_ptr executor) - : owned_{std::move(executor)}, - executor_{folly::getKeepAliveToken(*owned_)}, - count_{0} {} + : owned_{std::move(executor)}, executor_{*owned_}, count_{0} {} ~ExecutorBarrier() override { // If this object gets destroyed while there are still tasks pending, those @@ -54,7 +50,7 @@ class ExecutorBarrier : public folly::Executor { void addWithPriority(folly::Func, int8_t priority) override; uint8_t getNumPriorities() const override { - return executor_->getNumPriorities(); + return executor_.getNumPriorities(); } void waitAll() { @@ -72,7 +68,7 @@ class ExecutorBarrier : public folly::Executor { auto wrapMethod(folly::Func f); std::shared_ptr owned_; - folly::Executor::KeepAlive<> executor_; + folly::Executor& executor_; size_t count_; std::mutex mutex_; std::condition_variable cv_; diff --git a/velox/dwio/common/ParallelFor.cpp b/velox/dwio/common/ParallelFor.cpp index e20bf7c7f316..793b2f486acf 100644 --- a/velox/dwio/common/ParallelFor.cpp +++ b/velox/dwio/common/ParallelFor.cpp @@ -19,6 +19,7 @@ #include "velox/dwio/common/ExecutorBarrier.h" namespace facebook::velox::dwio::common { + namespace { std::vector> @@ -59,7 +60,7 @@ splitRange(size_t from, size_t to, size_t factor) { } // namespace ParallelFor::ParallelFor( - folly::Executor::KeepAlive<> executor, + folly::Executor* executor, size_t from, size_t to, size_t parallelismFactor) @@ -88,7 +89,7 @@ void ParallelFor::execute(std::function func) { VELOX_CHECK( executor_, "Executor wasn't provided so we shouldn't have more than 1 range"); - ExecutorBarrier barrier(executor_); + ExecutorBarrier barrier(*executor_); const size_t last = ranges_.size() - 1; // First N-1 ranges in executor threads for (size_t r = 0; r < last; ++r) { diff --git a/velox/dwio/common/ParallelFor.h b/velox/dwio/common/ParallelFor.h index 4080f5cde5ea..f6debc9f14fd 100644 --- a/velox/dwio/common/ParallelFor.h +++ b/velox/dwio/common/ParallelFor.h @@ -36,7 +36,7 @@ namespace facebook::velox::dwio::common { class ParallelFor { public: ParallelFor( - folly::Executor::KeepAlive<> executor, + folly::Executor* executor, size_t from, // start index size_t to, // past end index // number of threads. @@ -53,7 +53,7 @@ class ParallelFor { private: std::shared_ptr owned_; - folly::Executor::KeepAlive<> executor_; + folly::Executor* executor_; std::vector> ranges_; }; diff --git a/velox/dwio/common/tests/ExecutorBarrierTest.cpp b/velox/dwio/common/tests/ExecutorBarrierTest.cpp index 1a9533b4a188..0eb528c0bef9 100644 --- a/velox/dwio/common/tests/ExecutorBarrierTest.cpp +++ b/velox/dwio/common/tests/ExecutorBarrierTest.cpp @@ -26,7 +26,7 @@ TEST(ExecutorBarrierTest, GetNumPriorities) { const uint8_t kNumPriorities = 5; auto executor = std::make_shared(10, kNumPriorities); - auto barrier = std::make_shared(executor); + auto barrier = std::make_shared(*executor); EXPECT_EQ(barrier->getNumPriorities(), kNumPriorities); } @@ -41,7 +41,7 @@ TEST(ExecutorBarrierTest, CanOwn) { TEST(ExecutorBarrierTest, CanAwaitMultipleTimes) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(executor); + auto barrier = std::make_shared(*executor); for (int time = 0, multipleTimes = 10; time < multipleTimes; ++time) { barrier->waitAll(); } @@ -49,7 +49,7 @@ TEST(ExecutorBarrierTest, CanAwaitMultipleTimes) { TEST(ExecutorBarrierTest, AddCanBeReused) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(executor); + auto barrier = std::make_shared(*executor); const int kCalls = 30; std::atomic count{0}; @@ -68,7 +68,7 @@ TEST(ExecutorBarrierTest, AddCanBeReused) { TEST(ExecutorBarrierTest, AddWithPriorityCanBeReused) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(executor); + auto barrier = std::make_shared(*executor); const int kCalls = 30; const int8_t kPriority = 4; @@ -88,7 +88,7 @@ TEST(ExecutorBarrierTest, AddWithPriorityCanBeReused) { TEST(ExecutorBarrierTest, AddCanBeReusedAfterException) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(executor); + auto barrier = std::make_shared(*executor); const int kCalls = 30; std::atomic count{0}; @@ -110,7 +110,7 @@ TEST(ExecutorBarrierTest, AddCanBeReusedAfterException) { TEST(ExecutorBarrierTest, AddWithPriorityCanBeReusedAfterException) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(executor); + auto barrier = std::make_shared(*executor); const int kCalls = 30; const int8_t kPriority = 4; @@ -135,7 +135,7 @@ TEST(ExecutorBarrierTest, AddWithPriorityCanBeReusedAfterException) { TEST(ExecutorBarrierTest, Add) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(executor); + auto barrier = std::make_shared(*executor); const int kCalls = 30; std::atomic count{0}; @@ -148,7 +148,7 @@ TEST(ExecutorBarrierTest, Add) { TEST(ExecutorBarrierTest, AddWithPriority) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(executor); + auto barrier = std::make_shared(*executor); const int kCalls = 30; const int8_t kPriority = 4; @@ -162,7 +162,7 @@ TEST(ExecutorBarrierTest, AddWithPriority) { TEST(ExecutorBarrierTest, AddCanIgnore) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(executor); + auto barrier = std::make_shared(*executor); const int kCalls = 30; for (int i = 0; i < kCalls; ++i) { @@ -173,7 +173,7 @@ TEST(ExecutorBarrierTest, AddCanIgnore) { TEST(ExecutorBarrierTest, AddWithPriorityCanIgnore) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(executor); + auto barrier = std::make_shared(*executor); const int kCalls = 30; for (int i = 0; i < kCalls; ++i) { @@ -187,7 +187,7 @@ TEST(ExecutorBarrierTest, DestructorDoesntThrow) { std::atomic count{0}; { auto executor = std::make_shared(10); - auto barrier = std::make_shared(executor); + auto barrier = std::make_shared(*executor); for (int i = 0; i < kCalls; ++i) { barrier->add([shouldThrow = (i == 0), &count]() { @@ -203,7 +203,7 @@ TEST(ExecutorBarrierTest, DestructorDoesntThrow) { TEST(ExecutorBarrierTest, AddException) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(executor); + auto barrier = std::make_shared(*executor); const int kCalls = 30; std::atomic count{0}; @@ -221,7 +221,7 @@ TEST(ExecutorBarrierTest, AddException) { TEST(ExecutorBarrierTest, AddWithPriorityException) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(executor); + auto barrier = std::make_shared(*executor); const int kCalls = 30; const int8_t kPriority = 4; @@ -242,7 +242,7 @@ TEST(ExecutorBarrierTest, AddWithPriorityException) { TEST(ExecutorBarrierTest, AddNonStdException) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(executor); + auto barrier = std::make_shared(*executor); const int kCalls = 30; std::atomic count{0}; @@ -261,7 +261,7 @@ TEST(ExecutorBarrierTest, AddNonStdException) { TEST(ExecutorBarrierTest, AddWithPriorityNonStdException) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(executor); + auto barrier = std::make_shared(*executor); const int kCalls = 30; const int8_t kPriority = 4; @@ -283,7 +283,7 @@ TEST(ExecutorBarrierTest, AddWithPriorityNonStdException) { TEST(ExecutorBarrierTest, AddExceptions) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(executor); + auto barrier = std::make_shared(*executor); const int kCalls = 30; std::atomic count{0}; @@ -299,7 +299,7 @@ TEST(ExecutorBarrierTest, AddExceptions) { TEST(ExecutorBarrierTest, AddWithPriorityExceptions) { auto executor = std::make_shared(10); - auto barrier = std::make_shared(executor); + auto barrier = std::make_shared(*executor); const int kCalls = 30; const int8_t kPriority = 4;