From b51ec9da220842195d9b7877b54300b39c194386 Mon Sep 17 00:00:00 2001 From: Sasan Rose Date: Wed, 3 May 2023 11:34:37 +1000 Subject: [PATCH 1/2] Add mutex lock to Message struct --- public/service/message.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/public/service/message.go b/public/service/message.go index b1f4b00a2f..94ebfc82f6 100644 --- a/public/service/message.go +++ b/public/service/message.go @@ -2,6 +2,7 @@ package service import ( "context" + "sync" "github.com/benthosdev/benthos/v4/internal/bloblang/mapping" "github.com/benthosdev/benthos/v4/internal/bloblang/query" @@ -23,6 +24,7 @@ type MessageBatchHandlerFunc func(context.Context, MessageBatch) error // pipeline. It is safe to mutate the message via Set methods, but the // underlying byte data should not be edited directly. type Message struct { + Lock *sync.RWMutex part *message.Part } @@ -61,11 +63,12 @@ func (b MessageBatch) DeepCopy() MessageBatch { func NewMessage(content []byte) *Message { return &Message{ part: message.NewPart(content), + Lock: &sync.RWMutex{}, } } func newMessageFromPart(part *message.Part) *Message { - return &Message{part} + return &Message{part: part, Lock: &sync.RWMutex{}} } // Copy creates a shallow copy of a message that is safe to mutate with Set @@ -74,6 +77,7 @@ func newMessageFromPart(part *message.Part) *Message { func (m *Message) Copy() *Message { return &Message{ part: m.part.ShallowCopy(), + Lock: &sync.RWMutex{}, } } @@ -88,6 +92,7 @@ func (m *Message) Copy() *Message { func (m *Message) DeepCopy() *Message { return &Message{ part: m.part.DeepCopy(), + Lock: &sync.RWMutex{}, } } @@ -101,6 +106,7 @@ func (m *Message) Context() context.Context { func (m *Message) WithContext(ctx context.Context) *Message { return &Message{ part: message.WithContext(ctx, m.part), + Lock: &sync.RWMutex{}, } } From c54f0fe769abfbbdba3b401e6fa43f99305d9431 Mon Sep 17 00:00:00 2001 From: Sasan Rose Date: Wed, 3 May 2023 12:29:31 +1000 Subject: [PATCH 2/2] Add info about the lock in the docblock --- public/service/message.go | 1 + 1 file changed, 1 insertion(+) diff --git a/public/service/message.go b/public/service/message.go index 94ebfc82f6..daa4e301a9 100644 --- a/public/service/message.go +++ b/public/service/message.go @@ -23,6 +23,7 @@ type MessageBatchHandlerFunc func(context.Context, MessageBatch) error // Message represents a single discrete message passing through a Benthos // pipeline. It is safe to mutate the message via Set methods, but the // underlying byte data should not be edited directly. +// The Lock can be used to safely work with a Message concurrently in go routines type Message struct { Lock *sync.RWMutex part *message.Part