Skip to content

Commit

Permalink
Merge pull request #44 from SolaceDev/SOL-111699--SOL-112355
Browse files Browse the repository at this point in the history
SOL-112355: Integration tests to cover the processing of requests and responses
  • Loading branch information
cjwmorgan-sol authored Mar 13, 2024
2 parents b970424 + adace85 commit 2f33dab
Show file tree
Hide file tree
Showing 2 changed files with 276 additions and 72 deletions.
16 changes: 9 additions & 7 deletions test/helpers/messaging_service_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,32 +341,34 @@ func PublishNRequestReplyMessages(messagingService solace.MessagingService, topi
}

// A handler for the request-reply publisher
ret := make(chan message.InboundMessage, n)
replyHandler := func(message message.InboundMessage, userContext interface{}, err error) {
replyChannel := make(chan message.InboundMessage)
replyHandler := func(inboundMessage message.InboundMessage, userContext interface{}, err error) {
go func() {
ret <- message
replyChannel <- inboundMessage
}()
}

publisher, err := messagingService.RequestReply().CreateRequestReplyMessagePublisherBuilder().OnBackPressureReject(0).Build()
ExpectWithOffset(1, err).ToNot(HaveOccurred(), "Expected request-reply publisher to build without error")
ExpectWithOffset(1, publisher.Start()).ToNot(HaveOccurred(), "Expected request-reply publisher to start without error")

builder := messagingService.MessageBuilder()

for i := 0; i < n; i++ {
msgPayload := str
if len(template) == 0 {
msgPayload = fmt.Sprintf(str, i)
}
msg, err := messagingService.MessageBuilder().BuildWithStringPayload(msgPayload)
msg, err := builder.BuildWithStringPayload(msgPayload)
ExpectWithOffset(1, err).ToNot(HaveOccurred(), "Expected message to build without error")

pubErr := publisher.Publish(msg, replyHandler, resource.TopicOf(topic), timeOut, config.MessagePropertyMap{
err = publisher.Publish(msg, replyHandler, resource.TopicOf(topic), timeOut, config.MessagePropertyMap{
config.MessagePropertyCorrelationID: fmt.Sprint(i),
}, nil /* usercontext */)
ExpectWithOffset(1, pubErr).ToNot(HaveOccurred(), "Expected publish to be successful")
ExpectWithOffset(1, err).ToNot(HaveOccurred(), "Expected publish to be successful")
}
ExpectWithOffset(1, publisher.Terminate(10*time.Second)).ToNot(HaveOccurred(), "Expected request-reply publisher to terminate gracefully")
return ret
return replyChannel
}

// ReceiveOneMessage function
Expand Down
332 changes: 267 additions & 65 deletions test/request_reply_message_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package test
import (
"fmt"
"net/url"
"sync/atomic"
"time"

"solace.dev/go/messaging"
Expand Down Expand Up @@ -761,80 +762,281 @@ var _ = Describe("RequestReplyReceiver", func() {
Eventually(terminateChannel).Should(Receive(BeNil()))
})

It("should time out waiting for a message", func() {
done := make(chan struct{})
go func() {
defer GinkgoRecover()
msg, replier, err := receiver.ReceiveMessage(1 * time.Second)
Expect(msg).To(BeNil())
Expect(replier).To(BeNil())
helpers.ValidateError(err, &solace.TimeoutError{})
close(done)
}()
// We want to make sure that it does not close for at least 500ms
Consistently(done, 500*time.Millisecond).ShouldNot(BeClosed())
// we want it to be closed after 1 second though
Eventually(done).Should(BeClosed())
})
// SOL-112355 - the tests to cover request/reply processing using RequestReply Publisher/Receiver
Describe("for request/reply processing", func() {

It("should time out waiting for a message", func() {
done := make(chan struct{})
go func() {
defer GinkgoRecover()
msg, replier, err := receiver.ReceiveMessage(1 * time.Second)
Expect(msg).To(BeNil())
Expect(replier).To(BeNil())
helpers.ValidateError(err, &solace.TimeoutError{})
close(done)
}()
// We want to make sure that it does not close for at least 500ms
Consistently(done, 500*time.Millisecond).ShouldNot(BeClosed())
// we want it to be closed after 1 second
Eventually(done).Should(BeClosed())
})

It("should be able to continue message delivery when a receive async panics", func() {
msgReceived := make(chan message.InboundMessage)
receiver.ReceiveAsync(func(inboundMessage message.InboundMessage, replier solace.Replier) {
msgReceived <- inboundMessage
panic("everybody stay calm, this should still pass")
It("should be able to continue message delivery when a receive async panics", func() {
msgReceived := make(chan message.InboundMessage)
receiver.ReceiveAsync(func(inboundMessage message.InboundMessage, replier solace.Replier) {
msgReceived <- inboundMessage
panic("this should still pass even though this Panic occurred")
})
const payloadOne = "one"
const payloadTwo = "two"
helpers.PublishNRequestReplyMessages(messagingService, topicString, publishTimeOut, 1, payloadOne)

// we should receive a message
var msg message.InboundMessage
Eventually(msgReceived).Should(Receive(&msg))
payload, ok := msg.GetPayloadAsString()
Expect(ok).To(BeTrue())
Expect(payload).To(Equal(payloadOne))
helpers.PublishNRequestReplyMessages(messagingService, topicString, publishTimeOut, 1, payloadTwo)
// we should continue to receive messages
Eventually(msgReceived).Should(Receive(&msg))
payload, ok = msg.GetPayloadAsString()
Expect(ok).To(BeTrue())
Expect(payload).To(Equal(payloadTwo))
// we should be able to terminate
Expect(receiver.Terminate(10 * time.Second)).ToNot(HaveOccurred())
})
const payloadOne = "one"
const payloadTwo = "two"
helpers.PublishNRequestReplyMessages(messagingService, topicString, publishTimeOut, 1, payloadOne)

// we should receive a message
var msg message.InboundMessage
Eventually(msgReceived).Should(Receive(&msg))
payload, ok := msg.GetPayloadAsString()
Expect(ok).To(BeTrue())
Expect(payload).To(Equal(payloadOne))
helpers.PublishNRequestReplyMessages(messagingService, topicString, publishTimeOut, 1, payloadTwo)
// we should continue to receive messages
Eventually(msgReceived).Should(Receive(&msg))
payload, ok = msg.GetPayloadAsString()
Expect(ok).To(BeTrue())
Expect(payload).To(Equal(payloadTwo))
// we should be able to terminate
Expect(receiver.Terminate(10 * time.Second)).ToNot(HaveOccurred())
})

It("should wait indefinitely for a message", func() {
done := make(chan struct{})
go func() {
defer GinkgoRecover()
It("should wait indefinitely for a message", func() {
done := make(chan struct{})
go func() {
defer GinkgoRecover()
msg, replier, err := receiver.ReceiveMessage(-1)
Expect(err).ToNot(HaveOccurred())
Expect(msg).ToNot(BeNil())
Expect(replier).ToNot(BeNil())
time.Sleep(3 * time.Second)
close(done)
}()
Consistently(done, 2*time.Second).ShouldNot(BeClosed()) // less than receive function's sleep
helpers.PublishNRequestReplyMessages(messagingService, topicString, publishTimeOut, 1)
Eventually(done).Should(BeClosed())
})

It("should be interrupted while waiting for a message", func() {
done := make(chan struct{})
go func() {
defer GinkgoRecover()
msg, replier, err := receiver.ReceiveMessage(-1)
Expect(msg).To(BeNil())
Expect(replier).To(BeNil())
helpers.ValidateError(err, &solace.IllegalStateError{}, "terminated")
close(done)
}()
Consistently(done, 1*time.Second).ShouldNot(BeClosed())
terminateChannel := receiver.TerminateAsync(10 * time.Second)
Eventually(terminateChannel).Should(Receive(BeNil()))
Eventually(done).Should(BeClosed())
})

receiverFunctions := map[string](func(solace.RequestReplyMessageReceiver, int) chan message.InboundMessage){
"Receive": func(rrmr solace.RequestReplyMessageReceiver, count int) chan message.InboundMessage {
msgChannel := make(chan message.InboundMessage, count)
for i := 0; i < count; i++ {
go func() {
defer GinkgoRecover()
msg, replier, err := rrmr.ReceiveMessage(-1)
Expect(err).ToNot(HaveOccurred())
Expect(msg).ToNot(BeNil())
Expect(replier).ToNot(BeNil())
err = replier.Reply(helpers.NewMessage(messagingService, "Pong")) // send reply back
Expect(err).ToNot(HaveOccurred())
msgChannel <- msg // put into channel
}()
}
return msgChannel
},
"ReceiveAsync": func(rrmr solace.RequestReplyMessageReceiver, count int) chan message.InboundMessage {
msgChannel := make(chan message.InboundMessage, count)
rrmr.ReceiveAsync(func(msg message.InboundMessage, replier solace.Replier) {
go func() { // send reply in a new go routine
err := replier.Reply(helpers.NewMessage(messagingService, "Pong")) // send reply back
Expect(err).ToNot(HaveOccurred())
msgChannel <- msg // put into channel
}()
})
return msgChannel
},
}

for receiverFunctionName, receiverFunction := range receiverFunctions {
receiverFunc := receiverFunction

It("should be able to receive a message successfully with "+receiverFunctionName+"() function", func() {
// send message
var publishReplyChan chan message.InboundMessage
go func() {
defer GinkgoRecover()
publishReplyChan = helpers.PublishNRequestReplyMessages(messagingService, topicString, publishTimeOut, 1, "Ping")
}()

// receive the request message and send back a reply
receivedMsgChannel := receiverFunc(receiver, 1)

// check that the message & reply was sent via semp
client := helpers.GetClient(messagingService)
Expect(client.DataTxMsgCount).To(Equal(int64(2)))

select {
case receivedMessage := <-receivedMsgChannel:
Expect(receivedMessage).ToNot(BeNil())
content, ok := receivedMessage.GetPayloadAsString()
Expect(ok).To(BeTrue())
Expect(content).To(Equal("Ping"))

// for the reply message
var replyMessage message.InboundMessage
Eventually(publishReplyChan).Should(Receive(&replyMessage)) // something was published
Expect(replyMessage).ToNot(BeNil())
content, ok = replyMessage.GetPayloadAsString()
Expect(ok).To(BeTrue())
Expect(content).To(Equal("Pong"))
case <-time.After(gracePeriod):
Fail("Timed out waiting to receive message")
}

Expect(messagingService.Metrics().GetValue(metrics.DirectMessagesReceived)).To(Equal(uint64(2))) // send & reply
})

It("should be able to receive multiple messages successfully with "+receiverFunctionName+"() function", func() {
// send messages
sentMessage := 500
const publishTimeOut = 3 * time.Second

var err error
// we need to properly configure the receiver's buffer to handle > 50 published messages
receiver, err = builder.OnBackPressureDropLatest(uint(sentMessage)).Build(resource.TopicSubscriptionOf(topicString))
Expect(err).ToNot(HaveOccurred())
err = receiver.Start()
Expect(err).ToNot(HaveOccurred())

// publish the request messages
var publishReplyChan chan message.InboundMessage
go func() {
publishReplyChan = helpers.PublishNRequestReplyMessages(messagingService, topicString, publishTimeOut, sentMessage, "Ping")
}()

receivedMsgChannel := receiverFunc(receiver, sentMessage)
Eventually(receivedMsgChannel).Should(HaveLen(sentMessage)) // replies should be sent back
// message in the channel should be a request message
receivedMessage := <-receivedMsgChannel
content, ok := receivedMessage.GetPayloadAsString()
Expect(ok).To(BeTrue())
Expect(content).To(Equal("Ping"))

select {
// message in the reply channel should be a reply message
case replyMessage := <-publishReplyChan:
Expect(replyMessage).ToNot(BeNil())
content, ok := replyMessage.GetPayloadAsString()
Expect(ok).To(BeTrue())
Expect(content).To(Equal("Pong"))
case <-time.After(20 * time.Second):
Fail("Timed out waiting to receive reply message")
}

// check that the message & reply was sent via semp
client := helpers.GetClient(messagingService)
Expect(client.DataTxMsgCount).To(Equal(int64(sentMessage * 2))) // requests & replies
Expect(messagingService.Metrics().GetValue(metrics.DirectMessagesReceived)).To(Equal(uint64(sentMessage * 2))) // send & reply
})
}

It("should properly handle direct massages published to request-reply topic with the Receive() function", func() {
helpers.PublishOneMessage(messagingService, topicString, "Ping") // publish direct message
// block and wait for direct message
msg, replier, err := receiver.ReceiveMessage(-1)
Expect(err).ToNot(HaveOccurred()) // not expecting an error
Expect(msg).ToNot(BeNil()) // we should receive a message
Expect(replier).To(BeNil()) // but no replier since this should be a direct message

// publish the request messages on another thread to same topic
helpers.PublishNRequestReplyMessages(messagingService, topicString, publishTimeOut, 1, "Ping")

// block and wait for request message
msg, replier, err = receiver.ReceiveMessage(-1)
Expect(err).ToNot(HaveOccurred())
Expect(msg).ToNot(BeNil())
Expect(replier).ToNot(BeNil())
time.Sleep(3 * time.Second)
close(done)
}()
Consistently(done, 2*time.Second).ShouldNot(BeClosed()) // less than receive function's sleep
helpers.PublishNRequestReplyMessages(messagingService, topicString, publishTimeOut, 1)
Eventually(done).Should(BeClosed())
})
err = replier.Reply(helpers.NewMessage(messagingService, "Pong")) // send reply back
Expect(err).ToNot(HaveOccurred())

It("should be interrupted while waiting for a message", func() {
done := make(chan struct{})
go func() {
defer GinkgoRecover()
msg, replier, err := receiver.ReceiveMessage(-1)
Expect(msg).To(BeNil())
Expect(replier).To(BeNil())
helpers.ValidateError(err, &solace.IllegalStateError{}, "terminated")
close(done)
}()
Consistently(done, 1*time.Second).ShouldNot(BeClosed())
terminateChannel := receiver.TerminateAsync(10 * time.Second)
Eventually(terminateChannel).Should(Receive(BeNil()))
Eventually(done).Should(BeClosed())
})
// check that the message & reply was sent via semp
client := helpers.GetClient(messagingService)
Expect(client.DataTxMsgCount).To(Equal(int64(3))) // 1 direct, 1 request and 1 reply message // requests & replies
Expect(messagingService.Metrics().GetValue(metrics.DirectMessagesReceived)).To(Equal(uint64(3))) // 3 messages
})

It("should properly handle direct messages published to request-reply topic with the ReceiveAsync() function", func() {
rRMessagesCount := uint64(0)
directMessagesCount := uint64(0)

publishedRRMessages := 250
publishedDirectMessages := 250
receiverBackPressureBufferLength := publishedRRMessages + publishedDirectMessages

// create a receiver with an adequate buffer size
var err error
// we need to properly configure the receiver's buffer to handle > 50 published messages
receiver, err = builder.OnBackPressureDropLatest(uint(receiverBackPressureBufferLength)).Build(resource.TopicSubscriptionOf(topicString))
Expect(err).ToNot(HaveOccurred())
err = receiver.Start()
Expect(err).ToNot(HaveOccurred())

receiver.ReceiveAsync(func(inboundMessage message.InboundMessage, replier solace.Replier) {
Expect(inboundMessage).ToNot(BeNil()) // we should receive a message
if replier != nil {
// request-reply message received
err := replier.Reply(helpers.NewMessage(messagingService, "Pong")) // send reply back
Expect(err).ToNot(HaveOccurred())
atomic.AddUint64(&rRMessagesCount, 1) // increment
} else {
// direct message received
atomic.AddUint64(&directMessagesCount, 1) // increment
}
})

for i := 0; i < publishedDirectMessages; i++ {
helpers.PublishOneMessage(messagingService, topicString, "Ping") // publish direct message
}

// publish the request messages on another thread to same topic
publishChan := helpers.PublishNRequestReplyMessages(messagingService, topicString, publishTimeOut, publishedRRMessages, "Ping")

select {
// last message (any message) in the reply channel should be a reply message
case replyMessage := <-publishChan:
Expect(replyMessage).ToNot(BeNil())
content, ok := replyMessage.GetPayloadAsString()
Expect(ok).To(BeTrue())
Expect(content).To(Equal("Pong"))
case <-time.After(20 * time.Second):
Fail("Timed out waiting to receive reply message")
}

// check that the message & reply was sent via semp
Eventually(func() int64 {
return helpers.GetClient(messagingService).DataTxMsgCount
}).Should(BeNumerically("==", int64(publishedRRMessages*2)+int64(publishedDirectMessages)))

Eventually(func() uint64 {
return messagingService.Metrics().GetValue(metrics.DirectMessagesReceived)
}).Should(BeNumerically("==", uint64(publishedRRMessages*2)+uint64(publishedDirectMessages))) // the messages

Expect(atomic.LoadUint64(&rRMessagesCount)).To(BeNumerically("==", publishedRRMessages))
Expect(atomic.LoadUint64(&directMessagesCount)).To(BeNumerically("==", publishedDirectMessages))
})
})
})

})
Expand Down

0 comments on commit 2f33dab

Please sign in to comment.