Skip to content

Commit

Permalink
Merge pull request #37 from Gsantomaggio/credits
Browse files Browse the repository at this point in the history
Implement Credit
  • Loading branch information
Zerpet authored Jan 31, 2023
2 parents c231685 + 11ff4f8 commit 48543d1
Show file tree
Hide file tree
Showing 8 changed files with 270 additions and 4 deletions.
2 changes: 2 additions & 0 deletions internal/command_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ const (
CommandDeletePublisher uint16 = 0x0006 // 6
CommandSubscribe uint16 = 0x0007 // 7
CommandDeliver uint16 = 0x0008 // 8
CommandCredit uint16 = 0x0009 // 9
CommandCreate uint16 = 0x000d // 13
CommandDelete uint16 = 0x000e // 14
CommandPeerProperties uint16 = 0x0011 // 17
Expand All @@ -99,6 +100,7 @@ const (
CommandDeclarePublisherResponse uint16 = 0x8001
CommandDeletePublisherResponse uint16 = 0x8006
CommandSubscribeResponse uint16 = 0x8007
CommandCreditResponse uint16 = 0x8009
CommandCreateResponse uint16 = 0x800d
CommandDeleteResponse uint16 = 0x800e
CommandPeerPropertiesResponse uint16 = 0x8011
Expand Down
80 changes: 80 additions & 0 deletions internal/credit.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package internal

import (
"bufio"
"bytes"
)

type CreditRequest struct {
subscriptionId uint8
// number of chunks that can be sent
credit uint16
}

func NewCreditRequest(subscriptionId uint8, credit uint16) *CreditRequest {
return &CreditRequest{subscriptionId: subscriptionId, credit: credit}
}

func (c *CreditRequest) UnmarshalBinary(data []byte) error {
return readMany(bytes.NewReader(data), &c.subscriptionId, &c.credit)
}

func (c *CreditRequest) SubscriptionId() uint8 {
return c.subscriptionId
}

func (c *CreditRequest) Credit() uint16 {
return c.credit
}

func (c *CreditRequest) Write(w *bufio.Writer) (int, error) {
return writeMany(w, c.subscriptionId, c.credit)
}

func (c *CreditRequest) Key() uint16 {
return CommandCredit
}

func (c *CreditRequest) SizeNeeded() int {
return streamProtocolHeaderSizeBytes + streamProtocolKeySizeUint8 + streamProtocolKeySizeUint16
}

func (c *CreditRequest) Version() int16 {
return Version1
}

type CreditResponse struct {
responseCode uint16
subscriptionId uint8
}

func (c *CreditResponse) ResponseCode() uint16 {
return c.responseCode
}

func (c *CreditResponse) SubscriptionId() uint8 {
return c.subscriptionId
}

func (c *CreditResponse) MarshalBinary() (data []byte, err error) {
w := &bytes.Buffer{}
n, err := writeMany(w, c.responseCode, c.subscriptionId)
if err != nil {
return nil, err
}

if n != 3 {
return nil, errWriteShort
}

data = w.Bytes()
return
}

func NewCreditResponse(responseCode uint16, subscriptionId uint8) *CreditResponse {
return &CreditResponse{responseCode: responseCode, subscriptionId: subscriptionId}
}

func (c *CreditResponse) Read(r *bufio.Reader) error {
return readMany(r, &c.responseCode, &c.subscriptionId)
}
54 changes: 54 additions & 0 deletions internal/credit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package internal

import (
"bufio"
"bytes"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("Internal/Credit", func() {
Context("Request", func() {
It("knows the size needed to encode itself", func() {
c := &CreditRequest{
subscriptionId: 123,
credit: 987,
}
Expect(c.SizeNeeded()).To(BeNumerically("==", streamProtocolHeaderSizeBytes+1+2))
})

It("encodes itself into a binary sequence", func() {
c := &CreditRequest{
subscriptionId: 5,
credit: 255,
}
buff := &bytes.Buffer{}
wr := bufio.NewWriter(buff)
Expect(c.Write(wr)).To(BeNumerically("==", 3))
Expect(wr.Flush()).To(Succeed())

expectedByteSequence := []byte{
0x05, // subscription ID
0x00, 0xff, // credit
}
Expect(buff.Bytes()).To(Equal(expectedByteSequence))
})
})

Context("Response", func() {
It("decodes itself into a response struct", func() {
byteSequence := []byte{
0x00, 0x0f, // response code
0x10, // sub ID
}

buff := bytes.NewBuffer(byteSequence)
r := bufio.NewReader(buff)
c := &CreditResponse{}

Expect(c.Read(r)).Should(Succeed())
Expect(c.responseCode).To(BeNumerically("==", 15))
Expect(c.subscriptionId).To(BeNumerically("==", 16))
})
})
})
7 changes: 7 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,19 @@ func main() {
go func() {
for c := range chunkChan {
received += int(c.NumEntries)
err := streamClient.Credit(ctx, 1, 1)
if err != nil {
log.Error(err, "error sending credits")
}
if (received % totalMessages) == 0 {
log.Info("Received", "messages ", received)
}
}
}()

// this should log a warning message: subscription does not exist
_ = streamClient.Credit(ctx, 123, 1)

err = streamClient.Subscribe(ctx, stream, constants.OffsetTypeFirst, 1, 10, map[string]string{"name": "my_consumer"}, 10)
fmt.Println("Press any key to stop ")
reader := bufio.NewReader(os.Stdin)
Expand Down
46 changes: 46 additions & 0 deletions pkg/raw/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Client struct {
connectionProperties map[string]string
confirmsCh chan *PublishConfirm
chunkCh chan *Chunk
notifyCh chan *CreditError
}

// IsOpen returns true if the connection is open, false otherwise
Expand Down Expand Up @@ -341,6 +342,32 @@ func (tc *Client) handleIncoming(ctx context.Context) error {
log.Error(err, "error ")
}
tc.handleResponse(ctx, exchangeResponse)
case internal.CommandCreditResponse:
creditResp := new(CreditError)
err = creditResp.Read(buffer)
log.Error(
errUnknownSubscription,
"received credit response for unknown subscription",
"responseCode",
creditResp.ResponseCode(),
"subscriptionId",
creditResp.SubscriptionId(),
)
if err != nil {
log.Error(err, "error in credit response")
return err
}

tc.mu.Lock()
if tc.notifyCh != nil {
select {
case <-ctx.Done():
tc.mu.Unlock()
return ctx.Err()
case tc.notifyCh <- creditResp:
}
}
tc.mu.Unlock()
default:
log.Info("frame not implemented", "command ID", fmt.Sprintf("%X", header.Command()))
_, err := buffer.Discard(header.Length() - 4)
Expand Down Expand Up @@ -945,6 +972,17 @@ func (tc *Client) ExchangeCommandVersions(ctx context.Context) error {
return streamErrorOrNil(response.ResponseCode())
}

// Credit TODO: go docs
func (tc *Client) Credit(ctx context.Context, subscriptionID uint8, credits uint16) error {
if ctx == nil {
return errNilContext
}
logger := logr.FromContextOrDiscard(ctx).WithName("Credit")
logger.V(debugLevel).Info("starting credit")

return tc.request(ctx, internal.NewCreditRequest(subscriptionID, credits))
}

// NotifyPublish TODO: godocs
func (tc *Client) NotifyPublish(c chan *PublishConfirm) <-chan *PublishConfirm {
tc.mu.Lock()
Expand All @@ -963,3 +1001,11 @@ func (tc *Client) NotifyChunk(c chan *Chunk) <-chan *Chunk {
tc.chunkCh = c
return c
}

// NotifyCreditError TODO: go docs
func (tc *Client) NotifyCreditError(notification chan *CreditError) <-chan *CreditError {
tc.mu.Lock()
defer tc.mu.Unlock()
tc.notifyCh = notification
return notification
}
30 changes: 30 additions & 0 deletions pkg/raw/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,36 @@ var _ = Describe("Client", func() {
Expect(streamClient.DeclareStream(itCtx, "test-stream", constants.StreamConfiguration{"some-key": "some-value"})).To(Succeed())
})

Context("credits", func() {
It("sends credits to the server", func(ctx SpecContext) {
Expect(fakeClientConn.SetDeadline(time.Now().Add(time.Second))).To(Succeed())
streamClient := raw.NewClient(fakeClientConn, conf)

go fakeRabbitMQ.fakeRabbitMQCredit(2, 100)

Expect(streamClient.Credit(ctx, 2, 100)).To(Succeed())
})

When("sending credits for non-existing subscription", func() {
It("returns an error", func(ctx SpecContext) {
Expect(fakeClientConn.SetDeadline(time.Now().Add(time.Second))).To(Succeed())
streamClient := raw.NewClient(fakeClientConn, conf)
go streamClient.(*raw.Client).StartFrameListener(ctx)

go fakeRabbitMQ.fakeRabbitMQCreditResponse(
newContextWithResponseCode(ctx, streamResponseCodeSubscriptionIdDoesNotExist, "credit"),
123,
)
var notification *raw.CreditError
notificationCh := streamClient.NotifyCreditError(make(chan *raw.CreditError))
Eventually(notificationCh).Should(Receive(&notification))
Expect(notification).To(BeAssignableToTypeOf(&raw.CreditError{}))
Expect(notification.ResponseCode()).To(BeNumerically("==", streamResponseCodeSubscriptionIdDoesNotExist))
Expect(notification.SubscriptionId()).To(BeNumerically("==", 123))
})
})
})

It("Delete a stream", func(ctx SpecContext) {
itCtx, cancel := context.WithTimeout(logr.NewContext(ctx, GinkgoLogr), time.Second*3)
defer cancel()
Expand Down
12 changes: 8 additions & 4 deletions pkg/raw/client_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ import (
)

var (
errURIScheme = errors.New("RabbitMQ Stream scheme must be either 'rabbitmq-stream://' or 'rabbitmq-stream+tls://'")
errURIWhitespace = errors.New("URI must not contain whitespace")
errNilContext = errors.New("context cannot be nil")
errNilConfig = errors.New("RabbitmqConfiguration cannot be nil")
errURIScheme = errors.New("RabbitMQ Stream scheme must be either 'rabbitmq-stream://' or 'rabbitmq-stream+tls://'")
errURIWhitespace = errors.New("URI must not contain whitespace")
errNilContext = errors.New("context cannot be nil")
errNilConfig = errors.New("RabbitmqConfiguration cannot be nil")
errUnknownSubscription = errors.New("unknown subscription ID")
)

var schemePorts = map[string]int{"rabbitmq-stream": 5552, "rabbitmq-stream+tls": 5551}
Expand Down Expand Up @@ -106,6 +107,7 @@ func NewClientConfiguration(rabbitmqUrls ...string) (*ClientConfiguration, error

type PublishConfirm = internal.PublishConfirmResponse
type Chunk = internal.ChunkResponse
type CreditError = internal.CreditResponse

type Clienter interface {
Connect(ctx context.Context) error
Expand All @@ -120,4 +122,6 @@ type Clienter interface {
NotifyPublish(chan *PublishConfirm) <-chan *PublishConfirm
NotifyChunk(c chan *Chunk) <-chan *Chunk
ExchangeCommandVersions(ctx context.Context) error
Credit(ctx context.Context, subscriptionId uint8, credit uint16) error
NotifyCreditError(notification chan *CreditError) <-chan *CreditError
}
43 changes: 43 additions & 0 deletions pkg/raw/stream_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,49 @@ func (rmq *fakeRabbitMQServer) fakeRabbitMQPublisherConfirms(pubId uint8, numOfC
expectOffset1(err).ToNot(HaveOccurred())
}

func (rmq *fakeRabbitMQServer) fakeRabbitMQCredit(subscriptionId uint8, credits uint16) {
defer GinkgoRecover()
expectOffset1(rmq.connection.SetDeadline(time.Now().Add(time.Second))).
To(Succeed())

serverReader := bufio.NewReader(rmq.connection)

header := new(internal.Header)
expectOffset1(header.Read(serverReader)).To(Succeed())
expectOffset1(header.Command()).To(BeNumerically("==", 0x0009))
expectOffset1(header.Version()).To(BeNumerically("==", 1))

buff := make([]byte, header.Length()-4)
expectOffset1(io.ReadFull(serverReader, buff)).
To(BeNumerically("==", header.Length()-4))

body := new(internal.CreditRequest)
expectOffset1(body.UnmarshalBinary(buff)).To(Succeed())
expectOffset1(body.SubscriptionId()).To(Equal(subscriptionId))
expectOffset1(body.Credit()).To(Equal(credits))
}

func (rmq *fakeRabbitMQServer) fakeRabbitMQCreditResponse(ctx context.Context, subscriptionId uint8) {
defer GinkgoRecover()
expectOffset1(rmq.connection.SetDeadline(time.Now().Add(time.Second))).
To(Succeed())

serverWriter := bufio.NewWriter(rmq.connection)

header := internal.NewHeader(4, 0x8009, 1)
expectOffset1(header.Write(serverWriter)).To(BeNumerically("==", 8))

body := internal.NewCreditResponse(responseCodeFromContext(ctx, "credit"), subscriptionId)
creditResponse, err := body.MarshalBinary()
expectOffset1(err).NotTo(HaveOccurred())

n, err := serverWriter.Write(creditResponse)
expectOffset1(err).NotTo(HaveOccurred())
expectOffset1(n).To(BeNumerically("==", 3))

expectOffset1(serverWriter.Flush()).To(Succeed())
}

func newContextWithResponseCode(ctx context.Context, respCode uint16, suffix ...string) context.Context {
var key = "rabbitmq-stream.response-code"
if suffix != nil {
Expand Down

0 comments on commit 48543d1

Please sign in to comment.