diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java index 40e2c2c56b77f..029d33e8feba9 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java @@ -38,6 +38,7 @@ public BlockOutputStream( SQLMetric dataSize, boolean compressionEnable, String defaultCompressionCodec, + int defaultCompressionLevel, int bufferSize) { OutputStream unwrapOutputStream = CHShuffleWriteStreamFactory.unwrapSparkCompressionOutputStream( @@ -50,7 +51,7 @@ public BlockOutputStream( } this.instance = nativeCreate( - this.outputStream, buffer, defaultCompressionCodec, compressionEnable, bufferSize); + this.outputStream, buffer, defaultCompressionCodec, defaultCompressionLevel, compressionEnable, bufferSize); this.dataSize = dataSize; } @@ -58,6 +59,7 @@ private native long nativeCreate( OutputStream outputStream, byte[] buffer, String defaultCompressionCodec, + int defaultCompressionLevel, boolean compressionEnable, int bufferSize); diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java index 864cc4eb70ace..7bc4f5dac6b82 100644 --- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java +++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java @@ -27,6 +27,7 @@ public long make( long mapId, int bufferSize, String codec, + int level, String dataFile, String localDirs, int subDirsPerLocalDir, @@ -43,6 +44,7 @@ public long make( mapId, bufferSize, codec, + level, dataFile, localDirs, subDirsPerLocalDir, @@ -58,6 +60,7 @@ public long makeForRSS( long mapId, int bufferSize, String codec, + int level, long spillThreshold, String hashAlgorithm, Object pusher, @@ -71,6 +74,7 @@ public long makeForRSS( mapId, bufferSize, codec, + level, spillThreshold, hashAlgorithm, pusher, @@ -86,6 +90,7 @@ public native long nativeMake( long mapId, int bufferSize, String codec, + int level, String dataFile, String localDirs, int subDirsPerLocalDir, @@ -103,6 +108,7 @@ public native long nativeMakeForRSS( long mapId, int bufferSize, String codec, + int level, long spillThreshold, String hashAlgorithm, Object pusher, diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala index f640bfd2d7f19..b27e80572ca20 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala @@ -54,8 +54,10 @@ private class CHColumnarBatchSerializerInstance( extends SerializerInstance with Logging { - private lazy val compressionCodec = - GlutenShuffleUtils.getCompressionCodec(SparkEnv.get.conf).toUpperCase(Locale.ROOT) + private lazy val conf = SparkEnv.get.conf + private lazy val compressionCodec = GlutenShuffleUtils.getCompressionCodec(conf) + private lazy val customizedCompressCodec = compressionCodec.toUpperCase(Locale.ROOT) + private lazy val compressionLevel = GlutenShuffleUtils.getCompressionLevel(conf, compressionCodec, null) override def deserializeStream(in: InputStream): DeserializationStream = { new DeserializationStream { @@ -136,7 +138,8 @@ private class CHColumnarBatchSerializerInstance( writeBuffer, dataSize, CHBackendSettings.useCustomizedShuffleCodec, - compressionCodec, + customizedCompressCodec, + compressionLevel, CHBackendSettings.customizeBufferSize ) diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala index db9bba5f170a3..1c8eaed4accf1 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala @@ -51,8 +51,9 @@ class CHColumnarShuffleWriter[K, V]( .mkString(",") private val subDirsPerLocalDir = blockManager.diskBlockManager.subDirsPerLocalDir private val splitSize = GlutenConfig.getConf.maxBatchSize - private val customizedCompressCodec = - GlutenShuffleUtils.getCompressionCodec(conf).toUpperCase(Locale.ROOT) + private val compressionCodec = GlutenShuffleUtils.getCompressionCodec(conf) + private val customizedCompressCodec = compressionCodec.toUpperCase(Locale.ROOT) + private val compressionLevel = GlutenShuffleUtils.getCompressionLevel(conf, compressionCodec, null) private val maxSortBufferSize = GlutenConfig.getConf.chColumnarMaxSortBufferSize private val forceMemorySortShuffle = GlutenConfig.getConf.chColumnarForceMemorySortShuffle private val spillThreshold = GlutenConfig.getConf.chColumnarShuffleSpillThreshold @@ -99,6 +100,7 @@ class CHColumnarShuffleWriter[K, V]( mapId, splitSize, customizedCompressCodec, + compressionLevel, dataTmp.getAbsolutePath, localDirs, subDirsPerLocalDir, diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala index 17eb0ed0b037b..e73000593896f 100644 --- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala +++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala @@ -59,14 +59,15 @@ object CHExecUtil extends Logging { dataSize: SQLMetric, iter: Iterator[ColumnarBatch], compressionCodec: Option[String] = Some("lz4"), + compressionLevel: Option[Int] = Some(Int.MinValue), bufferSize: Int = 4 << 10): Iterator[(Int, Array[Byte])] = { var count = 0 val bos = new ByteArrayOutputStream() val buffer = new Array[Byte](bufferSize) // 4K val blockOutputStream = compressionCodec - .map(new BlockOutputStream(bos, buffer, dataSize, true, _, bufferSize)) - .getOrElse(new BlockOutputStream(bos, buffer, dataSize, false, "", bufferSize)) + .map(new BlockOutputStream(bos, buffer, dataSize, true, _, compressionLevel.getOrElse(Int.MinValue), bufferSize)) + .getOrElse(new BlockOutputStream(bos, buffer, dataSize, false, "", compressionLevel.getOrElse(Int.MinValue), bufferSize)) while (iter.hasNext) { val batch = iter.next() count += batch.numRows diff --git a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHStorageJoinBenchmark.scala b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHStorageJoinBenchmark.scala index 194eccc50878a..35914df49a38d 100644 --- a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHStorageJoinBenchmark.scala +++ b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHStorageJoinBenchmark.scala @@ -191,7 +191,7 @@ object CHStorageJoinBenchmark extends SqlBasedBenchmark with CHSqlBasedBenchmark batch => val bos = new ByteArrayOutputStream() val buffer = new Array[Byte](4 << 10) // 4K - val dout = new BlockOutputStream(bos, buffer, dataSize, true, "lz4", buffer.length) + val dout = new BlockOutputStream(bos, buffer, dataSize, true, "lz4", Int.MinValue, buffer.length) dout.write(batch) dout.flush() dout.close() diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp index a2ef0888aeff5..a870fe2f5fb2c 100644 --- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp +++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp @@ -139,7 +139,7 @@ size_t LocalPartitionWriter::evictPartitions() { auto file = getNextSpillFile(); WriteBufferFromFile output(file, shuffle_writer->options.io_buffer_size); - auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {}); + auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), std::optional(shuffle_writer->options.compress_level)); CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size); NativeWriter writer(compressed_output, shuffle_writer->output_header); @@ -200,7 +200,7 @@ String Spillable::getNextSpillFile() std::vector Spillable::mergeSpills(CachedShuffleWriter * shuffle_writer, WriteBuffer & data_file, ExtraData extra_data) { - auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {}); + auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), std::optional(shuffle_writer->options.compress_level)); CompressedWriteBuffer compressed_output(data_file, codec, shuffle_writer->options.io_buffer_size); NativeWriter writer(compressed_output, shuffle_writer->output_header); @@ -352,7 +352,7 @@ size_t MemorySortLocalPartitionWriter::evictPartitions() return; auto file = getNextSpillFile(); WriteBufferFromFile output(file, shuffle_writer->options.io_buffer_size); - auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {}); + auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), std::optional(shuffle_writer->options.compress_level)); CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size); NativeWriter writer(compressed_output, output_header); @@ -453,7 +453,7 @@ size_t MemorySortCelebornPartitionWriter::evictPartitions() return; WriteBufferFromOwnString output; - auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {}); + auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), std::optional(shuffle_writer->options.compress_level)); CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size); NativeWriter writer(compressed_output, shuffle_writer->output_header); @@ -564,7 +564,7 @@ size_t CelebornPartitionWriter::evictSinglePartition(size_t partition_id) return; WriteBufferFromOwnString output; - auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {}); + auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), std::optional(shuffle_writer->options.compress_level)); CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size); NativeWriter writer(compressed_output, shuffle_writer->output_header); diff --git a/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp b/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp index dddf0b895fdff..0e2a331044ee3 100644 --- a/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp +++ b/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp @@ -25,13 +25,13 @@ using namespace DB; namespace local_engine { ShuffleWriter::ShuffleWriter( - jobject output_stream, jbyteArray buffer, const std::string & codecStr, bool enable_compression, size_t customize_buffer_size) + jobject output_stream, jbyteArray buffer, const std::string & codecStr, jint level, bool enable_compression, size_t customize_buffer_size) { compression_enable = enable_compression; write_buffer = std::make_unique(output_stream, buffer, customize_buffer_size); if (compression_enable) { - auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(codecStr), {}); + auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(codecStr), std::optional(level)); compressed_out = std::make_unique(*write_buffer, codec); } } diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp index c4e8ec67b106a..6283881afae5a 100644 --- a/cpp-ch/local-engine/local_engine_jni.cpp +++ b/cpp-ch/local-engine/local_engine_jni.cpp @@ -544,6 +544,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na jlong map_id, jint split_size, jstring codec, + jint compress_level, jstring data_file, jstring local_dirs, jint num_sub_dirs, @@ -585,6 +586,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na .hash_exprs = hash_exprs, .out_exprs = out_exprs, .compress_method = jstring2string(env, codec), + .compress_level = compress_level, .spill_threshold = static_cast(spill_threshold), .hash_algorithm = jstring2string(env, hash_algorithm), .max_sort_buffer_size = static_cast(max_sort_buffer_size), @@ -606,6 +608,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na jlong map_id, jint split_size, jstring codec, + jint compress_level, jlong spill_threshold, jstring hash_algorithm, jobject pusher, @@ -637,6 +640,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na .hash_exprs = hash_exprs, .out_exprs = out_exprs, .compress_method = jstring2string(env, codec), + .compress_level = compress_level, .spill_threshold = static_cast(spill_threshold), .hash_algorithm = jstring2string(env, hash_algorithm), .force_memory_sort = static_cast(force_memory_sort)}; @@ -1158,11 +1162,11 @@ JNIEXPORT jint Java_org_apache_gluten_vectorized_BlockSplitIterator_nativeNextPa } JNIEXPORT jlong Java_org_apache_gluten_vectorized_BlockOutputStream_nativeCreate( - JNIEnv * env, jobject, jobject output_stream, jbyteArray buffer, jstring codec, jboolean compressed, jint customize_buffer_size) + JNIEnv * env, jobject, jobject output_stream, jbyteArray buffer, jstring codec, jint level, jboolean compressed, jint customize_buffer_size) { LOCAL_ENGINE_JNI_METHOD_START local_engine::ShuffleWriter * writer - = new local_engine::ShuffleWriter(output_stream, buffer, jstring2string(env, codec), compressed, customize_buffer_size); + = new local_engine::ShuffleWriter(output_stream, buffer, jstring2string(env, codec), level, compressed, customize_buffer_size); return reinterpret_cast(writer); LOCAL_ENGINE_JNI_METHOD_END(env, -1) } diff --git a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala index 3619855f74ed5..74bffeb0f8b3a 100644 --- a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala +++ b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala @@ -58,8 +58,10 @@ private class CHCelebornColumnarBatchSerializerInstance( extends SerializerInstance with Logging { - private lazy val compressionCodec = - GlutenShuffleUtils.getCompressionCodec(SparkEnv.get.conf).toUpperCase(Locale.ROOT) + private lazy val conf = SparkEnv.get.conf + private lazy val compressionCodec = GlutenShuffleUtils.getCompressionCodec(conf) + private lazy val customizedCompressCodec = compressionCodec.toUpperCase(Locale.ROOT) + private lazy val compressionLevel = GlutenShuffleUtils.getCompressionLevel(conf, compressionCodec, null) override def deserializeStream(in: InputStream): DeserializationStream = { new DeserializationStream { @@ -199,7 +201,8 @@ private class CHCelebornColumnarBatchSerializerInstance( writeBuffer, dataSize, CHBackendSettings.useCustomizedShuffleCodec, - compressionCodec, + customizedCompressCodec, + compressionLevel, CHBackendSettings.customizeBufferSize ) diff --git a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala index c7d7957c15b6b..d28da685e9d6b 100644 --- a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala +++ b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala @@ -106,6 +106,7 @@ class CHCelebornColumnarShuffleWriter[K, V]( mapId, nativeBufferSize, customizedCompressCodec, + compressionLevel, GlutenConfig.getConf.chColumnarShuffleSpillThreshold, CHBackendSettings.shuffleHashAlgorithm, celebornPartitionPusher,