Skip to content

Commit

Permalink
Fixed issues after resolving conflicts
Browse files Browse the repository at this point in the history
Signed-off-by: Piotr Piotrowski <[email protected]>
  • Loading branch information
piotrpio committed Jul 25, 2024
1 parent 7ba030d commit 9ea0dae
Showing 1 changed file with 4 additions and 4 deletions.
8 changes: 4 additions & 4 deletions jetstream/ordered.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,9 @@ func (c *orderedConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt
case <-sub.done:
s := sub.consumer.currentSub
if s != nil {
s.consumer.Lock()
sub.consumer.Lock()
s.Stop()
s.consumer.Unlock()
sub.consumer.Unlock()
}
return
case msgsLeft, ok := <-c.stopAfterMsgsLeft:
Expand Down Expand Up @@ -334,10 +334,10 @@ func (s *orderedSubscription) Stop() {
if !atomic.CompareAndSwapUint32(&s.closed, 0, 1) {
return
}
s.consumer.Lock()
defer s.consumer.Unlock()
if s.consumer.currentSub != nil {
s.consumer.currentConsumer.Lock()
s.consumer.currentSub.Stop()
s.consumer.currentConsumer.Unlock()
}
close(s.done)
}
Expand Down

0 comments on commit 9ea0dae

Please sign in to comment.