diff --git a/internal/ccsmp/ccsmp_helper.h b/internal/ccsmp/ccsmp_helper.h index e7a7373..9412a96 100644 --- a/internal/ccsmp/ccsmp_helper.h +++ b/internal/ccsmp/ccsmp_helper.h @@ -49,4 +49,4 @@ typedef struct solClient_errorInfo_wrapper void * uintptr_to_void_p(solClient_uint64_t ptr); -#endif \ No newline at end of file +#endif diff --git a/internal/ccsmp/ccsmp_message.go b/internal/ccsmp/ccsmp_message.go index 7bb5112..61dd6e8 100644 --- a/internal/ccsmp/ccsmp_message.go +++ b/internal/ccsmp/ccsmp_message.go @@ -538,8 +538,8 @@ func SolClientMessageSetTimeToLive(messageP SolClientMessagePt, timeToLive int64 // Utility functions -const defaultMsgDumpBufferSize = 1000 -const msgDumpMultiplier = 5 +// Set the default message dump buffer size to accommodate +// the Distributed Tracing properties in the message dump, SOL-107974 const maxDumpSize = 10000 // SolClientMessageDump function @@ -587,11 +587,9 @@ func SolClientMessageDump(messageP SolClientMessagePt) string { } } - bufferSize := C.ulong(defaultMsgDumpBufferSize + payloadSize*msgDumpMultiplier) // Truncate the message after 10,000 characters, SOL-62945 - if bufferSize > maxDumpSize { - bufferSize = maxDumpSize - } + // removed the dynamic calculation of buffer size as defaultMsgDumpBufferSize{1000} + (payloadSize * msgDumpMultiplier{5}) + bufferSize := C.ulong(maxDumpSize) buffer := (*C.char)(C.malloc(bufferSize)) defer C.free(unsafe.Pointer(buffer)) diff --git a/internal/ccsmp/ccsmp_message_tracing.go b/internal/ccsmp/ccsmp_message_tracing.go new file mode 100644 index 0000000..e4a732e --- /dev/null +++ b/internal/ccsmp/ccsmp_message_tracing.go @@ -0,0 +1,352 @@ +// pubsubplus-go-client +// +// Copyright 2021-2024 Solace Corporation. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package ccsmp + +/* +#include +#include + +#include "solclient/solClient.h" +#include "solclient/solClientMsg.h" +#include "solclient/solClientMsgTracingSupport.h" +*/ +import "C" +import ( + "fmt" + "unsafe" + + "solace.dev/go/messaging/internal/impl/logging" +) + +// Reexport of various CCSMP types + +// SolClientMessageTracingContextType is assigned a value +type SolClientMessageTracingContextType = C.solClient_msg_tracing_context_type_t + +// SolClientMessageTracingInjectionStandardType is assigned a value +type SolClientMessageTracingInjectionStandardType = C.solClient_msg_tracing_injection_standard_type_t + +// SolClientContextTypeTransportContext is assigned a value +const SolClientContextTypeTransportContext = C.TRANSPORT_CONTEXT + +// SolClientContextTypeCreationContext is assigned a value +const SolClientContextTypeCreationContext = C.CREATION_CONTEXT + +// SolClientMessageTracingInjectionStandardTypeSMF is assigned a value +const SolClientMessageTracingInjectionStandardTypeSMF = C.SOLCLIENT_INJECTION_STANDARD_SMF + +// SolClientMessageTracingInjectionStandardTypeW3C is assigned a value +const SolClientMessageTracingInjectionStandardTypeW3C = C.SOLCLIENT_INJECTION_STANDARD_W3C + +// TODO the calls to handleCcsmpError are slow since they lock the thread. +// Ideally, we wrap these calls in C such that the golang scheduler cannot +// interrupt us, and then there is no need to lock the thread. This should +// be done for all datapath functionality, ie. the contents of this file. + +// Distributed tracing properties + +// SolClientMessageGetTraceContextTraceID function +func SolClientMessageGetTraceContextTraceID(messageP SolClientMessagePt, contextType SolClientMessageTracingContextType) ([16]byte, *SolClientErrorInfoWrapper) { + // to hold the traceID property + var cChar C.solClient_uint8_t + + errorInfo := handleCcsmpError(func() SolClientReturnCode { + return C.solClient_msg_tracing_getTraceIdByte(messageP, contextType, &cChar, C.size_t(16)) + }) + if errorInfo != nil { + if errorInfo.ReturnCode == SolClientReturnCodeFail { + logging.Default.Warning( + fmt.Sprintf( + "Encountered error fetching Creation context traceID prop: %s, subcode: %d", + errorInfo.GetMessageAsString(), + errorInfo.SubCode)) + } + return [16]byte{}, errorInfo + } + + traceID := *(*[16]byte)(unsafe.Pointer(&cChar)) + return traceID, errorInfo +} + +// SolClientMessageSetTraceContextTraceID function +func SolClientMessageSetTraceContextTraceID(messageP SolClientMessagePt, traceID [16]byte, contextType SolClientMessageTracingContextType) *SolClientErrorInfoWrapper { + if len(traceID) > 0 { + cTraceID := (*C.solClient_uint8_t)(C.CBytes(traceID[:])) + + defer C.free(unsafe.Pointer(cTraceID)) // free the pointer after function executes + + errorInfo := handleCcsmpError(func() SolClientReturnCode { + return C.solClient_msg_tracing_setTraceIdByte(messageP, contextType, cTraceID, C.size_t(len(traceID))) + }) + return errorInfo + } + return nil +} + +// SolClientMessageGetTraceContextSpanID function +func SolClientMessageGetTraceContextSpanID(messageP SolClientMessagePt, contextType SolClientMessageTracingContextType) ([8]byte, *SolClientErrorInfoWrapper) { + // to hold the spanID property + var cChar C.solClient_uint8_t + + errorInfo := handleCcsmpError(func() SolClientReturnCode { + return C.solClient_msg_tracing_getSpanIdByte(messageP, contextType, &cChar, C.size_t(8)) + }) + if errorInfo != nil { + if errorInfo.ReturnCode == SolClientReturnCodeFail { + logging.Default.Warning( + fmt.Sprintf( + "Encountered error fetching Creation context spanID prop: %s, subcode: %d", + errorInfo.GetMessageAsString(), + errorInfo.SubCode)) + } + return [8]byte{}, errorInfo + } + + spanID := *(*[8]byte)(unsafe.Pointer(&cChar)) + return spanID, errorInfo +} + +// SolClientMessageSetTraceContextSpanID function +func SolClientMessageSetTraceContextSpanID(messageP SolClientMessagePt, spanID [8]byte, contextType SolClientMessageTracingContextType) *SolClientErrorInfoWrapper { + if len(spanID) > 0 { + cSpanID := (*C.solClient_uint8_t)(C.CBytes(spanID[:])) + + defer C.free(unsafe.Pointer(cSpanID)) // free the pointer after function executes + + errorInfo := handleCcsmpError(func() SolClientReturnCode { + return C.solClient_msg_tracing_setSpanIdByte(messageP, contextType, cSpanID, C.size_t(len(spanID))) + }) + return errorInfo + } + return nil +} + +// SolClientMessageGetTraceContextSampled function +func SolClientMessageGetTraceContextSampled(messageP SolClientMessagePt, contextType SolClientMessageTracingContextType) (bool, *SolClientErrorInfoWrapper) { + // to hold the Sampled property + var cSampled C.solClient_bool_t + + errorInfo := handleCcsmpError(func() SolClientReturnCode { + return C.solClient_msg_tracing_isSampled(messageP, contextType, &cSampled) + }) + if errorInfo != nil { + if errorInfo.ReturnCode == SolClientReturnCodeFail { + logging.Default.Warning( + fmt.Sprintf( + "Encountered error fetching Creation context sampled prop: %s, subcode: %d", + errorInfo.GetMessageAsString(), + errorInfo.SubCode)) + } + return false, errorInfo + } + + isSampled := *(*bool)(unsafe.Pointer(&cSampled)) + return isSampled, errorInfo +} + +// SolClientMessageSetTraceContextSampled function +func SolClientMessageSetTraceContextSampled(messageP SolClientMessagePt, sampled bool, contextType SolClientMessageTracingContextType) *SolClientErrorInfoWrapper { + var isSampled C.solClient_bool_t = 0 + if sampled { + isSampled = 1 + } + return handleCcsmpError(func() SolClientReturnCode { + return C.solClient_msg_tracing_setSampled(messageP, contextType, isSampled) + }) +} + +// SolClientMessageGetTraceContextTraceState function +func SolClientMessageGetTraceContextTraceState(messageP SolClientMessagePt, contextType SolClientMessageTracingContextType) (string, *SolClientErrorInfoWrapper) { + // to hold the trace state + var traceStateChar *C.char + var traceStateSize C.size_t + defer C.free(unsafe.Pointer(traceStateChar)) + + errorInfo := handleCcsmpError(func() SolClientReturnCode { + return C.solClient_msg_tracing_getTraceStatePtr(messageP, contextType, &traceStateChar, &traceStateSize) + }) + + if errorInfo != nil { + if errorInfo.ReturnCode == SolClientReturnCodeFail { + logging.Default.Warning( + fmt.Sprintf( + "Encountered error fetching Creation contex traceState prop: %s, subcode: %d", + errorInfo.GetMessageAsString(), + errorInfo.SubCode)) + } + return "", errorInfo + } + + return C.GoStringN(traceStateChar, C.int(traceStateSize)), errorInfo +} + +// SolClientMessageSetTraceContextTraceState function +func SolClientMessageSetTraceContextTraceState(messageP SolClientMessagePt, traceState string, contextType SolClientMessageTracingContextType) *SolClientErrorInfoWrapper { + cStr := C.CString(traceState) + // use the length of the traceState string not len(traceState) + 1 + traceStateLen := len(traceState) + defer C.free(unsafe.Pointer(cStr)) // free the pointer after function executes + + errorInfo := handleCcsmpError(func() SolClientReturnCode { + // trace state is not null terminal in SMF protocol + // write only char bytes not including null terminal, so use the length of the traceState string not len(traceState) + 1 + return C.solClient_msg_tracing_setTraceStatePtr(messageP, contextType, cStr, C.ulong(traceStateLen)) + }) + return errorInfo +} + +// For the Creation Context + +// SolClientMessageGetCreationTraceContextTraceID function +func SolClientMessageGetCreationTraceContextTraceID(messageP SolClientMessagePt) ([16]byte, *SolClientErrorInfoWrapper) { + // return the traceID property for the creation trace context + return SolClientMessageGetTraceContextTraceID(messageP, SolClientContextTypeCreationContext) +} + +// SolClientMessageSetCreationTraceContextTraceID function +func SolClientMessageSetCreationTraceContextTraceID(messageP SolClientMessagePt, traceID [16]byte) *SolClientErrorInfoWrapper { + // Sets the traceID property for the creation trace context + return SolClientMessageSetTraceContextTraceID(messageP, traceID, SolClientContextTypeCreationContext) +} + +// SolClientMessageGetCreationTraceContextSpanID function +func SolClientMessageGetCreationTraceContextSpanID(messageP SolClientMessagePt) ([8]byte, *SolClientErrorInfoWrapper) { + // return the spanID property for the creation trace context + return SolClientMessageGetTraceContextSpanID(messageP, SolClientContextTypeCreationContext) +} + +// SolClientMessageSetCreationTraceContextSpanID function +func SolClientMessageSetCreationTraceContextSpanID(messageP SolClientMessagePt, spanID [8]byte) *SolClientErrorInfoWrapper { + // Sets the spanID property for the creation trace context + return SolClientMessageSetTraceContextSpanID(messageP, spanID, SolClientContextTypeCreationContext) +} + +// SolClientMessageGetCreationTraceContextSampled function +func SolClientMessageGetCreationTraceContextSampled(messageP SolClientMessagePt) (bool, *SolClientErrorInfoWrapper) { + // return the Sampled property for the creation trace context + return SolClientMessageGetTraceContextSampled(messageP, SolClientContextTypeCreationContext) +} + +// SolClientMessageSetCreationTraceContextSampled function +func SolClientMessageSetCreationTraceContextSampled(messageP SolClientMessagePt, sampled bool) *SolClientErrorInfoWrapper { + // Sets the Sampled property for the creation trace context + return SolClientMessageSetTraceContextSampled(messageP, sampled, SolClientContextTypeCreationContext) +} + +// SolClientMessageGetCreationTraceContextTraceState function +func SolClientMessageGetCreationTraceContextTraceState(messageP SolClientMessagePt) (string, *SolClientErrorInfoWrapper) { + // return the trace state property for the creation trace context + return SolClientMessageGetTraceContextTraceState(messageP, SolClientContextTypeCreationContext) +} + +// SolClientMessageSetCreationTraceContextTraceState function +func SolClientMessageSetCreationTraceContextTraceState(messageP SolClientMessagePt, traceState string) *SolClientErrorInfoWrapper { + // Sets the trace state property for the creation trace context + return SolClientMessageSetTraceContextTraceState(messageP, traceState, SolClientContextTypeCreationContext) +} + +// For the Transport Context + +// SolClientMessageGetTransportTraceContextTraceID function +func SolClientMessageGetTransportTraceContextTraceID(messageP SolClientMessagePt) ([16]byte, *SolClientErrorInfoWrapper) { + // return the traceID property for the transport trace context + return SolClientMessageGetTraceContextTraceID(messageP, SolClientContextTypeTransportContext) +} + +// SolClientMessageSetTransportTraceContextTraceID function +func SolClientMessageSetTransportTraceContextTraceID(messageP SolClientMessagePt, traceID [16]byte) *SolClientErrorInfoWrapper { + // Sets the traceID property for the transport trace context + return SolClientMessageSetTraceContextTraceID(messageP, traceID, SolClientContextTypeTransportContext) +} + +// SolClientMessageGetTransportTraceContextSpanID function +func SolClientMessageGetTransportTraceContextSpanID(messageP SolClientMessagePt) ([8]byte, *SolClientErrorInfoWrapper) { + // return the spanID property for the transport trace context + return SolClientMessageGetTraceContextSpanID(messageP, SolClientContextTypeTransportContext) +} + +// SolClientMessageSetTransportTraceContextSpanID function +func SolClientMessageSetTransportTraceContextSpanID(messageP SolClientMessagePt, spanID [8]byte) *SolClientErrorInfoWrapper { + // Sets the spanID property for the transport trace context + return SolClientMessageSetTraceContextSpanID(messageP, spanID, SolClientContextTypeTransportContext) +} + +// SolClientMessageGetTransportTraceContextSampled function +func SolClientMessageGetTransportTraceContextSampled(messageP SolClientMessagePt) (bool, *SolClientErrorInfoWrapper) { + // return the Sampled property for the transport trace context + return SolClientMessageGetTraceContextSampled(messageP, SolClientContextTypeTransportContext) +} + +// SolClientMessageSetTransportTraceContextSampled function +func SolClientMessageSetTransportTraceContextSampled(messageP SolClientMessagePt, sampled bool) *SolClientErrorInfoWrapper { + // Sets the Sampled property for the transport trace context + return SolClientMessageSetTraceContextSampled(messageP, sampled, SolClientContextTypeTransportContext) +} + +// SolClientMessageGetTransportTraceContextTraceState function +func SolClientMessageGetTransportTraceContextTraceState(messageP SolClientMessagePt) (string, *SolClientErrorInfoWrapper) { + // return the trace state property for the transport trace context + return SolClientMessageGetTraceContextTraceState(messageP, SolClientContextTypeTransportContext) +} + +// SolClientMessageSetTransportTraceContextTraceState function +func SolClientMessageSetTransportTraceContextTraceState(messageP SolClientMessagePt, traceState string) *SolClientErrorInfoWrapper { + // Sets the trace state property for the transport trace context + return SolClientMessageSetTraceContextTraceState(messageP, traceState, SolClientContextTypeTransportContext) +} + +// For the Baggage + +// SolClientMessageGetBaggage function +func SolClientMessageGetBaggage(messageP SolClientMessagePt) (string, *SolClientErrorInfoWrapper) { + var baggageChar *C.char + var baggageSize C.size_t + errorInfo := handleCcsmpError(func() SolClientReturnCode { + return C.solClient_msg_tracing_getBaggagePtr(messageP, &baggageChar, &baggageSize) + }) + if errorInfo != nil { + if errorInfo.ReturnCode == SolClientReturnCodeFail { + logging.Default.Warning( + fmt.Sprintf( + "Encountered error fetching baggage: %s, subcode: %d", + errorInfo.GetMessageAsString(), + errorInfo.SubCode)) + } + return "", errorInfo + } + + // use baggageSize - 1 to exclude the null character at the end of the baggage string + return C.GoStringN(baggageChar, C.int(baggageSize)-1), errorInfo +} + +// SolClientMessageSetBaggage function +func SolClientMessageSetBaggage(messageP SolClientMessagePt, baggage string) *SolClientErrorInfoWrapper { + // if the baggage is empty, delete the baggage from the message pointer + if baggage == "" { + deleteErrorInfo := handleCcsmpError(func() SolClientReturnCode { + return C.solClient_msg_tracing_deleteBaggage(messageP) + }) + return deleteErrorInfo + } + // set the baggage if there is an actual baggage string + cStr := C.CString(baggage) + defer C.free(unsafe.Pointer(cStr)) // free the pointer after function executes + errorInfo := handleCcsmpError(func() SolClientReturnCode { + return C.solClient_msg_tracing_setBaggage(messageP, cStr) + }) + return errorInfo +} diff --git a/internal/ccsmp/lib/linux_arm64/libsolclient.a b/internal/ccsmp/lib/linux_arm64/libsolclient.a old mode 100755 new mode 100644 diff --git a/internal/impl/message/message_impl.go b/internal/impl/message/message_impl.go index 8db27ac..6ed5b73 100644 --- a/internal/impl/message/message_impl.go +++ b/internal/impl/message/message_impl.go @@ -312,6 +312,214 @@ func (message *MessageImpl) GetClassOfService() int { return classOfService } +// GetCreationTraceContext will return the trace context metadata used for distributed message tracing message +// creation context information across service boundaries. +// It allows correlating the producer with the consumers of a message, regardless of intermediary +// instrumentation. It must not be altered by intermediaries. +// If the content is not accessible, an empty slice will be returned and the ok flag will be false. +func (message *MessageImpl) GetCreationTraceContext() (traceID [16]byte, spanID [8]byte, sampled bool, traceState string, ok bool) { + var traceIDErr, spanIDErr, sampledErr, traceStateErr core.ErrorInfo + ok = true // will remain true if we are able to retrieve all the trace context properties + // get the creation trace context properties + traceID, traceIDErr = ccsmp.SolClientMessageGetCreationTraceContextTraceID(message.messagePointer) + if traceIDErr != nil { + ok = false + if traceIDErr.ReturnCode == ccsmp.SolClientReturnCodeFail { + logging.Default.Warning(fmt.Sprintf("Failed to retrieve Creation Context traceID property: "+traceIDErr.GetMessageAsString()+", sub code %d", traceIDErr.SubCode)) + } + } + + spanID, spanIDErr = ccsmp.SolClientMessageGetCreationTraceContextSpanID(message.messagePointer) + if spanIDErr != nil { + ok = false + if spanIDErr.ReturnCode == ccsmp.SolClientReturnCodeFail { + logging.Default.Warning(fmt.Sprintf("Failed to retrieve Creation Context spanID property: "+spanIDErr.GetMessageAsString()+", sub code %d", spanIDErr.SubCode)) + } + } + + sampled, sampledErr = ccsmp.SolClientMessageGetCreationTraceContextSampled(message.messagePointer) + if sampledErr != nil { + ok = false + if sampledErr.ReturnCode == ccsmp.SolClientReturnCodeFail { + logging.Default.Warning(fmt.Sprintf("Failed to retrieve Creation Context sampled property: "+sampledErr.GetMessageAsString()+", sub code %d", sampledErr.SubCode)) + } + } + + traceState, traceStateErr = ccsmp.SolClientMessageGetCreationTraceContextTraceState(message.messagePointer) + if traceStateErr != nil { + // if we got an actual error + if traceStateErr.ReturnCode == ccsmp.SolClientReturnCodeFail { + ok = false // set to false + logging.Default.Warning(fmt.Sprintf("Failed to retrieve Creation Context traceState property: "+traceStateErr.GetMessageAsString()+", sub code %d", traceStateErr.SubCode)) + } + } + + // add the null terminate character to the returned traceState value + return traceID, spanID, sampled, traceState, ok +} + +// SetCreationTraceContext will set creation trace context metadata used for distributed message tracing. +// Creation context considered to be immutable, and should not be set multiple times. +// If the content could not be set into the message, the ok flag will be false. +func (message *MessageImpl) SetCreationTraceContext(traceID [16]byte, spanID [8]byte, sampled bool, traceState *string) (ok bool) { + var traceIDErr, spanIDErr, sampledErr, traceStateErr core.ErrorInfo + ok = true // will remain true if we are able to set all the trace context properties + // set the creation trace context properties + traceIDErr = ccsmp.SolClientMessageSetCreationTraceContextTraceID(message.messagePointer, traceID) + if traceIDErr != nil { + ok = false + if traceIDErr.ReturnCode == ccsmp.SolClientReturnCodeFail { + logging.Default.Warning(fmt.Sprintf("Failed to set Creation Context traceID property: "+traceIDErr.GetMessageAsString()+", sub code %d", traceIDErr.SubCode)) + } + } + + spanIDErr = ccsmp.SolClientMessageSetCreationTraceContextSpanID(message.messagePointer, spanID) + if spanIDErr != nil { + ok = false + if spanIDErr.ReturnCode == ccsmp.SolClientReturnCodeFail { + logging.Default.Warning(fmt.Sprintf("Failed to set Creation Context spanID property: "+spanIDErr.GetMessageAsString()+", sub code %d", spanIDErr.SubCode)) + } + } + + sampledErr = ccsmp.SolClientMessageSetCreationTraceContextSampled(message.messagePointer, sampled) + if sampledErr != nil { + ok = false + if sampledErr.ReturnCode == ccsmp.SolClientReturnCodeFail { + logging.Default.Warning(fmt.Sprintf("Failed to set Creation Context sampled property: "+sampledErr.GetMessageAsString()+", sub code %d", sampledErr.SubCode)) + } + } + + if traceState != nil { + traceStateErr = ccsmp.SolClientMessageSetCreationTraceContextTraceState(message.messagePointer, *traceState) + if traceStateErr != nil { + ok = false + if traceStateErr.ReturnCode == ccsmp.SolClientReturnCodeFail { + logging.Default.Warning(fmt.Sprintf("Failed to set Creation Context traceState property: "+traceStateErr.GetMessageAsString()+", sub code %d", traceStateErr.SubCode)) + } + } + } + + return ok +} + +// GetTransportTraceContext will return the trace context metadata used for distributed message tracing +// It allows correlating the producer and the consumer with an intermediary. +// It also allows correlating multiple intermediaries among each other. +// When no transport context is present it may return a creation context when available as +// an initial transport context. +// If the content is not accessible, an empty slice will be returned and the ok flag will be false. +func (message *MessageImpl) GetTransportTraceContext() (traceID [16]byte, spanID [8]byte, sampled bool, traceState string, ok bool) { + var traceIDErr, spanIDErr, sampledErr, traceStateErr core.ErrorInfo + ok = true // will remain true if we are able to retrieve all the trace context properties + // get the transport trace context properties + traceID, traceIDErr = ccsmp.SolClientMessageGetTransportTraceContextTraceID(message.messagePointer) + if traceIDErr != nil { + ok = false + if traceIDErr.ReturnCode == ccsmp.SolClientReturnCodeFail { + logging.Default.Warning(fmt.Sprintf("Failed to retrieve Transport Context traceID property: "+traceIDErr.GetMessageAsString()+", sub code %d", traceIDErr.SubCode)) + } + } + + spanID, spanIDErr = ccsmp.SolClientMessageGetTransportTraceContextSpanID(message.messagePointer) + if spanIDErr != nil { + ok = false + if spanIDErr.ReturnCode == ccsmp.SolClientReturnCodeFail { + logging.Default.Warning(fmt.Sprintf("Failed to retrieve Transport Context spanID property: "+spanIDErr.GetMessageAsString()+", sub code %d", spanIDErr.SubCode)) + } + } + + sampled, sampledErr = ccsmp.SolClientMessageGetTransportTraceContextSampled(message.messagePointer) + if sampledErr != nil { + ok = false + if sampledErr.ReturnCode == ccsmp.SolClientReturnCodeFail { + logging.Default.Warning(fmt.Sprintf("Failed to retrieve Transport Context sampled property: "+sampledErr.GetMessageAsString()+", sub code %d", sampledErr.SubCode)) + } + } + + traceState, traceStateErr = ccsmp.SolClientMessageGetTransportTraceContextTraceState(message.messagePointer) + if traceStateErr != nil { + // if we got an actual error + if traceStateErr.ReturnCode == ccsmp.SolClientReturnCodeFail { + ok = false // set to false + logging.Default.Warning(fmt.Sprintf("Failed to retrieve Transport Context traceState property: "+traceStateErr.GetMessageAsString()+", sub code %d", traceStateErr.SubCode)) + } + } + + // add the null terminate character to the returned traceState value + return traceID, spanID, sampled, traceState, ok +} + +// SetTransportTraceContext will set transport trace context metadata used for distributed message tracing. +// If the content could not be set into the message, the ok flag will be false. +func (message *MessageImpl) SetTransportTraceContext(traceID [16]byte, spanID [8]byte, sampled bool, traceState *string) (ok bool) { + var traceIDErr, spanIDErr, sampledErr, traceStateErr core.ErrorInfo + ok = true // will remain true if we are able to set all the trace context properties + // set the transport trace context properties + traceIDErr = ccsmp.SolClientMessageSetTransportTraceContextTraceID(message.messagePointer, traceID) + if traceIDErr != nil { + ok = false + if traceIDErr.ReturnCode == ccsmp.SolClientReturnCodeFail { + logging.Default.Warning(fmt.Sprintf("Failed to set Transport Context traceID property: "+traceIDErr.GetMessageAsString()+", sub code %d", traceIDErr.SubCode)) + } + } + + spanIDErr = ccsmp.SolClientMessageSetTransportTraceContextSpanID(message.messagePointer, spanID) + if spanIDErr != nil { + ok = false + if spanIDErr.ReturnCode == ccsmp.SolClientReturnCodeFail { + logging.Default.Warning(fmt.Sprintf("Failed to set Transport Context spanID property: "+spanIDErr.GetMessageAsString()+", sub code %d", spanIDErr.SubCode)) + } + } + + sampledErr = ccsmp.SolClientMessageSetTransportTraceContextSampled(message.messagePointer, sampled) + if sampledErr != nil { + ok = false + if sampledErr.ReturnCode == ccsmp.SolClientReturnCodeFail { + logging.Default.Warning(fmt.Sprintf("Failed to set Transport Context sampled property: "+sampledErr.GetMessageAsString()+", sub code %d", sampledErr.SubCode)) + } + } + + if traceState != nil { + traceStateErr = ccsmp.SolClientMessageSetTransportTraceContextTraceState(message.messagePointer, string(*traceState)) + if traceStateErr != nil { + ok = false + if traceStateErr.ReturnCode == ccsmp.SolClientReturnCodeFail { + logging.Default.Warning(fmt.Sprintf("Failed to set Transport Context traceState property: "+traceStateErr.GetMessageAsString()+", sub code %d", traceStateErr.SubCode)) + } + } + } + + return ok +} + +// GetBaggage will return the baggage string associated with the message +// It is expected that string is UTF8 encoded. +// If the content is not accessible, an empty slice will +// be returned and the ok flag will be false. +func (message *MessageImpl) GetBaggage() (baggage string, ok bool) { + var err core.ErrorInfo + baggage, err = ccsmp.SolClientMessageGetBaggage(message.messagePointer) + ok = true // return true if value ir accessiable + if err != nil { + if err.ReturnCode == ccsmp.SolClientReturnCodeFail { + ok = false // return false, only when cannot access the data + logging.Default.Warning(fmt.Sprintf("Failed to retrieve Baggage: "+err.GetMessageAsString()+", sub code %d", err.SubCode)) + } + } + return baggage, ok +} + +// SetBaggage will set the baggage string associated with the message +// It is expected that string is UTF8 encoded. +func (message *MessageImpl) SetBaggage(baggage string) error { + err := ccsmp.SolClientMessageSetBaggage(message.messagePointer, baggage) + if err != nil { + logging.Default.Warning(fmt.Sprintf("Failed to set the Baggage: "+err.GetMessageAsString()+", sub code %d", err.SubCode)) + return core.ToNativeError(err, "error setting baggage: ") + } + return nil +} + func (message *MessageImpl) String() string { return ccsmp.SolClientMessageDump(message.messagePointer) } diff --git a/internal/impl/message/message_test.go b/internal/impl/message/message_test.go index 1fab989..cd62cf8 100644 --- a/internal/impl/message/message_test.go +++ b/internal/impl/message/message_test.go @@ -17,6 +17,8 @@ package message import ( + "encoding/hex" + "strings" "testing" "solace.dev/go/messaging/internal/ccsmp" @@ -62,3 +64,324 @@ func TestInboundMessageFree(t *testing.T) { t.Error("expected MessagePointer to be freed and set to nil, it was not") } } + +// Distributed Tracing + +func TestSetCreationTraceContext(t *testing.T) { + // the creation context value to test + creationCtxTraceID, _ := hex.DecodeString("79f90916c9a3dad1eb4b328e00469e45") + creationCtxSpanID, _ := hex.DecodeString("3b364712c4e1f17f") + sampledValue := true + traceStateValue := "sometrace=Example" + emptyTraceStateValue := "" + + var creationCtxTraceID16, emptyCreationCtxTraceID16 [16]byte + var creationCtxSpanID8, emptyCreationCtxSpanID8 [8]byte + copy(creationCtxTraceID16[:], creationCtxTraceID) + copy(creationCtxSpanID8[:], creationCtxSpanID) + + msgP, ccsmpErr := ccsmp.SolClientMessageAlloc() + if ccsmpErr != nil { + t.Error("did not expect error, got " + ccsmpErr.GetMessageAsString()) + } + msg := NewInboundMessage(msgP, false) + if msg.messagePointer == nil { + t.Error("expected message pointer to not be nil") + } + if msg.IsDisposed() { + t.Error("message is disposed before disposed called") + } + + // test setting the creation context value + ok := msg.SetCreationTraceContext(creationCtxTraceID16, creationCtxSpanID8, sampledValue, &traceStateValue) + if !ok { + t.Error("expected SetCreationTraceContext() function for valid values to succeed and return true") + } + + // test setting the creation context value - empty TraceID + okEmptyTraceID := msg.SetCreationTraceContext(emptyCreationCtxTraceID16, creationCtxSpanID8, sampledValue, &traceStateValue) + if !okEmptyTraceID { + t.Error("expected SetCreationTraceContext() function for empty TraceID to succeed and return true") + } + + // test setting the creation context value - empty SpanID + okEmptySpanID := msg.SetCreationTraceContext(creationCtxTraceID16, emptyCreationCtxSpanID8, sampledValue, &traceStateValue) + if !okEmptySpanID { + t.Error("expected SetCreationTraceContext() function for empty SpanID to succeed and return true") + } + + // test setting the creation context value - empty trace state + okEmptyTraceState := msg.SetCreationTraceContext(creationCtxTraceID16, creationCtxSpanID8, sampledValue, &emptyTraceStateValue) + if !okEmptyTraceState { + t.Error("expected SetCreationTraceContext() function for empty traceState to succeed and return true") + } + + // test setting the creation context value - nil trace state + okNilTraceState := msg.SetCreationTraceContext(creationCtxTraceID16, creationCtxSpanID8, sampledValue, nil) + if !okNilTraceState { + t.Error("expected SetCreationTraceContext() function for Nil traceState to succeed and return true") + } + + msg.Dispose() + if !msg.IsDisposed() { + t.Error("IsDisposed returned false, expected true") + } +} + +func TestGetCreationTraceContext(t *testing.T) { + // the creation context value to test + creationCtxTraceID, _ := hex.DecodeString("79f90916c9a3dad1eb4b328e00469e45") + creationCtxSpanID, _ := hex.DecodeString("3b364712c4e1f17f") + sampledValue := true + traceStateValue := "sometrace=Example" + + var creationCtxTraceID16 [16]byte + var creationCtxSpanID8 [8]byte + copy(creationCtxTraceID16[:], creationCtxTraceID) + copy(creationCtxSpanID8[:], creationCtxSpanID) + + msgP, ccsmpErr := ccsmp.SolClientMessageAlloc() + if ccsmpErr != nil { + t.Error("did not expect error, got " + ccsmpErr.GetMessageAsString()) + } + msg := NewInboundMessage(msgP, false) + if msg.messagePointer == nil { + t.Error("expected message pointer to not be nil") + } + if msg.IsDisposed() { + t.Error("message is disposed before disposed called") + } + + // test setting the creation context value + setValuesOk := msg.SetCreationTraceContext(creationCtxTraceID16, creationCtxSpanID8, sampledValue, &traceStateValue) + if !setValuesOk { + t.Error("expected SetCreationTraceContext() function for valid values to succeed and return true") + } + + // get the creation context values + traceID, spanID, sampled, traceState, ok := msg.GetCreationTraceContext() + if !ok { + t.Error("expected GetCreationTraceContext() function to return creation context values and return true") + } + + // test traceID equality + if traceID != creationCtxTraceID16 { + t.Error("expected GetCreationTraceContext() traceID from message should be the same as what was set in message") + } + + // test spanID equality + if spanID != creationCtxSpanID8 { + t.Error("expected GetCreationTraceContext() spanID from message should be the same as what was set in message") + } + + // test traceID equality + if sampled != sampledValue { + t.Error("expected GetCreationTraceContext() sampled value from message should be the same as what was set in message") + } + + // test traceState equality + if strings.Compare(traceState, traceStateValue) != 0 { + t.Error("expected GetCreationTraceContext() traceState from message should be the same as what was set in message") + } + + msg.Dispose() + if !msg.IsDisposed() { + t.Error("IsDisposed returned false, expected true") + } +} + +func TestSetTransportTraceContext(t *testing.T) { + // the transport context value to test + transportCtxTraceID, _ := hex.DecodeString("55d30916c9a3dad1eb4b328e00469e45") + transportCtxSpanID, _ := hex.DecodeString("a7164712c4e1f17f") + sampledValue := true + traceStateValue := "sometrace=Example" + emptyTraceStateValue := "" + + var transportCtxTraceID16, emptyTransportCtxTraceID16 [16]byte + var transportCtxSpanID8, emptyTransportCtxSpanID8 [8]byte + copy(transportCtxTraceID16[:], transportCtxTraceID) + copy(transportCtxSpanID8[:], transportCtxSpanID) + + msgP, ccsmpErr := ccsmp.SolClientMessageAlloc() + if ccsmpErr != nil { + t.Error("did not expect error, got " + ccsmpErr.GetMessageAsString()) + } + msg := NewInboundMessage(msgP, false) + if msg.messagePointer == nil { + t.Error("expected message pointer to not be nil") + } + if msg.IsDisposed() { + t.Error("message is disposed before disposed called") + } + + // test setting the transport context value + ok := msg.SetTransportTraceContext(transportCtxTraceID16, transportCtxSpanID8, sampledValue, &traceStateValue) + if !ok { + t.Error("expected SetTransportTraceContext() function for valid values to succeed and return true") + } + + // test setting the transport context value - empty TraceID + okEmptyTraceID := msg.SetTransportTraceContext(emptyTransportCtxTraceID16, transportCtxSpanID8, sampledValue, &traceStateValue) + if !okEmptyTraceID { + t.Error("expected SetTransportTraceContext() function for empty TraceID to succeed and return true") + } + + // test setting the transport context value - empty SpanID + okEmptySpanID := msg.SetTransportTraceContext(transportCtxTraceID16, emptyTransportCtxSpanID8, sampledValue, &traceStateValue) + if !okEmptySpanID { + t.Error("expected SetTransportTraceContext() function for empty SpanID to succeed and return true") + } + + // test setting the transport context value - empty trace state + okEmptyTraceState := msg.SetTransportTraceContext(transportCtxTraceID16, transportCtxSpanID8, sampledValue, &emptyTraceStateValue) + if !okEmptyTraceState { + t.Error("expected SetTransportTraceContext() function for empty traceState to succeed and return true") + } + + // test setting the transport context value - nil trace state + okNilTraceState := msg.SetTransportTraceContext(transportCtxTraceID16, transportCtxSpanID8, sampledValue, nil) + if !okNilTraceState { + t.Error("expected SetTransportTraceContext() function for nil traceState to succeed and return true") + } + + msg.Dispose() + if !msg.IsDisposed() { + t.Error("IsDisposed returned false, expected true") + } +} + +func TestGetTransportTraceContext(t *testing.T) { + // the transport context value to test + transportCtxTraceID, _ := hex.DecodeString("55d30916c9a3dad1eb4b328e00469e45") + transportCtxSpanID, _ := hex.DecodeString("a7164712c4e1f17f") + sampledValue := true + traceStateValue := "sometrace=Example" + + var transportCtxTraceID16 [16]byte + var transportCtxSpanID8 [8]byte + copy(transportCtxTraceID16[:], transportCtxTraceID) + copy(transportCtxSpanID8[:], transportCtxSpanID) + + msgP, ccsmpErr := ccsmp.SolClientMessageAlloc() + if ccsmpErr != nil { + t.Error("did not expect error, got " + ccsmpErr.GetMessageAsString()) + } + msg := NewInboundMessage(msgP, false) + if msg.messagePointer == nil { + t.Error("expected message pointer to not be nil") + } + if msg.IsDisposed() { + t.Error("message is disposed before disposed called") + } + + // test setting the transport context value + msg.SetCreationTraceContext(transportCtxTraceID16, transportCtxSpanID8, false, nil) // have to set creation context to prevent undesired behaviour + setValuesOk := msg.SetTransportTraceContext(transportCtxTraceID16, transportCtxSpanID8, sampledValue, &traceStateValue) + if !setValuesOk { + t.Error("expected SetTransportTraceContext() function for valid values to succeed and return true") + } + + // get the transport context values + traceID, spanID, sampled, traceState, ok := msg.GetTransportTraceContext() + if !ok { + t.Error("expected GetTransportTraceContext() function to return transport context values and return true") + } + + // test traceID equality + if traceID != transportCtxTraceID16 { + t.Error("expected GetTransportTraceContext() traceID from message should be the same as what was set in message") + } + + // test spanID equality + if spanID != transportCtxSpanID8 { + t.Error("expected GetTransportTraceContext() spanID from message should be the same as what was set in message") + } + + // test traceID equality + if sampled != sampledValue { + t.Error("expected GetTransportTraceContext() sampled value from message should be the same as what was set in message") + } + + // test traceState equality + if strings.Compare(traceState, traceStateValue) != 0 { + t.Error("expected GetTransportTraceContext() traceState from message should be the same as what was set in message") + } + + msg.Dispose() + if !msg.IsDisposed() { + t.Error("IsDisposed returned false, expected true") + } +} + +func TestSetBaggage(t *testing.T) { + // the baggage value to test + baggageValue := "baggage=value1;example=value2" + + msgP, ccsmpErr := ccsmp.SolClientMessageAlloc() + if ccsmpErr != nil { + t.Error("did not expect error, got " + ccsmpErr.GetMessageAsString()) + } + msg := NewInboundMessage(msgP, false) + if msg.messagePointer == nil { + t.Error("expected message pointer to not be nil") + } + if msg.IsDisposed() { + t.Error("message is disposed before disposed called") + } + + // test setting a valid baggage value + err := msg.SetBaggage(baggageValue) + if err != nil { + t.Error(err) + } + + // test setting empty baggage + emptyCaseErr := msg.SetBaggage("") + if emptyCaseErr != nil { + t.Error(emptyCaseErr) + } + + msg.Dispose() + if !msg.IsDisposed() { + t.Error("IsDisposed returned false, expected true") + } +} + +func TestGetBaggage(t *testing.T) { + // the baggage value to test + baggageValue := "baggage=value1;example=value2" + + msgP, ccsmpErr := ccsmp.SolClientMessageAlloc() + if ccsmpErr != nil { + t.Error("did not expect error, got " + ccsmpErr.GetMessageAsString()) + } + msg := NewInboundMessage(msgP, false) + if msg.messagePointer == nil { + t.Error("expected message pointer to not be nil") + } + if msg.IsDisposed() { + t.Error("message is disposed before disposed called") + } + // should not throw any errors + err := msg.SetBaggage(baggageValue) + if err != nil { + t.Error(err) + } + + baggage, ok := msg.GetBaggage() + if !ok { + t.Error("expected GetBaggage() function to return baggage and true") + } + if baggage == "" { + t.Error("expected baggage not to be an empty string") + } + if baggage != baggageValue { + t.Error("expected baggage from message should be the same baggage set on the message") + } + + msg.Dispose() + if !msg.IsDisposed() { + t.Error("IsDisposed returned false, expected true") + } +} diff --git a/test/message_test.go b/test/message_test.go index ab63659..df55783 100644 --- a/test/message_test.go +++ b/test/message_test.go @@ -17,6 +17,7 @@ package test import ( + "encoding/hex" "fmt" "reflect" "time" @@ -80,6 +81,54 @@ var fromConfigProviderInvalidTestCases = map[config.MessageProperty]interface{}{ config.MessagePropertySenderID: dummyStruct{}, } +// InboundMessageWithTracingSupport represents a message received by a consumer. +type InboundMessageWithTracingSupport interface { + // Extend the InboundMessage interface. + message.InboundMessage + + // GetCreationTraceContext will return the trace context metadata used for distributed message tracing message + GetCreationTraceContext() (traceID [16]byte, spanID [8]byte, sampled bool, traceState string, ok bool) + + // SetTraceContext will set creation trace context metadata used for distributed message tracing. + SetCreationTraceContext(traceID [16]byte, spanID [8]byte, sampled bool, traceState *string) (ok bool) + + // GetTransportTraceContext will return the trace context metadata used for distributed message tracing + GetTransportTraceContext() (traceID [16]byte, spanID [8]byte, sampled bool, traceState string, ok bool) + + // SetTraceContext will set transport trace context metadata used for distributed message tracing. + SetTransportTraceContext(traceID [16]byte, spanID [8]byte, sampled bool, traceState *string) (ok bool) + + // GetBaggage will return the baggage string associated with the message + GetBaggage() (baggage string, ok bool) + + // SetBaggage will set the baggage string associated with the message + SetBaggage(baggage string) error +} + +// OutboundMessageWithTracingSupport represents a message received by a consumer. +type OutboundMessageWithTracingSupport interface { + // Extend the OutboundMessage interface. + message.OutboundMessage + + // GetCreationTraceContext will return the trace context metadata used for distributed message tracing message + GetCreationTraceContext() (traceID [16]byte, spanID [8]byte, sampled bool, traceState string, ok bool) + + // SetTraceContext will set creation trace context metadata used for distributed message tracing. + SetCreationTraceContext(traceID [16]byte, spanID [8]byte, sampled bool, traceState *string) (ok bool) + + // GetTransportTraceContext will return the trace context metadata used for distributed message tracing + GetTransportTraceContext() (traceID [16]byte, spanID [8]byte, sampled bool, traceState string, ok bool) + + // SetTraceContext will set transport trace context metadata used for distributed message tracing. + SetTransportTraceContext(traceID [16]byte, spanID [8]byte, sampled bool, traceState *string) (ok bool) + + // GetBaggage will return the baggage string associated with the message + GetBaggage() (baggage string, ok bool) + + // SetBaggage will set the baggage string associated with the message + SetBaggage(baggage string) error +} + var _ = Describe("Local MessageBuilder Tests", func() { var messageBuilder solace.OutboundMessageBuilder @@ -1935,4 +1984,371 @@ var _ = Describe("Remote Message Tests", func() { }) }) + Describe("Published and received message with Distributed Tracing support", func() { + var publisher solace.DirectMessagePublisher + var receiver solace.DirectMessageReceiver + var inboundMessageChannel chan InboundMessageWithTracingSupport + + BeforeEach(func() { + var err error + err = messagingService.Connect() + Expect(err).ToNot(HaveOccurred()) + + publisher, err = messagingService.CreateDirectMessagePublisherBuilder().Build() + Expect(err).ToNot(HaveOccurred()) + receiver, err = messagingService.CreateDirectMessageReceiverBuilder().WithSubscriptions(resource.TopicSubscriptionOf(topic)).Build() + Expect(err).ToNot(HaveOccurred()) + + err = publisher.Start() + Expect(err).ToNot(HaveOccurred()) + + inboundMessageChannel = make(chan InboundMessageWithTracingSupport) + receiver.ReceiveAsync(func(inboundMessage message.InboundMessage) { + inboundMessageChannel <- inboundMessage.(InboundMessageWithTracingSupport) + }) + + err = receiver.Start() + Expect(err).ToNot(HaveOccurred()) + }) + + AfterEach(func() { + var err error + err = publisher.Terminate(10 * time.Second) + Expect(err).ToNot(HaveOccurred()) + err = receiver.Terminate(10 * time.Second) + Expect(err).ToNot(HaveOccurred()) + + err = messagingService.Disconnect() + Expect(err).ToNot(HaveOccurred()) + }) + + It("should be able to publish/receive a message with no creation context", func() { + message, err := messageBuilder.Build() // no creation context is set on message + Expect(err).ToNot(HaveOccurred()) + + publisher.Publish(message, resource.TopicOf(topic)) + + select { + case message := <-inboundMessageChannel: + traceID, spanID, sampled, traceState, ok := message.GetCreationTraceContext() + Expect(ok).To(BeFalse()) + Expect(traceID).To(Equal([16]byte{})) // empty + Expect(spanID).To(Equal([8]byte{})) // empty + Expect(sampled).To(BeFalse()) + Expect(traceState).To(Equal("")) + case <-time.After(1 * time.Second): + Fail("timed out waiting for message to be delivered") + } + }) + + It("should be able to publish/receive a message with a valid creation context", func() { + message, err := messageBuilder.Build() + Expect(err).ToNot(HaveOccurred()) + + // cast the message to the extended interface that has message tracing support + messageWithDT := message.(OutboundMessageWithTracingSupport) + + // set creation context on message + creationCtxTraceID, _ := hex.DecodeString("79f90916c9a3dad1eb4b328e00469e45") + creationCtxSpanID, _ := hex.DecodeString("3b364712c4e1f17f") + sampledValue := true + traceStateValue := "sometrace1=Example1" + + var creationCtxTraceID16 [16]byte + var creationCtxSpanID8 [8]byte + copy(creationCtxTraceID16[:], creationCtxTraceID) + copy(creationCtxSpanID8[:], creationCtxSpanID) + ok := messageWithDT.SetCreationTraceContext(creationCtxTraceID16, creationCtxSpanID8, sampledValue, &traceStateValue) + Expect(ok).To(BeTrue()) + + publisher.Publish(messageWithDT, resource.TopicOf(topic)) + + select { + case message := <-inboundMessageChannel: + traceID, spanID, sampled, traceState, ok := message.GetCreationTraceContext() + Expect(ok).To(BeTrue()) + Expect(traceID).To(Equal(creationCtxTraceID16)) // should be equal + Expect(spanID).To(Equal(creationCtxSpanID8)) // should be equal + Expect(sampled).To(Equal(sampledValue)) + Expect(traceState).To(Equal(traceStateValue)) + case <-time.After(1 * time.Second): + Fail("timed out waiting for message to be delivered") + } + }) + + It("should be able to publish/receive a message with a valid creation context without trace state", func() { + message, err := messageBuilder.Build() + Expect(err).ToNot(HaveOccurred()) + + // cast the message to the extended interface that has message tracing support + messageWithDT := message.(OutboundMessageWithTracingSupport) + + // set creation context on message + creationCtxTraceID, _ := hex.DecodeString("79f90916c9a3dad1eb4b328e00469e45") + creationCtxSpanID, _ := hex.DecodeString("3b364712c4e1f17f") + sampledValue := true + + var creationCtxTraceID16 [16]byte + var creationCtxSpanID8 [8]byte + copy(creationCtxTraceID16[:], creationCtxTraceID) + copy(creationCtxSpanID8[:], creationCtxSpanID) + ok := messageWithDT.SetCreationTraceContext(creationCtxTraceID16, creationCtxSpanID8, sampledValue, nil) // no trace state + Expect(ok).To(BeTrue()) + + publisher.Publish(messageWithDT, resource.TopicOf(topic)) + + select { + case message := <-inboundMessageChannel: + traceID, spanID, sampled, traceState, ok := message.GetCreationTraceContext() + Expect(ok).To(BeTrue()) + Expect(traceID).To(Equal(creationCtxTraceID16)) // should be equal + Expect(spanID).To(Equal(creationCtxSpanID8)) // should be equal + Expect(sampled).To(Equal(sampledValue)) + Expect(traceState).To(Equal("")) // should be empty + case <-time.After(1 * time.Second): + Fail("timed out waiting for message to be delivered") + } + }) + + It("should be able to publish/receive a message with no tranport context", func() { + message, err := messageBuilder.Build() // no creation context is set on message + Expect(err).ToNot(HaveOccurred()) + + publisher.Publish(message, resource.TopicOf(topic)) + + select { + case message := <-inboundMessageChannel: + traceID, spanID, sampled, traceState, ok := message.GetTransportTraceContext() + Expect(ok).To(BeFalse()) + Expect(traceID).To(Equal([16]byte{})) // empty + Expect(spanID).To(Equal([8]byte{})) // empty + Expect(sampled).To(BeFalse()) + Expect(traceState).To(Equal("")) + case <-time.After(1 * time.Second): + Fail("timed out waiting for message to be delivered") + } + }) + + It("should be able to publish/receive a message with a valid transport context", func() { + message, err := messageBuilder.Build() + Expect(err).ToNot(HaveOccurred()) + + // cast the message to the extended interface that has message tracing support + messageWithDT := message.(OutboundMessageWithTracingSupport) + + // set transport context on message + transportCtxTraceID, _ := hex.DecodeString("55d30916c9a3dad1eb4b328e00469e45") + transportCtxSpanID, _ := hex.DecodeString("a7164712c4e1f17f") + + sampledValue := true + traceStateValue := "trace=Sample" + + var transportCtxTraceID16 [16]byte + var transportCtxSpanID8 [8]byte + copy(transportCtxTraceID16[:], transportCtxTraceID) + copy(transportCtxSpanID8[:], transportCtxSpanID) + ok := messageWithDT.SetTransportTraceContext(transportCtxTraceID16, transportCtxSpanID8, sampledValue, &traceStateValue) + Expect(ok).To(BeTrue()) + + publisher.Publish(messageWithDT, resource.TopicOf(topic)) + + select { + case message := <-inboundMessageChannel: + traceID, spanID, sampled, traceState, ok := message.GetTransportTraceContext() + Expect(ok).To(BeTrue()) + Expect(traceID).To(Equal(transportCtxTraceID16)) // should be equal + Expect(spanID).To(Equal(transportCtxSpanID8)) // should be equal + Expect(sampled).To(BeTrue()) + Expect(traceState).To(Equal(traceStateValue)) + case <-time.After(1 * time.Second): + Fail("timed out waiting for message to be delivered") + } + }) + + It("should be able to publish/receive a message with a valid transport context without trace state", func() { + message, err := messageBuilder.Build() + Expect(err).ToNot(HaveOccurred()) + + // cast the message to the extended interface that has message tracing support + messageWithDT := message.(OutboundMessageWithTracingSupport) + + // set transport context on message + transportCtxTraceID, _ := hex.DecodeString("55d30916c9a3dad1eb4b328e00469e45") + transportCtxSpanID, _ := hex.DecodeString("a7164712c4e1f17f") + sampledValue := true + + var transportCtxTraceID16 [16]byte + var transportCtxSpanID8 [8]byte + copy(transportCtxTraceID16[:], transportCtxTraceID) + copy(transportCtxSpanID8[:], transportCtxSpanID) + ok := messageWithDT.SetTransportTraceContext(transportCtxTraceID16, transportCtxSpanID8, sampledValue, nil) + Expect(ok).To(BeTrue()) + + publisher.Publish(messageWithDT, resource.TopicOf(topic)) + + select { + case message := <-inboundMessageChannel: + traceID, spanID, sampled, traceState, ok := message.GetTransportTraceContext() + Expect(ok).To(BeTrue()) + Expect(traceID).To(Equal(transportCtxTraceID16)) // should be equal + Expect(spanID).To(Equal(transportCtxSpanID8)) // should be equal + Expect(sampled).To(BeTrue()) + Expect(traceState).To(Equal("")) // should be empty + case <-time.After(1 * time.Second): + Fail("timed out waiting for message to be delivered") + } + }) + + It("should be able to publish/receive a message with a valid creation context and no transport context", func() { + message, err := messageBuilder.Build() + Expect(err).ToNot(HaveOccurred()) + + // cast the message to the extended interface that has message tracing support + messageWithDT := message.(OutboundMessageWithTracingSupport) + + // set creation context on message + creationCtxTraceID, _ := hex.DecodeString("79f90916c9a3dad1eb4b328e00469e45") + creationCtxSpanID, _ := hex.DecodeString("3b364712c4e1f17f") + sampledValue := true + traceStateValue := "sometrace=Example" + + var creationCtxTraceID16 [16]byte + var creationCtxSpanID8 [8]byte + copy(creationCtxTraceID16[:], creationCtxTraceID) + copy(creationCtxSpanID8[:], creationCtxSpanID) + ok := messageWithDT.SetCreationTraceContext(creationCtxTraceID16, creationCtxSpanID8, sampledValue, &traceStateValue) + Expect(ok).To(BeTrue()) + + publisher.Publish(messageWithDT, resource.TopicOf(topic)) + + select { + case message := <-inboundMessageChannel: + creationTraceID, creationSpanID, creationSampled, creationTraceState, creationOk := message.GetCreationTraceContext() + transportTraceID, transportSpanID, transportSampled, transportTraceState, transportOk := message.GetTransportTraceContext() + + Expect(creationOk).To(BeTrue()) + Expect(creationTraceID).To(Equal(creationCtxTraceID16)) // should be equal + Expect(creationSpanID).To(Equal(creationCtxSpanID8)) // should be equal + Expect(creationSampled).To(BeTrue()) + Expect(creationTraceState).To(Equal(traceStateValue)) + + Expect(transportOk).To(BeFalse()) + Expect(transportTraceID).To(Equal([16]byte{})) // empty + Expect(transportSpanID).To(Equal([8]byte{})) // empty + Expect(transportSampled).To(BeFalse()) + Expect(transportTraceState).To(Equal("")) + + case <-time.After(1 * time.Second): + Fail("timed out waiting for message to be delivered") + } + }) + + It("should be able to publish/receive a message with different creation context and transport context", func() { + message, err := messageBuilder.Build() + Expect(err).ToNot(HaveOccurred()) + + // cast the message to the extended interface that has message tracing support + messageWithDT := message.(OutboundMessageWithTracingSupport) + + // set creation context on message + creationCtxTraceID, _ := hex.DecodeString("79f90916c9a3dad1eb4b328e00469e45") + creationCtxSpanID, _ := hex.DecodeString("3b364712c4e1f17f") + creationCtxTraceState := "sometrace1=Example1" + + // set transport context on message + transportCtxTraceID, _ := hex.DecodeString("79f90916c9a3dad1eb4b328e00469e45") + transportCtxSpanID, _ := hex.DecodeString("a7164712c4e1f17f") + transportCtxTraceState := "sometrace2=Example2" + + var creationCtxTraceID16, transportCtxTraceID16 [16]byte + var creationCtxSpanID8, transportCtxSpanID8 [8]byte + copy(creationCtxTraceID16[:], creationCtxTraceID) + copy(creationCtxSpanID8[:], creationCtxSpanID) + setCreationCtxOk := messageWithDT.SetCreationTraceContext(creationCtxTraceID16, creationCtxSpanID8, true, &creationCtxTraceState) + Expect(setCreationCtxOk).To(BeTrue()) + + copy(transportCtxTraceID16[:], transportCtxTraceID) + copy(transportCtxSpanID8[:], transportCtxSpanID) + setTransportCtxOk := messageWithDT.SetTransportTraceContext(transportCtxTraceID16, transportCtxSpanID8, true, &transportCtxTraceState) + Expect(setTransportCtxOk).To(BeTrue()) + + publisher.Publish(messageWithDT, resource.TopicOf(topic)) + + select { + case message := <-inboundMessageChannel: + creationTraceID, creationSpanID, creationSampled, creationTraceState, creationOk := message.GetCreationTraceContext() + transportTraceID, transportSpanID, transportSampled, transportTraceState, transportOk := message.GetTransportTraceContext() + + Expect(creationOk).To(BeTrue()) + Expect(creationTraceID).To(Equal(creationCtxTraceID16)) // should be equal + Expect(creationSpanID).To(Equal(creationCtxSpanID8)) // should be equal + Expect(creationSampled).To(BeTrue()) + Expect(creationTraceState).To(Equal(creationCtxTraceState)) + + Expect(transportOk).To(BeTrue()) + Expect(transportTraceID).ToNot(Equal([16]byte{})) // not empty + Expect(transportTraceID).To(Equal(transportCtxTraceID16)) // should be equal + + Expect(transportSpanID).ToNot(Equal([8]byte{})) // not empty + Expect(transportSpanID).To(Equal(transportCtxSpanID8)) // should be equal + + Expect(transportSampled).To(BeTrue()) + Expect(transportTraceState).To(Equal(transportCtxTraceState)) + + Expect(creationTraceID).To(Equal(transportTraceID)) // should be equal + Expect(creationSpanID).ToNot(Equal(transportSpanID)) // should not be equal + Expect(creationTraceState).ToNot(Equal(transportTraceState)) // should not be equal + + case <-time.After(1 * time.Second): + Fail("timed out waiting for message to be delivered") + } + }) + + It("should be able to publish/receive a message with no baggage", func() { + message, err := messageBuilder.Build() + Expect(err).ToNot(HaveOccurred()) + + // cast the message to the extended interface that has message tracing support + messageWithDT := message.(OutboundMessageWithTracingSupport) + + baggageErr := messageWithDT.SetBaggage("") // set empty baggage + Expect(baggageErr).To(BeNil()) + + publisher.Publish(messageWithDT, resource.TopicOf(topic)) + + select { + case message := <-inboundMessageChannel: + baggage, ok := message.GetBaggage() + Expect(ok).To(BeTrue()) + Expect(baggage).To(Equal("")) + case <-time.After(1 * time.Second): + Fail("timed out waiting for message to be delivered") + } + }) + + It("should be able to publish/receive a message with a valid baggage", func() { + baggage := "baggage1=payload1" + message, err := messageBuilder.Build() + Expect(err).ToNot(HaveOccurred()) + + // cast the message to the extended interface that has message tracing support + messageWithDT := message.(OutboundMessageWithTracingSupport) + + baggageErr := messageWithDT.SetBaggage(baggage) // set a valid baggage string + Expect(baggageErr).To(BeNil()) + + publisher.Publish(messageWithDT, resource.TopicOf(topic)) + + select { + case message := <-inboundMessageChannel: + receivedBaggage, ok := message.GetBaggage() + Expect(ok).To(BeTrue()) + Expect(receivedBaggage).To(Equal(baggage)) + case <-time.After(1 * time.Second): + Fail("timed out waiting for message to be delivered") + } + }) + + }) + }) diff --git a/version.go b/version.go index 95f719b..2cf33bb 100644 --- a/version.go +++ b/version.go @@ -23,4 +23,4 @@ func init() { core.SetVersion(version) } -const version = "1.4.1" +const version = "1.5.0"