Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SOL-59692: Provide Ability to NACK a Specific AD Message and Force Re-delivery #66

Merged
merged 21 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
90c3175
EBP-52: Implement the receiver builder configuration for message sett…
oodigie Oct 2, 2024
37f8578
EBP-54: Implement the Settle() method
oodigie Oct 4, 2024
648b60f
EBP-54: fixed doc string format on MessageSettlementOutcome
oodigie Oct 7, 2024
fe4303c
EBP-58: added new metrics and response code for Nack
oodigie Oct 8, 2024
578d614
EBP-58: tracked auto-acks and substract it from settledAccepted
oodigie Oct 9, 2024
4eb83a4
EBP-60: write unit and integration tests
oodigie Oct 10, 2024
94fdbfa
Adding some dummy tests to see through a small change.
Oct 15, 2024
b9b192b
A few more simple tests.
Oct 15, 2024
2e5f086
EBP-79: Go NACK integration tests
Oct 22, 2024
5410a87
EBP-79: Go NACK integration tests
Oct 24, 2024
9ca4d04
Left a test focused by accident, sorry.
Oct 24, 2024
5b211de
merged changes from dev into branch and resolved merge conflicts
oodigie Oct 25, 2024
af60446
EBP-61: Write unit tests for the NACK feature
oodigie Oct 30, 2024
633ed96
EBP-79: a few more tests.
Oct 30, 2024
49e8fad
fixed the failing unit test
oodigie Oct 30, 2024
71864ce
EBP-79: Finishing touches for integration tests
Nov 4, 2024
605ce3f
EBP-79: returning error on Build() if garbage was mixed in to setter.
Nov 5, 2024
8ee4129
Merge pull request #70 from SolaceDev/SOL-59692
gszol Nov 5, 2024
c6894e6
SOL-59692: updated some integration test cases
oodigie Nov 5, 2024
c5a7de9
Merge branch 'SOL-59692' into EBP-79_NACK_tests
gszol Nov 5, 2024
9af2289
Merge pull request #69 from SolaceDev/EBP-79_NACK_tests
oodigie Nov 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions internal/ccsmp/ccsmp_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@ type SolClientFlowEventInfoPt = C.solClient_flow_eventCallbackInfo_pt
// SolClientFlowRxMsgDispatchFuncInfo is assigned a value
type SolClientFlowRxMsgDispatchFuncInfo = C.solClient_flow_rxMsgDispatchFuncInfo_t

// SolClientMessageSettlementOutcome is assigned a value
type SolClientMessageSettlementOutcome = C.solClient_msgOutcome_t

// SolClientSettlementOutcomeAccepted - the message was successfully processed.
const SolClientSettlementOutcomeAccepted = C.SOLCLIENT_OUTCOME_ACCEPTED

// SolClientSettlementOutcomeFailed - message processing failed temporarily, attempt redelivery if configured.
const SolClientSettlementOutcomeFailed = C.SOLCLIENT_OUTCOME_FAILED

// SolClientSettlementOutcomeRejected - message was processed and rejected, removed from the queue.
const SolClientSettlementOutcomeRejected = C.SOLCLIENT_OUTCOME_REJECTED

// Callbacks

// SolClientFlowMessageCallback is assigned a function
Expand Down Expand Up @@ -174,6 +186,13 @@ func (flow *SolClientFlow) SolClientFlowAck(msgID SolClientMessageID) *SolClient
})
}

// SolClientFlowSettleMessage function
func (flow *SolClientFlow) SolClientFlowSettleMessage(msgID SolClientMessageID, settlementOutcome SolClientMessageSettlementOutcome) *SolClientErrorInfoWrapper {
return handleCcsmpError(func() SolClientReturnCode {
return C.solClient_flow_settleMsg(flow.pointer, msgID, settlementOutcome)
})
}

