Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOL-112355: Integration tests to cover the processing of requests and responses #44

Merged
merged 5 commits into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions test/helpers/messaging_service_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,32 +341,34 @@ func PublishNRequestReplyMessages(messagingService solace.MessagingService, topi
}

// A handler for the request-reply publisher
ret := make(chan message.InboundMessage, n)
replyHandler := func(message message.InboundMessage, userContext interface{}, err error) {
replyChannel := make(chan message.InboundMessage)
replyHandler := func(inboundMessage message.InboundMessage, userContext interface{}, err error) {
go func() {
ret <- message
replyChannel <- inboundMessage
}()
}

publisher, err := messagingService.RequestReply().CreateRequestReplyMessagePublisherBuilder().OnBackPressureReject(0).Build()
ExpectWithOffset(1, err).ToNot(HaveOccurred(), "Expected request-reply publisher to build without error")
ExpectWithOffset(1, publisher.Start()).ToNot(HaveOccurred(), "Expected request-reply publisher to start without error")

builder := messagingService.MessageBuilder()

for i := 0; i < n; i++ {
msgPayload := str
if len(template) == 0 {
msgPayload = fmt.Sprintf(str, i)
}
msg, err := messagingService.MessageBuilder().BuildWithStringPayload(msgPayload)
msg, err := builder.BuildWithStringPayload(msgPayload)
ExpectWithOffset(1, err).ToNot(HaveOccurred(), "Expected message to build without error")

pubErr := publisher.Publish(msg, replyHandler, resource.TopicOf(topic), timeOut, config.MessagePropertyMap{
err = publisher.Publish(msg, replyHandler, resource.TopicOf(topic), timeOut, config.MessagePropertyMap{
config.MessagePropertyCorrelationID: fmt.Sprint(i),
}, nil /* usercontext */)
ExpectWithOffset(1, pubErr).ToNot(HaveOccurred(), "Expected publish to be successful")
ExpectWithOffset(1, err).ToNot(HaveOccurred(), "Expected publish to be successful")
}
ExpectWithOffset(1, publisher.Terminate(10*time.Second)).ToNot(HaveOccurred(), "Expected request-reply publisher to terminate gracefully")
return ret
return replyChannel
}

// ReceiveOneMessage function
Expand Down
332 changes: 267 additions & 65 deletions test/request_reply_message_receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package test
import (
"fmt"
"net/url"
"sync/atomic"
"time"

"solace.dev/go/messaging"
Expand Down Expand Up @@ -761,80 +762,281 @@ var _ = Describe("RequestReplyReceiver", func() {
Eventually(terminateChannel).Should(Receive(BeNil()))
})

It("should time out waiting for a message", func() {
done := make(chan struct{})
go func() {
defer GinkgoRecover()
msg, replier, err := receiver.ReceiveMessage(1 * time.Second)
Expect(msg).To(BeNil())
Expect(replier).To(BeNil())
helpers.ValidateError(err, &solace.TimeoutError{})
close(done)
}()
// We want to make sure that it does not close for at least 500ms
Consistently(done, 500*time.Millisecond).ShouldNot(BeClosed())
// we want it to be closed after 1 second though
Eventually(done).Should(BeClosed())
})
// SOL-112355 - the tests to cover request/reply processing using RequestReply Publisher/Receiver
Describe("for request/reply processing", func() {

It("should time out waiting for a message", func() {
done := make(chan struct{})
go func() {
defer GinkgoRecover()
msg, replier, err := receiver.ReceiveMessage(1 * time.Second)
Expect(msg).To(BeNil())
Expect(replier).To(BeNil())
helpers.ValidateError(err, &solace.TimeoutError{})
close(done)
}()
// We want to make sure that it does not close for at least 500ms
Consistently(done, 500*time.Millisecond).ShouldNot(BeClosed())
// we want it to be closed after 1 second
Eventually(done).Should(BeClosed())
})

It("should be able to continue message delivery when a receive async panics", func() {
msgReceived := make(chan message.InboundMessage)
receiver.ReceiveAsync(func(inboundMessage message.InboundMessage, replier solace.Replier) {
msgReceived <- inboundMessage
panic("everybody stay calm, this should still pass")
It("should be able to continue message delivery when a receive async panics", func() {
msgReceived := make(chan message.InboundMessage)
receiver.ReceiveAsync(func(inboundMessage message.InboundMessage, replier solace.Replier) {
msgReceived <- inboundMessage
panic("this should still pass even though this Panic occurred")
})
const payloadOne = "one"
const payloadTwo = "two"
helpers.PublishNRequestReplyMessages(messagingService, topicString, publishTimeOut, 1, payloadOne)

// we should receive a message
var msg message.InboundMessage
Eventually(msgReceived).Should(Receive(&msg))
payload, ok := msg.GetPayloadAsString()
Expect(ok).To(BeTrue())
Expect(payload).To(Equal(payloadOne))
helpers.PublishNRequestReplyMessages(messagingService, topicString, publishTimeOut, 1, payloadTwo)
// we should continue to receive messages
Eventually(msgReceived).Should(Receive(&msg))
payload, ok = msg.GetPayloadAsString()
Expect(ok).To(BeTrue())
Expect(payload).To(Equal(payloadTwo))
// we should be able to terminate
Expect(receiver.Terminate(10 * time.Second)).ToNot(HaveOccurred())
})
const payloadOne = "one"
const payloadTwo = "two"
helpers.PublishNRequestReplyMessages(messagingService, topicString, publishTimeOut, 1, payloadOne)

// we should receive a message
var msg message.InboundMessage
Eventually(msgReceived).Should(Receive(&msg))
payload, ok := msg.GetPayloadAsString()
Expect(ok).To(BeTrue())
Expect(payload).To(Equal(payloadOne))
helpers.PublishNRequestReplyMessages(messagingService, topicString, publishTimeOut, 1, payloadTwo)
// we should continue to receive messages
Eventually(msgReceived).Should(Receive(&msg))
payload, ok = msg.GetPayloadAsString()
Expect(ok).To(BeTrue())
Expect(payload).To(Equal(payloadTwo))
// we should be able to terminate
Expect(receiver.Terminate(10 * time.Second)).ToNot(HaveOccurred())
})

It("should wait indefinitely for a message", func() {
done := make(chan struct{})
go func() {
defer GinkgoRecover()
It("should wait indefinitely for a message", func() {
done := make(chan struct{})
go func() {
defer GinkgoRecover()
msg, replier, err := receiver.ReceiveMessage(-1)
Expect(err).ToNot(HaveOccurred())
Expect(msg).ToNot(BeNil())
Expect(replier).ToNot(BeNil())
time.Sleep(3 * time.Second)
close(done)
}()
Consistently(done, 2*time.Second).ShouldNot(BeClosed()) // less than receive function's sleep
helpers.PublishNRequestReplyMessages(messagingService, topicString, publishTimeOut, 1)
Eventually(done).Should(BeClosed())
})

It("should be interrupted while waiting for a message", func() {
done := make(chan struct{})
go func() {
defer GinkgoRecover()
msg, replier, err := receiver.ReceiveMessage(-1)
Expect(msg).To(BeNil())
Expect(replier).To(BeNil())
helpers.ValidateError(err, &solace.IllegalStateError{}, "terminated")
close(done)
}()
Consistently(done, 1*time.Second).ShouldNot(BeClosed())
terminateChannel := receiver.TerminateAsync(10 * time.Second)
Eventually(terminateChannel).Should(Receive(BeNil()))
Eventually(done).Should(BeClosed())
})

receiverFunctions := map[string](func(solace.RequestReplyMessageReceiver, int) chan message.InboundMessage){
"Receive": func(rrmr solace.RequestReplyMessageReceiver, count int) chan message.InboundMessage {
msgChannel := make(chan message.InboundMessage, count)
for i := 0; i < count; i++ {
go func() {
defer GinkgoRecover()
msg, replier, err := rrmr.ReceiveMessage(-1)
Expect(err).ToNot(HaveOccurred())
Expect(msg).ToNot(BeNil())
Expect(replier).ToNot(BeNil())
err = replier.Reply(helpers.NewMessage(messagingService, "Pong")) // send reply back
Expect(err).ToNot(HaveOccurred())
msgChannel <- msg // put into channel
}()
}
return msgChannel
},
"ReceiveAsync": func(rrmr solace.RequestReplyMessageReceiver, count int) chan message.InboundMessage {
msgChannel := make(chan message.InboundMessage, count)
rrmr.ReceiveAsync(func(msg message.InboundMessage, replier solace.Replier) {
go func() { // send reply in a new go routine
err := replier.Reply(helpers.NewMessage(messagingService, "Pong")) // send reply back
Expect(err).ToNot(HaveOccurred())
msgChannel <- msg // put into channel
}()
})
return msgChannel
},
}

for receiverFunctionName, receiverFunction := range receiverFunctions {
receiverFunc := receiverFunction

It("should be able to receive a message successfully with "+receiverFunctionName+"() function", func() {
// send message
var publishReplyChan chan message.InboundMessage
go func() {
defer GinkgoRecover()
publishReplyChan = helpers.PublishNRequestReplyMessages(messagingService, topicString, publishTimeOut, 1, "Ping")
}()

// receive the request message and send back a reply
receivedMsgChannel := receiverFunc(receiver, 1)

// check that the message & reply was sent via semp
client := helpers.GetClient(messagingService)
Expect(client.DataTxMsgCount).To(Equal(int64(2)))

select {
case receivedMessage := <-receivedMsgChannel:
Expect(receivedMessage).ToNot(BeNil())
content, ok := receivedMessage.GetPayloadAsString()
Expect(ok).To(BeTrue())
Expect(content).To(Equal("Ping"))

// for the reply message
var replyMessage message.InboundMessage
Eventually(publishReplyChan).Should(Receive(&replyMessage)) // something was published
Expect(replyMessage).ToNot(BeNil())
content, ok = replyMessage.GetPayloadAsString()
Expect(ok).To(BeTrue())
Expect(content).To(Equal("Pong"))
case <-time.After(gracePeriod):
Fail("Timed out waiting to receive message")
}

Expect(messagingService.Metrics().GetValue(metrics.DirectMessagesReceived)).To(Equal(uint64(2))) // send & reply
})

It("should be able to receive multiple messages successfully with "+receiverFunctionName+"() function", func() {
// send messages
sentMessage := 500
const publishTimeOut = 3 * time.Second

var err error
// we need to properly configure the receiver's buffer to handle > 50 published messages
receiver, err = builder.OnBackPressureDropLatest(uint(sentMessage)).Build(resource.TopicSubscriptionOf(topicString))
Expect(err).ToNot(HaveOccurred())
err = receiver.Start()
Expect(err).ToNot(HaveOccurred())

// publish the request messages
var publishReplyChan chan message.InboundMessage
go func() {
publishReplyChan = helpers.PublishNRequestReplyMessages(messagingService, topicString, publishTimeOut, sentMessage, "Ping")
}()

receivedMsgChannel := receiverFunc(receiver, sentMessage)
Eventually(receivedMsgChannel).Should(HaveLen(sentMessage)) // replies should be sent back
// message in the channel should be a request message
receivedMessage := <-receivedMsgChannel
content, ok := receivedMessage.GetPayloadAsString()
Expect(ok).To(BeTrue())
Expect(content).To(Equal("Ping"))

select {
// message in the reply channel should be a reply message
case replyMessage := <-publishReplyChan:
Expect(replyMessage).ToNot(BeNil())
content, ok := replyMessage.GetPayloadAsString()
Expect(ok).To(BeTrue())
Expect(content).To(Equal("Pong"))
case <-time.After(20 * time.Second):
Fail("Timed out waiting to receive reply message")
}

// check that the message & reply was sent via semp
client := helpers.GetClient(messagingService)
Expect(client.DataTxMsgCount).To(Equal(int64(sentMessage * 2))) // requests & replies
Expect(messagingService.Metrics().GetValue(metrics.DirectMessagesReceived)).To(Equal(uint64(sentMessage * 2))) // send & reply
})
}

It("should properly handle direct massages published to request-reply topic with the Receive() function", func() {
helpers.PublishOneMessage(messagingService, topicString, "Ping") // publish direct message
// block and wait for direct message
msg, replier, err := receiver.ReceiveMessage(-1)
Expect(err).ToNot(HaveOccurred()) // not expecting an error
Expect(msg).ToNot(BeNil()) // we should receive a message
Expect(replier).To(BeNil()) // but no replier since this should be a direct message

// publish the request messages on another thread to same topic
helpers.PublishNRequestReplyMessages(messagingService, topicString, publishTimeOut, 1, "Ping")

// block and wait for request message
msg, replier, err = receiver.ReceiveMessage(-1)
Expect(err).ToNot(HaveOccurred())
Expect(msg).ToNot(BeNil())
Expect(replier).ToNot(BeNil())
time.Sleep(3 * time.Second)
close(done)
}()
Consistently(done, 2*time.Second).ShouldNot(BeClosed()) // less than receive function's sleep
helpers.PublishNRequestReplyMessages(messagingService, topicString, publishTimeOut, 1)
Eventually(done).Should(BeClosed())
})
err = replier.Reply(helpers.NewMessage(messagingService, "Pong")) // send reply back
Expect(err).ToNot(HaveOccurred())

It("should be interrupted while waiting for a message", func() {
done := make(chan struct{})
go func() {
defer GinkgoRecover()
msg, replier, err := receiver.ReceiveMessage(-1)
Expect(msg).To(BeNil())
Expect(replier).To(BeNil())
helpers.ValidateError(err, &solace.IllegalStateError{}, "terminated")
close(done)
}()
Consistently(done, 1*time.Second).ShouldNot(BeClosed())
terminateChannel := receiver.TerminateAsync(10 * time.Second)
Eventually(terminateChannel).Should(Receive(BeNil()))
Eventually(done).Should(BeClosed())
})
// check that the message & reply was sent via semp
client := helpers.GetClient(messagingService)
Expect(client.DataTxMsgCount).To(Equal(int64(3))) // 1 direct, 1 request and 1 reply message // requests & replies
Expect(messagingService.Metrics().GetValue(metrics.DirectMessagesReceived)).To(Equal(uint64(3))) // 3 messages
})

It("should properly handle direct messages published to request-reply topic with the ReceiveAsync() function", func() {
rRMessagesCount := uint64(0)
directMessagesCount := uint64(0)

publishedRRMessages := 250
publishedDirectMessages := 250
receiverBackPressureBufferLength := publishedRRMessages + publishedDirectMessages

// create a receiver with an adequate buffer size
var err error
// we need to properly configure the receiver's buffer to handle > 50 published messages
receiver, err = builder.OnBackPressureDropLatest(uint(receiverBackPressureBufferLength)).Build(resource.TopicSubscriptionOf(topicString))
Expect(err).ToNot(HaveOccurred())
err = receiver.Start()
Expect(err).ToNot(HaveOccurred())

receiver.ReceiveAsync(func(inboundMessage message.InboundMessage, replier solace.Replier) {
Expect(inboundMessage).ToNot(BeNil()) // we should receive a message
if replier != nil {
// request-reply message received
err := replier.Reply(helpers.NewMessage(messagingService, "Pong")) // send reply back
Expect(err).ToNot(HaveOccurred())
atomic.AddUint64(&rRMessagesCount, 1) // increment
} else {
// direct message received
atomic.AddUint64(&directMessagesCount, 1) // increment
}
})

for i := 0; i < publishedDirectMessages; i++ {
helpers.PublishOneMessage(messagingService, topicString, "Ping") // publish direct message
}

// publish the request messages on another thread to same topic
publishChan := helpers.PublishNRequestReplyMessages(messagingService, topicString, publishTimeOut, publishedRRMessages, "Ping")

select {
// last message (any message) in the reply channel should be a reply message
case replyMessage := <-publishChan:
Expect(replyMessage).ToNot(BeNil())
content, ok := replyMessage.GetPayloadAsString()
Expect(ok).To(BeTrue())
Expect(content).To(Equal("Pong"))
case <-time.After(20 * time.Second):
Fail("Timed out waiting to receive reply message")
}

// check that the message & reply was sent via semp
Eventually(func() int64 {
return helpers.GetClient(messagingService).DataTxMsgCount
}).Should(BeNumerically("==", int64(publishedRRMessages*2)+int64(publishedDirectMessages)))

Eventually(func() uint64 {
return messagingService.Metrics().GetValue(metrics.DirectMessagesReceived)
}).Should(BeNumerically("==", uint64(publishedRRMessages*2)+uint64(publishedDirectMessages))) // the messages

Expect(atomic.LoadUint64(&rRMessagesCount)).To(BeNumerically("==", publishedRRMessages))
Expect(atomic.LoadUint64(&directMessagesCount)).To(BeNumerically("==", publishedDirectMessages))
})
})
})

})
Expand Down
Loading