diff --git a/backends-clickhouse/src/main/java/io/glutenproject/vectorized/BlockSplitIterator.java b/backends-clickhouse/src/main/java/io/glutenproject/vectorized/BlockSplitIterator.java index facb9f5eed20..3c0d919f6f5e 100644 --- a/backends-clickhouse/src/main/java/io/glutenproject/vectorized/BlockSplitIterator.java +++ b/backends-clickhouse/src/main/java/io/glutenproject/vectorized/BlockSplitIterator.java @@ -33,7 +33,8 @@ public BlockSplitIterator(Iterator in, IteratorOptions options) { options.getExpr(), options.getRequiredFields(), options.getPartitionNum(), - options.getBufferSize()); + options.getBufferSize(), + options.getHashAlgorithm()); } private native long nativeCreate( @@ -42,7 +43,8 @@ private native long nativeCreate( String expr, String schema, int partitionNum, - int bufferSize); + int bufferSize, + String hashAlgorithm); private native void nativeClose(long instance); @@ -80,6 +82,8 @@ public static class IteratorOptions implements Serializable { private String expr; private String requiredFields; + private String hashAlgorithm; + public int getPartitionNum() { return partitionNum; } @@ -119,5 +123,13 @@ public String getRequiredFields() { public void setRequiredFields(String requiredFields) { this.requiredFields = requiredFields; } + + public String getHashAlgorithm() { + return hashAlgorithm; + } + + public void setHashAlgorithm(String hashAlgorithm) { + this.hashAlgorithm = hashAlgorithm; + } } } diff --git a/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHNativeExpressionEvaluator.java b/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHNativeExpressionEvaluator.java index 0d598cf6cba1..3322afe0b9d5 100644 --- a/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHNativeExpressionEvaluator.java +++ b/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHNativeExpressionEvaluator.java @@ -27,7 +27,6 @@ import io.glutenproject.substrait.plan.PlanNode; import com.google.protobuf.Any; -import io.substrait.proto.Plan; import org.apache.spark.SparkConf; import org.apache.spark.sql.internal.SQLConf; @@ -80,12 +79,12 @@ private PlanNode buildNativeConfNode(Map confs) { // Used by WholeStageTransform to create the native computing pipeline and // return a columnar result iterator. 
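Review note: the new hash-algorithm knob travels as a plain String inside the serializable IteratorOptions bean and is only interpreted on the native side, so the choice made on the driver survives the trip to executors unchanged. A minimal Scala sketch of the caller-side wiring (mirroring the CHExecUtil hunk further down; the literal values here are hypothetical, the setters are the ones added above):

    import io.glutenproject.vectorized.BlockSplitIterator.IteratorOptions

    val options = new IteratorOptions
    options.setName("hash")                 // partitioner short name
    options.setPartitionNum(200)
    options.setExpr("0,1")                  // hash key column indices
    options.setRequiredFields("0,1,2")      // columns kept in the output blocks
    options.setHashAlgorithm("cityHash64")  // new: "cityHash64" or "murmurHash3_32"
    // ...then pass it to new BlockSplitIterator(in, options), where `in` is
    // the iterator of native block handles.

The algorithm string ultimately comes from spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm (see the CHBackendSettings hunk below); any value other than the two supported ones silently falls back to the default rather than failing the query. The CHNativeExpressionEvaluator hunk continuing below makes a related boundary-narrowing change, moving plan serialization out of the evaluator: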
public GeneralOutIterator createKernelWithBatchIterator( - Plan wsPlan, List iterList, boolean materializeInput) { + byte[] wsPlan, List iterList, boolean materializeInput) { long allocId = CHNativeMemoryAllocators.contextInstance().getNativeInstanceId(); long handle = jniWrapper.nativeCreateKernelWithIterator( allocId, - getPlanBytesBuf(wsPlan), + wsPlan, iterList.toArray(new GeneralInIterator[0]), buildNativeConfNode( GlutenConfig.getNativeBackendConf( @@ -115,10 +114,6 @@ public GeneralOutIterator createKernelWithBatchIterator( return createOutIterator(handle); } - private byte[] getPlanBytesBuf(Plan planNode) { - return planNode.toByteArray(); - } - private GeneralOutIterator createOutIterator(long nativeHandle) { return new BatchIterator(nativeHandle); } diff --git a/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHShuffleSplitterJniWrapper.java b/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHShuffleSplitterJniWrapper.java index bd40b8fef5fc..ec773343a09c 100644 --- a/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHShuffleSplitterJniWrapper.java +++ b/backends-clickhouse/src/main/java/io/glutenproject/vectorized/CHShuffleSplitterJniWrapper.java @@ -31,7 +31,8 @@ public long make( String localDirs, int subDirsPerLocalDir, boolean preferSpill, - long spillThreshold) { + long spillThreshold, + String hashAlgorithm) { return nativeMake( part.getShortName(), part.getNumPartitions(), @@ -45,7 +46,8 @@ public long make( localDirs, subDirsPerLocalDir, preferSpill, - spillThreshold); + spillThreshold, + hashAlgorithm); } public long makeForRSS( @@ -55,6 +57,7 @@ public long makeForRSS( int bufferSize, String codec, long spillThreshold, + String hashAlgorithm, Object pusher) { return nativeMakeForRSS( part.getShortName(), @@ -66,6 +69,7 @@ public long makeForRSS( bufferSize, codec, spillThreshold, + hashAlgorithm, pusher); } @@ -82,7 +86,8 @@ public native long nativeMake( String localDirs, int subDirsPerLocalDir, boolean preferSpill, - long spillThreshold); + long spillThreshold, + String hashAlgorithm); public native long nativeMakeForRSS( String shortName, @@ -94,6 +99,7 @@ public native long nativeMakeForRSS( int bufferSize, String codec, long spillThreshold, + String hashAlgorithm, Object pusher); public native void split(long splitterId, long block); diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala index 62f784828444..5765e0293313 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHBackend.scala @@ -90,6 +90,24 @@ object CHBackendSettings extends BackendSettingsApi with Logging { private val GLUTEN_CLICKHOUSE_SHUFFLE_SUPPORTED_CODEC: Set[String] = Set("lz4", "zstd", "snappy") + // The algorithm for hash partition of the shuffle + private val GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM: String = + GlutenConfig.GLUTEN_CONFIG_PREFIX + CHBackend.BACKEND_NAME + + ".shuffle.hash.algorithm" + // valid values are: cityHash64 or murmurHash3_32 + private val GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM_DEFAULT = "cityHash64" + lazy val shuffleHashAlgorithm: String = { + val algorithm = SparkEnv.get.conf.get( + CHBackendSettings.GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM, + CHBackendSettings.GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM_DEFAULT + ) + if (!algorithm.equals("cityHash64") 
&& !algorithm.equals("murmurHash3_32")) { + CHBackendSettings.GLUTEN_CLICKHOUSE_SHUFFLE_HASH_ALGORITHM_DEFAULT + } else { + algorithm + } + } + override def supportFileFormatRead( format: ReadFileFormat, fields: Array[StructField], diff --git a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala index aea4bb15df12..4820e4f96aeb 100644 --- a/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala +++ b/backends-clickhouse/src/main/scala/io/glutenproject/backendsapi/clickhouse/CHIteratorApi.scala @@ -91,19 +91,19 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { fileFormats(i)), SoftAffinityUtil.getFilePartitionLocations(f)) case _ => - throw new UnsupportedOperationException(s"Unsupport operators.") + throw new UnsupportedOperationException(s"Unsupported input partition.") }) wsCxt.substraitContext.initLocalFilesNodesIndex(0) wsCxt.substraitContext.setLocalFilesNodes(localFilesNodesWithLocations.map(_._1)) val substraitPlan = wsCxt.root.toProtobuf - if (index < 3) { + if (index == 0) { logOnLevel( GlutenConfig.getConf.substraitPlanLogLevel, s"The substrait plan for partition $index:\n${SubstraitPlanPrinterUtil .substraitPlanToJson(substraitPlan)}" ) } - GlutenPartition(index, substraitPlan, localFilesNodesWithLocations.head._2) + GlutenPartition(index, substraitPlan.toByteArray, localFilesNodesWithLocations.head._2) } /** @@ -185,7 +185,7 @@ class CHIteratorApi extends IteratorApi with Logging with LogLevelUtil { }.asJava) // we need to complete dependency RDD's firstly transKernel.createKernelWithBatchIterator( - rootNode.toProtobuf, + rootNode.toProtobuf.toByteArray, columnarNativeIterator, materializeInput) } diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala index 0248dec768f4..8af1a46875ec 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala @@ -17,6 +17,7 @@ package org.apache.spark.shuffle import io.glutenproject.GlutenConfig +import io.glutenproject.backendsapi.clickhouse.CHBackendSettings import io.glutenproject.memory.alloc.CHNativeMemoryAllocators import io.glutenproject.memory.memtarget.{MemoryTarget, Spiller} import io.glutenproject.vectorized._ @@ -100,7 +101,8 @@ class CHColumnarShuffleWriter[K, V]( localDirs, subDirsPerLocalDir, preferSpill, - spillThreshold + spillThreshold, + CHBackendSettings.shuffleHashAlgorithm ) CHNativeMemoryAllocators.createSpillable( "ShuffleWriter", diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala index 0544ad4b90b7..3144522c34e5 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.utils import io.glutenproject.GlutenConfig +import io.glutenproject.backendsapi.clickhouse.CHBackendSettings import io.glutenproject.expression.ConverterUtils import io.glutenproject.row.SparkRowInfo import io.glutenproject.vectorized._ @@ -210,6 
+211,7 @@ object CHExecUtil extends Logging { options.setName(nativePartitioning.getShortName) options.setPartitionNum(nativePartitioning.getNumPartitions) options.setExpr(new String(nativePartitioning.getExprList)) + options.setHashAlgorithm(CHBackendSettings.shuffleHashAlgorithm) options.setRequiredFields(if (nativePartitioning.getRequiredFields != null) { new String(nativePartitioning.getRequiredFields) } else { diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala index 9b7cc2518d28..6d304ee0ba79 100644 --- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHColumnarShuffleParquetAQESuite.scala @@ -43,6 +43,7 @@ class GlutenClickHouseTPCHColumnarShuffleParquetAQESuite .set("spark.sql.autoBroadcastJoinThreshold", "10MB") .set("spark.gluten.sql.columnar.backend.ch.use.v2", "false") .set("spark.sql.adaptive.enabled", "true") + .set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "murmurHash3_32") } override protected def createTPCHNotNullTables(): Unit = { diff --git a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetAQESuite.scala b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetAQESuite.scala index 292a78310470..6f70ac2c3065 100644 --- a/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetAQESuite.scala +++ b/backends-clickhouse/src/test/scala/io/glutenproject/execution/GlutenClickHouseTPCHParquetAQESuite.scala @@ -46,6 +46,7 @@ class GlutenClickHouseTPCHParquetAQESuite .set("spark.gluten.sql.columnar.backend.ch.use.v2", "false") .set("spark.sql.adaptive.enabled", "true") .set("spark.gluten.sql.columnar.backend.ch.runtime_config.use_local_format", "true") + .set("spark.gluten.sql.columnar.backend.ch.shuffle.hash.algorithm", "murmurHash3_32") } override protected def createTPCHNotNullTables(): Unit = { diff --git a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala index bb5acb46a9ef..2aae3d9c8ca0 100644 --- a/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala +++ b/backends-velox/src/main/scala/io/glutenproject/backendsapi/velox/IteratorApiImpl.scala @@ -122,7 +122,7 @@ class IteratorApiImpl extends IteratorApi with Logging { wsCxt.substraitContext.initLocalFilesNodesIndex(0) wsCxt.substraitContext.setLocalFilesNodes(localFilesNodesWithLocations.map(_._1)) val substraitPlan = wsCxt.root.toProtobuf - GlutenPartition(index, substraitPlan, localFilesNodesWithLocations.head._2) + GlutenPartition(index, substraitPlan.toByteArray, localFilesNodesWithLocations.head._2) } /** @@ -187,7 +187,9 @@ class IteratorApiImpl extends IteratorApi with Logging { iter => new ColumnarBatchInIterator(iter.asJava) }.asJava) val nativeResultIterator = - transKernel.createKernelWithBatchIterator(rootNode.toProtobuf, columnarNativeIterator) + transKernel.createKernelWithBatchIterator( + rootNode.toProtobuf.toByteArray, + columnarNativeIterator) pipelineTime += TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - beforeBuild) diff --git 
a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp index 4f3ae465078e..676231e048b9 100644 --- a/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp +++ b/cpp-ch/local-engine/Shuffle/CachedShuffleWriter.cpp @@ -50,7 +50,7 @@ CachedShuffleWriter::CachedShuffleWriter(const String & short_name, SplitOptions { hash_fields.push_back(std::stoi(expr)); } - partitioner = std::make_unique(options.partition_nums, hash_fields, "cityHash64"); + partitioner = std::make_unique(options.partition_nums, hash_fields, options_.hash_algorithm); } else if (short_name == "single") { diff --git a/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp b/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp index 4b8b56f99c57..eec4e05fffb9 100644 --- a/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp +++ b/cpp-ch/local-engine/Shuffle/NativeSplitter.cpp @@ -209,7 +209,7 @@ HashNativeSplitter::HashNativeSplitter(NativeSplitter::Options options_, jobject output_columns_indicies.push_back(std::stoi(*iter)); } - selector_builder = std::make_unique(options.partition_nums, hash_fields, "cityHash64"); + selector_builder = std::make_unique(options.partition_nums, hash_fields, options_.hash_algorithm); } void HashNativeSplitter::computePartitionId(Block & block) diff --git a/cpp-ch/local-engine/Shuffle/NativeSplitter.h b/cpp-ch/local-engine/Shuffle/NativeSplitter.h index c883da4bfb4a..c30f235b6550 100644 --- a/cpp-ch/local-engine/Shuffle/NativeSplitter.h +++ b/cpp-ch/local-engine/Shuffle/NativeSplitter.h @@ -41,6 +41,7 @@ class NativeSplitter : BlockIterator size_t partition_nums; std::string exprs_buffer; std::string schema_buffer; + std::string hash_algorithm; }; struct Holder diff --git a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp index 2f7d14410284..266c343a59e7 100644 --- a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp +++ b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.cpp @@ -383,7 +383,7 @@ HashSplitter::HashSplitter(SplitOptions options_) : ShuffleSplitter(std::move(op output_columns_indicies.push_back(std::stoi(*iter)); } - selector_builder = std::make_unique(options.partition_nums, hash_fields, "cityHash64"); + selector_builder = std::make_unique(options.partition_nums, hash_fields, options_.hash_algorithm); } std::unique_ptr HashSplitter::create(SplitOptions && options_) { diff --git a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h index 49fa967fc03e..e9a59c339cae 100644 --- a/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h +++ b/cpp-ch/local-engine/Shuffle/ShuffleSplitter.h @@ -47,7 +47,7 @@ struct SplitOptions std::string compress_method = "zstd"; int compress_level; size_t spill_threshold = 300 * 1024 * 1024; - + std::string hash_algorithm; }; class ColumnsBuffer diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index 5d6add257854..6813a5c9335f 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -653,7 +653,8 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nat jstring local_dirs, jint num_sub_dirs, jboolean prefer_spill, - jlong spill_threshold) + jlong spill_threshold, + jstring hash_algorithm) { LOCAL_ENGINE_JNI_METHOD_START std::string hash_exprs; @@ -694,7 +695,8 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nat .hash_exprs = hash_exprs, .out_exprs = out_exprs, .compress_method = jstring2string(env, 
codec), - .spill_threshold = static_cast(spill_threshold)}; + .spill_threshold = static_cast(spill_threshold), + .hash_algorithm = jstring2string(env, hash_algorithm)}; auto name = jstring2string(env, short_name); local_engine::SplitterHolder * splitter; if (prefer_spill) @@ -721,6 +723,7 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nat jint split_size, jstring codec, jlong spill_threshold, + jstring hash_algorithm, jobject pusher) { LOCAL_ENGINE_JNI_METHOD_START @@ -753,7 +756,8 @@ JNIEXPORT jlong Java_io_glutenproject_vectorized_CHShuffleSplitterJniWrapper_nat .hash_exprs = hash_exprs, .out_exprs = out_exprs, .compress_method = jstring2string(env, codec), - .spill_threshold = static_cast(spill_threshold)}; + .spill_threshold = static_cast(spill_threshold), + .hash_algorithm = jstring2string(env, hash_algorithm)}; auto name = jstring2string(env, short_name); local_engine::SplitterHolder * splitter; splitter = new local_engine::SplitterHolder{.splitter = std::make_unique(name, options, pusher)}; @@ -1060,12 +1064,14 @@ Java_io_glutenproject_vectorized_StorageJoinBuilder_nativeCleanBuildHashTable(JN // BlockSplitIterator JNIEXPORT jlong Java_io_glutenproject_vectorized_BlockSplitIterator_nativeCreate( - JNIEnv * env, jobject, jobject in, jstring name, jstring expr, jstring schema, jint partition_num, jint buffer_size) + JNIEnv * env, jobject, jobject in, jstring name, jstring expr, jstring schema, jint partition_num, jint buffer_size, jstring hash_algorithm) { LOCAL_ENGINE_JNI_METHOD_START local_engine::NativeSplitter::Options options; options.partition_nums = partition_num; options.buffer_size = buffer_size; + auto hash_algorithm_str = jstring2string(env, hash_algorithm); + options.hash_algorithm.swap(hash_algorithm_str); auto expr_str = jstring2string(env, expr); std::string schema_str; if (schema) diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc index 268cbe79f07e..78313a382f43 100644 --- a/cpp/core/jni/JniWrapper.cc +++ b/cpp/core/jni/JniWrapper.cc @@ -834,8 +834,9 @@ JNIEXPORT jlong JNICALL Java_io_glutenproject_vectorized_ShuffleWriterJniWrapper env->ReleaseStringUTFChars(dataFileJstr, dataFileC); auto localDirs = env->GetStringUTFChars(localDirsJstr, JNI_FALSE); - setenv(gluten::kGlutenSparkLocalDirs.c_str(), localDirs, 1); + shuffleWriterOptions.local_dirs = std::string(localDirs); env->ReleaseStringUTFChars(localDirsJstr, localDirs); + partitionWriterCreator = std::make_shared(); } else if (partitionWriterType == "celeborn") { shuffleWriterOptions.partition_writer_type = PartitionWriterType::kCeleborn; diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc b/cpp/core/shuffle/LocalPartitionWriter.cc index e9fb75c7836e..313be10653b8 100644 --- a/cpp/core/shuffle/LocalPartitionWriter.cc +++ b/cpp/core/shuffle/LocalPartitionWriter.cc @@ -20,6 +20,7 @@ #include #include "shuffle/Utils.h" #include "utils/DebugOut.h" +#include "utils/StringUtil.h" #include "utils/Timer.h" namespace gluten { @@ -169,22 +170,12 @@ std::string LocalPartitionWriter::nextSpilledFileDir() { } arrow::Status LocalPartitionWriter::setLocalDirs() { - ARROW_ASSIGN_OR_RAISE(configuredDirs_, getConfiguredLocalDirs()); + configuredDirs_ = splitPaths(shuffleWriter_->options().local_dirs); // Shuffle the configured local directories. This prevents each task from using the same directory for spilled files. 
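Review note: replacing setenv with an explicit ShuffleWriterOptions::local_dirs field removes process-global, non-thread-safe state from the JNI path, and the std::shuffle just below then randomizes each writer's private copy of that list so concurrent tasks spread spill files across disks. On the JVM side the value is just a comma-joined string; a hypothetical Scala sketch of producing it (Spark's Utils.getConfiguredLocalDirs is private[spark], so this assumes code living under an org.apache.spark package, as the shuffle writers do):

    package org.apache.spark.shuffle

    import org.apache.spark.SparkEnv
    import org.apache.spark.util.Utils

    object LocalDirs {
      // Join Spark's configured local directories into the single
      // comma-separated string the native shuffle writer now receives
      // through its options instead of via an environment variable.
      def joined(): String =
        Utils.getConfiguredLocalDirs(SparkEnv.get.conf).mkString(",")
    }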
std::random_device rd; std::default_random_engine engine(rd()); std::shuffle(configuredDirs_.begin(), configuredDirs_.end(), engine); - subDirSelection_.assign(configuredDirs_.size(), 0); - - // Both data_file and shuffle_index_file should be set through jni. - // For test purpose, Create a temporary subdirectory in the system temporary - // dir with prefix "columnar-shuffle" - if (shuffleWriter_->options().data_file.length() == 0) { - std::string dataFileTemp; - size_t id = std::hash{}(std::this_thread::get_id()) % configuredDirs_.size(); - ARROW_ASSIGN_OR_RAISE(shuffleWriter_->options().data_file, createTempShuffleFile(configuredDirs_[id])); - } return arrow::Status::OK(); } diff --git a/cpp/core/shuffle/Utils.cc b/cpp/core/shuffle/Utils.cc index 7e8144aaeb8e..3a5a23707890 100644 --- a/cpp/core/shuffle/Utils.cc +++ b/cpp/core/shuffle/Utils.cc @@ -16,6 +16,8 @@ */ #include "shuffle/Utils.h" +#include "options.h" +#include "utils/StringUtil.h" #include #include @@ -39,31 +41,6 @@ std::string gluten::getSpilledShuffleFileDir(const std::string& configuredDir, i return dir; } -arrow::Result> gluten::getConfiguredLocalDirs() { - auto joinedDirsC = std::getenv(kGlutenSparkLocalDirs.c_str()); - if (joinedDirsC != nullptr && strcmp(joinedDirsC, "") > 0) { - auto joinedDirs = std::string(joinedDirsC); - std::string delimiter = ","; - - size_t pos; - std::vector res; - while ((pos = joinedDirs.find(delimiter)) != std::string::npos) { - auto dir = joinedDirs.substr(0, pos); - if (dir.length() > 0) { - res.push_back(std::move(dir)); - } - joinedDirs.erase(0, pos + delimiter.length()); - } - if (joinedDirs.length() > 0) { - res.push_back(std::move(joinedDirs)); - } - return res; - } else { - ARROW_ASSIGN_OR_RAISE(auto arrow_tmp_dir, arrow::internal::TemporaryDir::Make("columnar-shuffle-")); - return std::vector{arrow_tmp_dir->path().ToString()}; - } -} - arrow::Result gluten::createTempShuffleFile(const std::string& dir) { if (dir.length() == 0) { return arrow::Status::Invalid("Failed to create spilled file, got empty path."); diff --git a/cpp/core/shuffle/Utils.h b/cpp/core/shuffle/Utils.h index 3c0ee3f65f0d..9c104d759828 100644 --- a/cpp/core/shuffle/Utils.h +++ b/cpp/core/shuffle/Utils.h @@ -34,8 +34,6 @@ std::string generateUuid(); std::string getSpilledShuffleFileDir(const std::string& configuredDir, int32_t subDirId); -arrow::Result> getConfiguredLocalDirs(); - arrow::Result createTempShuffleFile(const std::string& dir); arrow::Result>> toShuffleWriterTypeId( diff --git a/cpp/core/shuffle/options.h b/cpp/core/shuffle/options.h index ba882302a810..703c5f1b494d 100644 --- a/cpp/core/shuffle/options.h +++ b/cpp/core/shuffle/options.h @@ -61,6 +61,7 @@ struct ShuffleWriterOptions { std::string partitioning_name{}; std::string data_file{}; + std::string local_dirs{}; arrow::MemoryPool* memory_pool{}; static ShuffleWriterOptions defaults(); diff --git a/cpp/core/utils/StringUtil.cc b/cpp/core/utils/StringUtil.cc index 3d9155c26aae..d6acb9e02126 100644 --- a/cpp/core/utils/StringUtil.cc +++ b/cpp/core/utils/StringUtil.cc @@ -24,6 +24,9 @@ #include "exception.h" std::vector gluten::splitByDelim(const std::string& s, const char delimiter) { + if (s.empty()) { + return {}; + } std::vector result; size_t start = 0; size_t end = s.find(delimiter); @@ -38,13 +41,16 @@ std::vector gluten::splitByDelim(const std::string& s, const char d return result; } -std::vector gluten::splitPaths(const std::string& s) { +std::vector gluten::splitPaths(const std::string& s, bool checkExists) { + if (s.empty()) { + return 
{}; + } auto splits = splitByDelim(s, ','); std::vector paths; for (auto i = 0; i < splits.size(); ++i) { if (!splits[i].empty()) { std::filesystem::path path(splits[i]); - if (!std::filesystem::exists(path)) { + if (checkExists && !std::filesystem::exists(path)) { throw gluten::GlutenException("File path not exists: " + splits[i]); } if (path.is_relative()) { diff --git a/cpp/core/utils/StringUtil.h b/cpp/core/utils/StringUtil.h index f06cfa75251f..8880229616a5 100644 --- a/cpp/core/utils/StringUtil.h +++ b/cpp/core/utils/StringUtil.h @@ -21,6 +21,6 @@ namespace gluten { std::vector splitByDelim(const std::string& s, const char delimiter); -std::vector splitPaths(const std::string& s); +std::vector splitPaths(const std::string& s, bool checkExists = false); } // namespace gluten diff --git a/cpp/core/utils/macros.h b/cpp/core/utils/macros.h index 091dbbe8de9d..6fd0f15c0454 100644 --- a/cpp/core/utils/macros.h +++ b/cpp/core/utils/macros.h @@ -99,14 +99,6 @@ } \ std::cout << std::endl; -#define THROW_NOT_OK(expr) \ - do { \ - auto __s = (expr); \ - if (!__s.ok()) { \ - throw GlutenException(__s.message()); \ - } \ - } while (false); - #define TIME_TO_STRING(time) (time > 10000 ? time / 1000 : time) << (time > 10000 ? " ms" : " us") #define TIME_NANO_TO_STRING(time) \ diff --git a/cpp/velox/benchmarks/GenericBenchmark.cc b/cpp/velox/benchmarks/GenericBenchmark.cc index 8162356b77b0..b66568d5b0f6 100644 --- a/cpp/velox/benchmarks/GenericBenchmark.cc +++ b/cpp/velox/benchmarks/GenericBenchmark.cc @@ -75,6 +75,8 @@ std::shared_ptr createShuffleWriter(VeloxMemoryManager* memo options.compression_type = arrow::Compression::GZIP; } + GLUTEN_THROW_NOT_OK(setLocalDirsAndDataFileFromEnv(options)); + GLUTEN_ASSIGN_OR_THROW( auto shuffleWriter, VeloxShuffleWriter::create( diff --git a/cpp/velox/benchmarks/ShuffleSplitBenchmark.cc b/cpp/velox/benchmarks/ShuffleSplitBenchmark.cc index 9e1325e5fd82..96ce90b92d10 100644 --- a/cpp/velox/benchmarks/ShuffleSplitBenchmark.cc +++ b/cpp/velox/benchmarks/ShuffleSplitBenchmark.cc @@ -115,9 +115,9 @@ class BenchmarkShuffleSplit { auto options = ShuffleWriterOptions::defaults(); options.buffer_size = kPartitionBufferSize; - options.buffered_write = true; options.memory_pool = pool.get(); options.partitioning_name = "rr"; + GLUTEN_THROW_NOT_OK(setLocalDirsAndDataFileFromEnv(options)); std::shared_ptr shuffleWriter; int64_t elapseRead = 0; diff --git a/cpp/velox/benchmarks/common/BenchmarkUtils.cc b/cpp/velox/benchmarks/common/BenchmarkUtils.cc index f4f53ba0d166..00dc804389fe 100644 --- a/cpp/velox/benchmarks/common/BenchmarkUtils.cc +++ b/cpp/velox/benchmarks/common/BenchmarkUtils.cc @@ -19,6 +19,8 @@ #include "compute/VeloxBackend.h" #include "compute/VeloxRuntime.h" #include "config/GlutenConfig.h" +#include "shuffle/Utils.h" +#include "utils/StringUtil.h" #include "velox/dwio/common/Options.h" using namespace facebook; @@ -150,3 +152,27 @@ void setCpu(uint32_t cpuindex) { exit(EXIT_FAILURE); } } + +arrow::Status setLocalDirsAndDataFileFromEnv(gluten::ShuffleWriterOptions& options) { + auto joinedDirsC = std::getenv(gluten::kGlutenSparkLocalDirs.c_str()); + if (joinedDirsC != nullptr && strcmp(joinedDirsC, "") > 0) { + // Set local dirs. + auto joinedDirs = std::string(joinedDirsC); + options.local_dirs = joinedDirs; + // Split local dirs and use thread id to choose one directory for data file. 
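Review note: splitPaths now tolerates empty input and makes the existence check opt-in — shuffle local dirs may not exist yet when the options are built, while UDF library paths (see the UdfLoader hunk below, which passes checkExists=true) must already be on disk. An illustrative Scala mirror of the intended semantics (not the real implementation; the C++ version additionally normalizes relative paths to absolute):

    import java.nio.file.{Files, Paths}

    def splitPaths(s: String, checkExists: Boolean = false): Seq[String] =
      if (s.isEmpty) Seq.empty // new: empty input yields no paths
      else
        s.split(',').toSeq.filter(_.nonEmpty).map { p =>
          if (checkExists && !Files.exists(Paths.get(p)))
            throw new IllegalArgumentException(s"File path not exists: $p")
          p
        }

    assert(splitPaths("").isEmpty)
    assert(splitPaths("/tmp/a,,/tmp/b") == Seq("/tmp/a", "/tmp/b"))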
+ auto localDirs = gluten::splitPaths(joinedDirs); + size_t id = std::hash{}(std::this_thread::get_id()) % localDirs.size(); + ARROW_ASSIGN_OR_RAISE(options.data_file, gluten::createTempShuffleFile(localDirs[id])); + } else { + // Otherwise create 1 temp dir and data file. + static const std::string kBenchmarkDirsPrefix = "columnar-shuffle-benchmark-"; + { + // Because tmpDir will be deleted in the dtor, allow it to be deleted upon exiting the block and then recreate it + // in createTempShuffleFile. + ARROW_ASSIGN_OR_RAISE(auto tmpDir, arrow::internal::TemporaryDir::Make(kBenchmarkDirsPrefix)) + options.local_dirs = tmpDir->path().ToString(); + } + ARROW_ASSIGN_OR_RAISE(options.data_file, gluten::createTempShuffleFile(options.local_dirs)); + } + return arrow::Status::OK(); +} diff --git a/cpp/velox/benchmarks/common/BenchmarkUtils.h b/cpp/velox/benchmarks/common/BenchmarkUtils.h index 859d94074bca..2bea28b20632 100644 --- a/cpp/velox/benchmarks/common/BenchmarkUtils.h +++ b/cpp/velox/benchmarks/common/BenchmarkUtils.h @@ -29,6 +29,7 @@ #include "compute/ProtobufUtils.h" #include "memory/VeloxColumnarBatch.h" #include "memory/VeloxMemoryManager.h" +#include "shuffle/options.h" #include "utils/exception.h" #include "velox/common/memory/Memory.h" #include "velox/dwio/common/tests/utils/DataFiles.h" @@ -104,4 +105,6 @@ inline std::shared_ptr convertBatch(std::shared_ptr VeloxColumnarBatch::getRowBytes(int32_t rowId) const { auto fast = std::make_unique(rowVector_); auto size = fast->rowSize(rowId); char* rowBytes = new char[size]; + std::memset(rowBytes, 0, size); fast->serialize(0, rowBytes); return std::make_pair(rowBytes, size); } diff --git a/cpp/velox/udf/UdfLoader.cc b/cpp/velox/udf/UdfLoader.cc index e4c44bd96af6..96996ed27cb4 100644 --- a/cpp/velox/udf/UdfLoader.cc +++ b/cpp/velox/udf/UdfLoader.cc @@ -20,6 +20,7 @@ #include #include #include +#include #include #include "substrait/VeloxToSubstraitType.h" @@ -42,7 +43,7 @@ void* loadSymFromLibrary(void* handle, const std::string& libPath, const std::st } // namespace void gluten::UdfLoader::loadUdfLibraries(const std::string& libPaths) { - const auto& paths = splitPaths(libPaths); + const auto& paths = splitPaths(libPaths, /*checkExists=*/true); loadUdfLibraries0(paths); } diff --git a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h index 26dab9e9bd51..bc553a25fbd1 100644 --- a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h +++ b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h @@ -46,16 +46,10 @@ struct ShuffleTestParams { class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase { protected: void setUp() { - const std::string tmpDirPrefix = "columnar-shuffle-test"; - GLUTEN_ASSIGN_OR_THROW(tmpDir1_, arrow::internal::TemporaryDir::Make(tmpDirPrefix)) - GLUTEN_ASSIGN_OR_THROW(tmpDir2_, arrow::internal::TemporaryDir::Make(tmpDirPrefix)) - auto configDirs = tmpDir1_->path().ToString() + "," + tmpDir2_->path().ToString(); - - setenv(kGlutenSparkLocalDirs.c_str(), configDirs.c_str(), 1); - shuffleWriterOptions_ = ShuffleWriterOptions::defaults(); shuffleWriterOptions_.compression_threshold = 0; shuffleWriterOptions_.memory_pool = defaultArrowMemoryPool().get(); + GLUTEN_THROW_NOT_OK(setLocalDirsAndDataFile()); // Set up test data. 
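Review note: with the env-var plumbing gone, benchmarks and tests provision their own directories, and object lifetime becomes the thing to watch — arrow's TemporaryDir removes its directory in the destructor, which is why setLocalDirsAndDataFile in the hunk below keeps the handles alive in tmpDirs_ for the duration of the test. A rough Scala analogue of that setup (JVM temp dirs persist until deleted explicitly, so no handle-keeping is needed here):

    import java.nio.file.{Files, Path}

    // Create two local dirs to mock multiple SPARK_LOCAL_DIRS entries,
    // join them with a comma, and put the data file in the first one.
    val tmpDirs: Seq[Path] =
      Seq.fill(2)(Files.createTempDirectory("columnar-shuffle-test-"))
    val localDirs: String = tmpDirs.map(_.toString).mkString(",")
    val dataFile: Path = Files.createTempFile(tmpDirs.head, "shuffle", ".data")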
children1_ = { @@ -126,16 +120,34 @@ class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase return shuffleWriter.split(cb, ShuffleWriter::kMinMemLimit); } - virtual std::shared_ptr createShuffleWriter() = 0; + // Create multiple local dirs and join with comma. + arrow::Status setLocalDirsAndDataFile() { + auto& localDirs = shuffleWriterOptions_.local_dirs; + static const std::string kTestLocalDirsPrefix = "columnar-shuffle-test-"; + + // Create first tmp dir and create data file. + // To prevent tmpDirs from being deleted in the dtor, we need to store them. + tmpDirs_.emplace_back(); + ARROW_ASSIGN_OR_RAISE(tmpDirs_.back(), arrow::internal::TemporaryDir::Make(kTestLocalDirsPrefix)) + ARROW_ASSIGN_OR_RAISE(shuffleWriterOptions_.data_file, createTempShuffleFile(tmpDirs_.back()->path().ToString())); + localDirs += tmpDirs_.back()->path().ToString(); + localDirs.push_back(','); + + // Create second tmp dir. + tmpDirs_.emplace_back(); + ARROW_ASSIGN_OR_RAISE(tmpDirs_.back(), arrow::internal::TemporaryDir::Make(kTestLocalDirsPrefix)) + localDirs += tmpDirs_.back()->path().ToString(); + return arrow::Status::OK(); + } - // Temporary local directories to mock multiple SPARK_LOCAL_DIRS for local spilled files. - std::shared_ptr tmpDir1_; - std::shared_ptr tmpDir2_; + virtual std::shared_ptr createShuffleWriter() = 0; ShuffleWriterOptions shuffleWriterOptions_; std::shared_ptr partitionWriterCreator_; + std::vector> tmpDirs_; + std::vector children1_; std::vector children2_; std::vector childrenNoNull_; @@ -152,11 +164,8 @@ class VeloxShuffleWriterTest : public ::testing::TestWithParam(std::make_shared(dataFile)); + partitionWriterCreator_ = std::make_shared( + std::make_shared(shuffleWriterOptions_.data_file)); shuffleWriterOptions_.partition_writer_type = kCeleborn; } else { partitionWriterCreator_ = std::make_shared(); diff --git a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala index 3cb52dcb4104..8d22624919c2 100644 --- a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala +++ b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornHashBasedColumnarShuffleWriter.scala @@ -21,6 +21,7 @@ import io.glutenproject.memory.alloc.CHNativeMemoryAllocators import io.glutenproject.memory.memtarget.MemoryTarget import io.glutenproject.memory.memtarget.Spiller import io.glutenproject.vectorized._ +import io.glutenproject.backendsapi.clickhouse.CHBackendSettings import org.apache.spark._ import org.apache.spark.scheduler.MapStatus @@ -68,6 +69,7 @@ class CHCelebornHashBasedColumnarShuffleWriter[K, V]( nativeBufferSize, customizedCompressCodec, GlutenConfig.getConf.chColumnarShuffleSpillThreshold, + CHBackendSettings.shuffleHashAlgorithm, celebornPartitionPusher ) CHNativeMemoryAllocators.createSpillable( diff --git a/gluten-core/src/main/java/io/glutenproject/substrait/plan/PlanBuilder.java b/gluten-core/src/main/java/io/glutenproject/substrait/plan/PlanBuilder.java index d9ea9a5f589b..1452af194a48 100644 --- a/gluten-core/src/main/java/io/glutenproject/substrait/plan/PlanBuilder.java +++ b/gluten-core/src/main/java/io/glutenproject/substrait/plan/PlanBuilder.java @@ -27,6 +27,9 @@ import java.util.Map; public class PlanBuilder { + + public static byte[] EMPTY_PLAN = empty().toProtobuf().toByteArray(); + private 
PlanBuilder() {} public static PlanNode makePlan( diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala b/gluten-core/src/main/scala/io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala index 33c234daf0b2..11b95251afb6 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/GlutenWholeStageColumnarRDD.scala @@ -30,21 +30,22 @@ import org.apache.spark.sql.utils.OASPackageBridge.InputMetricsWrapper import org.apache.spark.sql.vectorized.ColumnarBatch import org.apache.spark.util.ExecutorManager -import io.substrait.proto.Plan - import scala.collection.mutable trait BaseGlutenPartition extends Partition with InputPartition { - def plan: Plan + def plan: Array[Byte] } -case class GlutenPartition(index: Int, plan: Plan, locations: Array[String] = Array.empty[String]) +case class GlutenPartition( + index: Int, + plan: Array[Byte], + locations: Array[String] = Array.empty[String]) extends BaseGlutenPartition { override def preferredLocations(): Array[String] = locations } -case class GlutenFilePartition(index: Int, files: Array[PartitionedFile], plan: Plan) +case class GlutenFilePartition(index: Int, files: Array[PartitionedFile], plan: Array[Byte]) extends BaseGlutenPartition { override def preferredLocations(): Array[String] = { // Computes total number of bytes can be retrieved from each host. @@ -74,15 +75,11 @@ case class GlutenMergeTreePartition( tablePath: String, minParts: Long, maxParts: Long, - plan: Plan = PlanBuilder.empty().toProtobuf) + plan: Array[Byte] = PlanBuilder.EMPTY_PLAN) extends BaseGlutenPartition { override def preferredLocations(): Array[String] = { Array.empty[String] } - - def copySubstraitPlan(newSubstraitPlan: Plan): GlutenMergeTreePartition = { - this.copy(plan = newSubstraitPlan) - } } case class FirstZippedPartitionsPartition( diff --git a/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala b/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala index b30e2a66f8aa..57df410defc5 100644 --- a/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala +++ b/gluten-core/src/main/scala/io/glutenproject/execution/WholeStageTransformer.scala @@ -265,7 +265,11 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f .genFilePartition(i, currentPartitions, allScanPartitionSchemas, fileFormats, wsCxt) }) (wsCxt, substraitPlanPartitions) - }(t => logOnLevel(substraitPlanLogLevel, s"Generating the Substrait plan took: $t ms.")) + }( + t => + logOnLevel( + substraitPlanLogLevel, + s"$nodeName generating the substrait plan took: $t ms.")) new GlutenWholeStageColumnarRDD( sparkContext, @@ -291,7 +295,8 @@ case class WholeStageTransformer(child: SparkPlan, materializeInput: Boolean = f * result, genFinalStageIterator rather than genFirstStageIterator will be invoked */ val resCtx = GlutenTimeMetric.withMillisTime(doWholeStageTransform()) { - t => logOnLevel(substraitPlanLogLevel, s"Generating the Substrait plan took: $t ms.") + t => + logOnLevel(substraitPlanLogLevel, s"$nodeName generating the substrait plan took: $t ms.") } new WholeStageZippedPartitionsRDD( sparkContext, diff --git a/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionConverter.scala b/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionConverter.scala index fd293f40905a..1db768807e90 100644 --- 
a/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionConverter.scala +++ b/gluten-core/src/main/scala/io/glutenproject/expression/ExpressionConverter.scala @@ -79,7 +79,7 @@ object ExpressionConverter extends SQLConfHelper with Logging { expr: Expression, attributeSeq: Seq[Attribute]): ExpressionTransformer = { logDebug( - s"replaceWithExpressionTransformer expr: $expr class: ${expr.getClass}} " + + s"replaceWithExpressionTransformer expr: $expr class: ${expr.getClass} " + s"name: ${expr.prettyName}") expr match { diff --git a/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala b/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala index 75302ab8ab2b..34117333d124 100644 --- a/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala +++ b/gluten-core/src/test/scala/org/apache/spark/softaffinity/SoftAffinitySuite.scala @@ -62,7 +62,7 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate val locations = SoftAffinityUtil.getFilePartitionLocations(partition) - val nativePartition = new GlutenPartition(0, PlanBuilder.empty().toProtobuf, locations) + val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations) assertResult(Set("host-1", "host-2", "host-3")) { nativePartition.preferredLocations().toSet } @@ -91,7 +91,7 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate val locations = SoftAffinityUtil.getFilePartitionLocations(partition) - val nativePartition = new GlutenPartition(0, PlanBuilder.empty().toProtobuf, locations) + val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations) assertResult(Set("host-1", "host-4", "host-5")) { nativePartition.preferredLocations().toSet @@ -121,7 +121,7 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate val locations = SoftAffinityUtil.getFilePartitionLocations(partition) - val nativePartition = new GlutenPartition(0, PlanBuilder.empty().toProtobuf, locations) + val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations) assertResult(Set("executor_host-2_2", "executor_host-1_0")) { nativePartition.preferredLocations().toSet @@ -133,7 +133,7 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate val locations = SoftAffinityUtil.getNativeMergeTreePartitionLocations(partition) - val nativePartition = new GlutenPartition(0, PlanBuilder.empty().toProtobuf, locations) + val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations) assertResult(Set("executor_host-1_1")) { nativePartition.preferredLocations().toSet @@ -163,7 +163,7 @@ class SoftAffinitySuite extends QueryTest with SharedSparkSession with Predicate val locations = SoftAffinityUtil.getFilePartitionLocations(partition) - val nativePartition = new GlutenPartition(0, PlanBuilder.empty().toProtobuf, locations) + val nativePartition = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, locations) assertResult(Set("host-1", "host-5", "host-6")) { nativePartition.preferredLocations().toSet diff --git a/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java b/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java index 44a43f016f4b..468fc72ecf00 100644 --- a/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java +++ b/gluten-data/src/main/java/io/glutenproject/vectorized/NativePlanEvaluator.java @@ -24,7 +24,6 @@ import io.glutenproject.utils.DebugUtil; import 
io.glutenproject.validate.NativePlanValidationInfo; -import io.substrait.proto.Plan; import org.apache.spark.TaskContext; import org.apache.spark.util.SparkDirectoryUtil; @@ -58,7 +57,7 @@ public NativePlanValidationInfo doNativeValidateWithFailureReason(byte[] subPlan // Used by WholeStageTransform to create the native computing pipeline and // return a columnar result iterator. public GeneralOutIterator createKernelWithBatchIterator( - Plan wsPlan, List iterList) throws RuntimeException, IOException { + byte[] wsPlan, List iterList) throws RuntimeException, IOException { final AtomicReference outIterator = new AtomicReference<>(); final NativeMemoryManager nmm = NativeMemoryManagers.create( @@ -85,7 +84,7 @@ public GeneralOutIterator createKernelWithBatchIterator( long iterHandle = jniWrapper.nativeCreateKernelWithIterator( memoryManagerHandle, - getPlanBytesBuf(wsPlan), + wsPlan, iterList.toArray(new GeneralInIterator[0]), TaskContext.get().stageId(), TaskContext.getPartitionId(), @@ -100,8 +99,4 @@ private ColumnarBatchOutIterator createOutIterator( Runtime runtime, long iterHandle, NativeMemoryManager nmm) throws IOException { return new ColumnarBatchOutIterator(runtime, iterHandle, nmm); } - - private byte[] getPlanBytesBuf(Plan planNode) { - return planNode.toByteArray(); - } } diff --git a/tools/gluten-it/common/src/main/java/io/glutenproject/integration/tpc/command/Parameterized.java b/tools/gluten-it/common/src/main/java/io/glutenproject/integration/tpc/command/Parameterized.java index e86235232ad9..afcd8ec82490 100644 --- a/tools/gluten-it/common/src/main/java/io/glutenproject/integration/tpc/command/Parameterized.java +++ b/tools/gluten-it/common/src/main/java/io/glutenproject/integration/tpc/command/Parameterized.java @@ -87,7 +87,7 @@ public Integer call() throws Exception { // matcher2.matches dimName = matcher2.group(1); dimValueName = matcher2.group(0); - confText = matcher1.group(2).substring(1); // trim leading "," + confText = matcher2.group(2).substring(1); // trim leading "," } final List> options = new ArrayList<>();
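Review note: the Plan → byte[] migration threaded through this patch (CHNativeExpressionEvaluator, NativePlanEvaluator, GlutenPartition, PlanBuilder.EMPTY_PLAN) follows one convention: serialize the Substrait protobuf exactly once at the call site and ship opaque bytes everywhere else, keeping the protobuf class out of Spark task serialization. A sketch of both ends of that convention (mirroring the CHIteratorApi and SoftAffinitySuite hunks above):

    import io.glutenproject.execution.GlutenPartition
    import io.glutenproject.substrait.plan.PlanBuilder

    // Driver side: serialize the protobuf once; only bytes travel with the
    // partition (and through createKernelWithBatchIterator) to executors.
    def toPartition(index: Int, plan: io.substrait.proto.Plan): GlutenPartition =
      GlutenPartition(index, plan.toByteArray)

    // Where no real plan exists, reuse the byte[] precomputed once
    // in PlanBuilder.EMPTY_PLAN instead of re-serializing per partition.
    val empty = GlutenPartition(0, PlanBuilder.EMPTY_PLAN, Array("host-1", "host-2"))
    assert(empty.preferredLocations().sameElements(Array("host-1", "host-2")))

The closing Parameterized.java hunk, finally, is a pure copy-paste fix: the matcher2 branch was still reading its conf text out of matcher1's capture groups.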