Skip to content

Commit

Permalink
Fix CH BUG due to ClickHouse/ClickHouse#68135
Browse files Browse the repository at this point in the history
  • Loading branch information
baibaichen committed Aug 15, 2024
1 parent 8897a44 commit 27f7e31
Show file tree
Hide file tree
Showing 6 changed files with 188 additions and 24 deletions.
3 changes: 1 addition & 2 deletions cpp-ch/clickhouse.version
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
CH_ORG=Kyligence
CH_BRANCH=rebase_ch/20240815
CH_COMMIT=7bc18bdaab1

CH_COMMIT=d87dbba64fc
23 changes: 1 addition & 22 deletions cpp-ch/local-engine/Parser/CHColumnToSparkRow.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -687,7 +687,6 @@ int64_t VariableLengthDataWriter::writeArray(size_t row_idx, const DB::Array & a
bitSet(buffer_address + offset + start + 8, i);
else
{
// We can not use safeGet<char>() trick, since it will throw an exception if type mismatch
writer.write(elem, buffer_address + offset + start + 8 + len_null_bitmap + i * elem_size);
}
}
Expand Down Expand Up @@ -790,27 +789,7 @@ int64_t VariableLengthDataWriter::writeStruct(size_t row_idx, const DB::Tuple &
if (BackingDataLengthCalculator::isFixedLengthDataType(removeNullable(field_type)))
{
FixedLengthDataWriter writer(field_type);
if (writer.getWhichDataType().isFloat32())
{
// We can not use safeGet<char>() directly here to process Float32 field,
// because it will get 8 byte data, but Float32 is 4 byte, which will cause error conversion.
auto v = static_cast<Float32>(field_value.safeGet<Float32>());
writer.unsafeWrite(reinterpret_cast<const char *>(&v), buffer_address + offset + start + len_null_bitmap + i * 8);
}
else if (writer.getWhichDataType().isFloat64())
{
// Fix 'Invalid Field get from type Float64 to type Int64' in debug build.
auto v = field_value.safeGet<Float64>();
writer.unsafeWrite(reinterpret_cast<const char *>(&v), buffer_address + offset + start + len_null_bitmap + i * 8);
}
else if (writer.getWhichDataType().isDecimal64() || writer.getWhichDataType().isDateTime64())
{
auto v = field_value.safeGet<Decimal64>();
writer.unsafeWrite(reinterpret_cast<const char *>(&v), buffer_address + offset + start + len_null_bitmap + i * 8);
}
else
writer.unsafeWrite(
reinterpret_cast<const char *>(&field_value.safeGet<char>()), buffer_address + offset + start + len_null_bitmap + i * 8);
writer.write(field_value, buffer_address + offset + start + len_null_bitmap + i * 8);
}
else
{
Expand Down
6 changes: 6 additions & 0 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -285,7 +285,10 @@ QueryPlanStepPtr SerializedPlanParser::parseReadRealWithLocalFile(const substrai
if (rel.has_local_files())
local_files = rel.local_files();
else
{
local_files = BinaryToMessage<substrait::ReadRel::LocalFiles>(split_infos.at(nextSplitInfoIndex()));
logDebugMessage(local_files, "local_files");
}
auto source = std::make_shared<SubstraitFileSource>(context, header, local_files);
auto source_pipe = Pipe(source);
auto source_step = std::make_unique<SubstraitFileSourceStep>(context, std::move(source_pipe), "substrait local files");
Expand Down Expand Up @@ -496,7 +499,10 @@ QueryPlanPtr SerializedPlanParser::parseOp(const substrait::Rel & rel, std::list
if (read.has_extension_table())
extension_table = read.extension_table();
else
{
extension_table = BinaryToMessage<substrait::ReadRel::ExtensionTable>(split_infos.at(nextSplitInfoIndex()));
logDebugMessage(extension_table, "extension_table");
}

MergeTreeRelParser mergeTreeParser(this, context);
query_plan = mergeTreeParser.parseReadRel(std::make_unique<QueryPlan>(), read, extension_table);
Expand Down
Binary file added cpp-ch/local-engine/tests/data/68135.snappy.parquet
Binary file not shown.
20 changes: 20 additions & 0 deletions cpp-ch/local-engine/tests/gtest_clickhouse_pr_verify.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,4 +96,24 @@ TEST(Clickhouse, PR65234)
const auto plan = local_engine::JsonStringToMessage<substrait::Plan>(
{reinterpret_cast<const char *>(gresource_embedded_pr_65234_jsonData), gresource_embedded_pr_65234_jsonSize});
auto query_plan = parser.parse(plan);
}

INCBIN(resource_embedded_pr_68135_json, SOURCE_DIR "/utils/extern-local-engine/tests/json/clickhouse_pr_68135.json");
TEST(Clickhouse, PR68135)
{
const std::string split_template
= R"({"items":[{"uriFile":"{replace_local_files}","partitionIndex":"0","length":"461","parquet":{},"schema":{},"metadataColumns":[{}]}]})";
const std::string split
= replaceLocalFilesWildcards(split_template, GLUTEN_DATA_DIR("/utils/extern-local-engine/tests/data/68135.snappy.parquet"));

SerializedPlanParser parser(SerializedPlanParser::global_context);
parser.addSplitInfo(local_engine::JsonStringToBinary<substrait::ReadRel::LocalFiles>(split));

const auto plan = local_engine::JsonStringToMessage<substrait::Plan>(
{reinterpret_cast<const char *>(gresource_embedded_pr_68135_jsonData), gresource_embedded_pr_68135_jsonSize});

auto local_executor = parser.createExecutor(plan);
EXPECT_TRUE(local_executor->hasNext());
const Block & x = *local_executor->nextColumnar();
debug::headBlock(x);
}
160 changes: 160 additions & 0 deletions cpp-ch/local-engine/tests/json/clickhouse_pr_68135.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
{
"relations": [
{
"root": {
"input": {
"filter": {
"common": {
"direct": {}
},
"input": {
"read": {
"common": {
"direct": {}
},
"baseSchema": {
"names": [
"a"
],
"struct": {
"types": [
{
"decimal": {
"scale": 2,
"precision": 9,
"nullability": "NULLABILITY_NULLABLE"
}
}
]
},
"columnTypes": [
"NORMAL_COL"
]
},
"filter": {
"singularOrList": {
"value": {
"selection": {
"directReference": {
"structField": {}
}
}
},
"options": [
{
"literal": {
"decimal": {
"value": "yAAAAAAAAAAAAAAAAAAAAA==",
"precision": 9,
"scale": 2
}
}
},
{
"literal": {
"decimal": {
"value": "LAEAAAAAAAAAAAAAAAAAAA==",
"precision": 9,
"scale": 2
}
}
},
{
"literal": {
"decimal": {
"value": "kAEAAAAAAAAAAAAAAAAAAA==",
"precision": 9,
"scale": 2
}
}
},
{
"literal": {
"decimal": {
"value": "9AEAAAAAAAAAAAAAAAAAAA==",
"precision": 9,
"scale": 2
}
}
}
]
}
},
"advancedExtension": {
"optimization": {
"@type": "type.googleapis.com/google.protobuf.StringValue",
"value": "isMergeTree=0\n"
}
}
}
},
"condition": {
"singularOrList": {
"value": {
"selection": {
"directReference": {
"structField": {}
}
}
},
"options": [
{
"literal": {
"decimal": {
"value": "yAAAAAAAAAAAAAAAAAAAAA==",
"precision": 9,
"scale": 2
}
}
},
{
"literal": {
"decimal": {
"value": "LAEAAAAAAAAAAAAAAAAAAA==",
"precision": 9,
"scale": 2
}
}
},
{
"literal": {
"decimal": {
"value": "kAEAAAAAAAAAAAAAAAAAAA==",
"precision": 9,
"scale": 2
}
}
},
{
"literal": {
"decimal": {
"value": "9AEAAAAAAAAAAAAAAAAAAA==",
"precision": 9,
"scale": 2
}
}
}
]
}
}
}
},
"names": [
"a#26"
],
"outputSchema": {
"types": [
{
"decimal": {
"scale": 2,
"precision": 9,
"nullability": "NULLABILITY_NULLABLE"
}
}
],
"nullability": "NULLABILITY_REQUIRED"
}
}
}
]
}

0 comments on commit 27f7e31

Please sign in to comment.