diff --git a/cpp/core/jni/JniWrapper.cc b/cpp/core/jni/JniWrapper.cc
index 53af168508ba..2f8158d527b6 100644
--- a/cpp/core/jni/JniWrapper.cc
+++ b/cpp/core/jni/JniWrapper.cc
@@ -773,6 +773,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
     jstring codecJstr,
     jstring codecBackendJstr,
     jint compressionLevel,
+    jint compressionBufferSize,
     jint compressionThreshold,
     jstring compressionModeJstr,
     jint sortBufferInitialSize,
@@ -804,6 +805,7 @@ JNIEXPORT jlong JNICALL Java_org_apache_gluten_vectorized_ShuffleWriterJniWrappe
       .startPartitionId = startPartitionId,
       .shuffleWriterType = gluten::ShuffleWriter::stringToType(jStringToCString(env, shuffleWriterTypeJstr)),
       .sortBufferInitialSize = sortBufferInitialSize,
+      .compressionBufferSize = compressionBufferSize,
       .useRadixSort = static_cast<bool>(useRadixSort)};
 
   // Build PartitionWriterOptions.
diff --git a/cpp/core/shuffle/LocalPartitionWriter.cc b/cpp/core/shuffle/LocalPartitionWriter.cc
index fe206b488cf8..f0edfa257357 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.cc
+++ b/cpp/core/shuffle/LocalPartitionWriter.cc
@@ -385,17 +385,15 @@ std::string LocalPartitionWriter::nextSpilledFileDir() {
   return spilledFileDir;
 }
 
