Skip to content

Commit

Permalink
Merge pull request #95 from overmindtech/undo-renaming
Browse files Browse the repository at this point in the history
Update for the big proto renaming
  • Loading branch information
DavidS-ovm authored Mar 6, 2023
2 parents c55efec + ec0978b commit da52d04
Show file tree
Hide file tree
Showing 19 changed files with 603 additions and 597 deletions.
12 changes: 6 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ This library is currently under development and documentation can be found on [p

## Engine

The engine is responsible for managing all communication over NATS, handling requests, reporting on progress, caching etc. Authors of sources should only need to do the following in order to have a functional source:
The engine is responsible for managing all communication over NATS, handling queries, reporting on progress, caching etc. Authors of sources should only need to do the following in order to have a functional source:

* Give the engine a name
* Note that this name is used as the `Responder` when responding to requests, this means that this name should be unique as if there are multiple responders with the same name, users will not be able to properly track the progress of their requests
* Note that this name is used as the `Responder` when responding to queries, this means that this name should be unique as if there are multiple responders with the same name, users will not be able to properly track the progress of their queries
* Provide the engine with config
* Manage the engine's lifecycle (start and stop it)

Expand All @@ -36,13 +36,13 @@ var trigger = Trigger{
// UniqueAttributeValueRegex match
UniqueAttributeValueRegex: regexp.MustCompile(`^[Dd]ylan$`),
// When both of the above match, the below function will be called, this
// function should return the request that should be forwarded to the
// function should return the query that should be forwarded to the
// engine that the trigger is registered with
RequestGenerator: func(in *sdp.Item) (*sdp.ItemRequest, error) {
QueryGenerator: func(in *sdp.Item) (*sdp.Query, error) {
if in.GetScope() != "something" {
return nil, errors.New("only 'something' scope supported")
} else {
return &sdp.ItemRequest{
return &sdp.Query{
Type: "dog",
Method: sdp.RequestMethod_SEARCH,
Query: "pug",
Expand All @@ -52,7 +52,7 @@ var trigger = Trigger{
}
```

When the above trigger fires it will result in the engine that it is assigned to processing a SEARCH request as defined above. Note that while only the `Type`, `Method` and `Query` attributes have been specified, the rest will be filled in automatically with data from the `Metadata.SourceRequest` of the originating item to ensure that the responses are sent to the user that originated the request.
When the above trigger fires it will result in the engine that it is assigned to processing a SEARCH query as defined above. Note that while only the `Type`, `Method` and `Query` attributes have been specified, the rest will be filled in automatically with data from the `Metadata.SourceQuery` of the originating item to ensure that the responses are sent to the user that originated the query.

## Default Sources

Expand Down
84 changes: 42 additions & 42 deletions engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,13 @@ type Engine struct {
// All Sources managed by this Engine
sh *SourceHost

// GetListMutex used for locking out Get requests when there's a List happening
// GetListMutex used for locking out Get querys when there's a List happening
gfm GetListMutex

// trackedRequests is used for storing requests that have a UUID so they can
// trackedQueries is used for storing querys that have a UUID so they can
// be cancelled if required
trackedRequests map[uuid.UUID]*RequestTracker
trackedRequestsMutex sync.RWMutex
trackedQueries map[uuid.UUID]*QueryTracker
trackedQueriesMutex sync.RWMutex

// Prevents the engine being restarted many times in parallel
restartMutex sync.Mutex
Expand All @@ -92,37 +92,37 @@ func NewEngine() (*Engine, error) {
ConnectionWatchInterval: DefaultConnectionWatchInterval,
cache: sdpcache.NewCache(),
sh: sh,
trackedRequests: make(map[uuid.UUID]*RequestTracker),
trackedQueries: make(map[uuid.UUID]*QueryTracker),
}, nil
}

// TrackRequest Stores a RequestTracker in the engine so that it can be looked
// TrackQuery Stores a QueryTracker in the engine so that it can be looked
// up later and cancelled if required. The UUID should be supplied as part of
// the request itself
func (e *Engine) TrackRequest(uuid uuid.UUID, request *RequestTracker) {
e.trackedRequestsMutex.Lock()
defer e.trackedRequestsMutex.Unlock()
e.trackedRequests[uuid] = request
// the query itself
func (e *Engine) TrackQuery(uuid uuid.UUID, qt *QueryTracker) {
e.trackedQueriesMutex.Lock()
defer e.trackedQueriesMutex.Unlock()
e.trackedQueries[uuid] = qt
}

// GetTrackedRequest Returns the RequestTracked object for a given UUID. This
// tracker can then be used to cancel the request
func (e *Engine) GetTrackedRequest(uuid uuid.UUID) (*RequestTracker, error) {
e.trackedRequestsMutex.RLock()
defer e.trackedRequestsMutex.RUnlock()
// GetTrackedQuery Returns the QueryTracker object for a given UUID. This
// tracker can then be used to cancel the query
func (e *Engine) GetTrackedQuery(uuid uuid.UUID) (*QueryTracker, error) {
e.trackedQueriesMutex.RLock()
defer e.trackedQueriesMutex.RUnlock()

if tracker, ok := e.trackedRequests[uuid]; ok {
return tracker, nil
if qt, ok := e.trackedQueries[uuid]; ok {
return qt, nil
} else {
return nil, fmt.Errorf("tracker with UUID %x not found", uuid)
}
}

// DeleteTrackedRequest Deletes a request from tracking
func (e *Engine) DeleteTrackedRequest(uuid [16]byte) {
e.trackedRequestsMutex.Lock()
defer e.trackedRequestsMutex.Unlock()
delete(e.trackedRequests, uuid)
// DeleteTrackedQuery Deletes a query from tracking
func (e *Engine) DeleteTrackedQuery(uuid [16]byte) {
e.trackedQueriesMutex.Lock()
defer e.trackedQueriesMutex.Unlock()
delete(e.trackedQueries, uuid)
}

// AddSources Adds a source to this engine
Expand Down Expand Up @@ -172,16 +172,16 @@ func (e *Engine) connect() error {
"URL:": e.natsConnection.Underlying().ConnectedUrl(),
}).Info("NATS connected")

err = e.subscribe("request.all", sdp.NewAsyncRawItemRequestHandler("ItemRequestHandler", func(ctx context.Context, _ *nats.Msg, i *sdp.ItemRequest) {
e.HandleItemRequest(ctx, i)
err = e.subscribe("request.all", sdp.NewAsyncRawQueryHandler("QueryHandler", func(ctx context.Context, _ *nats.Msg, i *sdp.Query) {
e.HandleQuery(ctx, i)
}))

if err != nil {
return err
}

err = e.subscribe("cancel.all", sdp.NewAsyncRawCancelItemRequestHandler("CancelHandler", func(ctx context.Context, m *nats.Msg, i *sdp.CancelItemRequest) {
e.HandleCancelItemRequest(ctx, i)
err = e.subscribe("cancel.all", sdp.NewAsyncRawCancelQueryHandler("CancelQueryHandler", func(ctx context.Context, m *nats.Msg, i *sdp.CancelQuery) {
e.HandleCancelQuery(ctx, i)
}))

if err != nil {
Expand Down Expand Up @@ -211,19 +211,19 @@ func (e *Engine) connect() error {

// Now actually create the required subscriptions
if wildcardExists {
e.subscribe("request.scope.>", sdp.NewAsyncRawItemRequestHandler("WildcardItemRequestHandler", func(ctx context.Context, m *nats.Msg, i *sdp.ItemRequest) {
e.HandleItemRequest(ctx, i)
e.subscribe("request.scope.>", sdp.NewAsyncRawQueryHandler("WildcardQueryHandler", func(ctx context.Context, m *nats.Msg, i *sdp.Query) {
e.HandleQuery(ctx, i)
}))
e.subscribe("cancel.scope.>", sdp.NewAsyncRawCancelItemRequestHandler("WildcardCancelHandler", func(ctx context.Context, m *nats.Msg, i *sdp.CancelItemRequest) {
e.HandleCancelItemRequest(ctx, i)
e.subscribe("cancel.scope.>", sdp.NewAsyncRawCancelQueryHandler("WildcardCancelQueryHandler", func(ctx context.Context, m *nats.Msg, i *sdp.CancelQuery) {
e.HandleCancelQuery(ctx, i)
}))
} else {
for suffix := range subscriptionMap {
e.subscribe(fmt.Sprintf("request.scope.%v", suffix), sdp.NewAsyncRawItemRequestHandler("WildcardItemRequestHandler", func(ctx context.Context, m *nats.Msg, i *sdp.ItemRequest) {
e.HandleItemRequest(ctx, i)
e.subscribe(fmt.Sprintf("request.scope.%v", suffix), sdp.NewAsyncRawQueryHandler("WildcardQueryHandler", func(ctx context.Context, m *nats.Msg, i *sdp.Query) {
e.HandleQuery(ctx, i)
}))
e.subscribe(fmt.Sprintf("cancel.scope.%v", suffix), sdp.NewAsyncRawCancelItemRequestHandler("WildcardCancelHandler", func(ctx context.Context, m *nats.Msg, i *sdp.CancelItemRequest) {
e.HandleCancelItemRequest(ctx, i)
e.subscribe(fmt.Sprintf("cancel.scope.%v", suffix), sdp.NewAsyncRawCancelQueryHandler("WildcardCancelQueryHandler", func(ctx context.Context, m *nats.Msg, i *sdp.CancelQuery) {
e.HandleCancelQuery(ctx, i)
}))
}
}
Expand Down Expand Up @@ -375,26 +375,26 @@ func (e *Engine) IsNATSConnected() bool {
return false
}

// HandleCancelItemRequest Takes a CancelItemRequest and cancels that request if it exists
func (e *Engine) HandleCancelItemRequest(ctx context.Context, cancelRequest *sdp.CancelItemRequest) {
u, err := uuid.FromBytes(cancelRequest.UUID)
// HandleCancelQuery Takes a CancelQuery and cancels that query if it exists
func (e *Engine) HandleCancelQuery(ctx context.Context, cancelQuery *sdp.CancelQuery) {
u, err := uuid.FromBytes(cancelQuery.UUID)

if err != nil {
log.Errorf("Error parsing UUID for cancel request: %v", err)
log.Errorf("Error parsing UUID for cancel query: %v", err)
return
}

rt, err := e.GetTrackedRequest(u)
rt, err := e.GetTrackedQuery(u)

if err != nil {
log.Debugf("Could not find tracked request %v. Possibly is has already finished", u.String())
log.Debugf("Could not find tracked query %v. Possibly is has already finished", u.String())
return
}

if rt != nil {
log.WithFields(log.Fields{
"UUID": u.String(),
}).Debug("Cancelling request")
}).Debug("Cancelling query")
rt.Cancel()
}
}
Expand Down
Loading

0 comments on commit da52d04

Please sign in to comment.