Skip to content

Commit

Permalink
Merge pull request #39 from felixhao/fix_reset
Browse files Browse the repository at this point in the history
reset sub only in need
  • Loading branch information
felixhao authored Dec 17, 2018
2 parents 0cfee0f + 4288f5b commit 14bc9bc
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 8 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# Overlord
## Version 1.5.1
1. reset sub message only in nedd.

## Version 1.5.0
1. refactor message pipeline.
Expand Down
4 changes: 2 additions & 2 deletions proto/memcache/pinger.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
)

const (
pingBufferSize = 8
pingBufferSize = 128
)

var (
Expand All @@ -32,8 +32,8 @@ type mcPinger struct {
func NewPinger(nc *libnet.Conn) proto.Pinger {
return &mcPinger{
conn: nc,
bw: bufio.NewWriter(nc),
br: bufio.NewReader(nc, bufio.NewBuffer(pingBufferSize)),
bw: bufio.NewWriter(nc),
}
}

Expand Down
11 changes: 9 additions & 2 deletions proto/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ func (m *Message) Reset() {
// clear will clean the msg
func (m *Message) clear() {
m.Reset()
m.reqn = 0
m.req = nil
m.wg = nil
m.subs = nil
Expand Down Expand Up @@ -127,7 +128,10 @@ func (m *Message) MarkEnd() {

// ResetSubs will return the Msg data to flush and reset
func (m *Message) ResetSubs() {
for i := range m.subs {
if !m.IsBatch() {
return
}
for i := range m.subs[:m.reqn] {
m.subs[i].Reset()
}
m.reqn = 0
Expand Down Expand Up @@ -226,7 +230,10 @@ func (m *Message) Err() error {
if m.err != nil {
return m.err
}
for _, s := range m.subs {
if !m.IsBatch() {
return nil
}
for _, s := range m.subs[:m.reqn] {
if s.err != nil {
return s.err
}
Expand Down
64 changes: 64 additions & 0 deletions proto/message_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package proto

import (
"errors"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestMessage(t *testing.T) {
msgs := GetMsgs(1)
assert.Len(t, msgs, 1)
PutMsgs(msgs)
msgs = GetMsgs(1, 1)
assert.Len(t, msgs, 1)
PutMsgs(msgs)

msg := NewMessage()
msg.Reset()
msg.clear()
msg.WithRequest(&mockRequest{})
msg.ResetSubs()

msg.WithRequest(&mockRequest{})
req := msg.Request()
assert.NotNil(t, req)
reqs := msg.Requests()
assert.Len(t, reqs, 2)
isb := msg.IsBatch()
assert.True(t, isb)
msgs = msg.Batch()
assert.Len(t, msgs, 2)

msg.ResetSubs()
req = msg.NextReq()
assert.NotNil(t, req)

wg := &sync.WaitGroup{}
msg.WithWaitGroup(wg)
msg.Add()
msg.Done()

msg.MarkStart()
time.Sleep(time.Millisecond * 50)
msg.MarkRead()
time.Sleep(time.Millisecond * 50)
msg.MarkWrite()
time.Sleep(time.Millisecond * 50)
msg.MarkEnd()
ts := msg.TotalDur()
assert.NotZero(t, ts)
ts = msg.RemoteDur()
assert.NotZero(t, ts)

msg.WithError(errors.New("some error"))
err := msg.Err()
assert.EqualError(t, err, "some error")

emsg := ErrMessage(errors.New("some error"))
err = emsg.Err()
assert.EqualError(t, err, "some error")
}
6 changes: 5 additions & 1 deletion proto/redis/pinger.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@ import (
"github.com/pkg/errors"
)

const (
pingBufferSize = 128
)

// errors
var (
ErrPingClosed = errs.New("ping interface has been closed")
Expand All @@ -36,7 +40,7 @@ type pinger struct {
func NewPinger(conn *libnet.Conn) proto.Pinger {
return &pinger{
conn: conn,
br: bufio.NewReader(conn, bufio.NewBuffer(7)),
br: bufio.NewReader(conn, bufio.NewBuffer(pingBufferSize)),
bw: bufio.NewWriter(conn),
state: opened,
}
Expand Down
4 changes: 1 addition & 3 deletions proto/redis/pinger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package redis
import (
"testing"

"overlord/lib/bufio"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)
Expand All @@ -29,7 +27,7 @@ func TestPingerWrongResp(t *testing.T) {
conn := _createConn([]byte("-Error: iam more than 7 bytes\r\n"))
p := NewPinger(conn)
err := p.Ping()
assert.Equal(t, bufio.ErrBufferFull, errors.Cause(err))
assert.Equal(t, ErrBadPong, errors.Cause(err))

conn = _createConn([]byte("-Err\r\n"))
p = NewPinger(conn)
Expand Down

0 comments on commit 14bc9bc

Please sign in to comment.