Skip to content

Commit

Permalink
Remove ShutdownNotificationAware and update javadocs (#1358)
Browse files Browse the repository at this point in the history
* Deprecate ShutdownNotificationAware and update javadocs

ShutdownNotificationAware is not used by KCL, this PR
marks it as deprecated and updates the javadoc

Co-authored-by: nakulj <[email protected]>
  • Loading branch information
akidambisrinivasan and nakulj authored Jul 2, 2024
1 parent 715690d commit 6cba7f4
Show file tree
Hide file tree
Showing 4 changed files with 12 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@
import software.amazon.kinesis.processor.FormerStreamsLeasesDeletionStrategy;
import software.amazon.kinesis.processor.ProcessorConfig;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.ShutdownNotificationAware;
import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.AggregatorUtil;
import software.amazon.kinesis.retrieval.RecordsPublisher;
Expand Down Expand Up @@ -768,8 +767,8 @@ boolean shouldShutdown() {
}

/**
* Requests a graceful shutdown of the worker, notifying record processors, that implement
* {@link ShutdownNotificationAware}, of the impending shutdown. This gives the record processor a final chance to
* Requests a graceful shutdown of the worker, notifying record processors
* of the impending shutdown. This gives the record processor a final chance to
* checkpoint.
*
* This will only create a single shutdown future. Additional attempts to start a graceful shutdown will return the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.processor.ShutdownNotificationAware;
import software.amazon.kinesis.processor.ShardRecordProcessor;

/**
* Contains callbacks for completion of stages in a requested record processor shutdown.
Expand All @@ -45,7 +45,7 @@ public class ShardConsumerShutdownNotification implements ShutdownNotification {
* the lease that this shutdown request will free once initial shutdown is complete
* @param notificationCompleteLatch
* used to inform the caller once the
* {@link ShutdownNotificationAware} object has been
* {@link ShardRecordProcessor} object has been
* notified of the shutdown request.
* @param shutdownCompleteLatch
* used to inform the caller once the record processor is fully shutdown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

/**
* Allows a record processor to indicate it's aware of requested shutdowns, and handle the request.
* @deprecated This class is not used, {@link ShardRecordProcessor} provide shutdownRequested
* notifications already.
*/
@Deprecated
public interface ShutdownNotificationAware {

/**
* Called when the worker has been requested to shutdown, and gives the record processor a chance to checkpoint.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,13 @@
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.RecordProcessorCheckpointer;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShutdownNotificationAware;
import software.amazon.kinesis.retrieval.KinesisClientRecord;

/**
* Streamlet that tracks records it's seen - useful for testing.
*/
@Slf4j
public class TestStreamlet implements ShardRecordProcessor, ShutdownNotificationAware {
public class TestStreamlet implements ShardRecordProcessor {
private List<KinesisClientRecord> records = new ArrayList<>();

private Set<String> processedSeqNums = new HashSet<String>(); // used for deduping
Expand Down Expand Up @@ -139,7 +138,10 @@ public void shardEnded(ShardEndedInput shardEndedInput) {
}

@Override
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {}
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
shutdownNotificationCalled = true;
notifyShutdownLatch.countDown();
}

/**
* @return the shardId
Expand All @@ -166,12 +168,6 @@ public boolean isShutdownNotificationCalled() {
return shutdownNotificationCalled;
}

@Override
public void shutdownRequested(RecordProcessorCheckpointer checkpointer) {
shutdownNotificationCalled = true;
notifyShutdownLatch.countDown();
}

public CountDownLatch getInitializeLatch() {
return initializeLatch;
}
Expand Down

0 comments on commit 6cba7f4

Please sign in to comment.