Skip to content

Commit

Permalink
Standardise the way we handle errors (#395)
Browse files Browse the repository at this point in the history
* Standardise the way we handle errors

Also make sure that they all get captured in spans

* Update enginerequests.go

Co-authored-by: David Schmitt <[email protected]>

---------

Co-authored-by: David Schmitt <[email protected]>
  • Loading branch information
dylanratcliffe and DavidS-ovm authored Dec 9, 2024
1 parent 30060ca commit c5fdd27
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 63 deletions.
45 changes: 19 additions & 26 deletions enginerequests.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
log "github.com/sirupsen/logrus"
"github.com/sourcegraph/conc/pool"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"google.golang.org/protobuf/types/known/timestamppb"
)
Expand Down Expand Up @@ -386,6 +385,9 @@ func (e *Engine) Execute(ctx context.Context, q *sdp.Query, adapter Adapter, ite
return
}

// Record the error in the trace
span.RecordError(err, trace.WithStackTrace(true))

// Send the error back to the caller
numErrs.Add(1)
errs <- convertToSDPError(err, q, adapter, e.EngineConfig.SourceName)
Expand All @@ -408,71 +410,62 @@ func (e *Engine) Execute(ctx context.Context, q *sdp.Query, adapter Adapter, ite
return
}

var err error

switch q.GetMethod() {
case sdp.QueryMethod_GET:
var newItem *sdp.Item

newItem, err = adapter.Get(ctx, q.GetScope(), q.GetQuery(), q.GetIgnoreCache())
newItem, err := adapter.Get(ctx, q.GetScope(), q.GetQuery(), q.GetIgnoreCache())

if newItem != nil {
stream.SendItem(newItem)
}
if err != nil {
stream.SendError(err)
}
case sdp.QueryMethod_LIST:
if streamingAdapter, ok := adapter.(StreamingAdapter); ok {
// Prefer the streaming methods if they are available
streamingAdapter.ListStream(ctx, q.GetScope(), q.GetIgnoreCache(), stream)
} else if listableAdapter, ok := adapter.(ListableAdapter); ok {
var resultItems []*sdp.Item

// Fall back to the non-streaming methods
resultItems, err = listableAdapter.List(ctx, q.GetScope(), q.GetIgnoreCache())
resultItems, err := listableAdapter.List(ctx, q.GetScope(), q.GetIgnoreCache())

for _, i := range resultItems {
stream.SendItem(i)
}
if err != nil {
stream.SendError(err)
}
} else {
err = &sdp.QueryError{
stream.SendError(&sdp.QueryError{
ErrorType: sdp.QueryError_NOTFOUND,
ErrorString: "adapter is not listable",
}
})
}
case sdp.QueryMethod_SEARCH:
if streamingAdapter, ok := adapter.(StreamingAdapter); ok {
// Prefer the streaming methods if they are available
streamingAdapter.SearchStream(ctx, q.GetScope(), q.GetQuery(), q.GetIgnoreCache(), stream)
} else if searchableAdapter, ok := adapter.(SearchableAdapter); ok {
var resultItems []*sdp.Item

// Fall back to the non-streaming methods
resultItems, err = searchableAdapter.Search(ctx, q.GetScope(), q.GetQuery(), q.GetIgnoreCache())
resultItems, err := searchableAdapter.Search(ctx, q.GetScope(), q.GetQuery(), q.GetIgnoreCache())

for _, i := range resultItems {
stream.SendItem(i)
}
if err != nil {
stream.SendError(err)
}
} else {
err = &sdp.QueryError{
stream.SendError(&sdp.QueryError{
ErrorType: sdp.QueryError_NOTFOUND,
ErrorString: "adapter is not searchable",
}
})
}
}

span.SetAttributes(
attribute.Int("ovm.adapter.numItems", int(numItems.Load())),
attribute.Int("ovm.adapter.numErrors", int(numErrs.Load())),
attribute.Bool("ovm.adapter.cache", false),
)

if err != nil {
span.SetAttributes(attribute.String("ovm.adapter.error", err.Error()))
span.SetStatus(codes.Error, err.Error())
}

if err != nil {
errs <- convertToSDPError(err, q, adapter, e.EngineConfig.SourceName)
}
}

// Converts any error type to an SDP error, if it isn't already
Expand Down
77 changes: 40 additions & 37 deletions enginerequests_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/overmindtech/discovery/tracing"
"github.com/overmindtech/sdp-go"
"github.com/overmindtech/sdp-go/auth"
"github.com/sourcegraph/conc/pool"
"google.golang.org/protobuf/types/known/timestamppb"
)

Expand Down Expand Up @@ -321,8 +322,7 @@ func TestWildcardAdapterExpansion(t *testing.T) {
func TestSendQuerySync(t *testing.T) {
SkipWithoutNats(t)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
ctx := context.Background()

ctx, span := tracing.Tracer().Start(ctx, "TestSendQuerySync")
defer span.End()
Expand All @@ -336,49 +336,52 @@ func TestSendQuerySync(t *testing.T) {

e := newStartedEngine(t, "TestSendQuerySync", nil, &adapter)

p := pool.New()
for i := 0; i < 250; i++ {
u := uuid.New()
t.Log("starting query: ", u)

var progress *sdp.QueryProgress
var items []*sdp.Item

progress = sdp.NewQueryProgress(&sdp.Query{
Type: "person",
Method: sdp.QueryMethod_GET,
Query: "Dylan",
Scope: "test",
RecursionBehaviour: &sdp.Query_RecursionBehaviour{
LinkDepth: 0,
},
IgnoreCache: false,
UUID: u[:],
Deadline: timestamppb.New(time.Now().Add(10 * time.Minute)),
})
progress.StartTimeout = 10 * time.Millisecond

items, errs, err := progress.Execute(ctx, e.natsConnection)

if err != nil {
t.Fatal(err)
}

if len(errs) != 0 {
for _, err := range errs {
p.Go(func() {
u := uuid.New()
t.Log("starting query: ", u)

var progress *sdp.QueryProgress
var items []*sdp.Item

progress = sdp.NewQueryProgress(&sdp.Query{
Type: "person",
Method: sdp.QueryMethod_GET,
Query: "Dylan",
Scope: "test",
RecursionBehaviour: &sdp.Query_RecursionBehaviour{
LinkDepth: 0,
},
IgnoreCache: false,
UUID: u[:],
Deadline: timestamppb.New(time.Now().Add(10 * time.Minute)),
})
progress.StartTimeout = 1 * time.Second

items, errs, err := progress.Execute(ctx, e.natsConnection)

if err != nil {
t.Error(err)
}
}

if len(items) != 1 {
t.Fatalf("expected 1 item, got %v", len(items))
}
if len(errs) != 0 {
for _, err := range errs {
t.Error(err)
}
}

if progress.NumComplete() != 1 {
t.Fatalf("expected 1 to be complete, got %v\nProgress: %v", progress.NumComplete(), progress)
}
if len(items) != 1 {
t.Fatalf("expected 1 item, got %v", len(items))
}

if progress.NumComplete() != 1 {
t.Fatalf("expected 1 to be complete, got %v\nProgress: %v", progress.NumComplete(), progress)
}
})
}

p.Wait()
}

func TestExpandQuery(t *testing.T) {
Expand Down

0 comments on commit c5fdd27

Please sign in to comment.