Skip to content

Commit

Permalink
Merge pull request #43 from Gsantomaggio/query_offset
Browse files Browse the repository at this point in the history
Implement query offset command
  • Loading branch information
Zerpet authored Apr 13, 2023
2 parents 48543d1 + 71c4d5d commit 7bbb207
Show file tree
Hide file tree
Showing 9 changed files with 271 additions and 4 deletions.
2 changes: 2 additions & 0 deletions internal/command_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ const (
CommandSubscribe uint16 = 0x0007 // 7
CommandDeliver uint16 = 0x0008 // 8
CommandCredit uint16 = 0x0009 // 9
CommandQueryOffset uint16 = 0x000b // 11
CommandCreate uint16 = 0x000d // 13
CommandDelete uint16 = 0x000e // 14
CommandPeerProperties uint16 = 0x0011 // 17
Expand All @@ -101,6 +102,7 @@ const (
CommandDeletePublisherResponse uint16 = 0x8006
CommandSubscribeResponse uint16 = 0x8007
CommandCreditResponse uint16 = 0x8009
CommandQueryOffsetResponse uint16 = 0x800b
CommandCreateResponse uint16 = 0x800d
CommandDeleteResponse uint16 = 0x800e
CommandPeerPropertiesResponse uint16 = 0x8011
Expand Down
107 changes: 107 additions & 0 deletions internal/query_offset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package internal

import (
"bufio"
"bytes"
)

// QueryOffsetRequest is used to query the offset of a stream for a given consumer reference.
// ref: https://github.com/rabbitmq/rabbitmq-server/blob/main/deps/rabbitmq_stream/docs/PROTOCOL.adoc#queryoffset
type QueryOffsetRequest struct {
correlationId uint32
consumerReference string // max 256 characters
stream string
}

func (c *QueryOffsetRequest) Stream() string {
return c.stream
}

func (c *QueryOffsetRequest) ConsumerReference() string {
return c.consumerReference
}

func (c *QueryOffsetRequest) UnmarshalBinary(data []byte) error {
buff := bytes.NewReader(data)
rd := bufio.NewReader(buff)
return readMany(rd, &c.correlationId, &c.consumerReference, &c.stream)
}

func NewQueryOffsetRequest(consumerReference string, stream string) *QueryOffsetRequest {
return &QueryOffsetRequest{consumerReference: consumerReference, stream: stream}
}

func (c *QueryOffsetRequest) Write(writer *bufio.Writer) (int, error) {
return writeMany(writer, c.correlationId, c.consumerReference, c.stream)
}

func (c *QueryOffsetRequest) Key() uint16 {
return CommandQueryOffset
}

func (c *QueryOffsetRequest) SizeNeeded() int {
return streamProtocolHeader +
streamProtocolStringLenSizeBytes + len(c.consumerReference) + // consumer reference
streamProtocolStringLenSizeBytes + len(c.stream) // stream
}

func (c *QueryOffsetRequest) SetCorrelationId(id uint32) {
c.correlationId = id
}

func (c *QueryOffsetRequest) CorrelationId() uint32 {
return c.correlationId
}

func (c *QueryOffsetRequest) Version() int16 {
return Version1
}

type QueryOffsetResponse struct {
correlationId uint32
responseCode uint16
offset uint64
}

func (c *QueryOffsetResponse) UnmarshalBinary(data []byte) error {
buff := bytes.NewReader(data)
rd := bufio.NewReader(buff)
return readMany(rd, &c.correlationId, &c.responseCode, &c.offset)
}

func (c *QueryOffsetResponse) Read(reader *bufio.Reader) error {
return readMany(reader, &c.correlationId, &c.responseCode, &c.offset)
}

func (c *QueryOffsetResponse) CorrelationId() uint32 {
return c.correlationId
}

func (c *QueryOffsetResponse) ResponseCode() uint16 {
return c.responseCode
}

func (c *QueryOffsetResponse) Offset() uint64 {
return c.offset
}

func (c *QueryOffsetResponse) MarshalBinary() ([]byte, error) {
var buff bytes.Buffer
wr := bufio.NewWriter(&buff)
_, err := writeMany(wr, c.correlationId, c.responseCode, c.offset)
if err != nil {
return nil, err
}
if err = wr.Flush(); err != nil {
return nil, err
}
return buff.Bytes(), nil
}

func NewQueryOffsetResponseWith(correlationId uint32, responseCode uint16, offset uint64) *QueryOffsetResponse {
return &QueryOffsetResponse{correlationId: correlationId, responseCode: responseCode, offset: offset}
}

func NewQueryOffsetResponse() *QueryOffsetResponse {
return &QueryOffsetResponse{}
}
71 changes: 71 additions & 0 deletions internal/query_offset_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package internal

import (
"bufio"
"bytes"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("QueryOffset", func() {
Describe("QueryOffsetRequest", func() {

It("returns the size required to encode the frame", func() {
queryOffsetRequest := NewQueryOffsetRequest("my_c", "my_stream")

expectedSize := 2 + 2 + 4 + // key ID + version + correlation ID
2 + 4 + // uint16 for the consumerReference string + uint32 consumerReference string length
2 + 9 // uint16 for the stream string + uint32 stream string length
Expect(queryOffsetRequest.SizeNeeded()).To(Equal(expectedSize))
})

It("has the expected fields", func() {
queryOffsetRequest := NewQueryOffsetRequest("my_c", "my_stream")
queryOffsetRequest.SetCorrelationId(123)
Expect(queryOffsetRequest.Key()).To(BeNumerically("==", 0x000b))
Expect(queryOffsetRequest.Stream()).To(Equal("my_stream"))
Expect(queryOffsetRequest.ConsumerReference()).To(Equal("my_c"))
Expect(queryOffsetRequest.CorrelationId()).To(BeNumerically("==", 123))

})

It("binary encodes the struct into a binary sequence", func() {
queryOffsetRequest := NewQueryOffsetRequest("my_c", "my_stream")
queryOffsetRequest.SetCorrelationId(123)
buff := new(bytes.Buffer)
wr := bufio.NewWriter(buff)

bytesWritten, err := queryOffsetRequest.Write(wr)
Expect(err).ToNot(HaveOccurred())
Expect(wr.Flush()).To(Succeed())

Expect(bytesWritten).To(BeNumerically("==", queryOffsetRequest.SizeNeeded()-streamProtocolHeaderSizeBytes))

expectedByteSequence := []byte{0x00, 0x00, 0x00, 123, // correlation id
0x00, 0x04, byte('m'), byte('y'), byte('_'), byte('c'), // consumerReference len + consumerReference string
0x00, 0x09, byte('m'), byte('y'), byte('_'), byte('s'), byte('t'), byte('r'), byte('e'), byte('a'), byte('m'), // stream len + stream string
}
Expect(buff.Bytes()).To(Equal(expectedByteSequence))
})

})

Describe("QueryOffsetResponse", func() {
It("decodes a binary sequence into a struct", func() {
someResponse := NewQueryOffsetResponse()
binaryQueryOffset := []byte{
0x00, 0x00, 0x00, 0xFF, // correlation id
0x00, 0x0a, // response code
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x07, // offset
}

buff := bytes.NewBuffer(binaryQueryOffset)
reader := bufio.NewReader(buff)
Expect(someResponse.Read(reader)).To(Succeed())

Expect(someResponse.CorrelationId()).To(BeNumerically("==", 0xFF))
Expect(someResponse.ResponseCode()).To(BeNumerically("==", 0x0a))
Expect(someResponse.Offset()).To(BeNumerically("==", 7))
})
})
})
11 changes: 8 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,15 @@ func main() {
}
}()

// this should log a warning message: subscription does not exist
_ = streamClient.Credit(ctx, 123, 1)

err = streamClient.Subscribe(ctx, stream, constants.OffsetTypeFirst, 1, 10, map[string]string{"name": "my_consumer"}, 10)

offset, err := streamClient.QueryOffset(ctx, "my_consumer", stream)
if err != nil {
log.Error(err, "error querying offset")
} else {
log.Info("offset", "offset", offset)
}

fmt.Println("Press any key to stop ")
reader := bufio.NewReader(os.Stdin)
_, _ = reader.ReadString('\n')
Expand Down
2 changes: 1 addition & 1 deletion pkg/constants/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var ResponseCodeToError = map[uint16]error{
ResponseCodeAccessRefused: errors.New("access refused"),
ResponseCodePreconditionFailed: errors.New("precondition failed"),
ResponseCodePublisherDoesNotExist: errors.New("publisher does not exist"),
ResponseCodeNoOffset: errors.New("no offset"),
ResponseCodeNoOffset: errors.New("no offset found"),
}

// Stream protocol response codes
Expand Down
32 changes: 32 additions & 0 deletions pkg/raw/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"io"
"math"
"net"
"reflect"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -342,6 +343,14 @@ func (tc *Client) handleIncoming(ctx context.Context) error {
log.Error(err, "error ")
}
tc.handleResponse(ctx, exchangeResponse)
case internal.CommandQueryOffsetResponse:
queryOffsetResponse := new(internal.QueryOffsetResponse)
err = queryOffsetResponse.Read(buffer)
log.V(debugLevel).Info("received query offset response")
if err != nil {
log.Error(err, "error during receiving query offset response")
}
tc.handleResponse(ctx, queryOffsetResponse)
case internal.CommandCreditResponse:
creditResp := new(CreditError)
err = creditResp.Read(buffer)
Expand Down Expand Up @@ -1009,3 +1018,26 @@ func (tc *Client) NotifyCreditError(notification chan *CreditError) <-chan *Cred
tc.notifyCh = notification
return notification
}

// QueryOffset retrieves the last consumer offset stored for a given consumer Reference and stream.
// Useful for as consumer wants to know the last stored offset.
// "no offset found" error is returned if no offset is stored for the given consumer Reference and stream.
// Zero (0) is a valid offset.
func (tc *Client) QueryOffset(ctx context.Context, reference string, stream string) (uint64, error) {
if ctx == nil {
return 0, errNilContext
}
logger := logr.FromContextOrDiscard(ctx).WithName("QueryOffset")
logger.V(debugLevel).Info("starting query offset", "reference", reference, "stream", stream)
response, err := tc.syncRequest(ctx, internal.NewQueryOffsetRequest(reference, stream))
if err != nil {
logger.Error(err, "error sending sync request for query offset", "reference", reference, "stream", stream)
return 0, err
}
var offsetResponse *internal.QueryOffsetResponse
if reflect.TypeOf(response) != reflect.TypeOf(offsetResponse) {
return 0, errors.New("response is not of type *internal.QueryOffsetResponse")
}
offsetResponse = response.(*internal.QueryOffsetResponse)
return offsetResponse.Offset(), streamErrorOrNil(response.ResponseCode())
}
12 changes: 12 additions & 0 deletions pkg/raw/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,18 @@ var _ = Describe("Client", func() {
Expect(streamClient.ExchangeCommandVersions(ctx)).To(Succeed())
}, SpecTimeout(time.Second*3))

It("query offset", func(ctx SpecContext) {
Expect(fakeClientConn.SetDeadline(time.Now().Add(time.Second))).To(Succeed())
streamClient := raw.NewClient(fakeClientConn, conf)
go streamClient.(*raw.Client).StartFrameListener(ctx)

go fakeRabbitMQ.fakeRabbitMQQueryOffset(ctx, 123)

offset, err := streamClient.QueryOffset(ctx, "reference", "stream")
Expect(err).To(Succeed())
Expect(offset).To(BeNumerically("==", 123))
}, SpecTimeout(time.Second*3))

It("cancels requests after a timeout", func(ctx SpecContext) {
// This test does not start a fake to mimic rabbitmq responses. By not starting a
// fake rabbitmq, we simulate "rabbit not responding". The expectation is to
Expand Down
1 change: 1 addition & 0 deletions pkg/raw/client_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,5 @@ type Clienter interface {
ExchangeCommandVersions(ctx context.Context) error
Credit(ctx context.Context, subscriptionId uint8, credit uint16) error
NotifyCreditError(notification chan *CreditError) <-chan *CreditError
QueryOffset(ctx context.Context, reference string, stream string) (uint64, error)
}
37 changes: 37 additions & 0 deletions pkg/raw/stream_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,43 @@ func (rmq *fakeRabbitMQServer) fakeRabbitMQExchangeCommandVersions(ctx context.C
expectOffset1(serverRW.Flush()).To(Succeed())
}

func (rmq *fakeRabbitMQServer) fakeRabbitMQQueryOffset(ctx context.Context, offset uint64) {
defer GinkgoRecover()

expectOffset1(rmq.connection.SetDeadline(time.Now().Add(time.Second))).
To(Succeed())

serverRW := bufio.NewReadWriter(bufio.NewReader(rmq.connection), bufio.NewWriter(rmq.connection))

header := &internal.Header{}
expectOffset1(header.Read(serverRW.Reader)).To(Succeed())
expectOffset1(header.Command()).To(BeNumerically("==", 0x000b))
expectOffset1(header.Version()).To(BeNumerically("==", 1))

buff := make([]byte, header.Length()-4)
expectOffset1(io.ReadFull(serverRW, buff)).To(BeNumerically("==", header.Length()-4))

body := &internal.QueryOffsetRequest{}
expectOffset1(body.UnmarshalBinary(buff)).To(Succeed())
Expect(body.Stream()).To(Equal("stream"))
Expect(body.ConsumerReference()).To(Equal("reference"))

// there server says ok! :)
// writing the response to the client
frameSize := 4 + // header
4 + // correlation ID
2 + // response code
8 // offset
header = internal.NewHeader(frameSize, 0x800b, 1)
expectOffset1(header.Write(serverRW)).To(BeNumerically("==", 8))

bodyResp := internal.NewQueryOffsetResponseWith(rmq.correlationIdSeq.next(), responseCodeFromContext(ctx, "query_offset"), offset)
b, err := bodyResp.MarshalBinary()
expectOffset1(err).ToNot(HaveOccurred())
expectOffset1(serverRW.Write(b)).To(BeNumerically("==", frameSize-4))
expectOffset1(serverRW.Flush()).To(Succeed())
}

func (rmq *fakeRabbitMQServer) fakeRabbitMQDeletePublisher(ctx context.Context, publisherId uint8) {
defer GinkgoRecover()

Expand Down

0 comments on commit 7bbb207

Please sign in to comment.