Skip to content

Commit

Permalink
Fix client tests (#8)
Browse files Browse the repository at this point in the history
* Fix client tests

* Reverted to use of NewConnected
* Fixed arguments on some New Client calls

* Optimize conn checking in readLoop

* Add error handling for gearmanClient.Do() call
  • Loading branch information
toli-belo authored Apr 23, 2019
1 parent afa6a31 commit 7f1ca53
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 33 deletions.
9 changes: 7 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,11 +204,12 @@ func (client *Client) writeLoop() {
ibuf := make([]byte, 4)
length := uint32(0)
var i int
chans := client.loadChans()

// Pipeline requests; but only write them one at a time. To allow multiple
// goroutines to all write as quickly as possible, uses a channel and the
// writeLoop lives in a separate goroutine.
for req := range client.loadChans().outbound {
for req := range chans.outbound {

conn := client.loadConn()
if conn == nil {
Expand Down Expand Up @@ -348,8 +349,12 @@ func (client *Client) readLoop() {
var err error
var resp *Response
startConn := client.loadConn()
newConn := startConn
if startConn == nil {
return
}

for startConn == client.loadConn() {
for ; newConn == startConn; newConn = client.loadConn() {

if _, exit := client.readReconnect(startConn, header); exit {
return
Expand Down
40 changes: 13 additions & 27 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/hex"
"errors"
"fmt"
"github.com/appscode/go/log"
rt "github.com/quantcast/g2/pkg/runtime"
"io"
"net"
Expand Down Expand Up @@ -188,17 +187,9 @@ func drain(observed io.Reader) {

func TestClose(test *testing.T) {

handlerConnOpen := func() (conn net.Conn, err error) {
log.Infoln("Creating net.Pipe connection...")
conn, _ = net.Pipe()
return
}

handlerConnClose := func(conn net.Conn) (err error) {
return conn.Close()
}
client, _ := net.Pipe()

gearmanc := NewClient(handlerConnClose, handlerConnOpen)
gearmanc := NewConnected(client)

if gearmanc.Close() != nil {
test.Fatalf("expected no error in closing connected client")
Expand All @@ -218,17 +209,9 @@ func TestSnapshot(test *testing.T) {
test.Fatalf("error loading snapshot: %s\n", err)
}

handlerConnOpen := func() (conn net.Conn, err error) {
return client, nil // return pre-created pipe client
}

handlerConnClose := func(conn net.Conn) (err error) {
return conn.Close()
}

// This has to be done in another go-routine since all of the reads/writes
// are synchronous
gearmanClient := NewClient(handlerConnClose, handlerConnOpen)
gearmanClient := NewConnected(client)

if err = snapshot.replay(server, "server", "client"); err != nil {
test.Fatalf("error loading snapshot: %s", err)
Expand All @@ -240,14 +223,17 @@ func TestSnapshot(test *testing.T) {

errors := make(chan error)

go gearmanClient.Do("test", payload, rt.JobNormal, func(r *Response) {
if !reflect.DeepEqual(payload, r.Data) {
errors <- fmt.Errorf("\nexpected:\n%s\nobserved:\n%s\n",
hex.Dump(payload), hex.Dump(r.Data))
go func() {
if _, err := gearmanClient.Do("test", payload, rt.JobNormal, func(r *Response) {
if !reflect.DeepEqual(payload, r.Data) {
errors <- fmt.Errorf("\nexpected:\n%s\nobserved:\n%s\n",
hex.Dump(payload), hex.Dump(r.Data))
}
close(errors)
}); err != nil {
errors <- fmt.Errorf("\nError:%v", err.Error())
}

close(errors)
})
}()

for err := range errors {
test.Fatalf("error: %s", err)
Expand Down
2 changes: 1 addition & 1 deletion example/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func main() {
// by implementing IdGenerator interface.
// client.IdGen = client.NewAutoIncId()

c, err := client.NewNetClient(rt.Network, "127.0.0.1:4730")
c, err := client.New(rt.Network, "127.0.0.1:4730", nil)
if err != nil {
log.Fatalln(err)
}
Expand Down
4 changes: 1 addition & 3 deletions example/client/persistent_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,7 @@ func main() {
// by implementing IdGenerator interface.
// client.IdGen = client.NewAutoIncId()

logs.InitLogs()
logs.FlushLogs()
c, err := client.NewNetClient(rt.Network, "127.0.0.1:4730")
c, err := client.New(rt.Network, "127.0.0.1:4730", logHandler)
if err != nil {
log.Fatalln(err)
}
Expand Down

0 comments on commit 7f1ca53

Please sign in to comment.