From d9ef9d4c2f1a9f8947c4077083072a452a3db57d Mon Sep 17 00:00:00 2001 From: Trent Daniel Date: Mon, 13 Jan 2025 17:14:57 -0500 Subject: [PATCH] EBP-18: Made changes in response to PR feedback. --- internal/ccsmp/ccsmp_cache.go | 51 +++++++++---------- internal/ccsmp/ccsmp_core.go | 7 +-- internal/ccsmp/ccsmp_helper.c | 15 ++---- internal/impl/core/receiver.go | 4 +- internal/impl/receiver/cache_util.go | 11 ++-- .../receiver/direct_message_receiver_impl.go | 22 +++++--- pkg/solace/receiver_cache_requests.go | 3 -- 7 files changed, 55 insertions(+), 58 deletions(-) diff --git a/internal/ccsmp/ccsmp_cache.go b/internal/ccsmp/ccsmp_cache.go index 14de854..91fc97b 100644 --- a/internal/ccsmp/ccsmp_cache.go +++ b/internal/ccsmp/ccsmp_cache.go @@ -70,13 +70,6 @@ var CachedMessageSubscriptionRequestStrategyMappingToCCSMPSubscribeFlags = map[r // SolClientCacheSessionPt is assigned a value type SolClientCacheSessionPt = C.solClient_opaqueCacheSession_pt -/* TODO: Not sure if we really need this. */ - -// SolClientCacheSession structure -type SolClientCacheSession struct { - pointer SolClientCacheSessionPt -} - /* sessionToCacheEventCallbackMap is required as a global var even though cache sessions etc. are @@ -117,13 +110,11 @@ func SendCacheRequest( ) *SolClientErrorInfoWrapper { cacheToEventCallbackMap.Store(cacheSessionP, eventCallback) - /* NOTE: Do we need to add the dispatch callback to the map here, or is that already done by the receiver in calls to Receive() or ReceiveAsync()? */ topicP := C.CString(topic) errorInfo := handleCcsmpError(func() SolClientReturnCode { return C.CacheSessionSendCacheRequest( C.solClient_uint64_t(dispatchID), - // TODO: rework this, we should probably just have a pointer and not a whole object. cacheSessionP, topicP, C.solClient_uint64_t(cacheRequestID), // This is done elsewhere such as in SolClientMessgeSetSequenceNumber @@ -137,8 +128,12 @@ func SendCacheRequest( func DestroyCacheSession(cacheSessionP SolClientCacheSessionPt) *SolClientErrorInfoWrapper { /* NOTE: We remove the cache session pointer from the map before we destroy the cache session * so that map access using that pointer as an index won't collide with another cache session - * that is created immediately after this one is destroyed.*/ + * that is created immediately after this one is destroyed. This should be deleted in the cache + * event callback. We delete again here in case there was a problem in the cache event callback + * so that we don't leak resources. If the entry has already been deleted, the following call + * will not fail, and will effectively be a no-op.*/ cacheToEventCallbackMap.Delete(cacheSessionP) + return handleCcsmpError(func() SolClientReturnCode { return C.CacheSessionDestroy(&cacheSessionP) }) @@ -150,17 +145,14 @@ func CancelCacheRequest(cacheSessionP SolClientCacheSessionPt) *SolClientErrorIn }) } -// SolClientCacheEventCallback functions should format CCSMP args into Go objects and then pass those objects -// to a separate function that actually processes them, maybe through a channel with a background go routine -// polling it? -type SolClientCacheEventCallback = func(SolClientCacheEventInfo) -type SolClientCacheEventInfoPt = C.solCache_eventCallbackInfo_pt +// SolClientCacheEventCallback functions format CCSMP args into Go objects and then pass those objects +// to a separate function that actually processes them +type SolClientCacheEventCallback = func(CacheEventInfo) +type CacheEventInfoPt = C.solCache_eventCallbackInfo_pt type SolClientCacheEvent = C.solCache_event_t -type SolClientCacheEventInfo struct { - /* TODO: Rename this to CacheEventInfo to better distinguish it from CCSMP objects, since it now has more than - * just the original event fields. */ +type CacheEventInfo struct { cacheSessionP SolClientCacheSessionPt event SolClientCacheEvent topic string @@ -170,16 +162,16 @@ type SolClientCacheEventInfo struct { err error } -func (eventInfo *SolClientCacheEventInfo) String() string { - return fmt.Sprintf("SolClientCacheEventInfo:\n\tcacheSesionP: 0x%x\n\tevent: %d\n\ttopic: %s\n\treturnCode: %d\n\tsubCode: %d\n\tcacheRequestID: %d\n\terr: %s", eventInfo.cacheSessionP, eventInfo.event, eventInfo.topic, eventInfo.returnCode, eventInfo.subCode, eventInfo.cacheRequestID, eventInfo.err) +func (eventInfo *CacheEventInfo) String() string { + return fmt.Sprintf("CacheEventInfo:\n\tcacheSessionP: 0x%x\n\tevent: %d\n\ttopic: %s\n\treturnCode: %d\n\tsubCode: %d\n\tcacheRequestID: %d\n\terr: %s", eventInfo.cacheSessionP, eventInfo.event, eventInfo.topic, eventInfo.returnCode, eventInfo.subCode, eventInfo.cacheRequestID, eventInfo.err) } const ( SolClientCacheEventRequestCompletedNotice SolClientCacheEvent = 1 ) -func NewConfiguredSolClientCacheEventInfo(cacheSessionP SolClientCacheSessionPt, cacheRequestID message.CacheRequestID, topic string, err error) SolClientCacheEventInfo { - return SolClientCacheEventInfo{ +func NewConfiguredCacheEventInfo(cacheSessionP SolClientCacheSessionPt, cacheRequestID message.CacheRequestID, topic string, err error) CacheEventInfo { + return CacheEventInfo{ cacheSessionP: cacheSessionP, event: SolClientCacheEventRequestCompletedNotice, topic: topic, @@ -190,12 +182,12 @@ func NewConfiguredSolClientCacheEventInfo(cacheSessionP SolClientCacheSessionPt, } } -func (i *SolClientCacheEventInfo) GetCacheSessionPointer() SolClientCacheSessionPt { +func (i *CacheEventInfo) GetCacheSessionPointer() SolClientCacheSessionPt { return i.cacheSessionP } -func SolClientCacheEventInfoFromCoreCacheEventInfo(eventCallbackInfo SolClientCacheEventInfoPt, userP uintptr) SolClientCacheEventInfo { - return SolClientCacheEventInfo{ +func CacheEventInfoFromCoreCacheEventInfo(eventCallbackInfo CacheEventInfoPt, userP uintptr) CacheEventInfo { + return CacheEventInfo{ cacheSessionP: SolClientCacheSessionPt(userP), event: SolClientCacheEvent(eventCallbackInfo.cacheEvent), topic: C.GoString(eventCallbackInfo.topic), @@ -206,13 +198,18 @@ func SolClientCacheEventInfoFromCoreCacheEventInfo(eventCallbackInfo SolClientCa } //export goCacheEventCallback -func goCacheEventCallback( /*opaqueSessionP*/ _ SolClientSessionPt, eventCallbackInfo SolClientCacheEventInfoPt, userP unsafe.Pointer) { +func goCacheEventCallback( /*opaqueSessionP*/ _ SolClientSessionPt, eventCallbackInfo CacheEventInfoPt, userP unsafe.Pointer) { /* NOTE: We don't need to use the session pointer since we can use the user_p(a.k.a. the cache session pointer) * which is guaranteed to be unique for at least the duration that the cache session pointer is in the global * map since during receiver termination we destory the cache session only after we remove it from all maps. */ if callback, ok := cacheToEventCallbackMap.Load(SolClientCacheSessionPt(uintptr(userP))); ok { - callback.(SolClientCacheEventCallback)(SolClientCacheEventInfoFromCoreCacheEventInfo(eventCallbackInfo, uintptr(userP))) + eventInfo := CacheEventInfoFromCoreCacheEventInfo(eventCallbackInfo, uintptr(userP)) + callback.(SolClientCacheEventCallback)(eventInfo) + /* NOTE: We remove the cache session pointer from the map before we destroy the cache session + * so that map access using that pointer as an index won't collide with another cache session + * that is created immediately after this one is destroyed.*/ + cacheToEventCallbackMap.Delete(eventInfo.cacheSessionP) } else { if logging.Default.IsDebugEnabled() { logging.Default.Debug("Received event callback from core API without an associated cache event callback") diff --git a/internal/ccsmp/ccsmp_core.go b/internal/ccsmp/ccsmp_core.go index b1c42e2..4b3a7c3 100644 --- a/internal/ccsmp/ccsmp_core.go +++ b/internal/ccsmp/ccsmp_core.go @@ -145,7 +145,7 @@ func goEventCallback(sessionP SolClientSessionPt, eventInfoP SolClientSessionEve if callback, ok := sessionToEventCallbackMap.Load(sessionP); ok { callback.(SolClientSessionEventCallback)(SolClientSessionEvent(eventInfoP.sessionEvent), eventInfoP.responseCode, C.GoString(eventInfoP.info_p), eventInfoP.correlation_p, userP) } else { - logging.Default.Debug(fmt.Sprintf("Received event callback from core API without an associated session callback:\n%s\n", C.GoString(eventInfoP.info_p))) + logging.Default.Debug("Received event callback from core API without an associated session callback") } } @@ -334,11 +334,6 @@ func (context *SolClientContext) SolClientSessionCreate(properties []string) (se var sessionP SolClientSessionPt sessionPropsP, sessionPropertiesFreeFunction := ToCArray(properties, true) defer sessionPropertiesFreeFunction() - var sessionFuncInfo C.solClient_session_createFuncInfo_t - sessionFuncInfo.rxMsgInfo.callback_p = (C.solClient_session_rxMsgCallbackFunc_t)(unsafe.Pointer(C.defaultMessageReceiveCallback)) - sessionFuncInfo.rxMsgInfo.user_p = nil - sessionFuncInfo.eventInfo.callback_p = (C.solClient_session_eventCallbackFunc_t)(unsafe.Pointer(C.eventCallback)) - sessionFuncInfo.eventInfo.user_p = nil solClientErrorInfo := handleCcsmpError(func() SolClientReturnCode { return C.SessionCreate(sessionPropsP, diff --git a/internal/ccsmp/ccsmp_helper.c b/internal/ccsmp/ccsmp_helper.c index d5a92fc..1cc2812 100644 --- a/internal/ccsmp/ccsmp_helper.c +++ b/internal/ccsmp/ccsmp_helper.c @@ -18,7 +18,6 @@ #include "solclient/solClient.h" #include "solclient/solClientMsg.h" #include "solclient/solCache.h" -#include // remove later, only needed for debugging // // external callbacks defined in ccsmp_callbacks.c @@ -298,12 +297,6 @@ CacheSessionSendCacheRequest( solClient_cacheRequestFlags_t cacheFlags, solClient_subscribeFlags_t subscribeFlags) { - /* We can use the cache session pointer as the unique identifier for the cache request since by design - * are having only one cache request/response per cache session, and the pointer to the cache session - * object is necessarily unique. It is easiest to use the cache session pointer since it is a unique - * value that will persist until the end of the cache request lifetime, at which point the user_p is - * removed from any subscription tables anyways. */ - void * user_p = (void *)opaqueCacheSession_p; solClient_session_rxMsgDispatchFuncInfo_t dispatchInfo; /* msg dispatch callback to set */ dispatchInfo.dispatchType = SOLCLIENT_DISPATCH_TYPE_CALLBACK; dispatchInfo.callback_p = messageReceiveCallback; @@ -315,7 +308,7 @@ CacheSessionSendCacheRequest( topic_p, cacheRequestId, (solCache_eventCallbackFunc_t)cacheEventCallback, - /* CCSMP does not copy the contents of user_p, only the pointer. This means we cannot have + /* NOTE: CCSMP does not copy the contents of user_p, only the pointer. This means we cannot have * Go-allocated memory as the object being pointed to, since that object might be garbage * collected before the cache response is received and the user_p is used for some purpose * by either the CCSMP or PSPGo APIs. We also cannot allocate a struct for user_p in C since @@ -323,9 +316,11 @@ CacheSessionSendCacheRequest( * properly. This means that our only option, AFAIK, is to just pass the cache session * pointer as a void pointer, since its lifecycle is managed outside of this function in a * safe way, and must survive at least until CCSMP receives a cache response or the request - * is cancelled. + * is cancelled. While destroying the cache session, the user_p/opaqueCacheSession_p will be + * removed from the tables anyways. Cancelling a cache request is always followed by destroying + * the cache associated session. * */ - user_p, + (void *)opaqueCacheSession_p, cacheFlags, subscribeFlags, &dispatchInfo); diff --git a/internal/impl/core/receiver.go b/internal/impl/core/receiver.go index 722e57c..80ddffd 100644 --- a/internal/impl/core/receiver.go +++ b/internal/impl/core/receiver.go @@ -71,7 +71,7 @@ type Receiver interface { IncrementDuplicateAckCount() // Creates a new persistent receiver with the given callback NewPersistentReceiver(properties []string, callback RxCallback, eventCallback PersistentEventCallback) (PersistentReceiver, ErrorInfo) - /* TODO: The `GetSessionPointer` method seems a litte out of place here. For now it works, but it might be an, + /* FFC: The `GetSessionPointer` method seems a litte out of place here. For now it works, but it might be an, * anti-pattern so we should look into clarifying this and maybe doing this differently in a future iteration.*/ // Retrieves the sesion pointer @@ -381,7 +381,7 @@ func (receiver *ccsmpBackedReceiver) NewPersistentReceiver(properties []string, }, nil } -/* FFC: It might be better if this were in ccsmp_core.go? */ +// GetSessionPointer returns the opaque pointer to the session associated with the given receiver. func (receiver *ccsmpBackedReceiver) GetSessionPointer() ccsmp.SolClientSessionPt { return receiver.session.GetPointer() } diff --git a/internal/impl/receiver/cache_util.go b/internal/impl/receiver/cache_util.go index dc320c1..fafc8f7 100644 --- a/internal/impl/receiver/cache_util.go +++ b/internal/impl/receiver/cache_util.go @@ -29,7 +29,7 @@ import ( // PollAndProcessCacheResponseChannel is intended to be run as a go routine. func (receiver *directMessageReceiverImpl) PollAndProcessCacheResponseChannel() { receiver.cachePollingRunning.Store(true) - var cacheEventInfo ccsmp.SolClientCacheEventInfo + var cacheEventInfo ccsmp.CacheEventInfo channelIsOpen := true /* poll cacheventinfo channel */ for channelIsOpen { @@ -41,7 +41,7 @@ func (receiver *directMessageReceiverImpl) PollAndProcessCacheResponseChannel() // Any function that closes the channel must guarantee this. break } - /* We decrement the counter first, since as soon as we pop the SolClientCacheEventInfo + /* We decrement the counter first, since as soon as we pop the CacheEventInfo * off the channel, CCSMP is able to put another on. If CCSMP is able resume processing the * cache responses, we should unblock the application by allowing it to submit more cache * requests ASAP.*/ @@ -52,9 +52,14 @@ func (receiver *directMessageReceiverImpl) PollAndProcessCacheResponseChannel() } // ProcessCacheEvent is intended to be run by any agent trying to process a cache response. This can be run from a polling go routine, or during termination to cleanup remaining resources, and possibly by other agents as well. -func (receiver *directMessageReceiverImpl) ProcessCacheEvent(cacheEventInfo ccsmp.SolClientCacheEventInfo) { +func (receiver *directMessageReceiverImpl) ProcessCacheEvent(cacheEventInfo ccsmp.CacheEventInfo) { + fmt.Printf("ProcessCacheEvent::cacheEventInfo is:\n%s\n", cacheEventInfo.String()) + if receiver.logger.IsDebugEnabled() { + receiver.logger.Debug(fmt.Sprintf("ProcessCacheEvent::cacheEventInfo is:\n%s\n", cacheEventInfo.String())) + } cacheSessionP := cacheEventInfo.GetCacheSessionPointer() receiver.cacheLock.Lock() + fmt.Printf("ProcessCacheEvent::loading cache response processor\n") cacheResponseHolder, found := receiver.loadCacheResponseProcessorFromMapUnsafe(cacheSessionP) receiver.cacheLock.Unlock() if !found { diff --git a/internal/impl/receiver/direct_message_receiver_impl.go b/internal/impl/receiver/direct_message_receiver_impl.go index dd03401..9e87f12 100644 --- a/internal/impl/receiver/direct_message_receiver_impl.go +++ b/internal/impl/receiver/direct_message_receiver_impl.go @@ -93,7 +93,7 @@ type directMessageReceiverImpl struct { // as specified by the application on a call to a [ReceiverCacheRequester] interface. cacheSessionMap map[ccsmp.SolClientCacheSessionPt]CacheResponseProcessor // ([keyType]valueType) [ccsmp.SolClientCacheSessionPt]CacheResponseProcessor // cacheResponseChan is used to buffer the cache responses from CCSMP. - cacheResponseChan chan ccsmp.SolClientCacheEventInfo + cacheResponseChan chan ccsmp.CacheEventInfo /* NOTE: This data type could be changed from int32 if necessary, but must match the type of * directMessageReceiverImpl.cacheResponseChanCounter. */ // cacheResponseChanCounter is used to prevent cache requests from being submitted if the @@ -948,7 +948,7 @@ func (receiver *directMessageReceiverImpl) addCacheSessionToMapIfNotPresent(hold return err } -// isAvailableForCache returns true if the receiver is ready to send a cache request, or false if it is not. +// isAvailableForCache returns nil if the receiver is ready to send a cache request, or an error if it is not. func (receiver *directMessageReceiverImpl) checkStateForCacheRequest() error { var err error var errorString string = "" @@ -975,7 +975,7 @@ func (receiver *directMessageReceiverImpl) checkStateForCacheRequest() error { func (receiver *directMessageReceiverImpl) initCacheResourcesIfNotPresent() { if receiver.cacheResponseChan == nil { - receiver.cacheResponseChan = make(chan ccsmp.SolClientCacheEventInfo, cacheResponseChannelMaxSize) + receiver.cacheResponseChan = make(chan ccsmp.CacheEventInfo, cacheResponseChannelMaxSize) } if !receiver.cachePollingRunning.Load() { go receiver.PollAndProcessCacheResponseChannel() @@ -995,11 +995,11 @@ func (receiver *directMessageReceiverImpl) initCacheResourcesIfNotPresent() { } } -func (receiver *directMessageReceiverImpl) generateCacheRequestCancellationNotice(cacheSessionP ccsmp.SolClientCacheSessionPt, cacheResponseProcessor CacheResponseProcessor, errorInfo *ccsmp.SolClientErrorInfoWrapper) ccsmp.SolClientCacheEventInfo { +func (receiver *directMessageReceiverImpl) generateCacheRequestCancellationNotice(cacheSessionP ccsmp.SolClientCacheSessionPt, cacheResponseProcessor CacheResponseProcessor, errorInfo *ccsmp.SolClientErrorInfoWrapper) ccsmp.CacheEventInfo { if receiver.logger.IsDebugEnabled() { receiver.logger.Debug(constants.AttemptingCancellationNoticeGeneration) } - cacheEventInfo := ccsmp.NewConfiguredSolClientCacheEventInfo(cacheSessionP, cacheResponseProcessor.GetCacheRequestInfo().GetCacheRequestID(), cacheResponseProcessor.GetCacheRequestInfo().GetTopic(), core.ToNativeError(errorInfo, constants.FailedToCancelCacheRequest)) + cacheEventInfo := ccsmp.NewConfiguredCacheEventInfo(cacheSessionP, cacheResponseProcessor.GetCacheRequestInfo().GetCacheRequestID(), cacheResponseProcessor.GetCacheRequestInfo().GetTopic(), core.ToNativeError(errorInfo, constants.FailedToCancelCacheRequest)) return cacheEventInfo } @@ -1041,7 +1041,6 @@ func (receiver *directMessageReceiverImpl) teardownCache() { * different thread right now, or it's already been done before. */ return } - receiver.cacheLock.Lock() if running := receiver.cachePollingRunning.Load(); !running { /* We can return early since either the resources and shutdown are being handled by a * different thread right now, or it's already been done before. */ @@ -1061,6 +1060,7 @@ func (receiver *directMessageReceiverImpl) teardownCache() { * application.*/ /* Release the mutex and then shutdown the PollAndProcessCacheResponseChannel goroutine and * cacheResponseChan. */ + receiver.cacheLock.Lock() close(receiver.cacheResponseChan) /* NOTE: Release the mutex so that `ProcessCacheEvent()` can clear the channel.*/ receiver.cacheLock.Unlock() @@ -1091,6 +1091,9 @@ func (receiver *directMessageReceiverImpl) requestCached(cachedMessageSubscripti } /* We don't need to release the mutex on calls to CreateCacheSession because there are no callbacks. */ cacheSessionP, errInfo := ccsmp.CreateCacheSession(propsList, receiver.internalReceiver.GetSessionPointer()) + if receiver.logger.IsDebugEnabled() { + receiver.logger.Debug(fmt.Sprintf("Created cache session 0x%x", cacheSessionP)) + } if errInfo != nil { errorString := constants.FailedToCreateCacheSession + constants.WithCacheRequestID + fmt.Sprintf("%d", cacheRequestID) @@ -1104,7 +1107,7 @@ func (receiver *directMessageReceiverImpl) requestCached(cachedMessageSubscripti } /* Run go routine that sends cache request */ - var cacheEventCallback ccsmp.SolClientCacheEventCallback = func(cacheEventInfo ccsmp.SolClientCacheEventInfo) { + var cacheEventCallback ccsmp.SolClientCacheEventCallback = func(cacheEventInfo ccsmp.CacheEventInfo) { receiver.cacheResponseChanCounter.Add(1) receiver.cacheResponseChan <- cacheEventInfo } @@ -1114,6 +1117,9 @@ func (receiver *directMessageReceiverImpl) requestCached(cachedMessageSubscripti receiver.logger.Warning(errorString) return solace.NewError(&solace.IllegalArgumentError{}, errorString, nil) } + if receiver.logger.IsDebugEnabled() { + receiver.logger.Debug(fmt.Sprintf("Sending cache request on cache session px%x", cacheSessionP)) + } errInfo = ccsmp.SendCacheRequest(receiver.dispatch, cacheSessionP, cachedMessageSubscriptionRequest.GetName(), @@ -1139,6 +1145,7 @@ func (receiver *directMessageReceiverImpl) RequestCachedAsync(cachedMessageSubsc /* We don't need to check the channel that is returned here since this functionality is tested through unit * testing and because we just instantiated the channel ourselves. */ if channel, ok := chHolder.GetChannel(); ok { + //receiver.cacheLock.Unlock() err := receiver.requestCached(cachedMessageSubscriptionRequest, cacheRequestID, chHolder) if err != nil { /* NOTE: Rely on requestCached to log error. */ @@ -1147,6 +1154,7 @@ func (receiver *directMessageReceiverImpl) RequestCachedAsync(cachedMessageSubsc } return channel, err } + //receiver.cacheLock.Unlock() /* NOTE: We should never get to this point since we know we just set the holder, but we need to include error * handling just in case, so that the program doesn't panic.*/ errorString := constants.FailedToRetrieveChannel diff --git a/pkg/solace/receiver_cache_requests.go b/pkg/solace/receiver_cache_requests.go index df6b35c..5960bd7 100644 --- a/pkg/solace/receiver_cache_requests.go +++ b/pkg/solace/receiver_cache_requests.go @@ -32,7 +32,6 @@ import ( // resulting from outstanding cache requests. Data messages related to the cache response willbe passed through the // conventional [Receiver] interfaces of [Receive()] and [ReceiveAsync()]. type ReceiverCacheRequests interface { - /* TODO: Check the error types in this doc string are correct. */ // RequestCachedAsync asynchronously requests cached data from a cache and defers processing of the resulting // cache response to the application throufh the returned channel. @@ -40,8 +39,6 @@ type ReceiverCacheRequests interface { // Returns InvalidConfigurationError if an invalid [resource.CachedMessageSubscriptionRequest] was passed. RequestCachedAsync(cachedMessageSubscriptionRequest resource.CachedMessageSubscriptionRequest, cacheRequestID message.CacheRequestID) (<-chan CacheResponse, error) - /* TODO: Check the error types in this doc string are correct. */ - // RequestCachedAsyncWithCallback asynchronously requests cached data from a cache and processes the resulting // cache response through the provided function callback. // Returns IllegalStateError if the service is not connected or the receiver is not running.