diff --git a/internal/ccsmp/ccsmp_cache.go b/internal/ccsmp/ccsmp_cache.go index f19c4f8..14de854 100644 --- a/internal/ccsmp/ccsmp_cache.go +++ b/internal/ccsmp/ccsmp_cache.go @@ -17,6 +17,7 @@ package ccsmp /* +#cgo CFLAGS: -DSOLCLIENT_PSPLUS_GO #include #include @@ -41,13 +42,13 @@ solClient_returnCode_t _solClient_version_set(solClient_version_info_pt version_ */ import "C" import ( - "unsafe" - "sync" - "fmt" + "fmt" + "sync" + "unsafe" - "solace.dev/go/messaging/pkg/solace/resource" - "solace.dev/go/messaging/pkg/solace/message" - "solace.dev/go/messaging/internal/impl/logging" + "solace.dev/go/messaging/internal/impl/logging" + "solace.dev/go/messaging/pkg/solace/message" + "solace.dev/go/messaging/pkg/solace/resource" ) // CachedMessageSubscriptionRequestStrategyMappingToCCSMPCacheRequestFlags is the mapping for Cached message Subscription Request Strategies to respective CCSMP cache request flags @@ -59,14 +60,13 @@ var CachedMessageSubscriptionRequestStrategyMappingToCCSMPCacheRequestFlags = ma } // CachedMessageSubscriptionRequestStrategyMappingToCCSMPSubscribeFlags is the mapping for Cached message Subscription Request Strategies to respective CCSMP subscription flags -var CachedMessageSubscriptionRequestStrategyMappingToCCSMPSubscribeFlags = map[resource.CachedMessageSubscriptionStrategy]C.solClient_subscribeFlags_t { - resource.AsAvailable: C.SOLCLIENT_SUBSCRIBE_FLAGS_REQUEST_CONFIRM, - resource.LiveCancelsCached: C.SOLCLIENT_SUBSCRIBE_FLAGS_REQUEST_CONFIRM, - resource.CachedFirst: C.SOLCLIENT_SUBSCRIBE_FLAGS_REQUEST_CONFIRM, - resource.CachedOnly: C.SOLCLIENT_SUBSCRIBE_FLAGS_LOCAL_DISPATCH_ONLY, +var CachedMessageSubscriptionRequestStrategyMappingToCCSMPSubscribeFlags = map[resource.CachedMessageSubscriptionStrategy]C.solClient_subscribeFlags_t{ + resource.AsAvailable: C.SOLCLIENT_SUBSCRIBE_FLAGS_REQUEST_CONFIRM, + resource.LiveCancelsCached: C.SOLCLIENT_SUBSCRIBE_FLAGS_REQUEST_CONFIRM, + resource.CachedFirst: C.SOLCLIENT_SUBSCRIBE_FLAGS_REQUEST_CONFIRM, + resource.CachedOnly: C.SOLCLIENT_SUBSCRIBE_FLAGS_LOCAL_DISPATCH_ONLY, } - // SolClientCacheSessionPt is assigned a value type SolClientCacheSessionPt = C.solClient_opaqueCacheSession_pt @@ -74,10 +74,12 @@ type SolClientCacheSessionPt = C.solClient_opaqueCacheSession_pt // SolClientCacheSession structure type SolClientCacheSession struct { - pointer SolClientCacheSessionPt + pointer SolClientCacheSessionPt } -/* sessionToCacheEventCallbackMap is required as a global var even though cache sessions etc. are +/* + sessionToCacheEventCallbackMap is required as a global var even though cache sessions etc. are + scoped to a single receiver. This is required because the event callback that is passed to CCSMP when performing an async cache request cannot have a pointer into Go-managed memory by being associated with receiver. If it did, and CCSMP the event callback after the Go-managed receiver instance was garbage @@ -91,63 +93,61 @@ to this one. 
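The comment above describes the usual cgo constraint: C code must never hold a pointer into Go-managed memory, so the Go side keys its callbacks by an opaque, integer-sized handle in a package-level map and resolves them when the exported callback fires. A minimal, self-contained sketch of that pattern (hypothetical names, not the actual ccsmp sources) looks like this; only the integer handle ever crosses the cgo boundary, so the receiver object stays free to be moved or collected by the Go runtime.

package main

import (
	"fmt"
	"sync"
)

// callbackRegistry plays the role of the package-level map described above: it
// maps an opaque handle that was handed to the C layer back to a Go callback.
var callbackRegistry sync.Map // map[uintptr]func(event string)

// registerCallback stores the callback under the handle before the request is issued.
func registerCallback(handle uintptr, cb func(event string)) {
	callbackRegistry.Store(handle, cb)
}

// dispatchEvent mimics what an exported cgo callback does: it receives only the
// integer handle from C, looks the Go callback up, and invokes it.
func dispatchEvent(handle uintptr, event string) {
	if cb, ok := callbackRegistry.Load(handle); ok {
		cb.(func(event string))(event)
		return
	}
	fmt.Println("no callback registered for handle", handle)
}

func main() {
	const handle uintptr = 0x1234 // stands in for a cache session pointer value
	registerCallback(handle, func(event string) {
		fmt.Println("cache event:", event)
	})
	dispatchEvent(handle, "REQUEST_COMPLETED_NOTICE")
	// Remove the entry before the underlying session is destroyed so a recycled
	// handle value cannot collide with a stale callback.
	callbackRegistry.Delete(handle)
}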
var cacheToEventCallbackMap sync.Map func CreateCacheSession(cacheSessionProperties []string, sessionP SolClientSessionPt) (SolClientCacheSessionPt, *SolClientErrorInfoWrapper) { - cacheSessionPropertiesP , freeArrayFunc := ToCArray(cacheSessionProperties, true) - defer freeArrayFunc() - var cacheSessionP SolClientCacheSessionPt - errorInfo := handleCcsmpError(func() SolClientReturnCode { - return C.SessionCreateCacheSession( - cacheSessionPropertiesP, - sessionP, - &cacheSessionP, - ) - }) - fmt.Printf("cacheSessionP after cacheSessionCreate is: %p\n", cacheSessionP) - /* TODO: handle errors from cache sessino create? */ - return cacheSessionP, errorInfo + cacheSessionPropertiesP, freeArrayFunc := ToCArray(cacheSessionProperties, true) + defer freeArrayFunc() + var cacheSessionP SolClientCacheSessionPt + errorInfo := handleCcsmpError(func() SolClientReturnCode { + return C.SessionCreateCacheSession( + cacheSessionPropertiesP, + sessionP, + &cacheSessionP, + ) + }) + return cacheSessionP, errorInfo } -func SendCacheRequest ( - dispatchID uintptr, - cacheSessionP SolClientCacheSessionPt, - topic string, - cacheRequestID message.CacheRequestID, - cacheRequestFlags C.solClient_cacheRequestFlags_t, // There may be a custom type for this? TBD - subscribeFlags C.solClient_subscribeFlags_t, // There may be a custom type for this? TBD - eventCallback SolClientCacheEventCallback, - ) *SolClientErrorInfoWrapper { - - cacheToEventCallbackMap.Store(cacheSessionP, eventCallback) - /* NOTE: Do we need to add the dispatch callback to the map here, or is that already done by the receiver in calls to Receive() or ReceiveAsync()? */ - - topicP := C.CString(topic) - errorInfo := handleCcsmpError(func() SolClientReturnCode { - return C.CacheSessionSendCacheRequest( - C.solClient_uint64_t(dispatchID), - // TODO: rework this, we should probably just have a pointer and not a whole object. - cacheSessionP, - topicP, - C.solClient_uint64_t(cacheRequestID), // This is done elsewhere such as in SolClientMessgeSetSequenceNumber - cacheRequestFlags, - subscribeFlags, - ) - }) - return errorInfo +func SendCacheRequest( + dispatchID uintptr, + cacheSessionP SolClientCacheSessionPt, + topic string, + cacheRequestID message.CacheRequestID, + cacheRequestFlags C.solClient_cacheRequestFlags_t, // There may be a custom type for this? TBD + subscribeFlags C.solClient_subscribeFlags_t, // There may be a custom type for this? TBD + eventCallback SolClientCacheEventCallback, +) *SolClientErrorInfoWrapper { + + cacheToEventCallbackMap.Store(cacheSessionP, eventCallback) + /* NOTE: Do we need to add the dispatch callback to the map here, or is that already done by the receiver in calls to Receive() or ReceiveAsync()? */ + + topicP := C.CString(topic) + errorInfo := handleCcsmpError(func() SolClientReturnCode { + return C.CacheSessionSendCacheRequest( + C.solClient_uint64_t(dispatchID), + // TODO: rework this, we should probably just have a pointer and not a whole object. 
+ cacheSessionP, + topicP, + C.solClient_uint64_t(cacheRequestID), // This is done elsewhere such as in SolClientMessgeSetSequenceNumber + cacheRequestFlags, + subscribeFlags, + ) + }) + return errorInfo } func DestroyCacheSession(cacheSessionP SolClientCacheSessionPt) *SolClientErrorInfoWrapper { - /* NOTE: We remove the cache session pointer from the map before we destroy the cache session - * so that map access using that pointer as an index won't collide with another cache session - * that is created immediately after this one is destroyed.*/ - cacheToEventCallbackMap.Delete(cacheSessionP) - return handleCcsmpError(func() SolClientReturnCode { - return C.CacheSessionDestroy(&cacheSessionP) - }) + /* NOTE: We remove the cache session pointer from the map before we destroy the cache session + * so that map access using that pointer as an index won't collide with another cache session + * that is created immediately after this one is destroyed.*/ + cacheToEventCallbackMap.Delete(cacheSessionP) + return handleCcsmpError(func() SolClientReturnCode { + return C.CacheSessionDestroy(&cacheSessionP) + }) } -func CancelCacheRequest(cacheSessionP SolClientCacheSessionPt) * SolClientErrorInfoWrapper { - return handleCcsmpError(func () SolClientReturnCode { - return C.CacheSessionCancelRequests(cacheSessionP) - }) +func CancelCacheRequest(cacheSessionP SolClientCacheSessionPt) *SolClientErrorInfoWrapper { + return handleCcsmpError(func() SolClientReturnCode { + return C.CacheSessionCancelRequests(cacheSessionP) + }) } // SolClientCacheEventCallback functions should format CCSMP args into Go objects and then pass those objects @@ -159,66 +159,63 @@ type SolClientCacheEventInfoPt = C.solCache_eventCallbackInfo_pt type SolClientCacheEvent = C.solCache_event_t type SolClientCacheEventInfo struct { - /* TODO: Rename this to CacheEventInfo to better distinguish it from CCSMP objects, since it now has more than - * just the original event fields. */ - cacheSessionP SolClientCacheSessionPt - event SolClientCacheEvent - topic string - returnCode SolClientReturnCode - subCode SolClientSubCode - cacheRequestID message.CacheRequestID - err error + /* TODO: Rename this to CacheEventInfo to better distinguish it from CCSMP objects, since it now has more than + * just the original event fields. 
*/ + cacheSessionP SolClientCacheSessionPt + event SolClientCacheEvent + topic string + returnCode SolClientReturnCode + subCode SolClientSubCode + cacheRequestID message.CacheRequestID + err error +} + +func (eventInfo *SolClientCacheEventInfo) String() string { + return fmt.Sprintf("SolClientCacheEventInfo:\n\tcacheSesionP: 0x%x\n\tevent: %d\n\ttopic: %s\n\treturnCode: %d\n\tsubCode: %d\n\tcacheRequestID: %d\n\terr: %s", eventInfo.cacheSessionP, eventInfo.event, eventInfo.topic, eventInfo.returnCode, eventInfo.subCode, eventInfo.cacheRequestID, eventInfo.err) } const ( - SolClientCacheEventRequestCompletedNotice SolClientCacheEvent = 1 + SolClientCacheEventRequestCompletedNotice SolClientCacheEvent = 1 ) func NewConfiguredSolClientCacheEventInfo(cacheSessionP SolClientCacheSessionPt, cacheRequestID message.CacheRequestID, topic string, err error) SolClientCacheEventInfo { - return SolClientCacheEventInfo{ - cacheSessionP: cacheSessionP, - event: SolClientCacheEventRequestCompletedNotice, - topic: topic, - returnCode: SolClientReturnCodeFail, - subCode: SolClientSubCodeInternalError, - cacheRequestID: cacheRequestID, - err: err, - } + return SolClientCacheEventInfo{ + cacheSessionP: cacheSessionP, + event: SolClientCacheEventRequestCompletedNotice, + topic: topic, + returnCode: SolClientReturnCodeFail, + subCode: SolClientSubCodeInternalError, + cacheRequestID: cacheRequestID, + err: err, + } } -func (i * SolClientCacheEventInfo) GetCacheSessionPointer() SolClientCacheSessionPt { - return i.cacheSessionP +func (i *SolClientCacheEventInfo) GetCacheSessionPointer() SolClientCacheSessionPt { + return i.cacheSessionP } -func SolClientCacheEventInfoFromCoreCacheEventInfo(eventCallbackInfo SolClientCacheEventInfoPt, userP unsafe.Pointer) SolClientCacheEventInfo { - return SolClientCacheEventInfo{ - cacheSessionP: SolClientCacheSessionPt(userP), - event: SolClientCacheEvent(eventCallbackInfo.cacheEvent), - topic: C.GoString(eventCallbackInfo.topic), - returnCode: SolClientReturnCode(eventCallbackInfo.rc), - subCode: SolClientSubCode(eventCallbackInfo.subCode), - cacheRequestID: message.CacheRequestID(eventCallbackInfo.cacheRequestId), - } +func SolClientCacheEventInfoFromCoreCacheEventInfo(eventCallbackInfo SolClientCacheEventInfoPt, userP uintptr) SolClientCacheEventInfo { + return SolClientCacheEventInfo{ + cacheSessionP: SolClientCacheSessionPt(userP), + event: SolClientCacheEvent(eventCallbackInfo.cacheEvent), + topic: C.GoString(eventCallbackInfo.topic), + returnCode: SolClientReturnCode(eventCallbackInfo.rc), + subCode: SolClientSubCode(eventCallbackInfo.subCode), + cacheRequestID: message.CacheRequestID(eventCallbackInfo.cacheRequestId), + } } //export goCacheEventCallback -func goCacheEventCallback(/*opaqueSessionP*/_ SolClientSessionPt, eventCallbackInfo SolClientCacheEventInfoPt, userP unsafe.Pointer) { - /* NOTE: We don't need to use the session pointer since we can use the user_p(a.k.a. the cache session pointer) - * which is guaranteed to be unique for at least the duration that the cache session pointer is in the global - * map since during receiver termination we destory the cache session only after we remove it from all maps. 
- */ - fmt.Printf("Got to goCacheEventCallback\n") - fmt.Printf("goCacheEvntCallback::userP is %p\n", uintptr(userP)) - fmt.Printf("goCacheEvntCallback::userP is %p\n", SolClientCacheSessionPt(userP)) - //if callback, ok := cacheToEventCallbackMap.Load(uintptr(userP)); ok { - if callback, ok := cacheToEventCallbackMap.Load(SolClientCacheSessionPt(userP)); ok { -callback.(SolClientCacheEventCallback)(SolClientCacheEventInfoFromCoreCacheEventInfo(eventCallbackInfo, userP)) - fmt.Printf("Finished receiver cache event callback\n") - } else { - fmt.Printf("Received event callback from core API without an associated cache event callback\n") - if logging.Default.IsDebugEnabled() { - logging.Default.Debug("Received event callback from core API without an associated cache event callback") - } - } - fmt.Printf("finished goCacheEventCallback\n") +func goCacheEventCallback( /*opaqueSessionP*/ _ SolClientSessionPt, eventCallbackInfo SolClientCacheEventInfoPt, userP unsafe.Pointer) { + /* NOTE: We don't need to use the session pointer since we can use the user_p(a.k.a. the cache session pointer) + * which is guaranteed to be unique for at least the duration that the cache session pointer is in the global + * map since during receiver termination we destory the cache session only after we remove it from all maps. + */ + if callback, ok := cacheToEventCallbackMap.Load(SolClientCacheSessionPt(uintptr(userP))); ok { + callback.(SolClientCacheEventCallback)(SolClientCacheEventInfoFromCoreCacheEventInfo(eventCallbackInfo, uintptr(userP))) + } else { + if logging.Default.IsDebugEnabled() { + logging.Default.Debug("Received event callback from core API without an associated cache event callback") + } + } } diff --git a/internal/ccsmp/ccsmp_core.go b/internal/ccsmp/ccsmp_core.go index 306e807..b1c42e2 100644 --- a/internal/ccsmp/ccsmp_core.go +++ b/internal/ccsmp/ccsmp_core.go @@ -111,17 +111,12 @@ var sessionToEventCallbackMap sync.Map //export goMessageReceiveCallback func goMessageReceiveCallback(sessionP SolClientSessionPt, msgP SolClientMessagePt, userP unsafe.Pointer) C.solClient_rxMsgCallback_returnCode_t { - fmt.Printf("Got to goMessageReceiveCallback\n") if callback, ok := sessionToRXCallbackMap.Load(sessionP); ok { - fmt.Printf("goMessageReceiveCallback::callback found\n") if callback.(SolClientMessageCallback)(msgP, userP) { - fmt.Printf("goMessageReceiveCallback::callback returned true\n") return C.SOLCLIENT_CALLBACK_TAKE_MSG } - fmt.Printf("goMessageReceiveCallback::callback returned false\n") return C.SOLCLIENT_CALLBACK_OK } - fmt.Printf("goMessageReceiveCallback::callback not found\n") logging.Default.Error("Received message from core API without an associated session callback") return C.SOLCLIENT_CALLBACK_OK } @@ -150,7 +145,7 @@ func goEventCallback(sessionP SolClientSessionPt, eventInfoP SolClientSessionEve if callback, ok := sessionToEventCallbackMap.Load(sessionP); ok { callback.(SolClientSessionEventCallback)(SolClientSessionEvent(eventInfoP.sessionEvent), eventInfoP.responseCode, C.GoString(eventInfoP.info_p), eventInfoP.correlation_p, userP) } else { - logging.Default.Debug(fmt.Sprintf("Received event callback from core API without an associated session callback:\n%s\n", C.GoString(eventInfoP.info_p))) + logging.Default.Debug(fmt.Sprintf("Received event callback from core API without an associated session callback:\n%s\n", C.GoString(eventInfoP.info_p))) } } @@ -264,7 +259,7 @@ type SolClientSession struct { // GetPointer returns the session pointer. 
func (session *SolClientSession) GetPointer() SolClientSessionPt { - return session.pointer + return session.pointer } // SetMessageCallback sets the message callback to use diff --git a/internal/ccsmp/ccsmp_helper.c b/internal/ccsmp/ccsmp_helper.c index 4e7bd92..d5a92fc 100644 --- a/internal/ccsmp/ccsmp_helper.c +++ b/internal/ccsmp/ccsmp_helper.c @@ -303,9 +303,7 @@ CacheSessionSendCacheRequest( * object is necessarily unique. It is easiest to use the cache session pointer since it is a unique * value that will persist until the end of the cache request lifetime, at which point the user_p is * removed from any subscription tables anyways. */ - printf("CacheSessionSendCacheRequest::opaqueCacheSession_p is %p", opaqueCacheSession_p); void * user_p = (void *)opaqueCacheSession_p; - printf("CacheSessionSendCacheRequest::user_p is %p", user_p); solClient_session_rxMsgDispatchFuncInfo_t dispatchInfo; /* msg dispatch callback to set */ dispatchInfo.dispatchType = SOLCLIENT_DISPATCH_TYPE_CALLBACK; dispatchInfo.callback_p = messageReceiveCallback; diff --git a/internal/ccsmp/generator/ccsmp_cache_session_prop_generator.go b/internal/ccsmp/generator/ccsmp_cache_session_prop_generator.go index 61b3c3d..f6cd0e3 100644 --- a/internal/ccsmp/generator/ccsmp_cache_session_prop_generator.go +++ b/internal/ccsmp/generator/ccsmp_cache_session_prop_generator.go @@ -31,7 +31,7 @@ import ( // The environment variable SOLCACHE_H must be set to the absolute path to project_root/lib//include/solclient/solCache.h. const outputFile string = "ccsmp_cache_session_prop_generated.go" -const header string = "package ccsmp\n// Code generated by ccsmp_session_prop_generator.go via go generate. DO NOT EDIT.\n\n\n/*\n#include \"solclient/solCache.h\"\n*/\nimport \"C\"\nconst (\n" +const header string = "package ccsmp\n// Code generated by ccsmp_session_prop_generator.go via go generate. DO NOT EDIT.\n\n\n/*\n#cgo CFLAGS: -DSOLCLIENT_PSPLUS_GO\n#include \"solclient/solCache.h\"\n*/\nimport \"C\"\nconst (\n" const footer string = ")\n" const namePrefix string = "SolClientCacheSessionProp" const definePrefix string = "SOLCLIENT_CACHESESSION_PROP" diff --git a/internal/ccsmp/lib/include/solclient/solCache.h b/internal/ccsmp/lib/include/solclient/solCache.h index 840622b..1df8718 100644 --- a/internal/ccsmp/lib/include/solclient/solCache.h +++ b/internal/ccsmp/lib/include/solclient/solCache.h @@ -66,7 +66,12 @@ typedef enum solCache_event { SOLCACHE_EVENT_REQUEST_COMPLETED_NOTICE /** Cache Request has finished. The event returnCode and subCode provide status information */ } solCache_event_t; -typedef void *solClient_opaqueCacheSession_pt; /**< An opaque pointer to a cache session. */ +#if defined(SOLCLIENT_PSPLUS_GO) +#include +typedef uintptr_t solClient_opaqueCacheSession_pt; /**< An opaque pointer to a cache session. */ +#else +typedef void *solClient_opaqueCacheSession_pt; /**< An opaque pointer to a cache session. 
*/
+#endif
 
 /**
  *
diff --git a/internal/impl/constants/error_strings.go b/internal/impl/constants/error_strings.go
index 05a7003..0e462a8 100644
--- a/internal/impl/constants/error_strings.go
+++ b/internal/impl/constants/error_strings.go
@@ -226,5 +226,41 @@ const AttemptingCancellationNoticeGeneration = "Attempting to generate a cache r
 // FailedToDestroyCacheSession error string
 const FailedToDestroyCacheSession = "Failed to destroy cache session"
 
-// InvalidCachedMessageSubscriptionRequestStrategy error string
+// InvalidCachedMessageSubscriptionStrategyPassed error string
 const InvalidCachedMessageSubscriptionStrategyPassed = "an invalid CachedMessageSubscriptionStrategy was passed"
+
+// UnableToPassCacheResponseToApplication error string
+const UnableToPassCacheResponseToApplication = "Unable to pass cache response to application because: "
+
+// NoCacheChannelAvailable error string
+const NoCacheChannelAvailable = "The API failed to retrieve the configured channel that was intended for the application"
+
+// UnableToRunApplicationCacheCallback error string
+const UnableToRunApplicationCacheCallback = "Unable to run the cache response callback given by the application because: "
+
+// NoCacheCallbackAvailable error string
+const NoCacheCallbackAvailable = "The application did not provide a callback that could be used to process the cache response."
+
+// UnableToProcessCacheResponse error string
+const UnableToProcessCacheResponse = "Unable to process cache response because: "
+
+// InvalidCacheSession error string
+const InvalidCacheSession = "The cache session associated with the given cache request/response was invalid"
+
+// FailedToRetrieveCacheResponseProcessor error string
+const FailedToRetrieveCacheResponseProcessor = "Tried to retrieve CacheResponseProcessor from cacheSessionMap, but none existed for the given cacheSessionP:"
+
+// ApplicationTriedToCreateCacheRequest error string
+const ApplicationTriedToCreateCacheRequest = "The application tried to create a new cache request using cache session pointer"
+
+// AnotherCacheSessionAlreadyExists error string
+const AnotherCacheSessionAlreadyExists = "but another cache request's cache session under that pointer already exists."
+
+// StartedCachePolling error string
+const StartedCachePolling = "Started go routine for polling cache response channel."
+
+// DidntStartCachePolling error string
+const DidntStartCachePolling = "Didn't start go routine for polling cache response channel again because it is already running."
+
+// InitializedReceiverCacheSessionMap error string
+const InitializedReceiverCacheSessionMap = "Initialized receiver cacheSessionMap"
diff --git a/internal/impl/core/metrics.go b/internal/impl/core/metrics.go
index 9bac6f5..a02ba89 100644
--- a/internal/impl/core/metrics.go
+++ b/internal/impl/core/metrics.go
@@ -189,7 +189,7 @@ func (backedMetrics *ccsmpBackedMetrics) getNextGenStat(metric NextGenMetric) ui
 return atomic.LoadUint64(&backedMetrics.metrics[metric])
 }
 
-func (metrics *ccsmpBackedMetrics) getAggregateStat(stats []interface{}) uint64 {
+func (backedMetrics *ccsmpBackedMetrics) getAggregateStat(stats []interface{}) uint64 {
 // accumulate multiple ccsmp metrics to generate an aggregated metric
 aggregatedMetricsCount := uint64(0)
 for _, stat := range stats {
@@ -197,10 +197,10 @@ func (metrics *ccsmpBackedMetrics) getAggregateStat(stats []interface{}) uint64 
 switch casted := stat.(type) {
 case ccsmp.SolClientStatsRX:
 // this is a RX stat
- aggregatedMetricsCount += metrics.getRXStat(casted)
+ aggregatedMetricsCount += backedMetrics.getRXStat(casted)
 case ccsmp.SolClientStatsTX:
 // this is a TX stat
- aggregatedMetricsCount += metrics.getTXStat(casted)
+ aggregatedMetricsCount += backedMetrics.getTXStat(casted)
 default:
 // don't recognize the metric stat, continue
 logging.Default.Warning("Could not find mapping for aggregated metric with ID " + fmt.Sprint(stat))
diff --git a/internal/impl/core/receiver.go b/internal/impl/core/receiver.go
index 9836b2a..722e57c 100644
--- a/internal/impl/core/receiver.go
+++ b/internal/impl/core/receiver.go
@@ -71,11 +71,11 @@ type Receiver interface {
 IncrementDuplicateAckCount()
 // Creates a new persistent receiver with the given callback
 NewPersistentReceiver(properties []string, callback RxCallback, eventCallback PersistentEventCallback) (PersistentReceiver, ErrorInfo)
- /* TODO: The `GetSessionPointer` method seems a litte out of place here. For now it works, but it might be an,
- * anti-pattern so we should look into clarifying this and maybe doing this differently in a future iteration.*/
+ /* TODO: The `GetSessionPointer` method seems a little out of place here. For now it works, but it might be an
+ * anti-pattern, so we should look into clarifying this and maybe doing this differently in a future iteration.*/
 
- // Retrieves the sesion pointer
- GetSessionPointer() ccsmp.SolClientSessionPt
+ // Retrieves the session pointer
+ GetSessionPointer() ccsmp.SolClientSessionPt
 }
 
 // PersistentReceiver interface
@@ -206,7 +206,6 @@ func (receiver *ccsmpBackedReceiver) Events() Events {
 }
 
 func (receiver *ccsmpBackedReceiver) rxCallback(msg Receivable, userP unsafe.Pointer) bool {
- fmt.Printf("Got to rxCallback with user_p %p\n", userP)
 receiver.rxLock.RLock()
 defer receiver.rxLock.RUnlock()
 callback, ok := receiver.rxMap[uintptr(userP)]
@@ -384,7 +383,7 @@ func (receiver *ccsmpBackedReceiver) NewPersistentReceiver(properties []string, 
 
 /* FFC: It might be better if this were in ccsmp_core.go?
*/ func (receiver *ccsmpBackedReceiver) GetSessionPointer() ccsmp.SolClientSessionPt { - return receiver.session.GetPointer() + return receiver.session.GetPointer() } // Destroy destroys the flow diff --git a/internal/impl/receiver/cache_util.go b/internal/impl/receiver/cache_util.go index 84b8ab3..dc320c1 100644 --- a/internal/impl/receiver/cache_util.go +++ b/internal/impl/receiver/cache_util.go @@ -17,7 +17,7 @@ package receiver import ( - "fmt" + "fmt" "solace.dev/go/messaging/internal/ccsmp" "solace.dev/go/messaging/internal/impl/constants" @@ -26,69 +26,64 @@ import ( "solace.dev/go/messaging/pkg/solace/message" ) -const UnableToPassCacheResponseToApplication = "Unable to pass cache response to application because: " -const NoCacheChannelAvailable = "The API failed to retrieve the configured channel that was intended for the application" -const UnableToRunApplicationCacheCallback = "Unable to run the cache response callback given by the application because: " -const NoCacheCallbackAvailable = "The application did not provide a callback that could be used to process the cache response." -const UnableToProcessCacheResponse = "Unable to process cache response because: " -const InvalidCacheSession = "The cache session associated with the given cache request/response was invalid" - // PollAndProcessCacheResponseChannel is intended to be run as a go routine. -func (receiver * directMessageReceiverImpl) PollAndProcessCacheResponseChannel() { - receiver.cachePollingRunning.Store(true) - var cacheEventInfo ccsmp.SolClientCacheEventInfo - channelIsOpen := true - /* poll cacheventinfo channel */ - for channelIsOpen { - cacheEventInfo, channelIsOpen = <- receiver.cacheResponseChan - if !channelIsOpen { - // If channel is closed, we can stop polling. In this case we don't need to handle - // the cacheEventInfo since there won't be a menaingful one left on the queue. - // Any function that closes the channel must guarantee this. - /* TODO: This may need to be reworked? */ - break - } - /* We decrement the counter first, since as soon as we pop the SolClientCacheEventInfo - * off the channel, CCSMP is able to put another on. If CCSMP is able resume processing the - * cache responses, we should unblock the application by allowing it to submit more cache - * requests ASAP.*/ - receiver.cacheResponseChanCounter.Add(-1) - receiver.ProcessCacheEvent(cacheEventInfo) - } - // Indicate that this function has stopped running. - receiver.cachePollingRunning.Store(false) -} - -/* TODO: This should probably be refactored to the direct_message_receiver_impl module */ +func (receiver *directMessageReceiverImpl) PollAndProcessCacheResponseChannel() { + receiver.cachePollingRunning.Store(true) + var cacheEventInfo ccsmp.SolClientCacheEventInfo + channelIsOpen := true + /* poll cacheventinfo channel */ + for channelIsOpen { + cacheEventInfo, channelIsOpen = <-receiver.cacheResponseChan + receiver.cacheResponseChanCounter.Add(-1) + if !channelIsOpen { + // If channel is closed, we can stop polling. In this case we don't need to handle + // the cacheEventInfo since there won't be a menaingful one left on the queue. + // Any function that closes the channel must guarantee this. + break + } + /* We decrement the counter first, since as soon as we pop the SolClientCacheEventInfo + * off the channel, CCSMP is able to put another on. 
If CCSMP is able to resume processing the
+ * cache responses, we should unblock the application by allowing it to submit more cache
+ * requests ASAP.*/
+ receiver.ProcessCacheEvent(cacheEventInfo)
+ }
+ // Indicate that this function has stopped running.
+ receiver.cachePollingRunning.Store(false)
+}
+
 // ProcessCacheEvent is intended to be run by any agent trying to process a cache response. This can be run from a polling go routine, or during termination to cleanup remaining resources, and possibly by other agents as well.
 func (receiver *directMessageReceiverImpl) ProcessCacheEvent(cacheEventInfo ccsmp.SolClientCacheEventInfo) {
- cacheSessionP := cacheEventInfo.GetCacheSessionPointer()
- /* TODO: Make sure there are no mutations to the map during this read.*/
- if cacheResponseHolder, found := receiver.cacheSessionMap.Load(cacheSessionP); !found {
- if receiver.logger.IsDebugEnabled() {
- /* TODO: refactor consts to separate file */
- /* This can occur when there has been a duplicate event, where for some reason CCSMP was able
- * produce an event, but PSPGo thought CCSMP was not, so PSPGo generated an event on CCSMP's
- * behalf, but after CCSMP's event was put on the channel. This would result in the CCSMP-
- * generated event being processed and its cache session pointer being removed from the tabel
- * and the duplicate event that was processed afterwards having the same cache session pointer,
- * but no matching entry in the table since it was already removed by the original entry. This
- * is not a bug, and the application doesn't need to be concerned about this, so we log it as
- * debug. */
- receiver.logger.Debug(UnableToProcessCacheResponse + InvalidCacheSession)
- }
- } else {
- cacheResponseHolder.(CacheResponseProcessor).ProcessCacheResponse()
- }
- /* Lifecycle management of cache sessions */
- /* NOTE: In the event of a duplicate event in the receiver.cacheResponseChan channel, the following deletion
- * will not panic. */
- receiver.cacheSessionMap.Delete(cacheSessionP)
- if errorInfo := ccsmp.DestroyCacheSession(cacheSessionP); errorInfo != nil {
- /* NOTE: If we can't destroy the cache session, there is no follow up action that can be taken, so
- * there is no point in returning an error. We just log it and move on. */
- receiver.logger.Error(fmt.Sprintf("%s %s and %s. ErrorInfo is: [%s]", constants.FailedToDestroyCacheSession, constants.WithCacheSessionPointer, constants.WithCacheRequestID, errorInfo.GetMessageAsString()))
- }
+ cacheSessionP := cacheEventInfo.GetCacheSessionPointer()
+ receiver.cacheLock.Lock()
+ cacheResponseHolder, found := receiver.loadCacheResponseProcessorFromMapUnsafe(cacheSessionP)
+ receiver.cacheLock.Unlock()
+ if !found {
+ if receiver.logger.IsDebugEnabled() {
+ /* This can occur when there has been a duplicate event, where for some reason CCSMP was able to
+ * produce an event, but PSPGo thought CCSMP was not, so PSPGo generated an event on CCSMP's
+ * behalf, but after CCSMP's event was put on the channel. This would result in the CCSMP-
+ * generated event being processed and its cache session pointer being removed from the table
+ * and the duplicate event that was processed afterwards having the same cache session pointer,
+ * but no matching entry in the table since it was already removed by the original entry. This
+ * is not a bug, and the application doesn't need to be concerned about this, so we log it as
+ * debug.
*/ + receiver.logger.Debug(constants.UnableToProcessCacheResponse + constants.InvalidCacheSession) + } + } else { + cacheResponse := solace.CacheResponse{} + cacheResponseHolder.ProcessCacheResponse(cacheResponse) + } + /* Lifecycle management of cache sessions */ + /* NOTE: In the event of a duplicate event in the receiver.cacheResponseChan channel, the following deletion + * will not panic. */ + receiver.cacheLock.Lock() + receiver.deleteCacheResponseProcessorFromMapUnsafe(cacheSessionP) + receiver.cacheLock.Unlock() + if errorInfo := ccsmp.DestroyCacheSession(cacheSessionP); errorInfo != nil { + /* NOTE: If we can't destroy the cache session, there is no follow up action that can be taken, so + * there is no point in returning an error. We just log it and move on. */ + receiver.logger.Error(fmt.Sprintf("%s %s and %s. ErrorInfo is: [%s]", constants.FailedToDestroyCacheSession, constants.WithCacheSessionPointer, constants.WithCacheRequestID, errorInfo.GetMessageAsString())) + } } // CacheRequestInfo holds the original information that was used to send the cache request. @@ -98,133 +93,131 @@ func (receiver *directMessageReceiverImpl) ProcessCacheEvent(cacheEventInfo ccsm /* NOTE: This is actually most useful in generating cache response stubs for cache sessions that somehow got lost and * that still need to be cleaned up during termination.*/ type CacheRequestInfo struct { - /* NOTE: we don't need to include the cache session pointer in this struct since it is only ever stored - * in a map where the cache session pointer is used as the key.*/ - cacheRequestID message.CacheRequestID - topic string + /* NOTE: we don't need to include the cache session pointer in this struct since it is only ever stored + * in a map where the cache session pointer is used as the key.*/ + cacheRequestID message.CacheRequestID + topic string } func NewCacheRequestInfo(cacheRequestID message.CacheRequestID, topic string) CacheRequestInfo { - return CacheRequestInfo{ - cacheRequestID: cacheRequestID, - topic: topic, - } + return CacheRequestInfo{ + cacheRequestID: cacheRequestID, + topic: topic, + } } -func (i * CacheRequestInfo) GetTopic() string { - return i.topic +func (i *CacheRequestInfo) GetTopic() string { + return i.topic } -func (i * CacheRequestInfo) GetCacheRequestID() message.CacheRequestID { - return i.cacheRequestID +func (i *CacheRequestInfo) GetCacheRequestID() message.CacheRequestID { + return i.cacheRequestID } // CacheResponseProcessor provides an interface through which the information necessary to process a cache response // that is passed from CCSMP can be acquired. type CacheResponseProcessor interface { - /* This model of having a common interface be implemented by multiple concrete types so that we can have a - * heterogeneous set of value types in the map is useful, but would become tedious as more implementing - * types were added since every type would have to implement a nil accessor for each other type. This is fine - * while there are only two implementing types, and more types are not expected. If the number of implementing - * types should increase, this design pattern should be revisited. - */ + /* This model of having a common interface be implemented by multiple concrete types so that we can have a + * heterogeneous set of value types in the map is useful, but would become tedious as more implementing + * types were added since every type would have to implement a nil accessor for each other type. 
This is fine + * while there are only two implementing types, and more types are not expected. If the number of implementing + * types should increase, this design pattern should be revisited. + */ - // GetChannel returns the channel that is used to pass the CacheResponse back to the application if such a - // channel exists or will otherwise be nil. If the channel exists, the bool will return as true. If the channel - // does not exist, the bool will return false. - GetChannel() (chan solace.CacheResponse, bool) + // GetChannel returns the channel that is used to pass the CacheResponse back to the application if such a + // channel exists or will otherwise be nil. If the channel exists, the bool will return as true. If the channel + // does not exist, the bool will return false. + GetChannel() (chan solace.CacheResponse, bool) - // GetCallback returns the callback that is used to post-process the CacheResponse if such a callback exists. - // If the callback exists, the bool will return as true. If the callback does not exist, the bool will return - // false. - GetCallback() (func(solace.CacheResponse), bool) + // GetCallback returns the callback that is used to post-process the CacheResponse if such a callback exists. + // If the callback exists, the bool will return as true. If the callback does not exist, the bool will return + // false. + GetCallback() (func(solace.CacheResponse), bool) - // ProcessCacheResponse processes the cache response according to the implementation - ProcessCacheResponse() + // ProcessCacheResponse processes the cache response according to the implementation + ProcessCacheResponse(solace.CacheResponse) - // GetCacheRequestInfo returns the original information that was used to send the cache request. - // This is useful for comparing a received cache response during processing, or for adding - // information to logging or error messages when this information cannot be retrieved from the - // cache response. - GetCacheRequestInfo() * CacheRequestInfo + // GetCacheRequestInfo returns the original information that was used to send the cache request. + // This is useful for comparing a received cache response during processing, or for adding + // information to logging or error messages when this information cannot be retrieved from the + // cache response. + GetCacheRequestInfo() *CacheRequestInfo } // CacheResponseCallbackHolder holds an application-provided callback that is responsible for post-processing the cache // response. CacheResponseCallbackHolder implements the CacheResponseProcessor interface to allow safe access of this // callback when being retrieved from a map of heterogeneous values. 
type CacheResponseCallbackHolder struct { - CacheResponseProcessor - cacheRequestInfo CacheRequestInfo - callback func(solace.CacheResponse) + CacheResponseProcessor + cacheRequestInfo CacheRequestInfo + callback func(solace.CacheResponse) } func NewCacheResponseCallbackHolder(callback func(solace.CacheResponse), cacheRequestInfo CacheRequestInfo) CacheResponseCallbackHolder { - return CacheResponseCallbackHolder{ - cacheRequestInfo: cacheRequestInfo, - callback: callback, - } + return CacheResponseCallbackHolder{ + cacheRequestInfo: cacheRequestInfo, + callback: callback, + } } func (cbHolder CacheResponseCallbackHolder) GetCallback() (func(solace.CacheResponse), bool) { - return cbHolder.callback, true + return cbHolder.callback, true } func (cbHolder CacheResponseCallbackHolder) GetChannel() (chan solace.CacheResponse, bool) { - return nil, false + return nil, false } -func (cbHolder CacheResponseCallbackHolder) ProcessCacheResponse() { - if callback, found := cbHolder.GetCallback(); found { - cacheResponse := solace.CacheResponse{} - callback(cacheResponse) - } else { - logging.Default.Error(UnableToPassCacheResponseToApplication + NoCacheCallbackAvailable) - } +func (cbHolder CacheResponseCallbackHolder) ProcessCacheResponse(cacheResponse solace.CacheResponse) { + if callback, found := cbHolder.GetCallback(); found { + callback(cacheResponse) + } else { + logging.Default.Error(constants.UnableToPassCacheResponseToApplication + constants.NoCacheCallbackAvailable) + } } -func (cbHolder CacheResponseCallbackHolder) GetCacheRequestInfo() * CacheRequestInfo { - return &cbHolder.cacheRequestInfo +func (cbHolder CacheResponseCallbackHolder) GetCacheRequestInfo() *CacheRequestInfo { + return &cbHolder.cacheRequestInfo } // CacheResponseChannelHolder holds a API-provided channel to which the cache reponse will be pushed. // CacheResponseChannelHolder implements the CacheResponseProcessor interface to allow safe access of this callback // when being retrieved from a map of heterogeneous values. type CacheResponseChannelHolder struct { - CacheResponseProcessor - cacheRequestInfo CacheRequestInfo - channel chan solace.CacheResponse + CacheResponseProcessor + cacheRequestInfo CacheRequestInfo + channel chan solace.CacheResponse } func NewCacheResponseChannelHolder(channel chan solace.CacheResponse, cacheRequestInfo CacheRequestInfo) CacheResponseChannelHolder { - return CacheResponseChannelHolder{ - cacheRequestInfo: cacheRequestInfo, - channel: channel, - } + return CacheResponseChannelHolder{ + cacheRequestInfo: cacheRequestInfo, + channel: channel, + } } func (chHolder CacheResponseChannelHolder) GetCallback() (func(solace.CacheResponse), bool) { - return nil, false + return nil, false } func (chHolder CacheResponseChannelHolder) GetChannel() (chan solace.CacheResponse, bool) { - return chHolder.channel, true -} - -func (chHolder CacheResponseChannelHolder) ProcessCacheResponse() { - /* Because function pointers and channels are both pointer types, they could be nil. So, we should - * check to make sure that they are not. 
There could be an error where the API did not create the - * correct holder type, which would cause the holder's value to be nil and the API would panic.*/ - if channel, found := chHolder.GetChannel(); found { - cacheResponse := solace.CacheResponse{} - /* This will not block because the channel is created with a buffer size of 1 in RequestCachedAsync() */ - channel <- cacheResponse - close(channel) - } else { - /* This is an error log because it is the API's responsiblity to create and manage the channel. */ - logging.Default.Error(UnableToPassCacheResponseToApplication + NoCacheChannelAvailable) - } -} - -func (chHolder CacheResponseChannelHolder) GetCacheRequestInfo() * CacheRequestInfo { - return &chHolder.cacheRequestInfo + return chHolder.channel, true +} + +func (chHolder CacheResponseChannelHolder) ProcessCacheResponse(cacheResponse solace.CacheResponse) { + /* Because function pointers and channels are both pointer types, they could be nil. So, we should + * check to make sure that they are not. There could be an error where the API did not create the + * correct holder type, which would cause the holder's value to be nil and the API would panic.*/ + if channel, found := chHolder.GetChannel(); found { + /* This will not block because the channel is created with a buffer size of 1 in RequestCachedAsync() */ + channel <- cacheResponse + close(channel) + } else { + /* This is an error log because it is the API's responsiblity to create and manage the channel. */ + logging.Default.Error(constants.UnableToPassCacheResponseToApplication + constants.NoCacheChannelAvailable) + } +} + +func (chHolder CacheResponseChannelHolder) GetCacheRequestInfo() *CacheRequestInfo { + return &chHolder.cacheRequestInfo } diff --git a/internal/impl/receiver/direct_message_receiver_impl.go b/internal/impl/receiver/direct_message_receiver_impl.go index 55c28a9..dd03401 100644 --- a/internal/impl/receiver/direct_message_receiver_impl.go +++ b/internal/impl/receiver/direct_message_receiver_impl.go @@ -20,11 +20,11 @@ import ( "fmt" "regexp" "runtime/debug" + "strconv" "sync" "sync/atomic" "time" "unsafe" - "strconv" "solace.dev/go/messaging/internal/ccsmp" "solace.dev/go/messaging/internal/impl/constants" @@ -89,22 +89,22 @@ type directMessageReceiverImpl struct { terminationHandlerID uint - // cacheSessionMap is used to map the cache session pointer to the method for handling the cache response, - // as specified by the application on a call to a [ReceiverCacheRequester] interface. - cacheSessionMap sync.Map // ([keyType]valueType) [ccsmp.SolClientCacheSessionPt]CacheResponseProcessor - // cacheResponseChan is used to buffer the cache responses from CCSMP. - cacheResponseChan chan ccsmp.SolClientCacheEventInfo - /* NOTE: This data type could be changed from int32 if necessary, but must match the type of - * directMessageReceiverImpl.cacheResponseChanCounter. */ - // cacheResponseChanCounter is used to prevent cache requests from being submitted if the - // cacheResponseChan buffer is full. - cacheResponseChanCounter atomic.Int32 - // cachePollingRunning is used to determine whether or not the goroutine that polls the cacheResponseChan - // has been started yet. - cachePollingRunning atomic.Bool - - // cacheLock is used to ensure that the application cannot submit cache requests while the receiver is terminating. 
- cacheLock sync.Mutex + // cacheSessionMap is used to map the cache session pointer to the method for handling the cache response, + // as specified by the application on a call to a [ReceiverCacheRequester] interface. + cacheSessionMap map[ccsmp.SolClientCacheSessionPt]CacheResponseProcessor // ([keyType]valueType) [ccsmp.SolClientCacheSessionPt]CacheResponseProcessor + // cacheResponseChan is used to buffer the cache responses from CCSMP. + cacheResponseChan chan ccsmp.SolClientCacheEventInfo + /* NOTE: This data type could be changed from int32 if necessary, but must match the type of + * directMessageReceiverImpl.cacheResponseChanCounter. */ + // cacheResponseChanCounter is used to prevent cache requests from being submitted if the + // cacheResponseChan buffer is full. + cacheResponseChanCounter atomic.Int32 + // cachePollingRunning is used to determine whether or not the goroutine that polls the cacheResponseChan + // has been started yet. + cachePollingRunning atomic.Bool + + // cacheLock is used to ensure that the application cannot submit cache requests while the receiver is terminating. + cacheLock sync.Mutex } type directInboundMessage struct { @@ -330,7 +330,7 @@ func (receiver *directMessageReceiverImpl) Terminate(gracePeriod time.Duration) // is still being processed by the async callback, we will not terminate until that message callback is complete <-receiver.terminationComplete } - receiver.teardownCache() + receiver.teardownCache() return nil } @@ -900,98 +900,134 @@ func (receiver *directMessageReceiverImpl) String() string { return fmt.Sprintf("solace.DirectMessageReceiver at %p", receiver) } +// loadCacheResponseProcessorFromMapUnsafe retrieves the cacheResponseProcessor associated with the given cacheSessionP. +// If the given cacheSessionP cannot be found, this method will return false, and a nil CacheResponseProcessor. If the +// givenCacheSessionP is found this method will return true, and the found CacheResponseProcessor. +// This method is unsafe because it is not thread safe and requires the directMessageReceiverImpl.cacheLock mutex to be +// be held for thread safety to be guaranteed. +func (receiver *directMessageReceiverImpl) loadCacheResponseProcessorFromMapUnsafe(cacheSessionP ccsmp.SolClientCacheSessionPt) (CacheResponseProcessor, bool) { + var nilComparison CacheResponseProcessor + if cacheResponseProcessor := receiver.cacheSessionMap[cacheSessionP]; cacheResponseProcessor == nilComparison { + if receiver.logger.IsDebugEnabled() { + receiver.logger.Debug(fmt.Sprintf("%s 0x%x", constants.FailedToRetrieveCacheResponseProcessor, cacheSessionP)) + } + return nil, false + } else { + return cacheResponseProcessor, true + } +} + +// storeCacheResponseProcessorFromMapUnsafe associates the given cacheResponseProcessor with the given cacheSessionP. +// This method is unsafe because it is not thread safe and requires the directMessageReceiverImpl.cacheLock mutex to be +// be held for thread safety to be guaranteed. +func (receiver *directMessageReceiverImpl) storeCacheResponseProcessorFromMapUnsafe(cacheSessionP ccsmp.SolClientCacheSessionPt, cacheResponseProcessor CacheResponseProcessor) { + receiver.cacheSessionMap[cacheSessionP] = cacheResponseProcessor +} + +// deleteCacheResponseProcessorFromMapUnsafe dissociates the previously configured cacheSessionP/CacheResponseHolder pair, if the given cacheSessionP exists. If it does not exist, this function is a no-op. 
+// This method is unsafe because it is not thread safe and requires the directMessageReceiverImpl.cacheLock mutex to be
+// held for thread safety to be guaranteed.
+func (receiver *directMessageReceiverImpl) deleteCacheResponseProcessorFromMapUnsafe(cacheSessionP ccsmp.SolClientCacheSessionPt) {
+ delete(receiver.cacheSessionMap, cacheSessionP)
+}
+
 func (receiver *directMessageReceiverImpl) addCacheSessionToMapIfNotPresent(holder CacheResponseProcessor, cacheSessionP ccsmp.SolClientCacheSessionPt) error {
- var err error
- err = nil
- if preExistingCacheSession, found := receiver.cacheSessionMap.Load(cacheSessionP); found {
- /* Pre-existing cache session found. This error is fatal to the operation but not to the API since we can
- * this does not block other activities like subscribing or trying to send a distint cache request, but does
- * prevent the API from indexing the cache session which is necessary for tracking cache request lifecycles.
- */
- err = solace.NewError(&solace.IllegalStateError{}, fmt.Sprintf("The application API to create a new cache request using cache session [%d] but another cache request's cache session [%d] already exists.", cacheSessionP, preExistingCacheSession), nil)
- return err
- }
- /* No pre-existing cache session found, we can index the current one and continue. */
- receiver.cacheSessionMap.Store(cacheSessionP, holder)
- return err
+ var err error
+ err = nil
+ if _, found := receiver.loadCacheResponseProcessorFromMapUnsafe(cacheSessionP); found {
+ /* Pre-existing cache session found. This error is fatal to the operation but not to the API, since it
+ * does not block other activities like subscribing or trying to send a distinct cache request, but it does
+ * prevent the API from indexing the cache session which is necessary for tracking cache request lifecycles.
+ */
+ err = solace.NewError(&solace.IllegalStateError{},
+ fmt.Sprintf("%s [0x%x] %s", constants.ApplicationTriedToCreateCacheRequest, cacheSessionP, constants.AnotherCacheSessionAlreadyExists), nil)
+ return err
+ }
+ /* No pre-existing cache session found, we can index the current one and continue. */
+ receiver.storeCacheResponseProcessorFromMapUnsafe(cacheSessionP, holder)
+ return err
 }
 
 // isAvailableForCache returns true if the receiver is ready to send a cache request, or false if it is not.
 func (receiver *directMessageReceiverImpl) checkStateForCacheRequest() error {
- var err error = nil
- var errorString string = ""
- receiverState := receiver.getState()
- if receiverState != messageReceiverStateStarted {
- errorString = fmt.Sprintf("Could not perform cache operations because receiver was in state %s instead of the started state.", messageReceiverStateNames[receiverState])
- err = solace.NewError(&solace.IllegalStateError{}, errorString, nil)
- } else if !receiver.internalReceiver.IsRunning() {
- /* NOTE: it would be great if we could provide a more detailed error string here, but
- * ccsmpBackedReceiver.IsRunning() only returns a boolean, so we can't say more than we already have.
- */
- errorString = "Could not perform cache operations because messaging service was not running."
- err = solace.NewError(&solace.IllegalStateError{}, errorString, nil) - } else if receiver.cacheResponseChanCounter.Load() >= cacheResponseChannelMaxSize { - errorString = fmt.Sprintf("Could not perform cache operations because more than %d cache resonses are still waiting to be processed by the application.", cacheResponseChannelMaxSize) - err = solace.NewError(&solace.IllegalStateError{}, errorString, nil) - } - if errorString != "" { - /* Warn log because application tried to conduct operation without properly configuring the object. */ - receiver.logger.Warning(errorString) - } - return err + var err error + var errorString string = "" + receiverState := receiver.getState() + if receiverState != messageReceiverStateStarted { + errorString = fmt.Sprintf("Could not perform cache operations because receiver was in state %s instead of the started state.", messageReceiverStateNames[receiverState]) + err = solace.NewError(&solace.IllegalStateError{}, errorString, nil) + } else if !receiver.internalReceiver.IsRunning() { + /* NOTE: it would be great if we could provide a more detailed error string here, but + * ccsmpBackedReceiver.IsRunning() only returns a boolean, so we can't say more than we already have. + */ + errorString = "Could not perform cache operations because messaging service was not running." + err = solace.NewError(&solace.IllegalStateError{}, errorString, nil) + } else if receiver.cacheResponseChanCounter.Load() >= cacheResponseChannelMaxSize { + errorString = fmt.Sprintf("Could not perform cache operations because more than %d cache responses are still waiting to be processed by the application.", cacheResponseChannelMaxSize) + err = solace.NewError(&solace.IllegalStateError{}, errorString, nil) + } + if errorString != "" { + /* Warn log because application tried to conduct operation without properly configuring the object. 
*/ + receiver.logger.Warning(errorString) + } + return err } -func (receiver * directMessageReceiverImpl) initCacheResourcesIfNotPresent() { - receiver.logger.Debug(fmt.Sprintf("cacheResponseChan before assignment is nil: %t\n", receiver.cacheResponseChan == nil)) - if receiver.cacheResponseChan == nil { - receiver.cacheResponseChan = make(chan ccsmp.SolClientCacheEventInfo, cacheResponseChannelMaxSize) - receiver.logger.Debug(fmt.Sprintf("cacheResponseChan after assignment is nil: %t\n", receiver.cacheResponseChan == nil)) - } - if !receiver.cachePollingRunning.Load() { - go receiver.PollAndProcessCacheResponseChannel() - if receiver.logger.IsDebugEnabled() { - receiver.logger.Debug("Started go routine for polling cache response channel.") - } - } else { - if receiver.logger.IsDebugEnabled() { - receiver.logger.Debug("Didn't start go routine for polling cache response channel again because it is already running.") - } - } +func (receiver *directMessageReceiverImpl) initCacheResourcesIfNotPresent() { + if receiver.cacheResponseChan == nil { + receiver.cacheResponseChan = make(chan ccsmp.SolClientCacheEventInfo, cacheResponseChannelMaxSize) + } + if !receiver.cachePollingRunning.Load() { + go receiver.PollAndProcessCacheResponseChannel() + if receiver.logger.IsDebugEnabled() { + receiver.logger.Debug(constants.StartedCachePolling) + } + } else { + if receiver.logger.IsDebugEnabled() { + receiver.logger.Debug(constants.DidntStartCachePolling) + } + } + if receiver.cacheSessionMap == nil { + receiver.cacheSessionMap = make(map[ccsmp.SolClientCacheSessionPt]CacheResponseProcessor) + if receiver.logger.IsDebugEnabled() { + receiver.logger.Debug(constants.InitializedReceiverCacheSessionMap) + } + } } -func (receiver * directMessageReceiverImpl) generateCacheRequestCancellationNotice(cacheSessionP ccsmp.SolClientCacheSessionPt, cacheResponseProcessor CacheResponseProcessor, errorInfo * ccsmp.SolClientErrorInfoWrapper) ccsmp.SolClientCacheEventInfo { - if receiver.logger.IsDebugEnabled() { - receiver.logger.Debug(constants.AttemptingCancellationNoticeGeneration) - } - cacheEventInfo := ccsmp.NewConfiguredSolClientCacheEventInfo(cacheSessionP, cacheResponseProcessor.GetCacheRequestInfo().GetCacheRequestID(), cacheResponseProcessor.GetCacheRequestInfo().GetTopic(), core.ToNativeError(errorInfo, constants.FailedToCancelCacheRequest)) - return cacheEventInfo +func (receiver *directMessageReceiverImpl) generateCacheRequestCancellationNotice(cacheSessionP ccsmp.SolClientCacheSessionPt, cacheResponseProcessor CacheResponseProcessor, errorInfo *ccsmp.SolClientErrorInfoWrapper) ccsmp.SolClientCacheEventInfo { + if receiver.logger.IsDebugEnabled() { + receiver.logger.Debug(constants.AttemptingCancellationNoticeGeneration) + } + cacheEventInfo := ccsmp.NewConfiguredSolClientCacheEventInfo(cacheSessionP, cacheResponseProcessor.GetCacheRequestInfo().GetCacheRequestID(), cacheResponseProcessor.GetCacheRequestInfo().GetTopic(), core.ToNativeError(errorInfo, constants.FailedToCancelCacheRequest)) + return cacheEventInfo } // cancelAllPendingCacheRequests will cancel all pending cache requests and potentially block until all cancellations // are pushed to the cacheResponse channel. 
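The doc comment above notes that queuing cancellation notices can block when the buffered cacheResponse channel is full. A small sketch (hypothetical names, not this package) of the two send strategies on a bounded channel shows the trade-off; the patch itself keeps the blocking send, documents the hazard in the WARNING comment inside the function, and bounds outstanding requests with a counter instead.

package main

import "fmt"

type cacheNotice struct {
	cacheRequestID uint64
	reason         string
}

func main() {
	// A bounded buffer standing in for the cache response channel.
	responses := make(chan cacheNotice, 1)
	responses <- cacheNotice{cacheRequestID: 1, reason: "completed"}

	notice := cacheNotice{cacheRequestID: 2, reason: "cancelled during termination"}
	select {
	case responses <- notice:
		fmt.Println("queued cancellation notice")
	default:
		// With the buffer full, a plain send would stall the terminating goroutine
		// until the application drained a response; the non-blocking form drops and
		// logs the notice instead.
		fmt.Println("response channel full, dropped notice for request", notice.cacheRequestID)
	}
}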
-func (receiver * directMessageReceiverImpl) cancelAllPendingCacheRequests() { - receiver.cacheSessionMap.Range( func(uncastedCacheSessionP, uncastedCacheResponseHolder interface{}) bool { - cacheSessionP := ccsmp.SolClientCacheSessionPt(&uncastedCacheSessionP) - cacheResponseHolder := uncastedCacheResponseHolder.(CacheResponseProcessor) - errorInfo := ccsmp.CancelCacheRequest(cacheSessionP) - if errorInfo.ReturnCode != ccsmp.SolClientReturnCodeOk { - /* FFC: might need to add additional checking for subcodes or error strings specific to a CCSMP - * trying to cancel cache requests on an invalid cache session. */ - /* There was a failure in cancelling the cache request, but we still - * have a cache session pointer, so we generate a cache response to notify - * the application that something went wrong and defer destroying the cache - * session to a later point.*/ - /* FFC: Maybe this should be a different log level? */ - receiver.logger.Warning(fmt.Sprintf("%s %s %d and %s %d.", constants.FailedToCancelCacheRequest, constants.WithCacheRequestID, cacheResponseHolder.GetCacheRequestInfo().GetCacheRequestID(), constants.WithCacheSessionPointer, cacheSessionP)) - generatedEvent := receiver.generateCacheRequestCancellationNotice(cacheSessionP, cacheResponseHolder, errorInfo) - /* WARNING: This will block if the next cache response in the channel is associated with a - * cache request that the application elected to process their cache responses through - * a callback and the channel is full, until the application finishes processing the event - * through that callback.*/ - receiver.cacheResponseChan <- generatedEvent - } - return true - }) +func (receiver *directMessageReceiverImpl) cancelAllPendingCacheRequests() { + for cacheSessionP, cacheResponseProcessor := range receiver.cacheSessionMap { + errorInfo := ccsmp.CancelCacheRequest(cacheSessionP) + if errorInfo != nil { + if errorInfo.ReturnCode != ccsmp.SolClientReturnCodeOk { + /* FFC: might need to add additional checking for subcodes or error strings specific to a CCSMP + * trying to cancel cache requests on an invalid cache session. */ + /* There was a failure in cancelling the cache request, but we still + * have a cache session pointer, so we generate a cache response to notify + * the application that something went wrong and defer destroying the cache + * session to a later point.*/ + /* FFC: Maybe this should be a different log level? */ + receiver.logger.Warning(fmt.Sprintf("%s %s %d and %s %d.", constants.FailedToCancelCacheRequest, constants.WithCacheRequestID, cacheResponseProcessor.GetCacheRequestInfo().GetCacheRequestID(), constants.WithCacheSessionPointer, cacheSessionP)) + generatedEvent := receiver.generateCacheRequestCancellationNotice(cacheSessionP, cacheResponseProcessor, errorInfo) + /* WARNING: This will block if the next cache response in the channel is associated with a + * cache request that the application elected to process their cache responses through + * a callback and the channel is full, until the application finishes processing the event + * through that callback.*/ + receiver.cacheResponseChanCounter.Add(1) + receiver.cacheResponseChan <- generatedEvent + } + } + } } // teardownCache is used to clean up cache-related resources as a part of termination. This method assumes @@ -999,151 +1035,137 @@ func (receiver * directMessageReceiverImpl) cancelAllPendingCacheRequests() { // of its parents should hold the state for it. 
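teardownCache, shown next, finishes by closing cacheResponseChan and then processing whatever responses are still buffered. For comparison, a close-then-drain sketch for a buffered channel (hypothetical names, not this package) is shown below; ranging over the closed channel consumes everything that was buffered before the close and then exits cleanly.

package main

import "fmt"

func main() {
	events := make(chan string, 4)
	events <- "cache response A"
	events <- "cancellation notice B"
	close(events) // no further sends are allowed once the channel is closed

	// Drain: range keeps receiving until the buffer is empty, then exits because
	// the channel is closed.
	for ev := range events {
		fmt.Println("processing leftover event:", ev)
	}
	fmt.Println("cache teardown complete")
}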
/* WARNING: If the application has submitted any cache requests with a callback for processing, this function will * block until all the callbacks are processed. */ -func (receiver * directMessageReceiverImpl) teardownCache() { - if running := receiver.cachePollingRunning.Load(); !running { - /* We can return early since either the resources and shutdown are being handled by a - * different thread right now, or it's already been done before. */ - return - } - receiver.cacheLock.Lock() - defer receiver.cacheLock.Unlock() - if running := receiver.cachePollingRunning.Load(); !running { - /* We can return early since either the resources and shutdown are being handled by a - * different thread right now, or it's already been done before. */ - return - } - // We don't need to do additional work to prevent the application from submitting cache requests - // since this method assumes that the receiver is already in the terminating state, which should - // prevent further cache requests from being submitted because of the state check in - // directMessageReceiverImpl.checkStateForCacheRequest(). - - /* For all cache sessions remaining in the map, issue CCSMP cancellation. Possibly on go routine? check python */ - receiver.cancelAllPendingCacheRequests() - /* For all cache sessions remaining in the map, generate cancellation events for them and put them - * on the cacheResponseChan. These should get to the chan after the actual CCSMP cancellations. Edit - * ProcessCacheEvent if necessary to ignore events without a corresponding cache session so that if - * any duplicates are generated, they are ignored by ProcessCacheEvent and not passed to the - * application.*/ - /* Release the mutex and then shutdown the PollAndProcessCacheResponseChannel goroutine and - * cacheResponseChan. */ - close(receiver.cacheResponseChan) - for len(receiver.cacheResponseChan) > 0 { - receiver.ProcessCacheEvent(<-receiver.cacheResponseChan) - } +func (receiver *directMessageReceiverImpl) teardownCache() { + if running := receiver.cachePollingRunning.Load(); !running { + /* We can return early since either the resources and shutdown are being handled by a + * different thread right now, or it's already been done before. */ + return + } + receiver.cacheLock.Lock() + if running := receiver.cachePollingRunning.Load(); !running { + /* We can return early since either the resources and shutdown are being handled by a + * different thread right now, or it's already been done before. */ + return + } + // We don't need to do additional work to prevent the application from submitting cache requests + // since this method assumes that the receiver is already in the terminating state, which should + // prevent further cache requests from being submitted because of the state check in + // directMessageReceiverImpl.checkStateForCacheRequest(). + + /* For all cache sessions remaining in the map, issue CCSMP cancellation. Possibly on go routine? check python */ + receiver.cancelAllPendingCacheRequests() + /* For all cache sessions remaining in the map, generate cancellation events for them and put them + * on the cacheResponseChan. These should get to the chan after the actual CCSMP cancellations. Edit + * ProcessCacheEvent if necessary to ignore events without a corresponding cache session so that if + * any duplicates are generated, they are ignored by ProcessCacheEvent and not passed to the + * application.*/ + /* Release the mutex and then shutdown the PollAndProcessCacheResponseChannel goroutine and + * cacheResponseChan. 
*/ + close(receiver.cacheResponseChan) + /* NOTE: Release the mutex so that `ProcessCacheEvent()` can clear the channel.*/ + receiver.cacheLock.Unlock() + for len(receiver.cacheResponseChan) > 0 { + receiver.ProcessCacheEvent(<-receiver.cacheResponseChan) + } } // requestCached assumes that the directMessageReceiverImpl.cacheLock is being held by the caller. func (receiver *directMessageReceiverImpl) requestCached(cachedMessageSubscriptionRequest resource.CachedMessageSubscriptionRequest, cacheRequestID apimessage.CacheRequestID, cacheResponseProcessor CacheResponseProcessor) error { - var err error - err = nil - - receiver.logger.Debug("Got to requestCached") - - /* Check receiver and messaging service state */ - if err:= receiver.checkStateForCacheRequest(); err != nil { - return err - } - - receiver.logger.Debug("Got past state check") - - /* init cache resources if not present (tables, etc.) */ - receiver.initCacheResourcesIfNotPresent() - - receiver.logger.Debug("Got past resource init") - - /* create cache session */ - /* TODO: Once the CachedMessageSubscriptionRequest PR is merged to the feature branch, remove propsList and use cachedMessageSubscriptionRequest instead. */ - propsList := []string{ - ccsmp.SolClientCacheSessionPropCacheName, cachedMessageSubscriptionRequest.GetCacheName(), - ccsmp.SolClientCacheSessionPropMaxAge, strconv.FormatInt(int64(cachedMessageSubscriptionRequest.GetCachedMessageAge()), 10), - ccsmp.SolClientCacheSessionPropMaxMsgs, strconv.FormatInt(int64(cachedMessageSubscriptionRequest.GetMaxCachedMessages()), 10), - ccsmp.SolClientCacheSessionPropRequestreplyTimeoutMs, strconv.FormatInt(int64(cachedMessageSubscriptionRequest.GetCacheAccessTimeout()), 10), - } - /* We don't need to release the mutex on calls to CreateCacheSession because there are no callbacks. */ - cacheSessionP, errInfo := ccsmp.CreateCacheSession(propsList, receiver.internalReceiver.GetSessionPointer()) - - fmt.Printf("cacheSessionP after CreateCacheSession is %p", cacheSessionP) - - receiver.logger.Debug("Created Cache Session") - - if errInfo != nil { - errorString := constants.FailedToCreateCacheSession + constants.WithCacheRequestID + fmt.Sprintf("%d", cacheRequestID) - receiver.logger.Warning(errorString) - return core.ToNativeError(errInfo, errorString) - } - - /* store cache session in table with channel */ - if err = receiver.addCacheSessionToMapIfNotPresent(cacheResponseProcessor, cacheSessionP); err != nil { - return err - } - - receiver.logger.Debug("Added cache session to map") - - /* Run go routine that sends cache request */ - var cacheEventCallback ccsmp.SolClientCacheEventCallback = func(cacheEventInfo ccsmp.SolClientCacheEventInfo) { - fmt.Printf("Got to receiver event callback") - receiver.cacheResponseChanCounter.Add(1) - receiver.cacheResponseChan <- cacheEventInfo - fmt.Printf("pushed cacheEventInfo onto channel") - } - cacheStrategy := cachedMessageSubscriptionRequest.GetCachedMessageSubscriptionRequestStrategy() - if cacheStrategy == nil { - errorString := fmt.Sprintf("%s %s %d and %s %d because %s", constants.FailedToSendCacheRequest, constants.WithCacheRequestID, cacheRequestID, constants.WithCacheSessionPointer, cacheSessionP, constants.InvalidCachedMessageSubscriptionStrategyPassed) - receiver.logger.Warning(errorString) - return solace.NewError(&solace.IllegalArgumentError{}, errorString, nil) - } - /* FFC: Do we need to release the mutex during calls to SendCacheRequest? 
*/ - fmt.Printf("Sending cache request") - errInfo = ccsmp.SendCacheRequest(receiver.dispatch, - cacheSessionP, - cachedMessageSubscriptionRequest.GetName(), - cacheRequestID, - ccsmp.CachedMessageSubscriptionRequestStrategyMappingToCCSMPCacheRequestFlags[*cacheStrategy], - ccsmp.CachedMessageSubscriptionRequestStrategyMappingToCCSMPSubscribeFlags[*cacheStrategy], - cacheEventCallback) - fmt.Printf("Finished sending cache request") - if errInfo != nil { - errorString := fmt.Sprintf("%s %s %d and %s %p", constants.FailedToSendCacheRequest, constants.WithCacheRequestID, cacheRequestID, constants.WithCacheSessionPointer, cacheSessionP) - receiver.logger.Warning(errorString) - return core.ToNativeError(errInfo, errorString) - } - - receiver.logger.Debug("Sent cache request") - - return err + var err error + err = nil + + /* Check receiver and messaging service state */ + if err := receiver.checkStateForCacheRequest(); err != nil { + return err + } + + /* init cache resources if not present (tables, etc.) */ + receiver.initCacheResourcesIfNotPresent() + + /* create cache session */ + propsList := []string{ + ccsmp.SolClientCacheSessionPropCacheName, cachedMessageSubscriptionRequest.GetCacheName(), + ccsmp.SolClientCacheSessionPropMaxAge, strconv.FormatInt(int64(cachedMessageSubscriptionRequest.GetCachedMessageAge()), 10), + ccsmp.SolClientCacheSessionPropMaxMsgs, strconv.FormatInt(int64(cachedMessageSubscriptionRequest.GetMaxCachedMessages()), 10), + ccsmp.SolClientCacheSessionPropRequestreplyTimeoutMs, strconv.FormatInt(int64(cachedMessageSubscriptionRequest.GetCacheAccessTimeout()), 10), + } + /* We don't need to release the mutex on calls to CreateCacheSession because there are no callbacks. */ + cacheSessionP, errInfo := ccsmp.CreateCacheSession(propsList, receiver.internalReceiver.GetSessionPointer()) + + if errInfo != nil { + errorString := constants.FailedToCreateCacheSession + constants.WithCacheRequestID + fmt.Sprintf("%d", cacheRequestID) + receiver.logger.Warning(errorString) + return core.ToNativeError(errInfo, errorString) + } + + /* store cache session in table with channel */ + if err = receiver.addCacheSessionToMapIfNotPresent(cacheResponseProcessor, cacheSessionP); err != nil { + return err + } + + /* Run go routine that sends cache request */ + var cacheEventCallback ccsmp.SolClientCacheEventCallback = func(cacheEventInfo ccsmp.SolClientCacheEventInfo) { + receiver.cacheResponseChanCounter.Add(1) + receiver.cacheResponseChan <- cacheEventInfo + } + cacheStrategy := cachedMessageSubscriptionRequest.GetCachedMessageSubscriptionRequestStrategy() + if cacheStrategy == nil { + errorString := fmt.Sprintf("%s %s %d and %s %d because %s", constants.FailedToSendCacheRequest, constants.WithCacheRequestID, cacheRequestID, constants.WithCacheSessionPointer, cacheSessionP, constants.InvalidCachedMessageSubscriptionStrategyPassed) + receiver.logger.Warning(errorString) + return solace.NewError(&solace.IllegalArgumentError{}, errorString, nil) + } + errInfo = ccsmp.SendCacheRequest(receiver.dispatch, + cacheSessionP, + cachedMessageSubscriptionRequest.GetName(), + cacheRequestID, + ccsmp.CachedMessageSubscriptionRequestStrategyMappingToCCSMPCacheRequestFlags[*cacheStrategy], + ccsmp.CachedMessageSubscriptionRequestStrategyMappingToCCSMPSubscribeFlags[*cacheStrategy], + cacheEventCallback) + if errInfo != nil { + errorString := fmt.Sprintf("%s %s %d and %s 0x%x", constants.FailedToSendCacheRequest, constants.WithCacheRequestID, cacheRequestID, constants.WithCacheSessionPointer, cacheSessionP) 
+ receiver.logger.Warning(errorString) + return core.ToNativeError(errInfo, errorString) + } + + receiver.logger.Debug("Sent cache request") + + return err } -func (receiver *directMessageReceiverImpl) RequestCachedAsync(cachedMessageSubscriptionRequest resource.CachedMessageSubscriptionRequest, cacheRequestID apimessage.CacheRequestID) (<- chan solace.CacheResponse, error) { - receiver.cacheLock.Lock() - defer receiver.cacheLock.Unlock() - fmt.Printf("Acquired lock in RequestCachedAsync\n") - chHolder := NewCacheResponseChannelHolder(make(chan solace.CacheResponse, 1), NewCacheRequestInfo(cacheRequestID, cachedMessageSubscriptionRequest.GetName())) - fmt.Printf("Created channel holder in RequestCachedAsync\n") - /* We don't need to check the channel that is returned here since this functionality is tested through unit - * testing and because we just instantiated the channel ourselves. */ - if channel, ok := chHolder.GetChannel(); ok { - fmt.Printf("Got channel in RequestAsync, calling requestCached() now\n") - return channel, receiver.requestCached(cachedMessageSubscriptionRequest, cacheRequestID, chHolder) - } - /* NOTE: We should never get to this point since we know we just set the holder, but we need to include error - * handling just in case, so that the program doesn't panic.*/ - errorString := constants.FailedToRetrieveChannel - receiver.logger.Error(errorString) - return nil, solace.NewError(&solace.OperationFailedError{}, errorString, nil) +func (receiver *directMessageReceiverImpl) RequestCachedAsync(cachedMessageSubscriptionRequest resource.CachedMessageSubscriptionRequest, cacheRequestID apimessage.CacheRequestID) (<-chan solace.CacheResponse, error) { + receiver.cacheLock.Lock() + defer receiver.cacheLock.Unlock() + chHolder := NewCacheResponseChannelHolder(make(chan solace.CacheResponse, 1), NewCacheRequestInfo(cacheRequestID, cachedMessageSubscriptionRequest.GetName())) + /* We don't need to check the channel that is returned here since this functionality is tested through unit + * testing and because we just instantiated the channel ourselves. */ + if channel, ok := chHolder.GetChannel(); ok { + err := receiver.requestCached(cachedMessageSubscriptionRequest, cacheRequestID, chHolder) + if err != nil { + /* NOTE: Rely on requestCached to log error. 
*/ + close(channel) + return nil, err + } + return channel, err + } + /* NOTE: We should never get to this point since we know we just set the holder, but we need to include error + * handling just in case, so that the program doesn't panic.*/ + errorString := constants.FailedToRetrieveChannel + receiver.logger.Error(errorString) + return nil, solace.NewError(&solace.OperationFailedError{}, errorString, nil) } func (receiver *directMessageReceiverImpl) RequestCachedAsyncWithCallback(cachedMessageSubscriptionRequest resource.CachedMessageSubscriptionRequest, cacheRequestID apimessage.CacheRequestID, callback func(solace.CacheResponse)) error { - receiver.cacheLock.Lock() - defer receiver.cacheLock.Unlock() - cbHolder := NewCacheResponseCallbackHolder(callback, NewCacheRequestInfo(cacheRequestID, cachedMessageSubscriptionRequest.GetName())) - if _, ok := cbHolder.GetCallback(); ok { - return receiver.requestCached(cachedMessageSubscriptionRequest, cacheRequestID, cbHolder) - } - /* NOTE: We should never get to this point since we know we just set the holder, but we need to include error - * handling just in case, so that the program doesn't panic.*/ - errorString := constants.FailedToRetrieveCallback - receiver.logger.Error(errorString) - return solace.NewError(&solace.OperationFailedError{}, errorString, nil) + receiver.cacheLock.Lock() + defer receiver.cacheLock.Unlock() + cbHolder := NewCacheResponseCallbackHolder(callback, NewCacheRequestInfo(cacheRequestID, cachedMessageSubscriptionRequest.GetName())) + if _, ok := cbHolder.GetCallback(); ok { + return receiver.requestCached(cachedMessageSubscriptionRequest, cacheRequestID, cbHolder) + } + /* NOTE: We should never get to this point since we know we just set the holder, but we need to include error + * handling just in case, so that the program doesn't panic.*/ + errorString := constants.FailedToRetrieveCallback + receiver.logger.Error(errorString) + return solace.NewError(&solace.OperationFailedError{}, errorString, nil) } type directMessageReceiverBuilderImpl struct { diff --git a/internal/impl/receiver/message_receiver_impl_test.go b/internal/impl/receiver/message_receiver_impl_test.go index 76e1531..17d5026 100644 --- a/internal/impl/receiver/message_receiver_impl_test.go +++ b/internal/impl/receiver/message_receiver_impl_test.go @@ -298,7 +298,7 @@ type mockInternalReceiver struct { // GetSessionPointer needs to be implemented only to satisfy the interface. Retrieving a session pointer would // require an actual session, which would go beyond the scope of testing intended for this module. func (mock *mockInternalReceiver) GetSessionPointer() ccsmp.SolClientSessionPt { - return nil + return ccsmp.SolClientOpaquePointerInvalidValue } func (mock *mockInternalReceiver) Events() core.Events { diff --git a/pkg/solace/receiver_cache_requests.go b/pkg/solace/receiver_cache_requests.go index bf9c0da..df6b35c 100644 --- a/pkg/solace/receiver_cache_requests.go +++ b/pkg/solace/receiver_cache_requests.go @@ -17,8 +17,8 @@ package solace import ( - "solace.dev/go/messaging/pkg/solace/resource" - "solace.dev/go/messaging/pkg/solace/message" + "solace.dev/go/messaging/pkg/solace/message" + "solace.dev/go/messaging/pkg/solace/resource" ) // ReceiverCacheRequests Provides an interface through which the application can request cached messages from a cache. @@ -32,19 +32,19 @@ import ( // resulting from outstanding cache requests. 
Data messages related to the cache response will be passed through the
// conventional [Receiver] interfaces of [Receive()] and [ReceiveAsync()].
type ReceiverCacheRequests interface {
-	/* TODO: Check the error types in this doc string are correct. */
+	/* TODO: Check the error types in this doc string are correct. */
-	// RequestCachedAsync asynchronously requests cached data from a cache and defers processing of the resulting
-	// cache response to the application throufh the returned channel.
-	// Returns PubSubPlusClientError if the operation could not be performed.
-	// Returns IllegalStateError if the service is not connected or the receiver is not running.
-	RequestCachedAsync(cachedMessageSubscriptionRequest resource.CachedMessageSubscriptionRequest, cacheRequestID message.CacheRequestID) (<- chan CacheResponse, error)
+	// RequestCachedAsync asynchronously requests cached data from a cache and defers processing of the resulting
+	// cache response to the application through the returned channel.
+	// Returns IllegalStateError if the service is not connected or the receiver is not running.
+	// Returns InvalidConfigurationError if an invalid [resource.CachedMessageSubscriptionRequest] was passed.
+	RequestCachedAsync(cachedMessageSubscriptionRequest resource.CachedMessageSubscriptionRequest, cacheRequestID message.CacheRequestID) (<-chan CacheResponse, error)
-	/* TODO: Check the error types in this doc string are correct. */
+	/* TODO: Check the error types in this doc string are correct. */
-	// RequestCachedAsyncWithCallback asynchronously requests cached data from a cache and processes the resulting
-	// cache response through the provided function callback.
-	// Returns PubSubPlusClientError if the operation could not be performed.
-	// Returns IllegalStateError if the service is not connected or the receiver is not running.
-	RequestCachedAsyncWithCallback(cachedMessageSubscriptionRequest resource.CachedMessageSubscriptionRequest, cacheRequestID message.CacheRequestID, callback func(CacheResponse)) error
+	// RequestCachedAsyncWithCallback asynchronously requests cached data from a cache and processes the resulting
+	// cache response through the provided function callback.
+	// Returns IllegalStateError if the service is not connected or the receiver is not running.
+	// Returns InvalidConfigurationError if an invalid [resource.CachedMessageSubscriptionRequest] was passed.
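// A hedged usage sketch of this interface (assuming an already-connected service, a started
// receiver named receiver, and a valid request configuration cacheReq; these names are
// invented for illustration and are not defined by this package):
//
//	// Channel-based processing of the cache response:
//	ch, err := receiver.RequestCachedAsync(cacheReq, message.CacheRequestID(1))
//	if err != nil {
//		// handle IllegalStateError / InvalidConfigurationError
//	}
//	response := <-ch
//	_ = response
//
//	// Callback-based processing of the cache response:
//	err = receiver.RequestCachedAsyncWithCallback(cacheReq, message.CacheRequestID(2),
//		func(resp solace.CacheResponse) {
//			// inspect resp here
//		})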
+ RequestCachedAsyncWithCallback(cachedMessageSubscriptionRequest resource.CachedMessageSubscriptionRequest, cacheRequestID message.CacheRequestID, callback func(CacheResponse)) error } diff --git a/pkg/solace/resource/destination.go b/pkg/solace/resource/destination.go index 49d286b..97f5303 100644 --- a/pkg/solace/resource/destination.go +++ b/pkg/solace/resource/destination.go @@ -288,11 +288,11 @@ func NewCachedMessageSubscriptionRequest(cachedMessageSubscriptionStrategy Cache var cachedMsgSubStrategy *CachedMessageSubscriptionStrategy = nil switch cachedMessageSubscriptionStrategy { case AsAvailable: - fallthrough + fallthrough case CachedFirst: - fallthrough + fallthrough case CachedOnly: - fallthrough + fallthrough case LiveCancelsCached: // these are valid cachedMsgSubStrategy = &cachedMessageSubscriptionStrategy diff --git a/test/cache_test.go b/test/cache_test.go index 5e604d1..3ad6ef6 100644 --- a/test/cache_test.go +++ b/test/cache_test.go @@ -18,7 +18,7 @@ package test import ( "fmt" - //"time" + "time" "solace.dev/go/messaging" "solace.dev/go/messaging/pkg/solace" @@ -27,7 +27,7 @@ import ( "solace.dev/go/messaging/pkg/solace/message" "solace.dev/go/messaging/pkg/solace/metrics" "solace.dev/go/messaging/pkg/solace/resource" - //"solace.dev/go/messaging/pkg/solace/subcode" + "solace.dev/go/messaging/pkg/solace/subcode" "solace.dev/go/messaging/test/helpers" "solace.dev/go/messaging/test/testcontext" @@ -46,494 +46,881 @@ func CheckCacheProxy() { Skip("The infrastructure required for running cache proxy tests is not available, skipping this test since it requires a cache proxy.") } } + var _ = Describe("Cache Strategy", func() { - logging.SetLogLevel(logging.LogLevelDebug) - var messagingService solace.MessagingService - var receiver solace.DirectMessageReceiver - Describe("When the cache is available and configured", func() { - BeforeEach(func() { - logging.SetLogLevel(logging.LogLevelDebug) - CheckCache() // skips test with message if cache image is not available - helpers.InitAllCacheClustersWithMessages() - var err error - messagingService, err = messaging.NewMessagingServiceBuilder().FromConfigurationProvider(helpers.DefaultCacheConfiguration()).Build() - Expect(err).To(BeNil()) - }) - AfterEach(func () { - var err error - if messagingService.IsConnected() { - err = messagingService.Disconnect() - Expect(err).To(BeNil()) - } - Expect(messagingService.IsConnected()).To(BeFalse()) - if receiver.IsRunning() { - err = receiver.Terminate(0) - Expect(err).To(BeNil()) - } - Expect(receiver.IsRunning()).To(BeFalse()) - Expect(receiver.IsTerminated()).To(BeTrue()) - }) - DescribeTable("a direct receiver should be able to submit a valid cache request, receive a response, and terminate", - /* name test_async_cache_req_with_live_data. 
*/ - func(strategy resource.CachedMessageSubscriptionStrategy, cacheResponseProcessStrategy helpers.CacheResponseProcessStrategy) { - logging.SetLogLevel(logging.LogLevelDebug) - strategyString := "" - switch strategy { - case resource.AsAvailable: - strategyString = "AsAvailable" - case resource.CachedOnly: - strategyString = "CachedOnly" - case resource.CachedFirst: - strategyString = "CachedFirst" - case resource.LiveCancelsCached: - strategyString = "LiveCancelsCached" - } - //time.Sleep(time.Second * 100) - numExpectedCachedMessages := 3 - numExpectedLiveMessages := 1 - numSentCacheRequests := 1 - numExpectedCacheResponses := numSentCacheRequests - numExpectedSentMessages := 0 - totalMessagesReceived := 0 - numExpectedReceivedMessages := numExpectedSentMessages - switch strategy { - case resource.AsAvailable: - numExpectedReceivedMessages += numExpectedCachedMessages - numExpectedReceivedMessages += numExpectedLiveMessages - case resource.LiveCancelsCached: - numExpectedReceivedMessages += numExpectedLiveMessages - case resource.CachedFirst: - numExpectedReceivedMessages += numExpectedCachedMessages - numExpectedReceivedMessages += numExpectedLiveMessages - case resource.CachedOnly: - numExpectedReceivedMessages += numExpectedCachedMessages - } - numExpectedSentDirectMessages := numSentCacheRequests + numExpectedSentMessages - err := messagingService.Connect() - defer func() { - messagingService.Disconnect() - }() - Expect(err).To(BeNil()) - receiver, err := messagingService.CreateDirectMessageReceiverBuilder().Build() - defer func() { - receiver.Terminate(0) - }() - Expect(err).To(BeNil()) - err = receiver.Start() - Expect(err).To(BeNil()) - topic := fmt.Sprintf("MaxMsgs%d/%s/data1", numExpectedCachedMessages, testcontext.Cache().Vpn) - cacheName := fmt.Sprintf("MaxMsgs%d/delay=2000,msgs=%d", numExpectedCachedMessages, numExpectedLiveMessages) - cacheRequestConfig := helpers.GetValidCacheRequestConfig(strategy, cacheName, topic) - cacheRequestID := message.CacheRequestID(1) - receivedMsgChan := make(chan message.InboundMessage, 3) - defer close(receivedMsgChan) - receiver.ReceiveAsync(func(msg message.InboundMessage) { - fmt.Printf("Received message from test:\n%s\n", msg.String()) - receivedMsgChan <- msg - }) - switch cacheResponseProcessStrategy { - case helpers.ProcessCacheResponseThroughChannel: - cacheResponseChan, err := receiver.RequestCachedAsync(cacheRequestConfig, cacheRequestID) - Expect(err).To(BeNil()) - for i := 0; i < numExpectedCacheResponses; i++ { - Eventually(cacheResponseChan, "10s").Should(Receive()) - fmt.Printf("Got response %d for channel\n", i) - } - case helpers.ProcessCacheResponseThroughCallback: - cacheResponseSignalChan := make(chan solace.CacheResponse, 1) - defer close(cacheResponseSignalChan) - cacheResponseCallback := func (cacheResponse solace.CacheResponse) { - cacheResponseSignalChan <- cacheResponse - } - err = receiver.RequestCachedAsyncWithCallback(cacheRequestConfig, cacheRequestID, cacheResponseCallback) - Expect(err).To(BeNil()) - for i := 0; i < numExpectedCacheResponses; i++ { - Eventually(cacheResponseSignalChan, "10s").Should(Receive()) - fmt.Printf("Got response %d for callback\n", i) - } - default: - Fail(fmt.Sprintf("Got unexpected CacheResponseProcessStrategy %d", cacheResponseProcessStrategy)) - } - for i := 0; i < numExpectedReceivedMessages; i ++ { - Eventually(receivedMsgChan, "10s").Should(Receive()) - fmt.Printf("Found message\n") - totalMessagesReceived ++ - } - 
Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsSent)).To(BeNumerically("==", numSentCacheRequests), fmt.Sprintf("CacheRequestsSent for %s was wrong", strategyString)) - Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsSucceeded)).To(BeNumerically("==", numSentCacheRequests), fmt.Sprintf("CacheRequestsSucceeded for %s was wrong", strategyString)) - Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsFailed)).To(BeNumerically("==", 0), fmt.Sprintf("CacheRequestsFailed for %s was wrong", strategyString)) - Expect(messagingService.Metrics().GetValue(metrics.DirectMessagesSent)).To(BeNumerically("==", numExpectedSentDirectMessages), fmt.Sprintf("DirectMessagesSent for %s was wrong", strategyString)) - Expect(totalMessagesReceived).To(BeNumerically("==", numExpectedReceivedMessages)) - }, - Entry("test cache RR for valid AsAvailable with channel", resource.AsAvailable, helpers.ProcessCacheResponseThroughChannel), - Entry("test cache RR for valid AsAvailable with callback", resource.AsAvailable, helpers.ProcessCacheResponseThroughCallback), - Entry("test cache RR for valid CachedFirst with channel", resource.CachedFirst, helpers.ProcessCacheResponseThroughChannel), - Entry("test cache RR for valid CachedFirst with callback", resource.CachedFirst, helpers.ProcessCacheResponseThroughCallback), - Entry("test cache RR for valid CachedOnly with channel", resource.CachedOnly, helpers.ProcessCacheResponseThroughChannel), - Entry("test cache RR for valid CachedOnly with callback", resource.CachedOnly, helpers.ProcessCacheResponseThroughCallback), - Entry("test cache RR for valid LiveCancelsCached with channel", resource.LiveCancelsCached, helpers.ProcessCacheResponseThroughChannel), - Entry("test cache RR for valid LivCancelsCached with callback", resource.LiveCancelsCached, helpers.ProcessCacheResponseThroughCallback), - ) - Describe("Lifecycle tests", func() { - var messagingService solace.MessagingService - var messageReceiver solace.DirectMessageReceiver - type terminationContext struct { - terminateFunction func(messagingService solace.MessagingService, - messageReceiver solace.DirectMessageReceiver) - configuration func() config.ServicePropertyMap - cleanupFunc func(messagingService solace.MessagingService) - - } - terminationCases := map[string]terminationContext { - /* - "messaging service disconnect": { - terminateFunction: func(messagingService solace.MessagingService, - messageReceiver solace.DirectMessageReceiver) { - Expect(messagingService.Disconnect()).To(BeNil()) - Expect(messagingService.IsConnected()).To(BeFalse()) - Eventually(messageReceiver.IsTerminated(), "5s").Should(BeTrue()) - }, - configuration: func() config.ServicePropertyMap { - return helpers.DefaultCacheConfiguration() - }, - }, - "messaging service disconnect async with channel": { - terminateFunction: func(messagingService solace.MessagingService, - messageReceiver solace.DirectMessageReceiver) { - var err error - Eventually(messagingService.DisconnectAsync(), "5s").Should(Receive(&err)) - Expect(err).To(BeNil()) - Expect(messagingService.IsConnected()).To(BeFalse()) - Eventually(messageReceiver.IsTerminated(), "5s").Should(BeTrue()) - }, - configuration: func() config.ServicePropertyMap { - return helpers.DefaultCacheConfiguration() - }, - }, - "messaging service disconnect async with callback": { - terminateFunction: func(messagingService solace.MessagingService, - messageReceiver solace.DirectMessageReceiver) { - errorChan := make(chan error) - 
messagingService.DisconnectAsyncWithCallback(func(err error) { - errorChan <- err - }) - select { - case err := <-errorChan: - Expect(err).To(BeNil()) - case <- time.After(time.Second * 5): - Fail("Timed out waiting for error chan from call to DisconnectAsyncWithCallback") - } - Expect(messagingService.IsConnected()).To(BeFalse()) - Eventually(messageReceiver.IsTerminated(), "5s").Should(BeTrue()) - }, - configuration: func() config.ServicePropertyMap { - return helpers.DefaultCacheConfiguration() - }, - }, - "management disconnect": { - terminateFunction: func(messagingService solace.MessagingService, - messageReceiver solace.DirectMessageReceiver) { - eventChan := make(chan solace.ServiceEvent) - messagingService.AddServiceInterruptionListener(func(event solace.ServiceEvent) { - eventChan <- event - }) - helpers.ForceDisconnectViaSEMPv2(messagingService) - select { - case event := <- eventChan: - helpers.ValidateNativeError(event.GetCause(), subcode.CommunicationError) - case <- time.After(time.Second * 10): - Fail("Timed out waiting for management disconnect") - } - Expect(messagingService.IsConnected()).To(BeFalse()) - Eventually(messageReceiver.IsTerminated(), "5s").Should(BeTrue()) - }, - configuration: func() config.ServicePropertyMap { - return helpers.DefaultCacheConfiguration() - }, - }, - "toxic disconnect": { - terminateFunction: func(messagingService solace.MessagingService, - messageReceiver solace.DirectMessageReceiver) { - eventChan := make(chan solace.ServiceEvent) - messagingService.AddServiceInterruptionListener(func(event solace.ServiceEvent) { - eventChan <- event - }) - testcontext.Toxi().SMF().Disable() - select { - case event := <- eventChan: - helpers.ValidateNativeError(event.GetCause(), subcode.CommunicationError) - case <- time.After(time.Second * 10): - Fail("Timed out waiting for toxic disconnect") - } - Expect(messagingService.IsConnected()).To(BeFalse()) - Eventually(messageReceiver.IsTerminated(), "5s").Should(BeTrue()) - }, - configuration: func() config.ServicePropertyMap { - helpers.CheckToxiProxy() - return helpers.CacheToxicConfiguration() - }, - cleanupFunc: func(_ solace.MessagingService) { - testcontext.Toxi().ResetProxies() - }, - }, - */ - "receiver terminate": { - terminateFunction: func(messagingService solace.MessagingService, - messageReceiver solace.DirectMessageReceiver) { - Expect(messageReceiver.Terminate(0)).To(BeNil()) - Expect(messagingService.IsConnected()).To(BeTrue()) - }, - configuration: func() config.ServicePropertyMap { - return helpers.DefaultCacheConfiguration() - }, - cleanupFunc: func(messagingService solace.MessagingService) { - Expect(messagingService.Disconnect()).To(BeNil()) - Expect(messagingService.IsConnected()).To(BeFalse()) - }, - }, - /* - "receiver terminate with grace period": { - terminateFunction: func(messagingService solace.MessagingService, - messageReceiver solace.DirectMessageReceiver) { - gracePeriod := time.Second * 5 - Eventually(messageReceiver.Terminate(gracePeriod), gracePeriod + 1).Should(BeNil()) - Expect(messagingService.IsConnected()).To(BeTrue()) - }, - configuration: func() config.ServicePropertyMap { - return helpers.DefaultCacheConfiguration() - }, - cleanupFunc: func(messagingService solace.MessagingService) { - Expect(messagingService.Disconnect()).To(BeNil()) - Expect(messagingService.IsConnected()).To(BeFalse()) - }, - }, - "receiver terminate async with channel": { - terminateFunction: func(messagingService solace.MessagingService, - messageReceiver solace.DirectMessageReceiver) { - 
gracePeriod := time.Second * 0 - var err error - Eventually(messageReceiver.TerminateAsync(gracePeriod), gracePeriod + 1).Should(Receive(&err)) - Expect(err).To(BeNil()) - Expect(messagingService.IsConnected()).To(BeTrue()) - }, - configuration: func() config.ServicePropertyMap { - return helpers.DefaultCacheConfiguration() - }, - cleanupFunc: func(messagingService solace.MessagingService) { - Expect(messagingService.Disconnect()).To(BeNil()) - Expect(messagingService.IsConnected()).To(BeFalse()) - }, - }, - "receiver terminate async with grace period with channel": { - terminateFunction: func(messagingService solace.MessagingService, - messageReceiver solace.DirectMessageReceiver) { - gracePeriod := time.Second * 5 - var err error - Eventually(messageReceiver.TerminateAsync(gracePeriod), gracePeriod + 1).Should(Receive(&err)) - Expect(err).To(BeNil()) - Expect(messagingService.IsConnected()).To(BeTrue()) - }, - configuration: func() config.ServicePropertyMap { - return helpers.DefaultCacheConfiguration() - }, - cleanupFunc: func(messagingService solace.MessagingService) { - Expect(messagingService.Disconnect()).To(BeNil()) - Expect(messagingService.IsConnected()).To(BeFalse()) - }, - }, - "receiver terminate async with callback": { - terminateFunction: func(messagingService solace.MessagingService, - messageReceiver solace.DirectMessageReceiver) { - gracePeriod := time.Second * 0 - errChan := make(chan error) - var err error - messageReceiver.TerminateAsyncCallback(gracePeriod, func (err error) { - errChan <- err - }) - Eventually(errChan, gracePeriod + 1).Should(Receive(&err)) - Expect(err).To(BeNil()) - Expect(messagingService.IsConnected()).To(BeTrue()) - }, - configuration: func() config.ServicePropertyMap { - return helpers.DefaultCacheConfiguration() - }, - cleanupFunc: func(messagingService solace.MessagingService) { - Expect(messagingService.Disconnect()).To(BeNil()) - Expect(messagingService.IsConnected()).To(BeFalse()) - }, - }, - "receiver terminate async with grace period with callback": { - terminateFunction: func(messagingService solace.MessagingService, - messageReceiver solace.DirectMessageReceiver) { - gracePeriod := time.Second * 5 - errChan := make(chan error) - var err error - messageReceiver.TerminateAsyncCallback(gracePeriod, func (err error) { - errChan <- err - }) - Eventually(errChan, gracePeriod + 1).Should(Receive(&err)) - Expect(err).To(BeNil()) - Expect(messagingService.IsConnected()).To(BeTrue()) - }, - configuration: func() config.ServicePropertyMap { - return helpers.DefaultCacheConfiguration() - }, - cleanupFunc: func(messagingService solace.MessagingService) { - Expect(messagingService.Disconnect()).To(BeNil()) - Expect(messagingService.IsConnected()).To(BeFalse()) - }, - }, - */ - } - cacheRequestID := 0 - for terminationCaseName, terminationContextRef := range terminationCases { - terminationConfiguration := terminationContextRef.configuration - terminationFunction := terminationContextRef.terminateFunction - terminationCleanup := terminationContextRef.cleanupFunc - Context("using termination scheme " + terminationCaseName, func() { - var terminate func() - const numExpectedCachedMessages int = 3 - const numExpectedLiveMessages int = 1 - const delay int = 25000 - var receivedMsgChan chan message.InboundMessage - - var cacheName string - var topic string - BeforeEach(func() { - logging.SetLogLevel(logging.LogLevelDebug) - CheckCache() // skips test with message if cache image is not available - helpers.InitAllCacheClustersWithMessages() - var err error - 
messagingService, err = messaging.NewMessagingServiceBuilder().FromConfigurationProvider(terminationConfiguration()).Build() - Expect(err).To(BeNil()) - err = messagingService.Connect() - Expect(err).To(BeNil()) - messageReceiver, err = messagingService.CreateDirectMessageReceiverBuilder().Build() - Expect(err).To(BeNil()) - err = messageReceiver.Start() - Expect(err).To(BeNil()) - receivedMsgChan = make(chan message.InboundMessage, 3) - defer close(receivedMsgChan) - messageReceiver.ReceiveAsync(func(msg message.InboundMessage) { - fmt.Printf("Received message from test:\n%s\n", msg.String()) - receivedMsgChan <- msg - }) - - topic = fmt.Sprintf("MaxMsgs%d/%s/data1", numExpectedCachedMessages, testcontext.Cache().Vpn) - cacheName = fmt.Sprintf("MaxMsgs%d/delay=%d,msgs=%d", numExpectedCachedMessages, delay, numExpectedLiveMessages) - terminate = func() { - Expect(messagingService.IsConnected()).To(BeTrue()) - Expect(messageReceiver.IsRunning()).To(BeTrue()) - terminationFunction(messagingService, messageReceiver) - Eventually(messageReceiver.IsTerminated(), "5s").Should(BeTrue()) - } - }) - AfterEach(func () { - if messageReceiver.IsRunning() { - messageReceiver.Terminate(0) - } - if messagingService.IsConnected() { - messagingService.Disconnect() - } - if terminationCleanup != nil { - terminationCleanup(messagingService) - } - }) - - DescribeTable("A receiver should be able to terminate gracefully with inflight cache requests", - func(strategy resource.CachedMessageSubscriptionStrategy, cacheResponseProcessStrategy helpers.CacheResponseProcessStrategy) { - fmt.Printf("Got to beginning of DescribeTable iteration\n") - fmt.Printf("strategy is %d and response strategy is %d\n", strategy, cacheResponseProcessStrategy) - logging.SetLogLevel(logging.LogLevelDebug) - strategyString := "" - switch strategy { - case resource.AsAvailable: - strategyString = "AsAvailable" - case resource.CachedOnly: - strategyString = "CachedOnly" - case resource.CachedFirst: - strategyString = "CachedFirst" - case resource.LiveCancelsCached: - strategyString = "LiveCancelsCached" - } - numSentCacheRequests := 1 - numExpectedCacheResponses := numSentCacheRequests - numExpectedSentMessages := 0 - totalMessagesReceived := 0 - numExpectedReceivedMessages := numExpectedSentMessages - switch strategy { - case resource.AsAvailable: - numExpectedReceivedMessages += numExpectedCachedMessages - numExpectedReceivedMessages += numExpectedLiveMessages - case resource.LiveCancelsCached: - numExpectedReceivedMessages += numExpectedLiveMessages - case resource.CachedFirst: - numExpectedReceivedMessages += numExpectedCachedMessages - numExpectedReceivedMessages += numExpectedLiveMessages - case resource.CachedOnly: - numExpectedReceivedMessages += numExpectedCachedMessages - } - numExpectedSentDirectMessages := numSentCacheRequests + numExpectedSentMessages - - cacheRequestConfig := helpers.GetValidCacheRequestConfig(strategy, cacheName, topic) - cacheRequestID := message.CacheRequestID(cacheRequestID) - fmt.Printf("Finished all setup, choosing cache request strat\n") - switch cacheResponseProcessStrategy { - case helpers.ProcessCacheResponseThroughChannel: - fmt.Printf("About to send cache request") - cacheResponseChan, err := messageReceiver.RequestCachedAsync(cacheRequestConfig, cacheRequestID) - Expect(err).To(BeNil()) - fmt.Printf("Sent cache request!") - terminate() - for i := 0; i < numExpectedCacheResponses; i++ { - Eventually(cacheResponseChan, delay * 2).Should(Receive()) - fmt.Printf("Got response %d for channel\n", i) - 
} - case helpers.ProcessCacheResponseThroughCallback: - fmt.Printf("About to send cache request") - cacheResponseSignalChan := make(chan solace.CacheResponse, numExpectedCacheResponses) - defer close(cacheResponseSignalChan) - cacheResponseCallback := func (cacheResponse solace.CacheResponse) { - cacheResponseSignalChan <- cacheResponse - } - err := messageReceiver.RequestCachedAsyncWithCallback(cacheRequestConfig, cacheRequestID, cacheResponseCallback) - Expect(err).To(BeNil()) - fmt.Printf("Sent cache request!") - terminate() - for i := 0; i < numExpectedCacheResponses; i++ { - Eventually(cacheResponseSignalChan, delay * 2).Should(Receive()) - fmt.Printf("Got response %d for callback\n", i) - } - default: - Fail(fmt.Sprintf("Got unexpected CacheResponseProcessStrategy %d", cacheResponseProcessStrategy)) - } - /* TODO: Should this even be here? */ - for i := 0; i < numExpectedReceivedMessages; i ++ { - Eventually(receivedMsgChan, "10s").Should(Receive()) - fmt.Printf("Found message\n") - totalMessagesReceived ++ - } - Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsSent)).To(BeNumerically("==", numSentCacheRequests), fmt.Sprintf("CacheRequestsSent for %s was wrong", strategyString)) - Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsSucceeded)).To(BeNumerically("==", numSentCacheRequests), fmt.Sprintf("CacheRequestsSucceeded for %s was wrong", strategyString)) - Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsFailed)).To(BeNumerically("==", 0), fmt.Sprintf("CacheRequestsFailed for %s was wrong", strategyString)) - Expect(messagingService.Metrics().GetValue(metrics.DirectMessagesSent)).To(BeNumerically("==", numExpectedSentDirectMessages), fmt.Sprintf("DirectMessagesSent for %s was wrong", strategyString)) - Expect(totalMessagesReceived).To(BeNumerically("==", numExpectedReceivedMessages)) - - }, - Entry("test cache RR for valid AsAvailable with channel", resource.AsAvailable, helpers.ProcessCacheResponseThroughChannel), - /* - Entry("test cache RR for valid AsAvailable with callback", resource.AsAvailable, helpers.ProcessCacheResponseThroughCallback), - Entry("test cache RR for valid CachedFirst with channel", resource.CachedFirst, helpers.ProcessCacheResponseThroughChannel), - Entry("test cache RR for valid CachedFirst with callback", resource.CachedFirst, helpers.ProcessCacheResponseThroughCallback), - Entry("test cache RR for valid CachedOnly with channel", resource.CachedOnly, helpers.ProcessCacheResponseThroughChannel), - Entry("test cache RR for valid CachedOnly with callback", resource.CachedOnly, helpers.ProcessCacheResponseThroughCallback), - Entry("test cache RR for valid LiveCancelsCached with channel", resource.LiveCancelsCached, helpers.ProcessCacheResponseThroughChannel), - Entry("test cache RR for valid LivCancelsCached with callback", resource.LiveCancelsCached, helpers.ProcessCacheResponseThroughCallback), - */ - ) - }) - cacheRequestID++ - } - }) -}) + logging.SetLogLevel(logging.LogLevelDebug) + var messagingService solace.MessagingService + var receiver solace.DirectMessageReceiver + Describe("When the cache is available and configured", func() { + BeforeEach(func() { + logging.SetLogLevel(logging.LogLevelDebug) + CheckCache() // skips test with message if cache image is not available + helpers.InitAllCacheClustersWithMessages() + var err error + messagingService, err = messaging.NewMessagingServiceBuilder().FromConfigurationProvider(helpers.DefaultCacheConfiguration()).Build() + Expect(err).To(BeNil()) + err = 
messagingService.Connect() + Expect(err).To(BeNil()) + receiver, err = messagingService.CreateDirectMessageReceiverBuilder().Build() + Expect(err).To(BeNil()) + err = receiver.Start() + Expect(err).To(BeNil()) + }) + AfterEach(func() { + var err error + if messagingService.IsConnected() { + err = messagingService.Disconnect() + Expect(err).To(BeNil()) + } + Expect(messagingService.IsConnected()).To(BeFalse()) + if receiver.IsRunning() { + err = receiver.Terminate(0) + Expect(err).To(BeNil()) + } + Expect(receiver.IsRunning()).To(BeFalse()) + Expect(receiver.IsTerminated()).To(BeTrue()) + }) + It("a direct receiver should get an error when trying to send an invalid cache request", func() { + var cacheRequestID message.CacheRequestID = 0 + trivialCacheName := "trivial cache name" + trivialTopic := "trivial topic" + strategy := resource.AsAvailable + invalidCacheRequestConfig := helpers.GetInvalidCacheRequestConfig(strategy, trivialCacheName, trivialTopic) + channel, err := receiver.RequestCachedAsync(invalidCacheRequestConfig, cacheRequestID) + Expect(channel).To(BeNil()) + Expect(err).To(BeAssignableToTypeOf(&solace.InvalidConfigurationError{})) + callback := func(solace.CacheResponse) { + Fail("This callback function should never run!") + } + err = receiver.RequestCachedAsyncWithCallback(invalidCacheRequestConfig, cacheRequestID, callback) + Expect(err).To(BeAssignableToTypeOf(&solace.InvalidConfigurationError{})) + }) + It("a direct receiver should be able to submit multiple concurrent cache requests with the same cache request ID without error", func() { + cacheRequestID := message.CacheRequestID(1) + numExpectedCachedMessages := 3 + /* NOTE: delay will give us time to have concurrent cache requests with the same ID */ + delay := 25000 + topic := fmt.Sprintf("MaxMsgs%d/%s/data1", numExpectedCachedMessages, testcontext.Cache().Vpn) + cacheName := fmt.Sprintf("MaxMsgs%d/delay=%d,", numExpectedCachedMessages, delay) + cacheRequestConfig := resource.NewCachedMessageSubscriptionRequest(resource.AsAvailable, cacheName, resource.TopicSubscriptionOf(topic), int32(delay+5000), helpers.ValidMaxCachedMessages, helpers.ValidCachedMessageAge) + + channelOne, err := receiver.RequestCachedAsync(cacheRequestConfig, cacheRequestID) + Expect(channelOne).ToNot(BeNil()) + Expect(err).To(BeNil()) + Eventually(func() uint64 { return messagingService.Metrics().GetValue(metrics.CacheRequestsSent) }, "5s").Should(BeNumerically("==", 1)) + + channelTwo, err := receiver.RequestCachedAsync(cacheRequestConfig, cacheRequestID) + Expect(channelTwo).ToNot(BeNil()) + Expect(err).To(BeNil()) + Eventually(func() uint64 { return messagingService.Metrics().GetValue(metrics.CacheRequestsSent) }, "5s").Should(BeNumerically("==", 2)) + + Consistently(channelOne, "1ms").ShouldNot(Receive()) + Consistently(channelTwo, "1ms").ShouldNot(Receive()) + }) + It("a direct receiver should be able to submit multiple consecutive cache requests with the same cache request ID without error", func() { + cacheRequestID := message.CacheRequestID(1) + numExpectedCachedMessages := 3 + topic := fmt.Sprintf("MaxMsgs%d/%s/data1", numExpectedCachedMessages, testcontext.Cache().Vpn) + cacheName := fmt.Sprintf("MaxMsgs%d", numExpectedCachedMessages) + cacheRequestConfig := helpers.GetValidCacheRequestConfig(resource.AsAvailable, cacheName, topic) + + channelOne, err := receiver.RequestCachedAsync(cacheRequestConfig, cacheRequestID) + Expect(channelOne).ToNot(BeNil()) + Expect(err).To(BeNil()) + Eventually(func() uint64 { return 
messagingService.Metrics().GetValue(metrics.CacheRequestsSent) }, "5s").Should(BeNumerically("==", 1)) + var cacheResponseOne solace.CacheResponse + Eventually(channelOne, "10s").Should(Receive(&cacheResponseOne)) + Expect(cacheResponseOne).ToNot(BeNil()) + + channelTwo, err := receiver.RequestCachedAsync(cacheRequestConfig, cacheRequestID) + Expect(channelTwo).ToNot(BeNil()) + Expect(err).To(BeNil()) + Eventually(func() uint64 { return messagingService.Metrics().GetValue(metrics.CacheRequestsSent) }, "5s").Should(BeNumerically("==", 2)) + var cacheResponseTwo solace.CacheResponse + Eventually(channelTwo, "10s").Should(Receive(&cacheResponseTwo)) + Expect(cacheResponseTwo).ToNot(BeNil()) + }) + It("a direct receiver that tries to submit more than the maximum number of cache requests should get an IllegalStateError", func() { + maxCacheRequests := 1024 + /* NOTE: First we will fill the internal buffer, then we will try one more and assert an error */ + i := 0 + numExpectedCachedMessages := 3 + topic := fmt.Sprintf("MaxMsgs%d/%s/data1", numExpectedCachedMessages, testcontext.Cache().Vpn) + cacheName := fmt.Sprintf("MaxMsgs%d", numExpectedCachedMessages) + cacheRequestConfig := helpers.GetValidCacheRequestConfig(resource.AsAvailable, cacheName, topic) + cacheResponseSignalChan := make(chan solace.CacheResponse) + callback := func(cacheResponse solace.CacheResponse) { + cacheResponseSignalChan <- cacheResponse + } + for ; i <= maxCacheRequests; i++ { + cacheRequestID := message.CacheRequestID(i) + err := receiver.RequestCachedAsyncWithCallback(cacheRequestConfig, cacheRequestID, callback) + Expect(err).To(BeNil()) + } + Eventually(func() uint64 { return messagingService.Metrics().GetValue(metrics.CacheRequestsSent) }).Should(BeNumerically("==", maxCacheRequests+1)) + Eventually(func() uint64 { return messagingService.Metrics().GetValue(metrics.CacheRequestsSucceeded) }).Should(BeNumerically("==", maxCacheRequests+1)) + + err := receiver.RequestCachedAsyncWithCallback(cacheRequestConfig, message.CacheRequestID(maxCacheRequests+1), callback) + Expect(err).To(BeAssignableToTypeOf(&solace.IllegalStateError{})) + + channel, err := receiver.RequestCachedAsync(cacheRequestConfig, message.CacheRequestID(maxCacheRequests+1)) + Expect(err).To(BeAssignableToTypeOf(&solace.IllegalStateError{})) + Expect(channel).To(BeNil()) + for i := 0; i <= maxCacheRequests; i++ { + <-cacheResponseSignalChan + } + }) + DescribeTable("a direct receiver should be able to submit a valid cache request, receive a response, and terminate", + func(strategy resource.CachedMessageSubscriptionStrategy, cacheResponseProcessStrategy helpers.CacheResponseProcessStrategy) { + logging.SetLogLevel(logging.LogLevelDebug) + strategyString := "" + numExpectedCachedMessages := 3 + numExpectedLiveMessages := 1 + numSentCacheRequests := 1 + numExpectedCacheResponses := numSentCacheRequests + numExpectedSentMessages := 0 + totalMessagesReceived := 0 + numExpectedReceivedMessages := numExpectedSentMessages + switch strategy { + case resource.AsAvailable: + strategyString = "AsAvailable" + numExpectedReceivedMessages += numExpectedCachedMessages + numExpectedReceivedMessages += numExpectedLiveMessages + case resource.LiveCancelsCached: + strategyString = "LiveCancelsCached" + numExpectedReceivedMessages += numExpectedLiveMessages + case resource.CachedFirst: + strategyString = "CachedFirst" + numExpectedReceivedMessages += numExpectedCachedMessages + numExpectedReceivedMessages += numExpectedLiveMessages + case resource.CachedOnly: + 
strategyString = "CachedOnly" + numExpectedReceivedMessages += numExpectedCachedMessages + } + numExpectedSentDirectMessages := numSentCacheRequests + numExpectedSentMessages + topic := fmt.Sprintf("MaxMsgs%d/%s/data1", numExpectedCachedMessages, testcontext.Cache().Vpn) + cacheName := fmt.Sprintf("MaxMsgs%d/delay=2000,msgs=%d", numExpectedCachedMessages, numExpectedLiveMessages) + cacheRequestConfig := helpers.GetValidCacheRequestConfig(strategy, cacheName, topic) + cacheRequestID := message.CacheRequestID(1) + receivedMsgChan := make(chan message.InboundMessage, 3) + defer close(receivedMsgChan) + receiver.ReceiveAsync(func(msg message.InboundMessage) { + receivedMsgChan <- msg + }) + switch cacheResponseProcessStrategy { + case helpers.ProcessCacheResponseThroughChannel: + cacheResponseChan, err := receiver.RequestCachedAsync(cacheRequestConfig, cacheRequestID) + Expect(err).To(BeNil()) + for i := 0; i < numExpectedCacheResponses; i++ { + Eventually(cacheResponseChan, "10s").Should(Receive()) + } + case helpers.ProcessCacheResponseThroughCallback: + cacheResponseSignalChan := make(chan solace.CacheResponse, 1) + defer close(cacheResponseSignalChan) + cacheResponseCallback := func(cacheResponse solace.CacheResponse) { + cacheResponseSignalChan <- cacheResponse + } + err := receiver.RequestCachedAsyncWithCallback(cacheRequestConfig, cacheRequestID, cacheResponseCallback) + Expect(err).To(BeNil()) + for i := 0; i < numExpectedCacheResponses; i++ { + Eventually(cacheResponseSignalChan, "10s").Should(Receive()) + } + default: + Fail(fmt.Sprintf("Got unexpected CacheResponseProcessStrategy %d", cacheResponseProcessStrategy)) + } + for i := 0; i < numExpectedReceivedMessages; i++ { + Eventually(receivedMsgChan, "10s").Should(Receive()) + totalMessagesReceived++ + } + Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsSent)).To(BeNumerically("==", numSentCacheRequests), fmt.Sprintf("CacheRequestsSent for %s was wrong", strategyString)) + Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsSucceeded)).To(BeNumerically("==", numSentCacheRequests), fmt.Sprintf("CacheRequestsSucceeded for %s was wrong", strategyString)) + Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsFailed)).To(BeNumerically("==", 0), fmt.Sprintf("CacheRequestsFailed for %s was wrong", strategyString)) + Expect(messagingService.Metrics().GetValue(metrics.DirectMessagesSent)).To(BeNumerically("==", numExpectedSentDirectMessages), fmt.Sprintf("DirectMessagesSent for %s was wrong", strategyString)) + Expect(totalMessagesReceived).To(BeNumerically("==", numExpectedReceivedMessages)) + }, + Entry("test cache RR for valid AsAvailable with channel", resource.AsAvailable, helpers.ProcessCacheResponseThroughChannel), + Entry("test cache RR for valid AsAvailable with callback", resource.AsAvailable, helpers.ProcessCacheResponseThroughCallback), + Entry("test cache RR for valid CachedFirst with channel", resource.CachedFirst, helpers.ProcessCacheResponseThroughChannel), + Entry("test cache RR for valid CachedFirst with callback", resource.CachedFirst, helpers.ProcessCacheResponseThroughCallback), + Entry("test cache RR for valid CachedOnly with channel", resource.CachedOnly, helpers.ProcessCacheResponseThroughChannel), + Entry("test cache RR for valid CachedOnly with callback", resource.CachedOnly, helpers.ProcessCacheResponseThroughCallback), + Entry("test cache RR for valid LiveCancelsCached with channel", resource.LiveCancelsCached, helpers.ProcessCacheResponseThroughChannel), + Entry("test cache 
RR for valid LivCancelsCached with callback", resource.LiveCancelsCached, helpers.ProcessCacheResponseThroughCallback), + ) + Describe("Lifecycle tests", func() { + var messagingService solace.MessagingService + var messageReceiver solace.DirectMessageReceiver + var cacheRequestID = 0 + type terminationContext struct { + terminateFunction func(messagingService solace.MessagingService, + messageReceiver solace.DirectMessageReceiver) + configuration func() config.ServicePropertyMap + cleanupFunc func(messagingService solace.MessagingService) + // blockable indicates whether or not it is feasible for the application to block this + // termination method. Termination methods that run on the main thread are candidates + // since the test(application) can block termination only from the main thread and so would + // reach a deadlock, or have to call terminate in another thread which would be equivalent + // to the asynchronous interface. This would create redundant test coverage since the + // asynchronous methods are already being tested. + blockable bool + // seversConnection informs the test that it should expect cache requests to be incomplete + // since the connection is broken before the request completes. + seversConnection bool + } + terminationCases := map[string]terminationContext{ + "messaging service disconnect": { + terminateFunction: func(messagingService solace.MessagingService, + messageReceiver solace.DirectMessageReceiver) { + Expect(messagingService.Disconnect()).To(BeNil()) + Expect(messagingService.IsConnected()).To(BeFalse()) + Eventually(messageReceiver.IsTerminated(), "5s").Should(BeTrue()) + }, + configuration: func() config.ServicePropertyMap { + return helpers.DefaultCacheConfiguration() + }, + blockable: true, + seversConnection: true, + // FFC: Assert subcode SESSION_NOT_ESTABLISHED, outcome.FAILED + }, + "messaging service disconnect async with channel": { + terminateFunction: func(messagingService solace.MessagingService, + messageReceiver solace.DirectMessageReceiver) { + var err error + Eventually(messagingService.DisconnectAsync(), "5s").Should(Receive(&err)) + Expect(err).To(BeNil()) + Expect(messagingService.IsConnected()).To(BeFalse()) + Eventually(messageReceiver.IsTerminated(), "5s").Should(BeTrue()) + }, + configuration: func() config.ServicePropertyMap { + return helpers.DefaultCacheConfiguration() + }, + blockable: true, + seversConnection: true, + // FFC: Assert subcode SESSION_NOT_ESTABLISHED, outcome.FAILED + }, + "messaging service disconnect async with callback": { + terminateFunction: func(messagingService solace.MessagingService, + messageReceiver solace.DirectMessageReceiver) { + errorChan := make(chan error) + messagingService.DisconnectAsyncWithCallback(func(err error) { + errorChan <- err + }) + var err_holder error + Eventually(errorChan, "5s").Should(Receive(&err_holder)) + Expect(err_holder).To(BeNil()) + Expect(messagingService.IsConnected()).To(BeFalse()) + Eventually(messageReceiver.IsTerminated(), "5s").Should(BeTrue()) + }, + configuration: func() config.ServicePropertyMap { + return helpers.DefaultCacheConfiguration() + }, + blockable: true, + seversConnection: true, + // FFC: Assert subcode SESSION_NOT_ESTABLISHED, outcome.FAILED + }, + "management disconnect": { + terminateFunction: func(messagingService solace.MessagingService, + messageReceiver solace.DirectMessageReceiver) { + eventChan := make(chan solace.ServiceEvent) + messagingService.AddServiceInterruptionListener(func(event solace.ServiceEvent) { + eventChan <- event + }) + 
helpers.ForceDisconnectViaSEMPv2WithConfiguration( + messagingService, + // make sure this is the same config as assigned to the termination + // strategy's `configuration` field. + helpers.DefaultCacheConfiguration()) + var event_holder solace.ServiceEvent + Eventually(eventChan, "5s").Should(Receive(&event_holder)) + Expect(event_holder).To(Not(BeNil())) + helpers.ValidateNativeError(event_holder.GetCause(), subcode.CommunicationError) + + Expect(messagingService.IsConnected()).To(BeFalse()) + Eventually(messageReceiver.IsTerminated(), "5s").Should(BeTrue()) + }, + configuration: func() config.ServicePropertyMap { + return helpers.DefaultCacheConfiguration() + }, + blockable: true, + seversConnection: true, + // FFC: Assert subcode COMMUNICATION_ERROR, outcome.FAILED + }, + "toxic disconnect": { + terminateFunction: func(messagingService solace.MessagingService, + messageReceiver solace.DirectMessageReceiver) { + eventChan := make(chan solace.ServiceEvent) + messagingService.AddServiceInterruptionListener(func(event solace.ServiceEvent) { + eventChan <- event + }) + testcontext.Toxi().SMF().Delete() + + var event_holder solace.ServiceEvent + Eventually(eventChan, "30s").Should(Receive(&event_holder)) + Expect(event_holder).To(Not(BeNil())) + helpers.ValidateNativeError(event_holder.GetCause(), subcode.CommunicationError) + + Expect(messagingService.IsConnected()).To(BeFalse()) + Eventually(messageReceiver.IsTerminated(), "5s").Should(BeTrue()) + }, + configuration: func() config.ServicePropertyMap { + helpers.CheckToxiProxy() + return helpers.CacheToxicConfiguration() + }, + cleanupFunc: func(_ solace.MessagingService) { + testcontext.Toxi().ResetProxies() + }, + blockable: true, + seversConnection: true, + // FFC: Assert subcode COMMUNICATION_ERROR, outcome.FAILED + }, + "receiver terminate": { + terminateFunction: func(messagingService solace.MessagingService, + messageReceiver solace.DirectMessageReceiver) { + Expect(messageReceiver.Terminate(0)).To(BeNil()) + Expect(messagingService.IsConnected()).To(BeTrue()) + }, + configuration: func() config.ServicePropertyMap { + return helpers.DefaultCacheConfiguration() + }, + cleanupFunc: func(messagingService solace.MessagingService) { + Expect(messagingService.Disconnect()).To(BeNil()) + Expect(messagingService.IsConnected()).To(BeFalse()) + }, + blockable: false, + seversConnection: false, + // FFC: Assert subcode CACHE_REQUEST_CANCELLED, outcome.FAILED + }, + "receiver terminate with grace period": { + terminateFunction: func(messagingService solace.MessagingService, + messageReceiver solace.DirectMessageReceiver) { + gracePeriod := time.Second * 5 + Expect(messageReceiver.Terminate(gracePeriod)).To(BeNil()) + Eventually(messageReceiver.IsRunning(), gracePeriod+(time.Second*1)).Should(BeFalse()) + Expect(messageReceiver.IsTerminated()).To(BeTrue()) + Expect(messagingService.IsConnected()).To(BeTrue()) + }, + configuration: func() config.ServicePropertyMap { + return helpers.DefaultCacheConfiguration() + }, + cleanupFunc: func(messagingService solace.MessagingService) { + Expect(messagingService.Disconnect()).To(BeNil()) + Expect(messagingService.IsConnected()).To(BeFalse()) + }, + blockable: false, + seversConnection: false, + // FFC: Assert subcode CACHE_REQUEST_CANCELLED, outcome.FAILED + }, + "receiver terminate async with channel": { + terminateFunction: func(messagingService solace.MessagingService, + messageReceiver solace.DirectMessageReceiver) { + gracePeriod := time.Second * 0 + var err error + 
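[Editorial note, not part of the patch] The receiver-level cases above differ from the service-level ones in that the messaging service is expected to stay connected after the receiver terminates, which is why those cases set seversConnection to false and rely on a cleanupFunc to disconnect the service afterwards. A minimal sketch of that convention, using the same calls as the cases above, is:

// Sketch: receiver-level termination leaves the messaging service connected,
// so the table's cleanupFunc performs the final disconnect.
gracePeriod := 5 * time.Second
Expect(messageReceiver.Terminate(gracePeriod)).To(BeNil()) // blocks for up to gracePeriod
Expect(messageReceiver.IsTerminated()).To(BeTrue())
Expect(messagingService.IsConnected()).To(BeTrue()) // still connected here...
Expect(messagingService.Disconnect()).To(BeNil())   // ...until cleanup disconnects it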
Eventually(messageReceiver.TerminateAsync(gracePeriod), gracePeriod+(time.Second*1)).Should(Receive(&err)) + Expect(err).To(BeNil()) + Expect(messageReceiver.IsRunning()).To(BeFalse()) + Expect(messageReceiver.IsTerminated()).To(BeTrue()) + Expect(messagingService.IsConnected()).To(BeTrue()) + }, + configuration: func() config.ServicePropertyMap { + return helpers.DefaultCacheConfiguration() + }, + cleanupFunc: func(messagingService solace.MessagingService) { + Expect(messagingService.Disconnect()).To(BeNil()) + Expect(messagingService.IsConnected()).To(BeFalse()) + }, + blockable: true, + seversConnection: false, + // FFC: Assert subcode CACHE_REQUEST_CANCELLED, outcome.FAILED + }, + "receiver terminate async with grace period with channel": { + terminateFunction: func(messagingService solace.MessagingService, + messageReceiver solace.DirectMessageReceiver) { + gracePeriod := time.Second * 5 + var err error + Eventually(messageReceiver.TerminateAsync(gracePeriod), gracePeriod+(time.Second*1)).Should(Receive(&err)) + Expect(err).To(BeNil()) + Expect(messageReceiver.IsRunning()).To(BeFalse()) + Expect(messageReceiver.IsTerminated()).To(BeTrue()) + Expect(messagingService.IsConnected()).To(BeTrue()) + }, + configuration: func() config.ServicePropertyMap { + return helpers.DefaultCacheConfiguration() + }, + cleanupFunc: func(messagingService solace.MessagingService) { + Expect(messagingService.Disconnect()).To(BeNil()) + Expect(messagingService.IsConnected()).To(BeFalse()) + }, + blockable: true, + seversConnection: false, + // FFC: Assert subcode CACHE_REQUEST_CANCELLED, outcome.FAILED + }, + "receiver terminate async with callback": { + terminateFunction: func(messagingService solace.MessagingService, + messageReceiver solace.DirectMessageReceiver) { + gracePeriod := time.Second * 0 + errChan := make(chan error) + var err error + messageReceiver.TerminateAsyncCallback(gracePeriod, func(err error) { + errChan <- err + }) + Eventually(errChan, gracePeriod+(time.Second*1)).Should(Receive(&err)) + Expect(err).To(BeNil()) + Expect(messageReceiver.IsRunning()).To(BeFalse()) + Expect(messageReceiver.IsTerminated()).To(BeTrue()) + Expect(messagingService.IsConnected()).To(BeTrue()) + }, + configuration: func() config.ServicePropertyMap { + return helpers.DefaultCacheConfiguration() + }, + cleanupFunc: func(messagingService solace.MessagingService) { + Expect(messagingService.Disconnect()).To(BeNil()) + Expect(messagingService.IsConnected()).To(BeFalse()) + }, + blockable: true, + seversConnection: false, + // FFC: Assert subcode CACHE_REQUEST_CANCELLED, outcome.FAILED + }, + "receiver terminate async with grace period with callback": { + terminateFunction: func(messagingService solace.MessagingService, + messageReceiver solace.DirectMessageReceiver) { + gracePeriod := time.Second * 5 + errChan := make(chan error) + var err error + messageReceiver.TerminateAsyncCallback(gracePeriod, func(err error) { + errChan <- err + }) + Eventually(errChan, gracePeriod+(time.Second*1)).Should(Receive(&err)) + Expect(err).To(BeNil()) + Expect(messageReceiver.IsRunning()).To(BeFalse()) + Expect(messageReceiver.IsTerminated()).To(BeTrue()) + Expect(messagingService.IsConnected()).To(BeTrue()) + }, + configuration: func() config.ServicePropertyMap { + return helpers.DefaultCacheConfiguration() + }, + cleanupFunc: func(messagingService solace.MessagingService) { + Expect(messagingService.Disconnect()).To(BeNil()) + Expect(messagingService.IsConnected()).To(BeFalse()) + }, + blockable: true, + seversConnection: false, 
+					// FFC: Assert subcode CACHE_REQUEST_CANCELLED, outcome.FAILED
+				},
+			}
+			Context("a connected messaging service with a built direct message receiver", func() {
+				const cacheName string = "trivial cache name"
+				const topic string = "trivial topic"
+				const strategy resource.CachedMessageSubscriptionStrategy = resource.AsAvailable
+				const cacheRequestID message.CacheRequestID = 1
+				BeforeEach(func() {
+					logging.SetLogLevel(logging.LogLevelDebug)
+					CheckCache() // skips test with message if cache image is not available
+					var err error
+					messagingService, err = messaging.NewMessagingServiceBuilder().FromConfigurationProvider(helpers.DefaultCacheConfiguration()).Build()
+					Expect(err).To(BeNil())
+					err = messagingService.Connect()
+					Expect(err).To(BeNil())
+					messageReceiver, err = messagingService.CreateDirectMessageReceiverBuilder().Build()
+					Expect(err).To(BeNil())
+				})
+				AfterEach(func() {
+					var err error
+					if messageReceiver.IsRunning() {
+						err = messageReceiver.Terminate(0)
+						Expect(err).To(BeNil())
+					}
+					// One of the test cases involves an unstarted receiver, whose only common terminal
+					// state with a terminated receiver is !IsRunning(). To make this block reusable
+					// between both test cases, we only check that the receiver is not running at the end
+					// of the test, which is enough state coverage anyway.
+					Expect(messageReceiver.IsRunning()).To(BeFalse())
+					if messagingService.IsConnected() {
+						err = messagingService.Disconnect()
+						Expect(err).To(BeNil())
+					}
+					Expect(messagingService.IsConnected()).To(BeFalse())
+				})
+				It("will return an IllegalStateError when a cache request is attempted before the receiver is started", func() {
+					cacheRequestConfig := helpers.GetValidCacheRequestConfig(strategy, cacheName, topic)
+					_, err := messageReceiver.RequestCachedAsync(cacheRequestConfig, cacheRequestID)
+					Expect(err).To(BeAssignableToTypeOf(&solace.IllegalStateError{}))
+
+					cacheResponseCallback := func(cacheResponse solace.CacheResponse) {
+						Fail("This function should never be called.")
+					}
+					err = messageReceiver.RequestCachedAsyncWithCallback(cacheRequestConfig, cacheRequestID, cacheResponseCallback)
+					Expect(err).To(BeAssignableToTypeOf(&solace.IllegalStateError{}))
+				})
+			})
+			for terminationCaseName, terminationContextRef := range terminationCases {
+				terminationConfiguration := terminationContextRef.configuration
+				terminationFunction := terminationContextRef.terminateFunction
+				terminationCleanup := terminationContextRef.cleanupFunc
+
+				Context("a connected messaging service with a built direct message receiver", func() {
+					const cacheName string = "trivial cache name"
+					const topic string = "trivial topic"
+					const strategy resource.CachedMessageSubscriptionStrategy = resource.AsAvailable
+					const cacheRequestID message.CacheRequestID = 1
+					BeforeEach(func() {
+						logging.SetLogLevel(logging.LogLevelDebug)
+						CheckCache() // skips test with message if cache image is not available
+						var err error
+						messagingService, err = messaging.NewMessagingServiceBuilder().FromConfigurationProvider(terminationConfiguration()).Build()
+						Expect(err).To(BeNil())
+						err = messagingService.Connect()
+						Expect(err).To(BeNil())
+						messageReceiver, err = messagingService.CreateDirectMessageReceiverBuilder().Build()
+						Expect(err).To(BeNil())
+						Expect(messageReceiver.Start()).To(BeNil())
+						Expect(messageReceiver.IsRunning()).To(BeTrue())
+						Expect(messageReceiver.Terminate(0)).To(BeNil())
+
+					})
+					AfterEach(func() {
+						var err error
+						if messageReceiver.IsRunning() {
+							err = messageReceiver.Terminate(0)
+							Expect(err).To(BeNil())
+						}
+						// One of the test cases involves an unstarted receiver, whose only common terminal
+						// state with a terminated receiver is !IsRunning(). To make this block reusable
+						// between both test cases, we only check that the receiver is not running at the end
+						// of the test, which is enough state coverage anyway.
+						Expect(messageReceiver.IsRunning()).To(BeFalse())
+						if messagingService.IsConnected() {
+							err = messagingService.Disconnect()
+							Expect(err).To(BeNil())
+						}
+						Expect(messagingService.IsConnected()).To(BeFalse())
+						if terminationCleanup != nil {
+							terminationCleanup(messagingService)
+						}
+					})
+
+					It("will return an IllegalStateError when a cache request is attempted after the receiver is terminated using the "+terminationCaseName+" termination method", func() {
+						terminationFunction(messagingService, messageReceiver)
+						cacheRequestConfig := helpers.GetValidCacheRequestConfig(strategy, cacheName, topic)
+						_, err := messageReceiver.RequestCachedAsync(cacheRequestConfig, cacheRequestID)
+						Expect(err).To(BeAssignableToTypeOf(&solace.IllegalStateError{}))
+
+						cacheResponseCallback := func(cacheResponse solace.CacheResponse) {
+							Fail("This function should never be called.")
+						}
+						err = messageReceiver.RequestCachedAsyncWithCallback(cacheRequestConfig, cacheRequestID, cacheResponseCallback)
+						Expect(err).To(BeAssignableToTypeOf(&solace.IllegalStateError{}))
+					})
+				})
+			}
+			for terminationCaseName, terminationContextRef := range terminationCases {
+				terminationConfiguration := terminationContextRef.configuration
+				terminationFunction := terminationContextRef.terminateFunction
+				terminationCleanup := terminationContextRef.cleanupFunc
+				Context("using termination scheme "+terminationCaseName, func() {
+					const numConfiguredCachedMessages int = 3
+					const numExpectedCachedMessages int = 0
+					const numExpectedLiveMessages int = 0
+					const delay int = 25000
+					var cacheName string
+					var topic string
+
+					var terminate func()
+					var receivedMsgChan chan message.InboundMessage
+
+					BeforeEach(func() {
+						cacheName = fmt.Sprintf("MaxMsgs%d/delay=%d", numConfiguredCachedMessages, delay)
+						topic = fmt.Sprintf("MaxMsgs%d/%s/data1", numConfiguredCachedMessages, testcontext.Cache().Vpn)
+						cacheRequestID++
+						logging.SetLogLevel(logging.LogLevelDebug)
+						CheckCache() // skips test with message if cache image is not available
+						helpers.InitAllCacheClustersWithMessages()
+						var err error
+						messagingService, err = messaging.NewMessagingServiceBuilder().FromConfigurationProvider(terminationConfiguration()).Build()
+						Expect(err).To(BeNil())
+						err = messagingService.Connect()
+						Expect(err).To(BeNil())
+						messageReceiver, err = messagingService.CreateDirectMessageReceiverBuilder().Build()
+						Expect(err).To(BeNil())
+						err = messageReceiver.Start()
+						Expect(err).To(BeNil())
+						receivedMsgChan = make(chan message.InboundMessage, 3)
+						messageReceiver.ReceiveAsync(func(msg message.InboundMessage) {
+							receivedMsgChan <- msg
+						})
+
+						terminate = func() {
+							Expect(messagingService.IsConnected()).To(BeTrue())
+							Expect(messageReceiver.IsRunning()).To(BeTrue())
+							terminationFunction(messagingService, messageReceiver)
+							Eventually(messageReceiver.IsTerminated(), "5s").Should(BeTrue())
+						}
+					})
+					AfterEach(func() {
+						if messageReceiver.IsRunning() {
+							messageReceiver.Terminate(0)
+						}
+						if messagingService.IsConnected() {
+							messagingService.Disconnect()
+						}
+						if terminationCleanup != nil {
+							terminationCleanup(messagingService)
+						}
+						close(receivedMsgChan)
+					})
+
+					DescribeTable("a receiver should be able to 
terminate gracefully with inflight cache requests", + func(strategy resource.CachedMessageSubscriptionStrategy, cacheResponseProcessStrategy helpers.CacheResponseProcessStrategy) { + logging.SetLogLevel(logging.LogLevelDebug) + strategyString := "" + numSentCacheRequests := 1 + numExpectedCacheResponses := numSentCacheRequests + // The cache request should be cancelled, so it is not successful + numExpectedSuccessfulCacheRequests := 0 + // The cache request should be cancelled, which counts as the + // application stopping the cache request, not as an error/failure + numExpectedFailedCacheRequests := 0 + numExpectedSentMessages := 0 + totalMessagesReceived := 0 + numExpectedReceivedMessages := numExpectedSentMessages + switch strategy { + case resource.AsAvailable: + strategyString = "AsAvailable" + numExpectedReceivedMessages += numExpectedCachedMessages + numExpectedReceivedMessages += numExpectedLiveMessages + case resource.LiveCancelsCached: + strategyString = "LiveCancelsCached" + numExpectedReceivedMessages += numExpectedLiveMessages + case resource.CachedFirst: + strategyString = "CachedFirst" + numExpectedReceivedMessages += numExpectedCachedMessages + numExpectedReceivedMessages += numExpectedLiveMessages + case resource.CachedOnly: + strategyString = "CachedOnly" + numExpectedReceivedMessages += numExpectedCachedMessages + } + numExpectedSentDirectMessages := numSentCacheRequests + numExpectedSentMessages + + cacheRequestConfig := helpers.GetValidCacheRequestConfig(strategy, cacheName, topic) + cacheRequestID := message.CacheRequestID(cacheRequestID) + switch cacheResponseProcessStrategy { + case helpers.ProcessCacheResponseThroughChannel: + cacheResponseChan, err := messageReceiver.RequestCachedAsync(cacheRequestConfig, cacheRequestID) + Expect(err).To(BeNil()) + Eventually(func() uint64 { + return messagingService.Metrics().GetValue(metrics.CacheRequestsSent) + }, "10s").Should(BeNumerically("==", 1)) + + for i := 0; i < numExpectedReceivedMessages; i++ { + Eventually(receivedMsgChan, "10s").Should(Receive()) + totalMessagesReceived++ + } + + terminate() + for i := 0; i < numExpectedCacheResponses; i++ { + Eventually(cacheResponseChan, delay*2).Should(Receive()) + } + case helpers.ProcessCacheResponseThroughCallback: + cacheResponseSignalChan := make(chan solace.CacheResponse, numExpectedCacheResponses) + defer func() { + close(cacheResponseSignalChan) + }() + cacheResponseCallback := func(cacheResponse solace.CacheResponse) { + cacheResponseSignalChan <- cacheResponse + } + err := messageReceiver.RequestCachedAsyncWithCallback(cacheRequestConfig, cacheRequestID, cacheResponseCallback) + Expect(err).To(BeNil()) + + Eventually(func() uint64 { + return messagingService.Metrics().GetValue(metrics.CacheRequestsSent) + }, "10s").Should(BeNumerically("==", 1)) + for i := 0; i < numExpectedReceivedMessages; i++ { + Eventually(receivedMsgChan, "10s").Should(Receive()) + totalMessagesReceived++ + } + terminate() + for i := 0; i < numExpectedCacheResponses; i++ { + Eventually(cacheResponseSignalChan, delay*2).Should(Receive()) + } + default: + Fail(fmt.Sprintf("Got unexpected CacheResponseProcessStrategy %d", cacheResponseProcessStrategy)) + } + + Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsSent)).To(BeNumerically("==", numSentCacheRequests), fmt.Sprintf("CacheRequestsSent for %s was wrong", strategyString)) + Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsSucceeded)).To(BeNumerically("==", numExpectedSuccessfulCacheRequests), 
fmt.Sprintf("CacheRequestsSucceeded for %s was wrong", strategyString)) + Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsFailed)).To(BeNumerically("==", numExpectedFailedCacheRequests), fmt.Sprintf("CacheRequestsFailed for %s was wrong", strategyString)) + Expect(messagingService.Metrics().GetValue(metrics.DirectMessagesSent)).To(BeNumerically("==", numExpectedSentDirectMessages), fmt.Sprintf("DirectMessagesSent for %s was wrong", strategyString)) + Expect(totalMessagesReceived).To(BeNumerically("==", numExpectedReceivedMessages)) + }, + Entry("test cache RR for valid AsAvailable with channel", resource.AsAvailable, helpers.ProcessCacheResponseThroughChannel), + Entry("test cache RR for valid AsAvailable with callback", resource.AsAvailable, helpers.ProcessCacheResponseThroughCallback), + Entry("test cache RR for valid CachedFirst with channel", resource.CachedFirst, helpers.ProcessCacheResponseThroughChannel), + Entry("test cache RR for valid CachedFirst with callback", resource.CachedFirst, helpers.ProcessCacheResponseThroughCallback), + Entry("test cache RR for valid CachedOnly with channel", resource.CachedOnly, helpers.ProcessCacheResponseThroughChannel), + Entry("test cache RR for valid CachedOnly with callback", resource.CachedOnly, helpers.ProcessCacheResponseThroughCallback), + Entry("test cache RR for valid LiveCancelsCached with channel", resource.LiveCancelsCached, helpers.ProcessCacheResponseThroughChannel), + Entry("test cache RR for valid LivCancelsCached with callback", resource.LiveCancelsCached, helpers.ProcessCacheResponseThroughCallback), + ) + }) + } + for terminationCaseName, terminationContextRef := range terminationCases { + if !terminationContextRef.blockable { + /* NOTE: The selected termination method cannot be blocked, so this test must be + * skipped. 
+ */ + continue + } + terminationConfiguration := terminationContextRef.configuration + terminationFunction := terminationContextRef.terminateFunction + terminationCleanup := terminationContextRef.cleanupFunc + terminationSeversConnection := terminationContextRef.seversConnection + Context("using termination scheme "+terminationCaseName, func() { + const numConfiguredCachedMessages int = 3 + const numExpectedCachedMessages int = 0 + const numExpectedLiveMessages int = 0 + var cacheName string + var topic string + + var terminate func() + var receivedMsgChan chan message.InboundMessage + + BeforeEach(func() { + cacheName = fmt.Sprintf("MaxMsgs%d", numConfiguredCachedMessages) + topic = fmt.Sprintf("MaxMsgs%d/%s/data1", numConfiguredCachedMessages, testcontext.Cache().Vpn) + cacheRequestID++ + logging.SetLogLevel(logging.LogLevelDebug) + CheckCache() // skips test with message if cache image is not available + helpers.InitAllCacheClustersWithMessages() + var err error + messagingService, err = messaging.NewMessagingServiceBuilder().FromConfigurationProvider(terminationConfiguration()).Build() + Expect(err).To(BeNil()) + err = messagingService.Connect() + Expect(err).To(BeNil()) + messageReceiver, err = messagingService.CreateDirectMessageReceiverBuilder().Build() + Expect(err).To(BeNil()) + err = messageReceiver.Start() + Expect(err).To(BeNil()) + receivedMsgChan = make(chan message.InboundMessage, 3) + messageReceiver.ReceiveAsync(func(msg message.InboundMessage) { + receivedMsgChan <- msg + }) + + terminate = func() { + Expect(messagingService.IsConnected()).To(BeTrue()) + Expect(messageReceiver.IsRunning()).To(BeTrue()) + terminationFunction(messagingService, messageReceiver) + Eventually(messageReceiver.IsTerminated(), "5s").Should(BeTrue()) + } + }) + AfterEach(func() { + if messageReceiver.IsRunning() { + messageReceiver.Terminate(0) + } + if messagingService.IsConnected() { + messagingService.Disconnect() + } + if terminationCleanup != nil { + terminationCleanup(messagingService) + } + close(receivedMsgChan) + }) + + DescribeTable("a receiver should be able to terminate gracefully with received cache responses and termination blocked by the application", + func(strategy resource.CachedMessageSubscriptionStrategy) { + logging.SetLogLevel(logging.LogLevelDebug) + strategyString := "" + numSentCacheRequests := 1 + numExpectedCacheResponses := numSentCacheRequests + var numExpectedSuccessfulCacheRequests int + if terminationSeversConnection { + // The cache request should not complete, so it's not successful + numExpectedSuccessfulCacheRequests = 0 + } else { + // The cache request should be completed, so it's successful + numExpectedSuccessfulCacheRequests = numSentCacheRequests + } + numExpectedFailedCacheRequests := 0 + numExpectedSentMessages := 0 + totalMessagesReceived := 0 + numExpectedReceivedMessages := numExpectedSentMessages + switch strategy { + case resource.AsAvailable: + strategyString = "AsAvailable" + numExpectedReceivedMessages += numExpectedCachedMessages + numExpectedReceivedMessages += numExpectedLiveMessages + case resource.LiveCancelsCached: + strategyString = "LiveCancelsCached" + numExpectedReceivedMessages += numExpectedLiveMessages + case resource.CachedFirst: + strategyString = "CachedFirst" + numExpectedReceivedMessages += numExpectedCachedMessages + numExpectedReceivedMessages += numExpectedLiveMessages + case resource.CachedOnly: + strategyString = "CachedOnly" + numExpectedReceivedMessages += numExpectedCachedMessages + } + numExpectedSentDirectMessages := 
numSentCacheRequests + numExpectedSentMessages + + cacheRequestConfig := helpers.GetValidCacheRequestConfig(strategy, cacheName, topic) + cacheRequestID := message.CacheRequestID(cacheRequestID) + /* NOTE: This channel receives the cache response and indicates to the + * test that it is time to call terminate(). + */ + cacheResponseChan := make(chan solace.CacheResponse) + defer func() { + close(cacheResponseChan) + }() + /* NOTE: We make the signal chan size 0 so that all writers (the API) + * have to wait for the reader (the application) to empty the channel. + * This allows us to simulate blocking behaviour. + */ + cacheResponseSignalChan := make(chan bool) + defer close(cacheResponseSignalChan) + cacheResponseCallback := func(cacheResponse solace.CacheResponse) { + cacheResponseChan <- cacheResponse + <-cacheResponseSignalChan + } + err := messageReceiver.RequestCachedAsyncWithCallback(cacheRequestConfig, cacheRequestID, cacheResponseCallback) + Expect(err).To(BeNil()) + + Eventually(func() uint64 { + return messagingService.Metrics().GetValue(metrics.CacheRequestsSent) + }, "10s").Should(BeNumerically("==", 1)) + + for i := 0; i < numExpectedCacheResponses; i++ { + Eventually(cacheResponseChan, "5s").Should(Receive()) + } + /* NOTE: We call terminate after confirming that we have received the + * cache response so that we can verify termination behaviour when the + * application is blocking termination through the provided callback. + */ + terminate() + cacheResponseSignalChan <- true + + Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsSent)).To(BeNumerically("==", numSentCacheRequests), fmt.Sprintf("CacheRequestsSent for %s was wrong", strategyString)) + Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsSucceeded)).To(BeNumerically("==", numExpectedSuccessfulCacheRequests), fmt.Sprintf("CacheRequestsSucceeded for %s was wrong", strategyString)) + Expect(messagingService.Metrics().GetValue(metrics.CacheRequestsFailed)).To(BeNumerically("==", numExpectedFailedCacheRequests), fmt.Sprintf("CacheRequestsFailed for %s was wrong", strategyString)) + Expect(messagingService.Metrics().GetValue(metrics.DirectMessagesSent)).To(BeNumerically("==", numExpectedSentDirectMessages), fmt.Sprintf("DirectMessagesSent for %s was wrong", strategyString)) + Expect(totalMessagesReceived).To(BeNumerically("==", numExpectedReceivedMessages)) + + }, + /* NOTE: The point of this test is to verify that if the application's + * processing of the cache response is blocking, it blocks termination. Since + * the `RequestCachedAsync()` method returns a channel for the application to + * listen to, there is no way for the application's processing of its response + * to block termination. This makes this test unapplicable to the + * `RequestCachedAsync`() method. In contrast, the + * `RequestCachedAsyncWithCallback()` interface requires that the application + * process the cache response in an application-provided callback. This provides + * the application with the opportunity to block during termination, making the + * `RequestCachedAsyncWithCallback()` interface relevant to this test. 
+ */ + Entry("test cache RR for valid AsAvailable with callback", resource.AsAvailable), + Entry("test cache RR for valid CachedFirst with callback", resource.CachedFirst), + Entry("test cache RR for valid CachedOnly with callback", resource.CachedOnly), + Entry("test cache RR for valid LivCancelsCached with callback", resource.LiveCancelsCached), + ) + }) + } + }) + }) }) diff --git a/test/data/config/config_remote.json b/test/data/config/config_remote.json index 5ed911f..fc1734e 100644 --- a/test/data/config/config_remote.json +++ b/test/data/config/config_remote.json @@ -5,6 +5,14 @@ "semp": { "host": "localhost" }, + "toxiproxy": { + "upstream": "solbroker", + "host": "localhost", + "port": 8474, + "plaintext_port": 15555, + "compressed_port": 15003, + "secure_port": 15443 + }, "cache": { "vpn": "SolCacheUT", "distributed_caches": [ diff --git a/test/helpers/cache_helpers.go b/test/helpers/cache_helpers.go index 9689ba6..24e62f9 100644 --- a/test/helpers/cache_helpers.go +++ b/test/helpers/cache_helpers.go @@ -34,76 +34,43 @@ import ( type CacheHandlerCustomCallback = func(HelpersCacheResponse) -type HelpersCacheResponse struct {} +type HelpersCacheResponse struct{} func NewHelpersCacheResponseFromCacheResponse(cacheResponse solace.CacheResponse) HelpersCacheResponse { - return HelpersCacheResponse{} + return HelpersCacheResponse{} } type HelpersCacheResponseHandler struct { - cacheResponses []HelpersCacheResponse - customCallback CacheHandlerCustomCallback + cacheResponses []HelpersCacheResponse + customCallback CacheHandlerCustomCallback } func NewHelpersCacheResponseHandler(customCallback CacheHandlerCustomCallback) HelpersCacheResponseHandler { - return HelpersCacheResponseHandler { - cacheResponses: []HelpersCacheResponse{}, - customCallback: customCallback, - } + return HelpersCacheResponseHandler{ + cacheResponses: []HelpersCacheResponse{}, + customCallback: customCallback, + } } -func (h * HelpersCacheResponseHandler) Callback(cacheResponse solace.CacheResponse) { - helpersCacheResponse := NewHelpersCacheResponseFromCacheResponse(cacheResponse) - h.cacheResponses = append(h.cacheResponses, helpersCacheResponse) - h.customCallback(helpersCacheResponse) +func (h *HelpersCacheResponseHandler) Callback(cacheResponse solace.CacheResponse) { + helpersCacheResponse := NewHelpersCacheResponseFromCacheResponse(cacheResponse) + h.cacheResponses = append(h.cacheResponses, helpersCacheResponse) + h.customCallback(helpersCacheResponse) } -func (h * HelpersCacheResponseHandler) ResetMetrics() { - h.cacheResponses = []HelpersCacheResponse{} +func (h *HelpersCacheResponseHandler) ResetMetrics() { + h.cacheResponses = []HelpersCacheResponse{} } const ( - ValidCachedMessageAge int32 = 5 - ValidMaxCachedMessages int32 = 5 - ValidCacheAccessTimeout int32 = 5000 + ValidCachedMessageAge int32 = 5 + ValidMaxCachedMessages int32 = 5 + ValidCacheAccessTimeout int32 = 5000 ) -/* -type TestMessageHandler interface { - Callback(inboundMessage message.InboundMessage) - NumHandledMessages() int -} - -type CacheMessageHandler struct { - receivedMessages []message.InboundMessage - customCallback solace.MessageHandler -} - -func DefaultCacheMessageHandlerCustomCallback(inboundMessage message.InboundMessage) { - return -} -// NewCachedMessageHandler creates a new message handler that can be passed to ReceiveAsync(). -// If nil is passed as the value for customCallback, the default callback -// [DefaultCacheMessageHandlerCustomCallback] is used. 
-func NewCachedMessageHandler(customCallback solace.MessageHandler) CacheMessageHandler { - if customCallback == nil { - customCallback = DefaultCacheMessageHandlerCustomCallback - } - return CacheMessageHandler{ - receivedMessages: []message.InboundMessage{}, - customCallback: customCallback, - } -} - -func (messageHandler * CacheMessageHandler) Callback(inboundMessage message.InboundMessage) { - messageHandler.receivedMessages = append(messageHandler.receivedMessages, inboundMessage) - messageHandler.customCallback(inboundMessage) -} - -func (messageHandler * CacheMessageHandler) NumHandledMessages() int { - return len(messageHandler.receivedMessages) -} -*/ +const ( + InvalidCacheAccessTimoeout int32 = 1000 +) func DefaultCacheConfiguration() config.ServicePropertyMap { connectionDetails := testcontext.Messaging() @@ -119,124 +86,128 @@ func DefaultCacheConfiguration() config.ServicePropertyMap { } func SendMsgsToTopic(topic string, numMessages int) { - builder := messaging.NewMessagingServiceBuilder().FromConfigurationProvider(DefaultCacheConfiguration()) - messagingService := buildMessagingService(builder, 2) - defer func() { - err := messagingService.Disconnect() - Expect(err).To(BeNil()) - }() - err := messagingService.Connect() - Expect(err).To(BeNil()) - receiver, err := messagingService.CreateDirectMessageReceiverBuilder().WithSubscriptions(resource.TopicSubscriptionOf(topic)).Build() - Expect(err).To(BeNil()) - defer func() { - err := receiver.Terminate(0) - Expect(err).To(BeNil()) - }() - err = receiver.Start() - Expect(err).To(BeNil()) - publisher, err := messagingService.CreateDirectMessagePublisherBuilder().OnBackPressureReject(0).Build() - Expect(err).To(BeNil()) - defer func() { - err := publisher.Terminate(0) - Expect(err).To(BeNil()) - }() - err = publisher.Start() - Expect(err).To(BeNil()) - //cacheMessageHandler := NewCachedMessageHandler(nil) - //err = receiver.ReceiveAsync(cacheMessageHandler.Callback) - receivedMsgs := make(chan message.InboundMessage, numMessages) - cacheMessageHandlerCallback := func (msg message.InboundMessage) { - receivedMsgs <- msg - } - err = receiver.ReceiveAsync(cacheMessageHandlerCallback) - Expect(err).To(BeNil()) - counter := 0 - for counter < numMessages { - msg, err := messagingService.MessageBuilder().BuildWithStringPayload(fmt.Sprintf("message %d", counter)) - Expect(err).To(BeNil()) - err = publisher.Publish(msg, resource.TopicOf(topic)) - Expect(err).To(BeNil()) - counter++ - } - for i := 0; i < numMessages; i++ { - var receivedMessage message.InboundMessage - Eventually(receivedMsgs, "5000ms").Should(Receive(&receivedMessage)) - Expect(receivedMessage.GetDestinationName()).To(Equal(topic)) - } + builder := messaging.NewMessagingServiceBuilder().FromConfigurationProvider(DefaultCacheConfiguration()) + messagingService := buildMessagingService(builder, 2) + defer func() { + err := messagingService.Disconnect() + Expect(err).To(BeNil()) + }() + err := messagingService.Connect() + Expect(err).To(BeNil()) + receiver, err := messagingService.CreateDirectMessageReceiverBuilder().WithSubscriptions(resource.TopicSubscriptionOf(topic)).Build() + Expect(err).To(BeNil()) + defer func() { + err := receiver.Terminate(0) + Expect(err).To(BeNil()) + }() + err = receiver.Start() + Expect(err).To(BeNil()) + publisher, err := messagingService.CreateDirectMessagePublisherBuilder().OnBackPressureReject(0).Build() + Expect(err).To(BeNil()) + defer func() { + err := publisher.Terminate(0) + Expect(err).To(BeNil()) + }() + err = publisher.Start() + 
Expect(err).To(BeNil()) + //cacheMessageHandler := NewCachedMessageHandler(nil) + //err = receiver.ReceiveAsync(cacheMessageHandler.Callback) + receivedMsgs := make(chan message.InboundMessage, numMessages) + cacheMessageHandlerCallback := func(msg message.InboundMessage) { + receivedMsgs <- msg + } + err = receiver.ReceiveAsync(cacheMessageHandlerCallback) + Expect(err).To(BeNil()) + counter := 0 + for counter < numMessages { + msg, err := messagingService.MessageBuilder().BuildWithStringPayload(fmt.Sprintf("message %d", counter)) + Expect(err).To(BeNil()) + err = publisher.Publish(msg, resource.TopicOf(topic)) + Expect(err).To(BeNil()) + counter++ + } + for i := 0; i < numMessages; i++ { + var receivedMessage message.InboundMessage + Eventually(receivedMsgs, "5000ms").Should(Receive(&receivedMessage)) + Expect(receivedMessage.GetDestinationName()).To(Equal(topic)) + } } // InitCacheWithPreExistingMessages assumes that `clusterName` is the name of a valid cache cluster. func InitCacheWithPreExistingMessages(cacheCluster testcontext.CacheClusterConfig) { - topics := []string{} - const defaultNumMessages int = 1 - const standardClusterNamePrefix string = "MaxMsgs" - vpnName := testcontext.Cache().Vpn - numMessages := defaultNumMessages - clusterName := cacheCluster.Name - for _, topic := range cacheCluster.Topics { - if strings.HasPrefix(topic, fmt.Sprintf("%s/*/data", clusterName)) { - /* NOTE: Checking the length is greater than the prefix means we can - * split the string immediately instead of needing to check that the - * slice length is 2. */ - if strings.HasPrefix(clusterName, standardClusterNamePrefix) && (len(clusterName) != len(standardClusterNamePrefix)) { - if convertedNum, err := strconv.Atoi(strings.Split(clusterName, standardClusterNamePrefix)[1]); err == nil { - numMessages = convertedNum - } - } - splitString := strings.Split(topic, "*") - /* NOTE: This should never happen, but we have this check just in case - * something goes wrong so we can avoid a panic if we try to go outside - * the list size in the next line. */ - Expect(len(splitString)).To(BeNumerically("==", 2)) - topics = append(topics, fmt.Sprintf("%s%s%s", splitString[0], vpnName, splitString[1])) - } - } - for _, topic := range topics { - SendMsgsToTopic(topic, numMessages) - } + topics := []string{} + const defaultNumMessages int = 1 + const standardClusterNamePrefix string = "MaxMsgs" + vpnName := testcontext.Cache().Vpn + numMessages := defaultNumMessages + clusterName := cacheCluster.Name + for _, topic := range cacheCluster.Topics { + if strings.HasPrefix(topic, fmt.Sprintf("%s/*/data", clusterName)) { + /* NOTE: Checking the length is greater than the prefix means we can + * split the string immediately instead of needing to check that the + * slice length is 2. */ + if strings.HasPrefix(clusterName, standardClusterNamePrefix) && (len(clusterName) != len(standardClusterNamePrefix)) { + if convertedNum, err := strconv.Atoi(strings.Split(clusterName, standardClusterNamePrefix)[1]); err == nil { + numMessages = convertedNum + } + } + splitString := strings.Split(topic, "*") + /* NOTE: This should never happen, but we have this check just in case + * something goes wrong so we can avoid a panic if we try to go outside + * the list size in the next line. 
*/ + Expect(len(splitString)).To(BeNumerically("==", 2)) + topics = append(topics, fmt.Sprintf("%s%s%s", splitString[0], vpnName, splitString[1])) + } + } + for _, topic := range topics { + SendMsgsToTopic(topic, numMessages) + } } func InitAllCacheClustersWithMessages() { - for _, distributedCache := range testcontext.Cache().DistributedCaches { - for _, cacheCluster := range distributedCache.CacheClusters { - InitCacheWithPreExistingMessages(cacheCluster) - } - } + for _, distributedCache := range testcontext.Cache().DistributedCaches { + for _, cacheCluster := range distributedCache.CacheClusters { + InitCacheWithPreExistingMessages(cacheCluster) + } + } } func GetValidAsAvailableCacheRequestConfig(cacheName string, topic string) resource.CachedMessageSubscriptionRequest { - return GetValidCacheRequestConfig(resource.AsAvailable, cacheName, topic) + return GetValidCacheRequestConfig(resource.AsAvailable, cacheName, topic) } func GetValidCachedOnlyCacheRequestConfig(cacheName string, topic string) resource.CachedMessageSubscriptionRequest { - return GetValidCacheRequestConfig(resource.CachedOnly, cacheName, topic) + return GetValidCacheRequestConfig(resource.CachedOnly, cacheName, topic) } func GetValidLiveCancelsCachedRequestConfig(cacheName string, topic string) resource.CachedMessageSubscriptionRequest { - return GetValidCacheRequestConfig(resource.LiveCancelsCached, cacheName, topic) + return GetValidCacheRequestConfig(resource.LiveCancelsCached, cacheName, topic) } func GetValidCachedFirstCacheRequestConfig(cacheName string, topic string) resource.CachedMessageSubscriptionRequest { - return GetValidCacheRequestConfig(resource.CachedFirst, cacheName, topic) + return GetValidCacheRequestConfig(resource.CachedFirst, cacheName, topic) } func GetValidCacheRequestConfig(strategy resource.CachedMessageSubscriptionStrategy, cacheName string, topic string) resource.CachedMessageSubscriptionRequest { - return resource.NewCachedMessageSubscriptionRequest(strategy, cacheName, resource.TopicSubscriptionOf(topic), ValidCacheAccessTimeout, ValidMaxCachedMessages, ValidCachedMessageAge) + return resource.NewCachedMessageSubscriptionRequest(strategy, cacheName, resource.TopicSubscriptionOf(topic), ValidCacheAccessTimeout, ValidMaxCachedMessages, ValidCachedMessageAge) +} + +func GetInvalidCacheRequestConfig(strategy resource.CachedMessageSubscriptionStrategy, cacheName string, topic string) resource.CachedMessageSubscriptionRequest { + return resource.NewCachedMessageSubscriptionRequest(strategy, cacheName, resource.TopicSubscriptionOf(topic), InvalidCacheAccessTimoeout, ValidMaxCachedMessages, ValidCachedMessageAge) } type CacheResponseProcessStrategy = int const ( - ProcessCacheResponseThroughChannel CacheResponseProcessStrategy = iota - ProcessCacheResponseThroughCallback CacheResponseProcessStrategy = iota + ProcessCacheResponseThroughChannel CacheResponseProcessStrategy = iota + ProcessCacheResponseThroughCallback CacheResponseProcessStrategy = iota ) func CacheToxicConfiguration() config.ServicePropertyMap { - if toxiConfig := ToxicConfiguration(); toxiConfig == nil { - return nil - } else { - toxiConfig[config.ServicePropertyVPNName] = testcontext.Cache().Vpn - return toxiConfig - } + if toxiConfig := ToxicConfiguration(); toxiConfig == nil { + return nil + } else { + toxiConfig[config.ServicePropertyVPNName] = testcontext.Cache().Vpn + return toxiConfig + } } diff --git a/test/helpers/messaging_service_helpers.go b/test/helpers/messaging_service_helpers.go index ea83444..5d03825 100644 --- 
a/test/helpers/messaging_service_helpers.go +++ b/test/helpers/messaging_service_helpers.go @@ -180,11 +180,23 @@ func DisconnectMessagingServiceWithFunction(messagingService solace.MessagingSer // ForceDisconnectViaSEMPv2 function func ForceDisconnectViaSEMPv2(messagingService solace.MessagingService) { + ForceDisconnectViaSEMPv2WithConfiguration(messagingService, DefaultConfiguration()) +} + +// ForceDisconnectViaSEMPv2WithConfiguration function allows different configurations to be provided, such as VPN name +func ForceDisconnectViaSEMPv2WithConfiguration(messagingService solace.MessagingService, configuration config.ServicePropertyMap) { + vpnName, ok := configuration[config.ServicePropertyVPNName].(string) + /* NOTE: This interface currently doesn't support returning an error, so if the conversion failed we just set + * the vpn name to the error string and let the test run. This will fail the test and alert the developer that + * something went wrong without needing to change this interface. */ + if !ok { + vpnName = "string conversion for VPN name from service property map failed." + } _, _, err := testcontext.SEMP().Action().ClientApi. DoMsgVpnClientDisconnect( testcontext.SEMP().ActionCtx(), action.MsgVpnClientDisconnect{}, - testcontext.Messaging().VPN, + vpnName, url.QueryEscape(messagingService.GetApplicationID()), ) ExpectWithOffset(1, err).ToNot(HaveOccurred()) diff --git a/test/testcontext/config.go b/test/testcontext/config.go index 5ded700..146c7ee 100644 --- a/test/testcontext/config.go +++ b/test/testcontext/config.go @@ -33,8 +33,8 @@ type TestConfig struct { TestContainers *TestContainersConfig `json:"testcontainers,omitempty"` Kerberos *KerberosConfig `json:"kerberos,omitempty"` OAuth *OAuthConfig `json:"oauth,omitempty"` - Cache *CacheConfig `json:"cache,omitempty"` - CacheProxy *CacheProxyConfig `json:"cache_proxy,omitempty"` + Cache *CacheConfig `json:"cache,omitempty"` + CacheProxy *CacheProxyConfig `json:"cache_proxy,omitempty"` } // TestContainersConfig common context specific config should be placed here @@ -71,46 +71,46 @@ type KerberosConfig struct { // CacheConfig represents Cache's config type CacheConfig struct { - Image string `env:"SOLCACHE_TEST_IMAGE"` // The image is proprietary, so we don't want to commit its name - // or other info to vcs. - Hostname string `json:"hostname" env:"PUBSUB_CACHE_HOSTNAME"` - SuspectHostname string `json:"suspect_hostname" env:"PUBSUB_CACHE_SUSPECT_HOSTNAME"` - Vpn string `json:"vpn"` - DistributedCaches []DistributedCacheConfig `json:"distributed_caches"` + Image string `env:"SOLCACHE_TEST_IMAGE"` // The image is proprietary, so we don't want to commit its name + // or other info to vcs. 
+ Hostname string `json:"hostname" env:"PUBSUB_CACHE_HOSTNAME"` + SuspectHostname string `json:"suspect_hostname" env:"PUBSUB_CACHE_SUSPECT_HOSTNAME"` + Vpn string `json:"vpn"` + DistributedCaches []DistributedCacheConfig `json:"distributed_caches"` } // DistributedCacheConfig represents the DistributedCache's config type DistributedCacheConfig struct { - Name string `json:"name"` - Properties *struct {} `json:"properties,omitempty"` - CacheClusters []CacheClusterConfig `json:"cache_clusters"` + Name string `json:"name"` + Properties *struct{} `json:"properties,omitempty"` + CacheClusters []CacheClusterConfig `json:"cache_clusters"` } // CacheClusterConfig represents the CacheCluster's config type CacheClusterConfig struct { - Name string `json:"name"` - Topics []string `json:"topics"` - Properties *struct { - MaxMsgsPerTopic int `json:"maxMsgsPerTopic"` - MaxTopicCount int `json:"maxTopicCount"` - MaxMemory int `json:"maxMemory"` - } `json:"properties,omitempty"` - CacheInstances []CacheInstanceConfig `json:"cache_instances"` + Name string `json:"name"` + Topics []string `json:"topics"` + Properties *struct { + MaxMsgsPerTopic int `json:"maxMsgsPerTopic"` + MaxTopicCount int `json:"maxTopicCount"` + MaxMemory int `json:"maxMemory"` + } `json:"properties,omitempty"` + CacheInstances []CacheInstanceConfig `json:"cache_instances"` } // CacheInstanceConfig represents the CacheIntance's config type CacheInstanceConfig struct { - Name string `json:"name"` - Autostart bool `json:"autostart"` - OperationalState string `json:"operational_state"` - Properties *struct { - StopOnLostMsgEnabled bool `json:"stopOnLostMsgEnabled,omitempty"` - } `json:"properties,omitempty"` + Name string `json:"name"` + Autostart bool `json:"autostart"` + OperationalState string `json:"operational_state"` + Properties *struct { + StopOnLostMsgEnabled bool `json:"stopOnLostMsgEnabled,omitempty"` + } `json:"properties,omitempty"` } // CacheProxyConfig represents Cache Proxy's config type CacheProxyConfig struct { - Image string `env:"SOLCACHEPROXY_TEST_IMAGE"` + Image string `env:"SOLCACHEPROXY_TEST_IMAGE"` } // ToEnvironment dumps the config to a map of environment variables diff --git a/test/testcontext/test_context.go b/test/testcontext/test_context.go index d8cc1d3..aa843a7 100644 --- a/test/testcontext/test_context.go +++ b/test/testcontext/test_context.go @@ -77,12 +77,12 @@ func OAuth() *OAuthConfig { // Cache returns the Cache config func Cache() *CacheConfig { - return instance.Cache() + return instance.Cache() } // CacheProxy returns the CacheProxy config func CacheProxy() *CacheProxyConfig { - return instance.CacheProxy() + return instance.CacheProxy() } // function to wait for semp serivces @@ -93,13 +93,13 @@ func WaitForSEMPReachable() error { // CacheEnabled returns `true` if the infrastructure required for running cache tests is available. Returns `false` // otherwise. func CacheEnabled() bool { - return instance.CacheEnabled() + return instance.CacheEnabled() } // CacheProxyEnabled returns `true` if the infrastructure required for running cache proxy tests is // available. Returns `false` otherwise. 
func CacheProxyEnabled() bool { - return instance.CacheProxyEnabled() + return instance.CacheProxyEnabled() } // testContext represents a test context @@ -122,27 +122,27 @@ type testContext interface { ToxiProxy() ToxiProxy // OAuth returns the OAuth config OAuth() *OAuthConfig - // Cache returns the Cache config - Cache() *CacheConfig - // CacheProxy returns the CacheProxyConfig - CacheProxy() *CacheProxyConfig + // Cache returns the Cache config + Cache() *CacheConfig + // CacheProxy returns the CacheProxyConfig + CacheProxy() *CacheProxyConfig // waits for semp service to be reachable WaitForSEMPReachable() error - // CacheEnabled returns `true` if the infrastructure required for running cache tests is available. Returns `false` - // otherwise. - CacheEnabled() bool - // CacheProxyEnabled returns `true` if the infrastructure required for running cache proxy tests is available. - // Returns `false` otherwise - CacheProxyEnabled() bool + // CacheEnabled returns `true` if the infrastructure required for running cache tests is available. Returns `false` + // otherwise. + CacheEnabled() bool + // CacheProxyEnabled returns `true` if the infrastructure required for running cache proxy tests is available. + // Returns `false` otherwise + CacheProxyEnabled() bool } type testContextCommon struct { - config *TestConfig - semp *sempV2Impl - toxi *toxiProxyImpl - kerberosEnabled bool - cacheEnabled bool - cacheProxyEnabled bool + config *TestConfig + semp *sempV2Impl + toxi *toxiProxyImpl + kerberosEnabled bool + cacheEnabled bool + cacheProxyEnabled bool } // GetConnectionDetails impl @@ -173,19 +173,19 @@ func (context *testContextCommon) Kerberos() bool { } func (context *testContextCommon) Cache() *CacheConfig { - return context.config.Cache + return context.config.Cache } func (context *testContextCommon) CacheEnabled() bool { - return context.cacheEnabled + return context.cacheEnabled } func (context *testContextCommon) CacheProxy() *CacheProxyConfig { - return context.config.CacheProxy + return context.config.CacheProxy } func (context *testContextCommon) CacheProxyEnabled() bool { - return context.cacheProxyEnabled + return context.cacheProxyEnabled } // loads the configs based on the given path diff --git a/test/testcontext/test_context_remote.go b/test/testcontext/test_context_remote.go index 5d3804d..0a5fdc2 100644 --- a/test/testcontext/test_context_remote.go +++ b/test/testcontext/test_context_remote.go @@ -55,7 +55,7 @@ func (context *remoteTestContext) Setup() error { return err } context.toxi = newToxiProxy(context.config.ToxiProxy) - context.toxi.setup() + context.toxi.setupWithPreExistingProxy() } if context.config.Cache.Image != "" { diff --git a/test/testcontext/toxiproxy.go b/test/testcontext/toxiproxy.go index 7811471..c9a1176 100644 --- a/test/testcontext/toxiproxy.go +++ b/test/testcontext/toxiproxy.go @@ -23,11 +23,16 @@ import ( ) const ( - smfProxyName = "smf" - compressedSmfProxyName = "compressedSmf" - secureSmfProxyName = "secureSmf" + smfProxyName = "smf" + compressedSmfProxyName = "compressedSmf" + secureSmfProxyName = "secureSmf" + proxyAlreadyExistsErrorMessage = "HTTP 409: proxy already exists" ) +var proxiesToSetup []string = []string{smfProxyName, + compressedSmfProxyName, + secureSmfProxyName} + // ToxiProxy interface type ToxiProxy interface { Config() *ToxiProxyConfig @@ -70,6 +75,7 @@ func (toxiProxy *toxiProxyImpl) setup() error { fmt.Sprintf(":%d", toxiProxy.config.PlaintextPort), fmt.Sprintf("%s:%d", toxiProxy.config.Upstream, 55555), ) + if err != nil { 
return err
 	}
@@ -92,6 +98,29 @@ func (toxiProxy *toxiProxyImpl) setup() error {
 	return nil
 }
+// setupWithPreExistingProxy checks for existing proxy endpoints on the proxy client.
+// If any proxy endpoints that we need to configure are found, they are replaced.
+// Any proxy endpoints that we do not need to configure are not mutated.
+func (toxiProxy *toxiProxyImpl) setupWithPreExistingProxy() error {
+	proxyMap, err := toxiProxy.client.Proxies()
+	if err != nil {
+		return err
+	}
+	for _, foundProxy := range proxyMap {
+		for _, neededProxy := range proxiesToSetup {
+			if foundProxy.Name == neededProxy {
+				if err = foundProxy.Delete(); err != nil {
+					return err
+				}
+			}
+		}
+	}
+	if err = toxiProxy.setup(); err != nil {
+		return err
+	}
+	return nil
+}
+
 func (toxiProxy *toxiProxyImpl) teardown() error {
 	for _, proxy := range toxiProxy.proxies {
 		err := proxy.Delete()
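[Editorial note, not part of the patch] setupWithPreExistingProxy is wired into the remote test context above, apparently so that a Toxiproxy instance that already has the smf, compressedSmf, and secureSmf proxies configured can be reused without hitting the "HTTP 409: proxy already exists" error captured in the new constant. As a hedged sketch only, the same delete-then-recreate step could also be written as a lookup by name, assuming (based on the usage above) that the client's Proxies() call returns a map keyed by proxy name:

// Sketch: equivalent to the nested loops above, looking proxies up by name.
// Assumes the proxy map returned by the client is keyed by proxy name.
proxies, err := toxiProxy.client.Proxies()
if err != nil {
	return err
}
for _, name := range proxiesToSetup {
	if existing, ok := proxies[name]; ok {
		if err := existing.Delete(); err != nil {
			return err
		}
	}
}
return toxiProxy.setup()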