From 8e0d56e427049e8f18b966d9f52ebe53f1e5dffa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=AB=98=E9=98=B3=E9=98=B3?= Date: Thu, 11 Jul 2024 09:20:06 +0800 Subject: [PATCH] [UT] Test input_file_name, input_file_block_start & input_file_block_length when scan falls back (#6318) --- .../sql/GlutenColumnExpressionSuite.scala | 52 ++++++++----------- .../sql/GlutenColumnExpressionSuite.scala | 52 ++++++++----------- .../sql/GlutenColumnExpressionSuite.scala | 52 ++++++++----------- .../sql/GlutenColumnExpressionSuite.scala | 52 ++++++++----------- 4 files changed, 92 insertions(+), 116 deletions(-) diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala index a4b530e637af..da22e60f932d 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala @@ -18,38 +18,32 @@ package org.apache.spark.sql import org.apache.spark.sql.execution.ProjectExec import org.apache.spark.sql.functions.{expr, input_file_name} -import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType, StructField, StructType} class GlutenColumnExpressionSuite extends ColumnExpressionSuite with GlutenSQLTestsTrait { - testGluten("input_file_name with scan is fallback") { - withTempPath { - dir => - val rawData = Seq( - Row(1, "Alice", Seq(Row(Seq(1, 2, 3)))), - Row(2, "Bob", Seq(Row(Seq(4, 5)))), - Row(3, "Charlie", Seq(Row(Seq(6, 7, 8, 9)))) - ) - val schema = StructType( - Array( - StructField("id", IntegerType, nullable = false), - StructField("name", StringType, nullable = false), - StructField( - "nested_column", - ArrayType( - StructType(Array( - StructField("array_in_struct", ArrayType(IntegerType), nullable = true) - ))), - nullable = true) - )) - val data: DataFrame = spark.createDataFrame(sparkContext.parallelize(rawData), schema) - data.write.parquet(dir.getCanonicalPath) + import testImplicits._ + testGluten( + "input_file_name, input_file_block_start and input_file_block_length " + + "should fall back if scan falls back") { + withSQLConf(("spark.gluten.sql.columnar.filescan", "false")) { + withTempPath { + dir => + val data = sparkContext.parallelize(0 to 10).toDF("id") + data.write.parquet(dir.getCanonicalPath) - val q = - spark.read.parquet(dir.getCanonicalPath).select(input_file_name(), expr("nested_column")) - val firstRow = q.head() - assert(firstRow.getString(0).contains(dir.toURI.getPath)) - val project = q.queryExecution.executedPlan.collect { case p: ProjectExec => p } - assert(project.size == 1) + val q = + spark.read + .parquet(dir.getCanonicalPath) + .select( + input_file_name(), + expr("input_file_block_start()"), + expr("input_file_block_length()")) + val firstRow = q.head() + assert(firstRow.getString(0).contains(dir.toURI.getPath)) + assert(firstRow.getLong(1) == 0) + assert(firstRow.getLong(2) > 0) + val project = q.queryExecution.executedPlan.collect { case p: ProjectExec => p } + assert(project.size == 1) + } } } } diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala index a4b530e637af..da22e60f932d 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala @@ -18,38 +18,32 @@ package org.apache.spark.sql import org.apache.spark.sql.execution.ProjectExec import org.apache.spark.sql.functions.{expr, input_file_name} -import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType, StructField, StructType} class GlutenColumnExpressionSuite extends ColumnExpressionSuite with GlutenSQLTestsTrait { - testGluten("input_file_name with scan is fallback") { - withTempPath { - dir => - val rawData = Seq( - Row(1, "Alice", Seq(Row(Seq(1, 2, 3)))), - Row(2, "Bob", Seq(Row(Seq(4, 5)))), - Row(3, "Charlie", Seq(Row(Seq(6, 7, 8, 9)))) - ) - val schema = StructType( - Array( - StructField("id", IntegerType, nullable = false), - StructField("name", StringType, nullable = false), - StructField( - "nested_column", - ArrayType( - StructType(Array( - StructField("array_in_struct", ArrayType(IntegerType), nullable = true) - ))), - nullable = true) - )) - val data: DataFrame = spark.createDataFrame(sparkContext.parallelize(rawData), schema) - data.write.parquet(dir.getCanonicalPath) + import testImplicits._ + testGluten( + "input_file_name, input_file_block_start and input_file_block_length " + + "should fall back if scan falls back") { + withSQLConf(("spark.gluten.sql.columnar.filescan", "false")) { + withTempPath { + dir => + val data = sparkContext.parallelize(0 to 10).toDF("id") + data.write.parquet(dir.getCanonicalPath) - val q = - spark.read.parquet(dir.getCanonicalPath).select(input_file_name(), expr("nested_column")) - val firstRow = q.head() - assert(firstRow.getString(0).contains(dir.toURI.getPath)) - val project = q.queryExecution.executedPlan.collect { case p: ProjectExec => p } - assert(project.size == 1) + val q = + spark.read + .parquet(dir.getCanonicalPath) + .select( + input_file_name(), + expr("input_file_block_start()"), + expr("input_file_block_length()")) + val firstRow = q.head() + assert(firstRow.getString(0).contains(dir.toURI.getPath)) + assert(firstRow.getLong(1) == 0) + assert(firstRow.getLong(2) > 0) + val project = q.queryExecution.executedPlan.collect { case p: ProjectExec => p } + assert(project.size == 1) + } } } } diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala index a4b530e637af..da22e60f932d 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala @@ -18,38 +18,32 @@ package org.apache.spark.sql import org.apache.spark.sql.execution.ProjectExec import org.apache.spark.sql.functions.{expr, input_file_name} -import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType, StructField, StructType} class GlutenColumnExpressionSuite extends ColumnExpressionSuite with GlutenSQLTestsTrait { - testGluten("input_file_name with scan is fallback") { - withTempPath { - dir => - val rawData = Seq( - Row(1, "Alice", Seq(Row(Seq(1, 2, 3)))), - Row(2, "Bob", Seq(Row(Seq(4, 5)))), - Row(3, "Charlie", Seq(Row(Seq(6, 7, 8, 9)))) - ) - val schema = StructType( - Array( - StructField("id", IntegerType, nullable = false), - StructField("name", StringType, nullable = false), - StructField( - "nested_column", - ArrayType( - StructType(Array( - StructField("array_in_struct", ArrayType(IntegerType), nullable = true) - ))), - nullable = true) - )) - val data: DataFrame = spark.createDataFrame(sparkContext.parallelize(rawData), schema) - data.write.parquet(dir.getCanonicalPath) + import testImplicits._ + testGluten( + "input_file_name, input_file_block_start and input_file_block_length " + + "should fall back if scan falls back") { + withSQLConf(("spark.gluten.sql.columnar.filescan", "false")) { + withTempPath { + dir => + val data = sparkContext.parallelize(0 to 10).toDF("id") + data.write.parquet(dir.getCanonicalPath) - val q = - spark.read.parquet(dir.getCanonicalPath).select(input_file_name(), expr("nested_column")) - val firstRow = q.head() - assert(firstRow.getString(0).contains(dir.toURI.getPath)) - val project = q.queryExecution.executedPlan.collect { case p: ProjectExec => p } - assert(project.size == 1) + val q = + spark.read + .parquet(dir.getCanonicalPath) + .select( + input_file_name(), + expr("input_file_block_start()"), + expr("input_file_block_length()")) + val firstRow = q.head() + assert(firstRow.getString(0).contains(dir.toURI.getPath)) + assert(firstRow.getLong(1) == 0) + assert(firstRow.getLong(2) > 0) + val project = q.queryExecution.executedPlan.collect { case p: ProjectExec => p } + assert(project.size == 1) + } } } } diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala index 8a28c4e98a26..da22e60f932d 100644 --- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala +++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/GlutenColumnExpressionSuite.scala @@ -18,38 +18,32 @@ package org.apache.spark.sql import org.apache.spark.sql.execution.ProjectExec import org.apache.spark.sql.functions.{expr, input_file_name} -import org.apache.spark.sql.types._ class GlutenColumnExpressionSuite extends ColumnExpressionSuite with GlutenSQLTestsTrait { - testGluten("input_file_name with scan is fallback") { - withTempPath { - dir => - val rawData = Seq( - Row(1, "Alice", Seq(Row(Seq(1, 2, 3)))), - Row(2, "Bob", Seq(Row(Seq(4, 5)))), - Row(3, "Charlie", Seq(Row(Seq(6, 7, 8, 9)))) - ) - val schema = StructType( - Array( - StructField("id", IntegerType, nullable = false), - StructField("name", StringType, nullable = false), - StructField( - "nested_column", - ArrayType( - StructType(Array( - StructField("array_in_struct", ArrayType(IntegerType), nullable = true) - ))), - nullable = true) - )) - val data: DataFrame = spark.createDataFrame(sparkContext.parallelize(rawData), schema) - data.write.parquet(dir.getCanonicalPath) + import testImplicits._ + testGluten( + "input_file_name, input_file_block_start and input_file_block_length " + + "should fall back if scan falls back") { + withSQLConf(("spark.gluten.sql.columnar.filescan", "false")) { + withTempPath { + dir => + val data = sparkContext.parallelize(0 to 10).toDF("id") + data.write.parquet(dir.getCanonicalPath) - val q = - spark.read.parquet(dir.getCanonicalPath).select(input_file_name(), expr("nested_column")) - val firstRow = q.head() - assert(firstRow.getString(0).contains(dir.toURI.getPath)) - val project = q.queryExecution.executedPlan.collect { case p: ProjectExec => p } - assert(project.size == 1) + val q = + spark.read + .parquet(dir.getCanonicalPath) + .select( + input_file_name(), + expr("input_file_block_start()"), + expr("input_file_block_length()")) + val firstRow = q.head() + assert(firstRow.getString(0).contains(dir.toURI.getPath)) + assert(firstRow.getLong(1) == 0) + assert(firstRow.getLong(2) > 0) + val project = q.queryExecution.executedPlan.collect { case p: ProjectExec => p } + assert(project.size == 1) + } } } }