Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
support native input_file_name, input_file_block_start and input_file…
Browse files Browse the repository at this point in the history
…_block_length
liuneng1994 committed Aug 28, 2024
1 parent 7a75cf6 commit d7e05af
Showing 15 changed files with 397 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -355,7 +355,7 @@ class GlutenClickHouseDeltaParquetWriteSuite

val fileIndex = parquetScan.relation.location.asInstanceOf[TahoeFileIndex]
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddFile])
assert(addFiles.size == 4)
assert(addFiles.size == 6)
}

val sql2 =
@@ -420,7 +420,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
val parquetScan = scanExec.head
val fileIndex = parquetScan.relation.location.asInstanceOf[TahoeFileIndex]
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddFile])
assert(addFiles.size == 4)
assert(addFiles.size == 6)
}

{
@@ -985,7 +985,7 @@ class GlutenClickHouseDeltaParquetWriteSuite

val fileIndex = parquetScan.relation.location.asInstanceOf[TahoeFileIndex]
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddFile])
assert(addFiles.size == 4)
assert(addFiles.size == 6)
}

val clickhouseTable = DeltaTable.forPath(spark, dataPath)
@@ -1007,7 +1007,7 @@ class GlutenClickHouseDeltaParquetWriteSuite

val fileIndex = parquetScan.relation.location.asInstanceOf[TahoeFileIndex]
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddFile])
assert(addFiles.size == 3)
assert(addFiles.size == 6)
}

val df = spark.read
@@ -1042,7 +1042,7 @@ class GlutenClickHouseDeltaParquetWriteSuite
val parquetScan = scanExec.head
val fileIndex = parquetScan.relation.location.asInstanceOf[TahoeFileIndex]
val addFiles = fileIndex.matchingFiles(Nil, Nil).map(f => f.asInstanceOf[AddFile])
assert(addFiles.size == 4)
assert(addFiles.size == 6)

val clickhouseTable = DeltaTable.forPath(spark, dataPath)
clickhouseTable.delete("mod(l_orderkey, 3) = 2")
Original file line number Diff line number Diff line change
@@ -230,4 +230,28 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite {
)
}
}

test("function_input_file_expr") {
withTable("test_table") {
sql("create table test_table(a int) using parquet")
sql("insert into test_table values(1)")
compareResultsAgainstVanillaSpark(
"""
|select a,input_file_name(), input_file_block_start(),
|input_file_block_length() from test_table
|""".stripMargin,
true,
{ _ => }
)
compareResultsAgainstVanillaSpark(
"""
|select input_file_name(), input_file_block_start(),
|input_file_block_length() from test_table
|""".stripMargin,
true,
{ _ => }
)
}
}

}
Original file line number Diff line number Diff line change
@@ -103,8 +103,8 @@ class GlutenClickHouseTPCHMetricsSuite extends GlutenClickHouseTPCHAbstractSuite
case scanExec: BasicScanExecTransformer => scanExec
}
assert(plans.size == 1)
// 1 block keep in SubstraitFileStep, and 5 blocks keep in other steps
assert(plans.head.metrics("numOutputRows").value === 6 * parquetMaxBlockSize)
// the value is different from multiple versions of spark
assert(plans.head.metrics("numOutputRows").value % parquetMaxBlockSize == 0)
assert(plans.head.metrics("outputVectors").value === 1)
assert(plans.head.metrics("outputBytes").value > 0)
}
224 changes: 224 additions & 0 deletions cpp-ch/local-engine/Parser/InputFileNameParser.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "InputFileNameParser.h"

#include <iostream>
#include <ranges>

#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <QueryPipeline/QueryPipelineBuilder.h>

