This repository has been archived by the owner on Oct 23, 2023. It is now read-only.
Currently the Kinesis shutdown works as follows:

1. Shutdown is called on the `KinesisConsumer` (either explicitly or via the JVM shutdown hook).
2. This then calls `requestShutdown` on the KCL `Worker`, blocking until completion.
3. The KCL `Worker` propagates this down to the `ConsumerProcessingManager` (which is the `IRecordProcessor`), calling `shutdownRequested` on each instance (one per shard).
4. When `shutdownRequested` is called, it sends a `GracefulShutdown` message to the `ConsumerWorker` actor, blocking until a response is received (Ask + Await).
5. On receipt of this message, the `ConsumerWorker` switches context to ignore all future messages. If a batch is currently being processed, it responds to the sender of that batch (the manager), which will be blocked awaiting confirmation of the batch. This is by design: the KCL requires that we don't return from `processRecords` until we have finished the batch; otherwise the next batch is sent immediately.
6. The `ConsumerWorker` then forces a final checkpoint and responds to the manager once it has completed (or failed), which allows shutdown to continue and the `KinesisConsumer` to shut down.
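The `ConsumerWorker` side of this handshake can be sketched as below. This is a hypothetical model, not the library's code: `CompletableFuture`s stand in for Ask + Await, and the `Worker`, `processBatch` and `gracefulShutdown` names are illustrative. It shows the intended ordering: ignore later batches, release the manager blocked on the in-flight batch, then checkpoint and reply.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the ConsumerWorker shutdown handshake. CompletableFutures stand in
// for Akka Ask + Await; class and method names are illustrative, not the library's API.
public class ShutdownHandshakeSketch {

    enum BatchResult { PROCESSED, ABORTED }

    static final class Worker {
        private final List<String> log = new ArrayList<>();
        private CompletableFuture<BatchResult> inFlightBatch; // reply to the manager for the current batch
        private boolean shuttingDown = false;

        // processRecords sends a batch and blocks on the returned future.
        synchronized CompletableFuture<BatchResult> processBatch(List<String> records) {
            if (shuttingDown) {
                log.add("ignored batch of " + records.size());
                return CompletableFuture.completedFuture(BatchResult.ABORTED);
            }
            log.add("started batch of " + records.size());
            inFlightBatch = new CompletableFuture<>(); // normally completed once every record is acked
            return inFlightBatch;
        }

        // GracefulShutdown: ignore future batches, release the blocked manager,
        // then force a final checkpoint before replying.
        synchronized CompletableFuture<Void> gracefulShutdown() {
            shuttingDown = true;
            if (inFlightBatch != null && !inFlightBatch.isDone()) {
                inFlightBatch.complete(BatchResult.ABORTED);
                log.add("aborted in-flight batch");
            }
            log.add("final checkpoint"); // a real checkpoint may also fail; not modelled here
            return CompletableFuture.completedFuture(null);
        }

        synchronized List<String> log() {
            return new ArrayList<>(log);
        }
    }

    static List<String> run() throws Exception {
        Worker worker = new Worker();
        CompletableFuture<BatchResult> batch = worker.processBatch(List.of("r1", "r2", "r3"));
        // GracefulShutdown arrives while the batch is still in flight.
        worker.gracefulShutdown().get(1, TimeUnit.SECONDS);
        BatchResult result = batch.get(1, TimeUnit.SECONDS);
        worker.processBatch(List.of("r4")); // arrives after shutdown, so it is ignored
        List<String> log = worker.log();
        log.add("batch result: " + result);
        return log;
    }

    public static void main(String[] args) throws Exception {
        run().forEach(System.out::println);
    }
}
```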
So this all sounds great; however, if we're processing a batch (and therefore blocking `processRecords`), the KCL doesn't allocate a separate thread to call `shutdownRequested`. This means that even though the `ConsumerWorker` allows batch processing to be aborted early, this never happens: `shutdownRequested` is not called until batch processing completes and releases the thread blocked in `processRecords`.
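A minimal reproduction of the problem, under a simplifying assumption: the KCL's "one callback at a time per shard" behaviour is modelled here as a single-thread executor, which is not how the KCL is actually implemented. `processRecords` blocks on a batch future; `shutdownRequested` would complete that future, but it is queued behind the blocked call and cannot run.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Simplified, hypothetical model: a single-thread executor stands in for the KCL running
// a shard's callbacks one at a time.
public class BlockedShutdownSketch {

    static boolean shutdownRanWhileBatchBlocked() throws Exception {
        ExecutorService shardThread = Executors.newSingleThreadExecutor();
        CompletableFuture<Void> batchDone = new CompletableFuture<>(); // completed when the batch is acked or aborted
        CountDownLatch shutdownCalled = new CountDownLatch(1);

        // processRecords: blocks until the batch future completes (Ask + Await in the real code).
        shardThread.submit(() -> {
            batchDone.join();
            return null;
        });

        // shutdownRequested: would abort the batch, but it is queued behind processRecords.
        shardThread.submit(() -> {
            shutdownCalled.countDown();
            batchDone.complete(null);
        });

        // Give shutdownRequested a chance to run; it cannot, because processRecords holds the thread.
        boolean ran = shutdownCalled.await(500, TimeUnit.MILLISECONDS);

        // Only once the batch finishes on its own (simulated here) does the queued shutdown run.
        batchDone.complete(null);
        shutdownCalled.await(1, TimeUnit.SECONDS);
        shardThread.shutdown();
        shardThread.awaitTermination(1, TimeUnit.SECONDS);
        return ran;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("shutdownRequested ran while batch was blocked: "
                + shutdownRanWhileBatchBlocked());
    }
}
```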
Possible solutions
What needs to happen is for the KCL to call `shutdownRequested` on a separate thread; we would then unblock `processRecords` automatically and checkpoint accordingly. This will require a change to the KCL (assuming the issue is indeed with the KCL). We'd need to write a test that reproduces this (using Java), then raise an issue on the KCL GitHub repository for them to fix it.
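The proposed behaviour can be contrasted with the blocked case using the same simplified model. This is an assumption about how a fix might look, not the KCL's design: `shutdownRequested` is dispatched on its own executor, so it can abort the in-flight batch and `processRecords` returns promptly instead of waiting for the batch to finish.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the proposed fix: shutdown notifications get their own thread
// instead of queueing behind processRecords.
public class SeparateShutdownThreadSketch {

    /** Returns how long processRecords stayed blocked, in milliseconds. */
    static long blockedMillis() throws Exception {
        ExecutorService recordThread = Executors.newSingleThreadExecutor();
        ExecutorService shutdownThread = Executors.newSingleThreadExecutor(); // the separate thread
        CompletableFuture<Void> batchDone = new CompletableFuture<>();

        long start = System.nanoTime();
        Future<Long> processRecords = recordThread.submit(() -> {
            batchDone.join(); // blocked until the batch is acked or aborted
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        });

        // shutdownRequested runs immediately on its own thread and aborts the in-flight batch;
        // the worker would then force the final checkpoint (not modelled).
        shutdownThread.submit(() -> {
            batchDone.complete(null);
        });

        // Throws TimeoutException if shutdownRequested could not unblock processRecords.
        long blocked = processRecords.get(1, TimeUnit.SECONDS);
        recordThread.shutdown();
        shutdownThread.shutdown();
        return blocked;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("processRecords unblocked after " + blockedMillis() + " ms");
    }
}
```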
Alternatively, maybe the issue is with us? Potentially the `GracefulShutdown` message is stuck in the mailbox while we process the batch. If this is the case (a test could prove this by sending shutdown before the batch's messages have been acked), one option is to use a priority mailbox that gives the `GracefulShutdown` message a higher priority, so that it skips the queue.
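The priority-mailbox idea can be illustrated with a plain-Java analogue of a *stable* priority queue (no Akka involved; the message names are hypothetical). `GracefulShutdown` is dequeued ahead of messages already waiting, while messages of equal priority keep their FIFO order thanks to an arrival sequence number.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;

// Plain-Java analogue of a stable priority mailbox. Message names are hypothetical.
public class PriorityMailboxSketch {

    static final class Envelope {
        final String message;
        final int priority; // lower value is dequeued first
        final long seq;     // arrival order, keeps equal priorities FIFO

        Envelope(String message, int priority, long seq) {
            this.message = message;
            this.priority = priority;
            this.seq = seq;
        }
    }

    private final AtomicLong arrivals = new AtomicLong();
    private final PriorityBlockingQueue<Envelope> mailbox = new PriorityBlockingQueue<>(
            11,
            Comparator.comparingInt((Envelope e) -> e.priority).thenComparingLong(e -> e.seq));

    void tell(String message) {
        int priority = "GracefulShutdown".equals(message) ? 0 : 1;
        mailbox.add(new Envelope(message, priority, arrivals.getAndIncrement()));
    }

    List<String> drain() {
        List<String> processed = new ArrayList<>();
        Envelope next;
        while ((next = mailbox.poll()) != null) {
            processed.add(next.message);
        }
        return processed;
    }

    static List<String> demo() {
        PriorityMailboxSketch worker = new PriorityMailboxSketch();
        worker.tell("batch-1");
        worker.tell("batch-2");
        worker.tell("GracefulShutdown"); // sent while earlier messages are still queued
        worker.tell("batch-3");
        return worker.drain();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [GracefulShutdown, batch-1, batch-2, batch-3]
    }
}
```

In Akka itself, the equivalent would presumably be an `UnboundedStablePriorityMailbox` subclass with a `PriorityGenerator`, configured as the `ConsumerWorker`'s mailbox. Reordering only helps while the actor is between messages, though; if it is blocked inside a single message handler, a priority mailbox cannot deliver `GracefulShutdown` any sooner.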