Skip to content

Commit

Permalink
[Improve] Set dlq producerName (#1137)
Browse files Browse the repository at this point in the history
### Motivation

To keep consistent with the Java client.

Releted PR: apache/pulsar#21589

*Explain here the context, and why you're making that change. What is the problem you're trying to solve.*

### Modifications

Set DLQ producerName `%s-%s-DLQ`
  • Loading branch information
crossoverJie authored Nov 24, 2023
1 parent d457442 commit 067dca1
Show file tree
Hide file tree
Showing 5 changed files with 28 additions and 15 deletions.
2 changes: 1 addition & 1 deletion pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
}
}

dlq, err := newDlqRouter(client, options.DLQ, client.log)
dlq, err := newDlqRouter(client, options.DLQ, options.Topic, options.SubscriptionName, client.log)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions pulsar/consumer_regex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace string
AutoDiscoveryPeriod: 5 * time.Minute,
}

dlq, _ := newDlqRouter(c.(*client), nil, log.DefaultNopLogger())
dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", log.DefaultNopLogger())
rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger())
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq)
if err != nil {
Expand Down Expand Up @@ -192,7 +192,7 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string
AutoDiscoveryPeriod: 5 * time.Minute,
}

dlq, _ := newDlqRouter(c.(*client), nil, log.DefaultNopLogger())
dlq, _ := newDlqRouter(c.(*client), nil, tn.Topic, "regex-sub", log.DefaultNopLogger())
rlq, _ := newRetryRouter(c.(*client), nil, false, log.DefaultNopLogger())
consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1449,9 +1449,10 @@ func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) {
if prodOpt != nil {
dlqPolicy.ProducerOptions = *prodOpt
}
sub := "my-sub"
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
SubscriptionName: sub,
NackRedeliveryDelay: 1 * time.Second,
Type: Shared,
DLQ: &dlqPolicy,
Expand Down Expand Up @@ -1506,6 +1507,9 @@ func DLQWithProducerOptions(t *testing.T, prodOpt *ProducerOptions) {
expectMsg := fmt.Sprintf("hello-%d", expectedMsgIdx)
assert.Equal(t, []byte(expectMsg), msg.Payload())

// check dql produceName
assert.Equal(t, msg.ProducerName(), fmt.Sprintf("%s-%s-DLQ", topic, sub))

// check original messageId
assert.NotEmpty(t, msg.Properties()[PropertyOriginMessageID])

Expand Down
29 changes: 19 additions & 10 deletions pulsar/dlq_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,26 +19,32 @@ package pulsar

import (
"context"
"fmt"
"time"

"github.com/apache/pulsar-client-go/pulsar/internal"
"github.com/apache/pulsar-client-go/pulsar/log"
)

type dlqRouter struct {
client Client
producer Producer
policy *DLQPolicy
messageCh chan ConsumerMessage
closeCh chan interface{}
log log.Logger
client Client
producer Producer
policy *DLQPolicy
messageCh chan ConsumerMessage
closeCh chan interface{}
topicName string
subscriptionName string
log log.Logger
}

func newDlqRouter(client Client, policy *DLQPolicy, logger log.Logger) (*dlqRouter, error) {
func newDlqRouter(client Client, policy *DLQPolicy, topicName, subscriptionName string,
logger log.Logger) (*dlqRouter, error) {
r := &dlqRouter{
client: client,
policy: policy,
log: logger,
client: client,
policy: policy,
topicName: topicName,
subscriptionName: subscriptionName,
log: logger,
}

if policy != nil {
Expand Down Expand Up @@ -152,6 +158,9 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
opt := r.policy.ProducerOptions
opt.Topic = r.policy.DeadLetterTopic
opt.Schema = schema
if opt.Name == "" {
opt.Name = fmt.Sprintf("%s-%s-DLQ", r.topicName, r.subscriptionName)
}

// the origin code sets to LZ4 compression with no options
// so the new design allows compression type to be overwritten but still set lz4 by default
Expand Down
2 changes: 1 addition & 1 deletion pulsar/reader_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
}

// Provide dummy dlq router with not dlq policy
dlq, err := newDlqRouter(client, nil, client.log)
dlq, err := newDlqRouter(client, nil, options.Topic, options.SubscriptionName, client.log)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 067dca1

Please sign in to comment.