Skip to content

Commit

Permalink
fix: resource and policy declaration fixes (#88)
Browse files Browse the repository at this point in the history
* fix: ensure resources can be declared separate to policies

* fix: ensure topics and subscriptions can be created in separate services
  • Loading branch information
davemooreuws authored Jul 8, 2024
1 parent 64afac8 commit cc86a67
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 35 deletions.
49 changes: 48 additions & 1 deletion nitric/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,24 @@ type Manager interface {
Run() error
addWorker(name string, s workers.Worker)
resourceServiceClient() (v1.ResourcesClient, error)
registerResource(request *v1.ResourceDeclareRequest, registerResourceChan chan RegisterResult)
registerPolicy(res *v1.ResourceIdentifier, actions ...v1.Action) (*manager, error)

newApi(name string, opts ...ApiOption) (Api, error)
newBucket(name string, permissions ...BucketPermission) (storage.Bucket, error)
newSecret(name string, permissions ...SecretPermission) (secrets.SecretRef, error)
newQueue(name string, permissions ...QueuePermission) (queues.Queue, error)
newSchedule(name string) Schedule
newTopic(name string, permissions ...TopicPermission) (Topic, error)
newWebsocket(socket string) (Websocket, error)
newKv(name string, permissions ...KvStorePermission) (keyvalue.Store, error)
newOidcSecurityDefinition(apiName string, options OidcOptions) (OidcSecurityDefinition, error)
}

type RegisterResult struct {
Identifier *v1.ResourceIdentifier
Err error
}

type manager struct {
workers map[string]workers.Worker
conn grpc.ClientConnInterface
Expand Down Expand Up @@ -104,6 +110,47 @@ func (m *manager) resourceServiceClient() (v1.ResourcesClient, error) {
return m.rsc, nil
}

func (m *manager) registerResource(request *v1.ResourceDeclareRequest, registerResourceChan chan RegisterResult) {
rsc, err := m.resourceServiceClient()
if err != nil {
registerResourceChan <- RegisterResult{
Err: err,
Identifier: nil,
}

return
}

_, err = rsc.Declare(context.Background(), request)
if err != nil {
registerResourceChan <- RegisterResult{
Err: err,
Identifier: nil,
}

return
}

registerResourceChan <- RegisterResult{
Err: nil,
Identifier: request.Id,
}
}

func (m *manager) registerPolicy(res *v1.ResourceIdentifier, actions ...v1.Action) (*manager, error) {
rsc, err := m.resourceServiceClient()
if err != nil {
return m, err
}

_, err = rsc.Declare(context.Background(), functionResourceDeclareRequest(res, actions))
if err != nil {
return m, err
}

return m, nil
}

// Run will run the function and callback the required handlers when these events are received.
func Run() error {
return defaultManager.Run()
Expand Down
61 changes: 27 additions & 34 deletions nitric/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package nitric

import (
"context"
"fmt"

"github.com/nitrictech/go-sdk/api/topics"
Expand Down Expand Up @@ -52,48 +51,37 @@ type topic struct {
}

type subscribableTopic struct {
name string
manager Manager
name string
manager Manager
registerChan chan RegisterResult
}

// NewTopic creates a new Topic with the give permissions.
func NewTopic(name string) SubscribableTopic {
return &subscribableTopic{
name: name,
manager: defaultManager,
}
}

func (t *subscribableTopic) Allow(permission TopicPermission, permissions ...TopicPermission) (Topic, error) {
allPerms := append([]TopicPermission{permission}, permissions...)

return defaultManager.newTopic(t.name, allPerms...)
}

func (m *manager) newTopic(name string, permissions ...TopicPermission) (Topic, error) {
rsc, err := m.resourceServiceClient()
if err != nil {
return nil, err
}

res := &v1.ResourceIdentifier{
Type: v1.ResourceType_Topic,
Name: name,
topic := &subscribableTopic{
name: name,
manager: defaultManager,
registerChan: make(chan RegisterResult),
}

dr := &v1.ResourceDeclareRequest{
Id: res,
go defaultManager.registerResource(&v1.ResourceDeclareRequest{
Id: &v1.ResourceIdentifier{
Type: v1.ResourceType_Topic,
Name: name,
},
Config: &v1.ResourceDeclareRequest_Topic{
Topic: &v1.TopicResource{},
},
}
_, err = rsc.Declare(context.Background(), dr)
if err != nil {
return nil, err
}
}, topic.registerChan)

return topic
}

func (t *subscribableTopic) Allow(permission TopicPermission, permissions ...TopicPermission) (Topic, error) {
allPerms := append([]TopicPermission{permission}, permissions...)

actions := []v1.Action{}
for _, perm := range permissions {
for _, perm := range allPerms {
switch perm {
case TopicPublish:
actions = append(actions, v1.Action_TopicPublish)
Expand All @@ -102,7 +90,12 @@ func (m *manager) newTopic(name string, permissions ...TopicPermission) (Topic,
}
}

_, err = rsc.Declare(context.Background(), functionResourceDeclareRequest(res, actions))
registerResult := <-t.registerChan
if registerResult.Err != nil {
return nil, registerResult.Err
}

m, err := t.manager.registerPolicy(registerResult.Identifier, actions...)
if err != nil {
return nil, err
}
Expand All @@ -116,7 +109,7 @@ func (m *manager) newTopic(name string, permissions ...TopicPermission) (Topic,
}

return &topic{
Topic: m.topics.Topic(name),
Topic: m.topics.Topic(t.name),
manager: m,
}, nil
}
Expand Down

0 comments on commit cc86a67

Please sign in to comment.