Skip to content

Commit

Permalink
rewrite events handling to remove watermill (#130)
Browse files Browse the repository at this point in the history
As discussed in the community call, watermill doesn't give us the
necessary features we'd like to utilize with the underlying nats
message. We decided to switch to using nats directly but still wanted
some support for possibly changing this out later.

This rewrites events to use our own interfaces to allow for the
possibility of a different event driver later.

Additionally this switches to using pull subscriptions instead of push,
supports Ack, Nak and Term as well as Request/Reply semantics.

Due to the Request/Reply semantics, no longer are there separate
Publisher and Subscriber configurations as the driver needs to be able
to handle both.

---------

Signed-off-by: Mike Mason <[email protected]>
  • Loading branch information
mikemrm authored Aug 4, 2023
1 parent 49f17be commit 1fb9cfe
Show file tree
Hide file tree
Showing 22 changed files with 1,584 additions and 688 deletions.
8 changes: 4 additions & 4 deletions entx/annotation.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ var EventsHookAnnotationName = "INFRA9_EVENTHOOKS"

// EventsHookAnnotation provides a ent.Annotation spec. These shouldn't be set directly, you should use EventsHookAdditionalSubject() and EventsHookSubjectName instead
type EventsHookAnnotation struct {
SubjectName string
IsAdditionalSubjectField bool
SubjectName string
AdditionalSubjectRelation string
}

// Name implements the ent Annotation interface.
Expand All @@ -29,9 +29,9 @@ func (a EventsHookAnnotation) Name() string {
}

// EventsHookAdditionalSubject marks this field as a field to return as an additional subject
func EventsHookAdditionalSubject() *EventsHookAnnotation {
func EventsHookAdditionalSubject(relation string) *EventsHookAnnotation {
return &EventsHookAnnotation{
IsAdditionalSubjectField: true,
AdditionalSubjectRelation: relation,
}
}

Expand Down
63 changes: 49 additions & 14 deletions entx/template/event_hooks.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

{{ $genPackage := base $.Config.Package }}

import "go.infratographer.com/permissions-api/pkg/permissions"

{{- range $node := $.Nodes }}
{{- if $nodeAnnotation := $node.Annotations.INFRA9_EVENTHOOKS }}
{{- if ne $nodeAnnotation.SubjectName "" }}
Expand All @@ -17,6 +19,7 @@
return hook.{{ $node.Name }}Func(func(ctx context.Context, m *generated.{{ $node.Name }}Mutation) (ent.Value, error) {
var err error
additionalSubjects := []gidx.PrefixedID{}
relationships := []events.AuthRelationshipRelation{}

objID, ok := m.{{ $node.ID.MutationGet }}()
if !ok {
Expand All @@ -40,7 +43,7 @@
{{ $currentValue }} := ""
{{ $f.Name }}, ok := m.{{ $f.MutationGet }}()
{{- $annotation := $f.Annotations.INFRA9_EVENTHOOKS }}
{{- if $annotation.IsAdditionalSubjectField }}
{{- if $annotation.AdditionalSubjectRelation }}
if !ok && !m.Op().Is(ent.OpCreate) {
// since we are doing an update or delete and these fields didn't change, load the "old" value
{{ $f.Name }}, err = m.{{ $f.MutationGetOld }}(ctx)
Expand All @@ -51,9 +54,19 @@
{{- if $f.Optional }}
if {{ $f.Name }} != gidx.NullPrefixedID {
additionalSubjects = append(additionalSubjects, {{ $f.Name }})

relationships = append(relationships, events.AuthRelationshipRelation{
Relation: "{{ $annotation.AdditionalSubjectRelation }}",
SubjectID: {{ $f.Name }},
})
}
{{- else }}
additionalSubjects = append(additionalSubjects, {{ $f.Name }})

relationships = append(relationships, events.AuthRelationshipRelation{
Relation: "{{ $annotation.AdditionalSubjectRelation }}",
SubjectID: {{ $f.Name }},
})
{{- end }}
{{ end }}

Expand Down Expand Up @@ -99,13 +112,19 @@
}
{{ end }}
{{ end }}

if len(relationships) != 0 {
if err := permissions.CreateAuthRelationships(ctx, "{{ $nodeAnnotation.SubjectName }}", objID, relationships...); err != nil {
return nil, fmt.Errorf("relationship request failed with error: %w", err)
}
}

msg := events.ChangeMessage{
EventType: eventType(m.Op()),
SubjectID: objID,
AdditionalSubjectIDs: additionalSubjects,
Timestamp: time.Now().UTC(),
FieldChanges: changeset,
EventType: eventType(m.Op()),
SubjectID: objID,
AdditionalSubjectIDs: additionalSubjects,
Timestamp: time.Now().UTC(),
FieldChanges: changeset,
}

// complete the mutation before we process the event
Expand All @@ -114,7 +133,7 @@
return retValue, err
}

if err := m.EventsPublisher.PublishChange(ctx, "{{ $nodeAnnotation.SubjectName }}", msg); err != nil {
if _, err := m.EventsPublisher.PublishChange(ctx, "{{ $nodeAnnotation.SubjectName }}", msg); err != nil {
return nil, fmt.Errorf("failed to publish change: %w", err)
}

Expand All @@ -128,6 +147,7 @@
func(next ent.Mutator) ent.Mutator {
return hook.{{ $node.Name }}Func(func(ctx context.Context, m *generated.{{ $node.Name }}Mutation) (ent.Value, error) {
additionalSubjects := []gidx.PrefixedID{}
relationships := []events.AuthRelationshipRelation{}

objID, ok := m.{{ $node.ID.MutationGet }}()
if !ok {
Expand All @@ -142,33 +162,48 @@
{{- range $f := $node.Fields }}
{{- if not $f.Sensitive }}
{{- $annotation := $f.Annotations.INFRA9_EVENTHOOKS }}
{{- if $annotation.IsAdditionalSubjectField }}
{{- if $annotation.AdditionalSubjectRelation }}
{{- if $f.Optional }}
if dbObj.{{ $f.MutationGet }} != gidx.NullPrefixedID {
additionalSubjects = append(additionalSubjects, dbObj.{{ $f.MutationGet }})

relationships = append(relationships, events.AuthRelationshipRelation{
Relation: "{{ $annotation.AdditionalSubjectRelation }}",
SubjectID: dbObj.{{ $f.MutationGet }},
})
}
{{- else }}
additionalSubjects = append(additionalSubjects, dbObj.{{ $f.MutationGet }})

relationships = append(relationships, events.AuthRelationshipRelation{
Relation: "{{ $annotation.AdditionalSubjectRelation }}",
SubjectID: dbObj.{{ $f.MutationGet }},
})
{{- end }}
{{ end }}
{{ end }}
{{ end }}

if len(relationships) != 0 {
if err := permissions.DeleteAuthRelationships(ctx, "{{ $nodeAnnotation.SubjectName }}", objID, relationships...); err != nil {
return nil, fmt.Errorf("relationship request failed with error: %w", err)
}
}

// we have all the info we need, now complete the mutation before we process the event
retValue, err := next.Mutate(ctx, m)
if err != nil {
return retValue, err
}

msg := events.ChangeMessage{
EventType: eventType(m.Op()),
SubjectID: objID,
AdditionalSubjectIDs: additionalSubjects,
Timestamp: time.Now().UTC(),
EventType: eventType(m.Op()),
SubjectID: objID,
AdditionalSubjectIDs: additionalSubjects,
Timestamp: time.Now().UTC(),
}


if err := m.EventsPublisher.PublishChange(ctx, "{{ $nodeAnnotation.SubjectName }}", msg); err != nil {
if _, err := m.EventsPublisher.PublishChange(ctx, "{{ $nodeAnnotation.SubjectName }}", msg); err != nil {
return nil, fmt.Errorf("failed to publish change: %w", err)
}

Expand Down
74 changes: 29 additions & 45 deletions events/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,62 +19,46 @@ import (

"github.com/spf13/pflag"
"github.com/spf13/viper"

"go.infratographer.com/x/viperx"
"go.uber.org/multierr"
"go.uber.org/zap"
)

var defaultTimeout = time.Second * 10

// PublisherConfig handles reading in all the config values available for setting up a pubsub publisher
type PublisherConfig struct {
URL string `mapstructure:"url"`
Timeout time.Duration `mapstructure:"timeout"`
Prefix string `mapstructure:"prefix"`
Source string `mapstructure:"source"`
NATSConfig NATSConfig `mapstructure:"nats"`
}
const (
defaultTimeout = time.Second * 10
tracerName = "go.infratographer.com/x/events"
)

// SubscriberConfig handles reading in all the config values available for setting up a pubsub publisher
type SubscriberConfig struct {
URL string `mapstructure:"url"`
Timeout time.Duration `mapstructure:"timeout"`
Prefix string `mapstructure:"prefix"`
QueueGroup string `mapstructure:"queueGroup"`
NATSConfig NATSConfig `mapstructure:"nats"`
// Config contains event provider configs.
type Config struct {
NATS NATSConfig `mapstructure:"nats"`
}

// NATSConfig handles reading in all pubsub values specific to NATS
type NATSConfig struct {
Token string `mapstructure:"token"`
CredsFile string `mapstructure:"credsFile"`
// MustViperFlags returns the cobra flags and viper config for events.
func MustViperFlags(v *viper.Viper, flags *pflag.FlagSet, appName string) {
MustViperFlagsForNATS(v, flags, appName)
}

// MustViperFlagsForPublisher returns the cobra flags and viper config for an event publisher
func MustViperFlagsForPublisher(v *viper.Viper, flags *pflag.FlagSet, appName string) {
flags.String("events-publisher-url", "nats://nats:4222", "nats server connection url")
viperx.MustBindFlag(v, "events.publisher.url", flags.Lookup("events-publisher-url"))
// Option configures a connection option.
type Option func(config *Config) error

v.MustBindEnv("events.publisher.timeout")
v.MustBindEnv("events.publisher.prefix")
v.MustBindEnv("events.publisher.source")
v.MustBindEnv("events.publisher.nats.token")
v.MustBindEnv("events.publisher.nats.credsFile")
// WithLogger sets the logger for the connection.
func WithLogger(logger *zap.SugaredLogger) Option {
return func(config *Config) error {
config.NATS.logger = logger

v.SetDefault("events.publisher.timeout", defaultTimeout)
v.SetDefault("events.publisher.source", appName)
return nil
}
}

// MustViperFlagsForSubscriber returns the cobra flags and viper config for an event subscriber
func MustViperFlagsForSubscriber(v *viper.Viper, flags *pflag.FlagSet) {
flags.String("events-subscriber-url", "nats://nats:4222", "nats server connection url")
viperx.MustBindFlag(v, "events.subscriber.url", flags.Lookup("events-subscriber-url"))
flags.String("events-subscriber-queuegroup", "", "subscriber queue group")
viperx.MustBindFlag(v, "events.subscriber.queueGroup", flags.Lookup("events-subscriber-queuegroup"))
// WithNATSOptions configures nats options.
func WithNATSOptions(options ...NATSOption) Option {
return func(config *Config) error {
var err error

v.MustBindEnv("events.subscriber.timeout")
v.MustBindEnv("events.subscriber.prefix")
v.MustBindEnv("events.subscriber.nats.token")
v.MustBindEnv("events.subscriber.nats.credsFile")
for _, opt := range options {
err = multierr.Append(err, opt(&config.NATS))
}

v.SetDefault("events.subscriber.timeout", defaultTimeout)
return err
}
}
69 changes: 69 additions & 0 deletions events/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package events

import (
"context"

"go.uber.org/multierr"
)

// Connection defines a connection handler.
type Connection interface {
// Gracefully close the connection.
Shutdown(ctx context.Context) error

// Source gives you the raw underlying connection object.
Source() any

Subscriber
Publisher

AuthRelationshipSubscriber
AuthRelationshipPublisher
}

// Subscriber specifies subscriber methods.
type Subscriber interface {
// SubscribeChanges subscribes to the provided topic responding with an ChangeMessage message.
SubscribeChanges(ctx context.Context, topic string) (<-chan Message[ChangeMessage], error)
// SubscribeEvents subscribes to the provided topic responding with an EventMessage message.
SubscribeEvents(ctx context.Context, topic string) (<-chan Message[EventMessage], error)
}

// Publisher specifies publisher methods.
type Publisher interface {
// PublishChange publishes to the specified topic with the message given.
PublishChange(ctx context.Context, topic string, message ChangeMessage) (Message[ChangeMessage], error)
// PublishEvent publishes to the specified topic with the message given.
PublishEvent(ctx context.Context, topic string, message EventMessage) (Message[EventMessage], error)
}

// AuthRelationshipSubscriber specifies the auth relationship subscriber methods.
type AuthRelationshipSubscriber interface {
// SubscribeAuthRelationshipRequests subscribes to the provided topic responding with an AuthRelationshipRequest message.
SubscribeAuthRelationshipRequests(ctx context.Context, topic string) (<-chan Request[AuthRelationshipRequest, AuthRelationshipResponse], error)
}

// AuthRelationshipPublisher specifies the auth relationship publisher methods.
type AuthRelationshipPublisher interface {
// PublishAuthRelationshipRequest publishes to the specified topic with the message given.
PublishAuthRelationshipRequest(ctx context.Context, topic string, message AuthRelationshipRequest) (Message[AuthRelationshipResponse], error)
}

// NewConnection creates a new Connection from the provided config.
func NewConnection(config Config, options ...Option) (Connection, error) {
var err error

for _, opt := range options {
err = multierr.Append(err, opt(&config))
}

if err != nil {
return nil, err
}

if config.NATS.Configured() {
return NewNATSConnection(config.NATS)
}

return nil, ErrProviderNotConfigured
}
32 changes: 32 additions & 0 deletions events/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package events

import "errors"

var (
// ErrProviderNotConfigured is an error packages should return if no events provider is configured.
ErrProviderNotConfigured = errors.New("events provider not configured")

// ErrMissingChangeMessageEventType is returned when the event message has the incorrect field EventType value.
ErrMissingChangeMessageEventType = errors.New("change message EventType field required")
// ErrMissingChangeMessageSubjectID is returned when the event message has the incorrect field SubjectID value.
ErrMissingChangeMessageSubjectID = errors.New("change message SubjectID field required")

// ErrMissingEventMessageEventType is returned when the event message has the incorrect field EventType value.
ErrMissingEventMessageEventType = errors.New("event message EventType field required")
// ErrMissingEventMessageSubjectID is returned when the event message has the incorrect field SubjectID value.
ErrMissingEventMessageSubjectID = errors.New("event message SubjectID field required")

// ErrInvalidAuthRelationshipRequestAction is returned when the event message has the incorrect field Action value.
ErrInvalidAuthRelationshipRequestAction = errors.New("auth relationship request message Action field must be write or delete")
// ErrMissingAuthRelationshipRequestObjectID is returned when the event message has the incorrect field ObjectID value.
ErrMissingAuthRelationshipRequestObjectID = errors.New("auth relationship request message ObjectID field required")
// ErrMissingAuthRelationshipRequestRelation is returned when the event message has no relations defined.
ErrMissingAuthRelationshipRequestRelation = errors.New("auth relationship request message Relations field required")
// ErrMissingAuthRelationshipRequestRelationRelation is returned when the event message Relations has the incorrect field for Relation value.
ErrMissingAuthRelationshipRequestRelationRelation = errors.New("auth relationship request message Relations Relation field required")
// ErrMissingAuthRelationshipRequestRelationSubjectID is returned when the event message Relations has the incorrect field SubjectID value.
ErrMissingAuthRelationshipRequestRelationSubjectID = errors.New("auth relationship request message Relations SubjectID field required")

// ErrRequestNoResponders is returned when a request is attempted but no responder is listening.
ErrRequestNoResponders = errors.New("no responders for request")
)
Loading

0 comments on commit 1fb9cfe

Please sign in to comment.