diff --git a/.github/workflows/build_bundle_package.yml b/.github/workflows/build_bundle_package.yml index 3afe5b5e02a8..d4f7046f2b0c 100644 --- a/.github/workflows/build_bundle_package.yml +++ b/.github/workflows/build_bundle_package.yml @@ -17,6 +17,7 @@ name: Build bundle package env: ACTIONS_ALLOW_USE_UNSECURE_NODE_VERSION: true + CCACHE_DIR: "${{ github.workspace }}/.ccache" concurrency: group: ${{ github.repository }}-${{ github.head_ref || github.sha }}-${{ github.workflow }} @@ -25,10 +26,6 @@ concurrency: on: workflow_dispatch: inputs: - os: - description: 'OS version: ubuntu:20.04, ubuntu:22.04, centos:7 or centos:8' - required: true - default: 'ubuntu:20.04' spark: description: 'Spark version: spark-3.2, spark-3.3, spark-3.4 or spark-3.5' required: true @@ -41,19 +38,22 @@ on: jobs: build-native-lib: runs-on: ubuntu-20.04 - container: apache/gluten:gluten-vcpkg-builder_2024_05_29 + container: apache/gluten:vcpkg-centos-7 steps: - uses: actions/checkout@v2 + - name: Get Ccache + uses: actions/cache/restore@v3 + with: + path: '${{ env.CCACHE_DIR }}' + key: ccache-centos7-release-default-${{github.sha}} + restore-keys: | + ccache-centos7-release-default - name: Build Gluten velox third party run: | - yum install sudo patch java-1.8.0-openjdk-devel -y && \ - cd $GITHUB_WORKSPACE/ep/build-velox/src && \ - ./get_velox.sh && \ - source /opt/rh/devtoolset-11/enable && \ - cd $GITHUB_WORKSPACE/ && \ - export NUM_THREADS=4 - ./dev/builddeps-veloxbe.sh --enable_vcpkg=ON --build_tests=OFF --build_benchmarks=OFF --enable_s3=OFF \ - --enable_gcs=OFF --enable_hdfs=ON --enable_abfs=OFF + df -a + yum install ccache -y + cd $GITHUB_WORKSPACE/ + bash dev/ci-velox-buildstatic-centos-7.sh - name: Upload native libs uses: actions/upload-artifact@v2 with: @@ -66,44 +66,10 @@ jobs: path: /root/.m2/repository/org/apache/arrow/ name: velox-arrow-jar-centos-7-${{github.sha}} - build-bundle-package-ubuntu: - if: startsWith(github.event.inputs.os, 'ubuntu') - needs: build-native-lib - runs-on: ubuntu-20.04 - container: ${{ github.event.inputs.os }} - steps: - - uses: actions/checkout@v2 - - name: Download All Artifacts - uses: actions/download-artifact@v2 - with: - name: velox-native-lib-${{github.sha}} - path: ./cpp/build/releases - - name: Download All Arrow Jar Artifacts - uses: actions/download-artifact@v2 - with: - name: velox-arrow-jar-centos-7-${{github.sha}} - path: /root/.m2/repository/org/apache/arrow/ - - name: Setup java and maven - run: | - apt-get update && \ - apt-get install -y openjdk-8-jdk maven && \ - apt remove openjdk-11* -y - - name: Build for Spark ${{ github.event.inputs.spark }} - run: | - cd $GITHUB_WORKSPACE/ && \ - mvn clean install -P${{ github.event.inputs.spark }} -Dhadoop.version=${{ github.event.inputs.hadoop }} -Pbackends-velox -Pceleborn -DskipTests -Dmaven.source.skip - - name: Upload bundle package - uses: actions/upload-artifact@v2 - with: - name: gluten-velox-bundle-package - path: package/target/gluten-velox-bundle-*.jar - retention-days: 7 - build-bundle-package-centos7: - if: ${{ github.event.inputs.os == 'centos:7' }} needs: build-native-lib runs-on: ubuntu-20.04 - container: ${{ github.event.inputs.os }} + container: centos:7 steps: - uses: actions/checkout@v2 - name: Download All Artifacts @@ -127,7 +93,7 @@ jobs: cd $GITHUB_WORKSPACE/ && \ export MAVEN_HOME=/usr/lib/maven && \ export PATH=${PATH}:${MAVEN_HOME}/bin && \ - mvn clean install -P${{ github.event.inputs.spark }} -Dhadoop.version=${{ github.event.inputs.hadoop }} -Pbackends-velox -Pceleborn -DskipTests 
-Dmaven.source.skip + mvn clean install -P${{ github.event.inputs.spark }} -Dhadoop.version=${{ github.event.inputs.hadoop }} -Pbackends-velox -Pceleborn -Puniffle -DskipTests -Dmaven.source.skip - name: Upload bundle package uses: actions/upload-artifact@v2 with: @@ -135,42 +101,3 @@ jobs: path: package/target/gluten-velox-bundle-*.jar retention-days: 7 - build-bundle-package-centos8: - if: ${{ github.event.inputs.os == 'centos:8' }} - needs: build-native-lib - runs-on: ubuntu-20.04 - container: ${{ github.event.inputs.os }} - steps: - - uses: actions/checkout@v2 - - name: Download All Artifacts - uses: actions/download-artifact@v2 - with: - name: velox-native-lib-${{github.sha}} - path: ./cpp/build/releases - - name: Download All Arrow Jar Artifacts - uses: actions/download-artifact@v2 - with: - name: velox-arrow-jar-centos-7-${{github.sha}} - path: /root/.m2/repository/org/apache/arrow/ - - name: Update mirror list - run: | - sed -i -e "s|mirrorlist=|#mirrorlist=|g" /etc/yum.repos.d/CentOS-* || true && \ - sed -i -e "s|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g" /etc/yum.repos.d/CentOS-* || true - - name: Setup java and maven - run: | - yum update -y && yum install -y java-1.8.0-openjdk-devel wget && \ - wget https://downloads.apache.org/maven/maven-3/3.8.8/binaries/apache-maven-3.8.8-bin.tar.gz && \ - tar -xvf apache-maven-3.8.8-bin.tar.gz && \ - mv apache-maven-3.8.8 /usr/lib/maven - - name: Build for Spark ${{ github.event.inputs.spark }} - run: | - cd $GITHUB_WORKSPACE/ && \ - export MAVEN_HOME=/usr/lib/maven && \ - export PATH=${PATH}:${MAVEN_HOME}/bin && \ - mvn clean install -P${{ github.event.inputs.spark }} -Dhadoop.version=${{ github.event.inputs.hadoop }} -Pbackends-velox -Pceleborn -DskipTests -Dmaven.source.skip - - name: Upload bundle package - uses: actions/upload-artifact@v2 - with: - name: gluten-velox-bundle-package - path: package/target/gluten-velox-bundle-*.jar - retention-days: 7 diff --git a/.github/workflows/velox_backend.yml b/.github/workflows/velox_backend.yml index ba5d4f96ff14..6e9d0eab7c70 100644 --- a/.github/workflows/velox_backend.yml +++ b/.github/workflows/velox_backend.yml @@ -993,16 +993,14 @@ jobs: pip3 install setuptools && \ pip3 install pyspark==3.5.3 cython && \ pip3 install pandas pyarrow - - name: (To be fixed) Build and Run unit test for Spark 3.5.3 (other tests) - continue-on-error: true + - name: Build and Run unit test for Spark 3.5.3 (other tests) run: | cd $GITHUB_WORKSPACE/ export SPARK_SCALA_VERSION=2.12 $MVN_CMD clean test -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \ -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/ -Dspark.gluten.ras.enabled=true" \ -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.SkipTestTags - - name: (To be enabled) Upload test report - if: false + - name: Upload test report uses: actions/upload-artifact@v4 with: name: test-report-spark35-ras @@ -1035,15 +1033,13 @@ jobs: - name: Prepare spark.test.home for Spark 3.5.3 (slow tests) run: | bash .github/workflows/util/install_spark_resources.sh 3.5 - - name: (To be fixed) Build and Run unit test for Spark 3.5.3 (slow tests) - continue-on-error: true + - name: Build and Run unit test for Spark 3.5.3 (slow tests) run: | cd $GITHUB_WORKSPACE/ $MVN_CMD clean test -Pspark-3.5 -Pbackends-velox -Pceleborn -Piceberg -Pdelta -Pspark-ut \ -DargLine="-Dspark.test.home=$GITHUB_WORKSPACE//shims/spark35/spark_home/ -Dspark.gluten.ras.enabled=true" 
\ -DtagsToInclude=org.apache.spark.tags.ExtendedSQLTest - - name: (To be enabled) Upload test report - if: false + - name: Upload test report uses: actions/upload-artifact@v4 with: name: test-report-spark35-slow-ras diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java index 4033d8c6b1cc..7c89cf6b41ad 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/execution/CHNativeCacheManager.java @@ -19,11 +19,12 @@ import java.util.Set; public class CHNativeCacheManager { - public static String cacheParts(String table, Set columns) { - return nativeCacheParts(table, String.join(",", columns)); + public static String cacheParts(String table, Set columns, boolean onlyMetaCache) { + return nativeCacheParts(table, String.join(",", columns), onlyMetaCache); } - private static native String nativeCacheParts(String table, String columns); + private static native String nativeCacheParts( + String table, String columns, boolean onlyMetaCache); public static CacheResult getCacheStatus(String jobId) { return nativeGetCacheStatus(jobId); diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala index 59d912d8e75d..9a1b00f71431 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHBackend.scala @@ -142,10 +142,11 @@ object CHBackendSettings extends BackendSettingsApi with Logging { .toLowerCase(Locale.getDefault) } - override def validateScan( + override def validateScanExec( format: ReadFileFormat, fields: Array[StructField], - rootPaths: Seq[String]): ValidationResult = { + rootPaths: Seq[String], + properties: Map[String, String]): ValidationResult = { // Validate if all types are supported. def hasComplexType: Boolean = { diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala index 91698d4cde85..dea0d50c9da6 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHRuleApi.scala @@ -86,7 +86,8 @@ private object CHRuleApi { injector.injectTransform( c => intercept( - SparkPlanRules.extendedColumnarRule(c.conf.extendedColumnarTransformRules)(c.session))) + SparkPlanRules.extendedColumnarRule(c.glutenConf.extendedColumnarTransformRules)( + c.session))) injector.injectTransform(c => InsertTransitions(c.outputsColumnar)) // Gluten columnar: Fallback policies. 
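The new onlyMetaCache flag on CHNativeCacheManager.cacheParts above is threaded through the GlutenMergeTreeCacheLoad RPC message and into the native CacheManager later in this patch. As a rough Scala sketch of how a caller could request a metadata-only warm-up (illustrative only, not part of the patch; the serialized MergeTree table definition and the native library loading are assumed to be handled elsewhere, as GlutenCHCacheDataCommand does):

import org.apache.gluten.execution.CHNativeCacheManager

import scala.collection.JavaConverters._

object MergeTreeCacheWarmupSketch {
  // `tableSnapshot` stands in for the serialized MergeTree table definition
  // that GlutenCHCacheDataCommand normally builds; it is not spelled out here.
  def warmMetadataOnly(tableSnapshot: String, columns: Set[String]): String = {
    // Passing true for onlyMetaCache asks the native CacheManager to restore
    // part metadata without scheduling the column-data read pipeline; the
    // returned job id can then be polled with CHNativeCacheManager.getCacheStatus.
    CHNativeCacheManager.cacheParts(tableSnapshot, columns.asJava, true)
  }
}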
@@ -98,14 +99,15 @@ private object CHRuleApi { SparkShimLoader.getSparkShims .getExtendedColumnarPostRules() .foreach(each => injector.injectPost(c => intercept(each(c.session)))) - injector.injectPost(c => ColumnarCollapseTransformStages(c.conf)) + injector.injectPost(c => ColumnarCollapseTransformStages(c.glutenConf)) injector.injectTransform( c => - intercept(SparkPlanRules.extendedColumnarRule(c.conf.extendedColumnarPostRules)(c.session))) + intercept( + SparkPlanRules.extendedColumnarRule(c.glutenConf.extendedColumnarPostRules)(c.session))) // Gluten columnar: Final rules. injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session)) - injector.injectFinal(c => GlutenFallbackReporter(c.conf, c.session)) + injector.injectFinal(c => GlutenFallbackReporter(c.glutenConf, c.session)) injector.injectFinal(_ => RemoveFallbackTagRule()) } diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala index 5fe740694624..eed493cffe1e 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHValidatorApi.scala @@ -28,7 +28,7 @@ import org.apache.gluten.vectorized.CHNativeExpressionEvaluator import org.apache.spark.internal.Logging import org.apache.spark.shuffle.utils.RangePartitionerBoundsGenerator -import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning} import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper @@ -71,6 +71,7 @@ class CHValidatorApi extends ValidatorApi with AdaptiveSparkPlanHelper with Logg } override def doColumnarShuffleExchangeExecValidate( + outputAttributes: Seq[Attribute], outputPartitioning: Partitioning, child: SparkPlan): Option[String] = { val outputAttributes = child.output diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcastHashJoinRules.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcastHashJoinRules.scala index ec465a3c1506..207bb0e3a4d7 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcastHashJoinRules.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/FallbackBroadcastHashJoinRules.scala @@ -38,7 +38,7 @@ import scala.util.control.Breaks.{break, breakable} // queryStagePrepRules. 
case class FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession) extends Rule[SparkPlan] { override def apply(plan: SparkPlan): SparkPlan = { - val columnarConf: GlutenConfig = GlutenConfig.getConf + val glutenConf: GlutenConfig = GlutenConfig.getConf plan.foreach { case bhj: BroadcastHashJoinExec => val buildSidePlan = bhj.buildSide match { @@ -53,8 +53,8 @@ case class FallbackBroadcastHashJoinPrepQueryStage(session: SparkSession) extend case Some(exchange @ BroadcastExchangeExec(mode, child)) => val isTransformable = if ( - !columnarConf.enableColumnarBroadcastExchange || - !columnarConf.enableColumnarBroadcastJoin + !glutenConf.enableColumnarBroadcastExchange || + !glutenConf.enableColumnarBroadcastJoin ) { ValidationResult.failed( "columnar broadcast exchange is disabled or " + diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/MergeTwoPhasesHashBaseAggregate.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/MergeTwoPhasesHashBaseAggregate.scala index 63c5fe017f5e..a10659b6d5e7 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/extension/MergeTwoPhasesHashBaseAggregate.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/extension/MergeTwoPhasesHashBaseAggregate.scala @@ -38,9 +38,9 @@ case class MergeTwoPhasesHashBaseAggregate(session: SparkSession) extends Rule[SparkPlan] with Logging { - val columnarConf: GlutenConfig = GlutenConfig.getConf - val scanOnly: Boolean = columnarConf.enableScanOnly - val enableColumnarHashAgg: Boolean = !scanOnly && columnarConf.enableColumnarHashAgg + val glutenConf: GlutenConfig = GlutenConfig.getConf + val scanOnly: Boolean = glutenConf.enableScanOnly + val enableColumnarHashAgg: Boolean = !scanOnly && glutenConf.enableColumnarHashAgg val replaceSortAggWithHashAgg: Boolean = GlutenConfig.getConf.forceToUseHashAgg private def isPartialAgg(partialAgg: BaseAggregateExec, finalAgg: BaseAggregateExec): Boolean = { diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala index 559a22cb12c2..b9ae9ff363b6 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenExecutorEndpoint.scala @@ -70,9 +70,9 @@ class GlutenExecutorEndpoint(val executorId: String, val conf: SparkConf) } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { - case GlutenMergeTreeCacheLoad(mergeTreeTable, columns) => + case GlutenMergeTreeCacheLoad(mergeTreeTable, columns, onlyMetaCache) => try { - val jobId = CHNativeCacheManager.cacheParts(mergeTreeTable, columns) + val jobId = CHNativeCacheManager.cacheParts(mergeTreeTable, columns, onlyMetaCache) context.reply(CacheJobInfo(status = true, jobId)) } catch { case e: Exception => diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala index e596e94fed72..8127c324b79c 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/rpc/GlutenRpcMessages.scala @@ -36,7 +36,10 @@ object GlutenRpcMessages { extends GlutenRpcMessage // for mergetree cache - case class GlutenMergeTreeCacheLoad(mergeTreeTable: String, columns: util.Set[String]) + case class GlutenMergeTreeCacheLoad( + mergeTreeTable: 
String, + columns: util.Set[String], + onlyMetaCache: Boolean) extends GlutenRpcMessage case class GlutenCacheLoadStatus(jobId: String) diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala index 1c7b4f232205..69a8c4218714 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/commands/GlutenCHCacheDataCommand.scala @@ -201,7 +201,7 @@ case class GlutenCHCacheDataCommand( ( executorId, executor.executorEndpointRef.ask[CacheJobInfo]( - GlutenMergeTreeCacheLoad(tableMessage, selectedColumns.toSet.asJava) + GlutenMergeTreeCacheLoad(tableMessage, selectedColumns.toSet.asJava, onlyMetaCache) ))) }) } else { @@ -213,7 +213,7 @@ case class GlutenCHCacheDataCommand( ( value._1, executorData.executorEndpointRef.ask[CacheJobInfo]( - GlutenMergeTreeCacheLoad(value._2, selectedColumns.toSet.asJava) + GlutenMergeTreeCacheLoad(value._2, selectedColumns.toSet.asJava, onlyMetaCache) ))) }) } diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseFileFormatSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseFileFormatSuite.scala index e8ddbd12f1fc..88a34a786a8c 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseFileFormatSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseFileFormatSuite.scala @@ -1461,4 +1461,26 @@ class GlutenClickHouseFileFormatSuite spark.createDataFrame(data, schema).toDF().write.parquet(fileName) fileName } + + /** TODO: fix the issue and test in spark 3.5 */ + testSparkVersionLE33("write into hdfs") { + + /** + * There is a bug in pipeline write to HDFS; when the pipeline returns a column batch, it doesn't + * close the HDFS file, and hence the file is not flushed. The HDFS file is closed when the LocalExecutor + * is destroyed, but before that the file has already been moved by the Spark committer.
+ */ + val tableName = "write_into_hdfs" + val tablePath = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME/$tableName/" + val format = "parquet" + val sql = + s""" + | select * + | from $format.`$tablePath` + | where long_field > 30 + |""".stripMargin + withSQLConf(("spark.gluten.sql.native.writer.enabled", "true")) { + testFileFormatBase(tablePath, format, sql, df => {}) + } + } } diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala index a56f45d1ba3d..8dc178e46ce5 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseTPCHSuite.scala @@ -563,5 +563,12 @@ class GlutenClickHouseTPCHSuite extends GlutenClickHouseTPCHAbstractSuite { compareResultsAgainstVanillaSpark(sql, true, { _ => }) spark.sql("drop table t1") } + + test("GLUTEN-7780 fix split diff") { + val sql = "select split(concat('a|b|c', cast(id as string)), '\\|')" + + ", split(concat('a|b|c', cast(id as string)), '\\\\|')" + + ", split(concat('a|b|c', cast(id as string)), '|') from range(10)" + compareResultsAgainstVanillaSpark(sql, true, { _ => }) + } } // scalastyle:off line.size.limit diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala index b3e1bd21e957..272323c48732 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseWholeStageTransformerSuite.scala @@ -35,26 +35,26 @@ class GlutenClickHouseWholeStageTransformerSuite extends WholeStageTransformerSu val DBL_RELAX_EPSILON: Double = Math.pow(10, -11) val FLT_EPSILON = 1.19209290e-07f - protected val sparkVersion: String = { + private val sparkVersion: String = { val version = SPARK_VERSION_SHORT.split("\\.") version(0) + "." 
+ version(1) } + val SPARK_DIR_NAME: String = sparkVersion.replace(".", "-") - val S3_METADATA_PATH = s"/tmp/metadata/s3/$sparkVersion/" - val S3_CACHE_PATH = s"/tmp/s3_cache/$sparkVersion/" + val S3_METADATA_PATH = s"/tmp/metadata/s3/$SPARK_DIR_NAME/" + val S3_CACHE_PATH = s"/tmp/s3_cache/$SPARK_DIR_NAME/" val S3_ENDPOINT = "s3://127.0.0.1:9000/" val MINIO_ENDPOINT: String = S3_ENDPOINT.replace("s3", "http") - val SPARK_DIR_NAME: String = sparkVersion.replace(".", "-") val BUCKET_NAME: String = SPARK_DIR_NAME val WHOLE_PATH: String = MINIO_ENDPOINT + BUCKET_NAME + "/" - val HDFS_METADATA_PATH = s"/tmp/metadata/hdfs/$sparkVersion/" - val HDFS_CACHE_PATH = s"/tmp/hdfs_cache/$sparkVersion/" + val HDFS_METADATA_PATH = s"/tmp/metadata/hdfs/$SPARK_DIR_NAME/" + val HDFS_CACHE_PATH = s"/tmp/hdfs_cache/$SPARK_DIR_NAME/" val HDFS_URL_ENDPOINT = "hdfs://127.0.0.1:8020" - val HDFS_URL = s"$HDFS_URL_ENDPOINT/$sparkVersion" + val HDFS_URL = s"$HDFS_URL_ENDPOINT/$SPARK_DIR_NAME" - val S3_ACCESS_KEY = "BypTYzcXOlfr03FFIvt4" - val S3_SECRET_KEY = "K9MDaGItPSaphorZM8t4hXf30gHF9dBWi6L2dK5E" + val S3_ACCESS_KEY = "minioadmin" + val S3_SECRET_KEY = "minioadmin" val CH_DEFAULT_STORAGE_DIR = "/data" diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala index 4e2b5ad63e0a..8d311614c7de 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/hive/GlutenClickHouseHiveTableSuite.scala @@ -542,6 +542,37 @@ class GlutenClickHouseHiveTableSuite ) } + test("GLUTEN-7700: test hive table with partition values contain space") { + val tbl = "test_7700" + val create_table_sql = + s""" + |create table if not exists $tbl ( + | id int + |) partitioned by (itime string) + |stored as orc; + |""".stripMargin + val insert_sql = + s""" + |insert overwrite table $tbl partition (itime = '2024-10-24 10:02:04') + |select id from range(3) + |""".stripMargin + val select_sql = + s""" + |select * from $tbl + |""".stripMargin + val drop_sql = s"drop table if exists $tbl" + + spark.sql(create_table_sql) + spark.sql(insert_sql) + + compareResultsAgainstVanillaSpark( + select_sql, + compareResult = true, + df => assert(df.count() == 3) + ) + spark.sql(drop_sql) + } + test("test hive compressed txt table") { withSQLConf(SQLConf.FILES_MAX_PARTITION_BYTES.key -> "11") { Seq("DefaultCodec", "BZip2Codec").foreach { diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala index bf3be1e52907..a85a9094d38f 100644 --- a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/mergetree/GlutenClickHouseMergeTreeCacheDataSuite.scala @@ -134,7 +134,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite | aaa='ccc')""".stripMargin) .collect() assertResult(true)(res(0).getBoolean(0)) - val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs") + val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs") assertResult(true)(metaPath.exists() && metaPath.isDirectory) 
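Relating the GLUTEN-7700 test above (a partition value containing a space) to the HDFSFileWriteBufferBuilder change later in this patch, which uses uri.getPath() instead of substringing the raw URI so that percent-encoded characters are decoded before the parent directory is created: the following JDK-only Scala snippet illustrates the difference (the path is hypothetical, not taken from the patch).

import java.net.URI

object PartitionPathEncodingSketch {
  def main(args: Array[String]): Unit = {
    // A partition directory whose value contains a space and colons, shown
    // percent-encoded as it appears in the raw URI string.
    val raw =
      "hdfs://127.0.0.1:8020/warehouse/test_7700/itime=2024-10-24%2010%3A02%3A04/part-0.orc"
    // Substringing the raw URI (the old approach) keeps the encoded form.
    println(raw.substring(raw.indexOf('/', "hdfs://".length)))
    // URI.getPath() decodes it to the directory name that actually has to be
    // created on the filesystem before the file is written.
    println(new URI(raw).getPath)
  }
}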
assertResult(22)(metaPath.list().length) assert(countFiles(dataPath) > initial_cache_files) @@ -238,7 +238,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite | aaa='ccc')""".stripMargin) .collect() assertResult(true)(res(0).getBoolean(0)) - val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs") + val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs") assertResult(true)(metaPath.exists() && metaPath.isDirectory) eventually(timeout(60.seconds), interval(2.seconds)) { assertResult(22)(metaPath.list().length) @@ -346,7 +346,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite | aaa='ccc')""".stripMargin) .collect() assertResult(true)(res(0).getBoolean(0)) - val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs") + val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs") assertResult(true)(metaPath.exists() && metaPath.isDirectory) assertResult(22)(metaPath.list().length) assert(countFiles(dataPath) > initial_cache_files) @@ -439,7 +439,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite val dataPath = new File(HDFS_CACHE_PATH) val initial_cache_files = countFiles(dataPath) - val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs") + val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs") val res1 = spark.sql(s"cache data select * from lineitem_mergetree_hdfs").collect() assertResult(true)(res1(0).getBoolean(0)) assertResult(1)(metaPath.list().length) @@ -539,7 +539,7 @@ class GlutenClickHouseMergeTreeCacheDataSuite | aaa='ccc')""".stripMargin) .collect() assertResult(true)(res(0).getBoolean(0)) - val metaPath = new File(HDFS_METADATA_PATH + s"$sparkVersion/test/lineitem_mergetree_hdfs") + val metaPath = new File(HDFS_METADATA_PATH + s"$SPARK_DIR_NAME/test/lineitem_mergetree_hdfs") assertResult(true)(metaPath.exists() && metaPath.isDirectory) assertResult(22)(metaPath.list().length) assert(countFiles(dataPath) > initial_cache_files) diff --git a/backends-velox/pom.xml b/backends-velox/pom.xml index 7cab49b25550..e6900e50dd70 100755 --- a/backends-velox/pom.xml +++ b/backends-velox/pom.xml @@ -72,12 +72,6 @@ spark-hive_${scala.binary.version} provided - - org.apache.gluten - gluten-substrait - ${project.version} - compile - org.apache.gluten gluten-arrow diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala index 251b93cc7cec..539059cdb6f8 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala @@ -35,12 +35,12 @@ import org.apache.spark.sql.catalyst.catalog.BucketSpec import org.apache.spark.sql.catalyst.expressions.{Alias, CumeDist, DenseRank, Descending, Expression, Lag, Lead, NamedExpression, NthValue, NTile, PercentRank, RangeFrame, Rank, RowNumber, SortOrder, SpecialFrameBoundary, SpecifiedWindowFrame} import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, ApproximatePercentile, Percentile} import org.apache.spark.sql.catalyst.plans.{JoinType, LeftOuter, RightOuter} -import org.apache.spark.sql.catalyst.util.CharVarcharUtils +import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, CharVarcharUtils} import org.apache.spark.sql.execution.{ColumnarCachedBatchSerializer, SparkPlan} 
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.command.CreateDataSourceTableAsSelectCommand import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopFsRelationCommand} -import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat +import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions} import org.apache.spark.sql.hive.execution.HiveFileFormat import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ @@ -88,78 +88,94 @@ object VeloxBackendSettings extends BackendSettingsApi { val GLUTEN_VELOX_INTERNAL_UDF_LIB_PATHS = VeloxBackend.CONF_PREFIX + ".internal.udfLibraryPaths" val GLUTEN_VELOX_UDF_ALLOW_TYPE_CONVERSION = VeloxBackend.CONF_PREFIX + ".udfAllowTypeConversion" - val MAXIMUM_BATCH_SIZE: Int = 32768 - - override def validateScan( + override def validateScanExec( format: ReadFileFormat, fields: Array[StructField], - rootPaths: Seq[String]): ValidationResult = { - val filteredRootPaths = distinctRootPaths(rootPaths) - if ( - filteredRootPaths.nonEmpty && !VeloxFileSystemValidationJniWrapper - .allSupportedByRegisteredFileSystems(filteredRootPaths.toArray) - ) { - return ValidationResult.failed( - s"Scheme of [$filteredRootPaths] is not supported by registered file systems.") - } - // Validate if all types are supported. - def validateTypes(validatorFunc: PartialFunction[StructField, String]): ValidationResult = { - // Collect unsupported types. - val unsupportedDataTypeReason = fields.collect(validatorFunc) - if (unsupportedDataTypeReason.isEmpty) { - ValidationResult.succeeded + rootPaths: Seq[String], + properties: Map[String, String]): ValidationResult = { + + def validateScheme(): Option[String] = { + val filteredRootPaths = distinctRootPaths(rootPaths) + if ( + filteredRootPaths.nonEmpty && !VeloxFileSystemValidationJniWrapper + .allSupportedByRegisteredFileSystems(filteredRootPaths.toArray) + ) { + Some(s"Scheme of [$filteredRootPaths] is not supported by registered file systems.") } else { - ValidationResult.failed( - s"Found unsupported data type in $format: ${unsupportedDataTypeReason.mkString(", ")}.") + None } } - format match { - case ParquetReadFormat => - val typeValidator: PartialFunction[StructField, String] = { - // Parquet timestamp is not fully supported yet - case StructField(_, TimestampType, _, _) - if GlutenConfig.getConf.forceParquetTimestampTypeScanFallbackEnabled => - "TimestampType" - } - validateTypes(typeValidator) - case DwrfReadFormat => ValidationResult.succeeded - case OrcReadFormat => - if (!GlutenConfig.getConf.veloxOrcScanEnabled) { - ValidationResult.failed(s"Velox ORC scan is turned off.") + def validateFormat(): Option[String] = { + def validateTypes(validatorFunc: PartialFunction[StructField, String]): Option[String] = { + // Collect unsupported types. 
+ val unsupportedDataTypeReason = fields.collect(validatorFunc) + if (unsupportedDataTypeReason.nonEmpty) { + Some( + s"Found unsupported data type in $format: ${unsupportedDataTypeReason.mkString(", ")}.") } else { + None + } + } + + def isCharType(stringType: StringType, metadata: Metadata): Boolean = { + val charTypePattern = "char\\((\\d+)\\)".r + GlutenConfig.getConf.forceOrcCharTypeScanFallbackEnabled && charTypePattern + .findFirstIn( + CharVarcharUtils + .getRawTypeString(metadata) + .getOrElse(stringType.catalogString)) + .isDefined + } + + format match { + case ParquetReadFormat => val typeValidator: PartialFunction[StructField, String] = { - case StructField(_, arrayType: ArrayType, _, _) - if arrayType.elementType.isInstanceOf[StructType] => - "StructType as element in ArrayType" - case StructField(_, arrayType: ArrayType, _, _) - if arrayType.elementType.isInstanceOf[ArrayType] => - "ArrayType as element in ArrayType" - case StructField(_, mapType: MapType, _, _) - if mapType.keyType.isInstanceOf[StructType] => - "StructType as Key in MapType" - case StructField(_, mapType: MapType, _, _) - if mapType.valueType.isInstanceOf[ArrayType] => - "ArrayType as Value in MapType" - case StructField(_, stringType: StringType, _, metadata) - if isCharType(stringType, metadata) => - CharVarcharUtils.getRawTypeString(metadata) + " not support" - case StructField(_, TimestampType, _, _) => "TimestampType not support" + // Parquet timestamp is not fully supported yet + case StructField(_, TimestampType, _, _) + if GlutenConfig.getConf.forceParquetTimestampTypeScanFallbackEnabled => + "TimestampType(force fallback)" } - validateTypes(typeValidator) - } - case _ => ValidationResult.failed(s"Unsupported file format for $format.") + val parquetOptions = new ParquetOptions(CaseInsensitiveMap(properties), SQLConf.get) + if (parquetOptions.mergeSchema) { + // https://github.com/apache/incubator-gluten/issues/7174 + Some(s"not support when merge schema is true") + } else { + validateTypes(typeValidator) + } + case DwrfReadFormat => None + case OrcReadFormat => + if (!GlutenConfig.getConf.veloxOrcScanEnabled) { + Some(s"Velox ORC scan is turned off, ${GlutenConfig.VELOX_ORC_SCAN_ENABLED.key}") + } else { + val typeValidator: PartialFunction[StructField, String] = { + case StructField(_, arrayType: ArrayType, _, _) + if arrayType.elementType.isInstanceOf[StructType] => + "StructType as element in ArrayType" + case StructField(_, arrayType: ArrayType, _, _) + if arrayType.elementType.isInstanceOf[ArrayType] => + "ArrayType as element in ArrayType" + case StructField(_, mapType: MapType, _, _) + if mapType.keyType.isInstanceOf[StructType] => + "StructType as Key in MapType" + case StructField(_, mapType: MapType, _, _) + if mapType.valueType.isInstanceOf[ArrayType] => + "ArrayType as Value in MapType" + case StructField(_, stringType: StringType, _, metadata) + if isCharType(stringType, metadata) => + CharVarcharUtils.getRawTypeString(metadata) + "(force fallback)" + case StructField(_, TimestampType, _, _) => "TimestampType" + } + validateTypes(typeValidator) + } + case _ => Some(s"Unsupported file format for $format.") + } } - } - def isCharType(stringType: StringType, metadata: Metadata): Boolean = { - val charTypePattern = "char\\((\\d+)\\)".r - GlutenConfig.getConf.forceOrcCharTypeScanFallbackEnabled && charTypePattern - .findFirstIn( - CharVarcharUtils - .getRawTypeString(metadata) - .getOrElse(stringType.catalogString)) - .isDefined + validateScheme().orElse(validateFormat()) match { + case 
Some(reason) => ValidationResult.failed(reason) + case _ => ValidationResult.succeeded + } } def distinctRootPaths(paths: Seq[String]): Seq[String] = { diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala index a838c463c390..3554bc5c9c01 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxRuleApi.scala @@ -80,11 +80,11 @@ private object VeloxRuleApi { SparkShimLoader.getSparkShims .getExtendedColumnarPostRules() .foreach(each => injector.injectPost(c => each(c.session))) - injector.injectPost(c => ColumnarCollapseTransformStages(c.conf)) + injector.injectPost(c => ColumnarCollapseTransformStages(c.glutenConf)) // Gluten columnar: Final rules. injector.injectFinal(c => RemoveGlutenTableCacheColumnarToRow(c.session)) - injector.injectFinal(c => GlutenFallbackReporter(c.conf, c.session)) + injector.injectFinal(c => GlutenFallbackReporter(c.glutenConf, c.session)) injector.injectFinal(_ => RemoveFallbackTagRule()) } @@ -116,9 +116,9 @@ private object VeloxRuleApi { SparkShimLoader.getSparkShims .getExtendedColumnarPostRules() .foreach(each => injector.inject(c => each(c.session))) - injector.inject(c => ColumnarCollapseTransformStages(c.conf)) + injector.inject(c => ColumnarCollapseTransformStages(c.glutenConf)) injector.inject(c => RemoveGlutenTableCacheColumnarToRow(c.session)) - injector.inject(c => GlutenFallbackReporter(c.conf, c.session)) + injector.inject(c => GlutenFallbackReporter(c.glutenConf, c.session)) injector.inject(_ => RemoveFallbackTagRule()) } } diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala index 00a8f8cb0e09..ddf77e5fa3d4 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala +++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxValidatorApi.scala @@ -22,7 +22,7 @@ import org.apache.gluten.substrait.plan.PlanNode import org.apache.gluten.validate.NativePlanValidationInfo import org.apache.gluten.vectorized.NativePlanEvaluator -import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.types._ @@ -87,11 +87,16 @@ class VeloxValidatorApi extends ValidatorApi { } override def doColumnarShuffleExchangeExecValidate( + outputAttributes: Seq[Attribute], outputPartitioning: Partitioning, child: SparkPlan): Option[String] = { + if (outputAttributes.isEmpty) { + // See: https://github.com/apache/incubator-gluten/issues/7600. + return Some("Shuffle with empty output schema is not supported") + } if (child.output.isEmpty) { // See: https://github.com/apache/incubator-gluten/issues/7600. 
- return Some("Shuffle with empty schema is not supported") + return Some("Shuffle with empty input schema is not supported") } doSchemaValidate(child.schema) } diff --git a/cpp-ch/clickhouse.version b/cpp-ch/clickhouse.version index 6bdb05c33269..445cd99068a1 100644 --- a/cpp-ch/clickhouse.version +++ b/cpp-ch/clickhouse.version @@ -1,3 +1,3 @@ CH_ORG=Kyligence -CH_BRANCH=rebase_ch/20241030 -CH_COMMIT=847cfa6237c \ No newline at end of file +CH_BRANCH=rebase_ch/20241101 +CH_COMMIT=7cd7bb8ece2 \ No newline at end of file diff --git a/cpp-ch/local-engine/Functions/SparkFunctionHashingExtended.h b/cpp-ch/local-engine/Functions/SparkFunctionHashingExtended.h index c6499031492e..28757d62beea 100644 --- a/cpp-ch/local-engine/Functions/SparkFunctionHashingExtended.h +++ b/cpp-ch/local-engine/Functions/SparkFunctionHashingExtended.h @@ -568,7 +568,7 @@ static ALWAYS_INLINE uint32_t rotl32(uint32_t x, int8_t r) return (x << r) | (x >> (32 - r)); } -static void SparkMurmurHash3_x86_32(const void * key, size_t len, uint32_t seed, void * out) +static void SparkMurmurHash3_32_Impl(const void * key, size_t len, uint32_t seed, void * out) { const uint8_t * data = static_cast(key); const int nblocks = static_cast(len >> 2); @@ -599,8 +599,8 @@ static void SparkMurmurHash3_x86_32(const void * key, size_t len, uint32_t seed, uint32_t k1 = 0; while (tail != data + len) { - /// Notice: we must cast uint8_t to char, otherwise k1 is wrong. - k1 = static_cast(*tail); + /// Notice: we must use int8_t here, to compatible with all platforms (x86, arm...). + k1 = static_cast(*tail); k1 *= c1; k1 = rotl32(k1, 15); @@ -641,7 +641,7 @@ struct SparkMurmurHash3_32 UInt32 h; char bytes[sizeof(h)]; }; - SparkMurmurHash3_x86_32(data, size, static_cast(seed), bytes); + SparkMurmurHash3_32_Impl(data, size, static_cast(seed), bytes); return h; } }; diff --git a/cpp-ch/local-engine/Functions/SparkFunctionSplitByRegexp.cpp b/cpp-ch/local-engine/Functions/SparkFunctionSplitByRegexp.cpp new file mode 100644 index 000000000000..66f37c62033f --- /dev/null +++ b/cpp-ch/local-engine/Functions/SparkFunctionSplitByRegexp.cpp @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int ILLEGAL_COLUMN; +} + + +/** Functions that split strings into an array of strings or vice versa. 
+ * + * splitByRegexp(regexp, s[, max_substrings]) + */ +namespace +{ + +using Pos = const char *; + +class SparkSplitByRegexpImpl +{ +private: + Regexps::RegexpPtr re; + OptimizedRegularExpression::MatchVec matches; + + Pos pos; + Pos end; + + std::optional max_splits; + size_t splits; + bool max_substrings_includes_remaining_string; + +public: + static constexpr auto name = "splitByRegexpSpark"; + + static bool isVariadic() { return true; } + static size_t getNumberOfArguments() { return 0; } + + static ColumnNumbers getArgumentsThatAreAlwaysConstant() { return {0, 2}; } + + static void checkArguments(const IFunction & func, const ColumnsWithTypeAndName & arguments) + { + checkArgumentsWithSeparatorAndOptionalMaxSubstrings(func, arguments); + } + + static constexpr auto strings_argument_position = 1uz; + + void init(const ColumnsWithTypeAndName & arguments, bool max_substrings_includes_remaining_string_) + { + const ColumnConst * col = checkAndGetColumnConstStringOrFixedString(arguments[0].column.get()); + + if (!col) + throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Illegal column {} of first argument of function {}. " + "Must be constant string.", arguments[0].column->getName(), name); + + if (!col->getValue().empty()) + re = std::make_shared(Regexps::createRegexp(col->getValue())); + + max_substrings_includes_remaining_string = max_substrings_includes_remaining_string_; + max_splits = extractMaxSplits(arguments, 2); + } + + /// Called for each next string. + void set(Pos pos_, Pos end_) + { + pos = pos_; + end = end_; + splits = 0; + } + + /// Get the next token, if any, or return false. + bool get(Pos & token_begin, Pos & token_end) + { + if (!re) + { + if (pos == end) + return false; + + token_begin = pos; + + if (max_splits) + { + if (max_substrings_includes_remaining_string) + { + if (splits == *max_splits - 1) + { + token_end = end; + pos = end; + return true; + } + } + else + if (splits == *max_splits) + return false; + } + + ++pos; + token_end = pos; + ++splits; + } + else + { + if (!pos || pos > end) + return false; + + token_begin = pos; + + if (max_splits) + { + if (max_substrings_includes_remaining_string) + { + if (splits == *max_splits - 1) + { + token_end = end; + pos = nullptr; + return true; + } + } + else + if (splits == *max_splits) + return false; + } + + auto res = re->match(pos, end - pos, matches); + if (!res) + { + token_end = end; + pos = end + 1; + } + else if (!matches[0].length) + { + /// If match part is empty, increment position to avoid infinite loop. + token_end = (pos == end ? 
end : pos + 1); + ++pos; + ++splits; + } + else + { + token_end = pos + matches[0].offset; + pos = token_end + matches[0].length; + ++splits; + } + } + + return true; + } +}; + +using SparkFunctionSplitByRegexp = FunctionTokens; + +/// Fallback splitByRegexp to splitByChar when its 1st argument is a trivial char for better performance +class SparkSplitByRegexpOverloadResolver : public IFunctionOverloadResolver +{ +public: + static constexpr auto name = "splitByRegexpSpark"; + static FunctionOverloadResolverPtr create(ContextPtr context) { return std::make_unique(context); } + + explicit SparkSplitByRegexpOverloadResolver(ContextPtr context_) + : context(context_) + , split_by_regexp(SparkFunctionSplitByRegexp::create(context)) {} + + String getName() const override { return name; } + size_t getNumberOfArguments() const override { return SparkSplitByRegexpImpl::getNumberOfArguments(); } + bool isVariadic() const override { return SparkSplitByRegexpImpl::isVariadic(); } + + FunctionBasePtr buildImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & return_type) const override + { + if (patternIsTrivialChar(arguments)) + return FunctionFactory::instance().getImpl("splitByChar", context)->build(arguments); + return std::make_unique( + split_by_regexp, collections::map(arguments, [](const auto & elem) { return elem.type; }), return_type); + } + + DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override + { + return split_by_regexp->getReturnTypeImpl(arguments); + } + +private: + bool patternIsTrivialChar(const ColumnsWithTypeAndName & arguments) const + { + if (!arguments[0].column.get()) + return false; + const ColumnConst * col = checkAndGetColumnConstStringOrFixedString(arguments[0].column.get()); + if (!col) + return false; + + String pattern = col->getValue(); + if (pattern.size() == 1) + { + OptimizedRegularExpression re = Regexps::createRegexp(pattern); + + std::string required_substring; + bool is_trivial; + bool required_substring_is_prefix; + re.getAnalyzeResult(required_substring, is_trivial, required_substring_is_prefix); + return is_trivial && required_substring == pattern; + } + return false; + } + + ContextPtr context; + FunctionPtr split_by_regexp; +}; +} + +REGISTER_FUNCTION(SparkSplitByRegexp) +{ + factory.registerFunction(); +} + +} diff --git a/cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.cpp b/cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.cpp index 779e79416fb2..ba36baaf4cb1 100644 --- a/cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.cpp +++ b/cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.cpp @@ -192,7 +192,7 @@ SplittableBzip2ReadBuffer::SplittableBzip2ReadBuffer( changeStateToProcessABlock(); LOG_DEBUG( getLogger("SplittableBzip2ReadBuffer"), - "adjusted_start: {} first_block_need_special_process: {} last_block_need_special_process: {}", + "adjusted_start:{} first_block_need_special_process:{} last_block_need_special_process:{}", *adjusted_start, first_block_need_special_process, last_block_need_special_process); @@ -217,8 +217,6 @@ Int32 SplittableBzip2ReadBuffer::read(char * dest, size_t dest_size, size_t offs result = b; skipResult = skipToNextMarker(SplittableBzip2ReadBuffer::BLOCK_DELIMITER, DELIMITER_BIT_LENGTH); - // auto * seekable = dynamic_cast(in.get()); - // std::cout << "skipResult:" << skipResult << " position:" << seekable->getPosition() << " b:" << b << std::endl; changeStateToProcessABlock(); } return result; @@ -413,7 +411,13 @@ bool SplittableBzip2ReadBuffer::skipToNextMarker(Int64 marker, Int32 
markerBitLe void SplittableBzip2ReadBuffer::reportCRCError() { - throw Exception(ErrorCodes::LOGICAL_ERROR, "CRC error"); + auto * seekable = dynamic_cast(in.get()); + throw Exception( + ErrorCodes::LOGICAL_ERROR, + "CRC error in position:{} computedBlockCRC:{} storedBlockCRC:{}", + seekable->getPosition(), + computedBlockCRC, + storedBlockCRC); } void SplittableBzip2ReadBuffer::makeMaps() @@ -440,6 +444,8 @@ void SplittableBzip2ReadBuffer::changeStateToProcessABlock() void SplittableBzip2ReadBuffer::initBlock() { + auto * seekable = dynamic_cast(in.get()); + size_t position = seekable->getPosition(); storedBlockCRC = bsGetInt(); blockRandomised = (bsR(1) == 1); @@ -914,7 +920,7 @@ void SplittableBzip2ReadBuffer::setupRandPartB() } else if (++su_count >= 4) { - su_z = static_cast(data->ll8[su_tPos] & 0xff); + su_z = data->ll8[su_tPos] & 0xff; su_tPos = data->tt[su_tPos]; if (su_rNToGo == 0) { @@ -965,7 +971,7 @@ void SplittableBzip2ReadBuffer::setupNoRandPartB() } else if (++su_count >= 4) { - su_z = static_cast(data->ll8[su_tPos] & 0xff); + su_z = data->ll8[su_tPos] & 0xff; su_tPos = data->tt[su_tPos]; su_j2 = 0; setupNoRandPartC(); diff --git a/cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.h b/cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.h index 93a7ca64df27..375a6c885812 100644 --- a/cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.h +++ b/cpp-ch/local-engine/IO/SplittableBzip2ReadBuffer.h @@ -23,6 +23,7 @@ #include #include #include +#include namespace DB { @@ -157,6 +158,7 @@ class SplittableBzip2ReadBuffer : public CompressedReadBufferWrapper if (temp < 0) temp = 256 + temp; globalCrc = (globalCrc << 8) ^ static_cast(crc32Table[temp]); + // std::cout << "input:" << inCh << " crc:" << globalCrc << std::endl; } void updateCRC(Int32 inCh, Int32 repeat) { @@ -237,7 +239,7 @@ class SplittableBzip2ReadBuffer : public CompressedReadBufferWrapper Int32 su_rNToGo; Int32 su_rTPos; Int32 su_tPos; - char su_z; + UInt16 su_z; /// SplittableBzip2ReadBuffer will skip bytes before the first block header. adjusted_start records file position after skipping. /// It is only valid when input stream is seekable and block header could be found in input stream. 
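Before the parser changes below, which map Spark's split to the new splitByRegexpSpark function: both that function and the GLUTEN-7780 test earlier in this patch hinge on Spark treating the second argument of split(str, pattern) as a Java regular expression with a default limit of -1. A JDK-only Scala illustration of the semantics being matched (illustrative only, not part of the patch):

object SparkSplitSemanticsSketch {
  def main(args: Array[String]): Unit = {
    val s = "a|b|c"
    // "\\|" is a regex matching the literal pipe character.
    println(s.split("\\|", -1).toList) // List(a, b, c)
    // "|" is an empty alternation, so it yields zero-width matches and the
    // string is split between every character rather than on the pipes;
    // limit = -1 also keeps trailing empty strings, matching Spark's default.
    println(s.split("|", -1).toList)
  }
}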
diff --git a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp index 32ead1070880..9799933b3385 100644 --- a/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp +++ b/cpp-ch/local-engine/Parser/SerializedPlanParser.cpp @@ -229,11 +229,10 @@ std::unique_ptr SerializedPlanParser::createExecutor(const substr QueryPlanPtr SerializedPlanParser::parseOp(const substrait::Rel & rel, std::list & rel_stack) { - DB::QueryPlanPtr query_plan; auto rel_parser = RelParserFactory::instance().getBuilder(rel.rel_type_case())(parser_context); auto all_input_rels = rel_parser->getInputs(rel); - assert(all_input_rels.size() == 1 || all_input_rels.size() == 2); + assert(all_input_rels.size() == 0 || all_input_rels.size() == 1 || all_input_rels.size() == 2); std::vector input_query_plans; rel_stack.push_back(&rel); for (const auto * input_rel : all_input_rels) @@ -276,7 +275,7 @@ QueryPlanPtr SerializedPlanParser::parseOp(const substrait::Rel & rel, std::list } } - query_plan = rel_parser->parse(input_query_plans, rel, rel_stack); + DB::QueryPlanPtr query_plan = rel_parser->parse(input_query_plans, rel, rel_stack); for (auto & extra_plan : rel_parser->extraPlans()) { extra_plan_holder.push_back(std::move(extra_plan)); diff --git a/cpp-ch/local-engine/Parser/scalar_function_parser/split.cpp b/cpp-ch/local-engine/Parser/scalar_function_parser/split.cpp index ed17c27eade9..3ffd64decb92 100644 --- a/cpp-ch/local-engine/Parser/scalar_function_parser/split.cpp +++ b/cpp-ch/local-engine/Parser/scalar_function_parser/split.cpp @@ -19,14 +19,14 @@ namespace local_engine { -class SparkFunctionSplitParser : public FunctionParser +class FunctionSplitParser : public FunctionParser { public: - SparkFunctionSplitParser(ParserContextPtr parser_context_) : FunctionParser(parser_context_) {} - ~SparkFunctionSplitParser() override = default; + FunctionSplitParser(ParserContextPtr parser_context_) : FunctionParser(parser_context_) {} + ~FunctionSplitParser() override = default; static constexpr auto name = "split"; String getName() const override { return name; } - String getCHFunctionName(const substrait::Expression_ScalarFunction &) const override { return "splitByRegexp"; } + String getCHFunctionName(const substrait::Expression_ScalarFunction &) const override { return "splitByRegexpSpark"; } const DB::ActionsDAG::Node * parse(const substrait::Expression_ScalarFunction & substrait_func, DB::ActionsDAG & actions_dag) const override { @@ -35,7 +35,7 @@ class SparkFunctionSplitParser : public FunctionParser for (const auto & arg : args) parsed_args.emplace_back(parseExpression(actions_dag, arg.value())); /// In Spark: split(str, regex [, limit] ) - /// In CH: splitByRegexp(regexp, str [, limit]) + /// In CH: splitByRegexpSpark(regexp, str [, limit]) if (parsed_args.size() >= 2) std::swap(parsed_args[0], parsed_args[1]); auto ch_function_name = getCHFunctionName(substrait_func); @@ -43,6 +43,6 @@ class SparkFunctionSplitParser : public FunctionParser return convertNodeTypeIfNeeded(substrait_func, func_node, actions_dag); } }; -static FunctionParserRegister register_split; +static FunctionParserRegister register_split; } diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp index bb688724c3d4..e2ba48e9d272 100644 --- a/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp +++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.cpp @@ -81,17 +81,31 @@ struct CacheJobContext MergeTreeTableInstance table; }; -Task 
CacheManager::cachePart(const MergeTreeTableInstance & table, const MergeTreePart & part, const std::unordered_set & columns) +Task CacheManager::cachePart( + const MergeTreeTableInstance & table, const MergeTreePart & part, const std::unordered_set & columns, bool only_meta_cache) { CacheJobContext job_context{table}; job_context.table.parts.clear(); job_context.table.parts.push_back(part); job_context.table.snapshot_id = ""; - Task task = [job_detail = job_context, context = this->context, read_columns = columns]() + Task task = [job_detail = job_context, context = this->context, read_columns = columns, only_meta_cache]() { try { auto storage = job_detail.table.restoreStorage(context); + std::vector selected_parts + = StorageMergeTreeFactory::getDataPartsByNames(storage->getStorageID(), "", {job_detail.table.parts.front().name}); + + if (only_meta_cache) + { + LOG_INFO( + getLogger("CacheManager"), + "Load meta cache of table {}.{} part {} success.", + job_detail.table.database, + job_detail.table.table, + job_detail.table.parts.front().name); + return; + } auto storage_snapshot = std::make_shared(*storage, storage->getInMemoryMetadataPtr()); NamesAndTypesList names_and_types_list; @@ -102,8 +116,6 @@ Task CacheManager::cachePart(const MergeTreeTableInstance & table, const MergeTr names_and_types_list.push_back(NameAndTypePair(column.name, column.type)); } auto query_info = buildQueryInfo(names_and_types_list); - std::vector selected_parts - = StorageMergeTreeFactory::getDataPartsByNames(storage->getStorageID(), "", {job_detail.table.parts.front().name}); auto read_step = storage->reader.readFromParts( selected_parts, storage->getMutationsSnapshot({}), @@ -135,13 +147,13 @@ Task CacheManager::cachePart(const MergeTreeTableInstance & table, const MergeTr return std::move(task); } -JobId CacheManager::cacheParts(const MergeTreeTableInstance & table, const std::unordered_set& columns) +JobId CacheManager::cacheParts(const MergeTreeTableInstance & table, const std::unordered_set& columns, bool only_meta_cache) { JobId id = toString(UUIDHelpers::generateV4()); Job job(id); for (const auto & part : table.parts) { - job.addTask(cachePart(table, part, columns)); + job.addTask(cachePart(table, part, columns, only_meta_cache)); } auto& scheduler = JobScheduler::instance(); scheduler.scheduleJob(std::move(job)); diff --git a/cpp-ch/local-engine/Storages/Cache/CacheManager.h b/cpp-ch/local-engine/Storages/Cache/CacheManager.h index b59963ec4fa7..8fd26d249abc 100644 --- a/cpp-ch/local-engine/Storages/Cache/CacheManager.h +++ b/cpp-ch/local-engine/Storages/Cache/CacheManager.h @@ -40,7 +40,7 @@ class CacheManager static CacheManager & instance(); static void initialize(const DB::ContextMutablePtr & context); - JobId cacheParts(const MergeTreeTableInstance & table, const std::unordered_set & columns); + JobId cacheParts(const MergeTreeTableInstance & table, const std::unordered_set & columns, bool only_meta_cache); static jobject getCacheStatus(JNIEnv * env, const String & jobId); Task cacheFile(const substrait::ReadRel::LocalFiles::FileOrFiles & file, ReadBufferBuilderPtr read_buffer_builder); @@ -48,7 +48,8 @@ class CacheManager static void removeFiles(String file, String cache_name); private: - Task cachePart(const MergeTreeTableInstance & table, const MergeTreePart & part, const std::unordered_set & columns); + Task cachePart( + const MergeTreeTableInstance & table, const MergeTreePart & part, const std::unordered_set & columns, bool only_meta_cache); CacheManager() = default; DB::ContextMutablePtr 
context; }; diff --git a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp index 9c90c67f69b3..45be9dcf7442 100644 --- a/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp +++ b/cpp-ch/local-engine/Storages/MergeTree/SparkStorageMergeTree.cpp @@ -22,6 +22,12 @@ #include #include +namespace ProfileEvents +{ +extern const Event LoadedDataParts; +extern const Event LoadedDataPartsMicroseconds; +} + namespace DB { namespace MergeTreeSetting @@ -176,6 +182,7 @@ void SparkStorageMergeTree::prefetchMetaDataFile(std::unordered_set std::vector SparkStorageMergeTree::loadDataPartsWithNames(const std::unordered_set & parts) { + Stopwatch watch; prefetchMetaDataFile(parts); std::vector data_parts; const auto disk = getStoragePolicy()->getDisks().at(0); @@ -187,6 +194,10 @@ std::vector SparkStorageMergeTree::loadDataPartsWithNames( data_parts.emplace_back(res.part); } + watch.stop(); + LOG_DEBUG(log, "Loaded data parts ({} items) took {} microseconds", parts.size(), watch.elapsedMicroseconds()); + ProfileEvents::increment(ProfileEvents::LoadedDataParts, parts.size()); + ProfileEvents::increment(ProfileEvents::LoadedDataPartsMicroseconds, watch.elapsedMicroseconds()); return data_parts; } diff --git a/cpp-ch/local-engine/Storages/Output/WriteBufferBuilder.cpp b/cpp-ch/local-engine/Storages/Output/WriteBufferBuilder.cpp index ea93480b7683..c03b1918d271 100644 --- a/cpp-ch/local-engine/Storages/Output/WriteBufferBuilder.cpp +++ b/cpp-ch/local-engine/Storages/Output/WriteBufferBuilder.cpp @@ -29,7 +29,7 @@ namespace DB { namespace ErrorCodes { - extern const int BAD_ARGUMENTS; +extern const int BAD_ARGUMENTS; } } @@ -47,7 +47,7 @@ class LocalFileWriteBufferBuilder : public WriteBufferBuilder Poco::URI file_uri(file_uri_); const String & file_path = file_uri.getPath(); - //mkdir + // mkdir std::filesystem::path p(file_path); if (!std::filesystem::exists(p.parent_path())) std::filesystem::create_directories(p.parent_path()); @@ -78,14 +78,19 @@ class HDFSFileWriteBufferBuilder : public WriteBufferBuilder auto builder = DB::createHDFSBuilder(new_file_uri, context->getConfigRef()); auto fs = DB::createHDFSFS(builder.get()); - auto first = new_file_uri.find('/', new_file_uri.find("//") + 2); - auto last = new_file_uri.find_last_of('/'); - auto dir = new_file_uri.substr(first, last - first); + + auto begin_of_path = new_file_uri.find('/', new_file_uri.find("//") + 2); + auto url_without_path = new_file_uri.substr(0, begin_of_path); + + // use uri.getPath() instead of new_file_uri.substr(begin_of_path) to avoid space character uri-encoded + std::filesystem::path file_path(uri.getPath()); + auto dir = file_path.parent_path().string(); + if (hdfsCreateDirectory(fs.get(), dir.c_str())) throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Cannot create dir for {} because {}", dir, std::string(hdfsGetLastError())); DB::WriteSettings write_settings; - return std::make_unique(new_file_uri, context->getConfigRef(), 0, write_settings); + return std::make_unique(url_without_path, file_path.string(), context->getConfigRef(), 0, write_settings); } }; #endif diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index 02e606e1edc8..8ff8a866b7ca 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -1340,8 +1340,8 @@ JNIEXPORT void Java_org_apache_gluten_utils_TestExceptionUtils_generateNativeExc } -JNIEXPORT jstring 
-Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheParts(JNIEnv * env, jobject, jstring table_, jstring columns_) +JNIEXPORT jstring Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheParts( + JNIEnv * env, jobject, jstring table_, jstring columns_, jboolean only_meta_cache_) { LOCAL_ENGINE_JNI_METHOD_START auto table_def = jstring2string(env, table_); @@ -1351,7 +1351,7 @@ Java_org_apache_gluten_execution_CHNativeCacheManager_nativeCacheParts(JNIEnv * for (const auto & col : tokenizer) column_set.insert(col); local_engine::MergeTreeTableInstance table(table_def); - auto id = local_engine::CacheManager::instance().cacheParts(table, column_set); + auto id = local_engine::CacheManager::instance().cacheParts(table, column_set, only_meta_cache_); return local_engine::charTojstring(env, id.c_str()); LOCAL_ENGINE_JNI_METHOD_END(env, nullptr); } diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 45f19c25c749..6a0a5b0057f7 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -1079,6 +1079,10 @@ JNIEXPORT jobject JNICALL Java_org_apache_gluten_vectorized_ColumnarBatchSeriali auto serializer = ctx->createColumnarBatchSerializer(nullptr); auto buffer = serializer->serializeColumnarBatches(batches); auto bufferArr = env->NewByteArray(buffer->size()); + GLUTEN_CHECK( + bufferArr != nullptr, + "Cannot construct a byte array of size " + std::to_string(buffer->size()) + + " byte(s) to serialize columnar batches"); env->SetByteArrayRegion(bufferArr, 0, buffer->size(), reinterpret_cast(buffer->data())); jobject columnarBatchSerializeResult = diff --git a/cpp/velox/compute/WholeStageResultIterator.cc b/cpp/velox/compute/WholeStageResultIterator.cc index 5ece7179b97f..29f467c5e761 100644 --- a/cpp/velox/compute/WholeStageResultIterator.cc +++ b/cpp/velox/compute/WholeStageResultIterator.cc @@ -494,8 +494,8 @@ std::unordered_map WholeStageResultIterator::getQueryC std::to_string(veloxCfg_->get(kSpillWriteBufferSize, 4L * 1024 * 1024)); configs[velox::core::QueryConfig::kSpillStartPartitionBit] = std::to_string(veloxCfg_->get(kSpillStartPartitionBit, 29)); - configs[velox::core::QueryConfig::kJoinSpillPartitionBits] = - std::to_string(veloxCfg_->get(kSpillPartitionBits, 2)); + configs[velox::core::QueryConfig::kSpillNumPartitionBits] = + std::to_string(veloxCfg_->get(kSpillPartitionBits, 3)); configs[velox::core::QueryConfig::kSpillableReservationGrowthPct] = std::to_string(veloxCfg_->get(kSpillableReservationGrowthPct, 25)); configs[velox::core::QueryConfig::kSpillCompressionKind] = diff --git a/dev/build-thirdparty.sh b/dev/build-thirdparty.sh index 90fc5e58fbd5..ee827ef197f7 100755 --- a/dev/build-thirdparty.sh +++ b/dev/build-thirdparty.sh @@ -82,4 +82,4 @@ elif [ "$LINUX_OS" == "debian" ]; then fi fi cd $THIRDPARTY_LIB/ -jar cvf gluten-thirdparty-lib-$LINUX_OS-$VERSION-$ARCH.jar ./ +$JAVA_HOME/bin/jar cvf gluten-thirdparty-lib-$LINUX_OS-$VERSION-$ARCH.jar ./ diff --git a/docs/release.md b/docs/release.md index a3f20bde857e..99661c07c9a3 100644 --- a/docs/release.md +++ b/docs/release.md @@ -6,10 +6,18 @@ nav_order: 11 [Gluten](https://github.com/apache/incubator-gluten) is a plugin for Apache Spark to double SparkSQL's performance. -## Latest release for velox backend -* [Gluten-1.1.1](https://github.com/apache/incubator-gluten/releases/tag/v1.1.1) (Mar. 2 2024) +## Latest release for Velox backend +* [Gluten-1.2.0](https://github.com/apache/incubator-gluten/releases/tag/v1.2.0) (Sep. 
3 2024) + +## Planned release for Velox backend +* [Gluten-1.2.1] (Nov. 30 2024) +* [Gluten-1.3.0] (Dec. 30 2024) +* [Gluten-1.3.1] (Jan. 30 2025) +* [Gluten-1.4.0] (Mar. 30 2025) +* To be updated ## Archived releases +* [Gluten-1.1.1](https://github.com/apache/incubator-gluten/releases/tag/v1.1.1) (Mar. 2 2024) * [Gluten-1.1.0](https://github.com/apache/incubator-gluten/releases/tag/v1.1.0) (Nov. 30 2023) * [Gluten-1.0.0](https://github.com/apache/incubator-gluten/releases/tag/v1.0.0) (Jul. 13 2023) * [Gluten-0.5.0](https://github.com/apache/incubator-gluten/releases/tag/0.5.0) (Apr. 7 2023) diff --git a/ep/build-velox/src/get_velox.sh b/ep/build-velox/src/get_velox.sh index 7ee8df7a8762..ebd2d69981ed 100755 --- a/ep/build-velox/src/get_velox.sh +++ b/ep/build-velox/src/get_velox.sh @@ -16,6 +16,7 @@ set -exu + VELOX_REPO=https://github.com/JkSelf/velox.git VELOX_BRANCH=libhdfs3-support VELOX_HOME="" diff --git a/ep/build-velox/src/setup-centos7.sh b/ep/build-velox/src/setup-centos7.sh index 87150634134b..45880161a4a5 100755 --- a/ep/build-velox/src/setup-centos7.sh +++ b/ep/build-velox/src/setup-centos7.sh @@ -242,7 +242,7 @@ dnf_install epel-release dnf-plugins-core # For ccache, ninja dnf_install ccache wget which libevent-devel \ yasm \ openssl-devel libzstd-devel lz4-devel double-conversion-devel \ - curl-devel libxml2-devel libgsasl-devel libuuid-devel patch + curl-devel libxml2-devel libgsasl-devel libuuid-devel patch libicu-devel $SUDO dnf remove -y gflags diff --git a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala index ecf13967e3ef..bf7b84c9b316 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/extension/columnar/ColumnarRuleApplier.scala @@ -31,7 +31,7 @@ object ColumnarRuleApplier { val session: SparkSession, val ac: AdaptiveContext, val outputsColumnar: Boolean) { - val conf: GlutenConfig = { + val glutenConf: GlutenConfig = { new GlutenConfig(session.sessionState.conf) } } diff --git a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala index 44ff5bb9bcf2..c72a2680b5f7 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/SoftAffinityManager.scala @@ -50,7 +50,8 @@ abstract class AffinityManager extends LogLevelUtil with Logging { GlutenConfig.GLUTEN_SOFT_AFFINITY_DUPLICATE_READING_MAX_CACHE_ITEMS_DEFAULT_VALUE // (execId, host) list - val fixedIdForExecutors = new mutable.ListBuffer[Option[(String, String)]]() + private val idForExecutors = new mutable.ListBuffer[(String, String)]() + var sortedIdForExecutors = new mutable.ListBuffer[(String, String)]() // host list val nodesExecutorsMap = new mutable.HashMap[String, mutable.HashSet[String]]() @@ -96,27 +97,23 @@ abstract class AffinityManager extends LogLevelUtil with Logging { try { // first, check whether the execId exists if ( - !fixedIdForExecutors.exists( + !idForExecutors.exists( exec => { - exec.isDefined && exec.get._1.equals(execHostId._1) + exec._1.equals(execHostId._1) }) ) { val executorsSet = nodesExecutorsMap.getOrElseUpdate(execHostId._2, new mutable.HashSet[String]()) executorsSet.add(execHostId._1) - if (fixedIdForExecutors.exists(_.isEmpty)) { - // 
replace the executor which was removed - val replaceIdx = fixedIdForExecutors.indexWhere(_.isEmpty) - fixedIdForExecutors(replaceIdx) = Option(execHostId) - } else { - fixedIdForExecutors += Option(execHostId) - } + idForExecutors += execHostId + sortedIdForExecutors = idForExecutors.sortBy(_._2) totalRegisteredExecutors.addAndGet(1) } logOnLevel( GlutenConfig.getConf.softAffinityLogLevel, s"After adding executor ${execHostId._1} on host ${execHostId._2}, " + - s"fixedIdForExecutors is ${fixedIdForExecutors.mkString(",")}, " + + s"idForExecutors is ${idForExecutors.mkString(",")}, " + + s"sortedIdForExecutors is ${sortedIdForExecutors.mkString(",")}, " + s"nodesExecutorsMap is ${nodesExecutorsMap.keySet.mkString(",")}, " + s"actual executors count is ${totalRegisteredExecutors.intValue()}." ) @@ -128,29 +125,27 @@ abstract class AffinityManager extends LogLevelUtil with Logging { def handleExecutorRemoved(execId: String): Unit = { resourceRWLock.writeLock().lock() try { - val execIdx = fixedIdForExecutors.indexWhere( + val execIdx = idForExecutors.indexWhere( execHost => { - if (execHost.isDefined) { - execHost.get._1.equals(execId) - } else { - false - } + execHost._1.equals(execId) }) if (execIdx != -1) { - val findedExecId = fixedIdForExecutors(execIdx) - fixedIdForExecutors(execIdx) = None - val nodeExecs = nodesExecutorsMap(findedExecId.get._2) - nodeExecs -= findedExecId.get._1 + val findedExecId = idForExecutors(execIdx) + idForExecutors.remove(execIdx) + val nodeExecs = nodesExecutorsMap(findedExecId._2) + nodeExecs -= findedExecId._1 if (nodeExecs.isEmpty) { // there is no executor on this host, remove - nodesExecutorsMap.remove(findedExecId.get._2) + nodesExecutorsMap.remove(findedExecId._2) } + sortedIdForExecutors = idForExecutors.sortBy(_._2) totalRegisteredExecutors.addAndGet(-1) } logOnLevel( GlutenConfig.getConf.softAffinityLogLevel, s"After removing executor $execId, " + - s"fixedIdForExecutors is ${fixedIdForExecutors.mkString(",")}, " + + s"idForExecutors is ${idForExecutors.mkString(",")}, " + + s"sortedIdForExecutors is ${sortedIdForExecutors.mkString(",")}, " + s"nodesExecutorsMap is ${nodesExecutorsMap.keySet.mkString(",")}, " + s"actual executors count is ${totalRegisteredExecutors.intValue()}." 
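// Editor's note: a minimal standalone Scala sketch (hypothetical class and method names, not
// the Gluten API) of the bookkeeping this hunk moves to: removed executors are dropped from
// the list instead of being replaced by None placeholders, and a host-sorted view is rebuilt
// after every change so downstream allocation sees a deterministic candidate order.
import scala.collection.mutable

class ExecutorRegistrySketch {
  private val idForExecutors = mutable.ListBuffer.empty[(String, String)] // (execId, host)
  var sortedIdForExecutors = mutable.ListBuffer.empty[(String, String)]

  def add(execId: String, host: String): Unit =
    if (!idForExecutors.exists(_._1 == execId)) {
      idForExecutors += ((execId, host))
      sortedIdForExecutors = idForExecutors.sortBy(_._2) // stable sort by host name
    }

  def remove(execId: String): Unit = {
    val idx = idForExecutors.indexWhere(_._1 == execId)
    if (idx != -1) {
      idForExecutors.remove(idx)
      sortedIdForExecutors = idForExecutors.sortBy(_._2)
    }
  }
}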
) @@ -242,7 +237,7 @@ abstract class AffinityManager extends LogLevelUtil with Logging { if (nodesExecutorsMap.size < 1) { Array.empty } else { - softAffinityAllocation.allocateExecs(file, fixedIdForExecutors) + softAffinityAllocation.allocateExecs(file, sortedIdForExecutors) } } finally { resourceRWLock.readLock().unlock() @@ -252,11 +247,11 @@ abstract class AffinityManager extends LogLevelUtil with Logging { def askExecutors(f: FilePartition): Array[(String, String)] = { resourceRWLock.readLock().lock() try { - if (fixedIdForExecutors.size < 1) { + if (sortedIdForExecutors.size < 1) { Array.empty } else { val result = getDuplicateReadingLocation(f) - result.filter(r => fixedIdForExecutors.exists(s => s.isDefined && s.get._1 == r._1)).toArray + result.filter(r => sortedIdForExecutors.exists(s => s._1 == r._1)).toArray } } finally { resourceRWLock.readLock().unlock() diff --git a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala index 8191e378428f..639efd22e976 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityAllocationTrait.scala @@ -30,7 +30,5 @@ trait SoftAffinityAllocationTrait { ) /** allocate target executors for file */ - def allocateExecs( - file: String, - candidates: ListBuffer[Option[(String, String)]]): Array[(String, String)] + def allocateExecs(file: String, candidates: ListBuffer[(String, String)]): Array[(String, String)] } diff --git a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala index bc36c3b1ea0d..7af5f212c1b8 100644 --- a/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala +++ b/gluten-core/src/main/scala/org/apache/gluten/softaffinity/strategy/SoftAffinityStrategy.scala @@ -26,7 +26,7 @@ class SoftAffinityStrategy extends SoftAffinityAllocationTrait with Logging { /** allocate target executors for file */ override def allocateExecs( file: String, - candidates: ListBuffer[Option[(String, String)]]): Array[(String, String)] = { + candidates: ListBuffer[(String, String)]): Array[(String, String)] = { if (candidates.size < 1) { Array.empty } else { @@ -37,15 +37,10 @@ class SoftAffinityStrategy extends SoftAffinityAllocationTrait with Logging { // TODO: try to use ConsistentHash val mod = file.hashCode % candidatesSize val c1 = if (mod < 0) (mod + candidatesSize) else mod - // check whether the executor with index c1 is down - if (candidates(c1).isDefined) { - resultSet.add(candidates(c1).get) - } + resultSet.add(candidates(c1)) for (i <- 1 until softAffinityReplicationNum) { val c2 = (c1 + halfCandidatesSize + i) % candidatesSize - if (candidates(c2).isDefined) { - resultSet.add(candidates(c2).get) - } + resultSet.add(candidates(c2)) } resultSet.toArray } diff --git a/gluten-ras/common/src/main/scala/org/apache/gluten/ras/memo/Memo.scala b/gluten-ras/common/src/main/scala/org/apache/gluten/ras/memo/Memo.scala index c67120357311..68a8ca74574f 100644 --- a/gluten-ras/common/src/main/scala/org/apache/gluten/ras/memo/Memo.scala +++ b/gluten-ras/common/src/main/scala/org/apache/gluten/ras/memo/Memo.scala @@ -158,7 +158,6 @@ object Memo { } // The new node already memorized to memo. 
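// Editor's note: a hedged, self-contained sketch of the allocation rule the simplified
// SoftAffinityStrategy relies on (the object/method names and the replication and half-offset
// defaults here are illustrative assumptions, not the actual configuration): because every
// candidate is now a live executor, the strategy can index the host-sorted list directly
// instead of skipping None slots left behind by removed executors.
import scala.collection.mutable

object SoftAffinityAllocationSketch {
  def allocateExecs(
      file: String,
      candidates: Seq[(String, String)], // (execId, host), already sorted by host
      replicationNum: Int = 2): Array[(String, String)] = {
    if (candidates.isEmpty) return Array.empty
    val size = candidates.size
    val half = size / 2
    val mod = file.hashCode % size
    val c1 = if (mod < 0) mod + size else mod
    val result = mutable.LinkedHashSet(candidates(c1)) // primary replica from the file hash
    for (i <- 1 until replicationNum)
      result += candidates((c1 + half + i) % size) // secondary replicas on the far side
    result.toArray
  }
}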
- val cachedCluster = parent.cache(cacheKey) if (cachedCluster == targetCluster) { // The new node already memorized to memo and in the target cluster. diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala index f1f46dd87e17..177d19c0c709 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/BackendSettingsApi.scala @@ -30,10 +30,11 @@ import org.apache.spark.sql.execution.datasources.{FileFormat, InsertIntoHadoopF import org.apache.spark.sql.types.StructField trait BackendSettingsApi { - def validateScan( + def validateScanExec( format: ReadFileFormat, fields: Array[StructField], - rootPaths: Seq[String]): ValidationResult = ValidationResult.succeeded + rootPaths: Seq[String], + properties: Map[String, String]): ValidationResult = ValidationResult.succeeded def supportWriteFilesExec( format: FileFormat, diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/ValidatorApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/ValidatorApi.scala index 90f132d78ba5..4a18a618bfc7 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/ValidatorApi.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/ValidatorApi.scala @@ -19,7 +19,7 @@ package org.apache.gluten.backendsapi import org.apache.gluten.extension.ValidationResult import org.apache.gluten.substrait.plan.PlanNode -import org.apache.spark.sql.catalyst.expressions.Expression +import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression} import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.types.DataType @@ -58,6 +58,7 @@ trait ValidatorApi { /** Validate against ColumnarShuffleExchangeExec. 
*/ def doColumnarShuffleExchangeExecValidate( + outputAttributes: Seq[Attribute], outputPartitioning: Partitioning, child: SparkPlan): Option[String] } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala index d7b824b397e5..f272dc3eca72 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/execution/BasicScanExecTransformer.scala @@ -95,7 +95,7 @@ trait BasicScanExecTransformer extends LeafTransformSupport with BaseDataSource } val validationResult = BackendsApiManager.getSettings - .validateScan(fileFormat, fields, getRootFilePaths) + .validateScanExec(fileFormat, fields, getRootFilePaths, getProperties) if (!validationResult.ok()) { return validationResult } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala index a5bba46dc605..794186bfa957 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/FallbackRules.scala @@ -169,9 +169,9 @@ case class FallbackOnANSIMode(session: SparkSession) extends Rule[SparkPlan] { } case class FallbackMultiCodegens(session: SparkSession) extends Rule[SparkPlan] { - lazy val columnarConf: GlutenConfig = GlutenConfig.getConf - lazy val physicalJoinOptimize = columnarConf.enablePhysicalJoinOptimize - lazy val optimizeLevel: Integer = columnarConf.physicalJoinOptimizationThrottle + lazy val glutenConf: GlutenConfig = GlutenConfig.getConf + lazy val physicalJoinOptimize = glutenConf.enablePhysicalJoinOptimize + lazy val optimizeLevel: Integer = glutenConf.physicalJoinOptimizationThrottle def existsMultiCodegens(plan: SparkPlan, count: Int = 0): Boolean = plan match { diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffload.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffload.scala index 1299fffb995c..52798f712c12 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffload.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/enumerated/RasOffload.scala @@ -17,7 +17,7 @@ package org.apache.gluten.extension.columnar.enumerated import org.apache.gluten.extension.GlutenPlan -import org.apache.gluten.extension.columnar.{FallbackTags, OffloadSingleNode} +import org.apache.gluten.extension.columnar.OffloadSingleNode import org.apache.gluten.extension.columnar.rewrite.RewriteSingleNode import org.apache.gluten.extension.columnar.validator.Validator import org.apache.gluten.ras.path.Pattern @@ -25,6 +25,7 @@ import org.apache.gluten.ras.path.Pattern.node import org.apache.gluten.ras.rule.{RasRule, Shape} import org.apache.gluten.ras.rule.Shapes.pattern +import org.apache.spark.internal.Logging import org.apache.spark.sql.execution.SparkPlan import scala.reflect.{classTag, ClassTag} @@ -70,52 +71,76 @@ object RasOffload { new RuleImpl(base, validator) } - private class RuleImpl(base: RasOffload, validator: Validator) extends RasRule[SparkPlan] { + private class RuleImpl(base: RasOffload, validator: Validator) + extends RasRule[SparkPlan] + with Logging { private val typeIdentifier: TypeIdentifier = 
base.typeIdentifier() final override def shift(node: SparkPlan): Iterable[SparkPlan] = { // 0. If the node is already offloaded, fail fast. assert(typeIdentifier.isInstance(node)) - // 1. Rewrite the node to form that native library supports. - val rewritten = rewrites.foldLeft(node) { - case (node, rewrite) => - node.transformUp { - case p => - val out = rewrite.rewrite(p) - out - } + // 1. Pre-validate the input node. Fast fail if no good. + validator.validate(node) match { + case Validator.Passed => + case Validator.Failed(reason) => + // TODO: Tag the original plan with fallback reason. + return List.empty } - // 2. Walk the rewritten tree. + // 2. Rewrite the node to form that native library supports. + val rewritten = + try { + rewrites.foldLeft(node) { + case (node, rewrite) => + node.transformUp { + case p => + val out = rewrite.rewrite(p) + out + } + } + } catch { + case e: Exception => + // TODO: Remove this catch block + // See https://github.com/apache/incubator-gluten/issues/7766 + logWarning( + s"Exception thrown during rewriting the plan ${node.nodeName}. Skip offloading it", + e) + return List.empty + } + + // 3. Walk the rewritten tree. val offloaded = rewritten.transformUp { case from if typeIdentifier.isInstance(from) => - // 3. Validate current node. If passed, offload it. + // 4. Validate current node. If passed, offload it. validator.validate(from) match { case Validator.Passed => val offloadedPlan = base.offload(from) val offloadedNodes = offloadedPlan.collect[GlutenPlan] { case t: GlutenPlan => t } val outComes = offloadedNodes.map(_.doValidate()).filter(!_.ok()) if (outComes.nonEmpty) { - // 4. If native validation fails on the offloaded node, return the - // original one. - outComes.foreach(FallbackTags.add(from, _)) + // 4. If native validation fails on at least one of the offloaded nodes, return + // the original one. + // + // TODO: Tag the original plan with fallback reason. This is a non-trivial work + // in RAS as the query plan we got here may be a copy so may not propagate tags + // to original plan. from } else { offloadedPlan } case Validator.Failed(reason) => - FallbackTags.add(from, reason) + // TODO: Tag the original plan with fallback reason. from } } - // 5. If rewritten plan is not offload-able, discard it. + // 6. If rewritten plan is not offload-able, discard it. if (offloaded.fastEquals(rewritten)) { return List.empty } - // 6. Otherwise, return the final tree. + // 7. Otherwise, return the final tree. 
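// Editor's note: a condensed, hypothetical sketch of the control flow introduced above, not
// the real RasOffload/Validator API: pre-validate the input, guard the rewrite phase with a
// try/catch (see the TODO referencing issue 7766), offload only when validation passes, and
// discard results identical to the rewritten plan. The real rule validates and offloads
// node-by-node via transformUp; this flattens that into a single pass for brevity.
object RasOffloadFlowSketch {
  def shift[T](
      node: T,
      validate: T => Either[String, Unit],
      rewrite: T => T,
      offload: T => T): Iterable[T] = {
    // 1. Pre-validate the input node; fail fast without offloading.
    if (validate(node).isLeft) return Nil
    // 2. Rewrite into a form the native library supports; skip offloading on any exception.
    val rewritten =
      try rewrite(node)
      catch { case _: Exception => return Nil }
    // 3. Validate the rewritten plan and offload it only when validation passes.
    val offloaded = validate(rewritten).fold(_ => rewritten, _ => offload(rewritten))
    // 4. If nothing was offloaded, discard; otherwise return the offloaded tree.
    if (offloaded == rewritten) Nil else List(offloaded)
  }
}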
List(offloaded) } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPreProject.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPreProject.scala index 51cdb76a1559..ee81c4124493 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPreProject.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/PullOutPreProject.scala @@ -222,7 +222,13 @@ object PullOutPreProject extends RewriteSingleNode with PullOutProjectHelper { case expand: ExpandExec if needsPreProject(expand) => val expressionMap = new mutable.HashMap[Expression, NamedExpression]() val newProjections = - expand.projections.map(_.map(replaceExpressionWithAttribute(_, expressionMap))) + expand.projections.map( + _.map( + replaceExpressionWithAttribute( + _, + expressionMap, + replaceBoundReference = false, + replaceLiteral = false))) expand.copy( projections = newProjections, child = ProjectExec( diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSparkPlanRulesManager.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSparkPlanRulesManager.scala index e005a3dc8163..c6d5e1cca1f7 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSparkPlanRulesManager.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/rewrite/RewriteSparkPlanRulesManager.scala @@ -88,7 +88,10 @@ class RewriteSparkPlanRulesManager private (rewriteRules: Seq[RewriteSingleNode] } (rewrittenPlan, None) } catch { - case e: Exception => (origin, Option(e.getMessage)) + case e: Exception => + // TODO: Remove this catch block + // See https://github.com/apache/incubator-gluten/issues/7766 + (origin, Option(e.getMessage)) } } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala index 50201efc07d2..898a2dbae824 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/extension/columnar/validator/Validators.scala @@ -149,47 +149,50 @@ object Validators { } } - private class FallbackByUserOptions(conf: GlutenConfig) extends Validator { + private class FallbackByUserOptions(glutenConf: GlutenConfig) extends Validator { override def validate(plan: SparkPlan): Validator.OutCome = plan match { - case p: SortExec if !conf.enableColumnarSort => fail(p) - case p: WindowExec if !conf.enableColumnarWindow => fail(p) - case p: SortMergeJoinExec if !conf.enableColumnarSortMergeJoin => fail(p) - case p: BatchScanExec if !conf.enableColumnarBatchScan => fail(p) - case p: FileSourceScanExec if !conf.enableColumnarFileScan => fail(p) - case p: ProjectExec if !conf.enableColumnarProject => fail(p) - case p: FilterExec if !conf.enableColumnarFilter => fail(p) - case p: UnionExec if !conf.enableColumnarUnion => fail(p) - case p: ExpandExec if !conf.enableColumnarExpand => fail(p) - case p: SortAggregateExec if !conf.forceToUseHashAgg => fail(p) - case p: ShuffledHashJoinExec if !conf.enableColumnarShuffledHashJoin => fail(p) - case p: ShuffleExchangeExec if !conf.enableColumnarShuffle => fail(p) - case p: BroadcastExchangeExec if !conf.enableColumnarBroadcastExchange => fail(p) - case p @ (_: LocalLimitExec | _: 
GlobalLimitExec) if !conf.enableColumnarLimit => fail(p) - case p: GenerateExec if !conf.enableColumnarGenerate => fail(p) - case p: CoalesceExec if !conf.enableColumnarCoalesce => fail(p) - case p: CartesianProductExec if !conf.cartesianProductTransformerEnabled => fail(p) + case p: SortExec if !glutenConf.enableColumnarSort => fail(p) + case p: WindowExec if !glutenConf.enableColumnarWindow => fail(p) + case p: SortMergeJoinExec if !glutenConf.enableColumnarSortMergeJoin => fail(p) + case p: BatchScanExec if !glutenConf.enableColumnarBatchScan => fail(p) + case p: FileSourceScanExec if !glutenConf.enableColumnarFileScan => fail(p) + case p: ProjectExec if !glutenConf.enableColumnarProject => fail(p) + case p: FilterExec if !glutenConf.enableColumnarFilter => fail(p) + case p: UnionExec if !glutenConf.enableColumnarUnion => fail(p) + case p: ExpandExec if !glutenConf.enableColumnarExpand => fail(p) + case p: SortAggregateExec if !glutenConf.forceToUseHashAgg => fail(p) + case p: ShuffledHashJoinExec if !glutenConf.enableColumnarShuffledHashJoin => fail(p) + case p: ShuffleExchangeExec if !glutenConf.enableColumnarShuffle => fail(p) + case p: BroadcastExchangeExec if !glutenConf.enableColumnarBroadcastExchange => fail(p) + case p @ (_: LocalLimitExec | _: GlobalLimitExec) if !glutenConf.enableColumnarLimit => + fail(p) + case p: GenerateExec if !glutenConf.enableColumnarGenerate => fail(p) + case p: CoalesceExec if !glutenConf.enableColumnarCoalesce => fail(p) + case p: CartesianProductExec if !glutenConf.cartesianProductTransformerEnabled => fail(p) case p: TakeOrderedAndProjectExec - if !(conf.enableTakeOrderedAndProject && conf.enableColumnarSort && - conf.enableColumnarShuffle && conf.enableColumnarProject) => + if !(glutenConf.enableTakeOrderedAndProject && glutenConf.enableColumnarSort && + glutenConf.enableColumnarShuffle && glutenConf.enableColumnarProject) => fail(p) - case p: BroadcastHashJoinExec if !conf.enableColumnarBroadcastJoin => + case p: BroadcastHashJoinExec if !glutenConf.enableColumnarBroadcastJoin => fail(p) case p: BroadcastNestedLoopJoinExec - if !(conf.enableColumnarBroadcastJoin && - conf.broadcastNestedLoopJoinTransformerTransformerEnabled) => + if !(glutenConf.enableColumnarBroadcastJoin && + glutenConf.broadcastNestedLoopJoinTransformerTransformerEnabled) => fail(p) case p @ (_: HashAggregateExec | _: SortAggregateExec | _: ObjectHashAggregateExec) - if !conf.enableColumnarHashAgg => + if !glutenConf.enableColumnarHashAgg => fail(p) case p if SparkShimLoader.getSparkShims.isWindowGroupLimitExec( - plan) && !conf.enableColumnarWindowGroupLimit => + plan) && !glutenConf.enableColumnarWindowGroupLimit => fail(p) case p - if HiveTableScanExecTransformer.isHiveTableScan(p) && !conf.enableColumnarHiveTableScan => + if HiveTableScanExecTransformer.isHiveTableScan( + p) && !glutenConf.enableColumnarHiveTableScan => fail(p) case p: SampleExec - if !(conf.enableColumnarSample && BackendsApiManager.getSettings.supportSampleExec()) => + if !(glutenConf.enableColumnarSample && BackendsApiManager.getSettings + .supportSampleExec()) => fail(p) case _ => pass() } diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/utils/PullOutProjectHelper.scala b/gluten-substrait/src/main/scala/org/apache/gluten/utils/PullOutProjectHelper.scala index 85be57493f02..e4fc11441031 100644 --- a/gluten-substrait/src/main/scala/org/apache/gluten/utils/PullOutProjectHelper.scala +++ b/gluten-substrait/src/main/scala/org/apache/gluten/utils/PullOutProjectHelper.scala @@ -62,7 +62,8 @@ 
trait PullOutProjectHelper { protected def replaceExpressionWithAttribute( expr: Expression, projectExprsMap: mutable.HashMap[Expression, NamedExpression], - replaceBoundReference: Boolean = false): Expression = + replaceBoundReference: Boolean = false, + replaceLiteral: Boolean = true): Expression = expr match { case alias: Alias => alias.child match { @@ -73,6 +74,7 @@ trait PullOutProjectHelper { } case attr: Attribute => attr case e: BoundReference if !replaceBoundReference => e + case literal: Literal if !replaceLiteral => literal case other => projectExprsMap .getOrElseUpdate(other.canonicalized, Alias(other, generatePreAliasName)()) diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala index ada7283da8d5..32575e4f13d5 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarCollapseTransformStages.scala @@ -114,7 +114,7 @@ case class InputIteratorTransformer(child: SparkPlan) extends UnaryTransformSupp * generate/compile code. */ case class ColumnarCollapseTransformStages( - glutenConfig: GlutenConfig, + glutenConf: GlutenConfig, transformStageCounter: AtomicInteger = ColumnarCollapseTransformStages.transformStageCounter) extends Rule[SparkPlan] { diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala index f12e3ae0b33e..4f62377b09e3 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala @@ -115,7 +115,7 @@ case class ColumnarShuffleExchangeExec( override protected def doValidateInternal(): ValidationResult = { BackendsApiManager.getValidatorApiInstance - .doColumnarShuffleExchangeExecValidate(outputPartitioning, child) + .doColumnarShuffleExchangeExecValidate(output, outputPartitioning, child) .map { reason => ValidationResult.failed( diff --git a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala index f6e23e7cff67..481e16b0a5be 100644 --- a/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala +++ b/gluten-substrait/src/main/scala/org/apache/spark/sql/execution/GlutenFallbackReporter.scala @@ -31,12 +31,12 @@ import org.apache.spark.sql.execution.ui.GlutenEventUtils * This rule is used to collect all fallback reason. * 1. print fallback reason for each plan node 2. 
post all fallback reason using one event */ -case class GlutenFallbackReporter(glutenConfig: GlutenConfig, spark: SparkSession) +case class GlutenFallbackReporter(glutenConf: GlutenConfig, spark: SparkSession) extends Rule[SparkPlan] with LogLevelUtil { override def apply(plan: SparkPlan): SparkPlan = { - if (!glutenConfig.enableFallbackReport) { + if (!glutenConf.enableFallbackReport) { return plan } printFallbackReason(plan) @@ -52,7 +52,7 @@ case class GlutenFallbackReporter(glutenConfig: GlutenConfig, spark: SparkSessio } private def printFallbackReason(plan: SparkPlan): Unit = { - val validationLogLevel = glutenConfig.validationLogLevel + val validationLogLevel = glutenConf.validationLogLevel plan.foreachUp { case _: GlutenPlan => // ignore case p: SparkPlan if FallbackTags.nonEmpty(p) => diff --git a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala index 9337317d961d..4145fd5bc6b2 100644 --- a/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala +++ b/gluten-substrait/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala @@ -31,6 +31,8 @@ import org.apache.spark.sql.catalyst.expressions.PredicateHelper import org.apache.spark.sql.execution.datasources.FilePartition import org.apache.spark.sql.test.SharedSparkSession +import scala.collection.mutable.ListBuffer + class SoftAffinitySuite extends QueryTest with SharedSparkSession with PredicateHelper { override protected def sparkConf: SparkConf = super.sparkConf @@ -80,14 +82,14 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate "fakePath0", 0, 100, - Array("host-1", "host-2") + Array("192.168.22.1", "host-2") ), SparkShimLoader.getSparkShims.generatePartitionedFile( InternalRow.empty, "fakePath1", 0, 200, - Array("host-4", "host-5") + Array("192.168.22.1", "host-5") ) ).toArray ) @@ -98,13 +100,7 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations = locations) - val affinityResultSet = if (scalaVersion.startsWith("2.12")) { - Set("host-1", "host-4", "host-5") - } else if (scalaVersion.startsWith("2.13")) { - Set("host-5", "host-4", "host-2") - } - - assertResult(affinityResultSet) { + assertResult(Set("192.168.22.1", "host-5", "host-2")) { nativePartition.preferredLocations().toSet } } @@ -136,30 +132,7 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations = locations) - assertResult(Set("executor_host-2_2", "executor_host-1_0")) { - nativePartition.preferredLocations().toSet - } - } - - def generateNativePartition4(): Unit = { - val partition = FilePartition( - 0, - Seq( - SparkShimLoader.getSparkShims.generatePartitionedFile( - InternalRow.empty, - "fakePath_0", - 0, - 100) - ).toArray - ) - - val locations = SoftAffinity.getFilePartitionLocations( - partition.files.map(_.filePath.toString), - partition.preferredLocations()) - - val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations = locations) - - assertResult(Set("executor_host-1_1")) { + assertResult(Set("executor_192.168.22.1_1", "executor_10.1.1.33_6")) { nativePartition.preferredLocations().toSet } } @@ -206,11 +179,11 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate val addEvent0 = SparkListenerExecutorAdded( 
System.currentTimeMillis(), "0", - new ExecutorInfo("host-1", 3, null)) + new ExecutorInfo("192.168.22.1", 3, null)) val addEvent1 = SparkListenerExecutorAdded( System.currentTimeMillis(), "1", - new ExecutorInfo("host-1", 3, null)) + new ExecutorInfo("192.168.22.1", 3, null)) val addEvent2 = SparkListenerExecutorAdded( System.currentTimeMillis(), "2", @@ -234,7 +207,7 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate val addEvent6 = SparkListenerExecutorAdded( System.currentTimeMillis(), "6", - new ExecutorInfo("host-4", 3, null)) + new ExecutorInfo("10.1.1.33", 3, null)) val removedEvent0 = SparkListenerExecutorRemoved(System.currentTimeMillis(), "0", "") val removedEvent1 = SparkListenerExecutorRemoved(System.currentTimeMillis(), "1", "") @@ -256,23 +229,35 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate executorsListListener.onExecutorAdded(addEvent3_1) assert(SoftAffinityManager.nodesExecutorsMap.size == 3) - assert(SoftAffinityManager.fixedIdForExecutors.size == 4) + assert(SoftAffinityManager.sortedIdForExecutors.size == 4) executorsListListener.onExecutorRemoved(removedEvent3) // test removing executor repeatedly executorsListListener.onExecutorRemoved(removedEvent3_1) assert(SoftAffinityManager.nodesExecutorsMap.size == 2) - assert(SoftAffinityManager.fixedIdForExecutors.size == 4) - assert(SoftAffinityManager.fixedIdForExecutors.exists(_.isEmpty)) + assert(SoftAffinityManager.sortedIdForExecutors.size == 3) + assert( + SoftAffinityManager.sortedIdForExecutors.equals( + ListBuffer[(String, String)](("0", "192.168.22.1"), ("1", "192.168.22.1"), ("2", "host-2")) + )) executorsListListener.onExecutorAdded(addEvent4) executorsListListener.onExecutorAdded(addEvent5) executorsListListener.onExecutorAdded(addEvent6) assert(SoftAffinityManager.nodesExecutorsMap.size == 4) - assert(SoftAffinityManager.fixedIdForExecutors.size == 6) - assert(!SoftAffinityManager.fixedIdForExecutors.exists(_.isEmpty)) + assert(SoftAffinityManager.sortedIdForExecutors.size == 6) + assert( + SoftAffinityManager.sortedIdForExecutors.equals( + ListBuffer[(String, String)]( + ("6", "10.1.1.33"), + ("0", "192.168.22.1"), + ("1", "192.168.22.1"), + ("2", "host-2"), + ("5", "host-2"), + ("4", "host-3")) + )) // all target hosts exist in computing hosts list, return the original hosts list generateNativePartition1() @@ -286,19 +271,21 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate executorsListListener.onExecutorRemoved(removedEvent4) assert(SoftAffinityManager.nodesExecutorsMap.size == 3) - assert(SoftAffinityManager.fixedIdForExecutors.size == 6) - assert(SoftAffinityManager.fixedIdForExecutors.exists(_.isEmpty)) + assert(SoftAffinityManager.sortedIdForExecutors.size == 4) + assert( + SoftAffinityManager.sortedIdForExecutors.equals( + ListBuffer[(String, String)]( + ("6", "10.1.1.33"), + ("0", "192.168.22.1"), + ("1", "192.168.22.1"), + ("5", "host-2")) + )) executorsListListener.onExecutorRemoved(removedEvent2) executorsListListener.onExecutorRemoved(removedEvent4) assert(SoftAffinityManager.nodesExecutorsMap.size == 3) - assert(SoftAffinityManager.fixedIdForExecutors.size == 6) - assert(SoftAffinityManager.fixedIdForExecutors.exists(_.isEmpty)) - - // there are only one target host existing in computing hosts list, - // but the hash executors were removed, so return the original hosts list - generateNativePartition4() + assert(SoftAffinityManager.sortedIdForExecutors.size == 4) 
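// Editor's note: a small standalone illustration (not part of the suite) of why the orderings
// asserted in this test are deterministic: sortBy on a ListBuffer is a stable sort, so
// executors sharing a host keep their registration order, and IP-style hosts such as
// "192.168.22.1" sort before names such as "host-2" under plain string comparison.
import scala.collection.mutable.ListBuffer

object SortedExecutorsExample {
  val registered = ListBuffer(("2", "host-2"), ("0", "192.168.22.1"), ("1", "192.168.22.1"))
  val sorted = registered.sortBy(_._2)
  // sorted == ListBuffer(("0", "192.168.22.1"), ("1", "192.168.22.1"), ("2", "host-2"))
}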
executorsListListener.onExecutorRemoved(removedEvent0) executorsListListener.onExecutorRemoved(removedEvent1) @@ -307,10 +294,29 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate executorsListListener.onExecutorRemoved(removedEvent7) assert(SoftAffinityManager.nodesExecutorsMap.isEmpty) - assert(SoftAffinityManager.fixedIdForExecutors.size == 6) - assert(SoftAffinityManager.fixedIdForExecutors.exists(_.isEmpty)) + assert(SoftAffinityManager.sortedIdForExecutors.isEmpty) // all executors were removed, return the original hosts list generateNativePartition5() + + executorsListListener.onExecutorAdded(addEvent0) + executorsListListener.onExecutorAdded(addEvent1) + executorsListListener.onExecutorAdded(addEvent2) + executorsListListener.onExecutorAdded(addEvent3) + executorsListListener.onExecutorAdded(addEvent4) + executorsListListener.onExecutorAdded(addEvent5) + executorsListListener.onExecutorAdded(addEvent6) + assert(SoftAffinityManager.sortedIdForExecutors.size == 7) + assert( + SoftAffinityManager.sortedIdForExecutors.equals( + ListBuffer[(String, String)]( + ("6", "10.1.1.33"), + ("0", "192.168.22.1"), + ("1", "192.168.22.1"), + ("2", "host-2"), + ("5", "host-2"), + ("3", "host-3"), + ("4", "host-3")) + )) } } diff --git a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index 27e26606f653..50110f15d457 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -844,8 +844,6 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("SPARK-32110: compare special double/float values in struct") enableSuite[GlutenRandomSuite].exclude("random").exclude("SPARK-9127 codegen with long seed") enableSuite[GlutenRegexpExpressionsSuite] - .exclude("LIKE ALL") - .exclude("LIKE ANY") .exclude("LIKE Pattern") .exclude("LIKE Pattern ESCAPE '/'") .exclude("LIKE Pattern ESCAPE '#'") @@ -854,8 +852,6 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("RegexReplace") .exclude("RegexExtract") .exclude("RegexExtractAll") - .exclude("SPLIT") - .exclude("SPARK-34814: LikeSimplification should handle NULL") enableSuite[GlutenSortOrderExpressionsSuite].exclude("SortPrefix") enableSuite[GlutenStringExpressionsSuite] .exclude("StringComparison") diff --git a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 55fb4ae16d1e..363f9c85ed1d 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -861,7 +861,6 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenParquetFileFormatV2Suite] enableSuite[GlutenParquetV1FilterSuite] // Rewrite. - .exclude("Filter applied on merged Parquet schema with new column should work") .exclude("SPARK-23852: Broken Parquet push-down for partially-written stats") .exclude("SPARK-25207: exception when duplicate fields in case-insensitive mode") // Rewrite for supported INT96 - timestamp. 
@@ -875,8 +874,6 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("SPARK-17091: Convert IN predicate to Parquet filter push-down") .exclude("Support Parquet column index") .exclude("SPARK-34562: Bloom filter push down") - // https://github.com/apache/incubator-gluten/issues/7174 - .excludeGlutenTest("Filter applied on merged Parquet schema with new column should work") enableSuite[GlutenParquetV2FilterSuite] // Rewrite. .exclude("Filter applied on merged Parquet schema with new column should work") diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala index 51204b0777d6..2f690c615556 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala @@ -26,7 +26,6 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.parseColumnPath import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation, PushableColumnAndNestedColumn} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan -import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.{CORRECTED, LEGACY} import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType.INT96 @@ -106,54 +105,6 @@ abstract class GlutenParquetFilterSuite extends ParquetFilterSuite with GlutenSQ } } - testGluten("Filter applied on merged Parquet schema with new column should work") { - import testImplicits._ - withAllParquetReaders { - withSQLConf( - SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true", - SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") { - withTempPath { - dir => - val path1 = s"${dir.getCanonicalPath}/table1" - (1 to 3) - .map(i => (i, i.toString, null: String)) - .toDF("a", "b", "c") - .write - .parquet(path1) - val path2 = s"${dir.getCanonicalPath}/table2" - (1 to 3) - .map(i => (null: Integer, i.toString, i.toString)) - .toDF("a", "b", "c") - .write - .parquet(path2) - - // No matter "c = 1" gets pushed down or not, this query should work without exception. - val df = spark.read.parquet(path1, path2).filter("c = 1").selectExpr("c", "b", "a") - df.show() - - // Annotated for the type check fails. - // checkAnswer(df, Row(1, "1", null)) - - val path3 = s"${dir.getCanonicalPath}/table3" - val dfStruct = sparkContext.parallelize(Seq((1, 1, null))).toDF("a", "b", "c") - dfStruct.select(struct("a").as("s")).write.parquet(path3) - - val path4 = s"${dir.getCanonicalPath}/table4" - val dfStruct2 = sparkContext.parallelize(Seq((null, 1, 1))).toDF("a", "b", "c") - dfStruct2.select(struct("c").as("s")).write.parquet(path4) - - // No matter "s.c = 1" gets pushed down or not, this query - // should work without exception. 
- val dfStruct3 = spark.read - .parquet(path3, path4) - .filter("s.c = 1") - .selectExpr("s") - checkAnswer(dfStruct3, Row(Row(null, 1))) - } - } - } - } - testGluten("SPARK-12218: 'Not' is included in Parquet filter pushdown") { import testImplicits._ diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala index da950e2fc1ee..9b3b090e326d 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala @@ -817,8 +817,6 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("SPARK-32110: compare special double/float values in struct") enableSuite[GlutenRandomSuite].exclude("random").exclude("SPARK-9127 codegen with long seed") enableSuite[GlutenRegexpExpressionsSuite] - .exclude("LIKE ALL") - .exclude("LIKE ANY") .exclude("LIKE Pattern") .exclude("LIKE Pattern ESCAPE '/'") .exclude("LIKE Pattern ESCAPE '#'") @@ -827,8 +825,6 @@ class ClickHouseTestSettings extends BackendTestSettings { .exclude("RegexReplace") .exclude("RegexExtract") .exclude("RegexExtractAll") - .exclude("SPLIT") - .exclude("SPARK - 34814: LikeSimplification should handleNULL") enableSuite[GlutenSortOrderExpressionsSuite].exclude("SortPrefix") enableSuite[GlutenStringExpressionsSuite] .exclude("StringComparison") diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index 8b56f63f65df..4e8f13ee4414 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -663,7 +663,6 @@ class VeloxTestSettings extends BackendTestSettings { enableSuite[GlutenParquetFileFormatV2Suite] enableSuite[GlutenParquetV1FilterSuite] // Rewrite. - .exclude("Filter applied on merged Parquet schema with new column should work") .exclude("SPARK-23852: Broken Parquet push-down for partially-written stats") // Rewrite for supported INT96 - timestamp. .exclude("filter pushdown - timestamp") @@ -679,11 +678,8 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("Support Parquet column index") .exclude("SPARK-34562: Bloom filter push down") .exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same") - // https://github.com/apache/incubator-gluten/issues/7174 - .excludeGlutenTest("Filter applied on merged Parquet schema with new column should work") enableSuite[GlutenParquetV2FilterSuite] // Rewrite. - .exclude("Filter applied on merged Parquet schema with new column should work") .exclude("SPARK-23852: Broken Parquet push-down for partially-written stats") // Rewrite for supported INT96 - timestamp. 
.exclude("filter pushdown - timestamp") @@ -699,8 +695,6 @@ class VeloxTestSettings extends BackendTestSettings { .exclude("Support Parquet column index") .exclude("SPARK-34562: Bloom filter push down") .exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same") - // https://github.com/apache/incubator-gluten/issues/7174 - .excludeGlutenTest("Filter applied on merged Parquet schema with new column should work") enableSuite[GlutenParquetInteroperabilitySuite] .exclude("parquet timestamp conversion") enableSuite[GlutenParquetIOSuite] diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala index a1163f9525b4..02b30a46a63c 100644 --- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala +++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala @@ -26,7 +26,6 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.parseColumnPath import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation, PushableColumnAndNestedColumn} import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan -import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.{CORRECTED, LEGACY} import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType.INT96 @@ -106,54 +105,6 @@ abstract class GlutenParquetFilterSuite extends ParquetFilterSuite with GlutenSQ } } - testGluten("Filter applied on merged Parquet schema with new column should work") { - import testImplicits._ - withAllParquetReaders { - withSQLConf( - SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true", - SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") { - withTempPath { - dir => - val path1 = s"${dir.getCanonicalPath}/table1" - (1 to 3) - .map(i => (i, i.toString, null: String)) - .toDF("a", "b", "c") - .write - .parquet(path1) - val path2 = s"${dir.getCanonicalPath}/table2" - (1 to 3) - .map(i => (null: Integer, i.toString, i.toString)) - .toDF("a", "b", "c") - .write - .parquet(path2) - - // No matter "c = 1" gets pushed down or not, this query should work without exception. - val df = spark.read.parquet(path1, path2).filter("c = 1").selectExpr("c", "b", "a") - df.show() - - // Annotated for the type check fails. - // checkAnswer(df, Row(1, "1", null)) - - val path3 = s"${dir.getCanonicalPath}/table3" - val dfStruct = sparkContext.parallelize(Seq((1, 1, null))).toDF("a", "b", "c") - dfStruct.select(struct("a").as("s")).write.parquet(path3) - - val path4 = s"${dir.getCanonicalPath}/table4" - val dfStruct2 = sparkContext.parallelize(Seq((null, 1, 1))).toDF("a", "b", "c") - dfStruct2.select(struct("c").as("s")).write.parquet(path4) - - // No matter "s.c = 1" gets pushed down or not, this query should work - // without exception. 
-            val dfStruct3 = spark.read
-              .parquet(path3, path4)
-              .filter("s.c = 1")
-              .selectExpr("s")
-            checkAnswer(dfStruct3, Row(Row(null, 1)))
-        }
-      }
-    }
-  }
-
   testGluten("SPARK-12218: 'Not' is included in Parquet filter pushdown") {
     import testImplicits._
 
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index c2446e38d75f..4a73a8dc72de 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -21,6 +21,7 @@ import org.apache.gluten.events.GlutenPlanFallbackEvent
 import org.apache.gluten.execution.FileSourceScanExecTransformer
 import org.apache.gluten.utils.BackendTestUtils
 
+import org.apache.spark.SparkConf
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
 import org.apache.spark.sql.{GlutenSQLTestsTrait, Row}
 import org.apache.spark.sql.execution.ProjectExec
@@ -32,6 +33,10 @@ import org.apache.spark.status.ElementTrackingStore
 import scala.collection.mutable.ArrayBuffer
 
 class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelper {
+  override def sparkConf: SparkConf = {
+    super.sparkConf
+      .set(GlutenConfig.RAS_ENABLED.key, "false")
+  }
 
   testGluten("test fallback logging") {
     val testAppender = new LogAppender("fallback reason")
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index ac08fc5a80cc..e91f1495fbe9 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -740,8 +740,6 @@ class ClickHouseTestSettings extends BackendTestSettings {
     .exclude("SPARK-32110: compare special double/float values in struct")
   enableSuite[GlutenRandomSuite].exclude("random").exclude("SPARK-9127 codegen with long seed")
   enableSuite[GlutenRegexpExpressionsSuite]
-    .exclude("LIKE ALL")
-    .exclude("LIKE ANY")
     .exclude("LIKE Pattern")
     .exclude("LIKE Pattern ESCAPE '/'")
     .exclude("LIKE Pattern ESCAPE '#'")
@@ -750,8 +748,6 @@ class ClickHouseTestSettings extends BackendTestSettings {
     .exclude("RegexReplace")
     .exclude("RegexExtract")
     .exclude("RegexExtractAll")
-    .exclude("SPLIT")
-    .exclude("SPARK - 34814: LikeSimplification should handleNULL")
   enableSuite[GlutenSortOrderExpressionsSuite].exclude("SortPrefix")
   enableSuite[GlutenStringExpressionsSuite]
     .exclude("StringComparison")
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 22a9e62c09ae..0f3c43dfdf99 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -644,7 +644,6 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenParquetFileFormatV2Suite]
   enableSuite[GlutenParquetV1FilterSuite]
     // Rewrite.
-    .exclude("Filter applied on merged Parquet schema with new column should work")
     .exclude("SPARK-23852: Broken Parquet push-down for partially-written stats")
     // Rewrite for supported INT96 - timestamp.
     .exclude("filter pushdown - timestamp")
@@ -660,11 +659,8 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-34562: Bloom filter push down")
     .exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
     .exclude("filter pushdown - StringPredicate")
-    // https://github.com/apache/incubator-gluten/issues/7174
-    .excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
   enableSuite[GlutenParquetV2FilterSuite]
     // Rewrite.
-    .exclude("Filter applied on merged Parquet schema with new column should work")
     .exclude("SPARK-23852: Broken Parquet push-down for partially-written stats")
     // Rewrite for supported INT96 - timestamp.
     .exclude("filter pushdown - timestamp")
@@ -680,8 +676,6 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-34562: Bloom filter push down")
     .exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
     .exclude("filter pushdown - StringPredicate")
-    // https://github.com/apache/incubator-gluten/issues/7174
-    .excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
   enableSuite[GlutenParquetInteroperabilitySuite]
     .exclude("parquet timestamp conversion")
   enableSuite[GlutenParquetIOSuite]
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
index b4a4b6017b67..a4f830e18716 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.parseColumnPath
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation, PushableColumnAndNestedColumn}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.LegacyBehaviorPolicy.{CORRECTED, LEGACY}
 import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType.INT96
@@ -105,54 +104,6 @@ abstract class GlutenParquetFilterSuite extends ParquetFilterSuite with GlutenSQ
     }
   }
 
-  testGluten("Filter applied on merged Parquet schema with new column should work") {
-    import testImplicits._
-    withAllParquetReaders {
-      withSQLConf(
-        SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
-        SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
-        withTempPath {
-          dir =>
-            val path1 = s"${dir.getCanonicalPath}/table1"
-            (1 to 3)
-              .map(i => (i, i.toString, null: String))
-              .toDF("a", "b", "c")
-              .write
-              .parquet(path1)
-            val path2 = s"${dir.getCanonicalPath}/table2"
-            (1 to 3)
-              .map(i => (null: Integer, i.toString, i.toString))
-              .toDF("a", "b", "c")
-              .write
-              .parquet(path2)
-
-            // No matter "c = 1" gets pushed down or not, this query should work without exception.
-            val df = spark.read.parquet(path1, path2).filter("c = 1").selectExpr("c", "b", "a")
-            df.show()
-
-            // Annotated for the type check fails.
-            // checkAnswer(df, Row(1, "1", null))
-
-            val path3 = s"${dir.getCanonicalPath}/table3"
-            val dfStruct = sparkContext.parallelize(Seq((1, 1, null))).toDF("a", "b", "c")
-            dfStruct.select(struct("a").as("s")).write.parquet(path3)
-
-            val path4 = s"${dir.getCanonicalPath}/table4"
-            val dfStruct2 = sparkContext.parallelize(Seq((null, 1, 1))).toDF("a", "b", "c")
-            dfStruct2.select(struct("c").as("s")).write.parquet(path4)
-
-            // No matter "s.c = 1" gets pushed down or not, this query should work
-            // without exception.
-            val dfStruct3 = spark.read
-              .parquet(path3, path4)
-              .filter("s.c = 1")
-              .selectExpr("s")
-            checkAnswer(dfStruct3, Row(Row(null, 1)))
-        }
-      }
-    }
-  }
-
   testGluten("SPARK-12218: 'Not' is included in Parquet filter pushdown") {
     import testImplicits._
 
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index d7ec1f0fa57e..7855f289a707 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -21,6 +21,7 @@ import org.apache.gluten.events.GlutenPlanFallbackEvent
 import org.apache.gluten.execution.FileSourceScanExecTransformer
 import org.apache.gluten.utils.BackendTestUtils
 
+import org.apache.spark.SparkConf
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
 import org.apache.spark.sql.{GlutenSQLTestsTrait, Row}
 import org.apache.spark.sql.execution.ProjectExec
@@ -32,6 +33,10 @@ import org.apache.spark.status.ElementTrackingStore
 import scala.collection.mutable.ArrayBuffer
 
 class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelper {
+  override def sparkConf: SparkConf = {
+    super.sparkConf
+      .set(GlutenConfig.RAS_ENABLED.key, "false")
+  }
 
   testGluten("test fallback logging") {
     val testAppender = new LogAppender("fallback reason")
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
index 9e4c81081de1..f0637839a762 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/clickhouse/ClickHouseTestSettings.scala
@@ -740,8 +740,6 @@ class ClickHouseTestSettings extends BackendTestSettings {
     .exclude("SPARK-32110: compare special double/float values in struct")
   enableSuite[GlutenRandomSuite].exclude("random").exclude("SPARK-9127 codegen with long seed")
   enableSuite[GlutenRegexpExpressionsSuite]
-    .exclude("LIKE ALL")
-    .exclude("LIKE ANY")
     .exclude("LIKE Pattern")
     .exclude("LIKE Pattern ESCAPE '/'")
     .exclude("LIKE Pattern ESCAPE '#'")
@@ -750,8 +748,6 @@ class ClickHouseTestSettings extends BackendTestSettings {
     .exclude("RegexReplace")
     .exclude("RegexExtract")
     .exclude("RegexExtractAll")
-    .exclude("SPLIT")
-    .exclude("SPARK - 34814: LikeSimplification should handleNULL")
   enableSuite[GlutenSortOrderExpressionsSuite].exclude("SortPrefix")
   enableSuite[GlutenStringExpressionsSuite]
     .exclude("StringComparison")
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 3f6bea5dd1ce..a9525b1b0e4a 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -655,7 +655,6 @@ class VeloxTestSettings extends BackendTestSettings {
   enableSuite[GlutenParquetFileFormatV2Suite]
   enableSuite[GlutenParquetV1FilterSuite]
     // Rewrite.
-    .exclude("Filter applied on merged Parquet schema with new column should work")
     .exclude("SPARK-23852: Broken Parquet push-down for partially-written stats")
     // Rewrite for supported INT96 - timestamp.
     .exclude("filter pushdown - timestamp")
@@ -671,11 +670,8 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-34562: Bloom filter push down")
     .exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
     .exclude("filter pushdown - StringPredicate")
-    // https://github.com/apache/incubator-gluten/issues/7174
-    .excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
   enableSuite[GlutenParquetV2FilterSuite]
     // Rewrite.
-    .exclude("Filter applied on merged Parquet schema with new column should work")
     .exclude("SPARK-23852: Broken Parquet push-down for partially-written stats")
     // Rewrite for supported INT96 - timestamp.
     .exclude("filter pushdown - timestamp")
@@ -691,8 +687,6 @@ class VeloxTestSettings extends BackendTestSettings {
     .exclude("SPARK-34562: Bloom filter push down")
     .exclude("SPARK-16371 Do not push down filters when inner name and outer name are the same")
     .exclude("filter pushdown - StringPredicate")
-    // https://github.com/apache/incubator-gluten/issues/7174
-    .excludeGlutenTest("Filter applied on merged Parquet schema with new column should work")
   enableSuite[GlutenParquetInteroperabilitySuite]
     .exclude("parquet timestamp conversion")
   enableSuite[GlutenParquetIOSuite]
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
index 063b424e0d13..3c52ec82e9fc 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/GlutenParquetFilterSuite.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.parseColumnPath
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, HadoopFsRelation, LogicalRelation, PushableColumnAndNestedColumn}
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
-import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.LegacyBehaviorPolicy.{CORRECTED, LEGACY}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.ParquetOutputTimestampType.INT96
@@ -105,54 +104,6 @@ abstract class GlutenParquetFilterSuite extends ParquetFilterSuite with GlutenSQ
     }
   }
 
-  testGluten("Filter applied on merged Parquet schema with new column should work") {
-    import testImplicits._
-    withAllParquetReaders {
-      withSQLConf(
-        SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
-        SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true") {
-        withTempPath {
-          dir =>
-            val path1 = s"${dir.getCanonicalPath}/table1"
-            (1 to 3)
-              .map(i => (i, i.toString, null: String))
-              .toDF("a", "b", "c")
-              .write
-              .parquet(path1)
-            val path2 = s"${dir.getCanonicalPath}/table2"
-            (1 to 3)
-              .map(i => (null: Integer, i.toString, i.toString))
-              .toDF("a", "b", "c")
-              .write
-              .parquet(path2)
-
-            // No matter "c = 1" gets pushed down or not, this query should work without exception.
-            val df = spark.read.parquet(path1, path2).filter("c = 1").selectExpr("c", "b", "a")
-            df.show()
-
-            // Annotated for the type check fails.
-            // checkAnswer(df, Row(1, "1", null))
-
-            val path3 = s"${dir.getCanonicalPath}/table3"
-            val dfStruct = sparkContext.parallelize(Seq((1, 1, null))).toDF("a", "b", "c")
-            dfStruct.select(struct("a").as("s")).write.parquet(path3)
-
-            val path4 = s"${dir.getCanonicalPath}/table4"
-            val dfStruct2 = sparkContext.parallelize(Seq((null, 1, 1))).toDF("a", "b", "c")
-            dfStruct2.select(struct("c").as("s")).write.parquet(path4)
-
-            // No matter "s.c = 1" gets pushed down or not, this query should work
-            // without exception.
-            val dfStruct3 = spark.read
-              .parquet(path3, path4)
-              .filter("s.c = 1")
-              .selectExpr("s")
-            checkAnswer(dfStruct3, Row(Row(null, 1)))
-        }
-      }
-    }
-  }
-
   testGluten("SPARK-12218: 'Not' is included in Parquet filter pushdown") {
     import testImplicits._
 
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
index d7ec1f0fa57e..7855f289a707 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/gluten/GlutenFallbackSuite.scala
@@ -21,6 +21,7 @@ import org.apache.gluten.events.GlutenPlanFallbackEvent
 import org.apache.gluten.execution.FileSourceScanExecTransformer
 import org.apache.gluten.utils.BackendTestUtils
 
+import org.apache.spark.SparkConf
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
 import org.apache.spark.sql.{GlutenSQLTestsTrait, Row}
 import org.apache.spark.sql.execution.ProjectExec
@@ -32,6 +33,10 @@ import org.apache.spark.status.ElementTrackingStore
 import scala.collection.mutable.ArrayBuffer
 
 class GlutenFallbackSuite extends GlutenSQLTestsTrait with AdaptiveSparkPlanHelper {
+  override def sparkConf: SparkConf = {
+    super.sparkConf
+      .set(GlutenConfig.RAS_ENABLED.key, "false")
+  }
 
   testGluten("test fallback logging") {
     val testAppender = new LogAppender("fallback reason")
diff --git a/gluten-ut/test/src/test/scala/org/apache/gluten/expressions/GlutenExpressionMappingSuite.scala b/gluten-ut/test/src/test/scala/org/apache/gluten/expressions/GlutenExpressionMappingSuite.scala
index 68b9ab1a3697..693b46283cde 100644
--- a/gluten-ut/test/src/test/scala/org/apache/gluten/expressions/GlutenExpressionMappingSuite.scala
+++ b/gluten-ut/test/src/test/scala/org/apache/gluten/expressions/GlutenExpressionMappingSuite.scala
@@ -98,16 +98,18 @@ class GlutenExpressionMappingSuite
   testWithSpecifiedSparkVersion(
     "GLUTEN-7213: Check fallback reason with CheckOverflowInTableInsert",
     Some("3.4")) {
-    withTable("t1", "t2") {
-      sql("create table t1 (a float) using parquet")
-      sql("insert into t1 values(1.1)")
-      sql("create table t2 (b decimal(10,4)) using parquet")
+    withSQLConf(GlutenConfig.RAS_ENABLED.key -> "false") {
+      withTable("t1", "t2") {
+        sql("create table t1 (a float) using parquet")
+        sql("insert into t1 values(1.1)")
+        sql("create table t2 (b decimal(10,4)) using parquet")
 
-      val msg =
-        "CheckOverflowInTableInsert is used in ANSI mode, but Gluten does not support ANSI mode."
-      import org.apache.spark.sql.execution.GlutenImplicits._
-      val fallbackSummary = sql("insert overwrite t2 select * from t1").fallbackSummary()
-      assert(fallbackSummary.fallbackNodeToReason.flatMap(_.values).exists(_.contains(msg)))
+        val msg =
+          "CheckOverflowInTableInsert is used in ANSI mode, but Gluten does not support ANSI mode."
+        import org.apache.spark.sql.execution.GlutenImplicits._
+        val fallbackSummary = sql("insert overwrite t2 select * from t1").fallbackSummary()
+        assert(fallbackSummary.fallbackNodeToReason.flatMap(_.values).exists(_.contains(msg)))
+      }
     }
   }
 }
diff --git a/pom.xml b/pom.xml
index 69a0a17872cb..d3c67f231e9e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -534,7 +534,7 @@
       mac
-      x86
+      x86_64