// SolClientFlowGetDestination function
func (flow *SolClientFlow) SolClientFlowGetDestination() (name string, durable bool, err *SolClientErrorInfoWrapper) {
var dest *SolClientDestination = &SolClientDestination{}
Expand Down
11 changes: 7 additions & 4 deletions internal/impl/constants/error_strings.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,14 @@ const UnableToStartReceiver = "cannot start the receiver as it has already been
// UnableToStartReceiverParentServiceNotStarted error string
const UnableToStartReceiverParentServiceNotStarted = "cannot start receiver unless parent MessagingService is connected"

// UnableToAcknowledgeAlreadyTerminated error string
const UnableToAcknowledgeAlreadyTerminated = "unable to acknowledge message: message receiver has been terminated"
// UnableToSettleAlreadyTerminated error string
const UnableToSettleAlreadyTerminated = "unable to settle message: message receiver has been terminated"

// UnableToAcknowledgeNotStarted error string
const UnableToAcknowledgeNotStarted = "unable to acknowledge meessage: message receiver is not yet started"
// UnableToSettleNotStarted error string
const UnableToSettleNotStarted = "unable to settle meessage: message receiver is not yet started"

// InvalidMessageSettlementOutcome error string
const InvalidMessageSettlementOutcome = "invalid message settlement outcome used to settle message"

// UnableToModifySubscriptionBadState error string
const UnableToModifySubscriptionBadState = "unable to modify subscriptions in state %s"
Expand Down
115 changes: 66 additions & 49 deletions internal/impl/core/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ var rxMetrics = map[metrics.Metric]ccsmp.SolClientStatsRX{
metrics.TotalBytesReceived: ccsmp.SolClientStatsRXTotalDataBytes,
metrics.TotalMessagesReceived: ccsmp.SolClientStatsRXTotalDataMsgs,
metrics.CompressedBytesReceived: ccsmp.SolClientStatsRXCompressedBytes,
metrics.PersistentMessagesAccepted: ccsmp.SolClientStatsRXSettleAccepted,
metrics.PersistentMessagesFailed: ccsmp.SolClientStatsRXSettleFailed,
metrics.PersistentMessagesRejected: ccsmp.SolClientStatsRXSettleRejected,
}

var txMetrics = map[metrics.Metric]ccsmp.SolClientStatsTX{
Expand Down Expand Up @@ -105,8 +108,9 @@ type Metrics interface {

// Implementation
type ccsmpBackedMetrics struct {
session *ccsmp.SolClientSession
metrics []uint64
session *ccsmp.SolClientSession
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which indentation is our coding standard, before or after?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Go has its own standard for formatting for running go fmt on the project.
Not sure indentation is included in that for all cases.
If go fmt is happy will this the indentation standard for solace by default is 4 spaces. Unless there is overriding factor like go fmt.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@gszol, I just run the usual go fmt on the code before/while committing the code changes.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If everyone's happy, I'm happy :-)

metrics []uint64
duplicateAcks uint64

metricLock sync.RWMutex
capturedTxMetrics map[ccsmp.SolClientStatsTX]uint64
Expand All @@ -115,98 +119,111 @@ type ccsmpBackedMetrics struct {

func newCcsmpMetrics(session *ccsmp.SolClientSession) *ccsmpBackedMetrics {
return &ccsmpBackedMetrics{
metrics: make([]uint64, metricCount),
session: session,
metrics: make([]uint64, metricCount),
duplicateAcks: uint64(0), // used to track duplicate acks count (auto-acks)
session: session,
}
}

func (metrics *ccsmpBackedMetrics) terminate() {
metrics.metricLock.Lock()
defer metrics.metricLock.Unlock()
metrics.captureTXStats()
metrics.captureRXStats()
func (backedMetrics *ccsmpBackedMetrics) terminate() {
backedMetrics.metricLock.Lock()
defer backedMetrics.metricLock.Unlock()
backedMetrics.captureTXStats()
backedMetrics.captureRXStats()
}

func (metrics *ccsmpBackedMetrics) captureTXStats() {
if metrics.capturedTxMetrics != nil {
func (backedMetrics *ccsmpBackedMetrics) captureTXStats() {
if backedMetrics.capturedTxMetrics != nil {
return
}
metrics.capturedTxMetrics = make(map[ccsmp.SolClientStatsTX]uint64)
backedMetrics.capturedTxMetrics = make(map[ccsmp.SolClientStatsTX]uint64)
for _, txStat := range txMetrics {
metrics.capturedTxMetrics[txStat] = metrics.session.SolClientSessionGetTXStat(txStat)
backedMetrics.capturedTxMetrics[txStat] = backedMetrics.session.SolClientSessionGetTXStat(txStat)
}
}

func (metrics *ccsmpBackedMetrics) captureRXStats() {
if metrics.capturedRxMetrics != nil {
func (backedMetrics *ccsmpBackedMetrics) captureRXStats() {
if backedMetrics.capturedRxMetrics != nil {
return
}
metrics.capturedRxMetrics = make(map[ccsmp.SolClientStatsRX]uint64)
backedMetrics.capturedRxMetrics = make(map[ccsmp.SolClientStatsRX]uint64)
for _, rxStat := range rxMetrics {
metrics.capturedRxMetrics[rxStat] = metrics.session.SolClientSessionGetRXStat(rxStat)
backedMetrics.capturedRxMetrics[rxStat] = backedMetrics.session.SolClientSessionGetRXStat(rxStat)
}
}

func (metrics *ccsmpBackedMetrics) getTXStat(stat ccsmp.SolClientStatsTX) uint64 {
metrics.metricLock.RLock()
defer metrics.metricLock.RUnlock()
if metrics.capturedTxMetrics != nil {
return metrics.capturedTxMetrics[stat]
func (backedMetrics *ccsmpBackedMetrics) getTXStat(stat ccsmp.SolClientStatsTX) uint64 {
backedMetrics.metricLock.RLock()
defer backedMetrics.metricLock.RUnlock()
if backedMetrics.capturedTxMetrics != nil {
return backedMetrics.capturedTxMetrics[stat]
}
return metrics.session.SolClientSessionGetTXStat(stat)
return backedMetrics.session.SolClientSessionGetTXStat(stat)
}

func (metrics *ccsmpBackedMetrics) getRXStat(stat ccsmp.SolClientStatsRX) uint64 {
metrics.metricLock.RLock()
defer metrics.metricLock.RUnlock()
if metrics.capturedRxMetrics != nil {
return metrics.capturedRxMetrics[stat]
func (backedMetrics *ccsmpBackedMetrics) getRXStat(stat ccsmp.SolClientStatsRX) uint64 {
backedMetrics.metricLock.RLock()
defer backedMetrics.metricLock.RUnlock()
if backedMetrics.capturedRxMetrics != nil {
return backedMetrics.capturedRxMetrics[stat]
}
return metrics.session.SolClientSessionGetRXStat(stat)
return backedMetrics.session.SolClientSessionGetRXStat(stat)
}

func (metrics *ccsmpBackedMetrics) getNextGenStat(metric NextGenMetric) uint64 {
return atomic.LoadUint64(&metrics.metrics[metric])
func (backedMetrics *ccsmpBackedMetrics) getNextGenStat(metric NextGenMetric) uint64 {
return atomic.LoadUint64(&backedMetrics.metrics[metric])
}

func (metrics *ccsmpBackedMetrics) GetStat(metric metrics.Metric) uint64 {
func (backedMetrics *ccsmpBackedMetrics) GetStat(metric metrics.Metric) uint64 {
if rxMetric, ok := rxMetrics[metric]; ok {
return metrics.getRXStat(rxMetric)
// check for duplicate counts due to auto-acks and remove from final result
rxStat := backedMetrics.getRXStat(rxMetric)
duplicateAck := atomic.LoadUint64(&backedMetrics.duplicateAcks)
// take the difference and retrun as the settled accepted metric
if duplicateAck > 0 && metric == metrics.PersistentMessagesAccepted {
return rxStat - duplicateAck
}
return rxStat // return other types of metrics as is
} else if txMetric, ok := txMetrics[metric]; ok {
return metrics.getTXStat(txMetric)
return backedMetrics.getTXStat(txMetric)
} else if clientMetric, ok := clientMetrics[metric]; ok {
return metrics.getNextGenStat(clientMetric)
return backedMetrics.getNextGenStat(clientMetric)
}
logging.Default.Warning("Could not find mapping for metric with ID " + fmt.Sprint(metric))
return 0

}

func (metrics *ccsmpBackedMetrics) ResetStats() {
func (backedMetrics *ccsmpBackedMetrics) ResetStats() {
for i := 0; i < metricCount; i++ {
atomic.StoreUint64(&metrics.metrics[i], 0)
atomic.StoreUint64(&backedMetrics.metrics[i], 0)
}
metrics.resetNativeStats()
atomic.StoreUint64(&backedMetrics.duplicateAcks, 0) // reset this duplicate acks counter to zero
backedMetrics.resetNativeStats()
}

func (metrics *ccsmpBackedMetrics) resetNativeStats() {
metrics.metricLock.Lock()
defer metrics.metricLock.Unlock()
if metrics.capturedRxMetrics != nil {
for key := range metrics.capturedRxMetrics {
metrics.capturedRxMetrics[key] = 0
func (backedMetrics *ccsmpBackedMetrics) resetNativeStats() {
backedMetrics.metricLock.Lock()
defer backedMetrics.metricLock.Unlock()
if backedMetrics.capturedRxMetrics != nil {
for key := range backedMetrics.capturedRxMetrics {
backedMetrics.capturedRxMetrics[key] = 0
}
for key := range metrics.capturedTxMetrics {
metrics.capturedTxMetrics[key] = 0
for key := range backedMetrics.capturedTxMetrics {
backedMetrics.capturedTxMetrics[key] = 0
}
} else {
errorInfo := metrics.session.SolClientSessionClearStats()
errorInfo := backedMetrics.session.SolClientSessionClearStats()
if errorInfo != nil {
logging.Default.Warning("Could not reset metrics: " + errorInfo.String())
}
}
}

func (metrics *ccsmpBackedMetrics) IncrementMetric(metric NextGenMetric, amount uint64) {
atomic.AddUint64(&metrics.metrics[metric], amount)
func (backedMetrics *ccsmpBackedMetrics) IncrementMetric(metric NextGenMetric, amount uint64) {
atomic.AddUint64(&backedMetrics.metrics[metric], amount)
}

func (backedMetrics *ccsmpBackedMetrics) incrementDuplicateAckCount() {
atomic.AddUint64(&backedMetrics.duplicateAcks, uint64(1))
TrentDaniel marked this conversation as resolved.
Show resolved Hide resolved
}
27 changes: 26 additions & 1 deletion internal/impl/core/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,18 @@

package core

import "testing"
import (
"sync/atomic"
"testing"
)

func TestSolClientMetrics(t *testing.T) {
metrics := []NextGenMetric{
MetricPublishMessagesBackpressureDiscarded,
MetricPublishMessagesTerminationDiscarded,
MetricReceivedMessagesBackpressureDiscarded,
MetricReceivedMessagesTerminationDiscarded,
MetricInternalDiscardNotifications,
}
for _, metric := range metrics {
metricsImpl := newCcsmpMetrics(nil)
Expand All @@ -36,3 +40,24 @@ func TestSolClientMetrics(t *testing.T) {
}
}
}

func TestIncrementDuplicateAckCount(t *testing.T) {
metricsImpl := newCcsmpMetrics(nil)
duplicateAck := atomic.LoadUint64(&metricsImpl.duplicateAcks)
if duplicateAck != 0 {
t.Error("Expected zero state of duplicate Acks counter to be 0")
}
metricsImpl.incrementDuplicateAckCount()
duplicateAck = atomic.LoadUint64(&metricsImpl.duplicateAcks)
if duplicateAck != 1 {
t.Error("Expected duplicate Acks to be 1")
}
for i := 0; i < 10; i++ {
// call 10 times
metricsImpl.incrementDuplicateAckCount()
}
duplicateAck = atomic.LoadUint64(&metricsImpl.duplicateAcks)
if duplicateAck != 11 {
t.Error("Expected duplicate Acks to be 11")
}
}
21 changes: 20 additions & 1 deletion internal/impl/core/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,10 @@ type Receiver interface {
ProvisionEndpoint(queueName string, isExclusive bool) ErrorInfo
// EndpointUnsubscribe will call endpoint unsubscribe on the endpoint
EndpointUnsubscribe(queueName string, topic string) (SubscriptionCorrelationID, <-chan SubscriptionEvent, ErrorInfo)
// Increments receiver metrics
// IncrementMetric - Increments receiver metrics
IncrementMetric(metric NextGenMetric, amount uint64)
// IncrementDuplicateAckCount - Increments receiver duplicate acks (track duplicate accepted settlement outcome metrics from auto-acks)
IncrementDuplicateAckCount()
// Creates a new persistent receiver with the given callback
NewPersistentReceiver(properties []string, callback RxCallback, eventCallback PersistentEventCallback) (PersistentReceiver, ErrorInfo)
}
Expand All @@ -85,7 +87,13 @@ type PersistentReceiver interface {
// Unsubscribe will remove the subscription from the persistent receiver
Unsubscribe(topic string) (SubscriptionCorrelationID, <-chan SubscriptionEvent, ErrorInfo)
// Ack will acknowledge the given message
// This method is equivalent to calling the Settle method with the ACCEPTED outcome.
Ack(msgID MessageID) ErrorInfo
// Settle generates and sends a positive or negative acknowledgement for a message.InboundMessage as
// indicated by the MessageSettlementOutcome argument. To use the negative outcomes FAILED and REJECTED,
// the receiver has to have been preconfigured via its builder to support these settlement outcomes.
// Attempts to settle a message on an auto-acking receiver is ignored for ACCEPTED and raises error for FAILED and REJECTED.
Settle(msgID MessageID, msgSettlementOutcome MessageSettlementOutcome) ErrorInfo
// Destionation returns the destination if retrievable, or an error if one occurred
Destination() (destination string, durable bool, errorInfo ErrorInfo)
}
Expand All @@ -99,6 +107,9 @@ type ReplyPublishable = ccsmp.SolClientMessagePt
// MessageID type defined
type MessageID = ccsmp.SolClientMessageID

// MessageSettlementOutcome type defined
type MessageSettlementOutcome = ccsmp.SolClientMessageSettlementOutcome

// RxCallback type defined
type RxCallback func(msg Receivable) bool

Expand Down Expand Up @@ -280,6 +291,10 @@ func (receiver *ccsmpBackedReceiver) IncrementMetric(metric NextGenMetric, amoun
receiver.metrics.IncrementMetric(metric, amount)
}

func (receiver *ccsmpBackedReceiver) IncrementDuplicateAckCount() {
receiver.metrics.incrementDuplicateAckCount()
}

func (receiver *ccsmpBackedReceiver) ClearSubscriptionCorrelation(id SubscriptionCorrelationID) {
receiver.subscriptionCorrelationLock.Lock()
defer receiver.subscriptionCorrelationLock.Unlock()
Expand Down Expand Up @@ -408,6 +423,10 @@ func (receiver *ccsmpBackedPersistentReceiver) Ack(msgID MessageID) ErrorInfo {
return receiver.flow.SolClientFlowAck(msgID)
}

func (receiver *ccsmpBackedPersistentReceiver) Settle(msgID MessageID, msgSettlementOutcome MessageSettlementOutcome) ErrorInfo {
return receiver.flow.SolClientFlowSettleMessage(msgID, msgSettlementOutcome)
}

func (receiver *ccsmpBackedPersistentReceiver) Destination() (destination string, durable bool, errorInfo ErrorInfo) {
return receiver.flow.SolClientFlowGetDestination()
}
Expand Down
34 changes: 25 additions & 9 deletions internal/impl/receiver/message_receiver_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,15 +283,16 @@ type result struct {
}

type mockInternalReceiver struct {
events func() core.Events
replier func() core.Replier
isRunning func() bool
registerRxCallback func(callback core.RxCallback) uintptr
unregisterRxCallback func(ptr uintptr)
subscribe func(topic string, ptr uintptr) (core.SubscriptionCorrelationID, <-chan core.SubscriptionEvent, core.ErrorInfo)
unsubscribe func(topic string, ptr uintptr) (core.SubscriptionCorrelationID, <-chan core.SubscriptionEvent, core.ErrorInfo)
incrementMetric func(metric core.NextGenMetric, amount uint64)
newPersistentReceiver func(props []string, callback core.RxCallback, eventCallback core.PersistentEventCallback) (core.PersistentReceiver, *ccsmp.SolClientErrorInfoWrapper)
events func() core.Events
replier func() core.Replier
isRunning func() bool
registerRxCallback func(callback core.RxCallback) uintptr
unregisterRxCallback func(ptr uintptr)
subscribe func(topic string, ptr uintptr) (core.SubscriptionCorrelationID, <-chan core.SubscriptionEvent, core.ErrorInfo)
unsubscribe func(topic string, ptr uintptr) (core.SubscriptionCorrelationID, <-chan core.SubscriptionEvent, core.ErrorInfo)
incrementMetric func(metric core.NextGenMetric, amount uint64)
incrementDuplicateAckCount func()
newPersistentReceiver func(props []string, callback core.RxCallback, eventCallback core.PersistentEventCallback) (core.PersistentReceiver, *ccsmp.SolClientErrorInfoWrapper)
}

func (mock *mockInternalReceiver) Events() core.Events {
Expand Down Expand Up @@ -364,6 +365,12 @@ func (mock *mockInternalReceiver) IncrementMetric(metric core.NextGenMetric, amo
}
}

func (mock *mockInternalReceiver) IncrementDuplicateAckCount() {
if mock.incrementDuplicateAckCount != nil {
mock.incrementDuplicateAckCount()
}
}

func (mock *mockInternalReceiver) NewPersistentReceiver(props []string, callback core.RxCallback, eventCallback core.PersistentEventCallback) (core.PersistentReceiver, *ccsmp.SolClientErrorInfoWrapper) {
if mock.newPersistentReceiver != nil {
return mock.newPersistentReceiver(props, callback, eventCallback)
Expand All @@ -378,6 +385,7 @@ type mockPersistentReceiver struct {
subscribe func(topic string) (core.SubscriptionCorrelationID, <-chan core.SubscriptionEvent, core.ErrorInfo)
unsubscribe func(topic string) (core.SubscriptionCorrelationID, <-chan core.SubscriptionEvent, core.ErrorInfo)
ack func(msg core.MessageID) *ccsmp.SolClientErrorInfoWrapper
settle func(msg core.MessageID, outcome core.MessageSettlementOutcome) *ccsmp.SolClientErrorInfoWrapper
}

// Destroy destroys the flow
Expand Down Expand Up @@ -429,6 +437,14 @@ func (mock *mockPersistentReceiver) Ack(msgID core.MessageID) *ccsmp.SolClientEr
return nil
}

// Settle will settle the given message with the provided outcome
func (mock *mockPersistentReceiver) Settle(msgID core.MessageID, outcome core.MessageSettlementOutcome) *ccsmp.SolClientErrorInfoWrapper {
if mock.settle != nil {
return mock.settle(msgID, outcome)
}
return nil
}

func (mock *mockPersistentReceiver) Destination() (destination string, durable bool, errorInfo *ccsmp.SolClientErrorInfoWrapper) {
return
}
Expand Down
Loading
Loading