-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstreams.go
117 lines (100 loc) · 3.12 KB
/
streams.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package main
import (
"bufio"
"crypto/tls"
"fmt"
"os"
"strconv"
"github.com/google/uuid"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
)
func CheckErr(err error) {
if err != nil {
fmt.Printf("%s ", err)
os.Exit(1)
}
}
func main() {
reader := bufio.NewReader(os.Stdin)
//stream.SetLevelInfo(logs.DEBUG)
fmt.Println("Configure a load-balancer TLS for RabbitMQ")
fmt.Println("Connecting to RabbitMQ streaming ...")
// load balancer address in TLS
addressResolver := stream.AddressResolver{
Host: os.Getenv("rmqLBIP"),
Port: 5552,
}
conf := &tls.Config{InsecureSkipVerify: true}
env, err := stream.NewEnvironment(
stream.NewEnvironmentOptions().
SetHost(addressResolver.Host).
SetPort(addressResolver.Port).
IsTLS(false).
SetTLSConfig(conf).
SetAddressResolver(addressResolver).
SetUser(os.Getenv("username")).
SetPassword(os.Getenv("password")).
SetMaxProducersPerClient(5))
CheckErr(err)
/// We create a few streams, in order to distribute the streams across the cluster
var streamsName []string
for i := 0; i < 3; i++ {
streamsName = append(streamsName, uuid.New().String())
}
for _, streamName := range streamsName {
fmt.Printf("Create stream %s ...\n", streamName)
err = env.DeclareStream(streamName,
&stream.StreamOptions{
MaxLengthBytes: stream.ByteCapacity{}.GB(2),
},
)
}
CheckErr(err)
var producers []*stream.Producer
// The producer MUST connect to the leader stream
// here the AddressResolver try to get the leader
// if fails retry
for _, streamName := range streamsName {
fmt.Printf("Create producer for %s ...\n", streamName)
producer, err := env.NewProducer(streamName, nil)
producers = append(producers, producer)
CheckErr(err)
}
// just publish some message
for i := 0; i < 50; i++ {
for _, producer := range producers {
err := producer.Send(amqp.NewMessage([]byte("hello_world_" + strconv.Itoa(i))))
CheckErr(err)
}
}
handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
fmt.Printf("consumer name: %s, text: %s \n ", consumerContext.Consumer.GetName(), message.Data)
}
// the consumer can connect to the leader o follower
// the AddressResolver just resolve the ip
var consumers []*stream.Consumer
for _, streamName := range streamsName {
fmt.Printf("Create consumer for %s ...\n", streamName)
consumer, err := env.NewConsumer(
streamName,
handleMessages,
stream.NewConsumerOptions().
SetConsumerName(uuid.New().String()). // set a random name
SetOffset(stream.OffsetSpecification{}.First())) // start consuming from the beginning
CheckErr(err)
consumers = append(consumers, consumer)
}
/// check on the UI http://localhost:15673/#/stream/connections
// the producers are connected to the leader node
/// the consumers random nodes it doesn't matter
fmt.Println("Hit Enter to stop ")
_, _ = reader.ReadString('\n')
for _, streamName := range streamsName {
fmt.Printf("Delete stream %s ...\n", streamName)
err = env.DeleteStream(streamName)
}
CheckErr(err)
err = env.Close()
CheckErr(err)
}