Skip to content

Commit

Permalink
Merge pull request #25 from SolaceDev/dev
Browse files Browse the repository at this point in the history
Release v1.8.0
  • Loading branch information
oodigie authored Dec 13, 2024
2 parents b633bee + 918421e commit ebe047d
Show file tree
Hide file tree
Showing 13 changed files with 796 additions and 69 deletions.
19 changes: 19 additions & 0 deletions internal/ccsmp/ccsmp_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@ type SolClientFlowEventInfoPt = C.solClient_flow_eventCallbackInfo_pt
// SolClientFlowRxMsgDispatchFuncInfo is assigned a value
type SolClientFlowRxMsgDispatchFuncInfo = C.solClient_flow_rxMsgDispatchFuncInfo_t

// SolClientMessageSettlementOutcome is assigned a value
type SolClientMessageSettlementOutcome = C.solClient_msgOutcome_t

// SolClientSettlementOutcomeAccepted - the message was successfully processed.
const SolClientSettlementOutcomeAccepted = C.SOLCLIENT_OUTCOME_ACCEPTED

// SolClientSettlementOutcomeFailed - message processing failed temporarily, attempt redelivery if configured.
const SolClientSettlementOutcomeFailed = C.SOLCLIENT_OUTCOME_FAILED

// SolClientSettlementOutcomeRejected - message was processed and rejected, removed from the queue.
const SolClientSettlementOutcomeRejected = C.SOLCLIENT_OUTCOME_REJECTED

// Callbacks

// SolClientFlowMessageCallback is assigned a function
Expand Down Expand Up @@ -174,6 +186,13 @@ func (flow *SolClientFlow) SolClientFlowAck(msgID SolClientMessageID) *SolClient
})
}

// SolClientFlowSettleMessage function
func (flow *SolClientFlow) SolClientFlowSettleMessage(msgID SolClientMessageID, settlementOutcome SolClientMessageSettlementOutcome) *SolClientErrorInfoWrapper {
return handleCcsmpError(func() SolClientReturnCode {
return C.solClient_flow_settleMsg(flow.pointer, msgID, settlementOutcome)
})
}

