From bc48256c85da3d929886a0b74855d07a67622cc2 Mon Sep 17 00:00:00 2001 From: Yang Zhang Date: Wed, 10 Jul 2024 15:32:37 +0800 Subject: [PATCH] [VL] Minor refactor for ValueStream node construction and usage (#6382) --- cpp/velox/compute/VeloxPlanConverter.cc | 13 +------- .../operators/plannodes/RowVectorStream.h | 32 +++++++++++++------ cpp/velox/substrait/SubstraitToVeloxPlan.cc | 9 +++++- cpp/velox/substrait/SubstraitToVeloxPlan.h | 6 ++++ 4 files changed, 37 insertions(+), 23 deletions(-) diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc index 315ff2da67ad..ed2545c78114 100644 --- a/cpp/velox/compute/VeloxPlanConverter.cc +++ b/cpp/velox/compute/VeloxPlanConverter.cc @@ -36,18 +36,7 @@ VeloxPlanConverter::VeloxPlanConverter( bool validationMode) : validationMode_(validationMode), substraitVeloxPlanConverter_(veloxPool, confMap, writeFilesTempPath, validationMode) { - // avoid include RowVectorStream.h in SubstraitToVeloxPlan.cpp, it may cause redefinition of array abi.h. - auto factory = [inputIters = std::move(inputIters), validationMode = validationMode]( - std::string nodeId, memory::MemoryPool* pool, int32_t streamIdx, RowTypePtr outputType) { - std::shared_ptr iterator; - if (!validationMode) { - VELOX_CHECK_LT(streamIdx, inputIters.size(), "Could not find stream index {} in input iterator list.", streamIdx); - iterator = inputIters[streamIdx]; - } - auto valueStream = std::make_shared(pool, iterator, outputType); - return std::make_shared(nodeId, outputType, std::move(valueStream)); - }; - substraitVeloxPlanConverter_.setValueStreamNodeFactory(std::move(factory)); + substraitVeloxPlanConverter_.setInputIters(std::move(inputIters)); } namespace { diff --git a/cpp/velox/operators/plannodes/RowVectorStream.h b/cpp/velox/operators/plannodes/RowVectorStream.h index e02b288c46e2..c72e9137f4a4 100644 --- a/cpp/velox/operators/plannodes/RowVectorStream.h +++ b/cpp/velox/operators/plannodes/RowVectorStream.h @@ -32,11 +32,17 @@ class RowVectorStream { : iterator_(std::move(iterator)), outputType_(outputType), pool_(pool) {} bool hasNext() { - return iterator_->hasNext(); + if (!finished_) { + finished_ = !iterator_->hasNext(); + } + return !finished_; } // Convert arrow batch to rowvector and use new output columns facebook::velox::RowVectorPtr next() { + if (finished_) { + return nullptr; + } const std::shared_ptr& vb = VeloxColumnarBatch::from(pool_, iterator_->next()); auto vp = vb->getRowVector(); VELOX_DCHECK(vp != nullptr); @@ -45,17 +51,18 @@ class RowVectorStream { } private: + bool finished_{false}; std::shared_ptr iterator_; const facebook::velox::RowTypePtr outputType_; facebook::velox::memory::MemoryPool* pool_; }; -class ValueStreamNode : public facebook::velox::core::PlanNode { +class ValueStreamNode final : public facebook::velox::core::PlanNode { public: ValueStreamNode( const facebook::velox::core::PlanNodeId& id, const facebook::velox::RowTypePtr& outputType, - std::shared_ptr valueStream) + std::unique_ptr valueStream) : facebook::velox::core::PlanNode(id), outputType_(outputType), valueStream_(std::move(valueStream)) { VELOX_CHECK_NOT_NULL(valueStream_); } @@ -68,8 +75,8 @@ class ValueStreamNode : public facebook::velox::core::PlanNode { return kEmptySources; }; - const std::shared_ptr& rowVectorStream() const { - return valueStream_; + RowVectorStream* rowVectorStream() const { + return valueStream_.get(); } std::string_view name() const override { @@ -84,7 +91,7 @@ class ValueStreamNode : public facebook::velox::core::PlanNode { void addDetails(std::stringstream& stream) const override{}; const facebook::velox::RowTypePtr outputType_; - std::shared_ptr valueStream_; + std::unique_ptr valueStream_; const std::vector kEmptySources; }; @@ -99,11 +106,14 @@ class ValueStream : public facebook::velox::exec::SourceOperator { valueStreamNode->outputType(), operatorId, valueStreamNode->id(), - "ValueStream") { + valueStreamNode->name().data()) { valueStream_ = valueStreamNode->rowVectorStream(); } facebook::velox::RowVectorPtr getOutput() override { + if (finished_) { + return nullptr; + } if (valueStream_->hasNext()) { return valueStream_->next(); } else { @@ -122,12 +132,14 @@ class ValueStream : public facebook::velox::exec::SourceOperator { private: bool finished_ = false; - std::shared_ptr valueStream_; + RowVectorStream* valueStream_; }; class RowVectorStreamOperatorTranslator : public facebook::velox::exec::Operator::PlanNodeTranslator { - std::unique_ptr - toOperator(facebook::velox::exec::DriverCtx* ctx, int32_t id, const facebook::velox::core::PlanNodePtr& node) { + std::unique_ptr toOperator( + facebook::velox::exec::DriverCtx* ctx, + int32_t id, + const facebook::velox::core::PlanNodePtr& node) override { if (auto valueStreamNode = std::dynamic_pointer_cast(node)) { return std::make_unique(id, ctx, valueStreamNode); } diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index 34710c35a40d..7b41f7071e84 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -18,6 +18,7 @@ #include "SubstraitToVeloxPlan.h" #include "TypeUtils.h" #include "VariantToVectorConverter.h" +#include "operators/plannodes/RowVectorStream.h" #include "velox/connectors/hive/HiveDataSink.h" #include "velox/exec/TableWriter.h" #include "velox/type/Filter.h" @@ -1107,7 +1108,13 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValueStreamNode( } auto outputType = ROW(std::move(outNames), std::move(veloxTypeList)); - auto node = valueStreamNodeFactory_(nextPlanNodeId(), pool_, streamIdx, outputType); + std::shared_ptr iterator; + if (!validationMode_) { + VELOX_CHECK_LT(streamIdx, inputIters_.size(), "Could not find stream index {} in input iterator list.", streamIdx); + iterator = inputIters_[streamIdx]; + } + auto valueStream = std::make_unique(pool_, iterator, outputType); + auto node = std::make_shared(nextPlanNodeId(), outputType, std::move(valueStream)); auto splitInfo = std::make_shared(); splitInfo->isStream = true; diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.h b/cpp/velox/substrait/SubstraitToVeloxPlan.h index 1f2f39f51ae8..0e892469d098 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.h +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.h @@ -165,6 +165,10 @@ class SubstraitToVeloxPlanConverter { valueStreamNodeFactory_ = std::move(factory); } + void setInputIters(std::vector> inputIters) { + inputIters_ = std::move(inputIters); + } + /// Used to check if ReadRel specifies an input of stream. /// If yes, the index of input stream will be returned. /// If not, -1 will be returned. @@ -591,6 +595,8 @@ class SubstraitToVeloxPlanConverter { std::function valueStreamNodeFactory_; + std::vector> inputIters_; + /// The map storing the pre-built plan nodes which can be accessed through /// index. This map is only used when the computation of a Substrait plan /// depends on other input nodes.