-
Notifications
You must be signed in to change notification settings - Fork 336
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Producer] respect context cancellation in Flush #1165
Conversation
@RobertIndie Would you mind taking a look at this please? |
pulsar/producer.go
Outdated
@@ -239,7 +239,7 @@ type Producer interface { | |||
|
|||
// Flush all the messages buffered in the client and wait until all messages have been successfully | |||
// persisted. | |||
Flush() error | |||
Flush(ctx context.Context) error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While I agree that we should have taking the context in this method, right now this would be a breaking API change.
I'd say to add a new FlushWithContext()
method and, perhaps, mark the other one as deprecated.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just pushed an update. Lmk how it looks!
Btw, do you know if the library will support shutting down via context cancellation in the future? There's some examples of where we don't do it:
pulsar-client-go/pulsar/producer_partition.go
Line 491 in efd9806
func (p *partitionProducer) runEventsLoop() { |
pulsar-client-go/pulsar/producer_partition.go
Lines 1470 to 1481 in efd9806
func (p *partitionProducer) Close() { | |
if p.getProducerState() != producerReady { | |
// Producer is closing | |
return | |
} | |
cp := &closeProducer{doneCh: make(chan struct{})} | |
p.cmdChan <- cp | |
// wait for close producer request to complete | |
<-cp.doneCh | |
} |
As long as things don't take too long to shut down, I suppose it's fine. Ideally, if the context is cancelled, you don't want to wait for a clean/graceful shutdown where we wait for the done channel. We want all the goroutines to terminate right where they are and immediately returned.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would be happy to file an issue if there is not one already.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I can't implement CloseWithCtx
such as in this PR because goroutines might leak (the call to close would terminate before the signal is given on the doneCh, indicating that things have shut down). I think it would be a bigger change.
722e024
to
efd9806
Compare
efd9806
to
e046f2a
Compare
e046f2a
to
7af836b
Compare
### Motivation The producer's `Flush` method does not respect context cancellation. If the caller's context get's cancelled, it will have to wait for the producer to finish flushing. ### Modifications This change adds a `FlushWithCtx` method which takes a context and selects on two channels. (cherry picked from commit 2a28e21)
Motivation
The producer's
Flush
method does not respect context cancellation. If the caller's context get's cancelled, it will have to wait for the producer to finish flushing.Modifications
This change adds a
FlushWithCtx
method which takes a context and selects on two channels.Verifying this change
This change is already covered by existing tests. See code changes for relevant tests.
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation