Skip to content

Commit

Permalink
fix(s3stream): fix compaction exit on empty stream metadata list (#2095)
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh committed Nov 7, 2024
1 parent c098ee3 commit bd0b029
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -228,10 +228,6 @@ public CompletableFuture<Void> compact() {
return CompletableFuture.failedFuture(e);
}
return this.streamManager.getStreams(new ArrayList<>(streamIds)).thenAcceptAsync(streamMetadataList -> {
if (streamMetadataList.isEmpty()) {
logger.info("No stream metadata found for stream set objects");
return;
}
filterInvalidStreamDataBlocks(streamMetadataList);
this.compact(streamMetadataList, objectMetadataList);
}, compactThreadPool);
Expand Down Expand Up @@ -422,10 +418,6 @@ public CompletableFuture<Void> forceSplitAll() {
List<Long> streamIds = streamDataBlockMap.values().stream().flatMap(Collection::stream)
.map(StreamDataBlock::getStreamId).distinct().collect(Collectors.toList());
this.streamManager.getStreams(streamIds).thenAcceptAsync(streamMetadataList -> {
if (streamMetadataList.isEmpty()) {
logger.info("No stream metadata found for stream set objects");
return;
}
filterInvalidStreamDataBlocks(streamMetadataList);
forceSplitObjects(streamMetadataList, objectMetadataList);
cf.complete(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,72 @@ public void testForceSplitWithOutDatedObject() {
Assertions.assertTrue(request.getStreamRanges().isEmpty());
}


@Test
public void testForceSplitWithNonExistStream() {
List<StreamMetadata> streamMetadataList = this.streamManager.getStreams(Collections.emptyList()).join();
streamMetadataList = streamMetadataList.stream().filter(s -> s.streamId() != STREAM_0).collect(Collectors.toList());
List<S3ObjectMetadata> s3ObjectMetadata = this.objectManager.getServerObjects().join();
when(config.streamSetObjectCompactionForceSplitPeriod()).thenReturn(0);
compactionManager = new CompactionManager(config, objectManager, streamManager, objectStorage);

compactionManager.updateStreamDataBlockMap(List.of(s3ObjectMetadata.get(0)));
compactionManager.filterInvalidStreamDataBlocks(streamMetadataList);
CommitStreamSetObjectRequest request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(0));
Assertions.assertEquals(-1, request.getObjectId());
Assertions.assertEquals(List.of(OBJECT_0), request.getCompactedObjectIds());
Assertions.assertEquals(2, request.getStreamObjects().size());
Assertions.assertTrue(checkDataIntegrity(streamMetadataList, Collections.singletonList(s3ObjectMetadata.get(0)), request));

compactionManager.updateStreamDataBlockMap(List.of(s3ObjectMetadata.get(1)));
compactionManager.filterInvalidStreamDataBlocks(streamMetadataList);
request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(1));
Assertions.assertEquals(-1, request.getObjectId());
Assertions.assertEquals(List.of(OBJECT_1), request.getCompactedObjectIds());
Assertions.assertEquals(1, request.getStreamObjects().size());
Assertions.assertTrue(checkDataIntegrity(streamMetadataList, Collections.singletonList(s3ObjectMetadata.get(1)), request));

compactionManager.updateStreamDataBlockMap(List.of(s3ObjectMetadata.get(2)));
compactionManager.filterInvalidStreamDataBlocks(streamMetadataList);
request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(2));
Assertions.assertEquals(-1, request.getObjectId());
Assertions.assertEquals(List.of(OBJECT_2), request.getCompactedObjectIds());
Assertions.assertEquals(2, request.getStreamObjects().size());
Assertions.assertTrue(checkDataIntegrity(streamMetadataList, Collections.singletonList(s3ObjectMetadata.get(2)), request));
}

@Test
public void testForceSplitWithEmptyStreamList() {
List<StreamMetadata> streamMetadataList = Collections.emptyList();
List<S3ObjectMetadata> s3ObjectMetadata = this.objectManager.getServerObjects().join();
when(config.streamSetObjectCompactionForceSplitPeriod()).thenReturn(0);
compactionManager = new CompactionManager(config, objectManager, streamManager, objectStorage);

compactionManager.updateStreamDataBlockMap(List.of(s3ObjectMetadata.get(0)));
compactionManager.filterInvalidStreamDataBlocks(streamMetadataList);
CommitStreamSetObjectRequest request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(0));
Assertions.assertEquals(-1, request.getObjectId());
Assertions.assertEquals(List.of(OBJECT_0), request.getCompactedObjectIds());
Assertions.assertTrue(request.getStreamObjects().isEmpty());
Assertions.assertTrue(request.getStreamRanges().isEmpty());

compactionManager.updateStreamDataBlockMap(List.of(s3ObjectMetadata.get(1)));
compactionManager.filterInvalidStreamDataBlocks(streamMetadataList);
request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(1));
Assertions.assertEquals(-1, request.getObjectId());
Assertions.assertEquals(List.of(OBJECT_1), request.getCompactedObjectIds());
Assertions.assertTrue(request.getStreamObjects().isEmpty());
Assertions.assertTrue(request.getStreamRanges().isEmpty());

compactionManager.updateStreamDataBlockMap(List.of(s3ObjectMetadata.get(2)));
compactionManager.filterInvalidStreamDataBlocks(streamMetadataList);
request = compactionManager.buildSplitRequest(streamMetadataList, s3ObjectMetadata.get(2));
Assertions.assertEquals(-1, request.getObjectId());
Assertions.assertEquals(List.of(OBJECT_2), request.getCompactedObjectIds());
Assertions.assertTrue(request.getStreamObjects().isEmpty());
Assertions.assertTrue(request.getStreamRanges().isEmpty());
}

@Test
public void testForceSplitWithException() {
S3AsyncClient s3AsyncClient = Mockito.mock(S3AsyncClient.class);
Expand Down Expand Up @@ -462,6 +528,21 @@ public void testCompactWithNonExistStream() {
Assertions.assertTrue(checkDataIntegrity(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST, request));
}

@Test
public void testCompactWithEmptyStream() {
compactionManager = new CompactionManager(config, objectManager, streamManager, objectStorage);
List<StreamMetadata> streamMetadataList = Collections.emptyList();
compactionManager.updateStreamDataBlockMap(S3_WAL_OBJECT_METADATA_LIST);
compactionManager.filterInvalidStreamDataBlocks(streamMetadataList);
CommitStreamSetObjectRequest request = compactionManager.buildCompactRequest(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST);

Assertions.assertEquals(-1, request.getObjectId());
Assertions.assertTrue(request.getStreamObjects().isEmpty());
Assertions.assertTrue(request.getStreamRanges().isEmpty());
assertEquals(List.of(OBJECT_0, OBJECT_1, OBJECT_2), request.getCompactedObjectIds());
Assertions.assertTrue(checkDataIntegrity(streamMetadataList, S3_WAL_OBJECT_METADATA_LIST, request));
}

@Test
public void testCompactNoneExistObjects() {
when(config.streamSetObjectCompactionStreamSplitSize()).thenReturn(100L);
Expand Down

0 comments on commit bd0b029

Please sign in to comment.