Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release v1.6.1 #19

Merged
merged 9 commits into from
May 6, 2024
25 changes: 24 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ jobs:
- name: Setup Go environment
uses: actions/[email protected]
with:
go-version: '1.21'
go-version: '1.22'
check-latest: true
- name: Check Go Version
run: go version
Expand All @@ -66,6 +66,29 @@ jobs:
echo "$OUTPUT"
exit 1
fi

- name: Runs go vet
if: ${{ success() }}
run: |
OUTPUT=$(go vet ./... 2>&1)
if [ ! -z "$OUTPUT" ]; then
echo "go vet failed on the following:"
echo "$OUTPUT"
exit 1
fi

- name: Runs staticcheck
if: ${{ success() }}
run: |
# use pinned version of the tool but check if this needs an update on go version bump
go install honnef.co/go/tools/cmd/[email protected]
OUTPUT=$(staticcheck --checks=all ./...)
if [ ! -z "$OUTPUT" ]; then
echo "staticcheck failed on the following:"
echo "$OUTPUT"
exit 1
fi

- name: Runs unit tests
if: ${{ success() }}
run: go test -coverprofile ./unitcoverage.out ./...
Expand Down
30 changes: 14 additions & 16 deletions internal/ccsmp/ccsmp_core.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,12 +348,11 @@ func (session *SolClientSession) solClientSessionSubscribeWithFlags(topic string
return handleCcsmpError(func() SolClientReturnCode {
cString := C.CString(topic)
defer C.free(unsafe.Pointer(cString))
// This is not an unsafe usage of unsafe.Pointer as we are using correlationId as data, not as a pointer
return C.SessionTopicSubscribeWithFlags(session.pointer,
cString,
flags,
C.uintptr_to_void_p(C.solClient_uint64_t(dispatchID)),
C.uintptr_to_void_p(C.solClient_uint64_t(correlationID)))
C.solClient_uint64_t(dispatchID),
C.solClient_uint64_t(correlationID))
})
}

Expand All @@ -362,12 +361,11 @@ func (session *SolClientSession) solClientSessionSubscribeReplyTopicWithFlags(to
return handleCcsmpError(func() SolClientReturnCode {
cString := C.CString(topic)
defer C.free(unsafe.Pointer(cString))
// This is not an unsafe usage of unsafe.Pointer as we are using correlationId as data, not as a pointer
return C.SessionReplyTopicSubscribeWithFlags(session.pointer,
cString,
flags,
C.uintptr_to_void_p(C.solClient_uint64_t(dispatchID)),
C.uintptr_to_void_p(C.solClient_uint64_t(correlationID)))
C.solClient_uint64_t(dispatchID),
C.solClient_uint64_t(correlationID))
})
}

Expand All @@ -376,12 +374,11 @@ func (session *SolClientSession) solClientSessionUnsubscribeWithFlags(topic stri
return handleCcsmpError(func() SolClientReturnCode {
cString := C.CString(topic)
defer C.free(unsafe.Pointer(cString))
// This is not an unsafe usage of unsafe.Pointer as we are using correlationId as data, not as a pointer
return C.SessionTopicUnsubscribeWithFlags(session.pointer,
cString,
flags,
C.uintptr_to_void_p(C.solClient_uint64_t(dispatchID)),
C.uintptr_to_void_p(C.solClient_uint64_t(correlationID)))
C.solClient_uint64_t(dispatchID),
C.solClient_uint64_t(correlationID))
})
}

Expand All @@ -390,12 +387,11 @@ func (session *SolClientSession) solClientSessionUnsubscribeReplyTopicWithFlags(
return handleCcsmpError(func() SolClientReturnCode {
cString := C.CString(topic)
defer C.free(unsafe.Pointer(cString))
// This is not an unsafe usage of unsafe.Pointer as we are using correlationId as data, not as a pointer
return C.SessionReplyTopicUnsubscribeWithFlags(session.pointer,
cString,
flags,
C.uintptr_to_void_p(C.solClient_uint64_t(dispatchID)),
C.uintptr_to_void_p(C.solClient_uint64_t(correlationID)))
C.solClient_uint64_t(dispatchID),
C.solClient_uint64_t(correlationID))
})
}

