From 4b4c3640fc5fc6b418ed4d927ddd5df681bf3563 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=A2=81=E9=AA=8F=E5=AE=87?= Date: Fri, 22 Jul 2016 01:51:19 +0800 Subject: [PATCH] + FIX : add the ping/ack msg to session.Pingack --- service/process.go | 6 ++++++ sessions/ackqueue.go | 12 ++++++++++++ 2 files changed, 18 insertions(+) diff --git a/service/process.go b/service/process.go index bbdc5f3..c640717 100644 --- a/service/process.go +++ b/service/process.go @@ -191,6 +191,12 @@ func (this *service) processIncoming(msg message.Message) error { func (this *service) processAcked(ackq *sessions.Ackqueue) { for _, ackmsg := range ackq.Acked() { // Let's get the messages from the saved message byte slices. + + if len(ackmsg.Msgbuf) == 0 || len(ackmsg.Ackbuf) == 0 { + glog.Errorf("process/processAcked: Unable to decode new %s as ackmsg Msgbuf buf is %v or ackmsg Ackbuf buf is %v", ackmsg.Mtype, ackmsg.Msgbuf, ackmsg.Ackbuf) + continue + } + msg, err := ackmsg.Mtype.New() if err != nil { glog.Errorf("process/processAcked: Unable to creating new %s message: %v", ackmsg.Mtype, err) diff --git a/sessions/ackqueue.go b/sessions/ackqueue.go index ab6206c..1f8a18c 100644 --- a/sessions/ackqueue.go +++ b/sessions/ackqueue.go @@ -128,6 +128,12 @@ func (this *Ackqueue) Wait(msg message.Message, onComplete interface{}) error { State: message.RESERVED, OnComplete: onComplete, } + ml := msg.Len() + this.ping.Msgbuf = make([]byte, ml) + _, err := msg.Encode(this.ping.Msgbuf) + if err != nil { + return err + } default: return errWaitMessage @@ -165,6 +171,12 @@ func (this *Ackqueue) Ack(msg message.Message) error { case message.PINGRESP: if this.ping.Mtype == message.PINGREQ { this.ping.State = message.PINGRESP + ml := msg.Len() + this.ping.Ackbuf = make([]byte, ml) + _, err := msg.Encode(this.ping.Ackbuf) + if err != nil { + return err + } } default: