From 3717a72803043e1f3820d1bc75164fca17053d28 Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Mon, 19 Feb 2024 16:30:17 +0800 Subject: [PATCH 1/6] feat: handle out of order records during recovery Signed-off-by: Ning Yu --- .../java/com/automq/stream/s3/S3Storage.java | 34 +++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index df7547598..6f4a86507 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -41,6 +41,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collections; +import java.util.Comparator; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; @@ -48,6 +49,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.PriorityQueue; import java.util.Queue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; @@ -142,6 +144,7 @@ static LogCache.LogCacheBlock recoverContinuousRecords(Iterator streamNextOffsets = new HashMap<>(); + Map> streamOutOfOrderRecords = new HashMap<>(); while (it.hasNext()) { WriteAheadLog.RecoverResult recoverResult = it.next(); logEndOffset = recoverResult.recordOffset(); @@ -159,15 +162,42 @@ static LogCache.LogCacheBlock recoverContinuousRecords(Iterator outOfOrderRecords = streamOutOfOrderRecords.get(streamId); if (expectNextOffset == null || expectNextOffset == streamRecordBatch.getBaseOffset()) { + // continuous record, put it into cache. cacheBlock.put(streamRecordBatch); - streamNextOffsets.put(streamRecordBatch.getStreamId(), streamRecordBatch.getLastOffset()); + expectNextOffset = streamRecordBatch.getLastOffset(); + // try to put out of order records into cache. + if (outOfOrderRecords != null) { + while (!outOfOrderRecords.isEmpty()) { + StreamRecordBatch peek = outOfOrderRecords.peek(); + if (peek.getBaseOffset() == expectNextOffset) { + cacheBlock.put(peek); + outOfOrderRecords.poll(); + expectNextOffset = peek.getLastOffset(); + } else { + break; + } + } + } + // update next offset. + streamNextOffsets.put(streamRecordBatch.getStreamId(), expectNextOffset); } else { + // unexpected record, put it into out of order records. + if (outOfOrderRecords == null) { + outOfOrderRecords = new PriorityQueue<>(Comparator.comparingLong(StreamRecordBatch::getBaseOffset)); + streamOutOfOrderRecords.put(streamId, outOfOrderRecords); + } + outOfOrderRecords.add(streamRecordBatch); + // TODO update log logger.error("unexpected WAL record, streamId={}, expectNextOffset={}, record={}", streamId, expectNextOffset, streamRecordBatch); - streamRecordBatch.release(); } } + // release all out of order records. 
+ streamOutOfOrderRecords.values().forEach(queue -> queue.forEach(StreamRecordBatch::release)); + if (logEndOffset >= 0L) { cacheBlock.confirmOffset(logEndOffset); } From e7e902ff85acdecec76822707202a03ef83d8923 Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Mon, 19 Feb 2024 16:39:32 +0800 Subject: [PATCH 2/6] feat: log it Signed-off-by: Ning Yu --- .../main/java/com/automq/stream/s3/S3Storage.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index 6f4a86507..ef28960d5 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -174,6 +174,8 @@ static LogCache.LogCacheBlock recoverContinuousRecords(Iterator queue.forEach(StreamRecordBatch::release)); + streamOutOfOrderRecords.values().forEach(queue -> { + if (queue.isEmpty()) { + return; + } + logger.info("drop discontinuous records, records={}", queue); + queue.forEach(StreamRecordBatch::release); + }); if (logEndOffset >= 0L) { cacheBlock.confirmOffset(logEndOffset); From 8322294d38b7721e718680196fcae611cdc1fd4f Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Mon, 19 Feb 2024 16:47:39 +0800 Subject: [PATCH 3/6] test: wip Signed-off-by: Ning Yu --- .../src/main/java/com/automq/stream/s3/S3Storage.java | 2 +- .../test/java/com/automq/stream/s3/S3StorageTest.java | 11 ++++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index ef28960d5..6a8695b81 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -175,7 +175,7 @@ static LogCache.LogCacheBlock recoverContinuousRecords(Iterator openingStreams = List.of(new StreamMetadata(233L, 0L, 0L, 11L, StreamState.OPENED)); - LogCache.LogCacheBlock cacheBlock = storage.recoverContinuousRecords(recoverResults.iterator(), openingStreams); + LogCache.LogCacheBlock cacheBlock = S3Storage.recoverContinuousRecords(recoverResults.iterator(), openingStreams); // ignore closed stream and noncontinuous records. 
assertEquals(1, cacheBlock.records().size()); List streamRecords = cacheBlock.records().get(233L); @@ -231,18 +231,23 @@ public void testRecoverContinuousRecords() { assertEquals(11L, streamRecords.get(0).getBaseOffset()); assertEquals(12L, streamRecords.get(1).getBaseOffset()); - // + // simulate data loss openingStreams = List.of( new StreamMetadata(233L, 0L, 0L, 5L, StreamState.OPENED)); boolean exception = false; try { - storage.recoverContinuousRecords(recoverResults.iterator(), openingStreams); + S3Storage.recoverContinuousRecords(recoverResults.iterator(), openingStreams); } catch (IllegalStateException e) { exception = true; } Assertions.assertTrue(exception); } + @Test + public void testRecoverOutOfOrderRecords() { + // TODO + } + @Test public void testWALOverCapacity() throws WriteAheadLog.OverCapacityException { storage.append(newRecord(233L, 10L)); From b884c41b5041f74b4e8fb3251833e5554dc8cf53 Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Mon, 19 Feb 2024 16:49:40 +0800 Subject: [PATCH 4/6] refactor: rename `OutOfOrder` to `Discontinuous` Signed-off-by: Ning Yu --- .../java/com/automq/stream/s3/S3Storage.java | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index 6a8695b81..a0bd69e86 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -144,7 +144,7 @@ static LogCache.LogCacheBlock recoverContinuousRecords(Iterator streamNextOffsets = new HashMap<>(); - Map> streamOutOfOrderRecords = new HashMap<>(); + Map> streamDiscontinuousRecords = new HashMap<>(); while (it.hasNext()) { WriteAheadLog.RecoverResult recoverResult = it.next(); logEndOffset = recoverResult.recordOffset(); @@ -164,20 +164,20 @@ static LogCache.LogCacheBlock recoverContinuousRecords(Iterator outOfOrderRecords = streamOutOfOrderRecords.get(streamId); + Queue discontinuousRecords = streamDiscontinuousRecords.get(streamId); if (expectNextOffset == null || expectNextOffset == streamRecordBatch.getBaseOffset()) { // continuous record, put it into cache. cacheBlock.put(streamRecordBatch); expectNextOffset = streamRecordBatch.getLastOffset(); - // try to put out of order records into cache. - if (outOfOrderRecords != null) { - while (!outOfOrderRecords.isEmpty()) { - StreamRecordBatch peek = outOfOrderRecords.peek(); + // check if there are some out of order records in the queue. + if (discontinuousRecords != null) { + while (!discontinuousRecords.isEmpty()) { + StreamRecordBatch peek = discontinuousRecords.peek(); if (peek.getBaseOffset() == expectNextOffset) { // should never happen, log it. logger.error("[BUG] recover an out of order record, streamId={}, expectNextOffset={}, record={}", streamId, expectNextOffset, peek); cacheBlock.put(peek); - outOfOrderRecords.poll(); + discontinuousRecords.poll(); expectNextOffset = peek.getLastOffset(); } else { break; @@ -187,16 +187,16 @@ static LogCache.LogCacheBlock recoverContinuousRecords(Iterator(Comparator.comparingLong(StreamRecordBatch::getBaseOffset)); - streamOutOfOrderRecords.put(streamId, outOfOrderRecords); + // unexpected record, put it into discontinuous records queue. 
+ if (discontinuousRecords == null) { + discontinuousRecords = new PriorityQueue<>(Comparator.comparingLong(StreamRecordBatch::getBaseOffset)); + streamDiscontinuousRecords.put(streamId, discontinuousRecords); } - outOfOrderRecords.add(streamRecordBatch); + discontinuousRecords.add(streamRecordBatch); } } - // release all out of order records. - streamOutOfOrderRecords.values().forEach(queue -> { + // release all discontinuous records. + streamDiscontinuousRecords.values().forEach(queue -> { if (queue.isEmpty()) { return; } From e93e63c836c0b5a24f16c18f4efb7556145eccc0 Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Mon, 19 Feb 2024 17:05:09 +0800 Subject: [PATCH 5/6] test: test `recoverContinuousRecords` Signed-off-by: Ning Yu --- .../com/automq/stream/s3/S3StorageTest.java | 22 ++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java b/s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java index dc740ac47..3413b65c9 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/S3StorageTest.java @@ -245,7 +245,27 @@ public void testRecoverContinuousRecords() { @Test public void testRecoverOutOfOrderRecords() { - // TODO + List recoverResults = List.of( + new TestRecoverResult(StreamRecordBatchCodec.encode(newRecord(42L, 9L))), + new TestRecoverResult(StreamRecordBatchCodec.encode(newRecord(42L, 10L))), + new TestRecoverResult(StreamRecordBatchCodec.encode(newRecord(42L, 13L))), + new TestRecoverResult(StreamRecordBatchCodec.encode(newRecord(42L, 11L))), + new TestRecoverResult(StreamRecordBatchCodec.encode(newRecord(42L, 12L))), + new TestRecoverResult(StreamRecordBatchCodec.encode(newRecord(42L, 14L))), + new TestRecoverResult(StreamRecordBatchCodec.encode(newRecord(42L, 20L))) + ); + + List openingStreams = List.of(new StreamMetadata(42L, 0L, 0L, 10L, StreamState.OPENED)); + LogCache.LogCacheBlock cacheBlock = S3Storage.recoverContinuousRecords(recoverResults.iterator(), openingStreams); + // ignore closed stream and noncontinuous records. + assertEquals(1, cacheBlock.records().size()); + List streamRecords = cacheBlock.records().get(42L); + assertEquals(5, streamRecords.size()); + assertEquals(10L, streamRecords.get(0).getBaseOffset()); + assertEquals(11L, streamRecords.get(1).getBaseOffset()); + assertEquals(12L, streamRecords.get(2).getBaseOffset()); + assertEquals(13L, streamRecords.get(3).getBaseOffset()); + assertEquals(14L, streamRecords.get(4).getBaseOffset()); } @Test From 867995293e3494f26f5cfac2c04f92c520b1cf3e Mon Sep 17 00:00:00 2001 From: Ning Yu Date: Mon, 19 Feb 2024 17:51:58 +0800 Subject: [PATCH 6/6] docs: java doc of `recoverContinuousRecords` Signed-off-by: Ning Yu --- .../java/com/automq/stream/s3/S3Storage.java | 26 +++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java index a0bd69e86..9c2d544f5 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/S3Storage.java @@ -138,6 +138,32 @@ static LogCache.LogCacheBlock recoverContinuousRecords(Iterator + *
+     * <ul>
+     *     <li>the records that are not in the opening streams</li>
+     *     <li>the records that have been committed</li>
+     *     <li>the records that are not continuous, which means, all records after the first discontinuous record</li>
+     * </ul>
+     *
+     * It throws {@link IllegalStateException} if the start offset of the first recovered record mismatches
+     * the end offset of any opening stream, which indicates data loss.
+     * <p>
+     * If there are out of order records (which should never happen or there is a BUG), it will try to re-order them.
+     * <p>
+     * For example, if we recover following records from the WAL in a stream:
+     * <pre>    1, 2, 3, 5, 4, 6, 10, 11</pre>
+     * and the {@link StreamMetadata#endOffset()} of this stream is 3. Then the returned {@link LogCache.LogCacheBlock}
+     * will contain records
+     * <pre>    3, 4, 5, 6</pre>
+     * Here,
+     * <ul>
+     *     <li>The record 1 and 2 are discarded because they have been committed (less than 3, the end offset of the stream)</li>
+     *     <li>The record 10 and 11 are discarded because they are not continuous (10 is not 7, the next offset of 6)</li>
+     *     <li>The record 5 and 4 are reordered because they are out of order, and we handle this bug here</li>
+     * </ul>
+     */
     static LogCache.LogCacheBlock recoverContinuousRecords(Iterator<WriteAheadLog.RecoverResult> it,
         List<StreamMetadata> openingStreams, Logger logger) {
         Map<Long, Long> openingStreamEndOffsets = openingStreams.stream().collect(Collectors.toMap(StreamMetadata::streamId, StreamMetadata::endOffset));
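Note (illustration only, not part of the patch series): the sketch below shows the recovery strategy these patches implement for a single stream: keep the expected next offset, buffer any discontinuous record in a PriorityQueue ordered by base offset, drain the queue as soon as the gap closes, and drop whatever is still buffered when the WAL is exhausted. The RecoverSketch class, the WalRecord record type, and the recoverContinuous method are simplified stand-ins invented for this sketch, not types from the s3stream codebase; the real recoverContinuousRecords in S3Storage.java additionally handles multiple streams and releases the buffers of the dropped records.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class RecoverSketch {
    // Simplified stand-in for a WAL record: covers offsets [baseOffset, baseOffset + count).
    record WalRecord(long baseOffset, long count) {
        long lastOffset() {
            return baseOffset + count;
        }
    }

    // Recover the continuous records of a single stream, starting from the stream's end offset.
    // Discontinuous records are buffered in a priority queue ordered by base offset and replayed
    // as soon as the gap closes; whatever is still buffered at the end is dropped.
    static List<WalRecord> recoverContinuous(List<WalRecord> walRecords, long streamEndOffset) {
        List<WalRecord> recovered = new ArrayList<>();
        PriorityQueue<WalRecord> discontinuous = new PriorityQueue<>(Comparator.comparingLong(WalRecord::baseOffset));
        long expectNextOffset = streamEndOffset;
        for (WalRecord record : walRecords) {
            if (record.lastOffset() <= expectNextOffset) {
                continue; // already committed, skip it
            }
            if (record.baseOffset() == expectNextOffset) {
                recovered.add(record);
                expectNextOffset = record.lastOffset();
                // a previously buffered record may now close the gap
                while (!discontinuous.isEmpty() && discontinuous.peek().baseOffset() == expectNextOffset) {
                    WalRecord buffered = discontinuous.poll();
                    recovered.add(buffered);
                    expectNextOffset = buffered.lastOffset();
                }
            } else {
                discontinuous.add(record); // out of order or discontinuous: buffer it for now
            }
        }
        // anything still in `discontinuous` is dropped (released, in the real implementation)
        return recovered;
    }

    public static void main(String[] args) {
        // The example from the javadoc: the WAL holds 1, 2, 3, 5, 4, 6, 10, 11 and the stream's end offset is 3.
        List<WalRecord> wal = List.of(
            new WalRecord(1, 1), new WalRecord(2, 1), new WalRecord(3, 1), new WalRecord(5, 1),
            new WalRecord(4, 1), new WalRecord(6, 1), new WalRecord(10, 1), new WalRecord(11, 1));
        // Recovered base offsets: 3, 4, 5, 6; records 1 and 2 are committed, 10 and 11 are discontinuous.
        recoverContinuous(wal, 3L).forEach(r -> System.out.println(r.baseOffset()));
    }
}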