diff --git a/pulsar/reader_impl.go b/pulsar/reader_impl.go index 6328891d15..a514370667 100644 --- a/pulsar/reader_impl.go +++ b/pulsar/reader_impl.go @@ -114,7 +114,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) { decryption: options.Decryption, schema: options.Schema, backoffPolicy: options.BackoffPolicy, - interceptors: transformReaderInterceptors(options.ReaderInterceptors), + interceptors: transformReaderInterceptors(options.ReaderInterceptors), maxPendingChunkedMessage: options.MaxPendingChunkedMessage, expireTimeOfIncompleteChunk: options.ExpireTimeOfIncompleteChunk, autoAckIncompleteChunk: options.AutoAckIncompleteChunk, diff --git a/pulsar/reader_test.go b/pulsar/reader_test.go index ec10f8f162..38524ccdd5 100644 --- a/pulsar/reader_test.go +++ b/pulsar/reader_test.go @@ -20,6 +20,7 @@ package pulsar import ( "context" "fmt" + "sync/atomic" "testing" "time" @@ -943,3 +944,119 @@ func TestReaderGetLastMessageID(t *testing.T) { assert.Equal(t, lastMsgID.LedgerID(), getLastMessageID.LedgerID()) assert.Equal(t, lastMsgID.EntryID(), getLastMessageID.EntryID()) } + +var _ ReaderInterceptor = (*CounterReaderInterceptor)(nil) + +type CounterReaderInterceptor struct { + counter atomic.Int32 +} + +func (c *CounterReaderInterceptor) BeforeRead(_ ConsumerMessage) { + c.counter.Add(1) +} + +func TestSingleReaderInterceptor(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + ctx := context.Background() + schema := NewStringSchema(nil) + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: true, + Schema: schema, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + msgID, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }) + assert.NoError(t, err) + assert.NotNil(t, msgID) + } + + interceptor := &CounterReaderInterceptor{} + r, err := client.CreateReader(ReaderOptions{ + Topic: topic, + ReaderInterceptors: []ReaderInterceptor{interceptor}, + StartMessageID: EarliestMessageID(), + }) + + assert.NoError(t, err) + assert.NotNil(t, r) + defer r.Close() + + for { + if r.HasNext() { + _, err := r.Next(ctx) + assert.NoError(t, err) + } else { + break + } + } + + assert.Equal(t, interceptor.counter.Load(), int32(10)) +} + +func TestMultiReaderInterceptors(t *testing.T) { + client, err := NewClient(ClientOptions{ + URL: serviceURL, + }) + assert.Nil(t, err) + defer client.Close() + + topic := newTopicName() + ctx := context.Background() + schema := NewStringSchema(nil) + // create producer + producer, err := client.CreateProducer(ProducerOptions{ + Topic: topic, + DisableBatching: true, + Schema: schema, + }) + assert.Nil(t, err) + defer producer.Close() + + // send 10 messages + for i := 0; i < 10; i++ { + msgID, err := producer.Send(ctx, &ProducerMessage{ + Payload: []byte(fmt.Sprintf("hello-%d", i)), + }) + assert.NoError(t, err) + assert.NotNil(t, msgID) + } + + interceptor0 := &CounterReaderInterceptor{} + interceptor1 := &CounterReaderInterceptor{} + interceptor2 := &CounterReaderInterceptor{} + r, err := client.CreateReader(ReaderOptions{ + Topic: topic, + ReaderInterceptors: []ReaderInterceptor{interceptor0, interceptor1, interceptor2}, + StartMessageID: EarliestMessageID(), + }) + + assert.NoError(t, err) + assert.NotNil(t, r) + defer r.Close() + + for { + if r.HasNext() { + _, err := r.Next(ctx) + assert.NoError(t, err) + } else { + break + } + } + + assert.Equal(t, interceptor0.counter.Load(), int32(10)) + assert.Equal(t, interceptor1.counter.Load(), int32(10)) + assert.Equal(t, interceptor2.counter.Load(), int32(10)) +}