From b3f7c78f38a9e277c0735a82a79db3863849e6e0 Mon Sep 17 00:00:00 2001 From: goncalo-rodrigues Date: Tue, 5 Dec 2023 10:31:06 +0100 Subject: [PATCH 1/2] fix: channel deadlock in regexp consumer --- pulsar/consumer_regex.go | 40 +++++++++++++--------- pulsar/consumer_regex_test.go | 63 +++++++++++++++++++++++++++++++++++ 2 files changed, 88 insertions(+), 15 deletions(-) diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go index 79e2293b49..0e011570cd 100644 --- a/pulsar/consumer_regex.go +++ b/pulsar/consumer_regex.go @@ -76,8 +76,8 @@ func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, p pattern: pattern, consumers: make(map[string]Consumer), - subscribeCh: make(chan []string, 1), - unsubscribeCh: make(chan []string, 1), + subscribeCh: make(chan []string), + unsubscribeCh: make(chan []string), closeCh: make(chan struct{}), @@ -163,12 +163,11 @@ func (c *regexConsumer) Ack(msg Message) error { return c.AckID(msg.ID()) } -func (c *regexConsumer) ReconsumeLater(msg Message, delay time.Duration) { +func (c *regexConsumer) ReconsumeLater(_ Message, _ time.Duration) { c.log.Warnf("regexp consumer not support ReconsumeLater yet.") } -func (c *regexConsumer) ReconsumeLaterWithCustomProperties(msg Message, customProperties map[string]string, - delay time.Duration) { +func (c *regexConsumer) ReconsumeLaterWithCustomProperties(_ Message, _ map[string]string, _ time.Duration) { c.log.Warnf("regexp consumer not support ReconsumeLaterWithCustomProperties yet.") } @@ -297,11 +296,11 @@ func (c *regexConsumer) Close() { }) } -func (c *regexConsumer) Seek(msgID MessageID) error { +func (c *regexConsumer) Seek(_ MessageID) error { return newError(SeekFailed, "seek command not allowed for regex consumer") } -func (c *regexConsumer) SeekByTime(time time.Time) error { +func (c *regexConsumer) SeekByTime(_ time.Time) error { return newError(SeekFailed, "seek command not allowed for regex consumer") } @@ -320,6 +319,25 @@ func (c *regexConsumer) closed() bool { } func (c *regexConsumer) monitor() { + defer close(c.subscribeCh) + defer close(c.unsubscribeCh) + + go func() { + for topics := range c.subscribeCh { + if len(topics) > 0 && !c.closed() { + c.subscribe(topics, c.dlq, c.rlq) + } + } + }() + + go func() { + for topics := range c.unsubscribeCh { + if len(topics) > 0 && !c.closed() { + c.unsubscribe(topics) + } + } + }() + for { select { case <-c.closeCh: @@ -329,14 +347,6 @@ func (c *regexConsumer) monitor() { if !c.closed() { c.discover() } - case topics := <-c.subscribeCh: - if len(topics) > 0 && !c.closed() { - c.subscribe(topics, c.dlq, c.rlq) - } - case topics := <-c.unsubscribeCh: - if len(topics) > 0 && !c.closed() { - c.unsubscribe(topics) - } } } } diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go index de9461341f..3e5f1d61db 100644 --- a/pulsar/consumer_regex_test.go +++ b/pulsar/consumer_regex_test.go @@ -241,6 +241,7 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string func TestRegexConsumer(t *testing.T) { t.Run("MatchOneTopic", runWithClientNamespace(runRegexConsumerMatchOneTopic)) t.Run("AddTopic", runWithClientNamespace(runRegexConsumerAddMatchingTopic)) + t.Run("AutoDiscoverTopics", runWithClientNamespace(runRegexConsumerAutoDiscoverTopics)) } func runRegexConsumerMatchOneTopic(t *testing.T, c Client, namespace string) { @@ -346,6 +347,68 @@ func runRegexConsumerAddMatchingTopic(t *testing.T, c Client, namespace string) } } +func runRegexConsumerAutoDiscoverTopics(t *testing.T, c Client, namespace string) { + topicsPattern := fmt.Sprintf("persistent://%s/foo.*", namespace) + opts := ConsumerOptions{ + TopicsPattern: topicsPattern, + SubscriptionName: "regex-sub", + // this is purposefully short to test parallelism between discover and subscribe calls + AutoDiscoveryPeriod: 1 * time.Nanosecond, + } + consumer, err := c.Subscribe(opts) + if err != nil { + t.Fatal(err) + } + defer consumer.Close() + + topicInRegex1 := namespace + "/foo-topic-1" + p1, err := c.CreateProducer(ProducerOptions{ + Topic: topicInRegex1, + DisableBatching: true, + }) + if err != nil { + t.Fatal(err) + } + defer p1.Close() + + topicInRegex2 := namespace + "/foo-topic-2" + p2, err := c.CreateProducer(ProducerOptions{ + Topic: topicInRegex2, + DisableBatching: true, + }) + if err != nil { + t.Fatal(err) + } + defer p2.Close() + + time.Sleep(100 * time.Millisecond) + + err = genMessages(p1, 5, func(idx int) string { + return fmt.Sprintf("foo-message-%d", idx) + }) + if err != nil { + t.Fatal(err) + } + + err = genMessages(p2, 5, func(idx int) string { + return fmt.Sprintf("foo-message-%d", idx) + }) + if err != nil { + t.Fatal(err) + } + + ctx := context.Background() + for i := 0; i < 10; i++ { + m, err := consumer.Receive(ctx) + if err != nil { + t.Errorf("failed to receive message error: %+v", err) + } else { + assert.Truef(t, strings.HasPrefix(string(m.Payload()), "foo-"), + "message does not start with foo: %s", string(m.Payload())) + } + } +} + func genMessages(p Producer, num int, msgFn func(idx int) string) error { ctx := context.Background() for i := 0; i < num; i++ { From 317330189457a6ed682b529a49e3647a490a2c7d Mon Sep 17 00:00:00 2001 From: goncalo-rodrigues Date: Wed, 6 Dec 2023 15:12:56 +0100 Subject: [PATCH 2/2] remove subscribe channels --- pulsar/consumer_regex.go | 33 +++++++-------------------------- 1 file changed, 7 insertions(+), 26 deletions(-) diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go index 0e011570cd..d36694ef90 100644 --- a/pulsar/consumer_regex.go +++ b/pulsar/consumer_regex.go @@ -50,8 +50,6 @@ type regexConsumer struct { consumersLock sync.Mutex consumers map[string]Consumer - subscribeCh chan []string - unsubscribeCh chan []string closeOnce sync.Once closeCh chan struct{} @@ -75,9 +73,7 @@ func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, p namespace: tn.Namespace, pattern: pattern, - consumers: make(map[string]Consumer), - subscribeCh: make(chan []string), - unsubscribeCh: make(chan []string), + consumers: make(map[string]Consumer), closeCh: make(chan struct{}), @@ -319,25 +315,6 @@ func (c *regexConsumer) closed() bool { } func (c *regexConsumer) monitor() { - defer close(c.subscribeCh) - defer close(c.unsubscribeCh) - - go func() { - for topics := range c.subscribeCh { - if len(topics) > 0 && !c.closed() { - c.subscribe(topics, c.dlq, c.rlq) - } - } - }() - - go func() { - for topics := range c.unsubscribeCh { - if len(topics) > 0 && !c.closed() { - c.unsubscribe(topics) - } - } - }() - for { select { case <-c.closeCh: @@ -368,8 +345,12 @@ func (c *regexConsumer) discover() { }). Debug("discover topics") - c.unsubscribeCh <- staleTopics - c.subscribeCh <- newTopics + if len(staleTopics) > 0 { + c.unsubscribe(staleTopics) + } + if len(newTopics) > 0 { + c.subscribe(newTopics, c.dlq, c.rlq) + } } func (c *regexConsumer) knownTopics() []string {