From 7037175c92a9e1e76f78076410d72c37d2d88ee1 Mon Sep 17 00:00:00 2001
From: Nicholas Jiang
Date: Mon, 12 Aug 2024 11:14:12 +0800
Subject: [PATCH] [GLUTEN-6724][CH] Shuffle writer supports compression level
 configuration for CompressionCodecFactory (#6725)

* [GLUTEN-6724][CH] Shuffle writer supports compression level configuration for CompressionCodecFactory

* [GLUTEN-6724][CH] Shuffle writer supports compression level configuration for CompressionCodecFactory
---
 .../apache/gluten/vectorized/BlockOutputStream.java  |  9 ++++++++-
 .../vectorized/CHShuffleSplitterJniWrapper.java      |  6 ++++++
 .../vectorized/CHColumnarBatchSerializer.scala       | 13 ++++++++++---
 .../spark/shuffle/CHColumnarShuffleWriter.scala      | 12 +++++++++---
 .../spark/sql/execution/utils/CHExecUtil.scala       |  6 ++++--
 .../benchmarks/CHStorageJoinBenchmark.scala          |  3 ++-
 cpp-ch/local-engine/Shuffle/PartitionWriter.cpp      | 10 +++++-----
 cpp-ch/local-engine/Shuffle/ShuffleCommon.h          |  2 +-
 cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp        |  4 ++--
 cpp-ch/local-engine/Shuffle/ShuffleWriter.h          |  2 +-
 cpp-ch/local-engine/local_engine_jni.cpp             |  8 ++++++--
 .../shuffle/CHCelebornColumnarBatchSerializer.scala  | 11 ++++++++---
 .../shuffle/CHCelebornColumnarShuffleWriter.scala    |  5 +++--
 .../shuffle/CelebornColumnarShuffleWriter.scala      |  5 ++++-
 .../writer/VeloxUniffleColumnarShuffleWriter.java    |  8 ++++++--
 15 files changed, 75 insertions(+), 29 deletions(-)

diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java
index 40e2c2c56b77..e209010b2f85 100644
--- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java
+++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/BlockOutputStream.java
@@ -38,6 +38,7 @@ public BlockOutputStream(
       SQLMetric dataSize,
       boolean compressionEnable,
       String defaultCompressionCodec,
+      int defaultCompressionLevel,
       int bufferSize) {
     OutputStream unwrapOutputStream =
         CHShuffleWriteStreamFactory.unwrapSparkCompressionOutputStream(
@@ -50,7 +51,12 @@ public BlockOutputStream(
     }
     this.instance =
         nativeCreate(
-            this.outputStream, buffer, defaultCompressionCodec, compressionEnable, bufferSize);
+            this.outputStream,
+            buffer,
+            defaultCompressionCodec,
+            defaultCompressionLevel,
+            compressionEnable,
+            bufferSize);
     this.dataSize = dataSize;
   }

@@ -58,6 +64,7 @@ private native long nativeCreate(
       OutputStream outputStream,
       byte[] buffer,
       String defaultCompressionCodec,
+      int defaultCompressionLevel,
       boolean compressionEnable,
       int bufferSize);

diff --git a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java
index 864cc4eb70ac..7bc4f5dac6b8 100644
--- a/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java
+++ b/backends-clickhouse/src/main/java/org/apache/gluten/vectorized/CHShuffleSplitterJniWrapper.java
@@ -27,6 +27,7 @@ public long make(
       long mapId,
       int bufferSize,
       String codec,
+      int level,
       String dataFile,
       String localDirs,
       int subDirsPerLocalDir,
@@ -43,6 +44,7 @@ public long make(
         mapId,
         bufferSize,
         codec,
+        level,
         dataFile,
         localDirs,
         subDirsPerLocalDir,
@@ -58,6 +60,7 @@ public long makeForRSS(
       long mapId,
       int bufferSize,
       String codec,
+      int level,
       long spillThreshold,
       String hashAlgorithm,
       Object pusher,
@@ -71,6 +74,7 @@ public long makeForRSS(
         mapId,
         bufferSize,
         codec,
+        level,
         spillThreshold,
         hashAlgorithm,
         pusher,
@@ -86,6 +90,7 @@ public native long nativeMake(
      long mapId,
      int bufferSize,
      String codec,
+      int level,
      String dataFile,
      String localDirs,
      int subDirsPerLocalDir,
@@ -103,6 +108,7 @@ public native long nativeMakeForRSS(
      long mapId,
      int bufferSize,
      String codec,
+      int level,
      long spillThreshold,
      String hashAlgorithm,
      Object pusher,
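A note on the JNI contract above: `Option[Int]` cannot cross the JNI boundary, so the level travels as a plain `int`, with any negative value meaning "no explicit level" (the native hunks below translate it to `std::nullopt`). A minimal sketch of that convention, using hypothetical helper names that are not part of the patch:

```scala
// Hypothetical helpers illustrating the sentinel convention used across JNI.
object CompressionLevelJni {
  // The patch's JVM call sites use Int.MinValue as the "unset" sentinel.
  val Unset: Int = Int.MinValue

  def encode(level: Option[Int]): Int = level.getOrElse(Unset)
  def decode(level: Int): Option[Int] = if (level < 0) None else Some(level)
}
```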
diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala
index f640bfd2d7f1..fa6f8addf163 100644
--- a/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/gluten/vectorized/CHColumnarBatchSerializer.scala
@@ -54,8 +54,14 @@ private class CHColumnarBatchSerializerInstance(
   extends SerializerInstance
     with Logging {

-  private lazy val compressionCodec =
-    GlutenShuffleUtils.getCompressionCodec(SparkEnv.get.conf).toUpperCase(Locale.ROOT)
+  private lazy val conf = SparkEnv.get.conf
+  private lazy val compressionCodec = GlutenShuffleUtils.getCompressionCodec(conf)
+  private lazy val capitalizedCompressionCodec = compressionCodec.toUpperCase(Locale.ROOT)
+  private lazy val compressionLevel =
+    GlutenShuffleUtils.getCompressionLevel(
+      conf,
+      compressionCodec,
+      GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)

   override def deserializeStream(in: InputStream): DeserializationStream = {
     new DeserializationStream {
@@ -136,7 +142,8 @@ private class CHColumnarBatchSerializerInstance(
           writeBuffer,
           dataSize,
           CHBackendSettings.useCustomizedShuffleCodec,
-          compressionCodec,
+          capitalizedCompressionCodec,
+          compressionLevel,
           CHBackendSettings.customizeBufferSize
         )

diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
index d608734307fb..758c487a18aa 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/shuffle/CHColumnarShuffleWriter.scala
@@ -51,8 +51,13 @@ class CHColumnarShuffleWriter[K, V](
     .mkString(",")
   private val subDirsPerLocalDir = blockManager.diskBlockManager.subDirsPerLocalDir
   private val splitSize = GlutenConfig.getConf.maxBatchSize
-  private val customizedCompressCodec =
-    GlutenShuffleUtils.getCompressionCodec(conf).toUpperCase(Locale.ROOT)
+  private val compressionCodec = GlutenShuffleUtils.getCompressionCodec(conf)
+  private val capitalizedCompressionCodec = compressionCodec.toUpperCase(Locale.ROOT)
+  private val compressionLevel =
+    GlutenShuffleUtils.getCompressionLevel(
+      conf,
+      compressionCodec,
+      GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)
   private val maxSortBufferSize = GlutenConfig.getConf.chColumnarMaxSortBufferSize
   private val forceMemorySortShuffle = GlutenConfig.getConf.chColumnarForceMemorySortShuffle
   private val spillThreshold = GlutenConfig.getConf.chColumnarShuffleSpillThreshold
@@ -98,7 +103,8 @@ class CHColumnarShuffleWriter[K, V](
         dep.shuffleId,
         mapId,
         splitSize,
-        customizedCompressCodec,
+        capitalizedCompressionCodec,
+        compressionLevel,
         dataTmp.getAbsolutePath,
         localDirs,
         subDirsPerLocalDir,
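Both the serializer and the writer now resolve the codec and its level once from the Spark conf, passing `columnarShuffleCodecBackend.orNull` as the backend hint. The actual lookup lives in `GlutenShuffleUtils.getCompressionLevel`; below is a plausible, simplified model of it — the zstd key is Spark's own, everything else here is an assumption, not Gluten's real code:

```scala
import java.util.Locale
import org.apache.spark.SparkConf

// Sketch only — not Gluten's actual implementation.
def getCompressionLevelSketch(conf: SparkConf, codec: String, codecBackend: String): Int =
  codec.toLowerCase(Locale.ROOT) match {
    // Spark exposes a level knob for zstd; 1 is Spark's default.
    case "zstd" if codecBackend == null => conf.getInt("spark.io.compression.zstd.level", 1)
    // Otherwise leave the level "unset" and let the native codec factory
    // apply its own default.
    case _ => Int.MinValue
  }
```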
diff --git a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
index 7526e6d3d70d..38c15fde7cdc 100644
--- a/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
+++ b/backends-clickhouse/src/main/scala/org/apache/spark/sql/execution/utils/CHExecUtil.scala
@@ -59,14 +59,16 @@ object CHExecUtil extends Logging {
       dataSize: SQLMetric,
       iter: Iterator[ColumnarBatch],
       compressionCodec: Option[String] = Some("lz4"),
+      compressionLevel: Option[Int] = None,
       bufferSize: Int = 4 << 10): Iterator[(Int, Array[Byte])] = {
     var count = 0
     val bos = new ByteArrayOutputStream()
     val buffer = new Array[Byte](bufferSize) // 4K
+    val level = compressionLevel.getOrElse(Int.MinValue)
     val blockOutputStream = compressionCodec
-      .map(new BlockOutputStream(bos, buffer, dataSize, true, _, bufferSize))
-      .getOrElse(new BlockOutputStream(bos, buffer, dataSize, false, "", bufferSize))
+      .map(new BlockOutputStream(bos, buffer, dataSize, true, _, level, bufferSize))
+      .getOrElse(new BlockOutputStream(bos, buffer, dataSize, false, "", level, bufferSize))
     while (iter.hasNext) {
       val batch = iter.next()
       count += batch.numRows
diff --git a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHStorageJoinBenchmark.scala b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHStorageJoinBenchmark.scala
index f8cd4bf57cc3..322c9521e74e 100644
--- a/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHStorageJoinBenchmark.scala
+++ b/backends-clickhouse/src/test/scala/org/apache/spark/sql/execution/benchmarks/CHStorageJoinBenchmark.scala
@@ -191,7 +191,8 @@ object CHStorageJoinBenchmark extends SqlBasedBenchmark with CHSqlBasedBenchmark
       batch =>
         val bos = new ByteArrayOutputStream()
         val buffer = new Array[Byte](4 << 10) // 4K
-        val dout = new BlockOutputStream(bos, buffer, dataSize, true, "lz4", buffer.length)
+        val dout =
+          new BlockOutputStream(bos, buffer, dataSize, true, "lz4", Int.MinValue, buffer.length)
         dout.write(batch)
         dout.flush()
         dout.close()
diff --git a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
index 58be564213cb..2f22d0e24139 100644
--- a/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
+++ b/cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
@@ -139,7 +139,7 @@ size_t LocalPartitionWriter::evictPartitions()
    {
        auto file = getNextSpillFile();
        WriteBufferFromFile output(file, shuffle_writer->options.io_buffer_size);
-       auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {});
+       auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), shuffle_writer->options.compress_level);
        CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size);
        NativeWriter writer(compressed_output, shuffle_writer->output_header);
@@ -200,7 +200,7 @@ String Spillable::getNextSpillFile()

 std::vector<UInt64> Spillable::mergeSpills(CachedShuffleWriter * shuffle_writer, WriteBuffer & data_file, ExtraData extra_data)
 {
-    auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {});
+    auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), shuffle_writer->options.compress_level);
     CompressedWriteBuffer compressed_output(data_file, codec, shuffle_writer->options.io_buffer_size);
     NativeWriter writer(compressed_output, shuffle_writer->output_header);
@@ -352,7 +352,7 @@ size_t MemorySortLocalPartitionWriter::evictPartitions()
        return;
    auto file = getNextSpillFile();
    WriteBufferFromFile output(file, shuffle_writer->options.io_buffer_size);
-   auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {});
+   auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), shuffle_writer->options.compress_level);
    CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size);
    NativeWriter writer(compressed_output, output_header);
@@ -453,7 +453,7 @@ size_t MemorySortCelebornPartitionWriter::evictPartitions()
        return;

    WriteBufferFromOwnString output;
-   auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {});
+   auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), shuffle_writer->options.compress_level);
    CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size);
    NativeWriter writer(compressed_output, shuffle_writer->output_header);
@@ -564,7 +564,7 @@ size_t CelebornPartitionWriter::evictSinglePartition(size_t partition_id)
        return;

    WriteBufferFromOwnString output;
-   auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {});
+   auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), shuffle_writer->options.compress_level);
    CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size);
    NativeWriter writer(compressed_output, shuffle_writer->output_header);
diff --git a/cpp-ch/local-engine/Shuffle/ShuffleCommon.h b/cpp-ch/local-engine/Shuffle/ShuffleCommon.h
index d398362aa4b6..052f6d2e37e9 100644
--- a/cpp-ch/local-engine/Shuffle/ShuffleCommon.h
+++ b/cpp-ch/local-engine/Shuffle/ShuffleCommon.h
@@ -44,7 +44,7 @@ struct SplitOptions
     std::string hash_exprs;
     std::string out_exprs;
     std::string compress_method = "zstd";
-    int compress_level;
+    std::optional<int> compress_level;
     size_t spill_threshold = 300 * 1024 * 1024;
     std::string hash_algorithm;
     size_t max_sort_buffer_size = 1_GiB;
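With `compress_level` held as `std::optional<int>`, every spill and merge path above now forwards the option straight to `CompressionCodecFactory::instance().get`, where the previous `{}` argument always silently selected the codec default even when a level had been configured. The resulting contract, modeled in Scala with purely illustrative defaults:

```scala
// Simplified model of the native contract; defaults are illustrative,
// not ClickHouse's actual values.
final case class SplitOptionsModel(
    compressMethod: String = "zstd",
    compressLevel: Option[Int] = None)

def effectiveLevel(options: SplitOptionsModel): Int =
  options.compressLevel.getOrElse(options.compressMethod.toUpperCase(java.util.Locale.ROOT) match {
    case "ZSTD" => 1 // assumed codec default
    case _ => 0 // assumed codec default
  })
```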
diff --git a/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp b/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp
index dddf0b895fdf..8aa624ff9979 100644
--- a/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp
+++ b/cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp
@@ -25,13 +25,13 @@ using namespace DB;
 namespace local_engine
 {
 ShuffleWriter::ShuffleWriter(
-    jobject output_stream, jbyteArray buffer, const std::string & codecStr, bool enable_compression, size_t customize_buffer_size)
+    jobject output_stream, jbyteArray buffer, const std::string & codecStr, jint level, bool enable_compression, size_t customize_buffer_size)
 {
     compression_enable = enable_compression;
     write_buffer = std::make_unique<WriteBufferFromJavaOutputStream>(output_stream, buffer, customize_buffer_size);
     if (compression_enable)
     {
-        auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(codecStr), {});
+        auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(codecStr), level < 0 ? std::nullopt : std::optional<int>(level));
         compressed_out = std::make_unique<CompressedWriteBuffer>(*write_buffer, codec);
     }
 }
diff --git a/cpp-ch/local-engine/Shuffle/ShuffleWriter.h b/cpp-ch/local-engine/Shuffle/ShuffleWriter.h
index 98f67d1ccadb..541e93e0347c 100644
--- a/cpp-ch/local-engine/Shuffle/ShuffleWriter.h
+++ b/cpp-ch/local-engine/Shuffle/ShuffleWriter.h
@@ -24,7 +24,7 @@ class ShuffleWriter
 {
 public:
     ShuffleWriter(
-        jobject output_stream, jbyteArray buffer, const std::string & codecStr, bool enable_compression, size_t customize_buffer_size);
+        jobject output_stream, jbyteArray buffer, const std::string & codecStr, jint level, bool enable_compression, size_t customize_buffer_size);
     virtual ~ShuffleWriter();
     void write(const DB::Block & block);
     void flush();
diff --git a/cpp-ch/local-engine/local_engine_jni.cpp b/cpp-ch/local-engine/local_engine_jni.cpp
index db0dd8b623b6..828556b4abf6 100644
--- a/cpp-ch/local-engine/local_engine_jni.cpp
+++ b/cpp-ch/local-engine/local_engine_jni.cpp
@@ -544,6 +544,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
     jlong map_id,
     jint split_size,
     jstring codec,
+    jint compress_level,
     jstring data_file,
     jstring local_dirs,
     jint num_sub_dirs,
@@ -585,6 +586,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
         .hash_exprs = hash_exprs,
         .out_exprs = out_exprs,
         .compress_method = jstring2string(env, codec),
+        .compress_level = compress_level < 0 ? std::nullopt : std::optional<int>(compress_level),
         .spill_threshold = static_cast<size_t>(spill_threshold),
         .hash_algorithm = jstring2string(env, hash_algorithm),
         .max_sort_buffer_size = static_cast<size_t>(max_sort_buffer_size),
@@ -606,6 +608,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
     jlong map_id,
     jint split_size,
     jstring codec,
+    jint compress_level,
     jlong spill_threshold,
     jstring hash_algorithm,
     jobject pusher,
@@ -637,6 +640,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
         .hash_exprs = hash_exprs,
         .out_exprs = out_exprs,
         .compress_method = jstring2string(env, codec),
+        .compress_level = compress_level < 0 ? std::nullopt : std::optional<int>(compress_level),
         .spill_threshold = static_cast<size_t>(spill_threshold),
         .hash_algorithm = jstring2string(env, hash_algorithm),
         .force_memory_sort = static_cast<bool>(force_memory_sort)};
@@ -1160,11 +1164,11 @@ JNIEXPORT jint Java_org_apache_gluten_vectorized_BlockSplitIterator_nativeNextPa
 }

 JNIEXPORT jlong Java_org_apache_gluten_vectorized_BlockOutputStream_nativeCreate(
-    JNIEnv * env, jobject, jobject output_stream, jbyteArray buffer, jstring codec, jboolean compressed, jint customize_buffer_size)
+    JNIEnv * env, jobject, jobject output_stream, jbyteArray buffer, jstring codec, jint level, jboolean compressed, jint customize_buffer_size)
 {
     LOCAL_ENGINE_JNI_METHOD_START
     local_engine::ShuffleWriter * writer
-        = new local_engine::ShuffleWriter(output_stream, buffer, jstring2string(env, codec), compressed, customize_buffer_size);
+        = new local_engine::ShuffleWriter(output_stream, buffer, jstring2string(env, codec), level, compressed, customize_buffer_size);
     return reinterpret_cast<jlong>(writer);
     LOCAL_ENGINE_JNI_METHOD_END(env, -1)
 }
diff --git a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
index 3619855f74ed..5072ce6a1a2e 100644
--- a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
+++ b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarBatchSerializer.scala
@@ -58,8 +58,12 @@ private class CHCelebornColumnarBatchSerializerInstance(
   extends SerializerInstance
     with Logging {

-  private lazy val compressionCodec =
-    GlutenShuffleUtils.getCompressionCodec(SparkEnv.get.conf).toUpperCase(Locale.ROOT)
+  private lazy val conf = SparkEnv.get.conf
+  private lazy val compressionCodec = GlutenShuffleUtils.getCompressionCodec(conf)
+  private lazy val capitalizedCompressionCodec = compressionCodec.toUpperCase(Locale.ROOT)
+  private lazy val compressionLevel =
+    GlutenShuffleUtils.getCompressionLevel(conf, compressionCodec,
+      GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)

   override def deserializeStream(in: InputStream): DeserializationStream = {
     new DeserializationStream {
@@ -199,7 +203,8 @@ private class CHCelebornColumnarBatchSerializerInstance(
           writeBuffer,
           dataSize,
           CHBackendSettings.useCustomizedShuffleCodec,
-          compressionCodec,
+          capitalizedCompressionCodec,
+          compressionLevel,
           CHBackendSettings.customizeBufferSize
         )
diff --git a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
index c7d7957c15b6..9b99e533f935 100644
--- a/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
+++ b/gluten-celeborn/clickhouse/src/main/scala/org/apache/spark/shuffle/CHCelebornColumnarShuffleWriter.scala
@@ -47,7 +47,7 @@ class CHCelebornColumnarShuffleWriter[K, V](
     client,
     writeMetrics) {

-  private val customizedCompressCodec =
+  private val capitalizedCompressionCodec =
     customizedCompressionCodec.toUpperCase(Locale.ROOT)

   private val jniWrapper = new CHShuffleSplitterJniWrapper
@@ -105,7 +105,8 @@ class CHCelebornColumnarShuffleWriter[K, V](
         shuffleId,
         mapId,
         nativeBufferSize,
-        customizedCompressCodec,
+        capitalizedCompressionCodec,
+        compressionLevel,
         GlutenConfig.getConf.chColumnarShuffleSpillThreshold,
         CHBackendSettings.shuffleHashAlgorithm,
         celebornPartitionPusher,
diff --git a/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala b/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala
index f71fadd4cd64..3f7c3586ced2 100644
--- a/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala
+++ b/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala
@@ -94,7 +94,10 @@ abstract class CelebornColumnarShuffleWriter[K, V](
   }

   protected val compressionLevel: Int =
-    GlutenShuffleUtils.getCompressionLevel(conf, customizedCompressionCodec, null)
+    GlutenShuffleUtils.getCompressionLevel(
+      conf,
+      customizedCompressionCodec,
+      GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)

   protected val bufferCompressThreshold: Int =
     GlutenConfig.getConf.columnarShuffleCompressionThreshold
diff --git a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
index d505260b8412..2219fc674431 100644
--- a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
+++ b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
@@ -66,7 +66,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V, C> extends RssShuffleWriter
               null));
     }
-    compressionLevel = GlutenShuffleUtils.getCompressionLevel(sparkConf, compressionCodec, null);
   }

   @Override
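End to end, the level now reaches the native writers from the Spark session conf. A configuration that would exercise this path, assuming the level is sourced from Spark's zstd knob as sketched earlier (the shuffle-manager and codec keys are standard Spark/Gluten settings; whether the CH backend honors this exact key depends on GlutenShuffleUtils.getCompressionLevel):

```scala
import org.apache.spark.SparkConf

// Assumed end-to-end setup: Gluten columnar shuffle, zstd with an explicit level.
val conf = new SparkConf()
  .set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")
  .set("spark.io.compression.codec", "zstd")
  .set("spark.io.compression.zstd.level", "3") // forwarded down to CompressionCodecFactory
```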