Skip to content

Commit

Permalink
Merge pull request #26 from Gsantomaggio/raw_send
Browse files Browse the repository at this point in the history
Raw send
  • Loading branch information
Zerpet authored Dec 12, 2022
2 parents 3ff1aef + 18f66c2 commit 702d99e
Show file tree
Hide file tree
Showing 25 changed files with 588 additions and 156 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ go-mod-tidy: ## Run 'go mod tidy' with compatibility to Go 1.18

.PHONY: go-generate-mocks
go-generate-mocks: | $(MOCKGEN) ## Generate Mocks for testing
$(MOCKGEN) -destination=./pkg/stream/mock_conn_test.go -package=stream_test net Conn
$(MOCKGEN) -destination=./pkg/raw/mock_conn_test.go -package=raw_test net Conn

### build

Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+Licev
github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/onsi/ginkgo/v2 v2.5.0 h1:TRtrvv2vdQqzkwrQ1ke6vtXf7IK34RBUJafIy1wMwls=
github.com/onsi/ginkgo/v2 v2.5.0/go.mod h1:Luc4sArBICYCS8THh8v3i3i5CuSZO+RaQRaJoeNwomw=
github.com/onsi/ginkgo/v2 v2.5.1 h1:auzK7OI497k6x4OvWq+TKAcpcSAlod0doAH72oIN0Jw=
github.com/onsi/ginkgo/v2 v2.5.1/go.mod h1:63DOGlLAH8+REH8jUGdL3YpCpu7JODesutUjdENfUAc=
github.com/onsi/gomega v1.24.1 h1:KORJXNNTzJXzu4ScJWssJfJMnJ+2QJqhoQSRwNlze9E=
Expand Down
4 changes: 2 additions & 2 deletions internal/close.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
"fmt"
)

