Skip to content

Commit

Permalink
refine valuestream node
Browse files Browse the repository at this point in the history
  • Loading branch information
Yohahaha committed Jul 9, 2024
1 parent e58bef3 commit 61afa31
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 23 deletions.
13 changes: 1 addition & 12 deletions cpp/velox/compute/VeloxPlanConverter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,7 @@ VeloxPlanConverter::VeloxPlanConverter(
bool validationMode)
: validationMode_(validationMode),
substraitVeloxPlanConverter_(veloxPool, confMap, writeFilesTempPath, validationMode) {
// avoid include RowVectorStream.h in SubstraitToVeloxPlan.cpp, it may cause redefinition of array abi.h.
auto factory = [inputIters = std::move(inputIters), validationMode = validationMode](
std::string nodeId, memory::MemoryPool* pool, int32_t streamIdx, RowTypePtr outputType) {
std::shared_ptr<ResultIterator> iterator;
if (!validationMode) {
VELOX_CHECK_LT(streamIdx, inputIters.size(), "Could not find stream index {} in input iterator list.", streamIdx);
iterator = inputIters[streamIdx];
}
auto valueStream = std::make_shared<RowVectorStream>(pool, iterator, outputType);
return std::make_shared<ValueStreamNode>(nodeId, outputType, std::move(valueStream));
};
substraitVeloxPlanConverter_.setValueStreamNodeFactory(std::move(factory));
substraitVeloxPlanConverter_.setInputIters(std::move(inputIters));
}

