Skip to content

Commit

Permalink
[CH][CELEBORN] CHCelebornColumnarBatchSerializer uses AtomicBoolean t…
Browse files Browse the repository at this point in the history
…o identify whether to call close() to avoid calling close() twice situation (apache#6455)

[CH][CELEBORN] CHCelebornColumnarBatchSerializer uses AtomicBoolean to identify whether to call close() to avoid calling close() twice situation
  • Loading branch information
SteNicholas authored Jul 15, 2024
1 parent a8dd354 commit a31df44
Showing 1 changed file with 14 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import org.apache.celeborn.client.read.CelebornInputStream
import java.io._
import java.nio.ByteBuffer
import java.util.Locale
import java.util.concurrent.atomic.AtomicBoolean

import scala.reflect.ClassTag

Expand Down Expand Up @@ -74,7 +75,8 @@ private class CHCelebornColumnarBatchSerializerInstance(
private var numBatchesTotal: Long = _
private var numRowsTotal: Long = _

private var isClosed: Boolean = false
// Otherwise calling close() twice would cause replication of metrics.
private val closeCalled: AtomicBoolean = new AtomicBoolean(false)

override def asIterator: Iterator[Any] = {
// This method is never called by shuffle code.
Expand Down Expand Up @@ -153,18 +155,18 @@ private class CHCelebornColumnarBatchSerializerInstance(
}

override def close(): Unit = {
if (!isClosed) {
if (numBatchesTotal > 0) {
readBatchNumRows.set(numRowsTotal.toDouble / numBatchesTotal)
}
numOutputRows += numRowsTotal
if (cb != null) {
cb.close()
cb = null
}
closeReader()
isClosed = true
if (!closeCalled.compareAndSet(false, true)) {
return
}
if (numBatchesTotal > 0) {
readBatchNumRows.set(numRowsTotal.toDouble / numBatchesTotal)
}
numOutputRows += numRowsTotal
if (cb != null) {
cb.close()
cb = null
}
closeReader()
}

def getReader: CHStreamReader = {
Expand Down

0 comments on commit a31df44

Please sign in to comment.