Expand Down Expand Up @@ -435,9 +431,11 @@ func (session *SolClientSession) SolClientEndpointUnsusbcribe(properties []strin
defer C.free(unsafe.Pointer(cString))
endpointProps, endpointFree := ToCArray(properties, true)
defer endpointFree()
// This is not an unsafe usage of unsafe.Pointer as we are using correlationId as data, not as a pointer
return C.solClient_session_endpointTopicUnsubscribe(endpointProps, session.pointer,
C.SOLCLIENT_SUBSCRIBE_FLAGS_REQUEST_CONFIRM, cString, C.uintptr_to_void_p(C.solClient_uint64_t(correlationID)))
return C.SessionTopicEndpointUnsubscribeWithFlags(session.pointer,
endpointProps,
C.SOLCLIENT_SUBSCRIBE_FLAGS_REQUEST_CONFIRM,
cString,
C.solClient_uint64_t(correlationID))
})
}

Expand Down Expand Up @@ -570,7 +568,7 @@ func NewSessionDispatch(id uint64) (*SolClientSessionRxMsgDispatchFuncInfo, uint
return &SolClientSessionRxMsgDispatchFuncInfo{
dispatchType: C.SOLCLIENT_DISPATCH_TYPE_CALLBACK,
callback_p: (C.solClient_session_rxMsgCallbackFunc_t)(unsafe.Pointer(C.messageReceiveCallback)),
user_p: C.uintptr_to_void_p(C.solClient_uint64_t(ptr)),
user_p: nil, // this should be set to the uintptr
rfu_p: nil,
}, ptr
}
Expand Down
31 changes: 15 additions & 16 deletions internal/ccsmp/ccsmp_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ import (
#include "solclient/solClient.h"
#include "solclient/solClientMsg.h"
#include "./ccsmp_helper.h"

solClient_rxMsgCallback_returnCode_t flowMessageReceiveCallback ( solClient_opaqueFlow_pt opaqueFlow_p, solClient_opaqueMsg_pt msg_p, void *user_p );
void flowEventCallback ( solClient_opaqueFlow_pt opaqueFlow_p, solClient_flow_eventCallbackInfo_pt eventInfo_p, void *user_p );
*/
import "C"

Expand Down Expand Up @@ -96,23 +93,19 @@ func (session *SolClientSession) SolClientSessionCreateFlow(properties []string,
flowPropsP, sessionPropertiesFreeFunction := ToCArray(properties, true)
defer sessionPropertiesFreeFunction()

var flowCreateFuncInfo C.solClient_flow_createFuncInfo_t

flowID := atomic.AddUintptr(&flowID, 1)

// These are not a misuse of unsafe.Pointer as the value is used for correlation
flowCreateFuncInfo.rxMsgInfo.callback_p = (C.solClient_flow_rxMsgCallbackFunc_t)(unsafe.Pointer(C.flowMessageReceiveCallback))
flowCreateFuncInfo.rxMsgInfo.user_p = C.uintptr_to_void_p(C.solClient_uint64_t(flowID))
flowCreateFuncInfo.eventInfo.callback_p = (C.solClient_flow_eventCallbackFunc_t)(unsafe.Pointer(C.flowEventCallback))
flowCreateFuncInfo.eventInfo.user_p = C.uintptr_to_void_p(C.solClient_uint64_t(flowID))

flowToRXCallbackMap.Store(flowID, msgCallback)
flowToEventCallbackMap.Store(flowID, eventCallback)

flow := &SolClientFlow{}
flow.userP = flowID
err := handleCcsmpError(func() SolClientReturnCode {
return C.solClient_session_createFlow(flowPropsP, session.pointer, &flow.pointer, &flowCreateFuncInfo, (C.size_t)(unsafe.Sizeof(flowCreateFuncInfo)))
// this will register the goFlowMessageReceiveCallback and goFlowEventCallback callbacks with the flowID
return C.SessionFlowCreate(session.pointer,
flowPropsP,
&flow.pointer,
C.solClient_uint64_t(flowID))
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -153,8 +146,11 @@ func (flow *SolClientFlow) SolClientFlowSubscribe(topic string, correlationID ui
return handleCcsmpError(func() SolClientReturnCode {
cString := C.CString(topic)
defer C.free(unsafe.Pointer(cString))
// This is not an unsafe usage of unsafe.Pointer as we are using correlationId as data, not as a pointer
return C.solClient_flow_topicSubscribeWithDispatch(flow.pointer, C.SOLCLIENT_SUBSCRIBE_FLAGS_REQUEST_CONFIRM, cString, nil, C.uintptr_to_void_p(C.solClient_uint64_t(correlationID)))
return C.FlowTopicSubscribeWithDispatch(flow.pointer,
C.SOLCLIENT_SUBSCRIBE_FLAGS_REQUEST_CONFIRM,
cString,
nil,
C.solClient_uint64_t(correlationID))
})
}

Expand All @@ -163,8 +159,11 @@ func (flow *SolClientFlow) SolClientFlowUnsubscribe(topic string, correlationID
return handleCcsmpError(func() SolClientReturnCode {
cString := C.CString(topic)
defer C.free(unsafe.Pointer(cString))
// This is not an unsafe usage of unsafe.Pointer as we are using correlationId as data, not as a pointer
return C.solClient_flow_topicUnsubscribeWithDispatch(flow.pointer, C.SOLCLIENT_SUBSCRIBE_FLAGS_REQUEST_CONFIRM, cString, nil, C.uintptr_to_void_p(C.solClient_uint64_t(correlationID)))
return C.FlowTopicUnsubscribeWithDispatch(flow.pointer,
C.SOLCLIENT_SUBSCRIBE_FLAGS_REQUEST_CONFIRM,
cString,
nil,
C.solClient_uint64_t(correlationID))
})
}

Expand Down
123 changes: 91 additions & 32 deletions internal/ccsmp/ccsmp_helper.c
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ messageReceiveCallback(solClient_opaqueSession_pt opaqueSession_p, solClient_opa
solClient_rxMsgCallback_returnCode_t
requestResponseReplyMessageReceiveCallback(solClient_opaqueSession_pt opaqueSession_p, solClient_opaqueMsg_pt msg_p, void *user_p);

void *uintptr_to_void_p(solClient_uint64_t ptr)
{
return (void *)ptr;
}
solClient_rxMsgCallback_returnCode_t
flowMessageReceiveCallback(solClient_opaqueFlow_pt opaqueFlow_p, solClient_opaqueMsg_pt msg_p, void *user_p);

void flowEventCallback(solClient_opaqueFlow_pt opaqueFlow_p, solClient_flow_eventCallbackInfo_pt eventInfo_p, void *user_p);

solClient_returnCode_t
solClientgo_msg_isRequestReponseMsg(solClient_opaqueMsg_pt msg_p, char **correlationId_p) {
Expand All @@ -54,103 +54,162 @@ solClientgo_msg_isRequestReponseMsg(solClient_opaqueMsg_pt msg_p, char **correla
}

solClient_returnCode_t
_SessionTopicSubscribeWithFlags( solClient_opaqueSession_pt opaqueSession_p,
const char *topicSubscription_p,
SessionFlowCreate( solClient_opaqueSession_pt opaqueSession_p,
solClient_propertyArray_pt flowPropsP,
solClient_opaqueFlow_pt *opaqueFlow_p,
solClient_uint64_t flowID)
{
/* set the flowID in the flow create struct */
solClient_flow_createFuncInfo_t flowCreateFuncInfo;
flowCreateFuncInfo.rxMsgInfo.callback_p = flowMessageReceiveCallback;
flowCreateFuncInfo.rxMsgInfo.user_p = (void *)flowID;
flowCreateFuncInfo.eventInfo.callback_p = (solClient_flow_eventCallbackFunc_t)flowEventCallback;
flowCreateFuncInfo.eventInfo.user_p = (void *)flowID;
// allocate these struct fields too
flowCreateFuncInfo.rxInfo.user_p = NULL;
flowCreateFuncInfo.rxInfo.callback_p = NULL;

return solClient_session_createFlow(flowPropsP, opaqueSession_p, opaqueFlow_p, &flowCreateFuncInfo, sizeof(flowCreateFuncInfo));
}

solClient_returnCode_t
FlowTopicSubscribeWithDispatch( solClient_opaqueFlow_pt opaqueFlow_p,
solClient_subscribeFlags_t flags,
solClient_session_rxMsgCallbackFunc_t callback_p,
void *dispatchId_p,
void *correlationTag_p)
const char *topicSubscription_p,
solClient_flow_rxMsgDispatchFuncInfo_t *dispatchFuncInfo_p,
solClient_uint64_t correlationTag)
{
return solClient_flow_topicSubscribeWithDispatch( opaqueFlow_p,
flags,
topicSubscription_p,
dispatchFuncInfo_p,
(void *)correlationTag);
}

solClient_returnCode_t
FlowTopicUnsubscribeWithDispatch( solClient_opaqueFlow_pt opaqueFlow_p,
solClient_subscribeFlags_t flags,
const char *topicSubscription_p,
solClient_flow_rxMsgDispatchFuncInfo_t *dispatchFuncInfo_p,
solClient_uint64_t correlationTag)
{
return solClient_flow_topicUnsubscribeWithDispatch( opaqueFlow_p,
flags,
topicSubscription_p,
dispatchFuncInfo_p,
(void *)correlationTag);
}

solClient_returnCode_t
_SessionTopicSubscribeWithFlags( solClient_opaqueSession_pt opaqueSession_p,
const char *topicSubscription_p,
solClient_subscribeFlags_t flags,
solClient_session_rxMsgCallbackFunc_t callback_p,
solClient_uint64_t dispatchId,
solClient_uint64_t correlationTag)
{
solClient_session_rxMsgDispatchFuncInfo_t dispatchInfo; /* msg dispatch callback to set */
dispatchInfo.dispatchType = SOLCLIENT_DISPATCH_TYPE_CALLBACK;
dispatchInfo.callback_p = callback_p;
dispatchInfo.user_p = dispatchId_p;
dispatchInfo.user_p = (void *)dispatchId;
dispatchInfo.rfu_p = NULL;
return solClient_session_topicSubscribeWithDispatch ( opaqueSession_p,
flags,
topicSubscription_p,
&dispatchInfo,
correlationTag_p);
(void *)correlationTag);
}


solClient_returnCode_t
SessionTopicSubscribeWithFlags( solClient_opaqueSession_pt opaqueSession_p,
const char *topicSubscription_p,
solClient_subscribeFlags_t flags,
void *dispatchId_p,
void *correlationTag_p)
solClient_uint64_t dispatchId,
solClient_uint64_t correlationTag)
{
return _SessionTopicSubscribeWithFlags ( opaqueSession_p,
topicSubscription_p,
flags,
messageReceiveCallback,
dispatchId_p,
correlationTag_p );
dispatchId,
correlationTag );
}

solClient_returnCode_t
SessionReplyTopicSubscribeWithFlags( solClient_opaqueSession_pt opaqueSession_p,
const char *topicSubscription_p,
solClient_subscribeFlags_t flags,
void *dispatchId_p,
void *correlationTag_p)
solClient_uint64_t dispatchId,
solClient_uint64_t correlationTag)
{
return _SessionTopicSubscribeWithFlags ( opaqueSession_p,
topicSubscription_p,
flags,
requestResponseReplyMessageReceiveCallback,
dispatchId_p,
correlationTag_p );
dispatchId,
correlationTag );
}

solClient_returnCode_t
_SessionTopicUnsubscribeWithFlags( solClient_opaqueSession_pt opaqueSession_p,
const char *topicSubscription_p,
solClient_subscribeFlags_t flags,
solClient_session_rxMsgCallbackFunc_t callback_p,
void *dispatchId_p,
void *correlationTag_p)
solClient_uint64_t dispatchId,
solClient_uint64_t correlationTag)
{
solClient_session_rxMsgDispatchFuncInfo_t dispatchInfo; /* msg dispatch callback to set */
dispatchInfo.dispatchType = SOLCLIENT_DISPATCH_TYPE_CALLBACK;
dispatchInfo.callback_p = callback_p;
dispatchInfo.user_p = dispatchId_p;
dispatchInfo.user_p = (void *)dispatchId;
dispatchInfo.rfu_p = NULL;
return solClient_session_topicUnsubscribeWithDispatch ( opaqueSession_p,
flags,
topicSubscription_p,
&dispatchInfo,
correlationTag_p);
(void *)correlationTag);
}

