From 809fef6aae21139d0ed690fed72662383ef52235 Mon Sep 17 00:00:00 2001 From: Nicholas Jiang Date: Mon, 5 Aug 2024 09:26:24 +0800 Subject: [PATCH] [GLUTEN-6656][UNIFFLE] VeloxUniffleColumnarShuffleWriter should send commit for all ColumnBatch with empty rows (#6698) --- .../VeloxUniffleColumnarShuffleWriter.java | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java index a80e34fb1d99..d2032fa48564 100644 --- a/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java +++ b/gluten-uniffle/velox/src/main/java/org/apache/spark/shuffle/writer/VeloxUniffleColumnarShuffleWriter.java @@ -126,8 +126,8 @@ public VeloxUniffleColumnarShuffleWriter( @Override protected void writeImpl(Iterator> records) { - if (!records.hasNext() && !isMemoryShuffleEnabled) { - super.sendCommit(); + if (!records.hasNext()) { + sendCommit(); return; } // writer already init @@ -189,11 +189,13 @@ public long spill(MemoryTarget self, Spiller.Phase phase, long size) { } } - long startTime = System.nanoTime(); LOG.info("nativeShuffleWriter value {}", nativeShuffleWriter); + // If all of the ColumnarBatch have empty rows, the nativeShuffleWriter still equals -1 if (nativeShuffleWriter == -1L) { - throw new IllegalStateException("nativeShuffleWriter should not be -1L"); + sendCommit(); + return; } + long startTime = System.nanoTime(); SplitResult splitResult; try { splitResult = jniWrapper.stop(nativeShuffleWriter); @@ -219,9 +221,7 @@ public long spill(MemoryTarget self, Spiller.Phase phase, long size) { long pushMergedDataTime = System.nanoTime(); // clear all sendRestBlockAndWait(); - if (!isMemoryShuffleEnabled) { - super.sendCommit(); - } + sendCommit(); long writeDurationMs = System.nanoTime() - pushMergedDataTime; shuffleWriteMetrics.incWriteTime(writeDurationMs); LOG.info( @@ -229,6 +229,13 @@ public long spill(MemoryTarget self, Spiller.Phase phase, long size) { TimeUnit.MILLISECONDS.toNanos(writeDurationMs)); } + @Override + protected void sendCommit() { + if (!isMemoryShuffleEnabled) { + super.sendCommit(); + } + } + @Override public Option stop(boolean success) { if (!stopping) {