From 90c3175d7ece3beaf675473914e9607aa82474cf Mon Sep 17 00:00:00 2001 From: Oseme Odigie Date: Wed, 2 Oct 2024 11:42:26 -0400 Subject: [PATCH 01/18] EBP-52: Implement the receiver builder configuration for message settlement outcome support --- .../persistent_message_receiver_impl.go | 66 +++++++++++++++++++ .../config/message_receiver_properties.go | 3 + .../config/messaging_receiver_constants.go | 34 ++++++++++ pkg/solace/persistent_message_receiver.go | 6 ++ 4 files changed, 109 insertions(+) create mode 100644 pkg/solace/config/messaging_receiver_constants.go diff --git a/internal/impl/receiver/persistent_message_receiver_impl.go b/internal/impl/receiver/persistent_message_receiver_impl.go index b70a59c..8390e5c 100644 --- a/internal/impl/receiver/persistent_message_receiver_impl.go +++ b/internal/impl/receiver/persistent_message_receiver_impl.go @@ -20,6 +20,7 @@ package receiver import ( "fmt" "runtime/debug" + "strings" "sync" "sync/atomic" "time" @@ -1314,6 +1315,44 @@ func (builder *persistentMessageReceiverBuilderImpl) Build(queue *resource.Queue } } + // message settlement outcome property + if settlementOutcomesInterface, ok := builder.properties[config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport]; ok { + if settlementOutcomesStr, ok := settlementOutcomesInterface.(string); ok { + addedOutcomes := make(map[string]bool) // to track duplicates + settlementOutcomes := strings.Split(settlementOutcomesStr, ",") + // iterate through to validate the message settlement outcome values + for _, settlementOutcome := range settlementOutcomes { + _, present, err := validation.StringPropertyValidation( + string(config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport), + settlementOutcome, + string(config.PersistentReceiverAcceptedOutcome), + string(config.PersistentReceiverFailedOutcome), + string(config.PersistentReceiverRejectedOutcome)) + + if !present || err != nil { + return nil, solace.NewError(&solace.IllegalArgumentError{}, + fmt.Sprintf("invalid value for receiver message settlement outcome, expected either 'ACCEPTED', 'FAILED' or 'REJECTED', got %T", settlementOutcome), nil) + } + + // ignore duplicate message settlement outcomes from the ccsmp properties array + if _, added := addedOutcomes[settlementOutcome]; !added { + // add the corresponding ccsmp property to the properties array + switch config.MessageSettlementOutcome(settlementOutcome) { + case config.PersistentReceiverFailedOutcome: + properties = append(properties, ccsmp.SolClientFlowPropRequiredOutcomeFailed, settlementOutcome) + case config.PersistentReceiverRejectedOutcome: + properties = append(properties, ccsmp.SolClientFlowPropRequiredOutcomeRejected, settlementOutcome) + case config.PersistentReceiverAcceptedOutcome: + default: + logging.Default.Info(builder.String() + ": Receiver message settlement outcome of 'ACCEPTED' is supported by default") + } + // mark it as added + addedOutcomes[settlementOutcome] = true + } + } + } + } + // Create the receiver with the given properties receiver := &persistentMessageReceiverImpl{} receiver.construct( @@ -1413,6 +1452,27 @@ func (builder *persistentMessageReceiverBuilderImpl) WithMessageReplay(strategy return builder } +// WithRequiredMessageOutcomeSupport configures the types of settlements the receiver can use. +// Any combination of PersistentReceiverAcceptedOutcome, PersistentReceiverFailedOutcome, and +// PersistentReceiverRejectedOutcome; the order is irrelevant. +// Attempting to Settle() a message later with an Outcome not listed here may result in an error. +func (builder *persistentMessageReceiverBuilderImpl) WithRequiredMessageOutcomeSupport(messageSettlementOutcomes ...config.MessageSettlementOutcome) solace.PersistentMessageReceiverBuilder { + settlementOutcomesStrArray := []string{} + addedOutcomes := make(map[config.MessageSettlementOutcome]bool) + for _, settlementOutcome := range messageSettlementOutcomes { + if _, added := addedOutcomes[settlementOutcome]; !added && isSupportedMessageSettlementOutcome(settlementOutcome) { + addedOutcomes[settlementOutcome] = true // mark it as added + settlementOutcomesStrArray = append(settlementOutcomesStrArray, string(settlementOutcome)) + } else { + logging.Default.Warning( + builder.String() + ": Unknown message settlement outcome(s) passed to WithRequiredMessageOutcomeSupport, " + + "allowed values are: 'config.PersistentReceiverAcceptedOutcome', 'config.PersistentReceiverFailedOutcome' and 'config.PersistentReceiverRejectedOutcome'") + } + } + builder.properties[config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport] = strings.Join(settlementOutcomesStrArray, ",") + return builder +} + func (builder *persistentMessageReceiverBuilderImpl) String() string { return fmt.Sprintf("solace.PersistentMessageReceiverBuilder at %p", builder) } @@ -1426,6 +1486,12 @@ func checkPersistentMessageReceiverSubscriptionType(subscription resource.Subscr return solace.NewError(&solace.IllegalArgumentError{}, fmt.Sprintf(constants.PersistentReceiverUnsupportedSubscriptionType, subscription), nil) } +func isSupportedMessageSettlementOutcome(messageSettlementOutcome config.MessageSettlementOutcome) bool { + return (messageSettlementOutcome == config.PersistentReceiverAcceptedOutcome || + messageSettlementOutcome == config.PersistentReceiverFailedOutcome || + messageSettlementOutcome == config.PersistentReceiverRejectedOutcome) +} + type persistentReceiverInfoImpl struct { resourceInfo *resourceInfoImpl } diff --git a/pkg/solace/config/message_receiver_properties.go b/pkg/solace/config/message_receiver_properties.go index 0f0ae49..ecaa3f3 100644 --- a/pkg/solace/config/message_receiver_properties.go +++ b/pkg/solace/config/message_receiver_properties.go @@ -86,6 +86,9 @@ const ( // ReceiverPropertyPersistentMessageAckStrategy specifies the acknowledgement strategy for the message receiver. ReceiverPropertyPersistentMessageAckStrategy ReceiverProperty = "solace.messaging.receiver.persistent.ack.strategy" + // ReceiverPropertyPersistentMessageRequiredOutcomeSupport for configuring the settlement outcomes for the message receiver. + ReceiverPropertyPersistentMessageRequiredOutcomeSupport ReceiverProperty = "solace.messaging.receiver.persistent.ack.required-message-outcome-support" + // ReceiverPropertyPersistentMessageReplayStrategy enables message replay and to specify a replay strategy. ReceiverPropertyPersistentMessageReplayStrategy ReceiverProperty = "solace.messaging.receiver.persistent.replay.strategy" diff --git a/pkg/solace/config/messaging_receiver_constants.go b/pkg/solace/config/messaging_receiver_constants.go new file mode 100644 index 0000000..79d0b3d --- /dev/null +++ b/pkg/solace/config/messaging_receiver_constants.go @@ -0,0 +1,34 @@ +// pubsubplus-go-client +// +// Copyright 2021-2024 Solace Corporation. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package config + +// Represents the type for supported message settlement outcome on a PersistentMessageReceiver. +type MessageSettlementOutcome string + +// The various message settlement outcomes available for use when configuring a PersistentMessageReceiver. +const ( + // Settles the message with a positive acknowledgement, removing it from the queue. + // Same as calling Ack() on the message. + PersistentReceiverAcceptedOutcome MessageSettlementOutcome = "ACCEPTED" + + // Settles the message with a negative acknowledgement without removing it from the queue. + // This may or may not make the message eligible for redelivery or eventually the DMQ, depending on the queue configuration. + PersistentReceiverFailedOutcome MessageSettlementOutcome = "FAILED" + + // Settles the message with a negative acknowledgement, removing it from the queue. + PersistentReceiverRejectedOutcome MessageSettlementOutcome = "REJECTED" +) diff --git a/pkg/solace/persistent_message_receiver.go b/pkg/solace/persistent_message_receiver.go index 2ad5e25..de9e92c 100644 --- a/pkg/solace/persistent_message_receiver.go +++ b/pkg/solace/persistent_message_receiver.go @@ -113,6 +113,12 @@ type PersistentMessageReceiverBuilder interface { // to when starting the receiver. Accepts *resource.TopicSubscription subscriptions. WithSubscriptions(topics ...resource.Subscription) PersistentMessageReceiverBuilder + // WithRequiredMessageOutcomeSupport configures the types of settlements the receiver can use. + // Any combination of PersistentReceiverAcceptedOutcome, PersistentReceiverFailedOutcome, and + // PersistentReceiverRejectedOutcome; the order is irrelevant. + // Attempting to Settle() a message later with an Outcome not listed here may result in an error. + WithRequiredMessageOutcomeSupport(messageSettlementOutcomes ...config.MessageSettlementOutcome) PersistentMessageReceiverBuilder + // FromConfigurationProvider configures the persistent receiver with the specified properties. // The built-in ReceiverPropertiesConfigurationProvider implementations include: // ReceiverPropertyMap, a map of ReceiverProperty keys to values From 37f85785ec84b21dcc724e5a5a39a8e391f1cfcf Mon Sep 17 00:00:00 2001 From: Oseme Odigie Date: Fri, 4 Oct 2024 13:45:22 -0400 Subject: [PATCH 02/18] EBP-54: Implement the Settle() method --- internal/ccsmp/ccsmp_flow.go | 19 +++++++ internal/impl/constants/error_strings.go | 9 ++++ internal/impl/core/receiver.go | 13 +++++ .../receiver/message_receiver_impl_test.go | 9 ++++ .../persistent_message_receiver_impl.go | 50 +++++++++++++++++++ pkg/solace/persistent_message_receiver.go | 10 ++++ 6 files changed, 110 insertions(+) diff --git a/internal/ccsmp/ccsmp_flow.go b/internal/ccsmp/ccsmp_flow.go index c482e33..efa271c 100644 --- a/internal/ccsmp/ccsmp_flow.go +++ b/internal/ccsmp/ccsmp_flow.go @@ -45,6 +45,18 @@ type SolClientFlowEventInfoPt = C.solClient_flow_eventCallbackInfo_pt // SolClientFlowRxMsgDispatchFuncInfo is assigned a value type SolClientFlowRxMsgDispatchFuncInfo = C.solClient_flow_rxMsgDispatchFuncInfo_t +// SolClientMessageSettlementOutcome is assigned a value +type SolClientMessageSettlementOutcome = C.solClient_msgOutcome_t + +// SolClientSettlementOutcomeAccepted - the message was successfully processed. +const SolClientSettlementOutcomeAccepted = C.SOLCLIENT_OUTCOME_ACCEPTED + +// SolClientSettlementOutcomeFailed - message processing failed temporarily, attempt redelivery if configured. +const SolClientSettlementOutcomeFailed = C.SOLCLIENT_OUTCOME_FAILED + +// SolClientSettlementOutcomeRejected - message was processed and rejected, removed from the queue. +const SolClientSettlementOutcomeRejected = C.SOLCLIENT_OUTCOME_REJECTED + // Callbacks // SolClientFlowMessageCallback is assigned a function @@ -174,6 +186,13 @@ func (flow *SolClientFlow) SolClientFlowAck(msgID SolClientMessageID) *SolClient }) } +// SolClientFlowSettleMessage function +func (flow *SolClientFlow) SolClientFlowSettleMessage(msgID SolClientMessageID, settlementOutcome SolClientMessageSettlementOutcome) *SolClientErrorInfoWrapper { + return handleCcsmpError(func() SolClientReturnCode { + return C.solClient_flow_settleMsg(flow.pointer, msgID, settlementOutcome) + }) +} + // SolClientFlowGetDestination function func (flow *SolClientFlow) SolClientFlowGetDestination() (name string, durable bool, err *SolClientErrorInfoWrapper) { var dest *SolClientDestination = &SolClientDestination{} diff --git a/internal/impl/constants/error_strings.go b/internal/impl/constants/error_strings.go index cca64cf..f67439a 100644 --- a/internal/impl/constants/error_strings.go +++ b/internal/impl/constants/error_strings.go @@ -70,6 +70,15 @@ const UnableToAcknowledgeAlreadyTerminated = "unable to acknowledge message: mes // UnableToAcknowledgeNotStarted error string const UnableToAcknowledgeNotStarted = "unable to acknowledge meessage: message receiver is not yet started" +// UnableToSettleAlreadyTerminated error string +const UnableToSettleAlreadyTerminated = "unable to settle message: message receiver has been terminated" + +// UnableToSettleNotStarted error string +const UnableToSettleNotStarted = "unable to settle meessage: message receiver is not yet started" + +// InvalidMessageSettlementOutcome error string +const InvalidMessageSettlementOutcome = "invalid message settlement outcome used to settle message" + // UnableToModifySubscriptionBadState error string const UnableToModifySubscriptionBadState = "unable to modify subscriptions in state %s" diff --git a/internal/impl/core/receiver.go b/internal/impl/core/receiver.go index fafd06f..3fdfeb5 100644 --- a/internal/impl/core/receiver.go +++ b/internal/impl/core/receiver.go @@ -85,7 +85,13 @@ type PersistentReceiver interface { // Unsubscribe will remove the subscription from the persistent receiver Unsubscribe(topic string) (SubscriptionCorrelationID, <-chan SubscriptionEvent, ErrorInfo) // Ack will acknowledge the given message + // This method is equivalent to calling the Settle method with the ACCEPTED outcome. Ack(msgID MessageID) ErrorInfo + // Settle generates and sends a positive or negative acknowledgement for a message.InboundMessage as + // indicated by the MessageSettlementOutcome argument. To use the negative outcomes FAILED and REJECTED, + // the receiver has to have been preconfigured via its builder to support these settlement outcomes. + // Attempts to settle a message on an auto-acking receiver is ignored for ACCEPTED and raises error for FAILED and REJECTED. + Settle(msgID MessageID, msgSettlementOutcome MessageSettlementOutcome) ErrorInfo // Destionation returns the destination if retrievable, or an error if one occurred Destination() (destination string, durable bool, errorInfo ErrorInfo) } @@ -99,6 +105,9 @@ type ReplyPublishable = ccsmp.SolClientMessagePt // MessageID type defined type MessageID = ccsmp.SolClientMessageID +// MessageSettlementOutcome type defined +type MessageSettlementOutcome = ccsmp.SolClientMessageSettlementOutcome + // RxCallback type defined type RxCallback func(msg Receivable) bool @@ -408,6 +417,10 @@ func (receiver *ccsmpBackedPersistentReceiver) Ack(msgID MessageID) ErrorInfo { return receiver.flow.SolClientFlowAck(msgID) } +func (receiver *ccsmpBackedPersistentReceiver) Settle(msgID MessageID, msgSettlementOutcome MessageSettlementOutcome) ErrorInfo { + return receiver.flow.SolClientFlowSettleMessage(msgID, msgSettlementOutcome) +} + func (receiver *ccsmpBackedPersistentReceiver) Destination() (destination string, durable bool, errorInfo ErrorInfo) { return receiver.flow.SolClientFlowGetDestination() } diff --git a/internal/impl/receiver/message_receiver_impl_test.go b/internal/impl/receiver/message_receiver_impl_test.go index 939ab9d..3d98bb1 100644 --- a/internal/impl/receiver/message_receiver_impl_test.go +++ b/internal/impl/receiver/message_receiver_impl_test.go @@ -378,6 +378,7 @@ type mockPersistentReceiver struct { subscribe func(topic string) (core.SubscriptionCorrelationID, <-chan core.SubscriptionEvent, core.ErrorInfo) unsubscribe func(topic string) (core.SubscriptionCorrelationID, <-chan core.SubscriptionEvent, core.ErrorInfo) ack func(msg core.MessageID) *ccsmp.SolClientErrorInfoWrapper + settle func(msg core.MessageID, outcome core.MessageSettlementOutcome) *ccsmp.SolClientErrorInfoWrapper } // Destroy destroys the flow @@ -429,6 +430,14 @@ func (mock *mockPersistentReceiver) Ack(msgID core.MessageID) *ccsmp.SolClientEr return nil } +// Settle will settle the given message with the provided outcome +func (mock *mockPersistentReceiver) Settle(msgID core.MessageID, outcome core.MessageSettlementOutcome) *ccsmp.SolClientErrorInfoWrapper { + if mock.settle != nil { + return mock.settle(msgID, outcome) + } + return nil +} + func (mock *mockPersistentReceiver) Destination() (destination string, durable bool, errorInfo *ccsmp.SolClientErrorInfoWrapper) { return } diff --git a/internal/impl/receiver/persistent_message_receiver_impl.go b/internal/impl/receiver/persistent_message_receiver_impl.go index 8390e5c..f5a73e7 100644 --- a/internal/impl/receiver/persistent_message_receiver_impl.go +++ b/internal/impl/receiver/persistent_message_receiver_impl.go @@ -809,6 +809,8 @@ func (receiver *persistentMessageReceiverImpl) ReceiverInfo() (solace.Persistent } // Ack will acknowledge a message +// This method is equivalent to calling the settle method with +// the ACCEPTED outcome like this: PersistentMessageReceiver.Settle(inboundMessage, config.PersistentReceiverAcceptedOutcome) func (receiver *persistentMessageReceiverImpl) Ack(msg apimessage.InboundMessage) error { state := receiver.getState() if state != messageReceiverStateStarted && state != messageReceiverStateTerminating { @@ -835,6 +837,54 @@ func (receiver *persistentMessageReceiverImpl) Ack(msg apimessage.InboundMessage return nil } +// Settle generates and sends a positive or negative acknowledgement for a message.InboundMessage as +// indicated by the MessageSettlementOutcome argument. To use the negative outcomes FAILED and REJECTED, +// the receiver has to have been preconfigured via its builder to support these settlement outcomes. +// Attempts to settle a message on an auto-acking receiver is ignored for ACCEPTED +// (albeit it counts as a manual ACCEPTED in the stats), raises error for FAILED and REJECTED. +// this returns an error object with the reason for the error if it was not possible to settle the message. +func (receiver *persistentMessageReceiverImpl) Settle(msg apimessage.InboundMessage, outcome config.MessageSettlementOutcome) error { + state := receiver.getState() + if state != messageReceiverStateStarted && state != messageReceiverStateTerminating { + var message string + if state == messageReceiverStateTerminated { + message = constants.UnableToSettleAlreadyTerminated + } else { + message = constants.UnableToSettleNotStarted + } + return solace.NewError(&solace.IllegalStateError{}, message, nil) + } + msgImpl, ok := msg.(*message.InboundMessageImpl) + if !ok { + return solace.NewError(&solace.IllegalArgumentError{}, fmt.Sprintf(constants.InvalidInboundMessageType, msg), nil) + } + msgID, present := message.GetMessageID(msgImpl) + if !present { + return solace.NewError(&solace.IllegalArgumentError{}, constants.UnableToRetrieveMessageID, nil) + } + + // to hold the settlement outcome + var msgSettlementOutcome ccsmp.SolClientMessageSettlementOutcome + + switch outcome { + case config.PersistentReceiverAcceptedOutcome: + msgSettlementOutcome = ccsmp.SolClientSettlementOutcomeAccepted + case config.PersistentReceiverFailedOutcome: + msgSettlementOutcome = ccsmp.SolClientSettlementOutcomeFailed + case config.PersistentReceiverRejectedOutcome: + msgSettlementOutcome = ccsmp.SolClientSettlementOutcomeRejected + default: + // return error here + return solace.NewError(&solace.IllegalArgumentError{}, constants.InvalidMessageSettlementOutcome, nil) + } + + errInfo := receiver.internalFlow.Settle(msgID, msgSettlementOutcome) + if errInfo != nil { + return core.ToNativeError(errInfo) + } + return nil +} + // Pause will pause the receiver's message delivery to asynchronous message handlers. // Pausing an already paused receiver will have no effect. // Returns an IllegalStateErorr if the receiver is not started or already terminated. diff --git a/pkg/solace/persistent_message_receiver.go b/pkg/solace/persistent_message_receiver.go index de9e92c..a1369be 100644 --- a/pkg/solace/persistent_message_receiver.go +++ b/pkg/solace/persistent_message_receiver.go @@ -29,8 +29,18 @@ type PersistentMessageReceiver interface { MessageReceiver // Include all functionality of MessageReceiver. // Ack acknowledges that a message was received. + // This method is equivalent to calling the settle method with + // the ACCEPTED outcome like this: PersistentMessageReceiver.Settle(inboundMessage, config.PersistentReceiverAcceptedOutcome) Ack(message message.InboundMessage) error + // Settle generates and sends a positive or negative acknowledgement for a message.InboundMessage as + // indicated by the MessageSettlementOutcome argument. To use the negative outcomes FAILED and REJECTED, + // the receiver has to have been preconfigured via its builder to support these settlement outcomes. + // Attempts to settle a message on an auto-acking receiver is ignored for ACCEPTED + // (albeit it counts as a manual ACCEPTED in the stats), raises error for FAILED and REJECTED. + // this returns an error object with the reason for the error if it was not possible to settle the message. + Settle(message message.InboundMessage, outcome config.MessageSettlementOutcome) error + // StartAsyncCallback starts the PersistentMessageReceiver asynchronously. // Calls the callback when started with an error if one occurred, otherwise nil // if successful. From 648b60f3d700a0d8af07115f5764cbe209b64210 Mon Sep 17 00:00:00 2001 From: Oseme Odigie Date: Mon, 7 Oct 2024 11:06:51 -0400 Subject: [PATCH 03/18] EBP-54: fixed doc string format on MessageSettlementOutcome --- pkg/solace/config/messaging_receiver_constants.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/solace/config/messaging_receiver_constants.go b/pkg/solace/config/messaging_receiver_constants.go index 79d0b3d..25c4e2b 100644 --- a/pkg/solace/config/messaging_receiver_constants.go +++ b/pkg/solace/config/messaging_receiver_constants.go @@ -16,7 +16,7 @@ package config -// Represents the type for supported message settlement outcome on a PersistentMessageReceiver. +// MessageSettlementOutcome - represents the type for supported message settlement outcome on a PersistentMessageReceiver. type MessageSettlementOutcome string // The various message settlement outcomes available for use when configuring a PersistentMessageReceiver. From fe4303c6eabfb23ee186cabb0d74fbebcce3ada7 Mon Sep 17 00:00:00 2001 From: Oseme Odigie Date: Tue, 8 Oct 2024 14:49:51 -0400 Subject: [PATCH 04/18] EBP-58: added new metrics and response code for Nack --- internal/impl/core/metrics.go | 3 +++ pkg/solace/metrics/metrics.go | 9 +++++++++ 2 files changed, 12 insertions(+) diff --git a/internal/impl/core/metrics.go b/internal/impl/core/metrics.go index c2dfa29..5cb9515 100644 --- a/internal/impl/core/metrics.go +++ b/internal/impl/core/metrics.go @@ -43,6 +43,9 @@ var rxMetrics = map[metrics.Metric]ccsmp.SolClientStatsRX{ metrics.TotalBytesReceived: ccsmp.SolClientStatsRXTotalDataBytes, metrics.TotalMessagesReceived: ccsmp.SolClientStatsRXTotalDataMsgs, metrics.CompressedBytesReceived: ccsmp.SolClientStatsRXCompressedBytes, + metrics.PersistentMessagesAccepted: ccsmp.SolClientStatsRXSettleAccepted, + metrics.PersistentMessagesFailed: ccsmp.SolClientStatsRXSettleFailed, + metrics.PersistentMessagesRejected: ccsmp.SolClientStatsRXSettleRejected, } var txMetrics = map[metrics.Metric]ccsmp.SolClientStatsTX{ diff --git a/pkg/solace/metrics/metrics.go b/pkg/solace/metrics/metrics.go index 1b72b4e..71ae434 100644 --- a/pkg/solace/metrics/metrics.go +++ b/pkg/solace/metrics/metrics.go @@ -110,6 +110,15 @@ const ( // persistent message receivers on the MessagingService. PersistentOutOfOrderMessagesDiscarded + // Number of messages settled with "ACCEPTED" outcome. + PersistentMessagesAccepted + + // Number of messages settled with "FAILED" outcome. + PersistentMessagesFailed + + // Number of messages settled with "REJECTED" outcome. + PersistentMessagesRejected + // PublishMessagesDiscarded is the number of messages discarded due to // channel failure. PublishMessagesDiscarded From 578d61449821703199bb6a46d9878371cf8ccce2 Mon Sep 17 00:00:00 2001 From: Oseme Odigie Date: Wed, 9 Oct 2024 14:10:43 -0400 Subject: [PATCH 05/18] EBP-58: tracked auto-acks and substract it from settledAccepted --- internal/impl/constants/error_strings.go | 6 - internal/impl/core/metrics.go | 112 ++++++++++-------- internal/impl/core/receiver.go | 8 +- .../receiver/message_receiver_impl_test.go | 25 ++-- .../persistent_message_receiver_impl.go | 30 ++--- 5 files changed, 93 insertions(+), 88 deletions(-) diff --git a/internal/impl/constants/error_strings.go b/internal/impl/constants/error_strings.go index f67439a..362eb87 100644 --- a/internal/impl/constants/error_strings.go +++ b/internal/impl/constants/error_strings.go @@ -64,12 +64,6 @@ const UnableToStartReceiver = "cannot start the receiver as it has already been // UnableToStartReceiverParentServiceNotStarted error string const UnableToStartReceiverParentServiceNotStarted = "cannot start receiver unless parent MessagingService is connected" -// UnableToAcknowledgeAlreadyTerminated error string -const UnableToAcknowledgeAlreadyTerminated = "unable to acknowledge message: message receiver has been terminated" - -// UnableToAcknowledgeNotStarted error string -const UnableToAcknowledgeNotStarted = "unable to acknowledge meessage: message receiver is not yet started" - // UnableToSettleAlreadyTerminated error string const UnableToSettleAlreadyTerminated = "unable to settle message: message receiver has been terminated" diff --git a/internal/impl/core/metrics.go b/internal/impl/core/metrics.go index 5cb9515..fc26ac4 100644 --- a/internal/impl/core/metrics.go +++ b/internal/impl/core/metrics.go @@ -108,8 +108,9 @@ type Metrics interface { // Implementation type ccsmpBackedMetrics struct { - session *ccsmp.SolClientSession - metrics []uint64 + session *ccsmp.SolClientSession + metrics []uint64 + duplicateAcks uint64 metricLock sync.RWMutex capturedTxMetrics map[ccsmp.SolClientStatsTX]uint64 @@ -118,98 +119,111 @@ type ccsmpBackedMetrics struct { func newCcsmpMetrics(session *ccsmp.SolClientSession) *ccsmpBackedMetrics { return &ccsmpBackedMetrics{ - metrics: make([]uint64, metricCount), - session: session, + metrics: make([]uint64, metricCount), + duplicateAcks: uint64(0), // used to track duplicate acks count (auto-acks) + session: session, } } -func (metrics *ccsmpBackedMetrics) terminate() { - metrics.metricLock.Lock() - defer metrics.metricLock.Unlock() - metrics.captureTXStats() - metrics.captureRXStats() +func (backedMetrics *ccsmpBackedMetrics) terminate() { + backedMetrics.metricLock.Lock() + defer backedMetrics.metricLock.Unlock() + backedMetrics.captureTXStats() + backedMetrics.captureRXStats() } -func (metrics *ccsmpBackedMetrics) captureTXStats() { - if metrics.capturedTxMetrics != nil { +func (backedMetrics *ccsmpBackedMetrics) captureTXStats() { + if backedMetrics.capturedTxMetrics != nil { return } - metrics.capturedTxMetrics = make(map[ccsmp.SolClientStatsTX]uint64) + backedMetrics.capturedTxMetrics = make(map[ccsmp.SolClientStatsTX]uint64) for _, txStat := range txMetrics { - metrics.capturedTxMetrics[txStat] = metrics.session.SolClientSessionGetTXStat(txStat) + backedMetrics.capturedTxMetrics[txStat] = backedMetrics.session.SolClientSessionGetTXStat(txStat) } } -func (metrics *ccsmpBackedMetrics) captureRXStats() { - if metrics.capturedRxMetrics != nil { +func (backedMetrics *ccsmpBackedMetrics) captureRXStats() { + if backedMetrics.capturedRxMetrics != nil { return } - metrics.capturedRxMetrics = make(map[ccsmp.SolClientStatsRX]uint64) + backedMetrics.capturedRxMetrics = make(map[ccsmp.SolClientStatsRX]uint64) for _, rxStat := range rxMetrics { - metrics.capturedRxMetrics[rxStat] = metrics.session.SolClientSessionGetRXStat(rxStat) + backedMetrics.capturedRxMetrics[rxStat] = backedMetrics.session.SolClientSessionGetRXStat(rxStat) } } -func (metrics *ccsmpBackedMetrics) getTXStat(stat ccsmp.SolClientStatsTX) uint64 { - metrics.metricLock.RLock() - defer metrics.metricLock.RUnlock() - if metrics.capturedTxMetrics != nil { - return metrics.capturedTxMetrics[stat] +func (backedMetrics *ccsmpBackedMetrics) getTXStat(stat ccsmp.SolClientStatsTX) uint64 { + backedMetrics.metricLock.RLock() + defer backedMetrics.metricLock.RUnlock() + if backedMetrics.capturedTxMetrics != nil { + return backedMetrics.capturedTxMetrics[stat] } - return metrics.session.SolClientSessionGetTXStat(stat) + return backedMetrics.session.SolClientSessionGetTXStat(stat) } -func (metrics *ccsmpBackedMetrics) getRXStat(stat ccsmp.SolClientStatsRX) uint64 { - metrics.metricLock.RLock() - defer metrics.metricLock.RUnlock() - if metrics.capturedRxMetrics != nil { - return metrics.capturedRxMetrics[stat] +func (backedMetrics *ccsmpBackedMetrics) getRXStat(stat ccsmp.SolClientStatsRX) uint64 { + backedMetrics.metricLock.RLock() + defer backedMetrics.metricLock.RUnlock() + if backedMetrics.capturedRxMetrics != nil { + return backedMetrics.capturedRxMetrics[stat] } - return metrics.session.SolClientSessionGetRXStat(stat) + return backedMetrics.session.SolClientSessionGetRXStat(stat) } -func (metrics *ccsmpBackedMetrics) getNextGenStat(metric NextGenMetric) uint64 { - return atomic.LoadUint64(&metrics.metrics[metric]) +func (backedMetrics *ccsmpBackedMetrics) getNextGenStat(metric NextGenMetric) uint64 { + return atomic.LoadUint64(&backedMetrics.metrics[metric]) } -func (metrics *ccsmpBackedMetrics) GetStat(metric metrics.Metric) uint64 { +func (backedMetrics *ccsmpBackedMetrics) GetStat(metric metrics.Metric) uint64 { if rxMetric, ok := rxMetrics[metric]; ok { - return metrics.getRXStat(rxMetric) + // check for duplicate counts due to auto-acks and remove from final result + rxStat := backedMetrics.getRXStat(rxMetric) + duplicateAck := uint64(0) + // take the difference and retrun as the settled accepted metric + if duplicateAck > 0 && metric == metrics.PersistentMessagesAccepted { + return rxStat - duplicateAck + } + return rxStat // return other types of metrics as is } else if txMetric, ok := txMetrics[metric]; ok { - return metrics.getTXStat(txMetric) + return backedMetrics.getTXStat(txMetric) } else if clientMetric, ok := clientMetrics[metric]; ok { - return metrics.getNextGenStat(clientMetric) + return backedMetrics.getNextGenStat(clientMetric) } logging.Default.Warning("Could not find mapping for metric with ID " + fmt.Sprint(metric)) return 0 } -func (metrics *ccsmpBackedMetrics) ResetStats() { +func (backedMetrics *ccsmpBackedMetrics) ResetStats() { for i := 0; i < metricCount; i++ { - atomic.StoreUint64(&metrics.metrics[i], 0) + atomic.StoreUint64(&backedMetrics.metrics[i], 0) } - metrics.resetNativeStats() + atomic.StoreUint64(&backedMetrics.duplicateAcks, 0) // reset this duplicate acks counter to zero + backedMetrics.resetNativeStats() } -func (metrics *ccsmpBackedMetrics) resetNativeStats() { - metrics.metricLock.Lock() - defer metrics.metricLock.Unlock() - if metrics.capturedRxMetrics != nil { - for key := range metrics.capturedRxMetrics { - metrics.capturedRxMetrics[key] = 0 +func (backedMetrics *ccsmpBackedMetrics) resetNativeStats() { + backedMetrics.metricLock.Lock() + defer backedMetrics.metricLock.Unlock() + if backedMetrics.capturedRxMetrics != nil { + for key := range backedMetrics.capturedRxMetrics { + backedMetrics.capturedRxMetrics[key] = 0 } - for key := range metrics.capturedTxMetrics { - metrics.capturedTxMetrics[key] = 0 + for key := range backedMetrics.capturedTxMetrics { + backedMetrics.capturedTxMetrics[key] = 0 } } else { - errorInfo := metrics.session.SolClientSessionClearStats() + errorInfo := backedMetrics.session.SolClientSessionClearStats() if errorInfo != nil { logging.Default.Warning("Could not reset metrics: " + errorInfo.String()) } } } -func (metrics *ccsmpBackedMetrics) IncrementMetric(metric NextGenMetric, amount uint64) { - atomic.AddUint64(&metrics.metrics[metric], amount) +func (backedMetrics *ccsmpBackedMetrics) IncrementMetric(metric NextGenMetric, amount uint64) { + atomic.AddUint64(&backedMetrics.metrics[metric], amount) +} + +func (backedMetrics *ccsmpBackedMetrics) incrementDuplicateAckCount() { + atomic.AddUint64(&backedMetrics.duplicateAcks, uint64(1)) } diff --git a/internal/impl/core/receiver.go b/internal/impl/core/receiver.go index 3fdfeb5..2744908 100644 --- a/internal/impl/core/receiver.go +++ b/internal/impl/core/receiver.go @@ -65,8 +65,10 @@ type Receiver interface { ProvisionEndpoint(queueName string, isExclusive bool) ErrorInfo // EndpointUnsubscribe will call endpoint unsubscribe on the endpoint EndpointUnsubscribe(queueName string, topic string) (SubscriptionCorrelationID, <-chan SubscriptionEvent, ErrorInfo) - // Increments receiver metrics + // IncrementMetric - Increments receiver metrics IncrementMetric(metric NextGenMetric, amount uint64) + // IncrementDuplicateAckCount - Increments receiver duplicate acks (track duplicate accepted settlement outcome metrics from auto-acks) + IncrementDuplicateAckCount() // Creates a new persistent receiver with the given callback NewPersistentReceiver(properties []string, callback RxCallback, eventCallback PersistentEventCallback) (PersistentReceiver, ErrorInfo) } @@ -289,6 +291,10 @@ func (receiver *ccsmpBackedReceiver) IncrementMetric(metric NextGenMetric, amoun receiver.metrics.IncrementMetric(metric, amount) } +func (receiver *ccsmpBackedReceiver) IncrementDuplicateAckCount() { + receiver.metrics.incrementDuplicateAckCount() +} + func (receiver *ccsmpBackedReceiver) ClearSubscriptionCorrelation(id SubscriptionCorrelationID) { receiver.subscriptionCorrelationLock.Lock() defer receiver.subscriptionCorrelationLock.Unlock() diff --git a/internal/impl/receiver/message_receiver_impl_test.go b/internal/impl/receiver/message_receiver_impl_test.go index 3d98bb1..723d37b 100644 --- a/internal/impl/receiver/message_receiver_impl_test.go +++ b/internal/impl/receiver/message_receiver_impl_test.go @@ -283,15 +283,16 @@ type result struct { } type mockInternalReceiver struct { - events func() core.Events - replier func() core.Replier - isRunning func() bool - registerRxCallback func(callback core.RxCallback) uintptr - unregisterRxCallback func(ptr uintptr) - subscribe func(topic string, ptr uintptr) (core.SubscriptionCorrelationID, <-chan core.SubscriptionEvent, core.ErrorInfo) - unsubscribe func(topic string, ptr uintptr) (core.SubscriptionCorrelationID, <-chan core.SubscriptionEvent, core.ErrorInfo) - incrementMetric func(metric core.NextGenMetric, amount uint64) - newPersistentReceiver func(props []string, callback core.RxCallback, eventCallback core.PersistentEventCallback) (core.PersistentReceiver, *ccsmp.SolClientErrorInfoWrapper) + events func() core.Events + replier func() core.Replier + isRunning func() bool + registerRxCallback func(callback core.RxCallback) uintptr + unregisterRxCallback func(ptr uintptr) + subscribe func(topic string, ptr uintptr) (core.SubscriptionCorrelationID, <-chan core.SubscriptionEvent, core.ErrorInfo) + unsubscribe func(topic string, ptr uintptr) (core.SubscriptionCorrelationID, <-chan core.SubscriptionEvent, core.ErrorInfo) + incrementMetric func(metric core.NextGenMetric, amount uint64) + incrementDuplicateAckCount func() + newPersistentReceiver func(props []string, callback core.RxCallback, eventCallback core.PersistentEventCallback) (core.PersistentReceiver, *ccsmp.SolClientErrorInfoWrapper) } func (mock *mockInternalReceiver) Events() core.Events { @@ -364,6 +365,12 @@ func (mock *mockInternalReceiver) IncrementMetric(metric core.NextGenMetric, amo } } +func (mock *mockInternalReceiver) IncrementDuplicateAckCount() { + if mock.incrementDuplicateAckCount != nil { + mock.incrementDuplicateAckCount() + } +} + func (mock *mockInternalReceiver) NewPersistentReceiver(props []string, callback core.RxCallback, eventCallback core.PersistentEventCallback) (core.PersistentReceiver, *ccsmp.SolClientErrorInfoWrapper) { if mock.newPersistentReceiver != nil { return mock.newPersistentReceiver(props, callback, eventCallback) diff --git a/internal/impl/receiver/persistent_message_receiver_impl.go b/internal/impl/receiver/persistent_message_receiver_impl.go index f5a73e7..98fb814 100644 --- a/internal/impl/receiver/persistent_message_receiver_impl.go +++ b/internal/impl/receiver/persistent_message_receiver_impl.go @@ -812,29 +812,8 @@ func (receiver *persistentMessageReceiverImpl) ReceiverInfo() (solace.Persistent // This method is equivalent to calling the settle method with // the ACCEPTED outcome like this: PersistentMessageReceiver.Settle(inboundMessage, config.PersistentReceiverAcceptedOutcome) func (receiver *persistentMessageReceiverImpl) Ack(msg apimessage.InboundMessage) error { - state := receiver.getState() - if state != messageReceiverStateStarted && state != messageReceiverStateTerminating { - var message string - if state == messageReceiverStateTerminated { - message = constants.UnableToAcknowledgeAlreadyTerminated - } else { - message = constants.UnableToAcknowledgeNotStarted - } - return solace.NewError(&solace.IllegalStateError{}, message, nil) - } - msgImpl, ok := msg.(*message.InboundMessageImpl) - if !ok { - return solace.NewError(&solace.IllegalArgumentError{}, fmt.Sprintf(constants.InvalidInboundMessageType, msg), nil) - } - msgID, present := message.GetMessageID(msgImpl) - if !present { - return solace.NewError(&solace.IllegalArgumentError{}, constants.UnableToRetrieveMessageID, nil) - } - errInfo := receiver.internalFlow.Ack(msgID) - if errInfo != nil { - return core.ToNativeError(errInfo) - } - return nil + // call the Settle() method with the accepted message settlement outcome + return receiver.Settle(msg, config.PersistentReceiverAcceptedOutcome) } // Settle generates and sends a positive or negative acknowledgement for a message.InboundMessage as @@ -1012,6 +991,8 @@ func (receiver *persistentMessageReceiverImpl) ReceiveMessage(timeout time.Durat msg.Dispose() return nil, core.ToNativeError(errInfo) } + // Successful Auto-ack, increment the auto-ack duplicate counter + receiver.internalReceiver.IncrementDuplicateAckCount() } else { receiver.logger.Error(fmt.Sprintf("Could not retrieve message ID from message %s", msg)) } @@ -1180,6 +1161,9 @@ func (receiver *persistentMessageReceiverImpl) run() { errInfo := receiver.internalFlow.Ack(msgID) if errInfo != nil { receiver.logger.Warning("Failed to acknowledge message: " + errInfo.GetMessageAsString() + ", sub code: " + fmt.Sprint(errInfo.SubCode)) + } else { + // Successful Auto-Ack, increment the auto-ack duplicate counter + receiver.internalReceiver.IncrementDuplicateAckCount() } } } From 4eb83a481d2981706dd94cd21c9284dbc609be3f Mon Sep 17 00:00:00 2001 From: Oseme Odigie Date: Thu, 10 Oct 2024 16:12:13 -0400 Subject: [PATCH 06/18] EBP-60: write unit and integration tests --- .../persistent_message_receiver_impl.go | 4 +- test/persistent_receiver_test.go | 204 ++++++++++++++++++ 2 files changed, 206 insertions(+), 2 deletions(-) diff --git a/internal/impl/receiver/persistent_message_receiver_impl.go b/internal/impl/receiver/persistent_message_receiver_impl.go index 98fb814..c6c02d8 100644 --- a/internal/impl/receiver/persistent_message_receiver_impl.go +++ b/internal/impl/receiver/persistent_message_receiver_impl.go @@ -1373,9 +1373,9 @@ func (builder *persistentMessageReceiverBuilderImpl) Build(queue *resource.Queue // add the corresponding ccsmp property to the properties array switch config.MessageSettlementOutcome(settlementOutcome) { case config.PersistentReceiverFailedOutcome: - properties = append(properties, ccsmp.SolClientFlowPropRequiredOutcomeFailed, settlementOutcome) + properties = append(properties, ccsmp.SolClientFlowPropRequiredOutcomeFailed, ccsmp.SolClientPropEnableVal) case config.PersistentReceiverRejectedOutcome: - properties = append(properties, ccsmp.SolClientFlowPropRequiredOutcomeRejected, settlementOutcome) + properties = append(properties, ccsmp.SolClientFlowPropRequiredOutcomeRejected, ccsmp.SolClientPropEnableVal) case config.PersistentReceiverAcceptedOutcome: default: logging.Default.Info(builder.String() + ": Receiver message settlement outcome of 'ACCEPTED' is supported by default") diff --git a/test/persistent_receiver_test.go b/test/persistent_receiver_test.go index c286b92..e3a248e 100644 --- a/test/persistent_receiver_test.go +++ b/test/persistent_receiver_test.go @@ -301,6 +301,10 @@ var _ = Describe("PersistentReceiver", func() { err := receiver.Ack(nil) helpers.ValidateError(err, &solace.IllegalStateError{}) }) + It("should fail to settle a message", func() { + err := receiver.Settle(nil, config.PersistentReceiverAcceptedOutcome) + helpers.ValidateError(err, &solace.IllegalStateError{}) + }) }) Context("with a terminated receiver", func() { @@ -334,6 +338,10 @@ var _ = Describe("PersistentReceiver", func() { err := receiver.Ack(nil) helpers.ValidateError(err, &solace.IllegalStateError{}) }) + It("should fail to settle a message", func() { + err := receiver.Settle(nil, config.PersistentReceiverAcceptedOutcome) + helpers.ValidateError(err, &solace.IllegalStateError{}) + }) }) It("should fail to start with an invalid queue name", func() { @@ -809,6 +817,15 @@ var _ = Describe("PersistentReceiver", func() { err := receiver.Ack(directMsg) helpers.ValidateError(err, &solace.IllegalArgumentError{}) }) + It("fails to settle a direct message", func() { + const topic = "direct-message-ack" + directMsgChan := helpers.ReceiveOneMessage(messagingService, topic) + helpers.PublishOneMessage(messagingService, topic) + var directMsg message.InboundMessage + Eventually(directMsgChan).Should(Receive(&directMsg)) + err := receiver.Settle(directMsg, config.PersistentReceiverRejectedOutcome) // should fail to settle message + helpers.ValidateError(err, &solace.IllegalArgumentError{}) + }) }) const numQueuedMessages = 10000 @@ -1210,6 +1227,193 @@ var _ = Describe("PersistentReceiver", func() { Expect(receiver.Terminate(10 * time.Second)).ToNot(HaveOccurred()) }) + DescribeTable("Message Settlement Outcome with Client Ack Configured", + func(configFunc func(solace.PersistentMessageReceiverBuilder), outcome config.MessageSettlementOutcome, expectNackSupport bool, useReceiveAsync bool) { + receiverBuilder := messagingService.CreatePersistentMessageReceiverBuilder() + receiverBuilder.WithMessageClientAcknowledgement() // with Client- Ack configured (the default) + configFunc(receiverBuilder) + receiver, err := receiverBuilder.Build(resource.QueueDurableExclusive(queueName)) + Expect(err).ToNot(HaveOccurred()) + Expect(receiver.Start()).ToNot(HaveOccurred()) + + Expect(helpers.GetQueueMessages(queueName)).To(HaveLen(0)) + messagingService.Metrics().Reset() + + var msg message.InboundMessage + if useReceiveAsync { + msgChan := make(chan message.InboundMessage, 1) + receiver.ReceiveAsync(func(inboundMessage message.InboundMessage) { + defer GinkgoRecover() + Expect(helpers.GetQueueMessages(queueName)).To(HaveLen(1)) + msgChan <- inboundMessage + }) + helpers.PublishOnePersistentMessage(messagingService, topicString) + Eventually(msgChan).Should(Receive(&msg)) + } else { + helpers.PublishOnePersistentMessage(messagingService, topicString) + inboundMessage, err := receiver.ReceiveMessage(10 * time.Second) + Expect(err).ToNot(HaveOccurred()) + msg = inboundMessage + } + + err = receiver.Settle(msg, outcome) + // message should still be in the queue - for FAILED settlement outcome + if expectNackSupport && outcome == config.PersistentReceiverFailedOutcome { + Expect(err).ToNot(HaveOccurred()) + Consistently(func() []monitor.MsgVpnQueueMsg { + return helpers.GetQueueMessages(queueName) + }, 2*time.Second).Should(HaveLen(1)) + } else if expectNackSupport { + // should be removed from the queue - for ACCEPTED and REJECTED settlement outcomes + Expect(err).ToNot(HaveOccurred()) + Eventually(func() []monitor.MsgVpnQueueMsg { + return helpers.GetQueueMessages(queueName) + }, 2*time.Second).Should(HaveLen(0)) + } else { + Expect(err).To(HaveOccurred()) + } + }, + // Message settlement not enabled on flow, expect error while caling settle() with Nack + Entry( + "RejectedOutcome - NACK not supported on flow sync receive", func(builder solace.PersistentMessageReceiverBuilder) {}, + config.PersistentReceiverRejectedOutcome, + false, false), + Entry( + "FailedOutcome - NACK not supported on flow sync receive", func(builder solace.PersistentMessageReceiverBuilder) {}, + config.PersistentReceiverFailedOutcome, + false, false), + Entry( + "RejectedOutcome - NACK not supported on flow with async receive", func(builder solace.PersistentMessageReceiverBuilder) {}, + config.PersistentReceiverRejectedOutcome, + false, true), + Entry( + "FailedOutcome - NACK not supported on flow with async receive", func(builder solace.PersistentMessageReceiverBuilder) {}, + config.PersistentReceiverFailedOutcome, + false, true), + + // Nil Nack support through property - expected Nack to not be supported on flow + Entry("Property NACK not supported with sync receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: nil, + }) + }, config.PersistentReceiverRejectedOutcome, false, false), + Entry("Property NACK not supported with async receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: nil, + }) + }, config.PersistentReceiverRejectedOutcome, false, true), + + // With configured Nack support through builder + Entry("RejectedOutcome - Builder Nack support with sync receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome) + }, config.PersistentReceiverRejectedOutcome, true, false), + Entry("FailedOutcome - Builder Nack support with sync receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome) + }, config.PersistentReceiverFailedOutcome, true, false), + Entry("RejectedOutcome - Builder Nack support with async receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome) + }, config.PersistentReceiverRejectedOutcome, true, true), + Entry("FailedOutcome - Builder Nack support with async receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome) + }, config.PersistentReceiverFailedOutcome, true, true), + + // With configured Nack support through property + Entry("RejectedOutcome - Property Nack support with sync receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s", config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome), + }) + }, config.PersistentReceiverRejectedOutcome, true, false), + Entry("FailedOutcome - Property Nack support with sync receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s", config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome), + }) + }, config.PersistentReceiverFailedOutcome, true, false), + Entry("RejectedOutcome - Property Nack support with async receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s", config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome), + }) + }, config.PersistentReceiverRejectedOutcome, true, true), + Entry("FailedOutcome - Property Nack support with async receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s", config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome), + }) + }, config.PersistentReceiverFailedOutcome, true, true), + ) + + DescribeTable("Message Settlement Outcome with Auto Ack Configured", + func(configFunc func(solace.PersistentMessageReceiverBuilder), outcome config.MessageSettlementOutcome, useReceiveAsync bool) { + receiverBuilder := messagingService.CreatePersistentMessageReceiverBuilder() + receiverBuilder.WithMessageAutoAcknowledgement() // with Auto- Ack configured + configFunc(receiverBuilder) + receiver, err := receiverBuilder.Build(resource.QueueDurableExclusive(queueName)) + Expect(err).ToNot(HaveOccurred()) + Expect(receiver.Start()).ToNot(HaveOccurred()) + + Expect(helpers.GetQueueMessages(queueName)).To(HaveLen(0)) + messagingService.Metrics().Reset() + + var msg message.InboundMessage + if useReceiveAsync { + msgChan := make(chan message.InboundMessage, 1) + receiver.ReceiveAsync(func(inboundMessage message.InboundMessage) { + defer GinkgoRecover() + Expect(helpers.GetQueueMessages(queueName)).To(HaveLen(1)) + msgChan <- inboundMessage + }) + helpers.PublishOnePersistentMessage(messagingService, topicString) + Eventually(msgChan).Should(Receive(&msg)) + } else { + helpers.PublishOnePersistentMessage(messagingService, topicString) + inboundMessage, err := receiver.ReceiveMessage(10 * time.Second) + Expect(err).ToNot(HaveOccurred()) + msg = inboundMessage + } + + // no message should be in the queue since already Auto-Acked + err = receiver.Settle(msg, outcome) + // we don't expect any errors when calling settle() since message not founf + Expect(err).ToNot(HaveOccurred()) + Eventually(func() []monitor.MsgVpnQueueMsg { + return helpers.GetQueueMessages(queueName) + }, 2*time.Second).Should(HaveLen(0)) + }, + // With configured Nack support through builder + Entry("RejectedOutcome - Builder Nack support with sync receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome) + }, config.PersistentReceiverRejectedOutcome, false), + Entry("FailedOutcome - Builder Nack support with sync receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome) + }, config.PersistentReceiverFailedOutcome, false), + Entry("RejectedOutcome - Builder Nack support with async receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome) + }, config.PersistentReceiverRejectedOutcome, true), + Entry("FailedOutcome - Builder Nack support with async receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome) + }, config.PersistentReceiverFailedOutcome, true), + + // With configured Nack support through property + Entry("RejectedOutcome - Property Nack support with sync receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s", config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome), + }) + }, config.PersistentReceiverRejectedOutcome, false), + Entry("FailedOutcome - Property Nack support with sync receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s", config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome), + }) + }, config.PersistentReceiverFailedOutcome, false), + Entry("RejectedOutcome - Property Nack support with async receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s", config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome), + }) + }, config.PersistentReceiverRejectedOutcome, true), + Entry("FailedOutcome - Property Nack support with async receive", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s", config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome), + }) + }, config.PersistentReceiverFailedOutcome, true), + ) + Context("with an ACL deny exception", func() { const deniedTopic = "persistent-receiver-acl-deny" BeforeEach(func() { From 94fdbfab4e45e760ac9a43309202f43f60add734 Mon Sep 17 00:00:00 2001 From: Gabor Szokoli Date: Tue, 15 Oct 2024 14:30:03 -0400 Subject: [PATCH 07/18] Adding some dummy tests to see through a small change. --- test/persistent_receiver_test.go | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/test/persistent_receiver_test.go b/test/persistent_receiver_test.go index e3a248e..d7e22a6 100644 --- a/test/persistent_receiver_test.go +++ b/test/persistent_receiver_test.go @@ -301,6 +301,18 @@ var _ = Describe("PersistentReceiver", func() { err := receiver.Ack(nil) helpers.ValidateError(err, &solace.IllegalStateError{}) }) + It("should fail to settle a message as accepted", func() { + err := receiver.Settle(nil, config.PersistentReceiverAcceptedOutcome) + helpers.ValidateError(err, &solace.IllegalStateError{}) + }) + It("should fail to settle a message as failed", func() { + err := receiver.Settle(nil, config.PersistentReceiverFailedOutcome) + helpers.ValidateError(err, &solace.IllegalStateError{}) + }) + It("should fail to settle a message as rejected", func() { + err := receiver.Settle(nil, config.PersistentReceiverRejectedOutcome) + helpers.ValidateError(err, &solace.IllegalStateError{}) + }) It("should fail to settle a message", func() { err := receiver.Settle(nil, config.PersistentReceiverAcceptedOutcome) helpers.ValidateError(err, &solace.IllegalStateError{}) From b9b192b305c784c4fd0b715672bab43d62f45f0f Mon Sep 17 00:00:00 2001 From: Gabor Szokoli Date: Tue, 15 Oct 2024 15:04:39 -0400 Subject: [PATCH 08/18] A few more simple tests. --- test/persistent_receiver_test.go | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/test/persistent_receiver_test.go b/test/persistent_receiver_test.go index d7e22a6..19ec52b 100644 --- a/test/persistent_receiver_test.go +++ b/test/persistent_receiver_test.go @@ -829,7 +829,7 @@ var _ = Describe("PersistentReceiver", func() { err := receiver.Ack(directMsg) helpers.ValidateError(err, &solace.IllegalArgumentError{}) }) - It("fails to settle a direct message", func() { + It("fails to settle a direct message as rejected", func() { const topic = "direct-message-ack" directMsgChan := helpers.ReceiveOneMessage(messagingService, topic) helpers.PublishOneMessage(messagingService, topic) @@ -838,6 +838,24 @@ var _ = Describe("PersistentReceiver", func() { err := receiver.Settle(directMsg, config.PersistentReceiverRejectedOutcome) // should fail to settle message helpers.ValidateError(err, &solace.IllegalArgumentError{}) }) + It("fails to settle a direct message as failed", func() { + const topic = "direct-message-ack" + directMsgChan := helpers.ReceiveOneMessage(messagingService, topic) + helpers.PublishOneMessage(messagingService, topic) + var directMsg message.InboundMessage + Eventually(directMsgChan).Should(Receive(&directMsg)) + err := receiver.Settle(directMsg, config.PersistentReceiverFailedOutcome) // should fail to settle message + helpers.ValidateError(err, &solace.IllegalArgumentError{}) + }) + It("fails to settle a direct message as accepted", func() { + const topic = "direct-message-ack" + directMsgChan := helpers.ReceiveOneMessage(messagingService, topic) + helpers.PublishOneMessage(messagingService, topic) + var directMsg message.InboundMessage + Eventually(directMsgChan).Should(Receive(&directMsg)) + err := receiver.Settle(directMsg, config.PersistentReceiverAcceptedOutcome) // should fail to settle message + helpers.ValidateError(err, &solace.IllegalArgumentError{}) + }) }) const numQueuedMessages = 10000 From 2e5f08658e719998229d8623737380f38b45940e Mon Sep 17 00:00:00 2001 From: Gabor Szokoli Date: Tue, 22 Oct 2024 10:25:01 -0400 Subject: [PATCH 09/18] EBP-79: Go NACK integration tests Still more reshuffling than substantial tests, but I'm getting there. --- test/persistent_receiver_test.go | 100 +++++++++++++++++++++---------- 1 file changed, 70 insertions(+), 30 deletions(-) diff --git a/test/persistent_receiver_test.go b/test/persistent_receiver_test.go index 19ec52b..d683591 100644 --- a/test/persistent_receiver_test.go +++ b/test/persistent_receiver_test.go @@ -301,20 +301,18 @@ var _ = Describe("PersistentReceiver", func() { err := receiver.Ack(nil) helpers.ValidateError(err, &solace.IllegalStateError{}) }) - It("should fail to settle a message as accepted", func() { - err := receiver.Settle(nil, config.PersistentReceiverAcceptedOutcome) - helpers.ValidateError(err, &solace.IllegalStateError{}) - }) - It("should fail to settle a message as failed", func() { - err := receiver.Settle(nil, config.PersistentReceiverFailedOutcome) - helpers.ValidateError(err, &solace.IllegalStateError{}) - }) - It("should fail to settle a message as rejected", func() { - err := receiver.Settle(nil, config.PersistentReceiverRejectedOutcome) - helpers.ValidateError(err, &solace.IllegalStateError{}) - }) - It("should fail to settle a message", func() { - err := receiver.Settle(nil, config.PersistentReceiverAcceptedOutcome) + DescribeTable("should fail to settle a message as ", + func(outcome config.MessageSettlementOutcome) { + err := receiver.Settle(nil, outcome) + helpers.ValidateError(err, &solace.IllegalStateError{}) + }, + Entry("accepted", config.PersistentReceiverAcceptedOutcome), + Entry("rejected", config.PersistentReceiverRejectedOutcome), + Entry("failed", config.PersistentReceiverFailedOutcome), + //Entry("garbage", "garbage"), + ) + It("should fail to settle a message with garbage", func() { + err := receiver.Settle(nil, "garbage") helpers.ValidateError(err, &solace.IllegalStateError{}) }) }) @@ -350,8 +348,18 @@ var _ = Describe("PersistentReceiver", func() { err := receiver.Ack(nil) helpers.ValidateError(err, &solace.IllegalStateError{}) }) - It("should fail to settle a message", func() { - err := receiver.Settle(nil, config.PersistentReceiverAcceptedOutcome) + DescribeTable("should fail to settle a message as ", + func(outcome config.MessageSettlementOutcome) { + err := receiver.Settle(nil, outcome) + helpers.ValidateError(err, &solace.IllegalStateError{}) + }, + Entry("accepted", config.PersistentReceiverAcceptedOutcome), + Entry("rejected", config.PersistentReceiverRejectedOutcome), + Entry("failed", config.PersistentReceiverFailedOutcome), + //Entry("garbage", "garbage"), + ) + It("should fail to settle a message with garbage", func() { + err := receiver.Settle(nil, "garbage") helpers.ValidateError(err, &solace.IllegalStateError{}) }) }) @@ -446,8 +454,11 @@ var _ = Describe("PersistentReceiver", func() { actualPayload, ok := msg.GetPayloadAsString() Expect(ok).To(BeTrue()) Expect(actualPayload).To(Equal(payload)) + + helpers.ValidateMetric(messagingService, metrics.PersistentMessagesAccepted, 0) err := receiver.Ack(msg) Expect(err).ToNot(HaveOccurred()) + helpers.ValidateMetric(messagingService, metrics.PersistentMessagesAccepted, 1) }) } @@ -829,31 +840,28 @@ var _ = Describe("PersistentReceiver", func() { err := receiver.Ack(directMsg) helpers.ValidateError(err, &solace.IllegalArgumentError{}) }) - It("fails to settle a direct message as rejected", func() { - const topic = "direct-message-ack" - directMsgChan := helpers.ReceiveOneMessage(messagingService, topic) - helpers.PublishOneMessage(messagingService, topic) - var directMsg message.InboundMessage - Eventually(directMsgChan).Should(Receive(&directMsg)) - err := receiver.Settle(directMsg, config.PersistentReceiverRejectedOutcome) // should fail to settle message - helpers.ValidateError(err, &solace.IllegalArgumentError{}) - }) - It("fails to settle a direct message as failed", func() { + DescribeTable("fails to settle a direct message as ", func(outcome config.MessageSettlementOutcome) { const topic = "direct-message-ack" directMsgChan := helpers.ReceiveOneMessage(messagingService, topic) helpers.PublishOneMessage(messagingService, topic) var directMsg message.InboundMessage Eventually(directMsgChan).Should(Receive(&directMsg)) - err := receiver.Settle(directMsg, config.PersistentReceiverFailedOutcome) // should fail to settle message + err := receiver.Settle(directMsg, outcome) // should fail to settle message helpers.ValidateError(err, &solace.IllegalArgumentError{}) - }) - It("fails to settle a direct message as accepted", func() { + }, + Entry("accepted", config.PersistentReceiverAcceptedOutcome), + Entry("rejected", config.PersistentReceiverRejectedOutcome), + Entry("failed", config.PersistentReceiverFailedOutcome), + //Entry("garbage", "garbage"), + ) + It("fails to settle a direct message as garbage", func() { const topic = "direct-message-ack" directMsgChan := helpers.ReceiveOneMessage(messagingService, topic) helpers.PublishOneMessage(messagingService, topic) var directMsg message.InboundMessage Eventually(directMsgChan).Should(Receive(&directMsg)) - err := receiver.Settle(directMsg, config.PersistentReceiverAcceptedOutcome) // should fail to settle message + // I don't know why this compiles, but the Entry version doesn't. + err := receiver.Settle(directMsg, "garbage") helpers.ValidateError(err, &solace.IllegalArgumentError{}) }) }) @@ -1257,6 +1265,38 @@ var _ = Describe("PersistentReceiver", func() { Expect(receiver.Terminate(10 * time.Second)).ToNot(HaveOccurred()) }) + DescribeTable("Happy cases for outcome configuration on ReceiverBuilder", + func(configFunc func(solace.PersistentMessageReceiverBuilder), outcome config.MessageSettlementOutcome) { + receiverBuilder := messagingService.CreatePersistentMessageReceiverBuilder() + configFunc(receiverBuilder) + receiver, err := receiverBuilder.Build(resource.QueueDurableExclusive(queueName)) + Expect(err).ToNot(HaveOccurred()) + Expect(receiver.Start()).ToNot(HaveOccurred()) + Expect(helpers.GetQueueMessages(queueName)).To(HaveLen(0)) + messagingService.Metrics().Reset() + Expect(err).ToNot(HaveOccurred()) + //TODO: metrics! Does the builder or receiver have getters for autoack? + + // I guess this drains the queue? + if (outcome == config.PersistentReceiverFailedOutcome){ + Consistently( + func() []monitor.MsgVpnQueueMsg { + return helpers.GetQueueMessages(queueName) + }, + 2*time.Second).Should(HaveLen(1)) + } + }, + Entry( + "Default can Accept", func(builder solace.PersistentMessageReceiverBuilder) {}, + config.PersistentReceiverAcceptedOutcome), + Entry( + "Accept via setter", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverAcceptedOutcome) + }, + config.PersistentReceiverAcceptedOutcome), + ) + + DescribeTable("Message Settlement Outcome with Client Ack Configured", func(configFunc func(solace.PersistentMessageReceiverBuilder), outcome config.MessageSettlementOutcome, expectNackSupport bool, useReceiveAsync bool) { receiverBuilder := messagingService.CreatePersistentMessageReceiverBuilder() From 5410a87c2281f33b02248b2cda4c5f21468c0841 Mon Sep 17 00:00:00 2001 From: Gabor Szokoli Date: Thu, 24 Oct 2024 14:21:03 -0400 Subject: [PATCH 10/18] EBP-79: Go NACK integration tests More happy cases, plus a small bug they uncoverred fixed. --- internal/impl/core/metrics.go | 2 +- test/persistent_receiver_test.go | 199 ++++++++++++++++++++++++++----- 2 files changed, 171 insertions(+), 30 deletions(-) diff --git a/internal/impl/core/metrics.go b/internal/impl/core/metrics.go index fc26ac4..a327200 100644 --- a/internal/impl/core/metrics.go +++ b/internal/impl/core/metrics.go @@ -178,7 +178,7 @@ func (backedMetrics *ccsmpBackedMetrics) GetStat(metric metrics.Metric) uint64 { if rxMetric, ok := rxMetrics[metric]; ok { // check for duplicate counts due to auto-acks and remove from final result rxStat := backedMetrics.getRXStat(rxMetric) - duplicateAck := uint64(0) + duplicateAck := atomic.LoadUint64(&backedMetrics.duplicateAcks) // take the difference and retrun as the settled accepted metric if duplicateAck > 0 && metric == metrics.PersistentMessagesAccepted { return rxStat - duplicateAck diff --git a/test/persistent_receiver_test.go b/test/persistent_receiver_test.go index d683591..4e5032e 100644 --- a/test/persistent_receiver_test.go +++ b/test/persistent_receiver_test.go @@ -1265,38 +1265,179 @@ var _ = Describe("PersistentReceiver", func() { Expect(receiver.Terminate(10 * time.Second)).ToNot(HaveOccurred()) }) - DescribeTable("Happy cases for outcome configuration on ReceiverBuilder", - func(configFunc func(solace.PersistentMessageReceiverBuilder), outcome config.MessageSettlementOutcome) { - receiverBuilder := messagingService.CreatePersistentMessageReceiverBuilder() - configFunc(receiverBuilder) - receiver, err := receiverBuilder.Build(resource.QueueDurableExclusive(queueName)) - Expect(err).ToNot(HaveOccurred()) - Expect(receiver.Start()).ToNot(HaveOccurred()) - Expect(helpers.GetQueueMessages(queueName)).To(HaveLen(0)) - messagingService.Metrics().Reset() - Expect(err).ToNot(HaveOccurred()) - //TODO: metrics! Does the builder or receiver have getters for autoack? - // I guess this drains the queue? - if (outcome == config.PersistentReceiverFailedOutcome){ - Consistently( - func() []monitor.MsgVpnQueueMsg { - return helpers.GetQueueMessages(queueName) - }, - 2*time.Second).Should(HaveLen(1)) - } - }, - Entry( - "Default can Accept", func(builder solace.PersistentMessageReceiverBuilder) {}, - config.PersistentReceiverAcceptedOutcome), - Entry( - "Accept via setter", func(builder solace.PersistentMessageReceiverBuilder) { - builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverAcceptedOutcome) + for _, autoAck := range []bool{true, false} { + autoAckText := "" + // ??? I don't speak go, so I'm just rolling with it. + autoAck2 := autoAck + if autoAck {autoAckText = " withAutoAck"} + FDescribeTable("Happy cases for outcome configuration on ReceiverBuilder" + autoAckText, + func(configFunc func(solace.PersistentMessageReceiverBuilder), outcome config.MessageSettlementOutcome) { + receiverBuilder := messagingService.CreatePersistentMessageReceiverBuilder() + configFunc(receiverBuilder) + if autoAck2 { + receiverBuilder.WithMessageAutoAcknowledgement() + } + receiver, err := receiverBuilder.Build(resource.QueueDurableExclusive(queueName)) + Expect(err).ToNot(HaveOccurred()) + Expect(receiver.Start()).ToNot(HaveOccurred()) + Expect(helpers.GetQueueMessages(queueName)).To(HaveLen(0)) + messagingService.Metrics().Reset() + Expect(err).ToNot(HaveOccurred()) + + helpers.PublishOnePersistentMessage(messagingService, topicString) + msg, err := receiver.ReceiveMessage(10 * time.Second) + Expect(err).ToNot(HaveOccurred()) + + err = receiver.Settle(msg, outcome) + Expect(err).ToNot(HaveOccurred()) + /* if (autoAck2) { + helpers.ValidateMetric(messagingService, metrics.PersistentMessagesAccepted, 0) + helpers.ValidateMetric(messagingService, metrics.PersistentMessagesRejected, 0) + helpers.ValidateMetric(messagingService, metrics.PersistentMessagesFailed, 0) + } else */ if (outcome == config.PersistentReceiverAcceptedOutcome) { + helpers.ValidateMetric(messagingService, metrics.PersistentMessagesAccepted, 1) + helpers.ValidateMetric(messagingService, metrics.PersistentMessagesRejected, 0) + helpers.ValidateMetric(messagingService, metrics.PersistentMessagesFailed, 0) + } else if (outcome == config.PersistentReceiverRejectedOutcome) { + helpers.ValidateMetric(messagingService, metrics.PersistentMessagesAccepted, 0) + helpers.ValidateMetric(messagingService, metrics.PersistentMessagesRejected, 1) + helpers.ValidateMetric(messagingService, metrics.PersistentMessagesFailed, 0) + } else if (outcome == config.PersistentReceiverFailedOutcome) { + helpers.ValidateMetric(messagingService, metrics.PersistentMessagesAccepted, 0) + helpers.ValidateMetric(messagingService, metrics.PersistentMessagesRejected, 0) + helpers.ValidateMetric(messagingService, metrics.PersistentMessagesFailed, 1) + } + // I guess this drains the queue? + if (!autoAck2 && outcome == config.PersistentReceiverFailedOutcome){ + Consistently( + func() []monitor.MsgVpnQueueMsg { + return helpers.GetQueueMessages(queueName) + }, + 2*time.Second).Should(HaveLen(1)) + } }, - config.PersistentReceiverAcceptedOutcome), - ) + Entry( + "Default can Accept", func(builder solace.PersistentMessageReceiverBuilder) {}, + config.PersistentReceiverAcceptedOutcome), + Entry( + "Accept via setter", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverAcceptedOutcome) + }, + config.PersistentReceiverAcceptedOutcome), + Entry( + "Accept & Fail via setter can accept", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome, config.PersistentReceiverAcceptedOutcome) + }, + config.PersistentReceiverAcceptedOutcome), + Entry( + "Accept & Fail via setter can fail", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome, config.PersistentReceiverAcceptedOutcome) + }, + config.PersistentReceiverFailedOutcome), + Entry( + "Fail via setter can accept", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome) + }, + config.PersistentReceiverAcceptedOutcome), + Entry( + "Fail via setter can fail", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome) + }, + config.PersistentReceiverFailedOutcome), + Entry( + "Accept via property", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s",config.PersistentReceiverAcceptedOutcome), + }) + }, + config.PersistentReceiverAcceptedOutcome), + Entry( + "Fail via property can accept", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s",config.PersistentReceiverFailedOutcome), + }) + }, + config.PersistentReceiverAcceptedOutcome), + Entry( + "Fail via property can fail", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s",config.PersistentReceiverFailedOutcome), + }) + }, + config.PersistentReceiverFailedOutcome), + Entry( + "Accept & Fail via property can accept", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s",config.PersistentReceiverFailedOutcome, config.PersistentReceiverAcceptedOutcome), + }) + }, + config.PersistentReceiverAcceptedOutcome), + Entry( + "Accept & Fail via property can fail", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s",config.PersistentReceiverFailedOutcome, config.PersistentReceiverAcceptedOutcome), + }) + }, + config.PersistentReceiverFailedOutcome), + + + Entry( + "Accept & Reject via setter can accept", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverRejectedOutcome, config.PersistentReceiverAcceptedOutcome) + }, + config.PersistentReceiverAcceptedOutcome), + Entry( + "Accept & Reject via setter can reject", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverRejectedOutcome, config.PersistentReceiverAcceptedOutcome) + }, + config.PersistentReceiverRejectedOutcome), + Entry( + "Reject via setter can accept", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverRejectedOutcome) + }, + config.PersistentReceiverAcceptedOutcome), + Entry( + "Reject via setter can reject", func(builder solace.PersistentMessageReceiverBuilder) { + builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverRejectedOutcome) + }, + config.PersistentReceiverRejectedOutcome), + Entry( + "Reject via property can accept", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s",config.PersistentReceiverRejectedOutcome), + }) + }, + config.PersistentReceiverAcceptedOutcome), + Entry( + "Reject via property can reject", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s",config.PersistentReceiverRejectedOutcome), + }) + }, + config.PersistentReceiverRejectedOutcome), + Entry( + "Accept & Reject via property can accept", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s",config.PersistentReceiverRejectedOutcome, config.PersistentReceiverAcceptedOutcome), + }) + }, + config.PersistentReceiverAcceptedOutcome), + Entry( + "Accept & Reject via property can reject", func(builder solace.PersistentMessageReceiverBuilder) { + builder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s",config.PersistentReceiverRejectedOutcome, config.PersistentReceiverAcceptedOutcome), + }) + }, + config.PersistentReceiverRejectedOutcome), + ) // describe + } // for + // Config time fail cases + + // Settle() time fail cases + DescribeTable("Message Settlement Outcome with Client Ack Configured", func(configFunc func(solace.PersistentMessageReceiverBuilder), outcome config.MessageSettlementOutcome, expectNackSupport bool, useReceiveAsync bool) { receiverBuilder := messagingService.CreatePersistentMessageReceiverBuilder() @@ -1482,7 +1623,7 @@ var _ = Describe("PersistentReceiver", func() { config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s", config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome), }) }, config.PersistentReceiverFailedOutcome, true), - ) + ) Context("with an ACL deny exception", func() { const deniedTopic = "persistent-receiver-acl-deny" From 9ca4d042c68046f681cfc3d3cb9dd9d345b56c8b Mon Sep 17 00:00:00 2001 From: Gabor Szokoli Date: Thu, 24 Oct 2024 14:28:30 -0400 Subject: [PATCH 11/18] Left a test focused by accident, sorry. --- test/persistent_receiver_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/persistent_receiver_test.go b/test/persistent_receiver_test.go index 4e5032e..7ce58e3 100644 --- a/test/persistent_receiver_test.go +++ b/test/persistent_receiver_test.go @@ -1271,7 +1271,7 @@ var _ = Describe("PersistentReceiver", func() { // ??? I don't speak go, so I'm just rolling with it. autoAck2 := autoAck if autoAck {autoAckText = " withAutoAck"} - FDescribeTable("Happy cases for outcome configuration on ReceiverBuilder" + autoAckText, + DescribeTable("Happy cases for outcome configuration on ReceiverBuilder" + autoAckText, func(configFunc func(solace.PersistentMessageReceiverBuilder), outcome config.MessageSettlementOutcome) { receiverBuilder := messagingService.CreatePersistentMessageReceiverBuilder() configFunc(receiverBuilder) From af6044642e69b5459a23b9d06258354a0888a138 Mon Sep 17 00:00:00 2001 From: Oseme Odigie Date: Wed, 30 Oct 2024 09:40:39 -0400 Subject: [PATCH 12/18] EBP-61: Write unit tests for the NACK feature --- internal/impl/core/metrics_test.go | 32 +++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/internal/impl/core/metrics_test.go b/internal/impl/core/metrics_test.go index 6a7afd9..75b6d7f 100644 --- a/internal/impl/core/metrics_test.go +++ b/internal/impl/core/metrics_test.go @@ -16,7 +16,10 @@ package core -import "testing" +import ( + "sync/atomic" + "testing" +) func TestSolClientMetrics(t *testing.T) { metrics := []NextGenMetric{ @@ -24,6 +27,7 @@ func TestSolClientMetrics(t *testing.T) { MetricPublishMessagesTerminationDiscarded, MetricReceivedMessagesBackpressureDiscarded, MetricReceivedMessagesTerminationDiscarded, + MetricInternalDiscardNotifications, } for _, metric := range metrics { metricsImpl := newCcsmpMetrics(nil) @@ -36,3 +40,29 @@ func TestSolClientMetrics(t *testing.T) { } } } + +func TestIncrementDuplicateAckCount(t *testing.T) { + metricsImpl := newCcsmpMetrics(nil) + duplicateAck := atomic.LoadUint64(&metricsImpl.duplicateAcks) + if duplicateAck != 0 { + t.Error("Expected zero state of duplicate Acks counter to be 0") + } + metricsImpl.incrementDuplicateAckCount() + duplicateAck = atomic.LoadUint64(&metricsImpl.duplicateAcks) + if duplicateAck != 1 { + t.Error("Expected duplicate Acks to be 1") + } + for i := 0; i < 10; i++ { + // call 10 times + metricsImpl.incrementDuplicateAckCount() + } + duplicateAck = atomic.LoadUint64(&metricsImpl.duplicateAcks) + if duplicateAck != 11 { + t.Error("Expected duplicate Acks to be 11") + } + metricsImpl.ResetStats() // reset all stats + duplicateAck = atomic.LoadUint64(&metricsImpl.duplicateAcks) + if duplicateAck != 0 { + t.Error("Expected state of duplicate Acks counter to be 0 after calling Reset()") + } +} From 633ed96d7e0ea1d4f5ace04abbbb3eb0d373b353 Mon Sep 17 00:00:00 2001 From: Gabor Szokoli Date: Wed, 30 Oct 2024 12:11:27 -0400 Subject: [PATCH 13/18] EBP-79: a few more tests. --- test/persistent_receiver_test.go | 43 ++++++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/test/persistent_receiver_test.go b/test/persistent_receiver_test.go index c114c6b..f98e74f 100644 --- a/test/persistent_receiver_test.go +++ b/test/persistent_receiver_test.go @@ -1430,6 +1430,49 @@ var _ = Describe("PersistentReceiver", func() { // Config time fail cases + // I don't think the setter should accept garbage. + It("Garbage to setter", func() { + receiverBuilder := messagingService.CreatePersistentMessageReceiverBuilder() + receiverBuilder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome, "garbage") + receiver, err := receiverBuilder.Build(resource.QueueDurableExclusive(queueName)) + helpers.ValidateError(err, &solace.IllegalArgumentError{}) + Expect(receiver).To(BeNil()) + }) + It("Legit outcome mixed with garbage to setter", func() { + receiverBuilder := messagingService.CreatePersistentMessageReceiverBuilder() + receiverBuilder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome, config.PersistentReceiverAcceptedOutcome, "garbage") + receiver, err := receiverBuilder.Build(resource.QueueDurableExclusive(queueName)) + helpers.ValidateError(err, &solace.IllegalArgumentError{}) + Expect(receiver).To(BeNil()) + }) + FIt("Garbage as property", func() { + receiverBuilder := messagingService.CreatePersistentMessageReceiverBuilder() + receiverBuilder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: "garbage", + }) + receiver, err := receiverBuilder.Build(resource.QueueDurableExclusive(queueName)) + helpers.ValidateError(err, &solace.IllegalArgumentError{}) + Expect(receiver).To(BeNil()) + }) + FIt("Garbage mixed in as property", func() { + receiverBuilder := messagingService.CreatePersistentMessageReceiverBuilder() + receiverBuilder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,garbage",config.PersistentReceiverRejectedOutcome), + }) + receiver, err := receiverBuilder.Build(resource.QueueDurableExclusive(queueName)) + helpers.ValidateError(err, &solace.IllegalArgumentError{}) + Expect(receiver).To(BeNil()) + }) + FIt("Poor punctuation in property", func() { + receiverBuilder := messagingService.CreatePersistentMessageReceiverBuilder() + receiverBuilder.FromConfigurationProvider(config.ReceiverPropertyMap{ + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s %s",config.PersistentReceiverRejectedOutcome, config.PersistentReceiverAcceptedOutcome), + }) + receiver, err := receiverBuilder.Build(resource.QueueDurableExclusive(queueName)) + helpers.ValidateError(err, &solace.IllegalArgumentError{}) + Expect(receiver).To(BeNil()) + }) + // Settle() time fail cases DescribeTable("Message Settlement Outcome with Client Ack Configured", From 49e8fadebf91f1d4860a820e4708919078b0a3fc Mon Sep 17 00:00:00 2001 From: Oseme Odigie Date: Wed, 30 Oct 2024 13:34:34 -0400 Subject: [PATCH 14/18] fixed the failing unit test --- internal/impl/core/metrics_test.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/internal/impl/core/metrics_test.go b/internal/impl/core/metrics_test.go index 75b6d7f..561ec24 100644 --- a/internal/impl/core/metrics_test.go +++ b/internal/impl/core/metrics_test.go @@ -60,9 +60,4 @@ func TestIncrementDuplicateAckCount(t *testing.T) { if duplicateAck != 11 { t.Error("Expected duplicate Acks to be 11") } - metricsImpl.ResetStats() // reset all stats - duplicateAck = atomic.LoadUint64(&metricsImpl.duplicateAcks) - if duplicateAck != 0 { - t.Error("Expected state of duplicate Acks counter to be 0 after calling Reset()") - } } From 71864ce28a2774b889cd94a02a7f17277271dea2 Mon Sep 17 00:00:00 2001 From: Gabor Szokoli Date: Mon, 4 Nov 2024 11:12:13 -0500 Subject: [PATCH 15/18] EBP-79: Finishing touches for integration tests --- test/persistent_receiver_test.go | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/test/persistent_receiver_test.go b/test/persistent_receiver_test.go index f98e74f..20b0a82 100644 --- a/test/persistent_receiver_test.go +++ b/test/persistent_receiver_test.go @@ -301,7 +301,7 @@ var _ = Describe("PersistentReceiver", func() { err := receiver.Ack(nil) helpers.ValidateError(err, &solace.IllegalStateError{}) }) - DescribeTable("should fail to settle a message as ", + DescribeTable("should fail to settle a nil message as ", func(outcome config.MessageSettlementOutcome) { err := receiver.Settle(nil, outcome) helpers.ValidateError(err, &solace.IllegalStateError{}) @@ -309,9 +309,8 @@ var _ = Describe("PersistentReceiver", func() { Entry("accepted", config.PersistentReceiverAcceptedOutcome), Entry("rejected", config.PersistentReceiverRejectedOutcome), Entry("failed", config.PersistentReceiverFailedOutcome), - //Entry("garbage", "garbage"), ) - It("should fail to settle a message with garbage", func() { + It("should fail to settle a nil message with garbage", func() { err := receiver.Settle(nil, "garbage") helpers.ValidateError(err, &solace.IllegalStateError{}) }) @@ -344,11 +343,11 @@ var _ = Describe("PersistentReceiver", func() { err := receiver.Pause() helpers.ValidateError(err, &solace.IllegalStateError{}) }) - It("should fail to acknowledge a message", func() { + It("should fail to acknowledge a nil message", func() { err := receiver.Ack(nil) helpers.ValidateError(err, &solace.IllegalStateError{}) }) - DescribeTable("should fail to settle a message as ", + DescribeTable("should fail to settle a nil message as ", func(outcome config.MessageSettlementOutcome) { err := receiver.Settle(nil, outcome) helpers.ValidateError(err, &solace.IllegalStateError{}) @@ -356,9 +355,8 @@ var _ = Describe("PersistentReceiver", func() { Entry("accepted", config.PersistentReceiverAcceptedOutcome), Entry("rejected", config.PersistentReceiverRejectedOutcome), Entry("failed", config.PersistentReceiverFailedOutcome), - //Entry("garbage", "garbage"), ) - It("should fail to settle a message with garbage", func() { + It("should fail to settle a nil message with garbage", func() { err := receiver.Settle(nil, "garbage") helpers.ValidateError(err, &solace.IllegalStateError{}) }) @@ -848,7 +846,6 @@ var _ = Describe("PersistentReceiver", func() { Entry("accepted", config.PersistentReceiverAcceptedOutcome), Entry("rejected", config.PersistentReceiverRejectedOutcome), Entry("failed", config.PersistentReceiverFailedOutcome), - //Entry("garbage", "garbage"), ) It("fails to settle a direct message as garbage", func() { const topic = "direct-message-ack" @@ -1433,19 +1430,26 @@ var _ = Describe("PersistentReceiver", func() { // I don't think the setter should accept garbage. It("Garbage to setter", func() { receiverBuilder := messagingService.CreatePersistentMessageReceiverBuilder() - receiverBuilder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome, "garbage") + receiverBuilder.WithRequiredMessageOutcomeSupport("garbage") receiver, err := receiverBuilder.Build(resource.QueueDurableExclusive(queueName)) helpers.ValidateError(err, &solace.IllegalArgumentError{}) Expect(receiver).To(BeNil()) }) It("Legit outcome mixed with garbage to setter", func() { receiverBuilder := messagingService.CreatePersistentMessageReceiverBuilder() - receiverBuilder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome, config.PersistentReceiverAcceptedOutcome, "garbage") + receiverBuilder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome, "garbage") + receiver, err := receiverBuilder.Build(resource.QueueDurableExclusive(queueName)) + helpers.ValidateError(err, &solace.IllegalArgumentError{}) + Expect(receiver).To(BeNil()) + }) + It("Legit outcome mixed with garbage to setter backwards", func() { + receiverBuilder := messagingService.CreatePersistentMessageReceiverBuilder() + receiverBuilder.WithRequiredMessageOutcomeSupport("garbage", config.PersistentReceiverFailedOutcome) receiver, err := receiverBuilder.Build(resource.QueueDurableExclusive(queueName)) helpers.ValidateError(err, &solace.IllegalArgumentError{}) Expect(receiver).To(BeNil()) }) - FIt("Garbage as property", func() { + It("Garbage as property", func() { receiverBuilder := messagingService.CreatePersistentMessageReceiverBuilder() receiverBuilder.FromConfigurationProvider(config.ReceiverPropertyMap{ config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: "garbage", @@ -1454,7 +1458,7 @@ var _ = Describe("PersistentReceiver", func() { helpers.ValidateError(err, &solace.IllegalArgumentError{}) Expect(receiver).To(BeNil()) }) - FIt("Garbage mixed in as property", func() { + It("Garbage mixed in as property", func() { receiverBuilder := messagingService.CreatePersistentMessageReceiverBuilder() receiverBuilder.FromConfigurationProvider(config.ReceiverPropertyMap{ config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,garbage",config.PersistentReceiverRejectedOutcome), @@ -1463,7 +1467,7 @@ var _ = Describe("PersistentReceiver", func() { helpers.ValidateError(err, &solace.IllegalArgumentError{}) Expect(receiver).To(BeNil()) }) - FIt("Poor punctuation in property", func() { + It("Poor punctuation in property", func() { receiverBuilder := messagingService.CreatePersistentMessageReceiverBuilder() receiverBuilder.FromConfigurationProvider(config.ReceiverPropertyMap{ config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s %s",config.PersistentReceiverRejectedOutcome, config.PersistentReceiverAcceptedOutcome), From 605ce3fdf66c1a708551d4101411badd9159e258 Mon Sep 17 00:00:00 2001 From: Gabor Szokoli Date: Mon, 4 Nov 2024 19:29:06 -0500 Subject: [PATCH 16/18] EBP-79: returning error on Build() if garbage was mixed in to setter. --- .../impl/receiver/persistent_message_receiver_impl.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/internal/impl/receiver/persistent_message_receiver_impl.go b/internal/impl/receiver/persistent_message_receiver_impl.go index 26f6915..4505a43 100644 --- a/internal/impl/receiver/persistent_message_receiver_impl.go +++ b/internal/impl/receiver/persistent_message_receiver_impl.go @@ -1494,13 +1494,18 @@ func (builder *persistentMessageReceiverBuilderImpl) WithRequiredMessageOutcomeS settlementOutcomesStrArray := []string{} addedOutcomes := make(map[config.MessageSettlementOutcome]bool) for _, settlementOutcome := range messageSettlementOutcomes { - if _, added := addedOutcomes[settlementOutcome]; !added && isSupportedMessageSettlementOutcome(settlementOutcome) { + if !isSupportedMessageSettlementOutcome(settlementOutcome) { + logging.Default.Warning( + builder.String() + ": Unknown message settlement outcome(s) passed to WithRequiredMessageOutcomeSupport, " + string(settlementOutcome) + + "allowed values are: 'config.PersistentReceiverAcceptedOutcome', 'config.PersistentReceiverFailedOutcome' and 'config.PersistentReceiverRejectedOutcome'") + // We'll still add it so we can return an error on Build(). The setter can't return the error even if it knows about it. + } + if _, added := addedOutcomes[settlementOutcome]; !added { addedOutcomes[settlementOutcome] = true // mark it as added settlementOutcomesStrArray = append(settlementOutcomesStrArray, string(settlementOutcome)) } else { logging.Default.Warning( - builder.String() + ": Unknown message settlement outcome(s) passed to WithRequiredMessageOutcomeSupport, " + - "allowed values are: 'config.PersistentReceiverAcceptedOutcome', 'config.PersistentReceiverFailedOutcome' and 'config.PersistentReceiverRejectedOutcome'") + builder.String() + ": Repeat message settlement outcome(s) passed to WithRequiredMessageOutcomeSupport: " + string(settlementOutcome)) } } builder.properties[config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport] = strings.Join(settlementOutcomesStrArray, ",") From c6894e600c5abf4ae14bb02048570240dbeac18e Mon Sep 17 00:00:00 2001 From: Oseme Odigie Date: Tue, 5 Nov 2024 08:44:15 -0500 Subject: [PATCH 17/18] SOL-59692: updated some integration test cases --- test/persistent_receiver_test.go | 89 ++++++++++++++++---------------- 1 file changed, 44 insertions(+), 45 deletions(-) diff --git a/test/persistent_receiver_test.go b/test/persistent_receiver_test.go index c114c6b..9b80900 100644 --- a/test/persistent_receiver_test.go +++ b/test/persistent_receiver_test.go @@ -845,9 +845,9 @@ var _ = Describe("PersistentReceiver", func() { err := receiver.Settle(directMsg, outcome) // should fail to settle message helpers.ValidateError(err, &solace.IllegalArgumentError{}) }, - Entry("accepted", config.PersistentReceiverAcceptedOutcome), - Entry("rejected", config.PersistentReceiverRejectedOutcome), - Entry("failed", config.PersistentReceiverFailedOutcome), + Entry("accepted", config.PersistentReceiverAcceptedOutcome), + Entry("rejected", config.PersistentReceiverRejectedOutcome), + Entry("failed", config.PersistentReceiverFailedOutcome), //Entry("garbage", "garbage"), ) It("fails to settle a direct message as garbage", func() { @@ -1259,17 +1259,18 @@ var _ = Describe("PersistentReceiver", func() { Expect(receiver.Terminate(10 * time.Second)).ToNot(HaveOccurred()) }) - for _, autoAck := range []bool{true, false} { autoAckText := "" // ??? I don't speak go, so I'm just rolling with it. - autoAck2 := autoAck - if autoAck {autoAckText = " withAutoAck"} - DescribeTable("Happy cases for outcome configuration on ReceiverBuilder" + autoAckText, - func(configFunc func(solace.PersistentMessageReceiverBuilder), outcome config.MessageSettlementOutcome) { + shouldAutoAck := autoAck + if autoAck { + autoAckText = " withAutoAck" + } + DescribeTable("Happy cases for outcome configuration on ReceiverBuilder"+autoAckText, + func(configFunc func(solace.PersistentMessageReceiverBuilder), outcome config.MessageSettlementOutcome, autoAckConfig bool) { receiverBuilder := messagingService.CreatePersistentMessageReceiverBuilder() configFunc(receiverBuilder) - if autoAck2 { + if autoAckConfig { receiverBuilder.WithMessageAutoAcknowledgement() } receiver, err := receiverBuilder.Build(resource.QueueDurableExclusive(queueName)) @@ -1285,25 +1286,25 @@ var _ = Describe("PersistentReceiver", func() { err = receiver.Settle(msg, outcome) Expect(err).ToNot(HaveOccurred()) - /* if (autoAck2) { + /* if (autoAckConfig) { helpers.ValidateMetric(messagingService, metrics.PersistentMessagesAccepted, 0) helpers.ValidateMetric(messagingService, metrics.PersistentMessagesRejected, 0) helpers.ValidateMetric(messagingService, metrics.PersistentMessagesFailed, 0) - } else */ if (outcome == config.PersistentReceiverAcceptedOutcome) { + } else */if outcome == config.PersistentReceiverAcceptedOutcome { helpers.ValidateMetric(messagingService, metrics.PersistentMessagesAccepted, 1) helpers.ValidateMetric(messagingService, metrics.PersistentMessagesRejected, 0) helpers.ValidateMetric(messagingService, metrics.PersistentMessagesFailed, 0) - } else if (outcome == config.PersistentReceiverRejectedOutcome) { + } else if outcome == config.PersistentReceiverRejectedOutcome { helpers.ValidateMetric(messagingService, metrics.PersistentMessagesAccepted, 0) helpers.ValidateMetric(messagingService, metrics.PersistentMessagesRejected, 1) helpers.ValidateMetric(messagingService, metrics.PersistentMessagesFailed, 0) - } else if (outcome == config.PersistentReceiverFailedOutcome) { + } else if outcome == config.PersistentReceiverFailedOutcome { helpers.ValidateMetric(messagingService, metrics.PersistentMessagesAccepted, 0) helpers.ValidateMetric(messagingService, metrics.PersistentMessagesRejected, 0) helpers.ValidateMetric(messagingService, metrics.PersistentMessagesFailed, 1) } // I guess this drains the queue? - if (!autoAck2 && outcome == config.PersistentReceiverFailedOutcome){ + if !autoAckConfig && outcome == config.PersistentReceiverFailedOutcome { Consistently( func() []monitor.MsgVpnQueueMsg { return helpers.GetQueueMessages(queueName) @@ -1313,121 +1314,119 @@ var _ = Describe("PersistentReceiver", func() { }, Entry( "Default can Accept", func(builder solace.PersistentMessageReceiverBuilder) {}, - config.PersistentReceiverAcceptedOutcome), + config.PersistentReceiverAcceptedOutcome, shouldAutoAck), Entry( "Accept via setter", func(builder solace.PersistentMessageReceiverBuilder) { builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverAcceptedOutcome) }, - config.PersistentReceiverAcceptedOutcome), + config.PersistentReceiverAcceptedOutcome, shouldAutoAck), Entry( "Accept & Fail via setter can accept", func(builder solace.PersistentMessageReceiverBuilder) { builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome, config.PersistentReceiverAcceptedOutcome) }, - config.PersistentReceiverAcceptedOutcome), + config.PersistentReceiverAcceptedOutcome, shouldAutoAck), Entry( "Accept & Fail via setter can fail", func(builder solace.PersistentMessageReceiverBuilder) { builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome, config.PersistentReceiverAcceptedOutcome) }, - config.PersistentReceiverFailedOutcome), + config.PersistentReceiverFailedOutcome, shouldAutoAck), Entry( "Fail via setter can accept", func(builder solace.PersistentMessageReceiverBuilder) { builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome) }, - config.PersistentReceiverAcceptedOutcome), + config.PersistentReceiverAcceptedOutcome, shouldAutoAck), Entry( "Fail via setter can fail", func(builder solace.PersistentMessageReceiverBuilder) { builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverFailedOutcome) }, - config.PersistentReceiverFailedOutcome), + config.PersistentReceiverFailedOutcome, shouldAutoAck), Entry( "Accept via property", func(builder solace.PersistentMessageReceiverBuilder) { builder.FromConfigurationProvider(config.ReceiverPropertyMap{ - config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s",config.PersistentReceiverAcceptedOutcome), + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s", config.PersistentReceiverAcceptedOutcome), }) }, - config.PersistentReceiverAcceptedOutcome), + config.PersistentReceiverAcceptedOutcome, shouldAutoAck), Entry( "Fail via property can accept", func(builder solace.PersistentMessageReceiverBuilder) { builder.FromConfigurationProvider(config.ReceiverPropertyMap{ - config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s",config.PersistentReceiverFailedOutcome), + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s", config.PersistentReceiverFailedOutcome), }) }, - config.PersistentReceiverAcceptedOutcome), + config.PersistentReceiverAcceptedOutcome, shouldAutoAck), Entry( "Fail via property can fail", func(builder solace.PersistentMessageReceiverBuilder) { builder.FromConfigurationProvider(config.ReceiverPropertyMap{ - config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s",config.PersistentReceiverFailedOutcome), + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s", config.PersistentReceiverFailedOutcome), }) }, - config.PersistentReceiverFailedOutcome), + config.PersistentReceiverFailedOutcome, shouldAutoAck), Entry( "Accept & Fail via property can accept", func(builder solace.PersistentMessageReceiverBuilder) { builder.FromConfigurationProvider(config.ReceiverPropertyMap{ - config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s",config.PersistentReceiverFailedOutcome, config.PersistentReceiverAcceptedOutcome), + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s", config.PersistentReceiverFailedOutcome, config.PersistentReceiverAcceptedOutcome), }) }, - config.PersistentReceiverAcceptedOutcome), + config.PersistentReceiverAcceptedOutcome, shouldAutoAck), Entry( "Accept & Fail via property can fail", func(builder solace.PersistentMessageReceiverBuilder) { builder.FromConfigurationProvider(config.ReceiverPropertyMap{ - config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s",config.PersistentReceiverFailedOutcome, config.PersistentReceiverAcceptedOutcome), + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s", config.PersistentReceiverFailedOutcome, config.PersistentReceiverAcceptedOutcome), }) }, - config.PersistentReceiverFailedOutcome), - + config.PersistentReceiverFailedOutcome, shouldAutoAck), Entry( "Accept & Reject via setter can accept", func(builder solace.PersistentMessageReceiverBuilder) { builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverRejectedOutcome, config.PersistentReceiverAcceptedOutcome) }, - config.PersistentReceiverAcceptedOutcome), + config.PersistentReceiverAcceptedOutcome, shouldAutoAck), Entry( "Accept & Reject via setter can reject", func(builder solace.PersistentMessageReceiverBuilder) { builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverRejectedOutcome, config.PersistentReceiverAcceptedOutcome) }, - config.PersistentReceiverRejectedOutcome), + config.PersistentReceiverRejectedOutcome, shouldAutoAck), Entry( "Reject via setter can accept", func(builder solace.PersistentMessageReceiverBuilder) { builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverRejectedOutcome) }, - config.PersistentReceiverAcceptedOutcome), + config.PersistentReceiverAcceptedOutcome, shouldAutoAck), Entry( "Reject via setter can reject", func(builder solace.PersistentMessageReceiverBuilder) { builder.WithRequiredMessageOutcomeSupport(config.PersistentReceiverRejectedOutcome) }, - config.PersistentReceiverRejectedOutcome), + config.PersistentReceiverRejectedOutcome, shouldAutoAck), Entry( "Reject via property can accept", func(builder solace.PersistentMessageReceiverBuilder) { builder.FromConfigurationProvider(config.ReceiverPropertyMap{ - config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s",config.PersistentReceiverRejectedOutcome), + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s", config.PersistentReceiverRejectedOutcome), }) }, - config.PersistentReceiverAcceptedOutcome), + config.PersistentReceiverAcceptedOutcome, shouldAutoAck), Entry( "Reject via property can reject", func(builder solace.PersistentMessageReceiverBuilder) { builder.FromConfigurationProvider(config.ReceiverPropertyMap{ - config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s",config.PersistentReceiverRejectedOutcome), + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s", config.PersistentReceiverRejectedOutcome), }) }, - config.PersistentReceiverRejectedOutcome), + config.PersistentReceiverRejectedOutcome, shouldAutoAck), Entry( "Accept & Reject via property can accept", func(builder solace.PersistentMessageReceiverBuilder) { builder.FromConfigurationProvider(config.ReceiverPropertyMap{ - config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s",config.PersistentReceiverRejectedOutcome, config.PersistentReceiverAcceptedOutcome), + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s", config.PersistentReceiverRejectedOutcome, config.PersistentReceiverAcceptedOutcome), }) }, - config.PersistentReceiverAcceptedOutcome), + config.PersistentReceiverAcceptedOutcome, shouldAutoAck), Entry( "Accept & Reject via property can reject", func(builder solace.PersistentMessageReceiverBuilder) { builder.FromConfigurationProvider(config.ReceiverPropertyMap{ - config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s",config.PersistentReceiverRejectedOutcome, config.PersistentReceiverAcceptedOutcome), + config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s", config.PersistentReceiverRejectedOutcome, config.PersistentReceiverAcceptedOutcome), }) }, - config.PersistentReceiverRejectedOutcome), + config.PersistentReceiverRejectedOutcome, shouldAutoAck), ) // describe } // for - // Config time fail cases // Settle() time fail cases @@ -1617,7 +1616,7 @@ var _ = Describe("PersistentReceiver", func() { config.ReceiverPropertyPersistentMessageRequiredOutcomeSupport: fmt.Sprintf("%s,%s", config.PersistentReceiverFailedOutcome, config.PersistentReceiverRejectedOutcome), }) }, config.PersistentReceiverFailedOutcome, true), - ) + ) Context("with an ACL deny exception", func() { const deniedTopic = "persistent-receiver-acl-deny" From 00b5a1441a95a01d657ab9e998e935e50e768d22 Mon Sep 17 00:00:00 2001 From: Oseme Odigie Date: Thu, 5 Dec 2024 15:00:32 -0500 Subject: [PATCH 18/18] EBP-368: bumped up the Go API version to 1.8.0 --- version.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/version.go b/version.go index f886197..9727653 100644 --- a/version.go +++ b/version.go @@ -23,4 +23,4 @@ func init() { core.SetVersion(version) } -const version = "1.7.0" +const version = "1.8.0"