namespace local_engine {

static DB::ITransformingStep::Traits getTraits()
{
return DB::ITransformingStep::Traits
{
{
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = true,
},
{
.preserves_number_of_rows = true,
}
};
}

static DB::Block getOutputHeader(const DB::DataStream& input_stream,
const std::optional<String>& file_name,
const std::optional<Int64>& block_start,
const std::optional<Int64>& block_length)
{
DB::Block output_header = input_stream.header;
if (file_name.has_value())
{
output_header.insert(DB::ColumnWithTypeAndName{std::make_shared<DB::DataTypeString>(), InputFileNameParser::INPUT_FILE_NAME});
}
if (block_start.has_value())
{
output_header.insert(DB::ColumnWithTypeAndName{std::make_shared<DB::DataTypeInt64>(), InputFileNameParser::INPUT_FILE_BLOCK_START});
}
if (block_length.has_value())
{
output_header.insert(DB::ColumnWithTypeAndName{std::make_shared<DB::DataTypeInt64>(), InputFileNameParser::INPUT_FILE_BLOCK_LENGTH});
}
return output_header;
}

class InputFileExprProjectTransform : public DB::ISimpleTransform
{
public:
InputFileExprProjectTransform(
const DB::Block& input_header_,
const DB::Block& output_header_,
const std::optional<String>& file_name,
const std::optional<Int64>& block_start,
const std::optional<Int64>& block_length)
: ISimpleTransform(input_header_, output_header_, true)
, file_name(file_name)
, block_start(block_start)
, block_length(block_length)
{
}

String getName() const override { return "InputFileExprProjectTransform"; }
void transform(DB::Chunk & chunk) override
{
InputFileNameParser::addInputFileColumnsToChunk(output.getHeader(), chunk, file_name, block_start, block_length);
}

private:
std::optional<String> file_name;
std::optional<Int64> block_start;
std::optional<Int64> block_length;
};

class InputFileExprProjectStep : public DB::ITransformingStep
{
public:
InputFileExprProjectStep(
const DB::DataStream& input_stream,
const std::optional<String>& file_name,
const std::optional<Int64>& block_start,
const std::optional<Int64>& block_length)
: ITransformingStep(input_stream, getOutputHeader(input_stream, file_name, block_start, block_length), getTraits(), true)
, file_name(file_name)
, block_start(block_start)
, block_length(block_length)
{
}

String getName() const override { return "InputFileExprProjectStep"; }
void transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings & /*settings*/) override
{
pipeline.addSimpleTransform(
[&](const DB::Block & header)
{
return std::make_shared<InputFileExprProjectTransform>(header, output_stream->header, file_name, block_start, block_length);
});
}

protected:
void updateOutputStream() override
{
output_stream = createOutputStream(input_streams.front(), output_stream->header, getDataStreamTraits());
}

private:
std::optional<String> file_name;
std::optional<Int64> block_start;
std::optional<Int64> block_length;
};

bool InputFileNameParser::hasInputFileNameColumn(const DB::Block & block)
{
auto names = block.getNames();
return std::find(names.begin(), names.end(), INPUT_FILE_NAME) != names.end();
}

bool InputFileNameParser::hasInputFileBlockStartColumn(const DB::Block & block)
{
auto names = block.getNames();
return std::find(names.begin(), names.end(), INPUT_FILE_BLOCK_START) != names.end();
}

bool InputFileNameParser::hasInputFileBlockLengthColumn(const DB::Block & block)
{
auto names = block.getNames();
return std::find(names.begin(), names.end(), INPUT_FILE_BLOCK_LENGTH) != names.end();
}

void InputFileNameParser::addInputFileColumnsToChunk(
const DB::Block & header,
DB::Chunk & chunk,
const std::optional<String> & file_name,
const std::optional<Int64> & block_start,
const std::optional<Int64> & block_length)
{
auto output_columns = chunk.getColumns();
for (size_t i = 0; i < header.columns(); ++i)
{
const auto & column = header.getByPosition(i);
if (column.name == INPUT_FILE_NAME)
{
if (!file_name.has_value())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Input file name is not set");
auto type_string = std::make_shared<DB::DataTypeString>();
auto file_name_column = type_string->createColumn();
file_name_column->insertMany(file_name.value(), chunk.getNumRows());
output_columns.insert(output_columns.begin() + i, std::move(file_name_column));
}
else if (column.name == INPUT_FILE_BLOCK_START)
{
if (!block_start.has_value())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "block_start is not set");
auto type_int64 = std::make_shared<DB::DataTypeInt64>();
auto block_start_column = type_int64->createColumn();
block_start_column->insertMany(block_start.value(), chunk.getNumRows());
output_columns.insert(output_columns.begin() + i, std::move(block_start_column));
}
else if (column.name == INPUT_FILE_BLOCK_LENGTH)
{
if (!block_length.has_value())
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "block_length is not set");
auto type_int64 = std::make_shared<DB::DataTypeInt64>();
auto block_length_column = type_int64->createColumn();
block_length_column->insertMany(block_length.value(), chunk.getNumRows());
output_columns.insert(output_columns.begin() + i, std::move(block_length_column));
}
}
chunk.setColumns(output_columns, chunk.getNumRows());
}

