Skip to content

Commit

Permalink
[GLUTEN-6938][CH] Fix core dump when range partition include literal (a…
Browse files Browse the repository at this point in the history
…pache#6964)

* [GLUTEN-6938][CH] Fix core dump when range partition include literal

* update per comment
  • Loading branch information
baibaichen authored and shamirchen committed Oct 14, 2024
1 parent ba62e3e commit 54ef603
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ object CHExecUtil extends Logging {
// Thus in Columnar Shuffle we never use the "key" part.
val isOrderSensitive = isRoundRobin && !SQLConf.get.sortBeforeRepartition

val rddWithpartitionKey: RDD[Product2[Int, ColumnarBatch]] =
val rddWithPartitionKey: RDD[Product2[Int, ColumnarBatch]] =
if (
GlutenConfig.getConf.isUseColumnarShuffleManager
|| GlutenConfig.getConf.isUseCelebornShuffleManager
Expand All @@ -345,7 +345,7 @@ object CHExecUtil extends Logging {

val dependency =
new ColumnarShuffleDependency[Int, ColumnarBatch, ColumnarBatch](
rddWithpartitionKey,
rddWithPartitionKey,
new PartitionIdPassthrough(newPartitioning.numPartitions),
serializer,
shuffleWriterProcessor = ShuffleExchangeExec.createShuffleWriteProcessor(writeMetrics),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution
package org.apache.gluten.execution.compatibility

import org.apache.gluten.GlutenConfig
import org.apache.gluten.execution.GlutenClickHouseTPCHAbstractSuite
import org.apache.gluten.utils.UTSystemParameters

import org.apache.spark.SparkConf
Expand Down Expand Up @@ -78,6 +79,18 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite {
}
}

test("https://github.com/apache/incubator-gluten/issues/6938") {
val testSQL =
s"""
|select * from (
| select 1 as x, r_name as y, 's' as z from region
| union all
| select 2 as x, n_name as y, null as z from nation
|) order by y,x,z
|""".stripMargin
runQueryAndCompare(testSQL)(_ => ())
}

test("Support In list option contains non-foldable expression") {
runQueryAndCompare(
"""
Expand Down Expand Up @@ -217,5 +230,4 @@ class GlutenClickhouseFunctionSuite extends GlutenClickHouseTPCHAbstractSuite {
)
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gluten.execution
package org.apache.gluten.execution.compatibility

import org.apache.gluten.execution.GlutenClickHouseWholeStageTransformerSuite

import org.apache.spark.SparkConf

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,12 @@
package org.apache.gluten.execution.parquet

import org.apache.gluten.execution.{FileSourceScanExecTransformer, GlutenClickHouseWholeStageTransformerSuite}
import org.apache.gluten.test.GlutenSQLTestUtils
import org.apache.gluten.test.{GlutenSQLTestUtils, GlutenTPCHBase}

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.gluten.test.GlutenTPCHBase
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.Decimal

Expand Down Expand Up @@ -385,13 +384,13 @@ class GlutenParquetFilterSuite
'p_size.int >= 1,
'p_partkey.long.isNotNull,
('p_brand.string === "Brand#12" &&
('p_container.string.in("SM CASE", "SM BOX", "SM PACK", "SM PKG")) &&
'p_container.string.in("SM CASE", "SM BOX", "SM PACK", "SM PKG") &&
'p_size.int <= 5) ||
('p_brand.string === "Brand#23" &&
('p_container.string.in("MED BAG", "MED BOX", "MED PKG", "MED PACK")) &&
'p_container.string.in("MED BAG", "MED BOX", "MED PKG", "MED PACK") &&
'p_size.int <= 10) ||
('p_brand.string === "Brand#34" &&
('p_container.string.in("LG CASE", "LG BOX", "LG PACK", "LG PKG")) &&
'p_container.string.in("LG CASE", "LG BOX", "LG PACK", "LG PKG") &&
'p_size.int <= 15)
)
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.gluten.test

import org.apache.gluten.test.GlutenTPCBase
package org.apache.gluten.test

import org.apache.spark.sql.catalyst.TableIdentifier

Expand Down Expand Up @@ -51,7 +49,11 @@ trait GlutenTPCHBase extends GlutenTPCBase {

override def dropTables(): Unit = {
tpchCreateTable.keys.foreach {
tableName => spark.sessionState.catalog.dropTable(TableIdentifier(tableName), true, true)
tableName =>
spark.sessionState.catalog.dropTable(
TableIdentifier(tableName),
ignoreIfNotExists = true,
purge = true)
}
}

Expand Down
16 changes: 14 additions & 2 deletions cpp-ch/local-engine/Shuffle/SelectorBuilder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -367,19 +367,31 @@ void RangeSelectorBuilder::computePartitionIdByBinarySearch(DB::Block & block, D
selector.emplace_back(selected_partition);
}
}
namespace {
int doCompareAt(const ColumnPtr & lhs, size_t n, size_t m, const IColumn & rhs, int nan_direction_hint)
{
if (const auto * l_const = typeid_cast<const ColumnConst *>(lhs.get()))
{
// we know rhs never be Const
chassert(l_const->getDataType() == rhs.getDataType());
return l_const->getDataColumn().compareAt(0, m, rhs, nan_direction_hint);
}
return lhs->compareAt(n, m, rhs, nan_direction_hint);
}
}

int RangeSelectorBuilder::compareRow(
const DB::Columns & columns,
const std::vector<size_t> & required_columns,
size_t row,
const DB::Columns & bound_columns,
size_t bound_row)
size_t bound_row) const
{
for (size_t i = 0, n = required_columns.size(); i < n; ++i)
{
auto lpos = required_columns[i];
auto rpos = i;
auto res = columns[lpos]->compareAt(row, bound_row, *bound_columns[rpos], sort_descriptions[i].nulls_direction)
auto res = doCompareAt(columns[lpos], row, bound_row, *bound_columns[rpos], sort_descriptions[i].nulls_direction)
* sort_descriptions[i].direction;
if (res != 0)
return res;
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Shuffle/SelectorBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class RangeSelectorBuilder : public SelectorBuilder
const std::vector<size_t> & required_columns,
size_t row,
const DB::Columns & bound_columns,
size_t bound_row);
size_t bound_row) const;

int binarySearchBound(
const DB::Columns & bound_columns,
Expand Down

0 comments on commit 54ef603

Please sign in to comment.