From 27c63fcde00a6054925670d386722e7e6ff154f9 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 13 Apr 2023 10:20:54 +0200 Subject: [PATCH 1/4] Implement query offset command closes: https://github.com/Gsantomaggio/rabbitmq-stream-go-client/issues/38 Signed-off-by: Gabriele Santomaggio --- internal/command_types.go | 1 + internal/query_offset.go | 82 +++++++++++++++++++++++++++++++++++ internal/query_offset_test.go | 52 ++++++++++++++++++++++ main.go | 3 -- 4 files changed, 135 insertions(+), 3 deletions(-) create mode 100644 internal/query_offset.go create mode 100644 internal/query_offset_test.go diff --git a/internal/command_types.go b/internal/command_types.go index c34cb24b..139b276b 100644 --- a/internal/command_types.go +++ b/internal/command_types.go @@ -85,6 +85,7 @@ const ( CommandSubscribe uint16 = 0x0007 // 7 CommandDeliver uint16 = 0x0008 // 8 CommandCredit uint16 = 0x0009 // 9 + CommandQueryOffset uint16 = 0x000b // 11 CommandCreate uint16 = 0x000d // 13 CommandDelete uint16 = 0x000e // 14 CommandPeerProperties uint16 = 0x0011 // 17 diff --git a/internal/query_offset.go b/internal/query_offset.go new file mode 100644 index 00000000..033f405a --- /dev/null +++ b/internal/query_offset.go @@ -0,0 +1,82 @@ +package internal + +import ( + "bufio" + "bytes" +) + +// QueryOffsetRequest is used to query the offset of a stream for a given consumer reference. +// ref: https://github.com/rabbitmq/rabbitmq-server/blob/main/deps/rabbitmq_stream/docs/PROTOCOL.adoc#queryoffset +type QueryOffsetRequest struct { + correlationId uint32 + consumerReference string // max 256 characters + stream string +} + +func (c *QueryOffsetRequest) Stream() string { + return c.stream +} + +func (c *QueryOffsetRequest) ConsumerReference() string { + return c.consumerReference +} + +func (c *QueryOffsetRequest) UnmarshalBinary(data []byte) error { + buff := bytes.NewReader(data) + rd := bufio.NewReader(buff) + return readMany(rd, &c.correlationId, &c.consumerReference, &c.stream) +} + +func NewQueryOffsetRequest(consumerReference string, stream string) *QueryOffsetRequest { + return &QueryOffsetRequest{consumerReference: consumerReference, stream: stream} +} + +func (c *QueryOffsetRequest) Write(writer *bufio.Writer) (int, error) { + return writeMany(writer, c.correlationId, c.consumerReference, c.stream) +} + +func (c *QueryOffsetRequest) Key() uint16 { + return CommandQueryOffset +} + +func (c *QueryOffsetRequest) SizeNeeded() int { + return streamProtocolHeader + + streamProtocolStringLenSizeBytes + len(c.consumerReference) + // consumer reference + streamProtocolStringLenSizeBytes + len(c.stream) // stream +} + +func (c *QueryOffsetRequest) SetCorrelationId(id uint32) { + c.correlationId = id +} + +func (c *QueryOffsetRequest) CorrelationId() uint32 { + return c.correlationId +} + +func (c *QueryOffsetRequest) Version() int16 { + return Version1 +} + +type QueryOffsetResponse struct { + correlationId uint32 + responseCode uint16 + offset uint64 +} + +func (c *QueryOffsetResponse) UnmarshalBinary(data []byte) error { + buff := bytes.NewReader(data) + rd := bufio.NewReader(buff) + return readMany(rd, &c.correlationId, &c.responseCode, &c.offset) +} + +func (c *QueryOffsetResponse) Read(reader *bufio.Reader) error { + return readMany(reader, &c.correlationId, &c.responseCode, &c.offset) +} + +func (c *QueryOffsetResponse) CorrelationId() uint32 { + return c.correlationId +} + +func NewQueryOffsetResponseWith(correlationId uint32, responseCode uint16, offset uint64) *QueryOffsetResponse { + return &QueryOffsetResponse{correlationId: correlationId, responseCode: responseCode, offset: offset} +} diff --git a/internal/query_offset_test.go b/internal/query_offset_test.go new file mode 100644 index 00000000..6d8e3cd9 --- /dev/null +++ b/internal/query_offset_test.go @@ -0,0 +1,52 @@ +package internal + +import ( + "bufio" + "bytes" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("QueryOffset", func() { + Describe("QueryOffsetRequest", func() { + + It("returns the size required to encode the frame", func() { + queryOffsetRequest := NewQueryOffsetRequest("my_c", "my_stream") + + expectedSize := 2 + 2 + 4 + // key ID + version + correlation ID + 2 + 4 + // uint16 for the consumerReference string + uint32 consumerReference string length + 2 + 9 // uint16 for the stream string + uint32 stream string length + Expect(queryOffsetRequest.SizeNeeded()).To(Equal(expectedSize)) + }) + + It("has the expected fields", func() { + queryOffsetRequest := NewQueryOffsetRequest("my_c", "my_stream") + queryOffsetRequest.SetCorrelationId(123) + Expect(queryOffsetRequest.Key()).To(BeNumerically("==", 0x000b)) + Expect(queryOffsetRequest.Stream()).To(Equal("my_stream")) + Expect(queryOffsetRequest.ConsumerReference()).To(Equal("my_c")) + Expect(queryOffsetRequest.CorrelationId()).To(BeNumerically("==", 123)) + + }) + + It("binary encodes the struct into a binary sequence", func() { + queryOffsetRequest := NewQueryOffsetRequest("my_c", "my_stream") + queryOffsetRequest.SetCorrelationId(123) + buff := new(bytes.Buffer) + wr := bufio.NewWriter(buff) + + bytesWritten, err := queryOffsetRequest.Write(wr) + Expect(err).ToNot(HaveOccurred()) + Expect(wr.Flush()).To(Succeed()) + + Expect(bytesWritten).To(BeNumerically("==", queryOffsetRequest.SizeNeeded()-streamProtocolHeaderSizeBytes)) + + expectedByteSequence := []byte{0x00, 0x00, 0x00, 123, // correlation id + 0x00, 0x04, byte('m'), byte('y'), byte('_'), byte('c'), // consumerReference len + consumerReference string + 0x00, 0x09, byte('m'), byte('y'), byte('_'), byte('s'), byte('t'), byte('r'), byte('e'), byte('a'), byte('m'), // stream len + stream string + } + Expect(buff.Bytes()).To(Equal(expectedByteSequence)) + }) + + }) +}) diff --git a/main.go b/main.go index f8e66d76..a89e5f15 100644 --- a/main.go +++ b/main.go @@ -105,9 +105,6 @@ func main() { } }() - // this should log a warning message: subscription does not exist - _ = streamClient.Credit(ctx, 123, 1) - err = streamClient.Subscribe(ctx, stream, constants.OffsetTypeFirst, 1, 10, map[string]string{"name": "my_consumer"}, 10) fmt.Println("Press any key to stop ") reader := bufio.NewReader(os.Stdin) From ff512246203d098207ebe0b4b94201fa3d3716ca Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 13 Apr 2023 10:45:56 +0200 Subject: [PATCH 2/4] Add response test Signed-off-by: Gabriele Santomaggio --- internal/query_offset.go | 12 ++++++++++++ internal/query_offset_test.go | 19 +++++++++++++++++++ 2 files changed, 31 insertions(+) diff --git a/internal/query_offset.go b/internal/query_offset.go index 033f405a..79bae9a3 100644 --- a/internal/query_offset.go +++ b/internal/query_offset.go @@ -77,6 +77,18 @@ func (c *QueryOffsetResponse) CorrelationId() uint32 { return c.correlationId } +func (c *QueryOffsetResponse) ResponseCode() uint16 { + return c.responseCode +} + +func (c *QueryOffsetResponse) Offset() uint64 { + return c.offset +} + func NewQueryOffsetResponseWith(correlationId uint32, responseCode uint16, offset uint64) *QueryOffsetResponse { return &QueryOffsetResponse{correlationId: correlationId, responseCode: responseCode, offset: offset} } + +func NewQueryOffsetResponse() *QueryOffsetResponse { + return &QueryOffsetResponse{} +} diff --git a/internal/query_offset_test.go b/internal/query_offset_test.go index 6d8e3cd9..0e680843 100644 --- a/internal/query_offset_test.go +++ b/internal/query_offset_test.go @@ -49,4 +49,23 @@ var _ = Describe("QueryOffset", func() { }) }) + + Describe("QueryOffsetResponse", func() { + It("decodes a binary sequence into a struct", func() { + someResponse := NewQueryOffsetResponse() + binaryQueryOffset := []byte{ + 0x00, 0x00, 0x00, 0xFF, // correlation id + 0x00, 0x0a, // response code + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x07, // offset + } + + buff := bytes.NewBuffer(binaryQueryOffset) + reader := bufio.NewReader(buff) + Expect(someResponse.Read(reader)).To(Succeed()) + + Expect(someResponse.CorrelationId()).To(BeNumerically("==", 0xFF)) + Expect(someResponse.ResponseCode()).To(BeNumerically("==", 0x0a)) + Expect(someResponse.Offset()).To(BeNumerically("==", 7)) + }) + }) }) From 6d168954fb24739d5e0b5937be5b5621d0754517 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 13 Apr 2023 12:16:26 +0200 Subject: [PATCH 3/4] Add client command Signed-off-by: Gabriele Santomaggio --- internal/command_types.go | 1 + pkg/raw/client.go | 32 ++++++++++++++++++++++++++++++++ pkg/raw/client_types.go | 1 + 3 files changed, 34 insertions(+) diff --git a/internal/command_types.go b/internal/command_types.go index 139b276b..b79af2ef 100644 --- a/internal/command_types.go +++ b/internal/command_types.go @@ -102,6 +102,7 @@ const ( CommandDeletePublisherResponse uint16 = 0x8006 CommandSubscribeResponse uint16 = 0x8007 CommandCreditResponse uint16 = 0x8009 + CommandQueryOffsetResponse uint16 = 0x800b CommandCreateResponse uint16 = 0x800d CommandDeleteResponse uint16 = 0x800e CommandPeerPropertiesResponse uint16 = 0x8011 diff --git a/pkg/raw/client.go b/pkg/raw/client.go index 85f77b8e..9a77a964 100644 --- a/pkg/raw/client.go +++ b/pkg/raw/client.go @@ -12,6 +12,7 @@ import ( "io" "math" "net" + "reflect" "strconv" "strings" "sync" @@ -342,6 +343,14 @@ func (tc *Client) handleIncoming(ctx context.Context) error { log.Error(err, "error ") } tc.handleResponse(ctx, exchangeResponse) + case internal.CommandQueryOffsetResponse: + queryOffsetResponse := new(internal.QueryOffsetResponse) + err = queryOffsetResponse.Read(buffer) + log.V(debugLevel).Info("received query offset response") + if err != nil { + log.Error(err, "error during receiving query offset response") + } + tc.handleResponse(ctx, queryOffsetResponse) case internal.CommandCreditResponse: creditResp := new(CreditError) err = creditResp.Read(buffer) @@ -1009,3 +1018,26 @@ func (tc *Client) NotifyCreditError(notification chan *CreditError) <-chan *Cred tc.notifyCh = notification return notification } + +// QueryOffset retrieves the last consumer offset stored for a given consumer Reference and stream. +// Useful for as consumer wants to know the last stored offset. +// NoOffsetFound is returned if no offset is stored for the given consumer Reference and stream. +// Zero (0) is a valid offset. +func (tc *Client) QueryOffset(ctx context.Context, reference string, stream string) (uint64, error) { + if ctx == nil { + return 0, errNilContext + } + logger := logr.FromContextOrDiscard(ctx).WithName("QueryOffset") + logger.V(debugLevel).Info("starting query offset", "reference", reference, "stream", stream) + response, err := tc.syncRequest(ctx, internal.NewQueryOffsetRequest(reference, stream)) + if err != nil { + logger.Error(err, "error sending sync request for query offset") + return 0, err + } + var offsetResponse *internal.QueryOffsetResponse + if reflect.TypeOf(response) == reflect.TypeOf(offsetResponse) { + return 0, errors.New("response is not of type *internal.QueryOffsetResponse") + } + offsetResponse = response.(*internal.QueryOffsetResponse) + return offsetResponse.Offset(), streamErrorOrNil(response.ResponseCode()) +} diff --git a/pkg/raw/client_types.go b/pkg/raw/client_types.go index 777a35e0..4b60878c 100644 --- a/pkg/raw/client_types.go +++ b/pkg/raw/client_types.go @@ -124,4 +124,5 @@ type Clienter interface { ExchangeCommandVersions(ctx context.Context) error Credit(ctx context.Context, subscriptionId uint8, credit uint16) error NotifyCreditError(notification chan *CreditError) <-chan *CreditError + QueryOffset(ctx context.Context, reference string, stream string) (uint64, error) } From 71c4d5dda4b3ad4ac024dd59330c84a8312bc70a Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 13 Apr 2023 15:39:10 +0200 Subject: [PATCH 4/4] Suite tests Signed-off-by: Gabriele Santomaggio --- internal/query_offset.go | 13 +++++++++++++ main.go | 8 ++++++++ pkg/constants/types.go | 2 +- pkg/raw/client.go | 6 +++--- pkg/raw/client_test.go | 12 ++++++++++++ pkg/raw/stream_suite_test.go | 37 ++++++++++++++++++++++++++++++++++++ 6 files changed, 74 insertions(+), 4 deletions(-) diff --git a/internal/query_offset.go b/internal/query_offset.go index 79bae9a3..58a830d1 100644 --- a/internal/query_offset.go +++ b/internal/query_offset.go @@ -85,6 +85,19 @@ func (c *QueryOffsetResponse) Offset() uint64 { return c.offset } +func (c *QueryOffsetResponse) MarshalBinary() ([]byte, error) { + var buff bytes.Buffer + wr := bufio.NewWriter(&buff) + _, err := writeMany(wr, c.correlationId, c.responseCode, c.offset) + if err != nil { + return nil, err + } + if err = wr.Flush(); err != nil { + return nil, err + } + return buff.Bytes(), nil +} + func NewQueryOffsetResponseWith(correlationId uint32, responseCode uint16, offset uint64) *QueryOffsetResponse { return &QueryOffsetResponse{correlationId: correlationId, responseCode: responseCode, offset: offset} } diff --git a/main.go b/main.go index a89e5f15..f0585d08 100644 --- a/main.go +++ b/main.go @@ -106,6 +106,14 @@ func main() { }() err = streamClient.Subscribe(ctx, stream, constants.OffsetTypeFirst, 1, 10, map[string]string{"name": "my_consumer"}, 10) + + offset, err := streamClient.QueryOffset(ctx, "my_consumer", stream) + if err != nil { + log.Error(err, "error querying offset") + } else { + log.Info("offset", "offset", offset) + } + fmt.Println("Press any key to stop ") reader := bufio.NewReader(os.Stdin) _, _ = reader.ReadString('\n') diff --git a/pkg/constants/types.go b/pkg/constants/types.go index d5eea365..4ff78212 100644 --- a/pkg/constants/types.go +++ b/pkg/constants/types.go @@ -23,7 +23,7 @@ var ResponseCodeToError = map[uint16]error{ ResponseCodeAccessRefused: errors.New("access refused"), ResponseCodePreconditionFailed: errors.New("precondition failed"), ResponseCodePublisherDoesNotExist: errors.New("publisher does not exist"), - ResponseCodeNoOffset: errors.New("no offset"), + ResponseCodeNoOffset: errors.New("no offset found"), } // Stream protocol response codes diff --git a/pkg/raw/client.go b/pkg/raw/client.go index 9a77a964..7d4c67dd 100644 --- a/pkg/raw/client.go +++ b/pkg/raw/client.go @@ -1021,7 +1021,7 @@ func (tc *Client) NotifyCreditError(notification chan *CreditError) <-chan *Cred // QueryOffset retrieves the last consumer offset stored for a given consumer Reference and stream. // Useful for as consumer wants to know the last stored offset. -// NoOffsetFound is returned if no offset is stored for the given consumer Reference and stream. +// "no offset found" error is returned if no offset is stored for the given consumer Reference and stream. // Zero (0) is a valid offset. func (tc *Client) QueryOffset(ctx context.Context, reference string, stream string) (uint64, error) { if ctx == nil { @@ -1031,11 +1031,11 @@ func (tc *Client) QueryOffset(ctx context.Context, reference string, stream stri logger.V(debugLevel).Info("starting query offset", "reference", reference, "stream", stream) response, err := tc.syncRequest(ctx, internal.NewQueryOffsetRequest(reference, stream)) if err != nil { - logger.Error(err, "error sending sync request for query offset") + logger.Error(err, "error sending sync request for query offset", "reference", reference, "stream", stream) return 0, err } var offsetResponse *internal.QueryOffsetResponse - if reflect.TypeOf(response) == reflect.TypeOf(offsetResponse) { + if reflect.TypeOf(response) != reflect.TypeOf(offsetResponse) { return 0, errors.New("response is not of type *internal.QueryOffsetResponse") } offsetResponse = response.(*internal.QueryOffsetResponse) diff --git a/pkg/raw/client_test.go b/pkg/raw/client_test.go index 9c9ab92f..e612c36b 100644 --- a/pkg/raw/client_test.go +++ b/pkg/raw/client_test.go @@ -258,6 +258,18 @@ var _ = Describe("Client", func() { Expect(streamClient.ExchangeCommandVersions(ctx)).To(Succeed()) }, SpecTimeout(time.Second*3)) + It("query offset", func(ctx SpecContext) { + Expect(fakeClientConn.SetDeadline(time.Now().Add(time.Second))).To(Succeed()) + streamClient := raw.NewClient(fakeClientConn, conf) + go streamClient.(*raw.Client).StartFrameListener(ctx) + + go fakeRabbitMQ.fakeRabbitMQQueryOffset(ctx, 123) + + offset, err := streamClient.QueryOffset(ctx, "reference", "stream") + Expect(err).To(Succeed()) + Expect(offset).To(BeNumerically("==", 123)) + }, SpecTimeout(time.Second*3)) + It("cancels requests after a timeout", func(ctx SpecContext) { // This test does not start a fake to mimic rabbitmq responses. By not starting a // fake rabbitmq, we simulate "rabbit not responding". The expectation is to diff --git a/pkg/raw/stream_suite_test.go b/pkg/raw/stream_suite_test.go index 57ec2dcf..443f11e8 100644 --- a/pkg/raw/stream_suite_test.go +++ b/pkg/raw/stream_suite_test.go @@ -361,6 +361,43 @@ func (rmq *fakeRabbitMQServer) fakeRabbitMQExchangeCommandVersions(ctx context.C expectOffset1(serverRW.Flush()).To(Succeed()) } +func (rmq *fakeRabbitMQServer) fakeRabbitMQQueryOffset(ctx context.Context, offset uint64) { + defer GinkgoRecover() + + expectOffset1(rmq.connection.SetDeadline(time.Now().Add(time.Second))). + To(Succeed()) + + serverRW := bufio.NewReadWriter(bufio.NewReader(rmq.connection), bufio.NewWriter(rmq.connection)) + + header := &internal.Header{} + expectOffset1(header.Read(serverRW.Reader)).To(Succeed()) + expectOffset1(header.Command()).To(BeNumerically("==", 0x000b)) + expectOffset1(header.Version()).To(BeNumerically("==", 1)) + + buff := make([]byte, header.Length()-4) + expectOffset1(io.ReadFull(serverRW, buff)).To(BeNumerically("==", header.Length()-4)) + + body := &internal.QueryOffsetRequest{} + expectOffset1(body.UnmarshalBinary(buff)).To(Succeed()) + Expect(body.Stream()).To(Equal("stream")) + Expect(body.ConsumerReference()).To(Equal("reference")) + + // there server says ok! :) + // writing the response to the client + frameSize := 4 + // header + 4 + // correlation ID + 2 + // response code + 8 // offset + header = internal.NewHeader(frameSize, 0x800b, 1) + expectOffset1(header.Write(serverRW)).To(BeNumerically("==", 8)) + + bodyResp := internal.NewQueryOffsetResponseWith(rmq.correlationIdSeq.next(), responseCodeFromContext(ctx, "query_offset"), offset) + b, err := bodyResp.MarshalBinary() + expectOffset1(err).ToNot(HaveOccurred()) + expectOffset1(serverRW.Write(b)).To(BeNumerically("==", frameSize-4)) + expectOffset1(serverRW.Flush()).To(Succeed()) +} + func (rmq *fakeRabbitMQServer) fakeRabbitMQDeletePublisher(ctx context.Context, publisherId uint8) { defer GinkgoRecover()