Skip to content

Commit

Permalink
EBP-18: Added testing. Cleanup up source.
Browse files Browse the repository at this point in the history
  • Loading branch information
TrentDaniel committed Jan 9, 2025
1 parent f8271e8 commit 7285966
Show file tree
Hide file tree
Showing 21 changed files with 1,666 additions and 1,214 deletions.
223 changes: 110 additions & 113 deletions internal/ccsmp/ccsmp_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package ccsmp

/*
#cgo CFLAGS: -DSOLCLIENT_PSPLUS_GO
#include <stdlib.h>
#include <stdio.h>
Expand All @@ -41,13 +42,13 @@ solClient_returnCode_t _solClient_version_set(solClient_version_info_pt version_
*/
import "C"
import (
"unsafe"
"sync"
"fmt"
"fmt"
"sync"
"unsafe"

"solace.dev/go/messaging/pkg/solace/resource"
"solace.dev/go/messaging/pkg/solace/message"
"solace.dev/go/messaging/internal/impl/logging"
"solace.dev/go/messaging/internal/impl/logging"
"solace.dev/go/messaging/pkg/solace/message"
"solace.dev/go/messaging/pkg/solace/resource"
)

// CachedMessageSubscriptionRequestStrategyMappingToCCSMPCacheRequestFlags is the mapping for Cached message Subscription Request Strategies to respective CCSMP cache request flags
Expand All @@ -59,25 +60,26 @@ var CachedMessageSubscriptionRequestStrategyMappingToCCSMPCacheRequestFlags = ma
}

// CachedMessageSubscriptionRequestStrategyMappingToCCSMPSubscribeFlags is the mapping for Cached message Subscription Request Strategies to respective CCSMP subscription flags
var CachedMessageSubscriptionRequestStrategyMappingToCCSMPSubscribeFlags = map[resource.CachedMessageSubscriptionStrategy]C.solClient_subscribeFlags_t {
resource.AsAvailable: C.SOLCLIENT_SUBSCRIBE_FLAGS_REQUEST_CONFIRM,
resource.LiveCancelsCached: C.SOLCLIENT_SUBSCRIBE_FLAGS_REQUEST_CONFIRM,
resource.CachedFirst: C.SOLCLIENT_SUBSCRIBE_FLAGS_REQUEST_CONFIRM,
resource.CachedOnly: C.SOLCLIENT_SUBSCRIBE_FLAGS_LOCAL_DISPATCH_ONLY,
var CachedMessageSubscriptionRequestStrategyMappingToCCSMPSubscribeFlags = map[resource.CachedMessageSubscriptionStrategy]C.solClient_subscribeFlags_t{
resource.AsAvailable: C.SOLCLIENT_SUBSCRIBE_FLAGS_REQUEST_CONFIRM,
resource.LiveCancelsCached: C.SOLCLIENT_SUBSCRIBE_FLAGS_REQUEST_CONFIRM,
resource.CachedFirst: C.SOLCLIENT_SUBSCRIBE_FLAGS_REQUEST_CONFIRM,
resource.CachedOnly: C.SOLCLIENT_SUBSCRIBE_FLAGS_LOCAL_DISPATCH_ONLY,
}


// SolClientCacheSessionPt is assigned a value
type SolClientCacheSessionPt = C.solClient_opaqueCacheSession_pt

/* TODO: Not sure if we really need this. */

// SolClientCacheSession structure
type SolClientCacheSession struct {
pointer SolClientCacheSessionPt
pointer SolClientCacheSessionPt
}

/* sessionToCacheEventCallbackMap is required as a global var even though cache sessions etc. are
/*
sessionToCacheEventCallbackMap is required as a global var even though cache sessions etc. are
scoped to a single receiver. This is required because the event callback that is passed to CCSMP when
performing an async cache request cannot have a pointer into Go-managed memory by being associated with
receiver. If it did, and CCSMP the event callback after the Go-managed receiver instance was garbage
Expand All @@ -91,63 +93,61 @@ to this one.
var cacheToEventCallbackMap sync.Map

func CreateCacheSession(cacheSessionProperties []string, sessionP SolClientSessionPt) (SolClientCacheSessionPt, *SolClientErrorInfoWrapper) {
cacheSessionPropertiesP , freeArrayFunc := ToCArray(cacheSessionProperties, true)
defer freeArrayFunc()
var cacheSessionP SolClientCacheSessionPt
errorInfo := handleCcsmpError(func() SolClientReturnCode {
return C.SessionCreateCacheSession(
cacheSessionPropertiesP,
sessionP,
&cacheSessionP,
)
})
fmt.Printf("cacheSessionP after cacheSessionCreate is: %p\n", cacheSessionP)
/* TODO: handle errors from cache sessino create? */
return cacheSessionP, errorInfo
cacheSessionPropertiesP, freeArrayFunc := ToCArray(cacheSessionProperties, true)
defer freeArrayFunc()
var cacheSessionP SolClientCacheSessionPt
errorInfo := handleCcsmpError(func() SolClientReturnCode {
return C.SessionCreateCacheSession(
cacheSessionPropertiesP,
sessionP,
&cacheSessionP,
)
})
return cacheSessionP, errorInfo
}

