From ec987fb01dbad18269029bc8731c31e25d9f27d3 Mon Sep 17 00:00:00 2001 From: Anatolii B Date: Fri, 10 May 2019 15:12:25 -0700 Subject: [PATCH] Fix send/receive synchronization race condition (#12) * Fix send/receive synchronization race condition * restore cleanup and submitJob name * Rename submitJob * Add locking of job submission to single-thread it * Add some logging * Fix premature dequeuing issue * Remove extra logging * Remove submit locks * Remove drain debug logging * rename constants and change queue size to 8 --- client/client.go | 70 +++++++++++++++++++++++++++++------------------ client/request.go | 16 ++++++++--- 2 files changed, 55 insertions(+), 31 deletions(-) diff --git a/client/client.go b/client/client.go index c985849..9ef7e73 100644 --- a/client/client.go +++ b/client/client.go @@ -25,7 +25,8 @@ var ( ) const ( - WORK_HANDLE_DELAY_MS = 5 // milliseconds delay for re-try processing of work completion requests if handler hasn't been yet stored in hash map. + WorkHandleDelay = 5 // milliseconds delay for re-try processing of work completion requests if handler hasn't been yet stored in hash map. 
+ InProgressQueueSize = 8 ) type connection struct { @@ -39,8 +40,8 @@ type connection struct { } type channels struct { - outbound chan *request - expected chan *Response + inProgress chan *request + outbound chan *request } type ConnCloseHandler func(conn net.Conn) (err error) @@ -165,12 +166,10 @@ func NewClient(connCloseHandler ConnCloseHandler, addr := conn.RemoteAddr() client = &Client{ - net: addr.Network(), - addr: addr.String(), - conn: &connection{Conn: conn}, - chans: &channels{ - expected: make(chan *Response), - outbound: make(chan *request)}, + net: addr.Network(), + addr: addr.String(), + conn: &connection{Conn: conn}, + chans: &channels{outbound: make(chan *request), inProgress: make(chan *request, InProgressQueueSize)}, ResponseTimeout: DefaultTimeout, responsePool: &sync.Pool{New: func() interface{} { return &Response{} }}, requestPool: &sync.Pool{New: func() interface{} { return &request{} }}, @@ -217,6 +216,7 @@ func (client *Client) writeLoop() { conn := client.loadConn() if conn == nil { + req.close() client.requestPool.Put(req) return } @@ -252,7 +252,7 @@ func (client *Client) writeLoop() { } } - client.requestPool.Put(req) + chans.inProgress <- req } } @@ -301,7 +301,8 @@ func (client *Client) reconnect(err error) error { } oldChans := client.loadChans() - close(oldChans.expected) + close(oldChans.inProgress) + client.drainInProgress() close(oldChans.outbound) conn, err := client.connOpenHandler() @@ -321,7 +322,7 @@ func (client *Client) reconnect(err error) error { // replace closed channels with new ones _ = (*channels)(atomic.SwapPointer( (*unsafe.Pointer)(unsafe.Pointer(&client.chans)), - unsafe.Pointer(&channels{expected: make(chan *Response), outbound: make(chan *request)}))) + unsafe.Pointer(&channels{outbound: make(chan *request), inProgress: make(chan *request, InProgressQueueSize)}))) go client.readLoop() go client.writeLoop() @@ -329,6 +330,18 @@ func (client *Client) reconnect(err error) error { return nil } +func (client 
*Client) drainInProgress() { + defer func() { + recover() + }() + + for req := range client.chans.inProgress { + req.close() + client.requestPool.Put(req) // recycle here since it didn't get to be processed + } + +} + func (client *Client) loadConn() *connection { return (*connection)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(&client.conn)))) } @@ -407,6 +420,7 @@ func (client *Client) readLoop() { } client.process(resp) + } } @@ -417,13 +431,14 @@ func (client *Client) process(resp *Response) { // terminally should return it here. switch resp.DataType { case rt.PT_Error: - client.err(getError(resp.Data)) - - client.loadChans().expected <- resp - + fallthrough case rt.PT_StatusRes, rt.PT_JobCreated, rt.PT_EchoRes: - client.loadChans().expected <- resp + req := <-client.loadChans().inProgress + // recycle the request object, its 2nd life has ended + req.expected <- resp + req.close() + client.requestPool.Put(req) case rt.PT_WorkComplete, rt.PT_WorkFail, rt.PT_WorkException: defer client.handlers.Delete(resp.Handle) fallthrough @@ -434,7 +449,7 @@ func (client *Client) process(resp *Response) { var ok bool if handler, ok = client.handlers.Load(resp.Handle); !ok { // possibly the response arrived faster than the job handler was added to client.handlers, we'll wait a bit and give it another try - time.Sleep(WORK_HANDLE_DELAY_MS * time.Millisecond) + time.Sleep(WorkHandleDelay * time.Millisecond) if handler, ok = client.handlers.Load(resp.Handle); !ok { client.err(errors.New(fmt.Sprintf("unexpected %s response for \"%s\" with no handler", resp.DataType, resp.Handle))) } @@ -463,7 +478,7 @@ func (client *Client) request() *request { return client.requestPool.Get().(*request) } -func (client *Client) submit(pt rt.PT, funcname string, payload []byte) (handle string, err error) { +func (client *Client) submit(reqType rt.PT, funcname string, payload []byte) (handle string, err error) { defer func() { if e := safeCastError(recover(), "panic in submit()"); e != nil 
{ @@ -472,9 +487,10 @@ func (client *Client) submit(pt rt.PT, funcname string, payload []byte) (handle }() chans := client.loadChans() - chans.outbound <- client.request().submitJob(pt, funcname, IdGen.Id(), payload) + req := client.request().submitJob(reqType, funcname, IdGen.Id(), payload) + chans.outbound <- req - if res := <-chans.expected; res != nil { + if res := <-req.expected; res != nil { var err error if res.DataType == rt.PT_Error { err = getError(res.Data) @@ -582,9 +598,10 @@ func (client *Client) Status(handle string) (status *Status, err error) { }() chans := client.loadChans() - chans.outbound <- client.request().status(handle) + req := client.request().status(handle) + chans.outbound <- req - res := <-chans.expected + res := <-req.expected if res == nil { return nil, errors.New("Status response queue is empty, please resend") @@ -592,7 +609,6 @@ func (client *Client) Status(handle string) (status *Status, err error) { status, err = res.Status() client.responsePool.Put(res) - return } @@ -606,16 +622,16 @@ func (client *Client) Echo(data []byte) (echo []byte, err error) { }() chans := client.loadChans() - chans.outbound <- client.request().echo(data) + req := client.request().echo(data) + chans.outbound <- req - res := <-chans.expected + res := <-req.expected if res == nil { return nil, errors.New("Echo request got empty response, please resend") } echo = res.Data - client.responsePool.Put(res) return diff --git a/client/request.go b/client/request.go index 9caee8b..2c1c6fe 100644 --- a/client/request.go +++ b/client/request.go @@ -6,17 +6,25 @@ import ( // Request from client type request struct { - pt rt.PT - data [][]byte + pt rt.PT + expected chan *Response + data [][]byte +} + +func (req *request) close() { + if req.expected != nil { + close(req.expected) + } } func (req *request) args(args ...[]byte) { req.data = req.data[:0] req.data = append(req.data, args...) 
+ req.expected = make(chan *Response, 1) } -func (req *request) submitJob(pt rt.PT, funcname, id string, arg []byte) *request { - req.pt = pt +func (req *request) submitJob(reqType rt.PT, funcname, id string, arg []byte) *request { + req.pt = reqType req.args([]byte(funcname), []byte(id), arg)