Skip to content

Commit

Permalink
Suite tests
Browse files Browse the repository at this point in the history
Signed-off-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
Gsantomaggio committed Apr 13, 2023
1 parent 6d16895 commit 71c4d5d
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 4 deletions.
13 changes: 13 additions & 0 deletions internal/query_offset.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,19 @@ func (c *QueryOffsetResponse) Offset() uint64 {
return c.offset
}

func (c *QueryOffsetResponse) MarshalBinary() ([]byte, error) {
var buff bytes.Buffer
wr := bufio.NewWriter(&buff)
_, err := writeMany(wr, c.correlationId, c.responseCode, c.offset)
if err != nil {
return nil, err
}
if err = wr.Flush(); err != nil {
return nil, err
}
return buff.Bytes(), nil
}

func NewQueryOffsetResponseWith(correlationId uint32, responseCode uint16, offset uint64) *QueryOffsetResponse {
return &QueryOffsetResponse{correlationId: correlationId, responseCode: responseCode, offset: offset}
}
Expand Down
8 changes: 8 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ func main() {
}()

err = streamClient.Subscribe(ctx, stream, constants.OffsetTypeFirst, 1, 10, map[string]string{"name": "my_consumer"}, 10)

offset, err := streamClient.QueryOffset(ctx, "my_consumer", stream)
if err != nil {
log.Error(err, "error querying offset")
} else {
log.Info("offset", "offset", offset)
}

fmt.Println("Press any key to stop ")
reader := bufio.NewReader(os.Stdin)
_, _ = reader.ReadString('\n')
Expand Down
2 changes: 1 addition & 1 deletion pkg/constants/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var ResponseCodeToError = map[uint16]error{
ResponseCodeAccessRefused: errors.New("access refused"),
ResponseCodePreconditionFailed: errors.New("precondition failed"),
ResponseCodePublisherDoesNotExist: errors.New("publisher does not exist"),
ResponseCodeNoOffset: errors.New("no offset"),
ResponseCodeNoOffset: errors.New("no offset found"),
}

// Stream protocol response codes
Expand Down
6 changes: 3 additions & 3 deletions pkg/raw/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1021,7 +1021,7 @@ func (tc *Client) NotifyCreditError(notification chan *CreditError) <-chan *Cred

// QueryOffset retrieves the last consumer offset stored for a given consumer Reference and stream.
// Useful for as consumer wants to know the last stored offset.
// NoOffsetFound is returned if no offset is stored for the given consumer Reference and stream.
// "no offset found" error is returned if no offset is stored for the given consumer Reference and stream.
// Zero (0) is a valid offset.
func (tc *Client) QueryOffset(ctx context.Context, reference string, stream string) (uint64, error) {
if ctx == nil {
Expand All @@ -1031,11 +1031,11 @@ func (tc *Client) QueryOffset(ctx context.Context, reference string, stream stri
logger.V(debugLevel).Info("starting query offset", "reference", reference, "stream", stream)
response, err := tc.syncRequest(ctx, internal.NewQueryOffsetRequest(reference, stream))
if err != nil {
logger.Error(err, "error sending sync request for query offset")
logger.Error(err, "error sending sync request for query offset", "reference", reference, "stream", stream)
return 0, err
}
var offsetResponse *internal.QueryOffsetResponse
if reflect.TypeOf(response) == reflect.TypeOf(offsetResponse) {
if reflect.TypeOf(response) != reflect.TypeOf(offsetResponse) {
return 0, errors.New("response is not of type *internal.QueryOffsetResponse")
}
offsetResponse = response.(*internal.QueryOffsetResponse)
Expand Down
12 changes: 12 additions & 0 deletions pkg/raw/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,18 @@ var _ = Describe("Client", func() {
Expect(streamClient.ExchangeCommandVersions(ctx)).To(Succeed())
}, SpecTimeout(time.Second*3))

It("query offset", func(ctx SpecContext) {
Expect(fakeClientConn.SetDeadline(time.Now().Add(time.Second))).To(Succeed())
streamClient := raw.NewClient(fakeClientConn, conf)
go streamClient.(*raw.Client).StartFrameListener(ctx)

go fakeRabbitMQ.fakeRabbitMQQueryOffset(ctx, 123)

offset, err := streamClient.QueryOffset(ctx, "reference", "stream")
Expect(err).To(Succeed())
Expect(offset).To(BeNumerically("==", 123))
}, SpecTimeout(time.Second*3))

It("cancels requests after a timeout", func(ctx SpecContext) {
// This test does not start a fake to mimic rabbitmq responses. By not starting a
// fake rabbitmq, we simulate "rabbit not responding". The expectation is to
Expand Down
37 changes: 37 additions & 0 deletions pkg/raw/stream_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,43 @@ func (rmq *fakeRabbitMQServer) fakeRabbitMQExchangeCommandVersions(ctx context.C
expectOffset1(serverRW.Flush()).To(Succeed())
}

func (rmq *fakeRabbitMQServer) fakeRabbitMQQueryOffset(ctx context.Context, offset uint64) {
defer GinkgoRecover()

expectOffset1(rmq.connection.SetDeadline(time.Now().Add(time.Second))).
To(Succeed())

serverRW := bufio.NewReadWriter(bufio.NewReader(rmq.connection), bufio.NewWriter(rmq.connection))

header := &internal.Header{}
expectOffset1(header.Read(serverRW.Reader)).To(Succeed())
expectOffset1(header.Command()).To(BeNumerically("==", 0x000b))
expectOffset1(header.Version()).To(BeNumerically("==", 1))

buff := make([]byte, header.Length()-4)
expectOffset1(io.ReadFull(serverRW, buff)).To(BeNumerically("==", header.Length()-4))

body := &internal.QueryOffsetRequest{}
expectOffset1(body.UnmarshalBinary(buff)).To(Succeed())
Expect(body.Stream()).To(Equal("stream"))
Expect(body.ConsumerReference()).To(Equal("reference"))

// there server says ok! :)
// writing the response to the client
frameSize := 4 + // header
4 + // correlation ID
2 + // response code
8 // offset
header = internal.NewHeader(frameSize, 0x800b, 1)
expectOffset1(header.Write(serverRW)).To(BeNumerically("==", 8))

bodyResp := internal.NewQueryOffsetResponseWith(rmq.correlationIdSeq.next(), responseCodeFromContext(ctx, "query_offset"), offset)
b, err := bodyResp.MarshalBinary()
expectOffset1(err).ToNot(HaveOccurred())
expectOffset1(serverRW.Write(b)).To(BeNumerically("==", frameSize-4))
expectOffset1(serverRW.Flush()).To(Succeed())
}

func (rmq *fakeRabbitMQServer) fakeRabbitMQDeletePublisher(ctx context.Context, publisherId uint8) {
defer GinkgoRecover()

Expand Down

0 comments on commit 71c4d5d

Please sign in to comment.