From d3ccd4aea027455752d84f54bf1f3b589660bd4a Mon Sep 17 00:00:00 2001 From: Wenzheng Liu Date: Tue, 11 Jun 2024 17:57:01 +0800 Subject: [PATCH] [GLUTEN-5979][CH] Fix CHListenerApi initialize twice on spark local mode (#6037) --- .../clickhouse/CHListenerApi.scala | 6 +- .../GlutenClickHouseNativeLibSuite.scala | 79 +++++++++++++++++++ 2 files changed, 84 insertions(+), 1 deletion(-) create mode 100644 backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeLibSuite.scala diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala index 665fdba88e55..43e0627dffef 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala @@ -48,7 +48,11 @@ class CHListenerApi extends ListenerApi with Logging { override def onExecutorStart(pc: PluginContext): Unit = { GlutenExecutorEndpoint.executorEndpoint = new GlutenExecutorEndpoint(pc.executorID, pc.conf) - initialize(pc.conf, isDriver = false) + if (pc.conf().get("spark.master").startsWith("local")) { + logDebug("Skipping duplicate initializing clickhouse backend on spark local mode") + } else { + initialize(pc.conf, isDriver = false) + } } override def onExecutorShutdown(): Unit = shutdown() diff --git a/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeLibSuite.scala b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeLibSuite.scala new file mode 100644 index 000000000000..0221f06bd681 --- /dev/null +++ b/backends-clickhouse/src/test/scala/org/apache/gluten/execution/GlutenClickHouseNativeLibSuite.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gluten.execution + +import org.apache.gluten.GlutenConfig +import org.apache.gluten.exception.GlutenException +import org.apache.gluten.utils.UTSystemParameters + +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.plans.PlanTest + +class GlutenClickHouseNativeLibSuite extends PlanTest { + + private def baseSparkConf: SparkConf = { + new SparkConf() + .set("spark.plugins", "org.apache.gluten.GlutenPlugin") + .set("spark.default.parallelism", "1") + .set("spark.memory.offHeap.enabled", "true") + .set("spark.memory.offHeap.size", "1024MB") + .set("spark.gluten.sql.enable.native.validation", "false") + } + + test("test columnar lib path not exist") { + var spark: SparkSession = null + try { + spark = SparkSession + .builder() + .master("local[1]") + .config(baseSparkConf) + .config(GlutenConfig.GLUTEN_LIB_PATH, "path/not/exist/libch.so") + .getOrCreate() + spark.sql("select 1").show() + } catch { + case e: Exception => + assert(e.isInstanceOf[GlutenException]) + assert( + e.getMessage.contains( + "library at path: path/not/exist/libch.so is not a file or does not exist")) + } finally { + if (spark != null) { + spark.stop() + } + } + } + + test("test CHListenerApi initialize only once") { + var spark: SparkSession = null + try { + spark = SparkSession + .builder() + .master("local[1]") + .config(baseSparkConf) + .config(GlutenConfig.GLUTEN_LIB_PATH, UTSystemParameters.clickHouseLibPath) + .config(GlutenConfig.GLUTEN_EXECUTOR_LIB_PATH, "/path/not/exist/libch.so") + .getOrCreate() + spark.sql("select 1").show() + } finally { + if (spark != null) { + spark.stop() + } + } + } + +}