Skip to content

Commit

Permalink
Revert D66724539: refactor: Use KeepAlive instead of Executor*
Browse files Browse the repository at this point in the history
Differential Revision:
D66724539

Original commit changeset: 231527346bdd

Original Phabricator Diff: D66724539

fbshipit-source-id: a239159cc880400a3ef4650ad842f43331ac5d73
  • Loading branch information
Wenbin Lin authored and facebook-github-bot committed Dec 5, 2024
1 parent 09fbd54 commit 83cfd4f
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 32 deletions.
5 changes: 3 additions & 2 deletions velox/dwio/common/ExecutorBarrier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "velox/dwio/common/ExecutorBarrier.h"

namespace facebook::velox::dwio::common {

namespace {

class BarrierElement {
Expand Down Expand Up @@ -71,11 +72,11 @@ auto ExecutorBarrier::wrapMethod(folly::Func f) {
}

void ExecutorBarrier::add(folly::Func f) {
executor_->add(wrapMethod(std::move(f)));
executor_.add(wrapMethod(std::move(f)));
}

void ExecutorBarrier::addWithPriority(folly::Func f, int8_t priority) {
executor_->addWithPriority(wrapMethod(std::move(f)), priority);
executor_.addWithPriority(wrapMethod(std::move(f)), priority);
}

} // namespace facebook::velox::dwio::common
14 changes: 5 additions & 9 deletions velox/dwio/common/ExecutorBarrier.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,11 @@ namespace facebook::velox::dwio::common {

class ExecutorBarrier : public folly::Executor {
public:
explicit ExecutorBarrier(folly::Executor::KeepAlive<> executor)
: executor_{std::move(executor)}, count_{0} {}
explicit ExecutorBarrier(folly::Executor& executor)
: executor_{executor}, count_{0} {}

// Constructor version that holds ownership over the executor (holds a
// shared_ptr copy).
explicit ExecutorBarrier(std::shared_ptr<folly::Executor> executor)
: owned_{std::move(executor)},
executor_{folly::getKeepAliveToken(*owned_)},
count_{0} {}
: owned_{std::move(executor)}, executor_{*owned_}, count_{0} {}

~ExecutorBarrier() override {
// If this object gets destroyed while there are still tasks pending, those
Expand All @@ -54,7 +50,7 @@ class ExecutorBarrier : public folly::Executor {
void addWithPriority(folly::Func, int8_t priority) override;

uint8_t getNumPriorities() const override {
return executor_->getNumPriorities();
return executor_.getNumPriorities();
}

void waitAll() {
Expand All @@ -72,7 +68,7 @@ class ExecutorBarrier : public folly::Executor {
auto wrapMethod(folly::Func f);

std::shared_ptr<folly::Executor> owned_;
folly::Executor::KeepAlive<> executor_;
folly::Executor& executor_;
size_t count_;
std::mutex mutex_;
std::condition_variable cv_;
Expand Down
5 changes: 3 additions & 2 deletions velox/dwio/common/ParallelFor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "velox/dwio/common/ExecutorBarrier.h"

namespace facebook::velox::dwio::common {

namespace {

std::vector<std::pair<size_t, size_t>>
Expand Down Expand Up @@ -59,7 +60,7 @@ splitRange(size_t from, size_t to, size_t factor) {
} // namespace

ParallelFor::ParallelFor(
folly::Executor::KeepAlive<> executor,
folly::Executor* executor,
size_t from,
size_t to,
size_t parallelismFactor)
Expand Down Expand Up @@ -88,7 +89,7 @@ void ParallelFor::execute(std::function<void(size_t)> func) {
VELOX_CHECK(
executor_,
"Executor wasn't provided so we shouldn't have more than 1 range");
ExecutorBarrier barrier(executor_);
ExecutorBarrier barrier(*executor_);
const size_t last = ranges_.size() - 1;
// First N-1 ranges in executor threads
for (size_t r = 0; r < last; ++r) {
Expand Down
4 changes: 2 additions & 2 deletions velox/dwio/common/ParallelFor.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ namespace facebook::velox::dwio::common {
class ParallelFor {
public:
ParallelFor(
folly::Executor::KeepAlive<> executor,
folly::Executor* executor,
size_t from, // start index
size_t to, // past end index
// number of threads.
Expand All @@ -53,7 +53,7 @@ class ParallelFor {

private:
std::shared_ptr<folly::Executor> owned_;
folly::Executor::KeepAlive<> executor_;
folly::Executor* executor_;
std::vector<std::pair<size_t, size_t>> ranges_;
};

Expand Down
34 changes: 17 additions & 17 deletions velox/dwio/common/tests/ExecutorBarrierTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ TEST(ExecutorBarrierTest, GetNumPriorities) {
const uint8_t kNumPriorities = 5;
auto executor =
std::make_shared<folly::CPUThreadPoolExecutor>(10, kNumPriorities);
auto barrier = std::make_shared<ExecutorBarrier>(executor);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);
EXPECT_EQ(barrier->getNumPriorities(), kNumPriorities);
}

Expand All @@ -41,15 +41,15 @@ TEST(ExecutorBarrierTest, CanOwn) {

TEST(ExecutorBarrierTest, CanAwaitMultipleTimes) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(executor);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);
for (int time = 0, multipleTimes = 10; time < multipleTimes; ++time) {
barrier->waitAll();
}
}

TEST(ExecutorBarrierTest, AddCanBeReused) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(executor);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);

const int kCalls = 30;
std::atomic<int> count{0};
Expand All @@ -68,7 +68,7 @@ TEST(ExecutorBarrierTest, AddCanBeReused) {

TEST(ExecutorBarrierTest, AddWithPriorityCanBeReused) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(executor);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);

const int kCalls = 30;
const int8_t kPriority = 4;
Expand All @@ -88,7 +88,7 @@ TEST(ExecutorBarrierTest, AddWithPriorityCanBeReused) {

TEST(ExecutorBarrierTest, AddCanBeReusedAfterException) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(executor);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);

const int kCalls = 30;
std::atomic<int> count{0};
Expand All @@ -110,7 +110,7 @@ TEST(ExecutorBarrierTest, AddCanBeReusedAfterException) {

TEST(ExecutorBarrierTest, AddWithPriorityCanBeReusedAfterException) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(executor);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);

const int kCalls = 30;
const int8_t kPriority = 4;
Expand All @@ -135,7 +135,7 @@ TEST(ExecutorBarrierTest, AddWithPriorityCanBeReusedAfterException) {

TEST(ExecutorBarrierTest, Add) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(executor);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);

const int kCalls = 30;
std::atomic<int> count{0};
Expand All @@ -148,7 +148,7 @@ TEST(ExecutorBarrierTest, Add) {

TEST(ExecutorBarrierTest, AddWithPriority) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(executor);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);

const int kCalls = 30;
const int8_t kPriority = 4;
Expand All @@ -162,7 +162,7 @@ TEST(ExecutorBarrierTest, AddWithPriority) {

TEST(ExecutorBarrierTest, AddCanIgnore) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(executor);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);

const int kCalls = 30;
for (int i = 0; i < kCalls; ++i) {
Expand All @@ -173,7 +173,7 @@ TEST(ExecutorBarrierTest, AddCanIgnore) {

TEST(ExecutorBarrierTest, AddWithPriorityCanIgnore) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(executor);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);

const int kCalls = 30;
for (int i = 0; i < kCalls; ++i) {
Expand All @@ -187,7 +187,7 @@ TEST(ExecutorBarrierTest, DestructorDoesntThrow) {
std::atomic<int> count{0};
{
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(executor);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);

for (int i = 0; i < kCalls; ++i) {
barrier->add([shouldThrow = (i == 0), &count]() {
Expand All @@ -203,7 +203,7 @@ TEST(ExecutorBarrierTest, DestructorDoesntThrow) {

TEST(ExecutorBarrierTest, AddException) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(executor);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);

const int kCalls = 30;
std::atomic<int> count{0};
Expand All @@ -221,7 +221,7 @@ TEST(ExecutorBarrierTest, AddException) {

TEST(ExecutorBarrierTest, AddWithPriorityException) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(executor);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);

const int kCalls = 30;
const int8_t kPriority = 4;
Expand All @@ -242,7 +242,7 @@ TEST(ExecutorBarrierTest, AddWithPriorityException) {

TEST(ExecutorBarrierTest, AddNonStdException) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(executor);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);

const int kCalls = 30;
std::atomic<int> count{0};
Expand All @@ -261,7 +261,7 @@ TEST(ExecutorBarrierTest, AddNonStdException) {

TEST(ExecutorBarrierTest, AddWithPriorityNonStdException) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(executor);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);

const int kCalls = 30;
const int8_t kPriority = 4;
Expand All @@ -283,7 +283,7 @@ TEST(ExecutorBarrierTest, AddWithPriorityNonStdException) {

TEST(ExecutorBarrierTest, AddExceptions) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(executor);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);

const int kCalls = 30;
std::atomic<int> count{0};
Expand All @@ -299,7 +299,7 @@ TEST(ExecutorBarrierTest, AddExceptions) {

TEST(ExecutorBarrierTest, AddWithPriorityExceptions) {
auto executor = std::make_shared<folly::CPUThreadPoolExecutor>(10);
auto barrier = std::make_shared<ExecutorBarrier>(executor);
auto barrier = std::make_shared<ExecutorBarrier>(*executor);

const int kCalls = 30;
const int8_t kPriority = 4;
Expand Down

0 comments on commit 83cfd4f

Please sign in to comment.