Skip to content

Commit

Permalink
[GLUTEN-6724][CH] Shuffle writer supports compression level configura…
Browse files Browse the repository at this point in the history
…tion for CompressionCodecFactory (apache#6725)

* [GLUTEN-6724][CH] Shuffle writer supports compression level configuration for CompressionCodecFactory

* [GLUTEN-6724][CH] Shuffle writer supports compression level configuration for CompressionCodecFactory
  • Loading branch information
SteNicholas authored and shamirchen committed Oct 14, 2024
1 parent ac4e3de commit 7037175
Show file tree
Hide file tree
Showing 15 changed files with 75 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public BlockOutputStream(
SQLMetric dataSize,
boolean compressionEnable,
String defaultCompressionCodec,
int defaultCompressionLevel,
int bufferSize) {
OutputStream unwrapOutputStream =
CHShuffleWriteStreamFactory.unwrapSparkCompressionOutputStream(
Expand All @@ -50,14 +51,20 @@ public BlockOutputStream(
}
this.instance =
nativeCreate(
this.outputStream, buffer, defaultCompressionCodec, compressionEnable, bufferSize);
this.outputStream,
buffer,
defaultCompressionCodec,
defaultCompressionLevel,
compressionEnable,
bufferSize);
this.dataSize = dataSize;
}

private native long nativeCreate(
OutputStream outputStream,
byte[] buffer,
String defaultCompressionCodec,
int defaultCompressionLevel,
boolean compressionEnable,
int bufferSize);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public long make(
long mapId,
int bufferSize,
String codec,
int level,
String dataFile,
String localDirs,
int subDirsPerLocalDir,
Expand All @@ -43,6 +44,7 @@ public long make(
mapId,
bufferSize,
codec,
level,
dataFile,
localDirs,
subDirsPerLocalDir,
Expand All @@ -58,6 +60,7 @@ public long makeForRSS(
long mapId,
int bufferSize,
String codec,
int level,
long spillThreshold,
String hashAlgorithm,
Object pusher,
Expand All @@ -71,6 +74,7 @@ public long makeForRSS(
mapId,
bufferSize,
codec,
level,
spillThreshold,
hashAlgorithm,
pusher,
Expand All @@ -86,6 +90,7 @@ public native long nativeMake(
long mapId,
int bufferSize,
String codec,
int level,
String dataFile,
String localDirs,
int subDirsPerLocalDir,
Expand All @@ -103,6 +108,7 @@ public native long nativeMakeForRSS(
long mapId,
int bufferSize,
String codec,
int level,
long spillThreshold,
String hashAlgorithm,
Object pusher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,14 @@ private class CHColumnarBatchSerializerInstance(
extends SerializerInstance
with Logging {

private lazy val compressionCodec =
GlutenShuffleUtils.getCompressionCodec(SparkEnv.get.conf).toUpperCase(Locale.ROOT)
private lazy val conf = SparkEnv.get.conf
private lazy val compressionCodec = GlutenShuffleUtils.getCompressionCodec(conf)
private lazy val capitalizedCompressionCodec = compressionCodec.toUpperCase(Locale.ROOT)
private lazy val compressionLevel =
GlutenShuffleUtils.getCompressionLevel(
conf,
compressionCodec,
GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)

override def deserializeStream(in: InputStream): DeserializationStream = {
new DeserializationStream {
Expand Down Expand Up @@ -136,7 +142,8 @@ private class CHColumnarBatchSerializerInstance(
writeBuffer,
dataSize,
CHBackendSettings.useCustomizedShuffleCodec,
compressionCodec,
capitalizedCompressionCodec,
compressionLevel,
CHBackendSettings.customizeBufferSize
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,13 @@ class CHColumnarShuffleWriter[K, V](
.mkString(",")
private val subDirsPerLocalDir = blockManager.diskBlockManager.subDirsPerLocalDir
private val splitSize = GlutenConfig.getConf.maxBatchSize
private val customizedCompressCodec =
GlutenShuffleUtils.getCompressionCodec(conf).toUpperCase(Locale.ROOT)
private val compressionCodec = GlutenShuffleUtils.getCompressionCodec(conf)
private val capitalizedCompressionCodec = compressionCodec.toUpperCase(Locale.ROOT)
private val compressionLevel =
GlutenShuffleUtils.getCompressionLevel(
conf,
compressionCodec,
GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)
private val maxSortBufferSize = GlutenConfig.getConf.chColumnarMaxSortBufferSize
private val forceMemorySortShuffle = GlutenConfig.getConf.chColumnarForceMemorySortShuffle
private val spillThreshold = GlutenConfig.getConf.chColumnarShuffleSpillThreshold
Expand Down Expand Up @@ -98,7 +103,8 @@ class CHColumnarShuffleWriter[K, V](
dep.shuffleId,
mapId,
splitSize,
customizedCompressCodec,
capitalizedCompressionCodec,
compressionLevel,
dataTmp.getAbsolutePath,
localDirs,
subDirsPerLocalDir,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,16 @@ object CHExecUtil extends Logging {
dataSize: SQLMetric,
iter: Iterator[ColumnarBatch],
compressionCodec: Option[String] = Some("lz4"),
compressionLevel: Option[Int] = None,
bufferSize: Int = 4 << 10): Iterator[(Int, Array[Byte])] = {
var count = 0
val bos = new ByteArrayOutputStream()
val buffer = new Array[Byte](bufferSize) // 4K
val level = compressionLevel.getOrElse(Int.MinValue)
val blockOutputStream =
compressionCodec
.map(new BlockOutputStream(bos, buffer, dataSize, true, _, bufferSize))
.getOrElse(new BlockOutputStream(bos, buffer, dataSize, false, "", bufferSize))
.map(new BlockOutputStream(bos, buffer, dataSize, true, _, level, bufferSize))
.getOrElse(new BlockOutputStream(bos, buffer, dataSize, false, "", level, bufferSize))
while (iter.hasNext) {
val batch = iter.next()
count += batch.numRows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ object CHStorageJoinBenchmark extends SqlBasedBenchmark with CHSqlBasedBenchmark
batch =>
val bos = new ByteArrayOutputStream()
val buffer = new Array[Byte](4 << 10) // 4K
val dout = new BlockOutputStream(bos, buffer, dataSize, true, "lz4", buffer.length)
val dout =
new BlockOutputStream(bos, buffer, dataSize, true, "lz4", Int.MinValue, buffer.length)
dout.write(batch)
dout.flush()
dout.close()
Expand Down
10 changes: 5 additions & 5 deletions cpp-ch/local-engine/Shuffle/PartitionWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ size_t LocalPartitionWriter::evictPartitions()
{
auto file = getNextSpillFile();
WriteBufferFromFile output(file, shuffle_writer->options.io_buffer_size);
auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {});
auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), shuffle_writer->options.compress_level);
CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size);
NativeWriter writer(compressed_output, shuffle_writer->output_header);

Expand Down Expand Up @@ -200,7 +200,7 @@ String Spillable::getNextSpillFile()

std::vector<UInt64> Spillable::mergeSpills(CachedShuffleWriter * shuffle_writer, WriteBuffer & data_file, ExtraData extra_data)
{
auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {});
auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), shuffle_writer->options.compress_level);

CompressedWriteBuffer compressed_output(data_file, codec, shuffle_writer->options.io_buffer_size);
NativeWriter writer(compressed_output, shuffle_writer->output_header);
Expand Down Expand Up @@ -352,7 +352,7 @@ size_t MemorySortLocalPartitionWriter::evictPartitions()
return;
auto file = getNextSpillFile();
WriteBufferFromFile output(file, shuffle_writer->options.io_buffer_size);
auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {});
auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), shuffle_writer->options.compress_level);
CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size);
NativeWriter writer(compressed_output, output_header);

Expand Down Expand Up @@ -453,7 +453,7 @@ size_t MemorySortCelebornPartitionWriter::evictPartitions()
return;

WriteBufferFromOwnString output;
auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {});
auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), shuffle_writer->options.compress_level);
CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size);
NativeWriter writer(compressed_output, shuffle_writer->output_header);

Expand Down Expand Up @@ -564,7 +564,7 @@ size_t CelebornPartitionWriter::evictSinglePartition(size_t partition_id)
return;

WriteBufferFromOwnString output;
auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), {});
auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(shuffle_writer->options.compress_method), shuffle_writer->options.compress_level);
CompressedWriteBuffer compressed_output(output, codec, shuffle_writer->options.io_buffer_size);
NativeWriter writer(compressed_output, shuffle_writer->output_header);

Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Shuffle/ShuffleCommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ struct SplitOptions
std::string hash_exprs;
std::string out_exprs;
std::string compress_method = "zstd";
int compress_level;
std::optional<int> compress_level;
size_t spill_threshold = 300 * 1024 * 1024;
std::string hash_algorithm;
size_t max_sort_buffer_size = 1_GiB;
Expand Down
4 changes: 2 additions & 2 deletions cpp-ch/local-engine/Shuffle/ShuffleWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@ using namespace DB;
namespace local_engine
{
ShuffleWriter::ShuffleWriter(
jobject output_stream, jbyteArray buffer, const std::string & codecStr, bool enable_compression, size_t customize_buffer_size)
jobject output_stream, jbyteArray buffer, const std::string & codecStr, jint level, bool enable_compression, size_t customize_buffer_size)
{
compression_enable = enable_compression;
write_buffer = std::make_unique<WriteBufferFromJavaOutputStream>(output_stream, buffer, customize_buffer_size);
if (compression_enable)
{
auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(codecStr), {});
auto codec = DB::CompressionCodecFactory::instance().get(boost::to_upper_copy(codecStr), level < 0 ? std::nullopt : std::optional<int>(level));
compressed_out = std::make_unique<CompressedWriteBuffer>(*write_buffer, codec);
}
}
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Shuffle/ShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class ShuffleWriter
{
public:
ShuffleWriter(
jobject output_stream, jbyteArray buffer, const std::string & codecStr, bool enable_compression, size_t customize_buffer_size);
jobject output_stream, jbyteArray buffer, const std::string & codecStr, jint level, bool enable_compression, size_t customize_buffer_size);
virtual ~ShuffleWriter();
void write(const DB::Block & block);
void flush();
Expand Down
8 changes: 6 additions & 2 deletions cpp-ch/local-engine/local_engine_jni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -544,6 +544,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
jlong map_id,
jint split_size,
jstring codec,
jint compress_level,
jstring data_file,
jstring local_dirs,
jint num_sub_dirs,
Expand Down Expand Up @@ -585,6 +586,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
.hash_exprs = hash_exprs,
.out_exprs = out_exprs,
.compress_method = jstring2string(env, codec),
.compress_level = compress_level < 0 ? std::nullopt : std::optional<int>(compress_level),
.spill_threshold = static_cast<size_t>(spill_threshold),
.hash_algorithm = jstring2string(env, hash_algorithm),
.max_sort_buffer_size = static_cast<size_t>(max_sort_buffer_size),
Expand All @@ -606,6 +608,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
jlong map_id,
jint split_size,
jstring codec,
jint compress_level,
jlong spill_threshold,
jstring hash_algorithm,
jobject pusher,
Expand Down Expand Up @@ -637,6 +640,7 @@ JNIEXPORT jlong Java_org_apache_gluten_vectorized_CHShuffleSplitterJniWrapper_na
.hash_exprs = hash_exprs,
.out_exprs = out_exprs,
.compress_method = jstring2string(env, codec),
.compress_level = compress_level < 0 ? std::nullopt : std::optional<int>(compress_level),
.spill_threshold = static_cast<size_t>(spill_threshold),
.hash_algorithm = jstring2string(env, hash_algorithm),
.force_memory_sort = static_cast<bool>(force_memory_sort)};
Expand Down Expand Up @@ -1160,11 +1164,11 @@ JNIEXPORT jint Java_org_apache_gluten_vectorized_BlockSplitIterator_nativeNextPa
}

