-
Notifications
You must be signed in to change notification settings - Fork 311
+ FIX : add the ping/ack msg to session.Pingack #40
base: master
Are you sure you want to change the base?
Conversation
@@ -191,6 +191,12 @@ func (this *service) processIncoming(msg message.Message) error { | |||
func (this *service) processAcked(ackq *sessions.Ackqueue) { | |||
for _, ackmsg := range ackq.Acked() { | |||
// Let's get the messages from the saved message byte slices. | |||
|
|||
if len(ackmsg.Msgbuf) == 0 || len(ackmsg.Ackbuf) == 0 { | |||
glog.Errorf("process/processAcked: Unable to decode new %s as ackmsg Msgbuf buf is %v or ackmsg Ackbuf buf is %v", ackmsg.Mtype, ackmsg.Msgbuf, ackmsg.Ackbuf) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi there @ljy2010a! Thanks so much for this PR, just applied it on my fork of this repo. However keeping the server alive via PINGREQs seems to throw this error a lot. From what I can tell since it's a ping this is not a problem, correct? Figured I'd ask someone like you who knows MQTT better than I :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If it's not an issue, I'll probably just make it:
if (len(ackmsg.Msgbuf) == 0 || len(ackmsg.Ackbuf) == 0) && ackmsg.Mtype!="PINGREQ"
But so far I have not had the i/o timeout issues I was having before, so I really like this (thanks for this PR!)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i/o timeout issues is case by the server setTimeout , when you have a heartbeat or send message interval less than timeout,it should not apper
the code here is just check the buffer len more than 0,that the message library
miss
you should not get the 0 buffer , so I fill the PINGRESP it in ackqueue.go like below
but if you get the 0 buffer , you should check the message that client send ,
or the server get message
, it may have a bug
by the way , i did not use this in Industrial env
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here is my test example
package main
import (
"flag"
"log"
"os"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/surge/glog"
"github.com/surgemq/message"
"github.com/surgemq/surgemq/service"
)
var addr = "tcp://127.0.0.1:12345"
func main() {
flag.Parse()
defer glog.Flush()
go func() {
srv := service.Server{}
glog.Info("starting MQTT server at :12345")
glog.Fatal(srv.ListenAndServe(addr))
}()
// wait a bit, the internal MQTT server is still starting up
time.Sleep(time.Second)
surgemqClient()
// pahoClient()
}
func surgemqClient() {
client := &service.Client{}
msg := message.NewConnectMessage()
msg.SetVersion(4)
KeepAlive := 40
msg.SetKeepAlive(uint16(KeepAlive))
msg.SetCleanSession(true)
// make the byte like Paho
msg.SetClientId([]byte("Paho"))
if err := client.Connect(addr, msg); err != nil {
glog.Fatal(err)
}
go heartbeatProcess(client, KeepAlive)
glog.Infoln("connected to port 12345")
i := 0
for _ = range time.Tick(time.Second * 1) {
i++
glog.Info(i)
}
}
func pahoClient() {
mqtt.ERROR = log.New(os.Stdout, "", 0)
opts := mqtt.NewClientOptions().AddBroker(addr).SetClientID("Paho")
opts.SetKeepAlive(2 * time.Second)
opts.SetPingTimeout(1 * time.Second)
c := mqtt.NewClient(opts)
if token := c.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
i := 0
for _ = range time.Tick(time.Second * 1) {
i++
glog.Info(i)
}
}
func heartbeatProcess(c *service.Client, KeepAlive int) {
for _ = range time.Tick(time.Duration(KeepAlive) * time.Second) {
c.Ping(func(msg, ack message.Message, err error) error {
glog.Infof("Ping \n")
return nil
})
}
}
for fix #27 #29
add the ping/ack msg to session.Pingack for
panics with slice bounds out of range.