Skip to content

Commit

Permalink
test[SOL-103572]: Golang: Add the integration tests to support the co…
Browse files Browse the repository at this point in the history
…ntext propagation feature
  • Loading branch information
oodigie committed Oct 10, 2023
1 parent 39e26f0 commit 5ed5619
Showing 1 changed file with 184 additions and 0 deletions.
184 changes: 184 additions & 0 deletions test/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package test

import (
"encoding/hex"
"fmt"
"reflect"
"time"
Expand Down Expand Up @@ -1935,4 +1936,187 @@ var _ = Describe("Remote Message Tests", func() {
})
})

Describe("Published and received message with Distributed Tracing support", func() {
var publisher solace.DirectMessagePublisher
var receiver solace.DirectMessageReceiver
var inboundMessageChannel chan message.InboundMessage

BeforeEach(func() {
var err error
err = messagingService.Connect()
Expect(err).ToNot(HaveOccurred())

publisher, err = messagingService.CreateDirectMessagePublisherBuilder().Build()
Expect(err).ToNot(HaveOccurred())
receiver, err = messagingService.CreateDirectMessageReceiverBuilder().WithSubscriptions(resource.TopicSubscriptionOf(topic)).Build()
Expect(err).ToNot(HaveOccurred())

err = publisher.Start()
Expect(err).ToNot(HaveOccurred())

inboundMessageChannel = make(chan message.InboundMessage)
receiver.ReceiveAsync(func(inboundMessage message.InboundMessage) {
inboundMessageChannel <- inboundMessage
})

err = receiver.Start()
Expect(err).ToNot(HaveOccurred())
})

AfterEach(func() {
var err error
err = publisher.Terminate(10 * time.Second)
Expect(err).ToNot(HaveOccurred())
err = receiver.Terminate(10 * time.Second)
Expect(err).ToNot(HaveOccurred())

err = messagingService.Disconnect()
Expect(err).ToNot(HaveOccurred())
})

It("should be able to publish/receive a message with no creation context", func() {
message, err := messageBuilder.Build() // no creation context is set on message
Expect(err).ToNot(HaveOccurred())

publisher.Publish(message, resource.TopicOf(topic))

select {
case message := <-inboundMessageChannel:
var creationCtxTraceID16 [16]byte
var creationCtxSpanID8 [8]byte
traceID, spanID, sampled, traceState, ok := message.GetCreationTraceContext()
Expect(ok).To(BeFalse())
Expect(traceID).To(Equal(creationCtxTraceID16)) // empty
Expect(spanID).To(Equal(creationCtxSpanID8)) // empty
Expect(sampled).To(BeFalse())
Expect(traceState).To(Equal(""))
case <-time.After(1 * time.Second):
Fail("timed out waiting for message to be delivered")
}
})

It("should be able to publish/receive a message with a valid creation context", func() {
message, err := messageBuilder.Build()
Expect(err).ToNot(HaveOccurred())

// set creation context on message
creationCtxTraceID, _ := hex.DecodeString("79f90916c9a3dad1eb4b328e00469e45")
creationCtxSpanID, _ := hex.DecodeString("3b364712c4e1f17f")
sampledValue := true
traceStateValue := "sometrace=Example"

var creationCtxTraceID16 [16]byte
var creationCtxSpanID8 [8]byte
copy(creationCtxTraceID16[:], creationCtxTraceID)
copy(creationCtxSpanID8[:], creationCtxSpanID)
ok := message.SetCreationTraceContext(creationCtxTraceID16, creationCtxSpanID8, sampledValue, traceStateValue)
Expect(ok).To(BeTrue())

publisher.Publish(message, resource.TopicOf(topic))

select {
case message := <-inboundMessageChannel:
traceID, spanID, sampled, traceState, ok := message.GetCreationTraceContext()
Expect(ok).To(BeFalse())
Expect(traceID).To(Equal(creationCtxTraceID16)) // should be equal
Expect(spanID).To(Equal(creationCtxSpanID8)) // should be equal
Expect(sampled).To(Equal(sampledValue))
Expect(traceState).To(Equal(traceStateValue))
case <-time.After(1 * time.Second):
Fail("timed out waiting for message to be delivered")
}
})

It("should be able to publish/receive a message with no tranport context", func() {
message, err := messageBuilder.Build() // no creation context is set on message
Expect(err).ToNot(HaveOccurred())

publisher.Publish(message, resource.TopicOf(topic))

select {
case message := <-inboundMessageChannel:
var transportCtxTraceID16 [16]byte
var transportCtxSpanID8 [8]byte
traceID, spanID, sampled, traceState, ok := message.GetTransportTraceContext()
Expect(ok).To(BeFalse())
Expect(traceID).To(Equal(transportCtxTraceID16)) // empty
Expect(spanID).To(Equal(transportCtxSpanID8)) // empty
Expect(sampled).To(BeFalse())
Expect(traceState).To(Equal(""))
case <-time.After(1 * time.Second):
Fail("timed out waiting for message to be delivered")
}
})

It("should be able to publish/receive a message with a valid transport context", func() {
message, err := messageBuilder.Build()
Expect(err).ToNot(HaveOccurred())

// set transport context on message
transportCtxTraceID, _ := hex.DecodeString("55d30916c9a3dad1eb4b328e00469e45")
transportCtxSpanID, _ := hex.DecodeString("a7164712c4e1f17f")

sampledValue := true
traceStateValue := "trace=Sample"

var transportCtxTraceID16 [16]byte
var transportCtxSpanID8 [8]byte
copy(transportCtxTraceID16[:], transportCtxTraceID)
copy(transportCtxSpanID8[:], transportCtxSpanID)
ok := message.SetTransportTraceContext(transportCtxTraceID16, transportCtxSpanID8, sampledValue, traceStateValue)
Expect(ok).To(BeTrue())

publisher.Publish(message, resource.TopicOf(topic))

select {
case message := <-inboundMessageChannel:
traceID, spanID, sampled, traceState, ok := message.GetCreationTraceContext()
Expect(ok).To(BeFalse())
Expect(traceID).To(Equal(transportCtxTraceID16)) // should be equal
Expect(spanID).To(Equal(transportCtxSpanID8)) // should be equal
Expect(sampled).To(Equal(sampledValue))
Expect(traceState).To(Equal(traceStateValue))
case <-time.After(1 * time.Second):
Fail("timed out waiting for message to be delivered")
}
})

It("should be able to publish/receive a message with no baggage", func() {
message, err := messageBuilder.Build()
message.SetBaggage("") // set empty baggage
Expect(err).ToNot(HaveOccurred())

publisher.Publish(message, resource.TopicOf(topic))

select {
case message := <-inboundMessageChannel:
baggage, ok := message.GetBaggage()
Expect(ok).To(BeFalse())
Expect(baggage).To(Equal(""))
case <-time.After(1 * time.Second):
Fail("timed out waiting for message to be delivered")
}
})

It("should be able to publish/receive a message with a valid baggage", func() {
baggage := "baggageKey=baggageValue"
message, err := messageBuilder.Build()
baggageErr := message.SetBaggage(baggage) // set a valid baggage string
Expect(err).ToNot(HaveOccurred())
Expect(baggageErr).ToNot(HaveOccurred())

publisher.Publish(message, resource.TopicOf(topic))

select {
case message := <-inboundMessageChannel:
receivedBaggage, ok := message.GetBaggage()
Expect(ok).To(BeTrue())
Expect(receivedBaggage).To(Equal(baggage))
case <-time.After(1 * time.Second):
Fail("timed out waiting for message to be delivered")
}
})

})

})

0 comments on commit 5ed5619

Please sign in to comment.