Skip to content

Commit

Permalink
Implement stream stats (#196)
Browse files Browse the repository at this point in the history
* Implement stream stats

Closes: #190
Signed-off-by: Gabriele Santomaggio <[email protected]>

Signed-off-by: Gabriele Santomaggio <[email protected]>
  • Loading branch information
Gsantomaggio authored Mar 14, 2023
1 parent ec6e973 commit 5cadb98
Show file tree
Hide file tree
Showing 7 changed files with 211 additions and 17 deletions.
64 changes: 47 additions & 17 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Go client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-serv
* [Load Balancer](#load-balancer)
* [TLS](#tls)
* [Streams](#streams)
* [Statistics](#streams-statistics)
* [Publish messages](#publish-messages)
* [`Send` vs `BatchSend`](#send-vs-batchsend)
* [Publish Confirmation](#publish-confirmation)
Expand Down Expand Up @@ -58,7 +59,7 @@ imports:
### Run server with Docker
---
You may need a server to test locally. Let's start the broker:
```shell
```shell
docker run -it --rm --name rabbitmq -p 5552:5552 -p 15672:15672\
-e RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS='-rabbitmq_stream advertised_host localhost -rabbit loopback_users "none"' \
rabbitmq:3.9-management
Expand Down Expand Up @@ -182,6 +183,35 @@ The function `DeclareStream` doesn't return errors if a stream is already define
Note that it returns the precondition failed when it doesn't have the same parameters
Use `StreamExists` to check if a stream exists.

### Streams Statistics

To get stream statistics you need to use the the `environment.StreamStats` method.

```golang
stats, err := environment.StreamStats(testStreamName)

// FirstOffset - The first offset in the stream.
// return first offset in the stream /
// Error if there is no first offset yet

firstOffset, err := stats.FirstOffset() // first offset of the stream

// LastOffset - The last offset in the stream.
// return last offset in the stream
// error if there is no first offset yet
lastOffset, err := stats.LastOffset() // last offset of the stream

// CommittedChunkId - The ID (offset) of the committed chunk (block of messages) in the stream.
//
// It is the offset of the first message in the last chunk confirmed by a quorum of the stream
// cluster members (leader and replicas).
//
// The committed chunk ID is a good indication of what the last offset of a stream can be at a
// given time. The value can be stale as soon as the application reads it though, as the committed
// chunk ID for a stream that is published to changes all the time.

committedChunkId, err := statsAfter.CommittedChunkId()
```

### Publish messages

Expand Down Expand Up @@ -241,14 +271,14 @@ The `Send` interface works in most of the cases, In some condition is about 15/2

### Publish Confirmation

For each publish the server sends back to the client the confirmation or an error.
For each publish the server sends back to the client the confirmation or an error.
The client provides an interface to receive the confirmation:

```golang
//optional publish confirmation channel
chPublishConfirm := producer.NotifyPublishConfirmation()
handlePublishConfirm(chPublishConfirm)

func handlePublishConfirm(confirms stream.ChannelPublishConfirm) {
go func() {
for confirmed := range confirms {
Expand All @@ -264,7 +294,7 @@ func handlePublishConfirm(confirms stream.ChannelPublishConfirm) {
}
```

In the MessageStatus struct you can find two `publishingId`:
In the MessageStatus struct you can find two `publishingId`:
```golang
//first one
messageStatus.GetMessage().GetPublishingId()
Expand All @@ -277,12 +307,12 @@ The second one is assigned automatically by the client.
In case the user specifies the `publishingId` with:
```golang
msg = amqp.NewMessage([]byte("mymessage"))
msg.SetPublishingId(18) // <---
msg.SetPublishingId(18) // <---
```


The filed: `messageStatus.GetMessage().HasPublishingId()` is true and </br>
the values `messageStatus.GetMessage().GetPublishingId()` and `messageStatus.GetPublishingId()` are the same.
the values `messageStatus.GetMessage().GetPublishingId()` and `messageStatus.GetPublishingId()` are the same.


See also "Getting started" example in the [examples](./examples/) directory
Expand All @@ -303,8 +333,8 @@ publishingId, err := producer.GetLastPublishingId()

### Sub Entries Batching

The number of messages to put in a sub-entry. A sub-entry is one "slot" in a publishing frame,
meaning outbound messages are not only batched in publishing frames, but in sub-entries as well.
The number of messages to put in a sub-entry. A sub-entry is one "slot" in a publishing frame,
meaning outbound messages are not only batched in publishing frames, but in sub-entries as well.
Use this feature to increase throughput at the cost of increased latency. </br>
You can find a "Sub Entries Batching" example in the [examples](./examples/) directory. </br>

Expand All @@ -319,7 +349,7 @@ producer, err := env.NewProducer(streamName, stream.NewProducerOptions().

### Ha Producer Experimental
The ha producer is built up the standard producer. </br>
Features:
Features:
- auto-reconnect in case of disconnection
- handle the unconfirmed messages automatically in case of fail.

Expand All @@ -329,7 +359,7 @@ You can find a "HA producer" example in the [examples](./examples/) directory. <
haproducer := NewHAProducer(
env *stream.Environment, // mandatory
streamName string, // mandatory
producerOptions *stream.ProducerOptions, //optional
producerOptions *stream.ProducerOptions, //optional
confirmMessageHandler ConfirmMessageHandler // mandatory
)
```
Expand All @@ -352,7 +382,7 @@ With `ConsumerOptions` it is possible to customize the consumer behaviour.
```golang
stream.NewConsumerOptions().
SetConsumerName("my_consumer"). // set a consumer name
SetCRCCheck(false). // Enable/Disable the CRC control.
SetCRCCheck(false). // Enable/Disable the CRC control.
SetOffset(stream.OffsetSpecification{}.First())) // start consuming from the beginning
```
Disabling the CRC control can increase the performances.
Expand All @@ -374,7 +404,7 @@ handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Mes
consumer, err := env.NewConsumer(
..
stream.NewConsumerOptions().
SetConsumerName("my_consumer"). <------
SetConsumerName("my_consumer"). <------
```
A consumer must have a name to be able to store offsets. <br>
Note: *AVOID to store the offset for each single message, it will reduce the performances*
Expand All @@ -388,9 +418,9 @@ processMessageAsync := func(consumer stream.Consumer, message *amqp.Message, off
err := consumer.StoreCustomOffset(offset) // commit all messages up to this offset
....
```
This is useful in situations where we have to process messages asynchronously and we cannot block the original message
This is useful in situations where we have to process messages asynchronously and we cannot block the original message
handler. Which means we cannot store the current or latest delivered offset as we saw in the `handleMessages` function
above.
above.
### Automatic Track Offset
Expand Down Expand Up @@ -422,9 +452,9 @@ stream.NewConsumerOptions().
// set a consumerOffsetNumber name
SetConsumerName("my_consumer").
SetAutoCommit(stream.NewAutoCommitStrategy().
SetCountBeforeStorage(50). // store each 50 messages stores
SetCountBeforeStorage(50). // store each 50 messages stores
SetFlushInterval(10*time.Second)). // store each 10 seconds
SetOffset(stream.OffsetSpecification{}.First()))
SetOffset(stream.OffsetSpecification{}.First()))
```
See also "Automatic Offset Tracking" example in the [examples](./examples/) directory
Expand Down Expand Up @@ -453,7 +483,7 @@ In this way it is possible to handle fail-over
### Performance test tool
Performance test tool it is useful to execute tests.
Performance test tool it is useful to execute tests.
See also the [Java Performance](https://rabbitmq.github.io/rabbitmq-stream-java-client/stable/htmlsingle/#the-performance-tool) tool
Expand Down
25 changes: 25 additions & 0 deletions pkg/stream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -799,3 +799,28 @@ func (c *Client) DeclareSubscriber(streamName string,
}()
return consumer, err.Err
}

func (c *Client) StreamStats(streamName string) (*StreamStats, error) {

resp := c.coordinator.NewResponse(commandStreamStatus)
correlationId := resp.correlationid

length := 2 + 2 + 4 + 2 + len(streamName)

var b = bytes.NewBuffer(make([]byte, 0, length+4))
writeProtocolHeader(b, length, commandStreamStatus,
correlationId)
writeString(b, streamName)

err := c.handleWriteWithResponse(b.Bytes(), resp, false)
offset := <-resp.data
_ = c.coordinator.RemoveResponseById(resp.correlationid)
if err.Err != nil {
return nil, err.Err
}
m, ok := offset.(map[string]int64)
if !ok {
return nil, fmt.Errorf("invalid response, expected map[string]int64 but got %T", offset)
}
return newStreamStats(m, streamName), nil
}
46 changes: 46 additions & 0 deletions pkg/stream/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,52 @@ var _ = Describe("Streaming testEnvironment", func() {
Expect(testEnvironment.DeleteStream(testStreamName)).NotTo(HaveOccurred())
})

It("Stream Status", func() {
Expect(testEnvironment.DeclareStream(testStreamName, nil)).
NotTo(HaveOccurred())
stats, err := testEnvironment.StreamStats(testStreamName)
Expect(err).NotTo(HaveOccurred())
Expect(stats).NotTo(BeNil())

DeferCleanup(func() {
Expect(testEnvironment.DeleteStream(testStreamName)).NotTo(HaveOccurred())
})

_, err = stats.FirstOffset()
Expect(fmt.Sprintf("%s", err)).
To(ContainSubstring("FirstOffset not found for"))

_, err = stats.LastOffset()
Expect(fmt.Sprintf("%s", err)).
To(ContainSubstring("LastOffset not found for"))

_, err = stats.CommittedChunkId()
Expect(fmt.Sprintf("%s", err)).
To(ContainSubstring("CommittedChunkId not found for"))

producer, err := testEnvironment.NewProducer(testStreamName, nil)
Expect(err).NotTo(HaveOccurred())
Expect(producer.BatchSend(CreateArrayMessagesForTesting(1_000))).NotTo(HaveOccurred())
time.Sleep(time.Millisecond * 800)
Expect(producer.Close()).NotTo(HaveOccurred())

statsAfter, err := testEnvironment.StreamStats(testStreamName)
Expect(err).NotTo(HaveOccurred())
Expect(statsAfter).NotTo(BeNil())

offset, err := statsAfter.FirstOffset()
Expect(err).NotTo(HaveOccurred())
Expect(offset == 0).To(BeTrue())

offset, err = statsAfter.LastOffset()
Expect(err).NotTo(HaveOccurred())
Expect(offset > 0).To(BeTrue())

offset, err = statsAfter.CommittedChunkId()
Expect(err).NotTo(HaveOccurred())
Expect(offset > 0).To(BeTrue())
})

It("Create two times Stream precondition fail", func() {
Expect(testEnvironment.DeclareStream(testStreamName, nil)).NotTo(HaveOccurred())
err := testEnvironment.DeclareStream(testStreamName,
Expand Down
1 change: 1 addition & 0 deletions pkg/stream/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ const (
commandOpen = 21
CommandClose = 22
commandHeartbeat = 23
commandStreamStatus = 28

/// used only for tests
commandUnitTest = 99
Expand Down
14 changes: 14 additions & 0 deletions pkg/stream/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,20 @@ func (env *Environment) QuerySequence(publisherReference string, streamName stri
return client.queryPublisherSequence(publisherReference, streamName)
}

func (env *Environment) StreamStats(streamName string) (*StreamStats, error) {
client, err := env.newReconnectClient()
defer func(client *Client) {
err := client.Close()
if err != nil {
return
}
}(client)
if err != nil {
return nil, err
}
return client.StreamStats(streamName)
}

func (env *Environment) StreamMetaData(streamName string) (*StreamMetadata, error) {
client, err := env.newReconnectClient()
defer func(client *Client) {
Expand Down
27 changes: 27 additions & 0 deletions pkg/stream/server_frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ func (c *Client) handleResponse() {
c.queryOffsetFrameHandler(readerProtocol, buffer)

}
case commandStreamStatus:
{
c.streamStatusFrameHandler(readerProtocol, buffer)
}
case commandMetadata:
{
c.metadataFrameHandler(readerProtocol, buffer)
Expand Down Expand Up @@ -478,6 +482,29 @@ func (c *Client) metadataUpdateFrameHandler(buffer *bufio.Reader) {
}
}

func (c *Client) streamStatusFrameHandler(readProtocol *ReaderProtocol,
r *bufio.Reader) {

c.handleGenericResponse(readProtocol, r)

count, _ := readUInt(r)
streamStatus := make(map[string]int64)

for i := 0; i < int(count); i++ {
key := readString(r)
value := readInt64(r)
streamStatus[key] = value
}
res, err := c.coordinator.GetResponseById(readProtocol.CorrelationId)
if err != nil {
logs.LogWarn("stream status response not found")
return
}
res.code <- Code{id: readProtocol.ResponseCode}
res.data <- streamStatus

}

func (c *Client) metadataFrameHandler(readProtocol *ReaderProtocol,
r *bufio.Reader) {
readProtocol.CorrelationId, _ = readUInt(r)
Expand Down
51 changes: 51 additions & 0 deletions pkg/stream/stream_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package stream

import "fmt"

type StreamStats struct {
stats map[string]int64
streamName string
}

func newStreamStats(stats map[string]int64, streamName string) *StreamStats {
return &StreamStats{stats: stats, streamName: streamName}

}

// FirstOffset - The first offset in the stream.
// return first offset in the stream /
// Error if there is no first offset yet
func (s *StreamStats) FirstOffset() (int64, error) {
if s.stats["first_chunk_id"] == -1 {
return -1, fmt.Errorf("FirstOffset not found for %s", s.streamName)
}
return s.stats["first_chunk_id"], nil
}

// LastOffset - The last offset in the stream.
// return last offset in the stream
// error if there is no first offset yet
func (s *StreamStats) LastOffset() (int64, error) {
if s.stats["last_chunk_id"] == -1 {
return -1, fmt.Errorf("LastOffset not found for %s", s.streamName)
}
return s.stats["last_chunk_id"], nil
}

// CommittedChunkId - The ID (offset) of the committed chunk (block of messages) in the stream.
//
// It is the offset of the first message in the last chunk confirmed by a quorum of the stream
// cluster members (leader and replicas).
//
// The committed chunk ID is a good indication of what the last offset of a stream can be at a
// given time. The value can be stale as soon as the application reads it though, as the committed
// chunk ID for a stream that is published to changes all the time.
//
// return committed offset in this stream
// Error if there is no committed chunk yet
func (s *StreamStats) CommittedChunkId() (int64, error) {
if s.stats["committed_chunk_id"] == -1 {
return -1, fmt.Errorf("CommittedChunkId not found for %s", s.streamName)
}
return s.stats["committed_chunk_id"], nil
}

0 comments on commit 5cadb98

Please sign in to comment.