Skip to content

Commit

Permalink
EBP-18: Made changes in response to PR feedback.
Browse files Browse the repository at this point in the history
  • Loading branch information
TrentDaniel committed Jan 13, 2025
1 parent 8c60bea commit d9ef9d4
Show file tree
Hide file tree
Showing 7 changed files with 55 additions and 58 deletions.
51 changes: 24 additions & 27 deletions internal/ccsmp/ccsmp_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,6 @@ var CachedMessageSubscriptionRequestStrategyMappingToCCSMPSubscribeFlags = map[r
// SolClientCacheSessionPt is assigned a value
type SolClientCacheSessionPt = C.solClient_opaqueCacheSession_pt

/* TODO: Not sure if we really need this. */

// SolClientCacheSession structure
type SolClientCacheSession struct {
pointer SolClientCacheSessionPt
}

/*
sessionToCacheEventCallbackMap is required as a global var even though cache sessions etc. are
Expand Down Expand Up @@ -117,13 +110,11 @@ func SendCacheRequest(
) *SolClientErrorInfoWrapper {

cacheToEventCallbackMap.Store(cacheSessionP, eventCallback)
/* NOTE: Do we need to add the dispatch callback to the map here, or is that already done by the receiver in calls to Receive() or ReceiveAsync()? */

topicP := C.CString(topic)
errorInfo := handleCcsmpError(func() SolClientReturnCode {
return C.CacheSessionSendCacheRequest(
C.solClient_uint64_t(dispatchID),
// TODO: rework this, we should probably just have a pointer and not a whole object.
cacheSessionP,
topicP,
C.solClient_uint64_t(cacheRequestID), // This is done elsewhere such as in SolClientMessgeSetSequenceNumber
Expand All @@ -137,8 +128,12 @@ func SendCacheRequest(
func DestroyCacheSession(cacheSessionP SolClientCacheSessionPt) *SolClientErrorInfoWrapper {
/* NOTE: We remove the cache session pointer from the map before we destroy the cache session
* so that map access using that pointer as an index won't collide with another cache session
* that is created immediately after this one is destroyed.*/
* that is created immediately after this one is destroyed. This should be deleted in the cache
* event callback. We delete again here in case there was a problem in the cache event callback
* so that we don't leak resources. If the entry has already been deleted, the following call
* will not fail, and will effectively be a no-op.*/
cacheToEventCallbackMap.Delete(cacheSessionP)

return handleCcsmpError(func() SolClientReturnCode {
return C.CacheSessionDestroy(&cacheSessionP)
})
Expand All @@ -150,17 +145,14 @@ func CancelCacheRequest(cacheSessionP SolClientCacheSessionPt) *SolClientErrorIn
})
}

// SolClientCacheEventCallback functions should format CCSMP args into Go objects and then pass those objects
// to a separate function that actually processes them, maybe through a channel with a background go routine
// polling it?
type SolClientCacheEventCallback = func(SolClientCacheEventInfo)
type SolClientCacheEventInfoPt = C.solCache_eventCallbackInfo_pt
// SolClientCacheEventCallback functions format CCSMP args into Go objects and then pass those objects
// to a separate function that actually processes them
type SolClientCacheEventCallback = func(CacheEventInfo)
type CacheEventInfoPt = C.solCache_eventCallbackInfo_pt

type SolClientCacheEvent = C.solCache_event_t

type SolClientCacheEventInfo struct {
/* TODO: Rename this to CacheEventInfo to better distinguish it from CCSMP objects, since it now has more than
* just the original event fields. */
type CacheEventInfo struct {
cacheSessionP SolClientCacheSessionPt
event SolClientCacheEvent
topic string
Expand All @@ -170,16 +162,16 @@ type SolClientCacheEventInfo struct {
err error
}

func (eventInfo *SolClientCacheEventInfo) String() string {
return fmt.Sprintf("SolClientCacheEventInfo:\n\tcacheSesionP: 0x%x\n\tevent: %d\n\ttopic: %s\n\treturnCode: %d\n\tsubCode: %d\n\tcacheRequestID: %d\n\terr: %s", eventInfo.cacheSessionP, eventInfo.event, eventInfo.topic, eventInfo.returnCode, eventInfo.subCode, eventInfo.cacheRequestID, eventInfo.err)
func (eventInfo *CacheEventInfo) String() string {
return fmt.Sprintf("CacheEventInfo:\n\tcacheSessionP: 0x%x\n\tevent: %d\n\ttopic: %s\n\treturnCode: %d\n\tsubCode: %d\n\tcacheRequestID: %d\n\terr: %s", eventInfo.cacheSessionP, eventInfo.event, eventInfo.topic, eventInfo.returnCode, eventInfo.subCode, eventInfo.cacheRequestID, eventInfo.err)
}

const (
SolClientCacheEventRequestCompletedNotice SolClientCacheEvent = 1
)

func NewConfiguredSolClientCacheEventInfo(cacheSessionP SolClientCacheSessionPt, cacheRequestID message.CacheRequestID, topic string, err error) SolClientCacheEventInfo {
return SolClientCacheEventInfo{
func NewConfiguredCacheEventInfo(cacheSessionP SolClientCacheSessionPt, cacheRequestID message.CacheRequestID, topic string, err error) CacheEventInfo {
return CacheEventInfo{
cacheSessionP: cacheSessionP,
event: SolClientCacheEventRequestCompletedNotice,
topic: topic,
Expand All @@ -190,12 +182,12 @@ func NewConfiguredSolClientCacheEventInfo(cacheSessionP SolClientCacheSessionPt,
}
}

func (i *SolClientCacheEventInfo) GetCacheSessionPointer() SolClientCacheSessionPt {
func (i *CacheEventInfo) GetCacheSessionPointer() SolClientCacheSessionPt {
return i.cacheSessionP
}

func SolClientCacheEventInfoFromCoreCacheEventInfo(eventCallbackInfo SolClientCacheEventInfoPt, userP uintptr) SolClientCacheEventInfo {
return SolClientCacheEventInfo{
func CacheEventInfoFromCoreCacheEventInfo(eventCallbackInfo CacheEventInfoPt, userP uintptr) CacheEventInfo {
return CacheEventInfo{
cacheSessionP: SolClientCacheSessionPt(userP),
event: SolClientCacheEvent(eventCallbackInfo.cacheEvent),
topic: C.GoString(eventCallbackInfo.topic),
Expand All @@ -206,13 +198,18 @@ func SolClientCacheEventInfoFromCoreCacheEventInfo(eventCallbackInfo SolClientCa
}

//export goCacheEventCallback
func goCacheEventCallback( /*opaqueSessionP*/ _ SolClientSessionPt, eventCallbackInfo SolClientCacheEventInfoPt, userP unsafe.Pointer) {
func goCacheEventCallback( /*opaqueSessionP*/ _ SolClientSessionPt, eventCallbackInfo CacheEventInfoPt, userP unsafe.Pointer) {
/* NOTE: We don't need to use the session pointer since we can use the user_p(a.k.a. the cache session pointer)
* which is guaranteed to be unique for at least the duration that the cache session pointer is in the global
* map since during receiver termination we destory the cache session only after we remove it from all maps.
*/
if callback, ok := cacheToEventCallbackMap.Load(SolClientCacheSessionPt(uintptr(userP))); ok {
callback.(SolClientCacheEventCallback)(SolClientCacheEventInfoFromCoreCacheEventInfo(eventCallbackInfo, uintptr(userP)))
eventInfo := CacheEventInfoFromCoreCacheEventInfo(eventCallbackInfo, uintptr(userP))
callback.(SolClientCacheEventCallback)(eventInfo)
/* NOTE: We remove the cache session pointer from the map before we destroy the cache session
* so that map access using that pointer as an index won't collide with another cache session
* that is created immediately after this one is destroyed.*/
cacheToEventCallbackMap.Delete(eventInfo.cacheSessionP)
} else {
if logging.Default.IsDebugEnabled() {
logging.Default.Debug("Received event callback from core API without an associated cache event callback")
Expand Down
7 changes: 1 addition & 6 deletions internal/ccsmp/ccsmp_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func goEventCallback(sessionP SolClientSessionPt, eventInfoP SolClientSessionEve
if callback, ok := sessionToEventCallbackMap.Load(sessionP); ok {
callback.(SolClientSessionEventCallback)(SolClientSessionEvent(eventInfoP.sessionEvent), eventInfoP.responseCode, C.GoString(eventInfoP.info_p), eventInfoP.correlation_p, userP)
} else {
logging.Default.Debug(fmt.Sprintf("Received event callback from core API without an associated session callback:\n%s\n", C.GoString(eventInfoP.info_p)))
logging.Default.Debug("Received event callback from core API without an associated session callback")
}
}

Expand Down Expand Up @@ -334,11 +334,6 @@ func (context *SolClientContext) SolClientSessionCreate(properties []string) (se
var sessionP SolClientSessionPt
sessionPropsP, sessionPropertiesFreeFunction := ToCArray(properties, true)
defer sessionPropertiesFreeFunction()
var sessionFuncInfo C.solClient_session_createFuncInfo_t
sessionFuncInfo.rxMsgInfo.callback_p = (C.solClient_session_rxMsgCallbackFunc_t)(unsafe.Pointer(C.defaultMessageReceiveCallback))
sessionFuncInfo.rxMsgInfo.user_p = nil
sessionFuncInfo.eventInfo.callback_p = (C.solClient_session_eventCallbackFunc_t)(unsafe.Pointer(C.eventCallback))
sessionFuncInfo.eventInfo.user_p = nil

solClientErrorInfo := handleCcsmpError(func() SolClientReturnCode {
return C.SessionCreate(sessionPropsP,
Expand Down
15 changes: 5 additions & 10 deletions internal/ccsmp/ccsmp_helper.c
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
#include "solclient/solClient.h"
#include "solclient/solClientMsg.h"
#include "solclient/solCache.h"
#include <stdio.h> // remove later, only needed for debugging

//
// external callbacks defined in ccsmp_callbacks.c
Expand Down Expand Up @@ -298,12 +297,6 @@ CacheSessionSendCacheRequest(
solClient_cacheRequestFlags_t cacheFlags,
solClient_subscribeFlags_t subscribeFlags)
{
/* We can use the cache session pointer as the unique identifier for the cache request since by design
* are having only one cache request/response per cache session, and the pointer to the cache session
* object is necessarily unique. It is easiest to use the cache session pointer since it is a unique
* value that will persist until the end of the cache request lifetime, at which point the user_p is
* removed from any subscription tables anyways. */
void * user_p = (void *)opaqueCacheSession_p;
solClient_session_rxMsgDispatchFuncInfo_t dispatchInfo; /* msg dispatch callback to set */
dispatchInfo.dispatchType = SOLCLIENT_DISPATCH_TYPE_CALLBACK;
dispatchInfo.callback_p = messageReceiveCallback;
Expand All @@ -315,17 +308,19 @@ CacheSessionSendCacheRequest(
topic_p,
cacheRequestId,
(solCache_eventCallbackFunc_t)cacheEventCallback,
/* CCSMP does not copy the contents of user_p, only the pointer. This means we cannot have
/* NOTE: CCSMP does not copy the contents of user_p, only the pointer. This means we cannot have
* Go-allocated memory as the object being pointed to, since that object might be garbage
* collected before the cache response is received and the user_p is used for some purpose
* by either the CCSMP or PSPGo APIs. We also cannot allocate a struct for user_p in C since
* it will go out of scope by the end of this function and we won't be able to clean it up
* properly. This means that our only option, AFAIK, is to just pass the cache session
* pointer as a void pointer, since its lifecycle is managed outside of this function in a
* safe way, and must survive at least until CCSMP receives a cache response or the request
* is cancelled.
* is cancelled. While destroying the cache session, the user_p/opaqueCacheSession_p will be
* removed from the tables anyways. Cancelling a cache request is always followed by destroying
* the cache associated session.
* */
user_p,
(void *)opaqueCacheSession_p,
cacheFlags,
subscribeFlags,
&dispatchInfo);
Expand Down
4 changes: 2 additions & 2 deletions internal/impl/core/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type Receiver interface {
IncrementDuplicateAckCount()
// Creates a new persistent receiver with the given callback
NewPersistentReceiver(properties []string, callback RxCallback, eventCallback PersistentEventCallback) (PersistentReceiver, ErrorInfo)
/* TODO: The `GetSessionPointer` method seems a litte out of place here. For now it works, but it might be an,
/* FFC: The `GetSessionPointer` method seems a litte out of place here. For now it works, but it might be an,
* anti-pattern so we should look into clarifying this and maybe doing this differently in a future iteration.*/

// Retrieves the sesion pointer
Expand Down Expand Up @@ -381,7 +381,7 @@ func (receiver *ccsmpBackedReceiver) NewPersistentReceiver(properties []string,
}, nil
}

/* FFC: It might be better if this were in ccsmp_core.go? */
// GetSessionPointer returns the opaque pointer to the session associated with the given receiver.
func (receiver *ccsmpBackedReceiver) GetSessionPointer() ccsmp.SolClientSessionPt {
return receiver.session.GetPointer()
}
Expand Down
11 changes: 8 additions & 3 deletions internal/impl/receiver/cache_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
// PollAndProcessCacheResponseChannel is intended to be run as a go routine.
func (receiver *directMessageReceiverImpl) PollAndProcessCacheResponseChannel() {
receiver.cachePollingRunning.Store(true)
var cacheEventInfo ccsmp.SolClientCacheEventInfo
var cacheEventInfo ccsmp.CacheEventInfo
channelIsOpen := true
/* poll cacheventinfo channel */
for channelIsOpen {
Expand All @@ -41,7 +41,7 @@ func (receiver *directMessageReceiverImpl) PollAndProcessCacheResponseChannel()
// Any function that closes the channel must guarantee this.
break
}
/* We decrement the counter first, since as soon as we pop the SolClientCacheEventInfo
/* We decrement the counter first, since as soon as we pop the CacheEventInfo
* off the channel, CCSMP is able to put another on. If CCSMP is able resume processing the
* cache responses, we should unblock the application by allowing it to submit more cache
* requests ASAP.*/
Expand All @@ -52,9 +52,14 @@ func (receiver *directMessageReceiverImpl) PollAndProcessCacheResponseChannel()
}

// ProcessCacheEvent is intended to be run by any agent trying to process a cache response. This can be run from a polling go routine, or during termination to cleanup remaining resources, and possibly by other agents as well.
func (receiver *directMessageReceiverImpl) ProcessCacheEvent(cacheEventInfo ccsmp.SolClientCacheEventInfo) {
func (receiver *directMessageReceiverImpl) ProcessCacheEvent(cacheEventInfo ccsmp.CacheEventInfo) {
fmt.Printf("ProcessCacheEvent::cacheEventInfo is:\n%s\n", cacheEventInfo.String())
if receiver.logger.IsDebugEnabled() {
receiver.logger.Debug(fmt.Sprintf("ProcessCacheEvent::cacheEventInfo is:\n%s\n", cacheEventInfo.String()))
}
cacheSessionP := cacheEventInfo.GetCacheSessionPointer()
receiver.cacheLock.Lock()
fmt.Printf("ProcessCacheEvent::loading cache response processor\n")
cacheResponseHolder, found := receiver.loadCacheResponseProcessorFromMapUnsafe(cacheSessionP)
receiver.cacheLock.Unlock()
if !found {
Expand Down
Loading

0 comments on commit d9ef9d4

Please sign in to comment.