Skip to content

Commit

Permalink
Merge pull request basho#57 from basho/features/lrb/bashogh-55-per-co…
Browse files Browse the repository at this point in the history
…mmand-timeouts

Use the greater of operation or socket timeout
  • Loading branch information
lukebakken committed Feb 19, 2016
2 parents 987b29f + 7f528c1 commit 746be30
Show file tree
Hide file tree
Showing 20 changed files with 235 additions and 106 deletions.
File renamed without changes.
11 changes: 11 additions & 0 deletions .github/ISSUE_TEMPLATE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
Please ensure the following information is supplied for new issues:

- [ ] Riak Go Client version
- [ ] Golang
- [ ] Riak version
- [ ] Operating System / Distribution & Version
- [ ] Riak `error.log` file, if applicable
- [ ] What commands were being executed during the error
- [ ] What you *expected* to have happen

Thank you!
8 changes: 8 additions & 0 deletions .github/PULL_REQUEST_TEMPLATE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
Please ensure the following is present in your Pull Request:

- [ ] Unit tests for your change
- [ ] Integration tests (if applicable)

Pull Requests that are small and limited in scope are most welcome.

Thank you!
6 changes: 3 additions & 3 deletions cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func TestAddAndRemoveNodeFromCluster(t *testing.T) {

node := c.nodes[0]
// re-adding same node instance won't add it
if err := c.AddNode(node); err != nil {
if err = c.AddNode(node); err != nil {
t.Fatal(err)
}
if expected, actual := 1, len(c.nodes); expected != actual {
Expand All @@ -94,7 +94,7 @@ func TestAddAndRemoveNodeFromCluster(t *testing.T) {
if n, err = NewNode(opts); err != nil {
t.Fatal(err)
} else {
if err := c.AddNode(n); err != nil {
if err = c.AddNode(n); err != nil {
t.Fatal(err)
}
if port == portToRemove {
Expand All @@ -114,7 +114,7 @@ func TestAddAndRemoveNodeFromCluster(t *testing.T) {
}
}
// remove node with port 10027
if err := c.RemoveNode(nodeToRemove); err != nil {
if err = c.RemoveNode(nodeToRemove); err != nil {
t.Fatal(err)
}
if expected, actual := 4, len(c.nodes); expected != actual {
Expand Down
5 changes: 5 additions & 0 deletions command.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"encoding/binary"
"fmt"
"time"

proto "github.com/golang/protobuf/proto"
)
Expand All @@ -18,6 +19,10 @@ type streamingCommand interface {
isDone() bool
}

type timeoutCommand interface {
getTimeout() time.Duration
}

// Command interface enforces proper structure of a Command object
type Command interface {
Name() string
Expand Down
9 changes: 9 additions & 0 deletions command_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package riak
import (
"fmt"
"sync/atomic"
"time"
)

var c uint64 = 0
Expand Down Expand Up @@ -68,3 +69,11 @@ func (cmd *commandImpl) setLastNode(lastNode *Node) {
func (cmd *commandImpl) getLastNode() *Node {
return cmd.lastNode
}

type timeoutImpl struct {
timeout time.Duration
}

func (cmd *timeoutImpl) getTimeout() time.Duration {
return cmd.timeout
}
28 changes: 19 additions & 9 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,24 @@ func (c *connection) execute(cmd Command) (err error) {
return
}

if err = c.write(message); err != nil {
// Use the *greater* of the connection's request timeout
// or the Command's timeout
t := c.requestTimeout
if tc, ok := cmd.(timeoutCommand); ok {
tc := tc.getTimeout()
if tc > t {
t = tc
}
}

if err = c.write(message, t); err != nil {
return
}

var response []byte
var decoded proto.Message
for {
response, err = c.read() // NB: response *will* have entire pb message
response, err = c.read(t) // NB: response *will* have entire pb message
if err != nil {
cmd.onError(err)
return
Expand Down Expand Up @@ -206,12 +216,12 @@ func (c *connection) execute(cmd Command) (err error) {

// FUTURE: we should also take currently executing Command (Riak operation)
// timeout into account
func (c *connection) setReadDeadline() {
c.conn.SetReadDeadline(time.Now().Add(c.requestTimeout))
func (c *connection) setReadDeadline(t time.Duration) {
c.conn.SetReadDeadline(time.Now().Add(t))
}

// NB: This will read one full pb message from Riak, or error in doing so
func (c *connection) read() ([]byte, error) {
func (c *connection) read(t time.Duration) ([]byte, error) {
if !c.available() {
return nil, ErrCannotRead
}
Expand All @@ -220,7 +230,7 @@ func (c *connection) read() ([]byte, error) {
var count int
var messageLength uint32

c.setReadDeadline()
c.setReadDeadline(t)
if count, err = io.ReadFull(c.conn, c.sizeBuf); err == nil && count == 4 {
messageLength = binary.BigEndian.Uint32(c.sizeBuf)
if messageLength > uint32(cap(c.dataBuf)) {
Expand All @@ -230,7 +240,7 @@ func (c *connection) read() ([]byte, error) {
c.dataBuf = c.dataBuf[0:messageLength]
}
// FUTURE: large object warning / error
c.setReadDeadline()
c.setReadDeadline(t)
count, err = io.ReadFull(c.conn, c.dataBuf)
} else {
if err == nil && count != 4 {
Expand All @@ -252,12 +262,12 @@ func (c *connection) read() ([]byte, error) {
}
}

func (c *connection) write(data []byte) error {
func (c *connection) write(data []byte, t time.Duration) error {
if !c.available() {
return ErrCannotWrite
}
// FUTURE: we should also take currently executing Command (Riak operation) timeout into account
c.conn.SetWriteDeadline(time.Now().Add(c.requestTimeout))
c.conn.SetWriteDeadline(time.Now().Add(t))
count, err := c.conn.Write(data)
if err != nil {
logDebug("[Connection]", "error in write: '%v'", err)
Expand Down
Loading

0 comments on commit 746be30

Please sign in to comment.