Skip to content

Commit

Permalink
Merge pull request #19 from SolaceDev/dev
Browse files Browse the repository at this point in the history
Release v1.6.1
  • Loading branch information
cjwmorgan-sol authored May 6, 2024
2 parents 73d5d85 + 33cebd5 commit cd7f02d
Show file tree
Hide file tree
Showing 7 changed files with 194 additions and 88 deletions.
25 changes: 24 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ jobs:
- name: Setup Go environment
uses: actions/[email protected]
with:
go-version: '1.21'
go-version: '1.22'
check-latest: true
- name: Check Go Version
run: go version
Expand All @@ -66,6 +66,29 @@ jobs:
echo "$OUTPUT"
exit 1
fi
- name: Runs go vet
if: ${{ success() }}
run: |
OUTPUT=$(go vet ./... 2>&1)
if [ ! -z "$OUTPUT" ]; then
echo "go vet failed on the following:"
echo "$OUTPUT"
exit 1
fi
- name: Runs staticcheck
if: ${{ success() }}
run: |
# use pinned version of the tool but check if this needs an update on go version bump
go install honnef.co/go/tools/cmd/[email protected]
OUTPUT=$(staticcheck --checks=all ./...)
if [ ! -z "$OUTPUT" ]; then
echo "staticcheck failed on the following:"
echo "$OUTPUT"
exit 1
fi
- name: Runs unit tests
if: ${{ success() }}
run: go test -coverprofile ./unitcoverage.out ./...
Expand Down
30 changes: 14 additions & 16 deletions internal/ccsmp/ccsmp_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,12 +348,11 @@ func (session *SolClientSession) solClientSessionSubscribeWithFlags(topic string
return handleCcsmpError(func() SolClientReturnCode {
cString := C.CString(topic)
defer C.free(unsafe.Pointer(cString))
// This is not an unsafe usage of unsafe.Pointer as we are using correlationId as data, not as a pointer
return C.SessionTopicSubscribeWithFlags(session.pointer,
cString,
flags,
C.uintptr_to_void_p(C.solClient_uint64_t(dispatchID)),
C.uintptr_to_void_p(C.solClient_uint64_t(correlationID)))
C.solClient_uint64_t(dispatchID),
C.solClient_uint64_t(correlationID))
})
}

Expand All @@ -362,12 +361,11 @@ func (session *SolClientSession) solClientSessionSubscribeReplyTopicWithFlags(to
return handleCcsmpError(func() SolClientReturnCode {
cString := C.CString(topic)
defer C.free(unsafe.Pointer(cString))
// This is not an unsafe usage of unsafe.Pointer as we are using correlationId as data, not as a pointer
return C.SessionReplyTopicSubscribeWithFlags(session.pointer,
cString,
flags,
C.uintptr_to_void_p(C.solClient_uint64_t(dispatchID)),
C.uintptr_to_void_p(C.solClient_uint64_t(correlationID)))
C.solClient_uint64_t(dispatchID),
C.solClient_uint64_t(correlationID))
})
}

Expand All @@ -376,12 +374,11 @@ func (session *SolClientSession) solClientSessionUnsubscribeWithFlags(topic stri
return handleCcsmpError(func() SolClientReturnCode {
cString := C.CString(topic)
defer C.free(unsafe.Pointer(cString))
// This is not an unsafe usage of unsafe.Pointer as we are using correlationId as data, not as a pointer
return C.SessionTopicUnsubscribeWithFlags(session.pointer,
cString,
flags,
C.uintptr_to_void_p(C.solClient_uint64_t(dispatchID)),
C.uintptr_to_void_p(C.solClient_uint64_t(correlationID)))
C.solClient_uint64_t(dispatchID),
C.solClient_uint64_t(correlationID))
})
}

Expand All @@ -390,12 +387,11 @@ func (session *SolClientSession) solClientSessionUnsubscribeReplyTopicWithFlags(
return handleCcsmpError(func() SolClientReturnCode {
cString := C.CString(topic)
defer C.free(unsafe.Pointer(cString))
// This is not an unsafe usage of unsafe.Pointer as we are using correlationId as data, not as a pointer
return C.SessionReplyTopicUnsubscribeWithFlags(session.pointer,
cString,
flags,
C.uintptr_to_void_p(C.solClient_uint64_t(dispatchID)),
C.uintptr_to_void_p(C.solClient_uint64_t(correlationID)))
C.solClient_uint64_t(dispatchID),
C.solClient_uint64_t(correlationID))
})
}

