Skip to content

Commit

Permalink
Capture KeepAlive instead of Executor in WriterOptions
Browse files Browse the repository at this point in the history
Summary:
folly::Executor::KeepAlive<> is the recommended way of holding
references to Executors, as they ensure the executor is kept alive until the
KeepAlive object is destroyed. Because of this, some folly APIs can only return
KeepAlive (and not shared_ptr), such as Global pools.

These APIs cannot use WriterOption is it takes a shared_ptr

Reviewed By: xiaoxmeng

Differential Revision: D66741079
  • Loading branch information
pedroerp authored and facebook-github-bot committed Dec 11, 2024
1 parent cc6724a commit 17dd2ba
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 11 deletions.
13 changes: 8 additions & 5 deletions dwio/nimble/velox/VeloxWriterOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,14 @@ struct VeloxWriterOptions {

const velox::common::SpillConfig* spillConfig{nullptr};

// If provided, internal encoding operations will happen in parallel using
// this executor.
std::shared_ptr<folly::Executor> encodingExecutor;
// If provided, internal ingestion operations will happen in parallel
std::shared_ptr<folly::Executor> writeExecutor;
// If provided, internal writing/encoding operations will happen in parallel
// using the specified executors. The KeepAlive wrappers ensures the executor
// will be kept open.
//
// - encodingExecutor: execute stream encoding operations in parallel.
// - writeExecutor: execute FieldWriter::write() operations in parallel.
folly::Executor::KeepAlive<> encodingExecutor;
folly::Executor::KeepAlive<> writeExecutor;

bool enableChunking = false;

Expand Down
17 changes: 11 additions & 6 deletions dwio/nimble/velox/tests/VeloxReaderTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2426,14 +2426,17 @@ TEST_F(VeloxReaderTests, FuzzSimple) {
auto iterations = 20;
auto batches = 20;
std::mt19937 rng{seed};

for (auto parallelismFactor : {0U, 1U, std::thread::hardware_concurrency()}) {
LOG(INFO) << "Parallelism Factor: " << parallelismFactor;
nimble::VeloxWriterOptions writerOptions;
std::shared_ptr<folly::CPUThreadPoolExecutor> executor;

if (parallelismFactor > 0) {
auto executor =
executor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
writerOptions.encodingExecutor = executor;
writerOptions.writeExecutor = executor;
writerOptions.encodingExecutor = folly::getKeepAliveToken(*executor);
writerOptions.writeExecutor = folly::getKeepAliveToken(*executor);
}

for (auto i = 0; i < iterations; ++i) {
Expand Down Expand Up @@ -2527,11 +2530,13 @@ TEST_F(VeloxReaderTests, FuzzComplex) {

for (auto parallelismFactor : {0U, 1U, std::thread::hardware_concurrency()}) {
LOG(INFO) << "Parallelism Factor: " << parallelismFactor;
std::shared_ptr<folly::CPUThreadPoolExecutor> executor;

if (parallelismFactor > 0) {
auto executor =
executor =
std::make_shared<folly::CPUThreadPoolExecutor>(parallelismFactor);
writerOptions.encodingExecutor = executor;
writerOptions.writeExecutor = executor;
writerOptions.encodingExecutor = folly::getKeepAliveToken(*executor);
writerOptions.writeExecutor = folly::getKeepAliveToken(*executor);
}

for (auto i = 0; i < iterations; ++i) {
Expand Down

0 comments on commit 17dd2ba

Please sign in to comment.