diff --git a/internal/impl/publisher/direct_message_publisher_impl.go b/internal/impl/publisher/direct_message_publisher_impl.go index 042567e..5f76735 100644 --- a/internal/impl/publisher/direct_message_publisher_impl.go +++ b/internal/impl/publisher/direct_message_publisher_impl.go @@ -19,6 +19,7 @@ package publisher import ( "fmt" + "sync" "time" "solace.dev/go/messaging/internal/impl/constants" @@ -52,8 +53,9 @@ type directMessagePublisherImpl struct { // the parameters for backpressure backpressureConfiguration backpressureConfiguration // buffers for backpressure - buffer chan *publishable - taskBuffer buffer.PublisherTaskBuffer + buffer chan *publishable + taskBuffer buffer.PublisherTaskBuffer + bufferPublishLock sync.Mutex terminateWaitInterrupt chan struct{} } @@ -465,6 +467,8 @@ func (publisher *directMessagePublisherImpl) publish(msg *message.OutboundMessag } } }() + publisher.bufferPublishLock.Lock() + defer publisher.bufferPublishLock.Unlock() pub := &publishable{msg, dest} if publisher.backpressureConfiguration == backpressureConfigurationReject { select { diff --git a/internal/impl/publisher/persistent_message_publisher_impl.go b/internal/impl/publisher/persistent_message_publisher_impl.go index a74d0e8..1a4d326 100644 --- a/internal/impl/publisher/persistent_message_publisher_impl.go +++ b/internal/impl/publisher/persistent_message_publisher_impl.go @@ -54,6 +54,7 @@ type persistentMessagePublisherImpl struct { backpressureConfiguration backpressureConfiguration buffer chan *persistentPublishable taskBuffer buffer.PublisherTaskBuffer + bufferPublishLock sync.Mutex terminateWaitInterrupt chan struct{} @@ -647,6 +648,8 @@ func (publisher *persistentMessagePublisherImpl) publish(msg *message.OutboundMe } } }() + publisher.bufferPublishLock.Lock() + defer publisher.bufferPublishLock.Unlock() pub := &persistentPublishable{msg, dest, ctx} if publisher.backpressureConfiguration == backpressureConfigurationReject { select {