Expand Down Expand Up @@ -435,9 +431,11 @@ func (session *SolClientSession) SolClientEndpointUnsusbcribe(properties []strin
defer C.free(unsafe.Pointer(cString))
endpointProps, endpointFree := ToCArray(properties, true)
defer endpointFree()
// This is not an unsafe usage of unsafe.Pointer as we are using correlationId as data, not as a pointer
return C.solClient_session_endpointTopicUnsubscribe(endpointProps, session.pointer,
C.SOLCLIENT_SUBSCRIBE_FLAGS_REQUEST_CONFIRM, cString, C.uintptr_to_void_p(C.solClient_uint64_t(correlationID)))
return C.SessionTopicEndpointUnsubscribeWithFlags(session.pointer,
endpointProps,
C.SOLCLIENT_SUBSCRIBE_FLAGS_REQUEST_CONFIRM,
cString,
C.solClient_uint64_t(correlationID))
})
}

Expand Down Expand Up @@ -570,7 +568,7 @@ func NewSessionDispatch(id uint64) (*SolClientSessionRxMsgDispatchFuncInfo, uint
return &SolClientSessionRxMsgDispatchFuncInfo{
dispatchType: C.SOLCLIENT_DISPATCH_TYPE_CALLBACK,
callback_p: (C.solClient_session_rxMsgCallbackFunc_t)(unsafe.Pointer(C.messageReceiveCallback)),
user_p: C.uintptr_to_void_p(C.solClient_uint64_t(ptr)),
user_p: nil, // this should be set to the uintptr
rfu_p: nil,
}, ptr
}
Expand Down
31 changes: 15 additions & 16 deletions internal/ccsmp/ccsmp_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ import (
#include "solclient/solClient.h"
#include "solclient/solClientMsg.h"
#include "./ccsmp_helper.h"
solClient_rxMsgCallback_returnCode_t flowMessageReceiveCallback ( solClient_opaqueFlow_pt opaqueFlow_p, solClient_opaqueMsg_pt msg_p, void *user_p );
void flowEventCallback ( solClient_opaqueFlow_pt opaqueFlow_p, solClient_flow_eventCallbackInfo_pt eventInfo_p, void *user_p );
*/
import "C"

Expand Down Expand Up @@ -96,23 +93,19 @@ func (session *SolClientSession) SolClientSessionCreateFlow(properties []string,
flowPropsP, sessionPropertiesFreeFunction := ToCArray(properties, true)
defer sessionPropertiesFreeFunction()

var flowCreateFuncInfo C.solClient_flow_createFuncInfo_t

flowID := atomic.AddUintptr(&flowID, 1)

// These are not a misuse of unsafe.Pointer as the value is used for correlation
flowCreateFuncInfo.rxMsgInfo.callback_p = (C.solClient_flow_rxMsgCallbackFunc_t)(unsafe.Pointer(C.flowMessageReceiveCallback))
flowCreateFuncInfo.rxMsgInfo.user_p = C.uintptr_to_void_p(C.solClient_uint64_t(flowID))
flowCreateFuncInfo.eventInfo.callback_p = (C.solClient_flow_eventCallbackFunc_t)(unsafe.Pointer(C.flowEventCallback))
flowCreateFuncInfo.eventInfo.user_p = C.uintptr_to_void_p(C.solClient_uint64_t(flowID))

flowToRXCallbackMap.Store(flowID, msgCallback)
flowToEventCallbackMap.Store(flowID, eventCallback)

flow := &SolClientFlow{}
flow.userP = flowID
err := handleCcsmpError(func() SolClientReturnCode {
return C.solClient_session_createFlow(flowPropsP, session.pointer, &flow.pointer, &flowCreateFuncInfo, (C.size_t)(unsafe.Sizeof(flowCreateFuncInfo)))
// this will register the goFlowMessageReceiveCallback and goFlowEventCallback callbacks with the flowID
return C.SessionFlowCreate(session.pointer,
flowPropsP,
&flow.pointer,
C.solClient_uint64_t(flowID))
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -153,8 +146,11 @@ func (flow *SolClientFlow) SolClientFlowSubscribe(topic string, correlationID ui
return handleCcsmpError(func() SolClientReturnCode {
cString := C.CString(topic)
defer C.free(unsafe.Pointer(cString))
// This is not an unsafe usage of unsafe.Pointer as we are using correlationId as data, not as a pointer
return C.solClient_flow_topicSubscribeWithDispatch(flow.pointer, C.SOLCLIENT_SUBSCRIBE_FLAGS_REQUEST_CONFIRM, cString, nil, C.uintptr_to_void_p(C.solClient_uint64_t(correlationID)))
return C.FlowTopicSubscribeWithDispatch(flow.pointer,
C.SOLCLIENT_SUBSCRIBE_FLAGS_REQUEST_CONFIRM,
cString,
nil,
C.solClient_uint64_t(correlationID))
})
}

Expand All @@ -163,8 +159,11 @@ func (flow *SolClientFlow) SolClientFlowUnsubscribe(topic string, correlationID
return handleCcsmpError(func() SolClientReturnCode {
cString := C.CString(topic)
defer C.free(unsafe.Pointer(cString))
// This is not an unsafe usage of unsafe.Pointer as we are using correlationId as data, not as a pointer
return C.solClient_flow_topicUnsubscribeWithDispatch(flow.pointer, C.SOLCLIENT_SUBSCRIBE_FLAGS_REQUEST_CONFIRM, cString, nil, C.uintptr_to_void_p(C.solClient_uint64_t(correlationID)))
return C.FlowTopicUnsubscribeWithDispatch(flow.pointer,
C.SOLCLIENT_SUBSCRIBE_FLAGS_REQUEST_CONFIRM,
cString,
nil,
C.solClient_uint64_t(correlationID))
})
}

Expand Down
123 changes: 91 additions & 32 deletions internal/ccsmp/ccsmp_helper.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ messageReceiveCallback(solClient_opaqueSession_pt opaqueSession_p, solClient_opa
solClient_rxMsgCallback_returnCode_t
requestResponseReplyMessageReceiveCallback(solClient_opaqueSession_pt opaqueSession_p, solClient_opaqueMsg_pt msg_p, void *user_p);

void *uintptr_to_void_p(solClient_uint64_t ptr)
{
return (void *)ptr;
}
solClient_rxMsgCallback_returnCode_t
flowMessageReceiveCallback(solClient_opaqueFlow_pt opaqueFlow_p, solClient_opaqueMsg_pt msg_p, void *user_p);

void flowEventCallback(solClient_opaqueFlow_pt opaqueFlow_p, solClient_flow_eventCallbackInfo_pt eventInfo_p, void *user_p);

solClient_returnCode_t
solClientgo_msg_isRequestReponseMsg(solClient_opaqueMsg_pt msg_p, char **correlationId_p) {
Expand All @@ -54,103 +54,162 @@ solClientgo_msg_isRequestReponseMsg(solClient_opaqueMsg_pt msg_p, char **correla
}

solClient_returnCode_t
_SessionTopicSubscribeWithFlags( solClient_opaqueSession_pt opaqueSession_p,
const char *topicSubscription_p,
SessionFlowCreate( solClient_opaqueSession_pt opaqueSession_p,
solClient_propertyArray_pt flowPropsP,
solClient_opaqueFlow_pt *opaqueFlow_p,
solClient_uint64_t flowID)
{
/* set the flowID in the flow create struct */
solClient_flow_createFuncInfo_t flowCreateFuncInfo;
flowCreateFuncInfo.rxMsgInfo.callback_p = flowMessageReceiveCallback;
flowCreateFuncInfo.rxMsgInfo.user_p = (void *)flowID;
flowCreateFuncInfo.eventInfo.callback_p = (solClient_flow_eventCallbackFunc_t)flowEventCallback;
flowCreateFuncInfo.eventInfo.user_p = (void *)flowID;
// allocate these struct fields too
flowCreateFuncInfo.rxInfo.user_p = NULL;
flowCreateFuncInfo.rxInfo.callback_p = NULL;

return solClient_session_createFlow(flowPropsP, opaqueSession_p, opaqueFlow_p, &flowCreateFuncInfo, sizeof(flowCreateFuncInfo));
}

solClient_returnCode_t
FlowTopicSubscribeWithDispatch( solClient_opaqueFlow_pt opaqueFlow_p,
solClient_subscribeFlags_t flags,
solClient_session_rxMsgCallbackFunc_t callback_p,
void *dispatchId_p,
void *correlationTag_p)
const char *topicSubscription_p,
solClient_flow_rxMsgDispatchFuncInfo_t *dispatchFuncInfo_p,
solClient_uint64_t correlationTag)
{
return solClient_flow_topicSubscribeWithDispatch( opaqueFlow_p,
flags,
topicSubscription_p,
dispatchFuncInfo_p,
(void *)correlationTag);
}

solClient_returnCode_t
FlowTopicUnsubscribeWithDispatch( solClient_opaqueFlow_pt opaqueFlow_p,
solClient_subscribeFlags_t flags,
const char *topicSubscription_p,
solClient_flow_rxMsgDispatchFuncInfo_t *dispatchFuncInfo_p,
solClient_uint64_t correlationTag)
{
return solClient_flow_topicUnsubscribeWithDispatch( opaqueFlow_p,
flags,
topicSubscription_p,
dispatchFuncInfo_p,
(void *)correlationTag);
}

solClient_returnCode_t
_SessionTopicSubscribeWithFlags( solClient_opaqueSession_pt opaqueSession_p,
const char *topicSubscription_p,
solClient_subscribeFlags_t flags,
solClient_session_rxMsgCallbackFunc_t callback_p,
solClient_uint64_t dispatchId,
solClient_uint64_t correlationTag)
{
solClient_session_rxMsgDispatchFuncInfo_t dispatchInfo; /* msg dispatch callback to set */
dispatchInfo.dispatchType = SOLCLIENT_DISPATCH_TYPE_CALLBACK;
dispatchInfo.callback_p = callback_p;
dispatchInfo.user_p = dispatchId_p;
dispatchInfo.user_p = (void *)dispatchId;
dispatchInfo.rfu_p = NULL;
return solClient_session_topicSubscribeWithDispatch ( opaqueSession_p,
flags,
topicSubscription_p,
&dispatchInfo,
correlationTag_p);
(void *)correlationTag);
}


solClient_returnCode_t
SessionTopicSubscribeWithFlags( solClient_opaqueSession_pt opaqueSession_p,
const char *topicSubscription_p,
solClient_subscribeFlags_t flags,
void *dispatchId_p,
void *correlationTag_p)
solClient_uint64_t dispatchId,
solClient_uint64_t correlationTag)
{
return _SessionTopicSubscribeWithFlags ( opaqueSession_p,
topicSubscription_p,
flags,
messageReceiveCallback,
dispatchId_p,
correlationTag_p );
dispatchId,
correlationTag );
}

solClient_returnCode_t
SessionReplyTopicSubscribeWithFlags( solClient_opaqueSession_pt opaqueSession_p,
const char *topicSubscription_p,
solClient_subscribeFlags_t flags,
void *dispatchId_p,
void *correlationTag_p)
solClient_uint64_t dispatchId,
solClient_uint64_t correlationTag)
{
return _SessionTopicSubscribeWithFlags ( opaqueSession_p,
topicSubscription_p,
flags,
requestResponseReplyMessageReceiveCallback,
dispatchId_p,
correlationTag_p );
dispatchId,
correlationTag );
}

solClient_returnCode_t
_SessionTopicUnsubscribeWithFlags( solClient_opaqueSession_pt opaqueSession_p,
const char *topicSubscription_p,
solClient_subscribeFlags_t flags,
solClient_session_rxMsgCallbackFunc_t callback_p,
void *dispatchId_p,
void *correlationTag_p)
solClient_uint64_t dispatchId,
solClient_uint64_t correlationTag)
{
solClient_session_rxMsgDispatchFuncInfo_t dispatchInfo; /* msg dispatch callback to set */
dispatchInfo.dispatchType = SOLCLIENT_DISPATCH_TYPE_CALLBACK;
dispatchInfo.callback_p = callback_p;
dispatchInfo.user_p = dispatchId_p;
dispatchInfo.user_p = (void *)dispatchId;
dispatchInfo.rfu_p = NULL;
return solClient_session_topicUnsubscribeWithDispatch ( opaqueSession_p,
flags,
topicSubscription_p,
&dispatchInfo,
correlationTag_p);
(void *)correlationTag);
}

