Skip to content

Commit

Permalink
Handle SDP's errorSubject (#48)
Browse files Browse the repository at this point in the history
This PR allows an engine to send all errors it encounters along an errorSubject. This means that no errors are hidden from the client.
  • Loading branch information
dylanratcliffe authored Jul 4, 2022
1 parent fd2962d commit ef2326c
Show file tree
Hide file tree
Showing 12 changed files with 765 additions and 569 deletions.
16 changes: 10 additions & 6 deletions engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func TestNats(t *testing.T) {
ItemSubject: NewItemSubject(),
})

_, err := req.Execute(e.natsConnection)
_, _, err := req.Execute(e.natsConnection)

if err != nil {
t.Error(err)
Expand All @@ -232,7 +232,7 @@ func TestNats(t *testing.T) {
ItemSubject: NewItemSubject(),
})

_, err := req.Execute(e.natsConnection)
_, _, err := req.Execute(e.natsConnection)

if err != nil {
t.Error(err)
Expand Down Expand Up @@ -302,9 +302,10 @@ func TestNatsCancel(t *testing.T) {
UUID: u[:],
})

items := make(chan *sdp.Item)
items := make(chan *sdp.Item, 1000)
errs := make(chan *sdp.ItemRequestError, 1000)

err := progress.Start(conn, items)
err := progress.Start(conn, items, errs)

if err != nil {
t.Error(err)
Expand All @@ -316,8 +317,11 @@ func TestNatsCancel(t *testing.T) {
UUID: u[:],
})

// Read and discard all items and errors until they are closed
for range items {
}
for range errs {
}

if progress.NumCancelled() != 1 {
t.Errorf("Expected query to be cancelled, got\n%v", progress.String())
Expand Down Expand Up @@ -587,7 +591,7 @@ func TestNatsAuth(t *testing.T) {
e.ClearCache()
})

