Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[VL] Minor refactor for ValueStream node construction and usage #6382

Merged
merged 1 commit into from
Jul 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 1 addition & 12 deletions cpp/velox/compute/VeloxPlanConverter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,7 @@ VeloxPlanConverter::VeloxPlanConverter(
bool validationMode)
: validationMode_(validationMode),
substraitVeloxPlanConverter_(veloxPool, confMap, writeFilesTempPath, validationMode) {
// Avoid including RowVectorStream.h in SubstraitToVeloxPlan.cpp, since doing so may cause a redefinition of array abi.h.
auto factory = [inputIters = std::move(inputIters), validationMode = validationMode](
std::string nodeId, memory::MemoryPool* pool, int32_t streamIdx, RowTypePtr outputType) {
std::shared_ptr<ResultIterator> iterator;
if (!validationMode) {
VELOX_CHECK_LT(streamIdx, inputIters.size(), "Could not find stream index {} in input iterator list.", streamIdx);
iterator = inputIters[streamIdx];
}
auto valueStream = std::make_shared<RowVectorStream>(pool, iterator, outputType);
return std::make_shared<ValueStreamNode>(nodeId, outputType, std::move(valueStream));
};
substraitVeloxPlanConverter_.setValueStreamNodeFactory(std::move(factory));
substraitVeloxPlanConverter_.setInputIters(std::move(inputIters));
}

namespace {
Expand Down
32 changes: 22 additions & 10 deletions cpp/velox/operators/plannodes/RowVectorStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,17 @@ class RowVectorStream {
: iterator_(std::move(iterator)), outputType_(outputType), pool_(pool) {}

bool hasNext() {
return iterator_->hasNext();
if (!finished_) {
finished_ = !iterator_->hasNext();
}
return !finished_;
}

// Convert the Arrow batch to a RowVector and use the new output columns.
facebook::velox::RowVectorPtr next() {
if (finished_) {
return nullptr;
}
const std::shared_ptr<VeloxColumnarBatch>& vb = VeloxColumnarBatch::from(pool_, iterator_->next());
auto vp = vb->getRowVector();
VELOX_DCHECK(vp != nullptr);
Expand All @@ -45,17 +51,18 @@ class RowVectorStream {
}

private:
bool finished_{false};
std::shared_ptr<ResultIterator> iterator_;
const facebook::velox::RowTypePtr outputType_;
facebook::velox::memory::MemoryPool* pool_;
};

class ValueStreamNode : public facebook::velox::core::PlanNode {
class ValueStreamNode final : public facebook::velox::core::PlanNode {
public:
ValueStreamNode(
const facebook::velox::core::PlanNodeId& id,
const facebook::velox::RowTypePtr& outputType,
std::shared_ptr<RowVectorStream> valueStream)
std::unique_ptr<RowVectorStream> valueStream)
: facebook::velox::core::PlanNode(id), outputType_(outputType), valueStream_(std::move(valueStream)) {
VELOX_CHECK_NOT_NULL(valueStream_);
}
Expand All @@ -68,8 +75,8 @@ class ValueStreamNode : public facebook::velox::core::PlanNode {
return kEmptySources;
};

const std::shared_ptr<RowVectorStream>& rowVectorStream() const {
return valueStream_;
RowVectorStream* rowVectorStream() const {
return valueStream_.get();
}

std::string_view name() const override {
Expand All @@ -84,7 +91,7 @@ class ValueStreamNode : public facebook::velox::core::PlanNode {
void addDetails(std::stringstream& stream) const override{};

const facebook::velox::RowTypePtr outputType_;
std::shared_ptr<RowVectorStream> valueStream_;
std::unique_ptr<RowVectorStream> valueStream_;
const std::vector<facebook::velox::core::PlanNodePtr> kEmptySources;
};

Expand All @@ -99,11 +106,14 @@ class ValueStream : public facebook::velox::exec::SourceOperator {
valueStreamNode->outputType(),
operatorId,
valueStreamNode->id(),
"ValueStream") {
valueStreamNode->name().data()) {
valueStream_ = valueStreamNode->rowVectorStream();
}

facebook::velox::RowVectorPtr getOutput() override {
if (finished_) {
return nullptr;
}
if (valueStream_->hasNext()) {
return valueStream_->next();
} else {
Expand All @@ -122,12 +132,14 @@ class ValueStream : public facebook::velox::exec::SourceOperator {

private:
bool finished_ = false;
std::shared_ptr<RowVectorStream> valueStream_;
RowVectorStream* valueStream_;
};

class RowVectorStreamOperatorTranslator : public facebook::velox::exec::Operator::PlanNodeTranslator {
std::unique_ptr<facebook::velox::exec::Operator>
toOperator(facebook::velox::exec::DriverCtx* ctx, int32_t id, const facebook::velox::core::PlanNodePtr& node) {
std::unique_ptr<facebook::velox::exec::Operator> toOperator(
facebook::velox::exec::DriverCtx* ctx,
int32_t id,
const facebook::velox::core::PlanNodePtr& node) override {
if (auto valueStreamNode = std::dynamic_pointer_cast<const ValueStreamNode>(node)) {
return std::make_unique<ValueStream>(id, ctx, valueStreamNode);
}
Expand Down
9 changes: 8 additions & 1 deletion cpp/velox/substrait/SubstraitToVeloxPlan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "SubstraitToVeloxPlan.h"
#include "TypeUtils.h"
#include "VariantToVectorConverter.h"
#include "operators/plannodes/RowVectorStream.h"
#include "velox/connectors/hive/HiveDataSink.h"
#include "velox/exec/TableWriter.h"
#include "velox/type/Filter.h"
Expand Down Expand Up @@ -1107,7 +1108,13 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValueStreamNode(
}

auto outputType = ROW(std::move(outNames), std::move(veloxTypeList));
auto node = valueStreamNodeFactory_(nextPlanNodeId(), pool_, streamIdx, outputType);
std::shared_ptr<ResultIterator> iterator;
if (!validationMode_) {
VELOX_CHECK_LT(streamIdx, inputIters_.size(), "Could not find stream index {} in input iterator list.", streamIdx);
iterator = inputIters_[streamIdx];
}
auto valueStream = std::make_unique<RowVectorStream>(pool_, iterator, outputType);
auto node = std::make_shared<ValueStreamNode>(nextPlanNodeId(), outputType, std::move(valueStream));

auto splitInfo = std::make_shared<SplitInfo>();
splitInfo->isStream = true;
Expand Down
6 changes: 6 additions & 0 deletions cpp/velox/substrait/SubstraitToVeloxPlan.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ class SubstraitToVeloxPlanConverter {
valueStreamNodeFactory_ = std::move(factory);
}

void setInputIters(std::vector<std::shared_ptr<ResultIterator>> inputIters) {
inputIters_ = std::move(inputIters);
}

/// Checks whether the ReadRel specifies a stream input.
/// If it does, the index of the input stream is returned.
/// Otherwise, -1 is returned.
Expand Down Expand Up @@ -591,6 +595,8 @@ class SubstraitToVeloxPlanConverter {

std::function<core::PlanNodePtr(std::string, memory::MemoryPool*, int32_t, RowTypePtr)> valueStreamNodeFactory_;

std::vector<std::shared_ptr<ResultIterator>> inputIters_;

/// The map storing the pre-built plan nodes which can be accessed through
/// index. This map is only used when the computation of a Substrait plan
/// depends on other input nodes.
Expand Down
Loading