Skip to content

Commit

Permalink
Fixed an issue where multiple goroutines publishing to the same publi…
Browse files Browse the repository at this point in the history
…sher may lead to errors (#10)

SOL-85837
  • Loading branch information
mcardy authored Jan 13, 2023
1 parent 43da94d commit 95e9618
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
8 changes: 6 additions & 2 deletions internal/impl/publisher/direct_message_publisher_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package publisher

import (
"fmt"
"sync"
"time"

"solace.dev/go/messaging/internal/impl/constants"
Expand Down Expand Up @@ -52,8 +53,9 @@ type directMessagePublisherImpl struct {
// the parameters for backpressure
backpressureConfiguration backpressureConfiguration
// buffers for backpressure
buffer chan *publishable
taskBuffer buffer.PublisherTaskBuffer
buffer chan *publishable
taskBuffer buffer.PublisherTaskBuffer
bufferPublishLock sync.Mutex

terminateWaitInterrupt chan struct{}
}
Expand Down Expand Up @@ -465,6 +467,8 @@ func (publisher *directMessagePublisherImpl) publish(msg *message.OutboundMessag
}
}
}()
publisher.bufferPublishLock.Lock()
defer publisher.bufferPublishLock.Unlock()
pub := &publishable{msg, dest}
if publisher.backpressureConfiguration == backpressureConfigurationReject {
select {
Expand Down
3 changes: 3 additions & 0 deletions internal/impl/publisher/persistent_message_publisher_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ type persistentMessagePublisherImpl struct {
backpressureConfiguration backpressureConfiguration
buffer chan *persistentPublishable
taskBuffer buffer.PublisherTaskBuffer
bufferPublishLock sync.Mutex

terminateWaitInterrupt chan struct{}

Expand Down Expand Up @@ -647,6 +648,8 @@ func (publisher *persistentMessagePublisherImpl) publish(msg *message.OutboundMe
}
}
}()
publisher.bufferPublishLock.Lock()
defer publisher.bufferPublishLock.Unlock()
pub := &persistentPublishable{msg, dest, ctx}
if publisher.backpressureConfiguration == backpressureConfigurationReject {
select {
Expand Down

0 comments on commit 95e9618

Please sign in to comment.