Skip to content

Commit

Permalink
Async subscriptions (#43)
Browse files Browse the repository at this point in the history
NATS Subscriptions don't call handlers in goroutines by default, so we need to change this to spawn goroutines for handlers
  • Loading branch information
dylanratcliffe authored May 13, 2022
1 parent 75250eb commit eb9a803
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 20 deletions.
15 changes: 10 additions & 5 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ func (e *Engine) connect() error {
return err
}

err = e.subscribe("cancel.all", e.CancelItemRequestHandler)
err = e.subscribe("cancel.all", e.CancelHandler)

if err != nil {
return err
Expand Down Expand Up @@ -423,11 +423,11 @@ func (e *Engine) connect() error {
// Now actually create the required subscriptions
if wildcardExists {
e.subscribe("request.context.>", e.ItemRequestHandler)
e.subscribe("cancel.context.>", e.CancelItemRequestHandler)
e.subscribe("cancel.context.>", e.CancelHandler)
} else {
for suffix := range subscriptionMap {
e.subscribe(fmt.Sprintf("request.context.%v", suffix), e.ItemRequestHandler)
e.subscribe(fmt.Sprintf("cancel.context.%v", suffix), e.CancelItemRequestHandler)
e.subscribe(fmt.Sprintf("cancel.context.%v", suffix), e.CancelHandler)
}
}

Expand Down Expand Up @@ -566,8 +566,13 @@ func (e *Engine) IsNATSConnected() bool {
return false
}

// CancelItemRequestHandler Takes a CancelItemRequest and cancels that request if it exists
func (e *Engine) CancelItemRequestHandler(cancelRequest *sdp.CancelItemRequest) {
// CancelHandler calls HandleCancelItemRequest in a goroutine
func (e *Engine) CancelHandler(cancelRequest *sdp.CancelItemRequest) {
go e.HandleCancelItemRequest(cancelRequest)
}

// HandleCancelItemRequest Takes a CancelItemRequest and cancels that request if it exists
func (e *Engine) HandleCancelItemRequest(cancelRequest *sdp.CancelItemRequest) {
u, err := uuid.FromBytes(cancelRequest.UUID)

if err != nil {
Expand Down
10 changes: 8 additions & 2 deletions requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,15 @@ func NewResponseSubject() string {
return fmt.Sprintf("return.response.%v", nats.NewInbox())
}

// NewItemRequestHandler Returns a function whose job is to handle a single
// request. This includes responses, linking etc.
// ItemRequestHandler Calls HandleItemRequest but in a goroutine so that it can
// happen in parallel
func (e *Engine) ItemRequestHandler(itemRequest *sdp.ItemRequest) {
go e.HandleItemRequest(itemRequest)
}

// HandleItemRequest Handles a single request. This includes responses, linking
// etc.
func (e *Engine) HandleItemRequest(itemRequest *sdp.ItemRequest) {
if !e.WillRespond(itemRequest) {
// If we don't have any relevant sources, exit
return
Expand Down
26 changes: 13 additions & 13 deletions requests_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func TestExecuteRequest(t *testing.T) {

}

func TestNewItemRequestHandler(t *testing.T) {
func TestHandleItemRequest(t *testing.T) {
e := Engine{
Name: "test",
}
Expand Down Expand Up @@ -188,7 +188,7 @@ func TestNewItemRequestHandler(t *testing.T) {
}

// Run the handler
e.ItemRequestHandler(&req)
e.HandleItemRequest(&req)

// I'm expecting both sources to get a request since the type was *
if l := len(personSource.GetCalls); l != 1 {
Expand All @@ -215,7 +215,7 @@ func TestNewItemRequestHandler(t *testing.T) {
}

// Run the handler
e.ItemRequestHandler(&req)
e.HandleItemRequest(&req)

if l := len(personSource.GetCalls); l != 2 {
t.Errorf("expected person backend to have 2 Get calls, got %v", l)
Expand Down Expand Up @@ -252,7 +252,7 @@ func TestWildcardSourceExpansion(t *testing.T) {
}

// Run the handler
e.ItemRequestHandler(&req)
e.HandleItemRequest(&req)

if len(personSource.GetCalls) != 1 {
t.Errorf("expected 1 get call got %v", len(personSource.GetCalls))
Expand Down Expand Up @@ -356,7 +356,7 @@ func TestExpandRequest(t *testing.T) {

e.AddSources(&simple)

e.ItemRequestHandler(&sdp.ItemRequest{
e.HandleItemRequest(&sdp.ItemRequest{
Type: "person",
Method: sdp.RequestMethod_GET,
Query: "Debby",
Expand Down Expand Up @@ -385,7 +385,7 @@ func TestExpandRequest(t *testing.T) {

e.AddSources(&many)

e.ItemRequestHandler(&sdp.ItemRequest{
e.HandleItemRequest(&sdp.ItemRequest{
Type: "person",
Method: sdp.RequestMethod_GET,
Query: "Debby",
Expand Down Expand Up @@ -420,7 +420,7 @@ func TestExpandRequest(t *testing.T) {
e.AddSources(&sx)
e.AddSources(&sy)

e.ItemRequestHandler(&sdp.ItemRequest{
e.HandleItemRequest(&sdp.ItemRequest{
Type: "person",
Method: sdp.RequestMethod_GET,
Query: "Daniel",
Expand Down Expand Up @@ -463,7 +463,7 @@ func TestExpandRequest(t *testing.T) {
e.AddSources(&sx)
e.AddSources(&sy)

e.ItemRequestHandler(&sdp.ItemRequest{
e.HandleItemRequest(&sdp.ItemRequest{
Type: "person",
Method: sdp.RequestMethod_GET,
Query: "Steven",
Expand Down Expand Up @@ -508,7 +508,7 @@ func TestExpandRequest(t *testing.T) {
e.AddSources(&sx)
e.AddSources(&sy)

e.ItemRequestHandler(&sdp.ItemRequest{
e.HandleItemRequest(&sdp.ItemRequest{
Type: "person",
Method: sdp.RequestMethod_GET,
Query: "Jane",
Expand Down Expand Up @@ -551,7 +551,7 @@ func TestExpandRequest(t *testing.T) {
e.AddSources(&sx)
e.AddSources(&sy)

e.ItemRequestHandler(&sdp.ItemRequest{
e.HandleItemRequest(&sdp.ItemRequest{
Type: "person",
Method: sdp.RequestMethod_FIND,
Query: "Jane",
Expand Down Expand Up @@ -582,7 +582,7 @@ func TestExpandRequest(t *testing.T) {

e.AddSources(&sx)

e.ItemRequestHandler(&sdp.ItemRequest{
e.HandleItemRequest(&sdp.ItemRequest{
Type: "person",
Method: sdp.RequestMethod_FIND,
Query: "Rachel",
Expand Down Expand Up @@ -625,7 +625,7 @@ func TestExpandRequest(t *testing.T) {
e.AddSources(&sy)
e.AddSources(&sz)

e.ItemRequestHandler(&sdp.ItemRequest{
e.HandleItemRequest(&sdp.ItemRequest{
Type: "person",
Method: sdp.RequestMethod_FIND,
Query: "Ross",
Expand Down Expand Up @@ -677,7 +677,7 @@ func TestExpandRequest(t *testing.T) {
e.AddSources(&sy)
e.AddSources(&sz)

e.ItemRequestHandler(&sdp.ItemRequest{
e.HandleItemRequest(&sdp.ItemRequest{
Type: "person",
Method: sdp.RequestMethod_FIND,
Query: "Ross",
Expand Down

0 comments on commit eb9a803

Please sign in to comment.