From 25b6de1040925befed633c223c364fd597e0a685 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Tue, 21 Nov 2023 23:27:10 +0800 Subject: [PATCH 1/4] set dlq producerName --- pulsar/consumer_impl.go | 2 +- pulsar/consumer_regex_test.go | 4 ++-- pulsar/consumer_test.go | 6 +++++- pulsar/dlq_router.go | 28 ++++++++++++++++++---------- pulsar/reader_impl.go | 2 +- 5 files changed, 27 insertions(+), 15 deletions(-) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index d782beab5e..d701ab16d6 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -167,7 +167,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) { } } - dlq, err := newDlqRouter(client, options.DLQ, client.log) + dlq, err := newDlqRouter(client, options.DLQ, options.Topic, options.SubscriptionName, client.log) if err != nil { return nil, err } diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go index 9cb600fe3a..de9461341f 100644 --- a/pulsar/consumer_regex_test.go +++ b/pulsar/consumer_regex_test.go @@ -154,7 +154,7 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace string AutoDiscoveryPeriod: 5 * time.Minute, } - dlq, _ := newDlqRouter(c.(*client), nil, log.DefaultNopLogger()) + dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", log.DefaultNopLogger()) rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger()) consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq) if err != nil { @@ -192,7 +192,7 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string AutoDiscoveryPeriod: 5 * time.Minute, } - dlq, _ := newDlqRouter(c.(*client), nil, log.DefaultNopLogger()) + dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", log.DefaultNopLogger()) rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger()) consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq) if err != nil { diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index 2d785b4b23..33382cff99 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -1449,9 +1449,10 @@ func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) { if prodOpt != nil { dlqPolicy.ProducerOptions = *prodOpt } + sub := "my-sub" consumer, err := client.Subscribe(ConsumerOptions{ Topic: topic, - SubscriptionName: "my-sub", + SubscriptionName: sub, NackRedeliveryDelay: 1 * time.Second, Type: Shared, DLQ: &dlqPolicy, @@ -1506,6 +1507,9 @@ func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) { expectMsg := fmt.Sprintf("hello-%d", expectedMsgIdx) assert.Equal(t, []byte(expectMsg), msg.Payload()) + // check dql produceName + assert.Equal(t, msg.ProducerName(), fmt.Sprintf("%s-%s-DLQ", topic, sub)) + // check original messageId assert.NotEmpty(t, msg.Properties()[PropertyOriginMessageID]) diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go index 6d30003640..a98e91577e 100644 --- a/pulsar/dlq_router.go +++ b/pulsar/dlq_router.go @@ -19,6 +19,7 @@ package pulsar import ( "context" + "fmt" "time" "github.com/apache/pulsar-client-go/pulsar/internal" @@ -26,19 +27,23 @@ import ( ) type dlqRouter struct { - client Client - producer Producer - policy *DLQPolicy - messageCh chan ConsumerMessage - closeCh chan interface{} - log log.Logger + client Client + producer Producer + policy *DLQPolicy + messageCh chan ConsumerMessage + closeCh chan interface{} + topicName string + subscriptionName string + log log.Logger } -func newDlqRouter(client Client, policy *DLQPolicy, logger log.Logger) (*dlqRouter, error) { +func newDlqRouter(client Client, policy *DLQPolicy, topicName, subscriptionName string, logger log.Logger) (*dlqRouter, error) { r := &dlqRouter{ - client: client, - policy: policy, - log: logger, + client: client, + policy: policy, + topicName: topicName, + subscriptionName: subscriptionName, + log: logger, } if policy != nil { @@ -152,6 +157,9 @@ func (r *dlqRouter) getProducer(schema Schema) Producer { opt := r.policy.ProducerOptions opt.Topic = r.policy.DeadLetterTopic opt.Schema = schema + if opt.Name == "" { + opt.Name = fmt.Sprintf("%s-%s-DLQ", r.topicName, r.subscriptionName) + } // the origin code sets to LZ4 compression with no options // so the new design allows compression type to be overwritten but still set lz4 by default diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index ffc92dedde..0999e88fee 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -127,7 +127,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { } // Provide dummy dlq router with not dlq policy - dlq, err := newDlqRouter(client, nil, client.log) + dlq, err := newDlqRouter(client, nil, options.Topic, options.SubscriptionName, client.log) if err != nil { return nil, err } From 21c4f65bfd1ba52d5c17aba68dcdddcd5990e58d Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Tue, 21 Nov 2023 23:37:11 +0800 Subject: [PATCH 2/4] format code --- pulsar/dlq_router.go | 3 ++- pulsar/internal/pulsar_proto/PulsarApi.pb.go | 6 +++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go index a98e91577e..5b9314bddc 100644 --- a/pulsar/dlq_router.go +++ b/pulsar/dlq_router.go @@ -37,7 +37,8 @@ type dlqRouter struct { log log.Logger } -func newDlqRouter(client Client, policy *DLQPolicy, topicName, subscriptionName string, logger log.Logger) (*dlqRouter, error) { +func newDlqRouter(client Client, policy *DLQPolicy, topicName, subscriptionName string, + logger log.Logger) (*dlqRouter, error) { r := &dlqRouter{ client: client, policy: policy, diff --git a/pulsar/internal/pulsar_proto/PulsarApi.pb.go b/pulsar/internal/pulsar_proto/PulsarApi.pb.go index c8e6ad9b91..4ef5fc48bc 100644 --- a/pulsar/internal/pulsar_proto/PulsarApi.pb.go +++ b/pulsar/internal/pulsar_proto/PulsarApi.pb.go @@ -3366,8 +3366,8 @@ func (x *CommandLookupTopicResponse) GetProxyThroughServiceUrl() bool { return Default_CommandLookupTopicResponse_ProxyThroughServiceUrl } -/// Create a new Producer on a topic, assigning the given producer_id, -/// all messages sent with this producer_id will be persisted on the topic +// / Create a new Producer on a topic, assigning the given producer_id, +// / all messages sent with this producer_id will be persisted on the topic type CommandProducer struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -4617,7 +4617,7 @@ func (x *CommandSuccess) GetSchema() *Schema { return nil } -/// Response from CommandProducer +// / Response from CommandProducer type CommandProducerSuccess struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache From c6fcb85beb226f12631553f127a7ab7bab6487dc Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Tue, 21 Nov 2023 23:38:18 +0800 Subject: [PATCH 3/4] Revert "format code" This reverts commit 21c4f65bfd1ba52d5c17aba68dcdddcd5990e58d. --- pulsar/dlq_router.go | 3 +-- pulsar/internal/pulsar_proto/PulsarApi.pb.go | 6 +++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go index 5b9314bddc..a98e91577e 100644 --- a/pulsar/dlq_router.go +++ b/pulsar/dlq_router.go @@ -37,8 +37,7 @@ type dlqRouter struct { log log.Logger } -func newDlqRouter(client Client, policy *DLQPolicy, topicName, subscriptionName string, - logger log.Logger) (*dlqRouter, error) { +func newDlqRouter(client Client, policy *DLQPolicy, topicName, subscriptionName string, logger log.Logger) (*dlqRouter, error) { r := &dlqRouter{ client: client, policy: policy, diff --git a/pulsar/internal/pulsar_proto/PulsarApi.pb.go b/pulsar/internal/pulsar_proto/PulsarApi.pb.go index 4ef5fc48bc..c8e6ad9b91 100644 --- a/pulsar/internal/pulsar_proto/PulsarApi.pb.go +++ b/pulsar/internal/pulsar_proto/PulsarApi.pb.go @@ -3366,8 +3366,8 @@ func (x *CommandLookupTopicResponse) GetProxyThroughServiceUrl() bool { return Default_CommandLookupTopicResponse_ProxyThroughServiceUrl } -// / Create a new Producer on a topic, assigning the given producer_id, -// / all messages sent with this producer_id will be persisted on the topic +/// Create a new Producer on a topic, assigning the given producer_id, +/// all messages sent with this producer_id will be persisted on the topic type CommandProducer struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -4617,7 +4617,7 @@ func (x *CommandSuccess) GetSchema() *Schema { return nil } -// / Response from CommandProducer +/// Response from CommandProducer type CommandProducerSuccess struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache From fe033edad02cc894b760ec469ef0807d06c733f1 Mon Sep 17 00:00:00 2001 From: crossoverJie Date: Tue, 21 Nov 2023 23:42:11 +0800 Subject: [PATCH 4/4] refactor code --- pulsar/dlq_router.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go index a98e91577e..5b9314bddc 100644 --- a/pulsar/dlq_router.go +++ b/pulsar/dlq_router.go @@ -37,7 +37,8 @@ type dlqRouter struct { log log.Logger } -func newDlqRouter(client Client, policy *DLQPolicy, topicName, subscriptionName string, logger log.Logger) (*dlqRouter, error) { +func newDlqRouter(client Client, policy *DLQPolicy, topicName, subscriptionName string, + logger log.Logger) (*dlqRouter, error) { r := &dlqRouter{ client: client, policy: policy,