// Review notes (preamble text ahead of the first `diff --git` header; the
// hunks below are reproduced unchanged).
//
// Purpose of the patch: add an optional `writeExecutor` to VeloxWriterOptions,
// forward it through WriterContext into FieldWriterContext, and use it in
// RowFieldWriter::write and FlatMapFieldWriter::write to queue per-child
// writes with `barrier_->add(...)` followed by `barrier_->waitAll()`. When
// `barrier_` is null, both writers keep their original serial loops.
//
// NOTE(review): template argument lists look stripped in this rendering
// (e.g. `std::make_unique(`, `std::unique_ptr barrier_;`); verify against the
// real patch before applying.
//
// Points to confirm before landing:
//  - Both constructors build the barrier from `std::move(...)` of the
//    context's `writeExecutor`. RowFieldWriter tests `context.writeExecutor`
//    but moves `context_.writeExecutor`; FlatMapFieldWriter tests and moves
//    `context.writeExecutor`. The member is declared as a std::shared_ptr, so
//    it is empty after the first move and any writer constructed later from
//    the same context takes the serial branch. Presumably `context_` refers to
//    the same FieldWriterContext as `context` -- TODO confirm, and confirm
//    that restricting the barrier to the first writer is intended.
//  - NOTE(review): if the move were changed to a copy, nested writers would
//    all queue onto, and block in waitAll() on, the same executor; check
//    whether that can starve a small pool before making such a change.
//  - The queued lambdas capture by reference: `&field`, `&rowItem` and
//    `&childRanges` in the row writer; `[&]` (loop variable `pair`, `values`,
//    `nonNullCount`) in the flat map writer. Each loop is followed by
//    `barrier_->waitAll()` in the same scope. This assumes waitAll() blocks
//    until every queued task has finished and that `row->childAt(i)` yields a
//    reference that stays valid until then -- TODO confirm both.
//  - In RowFieldWriter::write, children whose `typeBuilder()->kind()` equals
//    `Kind::FlatMap` are written inline on the calling thread instead of being
//    queued; the in-code comment attributes this to field value writer
//    creation.
//  - `currentValueFields_` and `allValueFields_` change from
//    folly::F14FastMap to folly::ConcurrentHashMap. The element types are not
//    visible here; confirm the existing `find(key)` / `end()` lookup and the
//    `for (auto& pair : ...)` loops still compile and behave the same.
//  - FuzzComplex previously assigned nullptr to `encodingExecutor` when
//    `parallelismFactor` was 0; the new code leaves both executors untouched
//    in that case. The factor list starts at 0U, but it ends with
//    `std::thread::hardware_concurrency()`, which may return 0; that iteration
//    would then reuse the executors created for the previous factor.
diff --git a/dwio/nimble/velox/FieldWriter.cpp b/dwio/nimble/velox/FieldWriter.cpp index 45104bf..419926c 100644 --- a/dwio/nimble/velox/FieldWriter.cpp +++ b/dwio/nimble/velox/FieldWriter.cpp @@ -20,7 +20,9 @@ #include "dwio/nimble/velox/SchemaBuilder.h" #include "dwio/nimble/velox/SchemaTypes.h" #include "folly/ScopeGuard.h" +#include "folly/concurrency/ConcurrentHashMap.h" #include "velox/common/base/CompareFlags.h" +#include "velox/dwio/common/ExecutorBarrier.h" #include "velox/vector/ComplexVector.h" #include "velox/vector/DictionaryVector.h" #include "velox/vector/FlatVector.h" @@ -369,6 +371,11 @@ class RowFieldWriter : public FieldWriter { : FieldWriter{context, context.schemaBuilder.createRowTypeBuilder(type->size())}, nullsStream_{context_.createNullsStreamData( typeBuilder_->asRow().nullsDescriptor())} { + if (context.writeExecutor) { + barrier_ = std::make_unique( + std::move(context_.writeExecutor)); + } + auto rowType = std::dynamic_pointer_cast(type->type()); @@ -417,8 +424,26 @@ class RowFieldWriter : public FieldWriter { Decoded{decoded}, [&](auto offset) { childRanges.add(offset, 1); }); } - for (auto i = 0; i < fields_.size(); ++i) { - fields_[i]->write(row->childAt(i), *childRangesPtr); + + if (barrier_) { + for (auto i = 0; i < fields_.size(); ++i) { + const auto& kind = fields_[i]->typeBuilder()->kind(); + if (kind == Kind::FlatMap) { + // if flatmap handle within due to fieldvaluewriter creation + fields_[i]->write(row->childAt(i), *childRangesPtr); + } else { + barrier_->add([&field = fields_[i], + &rowItem = row->childAt(i), + &childRanges = *childRangesPtr]() { + field->write(rowItem, childRanges); + }); + } + } + barrier_->waitAll(); + } else { + for (auto i = 0; i < fields_.size(); ++i) { + fields_[i]->write(row->childAt(i), *childRangesPtr); + } } } @@ -437,6 +462,7 @@ class RowFieldWriter : public FieldWriter { } private: + std::unique_ptr barrier_; std::vector> fields_; NullsStreamData& nullsStream_; }; @@ -836,7 +862,12 @@ class 
FlatMapFieldWriter : public FieldWriter { NimbleTypeTraits::scalarKind)), nullsStream_{context_.createNullsStreamData( typeBuilder_->asFlatMap().nullsDescriptor())}, - valueType_{type->childAt(1)} {} + valueType_{type->childAt(1)} { + if (context.writeExecutor) { + barrier_ = std::make_unique( + std::move(context.writeExecutor)); + } + } void write(const velox::VectorPtr& vector, const OrderedRanges& ranges) override { @@ -999,8 +1030,16 @@ class FlatMapFieldWriter : public FieldWriter { // Now actually ingest the map values if (nonNullCount > 0) { auto& values = map->mapValues(); - for (auto& pair : currentValueFields_) { - pair.second->write(values, nonNullCount); + + if (barrier_) { + for (auto& pair : currentValueFields_) { + barrier_->add([&]() { pair.second->write(values, nonNullCount); }); + } + barrier_->waitAll(); + } else { + for (auto& pair : currentValueFields_) { + pair.second->write(values, nonNullCount); + } } } nonNullCount_ += nonNullCount; @@ -1037,6 +1076,7 @@ class FlatMapFieldWriter : public FieldWriter { } private: + std::unique_ptr barrier_; FlatMapValueFieldWriter* getValueFieldWriter(KeyType key, uint32_t size) { auto it = currentValueFields_.find(key); if (it != currentValueFields_.end()) { @@ -1075,7 +1115,8 @@ class FlatMapFieldWriter : public FieldWriter { NullsStreamData& nullsStream_; // This map store the FlatMapValue fields used in current flush unit. - folly::F14FastMap currentValueFields_; + folly::ConcurrentHashMap + currentValueFields_; // This map stores the FlatMapPassthrough fields. folly::F14FastMap< @@ -1086,7 +1127,7 @@ class FlatMapFieldWriter : public FieldWriter { uint64_t nonNullCount_ = 0; // This map store all FlatMapValue fields encountered by the VeloxWriter // across the whole file. 
- folly::F14FastMap> + folly::ConcurrentHashMap> allValueFields_; }; diff --git a/dwio/nimble/velox/FieldWriter.h b/dwio/nimble/velox/FieldWriter.h index 3de9531..a9220b6 100644 --- a/dwio/nimble/velox/FieldWriter.h +++ b/dwio/nimble/velox/FieldWriter.h @@ -93,12 +93,14 @@ class DecodingContextPool { struct FieldWriterContext { explicit FieldWriterContext( velox::memory::MemoryPool& memoryPool, + std::shared_ptr writeExecutor = nullptr, std::unique_ptr reclaimer = nullptr, std::function vectorDecoderVisitor = []() {}) : bufferMemoryPool{memoryPool.addLeafChild( "field_writer_buffer", true, std::move(reclaimer))}, + writeExecutor{std::move(writeExecutor)}, inputBufferGrowthPolicy{ DefaultInputBufferGrowthPolicy::withDefaultRanges()}, decodingContextPool_{std::make_unique( @@ -107,6 +109,7 @@ struct FieldWriterContext { } std::shared_ptr bufferMemoryPool; + std::shared_ptr writeExecutor; SchemaBuilder schemaBuilder; folly::F14FastSet flatMapNodeIds; diff --git a/dwio/nimble/velox/VeloxWriter.cpp b/dwio/nimble/velox/VeloxWriter.cpp index 0c5ea32..194d079 100644 --- a/dwio/nimble/velox/VeloxWriter.cpp +++ b/dwio/nimble/velox/VeloxWriter.cpp @@ -64,7 +64,12 @@ class WriterContext : public FieldWriterContext { WriterContext( velox::memory::MemoryPool& memoryPool, VeloxWriterOptions options) - : FieldWriterContext{memoryPool, options.reclaimerFactory(), options.vectorDecoderVisitor}, + : FieldWriterContext{ + memoryPool, + options.writeExecutor, + options.reclaimerFactory(), + options.vectorDecoderVisitor + }, options{std::move(options)}, logger{this->options.metricsLogger} { flushPolicy = this->options.flushPolicyFactory(); diff --git a/dwio/nimble/velox/VeloxWriterOptions.h b/dwio/nimble/velox/VeloxWriterOptions.h index 8e4386a..f4c0ecf 100644 --- a/dwio/nimble/velox/VeloxWriterOptions.h +++ b/dwio/nimble/velox/VeloxWriterOptions.h @@ -129,6 +129,8 @@ struct VeloxWriterOptions { // If provided, internal encoding operations will happen in parallel using // this executor. 
std::shared_ptr encodingExecutor; + // If provided, internal write operations will happen in parallel + std::shared_ptr writeExecutor; bool enableChunking = false; diff --git a/dwio/nimble/velox/tests/VeloxReaderTests.cpp b/dwio/nimble/velox/tests/VeloxReaderTests.cpp index b192107..12657a5 100644 --- a/dwio/nimble/velox/tests/VeloxReaderTests.cpp +++ b/dwio/nimble/velox/tests/VeloxReaderTests.cpp @@ -2433,6 +2433,8 @@ TEST_F(VeloxReaderTests, FuzzSimple) { if (parallelismFactor > 0) { writerOptions.encodingExecutor = std::make_shared(parallelismFactor); + writerOptions.writeExecutor = + std::make_shared(parallelismFactor); } for (auto i = 0; i < iterations; ++i) { @@ -2519,9 +2521,12 @@ TEST_F(VeloxReaderTests, FuzzComplex) { for (auto parallelismFactor : {0U, 1U, 2U, std::thread::hardware_concurrency()}) { LOG(INFO) << "Parallelism Factor: " << parallelismFactor; - writerOptions.encodingExecutor = parallelismFactor > 0 - ? std::make_shared(parallelismFactor) - : nullptr; + if (parallelismFactor > 0) { + writerOptions.encodingExecutor = + std::make_shared(parallelismFactor); + writerOptions.writeExecutor = + std::make_shared(parallelismFactor); + } for (auto i = 0; i < iterations; ++i) { writeAndVerify(