diff --git a/README.md b/README.md index be2fedf4..25ae75c1 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ Go client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-serv * [Load Balancer](#load-balancer) * [TLS](#tls) * [Streams](#streams) + * [Statistics](#streams-statistics) * [Publish messages](#publish-messages) * [`Send` vs `BatchSend`](#send-vs-batchsend) * [Publish Confirmation](#publish-confirmation) @@ -58,7 +59,7 @@ imports: ### Run server with Docker --- You may need a server to test locally. Let's start the broker: -```shell +```shell docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672\ -e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost -rabbit loopback_users "none"' \ rabbitmq:3.9-management @@ -182,6 +183,35 @@ The function `DeclareStream` doesn't return errors if a stream is already define Note that it returns the precondition failed when it doesn't have the same parameters Use `StreamExists` to check if a stream exists. +### Streams Statistics + +To get stream statistics you need to use the the `environment.StreamStats` method. + +```golang +stats, err := environment.StreamStats(testStreamName) + +// FirstOffset - The first offset in the stream. +// return first offset in the stream / +// Error if there is no first offset yet + +firstOffset, err := stats.FirstOffset() // first offset of the stream + +// LastOffset - The last offset in the stream. +// return last offset in the stream +// error if there is no first offset yet +lastOffset, err := stats.LastOffset() // last offset of the stream + +// CommittedChunkId - The ID (offset) of the committed chunk (block of messages) in the stream. +// +// It is the offset of the first message in the last chunk confirmed by a quorum of the stream +// cluster members (leader and replicas). +// +// The committed chunk ID is a good indication of what the last offset of a stream can be at a +// given time. The value can be stale as soon as the application reads it though, as the committed +// chunk ID for a stream that is published to changes all the time. + +committedChunkId, err := statsAfter.CommittedChunkId() +``` ### Publish messages @@ -241,14 +271,14 @@ The `Send` interface works in most of the cases, In some condition is about 15/2 ### Publish Confirmation -For each publish the server sends back to the client the confirmation or an error. +For each publish the server sends back to the client the confirmation or an error. The client provides an interface to receive the confirmation: ```golang //optional publish confirmation channel chPublishConfirm := producer.NotifyPublishConfirmation() handlePublishConfirm(chPublishConfirm) - + func handlePublishConfirm(confirms stream.ChannelPublishConfirm) { go func() { for confirmed := range confirms { @@ -264,7 +294,7 @@ func handlePublishConfirm(confirms stream.ChannelPublishConfirm) { } ``` -In the MessageStatus struct you can find two `publishingId`: +In the MessageStatus struct you can find two `publishingId`: ```golang //first one messageStatus.GetMessage().GetPublishingId() @@ -277,12 +307,12 @@ The second one is assigned automatically by the client. In case the user specifies the `publishingId` with: ```golang msg = amqp.NewMessage([]byte("mymessage")) -msg.SetPublishingId(18) // <--- +msg.SetPublishingId(18) // <--- ``` The filed: `messageStatus.GetMessage().HasPublishingId()` is true and
-the values `messageStatus.GetMessage().GetPublishingId()` and `messageStatus.GetPublishingId()` are the same. +the values `messageStatus.GetMessage().GetPublishingId()` and `messageStatus.GetPublishingId()` are the same. See also "Getting started" example in the [examples](./examples/) directory @@ -303,8 +333,8 @@ publishingId, err := producer.GetLastPublishingId() ### Sub Entries Batching -The number of messages to put in a sub-entry. A sub-entry is one "slot" in a publishing frame, -meaning outbound messages are not only batched in publishing frames, but in sub-entries as well. +The number of messages to put in a sub-entry. A sub-entry is one "slot" in a publishing frame, +meaning outbound messages are not only batched in publishing frames, but in sub-entries as well. Use this feature to increase throughput at the cost of increased latency.
You can find a "Sub Entries Batching" example in the [examples](./examples/) directory.
@@ -319,7 +349,7 @@ producer, err := env.NewProducer(streamName, stream.NewProducerOptions(). ### Ha Producer Experimental The ha producer is built up the standard producer.
-Features: +Features: - auto-reconnect in case of disconnection - handle the unconfirmed messages automatically in case of fail. @@ -329,7 +359,7 @@ You can find a "HA producer" example in the [examples](./examples/) directory. < haproducer := NewHAProducer( env *stream.Environment, // mandatory streamName string, // mandatory - producerOptions *stream.ProducerOptions, //optional + producerOptions *stream.ProducerOptions, //optional confirmMessageHandler ConfirmMessageHandler // mandatory ) ``` @@ -352,7 +382,7 @@ With `ConsumerOptions` it is possible to customize the consumer behaviour. ```golang stream.NewConsumerOptions(). SetConsumerName("my_consumer"). // set a consumer name - SetCRCCheck(false). // Enable/Disable the CRC control. + SetCRCCheck(false). // Enable/Disable the CRC control. SetOffset(stream.OffsetSpecification{}.First())) // start consuming from the beginning ``` Disabling the CRC control can increase the performances. @@ -374,7 +404,7 @@ handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Mes consumer, err := env.NewConsumer( .. stream.NewConsumerOptions(). - SetConsumerName("my_consumer"). <------ + SetConsumerName("my_consumer"). <------ ``` A consumer must have a name to be able to store offsets.
Note: *AVOID to store the offset for each single message, it will reduce the performances* @@ -388,9 +418,9 @@ processMessageAsync := func(consumer stream.Consumer, message *amqp.Message, off err := consumer.StoreCustomOffset(offset) // commit all messages up to this offset .... ``` -This is useful in situations where we have to process messages asynchronously and we cannot block the original message +This is useful in situations where we have to process messages asynchronously and we cannot block the original message handler. Which means we cannot store the current or latest delivered offset as we saw in the `handleMessages` function -above. +above. ### Automatic Track Offset @@ -422,9 +452,9 @@ stream.NewConsumerOptions(). // set a consumerOffsetNumber name SetConsumerName("my_consumer"). SetAutoCommit(stream.NewAutoCommitStrategy(). - SetCountBeforeStorage(50). // store each 50 messages stores + SetCountBeforeStorage(50). // store each 50 messages stores SetFlushInterval(10*time.Second)). // store each 10 seconds - SetOffset(stream.OffsetSpecification{}.First())) + SetOffset(stream.OffsetSpecification{}.First())) ``` See also "Automatic Offset Tracking" example in the [examples](./examples/) directory @@ -453,7 +483,7 @@ In this way it is possible to handle fail-over ### Performance test tool -Performance test tool it is useful to execute tests. +Performance test tool it is useful to execute tests. See also the [Java Performance](https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#the-performance-tool) tool diff --git a/pkg/stream/client.go b/pkg/stream/client.go index e5534e8b..2ee213f6 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -799,3 +799,28 @@ func (c *Client) DeclareSubscriber(streamName string, }() return consumer, err.Err } + +func (c *Client) StreamStats(streamName string) (*StreamStats, error) { + + resp := c.coordinator.NewResponse(commandStreamStatus) + correlationId := resp.correlationid + + length := 2 + 2 + 4 + 2 + len(streamName) + + var b = bytes.NewBuffer(make([]byte, 0, length+4)) + writeProtocolHeader(b, length, commandStreamStatus, + correlationId) + writeString(b, streamName) + + err := c.handleWriteWithResponse(b.Bytes(), resp, false) + offset := <-resp.data + _ = c.coordinator.RemoveResponseById(resp.correlationid) + if err.Err != nil { + return nil, err.Err + } + m, ok := offset.(map[string]int64) + if !ok { + return nil, fmt.Errorf("invalid response, expected map[string]int64 but got %T", offset) + } + return newStreamStats(m, streamName), nil +} diff --git a/pkg/stream/client_test.go b/pkg/stream/client_test.go index 6caafa58..5536540d 100644 --- a/pkg/stream/client_test.go +++ b/pkg/stream/client_test.go @@ -115,6 +115,52 @@ var _ = Describe("Streaming testEnvironment", func() { Expect(testEnvironment.DeleteStream(testStreamName)).NotTo(HaveOccurred()) }) + It("Stream Status", func() { + Expect(testEnvironment.DeclareStream(testStreamName, nil)). + NotTo(HaveOccurred()) + stats, err := testEnvironment.StreamStats(testStreamName) + Expect(err).NotTo(HaveOccurred()) + Expect(stats).NotTo(BeNil()) + + DeferCleanup(func() { + Expect(testEnvironment.DeleteStream(testStreamName)).NotTo(HaveOccurred()) + }) + + _, err = stats.FirstOffset() + Expect(fmt.Sprintf("%s", err)). + To(ContainSubstring("FirstOffset not found for")) + + _, err = stats.LastOffset() + Expect(fmt.Sprintf("%s", err)). + To(ContainSubstring("LastOffset not found for")) + + _, err = stats.CommittedChunkId() + Expect(fmt.Sprintf("%s", err)). + To(ContainSubstring("CommittedChunkId not found for")) + + producer, err := testEnvironment.NewProducer(testStreamName, nil) + Expect(err).NotTo(HaveOccurred()) + Expect(producer.BatchSend(CreateArrayMessagesForTesting(1_000))).NotTo(HaveOccurred()) + time.Sleep(time.Millisecond * 800) + Expect(producer.Close()).NotTo(HaveOccurred()) + + statsAfter, err := testEnvironment.StreamStats(testStreamName) + Expect(err).NotTo(HaveOccurred()) + Expect(statsAfter).NotTo(BeNil()) + + offset, err := statsAfter.FirstOffset() + Expect(err).NotTo(HaveOccurred()) + Expect(offset == 0).To(BeTrue()) + + offset, err = statsAfter.LastOffset() + Expect(err).NotTo(HaveOccurred()) + Expect(offset > 0).To(BeTrue()) + + offset, err = statsAfter.CommittedChunkId() + Expect(err).NotTo(HaveOccurred()) + Expect(offset > 0).To(BeTrue()) + }) + It("Create two times Stream precondition fail", func() { Expect(testEnvironment.DeclareStream(testStreamName, nil)).NotTo(HaveOccurred()) err := testEnvironment.DeclareStream(testStreamName, diff --git a/pkg/stream/constants.go b/pkg/stream/constants.go index 8f5dadc9..c5dc2160 100644 --- a/pkg/stream/constants.go +++ b/pkg/stream/constants.go @@ -44,6 +44,7 @@ const ( commandOpen = 21 CommandClose = 22 commandHeartbeat = 23 + commandStreamStatus = 28 /// used only for tests commandUnitTest = 99 diff --git a/pkg/stream/environment.go b/pkg/stream/environment.go index 120c4717..a19a16ac 100644 --- a/pkg/stream/environment.go +++ b/pkg/stream/environment.go @@ -186,6 +186,20 @@ func (env *Environment) QuerySequence(publisherReference string, streamName stri return client.queryPublisherSequence(publisherReference, streamName) } +func (env *Environment) StreamStats(streamName string) (*StreamStats, error) { + client, err := env.newReconnectClient() + defer func(client *Client) { + err := client.Close() + if err != nil { + return + } + }(client) + if err != nil { + return nil, err + } + return client.StreamStats(streamName) +} + func (env *Environment) StreamMetaData(streamName string) (*StreamMetadata, error) { client, err := env.newReconnectClient() defer func(client *Client) { diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index cf2f098e..807707f9 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -99,6 +99,10 @@ func (c *Client) handleResponse() { c.queryOffsetFrameHandler(readerProtocol, buffer) } + case commandStreamStatus: + { + c.streamStatusFrameHandler(readerProtocol, buffer) + } case commandMetadata: { c.metadataFrameHandler(readerProtocol, buffer) @@ -478,6 +482,29 @@ func (c *Client) metadataUpdateFrameHandler(buffer *bufio.Reader) { } } +func (c *Client) streamStatusFrameHandler(readProtocol *ReaderProtocol, + r *bufio.Reader) { + + c.handleGenericResponse(readProtocol, r) + + count, _ := readUInt(r) + streamStatus := make(map[string]int64) + + for i := 0; i < int(count); i++ { + key := readString(r) + value := readInt64(r) + streamStatus[key] = value + } + res, err := c.coordinator.GetResponseById(readProtocol.CorrelationId) + if err != nil { + logs.LogWarn("stream status response not found") + return + } + res.code <- Code{id: readProtocol.ResponseCode} + res.data <- streamStatus + +} + func (c *Client) metadataFrameHandler(readProtocol *ReaderProtocol, r *bufio.Reader) { readProtocol.CorrelationId, _ = readUInt(r) diff --git a/pkg/stream/stream_stats.go b/pkg/stream/stream_stats.go new file mode 100644 index 00000000..7d05f6ae --- /dev/null +++ b/pkg/stream/stream_stats.go @@ -0,0 +1,51 @@ +package stream + +import "fmt" + +type StreamStats struct { + stats map[string]int64 + streamName string +} + +func newStreamStats(stats map[string]int64, streamName string) *StreamStats { + return &StreamStats{stats: stats, streamName: streamName} + +} + +// FirstOffset - The first offset in the stream. +// return first offset in the stream / +// Error if there is no first offset yet +func (s *StreamStats) FirstOffset() (int64, error) { + if s.stats["first_chunk_id"] == -1 { + return -1, fmt.Errorf("FirstOffset not found for %s", s.streamName) + } + return s.stats["first_chunk_id"], nil +} + +// LastOffset - The last offset in the stream. +// return last offset in the stream +// error if there is no first offset yet +func (s *StreamStats) LastOffset() (int64, error) { + if s.stats["last_chunk_id"] == -1 { + return -1, fmt.Errorf("LastOffset not found for %s", s.streamName) + } + return s.stats["last_chunk_id"], nil +} + +// CommittedChunkId - The ID (offset) of the committed chunk (block of messages) in the stream. +// +// It is the offset of the first message in the last chunk confirmed by a quorum of the stream +// cluster members (leader and replicas). +// +// The committed chunk ID is a good indication of what the last offset of a stream can be at a +// given time. The value can be stale as soon as the application reads it though, as the committed +// chunk ID for a stream that is published to changes all the time. +// +// return committed offset in this stream +// Error if there is no committed chunk yet +func (s *StreamStats) CommittedChunkId() (int64, error) { + if s.stats["committed_chunk_id"] == -1 { + return -1, fmt.Errorf("CommittedChunkId not found for %s", s.streamName) + } + return s.stats["committed_chunk_id"], nil +}