forked from SolaceSamples/solace-samples-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
hello_world.go
141 lines (109 loc) · 3.59 KB
/
hello_world.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package main
import (
"fmt"
"os"
"os/signal"
"strconv"
"time"
"solace.dev/go/messaging"
"solace.dev/go/messaging/pkg/solace/config"
"solace.dev/go/messaging/pkg/solace/message"
"solace.dev/go/messaging/pkg/solace/resource"
)
// MessageHandler is the callback registered with the direct receiver. It
// writes each inbound message to standard output, formatted with the %s verb.
func MessageHandler(msg message.InboundMessage) {
	const dumpFormat = "Message Dump %s \n"
	fmt.Printf(dumpFormat, msg)
}
// getEnv returns the value of the environment variable named key, or def when
// that variable is not set. A variable that is set to the empty string is
// returned as "" and is not replaced by def.
func getEnv(key, def string) string {
	value, found := os.LookupEnv(key)
	if !found {
		return def
	}
	return value
}
// TopicPrefix is the leading topic levels shared by the subscription this
// sample adds and by every topic it publishes to.
const TopicPrefix = "solace/samples"
// main connects to a Solace broker, starts a direct publisher and a direct
// receiver, publishes one greeting per second until the process is
// interrupted, and then shuts everything down in order.
//
// Connection settings are read from the SOLACE_HOST, SOLACE_VPN,
// SOLACE_USERNAME and SOLACE_PASSWORD environment variables, each falling back
// to a local default. Setup failures panic, as this is a sample program.
func main() {
	// Configuration parameters
	brokerConfig := config.ServicePropertyMap{
		config.TransportLayerPropertyHost:                getEnv("SOLACE_HOST", "tcp://localhost:55555,tcp://localhost:55554"),
		config.ServicePropertyVPNName:                    getEnv("SOLACE_VPN", "default"),
		config.AuthenticationPropertySchemeBasicPassword: getEnv("SOLACE_PASSWORD", "default"),
		config.AuthenticationPropertySchemeBasicUserName: getEnv("SOLACE_USERNAME", "default"),
	}
	messagingService, err := messaging.NewMessagingServiceBuilder().FromConfigurationProvider(brokerConfig).Build()
	if err != nil {
		panic(err)
	}

	// Connect to the messaging service
	if err := messagingService.Connect(); err != nil {
		panic(err)
	}
	fmt.Println("Connected to the broker? ", messagingService.IsConnected())

	// Build a Direct Message Publisher
	directPublisher, builderErr := messagingService.CreateDirectMessagePublisherBuilder().Build()
	if builderErr != nil {
		panic(builderErr)
	}
	if startErr := directPublisher.Start(); startErr != nil {
		panic(startErr)
	}
	fmt.Println("Direct Publisher running? ", directPublisher.IsRunning())

	// Build a Direct Message Receiver
	directReceiver, err := messagingService.CreateDirectMessageReceiverBuilder().
		WithSubscriptions(resource.TopicSubscriptionOf(TopicPrefix + "/*/hello/>")).
		Build()
	if err != nil {
		panic(err)
	}

	// Start Direct Message Receiver
	if err := directReceiver.Start(); err != nil {
		panic(err)
	}
	fmt.Println("Direct Receiver running? ", directReceiver.IsRunning())

	if regErr := directReceiver.ReceiveAsync(MessageHandler); regErr != nil {
		panic(regErr)
	}

	fmt.Print("\nEnter your name: ")
	var uniqueName string
	// Scanln fails on an empty line or closed stdin and leaves uniqueName
	// empty, which would put an empty level into every published topic.
	if _, scanErr := fmt.Scanln(&uniqueName); uniqueName == "" {
		uniqueName = "anonymous"
		fmt.Printf("No name read (%v); using %q\n", scanErr, uniqueName)
	}

	// Prepare outbound message payload and body
	messageBody := "Hello from Go HelloWorld Sample"
	messageBuilder := messagingService.MessageBuilder().
		WithProperty("application", "samples").
		WithProperty("language", "go")

	// done asks the publishing goroutine to stop; finished is closed once it
	// has returned, whether it was asked to stop or the publisher stopped
	// being ready.
	done := make(chan struct{})
	finished := make(chan struct{})

	go func() {
		defer close(finished)
		// fmt.Println rather than the builtin println, which writes to
		// standard error and is intended for bootstrapping only.
		fmt.Println("Subscribe to topic ", TopicPrefix+"/>")
		for msgSeqNum := 1; directPublisher.IsReady(); msgSeqNum++ {
			seq := strconv.Itoa(msgSeqNum)
			outbound, err := messageBuilder.BuildWithStringPayload(messageBody + " --> " + seq)
			if err != nil {
				panic(err)
			}
			topic := resource.TopicOf(TopicPrefix + "/go/hello/" + uniqueName + "/" + seq)
			if publishErr := directPublisher.Publish(outbound, topic); publishErr != nil {
				panic(publishErr)
			}
			// Wait one second between messages, but wake immediately when
			// shutdown is requested.
			select {
			case <-done:
				return
			case <-time.After(1 * time.Second):
			}
		}
	}()

	// Handle interrupts
	c := make(chan os.Signal, 1)
	signal.Notify(c, os.Interrupt)

	// Block until a signal is received.
	<-c

	// Stop the publishing goroutine and wait for it to return, so that no
	// Publish call is made on a publisher that is being terminated.
	close(done)
	<-finished

	// Terminate the Direct Receiver
	if err := directReceiver.Terminate(2 * time.Second); err != nil {
		fmt.Println("Direct Receiver termination error: ", err)
	}
	fmt.Println("\nDirect Receiver Terminated? ", directReceiver.IsTerminated())

	// Terminate the Direct Publisher
	if err := directPublisher.Terminate(2 * time.Second); err != nil {
		fmt.Println("Direct Publisher termination error: ", err)
	}
	fmt.Println("\nDirect Publisher Terminated? ", directPublisher.IsTerminated())

	// Disconnect the Message Service
	if err := messagingService.Disconnect(); err != nil {
		fmt.Println("Messaging Service disconnect error: ", err)
	}
	fmt.Println("Messaging Service Disconnected? ", !messagingService.IsConnected())
}