Skip to content

Commit

Permalink
Merge branch 'SOL-62456' into EBP-18
Browse files Browse the repository at this point in the history
  • Loading branch information
TrentDaniel committed Jan 9, 2025
2 parents 7285966 + b41e135 commit 65e9ded
Show file tree
Hide file tree
Showing 6 changed files with 218 additions and 28 deletions.
9 changes: 9 additions & 0 deletions internal/ccsmp/ccsmp_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,15 @@ func SolClientMessageGetRGMID(messageP SolClientMessagePt) (SolClientRGMIDPt, *S
return &rmid, nil
}

// SolClientMessageGetCacheRequestID function
func SolClientMessageGetCacheRequestID(messageP SolClientMessagePt) (uint64, *SolClientErrorInfoWrapper) {
var cuint64 C.solClient_uint64_t
errorInfo := handleCcsmpError(func() SolClientReturnCode {
return C.solClient_msg_getCacheRequestId(messageP, &cuint64)
})
return uint64(cuint64), errorInfo
}

// Write only properties

// SolClientMessageSetAckImmediately function
Expand Down
66 changes: 40 additions & 26 deletions internal/impl/message/inbound_message_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,19 +56,19 @@ func newInboundMessage(msgP ccsmp.SolClientMessagePt) *InboundMessageImpl {
// block waiting for the first call to complete. Additional calls
// will return immediately. The instance is considered unusable after Dispose
// has been called.
func (message *InboundMessageImpl) Dispose() {
proceed := atomic.CompareAndSwapInt32(&message.disposed, 0, 1)
func (inboundMessage *InboundMessageImpl) Dispose() {
proceed := atomic.CompareAndSwapInt32(&inboundMessage.disposed, 0, 1)
if proceed {
// free ccsmp message pointer
freeInboundMessage(message)
freeInboundMessage(inboundMessage)
// clear the finalizer
runtime.SetFinalizer(message, nil)
runtime.SetFinalizer(inboundMessage, nil)
}
}

// free will free the underlying message pointer
func freeInboundMessage(message *InboundMessageImpl) {
err := ccsmp.SolClientMessageFree(&message.messagePointer)
func freeInboundMessage(inboundMessage *InboundMessageImpl) {
err := ccsmp.SolClientMessageFree(&inboundMessage.messagePointer)
if err != nil && logging.Default.IsErrorEnabled() {
logging.Default.Error("encountered unexpected error while freeing message pointer: " + err.GetMessageAsString() + " [sub code = " + strconv.Itoa(int(err.SubCode())) + "]")
}
Expand All @@ -77,8 +77,8 @@ func freeInboundMessage(message *InboundMessageImpl) {
// GetDestinationName gets the destination name on which the message was received.
// The destination may be either a topic or a queue.
// Returns an empty string if the information is not available.
func (message *InboundMessageImpl) GetDestinationName() string {
destName, errorInfo := ccsmp.SolClientMessageGetDestinationName(message.messagePointer)
func (inboundMessage *InboundMessageImpl) GetDestinationName() string {
destName, errorInfo := ccsmp.SolClientMessageGetDestinationName(inboundMessage.messagePointer)
if errorInfo != nil {
if errorInfo.ReturnCode == ccsmp.SolClientReturnCodeFail {
logging.Default.Debug(fmt.Sprintf("Unable to retrieve the destination this message was published to: %s, subcode: %d", errorInfo.GetMessageAsString(), errorInfo.SubCode()))
Expand All @@ -90,8 +90,8 @@ func (message *InboundMessageImpl) GetDestinationName() string {
// GetTimeStamp will get the timestamp as time.Time.
// This timestamp represents the time that the message was received by the API.
// This may differ from the time that the message is received by the MessageReceiver.
func (message *InboundMessageImpl) GetTimeStamp() (time.Time, bool) {
t, errInfo := ccsmp.SolClientMessageGetTimestamp(message.messagePointer)
func (inboundMessage *InboundMessageImpl) GetTimeStamp() (time.Time, bool) {
t, errInfo := ccsmp.SolClientMessageGetTimestamp(inboundMessage.messagePointer)
if errInfo != nil {
if errInfo.ReturnCode == ccsmp.SolClientReturnCodeFail {
logging.Default.Debug(fmt.Sprintf("Encountered error retrieving Sender Timestamp: %s, subcode: %d", errInfo.GetMessageAsString(), errInfo.SubCode()))
Expand All @@ -103,8 +103,8 @@ func (message *InboundMessageImpl) GetTimeStamp() (time.Time, bool) {

// GetSenderTimestamp will get the timestamp as time.Time.
// This timestamp is often set automatically when the message is published.
func (message *InboundMessageImpl) GetSenderTimestamp() (time.Time, bool) {
t, errInfo := ccsmp.SolClientMessageGetSenderTimestamp(message.messagePointer)
func (inboundMessage *InboundMessageImpl) GetSenderTimestamp() (time.Time, bool) {
t, errInfo := ccsmp.SolClientMessageGetSenderTimestamp(inboundMessage.messagePointer)
if errInfo != nil {
if errInfo.ReturnCode == ccsmp.SolClientReturnCodeFail {
logging.Default.Debug(fmt.Sprintf("Encountered error retrieving Sender Timestamp: %s, subcode: %d", errInfo.GetMessageAsString(), errInfo.SubCode()))
Expand All @@ -115,8 +115,8 @@ func (message *InboundMessageImpl) GetSenderTimestamp() (time.Time, bool) {
}

// GetSenderID will get the sender ID set on the message.
func (message *InboundMessageImpl) GetSenderID() (string, bool) {
id, errInfo := ccsmp.SolClientMessageGetSenderID(message.messagePointer)
func (inboundMessage *InboundMessageImpl) GetSenderID() (string, bool) {
id, errInfo := ccsmp.SolClientMessageGetSenderID(inboundMessage.messagePointer)
if errInfo != nil {
if errInfo.ReturnCode == ccsmp.SolClientReturnCodeFail {
logging.Default.Debug(fmt.Sprintf("Encountered error retrieving Sender ID: %s, subcode: %d", errInfo.GetMessageAsString(), errInfo.SubCode()))
Expand All @@ -127,13 +127,13 @@ func (message *InboundMessageImpl) GetSenderID() (string, bool) {
}

// IsRedelivered function
func (message *InboundMessageImpl) IsRedelivered() bool {
return ccsmp.SolClientMessageGetMessageIsRedelivered(message.messagePointer)
func (inboundMessage *InboundMessageImpl) IsRedelivered() bool {
return ccsmp.SolClientMessageGetMessageIsRedelivered(inboundMessage.messagePointer)
}

// GetReplicationGroupMessageID function
func (message *InboundMessageImpl) GetReplicationGroupMessageID() (rgmid.ReplicationGroupMessageID, bool) {
rmidPt, errInfo := ccsmp.SolClientMessageGetRGMID(message.messagePointer)
func (inboundMessage *InboundMessageImpl) GetReplicationGroupMessageID() (rgmid.ReplicationGroupMessageID, bool) {
rmidPt, errInfo := ccsmp.SolClientMessageGetRGMID(inboundMessage.messagePointer)
if errInfo != nil {
if errInfo.ReturnCode == ccsmp.SolClientReturnCodeFail {
logging.Default.Debug(fmt.Sprintf("Encountered error retrieving ReplicationGroupMessageID: %s, subcode: %d", errInfo.GetMessageAsString(), errInfo.SubCode()))
Expand All @@ -146,17 +146,31 @@ func (message *InboundMessageImpl) GetReplicationGroupMessageID() (rgmid.Replica
// GetMessageDiscardNotification retrieves the message discard notification about
// previously discarded messages. Returns a MessageDiscardNotification, not expected
// to be nil.
func (message *InboundMessageImpl) GetMessageDiscardNotification() message.MessageDiscardNotification {
if message.IsDisposed() {
func (inboundMessage *InboundMessageImpl) GetMessageDiscardNotification() message.MessageDiscardNotification {
if inboundMessage.IsDisposed() {
logging.Default.Warning("Failed to retrieve discard notification: Bad msg_p pointer '0x0'")
return nil
}
return &discardNotification{
internalDiscard: message.internalDiscard,
brokerDiscard: ccsmp.SolClientMessageGetMessageDiscardNotification(message.messagePointer),
internalDiscard: inboundMessage.internalDiscard,
brokerDiscard: ccsmp.SolClientMessageGetMessageDiscardNotification(inboundMessage.messagePointer),
}
}

// GetCacheRequestID retrieves the [CacheRequestID] of the message
// and a [True] result if the message was received as a part of a
// cache response. Otherwise, returns 0 and False.
func (inboundMessage *InboundMessageImpl) GetCacheRequestID() (message.CacheRequestID, bool) {
cacheID, errInfo := ccsmp.SolClientMessageGetCacheRequestID(inboundMessage.messagePointer)
if errInfo != nil {
if errInfo.ReturnCode == ccsmp.SolClientReturnCodeFail {
logging.Default.Info(fmt.Sprintf("Encountered error retrieving Cache ID: %s, subcode: %d", errInfo.GetMessageAsString(), errInfo.SubCode))
}
return 0, false
}
return message.CacheRequestID(cacheID), true
}

type discardNotification struct {
internalDiscard, brokerDiscard bool
}
Expand Down Expand Up @@ -184,17 +198,17 @@ func (notification *discardNotification) String() string {
type MessageID = ccsmp.SolClientMessageID

// GetMessageID function
func GetMessageID(message *InboundMessageImpl) (MessageID, bool) {
id, err := ccsmp.SolClientMessageGetMessageID(message.messagePointer)
func GetMessageID(inboundMessage *InboundMessageImpl) (MessageID, bool) {
id, err := ccsmp.SolClientMessageGetMessageID(inboundMessage.messagePointer)
if err != nil {
return 0, false
}
return id, true
}

// GetReplyToDestinationName function
func GetReplyToDestinationName(message *InboundMessageImpl) (string, bool) {
destName, errorInfo := ccsmp.SolClientMessageGetReplyToDestinationName(message.messagePointer)
func GetReplyToDestinationName(inboundMessage *InboundMessageImpl) (string, bool) {
destName, errorInfo := ccsmp.SolClientMessageGetReplyToDestinationName(inboundMessage.messagePointer)
if errorInfo != nil {
if errorInfo.ReturnCode == ccsmp.SolClientReturnCodeFail {
logging.Default.Debug(fmt.Sprintf("Unable to retrieve the reply to destination this message was published to: %s, subcode: %d", errorInfo.GetMessageAsString(), errorInfo.SubCode()))
Expand Down
53 changes: 53 additions & 0 deletions internal/impl/message/inbound_message_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// pubsubplus-go-client
//
// Copyright 2021-2024 Solace Corporation. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package message

import (
"testing"

"solace.dev/go/messaging/internal/ccsmp"
)

func TestGetCacheRequestID(t *testing.T) {
msgP, ccsmpErr := ccsmp.SolClientMessageAlloc()
if ccsmpErr != nil {
t.Error("did not expect error, got " + ccsmpErr.GetMessageAsString())
}
msg := NewInboundMessage(msgP, false)
if msg.messagePointer == nil {
t.Error("expected message pointer to not be nil")
}
if msg.IsDisposed() {
t.Error("message is disposed before disposed called")
}

// get the cache ID
cacheID, ok := msg.GetCacheRequestID()
if ok {
t.Error("expected GetCacheRequestID() function to return cacheID of 0 and false for Live inbound message")
}

// should not have any cacheID from a Live (normal) message
if cacheID != 0 {
t.Error("expected cacheID from Live inbound message to be zero")
}

msg.Dispose()
if !msg.IsDisposed() {
t.Error("IsDisposed returned false, expected true")
}
}
8 changes: 7 additions & 1 deletion pkg/solace/message/inbound_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import (
"solace.dev/go/messaging/pkg/solace/message/rgmid"
)

// CacheRequestID is a type to be used for correlating received, previously cached messages with their associated cache response.
// CacheRequestID - a type to be used for correlating received,
// previously cached messages with their associated cache response.
type CacheRequestID uint64

// InboundMessage represents a message received by a consumer.
Expand Down Expand Up @@ -63,6 +64,11 @@ type InboundMessage interface {
// IsRedelivered retrieves the message's redelivery status. Returns true if the message
// redelivery occurred in the past, otherwise false.
IsRedelivered() bool

// GetCacheRequestID retrieves the [CacheRequestID] of the message
// and a [True] result if the message was received as a part of a
// cache response. Otherwise, returns 0 and False.
GetCacheRequestID() (CacheRequestID, bool)
}

// MessageDiscardNotification is used to indicate that there are discarded messages.
Expand Down
90 changes: 89 additions & 1 deletion test/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package test

import (
"fmt"
"time"

"solace.dev/go/messaging"
Expand All @@ -28,6 +27,8 @@ import (
"solace.dev/go/messaging/pkg/solace/metrics"
"solace.dev/go/messaging/pkg/solace/resource"
"solace.dev/go/messaging/pkg/solace/subcode"
"solace.dev/go/messaging/pkg/solace/message"
"solace.dev/go/messaging/pkg/solace/resource"
"solace.dev/go/messaging/test/helpers"
"solace.dev/go/messaging/test/testcontext"

Expand Down Expand Up @@ -923,4 +924,91 @@ var _ = Describe("Cache Strategy", func() {
}
})
})

var _ = Describe("Remote Cache Message Tests", func() {
// The following tests are just placeholders until the actual implememntation
// for retrieving cache messages has been completed.
// They should be modified to real tests when we have the implementation to retrieve cache messages.

const topic = "remote-cache-message-tests"

var messagingService solace.MessagingService
var messageBuilder solace.OutboundMessageBuilder

BeforeEach(func() {
CheckCache() // skips test with message if cache image is not available
CheckCacheProxy() // skips test with message if cache proxy image is not available

builder := messaging.NewMessagingServiceBuilder().
FromConfigurationProvider(helpers.DefaultConfiguration())

var err error
messagingService, err = builder.Build()
Expect(err).ToNot(HaveOccurred())
messageBuilder = messagingService.MessageBuilder()
})

Describe("Published and received outbound message", func() {
var publisher solace.DirectMessagePublisher
var receiver solace.DirectMessageReceiver
var inboundMessageChannel chan message.InboundMessage

BeforeEach(func() {
var err error
err = messagingService.Connect()
Expect(err).ToNot(HaveOccurred())

publisher, err = messagingService.CreateDirectMessagePublisherBuilder().Build()
Expect(err).ToNot(HaveOccurred())
receiver, err = messagingService.CreateDirectMessageReceiverBuilder().
WithSubscriptions(resource.TopicSubscriptionOf(topic)).
Build()
Expect(err).ToNot(HaveOccurred())

err = publisher.Start()
Expect(err).ToNot(HaveOccurred())

inboundMessageChannel = make(chan message.InboundMessage)
receiver.ReceiveAsync(func(inboundMessage message.InboundMessage) {
inboundMessageChannel <- inboundMessage
})

err = receiver.Start()
Expect(err).ToNot(HaveOccurred())
})

AfterEach(func() {
var err error
err = publisher.Terminate(10 * time.Second)
Expect(err).ToNot(HaveOccurred())
err = receiver.Terminate(10 * time.Second)
Expect(err).ToNot(HaveOccurred())

err = messagingService.Disconnect()
Expect(err).ToNot(HaveOccurred())
})

// EBP-24 (second test case): Cache inbound message - check that messages returned as part of a cache response
// have valid cached request ID (calling GetCachedRequestID() on a cache message returns the ID and true)
It("should retrieve the valid cache request ID from received Cached message", func() {
msg, err := messageBuilder.BuildWithStringPayload("hello world")
Expect(err).ToNot(HaveOccurred())

publisher.Publish(msg, resource.TopicOf(topic))

select {
case inboundMessage := <-inboundMessageChannel:
cacheRequestID, ok := inboundMessage.GetCacheRequestID()
// @TODO: EBP-24: Modify these assertions for better test
// coverage when the feature to retrieve cache messages is done

Expect(ok).To(BeFalse()) // for a CACHE message
Expect(cacheRequestID).To(Equal(message.CacheRequestID(0))) // for a CACHE message
case <-time.After(1 * time.Second):
Fail("timed out waiting for message to be delivered")
}
})

})

})
20 changes: 20 additions & 0 deletions test/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1406,6 +1406,24 @@ var _ = Describe("Remote Message Tests", func() {
}
})

// Cache inbound message - check that messages that are not part of a cache response
// have no cached request ID (calling GetCachedRequestID() on a normal/live message returns 0 and false)
It("should get cache request ID of 0 from a received LIVE message", func() {
msg, err := messageBuilder.BuildWithStringPayload("hello world")
Expect(err).ToNot(HaveOccurred())

publisher.Publish(msg, resource.TopicOf(topic))

select {
case inboundMessage := <-inboundMessageChannel:
cacheRequestID, ok := inboundMessage.GetCacheRequestID()
Expect(ok).To(BeFalse()) // for a LIVE message
Expect(cacheRequestID).To(Equal(message.CacheRequestID(0))) // for a LIVE message
case <-time.After(1 * time.Second):
Fail("timed out waiting for message to be delivered")
}
})

inboundMessageGetterList := map[string](func(msg message.InboundMessage) func()){
"GetApplicationMessageID": func(msg message.InboundMessage) func() { return func() { msg.GetApplicationMessageID() } },
"GetApplicationMessageType": func(msg message.InboundMessage) func() { return func() { msg.GetApplicationMessageType() } },
Expand All @@ -1431,6 +1449,8 @@ var _ = Describe("Remote Message Tests", func() {
Expect(discardNotification).To(BeNil())
}
},

"GetCacheRequestID": func(msg message.InboundMessage) func() { return func() { msg.GetCacheRequestID() } },
}

for functionName, getter := range inboundMessageGetterList {
Expand Down

0 comments on commit 65e9ded

Please sign in to comment.