-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathquerytracker.go
115 lines (96 loc) · 3.06 KB
/
querytracker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package discovery
import (
"context"
"errors"
"github.com/overmindtech/sdp-go"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/trace"
)
// QueryTracker tracks the progress of a single query. A single query can have
// a link depth that causes many additional queries to be executed, so the
// tracker has to follow not only the first query but also every other query
// and item that results from linking.
//
// NOTE(review): Execute runs with the ctx passed to it and does not read the
// Context or Cancel fields; they are presumably used by callers elsewhere —
// confirm.
type QueryTracker struct {
// The query to track. Execute returns immediately with no results and no
// error when this is nil.
Query *sdp.Query
Context context.Context // The context that this query is running in
Cancel context.CancelFunc // The cancel function for the context
// The engine that this is connected to, used for executing the query and
// for sending NATS messages. Execute returns an error when this is nil.
Engine *Engine
}
// Execute runs the tracked query on the engine and publishes every resulting
// item and query error on the query's NATS subject. Publishing is skipped
// when the query has no subject or the engine has no NATS connection; the
// results are still collected and returned in that case.
//
// It returns the full list of items, the full list of query errors, and a
// final error. The final error is whatever Engine.ExecuteQuery returned (per
// the original contract: populated if all adapters failed, or some other
// error was encountered while trying to run the query). When ExecuteQuery
// succeeds, ctx.Err() is returned instead, so a cancelled context is still
// reported to the caller.
//
// A nil Query is a no-op and yields (nil, nil, nil). A nil Engine yields an
// error. Failures to publish a response are recorded on the span found in ctx
// and logged, but do not abort the query.
//
// If the context is cancelled, all query work will stop.
func (qt *QueryTracker) Execute(ctx context.Context) ([]*sdp.Item, []*sdp.QueryError, error) {
	if qt.Query == nil {
		return nil, nil, nil
	}

	if qt.Engine == nil {
		return nil, nil, errors.New("no engine supplied, cannot execute")
	}

	span := trace.SpanFromContext(ctx)

	items := make(chan *sdp.Item)
	errs := make(chan *sdp.QueryError)
	errChan := make(chan error)
	sdpErrs := make([]*sdp.QueryError, 0)
	sdpItems := make([]*sdp.Item, 0)

	// Run the query in the background; its final result is collected from
	// errChan once both result channels have been drained.
	go func(e chan error) {
		defer LogRecoverToReturn(ctx, "Execute -> ExecuteQuery")
		e <- qt.Engine.ExecuteQuery(ctx, qt.Query, items, errs)
	}(errChan)

	// Process the items and errors as they come in. A channel is set to nil
	// once it has been closed so that select stops considering it; the loop
	// ends when both channels have been closed.
	for items != nil || errs != nil {
		select {
		case item, ok := <-items:
			if !ok {
				items = nil
				continue
			}

			sdpItems = append(sdpItems, item)

			if qt.Query.Subject() != "" && qt.Engine.natsConnection != nil {
				// Respond with the Item
				err := qt.Engine.natsConnection.Publish(ctx, qt.Query.Subject(), &sdp.QueryResponse{
					ResponseType: &sdp.QueryResponse_NewItem{
						NewItem: item,
					},
				})
				if err != nil {
					span.RecordError(err)
					log.WithFields(log.Fields{
						"error": err,
					}).Error("Response publishing error")
				}
			}
		case err, ok := <-errs:
			if !ok {
				errs = nil
				continue
			}

			sdpErrs = append(sdpErrs, err)

			if qt.Query.Subject() != "" && qt.Engine.natsConnection != nil {
				pubErr := qt.Engine.natsConnection.Publish(ctx, qt.Query.Subject(), &sdp.QueryResponse{
					ResponseType: &sdp.QueryResponse_Error{
						Error: err,
					},
				})
				if pubErr != nil {
					// Report the publishing failure itself, not the query
					// error that we were trying to publish.
					span.RecordError(pubErr)
					log.WithFields(log.Fields{
						"error": pubErr,
					}).Error("Error publishing item query error")
				}
			}
		}
	}

	// Get the result of the execution
	if err := <-errChan; err != nil {
		return sdpItems, sdpErrs, err
	}

	return sdpItems, sdpErrs, ctx.Err()
}