diff --git a/internal/ccsmp/ccsmp_flow.go b/internal/ccsmp/ccsmp_flow.go index c482e33..efa271c 100644 --- a/internal/ccsmp/ccsmp_flow.go +++ b/internal/ccsmp/ccsmp_flow.go @@ -45,6 +45,18 @@ type SolClientFlowEventInfoPt = C.solClient_flow_eventCallbackInfo_pt // SolClientFlowRxMsgDispatchFuncInfo is assigned a value type SolClientFlowRxMsgDispatchFuncInfo = C.solClient_flow_rxMsgDispatchFuncInfo_t +// SolClientMessageSettlementOutcome is assigned a value +type SolClientMessageSettlementOutcome = C.solClient_msgOutcome_t + +// SolClientSettlementOutcomeAccepted - the message was successfully processed. +const SolClientSettlementOutcomeAccepted = C.SOLCLIENT_OUTCOME_ACCEPTED + +// SolClientSettlementOutcomeFailed - message processing failed temporarily, attempt redelivery if configured. +const SolClientSettlementOutcomeFailed = C.SOLCLIENT_OUTCOME_FAILED + +// SolClientSettlementOutcomeRejected - message was processed and rejected, removed from the queue. +const SolClientSettlementOutcomeRejected = C.SOLCLIENT_OUTCOME_REJECTED + // Callbacks // SolClientFlowMessageCallback is assigned a function @@ -174,6 +186,13 @@ func (flow *SolClientFlow) SolClientFlowAck(msgID SolClientMessageID) *SolClient }) } +// SolClientFlowSettleMessage function +func (flow *SolClientFlow) SolClientFlowSettleMessage(msgID SolClientMessageID, settlementOutcome SolClientMessageSettlementOutcome) *SolClientErrorInfoWrapper { + return handleCcsmpError(func() SolClientReturnCode { + return C.solClient_flow_settleMsg(flow.pointer, msgID, settlementOutcome) + }) +} + // SolClientFlowGetDestination function func (flow *SolClientFlow) SolClientFlowGetDestination() (name string, durable bool, err *SolClientErrorInfoWrapper) { var dest *SolClientDestination = &SolClientDestination{} diff --git a/internal/impl/constants/error_strings.go b/internal/impl/constants/error_strings.go index fabf6d7..938d8f8 100644 --- a/internal/impl/constants/error_strings.go +++ b/internal/impl/constants/error_strings.go @@ -64,11 +64,14 @@ const UnableToStartReceiver = "cannot start the receiver as it has already been // UnableToStartReceiverParentServiceNotStarted error string const UnableToStartReceiverParentServiceNotStarted = "cannot start receiver unless parent MessagingService is connected" -// UnableToAcknowledgeAlreadyTerminated error string -const UnableToAcknowledgeAlreadyTerminated = "unable to acknowledge message: message receiver has been terminated" +// UnableToSettleAlreadyTerminated error string +const UnableToSettleAlreadyTerminated = "unable to settle message: message receiver has been terminated" -// UnableToAcknowledgeNotStarted error string -const UnableToAcknowledgeNotStarted = "unable to acknowledge meessage: message receiver is not yet started" +// UnableToSettleNotStarted error string +const UnableToSettleNotStarted = "unable to settle meessage: message receiver is not yet started" + +// InvalidMessageSettlementOutcome error string +const InvalidMessageSettlementOutcome = "invalid message settlement outcome used to settle message" // UnableToModifySubscriptionBadState error string const UnableToModifySubscriptionBadState = "unable to modify subscriptions in state %s" diff --git a/internal/impl/core/metrics.go b/internal/impl/core/metrics.go index c2dfa29..a327200 100644 --- a/internal/impl/core/metrics.go +++ b/internal/impl/core/metrics.go @@ -43,6 +43,9 @@ var rxMetrics = map[metrics.Metric]ccsmp.SolClientStatsRX{ metrics.TotalBytesReceived: ccsmp.SolClientStatsRXTotalDataBytes, metrics.TotalMessagesReceived: ccsmp.SolClientStatsRXTotalDataMsgs, metrics.CompressedBytesReceived: ccsmp.SolClientStatsRXCompressedBytes, + metrics.PersistentMessagesAccepted: ccsmp.SolClientStatsRXSettleAccepted, + metrics.PersistentMessagesFailed: ccsmp.SolClientStatsRXSettleFailed, + metrics.PersistentMessagesRejected: ccsmp.SolClientStatsRXSettleRejected, } var txMetrics = map[metrics.Metric]ccsmp.SolClientStatsTX{ @@ -105,8 +108,9 @@ type Metrics interface { // Implementation type ccsmpBackedMetrics struct { - session *ccsmp.SolClientSession - metrics []uint64 + session *ccsmp.SolClientSession + metrics []uint64 + duplicateAcks uint64 metricLock sync.RWMutex capturedTxMetrics map[ccsmp.SolClientStatsTX]uint64 @@ -115,98 +119,111 @@ type ccsmpBackedMetrics struct { func newCcsmpMetrics(session *ccsmp.SolClientSession) *ccsmpBackedMetrics { return &ccsmpBackedMetrics{ - metrics: make([]uint64, metricCount), - session: session, + metrics: make([]uint64, metricCount), + duplicateAcks: uint64(0), // used to track duplicate acks count (auto-acks) + session: session, } } -func (metrics *ccsmpBackedMetrics) terminate() { - metrics.metricLock.Lock() - defer metrics.metricLock.Unlock() - metrics.captureTXStats() - metrics.captureRXStats() +func (backedMetrics *ccsmpBackedMetrics) terminate() { + backedMetrics.metricLock.Lock() + defer backedMetrics.metricLock.Unlock() + backedMetrics.captureTXStats() + backedMetrics.captureRXStats() } -func (metrics *ccsmpBackedMetrics) captureTXStats() { - if metrics.capturedTxMetrics != nil { +func (backedMetrics *ccsmpBackedMetrics) captureTXStats() { + if backedMetrics.capturedTxMetrics != nil { return } - metrics.capturedTxMetrics = make(map[ccsmp.SolClientStatsTX]uint64) + backedMetrics.capturedTxMetrics = make(map[ccsmp.SolClientStatsTX]uint64) for _, txStat := range txMetrics { - metrics.capturedTxMetrics[txStat] = metrics.session.SolClientSessionGetTXStat(txStat) + backedMetrics.capturedTxMetrics[txStat] = backedMetrics.session.SolClientSessionGetTXStat(txStat) } } -func (metrics *ccsmpBackedMetrics) captureRXStats() { - if metrics.capturedRxMetrics != nil { +func (backedMetrics *ccsmpBackedMetrics) captureRXStats() { + if backedMetrics.capturedRxMetrics != nil { return } - metrics.capturedRxMetrics = make(map[ccsmp.SolClientStatsRX]uint64) + backedMetrics.capturedRxMetrics = make(map[ccsmp.SolClientStatsRX]uint64) for _, rxStat := range rxMetrics { - metrics.capturedRxMetrics[rxStat] = metrics.session.SolClientSessionGetRXStat(rxStat) + backedMetrics.capturedRxMetrics[rxStat] = backedMetrics.session.SolClientSessionGetRXStat(rxStat) } } -func (metrics *ccsmpBackedMetrics) getTXStat(stat ccsmp.SolClientStatsTX) uint64 { - metrics.metricLock.RLock() - defer metrics.metricLock.RUnlock() - if metrics.capturedTxMetrics != nil { - return metrics.capturedTxMetrics[stat] +func (backedMetrics *ccsmpBackedMetrics) getTXStat(stat ccsmp.SolClientStatsTX) uint64 { + backedMetrics.metricLock.RLock() + defer backedMetrics.metricLock.RUnlock() + if backedMetrics.capturedTxMetrics != nil { + return backedMetrics.capturedTxMetrics[stat] } - return metrics.session.SolClientSessionGetTXStat(stat) + return backedMetrics.session.SolClientSessionGetTXStat(stat) } -func (metrics *ccsmpBackedMetrics) getRXStat(stat ccsmp.SolClientStatsRX) uint64 { - metrics.metricLock.RLock() - defer metrics.metricLock.RUnlock() - if metrics.capturedRxMetrics != nil { - return metrics.capturedRxMetrics[stat] +func (backedMetrics *ccsmpBackedMetrics) getRXStat(stat ccsmp.SolClientStatsRX) uint64 { + backedMetrics.metricLock.RLock() + defer backedMetrics.metricLock.RUnlock() + if backedMetrics.capturedRxMetrics != nil { + return backedMetrics.capturedRxMetrics[stat] } - return metrics.session.SolClientSessionGetRXStat(stat) + return backedMetrics.session.SolClientSessionGetRXStat(stat) } -func (metrics *ccsmpBackedMetrics) getNextGenStat(metric NextGenMetric) uint64 { - return atomic.LoadUint64(&metrics.metrics[metric]) +func (backedMetrics *ccsmpBackedMetrics) getNextGenStat(metric NextGenMetric) uint64 { + return atomic.LoadUint64(&backedMetrics.metrics[metric]) } -func (metrics *ccsmpBackedMetrics) GetStat(metric metrics.Metric) uint64 { +func (backedMetrics *ccsmpBackedMetrics) GetStat(metric metrics.Metric) uint64 { if rxMetric, ok := rxMetrics[metric]; ok { - return metrics.getRXStat(rxMetric) + // check for duplicate counts due to auto-acks and remove from final result + rxStat := backedMetrics.getRXStat(rxMetric) + duplicateAck := atomic.LoadUint64(&backedMetrics.duplicateAcks) + // take the difference and retrun as the settled accepted metric + if duplicateAck > 0 && metric == metrics.PersistentMessagesAccepted { + return rxStat - duplicateAck + } + return rxStat // return other types of metrics as is } else if txMetric, ok := txMetrics[metric]; ok { - return metrics.getTXStat(txMetric) + return backedMetrics.getTXStat(txMetric) } else if clientMetric, ok := clientMetrics[metric]; ok { - return metrics.getNextGenStat(clientMetric) + return backedMetrics.getNextGenStat(clientMetric) } logging.Default.Warning("Could not find mapping for metric with ID " + fmt.Sprint(metric)) return 0 } -func (metrics *ccsmpBackedMetrics) ResetStats() { +func (backedMetrics *ccsmpBackedMetrics) ResetStats() { for i := 0; i < metricCount; i++ { - atomic.StoreUint64(&metrics.metrics[i], 0) + atomic.StoreUint64(&backedMetrics.metrics[i], 0) } - metrics.resetNativeStats() + atomic.StoreUint64(&backedMetrics.duplicateAcks, 0) // reset this duplicate acks counter to zero + backedMetrics.resetNativeStats() } -func (metrics *ccsmpBackedMetrics) resetNativeStats() { - metrics.metricLock.Lock() - defer metrics.metricLock.Unlock() - if metrics.capturedRxMetrics != nil { - for key := range metrics.capturedRxMetrics { - metrics.capturedRxMetrics[key] = 0 +func (backedMetrics *ccsmpBackedMetrics) resetNativeStats() { + backedMetrics.metricLock.Lock() + defer backedMetrics.metricLock.Unlock() + if backedMetrics.capturedRxMetrics != nil { + for key := range backedMetrics.capturedRxMetrics { + backedMetrics.capturedRxMetrics[key] = 0 } - for key := range metrics.capturedTxMetrics { - metrics.capturedTxMetrics[key] = 0 + for key := range backedMetrics.capturedTxMetrics { + backedMetrics.capturedTxMetrics[key] = 0 } } else { - errorInfo := metrics.session.SolClientSessionClearStats() + errorInfo := backedMetrics.session.SolClientSessionClearStats() if errorInfo != nil { logging.Default.Warning("Could not reset metrics: " + errorInfo.String()) } } } -func (metrics *ccsmpBackedMetrics) IncrementMetric(metric NextGenMetric, amount uint64) { - atomic.AddUint64(&metrics.metrics[metric], amount) +func (backedMetrics *ccsmpBackedMetrics) IncrementMetric(metric NextGenMetric, amount uint64) { + atomic.AddUint64(&backedMetrics.metrics[metric], amount) +} + +func (backedMetrics *ccsmpBackedMetrics) incrementDuplicateAckCount() { + atomic.AddUint64(&backedMetrics.duplicateAcks, uint64(1)) } diff --git a/internal/impl/core/metrics_test.go b/internal/impl/core/metrics_test.go index 6a7afd9..561ec24 100644 --- a/internal/impl/core/metrics_test.go +++ b/internal/impl/core/metrics_test.go @@ -16,7 +16,10 @@ package core -import "testing" +import ( + "sync/atomic" + "testing" +) func TestSolClientMetrics(t *testing.T) { metrics := []NextGenMetric{ @@ -24,6 +27,7 @@ func TestSolClientMetrics(t *testing.T) { MetricPublishMessagesTerminationDiscarded, MetricReceivedMessagesBackpressureDiscarded, MetricReceivedMessagesTerminationDiscarded, + MetricInternalDiscardNotifications, } for _, metric := range metrics { metricsImpl := newCcsmpMetrics(nil) @@ -36,3 +40,24 @@ func TestSolClientMetrics(t *testing.T) { } } } + +func TestIncrementDuplicateAckCount(t *testing.T) { + metricsImpl := newCcsmpMetrics(nil) + duplicateAck := atomic.LoadUint64(&metricsImpl.duplicateAcks) + if duplicateAck != 0 { + t.Error("Expected zero state of duplicate Acks counter to be 0") + } + metricsImpl.incrementDuplicateAckCount() + duplicateAck = atomic.LoadUint64(&metricsImpl.duplicateAcks) + if duplicateAck != 1 { + t.Error("Expected duplicate Acks to be 1") + } + for i := 0; i < 10; i++ { + // call 10 times + metricsImpl.incrementDuplicateAckCount() + } + duplicateAck = atomic.LoadUint64(&metricsImpl.duplicateAcks) + if duplicateAck != 11 { + t.Error("Expected duplicate Acks to be 11") + } +} diff --git a/internal/impl/core/receiver.go b/internal/impl/core/receiver.go index 6adccb8..c37c2d2 100644 --- a/internal/impl/core/receiver.go +++ b/internal/impl/core/receiver.go @@ -65,8 +65,10 @@ type Receiver interface { ProvisionEndpoint(queueName string, isExclusive bool) ErrorInfo // EndpointUnsubscribe will call endpoint unsubscribe on the endpoint EndpointUnsubscribe(queueName string, topic string) (SubscriptionCorrelationID, <-chan SubscriptionEvent, ErrorInfo) - // Increments receiver metrics + // IncrementMetric - Increments receiver metrics IncrementMetric(metric NextGenMetric, amount uint64) + // IncrementDuplicateAckCount - Increments receiver duplicate acks (track duplicate accepted settlement outcome metrics from auto-acks) + IncrementDuplicateAckCount() // Creates a new persistent receiver with the given callback NewPersistentReceiver(properties []string, callback RxCallback, eventCallback PersistentEventCallback) (PersistentReceiver, ErrorInfo) } @@ -85,7 +87,13 @@ type PersistentReceiver interface { // Unsubscribe will remove the subscription from the persistent receiver Unsubscribe(topic string) (SubscriptionCorrelationID, <-chan SubscriptionEvent, ErrorInfo) // Ack will acknowledge the given message + // This method is equivalent to calling the Settle method with the ACCEPTED outcome. Ack(msgID MessageID) ErrorInfo + // Settle generates and sends a positive or negative acknowledgement for a message.InboundMessage as + // indicated by the MessageSettlementOutcome argument. To use the negative outcomes FAILED and REJECTED, + // the receiver has to have been preconfigured via its builder to support these settlement outcomes. + // Attempts to settle a message on an auto-acking receiver is ignored for ACCEPTED and raises error for FAILED and REJECTED. + Settle(msgID MessageID, msgSettlementOutcome MessageSettlementOutcome) ErrorInfo // Destionation returns the destination if retrievable, or an error if one occurred Destination() (destination string, durable bool, errorInfo ErrorInfo) } @@ -99,6 +107,9 @@ type ReplyPublishable = ccsmp.SolClientMessagePt // MessageID type defined type MessageID = ccsmp.SolClientMessageID +// MessageSettlementOutcome type defined +type MessageSettlementOutcome = ccsmp.SolClientMessageSettlementOutcome + // RxCallback type defined type RxCallback func(msg Receivable) bool @@ -280,6 +291,10 @@ func (receiver *ccsmpBackedReceiver) IncrementMetric(metric NextGenMetric, amoun receiver.metrics.IncrementMetric(metric, amount) } +func (receiver *ccsmpBackedReceiver) IncrementDuplicateAckCount() { + receiver.metrics.incrementDuplicateAckCount() +} + func (receiver *ccsmpBackedReceiver) ClearSubscriptionCorrelation(id SubscriptionCorrelationID) { receiver.subscriptionCorrelationLock.Lock() defer receiver.subscriptionCorrelationLock.Unlock() @@ -408,6 +423,10 @@ func (receiver *ccsmpBackedPersistentReceiver) Ack(msgID MessageID) ErrorInfo { return receiver.flow.SolClientFlowAck(msgID) } +func (receiver *ccsmpBackedPersistentReceiver) Settle(msgID MessageID, msgSettlementOutcome MessageSettlementOutcome) ErrorInfo { + return receiver.flow.SolClientFlowSettleMessage(msgID, msgSettlementOutcome) +} + func (receiver *ccsmpBackedPersistentReceiver) Destination() (destination string, durable bool, errorInfo ErrorInfo) { return receiver.flow.SolClientFlowGetDestination() } diff --git a/internal/impl/receiver/message_receiver_impl_test.go b/internal/impl/receiver/message_receiver_impl_test.go index 939ab9d..723d37b 100644 --- a/internal/impl/receiver/message_receiver_impl_test.go +++ b/internal/impl/receiver/message_receiver_impl_test.go @@ -283,15 +283,16 @@ type result struct { } type mockInternalReceiver struct { - events func() core.Events - replier func() core.Replier - isRunning func() bool - registerRxCallback func(callback core.RxCallback) uintptr - unregisterRxCallback func(ptr uintptr) - subscribe func(topic string, ptr uintptr) (core.SubscriptionCorrelationID, <-chan core.SubscriptionEvent, core.ErrorInfo) - unsubscribe func(topic string, ptr uintptr) (core.SubscriptionCorrelationID, <-chan core.SubscriptionEvent, core.ErrorInfo) - incrementMetric func(metric core.NextGenMetric, amount uint64) - newPersistentReceiver func(props []string, callback core.RxCallback, eventCallback core.PersistentEventCallback) (core.PersistentReceiver, *ccsmp.SolClientErrorInfoWrapper) + events func() core.Events + replier func() core.Replier + isRunning func() bool + registerRxCallback func(callback core.RxCallback) uintptr + unregisterRxCallback func(ptr uintptr) + subscribe func(topic string, ptr uintptr) (core.SubscriptionCorrelationID, <-chan core.SubscriptionEvent, core.ErrorInfo) + unsubscribe func(topic string, ptr uintptr) (core.SubscriptionCorrelationID, <-chan core.SubscriptionEvent, core.ErrorInfo) + incrementMetric func(metric core.NextGenMetric, amount uint64) + incrementDuplicateAckCount func() + newPersistentReceiver func(props []string, callback core.RxCallback, eventCallback core.PersistentEventCallback) (core.PersistentReceiver, *ccsmp.SolClientErrorInfoWrapper) } func (mock *mockInternalReceiver) Events() core.Events { @@ -364,6 +365,12 @@ func (mock *mockInternalReceiver) IncrementMetric(metric core.NextGenMetric, amo } } +func (mock *mockInternalReceiver) IncrementDuplicateAckCount() { + if mock.incrementDuplicateAckCount != nil { + mock.incrementDuplicateAckCount() + } +} + func (mock *mockInternalReceiver) NewPersistentReceiver(props []string, callback core.RxCallback, eventCallback core.PersistentEventCallback) (core.PersistentReceiver, *ccsmp.SolClientErrorInfoWrapper) { if mock.newPersistentReceiver != nil { return mock.newPersistentReceiver(props, callback, eventCallback) @@ -378,6 +385,7 @@ type mockPersistentReceiver struct { subscribe func(topic string) (core.SubscriptionCorrelationID, <-chan core.SubscriptionEvent, core.ErrorInfo) unsubscribe func(topic string) (core.SubscriptionCorrelationID, <-chan core.SubscriptionEvent, core.ErrorInfo) ack func(msg core.MessageID) *ccsmp.SolClientErrorInfoWrapper + settle func(msg core.MessageID, outcome core.MessageSettlementOutcome) *ccsmp.SolClientErrorInfoWrapper } // Destroy destroys the flow @@ -429,6 +437,14 @@ func (mock *mockPersistentReceiver) Ack(msgID core.MessageID) *ccsmp.SolClientEr return nil } +// Settle will settle the given message with the provided outcome +func (mock *mockPersistentReceiver) Settle(msgID core.MessageID, outcome core.MessageSettlementOutcome) *ccsmp.SolClientErrorInfoWrapper { + if mock.settle != nil { + return mock.settle(msgID, outcome) + } + return nil +} + func (mock *mockPersistentReceiver) Destination() (destination string, durable bool, errorInfo *ccsmp.SolClientErrorInfoWrapper) { return } diff --git a/internal/impl/receiver/persistent_message_receiver_impl.go b/internal/impl/receiver/persistent_message_receiver_impl.go index ac39ff4..4505a43 100644 --- a/internal/impl/receiver/persistent_message_receiver_impl.go +++ b/internal/impl/receiver/persistent_message_receiver_impl.go @@ -20,6 +20,7 @@ package receiver import ( "fmt" "runtime/debug" + "strings" "sync" "sync/atomic" "time" @@ -808,14 +809,27 @@ func (receiver *persistentMessageReceiverImpl) ReceiverInfo() (solace.Persistent } // Ack will acknowledge a message +// This method is equivalent to calling the settle method with +// the ACCEPTED outcome like this: PersistentMessageReceiver.Settle(inboundMessage, config.PersistentReceiverAcceptedOutcome) func (receiver *persistentMessageReceiverImpl) Ack(msg apimessage.InboundMessage) error { + // call the Settle() method with the accepted message settlement outcome + return receiver.Settle(msg, config.PersistentReceiverAcceptedOutcome) +} + +// Settle generates and sends a positive or negative acknowledgement for a message.InboundMessage as +// indicated by the MessageSettlementOutcome argument. To use the negative outcomes FAILED and REJECTED, +// the receiver has to have been preconfigured via its builder to support these settlement outcomes. +// Attempts to settle a message on an auto-acking receiver is ignored for ACCEPTED +// (albeit it counts as a manual ACCEPTED in the stats), raises error for FAILED and REJECTED. +// this returns an error object with the reason for the error if it was not possible to settle the message. +func (receiver *persistentMessageReceiverImpl) Settle(msg apimessage.InboundMessage, outcome config.MessageSettlementOutcome) error { state := receiver.getState() if state != messageReceiverStateStarted && state != messageReceiverStateTerminating { var message string if state == messageReceiverStateTerminated { - message = constants.UnableToAcknowledgeAlreadyTerminated + message = constants.UnableToSettleAlreadyTerminated } else { - message = constants.UnableToAcknowledgeNotStarted + message = constants.UnableToSettleNotStarted } return solace.NewError(&solace.IllegalStateError{}, message, nil) } @@ -827,7 +841,23 @@ func (receiver *persistentMessageReceiverImpl) Ack(msg apimessage.InboundMessage if !present { return solace.NewError(&solace.IllegalArgumentError{}, constants.UnableToRetrieveMessageID, nil) } - errInfo := receiver.internalFlow.Ack(msgID) + + // to hold the settlement outcome + var msgSettlementOutcome ccsmp.SolClientMessageSettlementOutcome + + switch outcome { + case config.PersistentReceiverAcceptedOutcome: + msgSettlementOutcome = ccsmp.SolClientSettlementOutcomeAccepted + case config.PersistentReceiverFailedOutcome: + msgSettlementOutcome = ccsmp.SolClientSettlementOutcomeFailed + case config.PersistentReceiverRejectedOutcome: + msgSettlementOutcome = ccsmp.SolClientSettlementOutcomeRejected + default: + // return error here + return solace.NewError(&solace.IllegalArgumentError{}, constants.InvalidMessageSettlementOutcome, nil) + } + + errInfo := receiver.internalFlow.Settle(msgID, msgSettlementOutcome) if errInfo != nil { return core.ToNativeError(errInfo) } @@ -961,6 +991,8 @@ func (receiver *persistentMessageReceiverImpl) ReceiveMessage(timeout time.Durat msg.Dispose() return nil, core.ToNativeError(errInfo) } + // Successful Auto-ack, increment the auto-ack duplicate counter + receiver.internalReceiver.IncrementDuplicateAckCount() } else { receiver.logger.Error(fmt.Sprintf("Could not retrieve message ID from message %s", msg)) } @@ -1129,6 +1161,9 @@ func (receiver *persistentMessageReceiverImpl) run() { errInfo := receiver.internalFlow.Ack(msgID) if errInfo != nil { receiver.logger.Warning("Failed to acknowledge message: " + errInfo.GetMessageAsString() + ", sub code: " + fmt.Sprint(errInfo.SubCode())) + } else { + // Successful Auto-Ack, increment the auto-ack duplicate counter + receiver.internalReceiver.IncrementDuplicateAckCount() } } } @@ -1314,6 +1349,44 @@ func (builder *persistentMessageReceiverBuilderImpl) Build(queue *resource.Queue } } + // message settlement outcome property + if settlementOutcomesInterface, ok := builder.properties[config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport]; ok { + if settlementOutcomesStr, ok := settlementOutcomesInterface.(string); ok { + addedOutcomes := make(map[string]bool) // to track duplicates + settlementOutcomes := strings.Split(settlementOutcomesStr, ",") + // iterate through to validate the message settlement outcome values + for _, settlementOutcome := range settlementOutcomes { + _, present, err := validation.StringPropertyValidation( + string(config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport), + settlementOutcome, + string(config.PersistentReceiverAcceptedOutcome), + string(config.PersistentReceiverFailedOutcome), + string(config.PersistentReceiverRejectedOutcome)) + + if !present || err != nil { + return nil, solace.NewError(&solace.IllegalArgumentError{}, + fmt.Sprintf("invalid value for receiver message settlement outcome, expected either 'ACCEPTED', 'FAILED' or 'REJECTED', got %T", settlementOutcome), nil) + } + + // ignore duplicate message settlement outcomes from the ccsmp properties array + if _, added := addedOutcomes[settlementOutcome]; !added { + // add the corresponding ccsmp property to the properties array + switch config.MessageSettlementOutcome(settlementOutcome) { + case config.PersistentReceiverFailedOutcome: + properties = append(properties, ccsmp.SolClientFlowPropRequiredOutcomeFailed, ccsmp.SolClientPropEnableVal) + case config.PersistentReceiverRejectedOutcome: + properties = append(properties, ccsmp.SolClientFlowPropRequiredOutcomeRejected, ccsmp.SolClientPropEnableVal) + case config.PersistentReceiverAcceptedOutcome: + default: + logging.Default.Info(builder.String() + ": Receiver message settlement outcome of 'ACCEPTED' is supported by default") + } + // mark it as added + addedOutcomes[settlementOutcome] = true + } + } + } + } + // Create the receiver with the given properties receiver := &persistentMessageReceiverImpl{} receiver.construct( @@ -1413,6 +1486,32 @@ func (builder *persistentMessageReceiverBuilderImpl) WithMessageReplay(strategy return builder } +// WithRequiredMessageOutcomeSupport configures the types of settlements the receiver can use. +// Any combination of PersistentReceiverAcceptedOutcome, PersistentReceiverFailedOutcome, and +// PersistentReceiverRejectedOutcome; the order is irrelevant. +// Attempting to Settle() a message later with an Outcome not listed here may result in an error. +func (builder *persistentMessageReceiverBuilderImpl) WithRequiredMessageOutcomeSupport(messageSettlementOutcomes ...config.MessageSettlementOutcome) solace.PersistentMessageReceiverBuilder { + settlementOutcomesStrArray := []string{} + addedOutcomes := make(map[config.MessageSettlementOutcome]bool) + for _, settlementOutcome := range messageSettlementOutcomes { + if !isSupportedMessageSettlementOutcome(settlementOutcome) { + logging.Default.Warning( + builder.String() + ": Unknown message settlement outcome(s) passed to WithRequiredMessageOutcomeSupport, " + string(settlementOutcome) + + "allowed values are: 'config.PersistentReceiverAcceptedOutcome', 'config.PersistentReceiverFailedOutcome' and 'config.PersistentReceiverRejectedOutcome'") + // We'll still add it so we can return an error on Build(). The setter can't return the error even if it knows about it. + } + if _, added := addedOutcomes[settlementOutcome]; !added { + addedOutcomes[settlementOutcome] = true // mark it as added + settlementOutcomesStrArray = append(settlementOutcomesStrArray, string(settlementOutcome)) + } else { + logging.Default.Warning( + builder.String() + ": Repeat message settlement outcome(s) passed to WithRequiredMessageOutcomeSupport: " + string(settlementOutcome)) + } + } + builder.properties[config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport] = strings.Join(settlementOutcomesStrArray, ",") + return builder +} + func (builder *persistentMessageReceiverBuilderImpl) String() string { return fmt.Sprintf("solace.PersistentMessageReceiverBuilder at %p", builder) } @@ -1426,6 +1525,12 @@ func checkPersistentMessageReceiverSubscriptionType(subscription resource.Subscr return solace.NewError(&solace.IllegalArgumentError{}, fmt.Sprintf(constants.PersistentReceiverUnsupportedSubscriptionType, subscription), nil) } +func isSupportedMessageSettlementOutcome(messageSettlementOutcome config.MessageSettlementOutcome) bool { + return (messageSettlementOutcome == config.PersistentReceiverAcceptedOutcome || + messageSettlementOutcome == config.PersistentReceiverFailedOutcome || + messageSettlementOutcome == config.PersistentReceiverRejectedOutcome) +} + type persistentReceiverInfoImpl struct { resourceInfo *resourceInfoImpl } diff --git a/pkg/solace/config/message_receiver_properties.go b/pkg/solace/config/message_receiver_properties.go index 0f0ae49..ecaa3f3 100644 --- a/pkg/solace/config/message_receiver_properties.go +++ b/pkg/solace/config/message_receiver_properties.go @@ -86,6 +86,9 @@ const ( // ReceiverPropertyPersistentMessageAckStrategy specifies the acknowledgement strategy for the message receiver. ReceiverPropertyPersistentMessageAckStrategy ReceiverProperty = "solace.messaging.receiver.persistent.ack.strategy" + // ReceiverPropertyPersistentMessageRequiredOutcomeSupport for configuring the settlement outcomes for the message receiver. + ReceiverPropertyPersistentMessageRequiredOutcomeSupport ReceiverProperty = "solace.messaging.receiver.persistent.ack.required-message-outcome-support" + // ReceiverPropertyPersistentMessageReplayStrategy enables message replay and to specify a replay strategy. ReceiverPropertyPersistentMessageReplayStrategy ReceiverProperty = "solace.messaging.receiver.persistent.replay.strategy" diff --git a/pkg/solace/config/messaging_receiver_constants.go b/pkg/solace/config/messaging_receiver_constants.go new file mode 100644 index 0000000..25c4e2b --- /dev/null +++ b/pkg/solace/config/messaging_receiver_constants.go @@ -0,0 +1,34 @@ +// pubsubplus-go-client +// +// Copyright 2021-2024 Solace Corporation. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +// MessageSettlementOutcome - represents the type for supported message settlement outcome on a PersistentMessageReceiver. +type MessageSettlementOutcome string + +// The various message settlement outcomes available for use when configuring a PersistentMessageReceiver. +const ( + // Settles the message with a positive acknowledgement, removing it from the queue. + // Same as calling Ack() on the message. + PersistentReceiverAcceptedOutcome MessageSettlementOutcome = "ACCEPTED" + + // Settles the message with a negative acknowledgement without removing it from the queue. + // This may or may not make the message eligible for redelivery or eventually the DMQ, depending on the queue configuration. + PersistentReceiverFailedOutcome MessageSettlementOutcome = "FAILED" + + // Settles the message with a negative acknowledgement, removing it from the queue. + PersistentReceiverRejectedOutcome MessageSettlementOutcome = "REJECTED" +) diff --git a/pkg/solace/metrics/metrics.go b/pkg/solace/metrics/metrics.go index 1b72b4e..71ae434 100644 --- a/pkg/solace/metrics/metrics.go +++ b/pkg/solace/metrics/metrics.go @@ -110,6 +110,15 @@ const ( // persistent message receivers on the MessagingService. PersistentOutOfOrderMessagesDiscarded + // Number of messages settled with "ACCEPTED" outcome. + PersistentMessagesAccepted + + // Number of messages settled with "FAILED" outcome. + PersistentMessagesFailed + + // Number of messages settled with "REJECTED" outcome. + PersistentMessagesRejected + // PublishMessagesDiscarded is the number of messages discarded due to // channel failure. PublishMessagesDiscarded diff --git a/pkg/solace/persistent_message_receiver.go b/pkg/solace/persistent_message_receiver.go index 2ad5e25..a1369be 100644 --- a/pkg/solace/persistent_message_receiver.go +++ b/pkg/solace/persistent_message_receiver.go @@ -29,8 +29,18 @@ type PersistentMessageReceiver interface { MessageReceiver // Include all functionality of MessageReceiver. // Ack acknowledges that a message was received. + // This method is equivalent to calling the settle method with + // the ACCEPTED outcome like this: PersistentMessageReceiver.Settle(inboundMessage, config.PersistentReceiverAcceptedOutcome) Ack(message message.InboundMessage) error + // Settle generates and sends a positive or negative acknowledgement for a message.InboundMessage as + // indicated by the MessageSettlementOutcome argument. To use the negative outcomes FAILED and REJECTED, + // the receiver has to have been preconfigured via its builder to support these settlement outcomes. + // Attempts to settle a message on an auto-acking receiver is ignored for ACCEPTED + // (albeit it counts as a manual ACCEPTED in the stats), raises error for FAILED and REJECTED. + // this returns an error object with the reason for the error if it was not possible to settle the message. + Settle(message message.InboundMessage, outcome config.MessageSettlementOutcome) error + // StartAsyncCallback starts the PersistentMessageReceiver asynchronously. // Calls the callback when started with an error if one occurred, otherwise nil // if successful. @@ -113,6 +123,12 @@ type PersistentMessageReceiverBuilder interface { // to when starting the receiver. Accepts *resource.TopicSubscription subscriptions. WithSubscriptions(topics ...resource.Subscription) PersistentMessageReceiverBuilder + // WithRequiredMessageOutcomeSupport configures the types of settlements the receiver can use. + // Any combination of PersistentReceiverAcceptedOutcome, PersistentReceiverFailedOutcome, and + // PersistentReceiverRejectedOutcome; the order is irrelevant. + // Attempting to Settle() a message later with an Outcome not listed here may result in an error. + WithRequiredMessageOutcomeSupport(messageSettlementOutcomes ...config.MessageSettlementOutcome) PersistentMessageReceiverBuilder + // FromConfigurationProvider configures the persistent receiver with the specified properties. // The built-in ReceiverPropertiesConfigurationProvider implementations include: // ReceiverPropertyMap, a map of ReceiverProperty keys to values diff --git a/test/persistent_receiver_test.go b/test/persistent_receiver_test.go index c3b51c9..ab8136c 100644 --- a/test/persistent_receiver_test.go +++ b/test/persistent_receiver_test.go @@ -301,6 +301,19 @@ var _ = Describe("PersistentReceiver", func() { err := receiver.Ack(nil) helpers.ValidateError(err, &solace.IllegalStateError{}) }) + DescribeTable("should fail to settle a nil message as ", + func(outcome config.MessageSettlementOutcome) { + err := receiver.Settle(nil, outcome) + helpers.ValidateError(err, &solace.IllegalStateError{}) + }, + Entry("accepted", config.PersistentReceiverAcceptedOutcome), + Entry("rejected", config.PersistentReceiverRejectedOutcome), + Entry("failed", config.PersistentReceiverFailedOutcome), + ) + It("should fail to settle a nil message with garbage", func() { + err := receiver.Settle(nil, "garbage") + helpers.ValidateError(err, &solace.IllegalStateError{}) + }) }) Context("with a terminated receiver", func() { @@ -330,10 +343,23 @@ var _ = Describe("PersistentReceiver", func() { err := receiver.Pause() helpers.ValidateError(err, &solace.IllegalStateError{}) }) - It("should fail to acknowledge a message", func() { + It("should fail to acknowledge a nil message", func() { err := receiver.Ack(nil) helpers.ValidateError(err, &solace.IllegalStateError{}) }) + DescribeTable("should fail to settle a nil message as ", + func(outcome config.MessageSettlementOutcome) { + err := receiver.Settle(nil, outcome) + helpers.ValidateError(err, &solace.IllegalStateError{}) + }, + Entry("accepted", config.PersistentReceiverAcceptedOutcome), + Entry("rejected", config.PersistentReceiverRejectedOutcome), + Entry("failed", config.PersistentReceiverFailedOutcome), + ) + It("should fail to settle a nil message with garbage", func() { + err := receiver.Settle(nil, "garbage") + helpers.ValidateError(err, &solace.IllegalStateError{}) + }) }) It("should fail to start with an invalid queue name", func() { @@ -426,8 +452,11 @@ var _ = Describe("PersistentReceiver", func() { actualPayload, ok := msg.GetPayloadAsString() Expect(ok).To(BeTrue()) Expect(actualPayload).To(Equal(payload)) + + helpers.ValidateMetric(messagingService, metrics.PersistentMessagesAccepted, 0) err := receiver.Ack(msg) Expect(err).ToNot(HaveOccurred()) + helpers.ValidateMetric(messagingService, metrics.PersistentMessagesAccepted, 1) }) } @@ -805,6 +834,29 @@ var _ = Describe("PersistentReceiver", func() { err := receiver.Ack(directMsg) helpers.ValidateError(err, &solace.IllegalArgumentError{}) }) + DescribeTable("fails to settle a direct message as ", func(outcome config.MessageSettlementOutcome) { + const topic = "direct-message-ack" + directMsgChan := helpers.ReceiveOneMessage(messagingService, topic) + helpers.PublishOneMessage(messagingService, topic) + var directMsg message.InboundMessage + Eventually(directMsgChan).Should(Receive(&directMsg)) + err := receiver.Settle(directMsg, outcome) // should fail to settle message + helpers.ValidateError(err, &solace.IllegalArgumentError{}) + }, + Entry("accepted", config.PersistentReceiverAcceptedOutcome), + Entry("rejected", config.PersistentReceiverRejectedOutcome), + Entry("failed", config.PersistentReceiverFailedOutcome), + ) + It("fails to settle a direct message as garbage", func() { + const topic = "direct-message-ack" + directMsgChan := helpers.ReceiveOneMessage(messagingService, topic) + helpers.PublishOneMessage(messagingService, topic) + var directMsg message.InboundMessage + Eventually(directMsgChan).Should(Receive(&directMsg)) + // I don't know why this compiles, but the Entry version doesn't. + err := receiver.Settle(directMsg, "garbage") + helpers.ValidateError(err, &solace.IllegalArgumentError{}) + }) }) const numQueuedMessages = 5000 @@ -1204,6 +1256,415 @@ var _ = Describe("PersistentReceiver", func() { Expect(receiver.Terminate(10 * time.Second)).ToNot(HaveOccurred()) }) + for _, autoAck := range []bool{true, false} { + autoAckText := "" + // ??? I don't speak go, so I'm just rolling with it. + shouldAutoAck := autoAck + if autoAck { + autoAckText = " withAutoAck" + } + DescribeTable("Happy cases for outcome configuration on ReceiverBuilder"+autoAckText, + func(configFunc func(solace.PersistentMessageReceiverBuilder), outcome config.MessageSettlementOutcome, autoAckConfig bool) { + receiverBuilder := messagingService.CreatePersistentMessageReceiverBuilder() + configFunc(receiverBuilder) + if autoAckConfig { + receiverBuilder.WithMessageAutoAcknowledgement() + } + receiver, err := receiverBuilder.Build(resource.QueueDurableExclusive(queueName)) + Expect(err).ToNot(HaveOccurred()) + Expect(receiver.Start()).ToNot(HaveOccurred()) + Expect(helpers.GetQueueMessages(queueName)).To(HaveLen(0)) + messagingService.Metrics().Reset() + Expect(err).ToNot(HaveOccurred()) + + helpers.PublishOnePersistentMessage(messagingService, topicString) + msg, err := receiver.ReceiveMessage(10 * time.Second) + Expect(err).ToNot(HaveOccurred()) + + err = receiver.Settle(msg, outcome) + Expect(err).ToNot(HaveOccurred()) + /* if (autoAckConfig) { + helpers.ValidateMetric(messagingService, metrics.PersistentMessagesAccepted, 0) + helpers.ValidateMetric(messagingService, metrics.PersistentMessagesRejected, 0) + helpers.ValidateMetric(messagingService, metrics.PersistentMessagesFailed, 0) + } else */if outcome == config.PersistentReceiverAcceptedOutcome { + helpers.ValidateMetric(messagingService, metrics.PersistentMessagesAccepted, 1) + helpers.ValidateMetric(messagingService, metrics.PersistentMessagesRejected, 0) + helpers.ValidateMetric(messagingService, metrics.PersistentMessagesFailed, 0) + } else if outcome == config.PersistentReceiverRejectedOutcome { + helpers.ValidateMetric(messagingService, metrics.PersistentMessagesAccepted, 0) + helpers.ValidateMetric(messagingService, metrics.PersistentMessagesRejected, 1) + helpers.ValidateMetric(messagingService, metrics.PersistentMessagesFailed, 0) + } else if outcome == config.PersistentReceiverFailedOutcome { + helpers.ValidateMetric(messagingService, metrics.PersistentMessagesAccepted, 0) + helpers.ValidateMetric(messagingService, metrics.PersistentMessagesRejected, 0) + helpers.ValidateMetric(messagingService, metrics.PersistentMessagesFailed, 1) + } + // I guess this drains the queue? + if !autoAckConfig && outcome == config.PersistentReceiverFailedOutcome { + Consistently( + func() []monitor.MsgVpnQueueMsg { + return helpers.GetQueueMessages(queueName) + }, + 2*time.Second).Should(HaveLen(1)) + } + }, + Entry( + "Default can Accept", func(builder solace.PersistentMessageReceiverBuilder) {}, + config.PersistentReceiverAcceptedOutcome, shouldAutoAck), + Entry( + "Accept via setter", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverAcceptedOutcome) + }, + config.PersistentReceiverAcceptedOutcome, shouldAutoAck), + Entry( + "Accept & Fail via setter can accept", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome, config.PersistentReceiverAcceptedOutcome) + }, + config.PersistentReceiverAcceptedOutcome, shouldAutoAck), + Entry( + "Accept & Fail via setter can fail", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome, config.PersistentReceiverAcceptedOutcome) + }, + config.PersistentReceiverFailedOutcome, shouldAutoAck), + Entry( + "Fail via setter can accept", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome) + }, + config.PersistentReceiverAcceptedOutcome, shouldAutoAck), + Entry( + "Fail via setter can fail", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome) + }, + config.PersistentReceiverFailedOutcome, shouldAutoAck), + Entry( + "Accept via property", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s", config.PersistentReceiverAcceptedOutcome), + }) + }, + config.PersistentReceiverAcceptedOutcome, shouldAutoAck), + Entry( + "Fail via property can accept", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s", config.PersistentReceiverFailedOutcome), + }) + }, + config.PersistentReceiverAcceptedOutcome, shouldAutoAck), + Entry( + "Fail via property can fail", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s", config.PersistentReceiverFailedOutcome), + }) + }, + config.PersistentReceiverFailedOutcome, shouldAutoAck), + Entry( + "Accept & Fail via property can accept", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s", config.PersistentReceiverFailedOutcome, config.PersistentReceiverAcceptedOutcome), + }) + }, + config.PersistentReceiverAcceptedOutcome, shouldAutoAck), + Entry( + "Accept & Fail via property can fail", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s", config.PersistentReceiverFailedOutcome, config.PersistentReceiverAcceptedOutcome), + }) + }, + config.PersistentReceiverFailedOutcome, shouldAutoAck), + + Entry( + "Accept & Reject via setter can accept", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverRejectedOutcome, config.PersistentReceiverAcceptedOutcome) + }, + config.PersistentReceiverAcceptedOutcome, shouldAutoAck), + Entry( + "Accept & Reject via setter can reject", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverRejectedOutcome, config.PersistentReceiverAcceptedOutcome) + }, + config.PersistentReceiverRejectedOutcome, shouldAutoAck), + Entry( + "Reject via setter can accept", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverRejectedOutcome) + }, + config.PersistentReceiverAcceptedOutcome, shouldAutoAck), + Entry( + "Reject via setter can reject", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverRejectedOutcome) + }, + config.PersistentReceiverRejectedOutcome, shouldAutoAck), + Entry( + "Reject via property can accept", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s", config.PersistentReceiverRejectedOutcome), + }) + }, + config.PersistentReceiverAcceptedOutcome, shouldAutoAck), + Entry( + "Reject via property can reject", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s", config.PersistentReceiverRejectedOutcome), + }) + }, + config.PersistentReceiverRejectedOutcome, shouldAutoAck), + Entry( + "Accept & Reject via property can accept", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s", config.PersistentReceiverRejectedOutcome, config.PersistentReceiverAcceptedOutcome), + }) + }, + config.PersistentReceiverAcceptedOutcome, shouldAutoAck), + Entry( + "Accept & Reject via property can reject", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s", config.PersistentReceiverRejectedOutcome, config.PersistentReceiverAcceptedOutcome), + }) + }, + config.PersistentReceiverRejectedOutcome, shouldAutoAck), + ) // describe + } // for + + // Config time fail cases + + // I don't think the setter should accept garbage. + It("Garbage to setter", func() { + receiverBuilder := messagingService.CreatePersistentMessageReceiverBuilder() + receiverBuilder.WithRequiredMessageOutcomeSupport("garbage") + receiver, err := receiverBuilder.Build(resource.QueueDurableExclusive(queueName)) + helpers.ValidateError(err, &solace.IllegalArgumentError{}) + Expect(receiver).To(BeNil()) + }) + It("Legit outcome mixed with garbage to setter", func() { + receiverBuilder := messagingService.CreatePersistentMessageReceiverBuilder() + receiverBuilder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome, "garbage") + receiver, err := receiverBuilder.Build(resource.QueueDurableExclusive(queueName)) + helpers.ValidateError(err, &solace.IllegalArgumentError{}) + Expect(receiver).To(BeNil()) + }) + It("Legit outcome mixed with garbage to setter backwards", func() { + receiverBuilder := messagingService.CreatePersistentMessageReceiverBuilder() + receiverBuilder.WithRequiredMessageOutcomeSupport("garbage", config.PersistentReceiverFailedOutcome) + receiver, err := receiverBuilder.Build(resource.QueueDurableExclusive(queueName)) + helpers.ValidateError(err, &solace.IllegalArgumentError{}) + Expect(receiver).To(BeNil()) + }) + It("Garbage as property", func() { + receiverBuilder := messagingService.CreatePersistentMessageReceiverBuilder() + receiverBuilder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: "garbage", + }) + receiver, err := receiverBuilder.Build(resource.QueueDurableExclusive(queueName)) + helpers.ValidateError(err, &solace.IllegalArgumentError{}) + Expect(receiver).To(BeNil()) + }) + It("Garbage mixed in as property", func() { + receiverBuilder := messagingService.CreatePersistentMessageReceiverBuilder() + receiverBuilder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,garbage",config.PersistentReceiverRejectedOutcome), + }) + receiver, err := receiverBuilder.Build(resource.QueueDurableExclusive(queueName)) + helpers.ValidateError(err, &solace.IllegalArgumentError{}) + Expect(receiver).To(BeNil()) + }) + It("Poor punctuation in property", func() { + receiverBuilder := messagingService.CreatePersistentMessageReceiverBuilder() + receiverBuilder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s %s",config.PersistentReceiverRejectedOutcome, config.PersistentReceiverAcceptedOutcome), + }) + receiver, err := receiverBuilder.Build(resource.QueueDurableExclusive(queueName)) + helpers.ValidateError(err, &solace.IllegalArgumentError{}) + Expect(receiver).To(BeNil()) + }) + + // Settle() time fail cases + + DescribeTable("Message Settlement Outcome with Client Ack Configured", + func(configFunc func(solace.PersistentMessageReceiverBuilder), outcome config.MessageSettlementOutcome, expectNackSupport bool, useReceiveAsync bool) { + receiverBuilder := messagingService.CreatePersistentMessageReceiverBuilder() + receiverBuilder.WithMessageClientAcknowledgement() // with Client- Ack configured (the default) + configFunc(receiverBuilder) + receiver, err := receiverBuilder.Build(resource.QueueDurableExclusive(queueName)) + Expect(err).ToNot(HaveOccurred()) + Expect(receiver.Start()).ToNot(HaveOccurred()) + + Expect(helpers.GetQueueMessages(queueName)).To(HaveLen(0)) + messagingService.Metrics().Reset() + + var msg message.InboundMessage + if useReceiveAsync { + msgChan := make(chan message.InboundMessage, 1) + receiver.ReceiveAsync(func(inboundMessage message.InboundMessage) { + defer GinkgoRecover() + Expect(helpers.GetQueueMessages(queueName)).To(HaveLen(1)) + msgChan <- inboundMessage + }) + helpers.PublishOnePersistentMessage(messagingService, topicString) + Eventually(msgChan).Should(Receive(&msg)) + } else { + helpers.PublishOnePersistentMessage(messagingService, topicString) + inboundMessage, err := receiver.ReceiveMessage(10 * time.Second) + Expect(err).ToNot(HaveOccurred()) + msg = inboundMessage + } + + err = receiver.Settle(msg, outcome) + // message should still be in the queue - for FAILED settlement outcome + if expectNackSupport && outcome == config.PersistentReceiverFailedOutcome { + Expect(err).ToNot(HaveOccurred()) + Consistently(func() []monitor.MsgVpnQueueMsg { + return helpers.GetQueueMessages(queueName) + }, 2*time.Second).Should(HaveLen(1)) + } else if expectNackSupport { + // should be removed from the queue - for ACCEPTED and REJECTED settlement outcomes + Expect(err).ToNot(HaveOccurred()) + Eventually(func() []monitor.MsgVpnQueueMsg { + return helpers.GetQueueMessages(queueName) + }, 2*time.Second).Should(HaveLen(0)) + } else { + Expect(err).To(HaveOccurred()) + } + }, + // Message settlement not enabled on flow, expect error while caling settle() with Nack + Entry( + "RejectedOutcome - NACK not supported on flow sync receive", func(builder solace.PersistentMessageReceiverBuilder) {}, + config.PersistentReceiverRejectedOutcome, + false, false), + Entry( + "FailedOutcome - NACK not supported on flow sync receive", func(builder solace.PersistentMessageReceiverBuilder) {}, + config.PersistentReceiverFailedOutcome, + false, false), + Entry( + "RejectedOutcome - NACK not supported on flow with async receive", func(builder solace.PersistentMessageReceiverBuilder) {}, + config.PersistentReceiverRejectedOutcome, + false, true), + Entry( + "FailedOutcome - NACK not supported on flow with async receive", func(builder solace.PersistentMessageReceiverBuilder) {}, + config.PersistentReceiverFailedOutcome, + false, true), + + // Nil Nack support through property - expected Nack to not be supported on flow + Entry("Property NACK not supported with sync receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: nil, + }) + }, config.PersistentReceiverRejectedOutcome, false, false), + Entry("Property NACK not supported with async receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: nil, + }) + }, config.PersistentReceiverRejectedOutcome, false, true), + + // With configured Nack support through builder + Entry("RejectedOutcome - Builder Nack support with sync receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome) + }, config.PersistentReceiverRejectedOutcome, true, false), + Entry("FailedOutcome - Builder Nack support with sync receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome) + }, config.PersistentReceiverFailedOutcome, true, false), + Entry("RejectedOutcome - Builder Nack support with async receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome) + }, config.PersistentReceiverRejectedOutcome, true, true), + Entry("FailedOutcome - Builder Nack support with async receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome) + }, config.PersistentReceiverFailedOutcome, true, true), + + // With configured Nack support through property + Entry("RejectedOutcome - Property Nack support with sync receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s", config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome), + }) + }, config.PersistentReceiverRejectedOutcome, true, false), + Entry("FailedOutcome - Property Nack support with sync receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s", config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome), + }) + }, config.PersistentReceiverFailedOutcome, true, false), + Entry("RejectedOutcome - Property Nack support with async receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s", config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome), + }) + }, config.PersistentReceiverRejectedOutcome, true, true), + Entry("FailedOutcome - Property Nack support with async receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s", config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome), + }) + }, config.PersistentReceiverFailedOutcome, true, true), + ) + + DescribeTable("Message Settlement Outcome with Auto Ack Configured", + func(configFunc func(solace.PersistentMessageReceiverBuilder), outcome config.MessageSettlementOutcome, useReceiveAsync bool) { + receiverBuilder := messagingService.CreatePersistentMessageReceiverBuilder() + receiverBuilder.WithMessageAutoAcknowledgement() // with Auto- Ack configured + configFunc(receiverBuilder) + receiver, err := receiverBuilder.Build(resource.QueueDurableExclusive(queueName)) + Expect(err).ToNot(HaveOccurred()) + Expect(receiver.Start()).ToNot(HaveOccurred()) + + Expect(helpers.GetQueueMessages(queueName)).To(HaveLen(0)) + messagingService.Metrics().Reset() + + var msg message.InboundMessage + if useReceiveAsync { + msgChan := make(chan message.InboundMessage, 1) + receiver.ReceiveAsync(func(inboundMessage message.InboundMessage) { + defer GinkgoRecover() + Expect(helpers.GetQueueMessages(queueName)).To(HaveLen(1)) + msgChan <- inboundMessage + }) + helpers.PublishOnePersistentMessage(messagingService, topicString) + Eventually(msgChan).Should(Receive(&msg)) + } else { + helpers.PublishOnePersistentMessage(messagingService, topicString) + inboundMessage, err := receiver.ReceiveMessage(10 * time.Second) + Expect(err).ToNot(HaveOccurred()) + msg = inboundMessage + } + + // no message should be in the queue since already Auto-Acked + err = receiver.Settle(msg, outcome) + // we don't expect any errors when calling settle() since message not founf + Expect(err).ToNot(HaveOccurred()) + Eventually(func() []monitor.MsgVpnQueueMsg { + return helpers.GetQueueMessages(queueName) + }, 2*time.Second).Should(HaveLen(0)) + }, + // With configured Nack support through builder + Entry("RejectedOutcome - Builder Nack support with sync receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome) + }, config.PersistentReceiverRejectedOutcome, false), + Entry("FailedOutcome - Builder Nack support with sync receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome) + }, config.PersistentReceiverFailedOutcome, false), + Entry("RejectedOutcome - Builder Nack support with async receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome) + }, config.PersistentReceiverRejectedOutcome, true), + Entry("FailedOutcome - Builder Nack support with async receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome) + }, config.PersistentReceiverFailedOutcome, true), + + // With configured Nack support through property + Entry("RejectedOutcome - Property Nack support with sync receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s", config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome), + }) + }, config.PersistentReceiverRejectedOutcome, false), + Entry("FailedOutcome - Property Nack support with sync receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s", config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome), + }) + }, config.PersistentReceiverFailedOutcome, false), + Entry("RejectedOutcome - Property Nack support with async receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s", config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome), + }) + }, config.PersistentReceiverRejectedOutcome, true), + Entry("FailedOutcome - Property Nack support with async receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s", config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome), + }) + }, config.PersistentReceiverFailedOutcome, true), + ) + Context("with an ACL deny exception", func() { const deniedTopic = "persistent-receiver-acl-deny" BeforeEach(func() {