diff --git a/pulsar/consumer.go b/pulsar/consumer.go index 3161af5143..ee5c6480e5 100644 --- a/pulsar/consumer.go +++ b/pulsar/consumer.go @@ -278,6 +278,11 @@ type Consumer interface { // where more than one consumer are currently connected. UnsubscribeForce() error + // GetLastMessageIDs get all the last message id of the topics the consumer subscribed. + // + // The list of MessageID instances of all the topics that the consumer subscribed + GetLastMessageIDs() ([]MessageID, error) + // Receive a single message. // This calls blocks until a message is available. Receive(context.Context) (Message, error) diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go index b22eeaafce..3a6b9d0138 100644 --- a/pulsar/consumer_impl.go +++ b/pulsar/consumer_impl.go @@ -472,6 +472,18 @@ func (c *consumer) unsubscribe(force bool) error { return nil } +func (c *consumer) GetLastMessageIDs() ([]MessageID, error) { + ids := make([]MessageID, 0) + for _, pc := range c.consumers { + id, err := pc.getLastMessageID() + if err != nil { + return nil, err + } + ids = append(ids, id) + } + return ids, nil +} + func (c *consumer) Receive(ctx context.Context) (message Message, err error) { for { select { diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go index eaf42ad748..224cbb5820 100644 --- a/pulsar/consumer_multitopic.go +++ b/pulsar/consumer_multitopic.go @@ -109,6 +109,18 @@ func (c *multiTopicConsumer) UnsubscribeForce() error { return errs } +func (c *multiTopicConsumer) GetLastMessageIDs() ([]MessageID, error) { + ids := make([]MessageID, 0) + for _, c := range c.consumers { + id, err := c.GetLastMessageIDs() + if err != nil { + return nil, err + } + ids = append(ids, id...) + } + return ids, nil +} + func (c *multiTopicConsumer) Receive(ctx context.Context) (message Message, err error) { for { select { diff --git a/pulsar/consumer_multitopic_test.go b/pulsar/consumer_multitopic_test.go index 58ad09576c..eb0ade529d 100644 --- a/pulsar/consumer_multitopic_test.go +++ b/pulsar/consumer_multitopic_test.go @@ -137,3 +137,52 @@ func TestMultiTopicConsumerForceUnsubscribe(t *testing.T) { err = consumer.UnsubscribeForce() assert.Error(t, err) } + +func TestMultiTopicGetLastMessageIDs(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic1 := newTopicName() + topic2 := newTopicName() + topics := []string{topic1, topic2} + partition := 1 + // create consumer + consumer, err := client.Subscribe(ConsumerOptions{ + Topics: topics, + SubscriptionName: "my-sub", + Type: Shared, + }) + assert.Nil(t, err) + defer consumer.Close() + + // produce messages + totalMessage := 20 + for i, topic := range topics { + p, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: true, + }) + if err != nil { + t.Fatal(err) + } + err = genMessages(p, totalMessage, func(idx int) string { + return fmt.Sprintf("topic-%d-hello-%d", i+1, idx) + }) + p.Close() + if err != nil { + assert.Nil(t, err) + } + } + + messageIDs, err := consumer.GetLastMessageIDs() + assert.Nil(t, err) + assert.Equal(t, len(topics), len(messageIDs)) + for _, id := range messageIDs { + assert.Equal(t, int(id.EntryID()), totalMessage/partition-1) + } + +} diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go index 87acfe3ea9..fb8dbb2e1d 100644 --- a/pulsar/consumer_regex.go +++ b/pulsar/consumer_regex.go @@ -148,6 +148,18 @@ func (c *regexConsumer) UnsubscribeForce() error { return errs } +func (c *regexConsumer) GetLastMessageIDs() ([]MessageID, error) { + ids := make([]MessageID, 0) + for _, c := range c.consumers { + id, err := c.GetLastMessageIDs() + if err != nil { + return nil, err + } + ids = append(ids, id...) + } + return ids, nil +} + func (c *regexConsumer) Receive(ctx context.Context) (message Message, err error) { for { select { diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go index 6bc3f7006c..149f21c00c 100644 --- a/pulsar/consumer_regex_test.go +++ b/pulsar/consumer_regex_test.go @@ -443,3 +443,58 @@ func cloneConsumers(rc *regexConsumer) map[string]Consumer { } return consumers } + +func TestRegexTopicGetLastMessageIDs(t *testing.T) { + + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + partition := 1 + topic1 := fmt.Sprintf("regex-topic-%v", time.Now().Nanosecond()) + topic2 := fmt.Sprintf("regex-topic-%v", time.Now().Nanosecond()) + err = createPartitionedTopic(topic1, partition) + assert.Nil(t, err) + err = createPartitionedTopic(topic2, partition) + assert.Nil(t, err) + topics := []string{topic1, topic2} + + // create consumer + topicsPattern := "persistent://public/default/regex-topic-.*" + consumer, err := client.Subscribe(ConsumerOptions{ + TopicsPattern: topicsPattern, + SubscriptionName: "my-sub", + Type: Shared, + }) + assert.Nil(t, err) + defer consumer.Close() + + // produce messages + totalMessage := 20 + for i, topic := range topics { + p, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: true, + }) + if err != nil { + t.Fatal(err) + } + err = genMessages(p, totalMessage, func(idx int) string { + return fmt.Sprintf("topic-%d-hello-%d", i+1, idx) + }) + p.Close() + if err != nil { + assert.Nil(t, err) + } + } + + messageIDs, err := consumer.GetLastMessageIDs() + assert.Nil(t, err) + assert.Equal(t, len(topics), len(messageIDs)) + for _, id := range messageIDs { + assert.Equal(t, int(id.EntryID()), totalMessage/partition-1) + } +} diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go index b3b797f1b6..933343361f 100644 --- a/pulsar/consumer_test.go +++ b/pulsar/consumer_test.go @@ -4478,3 +4478,97 @@ func TestConsumerForceUnSubscribe(t *testing.T) { assert.Error(t, err) } + +func TestConsumerGetLastMessageIDs(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + partition := 1 + topic := "my-topic" + // create consumer + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub", + Type: Shared, + }) + assert.Nil(t, err) + defer consumer.Close() + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: true, + }) + assert.Nil(t, err) + defer producer.Close() + + ctx := context.Background() + // send messages + totalMessage := 10 + for i := 0; i < totalMessage; i++ { + if _, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }); err != nil { + assert.Nil(t, err) + } + } + + messageIDs, err := consumer.GetLastMessageIDs() + assert.Nil(t, err) + assert.Equal(t, partition, len(messageIDs)) + +} + +func TestPartitionConsumerGetLastMessageIDs(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: lookupURL, + }) + + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + partition := 3 + err = createPartitionedTopic(topic, partition) + assert.Nil(t, err) + + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: true, + }) + assert.Nil(t, err) + defer producer.Close() + + // create consumer + consumer, err := client.Subscribe(ConsumerOptions{ + Topic: topic, + SubscriptionName: "my-sub", + Type: Shared, + }) + assert.Nil(t, err) + defer consumer.Close() + + ctx := context.Background() + totalMessage := 30 + // send messages + for i := 0; i < totalMessage; i++ { + if _, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }); err != nil { + assert.Nil(t, err) + } + } + + messageIDs, err := consumer.GetLastMessageIDs() + assert.Nil(t, err) + assert.Equal(t, partition, len(messageIDs)) + for _, id := range messageIDs { + assert.Equal(t, int(id.EntryID()), totalMessage/partition-1) + } + +} diff --git a/pulsar/internal/pulsartracing/consumer_interceptor_test.go b/pulsar/internal/pulsartracing/consumer_interceptor_test.go index 92ce2311c1..b8375343d1 100644 --- a/pulsar/internal/pulsartracing/consumer_interceptor_test.go +++ b/pulsar/internal/pulsartracing/consumer_interceptor_test.go @@ -110,3 +110,8 @@ func (c *mockConsumer) SeekByTime(time time.Time) error { func (c *mockConsumer) Name() string { return "" } + +func (c *mockConsumer) GetLastMessageIDs() ([]pulsar.MessageID, error) { + ids := make([]pulsar.MessageID, 0) + return ids, nil +}