solClient_returnCode_t
SessionTopicUnsubscribeWithFlags( solClient_opaqueSession_pt opaqueSession_p,
const char *topicSubscription_p,
solClient_subscribeFlags_t flags,
void *dispatchId_p,
void *correlationTag_p)
solClient_uint64_t dispatchId,
solClient_uint64_t correlationTag)
{
return _SessionTopicUnsubscribeWithFlags ( opaqueSession_p,
topicSubscription_p,
flags,
messageReceiveCallback,
dispatchId_p,
correlationTag_p );
dispatchId,
correlationTag );
}

solClient_returnCode_t
SessionReplyTopicUnsubscribeWithFlags( solClient_opaqueSession_pt opaqueSession_p,
const char *topicSubscription_p,
solClient_subscribeFlags_t flags,
void *dispatchId_p,
void *correlationTag_p)
solClient_uint64_t dispatchId,
solClient_uint64_t correlationTag)
{
return _SessionTopicUnsubscribeWithFlags ( opaqueSession_p,
topicSubscription_p,
flags,
requestResponseReplyMessageReceiveCallback,
dispatchId_p,
correlationTag_p );
dispatchId,
correlationTag );
}

solClient_returnCode_t
SessionTopicEndpointUnsubscribeWithFlags( solClient_opaqueSession_pt opaqueSession_p,
solClient_propertyArray_pt endpointProps,
solClient_subscribeFlags_t flags,
const char *topicSubscription_p,
solClient_uint64_t correlationTag)
{
return solClient_session_endpointTopicUnsubscribe( endpointProps,
opaqueSession_p,
flags,
topicSubscription_p,
(void *)correlationTag);
}
Loading
Loading