// SolClientFlowGetDestination function
func (flow *SolClientFlow) SolClientFlowGetDestination() (name string, durable bool, err *SolClientErrorInfoWrapper) {
var dest *SolClientDestination = &SolClientDestination{}
Expand Down
11 changes: 7 additions & 4 deletions internal/impl/constants/error_strings.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,14 @@ const UnableToStartReceiver = "cannot start the receiver as it has already been
// UnableToStartReceiverParentServiceNotStarted error string
const UnableToStartReceiverParentServiceNotStarted = "cannot start receiver unless parent MessagingService is connected"

// UnableToAcknowledgeAlreadyTerminated error string
const UnableToAcknowledgeAlreadyTerminated = "unable to acknowledge message: message receiver has been terminated"
// UnableToSettleAlreadyTerminated error string
const UnableToSettleAlreadyTerminated = "unable to settle message: message receiver has been terminated"

// UnableToAcknowledgeNotStarted error string
const UnableToAcknowledgeNotStarted = "unable to acknowledge meessage: message receiver is not yet started"
// UnableToSettleNotStarted error string
const UnableToSettleNotStarted = "unable to settle meessage: message receiver is not yet started"

// InvalidMessageSettlementOutcome error string
const InvalidMessageSettlementOutcome = "invalid message settlement outcome used to settle message"

// UnableToModifySubscriptionBadState error string
const UnableToModifySubscriptionBadState = "unable to modify subscriptions in state %s"
Expand Down
115 changes: 66 additions & 49 deletions internal/impl/core/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ var rxMetrics = map[metrics.Metric]ccsmp.SolClientStatsRX{
metrics.TotalBytesReceived: ccsmp.SolClientStatsRXTotalDataBytes,
metrics.TotalMessagesReceived: ccsmp.SolClientStatsRXTotalDataMsgs,
metrics.CompressedBytesReceived: ccsmp.SolClientStatsRXCompressedBytes,
metrics.PersistentMessagesAccepted: ccsmp.SolClientStatsRXSettleAccepted,
metrics.PersistentMessagesFailed: ccsmp.SolClientStatsRXSettleFailed,
metrics.PersistentMessagesRejected: ccsmp.SolClientStatsRXSettleRejected,
}

var txMetrics = map[metrics.Metric]ccsmp.SolClientStatsTX{
Expand Down Expand Up @@ -105,8 +108,9 @@ type Metrics interface {

// Implementation
type ccsmpBackedMetrics struct {
session *ccsmp.SolClientSession
metrics []uint64
session *ccsmp.SolClientSession
metrics []uint64
duplicateAcks uint64

metricLock sync.RWMutex
capturedTxMetrics map[ccsmp.SolClientStatsTX]uint64
Expand All @@ -115,98 +119,111 @@ type ccsmpBackedMetrics struct {

func newCcsmpMetrics(session *ccsmp.SolClientSession) *ccsmpBackedMetrics {
return &ccsmpBackedMetrics{
metrics: make([]uint64, metricCount),
session: session,
metrics: make([]uint64, metricCount),
duplicateAcks: uint64(0), // used to track duplicate acks count (auto-acks)
session: session,
}
}

func (metrics *ccsmpBackedMetrics) terminate() {
metrics.metricLock.Lock()
defer metrics.metricLock.Unlock()
metrics.captureTXStats()
metrics.captureRXStats()
func (backedMetrics *ccsmpBackedMetrics) terminate() {
backedMetrics.metricLock.Lock()
defer backedMetrics.metricLock.Unlock()
backedMetrics.captureTXStats()
backedMetrics.captureRXStats()
}

func (metrics *ccsmpBackedMetrics) captureTXStats() {
if metrics.capturedTxMetrics != nil {
func (backedMetrics *ccsmpBackedMetrics) captureTXStats() {
if backedMetrics.capturedTxMetrics != nil {
return
}
metrics.capturedTxMetrics = make(map[ccsmp.SolClientStatsTX]uint64)
backedMetrics.capturedTxMetrics = make(map[ccsmp.SolClientStatsTX]uint64)
for _, txStat := range txMetrics {
metrics.capturedTxMetrics[txStat] = metrics.session.SolClientSessionGetTXStat(txStat)
backedMetrics.capturedTxMetrics[txStat] = backedMetrics.session.SolClientSessionGetTXStat(txStat)
}
}

func (metrics *ccsmpBackedMetrics) captureRXStats() {
if metrics.capturedRxMetrics != nil {
func (backedMetrics *ccsmpBackedMetrics) captureRXStats() {
if backedMetrics.capturedRxMetrics != nil {
return
}
metrics.capturedRxMetrics = make(map[ccsmp.SolClientStatsRX]uint64)
backedMetrics.capturedRxMetrics = make(map[ccsmp.SolClientStatsRX]uint64)
for _, rxStat := range rxMetrics {
metrics.capturedRxMetrics[rxStat] = metrics.session.SolClientSessionGetRXStat(rxStat)
backedMetrics.capturedRxMetrics[rxStat] = backedMetrics.session.SolClientSessionGetRXStat(rxStat)
}
}

func (metrics *ccsmpBackedMetrics) getTXStat(stat ccsmp.SolClientStatsTX) uint64 {
metrics.metricLock.RLock()
defer metrics.metricLock.RUnlock()
if metrics.capturedTxMetrics != nil {
return metrics.capturedTxMetrics[stat]
func (backedMetrics *ccsmpBackedMetrics) getTXStat(stat ccsmp.SolClientStatsTX) uint64 {
backedMetrics.metricLock.RLock()
defer backedMetrics.metricLock.RUnlock()
if backedMetrics.capturedTxMetrics != nil {
return backedMetrics.capturedTxMetrics[stat]
}
return metrics.session.SolClientSessionGetTXStat(stat)
return backedMetrics.session.SolClientSessionGetTXStat(stat)
}

func (metrics *ccsmpBackedMetrics) getRXStat(stat ccsmp.SolClientStatsRX) uint64 {
metrics.metricLock.RLock()
defer metrics.metricLock.RUnlock()
if metrics.capturedRxMetrics != nil {
return metrics.capturedRxMetrics[stat]
func (backedMetrics *ccsmpBackedMetrics) getRXStat(stat ccsmp.SolClientStatsRX) uint64 {
backedMetrics.metricLock.RLock()
defer backedMetrics.metricLock.RUnlock()
if backedMetrics.capturedRxMetrics != nil {
return backedMetrics.capturedRxMetrics[stat]
}
return metrics.session.SolClientSessionGetRXStat(stat)
return backedMetrics.session.SolClientSessionGetRXStat(stat)
}

func (metrics *ccsmpBackedMetrics) getNextGenStat(metric NextGenMetric) uint64 {
return atomic.LoadUint64(&metrics.metrics[metric])
func (backedMetrics *ccsmpBackedMetrics) getNextGenStat(metric NextGenMetric) uint64 {
return atomic.LoadUint64(&backedMetrics.metrics[metric])
}

func (metrics *ccsmpBackedMetrics) GetStat(metric metrics.Metric) uint64 {
func (backedMetrics *ccsmpBackedMetrics) GetStat(metric metrics.Metric) uint64 {
if rxMetric, ok := rxMetrics[metric]; ok {
return metrics.getRXStat(rxMetric)
// check for duplicate counts due to auto-acks and remove from final result
rxStat := backedMetrics.getRXStat(rxMetric)
duplicateAck := atomic.LoadUint64(&backedMetrics.duplicateAcks)
// take the difference and retrun as the settled accepted metric
if duplicateAck > 0 && metric == metrics.PersistentMessagesAccepted {
return rxStat - duplicateAck
}
return rxStat // return other types of metrics as is
} else if txMetric, ok := txMetrics[metric]; ok {
return metrics.getTXStat(txMetric)
return backedMetrics.getTXStat(txMetric)
} else if clientMetric, ok := clientMetrics[metric]; ok {
return metrics.getNextGenStat(clientMetric)
return backedMetrics.getNextGenStat(clientMetric)
}
logging.Default.Warning("Could not find mapping for metric with ID " + fmt.Sprint(metric))
return 0

}

func (metrics *ccsmpBackedMetrics) ResetStats() {
func (backedMetrics *ccsmpBackedMetrics) ResetStats() {
for i := 0; i < metricCount; i++ {
atomic.StoreUint64(&metrics.metrics[i], 0)
atomic.StoreUint64(&backedMetrics.metrics[i], 0)
}
metrics.resetNativeStats()
atomic.StoreUint64(&backedMetrics.duplicateAcks, 0) // reset this duplicate acks counter to zero
backedMetrics.resetNativeStats()
}

func (metrics *ccsmpBackedMetrics) resetNativeStats() {
metrics.metricLock.Lock()
defer metrics.metricLock.Unlock()
if metrics.capturedRxMetrics != nil {
for key := range metrics.capturedRxMetrics {
metrics.capturedRxMetrics[key] = 0
func (backedMetrics *ccsmpBackedMetrics) resetNativeStats() {
backedMetrics.metricLock.Lock()
defer backedMetrics.metricLock.Unlock()
if backedMetrics.capturedRxMetrics != nil {
for key := range backedMetrics.capturedRxMetrics {
backedMetrics.capturedRxMetrics[key] = 0
}
for key := range metrics.capturedTxMetrics {
metrics.capturedTxMetrics[key] = 0
for key := range backedMetrics.capturedTxMetrics {
backedMetrics.capturedTxMetrics[key] = 0
}
} else {
errorInfo := metrics.session.SolClientSessionClearStats()
errorInfo := backedMetrics.session.SolClientSessionClearStats()
if errorInfo != nil {
logging.Default.Warning("Could not reset metrics: " + errorInfo.String())
}
}
}

func (metrics *ccsmpBackedMetrics) IncrementMetric(metric NextGenMetric, amount uint64) {
atomic.AddUint64(&metrics.metrics[metric], amount)
func (backedMetrics *ccsmpBackedMetrics) IncrementMetric(metric NextGenMetric, amount uint64) {
atomic.AddUint64(&backedMetrics.metrics[metric], amount)
}

func (backedMetrics *ccsmpBackedMetrics) incrementDuplicateAckCount() {
atomic.AddUint64(&backedMetrics.duplicateAcks, uint64(1))
}
27 changes: 26 additions & 1 deletion internal/impl/core/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,18 @@

package core

import "testing"
import (
"sync/atomic"
"testing"
)

func TestSolClientMetrics(t *testing.T) {
metrics := []NextGenMetric{
MetricPublishMessagesBackpressureDiscarded,
MetricPublishMessagesTerminationDiscarded,
MetricReceivedMessagesBackpressureDiscarded,
MetricReceivedMessagesTerminationDiscarded,
MetricInternalDiscardNotifications,
}
for _, metric := range metrics {
metricsImpl := newCcsmpMetrics(nil)
Expand All @@ -36,3 +40,24 @@ func TestSolClientMetrics(t *testing.T) {
}
}
}

func TestIncrementDuplicateAckCount(t *testing.T) {
metricsImpl := newCcsmpMetrics(nil)
duplicateAck := atomic.LoadUint64(&metricsImpl.duplicateAcks)
if duplicateAck != 0 {
t.Error("Expected zero state of duplicate Acks counter to be 0")
}
metricsImpl.incrementDuplicateAckCount()
duplicateAck = atomic.LoadUint64(&metricsImpl.duplicateAcks)
if duplicateAck != 1 {
t.Error("Expected duplicate Acks to be 1")
}
for i := 0; i < 10; i++ {
// call 10 times
metricsImpl.incrementDuplicateAckCount()
}
duplicateAck = atomic.LoadUint64(&metricsImpl.duplicateAcks)
if duplicateAck != 11 {
t.Error("Expected duplicate Acks to be 11")
}
}
21 changes: 20 additions & 1 deletion internal/impl/core/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,10 @@ type Receiver interface {
ProvisionEndpoint(queueName string, isExclusive bool) ErrorInfo
// EndpointUnsubscribe will call endpoint unsubscribe on the endpoint
EndpointUnsubscribe(queueName string, topic string) (SubscriptionCorrelationID, <-chan SubscriptionEvent, ErrorInfo)
// Increments receiver metrics
// IncrementMetric - Increments receiver metrics
IncrementMetric(metric NextGenMetric, amount uint64)
// IncrementDuplicateAckCount - Increments receiver duplicate acks (track duplicate accepted settlement outcome metrics from auto-acks)
IncrementDuplicateAckCount()
// Creates a new persistent receiver with the given callback
NewPersistentReceiver(properties []string, callback RxCallback, eventCallback PersistentEventCallback) (PersistentReceiver, ErrorInfo)
}
Expand All @@ -85,7 +87,13 @@ type PersistentReceiver interface {
// Unsubscribe will remove the subscription from the persistent receiver
Unsubscribe(topic string) (SubscriptionCorrelationID, <-chan SubscriptionEvent, ErrorInfo)
// Ack will acknowledge the given message
// This method is equivalent to calling the Settle method with the ACCEPTED outcome.
Ack(msgID MessageID) ErrorInfo
// Settle generates and sends a positive or negative acknowledgement for a message.InboundMessage as
// indicated by the MessageSettlementOutcome argument. To use the negative outcomes FAILED and REJECTED,
// the receiver has to have been preconfigured via its builder to support these settlement outcomes.
// Attempts to settle a message on an auto-acking receiver is ignored for ACCEPTED and raises error for FAILED and REJECTED.
Settle(msgID MessageID, msgSettlementOutcome MessageSettlementOutcome) ErrorInfo
// Destionation returns the destination if retrievable, or an error if one occurred
Destination() (destination string, durable bool, errorInfo ErrorInfo)
}
Expand All @@ -99,6 +107,9 @@ type ReplyPublishable = ccsmp.SolClientMessagePt
// MessageID type defined
type MessageID = ccsmp.SolClientMessageID

// MessageSettlementOutcome type defined
type MessageSettlementOutcome = ccsmp.SolClientMessageSettlementOutcome

// RxCallback type defined
type RxCallback func(msg Receivable) bool

Expand Down Expand Up @@ -280,6 +291,10 @@ func (receiver *ccsmpBackedReceiver) IncrementMetric(metric NextGenMetric, amoun
receiver.metrics.IncrementMetric(metric, amount)
}

func (receiver *ccsmpBackedReceiver) IncrementDuplicateAckCount() {
receiver.metrics.incrementDuplicateAckCount()
}

func (receiver *ccsmpBackedReceiver) ClearSubscriptionCorrelation(id SubscriptionCorrelationID) {
receiver.subscriptionCorrelationLock.Lock()
defer receiver.subscriptionCorrelationLock.Unlock()
Expand Down Expand Up @@ -408,6 +423,10 @@ func (receiver *ccsmpBackedPersistentReceiver) Ack(msgID MessageID) ErrorInfo {
return receiver.flow.SolClientFlowAck(msgID)
}

func (receiver *ccsmpBackedPersistentReceiver) Settle(msgID MessageID, msgSettlementOutcome MessageSettlementOutcome) ErrorInfo {
return receiver.flow.SolClientFlowSettleMessage(msgID, msgSettlementOutcome)
}

func (receiver *ccsmpBackedPersistentReceiver) Destination() (destination string, durable bool, errorInfo ErrorInfo) {
return receiver.flow.SolClientFlowGetDestination()
}
Expand Down
34 changes: 25 additions & 9 deletions internal/impl/receiver/message_receiver_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,15 +283,16 @@ type result struct {
}

type mockInternalReceiver struct {
events func() core.Events
replier func() core.Replier
isRunning func() bool
registerRxCallback func(callback core.RxCallback) uintptr
unregisterRxCallback func(ptr uintptr)
subscribe func(topic string, ptr uintptr) (core.SubscriptionCorrelationID, <-chan core.SubscriptionEvent, core.ErrorInfo)
unsubscribe func(topic string, ptr uintptr) (core.SubscriptionCorrelationID, <-chan core.SubscriptionEvent, core.ErrorInfo)
incrementMetric func(metric core.NextGenMetric, amount uint64)
newPersistentReceiver func(props []string, callback core.RxCallback, eventCallback core.PersistentEventCallback) (core.PersistentReceiver, *ccsmp.SolClientErrorInfoWrapper)
events func() core.Events
replier func() core.Replier
isRunning func() bool
registerRxCallback func(callback core.RxCallback) uintptr
unregisterRxCallback func(ptr uintptr)
subscribe func(topic string, ptr uintptr) (core.SubscriptionCorrelationID, <-chan core.SubscriptionEvent, core.ErrorInfo)
unsubscribe func(topic string, ptr uintptr) (core.SubscriptionCorrelationID, <-chan core.SubscriptionEvent, core.ErrorInfo)
incrementMetric func(metric core.NextGenMetric, amount uint64)
incrementDuplicateAckCount func()
newPersistentReceiver func(props []string, callback core.RxCallback, eventCallback core.PersistentEventCallback) (core.PersistentReceiver, *ccsmp.SolClientErrorInfoWrapper)
}

func (mock *mockInternalReceiver) Events() core.Events {
Expand Down Expand Up @@ -364,6 +365,12 @@ func (mock *mockInternalReceiver) IncrementMetric(metric core.NextGenMetric, amo
}
}

func (mock *mockInternalReceiver) IncrementDuplicateAckCount() {
if mock.incrementDuplicateAckCount != nil {
mock.incrementDuplicateAckCount()
}
}

func (mock *mockInternalReceiver) NewPersistentReceiver(props []string, callback core.RxCallback, eventCallback core.PersistentEventCallback) (core.PersistentReceiver, *ccsmp.SolClientErrorInfoWrapper) {
if mock.newPersistentReceiver != nil {
return mock.newPersistentReceiver(props, callback, eventCallback)
Expand All @@ -378,6 +385,7 @@ type mockPersistentReceiver struct {
subscribe func(topic string) (core.SubscriptionCorrelationID, <-chan core.SubscriptionEvent, core.ErrorInfo)
unsubscribe func(topic string) (core.SubscriptionCorrelationID, <-chan core.SubscriptionEvent, core.ErrorInfo)
ack func(msg core.MessageID) *ccsmp.SolClientErrorInfoWrapper
settle func(msg core.MessageID, outcome core.MessageSettlementOutcome) *ccsmp.SolClientErrorInfoWrapper
}

// Destroy destroys the flow
Expand Down Expand Up @@ -429,6 +437,14 @@ func (mock *mockPersistentReceiver) Ack(msgID core.MessageID) *ccsmp.SolClientEr
return nil
}

// Settle will settle the given message with the provided outcome
func (mock *mockPersistentReceiver) Settle(msgID core.MessageID, outcome core.MessageSettlementOutcome) *ccsmp.SolClientErrorInfoWrapper {
if mock.settle != nil {
return mock.settle(msgID, outcome)
}
return nil
}

func (mock *mockPersistentReceiver) Destination() (destination string, durable bool, errorInfo *ccsmp.SolClientErrorInfoWrapper) {
return
}
Expand Down
Loading

0 comments on commit ebe047d

Please sign in to comment.