fix(core): fix potential infinite recursion on reading empty segment (#2231)

Signed-off-by: Shichao Nie <[email protected]>
SCNieh authored Dec 17, 2024
1 parent 370f947 commit 0c5a2db
Showing 3 changed files with 53 additions and 1 deletion.
@@ -180,6 +180,9 @@ private CompletableFuture<Void> fetch0(FetchContext context, long startOffset, l
}
readSize += recordBatchWithContext.rawPayload().remaining();
}
if (readSize == 0) {
return CompletableFuture.completedFuture(null);
}
return fetch0(context, nextFetchOffset, endOffset, maxSize - readSize, results);
});
}
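For context on why the new readSize == 0 guard matters, here is a minimal, self-contained Java sketch of the same recursion pattern. The class, the fake readOnce storage, and the concrete offsets are all invented for illustration; only the shape of the fix mirrors the hunk above. Each round issues an asynchronous read and recurses with the remaining size budget; a round that reads nothing leaves both the offset and the budget unchanged, so without the guard the same call could repeat forever.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the recursive-fetch pattern; not the AutoMQ classes themselves.
public class RecursiveFetchSketch {

    // A read result: the data that was read (ints standing in for record batches)
    // plus the next offset to ask for.
    record ReadResult(List<Integer> batch, long nextOffset) { }

    // Pretend storage: offset 5 falls in an empty (compacted-away) segment, so the read
    // returns nothing and cannot advance the offset by itself.
    static CompletableFuture<ReadResult> readOnce(long offset) {
        if (offset == 5) {
            return CompletableFuture.completedFuture(new ReadResult(List.of(), offset));
        }
        return CompletableFuture.completedFuture(new ReadResult(List.of((int) offset), offset + 1));
    }

    static CompletableFuture<Void> fetch0(long startOffset, long endOffset, int maxSize, List<Integer> results) {
        if (startOffset >= endOffset || maxSize <= 0) {
            return CompletableFuture.completedFuture(null);
        }
        return readOnce(startOffset).thenCompose(rst -> {
            int readSize = rst.batch().size();
            results.addAll(rst.batch());
            if (readSize == 0) {
                // The guard this commit adds: nothing was read in this round, so stop
                // instead of recursing again with identical arguments.
                return CompletableFuture.completedFuture(null);
            }
            return fetch0(rst.nextOffset(), endOffset, maxSize - readSize, results);
        });
    }

    public static void main(String[] args) {
        List<Integer> out = new ArrayList<>();
        fetch0(0, 10, 100, out).join();
        System.out.println(out); // [0, 1, 2, 3, 4] -- stops at the empty segment instead of looping forever
    }
}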
@@ -256,6 +256,10 @@ public CompletableFuture<FetchDataInfo> readAsync(long startOffset, int maxSize,
return CompletableFuture.failedFuture(new IllegalArgumentException("Invalid max size " + maxSize + " for log read from segment " + log));
// Note that relativePositionInSegment here is a fake value. There is no 'position' in elastic streams.

// if the start offset is less than base offset, use base offset. This usually happens when the prev segment is generated
// by compaction and its last offset is less than the base offset of the current segment.
startOffset = Utils.max(startOffset, baseOffset);

// if the start offset is already off the end of the log, return null
if (startOffset >= log.nextOffset()) {
return CompletableFuture.completedFuture(null);
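The new clamp is easier to see with concrete numbers. The sketch below uses hypothetical offsets (and Math.max standing in for the Utils.max call in the patch) to show the "hollow" a compacted predecessor segment can leave in front of the current segment's base offset, and why a read request inside that hollow is moved up to the base offset.

// Hypothetical offsets illustrating the comment above; not taken from a real log.
public class BaseOffsetClampSketch {
    public static void main(String[] args) {
        long prevSegmentLastOffset = 4L; // compaction dropped everything after offset 4 in the previous segment
        long baseOffset = 10L;           // base offset of the current segment
        long startOffset = 6L;           // caller asks for an offset inside the hollow 5..9

        // Same idea as startOffset = Utils.max(startOffset, baseOffset) in the patch:
        // skip the hollow and start reading at the first offset the segment actually holds.
        long effectiveStart = Math.max(startOffset, baseOffset);

        System.out.println("previous segment ends at " + prevSegmentLastOffset
            + ", read starts at " + effectiveStart); // previous segment ends at 4, read starts at 10
    }
}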
@@ -8,7 +8,7 @@ import org.apache.kafka.common.Uuid
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.storage.internals.log.{FetchIsolation, LogConfig, LogDirFailureChannel, LogOffsetsListener}
import org.junit.jupiter.api.{Assertions, BeforeEach, Tag, Test}
import org.junit.jupiter.api.{Assertions, BeforeEach, Tag, Test, Timeout}

import java.io.File
import java.util.Properties
@@ -80,6 +80,51 @@ class ElasticLogCleanerTest extends LogCleanerTest {
}
}

  @Test
  @Timeout(value = 30)
  def testCleanSegmentCauseHollowWithEmptySegment(): Unit = {
    val cleaner = makeCleaner(Int.MaxValue)
    val logProps = new Properties()
    logProps.put(TopicConfig.SEGMENT_BYTES_CONFIG, 1024: java.lang.Integer)

    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
    log.appendAsLeader(record(1, 0), leaderEpoch = 0)
    log.appendAsLeader(record(2, 0), leaderEpoch = 0)
    while (log.numberOfSegments < 2) {
      log.appendAsLeader(record(1, log.logEndOffset.toInt), leaderEpoch = 0)
    }
    while (log.numberOfSegments < 3) {
      log.appendAsLeader(record(3, 22), leaderEpoch = 0)
    }
    log.appendAsLeader(record(1, log.logEndOffset.toInt), leaderEpoch = 0)
    log.appendAsLeader(record(3, log.logEndOffset.toInt), leaderEpoch = 0)

    val map = new FakeOffsetMap(Int.MaxValue)
    map.put(key(2L), 1L)
    map.put(key(1L), log.logEndOffset - 2)
    map.put(key(3L), log.logEndOffset - 1)

    // create an empty segment in between first and last segment
    cleaner.cleanSegments(log, log.logSegments.asScala.take(1).toSeq, map, 0L, new CleanerStats, new CleanedTransactionMetadata, -1)
    cleaner.cleanSegments(log, log.logSegments.asScala.slice(1, 2).toSeq, map, 0L, new CleanerStats, new CleanedTransactionMetadata, -1)

    log.logSegments.asScala.slice(1, 2).foreach(s => {
      Assertions.assertEquals(0, s.size())
    })

    var offset = 0L
    var total = 0
    while (offset < log.logEndOffset) {
      val rst = log.read(offset, 1, FetchIsolation.LOG_END, minOneMessage = true)
      Assertions.assertNotNull(rst)
      rst.records.batches.forEach(b => {
        total += 1
        offset = b.nextOffset()
      })
    }
    Assertions.assertEquals(4, total)
  }

  override protected def makeLog(dir: File,
                                 config: LogConfig,
                                 recoveryPoint: Long): ElasticUnifiedLog = {
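A note on how the new test exercises the fix: cleaning the first and second segments against an offset map that only keeps the latest occurrences of keys 1 and 3 leaves the middle segment empty (the test asserts s.size() == 0 for it), and the read loop with maxSize = 1 then has to step across that empty segment offset by offset. The @Timeout(value = 30) annotation turns a regression into a test failure rather than a hang, since before this fix a read over the empty segment could recurse indefinitely, as described in the commit title.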
