Enable worker to reconnect to gearman indefinitely (#7)
* Add agent reference to ErrorHandler args to allow specific connection recovery

* Change disconnect logic to attempt reconnecting to server indefinitely

* Change Agent struct back to un-exported "agent"

* Change Agent member back to un-exported "agent"

* Removed outdated comment

* Rename agent.Addr to agent.addr since it doesn't need to be exported

* Add/update persistent reconnect behavior to client, worker

* Add persistent initial connection to worker, consolidate Connect() and Reconnect()
* Add persistent initial connection and re-connect behavior to client
* Replace NewConnected client constructor with a universal version which takes
   connection open/close handlers as arguments.
* Add io error handling in a few io places where it was missing

* Update readme with updated function name

* Add a sense of ownership of which thread (read or write) is reconnecting

Problem: suppose readLoop() encounters an error on the connection and starts
the reconnect process, and at nearly the same time the writeLoop() thread,
which uses the same connection handle, also encounters an error (since the
connection is broken). This fix prevents both of them from attempting to
reconnect: one of them claims ownership of the reconnect process, while the
other merely waits for it to complete and then goes on using the new conn
handle via getConn().
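The ownership idea described above can be sketched roughly as follows. This is only an illustration under assumptions, not the code from this commit: the `reconnector` type, its `generation` counter, and the `runBoth` helper are hypothetical names, and a `sync.Cond` is just one possible way to make the non-owning goroutine wait.

```go
package main

import (
	"fmt"
	"sync"
)

// reconnector (hypothetical) lets exactly one goroutine own a reconnect
// attempt while any other goroutine waits for it to finish.
type reconnector struct {
	mu           sync.Mutex
	cond         *sync.Cond
	reconnecting bool
	generation   int // bumped each time a new connection is installed
	dials        int // how many times dial actually ran
}

func newReconnector() *reconnector {
	r := &reconnector{}
	r.cond = sync.NewCond(&r.mu)
	return r
}

// reconnect is called by a loop that saw an error on connection generation
// seenGen. It returns the generation of the connection to use next.
func (r *reconnector) reconnect(seenGen int, dial func()) int {
	r.mu.Lock()
	defer r.mu.Unlock()
	for r.reconnecting { // another goroutine owns the reconnect: wait for it
		r.cond.Wait()
	}
	if r.generation != seenGen { // the broken connection was already replaced
		return r.generation
	}
	r.reconnecting = true // claim ownership
	r.mu.Unlock()
	dial() // potentially slow, so it runs without holding the lock
	r.mu.Lock()
	r.generation++
	r.dials++
	r.reconnecting = false
	r.cond.Broadcast() // wake every waiter
	return r.generation
}

// runBoth simulates the read and write loops failing on the same connection.
func runBoth() int {
	r := newReconnector()
	var wg sync.WaitGroup
	for _, name := range []string{"readLoop", "writeLoop"} {
		wg.Add(1)
		go func(who string) {
			defer wg.Done()
			r.reconnect(0, func() { fmt.Println(who, "owns the reconnect") })
		}(name)
	}
	wg.Wait()
	return r.dials
}

func main() {
	fmt.Println("dial attempts:", runBoth())
}
```

Whichever goroutine arrives first performs the dial; the other either waits on the condition variable or notices that the generation has already moved on.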

* Fix handle naming to camel case

* Remove orphaned comment

* Error out orphaned waiters on the expected channel by closing the channel

Also changed a couple of error events in agent and worker to trigger reconnection.
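A minimal sketch of how closing a channel can release blocked waiters with an error. The names `awaitResponse` and `errConnClosed` are hypothetical and the channel element type is simplified to a string; the actual client code may differ.

```go
package main

import (
	"errors"
	"fmt"
)

// errConnClosed is a hypothetical sentinel error for released waiters.
var errConnClosed = errors.New("connection closed while waiting for response")

// awaitResponse blocks until a response arrives or the channel is closed.
// A receive from a closed channel returns immediately with ok == false.
func awaitResponse(expected <-chan string) (string, error) {
	resp, ok := <-expected
	if !ok {
		return "", errConnClosed
	}
	return resp, nil
}

func main() {
	expected := make(chan string)
	done := make(chan error)
	go func() {
		_, err := awaitResponse(expected)
		done <- err
	}()
	close(expected) // the connection died: release every waiter
	fmt.Println(<-done)
}
```

Because a closed channel unblocks all receivers at once, no per-waiter bookkeeping is needed to fail them.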

* Some logging for investigation

* Make sure worker purges orphaned tasks when associated gearman dies

* Allow Write to bypass the agent lock

As a result, the previous Write() function became identical to the unexported write(), so we export it.

* Rearrange agent locking and Grab() placement.

Make sure Grab is called after each successful execution.
NOTE: it is also called when reconnections are made.

* Remove appscode/go/log, and remove got handle and got result messages

* Address PR comments.

* Revert Split to SplitN etc in response.go
* Introduce a writeReconnectCleanup function with a compact write/reconnect/cleanup op
* Add a todo for a more sophisticated gearman connection health checker
* Remove commented-out code

* Change usages of log in worker and client to new logHandler, make it optional to pass

Also rename some handles to match a common pattern such as [sS]omethingHandler

* Fix writeLoop exit on reconnect

* Work around using the Lock in client, optimize read and write loop reconnect behavior

* Streamline locking and new job grab/requesting

Add current jobs count for diagnostics

* Make improvements based on PR feedback.

* move channels back into client
* restore NewConnected constructor
* add safe error casting for recover output
* atomic loads for conn

* Update on PR comments, load atomics

* Move channel close to go after conn close

* Add conn comparison in read loop

* Make sure write loop checks connection before series of writes for a single transmission

This is to help the integrity of the writes in case reconnects occur mid-transmission
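One way to keep a multi-part transmission on a single connection is to take one snapshot of the current connection and use it for every write. The sketch below assumes that approach; `transmitter`, `connBox`, and `errNoConn` are hypothetical names and do not come from the commit.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
	"io"
	"sync/atomic"
)

// connBox wraps the writer so atomic.Value always stores one concrete type.
type connBox struct{ w io.Writer }

// transmitter (hypothetical) holds the current connection atomically.
type transmitter struct {
	conn atomic.Value // holds *connBox; w is nil while disconnected
}

func (t *transmitter) setConn(w io.Writer) { t.conn.Store(&connBox{w: w}) }

var errNoConn = errors.New("no active connection")

// send checks the connection once, then writes every part of a single
// transmission to that same snapshot, even if a reconnect swaps the
// stored connection in the meantime.
func (t *transmitter) send(parts ...[]byte) error {
	box, _ := t.conn.Load().(*connBox)
	if box == nil || box.w == nil {
		return errNoConn
	}
	for _, p := range parts {
		if _, err := box.w.Write(p); err != nil {
			return err // the caller decides whether to reconnect and resend
		}
	}
	return nil
}

// demo sends a two-part transmission into an in-memory buffer.
func demo() (string, error) {
	var buf bytes.Buffer
	t := &transmitter{}
	t.setConn(&buf)
	err := t.send([]byte("REQ\x00"), []byte("payload"))
	return buf.String(), err
}

func main() {
	out, err := demo()
	fmt.Printf("%q %v\n", out, err)
}
```

A failed write aborts the whole transmission rather than continuing on a different connection, so the peer never sees a header from one connection and a body from another.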

* Change some names to camelCase

* Close outbound and expected after they are replaced, make access atomic.

* Move channel close back up before reconnecting

The reason is that we want any hung submit, echo, or status callers to
error out before we attempt to reconnect, because it might be minutes
before gearman is back up and we don't want them hanging.
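The ordering described here (release pending callers first, then retry until the server is back) might look roughly like this. `recoverConnection`, `errStillDown`, and the fixed pause between attempts are assumptions made for the sketch, not details taken from the commit.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errStillDown is a hypothetical error returned while the server is down.
var errStillDown = errors.New("server still down")

// recoverConnection first releases pending callers by closing their channel,
// then retries dial until it succeeds. It returns the number of attempts.
func recoverConnection(pending chan struct{}, dial func() error, pause time.Duration) int {
	close(pending) // blocked callers return now, not minutes later
	attempts := 0
	for {
		attempts++
		if err := dial(); err == nil {
			return attempts
		}
		time.Sleep(pause)
	}
}

func main() {
	pending := make(chan struct{})
	failuresLeft := 2
	dial := func() error {
		if failuresLeft > 0 {
			failuresLeft--
			return errStillDown
		}
		return nil
	}
	attempts := recoverConnection(pending, dial, 10*time.Millisecond)
	_, open := <-pending
	fmt.Println("attempts:", attempts, "pending still open:", open)
}
```

If the close came after the retry loop instead, callers would stay blocked for as long as the server remained unreachable.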
toli-belo authored Apr 17, 2019
1 parent 08663d4 commit afa6a31
Showing 14 changed files with 715 additions and 249 deletions.
371 changes: 277 additions & 94 deletions client/client.go

Large diffs are not rendered by default.

27 changes: 22 additions & 5 deletions client/client_test.go
@@ -6,14 +6,14 @@ import (
"encoding/hex"
"errors"
"fmt"
"github.com/appscode/go/log"
rt "github.com/quantcast/g2/pkg/runtime"
"io"
"net"
"os"
"reflect"
"strings"
"testing"

rt "github.com/quantcast/g2/pkg/runtime"
)

type snapshot struct {
@@ -187,9 +187,18 @@ func drain(observed io.Reader) {
}

func TestClose(test *testing.T) {
client, _ := net.Pipe()

gearmanc := NewConnected(client)
handlerConnOpen := func() (conn net.Conn, err error) {
log.Infoln("Creating net.Pipe connection...")
conn, _ = net.Pipe()
return
}

handlerConnClose := func(conn net.Conn) (err error) {
return conn.Close()
}

gearmanc := NewClient(handlerConnClose, handlerConnOpen)

if gearmanc.Close() != nil {
test.Fatalf("expected no error in closing connected client")
@@ -209,9 +218,17 @@ func TestSnapshot(test *testing.T) {
test.Fatalf("error loading snapshot: %s\n", err)
}

handlerConnOpen := func() (conn net.Conn, err error) {
return client, nil // return pre-created pipe client
}

handlerConnClose := func(conn net.Conn) (err error) {
return conn.Close()
}

// This has to be done in another go-routine since all of the reads/writes
// are synchronous
gearmanClient := NewConnected(client)
gearmanClient := NewClient(handlerConnClose, handlerConnOpen)

if err = snapshot.replay(server, "server", "client"); err != nil {
test.Fatalf("error loading snapshot: %s", err)
10 changes: 10 additions & 0 deletions client/error.go
@@ -28,3 +28,13 @@ func getError(data []byte) (err error) {

// Error handler
type ErrorHandler func(error)

func safeCastError(e interface{}, defaultMessage string) error {
if e == nil {
return nil
}
if err, ok := e.(error); ok {
return err
}
return errors.New(defaultMessage)
}
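A helper like safeCastError is typically useful in a deferred recover, where the recovered value has type interface{} and is not guaranteed to be an error. The sketch below copies the function from the diff; the `guarded` wrapper is a hypothetical caller added for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// safeCastError is copied from the diff above.
func safeCastError(e interface{}, defaultMessage string) error {
	if e == nil {
		return nil
	}
	if err, ok := e.(error); ok {
		return err
	}
	return errors.New(defaultMessage)
}

// guarded (hypothetical) runs fn and converts a panic into an error.
func guarded(fn func()) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = safeCastError(r, "panic with non-error value")
		}
	}()
	fn()
	return nil
}

func main() {
	fmt.Println(guarded(func() { panic(errors.New("boom")) })) // boom
	fmt.Println(guarded(func() { panic("just a string") }))    // panic with non-error value
	fmt.Println(guarded(func() {}))                            // <nil>
}
```

A bare type assertion `r.(error)` would itself panic when the recovered value is a string, which is what the checked assertion avoids.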
2 changes: 1 addition & 1 deletion client/pool.go
@@ -75,7 +75,7 @@ func (pool *Pool) Add(net, addr string, rate int) (err error) {
item.Rate = rate
} else {
var client *Client
client, err = New(net, addr)
client, err = New(net, addr, nil)
if err == nil {
item = &PoolClient{Client: client, Rate: rate}
pool.Clients[addr] = item
2 changes: 1 addition & 1 deletion client/response.go
@@ -58,7 +58,7 @@ func (resp *Response) Update() (data []byte, err error) {
func (resp *Response) Status() (status *Status, err error) {
data := bytes.SplitN(resp.Data, []byte{'\x00'}, 2)
if len(data) != 2 {
err = fmt.Errorf("Invalid data: %v", resp.Data)
err = fmt.Errorf("Invalid data: %v, split resulted in fewer than 2 elements", resp.Data)
return
}
status = &Status{}
2 changes: 1 addition & 1 deletion example/client/client.go
@@ -15,7 +15,7 @@ func main() {
// by implementing IdGenerator interface.
// client.IdGen = client.NewAutoIncId()

c, err := client.New(rt.Network, "127.0.0.1:4730")
c, err := client.NewNetClient(rt.Network, "127.0.0.1:4730")
if err != nil {
log.Fatalln(err)
}
124 changes: 124 additions & 0 deletions example/client/persistent_client.go
@@ -0,0 +1,124 @@
package main

import (
"log"
"time"

"github.com/quantcast/g2/client"
rt "github.com/quantcast/g2/pkg/runtime"
)

func logHandler(level client.LogLevel, message ...string) {
switch level {
case client.Error:
log.Println("Error:", message)
case client.Warning:
log.Println("Warning", message)
case client.Info:
log.Println("Info:", message)
case client.Debug:
log.Println("Debug", message)
}
}

func main() {
// Set the autoinc id generator
// You can write your own id generator
// by implementing IdGenerator interface.
// client.IdGen = client.NewAutoIncId()

logs.InitLogs()
logs.FlushLogs()
c, err := client.NewNetClient(rt.Network, "127.0.0.1:4730")
if err != nil {
log.Fatalln(err)
}
defer c.Close()
c.ErrorHandler = func(e error) {
log.Println("ErrorHandler Received:", e)
}
echo := []byte("Hello\x00 world")
echomsg, err := c.Echo(echo)
if err != nil {
log.Printf("Error in Echo: %v", err)
} else {
log.Println("EchoMsg:", string(echomsg))
}

print_result := true
print_update := false
print_status := false

jobHandler := func(resp *client.Response) {
switch resp.DataType {
case rt.PT_WorkException:
fallthrough
case rt.PT_WorkFail:
fallthrough
case rt.PT_WorkComplete:
if print_result {
if data, err := resp.Result(); err == nil {
log.Printf("RESULT: %v, string:%v\n", data, string(data))
} else {
log.Printf("RESULT: %s\n", err)
}
}
case rt.PT_WorkWarning:
fallthrough
case rt.PT_WorkData:
if print_update {
if data, err := resp.Update(); err == nil {
log.Printf("UPDATE: %v\n", data)
} else {
log.Printf("UPDATE: %v, %s\n", data, err)
}
}
case rt.PT_WorkStatus:
if print_status {
if data, err := resp.Status(); err == nil {
log.Printf("STATUS: %v\n", data)
} else {
log.Printf("STATUS: %s\n", err)
}
}
default:
log.Printf("UNKNOWN: %v", resp.Data)
}
}

log.Println("Press Ctrl-C to exit ...")

for i := 0; ; i++ {

if !c.IsConnectionSet() {
log.Printf("No active connection to server.. waiting...")
time.Sleep(5 * time.Second)
continue
}

funcName := "ToUpper"
log.Println("Calling function", funcName, "with data:", echo)
handle, err := c.Do(funcName, echo, rt.JobNormal, jobHandler)
if err != nil {
log.Printf("Do %v ERROR: %v", funcName, err)
}

log.Printf("Calling Status for handle %v", handle)
status, err := c.Status(handle)
if err != nil {
log.Printf("Status: %v, ERROR: %v", status, err)
}

funcName = "Foobar"
log.Println("Calling function", funcName, "with data:", echo)
_, err = c.Do(funcName, echo, rt.JobNormal, jobHandler)
if err != nil {
log.Printf("Do %v ERROR: %v", funcName, err)
}
var sleep_seconds int = 0
log.Printf("Finished Cycle %v, sleeping %v seconds", i, sleep_seconds)
time.Sleep(time.Duration(sleep_seconds) * time.Second)

}

}
28 changes: 26 additions & 2 deletions example/worker/worker.go
@@ -7,8 +7,8 @@ import (
"strings"
"time"

"github.com/quantcast/g2/worker"
"github.com/mikespook/golib/signal"
"github.com/quantcast/g2/worker"
)

func ToUpper(job worker.Job) ([]byte, error) {
@@ -34,13 +34,30 @@ func Foobar(job worker.Job) ([]byte, error) {
return job.Data(), nil
}

func logHandler(level worker.LogLevel, message ...string) {
switch level {
case worker.Error:
log.Println("Error:", message)
case worker.Warning:
log.Println("Warning", message)
case worker.Info:
log.Println("Info:", message)
case worker.Debug:
log.Println("Debug", message)
}
}

func main() {
log.Println("Starting ...")
defer log.Println("Shutdown complete!")

w := worker.New(worker.Unlimited)
w.SetLogHandler(logHandler)

defer w.Close()
w.ErrorHandler = func(e error) {
log.Println(e)
log.Println("ErrorHandler Received:", e)

if opErr, ok := e.(*net.OpError); ok {
if !opErr.Temporary() {
proc, err := os.FindProcess(os.Getpid())
@@ -53,6 +70,7 @@ func main() {
}
}
}

w.JobHandler = func(job worker.Job) error {
log.Printf("Data=%s\n", job.Data())
return nil
@@ -69,6 +87,12 @@ func main() {
return
}
go w.Work()

ticker := time.Tick(10 * time.Second)
for _ = range ticker {
activeJobs := w.GetActiveJobCount()
log.Printf("Current job count: %v", activeJobs)
}
signal.Bind(os.Interrupt, func() uint { return signal.BreakExit })
signal.Wait()
}
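Note that a range loop over a time.Tick channel does not terminate, so in the example as shown the signal handling placed after the loop is not reached. One possible alternative, sketched here under assumptions, is to run the periodic report in its own goroutine with a stop channel. `reportEvery` is a hypothetical helper, and the report callback stands in for a call such as w.GetActiveJobCount().

```go
package main

import (
	"fmt"
	"time"
)

// reportEvery (hypothetical) calls report on every tick until stop is
// closed, and returns how many reports were made.
func reportEvery(interval time.Duration, stop <-chan struct{}, report func()) int {
	ticker := time.NewTicker(interval)
	defer ticker.Stop() // release the ticker's resources on exit
	n := 0
	for {
		select {
		case <-ticker.C:
			report()
			n++
		case <-stop:
			return n
		}
	}
}

func main() {
	stop := make(chan struct{})
	done := make(chan int)
	go func() {
		done <- reportEvery(20*time.Millisecond, stop, func() {
			fmt.Println("current job count: (value from the worker would go here)")
		})
	}()
	// The main goroutine stays free to wait for signals; a sleep stands in here.
	time.Sleep(70 * time.Millisecond)
	close(stop)
	fmt.Println("reports made:", <-done)
}
```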
3 changes: 2 additions & 1 deletion go.mod
@@ -4,7 +4,9 @@ require (
github.com/appscode/go v0.0.0-20180628092646-df3c57fca2be
github.com/appscode/pat v0.0.0-20170521084856-48ff78925b79
github.com/beorn7/perks v0.0.0-20160229213445-3ac7bf7a47d1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/glog v0.0.0-20141105023935-44145f04b68c // indirect
github.com/golang/protobuf v1.3.1 // indirect
github.com/golang/snappy v0.0.0-20160529050041-d9eb7a3d35ec // indirect
github.com/google/uuid v0.0.0-20171113160352-8c31c18f31ed // indirect
github.com/inconshreveable/mousetrap v1.0.0 // indirect
@@ -21,7 +23,6 @@ require (
github.com/spf13/pflag v1.0.1
github.com/stretchr/testify v1.3.0
github.com/syndtr/goleveldb v0.0.0-20180815032940-ae2bd5eed72d
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4 // indirect
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
gopkg.in/mgo.v2 v2.0.0-20180705113604-9856a29383ce // indirect
gopkg.in/robfig/cron.v2 v2.0.0-20150107220207-be2e0b0deed5