From f633f8ffbc3c4fddfa54ae12228ce98374cf2954 Mon Sep 17 00:00:00 2001 From: Tanghui Lin Date: Wed, 12 Dec 2018 15:28:20 +0800 Subject: [PATCH 1/8] reset sub only in need --- proto/message.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/proto/message.go b/proto/message.go index 4601b9e8..92cf8ad9 100644 --- a/proto/message.go +++ b/proto/message.go @@ -127,7 +127,7 @@ func (m *Message) MarkEnd() { // ResetSubs will return the Msg data to flush and reset func (m *Message) ResetSubs() { - for i := range m.subs { + for i := range m.subs[:m.reqn] { m.subs[i].Reset() } m.reqn = 0 @@ -226,7 +226,7 @@ func (m *Message) Err() error { if m.err != nil { return m.err } - for _, s := range m.subs { + for _, s := range m.subs[:m.reqn] { if s.err != nil { return s.err } From 4eaaed3da25b926fe1efb8f0364e70b2b44118f5 Mon Sep 17 00:00:00 2001 From: Tanghui Lin Date: Thu, 13 Dec 2018 09:54:46 +0800 Subject: [PATCH 2/8] add change log --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4497ad58..4cc9ebbe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,6 @@ # Overlord +## Version 1.5.1 +1. reset sub message only in nedd. ## Version 1.5.0 1. refactor message pipeline. From e36ce1ac23054e7cd71c84d09d3066ae0616bec2 Mon Sep 17 00:00:00 2001 From: Tanghui Lin Date: Thu, 13 Dec 2018 14:25:41 +0800 Subject: [PATCH 3/8] fix batch err --- proto/message.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/proto/message.go b/proto/message.go index 92cf8ad9..6c14a6e4 100644 --- a/proto/message.go +++ b/proto/message.go @@ -90,6 +90,7 @@ func (m *Message) Reset() { // clear will clean the msg func (m *Message) clear() { m.Reset() + m.reqn = 0 m.req = nil m.wg = nil m.subs = nil @@ -226,6 +227,9 @@ func (m *Message) Err() error { if m.err != nil { return m.err } + if !m.IsBatch() { + return nil + } for _, s := range m.subs[:m.reqn] { if s.err != nil { return s.err From 515207402e26d966fd5bb9ab3a53eea76f68fb49 Mon Sep 17 00:00:00 2001 From: felixhao Date: Thu, 13 Dec 2018 14:55:28 +0800 Subject: [PATCH 4/8] fix batch subs err --- proto/message.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/proto/message.go b/proto/message.go index 6c14a6e4..fc795f15 100644 --- a/proto/message.go +++ b/proto/message.go @@ -128,6 +128,9 @@ func (m *Message) MarkEnd() { // ResetSubs will return the Msg data to flush and reset func (m *Message) ResetSubs() { + if !m.IsBatch() { + return + } for i := range m.subs[:m.reqn] { m.subs[i].Reset() } From c9914c753e26d88697d27874097131494ca425b7 Mon Sep 17 00:00:00 2001 From: felixhao Date: Thu, 13 Dec 2018 15:31:47 +0800 Subject: [PATCH 5/8] add message unit test --- proto/message_test.go | 60 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) create mode 100644 proto/message_test.go diff --git a/proto/message_test.go b/proto/message_test.go new file mode 100644 index 00000000..684373d5 --- /dev/null +++ b/proto/message_test.go @@ -0,0 +1,60 @@ +package proto + +import ( + "errors" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestMessage(t *testing.T) { + msgs := GetMsgs(1) + assert.Len(t, msgs, 1) + PutMsgs(msgs) + msgs = GetMsgs(1, 1) + assert.Len(t, msgs, 1) + PutMsgs(msgs) + + msg := NewMessage() + msg.Reset() + msg.clear() + msg.WithRequest(&mockRequest{}) + msg.ResetSubs() + + msg.WithRequest(&mockRequest{}) + req := msg.Request() + assert.NotNil(t, req) + reqs := msg.Requests() + assert.Len(t, reqs, 2) + isb := msg.IsBatch() + assert.True(t, isb) + msgs = msg.Batch() + assert.Len(t, msgs, 2) + + msg.ResetSubs() + req = msg.NextReq() + assert.NotNil(t, req) + + wg := &sync.WaitGroup{} + msg.WithWaitGroup(wg) + msg.Add() + msg.Done() + + msg.MarkStart() + time.Sleep(time.Millisecond * 50) + msg.MarkRead() + time.Sleep(time.Millisecond * 50) + msg.MarkWrite() + time.Sleep(time.Millisecond * 50) + msg.MarkEnd() + ts := msg.TotalDur() + assert.NotZero(t, ts) + ts = msg.RemoteDur() + assert.NotZero(t, ts) + + msg.WithError(errors.New("some error")) + err := msg.Err() + assert.EqualError(t, err, "some error") +} From 70aea49a4388a4a9a144c72c02801ad7fb95fa66 Mon Sep 17 00:00:00 2001 From: felixhao Date: Thu, 13 Dec 2018 15:33:32 +0800 Subject: [PATCH 6/8] add message unit test --- proto/message_test.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/proto/message_test.go b/proto/message_test.go index 684373d5..1f178516 100644 --- a/proto/message_test.go +++ b/proto/message_test.go @@ -57,4 +57,8 @@ func TestMessage(t *testing.T) { msg.WithError(errors.New("some error")) err := msg.Err() assert.EqualError(t, err, "some error") + + emsg := ErrMessage(errors.New("some error")) + err = emsg.Err() + assert.EqualError(t, err, "some error") } From a1960e636828fd4b504fb945d70d6a56deb48fcf Mon Sep 17 00:00:00 2001 From: felixhao Date: Sat, 15 Dec 2018 16:38:10 +0800 Subject: [PATCH 7/8] ping buffer size 128 --- proto/memcache/pinger.go | 4 ++-- proto/redis/pinger.go | 6 +++++- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/proto/memcache/pinger.go b/proto/memcache/pinger.go index 47e60b08..e9e7f4dc 100644 --- a/proto/memcache/pinger.go +++ b/proto/memcache/pinger.go @@ -12,7 +12,7 @@ import ( ) const ( - pingBufferSize = 8 + pingBufferSize = 128 ) var ( @@ -32,8 +32,8 @@ type mcPinger struct { func NewPinger(nc *libnet.Conn) proto.Pinger { return &mcPinger{ conn: nc, - bw: bufio.NewWriter(nc), br: bufio.NewReader(nc, bufio.NewBuffer(pingBufferSize)), + bw: bufio.NewWriter(nc), } } diff --git a/proto/redis/pinger.go b/proto/redis/pinger.go index 80c44ccd..30b5ab2d 100644 --- a/proto/redis/pinger.go +++ b/proto/redis/pinger.go @@ -12,6 +12,10 @@ import ( "github.com/pkg/errors" ) +const ( + pingBufferSize = 128 +) + // errors var ( ErrPingClosed = errs.New("ping interface has been closed") @@ -36,7 +40,7 @@ type pinger struct { func NewPinger(conn *libnet.Conn) proto.Pinger { return &pinger{ conn: conn, - br: bufio.NewReader(conn, bufio.NewBuffer(7)), + br: bufio.NewReader(conn, bufio.NewBuffer(pingBufferSize)), bw: bufio.NewWriter(conn), state: opened, } From 4288f5b643426c482a64823c28d7dd7d5ebc0880 Mon Sep 17 00:00:00 2001 From: felixhao Date: Sat, 15 Dec 2018 17:14:23 +0800 Subject: [PATCH 8/8] fix redis pinger unit test --- proto/redis/pinger_test.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/proto/redis/pinger_test.go b/proto/redis/pinger_test.go index b242afb1..1daa15c3 100644 --- a/proto/redis/pinger_test.go +++ b/proto/redis/pinger_test.go @@ -3,8 +3,6 @@ package redis import ( "testing" - "overlord/lib/bufio" - "github.com/pkg/errors" "github.com/stretchr/testify/assert" ) @@ -29,7 +27,7 @@ func TestPingerWrongResp(t *testing.T) { conn := _createConn([]byte("-Error: iam more than 7 bytes\r\n")) p := NewPinger(conn) err := p.Ping() - assert.Equal(t, bufio.ErrBufferFull, errors.Cause(err)) + assert.Equal(t, ErrBadPong, errors.Cause(err)) conn = _createConn([]byte("-Err\r\n")) p = NewPinger(conn)