Skip to content

Commit

Permalink
Add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dao-jun committed Oct 21, 2023
1 parent 2a39d20 commit 7ecf6d3
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 1 deletion.
2 changes: 1 addition & 1 deletion pulsar/reader_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func newReader(client *client, options ReaderOptions) (Reader, error) {
decryption: options.Decryption,
schema: options.Schema,
backoffPolicy: options.BackoffPolicy,
interceptors: transformReaderInterceptors(options.ReaderInterceptors),
interceptors: transformReaderInterceptors(options.ReaderInterceptors),
maxPendingChunkedMessage: options.MaxPendingChunkedMessage,
expireTimeOfIncompleteChunk: options.ExpireTimeOfIncompleteChunk,
autoAckIncompleteChunk: options.AutoAckIncompleteChunk,
Expand Down
117 changes: 117 additions & 0 deletions pulsar/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package pulsar
import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -943,3 +944,119 @@ func TestReaderGetLastMessageID(t *testing.T) {
assert.Equal(t, lastMsgID.LedgerID(), getLastMessageID.LedgerID())
assert.Equal(t, lastMsgID.EntryID(), getLastMessageID.EntryID())
}

var _ ReaderInterceptor = (*CounterReaderInterceptor)(nil)

type CounterReaderInterceptor struct {
counter atomic.Int32

Check failure on line 951 in pulsar/reader_test.go

View workflow job for this annotation

GitHub Actions / integration-tests (1.16)

undefined: "sync/atomic".Int32

Check failure on line 951 in pulsar/reader_test.go

View workflow job for this annotation

GitHub Actions / integration-tests (1.18)

undefined: atomic.Int32
}

func (c *CounterReaderInterceptor) BeforeRead(_ ConsumerMessage) {
c.counter.Add(1)
}

func TestSingleReaderInterceptor(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: serviceURL,
})
assert.Nil(t, err)
defer client.Close()

topic := newTopicName()
ctx := context.Background()
schema := NewStringSchema(nil)
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: true,
Schema: schema,
})
assert.Nil(t, err)
defer producer.Close()

// send 10 messages
for i := 0; i < 10; i++ {
msgID, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
assert.NoError(t, err)
assert.NotNil(t, msgID)
}

interceptor := &CounterReaderInterceptor{}
r, err := client.CreateReader(ReaderOptions{
Topic: topic,
ReaderInterceptors: []ReaderInterceptor{interceptor},
StartMessageID: EarliestMessageID(),
})

assert.NoError(t, err)
assert.NotNil(t, r)
defer r.Close()

for {
if r.HasNext() {
_, err := r.Next(ctx)
assert.NoError(t, err)
} else {
break
}
}

assert.Equal(t, interceptor.counter.Load(), int32(10))
}

func TestMultiReaderInterceptors(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: serviceURL,
})
assert.Nil(t, err)
defer client.Close()

topic := newTopicName()
ctx := context.Background()
schema := NewStringSchema(nil)
// create producer
producer, err := client.CreateProducer(ProducerOptions{
Topic: topic,
DisableBatching: true,
Schema: schema,
})
assert.Nil(t, err)
defer producer.Close()

// send 10 messages
for i := 0; i < 10; i++ {
msgID, err := producer.Send(ctx, &ProducerMessage{
Payload: []byte(fmt.Sprintf("hello-%d", i)),
})
assert.NoError(t, err)
assert.NotNil(t, msgID)
}

interceptor0 := &CounterReaderInterceptor{}
interceptor1 := &CounterReaderInterceptor{}
interceptor2 := &CounterReaderInterceptor{}
r, err := client.CreateReader(ReaderOptions{
Topic: topic,
ReaderInterceptors: []ReaderInterceptor{interceptor0, interceptor1, interceptor2},
StartMessageID: EarliestMessageID(),
})

assert.NoError(t, err)
assert.NotNil(t, r)
defer r.Close()

for {
if r.HasNext() {
_, err := r.Next(ctx)
assert.NoError(t, err)
} else {
break
}
}

assert.Equal(t, interceptor0.counter.Load(), int32(10))
assert.Equal(t, interceptor1.counter.Load(), int32(10))
assert.Equal(t, interceptor2.counter.Load(), int32(10))
}

0 comments on commit 7ecf6d3

Please sign in to comment.