From 749fb1037e03baa32a62469371050cc8f3a5d33a Mon Sep 17 00:00:00 2001
From: liuneng1994
Date: Fri, 23 Aug 2024 13:35:10 +0800
Subject: [PATCH] support native input_file_name, input_file_block_start and
 input_file_block_length

---
 .../GlutenClickhouseFunctionSuite.scala       |  22 ++
 .../Parser/InputFileNameParser.cpp            | 224 ++++++++++++++++++
 .../local-engine/Parser/InputFileNameParser.h |  70 ++++++
 cpp-ch/local-engine/Parser/LocalExecutor.cpp  |   5 -
 .../Parser/MergeTreeRelParser.cpp             |  30 ++-
 .../Parser/SerializedPlanParser.cpp           |   4 +-
 .../Parser/SparkRowToCHColumn.cpp             |   2 -
 .../local-engine/Shuffle/PartitionWriter.cpp  |   4 +-
 .../Storages/SourceFromJavaIter.cpp           |  13 +-
 .../SubstraitSource/SubstraitFileSource.cpp   |  30 ++-
 .../SubstraitSource/SubstraitFileSource.h     |   4 +
 cpp-ch/local-engine/local_engine_jni.cpp      |   2 +-
 .../columnar/OffloadSingleNode.scala          |   2 +
 13 files changed, 388 insertions(+), 24 deletions(-)
 create mode 100644 cpp-ch/local-engine/Parser/InputFileNameParser.cpp
 create mode 100644 cpp-ch/local-engine/Parser/InputFileNameParser.h

diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseFunctionSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseFunctionSuite.scala
index 11d5290c0d0e4..ab9d9b79ea5d4 100644
--- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseFunctionSuite.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/compatibility/GlutenClickhouseFunctionSuite.scala
@@ -230,4 +230,26 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite {
       )
     }
   }
+
+  test("function_input_file_expr") {
+    withTable("test_table") {
+      sql("create table test_table(a int) using parquet")
+      sql("insert into test_table values(1)")
+      compareResultsAgainstVanillaSpark(
+        """
+          |select a, input_file_name(), input_file_block_start(), input_file_block_length() from test_table
+          |""".stripMargin,
+        true,
+        { _ => }
+      )
+      compareResultsAgainstVanillaSpark(
+        """
+          |select input_file_name(), input_file_block_start(), input_file_block_length() from test_table
+          |""".stripMargin,
+        true,
+        { _ => }
+      )
+    }
+  }
+
 }
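The new file below implements those three functions natively: InputFileNameParser materializes input_file_name()/input_file_block_start()/input_file_block_length() as constant-valued columns appended to every chunk a scan produces. The core operation, as a minimal standalone sketch (assumes ClickHouse's IColumn::insertMany; makeFileNameColumn is a hypothetical helper, not part of the patch):

    // Build a constant String column holding the current file path,
    // one copy per row of the chunk it will be attached to.
    DB::ColumnWithTypeAndName makeFileNameColumn(const String & file, size_t rows)
    {
        auto type = std::make_shared<DB::DataTypeString>();
        auto col = type->createColumn();
        col->insertMany(file, rows); // repeat the same value `rows` times
        return {std::move(col), type, "input_file_name"};
    }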
diff --git a/cpp-ch/local-engine/Parser/InputFileNameParser.cpp b/cpp-ch/local-engine/Parser/InputFileNameParser.cpp
new file mode 100644
index 0000000000000..c1cb93d9e5942
--- /dev/null
+++ b/cpp-ch/local-engine/Parser/InputFileNameParser.cpp
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "InputFileNameParser.h"
+
+#include <algorithm>
+#include <optional>
+
+#include <Common/Exception.h>
+#include <DataTypes/DataTypeString.h>
+#include <DataTypes/DataTypesNumber.h>
+#include <Processors/ISimpleTransform.h>
+#include <Processors/QueryPlan/ITransformingStep.h>
+#include <QueryPipeline/QueryPipelineBuilder.h>
+
+namespace DB
+{
+namespace ErrorCodes
+{
+    extern const int LOGICAL_ERROR;
+}
+}
+
+namespace local_engine {
+
+static DB::ITransformingStep::Traits getTraits()
+{
+    return DB::ITransformingStep::Traits
+    {
+        {
+            .returns_single_stream = false,
+            .preserves_number_of_streams = true,
+            .preserves_sorting = true,
+        },
+        {
+            .preserves_number_of_rows = true,
+        }
+    };
+}
+
+static DB::Block getOutputHeader(
+    const DB::DataStream & input_stream,
+    const std::optional<String> & file_name,
+    const std::optional<Int64> & block_start,
+    const std::optional<Int64> & block_length)
+{
+    DB::Block output_header = input_stream.header;
+    if (file_name.has_value())
+    {
+        output_header.insert(DB::ColumnWithTypeAndName{std::make_shared<DB::DataTypeString>(), InputFileNameParser::INPUT_FILE_NAME});
+    }
+    if (block_start.has_value())
+    {
+        output_header.insert(DB::ColumnWithTypeAndName{std::make_shared<DB::DataTypeInt64>(), InputFileNameParser::INPUT_FILE_BLOCK_START});
+    }
+    if (block_length.has_value())
+    {
+        output_header.insert(DB::ColumnWithTypeAndName{std::make_shared<DB::DataTypeInt64>(), InputFileNameParser::INPUT_FILE_BLOCK_LENGTH});
+    }
+    return output_header;
+}
+
+class InputFileExprProjectTransform : public DB::ISimpleTransform
+{
+public:
+    InputFileExprProjectTransform(
+        const DB::Block & input_header_,
+        const DB::Block & output_header_,
+        const std::optional<String> & file_name,
+        const std::optional<Int64> & block_start,
+        const std::optional<Int64> & block_length)
+        : ISimpleTransform(input_header_, output_header_, true)
+        , file_name(file_name)
+        , block_start(block_start)
+        , block_length(block_length)
+    {
+    }
+
+    String getName() const override { return "InputFileExprProjectTransform"; }
+
+    void transform(DB::Chunk & chunk) override
+    {
+        InputFileNameParser::addInputFileColumnsToChunk(output.getHeader(), chunk, file_name, block_start, block_length);
+    }
+
+private:
+    std::optional<String> file_name;
+    std::optional<Int64> block_start;
+    std::optional<Int64> block_length;
+};
+
+class InputFileExprProjectStep : public DB::ITransformingStep
+{
+public:
+    InputFileExprProjectStep(
+        const DB::DataStream & input_stream,
+        const std::optional<String> & file_name,
+        const std::optional<Int64> & block_start,
+        const std::optional<Int64> & block_length)
+        : ITransformingStep(input_stream, getOutputHeader(input_stream, file_name, block_start, block_length), getTraits(), true)
+        , file_name(file_name)
+        , block_start(block_start)
+        , block_length(block_length)
+    {
+    }
+
+    String getName() const override { return "InputFileExprProjectStep"; }
+
+    void transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings & /*settings*/) override
+    {
+        pipeline.addSimpleTransform(
+            [&](const DB::Block & header)
+            {
+                return std::make_shared<InputFileExprProjectTransform>(header, output_stream->header, file_name, block_start, block_length);
+            });
+    }
+
+protected:
+    void updateOutputStream() override
+    {
+        output_stream = createOutputStream(input_streams.front(), output_stream->header, getDataStreamTraits());
+    }
+
+private:
+    std::optional<String> file_name;
+    std::optional<Int64> block_start;
+    std::optional<Int64> block_length;
+};
+
+bool InputFileNameParser::hasInputFileNameColumn(const DB::Block & block)
+{
+    auto names = block.getNames();
+    return std::find(names.begin(), names.end(), INPUT_FILE_NAME) != names.end();
+}
+
+bool InputFileNameParser::hasInputFileBlockStartColumn(const DB::Block & block)
+{
+    auto names = block.getNames();
+    return std::find(names.begin(), names.end(), INPUT_FILE_BLOCK_START) != names.end();
+}
+
+bool InputFileNameParser::hasInputFileBlockLengthColumn(const DB::Block & block)
+{
+    auto names = block.getNames();
+    return std::find(names.begin(), names.end(), INPUT_FILE_BLOCK_LENGTH) != names.end();
+}
+
+void InputFileNameParser::addInputFileColumnsToChunk(
+    const DB::Block & header,
+    DB::Chunk & chunk,
+    const std::optional<String> & file_name,
+    const std::optional<Int64> & block_start,
+    const std::optional<Int64> & block_length)
+{
+    auto output_columns = chunk.getColumns();
+    for (size_t i = 0; i < header.columns(); ++i)
+    {
+        const auto & column = header.getByPosition(i);
+        if (column.name == INPUT_FILE_NAME)
+        {
+            if (!file_name.has_value())
+                throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Input file name is not set");
+            auto type_string = std::make_shared<DB::DataTypeString>();
+            auto file_name_column = type_string->createColumn();
+            file_name_column->insertMany(file_name.value(), chunk.getNumRows());
+            output_columns.insert(output_columns.begin() + i, std::move(file_name_column));
+        }
+        else if (column.name == INPUT_FILE_BLOCK_START)
+        {
+            if (!block_start.has_value())
+                throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "block_start is not set");
+            auto type_int64 = std::make_shared<DB::DataTypeInt64>();
+            auto block_start_column = type_int64->createColumn();
+            block_start_column->insertMany(block_start.value(), chunk.getNumRows());
+            output_columns.insert(output_columns.begin() + i, std::move(block_start_column));
+        }
+        else if (column.name == INPUT_FILE_BLOCK_LENGTH)
+        {
+            if (!block_length.has_value())
+                throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "block_length is not set");
+            auto type_int64 = std::make_shared<DB::DataTypeInt64>();
+            auto block_length_column = type_int64->createColumn();
+            block_length_column->insertMany(block_length.value(), chunk.getNumRows());
+            output_columns.insert(output_columns.begin() + i, std::move(block_length_column));
+        }
+    }
+    chunk.setColumns(output_columns, chunk.getNumRows());
+}
+
+bool InputFileNameParser::containsInputFileColumns(const DB::Block & block)
+{
+    return hasInputFileNameColumn(block) || hasInputFileBlockStartColumn(block) || hasInputFileBlockLengthColumn(block);
+}
+
+DB::Block InputFileNameParser::removeInputFileColumn(const DB::Block & block)
+{
+    const auto & columns = block.getColumnsWithTypeAndName();
+    DB::ColumnsWithTypeAndName result_columns;
+    for (const auto & column : columns) {
+        if (!INPUT_FILE_COLUMNS_SET.contains(column.name))
+        {
+            result_columns.push_back(column);
+        }
+    }
+    return result_columns;
+}
+
+void InputFileNameParser::addInputFileProjectStep(DB::QueryPlan & plan)
+{
+    if (!file_name.has_value() && !block_start.has_value() && !block_length.has_value())
+        return;
+    auto step = std::make_unique<InputFileExprProjectStep>(plan.getCurrentDataStream(), file_name, block_start, block_length);
+    step->setStepDescription("Input file expression project");
+    plan.addStep(std::move(step));
+}
+
+void InputFileNameParser::addInputFileColumnsToChunk(const DB::Block & header, DB::Chunk & chunk)
+{
+    addInputFileColumnsToChunk(header, chunk, file_name, block_start, block_length);
+}
+
+}
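Before the header declaring this API, a short sketch of the intended call pattern (the values and the `output_header`/`chunk`/`plan` objects are placeholders):

    local_engine::InputFileNameParser parser;
    parser.setFileName("/tmp/part-00000.parquet"); // hypothetical split path
    parser.setBlockStart(0);
    parser.setBlockLength(1024);
    // Row sources (file scans) widen every produced chunk:
    parser.addInputFileColumnsToChunk(output_header, chunk);
    // Plan-level consumers (MergeTree) append one projection step instead:
    parser.addInputFileProjectStep(plan);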
diff --git a/cpp-ch/local-engine/Parser/InputFileNameParser.h b/cpp-ch/local-engine/Parser/InputFileNameParser.h
new file mode 100644
index 0000000000000..fb29211e93e22
--- /dev/null
+++ b/cpp-ch/local-engine/Parser/InputFileNameParser.h
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <optional>
+#include <unordered_set>
+
+#include <Core/Block.h>
+#include <Processors/QueryPlan/QueryPlan.h>
+
+namespace DB
+{
+    class Chunk;
+}
+
+namespace local_engine
+{
+    class InputFileNameParser
+    {
+    public:
+        static inline const String INPUT_FILE_NAME = "input_file_name";
+        static inline const String INPUT_FILE_BLOCK_START = "input_file_block_start";
+        static inline const String INPUT_FILE_BLOCK_LENGTH = "input_file_block_length";
+        static inline const std::unordered_set<String> INPUT_FILE_COLUMNS_SET = {
+            INPUT_FILE_NAME, INPUT_FILE_BLOCK_START, INPUT_FILE_BLOCK_LENGTH
+        };
+
+        static bool hasInputFileNameColumn(const DB::Block & block);
+        static bool hasInputFileBlockStartColumn(const DB::Block & block);
+        static bool hasInputFileBlockLengthColumn(const DB::Block & block);
+        static bool containsInputFileColumns(const DB::Block & block);
+        static DB::Block removeInputFileColumn(const DB::Block & block);
+        static void addInputFileColumnsToChunk(
+            const DB::Block & header,
+            DB::Chunk & chunk,
+            const std::optional<String> & file_name,
+            const std::optional<Int64> & block_start,
+            const std::optional<Int64> & block_length);
+
+        void setFileName(const String & file_name)
+        {
+            this->file_name = file_name;
+        }
+
+        void setBlockStart(const Int64 block_start)
+        {
+            this->block_start = block_start;
+        }
+
+        void setBlockLength(const Int64 block_length)
+        {
+            this->block_length = block_length;
+        }
+
+        void addInputFileProjectStep(DB::QueryPlan & plan);
+        void addInputFileColumnsToChunk(const DB::Block & header, DB::Chunk & chunk);
+
+    private:
+        std::optional<String> file_name;
+        std::optional<Int64> block_start;
+        std::optional<Int64> block_length;
+    };
+} // local_engine
diff --git a/cpp-ch/local-engine/Parser/LocalExecutor.cpp b/cpp-ch/local-engine/Parser/LocalExecutor.cpp
index 16ede643873e4..9bf9fbb841433 100644
--- a/cpp-ch/local-engine/Parser/LocalExecutor.cpp
+++ b/cpp-ch/local-engine/Parser/LocalExecutor.cpp
@@ -72,10 +72,6 @@ bool LocalExecutor::hasNext()
 
 bool LocalExecutor::fallbackMode()
 {
-    if (executor.get() || fallback_mode)
-        std::cerr << fmt::format("executor {} in fallback mode\n", reinterpret_cast<uint64_t>(this));
-    else
-        std::cerr << fmt::format("executor {} not in fallback mode\n", reinterpret_cast<uint64_t>(this));
     return executor.get() || fallback_mode;
 }
 
@@ -92,7 +88,6 @@ SparkRowInfoPtr LocalExecutor::next()
     spark_buffer = std::make_unique<SparkBuffer>();
     spark_buffer->address = row_info->getBufferAddress();
     spark_buffer->size = row_info->getTotalBytes();
-    std::cerr << "call next\n";
     return row_info;
 }
 Block * LocalExecutor::nextColumnar()
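In the MergeTree read path that follows, a single scan may cover several data parts, so there is no single backing file: input_file_name is set to the comma-joined part paths, while block start and length fall back to 0. The join behaves like this sketch (paths are hypothetical; Poco::cat concatenates a range with a delimiter):

    std::vector<String> parts{"/data/tbl/part_0_0_0", "/data/tbl/part_1_1_0"};
    String joined = Poco::cat(String(","), parts.begin(), parts.end());
    // joined == "/data/tbl/part_0_0_0,/data/tbl/part_1_1_0"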
diff --git a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
index b1b024ce5e6d8..45e39225e6bc3 100644
--- a/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
+++ b/cpp-ch/local-engine/Parser/MergeTreeRelParser.cpp
@@ -20,11 +20,10 @@
 
 #include
 #include
-#include
-#include
 #include
 #include
 #include
+#include <Parser/InputFileNameParser.h>
 
 #include "MergeTreeRelParser.h"
@@ -121,6 +120,8 @@ MergeTreeRelParser::copyToVirtualStorage(MergeTreeTable merge_tree_table, Contex
     return parseStorage(merge_tree_table, context);
 }
 
+
+
 DB::QueryPlanPtr MergeTreeRelParser::parseReadRel(
     DB::QueryPlanPtr query_plan, const substrait::ReadRel & rel, const substrait::ReadRel::ExtensionTable & extension_table)
 {
@@ -143,6 +144,30 @@ DB::QueryPlanPtr MergeTreeRelParser::parseReadRel(
             &Poco::Logger::get("SerializedPlanParser"),
             "Try to read ({}) instead of empty header",
             one_column_name_type.front().dump());
     }
+    InputFileNameParser input_file_name_parser;
+    if (InputFileNameParser::hasInputFileNameColumn(input))
+    {
+        std::vector<String> parts;
+        for (const auto & part : merge_tree_table.parts)
+        {
+            parts.push_back(merge_tree_table.absolute_path + "/" + part.name);
+        }
+        auto name = Poco::cat(String(","), parts.begin(), parts.end());
+        input_file_name_parser.setFileName(name);
+    }
+    if (InputFileNameParser::hasInputFileBlockStartColumn(input))
+    {
+        // MergeTree doesn't support block start; 0 is a placeholder value.
+        input_file_name_parser.setBlockStart(0);
+    }
+    if (InputFileNameParser::hasInputFileBlockLengthColumn(input))
+    {
+        // MergeTree doesn't support block length; 0 is a placeholder value.
+        input_file_name_parser.setBlockLength(0);
+    }
+
+    input = InputFileNameParser::removeInputFileColumn(input);
+
     for (const auto & [name, sizes] : storage->getColumnSizes())
         column_sizes[name] = sizes.data_compressed;
     auto storage_snapshot = std::make_shared<StorageSnapshot>(*storage, storage->getInMemoryMetadataPtr());
@@ -196,6 +221,7 @@ DB::QueryPlanPtr MergeTreeRelParser::parseReadRel(
         if (remove_null_step)
             steps.emplace_back(remove_null_step);
     }
+    input_file_name_parser.addInputFileProjectStep(*query_plan);
     return query_plan;
 }
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
index 52f36bd48a5ac..3c2984e069f4d 100644
--- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
+++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
@@ -84,7 +84,7 @@
 #include
 #include
 #include
-
+#include <Parser/InputFileNameParser.h>
 
 namespace DB
 {
@@ -360,7 +360,7 @@ void adjustOutput(const DB::QueryPlanPtr & query_plan, const substrait::PlanRel
         NamesWithAliases aliases;
         auto cols = query_plan->getCurrentDataStream().header.getNamesAndTypesList();
         if (cols.getNames().size() != static_cast<size_t>(root_rel.root().names_size()))
-            throw Exception(ErrorCodes::LOGICAL_ERROR, "Missmatch result columns size.");
+            throw Exception(ErrorCodes::LOGICAL_ERROR, "Mismatch result columns size: plan column size {}, substrait plan size {}.", cols.getNames().size(), root_rel.root().names_size());
         for (int i = 0; i < static_cast<int>(cols.getNames().size()); i++)
             aliases.emplace_back(NameWithAlias(cols.getNames()[i], root_rel.root().names(i)));
         actions_dag.project(aliases);
diff --git a/cpp-ch/local-engine/Parser/SparkRowToCHColumn.cpp b/cpp-ch/local-engine/Parser/SparkRowToCHColumn.cpp
index 3dd7dc8e39a44..a4edc029e6073 100644
--- a/cpp-ch/local-engine/Parser/SparkRowToCHColumn.cpp
+++ b/cpp-ch/local-engine/Parser/SparkRowToCHColumn.cpp
@@ -125,8 +125,6 @@ Block * SparkRowToCHColumn::getBlock(SparkRowToCHColumnHelper & helper)
         ColumnWithTypeAndName named_col(col, uint8_ty, "__anonymous_col__");
         block->insert(named_col);
     }
-    std::cout << "spark row to ch column:\n";
-    debug::headBlock(*block);
     return block;
 }
diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
index ca6a66040a12e..73842396ee90e 100644
--- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
+++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
@@ -384,7 +384,7 @@ size_t MemorySortCelebornPartitionWriter::evictPartitions()
             celeborn_client->pushPartitionData(cur_partition_id, data.data(), data.size());
             split_result->total_io_time += push_time_watch.elapsedNanoseconds();
             split_result->partition_lengths[cur_partition_id] += data.size();
-            shuffle_writer->split_result.total_bytes_written += data.size();
+            split_result->total_bytes_written += data.size();
         }
         output.restart();
     };
@@ -498,7 +498,7 @@ size_t CelebornPartitionWriter::evictSinglePartition(size_t partition_id)
         split_result->total_write_time += push_time_watch.elapsedNanoseconds();
         split_result->total_io_time += push_time_watch.elapsedNanoseconds();
         split_result->total_serialize_time += serialization_time_watch.elapsedNanoseconds();
-        shuffle_writer->split_result.total_bytes_written += written_bytes;
+        split_result->total_bytes_written += written_bytes;
     };
 
     Stopwatch spill_time_watch;
diff --git a/cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp b/cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp
index ea254d2cf0284..254ee4f79186a 100644
--- a/cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp
+++ b/cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp
@@ -21,6 +21,7 @@
 #include
 #include
 #include
+#include <Parser/InputFileNameParser.h>
 
 namespace DB
 {
@@ -40,10 +41,8 @@ static DB::Block getRealHeader(const DB::Block & header, const std::optional<DB::Block> & first_block)
 std::optional<DB::Block> SourceFromJavaIter::peekBlock(JNIEnv * env, jobject java_iter)
 {
     jbyteArray block_addr = static_cast<jbyteArray>(safeCallObjectMethod(env, java_iter, serialized_record_batch_iterator_next));
     auto * block = reinterpret_cast<DB::Block *>(byteArrayToLong(env, block_addr));
-    if (block)
-        return DB::Block(*block);
-    return std::nullopt;
+    if (block->columns())
+        return std::optional(DB::Block(block->getColumnsWithTypeAndName()));
+    else
+        return std::nullopt;
+
 }
 
 SourceFromJavaIter::SourceFromJavaIter(
     DB::ContextPtr context_, const DB::Block & header, jobject java_iter_, bool materialize_input_, std::optional<DB::Block> && first_block_)
-    : DB::ISource(getRealHeader(header, first_block))
+    : DB::ISource(getRealHeader(header, first_block_))
     , context(context_)
     , original_header(header)
     , java_iter(java_iter_)
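The file-scan source below has exact per-split values, so its flow is: strip the three virtual columns from the header handed to the readers, refresh the parser whenever tryPrepareReader advances to the next file, and widen each pulled chunk on the way out. Condensed (names as in the diff that follows):

    // on each file switch
    input_file_name_parser.setFileName(current_file->getURIPath());
    input_file_name_parser.setBlockStart(current_file->getStartOffset());
    input_file_name_parser.setBlockLength(current_file->getLength());
    // on each pulled chunk
    if (input_file_name)
        input_file_name_parser.addInputFileColumnsToChunk(output.getHeader(), chunk);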
@@ -53,12 +53,28 @@ namespace local_engine
 // build blocks with a const virtual column to indicate how many rows is in it.
 static DB::Block getRealHeader(const DB::Block & header)
 {
-    return header ? header : BlockUtil::buildRowCountHeader();
+    auto header_without_input_file_columns = InputFileNameParser::removeInputFileColumn(header);
+    auto result_header = header;
+    if (!header_without_input_file_columns.columns())
+    {
+        auto virtual_header = BlockUtil::buildRowCountHeader();
+        for (const auto & column_with_type_and_name : virtual_header.getColumnsWithTypeAndName())
+        {
+            result_header.insert(column_with_type_and_name);
+        }
+    }
+    return result_header;
 }
 
 SubstraitFileSource::SubstraitFileSource(
-    const DB::ContextPtr & context_, const DB::Block & header_, const substrait::ReadRel::LocalFiles & file_infos)
-    : DB::SourceWithKeyCondition(getRealHeader(header_), false), context(context_), output_header(header_), to_read_header(output_header)
+    const DB::ContextPtr & context_,
+    const DB::Block & header_,
+    const substrait::ReadRel::LocalFiles & file_infos)
+    : DB::SourceWithKeyCondition(getRealHeader(header_), false)
+    , context(context_)
+    , output_header(InputFileNameParser::removeInputFileColumn(header_))
+    , to_read_header(output_header)
+    , input_file_name(InputFileNameParser::containsInputFileColumns(header_))
 {
     if (file_infos.items_size())
     {
@@ -95,7 +111,11 @@ DB::Chunk SubstraitFileSource::generate()
 
         DB::Chunk chunk;
         if (file_reader->pull(chunk))
+        {
+            if (input_file_name)
+                input_file_name_parser.addInputFileColumnsToChunk(output.getHeader(), chunk);
             return chunk;
+        }
 
         /// try to read from next file
         file_reader.reset();
@@ -138,7 +158,9 @@ bool SubstraitFileSource::tryPrepareReader()
     }
     else
         file_reader = std::make_unique<NormalFileReader>(current_file, context, to_read_header, output_header);
-
+    input_file_name_parser.setFileName(current_file->getURIPath());
+    input_file_name_parser.setBlockStart(current_file->getStartOffset());
+    input_file_name_parser.setBlockLength(current_file->getLength());
     file_reader->applyKeyCondition(key_condition, column_index_filter);
     return true;
 }
diff --git a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h
index 113538a929224..d436a30d73b27 100644
--- a/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h
+++ b/cpp-ch/local-engine/Storages/SubstraitSource/SubstraitFileSource.h
@@ -25,6 +25,8 @@
 #include
 #include
 #include
+#include <Parser/InputFileNameParser.h>
+
 namespace local_engine
 {
 class FileReaderWrapper
@@ -137,6 +139,8 @@ class SubstraitFileSource : public DB::SourceWithKeyCondition
     DB::Block output_header; /// Sample header may contains partitions keys
     DB::Block to_read_header; // Sample header not include partition keys
     FormatFiles files;
+    bool input_file_name = false;
+    InputFileNameParser input_file_name_parser;
     UInt32 current_file_index = 0;
     std::unique_ptr<FileReaderWrapper> file_reader;
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp
index 2a2e838c14ac4..2b1e08765bace 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -56,7 +56,7 @@
 #include
 #include
 #include
-
+#include <Parser/InputFileNameParser.h>
 #ifdef __cplusplus
 
 namespace DB
diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
index 70b85165c37b4..cba417f15a475 100644
--- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
+++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/OffloadSingleNode.scala
@@ -332,6 +332,8 @@ case class OffloadProject() extends OffloadSingleNode with LogLevelUtil {
           expr => rewriteExpr(expr, replacedExprs).asInstanceOf[NamedExpression]
         }
         val newChild = addMetadataCol(projectExec.child, replacedExprs)
+        // Tag the child again; addMetadataCol loses the tag.
+        addHint.apply(newChild)
         logDebug(
           s"Columnar Processing for ${projectExec.getClass} with "
             + s"ProjectList ${projectExec.projectList} is currently supported.")
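Taken together, the engine-side contract is: remove the three virtual columns from the header a reader must produce, then re-attach them as per-split constants, either per chunk (file scans) or via one projection step (MergeTree); on the Spark side, re-applying the hint keeps OffloadProject's rewritten child eligible for offload. A condensed restatement under the API introduced above (`scan_header` and `chunk` are placeholders):

    DB::Block scan_header; // placeholder: may name input_file_name, etc.
    local_engine::InputFileNameParser parser;
    DB::Block read_header = local_engine::InputFileNameParser::removeInputFileColumn(scan_header);
    // a reader yields `chunk` shaped like read_header; after the per-split
    // setters have run, the parser widens it back to scan_header:
    parser.addInputFileColumnsToChunk(scan_header, chunk);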