JNIEXPORT jlong Java_org_apache_gluten_vectorized_BlockOutputStream_nativeCreate(
JNIEnv * env, jobject, jobject output_stream, jbyteArray buffer, jstring codec, jboolean compressed, jint customize_buffer_size)
JNIEnv * env, jobject, jobject output_stream, jbyteArray buffer, jstring codec, jint level, jboolean compressed, jint customize_buffer_size)
{
LOCAL_ENGINE_JNI_METHOD_START
local_engine::ShuffleWriter * writer
= new local_engine::ShuffleWriter(output_stream, buffer, jstring2string(env, codec), compressed, customize_buffer_size);
= new local_engine::ShuffleWriter(output_stream, buffer, jstring2string(env, codec), level, compressed, customize_buffer_size);
return reinterpret_cast<jlong>(writer);
LOCAL_ENGINE_JNI_METHOD_END(env, -1)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,12 @@ private class CHCelebornColumnarBatchSerializerInstance(
extends SerializerInstance
with Logging {

private lazy val compressionCodec =
GlutenShuffleUtils.getCompressionCodec(SparkEnv.get.conf).toUpperCase(Locale.ROOT)
private lazy val conf = SparkEnv.get.conf
private lazy val compressionCodec = GlutenShuffleUtils.getCompressionCodec(conf)
private lazy val capitalizedCompressionCodec = compressionCodec.toUpperCase(Locale.ROOT)
private lazy val compressionLevel =
GlutenShuffleUtils.getCompressionLevel(conf, compressionCodec,
GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)

override def deserializeStream(in: InputStream): DeserializationStream = {
new DeserializationStream {
Expand Down Expand Up @@ -199,7 +203,8 @@ private class CHCelebornColumnarBatchSerializerInstance(
writeBuffer,
dataSize,
CHBackendSettings.useCustomizedShuffleCodec,
compressionCodec,
capitalizedCompressionCodec,
compressionLevel,
CHBackendSettings.customizeBufferSize
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ class CHCelebornColumnarShuffleWriter[K, V](
client,
writeMetrics) {

private val customizedCompressCodec =
private val capitalizedCompressionCodec =
customizedCompressionCodec.toUpperCase(Locale.ROOT)

private val jniWrapper = new CHShuffleSplitterJniWrapper
Expand Down Expand Up @@ -105,7 +105,8 @@ class CHCelebornColumnarShuffleWriter[K, V](
shuffleId,
mapId,
nativeBufferSize,
customizedCompressCodec,
capitalizedCompressionCodec,
compressionLevel,
GlutenConfig.getConf.chColumnarShuffleSpillThreshold,
CHBackendSettings.shuffleHashAlgorithm,
celebornPartitionPusher,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,10 @@ abstract class CelebornColumnarShuffleWriter[K, V](
}

protected val compressionLevel: Int =
GlutenShuffleUtils.getCompressionLevel(conf, customizedCompressionCodec, null)
GlutenShuffleUtils.getCompressionLevel(
conf,
customizedCompressionCodec,
GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)

protected val bufferCompressThreshold: Int =
GlutenConfig.getConf.columnarShuffleCompressionThreshold
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,
GlutenConfig.getConf().columnarShuffleCompressionThreshold();
private final double reallocThreshold = GlutenConfig.getConf().columnarShuffleReallocThreshold();
private String compressionCodec;
private final int compressionLevel;
private int compressionLevel;
private final int partitionId;

private final Runtime runtime = Runtimes.contextInstance("UniffleShuffleWriter");
Expand Down Expand Up @@ -120,8 +120,12 @@ public VeloxUniffleColumnarShuffleWriter(
RssSparkConfig.RSS_WRITER_BUFFER_SIZE.defaultValue().get());
if ((boolean) sparkConf.get(package$.MODULE$.SHUFFLE_COMPRESS())) {
compressionCodec = GlutenShuffleUtils.getCompressionCodec(sparkConf);
compressionLevel =
GlutenShuffleUtils.getCompressionLevel(
sparkConf,
compressionCodec,
GlutenConfig.getConf().columnarShuffleCodecBackend().getOrElse(() -> null));
}
compressionLevel = GlutenShuffleUtils.getCompressionLevel(sparkConf, compressionCodec, null);
}

@Override
Expand Down

0 comments on commit 7037175

Please sign in to comment.