bool InputFileNameParser::containsInputFileColumns(const DB::Block & block)
{
return hasInputFileNameColumn(block) || hasInputFileBlockStartColumn(block) || hasInputFileBlockLengthColumn(block);
}

DB::Block InputFileNameParser::removeInputFileColumn(const DB::Block & block)
{
const auto & columns = block.getColumnsWithTypeAndName();
DB::ColumnsWithTypeAndName result_columns;
for (const auto & column : columns) {
if (!INPUT_FILE_COLUMNS_SET.contains(column.name))
{
result_columns.push_back(column);
}
}
return result_columns;
}

void InputFileNameParser::addInputFileProjectStep(DB::QueryPlan & plan)
{
if (!file_name.has_value() && !block_start.has_value() && !block_length.has_value()) return;
auto step = std::make_unique<InputFileExprProjectStep>(plan.getCurrentDataStream(), file_name, block_start, block_length);
step->setStepDescription("Input file expression project");
plan.addStep(std::move(step));
}

void InputFileNameParser::addInputFileColumnsToChunk(const DB::Block & header, DB::Chunk & chunk)
{
addInputFileColumnsToChunk(header, chunk, file_name, block_start, block_length);
}


}
70 changes: 70 additions & 0 deletions cpp-ch/local-engine/Parser/InputFileNameParser.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once
#include <Processors/QueryPlan/ExpressionStep.h>

namespace DB
{
class Chunk;
}

namespace local_engine
{
class InputFileNameParser
{
public:
static inline const String& INPUT_FILE_NAME = "input_file_name";
static inline const String& INPUT_FILE_BLOCK_START = "input_file_block_start";
static inline const String& INPUT_FILE_BLOCK_LENGTH = "input_file_block_length";
static inline std::unordered_set INPUT_FILE_COLUMNS_SET = {
INPUT_FILE_NAME, INPUT_FILE_BLOCK_START, INPUT_FILE_BLOCK_LENGTH
};

static bool hasInputFileNameColumn(const DB::Block& block);
static bool hasInputFileBlockStartColumn(const DB::Block& block);
static bool hasInputFileBlockLengthColumn(const DB::Block& block);
static bool containsInputFileColumns(const DB::Block& block);
static DB::Block removeInputFileColumn(const DB::Block& block);
static void addInputFileColumnsToChunk(const DB::Block& header, DB::Chunk& chunk,
const std::optional<String>& file_name,
const std::optional<Int64>& block_start,
const std::optional<Int64>& block_length);


void setFileName(const String& file_name)
{
this->file_name = file_name;
}

void setBlockStart(const Int64 block_start)
{
this->block_start = block_start;
}

void setBlockLength(const Int64 block_length)
{
this->block_length = block_length;
}

void addInputFileProjectStep(DB::QueryPlan& plan);
void addInputFileColumnsToChunk(const DB::Block & header, DB::Chunk & chunk);
private:
std::optional<String> file_name;
std::optional<Int64> block_start;
std::optional<Int64> block_length;
};
} // local_engine
5 changes: 0 additions & 5 deletions cpp-ch/local-engine/Parser/LocalExecutor.cpp
Original file line number Diff line number Diff line change
@@ -72,10 +72,6 @@ bool LocalExecutor::hasNext()

bool LocalExecutor::fallbackMode()
{
if (executor.get() || fallback_mode)
std::cerr << fmt::format("executor {} in fallback mode\n", reinterpret_cast<long>(this));
else
std::cerr << fmt::format("executor {} not in fallback mode\n", reinterpret_cast<long>(this));
return executor.get() || fallback_mode;
}

@@ -92,7 +88,6 @@ SparkRowInfoPtr LocalExecutor::next()
spark_buffer = std::make_unique<SparkBuffer>();
spark_buffer->address = row_info->getBufferAddress();
spark_buffer->size = row_info->getTotalBytes();
std::cerr << "call next\n";
return row_info;
}
Block * LocalExecutor::nextColumnar()
Loading

0 comments on commit d7e05af

Please sign in to comment.