-arrow::Status LocalPartitionWriter::openDataFile() {
-  // open data file output stream
+arrow::Result<std::shared_ptr<arrow::io::OutputStream>> LocalPartitionWriter::openFile(const std::string& file) {
   std::shared_ptr<arrow::io::FileOutputStream> fout;
-  ARROW_ASSIGN_OR_RAISE(fout, arrow::io::FileOutputStream::Open(dataFile_));
+  ARROW_ASSIGN_OR_RAISE(fout, arrow::io::FileOutputStream::Open(file));
   if (options_.bufferedWrite) {
-    // Output stream buffer is neither partition buffer memory nor ipc memory.
-    ARROW_ASSIGN_OR_RAISE(dataFileOs_, arrow::io::BufferedOutputStream::Create(16384, pool_, fout));
-  } else {
-    dataFileOs_ = fout;
+    // The 16KB buffer is a temporary allocation and is freed when the file is closed.
+    // Use the default memory pool so this memory is treated as executor memory overhead, avoiding unnecessary spill.
+    return arrow::io::BufferedOutputStream::Create(16384, arrow::default_memory_pool(), fout);
   }
-  return arrow::Status::OK();
+  return fout;
 }
 
 arrow::Status LocalPartitionWriter::clearResource() {
@@ -467,9 +465,7 @@ arrow::Status LocalPartitionWriter::stop(ShuffleWriterMetrics* metrics) {
     compressTime_ += spill->compressTime();
   } else {
     RETURN_NOT_OK(finishSpill(true));
-    // Open final data file.
-    // If options_.bufferedWrite is set, it will acquire 16KB memory that can trigger spill.
-    RETURN_NOT_OK(openDataFile());
+    ARROW_ASSIGN_OR_RAISE(dataFileOs_, openFile(dataFile_));
 
     int64_t endInFinalFile = 0;
     DLOG(INFO) << "LocalPartitionWriter stopped. Total spills: " << spills_.size();
@@ -523,14 +519,13 @@ arrow::Status LocalPartitionWriter::requestSpill(bool isFinal) {
     std::string spillFile;
     std::shared_ptr<arrow::io::OutputStream> os;
     if (isFinal) {
-      RETURN_NOT_OK(openDataFile());
+      ARROW_ASSIGN_OR_RAISE(dataFileOs_, openFile(dataFile_));
       spillFile = dataFile_;
       os = dataFileOs_;
       useSpillFileAsDataFile_ = true;
     } else {
       ARROW_ASSIGN_OR_RAISE(spillFile, createTempShuffleFile(nextSpilledFileDir()));
-      ARROW_ASSIGN_OR_RAISE(auto raw, arrow::io::FileOutputStream::Open(spillFile, true));
-      ARROW_ASSIGN_OR_RAISE(os, arrow::io::BufferedOutputStream::Create(16384, pool_, raw));
+      ARROW_ASSIGN_OR_RAISE(os, openFile(spillFile));
     }
     spiller_ = std::make_unique<LocalSpiller>(
         os, std::move(spillFile), options_.compressionThreshold, payloadPool_.get(), codec_.get());
@@ -548,42 +543,14 @@ arrow::Status LocalPartitionWriter::finishSpill(bool close) {
   return arrow::Status::OK();
 }
 
-arrow::Status LocalPartitionWriter::evict(
+arrow::Status LocalPartitionWriter::hashEvict(
     uint32_t partitionId,
     std::unique_ptr<InMemoryPayload> inMemoryPayload,
     Evict::type evictType,
     bool reuseBuffers,
-    bool hasComplexType,
-    bool isFinal) {
+    bool hasComplexType) {
   rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
 
-  if (evictType == Evict::kSortSpill) {
-    if (lastEvictPid_ != -1 && (partitionId < lastEvictPid_ || (isFinal && !dataFileOs_))) {
-      lastEvictPid_ = -1;
-      RETURN_NOT_OK(finishSpill(true));
-    }
-    RETURN_NOT_OK(requestSpill(isFinal));
-
-    auto payloadType = codec_ ? Payload::Type::kCompressed : Payload::Type::kUncompressed;
-    ARROW_ASSIGN_OR_RAISE(
-        auto payload,
-        inMemoryPayload->toBlockPayload(payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr));
-    if (!isFinal) {
-      RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload)));
-    } else {
-      if (spills_.size() > 0) {
-        for (auto pid = lastEvictPid_ + 1; pid <= partitionId; ++pid) {
-          auto bytesEvicted = totalBytesEvicted_;
-          RETURN_NOT_OK(mergeSpills(pid));
-          partitionLengths_[pid] = totalBytesEvicted_ - bytesEvicted;
-        }
-      }
-      RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload)));
-    }
-    lastEvictPid_ = partitionId;
-    return arrow::Status::OK();
-  }
-
   if (evictType == Evict::kSpill) {
     RETURN_NOT_OK(requestSpill(false));
     ARROW_ASSIGN_OR_RAISE(
@@ -609,6 +576,40 @@ arrow::Status LocalPartitionWriter::evict(
   return arrow::Status::OK();
 }
 
+arrow::Status LocalPartitionWriter::sortEvict(
+    uint32_t partitionId,
+    std::unique_ptr<InMemoryPayload> inMemoryPayload,
+    std::shared_ptr<arrow::Buffer> compressed,
+    bool isFinal) {
+  rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
+
+  if (lastEvictPid_ != -1 && (partitionId < lastEvictPid_ || (isFinal && !dataFileOs_))) {
+    lastEvictPid_ = -1;
+    RETURN_NOT_OK(finishSpill(true));
+  }
+  RETURN_NOT_OK(requestSpill(isFinal));
+
+  auto payloadType = codec_ ? Payload::Type::kCompressed : Payload::Type::kUncompressed;
+  ARROW_ASSIGN_OR_RAISE(
+      auto payload,
+      inMemoryPayload->toBlockPayload(
+          payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr, std::move(compressed)));
+  if (!isFinal) {
+    RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload)));
+  } else {
+    if (spills_.size() > 0) {
+      for (auto pid = lastEvictPid_ + 1; pid <= partitionId; ++pid) {
+        auto bytesEvicted = totalBytesEvicted_;
+        RETURN_NOT_OK(mergeSpills(pid));
+        partitionLengths_[pid] = totalBytesEvicted_ - bytesEvicted;
+      }
+    }
+    RETURN_NOT_OK(spiller_->spill(partitionId, std::move(payload)));
+  }
+  lastEvictPid_ = partitionId;
+  return arrow::Status::OK();
+}
+
 // FIXME: Remove this code path for local partition writer.
 arrow::Status LocalPartitionWriter::evict(uint32_t partitionId, std::unique_ptr<BlockPayload> blockPayload, bool stop) {
   rawPartitionLengths_[partitionId] += blockPayload->rawSize();
@@ -644,8 +645,7 @@ arrow::Status LocalPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actu
   if (payloadCache_ && payloadCache_->canSpill()) {
     auto beforeSpill = payloadPool_->bytes_allocated();
     ARROW_ASSIGN_OR_RAISE(auto spillFile, createTempShuffleFile(nextSpilledFileDir()));
-    ARROW_ASSIGN_OR_RAISE(auto raw, arrow::io::FileOutputStream::Open(spillFile, true));
-    ARROW_ASSIGN_OR_RAISE(auto os, arrow::io::BufferedOutputStream::Create(16384, pool_, raw));
+    ARROW_ASSIGN_OR_RAISE(auto os, openFile(spillFile));
     spills_.emplace_back();
     ARROW_ASSIGN_OR_RAISE(
         spills_.back(),
diff --git a/cpp/core/shuffle/LocalPartitionWriter.h b/cpp/core/shuffle/LocalPartitionWriter.h
index 555632fedd5d..826eb22f7431 100644
--- a/cpp/core/shuffle/LocalPartitionWriter.h
+++ b/cpp/core/shuffle/LocalPartitionWriter.h
@@ -35,12 +35,17 @@ class LocalPartitionWriter : public PartitionWriter {
       const std::string& dataFile,
       const std::vector<std::string>& localDirs);
 
-  arrow::Status evict(
+  arrow::Status hashEvict(
       uint32_t partitionId,
       std::unique_ptr<InMemoryPayload> inMemoryPayload,
       Evict::type evictType,
       bool reuseBuffers,
-      bool hasComplexType,
+      bool hasComplexType) override;
+
+  arrow::Status sortEvict(
+      uint32_t partitionId,
+      std::unique_ptr<InMemoryPayload> inMemoryPayload,
+      std::shared_ptr<arrow::Buffer> compressed,
       bool isFinal) override;
 
   arrow::Status evict(uint32_t partitionId, std::unique_ptr<BlockPayload> blockPayload, bool stop) override;
@@ -87,7 +92,7 @@ class LocalPartitionWriter : public PartitionWriter {
 
   std::string nextSpilledFileDir();
 
-  arrow::Status openDataFile();
+  arrow::Result<std::shared_ptr<arrow::io::OutputStream>> openFile(const std::string& file);
 
   arrow::Status mergeSpills(uint32_t partitionId);
 
diff --git a/cpp/core/shuffle/Options.h b/cpp/core/shuffle/Options.h
index 2424ec557742..a3dc9f6260b0 100644
--- a/cpp/core/shuffle/Options.h
+++ b/cpp/core/shuffle/Options.h
@@ -30,6 +30,7 @@ static constexpr int64_t kDefaultSortBufferThreshold = 64 << 20;
 static constexpr int64_t kDefaultPushMemoryThreshold = 4096;
 static constexpr int32_t kDefaultNumSubDirs = 64;
 static constexpr int32_t kDefaultCompressionThreshold = 100;
+static constexpr int32_t kDefaultCompressionBufferSize = 32 * 1024;
 static const std::string kDefaultCompressionTypeStr = "lz4";
 static constexpr int32_t kDefaultBufferAlignment = 64;
 static constexpr double kDefaultBufferReallocThreshold = 0.25;
@@ -62,6 +63,7 @@ struct ShuffleWriterOptions {
 
   // Sort shuffle writer.
   int32_t sortBufferInitialSize = kDefaultSortBufferSize;
+  int32_t compressionBufferSize = kDefaultCompressionBufferSize;
   bool useRadixSort = kDefaultUseRadixSort;
 };
 
diff --git a/cpp/core/shuffle/PartitionWriter.h b/cpp/core/shuffle/PartitionWriter.h
index 3774198c996e..3a44d3836581 100644
--- a/cpp/core/shuffle/PartitionWriter.h
+++ b/cpp/core/shuffle/PartitionWriter.h
@@ -26,7 +26,7 @@
 namespace gluten {
 
 struct Evict {
-  enum type { kCache, kSpill, kSortSpill };
+  enum type { kCache, kSpill };
 };
 
 class PartitionWriter : public Reclaimable {
@@ -42,14 +42,26 @@ class PartitionWriter : public Reclaimable {
   virtual arrow::Status stop(ShuffleWriterMetrics* metrics) = 0;
 
   /// Evict buffers for `partitionId` partition.
-  virtual arrow::Status evict(
+  virtual arrow::Status hashEvict(
       uint32_t partitionId,
       std::unique_ptr<InMemoryPayload> inMemoryPayload,
       Evict::type evictType,
       bool reuseBuffers,
-      bool hasComplexType,
+      bool hasComplexType) = 0;
+
+  virtual arrow::Status sortEvict(
+      uint32_t partitionId,
+      std::unique_ptr<InMemoryPayload> inMemoryPayload,
+      std::shared_ptr<arrow::Buffer> compressed,
       bool isFinal) = 0;
 
+  std::optional<int64_t> getCompressedBufferLength(const std::vector<std::shared_ptr<arrow::Buffer>>& buffers) {
+    if (!codec_) {
+      return std::nullopt;
+    }
+    return BlockPayload::maxCompressedLength(buffers, codec_.get());
+  }
+
   virtual arrow::Status evict(uint32_t partitionId, std::unique_ptr<BlockPayload> blockPayload, bool stop) = 0;
 
   uint64_t cachedPayloadSize() {
diff --git a/cpp/core/shuffle/Payload.cc b/cpp/core/shuffle/Payload.cc
index d0c24e4bcaba..62e1823fe874 100644
--- a/cpp/core/shuffle/Payload.cc
+++ b/cpp/core/shuffle/Payload.cc
@@ -186,29 +186,30 @@ arrow::Result<std::unique_ptr<BlockPayload>> BlockPayload::fromBuffers(
     std::vector<std::shared_ptr<arrow::Buffer>> buffers,
     const std::vector<bool>* isValidityBuffer,
     arrow::MemoryPool* pool,
-    arrow::util::Codec* codec) {
+    arrow::util::Codec* codec,
+    std::shared_ptr<arrow::Buffer> compressed) {
   if (payloadType == Payload::Type::kCompressed) {
     Timer compressionTime;
     compressionTime.start();
     // Compress.
-    // Compressed buffer layout: | buffer1 compressedLength | buffer1 uncompressedLength | buffer1 | ...
-    const auto metadataLength = sizeof(int64_t) * 2 * buffers.size();
-    int64_t totalCompressedLength =
-        std::accumulate(buffers.begin(), buffers.end(), 0LL, [&](auto sum, const auto& buffer) {
-          if (!buffer) {
-            return sum;
-          }
-          return sum + codec->MaxCompressedLen(buffer->size(), buffer->data());
-        });
-    const auto maxCompressedLength = metadataLength + totalCompressedLength;
-    ARROW_ASSIGN_OR_RAISE(
-        std::shared_ptr<arrow::ResizableBuffer> compressed, arrow::AllocateResizableBuffer(maxCompressedLength, pool));
-
-    auto output = compressed->mutable_data();
+    auto maxLength = maxCompressedLength(buffers, codec);
+    std::shared_ptr<arrow::Buffer> compressedBuffer;
+    uint8_t* output;
+    if (compressed) {
+      ARROW_RETURN_IF(
+          compressed->size() < maxLength,
+          arrow::Status::Invalid(
+              "Compressed buffer length < maxCompressedLength. (", compressed->size(), " vs ", maxLength, ")"));
+      output = const_cast<uint8_t*>(compressed->data());
+    } else {
+      ARROW_ASSIGN_OR_RAISE(compressedBuffer, arrow::AllocateResizableBuffer(maxLength, pool));
+      output = compressedBuffer->mutable_data();
+    }
+
     int64_t actualLength = 0;
     // Compress buffers one by one.
     for (auto& buffer : buffers) {
-      auto availableLength = maxCompressedLength - actualLength;
+      auto availableLength = maxLength - actualLength;
       // Release buffer after compression.
       ARROW_ASSIGN_OR_RAISE(auto compressedSize, compressBuffer(std::move(buffer), output, availableLength, codec));
       output += compressedSize;
@@ -216,15 +217,14 @@ arrow::Result<std::unique_ptr<BlockPayload>> BlockPayload::fromBuffers(
     }
 
     ARROW_RETURN_IF(actualLength < 0, arrow::Status::Invalid("Writing compressed buffer out of bound."));
-    RETURN_NOT_OK(compressed->Resize(actualLength));
+    if (compressed) {
+      compressedBuffer = arrow::SliceBuffer(compressed, 0, actualLength);
+    } else {
+      RETURN_NOT_OK(std::dynamic_pointer_cast<arrow::ResizableBuffer>(compressedBuffer)->Resize(actualLength));
+    }
     compressionTime.stop();
-    auto payload = std::unique_ptr<BlockPayload>(new BlockPayload(
-        Type::kCompressed,
-        numRows,
-        std::vector<std::shared_ptr<arrow::Buffer>>{compressed},
-        isValidityBuffer,
-        pool,
-        codec));
+    auto payload = std::unique_ptr<BlockPayload>(
+        new BlockPayload(Type::kCompressed, numRows, {compressedBuffer}, isValidityBuffer, pool, codec));
     payload->setCompressionTime(compressionTime.realTimeUsed());
     return payload;
   }
@@ -329,6 +329,21 @@ int64_t BlockPayload::rawSize() {
   return getBufferSize(buffers_);
 }
 
+int64_t BlockPayload::maxCompressedLength(
+    const std::vector<std::shared_ptr<arrow::Buffer>>& buffers,
+    arrow::util::Codec* codec) {
+  // Compressed buffer layout: | buffer1 compressedLength | buffer1 uncompressedLength | buffer1 | ...
+  const auto metadataLength = sizeof(int64_t) * 2 * buffers.size();
+  int64_t totalCompressedLength =
+      std::accumulate(buffers.begin(), buffers.end(), 0LL, [&](auto sum, const auto& buffer) {
+        if (!buffer) {
+          return sum;
+        }
+        return sum + codec->MaxCompressedLen(buffer->size(), buffer->data());
+      });
+  return metadataLength + totalCompressedLength;
+}
+
 arrow::Result<std::unique_ptr<InMemoryPayload>> InMemoryPayload::merge(
     std::unique_ptr<InMemoryPayload> source,
     std::unique_ptr<InMemoryPayload> append,
@@ -404,9 +419,13 @@ arrow::Result<std::unique_ptr<InMemoryPayload>> InMemoryPayload::merge(
   return std::make_unique<InMemoryPayload>(mergedRows, isValidityBuffer, std::move(merged));
 }
 
-arrow::Result<std::unique_ptr<BlockPayload>>
-InMemoryPayload::toBlockPayload(Payload::Type payloadType, arrow::MemoryPool* pool, arrow::util::Codec* codec) {
-  return BlockPayload::fromBuffers(payloadType, numRows_, std::move(buffers_), isValidityBuffer_, pool, codec);
+arrow::Result<std::unique_ptr<BlockPayload>> InMemoryPayload::toBlockPayload(
+    Payload::Type payloadType,
+    arrow::MemoryPool* pool,
+    arrow::util::Codec* codec,
+    std::shared_ptr<arrow::Buffer> compressed) {
+  return BlockPayload::fromBuffers(
+      payloadType, numRows_, std::move(buffers_), isValidityBuffer_, pool, codec, std::move(compressed));
 }
 
 arrow::Status InMemoryPayload::serialize(arrow::io::OutputStream* outputStream) {
diff --git a/cpp/core/shuffle/Payload.h b/cpp/core/shuffle/Payload.h
index 1bd8815a4c2a..ea8c897e96d0 100644
--- a/cpp/core/shuffle/Payload.h
+++ b/cpp/core/shuffle/Payload.h
@@ -84,7 +84,8 @@ class BlockPayload final : public Payload {
       std::vector<std::shared_ptr<arrow::Buffer>> buffers,
       const std::vector<bool>* isValidityBuffer,
       arrow::MemoryPool* pool,
-      arrow::util::Codec* codec);
+      arrow::util::Codec* codec,
+      std::shared_ptr<arrow::Buffer> compressed);
 
   static arrow::Result<std::vector<std::shared_ptr<arrow::Buffer>>> deserialize(
       arrow::io::InputStream* inputStream,
@@ -93,6 +94,10 @@ class BlockPayload final : public Payload {
       uint32_t& numRows,
       int64_t& decompressTime);
 
+  static int64_t maxCompressedLength(
+      const std::vector<std::shared_ptr<arrow::Buffer>>& buffers,
+      arrow::util::Codec* codec);
+
   arrow::Status serialize(arrow::io::OutputStream* outputStream) override;
 
   arrow::Result<std::shared_ptr<arrow::Buffer>> readBufferAt(uint32_t pos) override;
@@ -131,8 +136,11 @@ class InMemoryPayload final : public Payload {
 
   arrow::Result<std::shared_ptr<arrow::Buffer>> readBufferAt(uint32_t index) override;
 
-  arrow::Result<std::unique_ptr<BlockPayload>>
-  toBlockPayload(Payload::Type payloadType, arrow::MemoryPool* pool, arrow::util::Codec* codec);
+  arrow::Result<std::unique_ptr<BlockPayload>> toBlockPayload(
+      Payload::Type payloadType,
+      arrow::MemoryPool* pool,
+      arrow::util::Codec* codec,
+      std::shared_ptr<arrow::Buffer> compressed = nullptr);
 
   arrow::Status copyBuffers(arrow::MemoryPool* pool);
 
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.cc b/cpp/core/shuffle/rss/RssPartitionWriter.cc
index 8f75f999335f..8a072365a296 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.cc
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.cc
@@ -49,17 +49,41 @@ arrow::Status RssPartitionWriter::reclaimFixedSize(int64_t size, int64_t* actual
   return arrow::Status::OK();
 }
 
-arrow::Status RssPartitionWriter::evict(
+arrow::Status RssPartitionWriter::hashEvict(
     uint32_t partitionId,
     std::unique_ptr<InMemoryPayload> inMemoryPayload,
     Evict::type evictType,
     bool reuseBuffers,
-    bool hasComplexType,
+    bool hasComplexType) {
+  return doEvict(partitionId, std::move(inMemoryPayload), nullptr);
+}
+
+arrow::Status RssPartitionWriter::sortEvict(
+    uint32_t partitionId,
+    std::unique_ptr<InMemoryPayload> inMemoryPayload,
+    std::shared_ptr<arrow::Buffer> compressed,
     bool isFinal) {
+  return doEvict(partitionId, std::move(inMemoryPayload), std::move(compressed));
+}
+
+arrow::Status RssPartitionWriter::evict(uint32_t partitionId, std::unique_ptr<BlockPayload> blockPayload, bool) {
+  rawPartitionLengths_[partitionId] += blockPayload->rawSize();
+  ScopedTimer timer(&spillTime_);
+  ARROW_ASSIGN_OR_RAISE(auto buffer, blockPayload->readBufferAt(0));
+  bytesEvicted_[partitionId] += rssClient_->pushPartitionData(partitionId, buffer->data_as<char>(), buffer->size());
+  return arrow::Status::OK();
+}
+
+arrow::Status RssPartitionWriter::doEvict(
+    uint32_t partitionId,
+    std::unique_ptr<InMemoryPayload> inMemoryPayload,
+    std::shared_ptr<arrow::Buffer> compressed) {
   rawPartitionLengths_[partitionId] += inMemoryPayload->rawSize();
   auto payloadType = codec_ ? Payload::Type::kCompressed : Payload::Type::kUncompressed;
   ARROW_ASSIGN_OR_RAISE(
-      auto payload, inMemoryPayload->toBlockPayload(payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr));
+      auto payload,
+      inMemoryPayload->toBlockPayload(
+          payloadType, payloadPool_.get(), codec_ ? codec_.get() : nullptr, std::move(compressed)));
   // Copy payload to arrow buffered os.
   ARROW_ASSIGN_OR_RAISE(auto rssBufferOs, arrow::io::BufferOutputStream::Create(options_.pushBufferMaxSize, pool_));
   RETURN_NOT_OK(payload->serialize(rssBufferOs.get()));
@@ -72,12 +96,4 @@ arrow::Status RssPartitionWriter::evict(
       partitionId, reinterpret_cast<char*>(const_cast<uint8_t*>(buffer->data())), buffer->size());
   return arrow::Status::OK();
 }
-
-arrow::Status RssPartitionWriter::evict(uint32_t partitionId, std::unique_ptr<BlockPayload> blockPayload, bool) {
-  rawPartitionLengths_[partitionId] += blockPayload->rawSize();
-  ScopedTimer timer(&spillTime_);
-  ARROW_ASSIGN_OR_RAISE(auto buffer, blockPayload->readBufferAt(0));
-  bytesEvicted_[partitionId] += rssClient_->pushPartitionData(partitionId, buffer->data_as<char>(), buffer->size());
-  return arrow::Status::OK();
-}
 } // namespace gluten
diff --git a/cpp/core/shuffle/rss/RssPartitionWriter.h b/cpp/core/shuffle/rss/RssPartitionWriter.h
index 01602e336918..dbcbbbb88129 100644
--- a/cpp/core/shuffle/rss/RssPartitionWriter.h
+++ b/cpp/core/shuffle/rss/RssPartitionWriter.h
@@ -37,12 +37,17 @@ class RssPartitionWriter final : public PartitionWriter {
     init();
   }
 
-  arrow::Status evict(
+  arrow::Status hashEvict(
       uint32_t partitionId,
       std::unique_ptr<InMemoryPayload> inMemoryPayload,
       Evict::type evictType,
       bool reuseBuffers,
-      bool hasComplexType,
+      bool hasComplexType) override;
+
+  arrow::Status sortEvict(
+      uint32_t partitionId,
+      std::unique_ptr<InMemoryPayload> inMemoryPayload,
+      std::shared_ptr<arrow::Buffer> compressed,
       bool isFinal) override;
 
   arrow::Status evict(uint32_t partitionId, std::unique_ptr<BlockPayload> blockPayload, bool stop) override;
@@ -54,6 +59,11 @@ class RssPartitionWriter final : public PartitionWriter {
  private:
   void init();
 
+  arrow::Status doEvict(
+      uint32_t partitionId,
+      std::unique_ptr<InMemoryPayload> inMemoryPayload,
+      std::shared_ptr<arrow::Buffer> compressed);
+
   std::shared_ptr<RssClient> rssClient_;
 
   std::vector<int64_t> bytesEvicted_;
diff --git a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
index 00d8be16656e..49ed14245f94 100644
--- a/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxHashShuffleWriter.cc
@@ -954,7 +954,7 @@ arrow::Status VeloxHashShuffleWriter::evictBuffers(
   if (!buffers.empty()) {
     auto payload = std::make_unique<InMemoryPayload>(numRows, &isValidityBuffer_, std::move(buffers));
     RETURN_NOT_OK(
-        partitionWriter_->evict(partitionId, std::move(payload), Evict::kCache, reuseBuffers, hasComplexType_, false));
+        partitionWriter_->hashEvict(partitionId, std::move(payload), Evict::kCache, reuseBuffers, hasComplexType_));
   }
   return arrow::Status::OK();
 }
@@ -1114,8 +1114,9 @@ arrow::Status VeloxHashShuffleWriter::reclaimFixedSize(int64_t size, int64_t* ac
 
   int64_t reclaimed = 0;
   if (reclaimed < size) {
+    auto before = partitionBufferPool_->bytes_allocated();
     ARROW_ASSIGN_OR_RAISE(auto cached, evictCachedPayload(size - reclaimed));
-    reclaimed += cached;
+    reclaimed += cached + (before - partitionBufferPool_->bytes_allocated());
   }
   if (reclaimed < size && shrinkPartitionBuffersAfterSpill()) {
     ARROW_ASSIGN_OR_RAISE(auto shrunken, shrinkPartitionBuffersMinSize(size - reclaimed));
@@ -1365,7 +1366,7 @@ arrow::Result<int64_t> VeloxHashShuffleWriter::evictPartitionBuffersMinSize(int6
       ARROW_ASSIGN_OR_RAISE(auto buffers, assembleBuffers(pid, false));
       auto payload = std::make_unique<InMemoryPayload>(item.second, &isValidityBuffer_, std::move(buffers));
       metrics_.totalBytesToEvict += payload->rawSize();
-      RETURN_NOT_OK(partitionWriter_->evict(pid, std::move(payload), Evict::kSpill, false, hasComplexType_, false));
+      RETURN_NOT_OK(partitionWriter_->hashEvict(pid, std::move(payload), Evict::kSpill, false, hasComplexType_));
       evicted = beforeEvict - partitionBufferPool_->bytes_allocated();
       if (evicted >= size) {
         break;
diff --git a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
index d2fb5e7f577c..4b3475547ac0 100644
--- a/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxRssSortShuffleWriter.cc
@@ -119,7 +119,8 @@ arrow::Status VeloxRssSortShuffleWriter::evictBatch(uint32_t partitionId) {
   auto buffer = bufferOutputStream_->getBuffer();
   auto arrowBuffer = std::make_shared<arrow::Buffer>(buffer->as<uint8_t>(), buffer->size());
   ARROW_ASSIGN_OR_RAISE(
-      auto payload, BlockPayload::fromBuffers(Payload::kRaw, 0, {std::move(arrowBuffer)}, nullptr, nullptr, nullptr));
+      auto payload,
+      BlockPayload::fromBuffers(Payload::kRaw, 0, {std::move(arrowBuffer)}, nullptr, nullptr, nullptr, nullptr));
   RETURN_NOT_OK(partitionWriter_->evict(partitionId, std::move(payload), stopped_));
   batch_ = std::make_unique<facebook::velox::VectorStreamGroup>(veloxPool_.get(), serde_.get());
   batch_->createStreamTree(rowType_, options_.bufferSize, &serdeOptions_);
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.cc b/cpp/velox/shuffle/VeloxShuffleReader.cc
index 3966857b9e9d..4eec461e6532 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.cc
+++ b/cpp/velox/shuffle/VeloxShuffleReader.cc
@@ -314,7 +314,7 @@ std::shared_ptr<ColumnarBatch> VeloxHashShuffleReaderDeserializer::next() {
     uint32_t numRows;
     GLUTEN_ASSIGN_OR_THROW(
         auto arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, memoryPool_, numRows, decompressTime_));
-    if (numRows == 0) {
+    if (arrowBuffers.empty()) {
       // Reach EOS.
       return nullptr;
     }
@@ -333,7 +333,7 @@ std::shared_ptr<ColumnarBatch> VeloxHashShuffleReaderDeserializer::next() {
   while (!merged_ || merged_->numRows() < batchSize_) {
     GLUTEN_ASSIGN_OR_THROW(
         arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, memoryPool_, numRows, decompressTime_));
-    if (numRows == 0) {
+    if (arrowBuffers.empty()) {
       reachEos_ = true;
       break;
     }
@@ -403,16 +403,22 @@ std::shared_ptr<ColumnarBatch> VeloxSortShuffleReaderDeserializer::next() {
     GLUTEN_ASSIGN_OR_THROW(
         auto arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, arrowPool_, numRows, decompressTime_));
 
-    if (numRows == 0) {
+    if (arrowBuffers.empty()) {
       reachEos_ = true;
       if (cachedRows_ > 0) {
         return deserializeToBatch();
       }
       return nullptr;
     }
-    auto buffer = std::move(arrowBuffers[0]);
-    cachedInputs_.emplace_back(numRows, wrapInBufferViewAsOwner(buffer->data(), buffer->size(), buffer));
-    cachedRows_ += numRows;
+
+    if (numRows > 0) {
+      auto buffer = std::move(arrowBuffers[0]);
+      cachedInputs_.emplace_back(numRows, wrapInBufferViewAsOwner(buffer->data(), buffer->size(), buffer));
+      cachedRows_ += numRows;
+    } else {
+      // numRows = 0 indicates that we read a segment of a large row.
+      readLargeRow(arrowBuffers);
+    }
   }
   return deserializeToBatch();
 }
@@ -451,6 +457,35 @@ std::shared_ptr<ColumnarBatch> VeloxSortShuffleReaderDeserializer::deserializeTo
   return std::make_shared<VeloxColumnarBatch>(std::move(rowVector));
 }
 
+void VeloxSortShuffleReaderDeserializer::readLargeRow(std::vector<std::shared_ptr<arrow::Buffer>>& arrowBuffers) {
+  // Cache the read segment.
+  std::vector<std::shared_ptr<arrow::Buffer>> buffers;
+  auto rowSize = *reinterpret_cast<RowSizeType*>(const_cast<uint8_t*>(arrowBuffers[0]->data()));
+  RowSizeType bufferSize = arrowBuffers[0]->size();
+  buffers.emplace_back(std::move(arrowBuffers[0]));
+  // Read and cache the remaining segments.
+  uint32_t numRows;
+  while (bufferSize < rowSize) {
+    GLUTEN_ASSIGN_OR_THROW(
+        arrowBuffers, BlockPayload::deserialize(in_.get(), codec_, arrowPool_, numRows, decompressTime_));
+    VELOX_CHECK(!arrowBuffers.empty() && numRows == 0, "Unexpected EOS while reading segments of a large row");
+    bufferSize += arrowBuffers[0]->size();
+    buffers.emplace_back(std::move(arrowBuffers[0]));
+  }
+  VELOX_CHECK_EQ(bufferSize, rowSize);
+  // Merge all segments.
+  GLUTEN_ASSIGN_OR_THROW(std::shared_ptr<arrow::Buffer> rowBuffer, arrow::AllocateBuffer(rowSize, arrowPool_));
+  RowSizeType bytes = 0;
+  auto* dst = rowBuffer->mutable_data();
+  for (const auto& buffer : buffers) {
+    VELOX_DCHECK_NOT_NULL(buffer);
+    gluten::fastCopy(dst + bytes, buffer->data(), buffer->size());
+    bytes += buffer->size();
+  }
+  cachedInputs_.emplace_back(1, wrapInBufferViewAsOwner(rowBuffer->data(), rowSize, rowBuffer));
+  cachedRows_++;
+}
+
 class VeloxRssSortShuffleReaderDeserializer::VeloxInputStream : public facebook::velox::GlutenByteInputStream {
  public:
   VeloxInputStream(std::shared_ptr<arrow::io::InputStream> input, facebook::velox::BufferPtr buffer);
diff --git a/cpp/velox/shuffle/VeloxShuffleReader.h b/cpp/velox/shuffle/VeloxShuffleReader.h
index 2be913aa13a7..f7ff05c5d13e 100644
--- a/cpp/velox/shuffle/VeloxShuffleReader.h
+++ b/cpp/velox/shuffle/VeloxShuffleReader.h
@@ -83,6 +83,8 @@ class VeloxSortShuffleReaderDeserializer final : public ColumnarBatchIterator {
  private:
   std::shared_ptr<ColumnarBatch> deserializeToBatch();
 
+  void readLargeRow(std::vector<std::shared_ptr<arrow::Buffer>>& arrowBuffers);
+
   std::shared_ptr<arrow::io::InputStream> in_;
   std::shared_ptr<arrow::Schema> schema_;
   std::shared_ptr<arrow::util::Codec> codec_;
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
index 2bfc4908d2f6..55aa739e7a0d 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.cc
@@ -32,8 +32,6 @@ constexpr uint64_t kMaskLower40Bits = (1UL << 40) - 1;
 constexpr uint32_t kPartitionIdStartByteIndex = 5;
 constexpr uint32_t kPartitionIdEndByteIndex = 7;
 
-constexpr uint32_t kSortedBufferSize = 1 * 1024 * 1024;
-
 uint64_t toCompactRowId(uint32_t partitionId, uint32_t pageNumber, uint32_t offsetInPage) {
   // |63 partitionId(24) |39 inputIndex(13) |26 rowIndex(27) |
   return (uint64_t)partitionId << 40 | (uint64_t)pageNumber << 27 | offsetInPage;
@@ -106,8 +104,17 @@ arrow::Status VeloxSortShuffleWriter::init() {
       options_.partitioning == Partitioning::kSingle,
       arrow::Status::Invalid("VeloxSortShuffleWriter doesn't support single partition."));
   allocateMinimalArray();
-  sortedBuffer_ = facebook::velox::AlignedBuffer::allocate<char>(kSortedBufferSize, veloxPool_.get());
-  rawBuffer_ = sortedBuffer_->asMutable<uint8_t>();
+  // In Spark, the sortedBuffer_ and compressionBuffer_ memory is pre-allocated and counted into executor
+  // memory overhead. To align with Spark, use arrow::default_memory_pool() so this memory is not counted by Gluten.
+  ARROW_ASSIGN_OR_RAISE(
+      sortedBuffer_, arrow::AllocateBuffer(options_.compressionBufferSize, arrow::default_memory_pool()));
+  rawBuffer_ = sortedBuffer_->mutable_data();
+  auto compressedBufferLength = partitionWriter_->getCompressedBufferLength(
+      {std::make_shared<arrow::Buffer>(rawBuffer_, options_.compressionBufferSize)});
+  if (compressedBufferLength.has_value()) {
+    ARROW_ASSIGN_OR_RAISE(
+        compressionBuffer_, arrow::AllocateBuffer(*compressedBufferLength, arrow::default_memory_pool()));
+  }
   return arrow::Status::OK();
 }
 
@@ -266,6 +273,7 @@ arrow::Status VeloxSortShuffleWriter::evictAllPartitions() {
 }
 
 arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_t begin, size_t end) {
+  VELOX_DCHECK(begin < end);
   // Count copy row time into sortTime_.
   Timer sortTime{};
   // Serialize [begin, end)
@@ -278,33 +286,51 @@ arrow::Status VeloxSortShuffleWriter::evictPartition(uint32_t partitionId, size_
     auto pageIndex = extractPageNumberAndOffset(arrayPtr_[index]);
     addr = pageAddresses_[pageIndex.first] + pageIndex.second;
     size = *(RowSizeType*)addr;
-    if (offset + size > kSortedBufferSize) {
+    if (offset + size > options_.compressionBufferSize && offset > 0) {
       sortTime.stop();
-      RETURN_NOT_OK(evictPartition0(partitionId, index - begin, offset));
+      RETURN_NOT_OK(evictPartition0(partitionId, index - begin, rawBuffer_, offset));
       sortTime.start();
       begin = index;
       offset = 0;
     }
-    gluten::fastCopy(rawBuffer_ + offset, addr, size);
-    offset += size;
+    if (size > options_.compressionBufferSize) {
+      // Split large rows.
+      sortTime.stop();
+      RowSizeType bytes = 0;
+      auto* buffer = reinterpret_cast<uint8_t*>(addr);
+      while (bytes < size) {
+        auto rawLength = std::min<RowSizeType>((uint32_t)options_.compressionBufferSize, size - bytes);
+        // Use numRows = 0 to represent a partial row.
+        RETURN_NOT_OK(evictPartition0(partitionId, 0, buffer + bytes, rawLength));
+        bytes += rawLength;
+      }
+      begin++;
+      sortTime.start();
+    } else {
+      // Copy small rows.
+      gluten::fastCopy(rawBuffer_ + offset, addr, size);
+      offset += size;
+    }
     index++;
   }
   sortTime.stop();
-  RETURN_NOT_OK(evictPartition0(partitionId, index - begin, offset));
-
+  if (offset > 0) {
+    VELOX_CHECK(index > begin);
+    RETURN_NOT_OK(evictPartition0(partitionId, index - begin, rawBuffer_, offset));
+  }
   sortTime_ += sortTime.realTimeUsed();
   return arrow::Status::OK();
 }
 
-arrow::Status VeloxSortShuffleWriter::evictPartition0(uint32_t partitionId, uint32_t numRows, int64_t rawLength) {
+arrow::Status
+VeloxSortShuffleWriter::evictPartition0(uint32_t partitionId, int32_t numRows, uint8_t* buffer, int64_t rawLength) {
   VELOX_CHECK(rawLength > 0);
   auto payload = std::make_unique<InMemoryPayload>(
       numRows,
       nullptr,
-      std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(rawBuffer_, rawLength)});
+      std::vector<std::shared_ptr<arrow::Buffer>>{std::make_shared<arrow::Buffer>(buffer, rawLength)});
   updateSpillMetrics(payload);
-  RETURN_NOT_OK(
-      partitionWriter_->evict(partitionId, std::move(payload), Evict::type::kSortSpill, false, false, stopped_));
+  RETURN_NOT_OK(partitionWriter_->sortEvict(partitionId, std::move(payload), compressionBuffer_, stopped_));
   return arrow::Status::OK();
 }
 
diff --git a/cpp/velox/shuffle/VeloxSortShuffleWriter.h b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
index 1626573a7dc6..531ed1fe3e76 100644
--- a/cpp/velox/shuffle/VeloxSortShuffleWriter.h
+++ b/cpp/velox/shuffle/VeloxSortShuffleWriter.h
@@ -77,7 +77,7 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {
 
   arrow::Status evictPartition(uint32_t partitionId, size_t begin, size_t end);
 
-  arrow::Status evictPartition0(uint32_t partitionId, uint32_t numRows, int64_t rawLength);
+  arrow::Status evictPartition0(uint32_t partitionId, int32_t numRows, uint8_t* buffer, int64_t rawLength);
 
   uint32_t maxRowsToInsert(uint32_t offset, uint32_t remainingRows);
 
@@ -107,8 +107,9 @@ class VeloxSortShuffleWriter final : public VeloxShuffleWriter {
   // For debug.
   uint32_t currenPageSize_;
 
-  facebook::velox::BufferPtr sortedBuffer_;
+  std::unique_ptr<arrow::Buffer> sortedBuffer_;
   uint8_t* rawBuffer_;
+  std::shared_ptr<arrow::Buffer> compressionBuffer_{nullptr};
 
   // Row ID -> Partition ID
   // subscript: The index of row in the current input RowVector
diff --git a/cpp/velox/tests/VeloxShuffleWriterTest.cc b/cpp/velox/tests/VeloxShuffleWriterTest.cc
index 7cbfbcd79cc9..d8b2fb72fdd2 100644
--- a/cpp/velox/tests/VeloxShuffleWriterTest.cc
+++ b/cpp/velox/tests/VeloxShuffleWriterTest.cc
@@ -70,12 +70,17 @@ std::vector<ShuffleTestParams> createShuffleTestParams() {
   std::vector<int32_t> mergeBufferSizes = {0, 3, 4, 10, 4096};
 
   for (const auto& compression : compressions) {
-    for (auto useRadixSort : {true, false}) {
-      params.push_back(ShuffleTestParams{
-          ShuffleWriterType::kSortShuffle, PartitionWriterType::kLocal, compression, 0, 0, useRadixSort});
+    for (const auto compressionBufferSize : {4, 56, 32 * 1024}) {
+      for (auto useRadixSort : {true, false}) {
+        params.push_back(ShuffleTestParams{
+            .shuffleWriterType = ShuffleWriterType::kSortShuffle,
+            .partitionWriterType = PartitionWriterType::kLocal,
+            .compressionType = compression,
+            .compressionBufferSize = compressionBufferSize,
+            .useRadixSort = useRadixSort});
+      }
     }
-    params.push_back(
-        ShuffleTestParams{ShuffleWriterType::kRssSortShuffle, PartitionWriterType::kRss, compression, 0, 0, false});
+    params.push_back(ShuffleTestParams{ShuffleWriterType::kRssSortShuffle, PartitionWriterType::kRss, compression});
     for (const auto compressionThreshold : compressionThresholds) {
       for (const auto mergeBufferSize : mergeBufferSizes) {
         params.push_back(ShuffleTestParams{
@@ -83,11 +88,10 @@ std::vector<ShuffleTestParams> createShuffleTestParams() {
             PartitionWriterType::kLocal,
             compression,
             compressionThreshold,
-            mergeBufferSize,
-            false /* unused */});
+            mergeBufferSize});
       }
       params.push_back(ShuffleTestParams{
-          ShuffleWriterType::kHashShuffle, PartitionWriterType::kRss, compression, compressionThreshold, 0});
+          ShuffleWriterType::kHashShuffle, PartitionWriterType::kRss, compression, compressionThreshold});
     }
   }
 
@@ -588,8 +592,10 @@ TEST_F(VeloxHashShuffleWriterMemoryTest, kSplitSingle) {
 TEST_F(VeloxHashShuffleWriterMemoryTest, kStop) {
   for (const auto partitioning : {Partitioning::kSingle, Partitioning::kRoundRobin}) {
     ASSERT_NOT_OK(initShuffleWriterOptions());
-    shuffleWriterOptions_.partitioning = partitioning;
-    shuffleWriterOptions_.bufferSize = 4;
+    shuffleWriterOptions_.bufferSize = 4096;
+    // Force compression.
+    partitionWriterOptions_.compressionThreshold = 0;
+    partitionWriterOptions_.mergeThreshold = 0;
     auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
     auto shuffleWriter = createShuffleWriter(&pool);
 
@@ -597,19 +603,23 @@ TEST_F(VeloxHashShuffleWriterMemoryTest, kStop) {
 
     for (int i = 0; i < 10; ++i) {
       ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
-      ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector2_));
-      ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVector1_));
     }
+    // Reclaim bytes to shrink partition buffer.
+    int64_t reclaimed = 0;
+    ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(2000, &reclaimed));
+    ASSERT_GE(reclaimed, 2000);
 
     // Trigger spill during stop.
-    // For single partitioning, spill is triggered by allocating buffered output stream.
     ASSERT_TRUE(pool.checkEvict(pool.bytes_allocated(), [&] { ASSERT_NOT_OK(shuffleWriter->stop()); }));
   }
 }
 
 TEST_F(VeloxHashShuffleWriterMemoryTest, kStopComplex) {
   ASSERT_NOT_OK(initShuffleWriterOptions());
-  shuffleWriterOptions_.bufferSize = 4;
+  shuffleWriterOptions_.bufferSize = 4096;
+  // Force compression.
+  partitionWriterOptions_.compressionThreshold = 0;
+  partitionWriterOptions_.mergeThreshold = 0;
   auto pool = SelfEvictedMemoryPool(defaultArrowMemoryPool().get(), false);
   auto shuffleWriter = createShuffleWriter(&pool);
 
@@ -617,6 +627,10 @@ TEST_F(VeloxHashShuffleWriterMemoryTest, kStopComplex) {
   for (int i = 0; i < 3; ++i) {
     ASSERT_NOT_OK(splitRowVector(*shuffleWriter, inputVectorComplex_));
   }
+  // Reclaim bytes to shrink partition buffer.
+  int64_t reclaimed = 0;
+  ASSERT_NOT_OK(shuffleWriter->reclaimFixedSize(2000, &reclaimed));
+  ASSERT_GE(reclaimed, 2000);
 
   // Reclaim from PartitionWriter to free cached bytes.
   auto payloadSize = shuffleWriter->cachedPayloadSize();
diff --git a/cpp/velox/utils/tests/MemoryPoolUtils.cc b/cpp/velox/utils/tests/MemoryPoolUtils.cc
index a595d011952f..5a0ae03b1496 100644
--- a/cpp/velox/utils/tests/MemoryPoolUtils.cc
+++ b/cpp/velox/utils/tests/MemoryPoolUtils.cc
@@ -128,6 +128,7 @@ int64_t SelfEvictedMemoryPool::num_allocations() const {
 
 arrow::Status SelfEvictedMemoryPool::ensureCapacity(int64_t size) {
   VELOX_CHECK_NOT_NULL(evictable_);
+  DLOG(INFO) << "Size: " << size << ", capacity_: " << capacity_ << ", bytes allocated: " << pool_->bytes_allocated();
   if (size > capacity_ - pool_->bytes_allocated()) {
     // Self evict.
     int64_t actual;
diff --git a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
index 102c73ca49fa..d9a2c1e2eaa5 100644
--- a/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
+++ b/cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
@@ -66,15 +66,17 @@ struct ShuffleTestParams {
   ShuffleWriterType shuffleWriterType;
   PartitionWriterType partitionWriterType;
   arrow::Compression::type compressionType;
-  int32_t compressionThreshold;
-  int32_t mergeBufferSize;
-  bool useRadixSort;
+  int32_t compressionThreshold{0};
+  int32_t mergeBufferSize{0};
+  int32_t compressionBufferSize{0};
+  bool useRadixSort{false};
 
   std::string toString() const {
     std::ostringstream out;
     out << "shuffleWriterType = " << shuffleWriterType << ", partitionWriterType = " << partitionWriterType
         << ", compressionType = " << compressionType << ", compressionThreshold = " << compressionThreshold
-        << ", mergeBufferSize = " << mergeBufferSize << ", useRadixSort = " << (useRadixSort ? "true" : "false");
+        << ", mergeBufferSize = " << mergeBufferSize << ", compressionBufferSize = " << compressionBufferSize
+        << ", useRadixSort = " << (useRadixSort ? "true" : "false");
     return out.str();
   }
 };
@@ -255,6 +257,7 @@ class VeloxShuffleWriterTest : public ::testing::TestWithParam<ShuffleTestParams
 
     ShuffleTestParams params = GetParam();
     shuffleWriterOptions_.useRadixSort = params.useRadixSort;
+    shuffleWriterOptions_.compressionBufferSize = params.compressionBufferSize;
     partitionWriterOptions_.compressionType = params.compressionType;
     switch (partitionWriterOptions_.compressionType) {
       case arrow::Compression::UNCOMPRESSED:
diff --git a/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala b/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala
index 3f7c3586ced2..6b853ceb02c7 100644
--- a/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala
+++ b/gluten-celeborn/common/src/main/scala/org/apache/spark/shuffle/CelebornColumnarShuffleWriter.scala
@@ -99,6 +99,9 @@ abstract class CelebornColumnarShuffleWriter[K, V](
       customizedCompressionCodec,
       GlutenConfig.getConf.columnarShuffleCodecBackend.orNull)
 
+  protected val compressionBufferSize: Int =
+    GlutenShuffleUtils.getCompressionBufferSize(conf, customizedCompressionCodec)
+
   protected val bufferCompressThreshold: Int =
     GlutenConfig.getConf.columnarShuffleCompressionThreshold
 
diff --git a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
index b7a0beae704b..eead7a0de9af 100644
--- a/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
+++ b/gluten-celeborn/velox/src/main/scala/org/apache/spark/shuffle/VeloxCelebornColumnarShuffleWriter.scala
@@ -118,6 +118,7 @@ class VeloxCelebornColumnarShuffleWriter[K, V](
       nativeBufferSize,
       customizedCompressionCodec,
       compressionLevel,
+      compressionBufferSize,
       bufferCompressThreshold,
       GlutenConfig.getConf.columnarShuffleCompressionMode,
       conf.get(SHUFFLE_SORT_INIT_BUFFER_SIZE).toInt,
diff --git a/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java b/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
index 23c7118afc6e..2ac634b4488a 100644
--- a/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
+++ b/gluten-data/src/main/java/org/apache/gluten/vectorized/ShuffleWriterJniWrapper.java
@@ -59,6 +59,7 @@ public long make(
       String codec,
       String codecBackend,
       int compressionLevel,
+      int compressionBufferSize,
       int bufferCompressThreshold,
       String compressionMode,
       int sortBufferInitialSize,
@@ -80,6 +81,7 @@ public long make(
         codec,
         codecBackend,
         compressionLevel,
+        compressionBufferSize,
         bufferCompressThreshold,
         compressionMode,
         sortBufferInitialSize,
@@ -111,6 +113,7 @@ public long makeForRSS(
       int bufferSize,
       String codec,
       int compressionLevel,
+      int compressionBufferSize,
       int bufferCompressThreshold,
       String compressionMode,
       int sortBufferInitialSize,
@@ -133,6 +136,7 @@ public long makeForRSS(
         codec,
         null,
         compressionLevel,
+        compressionBufferSize,
         bufferCompressThreshold,
         compressionMode,
         sortBufferInitialSize,
@@ -160,6 +164,7 @@ public native long nativeMake(
       String codec,
       String codecBackend,
       int compressionLevel,
+      int compressionBufferSize,
       int bufferCompressThreshold,
       String compressionMode,
       int sortBufferInitialSize,
diff --git a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
index 251bb977f324..3780b93a2091 100644
--- a/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
+++ b/gluten-data/src/main/scala/org/apache/spark/shuffle/ColumnarShuffleWriter.scala
@@ -91,6 +91,9 @@ class ColumnarShuffleWriter[K, V](
   private val compressionLevel =
     GlutenShuffleUtils.getCompressionLevel(conf, compressionCodec, compressionCodecBackend)
 
+  private val compressionBufferSize =
+    GlutenShuffleUtils.getCompressionBufferSize(conf, compressionCodec)
+
   private val bufferCompressThreshold =
     GlutenConfig.getConf.columnarShuffleCompressionThreshold
 
@@ -149,6 +152,7 @@ class ColumnarShuffleWriter[K, V](
             compressionCodec,
             compressionCodecBackend,
             compressionLevel,
+            compressionBufferSize,
             bufferCompressThreshold,
             GlutenConfig.getConf.columnarShuffleCompressionMode,
             conf.get(SHUFFLE_SORT_INIT_BUFFER_SIZE).toInt,
diff --git a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala
index d55060872866..a65211d86a3f 100644
--- a/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala
+++ b/gluten-substrait/src/main/scala/org/apache/spark/shuffle/GlutenShuffleUtils.scala
@@ -91,6 +91,23 @@ object GlutenShuffleUtils {
     }
   }
 
+  def getCompressionBufferSize(conf: SparkConf, codec: String): Int = {
+    def checkAndGetBufferSize(entry: ConfigEntry[Long]): Int = {
+      val bufferSize = conf.get(entry).toInt
+      if (bufferSize < 4) {
+        throw new IllegalArgumentException(s"${entry.key} must be >= 4, got $bufferSize")
+      }
+      bufferSize
+    }
+    if ("lz4" == codec) {
+      checkAndGetBufferSize(IO_COMPRESSION_LZ4_BLOCKSIZE)
+    } else if ("zstd" == codec) {
+      checkAndGetBufferSize(IO_COMPRESSION_ZSTD_BUFFERSIZE)
+    } else {
+      GlutenConfig.GLUTEN_SHUFFLE_DEFUALT_COMPRESSION_BUFFER_SIZE
+    }
+  }
+
   def getReaderParam[K, C](
       handle: ShuffleHandle,
       startMapIndex: Int,
diff --git a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
index 5a5bf5caa30c..28261bda3f9d 100644
--- a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
+++ b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java
@@ -67,6 +67,7 @@ public class VeloxUniffleColumnarShuffleWriter<K, V> extends RssShuffleWriter<K,
   private final double reallocThreshold = GlutenConfig.getConf().columnarShuffleReallocThreshold();
   private String compressionCodec;
   private int compressionLevel;
+  private int compressionBufferSize;
   private final int partitionId;
 
   private final Runtime runtime = Runtimes.contextInstance("UniffleShuffleWriter");
@@ -125,6 +126,8 @@ public VeloxUniffleColumnarShuffleWriter(
               sparkConf,
               compressionCodec,
               GlutenConfig.getConf().columnarShuffleCodecBackend().getOrElse(() -> null));
+      compressionBufferSize =
+          GlutenShuffleUtils.getCompressionBufferSize(sparkConf, compressionCodec);
     }
   }
 
@@ -150,6 +153,7 @@ protected void writeImpl(Iterator<Product2<K, V>> records) {
                   // use field do this
                   compressionCodec,
                   compressionLevel,
+                  compressionBufferSize,
                   compressThreshold,
                   GlutenConfig.getConf().columnarShuffleCompressionMode(),
                   (int) (long) sparkConf.get(package$.MODULE$.SHUFFLE_SORT_INIT_BUFFER_SIZE()),
diff --git a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
index 27ce1ec36379..df4b64c74ea0 100644
--- a/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/GlutenConfig.scala
@@ -588,8 +588,8 @@ object GlutenConfig {
 
   // Shuffle Writer buffer size.
   val GLUTEN_SHUFFLE_WRITER_BUFFER_SIZE = "spark.gluten.shuffleWriter.bufferSize"
-
   val GLUTEN_SHUFFLE_WRITER_MERGE_THRESHOLD = "spark.gluten.sql.columnar.shuffle.merge.threshold"
+  val GLUTEN_SHUFFLE_DEFUALT_COMPRESSION_BUFFER_SIZE = 32 * 1024
 
   // Controls whether to load DLL from jars. User can get dependent native libs packed into a jar
   // by executing dev/package.sh. Then, with that jar configured, Gluten can load the native libs