-
Notifications
You must be signed in to change notification settings - Fork 20
/
performances.go
133 lines (116 loc) · 3.61 KB
/
performances.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package main
import (
"bufio"
"fmt"
"github.com/google/uuid"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
"os"
"strconv"
"sync/atomic"
"time"
)
func CheckErr(err error) {
if err != nil {
fmt.Printf("%s ", err)
os.Exit(1)
}
}
var messagesConfirmed int32
func handlePublishConfirm(confirms stream.ChannelPublishConfirm) {
go func() {
for confirmed := range confirms {
for _, msg := range confirmed {
if msg.IsConfirmed() {
atomic.AddInt32(&messagesConfirmed, 1)
}
}
}
}()
}
func main() {
useSyncBatch := os.Args[1] == "sync"
useAsyncSend := os.Args[1] == "async"
messagesToSend, err := strconv.Atoi(os.Args[2])
CheckErr(err)
batchSize, err := strconv.Atoi(os.Args[3])
messagesToSend = messagesToSend / batchSize
reader := bufio.NewReader(os.Stdin)
fmt.Println("RabbitMQ performance example")
// Connect to the broker ( or brokers )
env, err := stream.NewEnvironment(
stream.NewEnvironmentOptions().
SetHost("localhost").
SetPort(5552).
SetUser("guest").
SetPassword("guest"))
CheckErr(err)
fmt.Println("Connected to the RabbitMQ server")
streamName := uuid.New().String()
err = env.DeclareStream(streamName,
&stream.StreamOptions{
MaxLengthBytes: stream.ByteCapacity{}.GB(2),
},
)
CheckErr(err)
fmt.Printf("Created Stream: %s \n", streamName)
producer, err := env.NewProducer(streamName,
stream.NewProducerOptions().
SetBatchSize(batchSize).
SetBatchPublishingDelay(100))
CheckErr(err)
chPublishConfirm := producer.NotifyPublishConfirmation()
handlePublishConfirm(chPublishConfirm)
fmt.Printf("------------------------------------------\n\n")
fmt.Printf("Start sending %d messages, data size: %d bytes\n", messagesToSend*batchSize, len("hello_world"))
var averageLatency time.Duration
var messagesConsumed int32
handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
atomic.AddInt32(&messagesConsumed, 1)
var latency time.Time
err := latency.UnmarshalBinary(message.Data[0])
CheckErr(err)
averageLatency += time.Since(latency)
}
_, err = env.NewConsumer(streamName, handleMessages, stream.NewConsumerOptions().SetOffset(stream.OffsetSpecification{}.First()))
CheckErr(err)
start := time.Now()
// here the client sends the messages in batch and it is up to the user to aggregate the messages
if useSyncBatch {
var arr []message.StreamMessage
for i := 0; i < messagesToSend; i++ {
for i := 0; i < batchSize; i++ {
latency, err := time.Now().MarshalBinary()
CheckErr(err)
arr = append(arr, amqp.NewMessage(latency))
}
err := producer.BatchSend(arr)
CheckErr(err)
arr = arr[:0]
}
}
// here the client aggregates the messages based on the batch size and batch publishing delay
if useAsyncSend {
for i := 0; i < messagesToSend; i++ {
for i := 0; i < batchSize; i++ {
latency, err := time.Now().MarshalBinary()
CheckErr(err)
err = producer.Send(amqp.NewMessage(latency))
CheckErr(err)
}
}
}
duration := time.Since(start)
fmt.Println("Press any key to report and stop ")
_, _ = reader.ReadString('\n')
fmt.Printf("------------------------------------------\n\n")
fmt.Printf("Sent %d messages in %s. Confirmed: %d avarage latency: %s \n", messagesToSend*batchSize, duration, messagesConfirmed, averageLatency/time.Duration(messagesConsumed))
fmt.Printf("------------------------------------------\n\n")
time.Sleep(200 * time.Millisecond)
CheckErr(err)
err = env.DeleteStream(streamName)
CheckErr(err)
err = env.Close()
CheckErr(err)
}