solClient_returnCode_t
SessionTopicUnsubscribeWithFlags( solClient_opaqueSession_pt opaqueSession_p,
const char *topicSubscription_p,
solClient_subscribeFlags_t flags,
void *dispatchId_p,
void *correlationTag_p)
solClient_uint64_t dispatchId,
solClient_uint64_t correlationTag)
{
return _SessionTopicUnsubscribeWithFlags ( opaqueSession_p,
topicSubscription_p,
flags,
messageReceiveCallback,
dispatchId_p,
correlationTag_p );
dispatchId,
correlationTag );
}

solClient_returnCode_t
SessionReplyTopicUnsubscribeWithFlags( solClient_opaqueSession_pt opaqueSession_p,
const char *topicSubscription_p,
solClient_subscribeFlags_t flags,
void *dispatchId_p,
void *correlationTag_p)
solClient_uint64_t dispatchId,
solClient_uint64_t correlationTag)
{
return _SessionTopicUnsubscribeWithFlags ( opaqueSession_p,
topicSubscription_p,
flags,
requestResponseReplyMessageReceiveCallback,
dispatchId_p,
correlationTag_p );
dispatchId,
correlationTag );
}

solClient_returnCode_t
SessionTopicEndpointUnsubscribeWithFlags( solClient_opaqueSession_pt opaqueSession_p,
solClient_propertyArray_pt endpointProps,
solClient_subscribeFlags_t flags,
const char *topicSubscription_p,
solClient_uint64_t correlationTag)
{
return solClient_session_endpointTopicUnsubscribe( endpointProps,
opaqueSession_p,
flags,
topicSubscription_p,
(void *)correlationTag);
}
Loading

0 comments on commit cd7f02d

Please sign in to comment.