From eb9a8037be064c90d3aa08d336e39c1196f668cc Mon Sep 17 00:00:00 2001 From: Dylan Date: Fri, 13 May 2022 14:29:21 +0100 Subject: [PATCH] Async subscriptions (#43) NATS Subscriptions don't call handlers in goroutines by default, so we need to change this to spawn goroutines for handlers --- engine.go | 15 ++++++++++----- requests.go | 10 ++++++++-- requests_test.go | 26 +++++++++++++------------- 3 files changed, 31 insertions(+), 20 deletions(-) diff --git a/engine.go b/engine.go index 655a5c3..7da9b50 100644 --- a/engine.go +++ b/engine.go @@ -385,7 +385,7 @@ func (e *Engine) connect() error { return err } - err = e.subscribe("cancel.all", e.CancelItemRequestHandler) + err = e.subscribe("cancel.all", e.CancelHandler) if err != nil { return err @@ -423,11 +423,11 @@ func (e *Engine) connect() error { // Now actually create the required subscriptions if wildcardExists { e.subscribe("request.context.>", e.ItemRequestHandler) - e.subscribe("cancel.context.>", e.CancelItemRequestHandler) + e.subscribe("cancel.context.>", e.CancelHandler) } else { for suffix := range subscriptionMap { e.subscribe(fmt.Sprintf("request.context.%v", suffix), e.ItemRequestHandler) - e.subscribe(fmt.Sprintf("cancel.context.%v", suffix), e.CancelItemRequestHandler) + e.subscribe(fmt.Sprintf("cancel.context.%v", suffix), e.CancelHandler) } } @@ -566,8 +566,13 @@ func (e *Engine) IsNATSConnected() bool { return false } -// CancelItemRequestHandler Takes a CancelItemRequest and cancels that request if it exists -func (e *Engine) CancelItemRequestHandler(cancelRequest *sdp.CancelItemRequest) { +// CancelHandler calls HandleCancelItemRequest in a goroutine +func (e *Engine) CancelHandler(cancelRequest *sdp.CancelItemRequest) { + go e.HandleCancelItemRequest(cancelRequest) +} + +// HandleCancelItemRequest Takes a CancelItemRequest and cancels that request if it exists +func (e *Engine) HandleCancelItemRequest(cancelRequest *sdp.CancelItemRequest) { u, err := uuid.FromBytes(cancelRequest.UUID) if err != nil { diff --git a/requests.go b/requests.go index 83f5e04..c400712 100644 --- a/requests.go +++ b/requests.go @@ -26,9 +26,15 @@ func NewResponseSubject() string { return fmt.Sprintf("return.response.%v", nats.NewInbox()) } -// NewItemRequestHandler Returns a function whose job is to handle a single -// request. This includes responses, linking etc. +// ItemRequestHandler Calls HandleItemRequest but in a goroutine so that it can +// happen in parallel func (e *Engine) ItemRequestHandler(itemRequest *sdp.ItemRequest) { + go e.HandleItemRequest(itemRequest) +} + +// HandleItemRequest Handles a single request. This includes responses, linking +// etc. +func (e *Engine) HandleItemRequest(itemRequest *sdp.ItemRequest) { if !e.WillRespond(itemRequest) { // If we don't have any relevant sources, exit return diff --git a/requests_test.go b/requests_test.go index 228badd..eb27870 100644 --- a/requests_test.go +++ b/requests_test.go @@ -149,7 +149,7 @@ func TestExecuteRequest(t *testing.T) { } -func TestNewItemRequestHandler(t *testing.T) { +func TestHandleItemRequest(t *testing.T) { e := Engine{ Name: "test", } @@ -188,7 +188,7 @@ func TestNewItemRequestHandler(t *testing.T) { } // Run the handler - e.ItemRequestHandler(&req) + e.HandleItemRequest(&req) // I'm expecting both sources to get a request since the type was * if l := len(personSource.GetCalls); l != 1 { @@ -215,7 +215,7 @@ func TestNewItemRequestHandler(t *testing.T) { } // Run the handler - e.ItemRequestHandler(&req) + e.HandleItemRequest(&req) if l := len(personSource.GetCalls); l != 2 { t.Errorf("expected person backend to have 2 Get calls, got %v", l) @@ -252,7 +252,7 @@ func TestWildcardSourceExpansion(t *testing.T) { } // Run the handler - e.ItemRequestHandler(&req) + e.HandleItemRequest(&req) if len(personSource.GetCalls) != 1 { t.Errorf("expected 1 get call got %v", len(personSource.GetCalls)) @@ -356,7 +356,7 @@ func TestExpandRequest(t *testing.T) { e.AddSources(&simple) - e.ItemRequestHandler(&sdp.ItemRequest{ + e.HandleItemRequest(&sdp.ItemRequest{ Type: "person", Method: sdp.RequestMethod_GET, Query: "Debby", @@ -385,7 +385,7 @@ func TestExpandRequest(t *testing.T) { e.AddSources(&many) - e.ItemRequestHandler(&sdp.ItemRequest{ + e.HandleItemRequest(&sdp.ItemRequest{ Type: "person", Method: sdp.RequestMethod_GET, Query: "Debby", @@ -420,7 +420,7 @@ func TestExpandRequest(t *testing.T) { e.AddSources(&sx) e.AddSources(&sy) - e.ItemRequestHandler(&sdp.ItemRequest{ + e.HandleItemRequest(&sdp.ItemRequest{ Type: "person", Method: sdp.RequestMethod_GET, Query: "Daniel", @@ -463,7 +463,7 @@ func TestExpandRequest(t *testing.T) { e.AddSources(&sx) e.AddSources(&sy) - e.ItemRequestHandler(&sdp.ItemRequest{ + e.HandleItemRequest(&sdp.ItemRequest{ Type: "person", Method: sdp.RequestMethod_GET, Query: "Steven", @@ -508,7 +508,7 @@ func TestExpandRequest(t *testing.T) { e.AddSources(&sx) e.AddSources(&sy) - e.ItemRequestHandler(&sdp.ItemRequest{ + e.HandleItemRequest(&sdp.ItemRequest{ Type: "person", Method: sdp.RequestMethod_GET, Query: "Jane", @@ -551,7 +551,7 @@ func TestExpandRequest(t *testing.T) { e.AddSources(&sx) e.AddSources(&sy) - e.ItemRequestHandler(&sdp.ItemRequest{ + e.HandleItemRequest(&sdp.ItemRequest{ Type: "person", Method: sdp.RequestMethod_FIND, Query: "Jane", @@ -582,7 +582,7 @@ func TestExpandRequest(t *testing.T) { e.AddSources(&sx) - e.ItemRequestHandler(&sdp.ItemRequest{ + e.HandleItemRequest(&sdp.ItemRequest{ Type: "person", Method: sdp.RequestMethod_FIND, Query: "Rachel", @@ -625,7 +625,7 @@ func TestExpandRequest(t *testing.T) { e.AddSources(&sy) e.AddSources(&sz) - e.ItemRequestHandler(&sdp.ItemRequest{ + e.HandleItemRequest(&sdp.ItemRequest{ Type: "person", Method: sdp.RequestMethod_FIND, Query: "Ross", @@ -677,7 +677,7 @@ func TestExpandRequest(t *testing.T) { e.AddSources(&sy) e.AddSources(&sz) - e.ItemRequestHandler(&sdp.ItemRequest{ + e.HandleItemRequest(&sdp.ItemRequest{ Type: "person", Method: sdp.RequestMethod_FIND, Query: "Ross",