Skip to content

Commit

Permalink
Consumer support UnsubscribeForce api
Browse files Browse the repository at this point in the history
  • Loading branch information
crossoverJie committed May 22, 2024
1 parent b0111a2 commit 58872ba
Show file tree
Hide file tree
Showing 8 changed files with 244 additions and 0 deletions.
5 changes: 5 additions & 0 deletions pulsar/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,11 @@ type Consumer interface {
// where more than one consumer are currently connected.
UnsubscribeForce() error

// GetLastMessageIDs get all the last message id of the topics the consumer subscribed.
//
// The list of MessageID instances of all the topics that the consumer subscribed
GetLastMessageIDs() ([]MessageID, error)

// Receive a single message.
// This calls blocks until a message is available.
Receive(context.Context) (Message, error)
Expand Down
12 changes: 12 additions & 0 deletions pulsar/consumer_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,6 +472,18 @@ func (c *consumer) unsubscribe(force bool) error {
return nil
}

func (c *consumer) GetLastMessageIDs() ([]MessageID, error) {
ids := make([]MessageID, 0)
for _, pc := range c.consumers {
id, err := pc.getLastMessageID()
if err != nil {
return nil, err
}
ids = append(ids, id)
}
return ids, nil
}

func (c *consumer) Receive(ctx context.Context) (message Message, err error) {
for {
select {
Expand Down
12 changes: 12 additions & 0 deletions pulsar/consumer_multitopic.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@ func (c *multiTopicConsumer) UnsubscribeForce() error {
return errs
}

func (c *multiTopicConsumer) GetLastMessageIDs() ([]MessageID, error) {
ids := make([]MessageID, 0)
for _, c := range c.consumers {
id, err := c.GetLastMessageIDs()
if err != nil {
return nil, err
}
ids = append(ids, id...)
}
return ids, nil
}

func (c *multiTopicConsumer) Receive(ctx context.Context) (message Message, err error) {
for {
select {
Expand Down
49 changes: 49 additions & 0 deletions pulsar/consumer_multitopic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,3 +137,52 @@ func TestMultiTopicConsumerForceUnsubscribe(t *testing.T) {
err = consumer.UnsubscribeForce()
assert.Error(t, err)
}

func TestMultiTopicGetLastMessageIDs(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})

assert.Nil(t, err)
defer client.Close()

topic1 := newTopicName()
topic2 := newTopicName()
topics := []string{topic1, topic2}
partition := 1
// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
Topics: topics,
SubscriptionName: "my-sub",
Type: Shared,
})
assert.Nil(t, err)
defer consumer.Close()

// produce messages
totalMessage := 20
for i, topic := range topics {
p, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: true,
})
if err != nil {
t.Fatal(err)
}
err = genMessages(p, totalMessage, func(idx int) string {
return fmt.Sprintf("topic-%d-hello-%d", i+1, idx)
})
p.Close()
if err != nil {
assert.Nil(t, err)
}
}

messageIDs, err := consumer.GetLastMessageIDs()
assert.Nil(t, err)
assert.Equal(t, len(topics), len(messageIDs))
for _, id := range messageIDs {
assert.Equal(t, int(id.EntryID()), totalMessage/partition-1)
}

}
12 changes: 12 additions & 0 deletions pulsar/consumer_regex.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,18 @@ func (c *regexConsumer) UnsubscribeForce() error {
return errs
}

func (c *regexConsumer) GetLastMessageIDs() ([]MessageID, error) {
ids := make([]MessageID, 0)
for _, c := range c.consumers {
id, err := c.GetLastMessageIDs()
if err != nil {
return nil, err
}
ids = append(ids, id...)
}
return ids, nil
}

func (c *regexConsumer) Receive(ctx context.Context) (message Message, err error) {
for {
select {
Expand Down
55 changes: 55 additions & 0 deletions pulsar/consumer_regex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,3 +443,58 @@ func cloneConsumers(rc *regexConsumer) map[string]Consumer {
}
return consumers
}

func TestRegexTopicGetLastMessageIDs(t *testing.T) {

client, err := NewClient(ClientOptions{
URL: lookupURL,
})

assert.Nil(t, err)
defer client.Close()

partition := 1
topic1 := fmt.Sprintf("regex-topic-%v", time.Now().Nanosecond())
topic2 := fmt.Sprintf("regex-topic-%v", time.Now().Nanosecond())
err = createPartitionedTopic(topic1, partition)
assert.Nil(t, err)
err = createPartitionedTopic(topic2, partition)
assert.Nil(t, err)
topics := []string{topic1, topic2}

// create consumer
topicsPattern := "persistent://public/default/regex-topic-.*"
consumer, err := client.Subscribe(ConsumerOptions{
TopicsPattern: topicsPattern,
SubscriptionName: "my-sub",
Type: Shared,
})
assert.Nil(t, err)
defer consumer.Close()

// produce messages
totalMessage := 20
for i, topic := range topics {
p, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: true,
})
if err != nil {
t.Fatal(err)
}
err = genMessages(p, totalMessage, func(idx int) string {
return fmt.Sprintf("topic-%d-hello-%d", i+1, idx)
})
p.Close()
if err != nil {
assert.Nil(t, err)
}
}

messageIDs, err := consumer.GetLastMessageIDs()
assert.Nil(t, err)
assert.Equal(t, len(topics), len(messageIDs))
for _, id := range messageIDs {
assert.Equal(t, int(id.EntryID()), totalMessage/partition-1)
}
}
94 changes: 94 additions & 0 deletions pulsar/consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4478,3 +4478,97 @@ func TestConsumerForceUnSubscribe(t *testing.T) {
assert.Error(t, err)

}

func TestConsumerGetLastMessageIDs(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})

