diff --git a/README.md b/README.md index 5f42c67..4584437 100644 --- a/README.md +++ b/README.md @@ -1,24 +1,105 @@ -# go_rocket_mq -go rocketmq client -#import -import rocketmq "github.com/didapinchegit/go_rocket_mq" +#Introduction +This is a RocketMQ client of golang. -#example - conf := &rocketmq.Config{ - Nameserver: "192.168.1.234:9876", - ClientIp: "192.168.1.23", - InstanceName: "DEFAULT", - } - consumer, err := rocketmq.NewDefaultConsumer("C_TEST", conf) - if err != nil { - log.Panic(err) - } - consumer.Subscribe("test2", "*") - consumer.Subscribe("test3", "*") - consumer.RegisterMessageListener(func(msgs []*rocketmq.MessageExt) error { - for i, msg := range msgs { - log.Print(i, string(msg.Body)) - } - return nil - }) - consumer.Start() +#Import package +import rocketmq "github.com/go_rocket_mq" + +#Getting started +###Get message with consumer +``` +group := "dev-VodHotClacSrcData" +topic := "canal_vod_collect__video_collected_count_live" +var timeSleep = 30 * time.Second +conf := &rocketmq.Config{ + Nameserver: "192.168.7.101:9876;192.168.7.102:9876;192.168.7.103:9876", + ClientIp: "192.168.1.23", + InstanceName: "DEFAULT", +} + +consumer, err := rocketmq.NewDefaultConsumer(consumerGroup, consumerConf) +if err != nil { + return err +} +consumer.Subscribe(consumerTopic, "*") +consumer.RegisterMessageListener( + func(msgs []*MessageExt) error { + for i, msg := range msgs { + fmt.Println("msg", i, msg.Topic, msg.Flag, msg.Properties, string(msg.Body)) + } + fmt.Println("Consume success!") + return nil + }) +consumer.Start() + +time.Sleep(timeSleep) +``` +###Send message with producer +- Synchronous sending +``` +group := "dev-VodHotClacSrcData" +topic := "canal_vod_collect__video_collected_count_live" +conf := &rocketmq.Config{ + Nameserver: "192.168.7.101:9876;192.168.7.102:9876;192.168.7.103:9876", + ClientIp: "192.168.1.23", + InstanceName: "DEFAULT", +} + +producer, err := rocketmq.NewDefaultProducer(group, conf) +producer.Start() +if err != nil { + return errors.New("NewDefaultProducer err") +} +msg := NewMessage(topic, []byte("Hello RocketMQ!") +if sendResult, err := producer.Send(msg); err != nil { + return errors.New("Sync send fail!") // 如果不是如预期的那么就报错 +} else { + fmt.Printlnf("sendResult", sendResult) + fmt.Printlnf("Sync send success!") +} +``` +- Asynchronous sending +``` +group := "dev-VodHotClacSrcData" +topic := "canal_vod_collect__video_collected_count_live" +conf := &rocketmq.Config{ + Nameserver: "192.168.7.101:9876;192.168.7.102:9876;192.168.7.103:9876", + ClientIp: "192.168.1.23", + InstanceName: "DEFAULT", +} +producer, err := rocketmq.NewDefaultProducer(group, conf) +producer.Start() +if err != nil { + return err +} +msg := NewMessage(topic, []byte("Hello RocketMQ!") +sendCallback := func() error { + fmt.Printlnf("I am callback") + return nil +} +if err := producer.SendAsync(msg, sendCallback); err != nil { + return err +} else { + fmt.Printlnf("Async send success!") +} +``` +- Oneway sending +``` +group := "dev-VodHotClacSrcData" +topic := "canal_vod_collect__video_collected_count_live" +conf := &rocketmq.Config{ + Nameserver: "192.168.7.101:9876;192.168.7.102:9876;192.168.7.103:9876", + ClientIp: "192.168.1.23", + InstanceName: "DEFAULT", +} +producer, err := rocketmq.NewDefaultProducer(group, conf) +producer.Start() +if err != nil { + return err +} +msg := NewMessage(topic, []byte("Hello RocketMQ!") +if err := producer.SendOneway(msg); err != nil { + return err +} else { + fmt.Printlnf("Oneway send success!") +} +``` \ No newline at end of file diff --git a/consumer.go b/consumer.go index 35b7f61..633fc7e 100644 --- a/consumer.go +++ b/consumer.go @@ -2,7 +2,7 @@ package rocketmq import ( "fmt" - "github.com/golang/glog" + //"github.com/golang/glog" "net" "os" "strconv" @@ -11,19 +11,19 @@ import ( ) const ( - BrokerSuspendMaxTimeMillis = 1000 * 15 - FLAG_COMMIT_OFFSET int32 = 0x1 << 0 - FLAG_SUSPEND int32 = 0x1 << 1 - FLAG_SUBSCRIPTION int32 = 0x1 << 2 - FLAG_CLASS_FILTER int32 = 0x1 << 3 + BrokerSuspendMaxTimeMillis = 1000 * 15 + FlagCommitOffset int32 = 0x1 << 0 + FlagSuspend int32 = 0x1 << 1 + FlagSubscription int32 = 0x1 << 2 + FlagClassFilter int32 = 0x1 << 3 ) type MessageListener func(msgs []*MessageExt) error -var DEFAULT_IP = GetLocalIp4() +var DefaultIp = GetLocalIp4() type Config struct { - Nameserver string + Namesrv string ClientIp string InstanceName string } @@ -34,7 +34,7 @@ type Consumer interface { Shutdown() RegisterMessageListener(listener MessageListener) Subscribe(topic string, subExpression string) - UnSubcribe(topic string) + UnSubscribe(topic string) SendMessageBack(msg MessageExt, delayLevel int) error SendMessageBack1(msg MessageExt, delayLevel int, brokerName string) error fetchSubscribeMessageQueues(topic string) error @@ -48,45 +48,45 @@ type DefaultConsumer struct { messageModel string unitMode bool - subscription map[string]string - messageListener MessageListener - offsetStore OffsetStore - brokers map[string]net.Conn + subscription map[string]string + messageListener MessageListener + offsetStore OffsetStore + brokers map[string]net.Conn - rebalance *Rebalance - remotingClient RemotingClient - mqClient *MqClient + rebalance *Rebalance + remotingClient RemotingClient + mqClient *MqClient } -func NewDefaultConsumer(name string, conf *Config) (Consumer, error) { +func NewDefaultConsumer(consumerGroup string, conf *Config) (Consumer, error) { if conf == nil { conf = &Config{ - Nameserver: os.Getenv("ROCKETMQ_NAMESVR"), + Namesrv: os.Getenv("ROCKETMQ_NAMESVR"), InstanceName: "DEFAULT", } } if conf.ClientIp == "" { - conf.ClientIp = DEFAULT_IP + conf.ClientIp = DefaultIp } remotingClient := NewDefaultRemotingClient() mqClient := NewMqClient() rebalance := NewRebalance() - rebalance.groupName = name + rebalance.groupName = consumerGroup rebalance.mqClient = mqClient offsetStore := new(RemoteOffsetStore) offsetStore.mqClient = mqClient - offsetStore.groupName = name + offsetStore.groupName = consumerGroup offsetStore.offsetTable = make(map[MessageQueue]int64) pullMessageService := NewPullMessageService() consumer := &DefaultConsumer{ conf: conf, - consumerGroup: name, + consumerGroup: consumerGroup, consumeFromWhere: "CONSUME_FROM_LAST_OFFSET", subscription: make(map[string]string), offsetStore: offsetStore, @@ -96,81 +96,81 @@ func NewDefaultConsumer(name string, conf *Config) (Consumer, error) { mqClient: mqClient, } - mqClient.consumerTable[name] = consumer + mqClient.consumerTable[consumerGroup] = consumer mqClient.remotingClient = remotingClient mqClient.conf = conf mqClient.clientId = conf.ClientIp + "@" + strconv.Itoa(os.Getpid()) mqClient.pullMessageService = pullMessageService rebalance.consumer = consumer - pullMessageService.consumer = consumer + pullMessageService.service = consumer return consumer, nil } -func (self *DefaultConsumer) Start() error { - self.mqClient.start() +func (c *DefaultConsumer) Start() error { + c.mqClient.start() return nil } -func (self *DefaultConsumer) Shutdown() { +func (c *DefaultConsumer) Shutdown() { } -func (self *DefaultConsumer) RegisterMessageListener(messageListener MessageListener) { - self.messageListener = messageListener +func (c *DefaultConsumer) RegisterMessageListener(messageListener MessageListener) { + c.messageListener = messageListener } -func (self *DefaultConsumer) Subscribe(topic string, subExpression string) { - self.subscription[topic] = subExpression +func (c *DefaultConsumer) Subscribe(topic string, subExpression string) { + c.subscription[topic] = subExpression subData := &SubscriptionData{ Topic: topic, SubString: subExpression, } - self.rebalance.subscriptionInner[topic] = subData + c.rebalance.subscriptionInner[topic] = subData } -func (self *DefaultConsumer) UnSubcribe(topic string) { - delete(self.subscription, topic) +func (c *DefaultConsumer) UnSubscribe(topic string) { + delete(c.subscription, topic) } -func (self *DefaultConsumer) SendMessageBack(msg MessageExt, delayLevel int) error { +func (c *DefaultConsumer) SendMessageBack(msg MessageExt, delayLevel int) error { return nil } -func (self *DefaultConsumer) SendMessageBack1(msg MessageExt, delayLevel int, brokerName string) error { +func (c *DefaultConsumer) SendMessageBack1(msg MessageExt, delayLevel int, brokerName string) error { return nil } -func (self *DefaultConsumer) fetchSubscribeMessageQueues(topic string) error { +func (c *DefaultConsumer) fetchSubscribeMessageQueues(topic string) error { return nil } -func (self *DefaultConsumer) pullMessage(pullRequest *PullRequest) { +func (c *DefaultConsumer) pullMessage(pullRequest *PullRequest) { commitOffsetEnable := false commitOffsetValue := int64(0) - commitOffsetValue = self.offsetStore.readOffset(pullRequest.messageQueue, READ_FROM_MEMORY) + commitOffsetValue = c.offsetStore.readOffset(pullRequest.messageQueue, ReadFromMemory) if commitOffsetValue > 0 { commitOffsetEnable = true } - var sysFlag int32 = 0 + var sysFlag = int32(0) if commitOffsetEnable { - sysFlag |= FLAG_COMMIT_OFFSET + sysFlag |= FlagCommitOffset } - sysFlag |= FLAG_SUSPEND + sysFlag |= FlagSuspend - subscriptionData, ok := self.rebalance.subscriptionInner[pullRequest.messageQueue.topic] + subscriptionData, ok := c.rebalance.subscriptionInner[pullRequest.messageQueue.topic] var subVersion int64 var subString string if ok { subVersion = subscriptionData.SubVersion subString = subscriptionData.SubString - sysFlag |= FLAG_SUBSCRIPTION + sysFlag |= FlagSubscription } requestHeader := new(PullMessageRequestHeader) @@ -189,11 +189,11 @@ func (self *DefaultConsumer) pullMessage(pullRequest *PullRequest) { } pullCallback := func(responseFuture *ResponseFuture) { - var nextBeginOffset int64 = pullRequest.nextOffset + var nextBeginOffset = pullRequest.nextOffset if responseFuture != nil { responseCommand := responseFuture.responseCommand - if responseCommand.Code == SUCCESS && len(responseCommand.Body) > 0 { + if responseCommand.Code == Success && len(responseCommand.Body) > 0 { var err error pullResult, ok := responseCommand.ExtFields.(map[string]interface{}) if ok { @@ -201,27 +201,24 @@ func (self *DefaultConsumer) pullMessage(pullRequest *PullRequest) { if nextBeginOffsetStr, ok := nextBeginOffsetInter.(string); ok { nextBeginOffset, err = strconv.ParseInt(nextBeginOffsetStr, 10, 64) if err != nil { - glog.Error(err) + fmt.Println(err) return } - } - } - } msgs := decodeMessage(responseFuture.responseCommand.Body) - err = self.messageListener(msgs) + err = c.messageListener(msgs) if err != nil { - glog.Error(err) + fmt.Println(err) //TODO retry } else { - self.offsetStore.updateOffset(pullRequest.messageQueue, nextBeginOffset, false) + c.offsetStore.updateOffset(pullRequest.messageQueue, nextBeginOffset, false) } - } else if responseCommand.Code == PULL_NOT_FOUND { - } else if responseCommand.Code == PULL_RETRY_IMMEDIATELY || responseCommand.Code == PULL_OFFSET_MOVED { - glog.Errorf("pull message error,code=%d,request=%v", responseCommand.Code,requestHeader) + } else if responseCommand.Code == PullNotFound { + } else if responseCommand.Code == PullRetryImmediately || responseCommand.Code == PullOffsetMoved { + fmt.Printf("pull message error,code=%d,request=%v", responseCommand.Code, requestHeader) var err error pullResult, ok := responseCommand.ExtFields.(map[string]interface{}) if ok { @@ -229,23 +226,19 @@ func (self *DefaultConsumer) pullMessage(pullRequest *PullRequest) { if nextBeginOffsetStr, ok := nextBeginOffsetInter.(string); ok { nextBeginOffset, err = strconv.ParseInt(nextBeginOffsetStr, 10, 64) if err != nil { - glog.Error(err) + fmt.Println(err) } - } - } - } - //time.Sleep(1 * time.Second) } else { - glog.Error(fmt.Sprintf("pull message error,code=%d,body=%s", responseCommand.Code, string(responseCommand.Body))) - glog.Error(pullRequest.messageQueue) + fmt.Println(fmt.Sprintf("pull message error,code=%d,body=%s", responseCommand.Code, string(responseCommand.Body))) + fmt.Println(pullRequest.messageQueue) time.Sleep(1 * time.Second) } } else { - glog.Error("responseFuture is nil") + fmt.Println("responseFuture is nil") } nextPullRequest := &PullRequest{ @@ -254,15 +247,15 @@ func (self *DefaultConsumer) pullMessage(pullRequest *PullRequest) { messageQueue: pullRequest.messageQueue, } - self.mqClient.pullMessageService.pullRequestQueue <- nextPullRequest + c.mqClient.pullMessageService.pullRequestQueue <- nextPullRequest } - brokerAddr, _, found := self.mqClient.findBrokerAddressInSubscribe(pullRequest.messageQueue.brokerName, 0, false) + brokerAddr, _, found := c.mqClient.findBrokerAddressInSubscribe(pullRequest.messageQueue.brokerName, 0, false) if found { currOpaque := atomic.AddInt32(&opaque, 1) remotingCommand := new(RemotingCommand) - remotingCommand.Code = PULL_MESSAGE + remotingCommand.Code = PullMsg remotingCommand.Opaque = currOpaque remotingCommand.Flag = 0 remotingCommand.Language = "JAVA" @@ -270,31 +263,41 @@ func (self *DefaultConsumer) pullMessage(pullRequest *PullRequest) { remotingCommand.ExtFields = requestHeader - self.remotingClient.invokeAsync(brokerAddr, remotingCommand, 1000, pullCallback) + c.remotingClient.invokeAsync(brokerAddr, remotingCommand, 1000, pullCallback) } } -func (self *DefaultConsumer) updateTopicSubscribeInfo(topic string, info []*MessageQueue) { - if self.rebalance.subscriptionInner != nil { - self.rebalance.subscriptionInnerLock.RLock() - _, ok := self.rebalance.subscriptionInner[topic] - self.rebalance.subscriptionInnerLock.RUnlock() +func (c *DefaultConsumer) updateTopicSubscribeInfo(topic string, info []*MessageQueue) { + if c.rebalance.subscriptionInner != nil { + c.rebalance.subscriptionInnerLock.RLock() + _, ok := c.rebalance.subscriptionInner[topic] + c.rebalance.subscriptionInnerLock.RUnlock() if ok { - self.rebalance.subscriptionInnerLock.Lock() - self.rebalance.topicSubscribeInfoTable[topic] = info - self.rebalance.subscriptionInnerLock.Unlock() + c.rebalance.subscriptionInnerLock.Lock() + c.rebalance.topicSubscribeInfoTable[topic] = info + c.rebalance.subscriptionInnerLock.Unlock() } } } -func (self *DefaultConsumer) subscriptions() []*SubscriptionData { +func (c *DefaultConsumer) subscriptions() []*SubscriptionData { subscriptions := make([]*SubscriptionData, 0) - for _, subscription := range self.rebalance.subscriptionInner { + for _, subscription := range c.rebalance.subscriptionInner { subscriptions = append(subscriptions, subscription) } return subscriptions } -func (self *DefaultConsumer) doRebalance() { - self.rebalance.doRebalance() +func (c *DefaultConsumer) doRebalance() { + c.rebalance.doRebalance() +} + +func (c *DefaultConsumer) isSubscribeTopicNeedUpdate(topic string) bool { + subTable := c.rebalance.subscriptionInner + if _, ok := subTable[topic]; ok { + if _, ok := c.rebalance.topicSubscribeInfoTable[topic]; !ok { + return true + } + } + return false } diff --git a/consumer_test.go b/consumer_test.go new file mode 100644 index 0000000..15a4be0 --- /dev/null +++ b/consumer_test.go @@ -0,0 +1,38 @@ +package rocketmq + +import ( + "testing" + "time" +) + +// dev-goProducerConsumerTest +var consumerGroup = "dev-goProducerConsumerTest" + +// goProducerConsumerTest +var consumerTopic = "goProducerConsumerTest" +var timeSleep = 60 * time.Second +var consumerConf = &Config{ + //Nameserver: "192.168.7.103:9876", + Namesrv: "192.168.6.69:9876", + ClientIp: "192.168.23.137", + InstanceName: "DEFAULT_tt", +} + +func TestConsume(t *testing.T) { + consumer, err := NewDefaultConsumer(consumerGroup, consumerConf) + if err != nil { + t.Fatalf("NewDefaultConsumer err, %s", err) + } + consumer.Subscribe(consumerTopic, "*") + consumer.RegisterMessageListener( + func(msgs []*MessageExt) error { + for i, msg := range msgs { + t.Log("msg", i, msg.Topic, msg.Flag, msg.Properties, string(msg.Body)) + } + t.Log("Consume success!") + return nil + }) + consumer.Start() + + time.Sleep(timeSleep) +} diff --git a/example/main.go b/example/main.go deleted file mode 100644 index 79c68ae..0000000 --- a/example/main.go +++ /dev/null @@ -1,29 +0,0 @@ -package main - -import ( - rocketmq "didapinche.com/go_rocket_mq" - "github.com/golang/glog" - "time" - "flag" -) - -func main() { - flag.Parse() - conf := &rocketmq.Config{ - Nameserver: "192.168.1.234:9876", - InstanceName: "DEFAULT", - } - consumer, err := rocketmq.NewDefaultConsumer("C_TEST", conf) - if err != nil { - panic(err) - } - consumer.Subscribe("t_city", "*") - consumer.RegisterMessageListener(func(msgs []*rocketmq.MessageExt) error { - for i, msg := range msgs { - glog.Info(i, string(msg.Body)) - } - return nil - }) - consumer.Start() - time.Sleep(1000 * time.Second) -} diff --git a/init.go b/init.go new file mode 100644 index 0000000..950f0ef --- /dev/null +++ b/init.go @@ -0,0 +1,10 @@ +package rocketmq + +import ( + "math/rand" + "time" +) + +func init() { + rand.Seed(time.Now().UnixNano()) +} diff --git a/message.go b/message.go index 8ab5a17..f9bb0c4 100644 --- a/message.go +++ b/message.go @@ -2,15 +2,33 @@ package rocketmq import ( "bytes" + "compress/zlib" "encoding/binary" - "github.com/golang/glog" "encoding/json" + "errors" + "fmt" "io/ioutil" - "compress/zlib" + "strconv" +) + +const ( + CompressedFlag = (0x1 << 0) + MultiTagsFlag = (0x1 << 1) + TransactionNotType = (0x0 << 2) + TransactionPreparedType = (0x1 << 2) + TransactionCommitType = (0x2 << 2) + TransactionRollbackType = (0x3 << 2) ) -const( - CompressedFlag = (0x1 << 0) + +const ( + NameValueSeparator = 1 + iota + PropertySeparator +) + +const ( + CharacterMaxLength = 255 ) + type Message struct { Topic string Flag int32 @@ -18,6 +36,14 @@ type Message struct { Body []byte } +func NewMessage(topic string, body []byte) *Message { + return &Message{ + Topic: topic, + Body: body, + Properties: make(map[string]string), + } +} + type MessageExt struct { Message QueueId int32 @@ -25,9 +51,9 @@ type MessageExt struct { QueueOffset int64 SysFlag int32 BornTimestamp int64 - //bornHost + // bornHost StoreTimestamp int64 - //storeHost + // storeHost MsgId string CommitLogOffset int64 BodyCRC int32 @@ -43,7 +69,7 @@ func decodeMessage(data []byte) []*MessageExt { var topic, body, properties, bornHost, storeHost []byte var propertiesLength int16 - var propertiesmap map[string]string + var propertiesMap map[string]string msgs := make([]*MessageExt, 0, 32) for buf.Len() > 0 { @@ -71,40 +97,38 @@ func decodeMessage(data []byte) []*MessageExt { body = make([]byte, bodyLength) binary.Read(buf, binary.BigEndian, body) - if (sysFlag & CompressedFlag) == CompressedFlag { + if (sysFlag & CompressedFlag) == CompressedFlag { b := bytes.NewReader(body) z, err := zlib.NewReader(b) if err != nil { - glog.Error(err) + fmt.Println(err) return nil } defer z.Close() body, err = ioutil.ReadAll(z) if err != nil { - glog.Error(err) + fmt.Println(err) return nil } } } binary.Read(buf, binary.BigEndian, &topicLen) - topic = make([]byte, topicLen) + topic = make([]byte, 0) binary.Read(buf, binary.BigEndian, &topic) binary.Read(buf, binary.BigEndian, &propertiesLength) - if propertiesLength >0 { + if propertiesLength > 0 { properties = make([]byte, propertiesLength) binary.Read(buf, binary.BigEndian, &properties) - propertiesmap = make(map[string]string) - json.Unmarshal(properties,&propertiesmap) + propertiesMap = make(map[string]string) + json.Unmarshal(properties, &propertiesMap) } if magicCode != -626843481 { - glog.Infof("magic code is error %d",magicCode) + fmt.Printf("magic code is error %d", magicCode) return nil } - - msg.Topic = string(topic) msg.QueueId = queueId msg.SysFlag = sysFlag @@ -118,10 +142,47 @@ func decodeMessage(data []byte) []*MessageExt { msg.StoreTimestamp = storeTimestamp msg.PreparedTransactionOffset = preparedTransactionOffset msg.Body = body - msg.Properties = propertiesmap + msg.Properties = propertiesMap msgs = append(msgs, msg) } return msgs } + +func messageProperties2String(properties map[string]string) string { + StringBuilder := bytes.NewBuffer([]byte{}) + if properties != nil && len(properties) != 0 { + for k, v := range properties { + binary.Write(StringBuilder, binary.BigEndian, k) // 4 + binary.Write(StringBuilder, binary.BigEndian, NameValueSeparator) // 4 + binary.Write(StringBuilder, binary.BigEndian, v) // 4 + binary.Write(StringBuilder, binary.BigEndian, PropertySeparator) // 4 + } + } + return StringBuilder.String() +} + +func (msg Message) checkMessage(producer *DefaultProducer) (err error) { + if err = checkTopic(msg.Topic); err != nil { + if len(msg.Body) == 0 { + err = errors.New("ResponseCode:" + strconv.Itoa(MsgIllegal) + ", the message body is null") + } else if len(msg.Body) > producer.maxMessageSize { + err = errors.New("ResponseCode:" + strconv.Itoa(MsgIllegal) + ", the message body size over max value, MAX:" + strconv.Itoa(producer.maxMessageSize)) + } + } + return +} + +func checkTopic(topic string) (err error) { + if topic == "" { + err = errors.New("the specified topic is blank") + } + if len(topic) > CharacterMaxLength { + err = errors.New("the specified topic is longer than topic max length 255") + } + if topic == DefaultTopic { + err = errors.New("the topic[" + topic + "] is conflict with default topic") + } + return +} diff --git a/message_client_id_setter.go b/message_client_id_setter.go new file mode 100644 index 0000000..cf4c1b1 --- /dev/null +++ b/message_client_id_setter.go @@ -0,0 +1,67 @@ +package rocketmq + +import ( + "bytes" + "encoding/binary" + "os" + "time" +) + +type messageClientIDSetter struct { + counter int + basePos int + startTime int64 + nextStartTime int64 + //ip + pid + classloaderid + counter + time + stringBuilder *bytes.Buffer + buffer *bytes.Buffer +} + +var stringBuilder = bytes.NewBuffer([]byte{}) + +func init() { + binary.Write(stringBuilder, binary.BigEndian, GetLocalIp4()) // 4 + binary.Write(stringBuilder, binary.BigEndian, os.Getpid()) // 2 + binary.Write(stringBuilder, binary.BigEndian, hashCode()) // 4 + MessageClientIDSetter.stringBuilder = stringBuilder + MessageClientIDSetter.setStartTime() +} + +var MessageClientIDSetter = messageClientIDSetter{ + //length := 4 + 2 + 4 + 4 + 2 + stringBuilder: bytes.NewBuffer([]byte{}), + basePos: stringBuilder.Len() * 2, + counter: 0, +} + +func hashCode() []byte { + tmpByte := []byte{1, 1, 1, 1} + return tmpByte +} + +func (m messageClientIDSetter) setUniqID(msg *Message) { + if msg.Properties[MessageConst.PropertyUniqClientMessageIdKeyidx] == "" { + msg.Properties[MessageConst.PropertyUniqClientMessageIdKeyidx] = m.createUniqID() + } +} + +func (m messageClientIDSetter) getUniqID(msg *Message) string { + return msg.Properties[MessageConst.PropertyUniqClientMessageIdKeyidx] +} + +func (m messageClientIDSetter) createUniqID() string { + current := time.Now().UnixNano() + if current > m.nextStartTime { + m.setStartTime() + } + binary.Write(m.stringBuilder, binary.BigEndian, time.Now().UnixNano()-m.startTime) + m.counter++ + binary.Write(m.stringBuilder, binary.BigEndian, m.counter) + + return m.stringBuilder.String() +} + +func (m messageClientIDSetter) setStartTime() { + m.startTime = time.Now().UnixNano() + m.nextStartTime = time.Now().UnixNano() + 2592000000000000 // next 30 days, 3600 * 24 * 30 * 1000 * 1000 *1000 +} diff --git a/message_const.go b/message_const.go new file mode 100644 index 0000000..2cc64d5 --- /dev/null +++ b/message_const.go @@ -0,0 +1,81 @@ +package rocketmq + +type messageConst struct { + PropertyKeys string + PropertyTags string + PropertyWaitStoreMsgOk string + PropertyDelayTimeLevel string + PropertyRetryTopic string + PropertyRealTopic string + PropertyRealQueueId string + PropertyTransactionPrepared string + PropertyProducerGroup string + PropertyMinOffset string + PropertyMaxOffset string + PropertyBuyerId string + PropertyOriginMessageId string + PropertyTransferFlag string + PropertyCorrectionFlag string + PropertyMq2Flag string + PropertyReconsumeTime string + PropertyMsgRegion string + PropertyUniqClientMessageIdKeyidx string + PropertyMaxReconsumeTimes string + PropertyConsumeStartTimeStamp string + + KeySeparator string + systemKeySet []string +} + +var MessageConst = &messageConst{ + PropertyKeys: "KEYS", + PropertyTags: "TAGS", + PropertyWaitStoreMsgOk: "WAIT", + PropertyDelayTimeLevel: "DELAY", + PropertyRetryTopic: "RETRY_TOPIC", + PropertyRealTopic: "REAL_TOPIC", + PropertyRealQueueId: "REAL_QID", + PropertyTransactionPrepared: "TRAN_MSG", + PropertyProducerGroup: "PGROUP", + PropertyMinOffset: "MIN_OFFSET", + PropertyMaxOffset: "MAX_OFFSET", + PropertyBuyerId: "BUYER_ID", + PropertyOriginMessageId: "ORIGIN_MESSAGE_ID", + PropertyTransferFlag: "TRANSFER_FLAG", + PropertyCorrectionFlag: "CORRECTION_FLAG", + PropertyMq2Flag: "MQ2_FLAG", + PropertyReconsumeTime: "RECONSUME_TIME", + PropertyMsgRegion: "MSG_REGION", + PropertyUniqClientMessageIdKeyidx: "UNIQ_KEY", + PropertyMaxReconsumeTimes: "MAX_RECONSUME_TIMES", + PropertyConsumeStartTimeStamp: "CONSUME_START_TIME", + + KeySeparator: "", +} + +func init() { + var systemKeySet = make([]string, 0) + systemKeySet = append(systemKeySet, MessageConst.PropertyKeys) + systemKeySet = append(systemKeySet, MessageConst.PropertyTags) + systemKeySet = append(systemKeySet, MessageConst.PropertyWaitStoreMsgOk) + systemKeySet = append(systemKeySet, MessageConst.PropertyDelayTimeLevel) + systemKeySet = append(systemKeySet, MessageConst.PropertyRetryTopic) + systemKeySet = append(systemKeySet, MessageConst.PropertyRealTopic) + systemKeySet = append(systemKeySet, MessageConst.PropertyRealQueueId) + systemKeySet = append(systemKeySet, MessageConst.PropertyTransactionPrepared) + systemKeySet = append(systemKeySet, MessageConst.PropertyProducerGroup) + systemKeySet = append(systemKeySet, MessageConst.PropertyMinOffset) + systemKeySet = append(systemKeySet, MessageConst.PropertyMaxOffset) + systemKeySet = append(systemKeySet, MessageConst.PropertyBuyerId) + systemKeySet = append(systemKeySet, MessageConst.PropertyOriginMessageId) + systemKeySet = append(systemKeySet, MessageConst.PropertyTransferFlag) + systemKeySet = append(systemKeySet, MessageConst.PropertyCorrectionFlag) + systemKeySet = append(systemKeySet, MessageConst.PropertyMq2Flag) + systemKeySet = append(systemKeySet, MessageConst.PropertyReconsumeTime) + systemKeySet = append(systemKeySet, MessageConst.PropertyMsgRegion) + systemKeySet = append(systemKeySet, MessageConst.PropertyUniqClientMessageIdKeyidx) + systemKeySet = append(systemKeySet, MessageConst.PropertyMaxReconsumeTimes) + systemKeySet = append(systemKeySet, MessageConst.PropertyConsumeStartTimeStamp) + + MessageConst.systemKeySet = systemKeySet +} diff --git a/message_queue.go b/message_queue.go index 3742650..f45d632 100644 --- a/message_queue.go +++ b/message_queue.go @@ -6,19 +6,31 @@ type MessageQueue struct { queueId int32 } -func (self *MessageQueue) clone() *MessageQueue { +func NewMessageQueue(topic string, brokerName string, queueId int32) *MessageQueue { + return &MessageQueue{ + topic: topic, + brokerName: brokerName, + queueId: queueId, + } +} + +func (m *MessageQueue) clone() *MessageQueue { no := new(MessageQueue) - no.topic = self.topic - no.queueId = self.queueId - no.brokerName = self.brokerName + no.topic = m.topic + no.queueId = m.queueId + no.brokerName = m.brokerName return no } +func (m MessageQueue) getBrokerName() string { + return m.brokerName +} + type MessageQueues []*MessageQueue -func (self MessageQueues) Less(i, j int) bool { - imq := self[i] - jmq := self[j] +func (m MessageQueues) Less(i, j int) bool { + imq := m[i] + jmq := m[j] if imq.topic < jmq.topic { return true @@ -34,15 +46,14 @@ func (self MessageQueues) Less(i, j int) bool { if imq.queueId < jmq.queueId { return true - } else { - return false } + return false } -func (self MessageQueues) Swap(i, j int) { - self[i], self[j] = self[j], self[i] +func (m MessageQueues) Swap(i, j int) { + m[i], m[j] = m[j], m[i] } -func (self MessageQueues) Len() int { - return len(self) +func (m MessageQueues) Len() int { + return len(m) } diff --git a/mix_all.go b/mix_all.go new file mode 100644 index 0000000..182d475 --- /dev/null +++ b/mix_all.go @@ -0,0 +1,56 @@ +package rocketmq + +import ( + "strconv" + "strings" +) + +const ( + RocketmqHomeEnv = "ROCKETMQ_HOME" + RocketmqHomeProperty = "rocketmq.home.dir" + NamesrvAddrEnv = "NAMESRV_ADDR" + NamesrvAddrProperty = "rocketmq.namesrv.addr" + MessageCompressLevel = "rocketmq.message.compressLevel" + WsDomainName = "192.168.7.101" + WsDomainSubgroup = "" + WsAddr = "http://" + WsDomainName + ":8080/rocketmq/" + WsDomainSubgroup + DefaultTopic = "TBW102" + BenchmarkTopic = "BenchmarkTest" + DefaultProducerGroup = "DEFAULT_PRODUCER" + DefaultConsumerGroup = "DEFAULT_CONSUMER" + ToolsConsumerGroup = "TOOLS_CONSUMER" + FiltersrvConsumerGroup = "FILTERSRV_CONSUMER" + MonitrorConsumerGroup = "__MONITOR_CONSUMER" + ClientInnerProducerGroup = "CLIENT_INNER_PRODUCER" + SelfTestProducerGroup = "SELF_TEST_P_GROUP" + SelfTestConsumerGroup = "SELF_TEST_C_GROUP" + SelfTestTopic = "SELF_TEST_TOPIC" + OffsetMovedEvent = "OFFSET_MOVED_EVENT" + OnsHttpProxyGroup = "CID_ONS-HTTP-PROXY" + CidOnsapiPermissionGroup = "CID_ONSAPI_PERMISSION" + CidOnsapiOwnerGroup = "CID_ONSAPI_OWNER" + CidOnsapiPullGroup = "CID_ONSAPI_PULL" + CidRmqSysPerfix = "CID_RMQ_SYS_" + + Localhost = "127.0.0.1" + DefaultCharset = "UTF-8" + MasterId = 0 + + RetryGroupTopicPrefix = "%RETRY%" + DlqGroupTopicPerfix = "%DLQ%" + SysTopicPerfix = "rmq_sys_" + UniqMsgQueryFlag = "_UNIQUE_KEY_QUERY" +) + +type MixAll struct{} + +func BrokerVIPChannel(isChange bool, brokerAddr string) (borkerAddrNew string) { + borkerAddrNew = brokerAddr + if isChange { + ipAndPort := strings.Split(brokerAddr, ":") + if port, err := strconv.Atoi(ipAndPort[1]); err == nil { + borkerAddrNew = ipAndPort[0] + ":" + strconv.Itoa(port-2) + } + } + return +} diff --git a/mq_client.go b/mq_client.go new file mode 100644 index 0000000..4427c96 --- /dev/null +++ b/mq_client.go @@ -0,0 +1,649 @@ +package rocketmq + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + //"github.com/golang/glog" + "math" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" +) + +type GetRouteInfoRequestHeader struct { + topic string +} + +func (g *GetRouteInfoRequestHeader) MarshalJSON() ([]byte, error) { + var buf bytes.Buffer + buf.WriteString("{\"topic\":\"") + buf.WriteString(g.topic) + buf.WriteString("\"}") + return buf.Bytes(), nil +} + +type QueueData struct { + BrokerName string + ReadQueueNums int32 + WriteQueueNums int32 + Perm int + TopicSynFlag int32 +} + +type BrokerData struct { + BrokerName string + BrokerAddrs map[string]string + BrokerAddrsLock sync.RWMutex +} + +type TopicRouteData struct { + OrderTopicConf string + QueueDatas []*QueueData + BrokerDatas []*BrokerData +} + +type MqClient struct { + clientId string + conf *Config + brokerAddrTable map[string]map[string]string //map[brokerName]map[bokerId]addrs + brokerAddrTableLock sync.RWMutex + consumerTable map[string]*DefaultConsumer + consumerTableLock sync.RWMutex + producerTable map[string]*DefaultProducer + producerTableLock sync.RWMutex + topicRouteTable map[string]*TopicRouteData + topicRouteTableLock sync.RWMutex + remotingClient RemotingClient + pullMessageService *PullMessageService + defaultProducer *DefaultProducer + serviceState int +} + +func NewMqClient() *MqClient { + return &MqClient{ + brokerAddrTable: make(map[string]map[string]string), + consumerTable: make(map[string]*DefaultConsumer), + producerTable: make(map[string]*DefaultProducer), + topicRouteTable: make(map[string]*TopicRouteData), + } +} + +type slice interface { + Len() int +} + +func sliceCompare(a, b interface{}) bool { + switch a.(type) { + case []*QueueData, []*BrokerData: + if a == nil && b == nil { + return true + } + if a == nil || b == nil { + return false + } + if len(a.([]interface{})) != len(b.([]interface{})) { + return false + } + for i := range a.([]interface{}) { + if a.([]interface{})[i] != b.([]interface{})[i] { + return false + } + } + return true + } + return false +} + +func (m *MqClient) findBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) (brokerAddr string, slave bool, found bool) { + slave = false + found = false + m.brokerAddrTableLock.RLock() + brokerMap, ok := m.brokerAddrTable[brokerName] + m.brokerAddrTableLock.RUnlock() + if ok { + brokerAddr, ok = brokerMap[strconv.FormatInt(brokerId, 10)] + slave = (brokerId != 0) + found = ok + + if !found && !onlyThisBroker { + var id string + for id, brokerAddr = range brokerMap { + slave = (id != "0") + found = true + break + } + } + } + + return +} + +func (m *MqClient) findBrokerAddressInAdmin(brokerName string) (addr string, found, slave bool) { + found = false + slave = false + m.brokerAddrTableLock.RLock() + brokers, ok := m.brokerAddrTable[brokerName] + m.brokerAddrTableLock.RUnlock() + if ok { + for brokerId, addr := range brokers { + + if addr != "" { + found = true + if brokerId == "0" { + slave = false + } else { + slave = true + } + break + } + } + } + + return +} + +func (m *MqClient) findBrokerAddrByTopic(topic string) (addr string, ok bool) { + m.topicRouteTableLock.RLock() + topicRouteData, ok := m.topicRouteTable[topic] + m.topicRouteTableLock.RUnlock() + if !ok { + return "", ok + } + + brokers := topicRouteData.BrokerDatas + if brokers != nil && len(brokers) > 0 { + brokerData := brokers[0] + if ok { + brokerData.BrokerAddrsLock.RLock() + addr, ok = brokerData.BrokerAddrs["0"] + brokerData.BrokerAddrsLock.RUnlock() + + if ok { + return + } + for _, addr = range brokerData.BrokerAddrs { + return addr, ok + } + } + } + return +} + +func (m *MqClient) findConsumerIdList(topic string, groupName string) ([]string, error) { + brokerAddr, ok := m.findBrokerAddrByTopic(topic) + if !ok { + _, err := m.updateTopicRouteInfoFromNameServerKernel(topic, false, DefaultProducer{}) + fmt.Println(err) + brokerAddr, ok = m.findBrokerAddrByTopic(topic) + } + + if ok { + return m.getConsumerIdListByGroup(brokerAddr, groupName, 3000) + } + + return nil, errors.New("can't find broker") + +} + +type GetConsumerListByGroupRequestHeader struct { + ConsumerGroup string `json:"consumerGroup"` +} + +type GetConsumerListByGroupResponseBody struct { + ConsumerIdList []string +} + +func (m *MqClient) getConsumerIdListByGroup(addr string, consumerGroup string, timeoutMillis int64) ([]string, error) { + requestHeader := new(GetConsumerListByGroupRequestHeader) + requestHeader.ConsumerGroup = consumerGroup + + currOpaque := atomic.AddInt32(&opaque, 1) + request := &RemotingCommand{ + Code: GetConsumerListByGroup, + Language: "JAVA", + Version: 79, + Opaque: currOpaque, + Flag: 0, + ExtFields: requestHeader, + } + + response, err := m.remotingClient.invokeSync(addr, request, timeoutMillis) + if err != nil { + fmt.Println(err) + return nil, err + } + + if response.Code == Success { + getConsumerListByGroupResponseBody := new(GetConsumerListByGroupResponseBody) + bodyjson := strings.Replace(string(response.Body), "0:", "\"0\":", -1) + bodyjson = strings.Replace(bodyjson, "1:", "\"1\":", -1) + err := json.Unmarshal([]byte(bodyjson), getConsumerListByGroupResponseBody) + if err != nil { + fmt.Println(err) + return nil, err + } + return getConsumerListByGroupResponseBody.ConsumerIdList, nil + } + + return nil, errors.New("getConsumerIdListByGroup error") +} + +func (m *MqClient) getTopicRouteInfoFromNameServer(topic string, timeoutMillis int64) (*TopicRouteData, error) { + requestHeader := &GetRouteInfoRequestHeader{ + topic: topic, + } + + remotingCommand := new(RemotingCommand) + remotingCommand.Code = GetRouteinfoByTopic + currOpaque := atomic.AddInt32(&opaque, 1) + remotingCommand.Opaque = currOpaque + remotingCommand.Flag = 0 + remotingCommand.Language = "JAVA" + remotingCommand.Version = 79 + + remotingCommand.ExtFields = requestHeader + + response, err := m.remotingClient.invokeSync(m.conf.Namesrv, remotingCommand, timeoutMillis) + if err != nil { + return nil, err + } + if response.Code == Success { + topicRouteData := new(TopicRouteData) + bodyjson := strings.Replace(string(response.Body), ",0:", ",\"0\":", -1) + bodyjson = strings.Replace(bodyjson, ",1:", ",\"1\":", -1) + bodyjson = strings.Replace(bodyjson, "{0:", "{\"0\":", -1) + bodyjson = strings.Replace(bodyjson, "{1:", "{\"1\":", -1) + err = json.Unmarshal([]byte(bodyjson), topicRouteData) + if err != nil { + fmt.Println("json.Unmarshal", err) + return nil, err + } + return topicRouteData, nil + } else { + return nil, errors.New(fmt.Sprintf("get topicRouteInfo from nameServer error[code:%d,topic:%s]", response.Code, topic)) + } + +} + +func (m *MqClient) updateTopicRouteInfoFromNameServer() { + topicList := make([]string, 0) + for _, consumer := range m.consumerTable { + subscriptions := consumer.subscriptions() + for _, subData := range subscriptions { + topicList = append(topicList, subData.Topic) + } + } + + for _, producer := range m.producerTable { + topicList = append(topicList, producer.getPublishTopicList()...) + } + + for _, topic := range topicList { + m.updateTopicRouteInfoFromNameServerKernel(topic, false, DefaultProducer{}) + } +} + +func (m *MqClient) topicRouteDataIsChange(oldData *TopicRouteData, nowData *TopicRouteData) bool { + if !topicRouteDataIsNil(oldData) || !topicRouteDataIsNil(nowData) { + return false + } + if !sliceCompare(oldData.QueueDatas, nowData.QueueDatas) { + return false + } + if !sliceCompare(oldData.BrokerDatas, nowData.BrokerDatas) { + return false + } + return true +} + +func topicRouteDataIsNil(topicRouteData *TopicRouteData) (isNil bool) { + if len(topicRouteData.QueueDatas) == 0 && len(topicRouteData.BrokerDatas) == 0 { + isNil = true + } + return +} + +func (m *MqClient) isNeedUpdateTopicRouteInfo(topic string) (result bool) { + for _, producer := range m.producerTable { + if !result && producer.producerGroup != "" { + result = producer.isPublishTopicNeedUpdate(topic) + } + } + + for _, consumer := range m.consumerTable { + if !result && consumer.consumerGroup != "" { + result = consumer.isSubscribeTopicNeedUpdate(topic) + } + } + return result +} + +func (m *MqClient) updateTopicRouteInfoFromNameServerKernel(topic string, isDefault bool, producer DefaultProducer) (ok bool, err error) { + var topicRouteData *TopicRouteData + if isDefault && producer.producerGroup != "" { + topicRouteData, err = m.getTopicRouteInfoFromNameServer(producer.createTopicKey, 3000*1000) + for _, data := range topicRouteData.QueueDatas { + queueNums := int32(math.Min(float64(producer.defaultTopicQueueNums), float64(data.ReadQueueNums))) + data.ReadQueueNums = queueNums + data.WriteQueueNums = queueNums + } + } else { + topicRouteData, err = m.getTopicRouteInfoFromNameServer(topic, 3000*1000) + } + + if !topicRouteDataIsNil(topicRouteData) { + old := m.topicRouteTable[topic] + changed := sliceCompare(old, topicRouteData) + + if !changed { + changed = m.isNeedUpdateTopicRouteInfo(topic) + } else { + fmt.Println("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData) + } + + if changed { + for _, bd := range topicRouteData.BrokerDatas { + m.brokerAddrTableLock.Lock() + m.brokerAddrTable[bd.BrokerName] = bd.BrokerAddrs + m.brokerAddrTableLock.Unlock() + } + + // Update Pub info + publishInfo := topicRouteData2TopicPublishInfo(topic, topicRouteData) + publishInfo.haveTopicRouterInfo = true + for _, producer := range m.producerTable { + producer.updateTopicPublishInfo(topic, publishInfo) + } + + // Update sub info + mqList := make([]*MessageQueue, 0) + for _, queueData := range topicRouteData.QueueDatas { + var i int32 + for i = 0; i < queueData.ReadQueueNums; i++ { + mq := &MessageQueue{ + topic: topic, + brokerName: queueData.BrokerName, + queueId: i, + } + mqList = append(mqList, mq) + } + } + for _, consumer := range m.consumerTable { + consumer.updateTopicSubscribeInfo(topic, mqList) + } + m.topicRouteTableLock.Lock() + m.topicRouteTable[topic] = topicRouteData + m.topicRouteTableLock.Unlock() + + return true, err + } + } + + return false, errors.New("updateTopicRouteInfoFromNameServer wrong") +} + +func topicRouteData2TopicPublishInfo(topic string, route *TopicRouteData) (publishInfo *TopicPublishInfo) { + publishInfo = NewTopicPublishInfo() + publishInfo.topicRouteData = route + if route.OrderTopicConf != "" { + brokers := strings.Split(route.OrderTopicConf, ";") + for _, borker := range brokers { + item := strings.Split(borker, ":") + nums, _ := strconv.Atoi(item[1]) + for i := int32(0); i < int32(nums); i++ { + mq := NewMessageQueue(topic, item[0], i) + publishInfo.messageQueueList = append(publishInfo.messageQueueList, mq) + } + } + publishInfo.orderTopic = true + } else { + qds := route.QueueDatas + for _, qd := range qds { + if PermName.isWritable(qd.Perm) { + var brokerData *BrokerData + for _, bd := range route.BrokerDatas { + if bd.BrokerName == qd.BrokerName { + brokerData = bd + break + } + } + + if brokerData.BrokerName == "" { + continue + } + + if _, ok := brokerData.BrokerAddrs[strconv.Itoa(MasterId)]; !ok { + continue + } + + for i := int32(0); i < qd.WriteQueueNums; i++ { + mq := NewMessageQueue(topic, qd.BrokerName, i) + publishInfo.messageQueueList = append(publishInfo.messageQueueList, mq) + } + } + } + publishInfo.orderTopic = false + } + + return +} + +type ConsumerData struct { + GroupName string + ConsumerType string + MessageModel string + ConsumeFromWhere string + SubscriptionDataSet []*SubscriptionData + UnitMode bool +} + +type HeartbeatData struct { + ClientId string + ConsumerDataSet []*ConsumerData +} + +func (m *MqClient) prepareHeartbeatData() *HeartbeatData { + heartbeatData := new(HeartbeatData) + heartbeatData.ClientId = m.clientId + heartbeatData.ConsumerDataSet = make([]*ConsumerData, 0) + for group, consumer := range m.consumerTable { + consumerData := new(ConsumerData) + consumerData.GroupName = group + consumerData.ConsumerType = consumer.consumerType + consumerData.ConsumeFromWhere = consumer.consumeFromWhere + consumerData.MessageModel = consumer.messageModel + consumerData.SubscriptionDataSet = consumer.subscriptions() + consumerData.UnitMode = consumer.unitMode + + heartbeatData.ConsumerDataSet = append(heartbeatData.ConsumerDataSet, consumerData) + } + return heartbeatData +} + +func (m *MqClient) sendHeartbeatToAllBrokerWithLock() error { + heartbeatData := m.prepareHeartbeatData() + if len(heartbeatData.ConsumerDataSet) == 0 { + return errors.New("send heartbeat error") + } + + m.brokerAddrTableLock.RLock() + for _, brokerTable := range m.brokerAddrTable { + for brokerId, addr := range brokerTable { + if addr == "" || brokerId != "0" { + continue + } + currOpaque := atomic.AddInt32(&opaque, 1) + remotingCommand := &RemotingCommand{ + Code: HeartBeat, + Language: "JAVA", + Version: 79, + Opaque: currOpaque, + Flag: 0, + } + + data, err := json.Marshal(*heartbeatData) + if err != nil { + fmt.Println(err) + return err + } + remotingCommand.Body = data + fmt.Println("send heartbeat to broker[", addr+"]") + response, err := m.remotingClient.invokeSync(addr, remotingCommand, 3000) + if err != nil { + fmt.Println(err) + } else { + if response == nil || response.Code != Success { + fmt.Println("send heartbeat response error") + } + } + } + } + m.brokerAddrTableLock.RUnlock() + return nil +} + +func (m *MqClient) startScheduledTask() { + go func() { + updateTopicRouteTimer := time.NewTimer(5 * time.Second) + for { + <-updateTopicRouteTimer.C + m.updateTopicRouteInfoFromNameServer() + updateTopicRouteTimer.Reset(5 * time.Second) + } + }() + + go func() { + heartbeatTimer := time.NewTimer(10 * time.Second) + for { + <-heartbeatTimer.C + m.sendHeartbeatToAllBrokerWithLock() + heartbeatTimer.Reset(5 * time.Second) + } + }() + + go func() { + rebalanceTimer := time.NewTimer(15 * time.Second) + for { + <-rebalanceTimer.C + m.doRebalance() + rebalanceTimer.Reset(30 * time.Second) + } + }() + + go func() { + timeoutTimer := time.NewTimer(3 * time.Second) + for { + <-timeoutTimer.C + m.remotingClient.ScanResponseTable() + timeoutTimer.Reset(time.Second) + } + }() +} + +func (m *MqClient) doRebalance() { + for _, consumer := range m.consumerTable { + consumer.doRebalance() + } +} + +func (m *MqClient) start() { + switch m.serviceState { + case CreateJust: + m.serviceState = StartFailed + m.startScheduledTask() + go m.pullMessageService.start() + if m.defaultProducer != nil { + m.defaultProducer.start(false) + } + fmt.Println("the client factory [{}] start OK", m.clientId) + m.serviceState = Running + case Running, ShutdownAlready, StartFailed: + fmt.Println("The Factory object[" + m.clientId + "] has been created before, and failed.") + } + +} + +type QueryConsumerOffsetRequestHeader struct { + ConsumerGroup string `json:"consumerGroup"` + Topic string `json:"topic"` + QueueId int32 `json:"queueId"` +} + +func (m *MqClient) queryConsumerOffset(addr string, requestHeader *QueryConsumerOffsetRequestHeader, timeoutMillis int64) (int64, error) { + currOpaque := atomic.AddInt32(&opaque, 1) + remotingCommand := &RemotingCommand{ + Code: QueryConsumerOffset, + Language: "JAVA", + Version: 79, + Opaque: currOpaque, + Flag: 0, + } + + remotingCommand.ExtFields = requestHeader + reponse, err := m.remotingClient.invokeSync(addr, remotingCommand, timeoutMillis) + + if err != nil { + fmt.Println(err) + return 0, err + } + + if reponse.Code == QueryNotFound { + return 0, nil + } + + if extFields, ok := (reponse.ExtFields).(map[string]interface{}); ok { + if offsetInter, ok := extFields["offset"]; ok { + if offsetStr, ok := offsetInter.(string); ok { + offset, err := strconv.ParseInt(offsetStr, 10, 64) + if err != nil { + fmt.Println(err) + return 0, err + } + return offset, nil + + } + } + } + fmt.Println(requestHeader, reponse) + return 0, errors.New("query offset error") +} + +func (m *MqClient) updateConsumerOffsetOneway(addr string, header *UpdateConsumerOffsetRequestHeader, timeoutMillis int64) { + + currOpaque := atomic.AddInt32(&opaque, 1) + remotingCommand := &RemotingCommand{ + Code: QueryConsumerOffset, + Language: "JAVA", + Version: 79, + Opaque: currOpaque, + Flag: 0, + ExtFields: header, + } + + m.remotingClient.invokeSync(addr, remotingCommand, timeoutMillis) +} + +func (m *MqClient) findBrokerAddressInPublish(brokerName string) string { + tmpMap := m.brokerAddrTable[brokerName] + if tmpMap != nil && len(tmpMap) != 0 { + brokerName = tmpMap[strconv.Itoa(MasterId)] + } + return brokerName +} + +func (m *MqClient) registerProducer(group string, producer *DefaultProducer) bool { + if group == "" { + return false + } + if _, err := m.producerTable[group]; err { + fmt.Println("the producer group[{}] exist already.", group) + return false + } else { + m.producerTable[group] = producer + return true + } +} diff --git a/mqclient.go b/mqclient.go deleted file mode 100644 index 05eb61d..0000000 --- a/mqclient.go +++ /dev/null @@ -1,467 +0,0 @@ -package rocketmq - -import ( - "bytes" - "encoding/json" - "errors" - "fmt" - "github.com/golang/glog" - "strconv" - "strings" - "sync" - "sync/atomic" - "time" -) - -type GetRouteInfoRequestHeader struct { - topic string -} - -func (self *GetRouteInfoRequestHeader) MarshalJSON() ([]byte, error) { - var buf bytes.Buffer - buf.WriteString("{\"topic\":\"") - buf.WriteString(self.topic) - buf.WriteString("\"}") - return buf.Bytes(), nil -} - -type QueueData struct { - BrokerName string - ReadQueueNums int32 - WriteQueueNums int32 - Perm int32 - TopicSynFlag int32 -} - -type BrokerData struct { - BrokerName string - BrokerAddrs map[string]string - BrokerAddrsLock sync.RWMutex -} - -type TopicRouteData struct { - OrderTopicConf string - QueueDatas []*QueueData - BrokerDatas []*BrokerData -} - -type MqClient struct { - clientId string - conf *Config - brokerAddrTable map[string]map[string]string //map[brokerName]map[bokerId]addrs - brokerAddrTableLock sync.RWMutex - consumerTable map[string]*DefaultConsumer - consumerTableLock sync.RWMutex - topicRouteTable map[string]*TopicRouteData - topicRouteTableLock sync.RWMutex - remotingClient RemotingClient - pullMessageService *PullMessageService -} - -func NewMqClient() *MqClient { - return &MqClient{ - brokerAddrTable: make(map[string]map[string]string), - consumerTable: make(map[string]*DefaultConsumer), - topicRouteTable: make(map[string]*TopicRouteData), - } -} -func (self *MqClient) findBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) (brokerAddr string, slave bool, found bool) { - slave = false - found = false - self.brokerAddrTableLock.RLock() - brokerMap, ok := self.brokerAddrTable[brokerName] - self.brokerAddrTableLock.RUnlock() - if ok { - brokerAddr, ok = brokerMap[strconv.FormatInt(brokerId, 10)] - slave = (brokerId != 0) - found = ok - - if !found && !onlyThisBroker { - var id string - for id, brokerAddr = range brokerMap { - slave = (id != "0") - found = true - break - } - } - } - - return -} - -func (self *MqClient) findBrokerAddressInAdmin(brokerName string) (addr string, found, slave bool) { - found = false - slave = false - self.brokerAddrTableLock.RLock() - brokers, ok := self.brokerAddrTable[brokerName] - self.brokerAddrTableLock.RUnlock() - if ok { - for brokerId, addr := range brokers { - - if addr != "" { - found = true - if brokerId == "0" { - slave = false - } else { - slave = true - } - break - } - } - } - - return -} - -func (self *MqClient) findBrokerAddrByTopic(topic string) (addr string, ok bool) { - self.topicRouteTableLock.RLock() - topicRouteData, ok := self.topicRouteTable[topic] - self.topicRouteTableLock.RUnlock() - if !ok { - return "", ok - } - - brokers := topicRouteData.BrokerDatas - if brokers != nil && len(brokers) > 0 { - brokerData := brokers[0] - if ok { - brokerData.BrokerAddrsLock.RLock() - addr, ok = brokerData.BrokerAddrs["0"] - brokerData.BrokerAddrsLock.RUnlock() - - if ok { - return - } - for _, addr = range brokerData.BrokerAddrs { - return addr, ok - } - } - } - return -} -func (self *MqClient) findConsumerIdList(topic string, groupName string) ([]string, error) { - brokerAddr, ok := self.findBrokerAddrByTopic(topic) - if !ok { - err := self.updateTopicRouteInfoFromNameServerByTopic(topic) - glog.Error(err) - brokerAddr, ok = self.findBrokerAddrByTopic(topic) - } - - if ok { - return self.getConsumerIdListByGroup(brokerAddr, groupName, 3000) - } - - return nil, errors.New("can't find broker") - -} - -type GetConsumerListByGroupRequestHeader struct { - ConsumerGroup string `json:"consumerGroup"` -} - -type GetConsumerListByGroupResponseBody struct { - ConsumerIdList []string -} - -func (self *MqClient) getConsumerIdListByGroup(addr string, consumerGroup string, timeoutMillis int64) ([]string, error) { - requestHeader := new(GetConsumerListByGroupRequestHeader) - requestHeader.ConsumerGroup = consumerGroup - - currOpaque := atomic.AddInt32(&opaque, 1) - request := &RemotingCommand{ - Code: GET_CONSUMER_LIST_BY_GROUP, - Language: "JAVA", - Version: 79, - Opaque: currOpaque, - Flag: 0, - ExtFields: requestHeader, - } - - response, err := self.remotingClient.invokeSync(addr, request, timeoutMillis) - if err != nil { - glog.Error(err) - return nil, err - } - - if response.Code == SUCCESS { - getConsumerListByGroupResponseBody := new(GetConsumerListByGroupResponseBody) - bodyjson := strings.Replace(string(response.Body), "0:", "\"0\":", -1) - bodyjson = strings.Replace(bodyjson, "1:", "\"1\":", -1) - err := json.Unmarshal([]byte(bodyjson), getConsumerListByGroupResponseBody) - if err != nil { - glog.Error(err) - return nil, err - } - return getConsumerListByGroupResponseBody.ConsumerIdList, nil - } - - return nil, errors.New("getConsumerIdListByGroup error") -} - -func (self *MqClient) getTopicRouteInfoFromNameServer(topic string, timeoutMillis int64) (*TopicRouteData, error) { - requestHeader := &GetRouteInfoRequestHeader{ - topic: topic, - } - - remotingCommand := new(RemotingCommand) - remotingCommand.Code = GET_ROUTEINTO_BY_TOPIC - currOpaque := atomic.AddInt32(&opaque, 1) - remotingCommand.Opaque = currOpaque - remotingCommand.Flag = 0 - remotingCommand.Language = "JAVA" - remotingCommand.Version = 79 - - remotingCommand.ExtFields = requestHeader - response, err := self.remotingClient.invokeSync(self.conf.Nameserver, remotingCommand, timeoutMillis) - if err != nil { - return nil, err - } - if response.Code == SUCCESS { - topicRouteData := new(TopicRouteData) - bodyjson := strings.Replace(string(response.Body), ",0:", ",\"0\":", -1) - bodyjson = strings.Replace(bodyjson, ",1:", ",\"1\":", -1) - bodyjson = strings.Replace(bodyjson, "{0:", "{\"0\":", -1) - bodyjson = strings.Replace(bodyjson, "{1:", "{\"1\":", -1) - err = json.Unmarshal([]byte(bodyjson), topicRouteData) - if err != nil { - glog.Error(err) - return nil, err - } - return topicRouteData, nil - } else { - return nil, errors.New(fmt.Sprintf("get topicRouteInfo from nameServer error[code:%d,topic:%s]", response.Code, topic)) - } - -} - -func (self *MqClient) updateTopicRouteInfoFromNameServer() { - for _, consumer := range self.consumerTable { - subscriptions := consumer.subscriptions() - for _, subData := range subscriptions { - self.updateTopicRouteInfoFromNameServerByTopic(subData.Topic) - } - } -} - -func (self *MqClient) updateTopicRouteInfoFromNameServerByTopic(topic string) error { - - topicRouteData, err := self.getTopicRouteInfoFromNameServer(topic, 3000*1000) - if err != nil { - glog.Error(err) - return err - } - - for _, bd := range topicRouteData.BrokerDatas { - self.brokerAddrTableLock.Lock() - self.brokerAddrTable[bd.BrokerName] = bd.BrokerAddrs - self.brokerAddrTableLock.Unlock() - } - - mqList := make([]*MessageQueue, 0) - for _, queueData := range topicRouteData.QueueDatas { - var i int32 - for i = 0; i < queueData.ReadQueueNums; i++ { - mq := &MessageQueue{ - topic: topic, - brokerName: queueData.BrokerName, - queueId: i, - } - - mqList = append(mqList, mq) - } - } - - for _, consumer := range self.consumerTable { - consumer.updateTopicSubscribeInfo(topic, mqList) - } - self.topicRouteTableLock.Lock() - self.topicRouteTable[topic] = topicRouteData - self.topicRouteTableLock.Unlock() - - return nil -} - -type ConsumerData struct { - GroupName string - ConsumerType string - MessageModel string - ConsumeFromWhere string - SubscriptionDataSet []*SubscriptionData - UnitMode bool -} - -type HeartbeatData struct { - ClientId string - ConsumerDataSet []*ConsumerData -} - -func (self *MqClient) prepareHeartbeatData() *HeartbeatData { - heartbeatData := new(HeartbeatData) - heartbeatData.ClientId = self.clientId - heartbeatData.ConsumerDataSet = make([]*ConsumerData, 0) - for group, consumer := range self.consumerTable { - consumerData := new(ConsumerData) - consumerData.GroupName = group - consumerData.ConsumerType = consumer.consumerType - consumerData.ConsumeFromWhere = consumer.consumeFromWhere - consumerData.MessageModel = consumer.messageModel - consumerData.SubscriptionDataSet = consumer.subscriptions() - consumerData.UnitMode = consumer.unitMode - - heartbeatData.ConsumerDataSet = append(heartbeatData.ConsumerDataSet, consumerData) - } - return heartbeatData -} - -func (self *MqClient) sendHeartbeatToAllBrokerWithLock() error { - heartbeatData := self.prepareHeartbeatData() - if len(heartbeatData.ConsumerDataSet) == 0 { - return errors.New("send heartbeat error") - } - - self.brokerAddrTableLock.RLock() - for _, brokerTable := range self.brokerAddrTable { - for brokerId, addr := range brokerTable { - if addr == "" || brokerId != "0" { - continue - } - currOpaque := atomic.AddInt32(&opaque, 1) - remotingCommand := &RemotingCommand{ - Code: HEART_BEAT, - Language: "JAVA", - Version: 79, - Opaque: currOpaque, - Flag: 0, - } - - data, err := json.Marshal(*heartbeatData) - if err != nil { - glog.Error(err) - return err - } - remotingCommand.Body = data - glog.V(1).Info("send heartbeat to broker[", addr+"]") - response, err := self.remotingClient.invokeSync(addr, remotingCommand, 3000) - if err != nil { - glog.Error(err) - } else { - if response == nil || response.Code != SUCCESS { - glog.Error("send heartbeat response error") - } - } - } - } - self.brokerAddrTableLock.RUnlock() - return nil -} - -func (self *MqClient) startScheduledTask() { - go func() { - updateTopicRouteTimer := time.NewTimer(5 * time.Second) - for { - <-updateTopicRouteTimer.C - self.updateTopicRouteInfoFromNameServer() - updateTopicRouteTimer.Reset(5 * time.Second) - } - }() - - go func() { - heartbeatTimer := time.NewTimer(10 * time.Second) - for { - <-heartbeatTimer.C - self.sendHeartbeatToAllBrokerWithLock() - heartbeatTimer.Reset(5 * time.Second) - } - }() - - go func() { - rebalanceTimer := time.NewTimer(15 * time.Second) - for { - <-rebalanceTimer.C - self.doRebalance() - rebalanceTimer.Reset(30 * time.Second) - } - }() - - - go func() { - timeoutTimer := time.NewTimer(3 * time.Second) - for { - <-timeoutTimer.C - self.remotingClient.ScanResponseTable() - timeoutTimer.Reset(time.Second) - } - }() - -} - -func (self *MqClient) doRebalance() { - for _, consumer := range self.consumerTable { - consumer.doRebalance() - } -} - -func (self *MqClient) start() { - self.startScheduledTask() - go self.pullMessageService.start() -} - -type QueryConsumerOffsetRequestHeader struct { - ConsumerGroup string `json:"consumerGroup"` - Topic string `json:"topic"` - QueueId int32 `json:"queueId"` -} - -func (self *MqClient) queryConsumerOffset(addr string, requestHeader *QueryConsumerOffsetRequestHeader, timeoutMillis int64) (int64, error) { - currOpaque := atomic.AddInt32(&opaque, 1) - remotingCommand := &RemotingCommand{ - Code: QUERY_CONSUMER_OFFSET, - Language: "JAVA", - Version: 79, - Opaque: currOpaque, - Flag: 0, - } - - remotingCommand.ExtFields = requestHeader - reponse, err := self.remotingClient.invokeSync(addr, remotingCommand, timeoutMillis) - - if err != nil { - glog.Error(err) - return 0, err - } - - if reponse.Code == QUERY_NOT_FOUND { - return 0,nil - } - - if extFields, ok := (reponse.ExtFields).(map[string]interface{}); ok { - if offsetInter, ok := extFields["offset"]; ok { - if offsetStr, ok := offsetInter.(string); ok { - offset, err := strconv.ParseInt(offsetStr, 10, 64) - if err != nil { - glog.Error(err) - return 0, err - } - return offset, nil - - } - } - } - glog.Error(requestHeader,reponse) - return 0, errors.New("query offset error") -} - -func (self *MqClient) updateConsumerOffsetOneway(addr string, header *UpdateConsumerOffsetRequestHeader, timeoutMillis int64) { - - currOpaque := atomic.AddInt32(&opaque, 1) - remotingCommand := &RemotingCommand{ - Code: QUERY_CONSUMER_OFFSET, - Language: "JAVA", - Version: 79, - Opaque: currOpaque, - Flag: 0, - ExtFields: header, - } - - self.remotingClient.invokeSync(addr, remotingCommand, timeoutMillis) -} diff --git a/net.go b/net.go index 3a8f613..cb6d218 100644 --- a/net.go +++ b/net.go @@ -1,30 +1,30 @@ package rocketmq + import ( "net" "strings" ) - +// Get local IPV4 Address func GetLocalIp4() (ip string) { - interfaces,err:= net.Interfaces() + interfaces, err := net.Interfaces() if err != nil { return } - for _, face:= range interfaces{ - if strings.Contains(face.Name,"lo"){ + for _, face := range interfaces { + if strings.Contains(face.Name, "lo") { continue } - addrs,err := face.Addrs() + addrs, err := face.Addrs() if err != nil { return } - - for _,addr := range addrs { + for _, addr := range addrs { if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { if ipnet.IP.To4() != nil { currIp := ipnet.IP.String() - if !strings.Contains(currIp,":")&&currIp != "127.0.0.1" && isIntranetIpv4(currIp){ + if !strings.Contains(currIp, ":") && currIp != "127.0.0.1" && isIntranetIpv4(currIp) { ip = currIp } } @@ -35,8 +35,8 @@ func GetLocalIp4() (ip string) { return } -func isIntranetIpv4(ip string) bool{ - if strings.HasPrefix(ip,"192.168.") || strings.HasPrefix(ip,"169.254.") { +func isIntranetIpv4(ip string) bool { + if strings.HasPrefix(ip, "192.168.") || strings.HasPrefix(ip, "169.254.") { return true } return false diff --git a/net_test.go b/net_test.go index 1b3c8ac..0a9b843 100644 --- a/net_test.go +++ b/net_test.go @@ -1,6 +1,7 @@ package rocketmq + import "testing" -func TextGetLocalIp4(t testing.T) { +func TextGetLocalIp4(t testing.T) { println(GetLocalIp4()) -} \ No newline at end of file +} diff --git a/perm.go b/perm.go new file mode 100644 index 0000000..697e990 --- /dev/null +++ b/perm.go @@ -0,0 +1,52 @@ +package rocketmq + +import ( + "bytes" +) + +type permName struct { + PermPriority int + PermRead int + PermWrite int + PermInherit int +} + +var PermName = permName{ + PermPriority: 0x1 << 3, + PermRead: 0x1 << 2, + PermWrite: 0x1 << 1, + PermInherit: 0x1 << 0, +} + +func (p permName) perm2String(perm int) string { + stringBuffer := bytes.NewBuffer([]byte{}) + if PermName.isReadable(perm) { + stringBuffer.WriteString("R") + } else { + stringBuffer.WriteString("-") + } + if PermName.isWritable(perm) { + stringBuffer.WriteString("W") + } else { + stringBuffer.WriteString("-") + } + if PermName.isInherited(perm) { + stringBuffer.WriteString("X") + } else { + stringBuffer.WriteString("-") + } + + return stringBuffer.String() +} + +func (p permName) isReadable(perm int) bool { + return (perm & p.PermRead) == p.PermRead +} + +func (p permName) isWritable(perm int) bool { + return (perm & p.PermWrite) == p.PermWrite +} + +func (p permName) isInherited(perm int) bool { + return (perm & p.PermInherit) == p.PermInherit +} diff --git a/producer.go b/producer.go new file mode 100644 index 0000000..95f1fd4 --- /dev/null +++ b/producer.go @@ -0,0 +1,500 @@ +package rocketmq + +import ( + _ "fmt" + //"github.com/golang/glog" + _ "encoding/binary" + _ "encoding/json" + "errors" + "fmt" + "os" + "strconv" + "strings" + "sync/atomic" + _ "sync/atomic" + "time" +) + +type Producer interface { + Start() error + Shutdown() + Send(msg *Message) (*SendResult, error) + SendAsync(msg *Message, sendCallback SendCallback) error + SendOneway(msg *Message) error +} + +// communicationMode +const ( + Sync = iota + Async + Oneway +) + +// ServiceState +const ( + CreateJust = iota + Running + ShutdownAlready + StartFailed +) + +type SendCallback func() error + +type DefaultProducer struct { + conf *Config + producerGroup string + producerType string + + rebalance *Rebalance + remotingClient RemotingClient + mqClient *MqClient + topicPublishInfoTable map[string]*TopicPublishInfo + instanceName string + + sendMsgTimeout int64 + serviceState int + createTopicKey string + defaultTopicQueueNums int + compressMsgBodyOverHowmuch int + retryTimesWhenSendFailed int + retryTimesWhenSendAsyncFailed int + retryAnotherBrokerWhenNotStoreOK bool + maxMessageSize int + + vipChannelEnabled bool +} + +func NewDefaultProducer(producerGroup string, conf *Config) (Producer, error) { + if conf == nil { + conf = &Config{ + Namesrv: os.Getenv("ROCKETMQ_NAMESVR"), + InstanceName: "DEFAULT", + } + } + + if conf.ClientIp == "" { + conf.ClientIp = DefaultIp + } + + remotingClient := NewDefaultRemotingClient() + mqClient := NewMqClient() + pullMessageService := NewPullMessageService() + producer := &DefaultProducer{ + conf: conf, + producerGroup: producerGroup, + + remotingClient: remotingClient, + mqClient: mqClient, + topicPublishInfoTable: make(map[string]*TopicPublishInfo), + + sendMsgTimeout: 3000, + instanceName: "DEFAULT", + serviceState: CreateJust, + createTopicKey: DefaultTopic, + defaultTopicQueueNums: 4, + compressMsgBodyOverHowmuch: 1024 * 4, + retryTimesWhenSendFailed: 2, + retryTimesWhenSendAsyncFailed: 2, + retryAnotherBrokerWhenNotStoreOK: false, + maxMessageSize: 1024 * 1024 * 4, // 4M + } + + mqClient.remotingClient = remotingClient + mqClient.conf = conf + mqClient.clientId = conf.ClientIp + "@" + strconv.Itoa(os.Getpid()) + mqClient.pullMessageService = pullMessageService + mqClient.defaultProducer = producer + + pullMessageService.service = producer + + return producer, nil +} + +func (d *DefaultProducer) start(startFactory bool) (err error) { + switch d.serviceState { + case CreateJust: + d.serviceState = StartFailed + d.checkConfig() + if d.producerGroup == ClientInnerProducerGroup { + if d.instanceName == "DEFAULT" { + d.instanceName = strconv.Itoa(os.Getpid()) + } + } + + if registerOK := d.mqClient.registerProducer(d.producerGroup, d); !registerOK { + d.serviceState = CreateJust + err = errors.New("The producer group[" + d.producerGroup + "] has been created before, specify another name please.") + return + } + + topicPublishInfo := NewTopicPublishInfo() + d.topicPublishInfoTable[d.createTopicKey] = topicPublishInfo + if startFactory { + d.mqClient.start() + } + d.serviceState = Running + + case Running, ShutdownAlready, StartFailed: + err = errors.New("The producer service state not OK, maybe started once," + strconv.Itoa(d.serviceState)) + } + return +} + +func (d *DefaultProducer) Start() error { + return d.start(true) +} + +func (d *DefaultProducer) checkConfig() (err error) { + if d.producerGroup == DefaultProducerGroup { + err = errors.New("producerGroup can not equal " + DefaultProducerGroup + ", please specify another one.") + } + return +} + +func (d *DefaultProducer) makeSureStateOK() (err error) { + if d.serviceState != Running { + err = errors.New("The producer service state not OK," + strconv.Itoa(d.serviceState)) + } + return +} + +func (d *DefaultProducer) Shutdown() { +} + +func (d *DefaultProducer) Send(msg *Message) (*SendResult, error) { + return d.send(msg, Sync, nil, d.sendMsgTimeout) +} + +func (d *DefaultProducer) SendAsync(msg *Message, sendCallback SendCallback) (err error) { + _, err = d.send(msg, Async, sendCallback, d.sendMsgTimeout) + return +} + +func (d *DefaultProducer) SendOneway(msg *Message) error { + d.send(msg, Oneway, nil, d.sendMsgTimeout) + return nil +} + +func (d *DefaultProducer) send(msg *Message, communicationMode int, sendCallback SendCallback, timeout int64) (sendResult *SendResult, err error) { + if err = d.makeSureStateOK(); err != nil { + return + } + if err = msg.checkMessage(d); err != nil { + return + } + topicPublishInfo := d.tryToFindTopicPublishInfo(msg.Topic) + + // TODO handle topicPublishInfo + if topicPublishInfo.ok() { + timesTotal := 1 + if communicationMode == Sync { + timesTotal = 1 + d.getRetryTimesWhenSendFailed() + } + + var mq *MessageQueue + brokersSent := make([]string, 0) + + for times := 0; times < timesTotal; times++ { + lastBrokerName := "" + if mq == nil { + lastBrokerName = "" + } else { + lastBrokerName = mq.getBrokerName() + } + tmpmq := topicPublishInfo.selectOneMessageQueue(lastBrokerName) + if tmpmq != nil { + mq = tmpmq + brokersSent = append(brokersSent, mq.getBrokerName()) + beginTimestampPrev := time.Now().Unix() + sendResult, err = d.sendKernel(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout) + endTimestamp := time.Now().Unix() + d.updateFaultItem(mq.getBrokerName(), endTimestamp-beginTimestampPrev, false) + + switch communicationMode { + case Async: + return + case Oneway: + return + case Sync: + if sendResult.sendStatus != SendStatusOK { + if d.retryAnotherBrokerWhenNotStoreOK { + continue + } + } + return sendResult, nil + default: + } + } + } + } + + if d.remotingClient.getNameServerAddressList() == nil || len(d.remotingClient.getNameServerAddressList()) == 0 { + err = errors.New("No name server address, please set it") + } + err = errors.New("No route info of this topic," + msg.Topic) + return +} + +func (d *DefaultProducer) getRetryTimesWhenSendFailed() int { + return d.retryTimesWhenSendFailed +} + +func (d *DefaultProducer) tryToFindTopicPublishInfo(topic string) (topicPublishInfo *TopicPublishInfo) { + ok := false + if topicPublishInfo, ok = d.topicPublishInfoTable[topic]; !ok || !topicPublishInfo.ok() { + d.topicPublishInfoTable[topic] = NewTopicPublishInfo() + d.mqClient.updateTopicRouteInfoFromNameServerKernel(topic, false, DefaultProducer{}) + topicPublishInfo = d.topicPublishInfoTable[topic] + } + + // TODO handle topicPublishInfo + if !topicPublishInfo.haveTopicRouterInfo && !topicPublishInfo.ok() { + d.mqClient.updateTopicRouteInfoFromNameServerKernel(topic, true, *d) + topicPublishInfo = d.topicPublishInfoTable[topic] + } + + return +} + +func (d *DefaultProducer) sendKernel(msg *Message, mq *MessageQueue, communicationMode int, sendCallback SendCallback, + topicPublishInfo *TopicPublishInfo, timeout int64) (sendResult *SendResult, err error) { + brokerAddr := d.mqClient.findBrokerAddressInPublish(mq.getBrokerName()) + if brokerAddr == "" { + d.tryToFindTopicPublishInfo(msg.Topic) + brokerAddr = d.mqClient.findBrokerAddressInPublish(mq.getBrokerName()) + } + context := newSendMessageContext() + if brokerAddr != "" { + brokerAddr = BrokerVIPChannel(d.vipChannelEnabled, brokerAddr) + prevBody := msg.Body + + MessageClientIDSetter.setUniqID(msg) + + sysFlag := 0 + if d.tryToCompressMessage(msg) { + sysFlag |= CompressedFlag + } + + tranMsg := msg.Properties[MessageConst.PropertyTransactionPrepared] + if b, err := strconv.ParseBool(tranMsg); tranMsg != "" && err == nil && b { + sysFlag |= TransactionPreparedType + } + + if d.hasCheckForbiddenHook() { + fmt.Println(brokerAddr) + } + if d.hasSendMessageHook() { + } + + requestHeader := new(SendMessageRequestHeader) + requestHeader.ProducerGroup = d.producerGroup + requestHeader.Topic = msg.Topic + requestHeader.DefaultTopic = d.createTopicKey + requestHeader.DefaultTopicQueueNums = d.defaultTopicQueueNums + requestHeader.QueueId = mq.queueId + requestHeader.SysFlag = sysFlag + requestHeader.Properties = messageProperties2String(msg.Properties) + requestHeader.ReconsumeTimes = 0 + + if strings.HasPrefix(requestHeader.Topic, RetryGroupTopicPrefix) { + if MessageConst.PropertyReconsumeTime != "" { + requestHeader.ReconsumeTimes, _ = strconv.Atoi(MessageConst.PropertyReconsumeTime) + delete(msg.Properties, MessageConst.PropertyReconsumeTime) + } + if MessageConst.PropertyMaxReconsumeTimes != "" { + requestHeader.MaxReconsumeTimes, _ = strconv.Atoi(MessageConst.PropertyMaxReconsumeTimes) + delete(msg.Properties, MessageConst.PropertyMaxReconsumeTimes) + } + } + + switch communicationMode { + case Async: + sendResult, err = d.sendMessage( + brokerAddr, + mq.brokerName, + msg, + requestHeader, + timeout, + communicationMode, + sendCallback, + topicPublishInfo, + d.mqClient, + d.retryTimesWhenSendAsyncFailed, + context) + case Oneway, Sync: + sendResult, err = d.sendMessage( + brokerAddr, + mq.brokerName, + msg, + requestHeader, + timeout, + communicationMode, + sendCallback, + topicPublishInfo, + d.mqClient, + d.retryTimesWhenSendFailed, + context) + } + if d.hasSendMessageHook() { + if msg.Properties[MessageConst.PropertyTransactionPrepared] == "true" { + context.msgType = strconv.Itoa(TransMsgHalf) + } + if msg.Properties["__STARTDELIVERTIME"] != "" || msg.Properties[MessageConst.PropertyDelayTimeLevel] != "" { + context.msgType = strconv.Itoa(DelayMsg) + } + } + + msg.Body = prevBody + } + return +} + +func (d *DefaultProducer) tryToCompressMessage(msg *Message) bool { + return false + // TODO add compression + //body := msg.Body + //if body != nil && len(body) != 0 { + //} +} + +func (d *DefaultProducer) hasCheckForbiddenHook() bool { + return false +} + +func (d *DefaultProducer) hasSendMessageHook() bool { + return false +} + +func (d *DefaultProducer) pullMessage(pullRequest *PullRequest) { + return +} + +func (d *DefaultProducer) sendMessage(addr string, brokerName string, msg *Message, requestHeader *SendMessageRequestHeader, + timeoutMillis int64, communicationMode int, sendCallback SendCallback, topicPublishInfo *TopicPublishInfo, mqClient *MqClient, + retryTimesWhenSendFailed int, context *SendMessageContext) (sendResult *SendResult, err error) { + remotingCommand := new(RemotingCommand) + remotingCommand.Code = SendMsg + currOpaque := atomic.AddInt32(&opaque, 1) + remotingCommand.Opaque = currOpaque + remotingCommand.Flag = 0 + remotingCommand.Language = "JAVA" + remotingCommand.Version = 79 + remotingCommand.ExtFields = requestHeader + remotingCommand.Body = msg.Body + + switch communicationMode { + case Async: + times := atomic.AddInt32(&opaque, 1) + d.sendMessageAsync(addr, brokerName, msg, timeoutMillis, remotingCommand, sendCallback, topicPublishInfo, mqClient, + retryTimesWhenSendFailed, times, context, d) + case Oneway: + err = d.remotingClient.invokeOneway(addr, remotingCommand, timeoutMillis) + case Sync: + sendResult, err = d.sendMessageSync(addr, brokerName, msg, timeoutMillis, remotingCommand) + } + + return +} + +func (d *DefaultProducer) sendMessageSync(addr string, brokerName string, msg *Message, timeoutMillis int64, remotingCommand *RemotingCommand) (sendResult *SendResult, err error) { + var response *RemotingCommand + fmt.Println("msg:", msg.Topic, msg.Flag, string(msg.Body), msg.Properties) + if response, err = d.remotingClient.invokeSync(addr, remotingCommand, timeoutMillis); err != nil { + fmt.Println("sendMessageSync err", err) + } + return d.processSendResponse(brokerName, msg, response) +} + +func (d *DefaultProducer) sendMessageAsync(addr string, brokerName string, msg *Message, timeoutMillis int64, + remotingCommand *RemotingCommand, sendCallback SendCallback, topicPublishInfo *TopicPublishInfo, mqClient *MqClient, + retryTimesWhenSendFailed int, times int32, context *SendMessageContext, producer *DefaultProducer) (err error) { + invokeCallback := func(responseFuture *ResponseFuture) { + var sendResult *SendResult + responseCommand := responseFuture.responseCommand + + if sendCallback == nil && responseCommand != nil { + sendResult, err = d.processSendResponse(brokerName, msg, responseCommand) + if err != nil && context != nil && sendResult != nil { + context.sendResult = sendResult + // TODO add send hook + } + d.updateFaultItem(brokerName, time.Now().Unix()-responseFuture.beginTimestamp, false) + return + } + + sendCallback() + if responseCommand != nil { + if sendResult, err = d.processSendResponse(brokerName, msg, responseCommand); sendResult == nil || err != nil { + fmt.Println("sendResult can't be null, error ", err) + producer.updateFaultItem(brokerName, time.Now().Unix()-responseFuture.beginTimestamp, true) + return + } else { + if context != nil { + context.sendResult = sendResult + // TODO add send hook + producer.updateFaultItem(brokerName, time.Now().Unix()-responseFuture.beginTimestamp, false) + } + } + } + producer.updateFaultItem(brokerName, time.Now().Unix()-responseFuture.beginTimestamp, true) + } + err = d.remotingClient.invokeAsync(addr, remotingCommand, 1000, invokeCallback) + return +} + +func (d *DefaultProducer) processSendResponse(brokerName string, msg *Message, response *RemotingCommand) (sendResult *SendResult, err error) { + var sendStatus int + switch response.Code { + // TODO add log + case FlushDiskTimeout: + sendStatus = SendStatusFlushDiskTimeout + case FlushSlaveTimeout: + sendStatus = SendStatusFlushSlaveTimeout + case SlaveNotAvailable: + sendStatus = SendStatusSlaveNotAvailable + case Success: + sendStatus = SendStatusOK + responseHeader := response.decodeCommandCustomHeader() + messageQueue := NewMessageQueue(msg.Topic, brokerName, responseHeader.queueId) + + sendResult = NewSendResult(sendStatus, MessageClientIDSetter.getUniqID(msg), responseHeader.msgId, messageQueue, responseHeader.queueOffset) + sendResult.transactionId = responseHeader.transactionId + + pullResult, ok := response.ExtFields.(map[string]interface{}) + if ok { + if regionId, ok := pullResult[MessageConst.PropertyMsgRegion]; ok { + if regionId == nil || regionId == "" { + regionId = "DefaultRegion" + } + sendResult.regionId = regionId.(string) + } + } + return + } + err = errors.New("processSendResponse error") + + return +} + +func (d *DefaultProducer) updateFaultItem(brokerName string, currentLatency int64, isolation bool) { +} + +func (d *DefaultProducer) getPublishTopicList() (topicList []string) { + topicList = make([]string, 0) + for topic := range d.topicPublishInfoTable { + topicList = append(topicList, topic) + } + + return +} + +func (d *DefaultProducer) isPublishTopicNeedUpdate(topic string) bool { + prev := d.topicPublishInfoTable[topic] + return !prev.ok() +} + +func (d *DefaultProducer) updateTopicPublishInfo(topic string, topicPublishInfo *TopicPublishInfo) { + if topic != "" { + d.topicPublishInfoTable[topic] = topicPublishInfo + } +} diff --git a/producer_test.go b/producer_test.go new file mode 100644 index 0000000..dcf9b0b --- /dev/null +++ b/producer_test.go @@ -0,0 +1,87 @@ +package rocketmq + +import ( + "strconv" + "testing" +) + +// dev-goProducerConsumerTest +var group = "dev-goProducerConsumerTest" + +// goProducerConsumerTest +var topic = "goProducerConsumerTest" +var conf = &Config{ + //Nameserver: "192.168.7.101:9876;192.168.7.102:9876;192.168.7.103:9876", + Namesrv: "192.168.6.69:9876", + ClientIp: "192.168.23.137", + InstanceName: "DEFAULT_tt", +} + +func TestSend(t *testing.T) { + producer, err := NewDefaultProducer(group, conf) + producer.Start() + if err != nil { + t.Fatalf("NewDefaultProducer err, %s", err) + } + for i := 0; i < 3; i++ { + msg := NewMessage(topic, []byte("Hello RocketMQ "+strconv.Itoa(i))) + if sendResult, err := producer.Send(msg); err != nil { + t.Error("Sync send fail!") // 如果不是如预期的那么就报错 + t.Fatalf("err->%s", err) + } else { + t.Logf("sendResult", sendResult) + t.Logf("Sync send success, %d", i) + //t.Logf("sendResult.sendStatus", sendResult.sendStatus) + //t.Logf("sendResult.msgId", sendResult.msgId) + //t.Logf("sendResult.messageQueue", sendResult.messageQueue) + //t.Logf("sendResult.queueOffset", sendResult.queueOffset) + //t.Logf("sendResult.transactionId", sendResult.transactionId) + //t.Logf("sendResult.offsetMsgId", sendResult.offsetMsgId) + //t.Logf("sendResult.regionId", sendResult.regionId) + } + } + + t.Log("Sync send success!") +} + +func TestSendOneway(t *testing.T) { + producer, err := NewDefaultProducer(group, conf) + producer.Start() + if err != nil { + t.Fatalf("NewDefaultProducer err, %s", err) + } + for i := 0; i < 3; i++ { + msg := NewMessage(topic, []byte("Hello RocketMQ "+strconv.Itoa(i))) + if err := producer.SendOneway(msg); err != nil { + t.Error("Oneway send fail!") // 如果不是如预期的那么就报错 + t.Fatalf("err->%s", err) + } else { + t.Logf("Oneway send success, %d", i) + } + } + + t.Log("Oneway send success!") +} + +func TestSendAsync(t *testing.T) { + producer, err := NewDefaultProducer(group, conf) + producer.Start() + if err != nil { + t.Fatalf("NewDefaultProducer err, %s", err) + } + for i := 0; i < 3; i++ { + msg := NewMessage(topic, []byte("Hello RocketMQ "+strconv.Itoa(i))) + sendCallback := func() error { + t.Logf("I am callback") + return nil + } + if err := producer.SendAsync(msg, sendCallback); err != nil { + t.Error("Async send fail!") // 如果不是如预期的那么就报错 + t.Fatalf("err->%s", err) + } else { + t.Logf("Async send success, %d", i) + } + } + + t.Log("Async send success!") +} diff --git a/pull_message.go b/pull_message.go index ac3b678..ac50367 100644 --- a/pull_message.go +++ b/pull_message.go @@ -19,9 +19,13 @@ type PullMessageRequestHeader struct { SubVersion int64 `json:"subVersion"` } +type Service interface { + pullMessage(pullRequest *PullRequest) +} + type PullMessageService struct { pullRequestQueue chan *PullRequest - consumer *DefaultConsumer + service Service } func NewPullMessageService() *PullMessageService { @@ -30,9 +34,9 @@ func NewPullMessageService() *PullMessageService { } } -func (self *PullMessageService) start() { +func (p *PullMessageService) start() { for { - pullRequest := <-self.pullRequestQueue - self.consumer.pullMessage(pullRequest) + pullRequest := <-p.pullRequestQueue + p.service.pullMessage(pullRequest) } } diff --git a/rebalance.go b/rebalance.go index a7c73b0..94fa9d5 100644 --- a/rebalance.go +++ b/rebalance.go @@ -2,7 +2,8 @@ package rocketmq import ( "errors" - "github.com/golang/glog" + //"github.com/golang/glog" + "fmt" "sort" "sync" ) @@ -19,15 +20,16 @@ type Rebalance struct { groupName string messageModel string topicSubscribeInfoTable map[string][]*MessageQueue - topicSubscribeInfoTableLock sync.RWMutex + topicSubscribeInfoTableLock sync.RWMutex subscriptionInner map[string]*SubscriptionData - subscriptionInnerLock sync.RWMutex + subscriptionInnerLock sync.RWMutex mqClient *MqClient allocateMessageQueueStrategy AllocateMessageQueueStrategy consumer *DefaultConsumer + producer *DefaultProducer processQueueTable map[MessageQueue]int32 - processQueueTableLock sync.RWMutex - mutex sync.Mutex + processQueueTableLock sync.RWMutex + mutex sync.Mutex } func NewRebalance() *Rebalance { @@ -40,20 +42,20 @@ func NewRebalance() *Rebalance { } } -func (self *Rebalance) doRebalance() { - self.mutex.Lock() - defer self.mutex.Unlock() - for topic, _ := range self.subscriptionInner { - self.rebalanceByTopic(topic) +func (r *Rebalance) doRebalance() { + r.mutex.Lock() + defer r.mutex.Unlock() + for topic, _ := range r.subscriptionInner { + r.rebalanceByTopic(topic) } } type ConsumerIdSorter []string -func (self ConsumerIdSorter) Len() int { return len(self) } -func (self ConsumerIdSorter) Swap(i, j int) { self[i], self[j] = self[j], self[i] } -func (self ConsumerIdSorter) Less(i, j int) bool { - if self[i] < self[j] { +func (r ConsumerIdSorter) Len() int { return len(r) } +func (r ConsumerIdSorter) Swap(i, j int) { r[i], r[j] = r[j], r[i] } +func (r ConsumerIdSorter) Less(i, j int) bool { + if r[i] < r[j] { return true } return false @@ -64,7 +66,7 @@ type AllocateMessageQueueStrategy interface { } type AllocateMessageQueueAveragely struct{} -func (self *AllocateMessageQueueAveragely) allocate(consumerGroup string, currentCID string, mqAll []*MessageQueue, cidAll []string) ([]*MessageQueue, error) { +func (r *AllocateMessageQueueAveragely) allocate(consumerGroup string, currentCID string, mqAll []*MessageQueue, cidAll []string) ([]*MessageQueue, error) { if currentCID == "" { return nil, errors.New("currentCID is empty") } @@ -119,16 +121,16 @@ func (self *AllocateMessageQueueAveragely) allocate(consumerGroup string, curren return nil, errors.New("cant't find currentCID") } -func (self *Rebalance) rebalanceByTopic(topic string) error { - cidAll, err := self.mqClient.findConsumerIdList(topic, self.groupName) +func (r *Rebalance) rebalanceByTopic(topic string) error { + cidAll, err := r.mqClient.findConsumerIdList(topic, r.groupName) if err != nil { - glog.Error(err) + fmt.Println(err) return err } - self.topicSubscribeInfoTableLock.RLock() - mqs, ok := self.topicSubscribeInfoTable[topic] - self.topicSubscribeInfoTableLock.RUnlock() + r.topicSubscribeInfoTableLock.RLock() + mqs, ok := r.topicSubscribeInfoTable[topic] + r.topicSubscribeInfoTableLock.RUnlock() if ok && len(mqs) > 0 && len(cidAll) > 0 { var messageQueues MessageQueues = mqs var consumerIdSorter ConsumerIdSorter = cidAll @@ -137,40 +139,40 @@ func (self *Rebalance) rebalanceByTopic(topic string) error { sort.Sort(consumerIdSorter) } - allocateResult, err := self.allocateMessageQueueStrategy.allocate(self.groupName, self.mqClient.clientId, mqs, cidAll) + allocateResult, err := r.allocateMessageQueueStrategy.allocate(r.groupName, r.mqClient.clientId, mqs, cidAll) if err != nil { - glog.Error(err) + fmt.Println(err) return err } - glog.V(1).Infof("rebalance topic[%s]", topic) - self.updateProcessQueueTableInRebalance(topic, allocateResult) + fmt.Println("rebalance topic[%s]", topic) + r.updateProcessQueueTableInRebalance(topic, allocateResult) return nil } -func (self *Rebalance) updateProcessQueueTableInRebalance(topic string, mqSet []*MessageQueue) { +func (r *Rebalance) updateProcessQueueTableInRebalance(topic string, mqSet []*MessageQueue) { for _, mq := range mqSet { - self.processQueueTableLock.RLock() - _, ok := self.processQueueTable[*mq] - self.processQueueTableLock.RUnlock() + r.processQueueTableLock.RLock() + _, ok := r.processQueueTable[*mq] + r.processQueueTableLock.RUnlock() if !ok { pullRequest := new(PullRequest) - pullRequest.consumerGroup = self.groupName + pullRequest.consumerGroup = r.groupName pullRequest.messageQueue = mq - pullRequest.nextOffset = self.computePullFromWhere(mq) - self.mqClient.pullMessageService.pullRequestQueue <- pullRequest - self.processQueueTableLock.Lock() - self.processQueueTable[*mq] = 1 - self.processQueueTableLock.Unlock() + pullRequest.nextOffset = r.computePullFromWhere(mq) + r.mqClient.pullMessageService.pullRequestQueue <- pullRequest + r.processQueueTableLock.Lock() + r.processQueueTable[*mq] = 1 + r.processQueueTableLock.Unlock() } } } -func (self *Rebalance) computePullFromWhere(mq *MessageQueue) int64 { +func (r *Rebalance) computePullFromWhere(mq *MessageQueue) int64 { var result int64 = -1 - lastOffset := self.consumer.offsetStore.readOffset(mq, READ_FROM_STORE) + lastOffset := r.consumer.offsetStore.readOffset(mq, ReadFromStore) if lastOffset >= 0 { result = lastOffset diff --git a/remote_cmd.go b/remote_cmd.go index 1962960..9e8e15c 100644 --- a/remote_cmd.go +++ b/remote_cmd.go @@ -5,12 +5,13 @@ import ( "encoding/binary" "encoding/json" "log" + "strconv" "sync" ) const ( - RPC_TYPE int = 0 - RPC_ONEWAYint = 1 + RpcType = 0 + RpcOneway = 1 ) var opaque int32 @@ -23,7 +24,7 @@ var ( ) type RemotingCommand struct { - //header + // header Code int `json:"code"` Language string `json:"language"` Version int `json:"version"` @@ -31,52 +32,52 @@ type RemotingCommand struct { Flag int `json:"flag"` remark string `json:"remark"` ExtFields interface{} `json:"extFields"` - //body + // body Body []byte `json:"body,omitempty"` } -func (self *RemotingCommand) encodeHeader() []byte { +func (r *RemotingCommand) encodeHeader() []byte { length := 4 - headerData := self.buildHeader() + headerData := r.buildHeader() length += len(headerData) - if self.Body != nil { - length += len(self.Body) + if r.Body != nil { + length += len(r.Body) } buf := bytes.NewBuffer([]byte{}) binary.Write(buf, binary.BigEndian, length) - binary.Write(buf, binary.BigEndian, len(self.Body)) + binary.Write(buf, binary.BigEndian, len(r.Body)) buf.Write(headerData) return buf.Bytes() } -func (self *RemotingCommand) buildHeader() []byte { - buf, err := json.Marshal(self) +func (r *RemotingCommand) buildHeader() []byte { + buf, err := json.Marshal(r) if err != nil { return nil } return buf } -func (self *RemotingCommand) encode() []byte { +func (r *RemotingCommand) encode() []byte { length := 4 - headerData := self.buildHeader() + headerData := r.buildHeader() length += len(headerData) - if self.Body != nil { - length += len(self.Body) + if r.Body != nil { + length += len(r.Body) } buf := bytes.NewBuffer([]byte{}) binary.Write(buf, binary.LittleEndian, length) - binary.Write(buf, binary.LittleEndian, len(self.Body)) + binary.Write(buf, binary.LittleEndian, len(r.Body)) buf.Write(headerData) - if self.Body != nil { - buf.Write(self.Body) + if r.Body != nil { + buf.Write(r.Body) } return buf.Bytes() @@ -96,3 +97,21 @@ func decodeRemoteCommand(header, body []byte) *RemotingCommand { cmd.Body = body return cmd } + +func (r *RemotingCommand) decodeCommandCustomHeader() (responseHeader SendMessageResponseHeader) { + msgId := r.ExtFields.(map[string]interface{})["msgId"].(string) + queueId, _ := strconv.Atoi(r.ExtFields.(map[string]interface{})["queueId"].(string)) + queueOffset, _ := strconv.Atoi(r.ExtFields.(map[string]interface{})["queueOffset"].(string)) + responseHeader = SendMessageResponseHeader{ + msgId: msgId, + queueId: int32(queueId), + queueOffset: int64(queueOffset), + transactionId: "", + } + return +} + +func (r *RemotingCommand) markOnewayRPC() { + bits := 1 << RpcOneway + r.Flag |= bits +} diff --git a/remoting_client.go b/remoting_client.go index 027b056..d30bd11 100644 --- a/remoting_client.go +++ b/remoting_client.go @@ -5,8 +5,11 @@ import ( "encoding/binary" "encoding/json" "errors" - "github.com/golang/glog" + //"github.com/golang/glog" + "fmt" + "math/rand" "net" + "strings" "sync" "time" ) @@ -28,76 +31,114 @@ type RemotingClient interface { connect(addr string) (net.Conn, error) invokeAsync(addr string, request *RemotingCommand, timeoutMillis int64, invokeCallback InvokeCallback) error invokeSync(addr string, request *RemotingCommand, timeoutMillis int64) (*RemotingCommand, error) + invokeOneway(addr string, request *RemotingCommand, timeoutMillis int64) error ScanResponseTable() + getNameServerAddressList() []string } -type DefalutRemotingClient struct { - connTable map[string]net.Conn - connTableLock sync.RWMutex - responseTable map[int32]*ResponseFuture - responseTableLock sync.RWMutex - namesrvAddrList []string - namesrvAddrChoosed string +type DefaultRemotingClient struct { + connTable map[string]net.Conn + connTableLock sync.RWMutex + responseTable map[int32]*ResponseFuture + responseTableLock sync.RWMutex + namesrvAddrList []string + namesrvAddrChosen string } func NewDefaultRemotingClient() RemotingClient { - return &DefalutRemotingClient{ - connTable: make(map[string]net.Conn), - responseTable: make(map[int32]*ResponseFuture), + return &DefaultRemotingClient{ + connTable: make(map[string]net.Conn), + responseTable: make(map[int32]*ResponseFuture), + namesrvAddrList: make([]string, 0), } } -func (self *DefalutRemotingClient) ScanResponseTable() { - self.responseTableLock.Lock() - for seq, response := range self.responseTable { - if (response.beginTimestamp + 30) <= time.Now().Unix() { - - delete(self.responseTable, seq) - +func (d *DefaultRemotingClient) ScanResponseTable() { + d.responseTableLock.Lock() + for seq, response := range d.responseTable { + if (response.beginTimestamp + 30) <= time.Now().Unix() { + delete(d.responseTable, seq) if response.invokeCallback != nil { response.invokeCallback(nil) - glog.Warningf("remove time out request %v", response) + fmt.Printf("remove time out request %v", response) } } } - self.responseTableLock.Unlock() + d.responseTableLock.Unlock() } -func (self *DefalutRemotingClient) connect(addr string) (conn net.Conn, err error) { +func (d *DefaultRemotingClient) getAndCreateConn(addr string) (conn net.Conn, ok bool) { + // TODO optimize algorithm of getting a nameserver connection + if strings.ContainsAny(addr, ";") { + namesrvAddrList := strings.Split(addr, ";") + namesrvAddrListlen := len(namesrvAddrList) + namesrvAddr, namesrvAddrList := removeOne(rand.Intn(namesrvAddrListlen), namesrvAddrList) + + var err error + for i := 0; i < namesrvAddrListlen-1; i++ { + conn, err = d.connect(namesrvAddr) + if err != nil { + fmt.Println("getAndCreateConn error, ", err) + namesrvAddr, namesrvAddrList = removeOne(rand.Intn(namesrvAddrListlen), namesrvAddrList) + } else { + d.namesrvAddrChosen = namesrvAddr + break + } + } + addr = d.namesrvAddrChosen + } + + d.connTableLock.RLock() + conn, ok = d.connTable[addr] + d.connTableLock.RUnlock() + + return +} + +func removeOne(index int, namesrvAddrList []string) (namesrvAddr string, newNamesrvAddrList []string) { + namesrvAddrListlen := len(namesrvAddrList) + index = rand.Intn(namesrvAddrListlen) + namesrvAddr = namesrvAddrList[index] + namesrvAddrList[index] = namesrvAddrList[namesrvAddrListlen-1] + newNamesrvAddrList = namesrvAddrList[:namesrvAddrListlen-1] + + return +} + +func (d *DefaultRemotingClient) connect(addr string) (conn net.Conn, err error) { if addr == "" { - addr = self.namesrvAddrChoosed + addr = d.namesrvAddrChosen } - self.connTableLock.RLock() - conn, ok := self.connTable[addr] - self.connTableLock.RUnlock() + d.connTableLock.RLock() + conn, ok := d.connTable[addr] + d.connTableLock.RUnlock() if !ok { conn, err = net.Dial("tcp", addr) if err != nil { - glog.Error(err) + fmt.Println(err) return nil, err } - self.connTableLock.Lock() - self.connTable[addr] = conn - self.connTableLock.Unlock() - glog.Info("connect to:", addr) - go self.handlerConn(conn, addr) + d.connTableLock.Lock() + d.connTable[addr] = conn + d.connTableLock.Unlock() + fmt.Println("connect to:", addr) + go d.handlerConn(conn, addr) } return conn, nil } -func (self *DefalutRemotingClient) invokeSync(addr string, request *RemotingCommand, timeoutMillis int64) (*RemotingCommand, error) { - self.connTableLock.RLock() - conn, ok := self.connTable[addr] - self.connTableLock.RUnlock() +func (d *DefaultRemotingClient) invokeSync(addr string, request *RemotingCommand, timeoutMillis int64) (*RemotingCommand, error) { + //fmt.Println("invokeSync->", addr, request, timeoutMillis) + conn, ok := d.getAndCreateConn(addr) var err error if !ok { - conn, err = self.connect(addr) + conn, err = d.connect(addr) if err != nil { - glog.Error(err) + fmt.Println(err) return nil, err } } @@ -113,12 +154,21 @@ func (self *DefalutRemotingClient) invokeSync(addr string, request *RemotingComm header := request.encodeHeader() body := request.Body - self.responseTableLock.Lock() - self.responseTable[request.Opaque] = response - self.responseTableLock.Unlock() - err = self.sendRequest(header, body, conn, addr) + d.responseTableLock.Lock() + d.responseTable[request.Opaque] = response + d.responseTableLock.Unlock() + + //fmt.Println("------------------------------------------------------------------") + //fmt.Println("request", string(request.Body)) + //fmt.Println("header", string(header)) + //fmt.Println("conn", &conn) + //fmt.Println("addr", addr) + //fmt.Println("d.responseTable", (*d).responseTable) + //fmt.Println("------------------------------------------------------------------") + + err = d.sendRequest(header, body, conn, addr) if err != nil { - glog.Error(err) + fmt.Println("invokeSync:err", err) return nil, err } select { @@ -130,16 +180,15 @@ func (self *DefalutRemotingClient) invokeSync(addr string, request *RemotingComm } -func (self *DefalutRemotingClient) invokeAsync(addr string, request *RemotingCommand, timeoutMillis int64, invokeCallback InvokeCallback) error { - self.connTableLock.RLock() - conn, ok := self.connTable[addr] - self.connTableLock.RUnlock() +func (d *DefaultRemotingClient) invokeAsync(addr string, request *RemotingCommand, timeoutMillis int64, invokeCallback InvokeCallback) (err error) { + d.connTableLock.RLock() + conn, ok := d.connTable[addr] + d.connTableLock.RUnlock() - var err error if !ok { - conn, err = self.connect(addr) + conn, err = d.connect(addr) if err != nil { - glog.Error(err) + fmt.Println(err) return err } } @@ -152,38 +201,58 @@ func (self *DefalutRemotingClient) invokeAsync(addr string, request *RemotingCom invokeCallback: invokeCallback, } - self.responseTableLock.Lock() - self.responseTable[request.Opaque] = response - self.responseTableLock.Unlock() + d.responseTableLock.Lock() + d.responseTable[request.Opaque] = response + d.responseTableLock.Unlock() header := request.encodeHeader() body := request.Body - err = self.sendRequest(header, body, conn, addr) + err = d.sendRequest(header, body, conn, addr) if err != nil { - glog.Error(err) + fmt.Println(err) return err } return nil } -func (self *DefalutRemotingClient) handlerConn(conn net.Conn, addr string) { +func (d *DefaultRemotingClient) invokeOneway(addr string, request *RemotingCommand, timeoutMillis int64) (err error) { + d.connTableLock.RLock() + conn, ok := d.connTable[addr] + d.connTableLock.RUnlock() + + if !ok { + conn, err = d.connect(addr) + if err != nil { + fmt.Println(err) + return err + } + } + + request.markOnewayRPC() + header := request.encodeHeader() + body := request.Body + + return d.sendRequest(header, body, conn, addr) +} + +func (d *DefaultRemotingClient) handlerConn(conn net.Conn, addr string) { b := make([]byte, 1024) var length, headerLength, bodyLength int32 var buf = bytes.NewBuffer([]byte{}) var header, body []byte - var flag int = 0 + var flag = 0 for { n, err := conn.Read(b) if err != nil { - self.releaseConn(addr, conn) - glog.Error(err, addr) + d.releaseConn(addr, conn) + fmt.Println(err, addr) return } _, err = buf.Write(b[:n]) if err != nil { - self.releaseConn(addr, conn) + d.releaseConn(addr, conn) return } @@ -192,7 +261,7 @@ func (self *DefalutRemotingClient) handlerConn(conn net.Conn, addr string) { if buf.Len() >= 4 { err = binary.Read(buf, binary.BigEndian, &length) if err != nil { - glog.Error(err) + fmt.Println(err) return } flag = 1 @@ -205,7 +274,7 @@ func (self *DefalutRemotingClient) handlerConn(conn net.Conn, addr string) { if buf.Len() >= 4 { err = binary.Read(buf, binary.BigEndian, &headerLength) if err != nil { - glog.Error(err) + fmt.Println(err) return } flag = 2 @@ -220,7 +289,7 @@ func (self *DefalutRemotingClient) handlerConn(conn net.Conn, addr string) { header = make([]byte, headerLength) _, err = buf.Read(header) if err != nil { - glog.Error(err) + fmt.Println(err) return } flag = 3 @@ -239,7 +308,7 @@ func (self *DefalutRemotingClient) handlerConn(conn net.Conn, addr string) { body = make([]byte, int(bodyLength)) _, err = buf.Read(body) if err != nil { - glog.Error(err) + fmt.Println(err) return } flag = 0 @@ -256,13 +325,13 @@ func (self *DefalutRemotingClient) handlerConn(conn net.Conn, addr string) { copy(bodyCopy, body) go func() { cmd := decodeRemoteCommand(headerCopy, bodyCopy) - self.responseTableLock.RLock() - response, ok := self.responseTable[cmd.Opaque] - self.responseTableLock.RUnlock() + d.responseTableLock.RLock() + response, ok := d.responseTable[cmd.Opaque] + d.responseTableLock.RUnlock() - self.responseTableLock.Lock() - delete(self.responseTable, cmd.Opaque) - self.responseTableLock.Unlock() + d.responseTableLock.Lock() + delete(d.responseTable, cmd.Opaque) + d.responseTableLock.Unlock() if ok { response.responseCommand = cmd @@ -274,15 +343,15 @@ func (self *DefalutRemotingClient) handlerConn(conn net.Conn, addr string) { response.done <- true } } else { - if cmd.Code == NOTIFY_CONSUMER_IDS_CHANGED { + if cmd.Code == NotifyConsumerIdsChanged { return } jsonCmd, err := json.Marshal(cmd) if err != nil { - glog.Error(err) + fmt.Println(err) } - glog.Error(string(jsonCmd)) + fmt.Println(string(jsonCmd)) } }() } @@ -291,28 +360,28 @@ func (self *DefalutRemotingClient) handlerConn(conn net.Conn, addr string) { } } -func (self *DefalutRemotingClient) sendRequest(header, body []byte, conn net.Conn, addr string) error { +func (d *DefaultRemotingClient) sendRequest(header, body []byte, conn net.Conn, addr string) error { buf := bytes.NewBuffer([]byte{}) - binary.Write(buf, binary.BigEndian, int32(len(header) + len(body) + 4)) + binary.Write(buf, binary.BigEndian, int32(len(header)+len(body)+4)) binary.Write(buf, binary.BigEndian, int32(len(header))) _, err := conn.Write(buf.Bytes()) if err != nil { - self.releaseConn(addr, conn) + d.releaseConn(addr, conn) return err } _, err = conn.Write(header) if err != nil { - self.releaseConn(addr, conn) + d.releaseConn(addr, conn) return err } if body != nil && len(body) > 0 { _, err = conn.Write(body) if err != nil { - self.releaseConn(addr, conn) + d.releaseConn(addr, conn) return err } } @@ -320,9 +389,13 @@ func (self *DefalutRemotingClient) sendRequest(header, body []byte, conn net.Con return nil } -func (self *DefalutRemotingClient) releaseConn(addr string, conn net.Conn) { +func (d *DefaultRemotingClient) releaseConn(addr string, conn net.Conn) { conn.Close() - self.connTableLock.Lock() - delete(self.connTable, addr) - self.connTableLock.Unlock() + d.connTableLock.Lock() + delete(d.connTable, addr) + d.connTableLock.Unlock() +} + +func (d *DefaultRemotingClient) getNameServerAddressList() []string { + return d.namesrvAddrList } diff --git a/request_code.go b/request_code.go index 8ce3931..d61e907 100644 --- a/request_code.go +++ b/request_code.go @@ -2,171 +2,171 @@ package rocketmq const ( // Broker 发送消息 - SEND_MESSAGE = 10 + SendMsg = 10 // Broker 订阅消息 - PULL_MESSAGE = 11 + PullMsg = 11 // Broker 查询消息 - QUERY_MESSAGE = 12 + QueryMESSAGE = 12 // Broker 查询Broker Offset - QUERY_BROKER_OFFSET = 13 + QueryBrokerOffset = 13 // Broker 查询Consumer Offset - QUERY_CONSUMER_OFFSET = 14 + QueryConsumerOffset = 14 // Broker 更新Consumer Offset - UPDATE_CONSUMER_OFFSET = 15 + UpdateCconsumerOffset = 15 // Broker 更新或者增加一个Topic - UPDATE_AND_CREATE_TOPIC = 17 + UpdateAndCreateTopic = 17 // Broker 获取所有Topic的配置(Slave和Namesrv都会向Master请求此配置) - GET_ALL_TOPIC_CONFIG = 21 + GetAllTopicConfig = 21 // Broker 获取所有Topic配置(Slave和Namesrv都会向Master请求此配置) - GET_TOPIC_CONFIG_LIST = 22 + GetTopicConfigList = 22 // Broker 获取所有Topic名称列表 - GET_TOPIC_NAME_LIST = 23 + GetTopicNameList = 23 // Broker 更新Broker上的配置 - UPDATE_BROKER_CONFIG = 25 + UpdateBrokerConfig = 25 // Broker 获取Broker上的配置 - GET_BROKER_CONFIG = 26 + GetBrokerConfig = 26 // Broker 触发Broker删除文件 - TRIGGER_DELETE_FILES = 27 + TriggerDeleteFILES = 27 // Broker 获取Broker运行时信息 - GET_BROKER_RUNTIME_INFO = 28 + GetBrokerRuntimeInfo = 28 // Broker 根据时间查询队列的Offset - SEARCH_OFFSET_BY_TIMESTAMP = 29 + SearchOffsetByTimeStamp = 29 // Broker 查询队列最大Offset - GET_MAX_OFFSET = 30 + GetMaxOffset = 30 // Broker 查询队列最小Offset - GET_MIN_OFFSET = 31 + GetMinOffset = 31 // Broker 查询队列最早消息对应时间 - GET_EARLIEST_MSG_STORETIME = 32 + GetEarliestMsgStoreTime = 32 // Broker 根据消息ID来查询消息 - VIEW_MESSAGE_BY_ID = 33 + ViewMsgById = 33 // Broker Client向Client发送心跳,并注册自身 - HEART_BEAT = 34 + HeartBeat = 34 // Broker Client注销 - UNREGISTER_CLIENT = 35 + UnregisterClient = 35 // Broker Consumer将处理不了的消息发回服务器 - CONSUMER_SEND_MSG_BACK = 36 + CconsumerSendMsgBack = 36 // Broker Commit或者Rollback事务 - END_TRANSACTION = 37 + EndTransaction = 37 // Broker 获取ConsumerId列表通过GroupName - GET_CONSUMER_LIST_BY_GROUP = 38 + GetConsumerListByGroup = 38 // Broker 主动向Producer回查事务状态 - CHECK_TRANSACTION_STATE = 39 + CheckTransactionState = 39 // Broker Broker通知Consumer列表变化 - NOTIFY_CONSUMER_IDS_CHANGED = 40 + NotifyConsumerIdsChanged = 40 // Broker Consumer向Master锁定队列 - LOCK_BATCH_MQ = 41 + LockBatchMq = 41 // Broker Consumer向Master解锁队列 - UNLOCK_BATCH_MQ = 42 + UNLockBatchMq = 42 // Broker 获取所有Consumer Offset - GET_ALL_CONSUMER_OFFSET = 43 + GetAllCconsumerOffset = 43 // Broker 获取所有定时进度 - GET_ALL_DELAY_OFFSET = 45 + GetAllDelayOffset = 45 // Namesrv 向Namesrv追加KV配置 - PUT_KV_CONFIG = 100 + PutKVConfig = 100 // Namesrv 从Namesrv获取KV配置 - GET_KV_CONFIG = 101 + GetKVConfig = 101 // Namesrv 从Namesrv获取KV配置 - DELETE_KV_CONFIG = 102 + DeleteKVConfig = 102 // Namesrv 注册一个Broker,数据都是持久化的,如果存在则覆盖配置 - REGISTER_BROKER = 103 + RegisterBroker = 103 // Namesrv 卸载一个Broker,数据都是持久化的 - UNREGISTER_BROKER = 104 + UnregisterBroker = 104 // Namesrv 根据Topic获取Broker Name、队列数(包含读队列与写队列) - GET_ROUTEINTO_BY_TOPIC = 105 + GetRouteinfoByTopic = 105 // Namesrv 获取注册到Name Server的所有Broker集群信息 - GET_BROKER_CLUSTER_INFO = 106 - UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200 - GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201 - GET_TOPIC_STATS_INFO = 202 - GET_CONSUMER_CONNECTION_LIST = 203 - GET_PRODUCER_CONNECTION_LIST = 204 - WIPE_WRITE_PERM_OF_BROKER = 205 + GetBrokerClusterInfo = 106 + UpdateAndCreateSubscriptionGroup = 200 + GetAllSubscriptionGroupConfig = 201 + GetTopicStatsInfo = 202 + GetConsumerConnList = 203 + GetProducerConnList = 204 + WipeWritePermOfBroker = 205 // 从Name Server获取完整Topic列表 - GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206 + GetAllTopicListFromNamesrv = 206 // 从Broker删除订阅组 - DELETE_SUBSCRIPTIONGROUP = 207 + DeleteSubscriptionGroup = 207 // 从Broker获取消费状态(进度) - GET_CONSUME_STATS = 208 + GetConsumeStats = 208 // Suspend Consumer消费过程 - SUSPEND_CONSUMER = 209 + SuspendConsumer = 209 // Resume Consumer消费过程 - RESUME_CONSUMER = 210 + ResumeConsumer = 210 // 重置Consumer Offset - RESET_CONSUMER_OFFSET_IN_CONSUMER = 211 + ResetCconsumerOffsetInConsumer = 211 // 重置Consumer Offset - RESET_CONSUMER_OFFSET_IN_BROKER = 212 + ResetCconsumerOffsetInBroker = 212 // 调整Consumer线程池数量 - ADJUST_CONSUMER_THREAD_POOL = 213 + AdjustCconsumerThreadPoolPOOL = 213 // 查询消息被哪些消费组消费 - WHO_CONSUME_THE_MESSAGE = 214 + WhoConsumeTHE_MESSAGE = 214 // 从Broker删除Topic配置 - DELETE_TOPIC_IN_BROKER = 215 + DeleteTopicInBroker = 215 // 从Namesrv删除Topic配置 - DELETE_TOPIC_IN_NAMESRV = 216 + DeleteTopicInNamesrv = 216 // Namesrv 通过 project 获取所有的 server ip 信息 - GET_KV_CONFIG_BY_VALUE = 217 + GetKvConfigByValue = 217 // Namesrv 删除指定 project group 下的所有 server ip 信息 - DELETE_KV_CONFIG_BY_VALUE = 218 + DeleteKvConfigByValue = 218 // 通过NameSpace获取所有的KV List - GET_KVLIST_BY_NAMESPACE = 219 + GetKvlistByNamespace = 219 // offset 重置 - RESET_CONSUMER_CLIENT_OFFSET = 220 + ResetCconsumerClientOffset = 220 // 客户端订阅消息 - GET_CONSUMER_STATUS_FROM_CLIENT = 221 + GetCconsumerStatusFromClient = 221 // 通知 broker 调用 offset 重置处理 - INVOKE_BROKER_TO_RESET_OFFSET = 222 + InvokeBrokerToResetOffset = 222 // 通知 broker 调用客户端订阅消息处理 - INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223 + InvokeBrokerToGetCconsumerSTATUS = 223 // Broker 查询topic被谁消费 // 2014-03-21 Add By shijia - QUERY_TOPIC_CONSUME_BY_WHO = 300 + QueryTopicConsumeByWho = 300 // 获取指定集群下的所有 topic // 2014-03-26 - GET_TOPICS_BY_CLUSTER = 224 + GetTopicsByCluster = 224 // 向Broker注册Filter Server // 2014-04-06 Add By shijia - REGISTER_FILTER_SERVER = 301 + RegisterFilterServer = 301 // 向Filter Server注册Class // 2014-04-06 Add By shijia - REGISTER_MESSAGE_FILTER_CLASS = 302 + RegisterMsgFilterClass = 302 // 根据 topic 和 group 获取消息的时间跨度 - QUERY_CONSUME_TIME_SPAN = 303 + QueryConsumeTimeSpan = 303 // 获取所有系统内置 Topic 列表 - GET_SYSTEM_TOPIC_LIST_FROM_NS = 304 - GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305 + GetSysTopicListFromNS = 304 + GetSysTopicListFromBroker = 305 // 清理失效队列 - CLEAN_EXPIRED_CONSUMEQUEUE = 306 + CleanExpiredConsumequeue = 306 // 通过Broker查询Consumer内存数据 // 2014-07-19 Add By shijia - GET_CONSUMER_RUNNING_INFO = 307 + GetCconsumerRunningInfo = 307 // 查找被修正 offset (转发组件) - QUERY_CORRECTION_OFFSET = 308 + QueryCorrectionOffset = 308 // 通过Broker直接向某个Consumer发送一条消息,并立刻消费,返回结果给broker,再返回给调用方 // 2014-08-11 Add By shijia - CONSUME_MESSAGE_DIRECTLY = 309 + ConsumeMsgDirectly = 309 // Broker 发送消息,优化网络数据包 - SEND_MESSAGE_V2 = 310 + SendMsgV2 = 310 // 单元化相关 topic - GET_UNIT_TOPIC_LIST = 311 + GetUnitTopicList = 311 // 获取含有单元化订阅组的 Topic 列表 - GET_HAS_UNIT_SUB_TOPIC_LIST = 312 + GetHasUnitSubTopicList = 312 // 获取含有单元化订阅组的非单元化 Topic 列表 - GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313 + GetHasUnitSubUnunitTopicList = 313 // 克隆某一个组的消费进度到新的组 - CLONE_GROUP_OFFSET = 314 + CloneGroupOffset = 314 // 查看Broker上的各种统计信息 - VIEW_BROKER_STATS_DATA = 315 + ViewBrokerStatsData = 315 ) diff --git a/response_code.go b/response_code.go index fa1a1f4..d93b3ba 100644 --- a/response_code.go +++ b/response_code.go @@ -2,66 +2,63 @@ package rocketmq const ( // 成功 - SUCCESS = 0 + Success = 0 // 发生了未捕获异常 - SYSTEM_ERROR = 1 + SysError = 1 // 由于线程池拥堵,系统繁忙 - SYSTEM_BUSY = 2 + SysBusy = 2 // 请求代码不支持 - REQUEST_CODE_NOT_SUPPORTED = 3 + RequestCodeNotSupported = 3 //事务失败,添加db失败 - TRANSACTION_FAILED = 4 + TransactionFailed = 4 // Broker 刷盘超时 - FLUSH_DISK_TIMEOUT = 10 + FlushDiskTimeout = 10 // Broker 同步双写,Slave不可用 - SLAVE_NOT_AVAILABLE = 11 + SlaveNotAvailable = 11 // Broker 同步双写,等待Slave应答超时 - FLUSH_SLAVE_TIMEOUT = 12 + FlushSlaveTimeout = 12 // Broker 消息非法 - MESSAGE_ILLEGAL = 13 + MsgIllegal = 13 // Broker, Namesrv 服务不可用,可能是正在关闭或者权限问题 - SERVICE_NOT_AVAILABLE = 14 + ServiceNotAvailable = 14 // Broker, Namesrv 版本号不支持 - VERSION_NOT_SUPPORTED = 15 + VersionNotSupported = 15 // Broker, Namesrv 无权限执行此操作,可能是发、收、或者其他操作 - NO_PERMISSION = 16 + NoPermission = 16 // Broker, Topic不存在 - TOPIC_NOT_EXIST = 17 + TopicNotExist = 17 // Broker, Topic已经存在,创建Topic - TOPIC_EXIST_ALREADY = 18 + TopicExistAlready = 18 // Broker 拉消息未找到(请求的Offset等于最大Offset,最大Offset无对应消息) - PULL_NOT_FOUND = 19 + PullNotFound = 19 // Broker 可能被过滤,或者误通知等 - PULL_RETRY_IMMEDIATELY = 20 + PullRetryImmediately = 20 // Broker 拉消息请求的Offset不合法,太小或太大 - PULL_OFFSET_MOVED = 21 + PullOffsetMoved = 21 // Broker 查询消息未找到 - QUERY_NOT_FOUND = 22 + QueryNotFound = 22 // Broker 订阅关系解析失败 - SUBSCRIPTION_PARSE_FAILED = 23 + SubscriptionParseFailed = 23 // Broker 订阅关系不存在 - SUBSCRIPTION_NOT_EXIST = 24 + SubscriptionNotExist = 24 // Broker 订阅关系不是最新的 - SUBSCRIPTION_NOT_LATEST = 25 + SubscriptionNotLatest = 25 // Broker 订阅组不存在 - SUBSCRIPTION_GROUP_NOT_EXIST = 26 + SubscriptionGroupNotExist = 26 // Producer 事务应该被提交 - TRANSACTION_SHOULD_COMMIT = 200 + TransactionShouldCommit = 200 // Producer 事务应该被回滚 - TRANSACTION_SHOULD_ROLLBACK = 201 + TransactionShouldRollback = 201 // Producer 事务状态未知 - TRANSACTION_STATE_UNKNOW = 202 + TransactionStateUnknow = 202 // Producer ProducerGroup错误 - TRANSACTION_STATE_GROUP_WRONG = 203 + TransactionStateGroupWrong = 203 // 单元化消息,需要设置 buyerId - NO_BUYER_ID = 204 - + NoBuyerId = 204 // 单元化消息,非本单元消息 - NOT_IN_CURRENT_UNIT = 205 - + NotInCurrentUint = 205 // Consumer不在线 - CONSUMER_NOT_ONLINE = 206 - + ConsumerNotOnline = 206 // Consumer消费消息超时 - CONSUME_MSG_TIMEOUT = 207 + ConsumeMsgTimeout = 207 ) diff --git a/send_message.go b/send_message.go new file mode 100644 index 0000000..6f160f5 --- /dev/null +++ b/send_message.go @@ -0,0 +1,40 @@ +package rocketmq + +type SendRequest struct { + producerGroup string + messageQueue *MessageQueue + nextOffset int64 +} + +type SendMessageRequestHeader struct { + ProducerGroup string `json:"producerGroup"` + Topic string `json:"topic"` + DefaultTopic string `json:"defaultTopic"` + DefaultTopicQueueNums int `json:"defaultTopicQueueNums"` + QueueId int32 `json:"queueId"` + SysFlag int `json:"sysFlag"` + BornTimestamp int64 `json:"bornTimestamp"` + Flag int32 `json:"flag"` + Properties string `json:"properties"` + ReconsumeTimes int `json:"reconsumeTimes"` + UnitMode bool `json:"unitMode"` + MaxReconsumeTimes int `json:"maxReconsumeTimes"` +} + +type SendMessageService struct { + pushRequestQueue chan *SendRequest + producer *DefaultProducer +} + +func NewSendMessageService() *SendMessageService { + return &SendMessageService{ + pushRequestQueue: make(chan *SendRequest, 1024), + } +} + +func (s *SendMessageService) start() { + //for { + // pushRequest := <-self.pushRequestQueue + // self.producer.sendMessage(pushRequest) + //} +} diff --git a/send_message_context.go b/send_message_context.go new file mode 100644 index 0000000..594ca61 --- /dev/null +++ b/send_message_context.go @@ -0,0 +1,30 @@ +package rocketmq + +const ( + NormalMsg = iota + TransMsgHalf + TransMsgCommit + DelayMsg +) + +type SendMessageContext struct { + producerGroup string + Message Message + mq MessageQueue + brokerAddr string + bornHost string + communicationMode string + sendResult *SendResult + props map[string]string + producer Producer + msgType string +} + +func newSendMessageContext() *SendMessageContext { + return &SendMessageContext{} +} + +// TODO add decodeMessage +func (s *SendMessageContext) decodeMessage(data []byte) (messageExt []*MessageExt) { + return messageExt +} diff --git a/send_message_response_header.go b/send_message_response_header.go new file mode 100644 index 0000000..8620d14 --- /dev/null +++ b/send_message_response_header.go @@ -0,0 +1,8 @@ +package rocketmq + +type SendMessageResponseHeader struct { + msgId string + queueId int32 + queueOffset int64 + transactionId string +} diff --git a/send_result.go b/send_result.go new file mode 100644 index 0000000..c61462d --- /dev/null +++ b/send_result.go @@ -0,0 +1,33 @@ +package rocketmq + +const ( + SendStatusOK = iota + SendStatusFlushDiskTimeout + SendStatusFlushSlaveTimeout + SendStatusSlaveNotAvailable +) + +type SendResult struct { + sendStatus int + msgId string + messageQueue *MessageQueue + queueOffset int64 + transactionId string + offsetMsgId string + regionId string +} + +func NewSendResult(sendStatus int, msgId string, offsetMsgId string, messageQueue *MessageQueue, queueOffset int64) *SendResult { + return &SendResult{ + sendStatus: sendStatus, + msgId: msgId, + offsetMsgId: offsetMsgId, + messageQueue: messageQueue, + queueOffset: queueOffset, + } +} + +func (s *SendResult) SendResult(SendStatus int, msgId string, messageQueue MessageQueue, queueOffset uint64, + transactionId string, offsetMsgId string, regionId string) (ok bool) { + return +} diff --git a/store.go b/store.go index 672ef1e..304c31c 100644 --- a/store.go +++ b/store.go @@ -2,15 +2,16 @@ package rocketmq import ( "errors" - "github.com/golang/glog" + //"github.com/golang/glog" + "fmt" "sync" "sync/atomic" ) const ( - MEMORY_FIRST_THEN_STORE = 0 - READ_FROM_MEMORY = 1 - READ_FROM_STORE = 2 + MemoryFirstThenStore = 0 + ReadFromMemory = 1 + ReadFromStore = 2 ) type OffsetStore interface { @@ -23,33 +24,33 @@ type OffsetStore interface { //cloneOffsetTable(topic string) map[MessageQueue]int64 } type RemoteOffsetStore struct { - groupName string - mqClient *MqClient - offsetTable map[MessageQueue]int64 + groupName string + mqClient *MqClient + offsetTable map[MessageQueue]int64 offsetTableLock sync.RWMutex } -func (self *RemoteOffsetStore) readOffset(mq *MessageQueue, readType int) int64 { +func (r *RemoteOffsetStore) readOffset(mq *MessageQueue, readType int) int64 { switch readType { - case MEMORY_FIRST_THEN_STORE: - case READ_FROM_MEMORY: - self.offsetTableLock.RLock() - offset, ok := self.offsetTable[*mq] - self.offsetTableLock.RUnlock() + case MemoryFirstThenStore: + case ReadFromMemory: + r.offsetTableLock.RLock() + offset, ok := r.offsetTable[*mq] + r.offsetTableLock.RUnlock() if ok { return offset - } else if readType == READ_FROM_MEMORY { + } else if readType == ReadFromMemory { return -1 } - case READ_FROM_STORE: - offset, err := self.fetchConsumeOffsetFromBroker(mq) + case ReadFromStore: + offset, err := r.fetchConsumeOffsetFromBroker(mq) if err != nil { - glog.Error(err) + fmt.Println(err) return -1 } - self.updateOffset(mq, offset, false) + r.updateOffset(mq, offset, false) return offset } @@ -57,33 +58,33 @@ func (self *RemoteOffsetStore) readOffset(mq *MessageQueue, readType int) int64 } -func (self *RemoteOffsetStore) fetchConsumeOffsetFromBroker(mq *MessageQueue) (int64, error) { - brokerAddr, _, found := self.mqClient.findBrokerAddressInSubscribe(mq.brokerName, 0, false) +func (r *RemoteOffsetStore) fetchConsumeOffsetFromBroker(mq *MessageQueue) (int64, error) { + brokerAddr, _, found := r.mqClient.findBrokerAddressInSubscribe(mq.brokerName, 0, false) if !found { - if err := self.mqClient.updateTopicRouteInfoFromNameServerByTopic(mq.topic); err != nil { + if _, err := r.mqClient.updateTopicRouteInfoFromNameServerKernel(mq.topic, false, DefaultProducer{}); err != nil { return 0, err } - brokerAddr, _, found = self.mqClient.findBrokerAddressInSubscribe(mq.brokerName, 0, false) + brokerAddr, _, found = r.mqClient.findBrokerAddressInSubscribe(mq.brokerName, 0, false) } if found { requestHeader := &QueryConsumerOffsetRequestHeader{} requestHeader.Topic = mq.topic requestHeader.QueueId = mq.queueId - requestHeader.ConsumerGroup = self.groupName - return self.mqClient.queryConsumerOffset(brokerAddr, requestHeader, 3000) + requestHeader.ConsumerGroup = r.groupName + return r.mqClient.queryConsumerOffset(brokerAddr, requestHeader, 3000) } return 0, errors.New("fetch consumer offset error") } -func (self *RemoteOffsetStore) persist(mq *MessageQueue) { - offset, ok := self.offsetTable[*mq] +func (r *RemoteOffsetStore) persist(mq *MessageQueue) { + offset, ok := r.offsetTable[*mq] if ok { - err := self.updateConsumeOffsetToBroker(mq, offset) + err := r.updateConsumeOffsetToBroker(mq, offset) if err != nil { - glog.Error(err) + fmt.Println(err) } } } @@ -95,48 +96,48 @@ type UpdateConsumerOffsetRequestHeader struct { commitOffset int64 } -func (self *RemoteOffsetStore) updateConsumeOffsetToBroker(mq *MessageQueue, offset int64) error { - addr, found, _ := self.mqClient.findBrokerAddressInSubscribe(mq.brokerName, 0, false) +func (r *RemoteOffsetStore) updateConsumeOffsetToBroker(mq *MessageQueue, offset int64) error { + addr, found, _ := r.mqClient.findBrokerAddressInSubscribe(mq.brokerName, 0, false) if !found { - if err := self.mqClient.updateTopicRouteInfoFromNameServerByTopic(mq.topic); err != nil { + if _, err := r.mqClient.updateTopicRouteInfoFromNameServerKernel(mq.topic, false, DefaultProducer{}); err != nil { return err } - addr, found, _ = self.mqClient.findBrokerAddressInSubscribe(mq.brokerName, 0, false) + addr, found, _ = r.mqClient.findBrokerAddressInSubscribe(mq.brokerName, 0, false) } if found { requestHeader := &UpdateConsumerOffsetRequestHeader{ - consumerGroup: self.groupName, + consumerGroup: r.groupName, topic: mq.topic, queueId: mq.queueId, commitOffset: offset, } - self.mqClient.updateConsumerOffsetOneway(addr, requestHeader, 5*1000) + r.mqClient.updateConsumerOffsetOneway(addr, requestHeader, 5*1000) return nil } return errors.New("not found broker") } -func (self *RemoteOffsetStore) updateOffset(mq *MessageQueue, offset int64, increaseOnly bool) { +func (r *RemoteOffsetStore) updateOffset(mq *MessageQueue, offset int64, increaseOnly bool) { if mq != nil { - self.offsetTableLock.RLock() - offsetOld, ok := self.offsetTable[*mq] - self.offsetTableLock.RUnlock() + r.offsetTableLock.RLock() + offsetOld, ok := r.offsetTable[*mq] + r.offsetTableLock.RUnlock() if !ok { - self.offsetTableLock.Lock() - self.offsetTable[*mq] = offset - self.offsetTableLock.Unlock() + r.offsetTableLock.Lock() + r.offsetTable[*mq] = offset + r.offsetTableLock.Unlock() } else { if increaseOnly { atomic.AddInt64(&offsetOld, offset) - self.offsetTableLock.Lock() - self.offsetTable[*mq] = offsetOld - self.offsetTableLock.Unlock() + r.offsetTableLock.Lock() + r.offsetTable[*mq] = offsetOld + r.offsetTableLock.Unlock() } else { - self.offsetTableLock.Lock() - self.offsetTable[*mq] = offset - self.offsetTableLock.Unlock() + r.offsetTableLock.Lock() + r.offsetTable[*mq] = offset + r.offsetTableLock.Unlock() } } diff --git a/topic_publish_info.go b/topic_publish_info.go new file mode 100644 index 0000000..87a39ca --- /dev/null +++ b/topic_publish_info.go @@ -0,0 +1,39 @@ +package rocketmq + +import ( + "math/rand" +) + +type TopicPublishInfo struct { + orderTopic bool + haveTopicRouterInfo bool + messageQueueList []*MessageQueue + topicRouteData *TopicRouteData +} + +func NewTopicPublishInfo() *TopicPublishInfo { + return &TopicPublishInfo{} +} + +func (t *TopicPublishInfo) ok() (ok bool) { + if len(t.messageQueueList) != 0 { + ok = true + } + return +} + +func (t *TopicPublishInfo) selectOneMessageQueue(lastBrokerName string) (messageQueue *MessageQueue) { + // TODO add sendLatencyFaultEnable trigger and handle it + // TODO optimize algorithm of getting a queue + mqCnt := len(t.messageQueueList) + messageQueue = t.messageQueueList[rand.Intn(mqCnt)] + if lastBrokerName != "" { + for i := 0; i < mqCnt; i++ { + if lastBrokerName == t.messageQueueList[i].brokerName { + messageQueue = t.messageQueueList[i] + return + } + } + } + return +}