From 7f1ca537a7eae23a22bffd5a7f771dd6b37f7163 Mon Sep 17 00:00:00 2001 From: Anatolii B Date: Tue, 23 Apr 2019 10:44:30 -0700 Subject: [PATCH] Fix client tests (#8) * Fix client tests * Reverted to use of NewConnected * Fixed arguments on some New Client calls * Optimize conn checking in readLoop * Add error handling for gearmanClient.Do() call --- client/client.go | 9 +++++-- client/client_test.go | 40 ++++++++++------------------- example/client/client.go | 2 +- example/client/persistent_client.go | 4 +-- 4 files changed, 22 insertions(+), 33 deletions(-) diff --git a/client/client.go b/client/client.go index 9bf3603..b94a21c 100644 --- a/client/client.go +++ b/client/client.go @@ -204,11 +204,12 @@ func (client *Client) writeLoop() { ibuf := make([]byte, 4) length := uint32(0) var i int + chans := client.loadChans() // Pipeline requests; but only write them one at a time. To allow multiple // goroutines to all write as quickly as possible, uses a channel and the // writeLoop lives in a separate goroutine. - for req := range client.loadChans().outbound { + for req := range chans.outbound { conn := client.loadConn() if conn == nil { @@ -348,8 +349,12 @@ func (client *Client) readLoop() { var err error var resp *Response startConn := client.loadConn() + newConn := startConn + if startConn == nil { + return + } - for startConn == client.loadConn() { + for ; newConn == startConn; newConn = client.loadConn() { if _, exit := client.readReconnect(startConn, header); exit { return diff --git a/client/client_test.go b/client/client_test.go index bb9ec3f..ca3d38e 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -6,7 +6,6 @@ import ( "encoding/hex" "errors" "fmt" - "github.com/appscode/go/log" rt "github.com/quantcast/g2/pkg/runtime" "io" "net" @@ -188,17 +187,9 @@ func drain(observed io.Reader) { func TestClose(test *testing.T) { - handlerConnOpen := func() (conn net.Conn, err error) { - log.Infoln("Creating net.Pipe connection...") - conn, _ = net.Pipe() - return - } - - handlerConnClose := func(conn net.Conn) (err error) { - return conn.Close() - } + client, _ := net.Pipe() - gearmanc := NewClient(handlerConnClose, handlerConnOpen) + gearmanc := NewConnected(client) if gearmanc.Close() != nil { test.Fatalf("expected no error in closing connected client") @@ -218,17 +209,9 @@ func TestSnapshot(test *testing.T) { test.Fatalf("error loading snapshot: %s\n", err) } - handlerConnOpen := func() (conn net.Conn, err error) { - return client, nil // return pre-created pipe client - } - - handlerConnClose := func(conn net.Conn) (err error) { - return conn.Close() - } - // This has to be done in another go-routine since all of the reads/writes // are synchronous - gearmanClient := NewClient(handlerConnClose, handlerConnOpen) + gearmanClient := NewConnected(client) if err = snapshot.replay(server, "server", "client"); err != nil { test.Fatalf("error loading snapshot: %s", err) @@ -240,14 +223,17 @@ func TestSnapshot(test *testing.T) { errors := make(chan error) - go gearmanClient.Do("test", payload, rt.JobNormal, func(r *Response) { - if !reflect.DeepEqual(payload, r.Data) { - errors <- fmt.Errorf("\nexpected:\n%s\nobserved:\n%s\n", - hex.Dump(payload), hex.Dump(r.Data)) + go func() { + if _, err := gearmanClient.Do("test", payload, rt.JobNormal, func(r *Response) { + if !reflect.DeepEqual(payload, r.Data) { + errors <- fmt.Errorf("\nexpected:\n%s\nobserved:\n%s\n", + hex.Dump(payload), hex.Dump(r.Data)) + } + close(errors) + }); err != nil { + errors <- fmt.Errorf("\nError:%v", err.Error()) } - - close(errors) - }) + }() for err := range errors { test.Fatalf("error: %s", err) diff --git a/example/client/client.go b/example/client/client.go index 329f098..3e2ac8a 100644 --- a/example/client/client.go +++ b/example/client/client.go @@ -15,7 +15,7 @@ func main() { // by implementing IdGenerator interface. // client.IdGen = client.NewAutoIncId() - c, err := client.NewNetClient(rt.Network, "127.0.0.1:4730") + c, err := client.New(rt.Network, "127.0.0.1:4730", nil) if err != nil { log.Fatalln(err) } diff --git a/example/client/persistent_client.go b/example/client/persistent_client.go index bd934f6..db20d39 100644 --- a/example/client/persistent_client.go +++ b/example/client/persistent_client.go @@ -27,9 +27,7 @@ func main() { // by implementing IdGenerator interface. // client.IdGen = client.NewAutoIncId() - logs.InitLogs() - logs.FlushLogs() - c, err := client.NewNetClient(rt.Network, "127.0.0.1:4730") + c, err := client.New(rt.Network, "127.0.0.1:4730", logHandler) if err != nil { log.Fatalln(err) }