KAFKA-17299: add unit tests for previous fix
#17899 fixed the issue, but did not add any unit tests.
mjsax committed Nov 23, 2024
1 parent d36b24f commit 888fe6d
Showing 3 changed files with 164 additions and 17 deletions.
streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java

@@ -250,10 +250,11 @@ StampedRecord nextRecord(final RecordInfo info, final long wallClockTime) {

         if (queue != null) {
             // get the first record from this queue.
+            final int oldSize = queue.size();
             record = queue.poll(wallClockTime);

             if (record != null) {
-                --totalBuffered;
+                totalBuffered -= (oldSize - queue.size());

                 if (queue.isEmpty()) {
                     // if a certain queue has been drained, reset the flag

(GitHub Actions / build / Compile and Check Java flagged a Checkstyle notice on line 257 of PartitionGroup.java: "Unnecessary parentheses around assignment right-hand side.")
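Why the one-line change matters: the queue's poll can drop additional records while it searches for the next valid head, for example records with an invalid timestamp (under LogAndSkipOnInvalidTimestamp) or records that fail deserialization (under LogAndContinueExceptionHandler), so a single poll may shrink the queue by more than one record. Decrementing totalBuffered by exactly one therefore over-counted the buffer whenever records were skipped. Below is a minimal, self-contained sketch of the accounting idea; the Deque-based queue and the string "records" are illustrative stand-ins, not the real RecordQueue API.

import java.util.ArrayDeque;
import java.util.Deque;

public class BufferAccountingSketch {

    // Hypothetical stand-in for RecordQueue: poll() returns the head record
    // and silently drops any "corrupted" entries while finding the next head.
    static String poll(Deque<String> queue) {
        final String head = queue.poll();
        while (!queue.isEmpty() && queue.peek().startsWith("corrupted")) {
            queue.poll(); // skipped records leave the buffer, too
        }
        return head;
    }

    public static void main(String[] args) {
        final Deque<String> queue = new ArrayDeque<>();
        queue.add("good-1");
        queue.add("corrupted-1");
        queue.add("corrupted-2");
        queue.add("good-2");

        int totalBuffered = queue.size(); // 4

        // buggy accounting: assumes poll() always removes exactly one record
        //   --totalBuffered;  // would report 3, although only 1 record is left

        // fixed accounting: charge for every record the poll actually removed
        final int oldSize = queue.size();
        poll(queue);
        totalBuffered -= oldSize - queue.size();

        System.out.println(totalBuffered); // 1, matching queue.size()
    }
}

Snapshotting the size before polling charges the buffer for every record the poll removed, whether it was returned or skipped.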
streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java

@@ -606,6 +606,88 @@ public void shouldUpdatePartitionQueuesShrinkAndExpand() {
         assertThat(group.nextRecord(new RecordInfo(), time.milliseconds()), nullValue()); // all available records removed
     }

+    @Test
+    public void shouldUpdateBufferSizeCorrectlyForSkippedRecords() {
+        final PartitionGroup group = new PartitionGroup(
+            logContext,
+            mkMap(mkEntry(partition1, queue1)),
+            tp -> OptionalLong.of(0L),
+            getValueSensor(metrics, lastLatenessValue),
+            enforcedProcessingSensor,
+            maxTaskIdleMs
+        );
+        final List<ConsumerRecord<byte[], byte[]>> list1 = Arrays.asList(
+            new ConsumerRecord<>("topic", 1, 1L, recordKey, recordValue),
+            new ConsumerRecord<>("topic", 1, 5L, recordKey, recordValue),
+            new ConsumerRecord<>(
+                "topic",
+                1,
+                -1, // offset as invalid timestamp
+                -1, // invalid timestamp
+                TimestampType.CREATE_TIME,
+                0,
+                0,
+                recordKey,
+                recordValue,
+                new RecordHeaders(),
+                Optional.empty()
+            ),
+            new ConsumerRecord<>(
+                "topic",
+                1,
+                11,
+                0,
+                TimestampType.CREATE_TIME,
+                0,
+                0,
+                new byte[0], // corrupted key
+                recordValue,
+                new RecordHeaders(),
+                Optional.empty()
+            ),
+            new ConsumerRecord<>(
+                "topic",
+                1,
+                -1, // offset as invalid timestamp
+                -1, // invalid timestamp
+                TimestampType.CREATE_TIME,
+                0,
+                0,
+                recordKey,
+                recordValue,
+                new RecordHeaders(),
+                Optional.empty()
+            ),
+            new ConsumerRecord<>(
+                "topic",
+                1,
+                13,
+                0,
+                TimestampType.CREATE_TIME,
+                0,
+                0,
+                recordKey,
+                new byte[0], // corrupted value
+                new RecordHeaders(),
+                Optional.empty()
+            ),
+            new ConsumerRecord<>("topic", 1, 20L, recordKey, recordValue)
+        );
+
+        group.addRawRecords(partition1, list1);
+        assertEquals(7, group.numBuffered());
+
+        group.nextRecord(new RecordInfo(), time.milliseconds());
+        assertEquals(6, group.numBuffered());
+
+        // drain corrupted records
+        group.nextRecord(new RecordInfo(), time.milliseconds());
+        assertEquals(1, group.numBuffered());
+
+        group.nextRecord(new RecordInfo(), time.milliseconds());
+        assertEquals(0, group.numBuffered());
+    }
+
     @Test
     public void shouldNeverWaitIfIdlingIsDisabled() {
         final PartitionGroup group = new PartitionGroup(
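The expected counts in the new test follow directly from that accounting. The queue starts with 7 records: the ones at offsets 1, 5, and 20 are valid, two carry invalid timestamps, one has a corrupted key, and one a corrupted value. The first nextRecord() returns the offset-1 record (7 to 6). The second returns the offset-5 record and, while locating the next valid head, drops the four invalid/corrupted records in between (6 to 1). The last returns the offset-20 record (1 to 0). Under the old --totalBuffered accounting, the middle step would have left numBuffered() at 5 rather than 1.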
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java

@@ -157,8 +157,8 @@ public class StreamTaskTest {
     private final LogContext logContext = new LogContext("[test] ");
     private final String topic1 = "topic1";
     private final String topic2 = "topic2";
-    private final TopicPartition partition1 = new TopicPartition(topic1, 1);
-    private final TopicPartition partition2 = new TopicPartition(topic2, 1);
+    private final TopicPartition partition1 = new TopicPartition(topic1, 0);
+    private final TopicPartition partition2 = new TopicPartition(topic2, 0);
     private final Set<TopicPartition> partitions = new HashSet<>(List.of(partition1, partition2));
     private final Serializer<Integer> intSerializer = new IntegerSerializer();
     private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
@@ -1082,6 +1082,70 @@ public void shouldPauseAndResumeBasedOnBufferedRecords() {
         assertEquals(0, consumer.paused().size());
     }

+    @Test
+    public void shouldResumePartitionWhenSkippingOverRecordsWithInvalidTs() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+        task = createStatelessTask(createConfig(
+            StreamsConfig.AT_LEAST_ONCE,
+            "-1",
+            LogAndContinueExceptionHandler.class,
+            LogAndFailProcessingExceptionHandler.class,
+            LogAndSkipOnInvalidTimestamp.class
+        ));
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+
+        task.addRecords(partition1, asList(
+            getConsumerRecordWithOffsetAsTimestamp(partition1, 10),
+            getConsumerRecordWithOffsetAsTimestamp(partition1, 20),
+            getConsumerRecordWithInvalidTimestamp(30),
+            getConsumerRecordWithInvalidTimestamp(40),
+            getConsumerRecordWithInvalidTimestamp(50)
+        ));
+        assertTrue(consumer.paused().contains(partition1));
+
+        assertTrue(task.process(0L));
+
+        task.resumePollingForPartitionsWithAvailableSpace();
+        assertTrue(consumer.paused().contains(partition1));
+
+        assertTrue(task.process(0L));
+
+        task.resumePollingForPartitionsWithAvailableSpace();
+        assertEquals(0, consumer.paused().size());
+
+        assertTrue(task.process(0L)); // drain head record (ie, last invalid record)
+        assertFalse(task.process(0L));
+        assertFalse(task.hasRecordsQueued());
+
+
+        // repeat test for deserialization error
+        task.resumePollingForPartitionsWithAvailableSpace();
+        task.addRecords(partition1, asList(
+            getConsumerRecordWithOffsetAsTimestamp(partition1, 110),
+            getConsumerRecordWithOffsetAsTimestamp(partition1, 120),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(130),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(140),
+            getCorruptedConsumerRecordWithOffsetAsTimestamp(150)
+        ));
+        assertTrue(consumer.paused().contains(partition1));
+
+        assertTrue(task.process(0L));
+
+        task.resumePollingForPartitionsWithAvailableSpace();
+        assertTrue(consumer.paused().contains(partition1));
+
+        assertTrue(task.process(0L));
+
+        task.resumePollingForPartitionsWithAvailableSpace();
+        assertEquals(0, consumer.paused().size());
+
+        assertTrue(task.process(0L)); // drain head record (ie, last corrupted record)
+        assertFalse(task.process(0L));
+        assertFalse(task.hasRecordsQueued());
+    }
+
     @Test
     public void shouldPunctuateOnceStreamTimeAfterGap() {
         when(stateManager.taskId()).thenReturn(taskId);
@@ -3314,7 +3378,7 @@ private ConsumerRecord<byte[], byte[]> getConsumerRecordWithOffsetAsTimestamp(fi
     private ConsumerRecord<byte[], byte[]> getConsumerRecordWithOffsetAsTimestamp(final Integer key, final long offset) {
         return new ConsumerRecord<>(
             topic1,
-            1,
+            0,
             offset,
             offset, // use the offset as the timestamp
             TimestampType.CREATE_TIME,
@@ -3330,7 +3394,7 @@ private ConsumerRecord<byte[], byte[]> getConsumerRecordWithOffsetAsTimestamp(fi
     private ConsumerRecord<byte[], byte[]> getConsumerRecordWithInvalidTimestamp(final long offset) {
         return new ConsumerRecord<>(
             topic1,
-            1,
+            0,
             offset,
             -1L, // invalid (negative) timestamp
             TimestampType.CREATE_TIME,
@@ -3347,24 +3411,24 @@ private ConsumerRecord<byte[], byte[]> getConsumerRecordWithOffsetAsTimestampWit
                                                                                  final long offset,
                                                                                  final int leaderEpoch) {
         return new ConsumerRecord<>(
-                topicPartition.topic(),
-                topicPartition.partition(),
-                offset,
-                offset, // use the offset as the timestamp
-                TimestampType.CREATE_TIME,
-                0,
-                0,
-                recordKey,
-                recordValue,
-                new RecordHeaders(),
-                Optional.of(leaderEpoch)
+            topicPartition.topic(),
+            topicPartition.partition(),
+            offset,
+            offset, // use the offset as the timestamp
+            TimestampType.CREATE_TIME,
+            0,
+            0,
+            recordKey,
+            recordValue,
+            new RecordHeaders(),
+            Optional.of(leaderEpoch)
         );
     }

     private ConsumerRecord<byte[], byte[]> getCorruptedConsumerRecordWithOffsetAsTimestamp(final long offset) {
         return new ConsumerRecord<>(
             topic1,
-            1,
+            0,
             offset,
             offset, // use the offset as the timestamp
             TimestampType.CREATE_TIME,
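The StreamTaskTest additions exercise the same accounting end to end through the task's back-pressure logic: adding five records fills the (small, test-configured) input buffer and pauses partition1; each successful process(0L), together with any records skipped while locating the next valid head, frees buffer space; and resumePollingForPartitionsWithAvailableSpace() resumes the partition only once the buffered count drops back below the limit. A rough, self-contained simulation of that contract follows, assuming an illustrative limit of 3; the names and threshold here are stand-ins, not Kafka's actual API or configuration.

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class PauseResumeSketch {
    static final int MAX_BUFFERED = 3; // illustrative limit, not the real config value
    static final Map<String, Integer> buffered = new HashMap<>();
    static final Set<String> paused = new HashSet<>();

    static void addRecords(String partition, int count) {
        buffered.merge(partition, count, Integer::sum);
        if (buffered.get(partition) >= MAX_BUFFERED) {
            paused.add(partition); // stop fetching once the buffer fills up
        }
    }

    // 'drained' counts the processed record plus any records skipped
    // alongside it, mirroring the PartitionGroup fix above
    static void process(String partition, int drained) {
        buffered.merge(partition, -drained, Integer::sum);
    }

    static void resumePartitionsWithAvailableSpace() {
        paused.removeIf(p -> buffered.getOrDefault(p, 0) < MAX_BUFFERED);
    }

    public static void main(String[] args) {
        addRecords("topic1-0", 5);            // 5 buffered -> paused
        process("topic1-0", 1);               // one valid record processed
        resumePartitionsWithAvailableSpace();
        System.out.println(paused);           // [topic1-0]: 4 is still >= 3
        process("topic1-0", 3);               // one valid record + two skipped
        resumePartitionsWithAvailableSpace();
        System.out.println(paused);           // []: 1 < 3, partition resumed
    }
}

With the old one-record decrement, skipped records would never have been subtracted from the buffered count, so a partition could have stayed paused even after its queue drained, which is presumably the failure mode the original fix (#17899) addressed.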
