diff --git a/backends-velox/src/test/scala/io/glutenproject/execution/TestOperator.scala b/backends-velox/src/test/scala/io/glutenproject/execution/TestOperator.scala index ac38d961bbc1..54a78f9eb228 100644 --- a/backends-velox/src/test/scala/io/glutenproject/execution/TestOperator.scala +++ b/backends-velox/src/test/scala/io/glutenproject/execution/TestOperator.scala @@ -629,14 +629,20 @@ class TestOperator extends VeloxWholeStageTransformerSuite with AdaptiveSparkPla } test("Support get native plan tree string, Velox single aggregation") { - runQueryAndCompare("select l_partkey + 1, count(*) from lineitem group by l_partkey + 1") { + runQueryAndCompare(""" + |select l_partkey + 1, count(*) + |from (select /*+ repartition(2) */ * from lineitem) group by l_partkey + 1 + |""".stripMargin) { df => val wholeStageTransformers = collect(df.queryExecution.executedPlan) { case w: WholeStageTransformer => w } + assert(wholeStageTransformers.size == 3) val nativePlanString = wholeStageTransformers.head.nativePlanString() assert(nativePlanString.contains("Aggregation[SINGLE")) - assert(nativePlanString.contains("TableScan")) + assert(nativePlanString.contains("ValueStream")) + assert(wholeStageTransformers(1).nativePlanString().contains("ValueStream")) + assert(wholeStageTransformers.last.nativePlanString().contains("TableScan")) } } diff --git a/cpp/velox/compute/VeloxPlanConverter.cc b/cpp/velox/compute/VeloxPlanConverter.cc index aaa59958cf68..cffbd986ecdd 100644 --- a/cpp/velox/compute/VeloxPlanConverter.cc +++ b/cpp/velox/compute/VeloxPlanConverter.cc @@ -37,11 +37,15 @@ VeloxPlanConverter::VeloxPlanConverter( substraitVeloxPlanConverter_(veloxPool, confMap, writeFilesTempPath, validationMode), pool_(veloxPool) { // avoid include RowVectorStream.h in SubstraitToVeloxPlan.cpp, it may cause redefinition of array abi.h. - auto factory = [inputIters = std::move(inputIters)]( + auto factory = [inputIters = std::move(inputIters), validationMode = validationMode]( std::string nodeId, memory::MemoryPool* pool, int32_t streamIdx, RowTypePtr outputType) { - VELOX_CHECK_LT(streamIdx, inputIters.size(), "Could not find stream index {} in input iterator list.", streamIdx); - auto vectorStream = std::make_shared(pool, inputIters[streamIdx], outputType); - return std::make_shared(nodeId, outputType, std::move(vectorStream)); + std::shared_ptr iterator; + if (!validationMode) { + VELOX_CHECK_LT(streamIdx, inputIters.size(), "Could not find stream index {} in input iterator list.", streamIdx); + iterator = inputIters[streamIdx]; + } + auto valueStream = std::make_shared(pool, iterator, outputType); + return std::make_shared(nodeId, outputType, std::move(valueStream)); }; substraitVeloxPlanConverter_.setValueStreamNodeFactory(std::move(factory)); } diff --git a/cpp/velox/substrait/SubstraitToVeloxPlan.cc b/cpp/velox/substrait/SubstraitToVeloxPlan.cc index bdc71350adc5..5ac58b2e5447 100644 --- a/cpp/velox/substrait/SubstraitToVeloxPlan.cc +++ b/cpp/velox/substrait/SubstraitToVeloxPlan.cc @@ -1272,9 +1272,6 @@ std::string SubstraitToVeloxPlanConverter::findFuncSpec(uint64_t id) { } int32_t SubstraitToVeloxPlanConverter::getStreamIndex(const ::substrait::ReadRel& sRead) { - if (validationMode_) { - return -1; - } if (sRead.has_local_files()) { const auto& fileList = sRead.local_files().items(); if (fileList.size() == 0) {