assert.Nil(t, err)
defer client.Close()

partition := 1
topic := "my-topic"
// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
Type: Shared,
})
assert.Nil(t, err)
defer consumer.Close()

// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: true,
})
assert.Nil(t, err)
defer producer.Close()

ctx := context.Background()
// send messages
totalMessage := 10
for i := 0; i < totalMessage; i++ {
if _, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}); err != nil {
assert.Nil(t, err)
}
}

messageIDs, err := consumer.GetLastMessageIDs()
assert.Nil(t, err)
assert.Equal(t, partition, len(messageIDs))

}

func TestPartitionConsumerGetLastMessageIDs(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
})

assert.Nil(t, err)
defer client.Close()

topic := newTopicName()
partition := 3
err = createPartitionedTopic(topic, partition)
assert.Nil(t, err)

// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: true,
})
assert.Nil(t, err)
defer producer.Close()

// create consumer
consumer, err := client.Subscribe(ConsumerOptions{
Topic: topic,
SubscriptionName: "my-sub",
Type: Shared,
})
assert.Nil(t, err)
defer consumer.Close()

ctx := context.Background()
totalMessage := 30
// send messages
for i := 0; i < totalMessage; i++ {
if _, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
}); err != nil {
assert.Nil(t, err)
}
}

messageIDs, err := consumer.GetLastMessageIDs()
assert.Nil(t, err)
assert.Equal(t, partition, len(messageIDs))
for _, id := range messageIDs {
assert.Equal(t, int(id.EntryID()), totalMessage/partition-1)
}

}
5 changes: 5 additions & 0 deletions pulsar/internal/pulsartracing/consumer_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,8 @@ func (c *mockConsumer) SeekByTime(time time.Time) error {
func (c *mockConsumer) Name() string {
return ""
}

func (c *mockConsumer) GetLastMessageIDs() ([]pulsar.MessageID, error) {
ids := make([]pulsar.MessageID, 0)
return ids, nil
}

0 comments on commit 58872ba

Please sign in to comment.