// CloseRequest can be initiated from the Client or from the Server. This struct must implement both internal.CommandRead
// and internal.CommandWrite
// CloseRequest can be initiated from the Client or from the Server. This struct must implement both internal.SyncCommandRead
// and internal.SyncCommandWrite
type CloseRequest struct {
correlationId uint32
closingCode uint16
Expand Down
51 changes: 50 additions & 1 deletion internal/command_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,26 +4,74 @@ import (
"bufio"
)

// CommandRead is the interface that wraps the Read method.
// Read reads the command from the reader.
// nto related to any correlation ID. for example publish confirm
type CommandRead interface {
Read(reader *bufio.Reader) error
}

// SyncCommandRead reads the response from the stream.
// It reads the header and then the response.
// the Sync part is related to the correlation ID.
// So the caller waits for the response based on correlation
type SyncCommandRead interface {
CommandRead
CorrelationId() uint32
ResponseCode() uint16
}

// CommandWrite is the interface that wraps the Write method.
// The interface is implemented by all commands that are sent to the server.
// and that have no responses. Fire and forget style
// Command like: PublishRequest and Store Offset.
type CommandWrite interface {
Write(writer *bufio.Writer) (int, error)
Key() uint16
// SizeNeeded must return the size required to encode this CommandWrite
// plus the size of the Header. The size of the Header is always 4 bytes
SizeNeeded() int
Version() int16
}

// SyncCommandWrite is the interface that wraps the WriteTo method.
// The interface is implemented by all commands that are sent to the server.
// and that have responses in RPC style.
// Command like: Create Stream, Delete Stream, Declare Publisher, etc.
// SetCorrelationId CorrelationId is used to match the response with the request.
type SyncCommandWrite interface {
CommandWrite // Embedding the CommandWrite interface
SetCorrelationId(id uint32)
CorrelationId() uint32
Version() int16
}

// WriteCommand sends the Commands to the server.
// The commands are sent in the following order:
// 1. Header
// 2. Command
// 3. Flush
// The flush is required to make sure that the commands are sent to the server.
// WriteCommand doesn't care about the response.
func WriteCommand[T CommandWrite](request T, writer *bufio.Writer) error {
hWritten, err := NewHeaderRequest(request).Write(writer)
if err != nil {
return err
}
bWritten, err := request.Write(writer)
if err != nil {
return err
}
if (bWritten + hWritten) != (request.SizeNeeded() + 4) {
panic("WriteTo Command: Not all bytes written")
}
return writer.Flush()
}

// command IDs
const (
CommandDeclarePublisher uint16 = 0x0001 // 1
CommandPublish uint16 = 0x0002 // 2
CommandPublishConfirm uint16 = 0x0003 // 3
CommandDeletePublisher uint16 = 0x0006 // 6
CommandCreate uint16 = 0x000d // 13
CommandDelete uint16 = 0x000e // 14
Expand Down Expand Up @@ -55,6 +103,7 @@ const (
streamProtocolCorrelationIdSizeBytes
streamProtocolKeySizeBytes = 2
streamProtocolKeySizeUint8 = 1
streamProtocolKeySizeUint32 = 4
streamProtocolVersionSizeBytes = 2
streamProtocolCorrelationIdSizeBytes = 4
streamProtocolStringLenSizeBytes = 2
Expand Down
6 changes: 3 additions & 3 deletions internal/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@ package internal
import (
"bufio"
"bytes"
"github.com/gsantomaggio/rabbitmq-stream-go-client/pkg/common"
"github.com/gsantomaggio/rabbitmq-stream-go-client/pkg/constants"
)

type CreateRequest struct {
correlationId uint32
stream string
arguments common.StreamConfiguration
arguments constants.StreamConfiguration
}

func (c *CreateRequest) Arguments() common.StreamConfiguration {
func (c *CreateRequest) Arguments() constants.StreamConfiguration {
return c.arguments
}

Expand Down
5 changes: 3 additions & 2 deletions internal/header.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"encoding/binary"
"fmt"
"io"
)

type Header struct {
Expand Down Expand Up @@ -76,10 +77,10 @@ func NewHeaderRequest(command CommandWrite) *Header {
}
}

func (h *Header) Write(writer *bufio.Writer) (int, error) {
func (h *Header) Write(writer io.Writer) (int, error) {
return writeMany(writer, h.length, h.command, h.version)
}

func (h *Header) Read(reader *bufio.Reader) error {
func (h *Header) Read(reader io.Reader) error {
return readMany(reader, &h.length, &h.command, &h.version)
}
33 changes: 33 additions & 0 deletions internal/publish.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package internal

import "bufio"

type PublishRequest struct {
publisherId uint8
messageCount uint32
messages []byte
}

func (p *PublishRequest) Write(writer *bufio.Writer) (int, error) {
return writeMany(writer, p.publisherId, p.messageCount, p.messages)
}

func (p *PublishRequest) Key() uint16 {
return CommandPublish
}

func (p *PublishRequest) SizeNeeded() int {
return streamProtocolKeySizeBytes +
streamProtocolVersionSizeBytes +
streamProtocolKeySizeUint8 + // publisherId
streamProtocolKeySizeUint32 + // messageCount
len(p.messages) // messages
}

func (p *PublishRequest) Version() int16 {
return Version1
}

func NewPublishRequest(publisherId uint8, messageCount uint32, messages []byte) *PublishRequest {
return &PublishRequest{publisherId: publisherId, messageCount: messageCount, messages: messages}
}
27 changes: 27 additions & 0 deletions internal/publish_confirm.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package internal

import "bufio"

// PublishConfirmResponse is a response that contains a publisher ID and a list of publishing IDs.
// Publish commands return this type of response. It is used to confirm that the publishing was successful.
// It is asynchronous and the response could contain a list of publishing IDs.
type PublishConfirmResponse struct {
publisherID uint8 // publisher id
publishingIds []uint64
}

func (p *PublishConfirmResponse) Read(reader *bufio.Reader) error {
var publishingIdCount uint32
err := readMany(reader, &p.publisherID, &publishingIdCount)
if err != nil {
return err
}
p.publishingIds = make([]uint64, publishingIdCount)
for i := uint32(0); i < publishingIdCount; i++ {
err = readMany(reader, &p.publishingIds[i])
if err != nil {
return err
}
}
return nil
}
30 changes: 30 additions & 0 deletions internal/publish_confirm_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package internal

import (
"bufio"
"bytes"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("Publish Confirm Response", func() {
Context("Response", func() {
It("decodes a binary sequence", func() {
buff := new(bytes.Buffer)
wr := bufio.NewWriter(buff)
_, err := writeMany(wr, uint8(12), uint32(3), uint64(12345), uint64(12346), uint64(12347))
Expect(err).ToNot(HaveOccurred())
Expect(wr.Flush()).To(Succeed())

response := PublishConfirmResponse{}
Expect(response.Read(bufio.NewReader(buff))).To(Succeed())

Expect(response.publisherID).To(BeNumerically("==", 12))
Expect(len(response.publishingIds)).To(BeNumerically("==", 3))
Expect(response.publishingIds).To(HaveLen(3))
Expect(response.publishingIds[0]).To(BeNumerically("==", 12345))
Expect(response.publishingIds[1]).To(BeNumerically("==", 12346))
Expect(response.publishingIds[2]).To(BeNumerically("==", 12347))
})
})
})
95 changes: 95 additions & 0 deletions internal/publish_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package internal

import (
"bufio"
"bytes"
"github.com/gsantomaggio/rabbitmq-stream-go-client/pkg/common"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

var _ = Describe("Create", func() {
Context("Request", func() {
It("returns the expected attributes", func() {
seq := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x64} // publishingId sequence number 100
seq = append(seq, []byte{0x00, 0x00, 0x00, 0x03}...) //message size
seq = append(seq, []byte("foo")...) // message body

createRequest := NewPublishRequest(17, 1, seq)
Expect(createRequest.Key()).To(BeNumerically("==", 0x0002))
Expect(createRequest.Version()).To(BeNumerically("==", 1))

expectedSize := streamProtocolKeySizeBytes +
streamProtocolVersionSizeBytes +
streamProtocolKeySizeUint8 + // publisherId
streamProtocolKeySizeUint32 + // message count
+len(seq) // message body

Expect(createRequest.SizeNeeded()).To(BeNumerically("==", expectedSize))
})

It("encodes itself into a binary sequence", func() {
seq := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x64} // publishingId sequence number 100
seq = append(seq, []byte{0x00, 0x00, 0x00, 0x03}...) //message size
seq = append(seq, []byte("foo")...) // message body

c := NewPublishRequest(17, 1, seq)
buff := new(bytes.Buffer)
wr := bufio.NewWriter(buff)
Expect(c.Write(wr)).To(BeNumerically("==", c.SizeNeeded()-streamProtocolHeaderSizeBytes))
Expect(wr.Flush()).To(Succeed())

expectedByteSequence := []byte{
0x11, // publisher ID 17
0x00, 0x00, 0x00, 0x01, // message count 1

}
expectedByteSequence = append(expectedByteSequence, seq...)
Expect(buff.Bytes()).To(Equal(expectedByteSequence))
})

It("encodes fake message", func() {
buffFakeMessages := new(bytes.Buffer)
writerFakeMessages := bufio.NewWriter(buffFakeMessages)

// we simulate a message aggregation
// that can be done by the client
// First loop to aggregate the messages
var fakeMessages []common.StreamerMessage
fakeMessages = append(fakeMessages, NewFakeMessage(17, []byte("foo")))
fakeMessages = append(fakeMessages, NewFakeMessage(21, []byte("wine")))
fakeMessages = append(fakeMessages, NewFakeMessage(23, []byte("beer")))
fakeMessages = append(fakeMessages, NewFakeMessage(29, []byte("water")))

// It is time to prepare the buffer to send to the server
for _, fakeMessage := range fakeMessages {
Expect(fakeMessage.WriteTo(writerFakeMessages)).To(BeNumerically("==", 8+4+len(fakeMessage.Body())))
}
Expect(writerFakeMessages.Flush()).To(Succeed())

c := NewPublishRequest(200, uint32(len(fakeMessages)), buffFakeMessages.Bytes())
buff := new(bytes.Buffer)
wr := bufio.NewWriter(buff)
Expect(c.Write(wr)).To(BeNumerically("==", c.SizeNeeded()-streamProtocolHeaderSizeBytes))
Expect(wr.Flush()).To(Succeed())

})

})

Context("Response", func() {
It("decodes a binary sequence", func() {
buff := new(bytes.Buffer)
wr := bufio.NewWriter(buff)
_, err := writeMany(wr, uint32(12345), uint16(101))
Expect(err).ToNot(HaveOccurred())
Expect(wr.Flush()).To(Succeed())

response := SimpleResponse{}
Expect(response.Read(bufio.NewReader(buff))).To(Succeed())

Expect(response.correlationId).To(BeNumerically("==", 12345))
Expect(response.responseCode).To(BeNumerically("==", 101))
})
})
})
4 changes: 2 additions & 2 deletions internal/sasl_mechanisms.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"bytes"
"encoding"
"fmt"
"github.com/gsantomaggio/rabbitmq-stream-go-client/pkg/common"
"github.com/gsantomaggio/rabbitmq-stream-go-client/pkg/constants"
)

type SaslHandshakeRequest struct {
Expand Down Expand Up @@ -255,7 +255,7 @@ func (s *SaslAuthenticateResponse) Read(reader *bufio.Reader) error {
return err
}

if s.responseCode == common.ResponseCodeSASLChallenge {
if s.responseCode == constants.ResponseCodeSASLChallenge {
challengeResponse, err := readByteSlice(reader)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion internal/simple_response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
. "github.com/onsi/gomega"
)

var _ = Describe("SaslMechanisms", func() {
var _ = Describe("Simple Response", func() {
Context("Response", func() {
It("decodes a binary sequence", func() {
buff := new(bytes.Buffer)
Expand Down
Loading

0 comments on commit 702d99e

Please sign in to comment.