Skip to content

Commit

Permalink
[GLUTEN-6656][UNIFFLE] VeloxUniffleColumnarShuffleWriter should send …
Browse files Browse the repository at this point in the history
…commit for all ColumnBatch with empty rows (#6698)
  • Loading branch information
SteNicholas authored Aug 5, 2024
1 parent 01a44d9 commit 809fef6
Showing 1 changed file with 14 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,8 @@ public VeloxUniffleColumnarShuffleWriter(

@Override
protected void writeImpl(Iterator<Product2<K, V>> records) {
if (!records.hasNext() && !isMemoryShuffleEnabled) {
super.sendCommit();
if (!records.hasNext()) {
sendCommit();
return;
}
// writer already init
Expand Down Expand Up @@ -189,11 +189,13 @@ public long spill(MemoryTarget self, Spiller.Phase phase, long size) {
}
}

long startTime = System.nanoTime();
LOG.info("nativeShuffleWriter value {}", nativeShuffleWriter);
// If all of the ColumnarBatch have empty rows, the nativeShuffleWriter still equals -1
if (nativeShuffleWriter == -1L) {
throw new IllegalStateException("nativeShuffleWriter should not be -1L");
sendCommit();
return;
}
long startTime = System.nanoTime();
SplitResult splitResult;
try {
splitResult = jniWrapper.stop(nativeShuffleWriter);
Expand All @@ -219,16 +221,21 @@ public long spill(MemoryTarget self, Spiller.Phase phase, long size) {
long pushMergedDataTime = System.nanoTime();
// clear all
sendRestBlockAndWait();
if (!isMemoryShuffleEnabled) {
super.sendCommit();
}
sendCommit();
long writeDurationMs = System.nanoTime() - pushMergedDataTime;
shuffleWriteMetrics.incWriteTime(writeDurationMs);
LOG.info(
"Finish write shuffle with rest write {} ms",
TimeUnit.MILLISECONDS.toNanos(writeDurationMs));
}

@Override
protected void sendCommit() {
if (!isMemoryShuffleEnabled) {
super.sendCommit();
}
}

@Override
public Option<MapStatus> stop(boolean success) {
if (!stopping) {
Expand Down

0 comments on commit 809fef6

Please sign in to comment.