Skip to content

Commit

Permalink
fmt code && add producer
Browse files Browse the repository at this point in the history
  • Loading branch information
sevennt committed Mar 10, 2017
1 parent bcc7baa commit d8d1b2b
Show file tree
Hide file tree
Showing 29 changed files with 2,387 additions and 940 deletions.
127 changes: 104 additions & 23 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,24 +1,105 @@
# go_rocket_mq
go rocketmq client
#import
import rocketmq "github.com/didapinchegit/go_rocket_mq"
#Introduction
This is a RocketMQ client of golang.

#example
conf := &rocketmq.Config{
Nameserver: "192.168.1.234:9876",
ClientIp: "192.168.1.23",
InstanceName: "DEFAULT",
}
consumer, err := rocketmq.NewDefaultConsumer("C_TEST", conf)
if err != nil {
log.Panic(err)
}
consumer.Subscribe("test2", "*")
consumer.Subscribe("test3", "*")
consumer.RegisterMessageListener(func(msgs []*rocketmq.MessageExt) error {
for i, msg := range msgs {
log.Print(i, string(msg.Body))
}
return nil
})
consumer.Start()
#Import package
import rocketmq "github.com/go_rocket_mq"

#Getting started
###Get message with consumer
```
group := "dev-VodHotClacSrcData"
topic := "canal_vod_collect__video_collected_count_live"
var timeSleep = 30 * time.Second
conf := &rocketmq.Config{
Nameserver: "192.168.7.101:9876;192.168.7.102:9876;192.168.7.103:9876",
ClientIp: "192.168.1.23",
InstanceName: "DEFAULT",
}
consumer, err := rocketmq.NewDefaultConsumer(consumerGroup, consumerConf)
if err != nil {
return err
}
consumer.Subscribe(consumerTopic, "*")
consumer.RegisterMessageListener(
func(msgs []*MessageExt) error {
for i, msg := range msgs {
fmt.Println("msg", i, msg.Topic, msg.Flag, msg.Properties, string(msg.Body))
}
fmt.Println("Consume success!")
return nil
})
consumer.Start()
time.Sleep(timeSleep)
```
###Send message with producer
- Synchronous sending
```
group := "dev-VodHotClacSrcData"
topic := "canal_vod_collect__video_collected_count_live"
conf := &rocketmq.Config{
Nameserver: "192.168.7.101:9876;192.168.7.102:9876;192.168.7.103:9876",
ClientIp: "192.168.1.23",
InstanceName: "DEFAULT",
}
producer, err := rocketmq.NewDefaultProducer(group, conf)
producer.Start()
if err != nil {
return errors.New("NewDefaultProducer err")
}
msg := NewMessage(topic, []byte("Hello RocketMQ!")
if sendResult, err := producer.Send(msg); err != nil {
return errors.New("Sync send fail!") // 如果不是如预期的那么就报错
} else {
fmt.Printlnf("sendResult", sendResult)
fmt.Printlnf("Sync send success!")
}
```
- Asynchronous sending
```
group := "dev-VodHotClacSrcData"
topic := "canal_vod_collect__video_collected_count_live"
conf := &rocketmq.Config{
Nameserver: "192.168.7.101:9876;192.168.7.102:9876;192.168.7.103:9876",
ClientIp: "192.168.1.23",
InstanceName: "DEFAULT",
}
producer, err := rocketmq.NewDefaultProducer(group, conf)
producer.Start()
if err != nil {
return err
}
msg := NewMessage(topic, []byte("Hello RocketMQ!")
sendCallback := func() error {
fmt.Printlnf("I am callback")
return nil
}
if err := producer.SendAsync(msg, sendCallback); err != nil {
return err
} else {
fmt.Printlnf("Async send success!")
}
```
- Oneway sending
```
group := "dev-VodHotClacSrcData"
topic := "canal_vod_collect__video_collected_count_live"
conf := &rocketmq.Config{
Nameserver: "192.168.7.101:9876;192.168.7.102:9876;192.168.7.103:9876",
ClientIp: "192.168.1.23",
InstanceName: "DEFAULT",
}
producer, err := rocketmq.NewDefaultProducer(group, conf)
producer.Start()
if err != nil {
return err
}
msg := NewMessage(topic, []byte("Hello RocketMQ!")
if err := producer.SendOneway(msg); err != nil {
return err
} else {
fmt.Printlnf("Oneway send success!")
}
```
Loading

0 comments on commit d8d1b2b

Please sign in to comment.