namespace {
Expand Down
32 changes: 22 additions & 10 deletions cpp/velox/operators/plannodes/RowVectorStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,17 @@ class RowVectorStream {
: iterator_(std::move(iterator)), outputType_(outputType), pool_(pool) {}

bool hasNext() {
return iterator_->hasNext();
if (!finished_) {
finished_ = !iterator_->hasNext();
}
return !finished_;
}

// Convert arrow batch to rowvector and use new output columns
facebook::velox::RowVectorPtr next() {
if (finished_) {
return nullptr;
}
const std::shared_ptr<VeloxColumnarBatch>& vb = VeloxColumnarBatch::from(pool_, iterator_->next());
auto vp = vb->getRowVector();
VELOX_DCHECK(vp != nullptr);
Expand All @@ -45,17 +51,18 @@ class RowVectorStream {
}

private:
bool finished_{false};
std::shared_ptr<ResultIterator> iterator_;
const facebook::velox::RowTypePtr outputType_;
facebook::velox::memory::MemoryPool* pool_;
};

class ValueStreamNode : public facebook::velox::core::PlanNode {
class ValueStreamNode final : public facebook::velox::core::PlanNode {
public:
ValueStreamNode(
const facebook::velox::core::PlanNodeId& id,
const facebook::velox::RowTypePtr& outputType,
std::shared_ptr<RowVectorStream> valueStream)
std::unique_ptr<RowVectorStream> valueStream)
: facebook::velox::core::PlanNode(id), outputType_(outputType), valueStream_(std::move(valueStream)) {
VELOX_CHECK_NOT_NULL(valueStream_);
}
Expand All @@ -68,8 +75,8 @@ class ValueStreamNode : public facebook::velox::core::PlanNode {
return kEmptySources;
};

const std::shared_ptr<RowVectorStream>& rowVectorStream() const {
return valueStream_;
RowVectorStream* rowVectorStream() const {
return valueStream_.get();
}

std::string_view name() const override {
Expand All @@ -84,7 +91,7 @@ class ValueStreamNode : public facebook::velox::core::PlanNode {
void addDetails(std::stringstream& stream) const override{};

const facebook::velox::RowTypePtr outputType_;
std::shared_ptr<RowVectorStream> valueStream_;
std::unique_ptr<RowVectorStream> valueStream_;
const std::vector<facebook::velox::core::PlanNodePtr> kEmptySources;
};

Expand All @@ -99,11 +106,14 @@ class ValueStream : public facebook::velox::exec::SourceOperator {
valueStreamNode->outputType(),
operatorId,
valueStreamNode->id(),
"ValueStream") {
valueStreamNode->name().data()) {
valueStream_ = valueStreamNode->rowVectorStream();
}

facebook::velox::RowVectorPtr getOutput() override {
if (finished_) {
return nullptr;
}
if (valueStream_->hasNext()) {
return valueStream_->next();
} else {
Expand All @@ -122,12 +132,14 @@ class ValueStream : public facebook::velox::exec::SourceOperator {

private:
bool finished_ = false;
std::shared_ptr<RowVectorStream> valueStream_;
RowVectorStream* valueStream_;
};

class RowVectorStreamOperatorTranslator : public facebook::velox::exec::Operator::PlanNodeTranslator {
std::unique_ptr<facebook::velox::exec::Operator>
toOperator(facebook::velox::exec::DriverCtx* ctx, int32_t id, const facebook::velox::core::PlanNodePtr& node) {
std::unique_ptr<facebook::velox::exec::Operator> toOperator(
facebook::velox::exec::DriverCtx* ctx,
int32_t id,
const facebook::velox::core::PlanNodePtr& node) override {
if (auto valueStreamNode = std::dynamic_pointer_cast<const ValueStreamNode>(node)) {
return std::make_unique<ValueStream>(id, ctx, valueStreamNode);
}
Expand Down
9 changes: 8 additions & 1 deletion cpp/velox/substrait/SubstraitToVeloxPlan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "SubstraitToVeloxPlan.h"
#include "TypeUtils.h"
#include "VariantToVectorConverter.h"
#include "operators/plannodes/RowVectorStream.h"
#include "velox/connectors/hive/HiveDataSink.h"
#include "velox/exec/TableWriter.h"
#include "velox/type/Filter.h"
Expand Down Expand Up @@ -1107,7 +1108,13 @@ core::PlanNodePtr SubstraitToVeloxPlanConverter::constructValueStreamNode(
}

auto outputType = ROW(std::move(outNames), std::move(veloxTypeList));
auto node = valueStreamNodeFactory_(nextPlanNodeId(), pool_, streamIdx, outputType);
std::shared_ptr<ResultIterator> iterator;
if (!validationMode_) {
VELOX_CHECK_LT(streamIdx, inputIters_.size(), "Could not find stream index {} in input iterator list.", streamIdx);
iterator = inputIters_[streamIdx];
}
auto valueStream = std::make_unique<RowVectorStream>(pool_, iterator, outputType);
auto node = std::make_shared<ValueStreamNode>(nextPlanNodeId(), outputType, std::move(valueStream));

auto splitInfo = std::make_shared<SplitInfo>();
splitInfo->isStream = true;
Expand Down
6 changes: 6 additions & 0 deletions cpp/velox/substrait/SubstraitToVeloxPlan.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,10 @@ class SubstraitToVeloxPlanConverter {
valueStreamNodeFactory_ = std::move(factory);
}

void setInputIters(std::vector<std::shared_ptr<ResultIterator>> inputIters) {
inputIters_ = std::move(inputIters);
}

/// Used to check if ReadRel specifies an input of stream.
/// If yes, the index of input stream will be returned.
/// If not, -1 will be returned.
Expand Down Expand Up @@ -591,6 +595,8 @@ class SubstraitToVeloxPlanConverter {

std::function<core::PlanNodePtr(std::string, memory::MemoryPool*, int32_t, RowTypePtr)> valueStreamNodeFactory_;

std::vector<std::shared_ptr<ResultIterator>> inputIters_;

/// The map storing the pre-built plan nodes which can be accessed through
/// index. This map is only used when the computation of a Substrait plan
/// depends on other input nodes.
Expand Down

0 comments on commit 61afa31

Please sign in to comment.