_, err := sdp.NewRequestProgress(&sdp.ItemRequest{
_, _, err := sdp.NewRequestProgress(&sdp.ItemRequest{
Type: "person",
Method: sdp.RequestMethod_GET,
Query: "basic",
Expand All @@ -612,7 +616,7 @@ func TestNatsAuth(t *testing.T) {
e.ClearCache()
})

_, err := sdp.NewRequestProgress(&sdp.ItemRequest{
_, _, err := sdp.NewRequestProgress(&sdp.ItemRequest{
Type: "person",
Method: sdp.RequestMethod_GET,
Query: "deeplink",
Expand Down
22 changes: 10 additions & 12 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/nats-io/nats-server/v2 v2.8.4
github.com/nats-io/nats.go v1.16.0
github.com/overmindtech/multiconn v0.3.3
github.com/overmindtech/sdp-go v0.10.4
github.com/overmindtech/sdp-go v0.11.0
github.com/overmindtech/sdpcache v0.3.2
github.com/sirupsen/logrus v1.8.1
google.golang.org/protobuf v1.28.0
Expand All @@ -20,23 +20,21 @@ require (
github.com/dgraph-io/dgo/v210 v210.0.0-20220113041351-ba0e5dfc4c3e // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/klauspost/compress v1.15.6 // indirect
github.com/klauspost/compress v1.15.7 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.2.1-0.20220330180145-442af02fd36a // indirect
github.com/nats-io/jwt/v2 v2.3.0 // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/overmindtech/tokenx-client v0.1.2 // indirect
github.com/overmindtech/tokenx-client v0.2.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/stretchr/testify v1.7.1 // indirect
golang.org/x/crypto v0.0.0-20220525230936-793ad666bf5e // indirect
golang.org/x/net v0.0.0-20220607020251-c690dde0001d // indirect
golang.org/x/oauth2 v0.0.0-20220608161450-d0670ef3b1eb // indirect
golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d // indirect
github.com/stretchr/testify v1.8.0 // indirect
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d // indirect
golang.org/x/net v0.0.0-20220630215102-69896b714898 // indirect
golang.org/x/oauth2 v0.0.0-20220630143837-2104d58473e0 // indirect
golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20220609170525-579cf78fd858 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20220608133413-ed9918b62aac // indirect
google.golang.org/genproto v0.0.0-20220630174209-ad1d48641aa7 // indirect
google.golang.org/grpc v1.47.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
252 changes: 240 additions & 12 deletions go.sum

Large diffs are not rendered by default.

10 changes: 4 additions & 6 deletions performance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ type TimedResults struct {
MaxTime time.Duration
TimeTaken time.Duration
Results []*sdp.Item
Errors []error
Errors []*sdp.ItemRequestError
}

func TimeRequests(numRequests int, linkDepth int, numParallel int) TimedResults {
Expand All @@ -146,7 +146,7 @@ func TimeRequests(numRequests int, linkDepth int, numParallel int) TimedResults
}

results := make([]*sdp.Item, 0)
errors := make([]error, 0)
errors := make([]*sdp.ItemRequestError, 0)
resultsMutex := sync.Mutex{}
wg := sync.WaitGroup{}

Expand All @@ -169,13 +169,11 @@ func TimeRequests(numRequests int, linkDepth int, numParallel int) TimedResults
go func(rt *RequestTracker) {
defer wg.Done()

items, err := rt.Execute()
items, errs, _ := rt.Execute()

resultsMutex.Lock()
results = append(results, items...)
if err != nil {
errors = append(errors, err)
}
errors = append(errors, errs...)
resultsMutex.Unlock()
}(&rt)
}
Expand Down
141 changes: 105 additions & 36 deletions request_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,33 +176,59 @@ func (r *RequestTracker) linkItem(ctx context.Context, parent *sdp.Item) {
return
}

linkedItems, err := r.Engine.ExecuteRequest(ctx, req)

if err == nil {
itemMutex.Lock()

for _, li := range linkedItems {
// Add the new items back into the queue to be linked
// further if required
r.queueUnlinkedItem(li)

// Create a reference to the newly found item and attach to
// the parent item
ref := li.Reference()
p.LinkedItems = append(p.LinkedItems, &ref)
items := make(chan *sdp.Item)
errs := make(chan *sdp.ItemRequestError)
requestErr := make(chan error)
var shouldRemove bool

go func(e chan error) {
e <- r.Engine.ExecuteRequest(ctx, req, items, errs)
}(requestErr)

for {
select {
case li, ok := <-items:
if ok {
itemMutex.Lock()

// Add the new items back into the queue to be linked
// further if required
r.queueUnlinkedItem(li)

// Create a reference to the newly found item and attach
// to the parent item
ref := li.Reference()
p.LinkedItems = append(p.LinkedItems, &ref)

itemMutex.Unlock()
} else {
items = nil
}
case err, ok := <-errs:
if ok {
if err.ErrorType == sdp.ItemRequestError_NOTFOUND {
// If we looked and didn't find the item then
// there is no point keeping the request around.
// Mark as true so that it's removed from the
// item
shouldRemove = true
}
} else {
errs = nil
}
}

p.LinkedItemRequests = deleteItemRequest(p.LinkedItemRequests, req)

itemMutex.Unlock()
} else {
if sdpErr, ok := err.(*sdp.ItemRequestError); ok {
if sdpErr.ErrorType == sdp.ItemRequestError_NOCONTEXT || sdpErr.ErrorType == sdp.ItemRequestError_TIMEOUT {
// If there was no context or it timed out, leave it for the remote linker
return
}
if items == nil && errs == nil {
break
}
}

err := <-requestErr

if err == nil || shouldRemove {
// Delete the item request if we were able to resolve it locally
// OR it failed to resolve but because we looked and we know it
// doesn't exist
itemMutex.Lock()
p.LinkedItemRequests = deleteItemRequest(p.LinkedItemRequests, req)
itemMutex.Unlock()
Expand All @@ -220,19 +246,28 @@ func (r *RequestTracker) stopLinking() {
close(r.unlinkedItems)
}

func (r *RequestTracker) Execute() ([]*sdp.Item, error) {
// Execute Executes a given item request and publishes results and errors on the
// relevant nats subjects. Returns the full list of items, errors, and a final
// error. The final error will be populated if all sources failed, or some other
// error was encountered while trying run the request
func (r *RequestTracker) Execute() ([]*sdp.Item, []*sdp.ItemRequestError, error) {
if r.unlinkedItems == nil {
r.unlinkedItems = make(chan *sdp.Item)
}

if r.Request == nil {
return nil, nil
return nil, nil, nil
}

if r.Engine == nil {
return nil, errors.New("no engine supplied, cannot execute")
return nil, nil, errors.New("no engine supplied, cannot execute")
}

items := make(chan *sdp.Item)
errs := make(chan *sdp.ItemRequestError)
errChan := make(chan error)
sdpErrs := make([]*sdp.ItemRequestError, 0)

// Create context to enforce timeouts
ctx, cancel := r.Request.TimeoutContext()
r.cancelFuncMutex.Lock()
Expand All @@ -243,25 +278,59 @@ func (r *RequestTracker) Execute() ([]*sdp.Item, error) {
r.startLinking(ctx)

// Run the request
items, err := r.Engine.ExecuteRequest(ctx, r.Request)

// If it worked, put the items in the unlinked queue for linking
if err == nil {
for _, item := range items {
// Add to the queue. This will be picked up by other
// goroutine, linked and published once done
r.queueUnlinkedItem(item)
go func() {
errChan <- r.Engine.ExecuteRequest(ctx, r.Request, items, errs)
}()

// Process the items and errors as they come in
for {
select {
case item, ok := <-items:
if ok {
// Add to the queue. This will be picked up by other goroutine,
// linked and published to NATS once done
r.queueUnlinkedItem(item)
} else {
items = nil
}
case err, ok := <-errs:
if ok {
sdpErrs = append(sdpErrs, err)

if r.Request.ErrorSubject != "" && r.Engine.natsConnection != nil {
pubErr := r.Engine.natsConnection.Publish(r.Request.ErrorSubject, err)

if pubErr != nil {
// TODO: I probably shouldn't be logging directly here but I
// don't want the error to be lost
log.WithFields(log.Fields{
"error": err,
}).Error("Error publishing item request error")
}
}
} else {
errs = nil
}
}

if items == nil && errs == nil {
// If both channels have been closed and set to nil, we're done so
// break
break
}
}

// Wait for all of the initial requests to be done processing
r.stopLinking()

// Get the result of the execution
err := <-errChan

if err != nil {
return r.LinkedItems(), err
return r.LinkedItems(), sdpErrs, err
}

return r.LinkedItems(), ctx.Err()
return r.LinkedItems(), sdpErrs, ctx.Err()
}

// Cancel Cancells the currently running request
Expand Down
Loading

0 comments on commit ef2326c

Please sign in to comment.