diff --git a/cpp-ch/local-engine/Common/GlutenConfig.h b/cpp-ch/local-engine/Common/GlutenConfig.h index ac82b0fff03a..02bb8a9f4c04 100644 --- a/cpp-ch/local-engine/Common/GlutenConfig.h +++ b/cpp-ch/local-engine/Common/GlutenConfig.h @@ -17,9 +17,9 @@ #pragma once -#include -#include #include +#include +#include namespace local_engine { @@ -104,7 +104,7 @@ struct JoinConfig bool prefer_multi_join_on_clauses = true; size_t multi_join_on_clauses_build_side_rows_limit = 10000000; - static JoinConfig loadFromContext(DB::ContextPtr context) + static JoinConfig loadFromContext(const DB::ContextPtr & context) { JoinConfig config; config.prefer_multi_join_on_clauses = context->getConfigRef().getBool(PREFER_MULTI_JOIN_ON_CLAUSES, true); @@ -198,4 +198,3 @@ struct GlutenJobSchedulerConfig } }; } - diff --git a/cpp-ch/local-engine/Parser/JoinRelParser.cpp b/cpp-ch/local-engine/Parser/JoinRelParser.cpp index ef19e007d439..30651aff1b84 100644 --- a/cpp-ch/local-engine/Parser/JoinRelParser.cpp +++ b/cpp-ch/local-engine/Parser/JoinRelParser.cpp @@ -209,7 +209,7 @@ DB::QueryPlanPtr JoinRelParser::parseJoin(const substrait::JoinRel & join, DB::Q google::protobuf::StringValue optimization_info; optimization_info.ParseFromString(join.advanced_extension().optimization().value()); auto join_opt_info = JoinOptimizationInfo::parse(optimization_info.value()); - LOG_ERROR(getLogger("JoinRelParser"), "optimizaiton info:{}", optimization_info.value()); + LOG_DEBUG(getLogger("JoinRelParser"), "optimization info:{}", optimization_info.value()); auto storage_join = join_opt_info.is_broadcast ? BroadCastJoinBuilder::getJoin(join_opt_info.storage_join_key) : nullptr; if (storage_join) { diff --git a/cpp-ch/local-engine/Storages/Parquet/ParquetConverter.h b/cpp-ch/local-engine/Storages/Parquet/ParquetConverter.h index 312cea7efc0a..0ac16c11104d 100644 --- a/cpp-ch/local-engine/Storages/Parquet/ParquetConverter.h +++ b/cpp-ch/local-engine/Storages/Parquet/ParquetConverter.h @@ -35,12 +35,23 @@ template struct ToParquet { using T = typename PhysicalType::c_type; - T as(const DB::Field & value, const parquet::ColumnDescriptor &) + T as(const DB::Field & value, const parquet::ColumnDescriptor & s) { - if constexpr (std::is_same_v) - return static_cast(value.safeGet()); + if (s.logical_type()->is_decimal()) + { + if constexpr (std::is_same_v) + { + const auto v = value.safeGet>(); + return v.getValue().value; + } + if constexpr (std::is_same_v) + { + const auto v = value.safeGet>(); + return v.getValue().value; + } + } // parquet::BooleanType, parquet::Int64Type, parquet::FloatType, parquet::DoubleType - return value.safeGet(); // FLOAT, DOUBLE, INT64 + return value.safeGet(); // FLOAT, DOUBLE, INT64, Int32 } }; @@ -57,28 +68,44 @@ struct ToParquet } }; +template +parquet::FixedLenByteArray convertField(const DB::Field & value, uint8_t * buf, size_t type_length) +{ + assert(sizeof(T) >= type_length); + + T val = value.safeGet>>().getValue().value; + std::reverse(reinterpret_cast(&val), reinterpret_cast(&val) + sizeof(T)); + const int offset = sizeof(T) - type_length; + + memcpy(buf, reinterpret_cast(&val) + offset, type_length); + return parquet::FixedLenByteArray(buf); +} + template <> struct ToParquet { - uint8_t buf[256]; + uint8_t buf[16]; using T = parquet::FixedLenByteArray; T as(const DB::Field & value, const parquet::ColumnDescriptor & descriptor) { - if (value.getType() != DB::Field::Types::Decimal128) - throw DB::Exception( - DB::ErrorCodes::LOGICAL_ERROR, "Field type '{}' for FIXED_LEN_BYTE_ARRAY is not supported", 
value.getTypeName()); - static_assert(sizeof(Int128) <= sizeof(buf)); - if (descriptor.type_length() > sizeof(Int128)) + if (value.getType() == DB::Field::Types::Decimal256) + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Field type '{}' is not supported", value.getTypeName()); + + static_assert(sizeof(Int128) == sizeof(buf)); + + if (descriptor.type_length() > sizeof(buf)) throw DB::Exception( DB::ErrorCodes::LOGICAL_ERROR, - "descriptor.type_length() = {} , which is > {}, e.g. sizeof(Int128)", + "descriptor.type_length() = {} , which is > {}, e.g. sizeof(buf)", descriptor.type_length(), - sizeof(Int128)); - Int128 val = value.safeGet>().getValue(); - std::reverse(reinterpret_cast(&val), reinterpret_cast(&val) + sizeof(val)); - const int offset = sizeof(Int128) - descriptor.type_length(); - memcpy(buf, reinterpret_cast(&val) + offset, descriptor.type_length()); - return parquet::FixedLenByteArray(buf); + sizeof(buf)); + + if (value.getType() == DB::Field::Types::Decimal32) + return convertField(value, buf, descriptor.type_length()); + if (value.getType() == DB::Field::Types::Decimal64) + return convertField(value, buf, descriptor.type_length()); + + return convertField(value, buf, descriptor.type_length()); } }; @@ -86,7 +113,7 @@ struct ToParquet template struct ConverterNumeric { - using From = typename Col::Container::value_type; + using From = typename Col::ValueType; using To = typename DType::c_type; const Col & column; @@ -119,6 +146,7 @@ using ConverterInt64 = ConverterNumeric>; using ConverterDouble = ConverterNumeric>; +using ConverterFloat = ConverterNumeric>; struct ConverterString { @@ -141,7 +169,7 @@ struct ConverterString /// Like ConverterNumberAsFixedString, but converts to big-endian. Because that's the byte order /// Parquet uses for decimal types and literally nothing else, for some reason. 
-template +template struct ConverterDecimal { const parquet::ColumnDescriptor & descriptor; @@ -165,7 +193,7 @@ struct ConverterDecimal data_buf.resize(count * sizeof(T)); ptr_buf.resize(count); memcpy(data_buf.data(), reinterpret_cast(column.getData().data() + offset), count * sizeof(T)); - const size_t offset_in_buf = sizeof(Int128) - descriptor.type_length(); + const size_t offset_in_buf = sizeof(T) - descriptor.type_length(); ; for (size_t i = 0; i < count; ++i) { @@ -176,6 +204,13 @@ struct ConverterDecimal } }; +using Decimal128ToFLB = ConverterDecimal; +using Decimal64ToFLB = ConverterDecimal; +using Decimal32ToFLB = ConverterDecimal; + +using ConverterDecimal32 = ConverterNumeric>; +using ConverterDecimal64 = ConverterNumeric>; + class BaseConverter { public: @@ -239,6 +274,8 @@ std::shared_ptr> ParquetConverter::Make(const DB: case TypeIndex::UInt32: result = std::make_shared>(ConverterInt32_u(c)); break; + case TypeIndex::Decimal32: + result = std::make_shared>(ConverterDecimal32(c)); default: break; } @@ -251,6 +288,8 @@ std::shared_ptr> ParquetConverter::Make(const DB: case TypeIndex::UInt64: result = std::make_shared>(ConverterInt64_u(c)); break; + case TypeIndex::Decimal64: + result = std::make_shared>(ConverterDecimal64(c)); default: break; } @@ -258,6 +297,14 @@ std::shared_ptr> ParquetConverter::Make(const DB: case parquet::Type::INT96: break; case parquet::Type::FLOAT: + switch (c->getDataType()) + { + case TypeIndex::Float32: + result = std::make_shared>(ConverterFloat(c)); + break; + default: + break; + } break; case parquet::Type::DOUBLE: switch (c->getDataType()) @@ -283,8 +330,13 @@ std::shared_ptr> ParquetConverter::Make(const DB: switch (c->getDataType()) { case TypeIndex::Decimal128: - result = std::make_shared>>( - ConverterDecimal(c, desc)); + result = std::make_shared>(Decimal128ToFLB(c, desc)); + break; + case TypeIndex::Decimal64: + result = std::make_shared>(Decimal64ToFLB(c, desc)); + break; + case TypeIndex::Decimal32: + result = std::make_shared>(Decimal32ToFLB(c, desc)); break; default: break; diff --git a/cpp-ch/local-engine/tests/decmial_filter_push_down/18_2.json b/cpp-ch/local-engine/tests/decmial_filter_push_down/18_2.json new file mode 100644 index 000000000000..5ad0a62325de --- /dev/null +++ b/cpp-ch/local-engine/tests/decmial_filter_push_down/18_2.json @@ -0,0 +1,160 @@ +{ + "relations": [ + { + "root": { + "input": { + "filter": { + "common": { + "direct": {} + }, + "input": { + "read": { + "common": { + "direct": {} + }, + "baseSchema": { + "names": [ + "a" + ], + "struct": { + "types": [ + { + "decimal": { + "scale": 2, + "precision": 18, + "nullability": "NULLABILITY_NULLABLE" + } + } + ] + }, + "columnTypes": [ + "NORMAL_COL" + ] + }, + "filter": { + "singularOrList": { + "value": { + "selection": { + "directReference": { + "structField": {} + } + } + }, + "options": [ + { + "literal": { + "decimal": { + "value": "yAAAAAAAAAAAAAAAAAAAAA==", + "precision": 18, + "scale": 2 + } + } + }, + { + "literal": { + "decimal": { + "value": "LAEAAAAAAAAAAAAAAAAAAA==", + "precision": 18, + "scale": 2 + } + } + }, + { + "literal": { + "decimal": { + "value": "kAEAAAAAAAAAAAAAAAAAAA==", + "precision": 18, + "scale": 2 + } + } + }, + { + "literal": { + "decimal": { + "value": "9AEAAAAAAAAAAAAAAAAAAA==", + "precision": 18, + "scale": 2 + } + } + } + ] + } + }, + "advancedExtension": { + "optimization": { + "@type": "type.googleapis.com/google.protobuf.StringValue", + "value": "isMergeTree=0\n" + } + } + } + }, + "condition": { + "singularOrList": { + 
"value": { + "selection": { + "directReference": { + "structField": {} + } + } + }, + "options": [ + { + "literal": { + "decimal": { + "value": "yAAAAAAAAAAAAAAAAAAAAA==", + "precision": 18, + "scale": 2 + } + } + }, + { + "literal": { + "decimal": { + "value": "LAEAAAAAAAAAAAAAAAAAAA==", + "precision": 18, + "scale": 2 + } + } + }, + { + "literal": { + "decimal": { + "value": "kAEAAAAAAAAAAAAAAAAAAA==", + "precision": 18, + "scale": 2 + } + } + }, + { + "literal": { + "decimal": { + "value": "9AEAAAAAAAAAAAAAAAAAAA==", + "precision": 18, + "scale": 2 + } + } + } + ] + } + } + } + }, + "names": [ + "a#4772" + ], + "outputSchema": { + "types": [ + { + "decimal": { + "scale": 2, + "precision": 18, + "nullability": "NULLABILITY_NULLABLE" + } + } + ], + "nullability": "NULLABILITY_REQUIRED" + } + } + } + ] +} \ No newline at end of file diff --git a/cpp-ch/local-engine/tests/decmial_filter_push_down/18_2_flba.snappy.parquet b/cpp-ch/local-engine/tests/decmial_filter_push_down/18_2_flba.snappy.parquet new file mode 100644 index 000000000000..ac0b015900df Binary files /dev/null and b/cpp-ch/local-engine/tests/decmial_filter_push_down/18_2_flba.snappy.parquet differ diff --git a/cpp-ch/local-engine/tests/gtest_parquet_columnindex_bug.cpp b/cpp-ch/local-engine/tests/gtest_parquet_columnindex_bug.cpp new file mode 100644 index 000000000000..ee6e70305b27 --- /dev/null +++ b/cpp-ch/local-engine/tests/gtest_parquet_columnindex_bug.cpp @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#include +#include +#include +#include +#include +#include +#include +#include + + +using namespace local_engine; + +using namespace DB; + +INCBIN(resource_embedded_pr_18_2_json, SOURCE_DIR "/utils/extern-local-engine/tests/decmial_filter_push_down/18_2.json"); +TEST(ColumnIndex, Deciaml182) +{ + // [precision,scale] = [18,2] + const auto context1 = DB::Context::createCopy(SerializedPlanParser::global_context); + + auto config = ExecutorConfig::loadFromContext(context1); + EXPECT_TRUE(config.use_local_format) << "gtest need set use_local_format to true"; + + const std::string split_template + = R"({"items":[{"uriFile":"{replace_local_files}","partitionIndex":"0","length":"488","parquet":{},"schema":{},"metadataColumns":[{}]}]})"; + const std::string split = replaceLocalFilesWildcards( + split_template, GLUTEN_DATA_DIR("/utils/extern-local-engine/tests/decmial_filter_push_down/18_2_flba.snappy.parquet")); + + SerializedPlanParser parser(context1); + parser.addSplitInfo(local_engine::JsonStringToBinary(split)); + + const auto plan = local_engine::JsonStringToMessage( + {reinterpret_cast(gresource_embedded_pr_18_2_jsonData), gresource_embedded_pr_18_2_jsonSize}); + + auto local_executor = parser.createExecutor(plan); + EXPECT_TRUE(local_executor->hasNext()); + const Block & x = *local_executor->nextColumnar(); + debug::headBlock(x); +} \ No newline at end of file diff --git a/cpp-ch/local-engine/tests/json/gtest_local_engine_config.json b/cpp-ch/local-engine/tests/json/gtest_local_engine_config.json index 10f0ea3dfdad..8ada07819bb6 100644 --- a/cpp-ch/local-engine/tests/json/gtest_local_engine_config.json +++ b/cpp-ch/local-engine/tests/json/gtest_local_engine_config.json @@ -260,6 +260,14 @@ "value": { "string": "false" } + }, + { + "key": { + "string": "spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format" + }, + "value": { + "string": "true" + } } ] } diff --git a/gluten-ut/common/src/test/scala/org/apache/gluten/utils/BackendTestSettings.scala b/gluten-ut/common/src/test/scala/org/apache/gluten/utils/BackendTestSettings.scala index 987635d067be..dce8ac83710c 100644 --- a/gluten-ut/common/src/test/scala/org/apache/gluten/utils/BackendTestSettings.scala +++ b/gluten-ut/common/src/test/scala/org/apache/gluten/utils/BackendTestSettings.scala @@ -30,7 +30,10 @@ abstract class BackendTestSettings { private val enabledSuites: java.util.Map[String, SuiteSettings] = new util.HashMap() protected def enableSuite[T: ClassTag]: SuiteSettings = { - val suiteName = implicitly[ClassTag[T]].runtimeClass.getCanonicalName + enableSuite(implicitly[ClassTag[T]].runtimeClass.getCanonicalName) + } + + protected def enableSuite(suiteName: String): SuiteSettings = { if (enabledSuites.containsKey(suiteName)) { throw new IllegalArgumentException("Duplicated suite name: " + suiteName) } diff --git a/gluten-ut/spark33/pom.xml b/gluten-ut/spark33/pom.xml index 9251ebc8ab78..539f60a63f1b 100644 --- a/gluten-ut/spark33/pom.xml +++ b/gluten-ut/spark33/pom.xml @@ -51,6 +51,28 @@ test + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-sources + generate-sources + + add-test-source + + + + src/test/backends-clickhouse + + + + + + + backends-velox diff --git a/gluten-ut/spark33/src/test/backends-clickhouse/org/apache/gluten/execution/parquet/GlutenParquetV1FilterSuite2.scala b/gluten-ut/spark33/src/test/backends-clickhouse/org/apache/gluten/execution/parquet/GlutenParquetV1FilterSuite2.scala new file mode 100644 index 000000000000..32c7784cff92 --- /dev/null +++ 
b/gluten-ut/spark33/src/test/backends-clickhouse/org/apache/gluten/execution/parquet/GlutenParquetV1FilterSuite2.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution.parquet + +import org.apache.spark.SparkConf +import org.apache.spark.sql.execution.datasources.parquet.GlutenParquetV1FilterSuite + +/** testing use_local_format parquet reader. */ +class GlutenParquetV1FilterSuite2 extends GlutenParquetV1FilterSuite { + override def sparkConf: SparkConf = + super.sparkConf + .set("spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format", "true") +} diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index c8e162e61d66..660d693cce3f 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -1600,6 +1600,20 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("SPARK-38825: in and notIn filters") .exclude("SPARK-36866: filter pushdown - year-month interval") .excludeGlutenTest("SPARK-25207: exception when duplicate fields in case-insensitive mode") + enableSuite("org.apache.gluten.execution.parquet.GlutenParquetV1FilterSuite2") + .exclude("filter pushdown - date") + .exclude("filter pushdown - timestamp") + .exclude("Filters should be pushed down for vectorized Parquet reader at row group level") + .exclude("SPARK-31026: Parquet predicate pushdown for fields having dots in the names") + .exclude("Filters should be pushed down for Parquet readers at row group level") + .exclude("filter pushdown - StringStartsWith") + .exclude("SPARK-17091: Convert IN predicate to Parquet filter push-down") + .exclude("SPARK-25207: exception when duplicate fields in case-insensitive mode") + .exclude("Support Parquet column index") + .exclude("SPARK-34562: Bloom filter push down") + .exclude("SPARK-38825: in and notIn filters") + .exclude("SPARK-36866: filter pushdown - year-month interval") + .excludeGlutenTest("SPARK-25207: exception when duplicate fields in case-insensitive mode") enableSuite[GlutenParquetV1PartitionDiscoverySuite] .exclude("SPARK-7847: Dynamic partition directory path escaping and unescaping") .exclude("Various partition value types") diff --git a/gluten-ut/spark35/src/test/backends-clickhouse/org/apache/gluten/GlutenColumnarWriteTestSupport.scala b/gluten-ut/spark35/src/test/backends-clickhouse/org/apache/gluten/GlutenColumnarWriteTestSupport.scala index 43b83afe9af3..4258cd891a5a 100644 --- 
a/gluten-ut/spark35/src/test/backends-clickhouse/org/apache/gluten/GlutenColumnarWriteTestSupport.scala +++ b/gluten-ut/spark35/src/test/backends-clickhouse/org/apache/gluten/GlutenColumnarWriteTestSupport.scala @@ -16,11 +16,12 @@ */ package org.apache.gluten -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{ColumnarWriteFilesExec, SparkPlan} trait GlutenColumnarWriteTestSupport { def checkWriteFilesAndGetChild(sparkPlan: SparkPlan): SparkPlan = { - throw new UnsupportedOperationException("Clickhouse Backend does not support write files") + assert(sparkPlan.isInstanceOf[ColumnarWriteFilesExec]) + sparkPlan.asInstanceOf[ColumnarWriteFilesExec].child } } diff --git a/gluten-ut/spark35/src/test/backends-clickhouse/org/apache/gluten/execution/parquet/GlutenParquetV1FilterSuite2.scala b/gluten-ut/spark35/src/test/backends-clickhouse/org/apache/gluten/execution/parquet/GlutenParquetV1FilterSuite2.scala new file mode 100644 index 000000000000..d20a419597d1 --- /dev/null +++ b/gluten-ut/spark35/src/test/backends-clickhouse/org/apache/gluten/execution/parquet/GlutenParquetV1FilterSuite2.scala @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution.parquet + +import org.apache.spark.SparkConf +import org.apache.spark.sql.execution.datasources.parquet.GlutenParquetV1FilterSuite + + +/** testing use_local_format parquet reader. + * FIXME: Run this suite in Spark 35 CI Pipeline + * */ +class GlutenParquetV1FilterSuite2 extends GlutenParquetV1FilterSuite { + override def sparkConf: SparkConf = + super.sparkConf + .set("spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format", "true") +} diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index 77c12621efeb..bf971aba7282 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -1438,6 +1438,8 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("SPARK-34562: Bloom filter push down") .exclude("SPARK-38825: in and notIn filters") .exclude("SPARK-36866: filter pushdown - year-month interval") + .exclude("filter pushdown - StringContains") + .exclude("filter pushdown - StringPredicate") .excludeGlutenTest("SPARK-25207: exception when duplicate fields in case-insensitive mode") enableSuite[GlutenParquetV1PartitionDiscoverySuite] .exclude("SPARK-7847: Dynamic partition directory path escaping and unescaping")
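
Note on the core change: aside from the test scaffolding, the substance of this patch is in ParquetConverter.h — ClickHouse Decimal32/Decimal64/Decimal128 unscaled values are now all written into Parquet DECIMAL columns (and their statistics) as big-endian integers trimmed to the column's type_length, which is why the buffer offset becomes sizeof(T) - descriptor.type_length() instead of a hard-coded sizeof(Int128). The standalone sketch below illustrates that byte manipulation only; it is not Gluten code, the helper name is invented for illustration, and it assumes a little-endian host just as the std::reverse in the converters does.

    #include <algorithm>
    #include <cassert>
    #include <cstdint>
    #include <cstring>
    #include <iostream>
    #include <vector>

    // Re-encode an unscaled decimal value (host-order integer, little-endian host
    // assumed) as the big-endian byte string Parquet stores for DECIMAL columns,
    // trimmed to the column's FIXED_LEN_BYTE_ARRAY type_length.
    template <typename Int>
    std::vector<uint8_t> toParquetDecimalBytes(Int unscaled, size_t type_length)
    {
        assert(type_length <= sizeof(Int));
        uint8_t buf[sizeof(Int)];
        std::memcpy(buf, &unscaled, sizeof(Int));
        std::reverse(buf, buf + sizeof(Int));             // little-endian -> big-endian
        const size_t offset = sizeof(Int) - type_length;  // keep only the low type_length bytes
        return std::vector<uint8_t>(buf + offset, buf + sizeof(Int));
    }

    int main()
    {
        // DECIMAL(18, 2) typically has type_length 8; the value 2.00 has the
        // unscaled representation 200, so the expected encoding is 00 00 00 00 00 00 00 c8.
        for (uint8_t b : toParquetDecimalBytes<int64_t>(200, 8))
            std::cout << std::hex << static_cast<int>(b) << ' ';
        std::cout << '\n';
    }

This matches the test data added above: the four base64 filter literals in 18_2.json (yAAA…, LAEA…, kAEA…, 9AEA…) decode to the unscaled values 200, 300, 400 and 500, i.e. 2.00 through 5.00 at scale 2, and 18_2_flba.snappy.parquet stores the column as a FIXED_LEN_BYTE_ARRAY, which is the code path the new gtest and GlutenParquetV1FilterSuite2 exercise with use_local_format enabled.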