func SendCacheRequest (
dispatchID uintptr,
cacheSessionP SolClientCacheSessionPt,
topic string,
cacheRequestID message.CacheRequestID,
cacheRequestFlags C.solClient_cacheRequestFlags_t, // There may be a custom type for this? TBD
subscribeFlags C.solClient_subscribeFlags_t, // There may be a custom type for this? TBD
eventCallback SolClientCacheEventCallback,
) *SolClientErrorInfoWrapper {

cacheToEventCallbackMap.Store(cacheSessionP, eventCallback)
/* NOTE: Do we need to add the dispatch callback to the map here, or is that already done by the receiver in calls to Receive() or ReceiveAsync()? */

topicP := C.CString(topic)
errorInfo := handleCcsmpError(func() SolClientReturnCode {
return C.CacheSessionSendCacheRequest(
C.solClient_uint64_t(dispatchID),
// TODO: rework this, we should probably just have a pointer and not a whole object.
cacheSessionP,
topicP,
C.solClient_uint64_t(cacheRequestID), // This is done elsewhere such as in SolClientMessgeSetSequenceNumber
cacheRequestFlags,
subscribeFlags,
)
})
return errorInfo
func SendCacheRequest(
dispatchID uintptr,
cacheSessionP SolClientCacheSessionPt,
topic string,
cacheRequestID message.CacheRequestID,
cacheRequestFlags C.solClient_cacheRequestFlags_t, // There may be a custom type for this? TBD
subscribeFlags C.solClient_subscribeFlags_t, // There may be a custom type for this? TBD
eventCallback SolClientCacheEventCallback,
) *SolClientErrorInfoWrapper {

cacheToEventCallbackMap.Store(cacheSessionP, eventCallback)
/* NOTE: Do we need to add the dispatch callback to the map here, or is that already done by the receiver in calls to Receive() or ReceiveAsync()? */

topicP := C.CString(topic)
errorInfo := handleCcsmpError(func() SolClientReturnCode {
return C.CacheSessionSendCacheRequest(
C.solClient_uint64_t(dispatchID),
// TODO: rework this, we should probably just have a pointer and not a whole object.
cacheSessionP,
topicP,
C.solClient_uint64_t(cacheRequestID), // This is done elsewhere such as in SolClientMessgeSetSequenceNumber
cacheRequestFlags,
subscribeFlags,
)
})
return errorInfo
}

func DestroyCacheSession(cacheSessionP SolClientCacheSessionPt) *SolClientErrorInfoWrapper {
/* NOTE: We remove the cache session pointer from the map before we destroy the cache session
* so that map access using that pointer as an index won't collide with another cache session
* that is created immediately after this one is destroyed.*/
cacheToEventCallbackMap.Delete(cacheSessionP)
return handleCcsmpError(func() SolClientReturnCode {
return C.CacheSessionDestroy(&cacheSessionP)
})
/* NOTE: We remove the cache session pointer from the map before we destroy the cache session
* so that map access using that pointer as an index won't collide with another cache session
* that is created immediately after this one is destroyed.*/
cacheToEventCallbackMap.Delete(cacheSessionP)
return handleCcsmpError(func() SolClientReturnCode {
return C.CacheSessionDestroy(&cacheSessionP)
})
}

func CancelCacheRequest(cacheSessionP SolClientCacheSessionPt) * SolClientErrorInfoWrapper {
return handleCcsmpError(func () SolClientReturnCode {
return C.CacheSessionCancelRequests(cacheSessionP)
})
func CancelCacheRequest(cacheSessionP SolClientCacheSessionPt) *SolClientErrorInfoWrapper {
return handleCcsmpError(func() SolClientReturnCode {
return C.CacheSessionCancelRequests(cacheSessionP)
})
}

// SolClientCacheEventCallback functions should format CCSMP args into Go objects and then pass those objects
Expand All @@ -159,66 +159,63 @@ type SolClientCacheEventInfoPt = C.solCache_eventCallbackInfo_pt
type SolClientCacheEvent = C.solCache_event_t

type SolClientCacheEventInfo struct {
/* TODO: Rename this to CacheEventInfo to better distinguish it from CCSMP objects, since it now has more than
* just the original event fields. */
cacheSessionP SolClientCacheSessionPt
event SolClientCacheEvent
topic string
returnCode SolClientReturnCode
subCode SolClientSubCode
cacheRequestID message.CacheRequestID
err error
/* TODO: Rename this to CacheEventInfo to better distinguish it from CCSMP objects, since it now has more than
* just the original event fields. */
cacheSessionP SolClientCacheSessionPt
event SolClientCacheEvent
topic string
returnCode SolClientReturnCode
subCode SolClientSubCode
cacheRequestID message.CacheRequestID
err error
}

func (eventInfo *SolClientCacheEventInfo) String() string {
return fmt.Sprintf("SolClientCacheEventInfo:\n\tcacheSesionP: 0x%x\n\tevent: %d\n\ttopic: %s\n\treturnCode: %d\n\tsubCode: %d\n\tcacheRequestID: %d\n\terr: %s", eventInfo.cacheSessionP, eventInfo.event, eventInfo.topic, eventInfo.returnCode, eventInfo.subCode, eventInfo.cacheRequestID, eventInfo.err)
}

const (
SolClientCacheEventRequestCompletedNotice SolClientCacheEvent = 1
SolClientCacheEventRequestCompletedNotice SolClientCacheEvent = 1
)

func NewConfiguredSolClientCacheEventInfo(cacheSessionP SolClientCacheSessionPt, cacheRequestID message.CacheRequestID, topic string, err error) SolClientCacheEventInfo {
return SolClientCacheEventInfo{
cacheSessionP: cacheSessionP,
event: SolClientCacheEventRequestCompletedNotice,
topic: topic,
returnCode: SolClientReturnCodeFail,
subCode: SolClientSubCodeInternalError,
cacheRequestID: cacheRequestID,
err: err,
}
return SolClientCacheEventInfo{
cacheSessionP: cacheSessionP,
event: SolClientCacheEventRequestCompletedNotice,
topic: topic,
returnCode: SolClientReturnCodeFail,
subCode: SolClientSubCodeInternalError,
cacheRequestID: cacheRequestID,
err: err,
}
}

func (i * SolClientCacheEventInfo) GetCacheSessionPointer() SolClientCacheSessionPt {
return i.cacheSessionP
func (i *SolClientCacheEventInfo) GetCacheSessionPointer() SolClientCacheSessionPt {
return i.cacheSessionP
}

func SolClientCacheEventInfoFromCoreCacheEventInfo(eventCallbackInfo SolClientCacheEventInfoPt, userP unsafe.Pointer) SolClientCacheEventInfo {
return SolClientCacheEventInfo{
cacheSessionP: SolClientCacheSessionPt(userP),
event: SolClientCacheEvent(eventCallbackInfo.cacheEvent),
topic: C.GoString(eventCallbackInfo.topic),
returnCode: SolClientReturnCode(eventCallbackInfo.rc),
subCode: SolClientSubCode(eventCallbackInfo.subCode),
cacheRequestID: message.CacheRequestID(eventCallbackInfo.cacheRequestId),
}
func SolClientCacheEventInfoFromCoreCacheEventInfo(eventCallbackInfo SolClientCacheEventInfoPt, userP uintptr) SolClientCacheEventInfo {
return SolClientCacheEventInfo{
cacheSessionP: SolClientCacheSessionPt(userP),
event: SolClientCacheEvent(eventCallbackInfo.cacheEvent),
topic: C.GoString(eventCallbackInfo.topic),
returnCode: SolClientReturnCode(eventCallbackInfo.rc),
subCode: SolClientSubCode(eventCallbackInfo.subCode),
cacheRequestID: message.CacheRequestID(eventCallbackInfo.cacheRequestId),
}
}

//export goCacheEventCallback
func goCacheEventCallback(/*opaqueSessionP*/_ SolClientSessionPt, eventCallbackInfo SolClientCacheEventInfoPt, userP unsafe.Pointer) {
/* NOTE: We don't need to use the session pointer since we can use the user_p(a.k.a. the cache session pointer)
* which is guaranteed to be unique for at least the duration that the cache session pointer is in the global
* map since during receiver termination we destory the cache session only after we remove it from all maps.
*/
fmt.Printf("Got to goCacheEventCallback\n")
fmt.Printf("goCacheEvntCallback::userP is %p\n", uintptr(userP))
fmt.Printf("goCacheEvntCallback::userP is %p\n", SolClientCacheSessionPt(userP))
//if callback, ok := cacheToEventCallbackMap.Load(uintptr(userP)); ok {
if callback, ok := cacheToEventCallbackMap.Load(SolClientCacheSessionPt(userP)); ok {
callback.(SolClientCacheEventCallback)(SolClientCacheEventInfoFromCoreCacheEventInfo(eventCallbackInfo, userP))
fmt.Printf("Finished receiver cache event callback\n")
} else {
fmt.Printf("Received event callback from core API without an associated cache event callback\n")
if logging.Default.IsDebugEnabled() {
logging.Default.Debug("Received event callback from core API without an associated cache event callback")
}
}
fmt.Printf("finished goCacheEventCallback\n")
func goCacheEventCallback( /*opaqueSessionP*/ _ SolClientSessionPt, eventCallbackInfo SolClientCacheEventInfoPt, userP unsafe.Pointer) {
/* NOTE: We don't need to use the session pointer since we can use the user_p(a.k.a. the cache session pointer)
* which is guaranteed to be unique for at least the duration that the cache session pointer is in the global
* map since during receiver termination we destory the cache session only after we remove it from all maps.
*/
if callback, ok := cacheToEventCallbackMap.Load(SolClientCacheSessionPt(uintptr(userP))); ok {
callback.(SolClientCacheEventCallback)(SolClientCacheEventInfoFromCoreCacheEventInfo(eventCallbackInfo, uintptr(userP)))
} else {
if logging.Default.IsDebugEnabled() {
logging.Default.Debug("Received event callback from core API without an associated cache event callback")
}
}
}
9 changes: 2 additions & 7 deletions internal/ccsmp/ccsmp_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,17 +111,12 @@ var sessionToEventCallbackMap sync.Map

//export goMessageReceiveCallback
func goMessageReceiveCallback(sessionP SolClientSessionPt, msgP SolClientMessagePt, userP unsafe.Pointer) C.solClient_rxMsgCallback_returnCode_t {
fmt.Printf("Got to goMessageReceiveCallback\n")
if callback, ok := sessionToRXCallbackMap.Load(sessionP); ok {
fmt.Printf("goMessageReceiveCallback::callback found\n")
if callback.(SolClientMessageCallback)(msgP, userP) {
fmt.Printf("goMessageReceiveCallback::callback returned true\n")
return C.SOLCLIENT_CALLBACK_TAKE_MSG
}
fmt.Printf("goMessageReceiveCallback::callback returned false\n")
return C.SOLCLIENT_CALLBACK_OK
}
fmt.Printf("goMessageReceiveCallback::callback not found\n")
logging.Default.Error("Received message from core API without an associated session callback")
return C.SOLCLIENT_CALLBACK_OK
}
Expand Down Expand Up @@ -150,7 +145,7 @@ func goEventCallback(sessionP SolClientSessionPt, eventInfoP SolClientSessionEve
if callback, ok := sessionToEventCallbackMap.Load(sessionP); ok {
callback.(SolClientSessionEventCallback)(SolClientSessionEvent(eventInfoP.sessionEvent), eventInfoP.responseCode, C.GoString(eventInfoP.info_p), eventInfoP.correlation_p, userP)
} else {
logging.Default.Debug(fmt.Sprintf("Received event callback from core API without an associated session callback:\n%s\n", C.GoString(eventInfoP.info_p)))
logging.Default.Debug(fmt.Sprintf("Received event callback from core API without an associated session callback:\n%s\n", C.GoString(eventInfoP.info_p)))
}
}

Expand Down Expand Up @@ -264,7 +259,7 @@ type SolClientSession struct {

// GetPointer returns the session pointer.
func (session *SolClientSession) GetPointer() SolClientSessionPt {
return session.pointer
return session.pointer
}

// SetMessageCallback sets the message callback to use
Expand Down
2 changes: 0 additions & 2 deletions internal/ccsmp/ccsmp_helper.c
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,7 @@ CacheSessionSendCacheRequest(
* object is necessarily unique. It is easiest to use the cache session pointer since it is a unique
* value that will persist until the end of the cache request lifetime, at which point the user_p is
* removed from any subscription tables anyways. */
printf("CacheSessionSendCacheRequest::opaqueCacheSession_p is %p", opaqueCacheSession_p);
void * user_p = (void *)opaqueCacheSession_p;
printf("CacheSessionSendCacheRequest::user_p is %p", user_p);
solClient_session_rxMsgDispatchFuncInfo_t dispatchInfo; /* msg dispatch callback to set */
dispatchInfo.dispatchType = SOLCLIENT_DISPATCH_TYPE_CALLBACK;
dispatchInfo.callback_p = messageReceiveCallback;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
// The environment variable SOLCACHE_H must be set to the absolute path to project_root/lib/<dist>/include/solclient/solCache.h.

const outputFile string = "ccsmp_cache_session_prop_generated.go"
const header string = "package ccsmp\n// Code generated by ccsmp_session_prop_generator.go via go generate. DO NOT EDIT.\n\n\n/*\n#include \"solclient/solCache.h\"\n*/\nimport \"C\"\nconst (\n"
const header string = "package ccsmp\n// Code generated by ccsmp_session_prop_generator.go via go generate. DO NOT EDIT.\n\n\n/*\n#cgo CFLAGS: -DSOLCLIENT_PSPLUS_GO\n#include \"solclient/solCache.h\"\n*/\nimport \"C\"\nconst (\n"
const footer string = ")\n"
const namePrefix string = "SolClientCacheSessionProp"
const definePrefix string = "SOLCLIENT_CACHESESSION_PROP"
Expand Down
7 changes: 6 additions & 1 deletion internal/ccsmp/lib/include/solclient/solCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,12 @@ typedef enum solCache_event {
SOLCACHE_EVENT_REQUEST_COMPLETED_NOTICE /** Cache Request has finished. The event returnCode and subCode provide status information */
} solCache_event_t;

typedef void *solClient_opaqueCacheSession_pt; /**< An opaque pointer to a cache session. */
#if defined(SOLCLIENT_PSPLUS_GO)
#include <stdint.h>
typedef uintptr_t solClient_opaqueCacheSession_pt; /**< An opaque pointer to a cache session. */
#else
typedef void *solClient_opaqueCacheSession_pt; /**< An opaque pointer to a cache session. */
#endif

/**
*
Expand Down
Loading

0 comments on commit 7285966

Please sign in to comment.