diff --git a/Makefile b/Makefile index c56bef6b..b53f6d3c 100644 --- a/Makefile +++ b/Makefile @@ -55,7 +55,7 @@ go-mod-tidy: ## Run 'go mod tidy' with compatibility to Go 1.18 .PHONY: go-generate-mocks go-generate-mocks: | $(MOCKGEN) ## Generate Mocks for testing - $(MOCKGEN) -destination=./pkg/stream/mock_conn_test.go -package=stream_test net Conn + $(MOCKGEN) -destination=./pkg/raw/mock_conn_test.go -package=raw_test net Conn ### build diff --git a/go.sum b/go.sum index e3383d2d..660b3068 100644 --- a/go.sum +++ b/go.sum @@ -10,8 +10,6 @@ github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+Licev github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/onsi/ginkgo/v2 v2.5.0 h1:TRtrvv2vdQqzkwrQ1ke6vtXf7IK34RBUJafIy1wMwls= -github.com/onsi/ginkgo/v2 v2.5.0/go.mod h1:Luc4sArBICYCS8THh8v3i3i5CuSZO+RaQRaJoeNwomw= github.com/onsi/ginkgo/v2 v2.5.1 h1:auzK7OI497k6x4OvWq+TKAcpcSAlod0doAH72oIN0Jw= github.com/onsi/ginkgo/v2 v2.5.1/go.mod h1:63DOGlLAH8+REH8jUGdL3YpCpu7JODesutUjdENfUAc= github.com/onsi/gomega v1.24.1 h1:KORJXNNTzJXzu4ScJWssJfJMnJ+2QJqhoQSRwNlze9E= diff --git a/internal/close.go b/internal/close.go index bfabe9af..e0132be7 100644 --- a/internal/close.go +++ b/internal/close.go @@ -6,8 +6,8 @@ import ( "fmt" ) -// CloseRequest can be initiated from the Client or from the Server. This struct must implement both internal.CommandRead -// and internal.CommandWrite +// CloseRequest can be initiated from the Client or from the Server. This struct must implement both internal.SyncCommandRead +// and internal.SyncCommandWrite type CloseRequest struct { correlationId uint32 closingCode uint16 diff --git a/internal/command_types.go b/internal/command_types.go index 4f42f2f3..2d672544 100644 --- a/internal/command_types.go +++ b/internal/command_types.go @@ -4,26 +4,74 @@ import ( "bufio" ) +// CommandRead is the interface that wraps the Read method. +// Read reads the command from the reader. +// nto related to any correlation ID. for example publish confirm type CommandRead interface { Read(reader *bufio.Reader) error +} + +// SyncCommandRead reads the response from the stream. +// It reads the header and then the response. +// the Sync part is related to the correlation ID. +// So the caller waits for the response based on correlation +type SyncCommandRead interface { + CommandRead CorrelationId() uint32 ResponseCode() uint16 } +// CommandWrite is the interface that wraps the Write method. +// The interface is implemented by all commands that are sent to the server. +// and that have no responses. Fire and forget style +// Command like: PublishRequest and Store Offset. type CommandWrite interface { Write(writer *bufio.Writer) (int, error) Key() uint16 // SizeNeeded must return the size required to encode this CommandWrite // plus the size of the Header. The size of the Header is always 4 bytes SizeNeeded() int + Version() int16 +} + +// SyncCommandWrite is the interface that wraps the WriteTo method. +// The interface is implemented by all commands that are sent to the server. +// and that have responses in RPC style. +// Command like: Create Stream, Delete Stream, Declare Publisher, etc. +// SetCorrelationId CorrelationId is used to match the response with the request. +type SyncCommandWrite interface { + CommandWrite // Embedding the CommandWrite interface SetCorrelationId(id uint32) CorrelationId() uint32 - Version() int16 +} + +// WriteCommand sends the Commands to the server. +// The commands are sent in the following order: +// 1. Header +// 2. Command +// 3. Flush +// The flush is required to make sure that the commands are sent to the server. +// WriteCommand doesn't care about the response. +func WriteCommand[T CommandWrite](request T, writer *bufio.Writer) error { + hWritten, err := NewHeaderRequest(request).Write(writer) + if err != nil { + return err + } + bWritten, err := request.Write(writer) + if err != nil { + return err + } + if (bWritten + hWritten) != (request.SizeNeeded() + 4) { + panic("WriteTo Command: Not all bytes written") + } + return writer.Flush() } // command IDs const ( CommandDeclarePublisher uint16 = 0x0001 // 1 + CommandPublish uint16 = 0x0002 // 2 + CommandPublishConfirm uint16 = 0x0003 // 3 CommandDeletePublisher uint16 = 0x0006 // 6 CommandCreate uint16 = 0x000d // 13 CommandDelete uint16 = 0x000e // 14 @@ -55,6 +103,7 @@ const ( streamProtocolCorrelationIdSizeBytes streamProtocolKeySizeBytes = 2 streamProtocolKeySizeUint8 = 1 + streamProtocolKeySizeUint32 = 4 streamProtocolVersionSizeBytes = 2 streamProtocolCorrelationIdSizeBytes = 4 streamProtocolStringLenSizeBytes = 2 diff --git a/internal/create.go b/internal/create.go index f18b75d5..d2e140fa 100644 --- a/internal/create.go +++ b/internal/create.go @@ -3,16 +3,16 @@ package internal import ( "bufio" "bytes" - "github.com/gsantomaggio/rabbitmq-stream-go-client/pkg/common" + "github.com/gsantomaggio/rabbitmq-stream-go-client/pkg/constants" ) type CreateRequest struct { correlationId uint32 stream string - arguments common.StreamConfiguration + arguments constants.StreamConfiguration } -func (c *CreateRequest) Arguments() common.StreamConfiguration { +func (c *CreateRequest) Arguments() constants.StreamConfiguration { return c.arguments } diff --git a/internal/header.go b/internal/header.go index 3bc75e77..921105d1 100644 --- a/internal/header.go +++ b/internal/header.go @@ -5,6 +5,7 @@ import ( "bytes" "encoding/binary" "fmt" + "io" ) type Header struct { @@ -76,10 +77,10 @@ func NewHeaderRequest(command CommandWrite) *Header { } } -func (h *Header) Write(writer *bufio.Writer) (int, error) { +func (h *Header) Write(writer io.Writer) (int, error) { return writeMany(writer, h.length, h.command, h.version) } -func (h *Header) Read(reader *bufio.Reader) error { +func (h *Header) Read(reader io.Reader) error { return readMany(reader, &h.length, &h.command, &h.version) } diff --git a/internal/publish.go b/internal/publish.go new file mode 100644 index 00000000..36afe2c9 --- /dev/null +++ b/internal/publish.go @@ -0,0 +1,33 @@ +package internal + +import "bufio" + +type PublishRequest struct { + publisherId uint8 + messageCount uint32 + messages []byte +} + +func (p *PublishRequest) Write(writer *bufio.Writer) (int, error) { + return writeMany(writer, p.publisherId, p.messageCount, p.messages) +} + +func (p *PublishRequest) Key() uint16 { + return CommandPublish +} + +func (p *PublishRequest) SizeNeeded() int { + return streamProtocolKeySizeBytes + + streamProtocolVersionSizeBytes + + streamProtocolKeySizeUint8 + // publisherId + streamProtocolKeySizeUint32 + // messageCount + len(p.messages) // messages +} + +func (p *PublishRequest) Version() int16 { + return Version1 +} + +func NewPublishRequest(publisherId uint8, messageCount uint32, messages []byte) *PublishRequest { + return &PublishRequest{publisherId: publisherId, messageCount: messageCount, messages: messages} +} diff --git a/internal/publish_confirm.go b/internal/publish_confirm.go new file mode 100644 index 00000000..84fc4bb1 --- /dev/null +++ b/internal/publish_confirm.go @@ -0,0 +1,27 @@ +package internal + +import "bufio" + +// PublishConfirmResponse is a response that contains a publisher ID and a list of publishing IDs. +// Publish commands return this type of response. It is used to confirm that the publishing was successful. +// It is asynchronous and the response could contain a list of publishing IDs. +type PublishConfirmResponse struct { + publisherID uint8 // publisher id + publishingIds []uint64 +} + +func (p *PublishConfirmResponse) Read(reader *bufio.Reader) error { + var publishingIdCount uint32 + err := readMany(reader, &p.publisherID, &publishingIdCount) + if err != nil { + return err + } + p.publishingIds = make([]uint64, publishingIdCount) + for i := uint32(0); i < publishingIdCount; i++ { + err = readMany(reader, &p.publishingIds[i]) + if err != nil { + return err + } + } + return nil +} diff --git a/internal/publish_confirm_test.go b/internal/publish_confirm_test.go new file mode 100644 index 00000000..2e9295d3 --- /dev/null +++ b/internal/publish_confirm_test.go @@ -0,0 +1,30 @@ +package internal + +import ( + "bufio" + "bytes" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Publish Confirm Response", func() { + Context("Response", func() { + It("decodes a binary sequence", func() { + buff := new(bytes.Buffer) + wr := bufio.NewWriter(buff) + _, err := writeMany(wr, uint8(12), uint32(3), uint64(12345), uint64(12346), uint64(12347)) + Expect(err).ToNot(HaveOccurred()) + Expect(wr.Flush()).To(Succeed()) + + response := PublishConfirmResponse{} + Expect(response.Read(bufio.NewReader(buff))).To(Succeed()) + + Expect(response.publisherID).To(BeNumerically("==", 12)) + Expect(len(response.publishingIds)).To(BeNumerically("==", 3)) + Expect(response.publishingIds).To(HaveLen(3)) + Expect(response.publishingIds[0]).To(BeNumerically("==", 12345)) + Expect(response.publishingIds[1]).To(BeNumerically("==", 12346)) + Expect(response.publishingIds[2]).To(BeNumerically("==", 12347)) + }) + }) +}) diff --git a/internal/publish_test.go b/internal/publish_test.go new file mode 100644 index 00000000..78c221e4 --- /dev/null +++ b/internal/publish_test.go @@ -0,0 +1,95 @@ +package internal + +import ( + "bufio" + "bytes" + "github.com/gsantomaggio/rabbitmq-stream-go-client/pkg/common" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Create", func() { + Context("Request", func() { + It("returns the expected attributes", func() { + seq := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x64} // publishingId sequence number 100 + seq = append(seq, []byte{0x00, 0x00, 0x00, 0x03}...) //message size + seq = append(seq, []byte("foo")...) // message body + + createRequest := NewPublishRequest(17, 1, seq) + Expect(createRequest.Key()).To(BeNumerically("==", 0x0002)) + Expect(createRequest.Version()).To(BeNumerically("==", 1)) + + expectedSize := streamProtocolKeySizeBytes + + streamProtocolVersionSizeBytes + + streamProtocolKeySizeUint8 + // publisherId + streamProtocolKeySizeUint32 + // message count + +len(seq) // message body + + Expect(createRequest.SizeNeeded()).To(BeNumerically("==", expectedSize)) + }) + + It("encodes itself into a binary sequence", func() { + seq := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x64} // publishingId sequence number 100 + seq = append(seq, []byte{0x00, 0x00, 0x00, 0x03}...) //message size + seq = append(seq, []byte("foo")...) // message body + + c := NewPublishRequest(17, 1, seq) + buff := new(bytes.Buffer) + wr := bufio.NewWriter(buff) + Expect(c.Write(wr)).To(BeNumerically("==", c.SizeNeeded()-streamProtocolHeaderSizeBytes)) + Expect(wr.Flush()).To(Succeed()) + + expectedByteSequence := []byte{ + 0x11, // publisher ID 17 + 0x00, 0x00, 0x00, 0x01, // message count 1 + + } + expectedByteSequence = append(expectedByteSequence, seq...) + Expect(buff.Bytes()).To(Equal(expectedByteSequence)) + }) + + It("encodes fake message", func() { + buffFakeMessages := new(bytes.Buffer) + writerFakeMessages := bufio.NewWriter(buffFakeMessages) + + // we simulate a message aggregation + // that can be done by the client + // First loop to aggregate the messages + var fakeMessages []common.StreamerMessage + fakeMessages = append(fakeMessages, NewFakeMessage(17, []byte("foo"))) + fakeMessages = append(fakeMessages, NewFakeMessage(21, []byte("wine"))) + fakeMessages = append(fakeMessages, NewFakeMessage(23, []byte("beer"))) + fakeMessages = append(fakeMessages, NewFakeMessage(29, []byte("water"))) + + // It is time to prepare the buffer to send to the server + for _, fakeMessage := range fakeMessages { + Expect(fakeMessage.WriteTo(writerFakeMessages)).To(BeNumerically("==", 8+4+len(fakeMessage.Body()))) + } + Expect(writerFakeMessages.Flush()).To(Succeed()) + + c := NewPublishRequest(200, uint32(len(fakeMessages)), buffFakeMessages.Bytes()) + buff := new(bytes.Buffer) + wr := bufio.NewWriter(buff) + Expect(c.Write(wr)).To(BeNumerically("==", c.SizeNeeded()-streamProtocolHeaderSizeBytes)) + Expect(wr.Flush()).To(Succeed()) + + }) + + }) + + Context("Response", func() { + It("decodes a binary sequence", func() { + buff := new(bytes.Buffer) + wr := bufio.NewWriter(buff) + _, err := writeMany(wr, uint32(12345), uint16(101)) + Expect(err).ToNot(HaveOccurred()) + Expect(wr.Flush()).To(Succeed()) + + response := SimpleResponse{} + Expect(response.Read(bufio.NewReader(buff))).To(Succeed()) + + Expect(response.correlationId).To(BeNumerically("==", 12345)) + Expect(response.responseCode).To(BeNumerically("==", 101)) + }) + }) +}) diff --git a/internal/sasl_mechanisms.go b/internal/sasl_mechanisms.go index 7d75a1ea..085facfc 100644 --- a/internal/sasl_mechanisms.go +++ b/internal/sasl_mechanisms.go @@ -5,7 +5,7 @@ import ( "bytes" "encoding" "fmt" - "github.com/gsantomaggio/rabbitmq-stream-go-client/pkg/common" + "github.com/gsantomaggio/rabbitmq-stream-go-client/pkg/constants" ) type SaslHandshakeRequest struct { @@ -255,7 +255,7 @@ func (s *SaslAuthenticateResponse) Read(reader *bufio.Reader) error { return err } - if s.responseCode == common.ResponseCodeSASLChallenge { + if s.responseCode == constants.ResponseCodeSASLChallenge { challengeResponse, err := readByteSlice(reader) if err != nil { return err diff --git a/internal/simple_response_test.go b/internal/simple_response_test.go index b502379a..7c13d260 100644 --- a/internal/simple_response_test.go +++ b/internal/simple_response_test.go @@ -7,7 +7,7 @@ import ( . "github.com/onsi/gomega" ) -var _ = Describe("SaslMechanisms", func() { +var _ = Describe("Simple Response", func() { Context("Response", func() { It("decodes a binary sequence", func() { buff := new(bytes.Buffer) diff --git a/internal/test_helper.go b/internal/test_helper.go new file mode 100644 index 00000000..1f21311c --- /dev/null +++ b/internal/test_helper.go @@ -0,0 +1,37 @@ +//go:build rabbitmq.stream.test + +package internal + +import ( + "io" +) + +type FakeMessage struct { + publishingId uint64 + body []byte +} + +func (f *FakeMessage) WriteTo(writer io.Writer) (int64, error) { + n, err := writeMany(writer, f.publishingId, len(f.body), f.body) + return int64(n), err +} + +func (f *FakeMessage) SetPublishingId(publishingId uint64) { + f.publishingId = publishingId +} + +func (f *FakeMessage) GetPublishingId() uint64 { + return f.publishingId +} + +func (f *FakeMessage) SetBody(body []byte) { + f.body = body +} + +func (f *FakeMessage) Body() []byte { + return f.body +} + +func NewFakeMessage(publishingId uint64, body []byte) *FakeMessage { + return &FakeMessage{publishingId: publishingId, body: body} +} diff --git a/internal/tune.go b/internal/tune.go index 2c8671c2..ad60b5e6 100644 --- a/internal/tune.go +++ b/internal/tune.go @@ -2,11 +2,11 @@ package internal import ( "bufio" - "github.com/gsantomaggio/rabbitmq-stream-go-client/pkg/common" + "github.com/gsantomaggio/rabbitmq-stream-go-client/pkg/constants" ) // TuneRequest is initiated by the server. It's the few commands where the request is initiated from the server -// In this case, request implements the internal.CommandRead interface, as opposed to other requests. +// In this case, request implements the internal.SyncCommandRead interface, as opposed to other requests. type TuneRequest struct { frameMaxSize uint32 heartbeatPeriod uint32 @@ -23,7 +23,7 @@ func (t *TuneRequest) HeartbeatPeriod() uint32 { // ResponseCode is always OK for Tune frames. Tune frames do not have a response code. This function is // implemented to conform with the interface func (t *TuneRequest) ResponseCode() uint16 { - return common.ResponseCodeOK + return constants.ResponseCodeOK } func (t *TuneRequest) Read(reader *bufio.Reader) error { @@ -35,13 +35,13 @@ func (t *TuneRequest) Read(reader *bufio.Reader) error { } // CorrelationId always returns 0. Tune frames do not have a correlation ID. This function is a placeholder to conform -// to the internal.CommandRead interface. +// to the internal.SyncCommandRead interface. func (t *TuneRequest) CorrelationId() uint32 { return 0 } // TuneResponse is sent by the client. It's the few commands where the server sends a request and expects a response. -// In this case, the response implements the internal.CommandWrite interface, as opposed to other responses. +// In this case, the response implements the internal.SyncCommandWrite interface, as opposed to other responses. type TuneResponse struct { TuneRequest } @@ -69,7 +69,7 @@ func (t *TuneResponse) SizeNeeded() int { } // SetCorrelationId is a no-op. Tune frames do not have a correlation ID. This function is a placeholder to conform to -// the internal.CommandWrite interface. +// the internal.SyncCommandWrite interface. func (t *TuneResponse) SetCorrelationId(correlationId uint32) {} func (t *TuneResponse) Version() int16 { diff --git a/internal/wire_formatting.go b/internal/wire_formatting.go index a56f6fdf..558b01db 100644 --- a/internal/wire_formatting.go +++ b/internal/wire_formatting.go @@ -3,15 +3,16 @@ package internal import ( "bufio" "encoding/binary" + "io" ) -func readUShort(readerStream *bufio.Reader) (uint16, error) { +func readUShort(readerStream io.Reader) (uint16, error) { var res uint16 err := binary.Read(readerStream, binary.BigEndian, &res) return res, err } -func readUInt(readerStream *bufio.Reader) (uint32, error) { +func readUInt(readerStream io.Reader) (uint32, error) { var res uint32 err := binary.Read(readerStream, binary.BigEndian, &res) return res, err @@ -25,7 +26,7 @@ func peekByte(readerStream *bufio.Reader) (uint8, error) { return res[0], nil } -func readString(readerStream *bufio.Reader) string { +func readString(readerStream io.Reader) string { // FIXME: handle the potential error from readUShort lenString, _ := readUShort(readerStream) buff := make([]byte, lenString) @@ -33,7 +34,7 @@ func readString(readerStream *bufio.Reader) string { return string(buff) } -func readByteSlice(readerStream *bufio.Reader) (data []byte, err error) { +func readByteSlice(readerStream io.Reader) (data []byte, err error) { numEntries, err := readUShort(readerStream) if err != nil { return nil, err @@ -54,7 +55,7 @@ func ExtractCommandCode(code uint16) uint16 { return code & 0b0111_1111_1111_1111 } -func readMany(readerStream *bufio.Reader, args ...interface{}) error { +func readMany(readerStream io.Reader, args ...interface{}) error { for _, arg := range args { err := readAny(readerStream, arg) if err != nil { @@ -64,7 +65,7 @@ func readMany(readerStream *bufio.Reader, args ...interface{}) error { return nil } -func readAny(readerStream *bufio.Reader, arg interface{}) error { +func readAny(readerStream io.Reader, arg interface{}) error { switch arg.(type) { case *int: @@ -104,8 +105,11 @@ func readAny(readerStream *bufio.Reader, arg interface{}) error { } return nil } +func WriteMany(writer io.Writer, args ...any) (int, error) { + return writeMany(writer, args...) +} -func writeMany(writer *bufio.Writer, args ...interface{}) (int, error) { +func writeMany(writer io.Writer, args ...any) (int, error) { var written int for _, arg := range args { @@ -152,7 +156,7 @@ func writeMany(writer *bufio.Writer, args ...interface{}) (int, error) { return written, nil } -func writeString(writer *bufio.Writer, value string) (nn int, err error) { +func writeString(writer io.Writer, value string) (nn int, err error) { shortLen, err := writeMany(writer, uint16(len(value))) if err != nil { return 0, err diff --git a/main.go b/main.go index b8057408..afeacd69 100644 --- a/main.go +++ b/main.go @@ -3,12 +3,16 @@ package main import ( "bufio" "context" + "encoding/binary" "fmt" "github.com/bombsimon/logrusr/v3" "github.com/go-logr/logr" + "github.com/gsantomaggio/rabbitmq-stream-go-client/pkg/common" "github.com/gsantomaggio/rabbitmq-stream-go-client/pkg/raw" "github.com/sirupsen/logrus" + "io" "os" + "time" ) func main() { @@ -28,7 +32,6 @@ func main() { log.Error(err, "error in dial") panic(err) } - log.Info("connection status", "open", streamClient.IsOpen()) err = streamClient.DeclareStream(ctx, stream, map[string]string{"name": "test-stream"}) @@ -43,6 +46,30 @@ func main() { panic(err) } + fmt.Println("Start sending messages") + var id uint64 + startTime := time.Now() + for j := 0; j < 100_000; j++ { + var fakeMessages []common.PublishingMessager + for i := 0; i < 100; i++ { + fakeMessages = append(fakeMessages, + raw.NewPublishingMessage(id, NewFakeMessage([]byte(fmt.Sprintf("message %d", i))))) + + id++ // increment the id + } + + err = streamClient.Send(ctx, 1, fakeMessages) + if err != nil { + log.Error(err, "error in sending messages") + panic(err) + } + } + fmt.Println("End sending messages") + fmt.Printf("Sent %d in : %s \n", id, time.Since(startTime)) + + fmt.Println("Press any key to stop ") + reader := bufio.NewReader(os.Stdin) + _, _ = reader.ReadString('\n') err = streamClient.DeletePublisher(ctx, 1) if err != nil { log.Error(err, "error in deleting publisher") @@ -51,12 +78,8 @@ func main() { err = streamClient.DeleteStream(ctx, stream) if err != nil { - log.Error(err, "error in deleting stream") - panic(err) + return } - fmt.Println("Press any key to stop ") - reader := bufio.NewReader(os.Stdin) - _, _ = reader.ReadString('\n') log.Info("closing connection") err = streamClient.Close(ctx) @@ -66,3 +89,35 @@ func main() { } log.Info("connection status", "open", streamClient.IsOpen()) } + +type FakeMessage struct { + body []byte +} + +func (f *FakeMessage) WriteTo(writer io.Writer) (int64, error) { + written := 0 + err := binary.Write(writer, binary.BigEndian, uint32(len(f.body))) + if err != nil { + panic(err) + } + written += binary.Size(uint32(len(f.body))) + + err = binary.Write(writer, binary.BigEndian, f.body) + if err != nil { + panic(err) + } + written += binary.Size(f.body) + return int64(written), nil +} + +func (f *FakeMessage) SetBody(body []byte) { + f.body = body +} + +func (f *FakeMessage) Body() []byte { + return f.body +} + +func NewFakeMessage(body []byte) *FakeMessage { + return &FakeMessage{body: body} +} diff --git a/pkg/common/configurations.go b/pkg/common/configurations.go deleted file mode 100644 index 0feb1485..00000000 --- a/pkg/common/configurations.go +++ /dev/null @@ -1,16 +0,0 @@ -package common - -import "context" - -type Clienter interface { - Connect(ctx context.Context) error - DeclareStream(ctx context.Context, stream string, configuration StreamConfiguration) error - DeleteStream(ctx context.Context, stream string) error - DeclarePublisher(ctx context.Context, publisherId uint8, publisherReference string, stream string) error - DeletePublisher(ctx context.Context, publisherId uint8) error - IsOpen() bool - Close(ctx context.Context) error -} - -// TODO: godocs -type StreamConfiguration = map[string]string diff --git a/pkg/common/types.go b/pkg/common/types.go index 74d71cc5..0f799581 100644 --- a/pkg/common/types.go +++ b/pkg/common/types.go @@ -1,50 +1,32 @@ package common import ( - "errors" + "context" + "github.com/gsantomaggio/rabbitmq-stream-go-client/pkg/constants" + "io" ) -var ResponseCodeToError = map[uint16]error{ - ResponseCodeOK: nil, // this is a special case where there is not error - ResponseCodeStreamDoesNotExist: errors.New("stream does not exist"), - ResponseCodeSubscriptionIdAlreadyExists: errors.New("subscription ID already exists"), - ResponseCodeSubscriptionIdDoesNotExist: errors.New("subscription ID does not exist"), - ResponseCodeStreamAlreadyExists: errors.New("stream already exists"), - ResponseCodeStreamNotAvailable: errors.New("stream not available"), - ResponseCodeSASLMechanismNotSupported: errors.New("SASL mechanism not supported"), - ResponseCodeAuthFailure: errors.New("authentication failure"), - ResponseCodeSASLError: errors.New("SASL error"), - ResponseCodeSASLChallenge: errors.New("SASL challenge"), - ResponseCodeSASLAuthFailureLoopback: errors.New("SASL authentication failure loopback"), - ResponseCodeVirtualHostAccessFailure: errors.New("virtual host access failure"), - ResponseCodeUnknownFrame: errors.New("unknown frame"), - ResponseCodeFrameTooLarge: errors.New("frame too large"), - ResponseCodeInternalError: errors.New("internal error"), - ResponseCodeAccessRefused: errors.New("access refused"), - ResponseCodePreconditionFailed: errors.New("precondition failed"), - ResponseCodePublisherDoesNotExist: errors.New("publisher does not exist"), - ResponseCodeNoOffset: errors.New("no offset"), +type Clienter interface { + Connect(ctx context.Context) error + DeclareStream(ctx context.Context, stream string, configuration constants.StreamConfiguration) error + DeleteStream(ctx context.Context, stream string) error + DeclarePublisher(ctx context.Context, publisherId uint8, publisherReference string, stream string) error + Send(ctx context.Context, publisherId uint8, messages []PublishingMessager) error + DeletePublisher(ctx context.Context, publisherId uint8) error + IsOpen() bool + Close(ctx context.Context) error } -// Stream protocol response codes -const ( - ResponseCodeOK uint16 = 0x01 - ResponseCodeStreamDoesNotExist uint16 = 0x02 - ResponseCodeSubscriptionIdAlreadyExists uint16 = 0x03 - ResponseCodeSubscriptionIdDoesNotExist uint16 = 0x04 - ResponseCodeStreamAlreadyExists uint16 = 0x05 - ResponseCodeStreamNotAvailable uint16 = 0x06 - ResponseCodeSASLMechanismNotSupported uint16 = 0x07 - ResponseCodeAuthFailure uint16 = 0x08 - ResponseCodeSASLError uint16 = 0x09 - ResponseCodeSASLChallenge uint16 = 0x0a - ResponseCodeSASLAuthFailureLoopback uint16 = 0x0b - ResponseCodeVirtualHostAccessFailure uint16 = 0x0c - ResponseCodeUnknownFrame uint16 = 0x0d - ResponseCodeFrameTooLarge uint16 = 0x0e - ResponseCodeInternalError uint16 = 0x0f - ResponseCodeAccessRefused uint16 = 0x10 - ResponseCodePreconditionFailed uint16 = 0x11 - ResponseCodePublisherDoesNotExist uint16 = 0x12 - ResponseCodeNoOffset uint16 = 0x13 -) +type StreamerMessage interface { + io.WriterTo + SetBody(body []byte) + Body() []byte +} + +type PublishingMessager interface { + io.WriterTo + SetPublishingId(publishingId uint64) + PublishingId() uint64 + SetMessage(message StreamerMessage) + Message() StreamerMessage +} diff --git a/pkg/constants/types.go b/pkg/constants/types.go new file mode 100644 index 00000000..7e4aa599 --- /dev/null +++ b/pkg/constants/types.go @@ -0,0 +1,52 @@ +package constants + +import ( + "errors" +) + +var ResponseCodeToError = map[uint16]error{ + ResponseCodeOK: nil, // this is a special case where there is not error + ResponseCodeStreamDoesNotExist: errors.New("stream does not exist"), + ResponseCodeSubscriptionIdAlreadyExists: errors.New("subscription ID already exists"), + ResponseCodeSubscriptionIdDoesNotExist: errors.New("subscription ID does not exist"), + ResponseCodeStreamAlreadyExists: errors.New("stream already exists"), + ResponseCodeStreamNotAvailable: errors.New("stream not available"), + ResponseCodeSASLMechanismNotSupported: errors.New("SASL mechanism not supported"), + ResponseCodeAuthFailure: errors.New("authentication failure"), + ResponseCodeSASLError: errors.New("SASL error"), + ResponseCodeSASLChallenge: errors.New("SASL challenge"), + ResponseCodeSASLAuthFailureLoopback: errors.New("SASL authentication failure loopback"), + ResponseCodeVirtualHostAccessFailure: errors.New("virtual host access failure"), + ResponseCodeUnknownFrame: errors.New("unknown frame"), + ResponseCodeFrameTooLarge: errors.New("frame too large"), + ResponseCodeInternalError: errors.New("internal error"), + ResponseCodeAccessRefused: errors.New("access refused"), + ResponseCodePreconditionFailed: errors.New("precondition failed"), + ResponseCodePublisherDoesNotExist: errors.New("publisher does not exist"), + ResponseCodeNoOffset: errors.New("no offset"), +} + +// Stream protocol response codes +const ( + ResponseCodeOK uint16 = 0x01 + ResponseCodeStreamDoesNotExist uint16 = 0x02 + ResponseCodeSubscriptionIdAlreadyExists uint16 = 0x03 + ResponseCodeSubscriptionIdDoesNotExist uint16 = 0x04 + ResponseCodeStreamAlreadyExists uint16 = 0x05 + ResponseCodeStreamNotAvailable uint16 = 0x06 + ResponseCodeSASLMechanismNotSupported uint16 = 0x07 + ResponseCodeAuthFailure uint16 = 0x08 + ResponseCodeSASLError uint16 = 0x09 + ResponseCodeSASLChallenge uint16 = 0x0a + ResponseCodeSASLAuthFailureLoopback uint16 = 0x0b + ResponseCodeVirtualHostAccessFailure uint16 = 0x0c + ResponseCodeUnknownFrame uint16 = 0x0d + ResponseCodeFrameTooLarge uint16 = 0x0e + ResponseCodeInternalError uint16 = 0x0f + ResponseCodeAccessRefused uint16 = 0x10 + ResponseCodePreconditionFailed uint16 = 0x11 + ResponseCodePublisherDoesNotExist uint16 = 0x12 + ResponseCodeNoOffset uint16 = 0x13 +) + +type StreamConfiguration = map[string]string diff --git a/pkg/raw/client.go b/pkg/raw/client.go index 4e2f7786..bb9b6972 100644 --- a/pkg/raw/client.go +++ b/pkg/raw/client.go @@ -1,12 +1,14 @@ package raw import ( + "bytes" "context" "errors" "fmt" "github.com/go-logr/logr" "github.com/gsantomaggio/rabbitmq-stream-go-client/internal" "github.com/gsantomaggio/rabbitmq-stream-go-client/pkg/common" + "github.com/gsantomaggio/rabbitmq-stream-go-client/pkg/constants" "io" "math" "net" @@ -18,11 +20,11 @@ import ( type correlation struct { id uint32 - chResponse chan internal.CommandRead + chResponse chan internal.SyncCommandRead } func newCorrelation(id uint32) *correlation { - return &correlation{chResponse: make(chan internal.CommandRead), + return &correlation{chResponse: make(chan internal.SyncCommandRead), id: id} } @@ -39,7 +41,7 @@ func (c *correlation) Close() { type Client struct { mu sync.Mutex // this channel is used for correlation-less incoming frames from the server - frameBodyListener chan internal.CommandRead + frameBodyListener chan internal.SyncCommandRead isOpen bool connection *internal.Connection correlationsMap sync.Map @@ -59,7 +61,7 @@ func (tc *Client) IsOpen() bool { // to establish a connection to RabbitMQ servers. func NewClient(connection net.Conn, configuration *ClientConfiguration) common.Clienter { rawClient := &Client{ - frameBodyListener: make(chan internal.CommandRead), + frameBodyListener: make(chan internal.SyncCommandRead), connection: internal.NewConnection(connection), isOpen: false, correlationsMap: sync.Map{}, @@ -77,7 +79,7 @@ func (tc *Client) getCorrelationById(id uint32) *correlation { return nil } -func (tc *Client) storeCorrelation(request internal.CommandWrite) { +func (tc *Client) storeCorrelation(request internal.SyncCommandWrite) { request.SetCorrelationId(tc.getNextCorrelation()) tc.correlationsMap.Store(request.CorrelationId(), newCorrelation(request.CorrelationId())) } @@ -102,24 +104,9 @@ func (tc *Client) getNextCorrelation() uint32 { // end correlation map section -func (tc *Client) writeCommand(request internal.CommandWrite) error { - hWritten, err := internal.NewHeaderRequest(request).Write(tc.connection.GetWriter()) - if err != nil { - return err - } - bWritten, err := request.Write(tc.connection.GetWriter()) - if err != nil { - return err - } - if (bWritten + hWritten) != (request.SizeNeeded() + 4) { - panic("Write Command: Not all bytes written") - } - return tc.connection.GetWriter().Flush() -} - -// This makes an RPC-style request. We send the frame, and we await for a response. The context is passed down from the +// This makes an RPC-style syncRequest. We send the frame, and we await for a response. The context is passed down from the // public functions. Context should have a deadline/timeout to avoid deadlocks on a non-responding RabbitMQ server. -func (tc *Client) request(ctx context.Context, request internal.CommandWrite) (internal.CommandRead, error) { +func (tc *Client) syncRequest(ctx context.Context, request internal.SyncCommandWrite) (internal.SyncCommandRead, error) { if ctx == nil { return nil, errNilContext } @@ -130,20 +117,20 @@ func (tc *Client) request(ctx context.Context, request internal.CommandWrite) (i return nil, ctx.Err() } - logger := logr.FromContextOrDiscard(ctx).WithName("request") + logger := logr.FromContextOrDiscard(ctx).WithName("syncRequest") tc.storeCorrelation(request) defer tc.removeCorrelation(ctx, request.CorrelationId()) - logger.V(traceLevel).Info("writing command to the wire", "request", request) - err := tc.writeCommand(request) + logger.V(traceLevel).Info("writing sync command to the wire", "syncRequest", request) + err := internal.WriteCommand(request, tc.connection.GetWriter()) if err != nil { logger.Error(err, "error writing command to the wire") return nil, err } if _, ok := ctx.Deadline(); !ok { - logger.Info("request does not have a timeout, consider adding a deadline to context") + logger.Info("syncRequest does not have a timeout, consider adding a deadline to context") } select { case <-ctx.Done(): @@ -153,7 +140,20 @@ func (tc *Client) request(ctx context.Context, request internal.CommandWrite) (i } } -func (tc *Client) handleResponse(ctx context.Context, read internal.CommandRead) { +// This makes an async-style syncRequest. We send the frame, and we don't wait for a response. The context is passed down from the +// public functions. Context should have a deadline/timeout to avoid deadlocks on a non-responding RabbitMQ server. +// `syncRequest` is the frame to send to the server that does not expect a response. +func (tc *Client) request(ctx context.Context, request internal.CommandWrite) error { + if ctx == nil { + return errNilContext + } + // TODO: check if context is canceled before proceeding to WriteCommand + logger := logr.FromContextOrDiscard(ctx) + logger.V(traceLevel).Info("writing command to the wire", "syncRequest", request) + return internal.WriteCommand(request, tc.connection.GetWriter()) +} + +func (tc *Client) handleResponse(ctx context.Context, read internal.SyncCommandRead) { var logger logr.Logger if ctx != nil { logger = logr.FromContextOrDiscard(ctx) @@ -247,7 +247,7 @@ func (tc *Client) handleIncoming(ctx context.Context) error { closeReq := new(internal.CloseRequest) err = closeReq.Read(buffer) if err != nil { - log.Error(err, "error decoding close request") + log.Error(err, "error decoding close syncRequest") return err } @@ -276,6 +276,14 @@ func (tc *Client) handleIncoming(ctx context.Context) error { return err } tc.handleResponse(ctx, createResp) + case internal.CommandPublishConfirm: + publishConfirm := new(internal.PublishConfirmResponse) + err = publishConfirm.Read(buffer) + if err != nil { + log.Error(err, "error decoding publish confirm") + return err + } + default: log.Info("frame not implemented", "command ID", fmt.Sprintf("%X", header.Command())) } @@ -290,9 +298,9 @@ func (tc *Client) peerProperties(ctx context.Context) error { log := logr.FromContextOrDiscard(ctx).WithName("peer properties") log.V(traceLevel).Info("starting peer properties") - serverPropertiesResponse, err := tc.request(ctx, internal.NewPeerPropertiesRequest()) + serverPropertiesResponse, err := tc.syncRequest(ctx, internal.NewPeerPropertiesRequest()) if err != nil { - log.Error(err, "error in request to server") + log.Error(err, "error in syncRequest to server") return err } response, ok := serverPropertiesResponse.(*internal.PeerPropertiesResponse) @@ -314,7 +322,7 @@ func (tc *Client) saslHandshake(ctx context.Context) error { log := logr.FromContextOrDiscard(ctx).WithName("sasl handshake") log.V(traceLevel).Info("starting SASL handshake") - saslMechanisms, err := tc.request(ctx, internal.NewSaslHandshakeRequest()) + saslMechanisms, err := tc.syncRequest(ctx, internal.NewSaslHandshakeRequest()) saslMechanismResponse, ok := saslMechanisms.(*internal.SaslHandshakeResponse) if !ok { panic("could not polymorph response") @@ -353,7 +361,7 @@ func (tc *Client) saslAuthenticate(ctx context.Context) error { return err } - response, err := tc.request(ctx, saslAuthReq) + response, err := tc.syncRequest(ctx, saslAuthReq) if err != nil { // no need to log here. the caller logs the error return err @@ -373,9 +381,9 @@ func (tc *Client) open(ctx context.Context, brokerIndex int) error { rabbit := tc.configuration.rabbitmqBrokers[brokerIndex] openReq := internal.NewOpenRequest(rabbit.Vhost) - openRespCommand, err := tc.request(ctx, openReq) + openRespCommand, err := tc.syncRequest(ctx, openReq) if err != nil { - log.Error(err, "error in open request") + log.Error(err, "error in open syncRequest") return err } @@ -384,7 +392,7 @@ func (tc *Client) open(ctx context.Context, brokerIndex int) error { panic("could not polymorph response") } log.V(debugLevel).Info( - "open request success", + "open syncRequest success", "connection properties", openResp.ConnectionProperties(), ) @@ -420,7 +428,7 @@ func (tc *Client) handleClose(ctx context.Context, req *internal.CloseRequest) e return err } - bdy := internal.NewSimpleResponseWith(req.CorrelationId(), common.ResponseCodeOK) + bdy := internal.NewSimpleResponseWith(req.CorrelationId(), constants.ResponseCodeOK) b, err := bdy.MarshalBinary() if err != nil { return err @@ -585,7 +593,7 @@ func (tc *Client) Connect(ctx context.Context) error { case tuneReqCommand := <-tc.frameBodyListener: tuneReq, ok := tuneReqCommand.(*internal.TuneRequest) if !ok { - panic("could not polymorph CommandRead into TuneRequest") + panic("could not polymorph SyncCommandRead into TuneRequest") } desiredFrameSize := math.Min(float64(tuneReq.FrameMaxSize()), float64(tc.configuration.clientMaxFrameSize)) @@ -599,7 +607,7 @@ func (tc *Client) Connect(ctx context.Context) error { desiredHeartbeat, ) tuneResp := internal.NewTuneResponse(uint32(desiredFrameSize), uint32(desiredHeartbeat)) - err = tc.writeCommand(tuneResp) + err = internal.WriteCommand(tuneResp, tc.connection.GetWriter()) if err != nil { logger.Error(err, "error in Tune") tuneCancel() @@ -632,9 +640,9 @@ func (tc *Client) Connect(ctx context.Context) error { return nil } -// DeclareStream sends a request to create a new Stream. If the error is nil, the +// DeclareStream sends a syncRequest to create a new Stream. If the error is nil, the // Stream was created successfully, and it is ready to use. -func (tc *Client) DeclareStream(ctx context.Context, stream string, configuration common.StreamConfiguration) error { +func (tc *Client) DeclareStream(ctx context.Context, stream string, configuration constants.StreamConfiguration) error { if ctx == nil { return errNilContext } @@ -642,7 +650,7 @@ func (tc *Client) DeclareStream(ctx context.Context, stream string, configuratio log := logr.FromContextOrDiscard(ctx).WithName("DeclareStream") log.V(debugLevel).Info("starting declare stream. ", "stream", stream) - createResponse, err := tc.request(ctx, internal.NewCreateRequest(stream, configuration)) + createResponse, err := tc.syncRequest(ctx, internal.NewCreateRequest(stream, configuration)) if err != nil { log.Error(err, "error declaring stream", "stream", stream, "stream-args", configuration) return err @@ -650,7 +658,7 @@ func (tc *Client) DeclareStream(ctx context.Context, stream string, configuratio return streamErrorOrNil(createResponse.ResponseCode()) } -// DeleteStream sends a request to delete a Stream. If the error is nil, the +// DeleteStream sends a syncRequest to delete a Stream. If the error is nil, the // Stream was deleted successfully. func (tc *Client) DeleteStream(ctx context.Context, stream string) error { if ctx == nil { @@ -660,7 +668,7 @@ func (tc *Client) DeleteStream(ctx context.Context, stream string) error { log := logr.FromContextOrDiscard(ctx).WithName("DeleteStream") log.V(debugLevel).Info("starting delete stream. ", "stream", stream) - deleteResponse, err := tc.request(ctx, internal.NewDeleteRequest(stream)) + deleteResponse, err := tc.syncRequest(ctx, internal.NewDeleteRequest(stream)) if err != nil { log.Error(err, "error deleting stream", "stream", stream) return err @@ -668,7 +676,7 @@ func (tc *Client) DeleteStream(ctx context.Context, stream string) error { return streamErrorOrNil(deleteResponse.ResponseCode()) } -// DeclarePublisher sends a request to create a new Publisher. If the error is +// DeclarePublisher sends a syncRequest to create a new Publisher. If the error is // nil, the Publisher was created successfully. // publisherId is the ID of the publisher to create. The publisherId is not tracked in this level of the client. // The publisherId is used to identify the publisher in the server. Per connection @@ -682,7 +690,7 @@ func (tc *Client) DeclarePublisher(ctx context.Context, publisherId uint8, publi log := logr.FromContextOrDiscard(ctx).WithName("DeclarePublisher") log.V(debugLevel).Info("starting declare publisher. ", "publisherId", publisherId, "publisherReference", publisherReference, "stream", stream) - deleteResponse, err := tc.request(ctx, internal.NewDeclarePublisherRequest(publisherId, publisherReference, stream)) + deleteResponse, err := tc.syncRequest(ctx, internal.NewDeclarePublisherRequest(publisherId, publisherReference, stream)) if err != nil { log.Error(err, "error declaring publisher", "publisherId", publisherId, "publisherReference", publisherReference, "stream", stream) return err @@ -690,7 +698,39 @@ func (tc *Client) DeclarePublisher(ctx context.Context, publisherId uint8, publi return streamErrorOrNil(deleteResponse.ResponseCode()) } -// DeletePublisher sends a request to delete a Publisher. If the error is nil, +// Send publishes a batch of messages for a given publisher. The messages to be +// published must have a publishing ID and a function to write the body to an +// io.Writer. The first parameter is a context.Context. The context will be +// checked before writing messages to the wire. This function publishes in a +// "fire and forget" fashion; this means it will not wait for a response from the +// server after sending the messages over the network. +// +// The publisher ID is the same ID used in a DeclarePublisher function call. The +// application must keep track of this ID for sending messages. +// +// The slice of common.PublishingMessager is a batch of messages to be sent. Note +// that RabbitMQ Stream protocol does not specify a format for the messages. This +// flexibility allows to send, in a single "Publish" frame, a batch of +// application messages; for example, a batch of AMQP 1.0 messages. +func (tc *Client) Send(ctx context.Context, publisherId uint8, publishingMessages []common.PublishingMessager) error { + if ctx == nil { + return errNilContext + } + + logger := logr.FromContextOrDiscard(ctx).WithName("Send") + logger.V(debugLevel).Info("starting send", "publisherId", publisherId, "message-count", len(publishingMessages)) + + buff := new(bytes.Buffer) + for _, msg := range publishingMessages { + _, err := msg.WriteTo(buff) + if err != nil { + return err + } + } + return tc.request(ctx, internal.NewPublishRequest(publisherId, uint32(len(publishingMessages)), buff.Bytes())) +} + +// DeletePublisher sends a syncRequest to delete a Publisher. If the error is nil, // the Publisher was deleted successfully. // publisherId is the ID of the publisher to delete. // publisherId is not tracked in this level of the client. @@ -702,9 +742,9 @@ func (tc *Client) DeletePublisher(ctx context.Context, publisherId uint8) error log := logr.FromContextOrDiscard(ctx).WithName("DeletePublisher") log.V(debugLevel).Info("starting delete publisher. ", "publisherId", publisherId) - deleteResponse, err := tc.request(ctx, internal.NewDeletePublisherRequest(publisherId)) + deleteResponse, err := tc.syncRequest(ctx, internal.NewDeletePublisherRequest(publisherId)) if err != nil { - log.Error(err, "error creating delete publisher request ", "publisherId", publisherId) + log.Error(err, "error creating delete publisher syncRequest ", "publisherId", publisherId) return err } return streamErrorOrNil(deleteResponse.ResponseCode()) @@ -712,7 +752,7 @@ func (tc *Client) DeletePublisher(ctx context.Context, publisherId uint8) error } // Close gracefully shutdowns the connection to RabbitMQ. The Client will send a -// close request to RabbitMQ, and it will await a response. It is recommended to +// close syncRequest to RabbitMQ, and it will await a response. It is recommended to // set a deadline in the context, to avoid waiting forever on a non-responding // RabbitMQ server. func (tc *Client) Close(ctx context.Context) error { @@ -723,9 +763,9 @@ func (tc *Client) Close(ctx context.Context) error { log := logr.FromContextOrDiscard(ctx).WithName("close") log.V(debugLevel).Info("starting connection close") - response, err := tc.request(ctx, internal.NewCloseRequest(common.ResponseCodeOK, "kthxbye")) + response, err := tc.syncRequest(ctx, internal.NewCloseRequest(constants.ResponseCodeOK, "kthxbye")) if err != nil { - log.Error(err, "error sending close request") + log.Error(err, "error sending close syncRequest") return err } diff --git a/pkg/raw/client_test.go b/pkg/raw/client_test.go index 80affa50..7e703ed3 100644 --- a/pkg/raw/client_test.go +++ b/pkg/raw/client_test.go @@ -5,6 +5,7 @@ import ( "github.com/go-logr/logr" "github.com/golang/mock/gomock" "github.com/gsantomaggio/rabbitmq-stream-go-client/pkg/common" + "github.com/gsantomaggio/rabbitmq-stream-go-client/pkg/constants" "github.com/gsantomaggio/rabbitmq-stream-go-client/pkg/raw" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" @@ -121,9 +122,9 @@ var _ = Describe("Client", func() { streamClient := raw.NewClient(fakeClientConn, conf) go streamClient.(*raw.Client).StartFrameListener(itCtx) - go fakeRabbitMQ.fakeRabbitMQDeclareStream(newContextWithResponseCode(itCtx, 0x0001), "test-stream", common.StreamConfiguration{"some-key": "some-value"}) + go fakeRabbitMQ.fakeRabbitMQDeclareStream(newContextWithResponseCode(itCtx, 0x0001), "test-stream", constants.StreamConfiguration{"some-key": "some-value"}) - err = streamClient.DeclareStream(itCtx, "test-stream", common.StreamConfiguration{"some-key": "some-value"}) + err = streamClient.DeclareStream(itCtx, "test-stream", constants.StreamConfiguration{"some-key": "some-value"}) Expect(err).To(Succeed()) }) @@ -208,8 +209,8 @@ var _ = Describe("Client", func() { streamClient := raw.NewClient(fakeClientConn, c) go streamClient.(*raw.Client).StartFrameListener(ctx2) - go fakeRabbitMQ.fakeRabbitMQDeclareStream(ctx2, "already-exists", common.StreamConfiguration{}) - Expect(streamClient.DeclareStream(ctx, "already-exists", common.StreamConfiguration{})).To(MatchError("stream already exists")) + go fakeRabbitMQ.fakeRabbitMQDeclareStream(ctx2, "already-exists", constants.StreamConfiguration{}) + Expect(streamClient.DeclareStream(ctx, "already-exists", constants.StreamConfiguration{})).To(MatchError("stream already exists")) }, SpecTimeout(1500*time.Millisecond)) }) @@ -263,7 +264,7 @@ var _ = Describe("Client", func() { Expect(client.Connect(ctx2)).To(MatchError("context canceled")) By("not blocking in stream declaration") - Expect(client.DeclareStream(ctx2, "not-created", common.StreamConfiguration{})). + Expect(client.DeclareStream(ctx2, "not-created", constants.StreamConfiguration{})). To(MatchError("context canceled")) By("not blocking on stream deletion") diff --git a/pkg/raw/mock_conn_test.go b/pkg/raw/mock_conn_test.go index 3055cfd8..97e55847 100644 --- a/pkg/raw/mock_conn_test.go +++ b/pkg/raw/mock_conn_test.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. // Source: net (interfaces: Conn) -// Package stream_test is a generated GoMock package. +// Package raw_test is a generated GoMock package. package raw_test import ( diff --git a/pkg/raw/publishing_message.go b/pkg/raw/publishing_message.go new file mode 100644 index 00000000..4caf2695 --- /dev/null +++ b/pkg/raw/publishing_message.go @@ -0,0 +1,44 @@ +package raw + +import ( + "github.com/gsantomaggio/rabbitmq-stream-go-client/internal" + "github.com/gsantomaggio/rabbitmq-stream-go-client/pkg/common" + "io" +) + +type PublishingMessage struct { + publishingId uint64 + message common.StreamerMessage +} + +func (m *PublishingMessage) SetPublishingId(publishingId uint64) { + m.publishingId = publishingId +} + +func (m *PublishingMessage) PublishingId() uint64 { + return m.publishingId +} + +func (m *PublishingMessage) SetMessage(message common.StreamerMessage) { + m.message = message +} + +func (m *PublishingMessage) Message() common.StreamerMessage { + return m.message +} + +func (m *PublishingMessage) WriteTo(writer io.Writer) (int64, error) { + written, err := internal.WriteMany(writer, m.publishingId) + if err != nil { + return int64(written), err + } + writtenM, err := m.message.WriteTo(writer) + if err != nil { + return int64(writtenM), err + } + return int64(written) + writtenM, nil +} + +func NewPublishingMessage(publishingId uint64, message common.StreamerMessage) *PublishingMessage { + return &PublishingMessage{publishingId: publishingId, message: message} +} diff --git a/pkg/raw/stream_suite_test.go b/pkg/raw/stream_suite_test.go index d9e13909..5f1f8a00 100644 --- a/pkg/raw/stream_suite_test.go +++ b/pkg/raw/stream_suite_test.go @@ -6,7 +6,7 @@ import ( "encoding/binary" "errors" "github.com/gsantomaggio/rabbitmq-stream-go-client/internal" - "github.com/gsantomaggio/rabbitmq-stream-go-client/pkg/common" + "github.com/gsantomaggio/rabbitmq-stream-go-client/pkg/constants" "io" "net" "os" @@ -209,7 +209,7 @@ func (rmq *fakeRabbitMQServer) fakeRabbitMQConnectionClose(ctx context.Context) rw := bufio.NewReadWriter(bufio.NewReader(rmq.connection), bufio.NewWriter(rmq.connection)) - // close request + // close syncRequest headerRequest := new(internal.Header) expectOffset1(headerRequest.Read(rw.Reader)).To(Succeed()) expectOffset1(headerRequest.Command()).To(BeNumerically("==", 0x0016)) @@ -240,7 +240,7 @@ func (rmq *fakeRabbitMQServer) fakeRabbitMQConnectionClose(ctx context.Context) expectOffset1(rw.Flush()).To(Succeed()) } -func (rmq *fakeRabbitMQServer) fakeRabbitMQDeclareStream(ctx context.Context, name string, args common.StreamConfiguration) { +func (rmq *fakeRabbitMQServer) fakeRabbitMQDeclareStream(ctx context.Context, name string, args constants.StreamConfiguration) { defer GinkgoRecover() expectOffset1(rmq.connection.SetDeadline(time.Now().Add(time.Second))). To(Succeed()) @@ -395,7 +395,7 @@ func responseCodeFromContext(ctx context.Context, suffix ...string) (responseCod } v := ctx.Value(key) if v == nil { - return common.ResponseCodeOK + return constants.ResponseCodeOK } responseCode = v.(uint16) return diff --git a/pkg/raw/util.go b/pkg/raw/util.go index 9a2840ca..90359f44 100644 --- a/pkg/raw/util.go +++ b/pkg/raw/util.go @@ -2,7 +2,7 @@ package raw import ( "fmt" - "github.com/gsantomaggio/rabbitmq-stream-go-client/pkg/common" + "github.com/gsantomaggio/rabbitmq-stream-go-client/pkg/constants" "net/url" "strconv" "strings" @@ -78,7 +78,7 @@ func parseURI(uri string) (broker, error) { } func streamErrorOrNil(responseCode uint16) error { - err, found := common.ResponseCodeToError[responseCode] + err, found := constants.ResponseCodeToError[responseCode] if !found { return fmt.Errorf("unknown response code %d", responseCode) }