From e28b2660b6a49416f361b885e7f2249804c813af Mon Sep 17 00:00:00 2001 From: Raymond Tukpe Date: Wed, 26 Feb 2025 12:34:39 +0100 Subject: [PATCH 1/4] add event-type filters --- api/api.go | 37 +- api/handlers/filter.go | 427 ++++++++++++++++++ api/models/filter.go | 72 +++ cmd/worker/worker.go | 13 +- database/postgres/filter.go | 334 ++++++++++++++ datastore/models.go | 15 +- datastore/repository.go | 19 + services/create_broadcast_event.go | 6 +- sql/1740483415.sql | 45 ++ .../task/process_broadcast_event_creation.go | 6 +- worker/task/process_dynamic_event_creation.go | 6 +- worker/task/process_event_channel.go | 7 +- worker/task/process_event_creation.go | 110 +++-- 13 files changed, 1047 insertions(+), 50 deletions(-) create mode 100644 api/handlers/filter.go create mode 100644 api/models/filter.go create mode 100644 database/postgres/filter.go create mode 100644 sql/1740483415.sql diff --git a/api/api.go b/api/api.go index 283f4bb949..1f51be18cc 100644 --- a/api/api.go +++ b/api/api.go @@ -3,14 +3,15 @@ package api import ( "embed" "fmt" - "github.com/frain-dev/convoy" - "github.com/frain-dev/convoy/util" - "github.com/go-chi/render" "io/fs" "net/http" "path" "strings" + "github.com/frain-dev/convoy" + "github.com/frain-dev/convoy/util" + "github.com/go-chi/render" + authz "github.com/Subomi/go-authz" "github.com/frain-dev/convoy/api/handlers" "github.com/frain-dev/convoy/api/policies" @@ -217,6 +218,16 @@ func (a *ApplicationHandler) BuildControlPlaneRoutes() *chi.Mux { subscriptionRouter.Get("/{subscriptionID}", handler.GetSubscription) subscriptionRouter.With(handler.RequireEnabledProject()).Put("/{subscriptionID}", handler.UpdateSubscription) subscriptionRouter.Put("/{subscriptionID}/toggle_status", handler.ToggleSubscriptionStatus) + + // Filter routes + subscriptionRouter.Route("/{subscriptionID}/filters", func(filterRouter chi.Router) { + filterRouter.With(handler.RequireEnabledProject()).Post("/", handler.CreateFilter) + filterRouter.Get("/", handler.GetFilters) + filterRouter.Get("/{filterID}", handler.GetFilter) + filterRouter.With(handler.RequireEnabledProject()).Put("/{filterID}", handler.UpdateFilter) + filterRouter.With(handler.RequireEnabledProject()).Delete("/{filterID}", handler.DeleteFilter) + filterRouter.With(handler.RequireEnabledProject()).Post("/test/{eventType}", handler.TestFilter) + }) }) projectSubRouter.Route("/sources", func(sourceRouter chi.Router) { @@ -398,6 +409,16 @@ func (a *ApplicationHandler) BuildControlPlaneRoutes() *chi.Mux { subscriptionRouter.With(handler.RequireEnabledProject()).Delete("/{subscriptionID}", handler.DeleteSubscription) subscriptionRouter.Get("/{subscriptionID}", handler.GetSubscription) subscriptionRouter.With(handler.RequireEnabledProject()).Put("/{subscriptionID}", handler.UpdateSubscription) + + // Filter routes + subscriptionRouter.Route("/{subscriptionID}/filters", func(filterRouter chi.Router) { + filterRouter.With(handler.RequireEnabledProject()).Post("/", handler.CreateFilter) + filterRouter.Get("/", handler.GetFilters) + filterRouter.Get("/{filterID}", handler.GetFilter) + filterRouter.With(handler.RequireEnabledProject()).Put("/{filterID}", handler.UpdateFilter) + filterRouter.With(handler.RequireEnabledProject()).Delete("/{filterID}", handler.DeleteFilter) + filterRouter.With(handler.RequireEnabledProject()).Post("/test/{eventType}", handler.TestFilter) + }) }) projectSubRouter.Route("/sources", func(sourceRouter chi.Router) { @@ -506,6 +527,16 @@ func (a *ApplicationHandler) BuildControlPlaneRoutes() *chi.Mux { subscriptionRouter.Delete("/{subscriptionID}", handler.DeleteSubscription) subscriptionRouter.Get("/{subscriptionID}", handler.GetSubscription) subscriptionRouter.Put("/{subscriptionID}", handler.UpdateSubscription) + + // Filter routes + subscriptionRouter.Route("/{subscriptionID}/filters", func(filterRouter chi.Router) { + filterRouter.Post("/", handler.CreateFilter) + filterRouter.Get("/", handler.GetFilters) + filterRouter.Get("/{filterID}", handler.GetFilter) + filterRouter.Put("/{filterID}", handler.UpdateFilter) + filterRouter.Delete("/{filterID}", handler.DeleteFilter) + filterRouter.Post("/test/{eventType}", handler.TestFilter) + }) }) }) diff --git a/api/handlers/filter.go b/api/handlers/filter.go new file mode 100644 index 0000000000..3a496de45c --- /dev/null +++ b/api/handlers/filter.go @@ -0,0 +1,427 @@ +package handlers + +import ( + "errors" + "github.com/frain-dev/convoy/database/postgres" + "net/http" + "time" + + "github.com/frain-dev/convoy/api/models" + "github.com/frain-dev/convoy/datastore" + "github.com/frain-dev/convoy/util" + "github.com/go-chi/chi/v5" + "github.com/go-chi/render" + "github.com/oklog/ulid/v2" +) + +// CreateFilter +// +// @Summary Create a new filter +// @Description This endpoint creates a new filter for a subscription +// @Id CreateFilter +// @Tags Filters +// @Accept json +// @Produce json +// @Param projectID path string true "Project ID" +// @Param subscriptionID path string true "Subscription ID" +// @Param filter body models.CreateFilterRequest true "Filter to create" +// @Success 201 {object} util.ServerResponse{data=models.FilterResponse} +// @Failure 400,401,404 {object} util.ServerResponse{data=Stub} +// @Security ApiKeyAuth +// @Router /v1/projects/{projectID}/subscriptions/{subscriptionID}/filters [post] +func (h *Handler) CreateFilter(w http.ResponseWriter, r *http.Request) { + projectID := chi.URLParam(r, "projectID") + subscriptionID := chi.URLParam(r, "subscriptionID") + + var newFilter models.CreateFilterRequest + if err := util.ReadJSON(r, &newFilter); err != nil { + _ = render.Render(w, r, util.NewErrorResponse(err.Error(), http.StatusBadRequest)) + return + } + + // Validate the request + err := util.Validate(newFilter) + if err != nil { + _ = render.Render(w, r, util.NewErrorResponse(err.Error(), http.StatusBadRequest)) + return + } + + subRepo := postgres.NewSubscriptionRepo(h.A.DB) + filterRepo := postgres.NewFilterRepo(h.A.DB) + + // Check if subscription exists + _, err = subRepo.FindSubscriptionByID(r.Context(), projectID, subscriptionID) + if err != nil { + if errors.Is(err, datastore.ErrSubscriptionNotFound) { + _ = render.Render(w, r, util.NewErrorResponse("subscription not found", http.StatusNotFound)) + return + } + _ = render.Render(w, r, util.NewErrorResponse("failed to find subscription", http.StatusNotFound)) + return + } + + // Check if filter with same event type already exists + existingFilter, err := filterRepo.FindFilterBySubscriptionAndEventType(r.Context(), subscriptionID, newFilter.EventType) + if err != nil && err.Error() != datastore.ErrFilterNotFound.Error() { + _ = render.Render(w, r, util.NewErrorResponse("failed to check for existing filter", http.StatusBadRequest)) + return + } + + if existingFilter != nil { + _ = render.Render(w, r, util.NewErrorResponse("filter with this event type already exists", http.StatusBadRequest)) + return + } + + // Create the filter + filter := &datastore.EventTypeFilter{ + UID: ulid.Make().String(), + SubscriptionID: subscriptionID, + EventType: newFilter.EventType, + Headers: newFilter.Headers, + Body: newFilter.Body, + RawHeaders: newFilter.Headers, + RawBody: newFilter.Body, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + } + + err = filterRepo.CreateFilter(r.Context(), filter) + if err != nil { + _ = render.Render(w, r, util.NewErrorResponse("failed to create filter", http.StatusBadRequest)) + return + } + + resp := models.FilterResponse{EventTypeFilter: filter} + + _ = render.Render(w, r, util.NewServerResponse("Filter created successfully", resp, http.StatusCreated)) +} + +// GetFilter +// +// @Summary Get a filter +// @Description This endpoint retrieves a single filter +// @Id GetFilter +// @Tags Filters +// @Accept json +// @Produce json +// @Param projectID path string true "Project ID" +// @Param subscriptionID path string true "Subscription ID" +// @Param filterID path string true "Filter ID" +// @Success 200 {object} util.ServerResponse{data=models.FilterResponse} +// @Failure 400,401,404 {object} util.ServerResponse{data=Stub} +// @Security ApiKeyAuth +// @Router /v1/projects/{projectID}/subscriptions/{subscriptionID}/filters/{filterID} [get] +func (h *Handler) GetFilter(w http.ResponseWriter, r *http.Request) { + projectID := chi.URLParam(r, "projectID") + subscriptionID := chi.URLParam(r, "subscriptionID") + filterID := chi.URLParam(r, "filterID") + + subRepo := postgres.NewSubscriptionRepo(h.A.DB) + filterRepo := postgres.NewFilterRepo(h.A.DB) + + // Check if subscription exists + _, err := subRepo.FindSubscriptionByID(r.Context(), projectID, subscriptionID) + if err != nil { + if errors.Is(err, datastore.ErrSubscriptionNotFound) { + _ = render.Render(w, r, util.NewErrorResponse("subscription not found", http.StatusNotFound)) + return + } + _ = render.Render(w, r, util.NewErrorResponse("failed to find subscription", http.StatusNotFound)) + return + } + + // Get the filter + filter, err := filterRepo.FindFilterByID(r.Context(), filterID) + if err != nil { + if errors.Is(err, datastore.ErrFilterNotFound) { + _ = render.Render(w, r, util.NewErrorResponse("filter not found", http.StatusNotFound)) + return + } + _ = render.Render(w, r, util.NewErrorResponse("failed to find filter", http.StatusNotFound)) + return + } + + // Check if filter belongs to the subscription + if filter.SubscriptionID != subscriptionID { + _ = render.Render(w, r, util.NewErrorResponse("filter does not belong to this subscription", http.StatusNotFound)) + return + } + + resp := models.FilterResponse{EventTypeFilter: filter} + _ = render.Render(w, r, util.NewServerResponse("Filter retrieved successfully", resp, http.StatusOK)) +} + +// GetFilters +// +// @Summary List all filters +// @Description This endpoint fetches all filters for a subscription +// @Id GetFilters +// @Tags Filters +// @Accept json +// @Produce json +// @Param projectID path string true "Project ID" +// @Param subscriptionID path string true "Subscription ID" +// @Success 200 {object} util.ServerResponse{data=models.FiltersResponse} +// @Failure 400,401,404 {object} util.ServerResponse{data=Stub} +// @Security ApiKeyAuth +// @Router /v1/projects/{projectID}/subscriptions/{subscriptionID}/filters [get] +func (h *Handler) GetFilters(w http.ResponseWriter, r *http.Request) { + projectID := chi.URLParam(r, "projectID") + subscriptionID := chi.URLParam(r, "subscriptionID") + + subRepo := postgres.NewSubscriptionRepo(h.A.DB) + filterRepo := postgres.NewFilterRepo(h.A.DB) + + // Check if subscription exists + _, err := subRepo.FindSubscriptionByID(r.Context(), projectID, subscriptionID) + if err != nil { + if errors.Is(err, datastore.ErrSubscriptionNotFound) { + _ = render.Render(w, r, util.NewErrorResponse("subscription not found", http.StatusNotFound)) + return + } + _ = render.Render(w, r, util.NewErrorResponse("failed to find subscription", http.StatusBadRequest)) + return + } + + // Get all filters for the subscription + filters, err := filterRepo.FindFiltersBySubscriptionID(r.Context(), subscriptionID) + if err != nil { + _ = render.Render(w, r, util.NewErrorResponse("failed to find filters", http.StatusNotFound)) + return + } + + var eventTypeFilters []datastore.EventTypeFilter + for _, filter := range filters { + eventTypeFilters = append(eventTypeFilters, datastore.EventTypeFilter{ + UID: filter.UID, + SubscriptionID: filter.SubscriptionID, + EventType: filter.EventType, + Headers: filter.Headers, + Body: filter.Body, + CreatedAt: filter.CreatedAt, + UpdatedAt: filter.UpdatedAt, + }) + } + + resp := models.NewListResponse(eventTypeFilters, func(f datastore.EventTypeFilter) models.FilterResponse { + return models.FilterResponse{EventTypeFilter: &f} + }) + + _ = render.Render(w, r, util.NewServerResponse("Filters retrieved successfully", resp, http.StatusOK)) +} + +// UpdateFilter +// +// @Summary Update a filter +// @Description This endpoint updates an existing filter +// @Id UpdateFilter +// @Tags Filters +// @Accept json +// @Produce json +// @Param projectID path string true "Project ID" +// @Param subscriptionID path string true "Subscription ID" +// @Param filterID path string true "Filter ID" +// @Param filter body models.UpdateFilterRequest true "Updated filter" +// @Success 200 {object} util.ServerResponse{data=models.FilterResponse} +// @Failure 400,401,404 {object} util.ServerResponse{data=Stub} +// @Security ApiKeyAuth +// @Router /v1/projects/{projectID}/subscriptions/{subscriptionID}/filters/{filterID} [put] +func (h *Handler) UpdateFilter(w http.ResponseWriter, r *http.Request) { + projectID := chi.URLParam(r, "projectID") + subscriptionID := chi.URLParam(r, "subscriptionID") + filterID := chi.URLParam(r, "filterID") + + var updateFilter models.UpdateFilterRequest + if err := util.ReadJSON(r, &updateFilter); err != nil { + _ = render.Render(w, r, util.NewErrorResponse(err.Error(), http.StatusBadRequest)) + return + } + + // Validate the request + err := util.Validate(updateFilter) + if err != nil { + _ = render.Render(w, r, util.NewErrorResponse(err.Error(), http.StatusBadRequest)) + return + } + + subRepo := postgres.NewSubscriptionRepo(h.A.DB) + filterRepo := postgres.NewFilterRepo(h.A.DB) + + // Check if subscription exists + _, err = subRepo.FindSubscriptionByID(r.Context(), projectID, subscriptionID) + if err != nil { + if errors.Is(err, datastore.ErrSubscriptionNotFound) { + _ = render.Render(w, r, util.NewErrorResponse("subscription not found", http.StatusNotFound)) + return + } + _ = render.Render(w, r, util.NewErrorResponse("failed to find subscription", http.StatusNotFound)) + return + } + + // Get the filter + filter, err := filterRepo.FindFilterByID(r.Context(), filterID) + if err != nil { + if errors.Is(err, datastore.ErrFilterNotFound) { + _ = render.Render(w, r, util.NewErrorResponse("filter not found", http.StatusNotFound)) + return + } + _ = render.Render(w, r, util.NewErrorResponse("failed to find filter", http.StatusNotFound)) + return + } + + // Check if filter belongs to the subscription + if filter.SubscriptionID != subscriptionID { + _ = render.Render(w, r, util.NewErrorResponse("filter does not belong to this subscription", http.StatusNotFound)) + return + } + + // If event-type is being changed, check if a filter with the new event type already exists + if updateFilter.EventType != "" && updateFilter.EventType != filter.EventType { + existingFilter, innerErr := filterRepo.FindFilterBySubscriptionAndEventType(r.Context(), subscriptionID, updateFilter.EventType) + if innerErr != nil && !errors.Is(innerErr, datastore.ErrFilterNotFound) { + _ = render.Render(w, r, util.NewErrorResponse("failed to check for existing filter", http.StatusBadRequest)) + return + } + + if existingFilter != nil { + _ = render.Render(w, r, util.NewErrorResponse("filter with this event type already exists", http.StatusBadRequest)) + return + } + + filter.EventType = updateFilter.EventType + } + + // Update the filter + if updateFilter.Headers != nil { + filter.Headers = updateFilter.Headers + filter.RawHeaders = updateFilter.Headers + } + + if updateFilter.Body != nil { + filter.Body = updateFilter.Body + filter.RawBody = updateFilter.Body + } + + err = filterRepo.UpdateFilter(r.Context(), filter) + if err != nil { + _ = render.Render(w, r, util.NewErrorResponse("failed to update filter", http.StatusBadRequest)) + return + } + + resp := models.FilterResponse{EventTypeFilter: filter} + _ = render.Render(w, r, util.NewServerResponse("Filter updated successfully", resp, http.StatusOK)) +} + +// DeleteFilter +// +// @Summary Delete a filter +// @Description This endpoint deletes a filter +// @Id DeleteFilter +// @Tags Filters +// @Accept json +// @Produce json +// @Param projectID path string true "Project ID" +// @Param subscriptionID path string true "Subscription ID" +// @Param filterID path string true "Filter ID" +// @Success 200 {object} util.ServerResponse{data=Stub} +// @Failure 400,401,404 {object} util.ServerResponse{data=Stub} +// @Security ApiKeyAuth +// @Router /v1/projects/{projectID}/subscriptions/{subscriptionID}/filters/{filterID} [delete] +func (h *Handler) DeleteFilter(w http.ResponseWriter, r *http.Request) { + projectID := chi.URLParam(r, "projectID") + subscriptionID := chi.URLParam(r, "subscriptionID") + filterID := chi.URLParam(r, "filterID") + + subRepo := postgres.NewSubscriptionRepo(h.A.DB) + filterRepo := postgres.NewFilterRepo(h.A.DB) + + // Check if subscription exists + _, err := subRepo.FindSubscriptionByID(r.Context(), projectID, subscriptionID) + if err != nil { + if errors.Is(err, datastore.ErrSubscriptionNotFound) { + _ = render.Render(w, r, util.NewErrorResponse("subscription not found", http.StatusNotFound)) + return + } + _ = render.Render(w, r, util.NewErrorResponse("failed to find subscription", http.StatusNotFound)) + return + } + + // Get the filter + filter, err := filterRepo.FindFilterByID(r.Context(), filterID) + if err != nil { + if errors.Is(err, datastore.ErrFilterNotFound) { + _ = render.Render(w, r, util.NewErrorResponse("filter not found", http.StatusNotFound)) + return + } + _ = render.Render(w, r, util.NewErrorResponse("failed to find filter", http.StatusNotFound)) + return + } + + // Check if filter belongs to the subscription + if filter.SubscriptionID != subscriptionID { + _ = render.Render(w, r, util.NewErrorResponse("filter does not belong to this subscription", http.StatusNotFound)) + return + } + + // Delete the filter + err = filterRepo.DeleteFilter(r.Context(), filterID) + if err != nil { + _ = render.Render(w, r, util.NewErrorResponse("failed to delete filter", http.StatusBadRequest)) + return + } + + _ = render.Render(w, r, util.NewServerResponse("Filter deleted successfully", nil, http.StatusOK)) +} + +// TestFilter +// +// @Summary Test a filter +// @Description This endpoint tests a filter against a payload +// @Id TestFilter +// @Tags Filters +// @Accept json +// @Produce json +// @Param projectID path string true "Project ID" +// @Param subscriptionID path string true "Subscription ID" +// @Param eventType path string true "Event Type" +// @Param payload body models.TestFilterRequest true "Payload to test" +// @Success 200 {object} util.ServerResponse{data=models.TestFilterResponse} +// @Failure 400,401,404 {object} util.ServerResponse{data=Stub} +// @Security ApiKeyAuth +// @Router /v1/projects/{projectID}/subscriptions/{subscriptionID}/filters/test/{eventType} [post] +func (h *Handler) TestFilter(w http.ResponseWriter, r *http.Request) { + projectID := chi.URLParam(r, "projectID") + subscriptionID := chi.URLParam(r, "subscriptionID") + eventType := chi.URLParam(r, "eventType") + + var testPayload models.TestFilterRequest + if err := util.ReadJSON(r, &testPayload); err != nil { + _ = render.Render(w, r, util.NewErrorResponse(err.Error(), http.StatusBadRequest)) + return + } + + subRepo := postgres.NewSubscriptionRepo(h.A.DB) + filterRepo := postgres.NewFilterRepo(h.A.DB) + + // Check if subscription exists + _, err := subRepo.FindSubscriptionByID(r.Context(), projectID, subscriptionID) + if err != nil { + if errors.Is(err, datastore.ErrSubscriptionNotFound) { + _ = render.Render(w, r, util.NewErrorResponse("subscription not found", http.StatusNotFound)) + return + } + _ = render.Render(w, r, util.NewErrorResponse("failed to find subscription", http.StatusNotFound)) + return + } + + // Test the filter + isMatch, err := filterRepo.TestFilter(r.Context(), subscriptionID, eventType, testPayload.Payload) + if err != nil { + _ = render.Render(w, r, util.NewErrorResponse("failed to test filter", http.StatusBadRequest)) + return + } + + resp := models.TestFilterResponse{IsMatch: isMatch} + + _ = render.Render(w, r, util.NewServerResponse("Filter test completed", resp, http.StatusOK)) +} diff --git a/api/models/filter.go b/api/models/filter.go new file mode 100644 index 0000000000..8b4aa00795 --- /dev/null +++ b/api/models/filter.go @@ -0,0 +1,72 @@ +package models + +import ( + "time" + + "github.com/frain-dev/convoy/datastore" +) + +// Filter represents a filter entity in the API +type Filter struct { + // Unique identifier for the filter + UID string `json:"uid"` + + // ID of the subscription this filter belongs to + SubscriptionID string `json:"subscription_id"` + + // Type of event this filter applies to + EventType string `json:"event_type"` + + // Header matching criteria (optional) + Headers datastore.M `json:"headers"` + + // Body matching criteria (optional) + Body datastore.M `json:"body"` + + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` +} + +// CreateFilterRequest represents the request to create a filter +type CreateFilterRequest struct { + // Type of event this filter applies to (required) + EventType string `json:"event_type" validate:"required"` + + // Header matching criteria (optional) + Headers datastore.M `json:"headers"` + + // Body matching criteria (optional) + Body datastore.M `json:"body"` +} + +// UpdateFilterRequest represents the request to update a filter +type UpdateFilterRequest struct { + // Type of event this filter applies to (optional) + EventType string `json:"event_type"` + + // Header matching criteria (optional) + Headers datastore.M `json:"headers"` + + // Body matching criteria (optional) + Body datastore.M `json:"body"` + + // Whether the filter uses flattened JSON paths (optional) + IsFlattened *bool `json:"is_flattened"` +} + +// TestFilterRequest represents the request to test a filter +type TestFilterRequest struct { + // Sample payload to test against the filter (required) + Payload interface{} `json:"payload" validate:"required"` +} + +// FilterResponse represents the response for a single filter +type FilterResponse struct { + *datastore.EventTypeFilter +} + +// TestFilterResponse represents the response for a filter test +type TestFilterResponse struct { + // Whether the payload matches the filter criteria + IsMatch bool `json:"is_match"` +} diff --git a/cmd/worker/worker.go b/cmd/worker/worker.go index 3273373346..d18507860d 100644 --- a/cmd/worker/worker.go +++ b/cmd/worker/worker.go @@ -228,6 +228,7 @@ func StartWorker(ctx context.Context, a *cli.App, cfg config.Configuration, inte deviceRepo := postgres.NewDeviceRepo(a.DB) configRepo := postgres.NewConfigRepo(a.DB) attemptRepo := postgres.NewDeliveryAttemptRepo(a.DB) + filterRepo := postgres.NewFilterRepo(a.DB) rd, err := rdb.NewClient(cfg.Redis.BuildDsn()) if err != nil { @@ -370,14 +371,12 @@ func StartWorker(ctx context.Context, a *cli.App, cfg config.Configuration, inte newTelemetry) consumer.RegisterHandlers(convoy.CreateEventProcessor, task.ProcessEventCreation( - defaultCh, endpointRepo, eventRepo, projectRepo, - eventDeliveryRepo, a.Queue, subRepo, - deviceRepo, + filterRepo, a.Licenser, a.TracerBackend), newTelemetry) @@ -401,23 +400,20 @@ func StartWorker(ctx context.Context, a *cli.App, cfg config.Configuration, inte endpointRepo, eventRepo, projectRepo, - eventDeliveryRepo, a.Queue, subRepo, - deviceRepo, + filterRepo, a.Licenser, a.TracerBackend), newTelemetry) consumer.RegisterHandlers(convoy.CreateDynamicEventProcessor, task.ProcessDynamicEventCreation( - dynamicCh, endpointRepo, eventRepo, projectRepo, - eventDeliveryRepo, a.Queue, subRepo, - deviceRepo, + filterRepo, a.Licenser, a.TracerBackend), newTelemetry) @@ -435,6 +431,7 @@ func StartWorker(ctx context.Context, a *cli.App, cfg config.Configuration, inte eventDeliveryRepo, a.Queue, subRepo, + filterRepo, deviceRepo, a.Licenser, a.TracerBackend), diff --git a/database/postgres/filter.go b/database/postgres/filter.go new file mode 100644 index 0000000000..82e7e6bd9e --- /dev/null +++ b/database/postgres/filter.go @@ -0,0 +1,334 @@ +package postgres + +import ( + "context" + "database/sql" + "errors" + "fmt" + "time" + + "github.com/frain-dev/convoy/database" + "github.com/frain-dev/convoy/datastore" + "github.com/frain-dev/convoy/pkg/compare" + "github.com/frain-dev/convoy/pkg/flatten" + "github.com/frain-dev/convoy/pkg/log" + "github.com/frain-dev/convoy/util" + "github.com/jmoiron/sqlx" + "github.com/oklog/ulid/v2" +) + +const ( + createFilter = ` + INSERT INTO convoy.filters ( + id, subscription_id, event_type, + headers, body, raw_headers, raw_body + ) + VALUES ($1, $2, $3, $4, $5, $6, $7); + ` + + updateFilter = ` + UPDATE convoy.filters SET + headers=$2, + body=$3, + raw_headers=$4, + raw_body=$5, + updated_at=now() + WHERE id = $1; + ` + + deleteFilter = ` + DELETE FROM convoy.filters + WHERE id = $1; + ` + + findFilterByID = ` + SELECT + id, subscription_id, event_type, + headers, body, raw_headers, raw_body, + created_at, updated_at + FROM convoy.filters + WHERE id = $1; + ` + + findFiltersBySubscriptionID = ` + SELECT + id, subscription_id, event_type, + headers, body, raw_headers, raw_body, + created_at, updated_at + FROM convoy.filters + WHERE subscription_id = $1; + ` + + findFilterBySubscriptionAndEventType = ` + SELECT + id, subscription_id, event_type, + headers, body, raw_headers, raw_body, + created_at, updated_at + FROM convoy.filters + WHERE subscription_id = $1 + AND event_type = $2; + ` +) + +var ( + ErrFilterNotCreated = errors.New("filter could not be created") + ErrFilterNotUpdated = errors.New("filter could not be updated") + ErrFilterNotDeleted = errors.New("filter could not be deleted") + ErrFilterNotFound = errors.New("filter not found") +) + +type filterRepo struct { + db database.Database +} + +func NewFilterRepo(db database.Database) datastore.FilterRepository { + return &filterRepo{db: db} +} + +func (f *filterRepo) CreateFilter(ctx context.Context, filter *datastore.EventTypeFilter) error { + if util.IsStringEmpty(filter.UID) { + filter.UID = ulid.Make().String() + } + + if filter.CreatedAt.IsZero() { + filter.CreatedAt = time.Now() + } + + if filter.UpdatedAt.IsZero() { + filter.UpdatedAt = time.Now() + } + + err := filter.Body.Flatten() + if err != nil { + return fmt.Errorf("failed to flatten body filter: %v", err) + } + + err = filter.Headers.Flatten() + if err != nil { + return fmt.Errorf("failed to flatten header filter: %v", err) + } + + _, err = f.db.GetDB().ExecContext( + ctx, + createFilter, + filter.UID, + filter.SubscriptionID, + filter.EventType, + filter.Headers, + filter.Body, + filter.RawHeaders, + filter.RawBody, + ) + + if err != nil { + log.FromContext(ctx).WithError(err).Error("failed to create filter") + return ErrFilterNotCreated + } + + return nil +} + +func (f *filterRepo) CreateFilters(ctx context.Context, filters []datastore.EventTypeFilter) error { + tx, err := f.db.GetDB().BeginTxx(ctx, nil) + if err != nil { + return err + } + defer tx.Rollback() + + for i := range filters { + filter := &filters[i] + if util.IsStringEmpty(filter.UID) { + filter.UID = ulid.Make().String() + } + + if filter.CreatedAt.IsZero() { + filter.CreatedAt = time.Now() + } + + if filter.UpdatedAt.IsZero() { + filter.UpdatedAt = time.Now() + } + + err = filter.Body.Flatten() + if err != nil { + return fmt.Errorf("failed to flatten body filter: %v", err) + } + + err = filter.Headers.Flatten() + if err != nil { + return fmt.Errorf("failed to flatten header filter: %v", err) + } + + _, err = tx.ExecContext( + ctx, + createFilter, + filter.UID, + filter.SubscriptionID, + filter.EventType, + filter.Headers, + filter.Body, + filter.RawHeaders, + filter.RawBody, + ) + + if err != nil { + log.FromContext(ctx).WithError(err).Error("failed to create filter") + return ErrFilterNotCreated + } + } + + err = tx.Commit() + if err != nil { + return err + } + + return nil +} + +func (f *filterRepo) UpdateFilter(ctx context.Context, filter *datastore.EventTypeFilter) error { + err := filter.Body.Flatten() + if err != nil { + return fmt.Errorf("failed to flatten body filter: %v", err) + } + + err = filter.Headers.Flatten() + if err != nil { + return fmt.Errorf("failed to flatten header filter: %v", err) + } + + result, err := f.db.GetDB().ExecContext( + ctx, + updateFilter, + filter.UID, + filter.Headers, + filter.Body, + filter.RawHeaders, + filter.RawBody, + ) + + if err != nil { + log.FromContext(ctx).WithError(err).Error("failed to update filter") + return ErrFilterNotUpdated + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + log.FromContext(ctx).WithError(err).Error("failed to get rows affected") + return ErrFilterNotUpdated + } + + if rowsAffected == 0 { + return ErrFilterNotFound + } + + return nil +} + +func (f *filterRepo) DeleteFilter(ctx context.Context, filterID string) error { + result, err := f.db.GetDB().ExecContext(ctx, deleteFilter, filterID) + if err != nil { + log.FromContext(ctx).WithError(err).Error("failed to delete filter") + return ErrFilterNotDeleted + } + + rowsAffected, err := result.RowsAffected() + if err != nil { + log.FromContext(ctx).WithError(err).Error("failed to get rows affected") + return ErrFilterNotDeleted + } + + if rowsAffected == 0 { + return ErrFilterNotFound + } + + return nil +} + +func (f *filterRepo) FindFilterByID(ctx context.Context, filterID string) (*datastore.EventTypeFilter, error) { + var filter datastore.EventTypeFilter + err := f.db.GetDB().QueryRowxContext(ctx, findFilterByID, filterID).StructScan(&filter) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, ErrFilterNotFound + } + return nil, err + } + + return &filter, nil +} + +func (f *filterRepo) FindFiltersBySubscriptionID(ctx context.Context, subscriptionID string) ([]datastore.EventTypeFilter, error) { + rows, err := f.db.GetDB().QueryxContext(ctx, findFiltersBySubscriptionID, subscriptionID) + if err != nil { + return nil, err + } + defer rows.Close() + + return scanFilters(rows) +} + +func (f *filterRepo) FindFilterBySubscriptionAndEventType(ctx context.Context, subscriptionID, eventType string) (*datastore.EventTypeFilter, error) { + var filter datastore.EventTypeFilter + err := f.db.GetDB().QueryRowxContext(ctx, findFilterBySubscriptionAndEventType, subscriptionID, eventType).StructScan(&filter) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, ErrFilterNotFound + } + return nil, err + } + + return &filter, nil +} + +func (f *filterRepo) TestFilter(ctx context.Context, subscriptionID, eventType string, payload interface{}) (bool, error) { + filter, err := f.FindFilterBySubscriptionAndEventType(ctx, subscriptionID, eventType) + if err != nil { + if errors.Is(err, ErrFilterNotFound) { + // If no filter exists for this event type, check for a catch-all filter + _filter, _err := f.FindFilterBySubscriptionAndEventType(ctx, subscriptionID, "*") + if _err != nil { + if errors.Is(_err, ErrFilterNotFound) { + // there is no filtering, so it matches + return true, nil + } + return false, _err + } + + filter = _filter + } else { + return false, err + } + } + + if len(filter.Body) == 0 { + // Empty filter means it matches everything + return true, nil + } + + p, err := flatten.Flatten(payload) + if err != nil { + return false, err + } + + return compare.Compare(p, filter.Body) +} + +func scanFilters(rows *sqlx.Rows) ([]datastore.EventTypeFilter, error) { + var filters []datastore.EventTypeFilter + + for rows.Next() { + var filter datastore.EventTypeFilter + err := rows.StructScan(&filter) + if err != nil { + return nil, err + } + + filters = append(filters, filter) + } + + if err := rows.Err(); err != nil { + return nil, err + } + + return filters, nil +} diff --git a/datastore/models.go b/datastore/models.go index 4e9421fd3c..a1de466008 100644 --- a/datastore/models.go +++ b/datastore/models.go @@ -6,6 +6,7 @@ import ( "encoding/json" "errors" "fmt" + "golang.org/x/crypto/bcrypt" "math" "net/http" "strings" @@ -23,7 +24,6 @@ import ( "github.com/frain-dev/convoy/config" "github.com/frain-dev/convoy/pkg/httpheader" "github.com/lib/pq" - "golang.org/x/crypto/bcrypt" ) type Pageable struct { @@ -1249,6 +1249,19 @@ type FilterConfiguration struct { Filter FilterSchema `json:"filter" db:"filter"` } +// EventTypeFilter represents a filter configuration for a specific event type within a subscription +type EventTypeFilter struct { + UID string `json:"uid" db:"id"` + SubscriptionID string `json:"subscription_id" db:"subscription_id"` + EventType string `json:"event_type" db:"event_type"` + Headers M `json:"headers" db:"headers"` + Body M `json:"body" db:"body"` + RawHeaders M `json:"raw_headers" db:"raw_headers"` + RawBody M `json:"raw_body" db:"raw_body"` + CreatedAt time.Time `json:"-" db:"created_at" swaggertype:"string"` + UpdatedAt time.Time `json:"-" db:"updated_at" swaggertype:"string"` +} + type M map[string]interface{} // Flatten is only intended for use for filter body & headers diff --git a/datastore/repository.go b/datastore/repository.go index eb35cde690..62906b65f7 100644 --- a/datastore/repository.go +++ b/datastore/repository.go @@ -6,6 +6,7 @@ import ( "io" "time" + "errors" "github.com/frain-dev/convoy/pkg/flatten" ) @@ -120,6 +121,7 @@ type EndpointRepository interface { UpdateSecrets(ctx context.Context, endpointID string, projectID string, secrets Secrets) error DeleteSecret(ctx context.Context, endpoint *Endpoint, secretID string, projectID string) error } + type SubscriptionRepository interface { CreateSubscription(context.Context, string, *Subscription) error UpdateSubscription(ctx context.Context, projectID string, subscription *Subscription) error @@ -138,6 +140,17 @@ type SubscriptionRepository interface { FetchUpdatedSubscriptions(ctx context.Context, projectIDs []string, t time.Time, pageSize int64) ([]Subscription, error) } +type FilterRepository interface { + CreateFilter(ctx context.Context, filter *EventTypeFilter) error + CreateFilters(ctx context.Context, filters []EventTypeFilter) error + UpdateFilter(ctx context.Context, filter *EventTypeFilter) error + DeleteFilter(ctx context.Context, filterID string) error + FindFilterByID(ctx context.Context, filterID string) (*EventTypeFilter, error) + FindFiltersBySubscriptionID(ctx context.Context, subscriptionID string) ([]EventTypeFilter, error) + FindFilterBySubscriptionAndEventType(ctx context.Context, subscriptionID, eventType string) (*EventTypeFilter, error) + TestFilter(ctx context.Context, subscriptionID, eventType string, payload interface{}) (bool, error) +} + type SourceRepository interface { CreateSource(context.Context, *Source) error UpdateSource(ctx context.Context, projectId string, source *Source) error @@ -227,3 +240,9 @@ type EventTypesRepository interface { FetchEventTypeById(context.Context, string, string) (*ProjectEventType, error) FetchAllEventTypes(context.Context, string) ([]ProjectEventType, error) } + +// Filter errors +var ( + ErrFilterNotFound = errors.New("filter not found") + ErrDuplicateFilter = errors.New("duplicate filter") +) diff --git a/services/create_broadcast_event.go b/services/create_broadcast_event.go index 460fec6e63..17f8a20522 100644 --- a/services/create_broadcast_event.go +++ b/services/create_broadcast_event.go @@ -17,13 +17,9 @@ import ( ) type CreateBroadcastEventService struct { - EndpointRepo datastore.EndpointRepository - EventRepo datastore.EventRepository - PortalLinkRepo datastore.PortalLinkRepository - Queue queue.Queuer - BroadcastEvent *models.BroadcastEvent Project *datastore.Project + Queue queue.Queuer } func (e *CreateBroadcastEventService) Run(ctx context.Context) error { diff --git a/sql/1740483415.sql b/sql/1740483415.sql new file mode 100644 index 0000000000..e5cfd2b685 --- /dev/null +++ b/sql/1740483415.sql @@ -0,0 +1,45 @@ +-- +migrate Up +CREATE TABLE IF NOT EXISTS convoy.filters ( + id VARCHAR PRIMARY KEY, + subscription_id VARCHAR NOT NULL, + event_type VARCHAR NOT NULL, + headers JSONB NOT NULL DEFAULT '{}'::jsonb, + body JSONB NOT NULL DEFAULT '{}'::jsonb, + raw_headers JSONB NOT NULL DEFAULT '{}'::jsonb, + raw_body JSONB NOT NULL DEFAULT '{}'::jsonb, + created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(), + CONSTRAINT fk_subscription + FOREIGN KEY (subscription_id) + REFERENCES convoy.subscriptions(id) + ON DELETE CASCADE +); + +CREATE INDEX idx_filters_subscription_id ON convoy.filters(subscription_id); +CREATE INDEX idx_filters_event_type ON convoy.filters(event_type); +CREATE INDEX idx_filters_subscription_event_type ON convoy.filters(subscription_id, event_type); + +-- Migrate existing subscription filters to the new filters table. For each subscription event type, create a filter. +INSERT INTO convoy.filters ( + id, + subscription_id, + event_type, + headers, + body, + raw_headers, + raw_body +) +SELECT + convoy.generate_ulid()::VARCHAR, + id, + unnest(filter_config_event_types), + filter_config_filter_headers, + filter_config_filter_body, + filter_config_filter_raw_headers, + filter_config_filter_raw_body +FROM convoy.subscriptions +WHERE deleted_at IS NULL; + +-- +migrate Down +DROP TABLE IF EXISTS convoy.filters; + diff --git a/worker/task/process_broadcast_event_creation.go b/worker/task/process_broadcast_event_creation.go index 4a7547aa90..bd59dcf2f4 100644 --- a/worker/task/process_broadcast_event_creation.go +++ b/worker/task/process_broadcast_event_creation.go @@ -149,7 +149,7 @@ func (b *BroadcastEventChannel) MatchSubscriptions(ctx context.Context, metadata subscriptions = append(subscriptions, eventTypeSubs...) subscriptions = append(subscriptions, matchAllSubs...) - subscriptions, err = matchSubscriptionsUsingFilter(ctx, broadcastEvent, args.subRepo, args.licenser, subscriptions, true) + subscriptions, err = matchSubscriptionsUsingFilter(ctx, broadcastEvent, args.subRepo, args.filterRepo, args.licenser, subscriptions, true) if err != nil { args.tracerBackend.Capture(ctx, "broadcast.subscription.matching.error", attributes, startTime, time.Now()) return nil, &EndpointError{Err: fmt.Errorf("failed to match subscriptions using filter, err: %s", err.Error()), delay: defaultBroadcastDelay} @@ -172,8 +172,8 @@ func (b *BroadcastEventChannel) MatchSubscriptions(ctx context.Context, metadata return &response, nil } -func ProcessBroadcastEventCreation(ch *BroadcastEventChannel, endpointRepo datastore.EndpointRepository, eventRepo datastore.EventRepository, projectRepo datastore.ProjectRepository, eventDeliveryRepo datastore.EventDeliveryRepository, eventQueue queue.Queuer, subRepo datastore.SubscriptionRepository, deviceRepo datastore.DeviceRepository, licenser license.Licenser, tracerBackend tracer.Backend) func(context.Context, *asynq.Task) error { - return ProcessEventCreationByChannel(ch, endpointRepo, eventRepo, projectRepo, eventQueue, subRepo, licenser, tracerBackend) +func ProcessBroadcastEventCreation(ch *BroadcastEventChannel, endpointRepo datastore.EndpointRepository, eventRepo datastore.EventRepository, projectRepo datastore.ProjectRepository, eventQueue queue.Queuer, subRepo datastore.SubscriptionRepository, filterRepo datastore.FilterRepository, licenser license.Licenser, tracerBackend tracer.Backend) func(context.Context, *asynq.Task) error { + return ProcessEventCreationByChannel(ch, endpointRepo, eventRepo, projectRepo, eventQueue, subRepo, filterRepo, licenser, tracerBackend) } func getEndpointIDs(subs []datastore.Subscription) ([]string, []datastore.Subscription) { diff --git a/worker/task/process_dynamic_event_creation.go b/worker/task/process_dynamic_event_creation.go index 420f59f7f3..23fa127634 100644 --- a/worker/task/process_dynamic_event_creation.go +++ b/worker/task/process_dynamic_event_creation.go @@ -190,8 +190,10 @@ func (d *DynamicEventChannel) MatchSubscriptions(ctx context.Context, metadata E return &response, nil } -func ProcessDynamicEventCreation(ch *DynamicEventChannel, endpointRepo datastore.EndpointRepository, eventRepo datastore.EventRepository, projectRepo datastore.ProjectRepository, eventDeliveryRepo datastore.EventDeliveryRepository, eventQueue queue.Queuer, subRepo datastore.SubscriptionRepository, deviceRepo datastore.DeviceRepository, licenser license.Licenser, tracerBackend tracer.Backend) func(context.Context, *asynq.Task) error { - return ProcessEventCreationByChannel(ch, endpointRepo, eventRepo, projectRepo, eventQueue, subRepo, licenser, tracerBackend) +func ProcessDynamicEventCreation(endpointRepo datastore.EndpointRepository, eventRepo datastore.EventRepository, projectRepo datastore.ProjectRepository, eventQueue queue.Queuer, subRepo datastore.SubscriptionRepository, filterRepo datastore.FilterRepository, licenser license.Licenser, tracerBackend tracer.Backend) func(context.Context, *asynq.Task) error { + ch := &DynamicEventChannel{} + + return ProcessEventCreationByChannel(ch, endpointRepo, eventRepo, projectRepo, eventQueue, subRepo, filterRepo, licenser, tracerBackend) } func findEndpoint(ctx context.Context, project *datastore.Project, endpointRepo datastore.EndpointRepository, dynamicEvent *models.DynamicEvent) (*datastore.Endpoint, error) { diff --git a/worker/task/process_event_channel.go b/worker/task/process_event_channel.go index fd8162d833..48df62a5a7 100644 --- a/worker/task/process_event_channel.go +++ b/worker/task/process_event_channel.go @@ -34,6 +34,7 @@ type EventChannelArgs struct { projectRepo datastore.ProjectRepository endpointRepo datastore.EndpointRepository subRepo datastore.SubscriptionRepository + filterRepo datastore.FilterRepository licenser license.Licenser tracerBackend tracer.Backend } @@ -51,7 +52,7 @@ type EventChannel interface { MatchSubscriptions(context.Context, EventChannelMetadata, EventChannelArgs) (*EventChannelSubResponse, error) } -func ProcessEventCreationByChannel(channel EventChannel, endpointRepo datastore.EndpointRepository, eventRepo datastore.EventRepository, projectRepo datastore.ProjectRepository, eventQueue queue.Queuer, subRepo datastore.SubscriptionRepository, licenser license.Licenser, tracerBackend tracer.Backend) func(context.Context, *asynq.Task) error { +func ProcessEventCreationByChannel(channel EventChannel, endpointRepo datastore.EndpointRepository, eventRepo datastore.EventRepository, projectRepo datastore.ProjectRepository, eventQueue queue.Queuer, subRepo datastore.SubscriptionRepository, filterRepo datastore.FilterRepository, licenser license.Licenser, tracerBackend tracer.Backend) func(context.Context, *asynq.Task) error { return func(ctx context.Context, t *asynq.Task) error { cfg := channel.GetConfig() @@ -71,6 +72,7 @@ func ProcessEventCreationByChannel(channel EventChannel, endpointRepo datastore. projectRepo: projectRepo, endpointRepo: endpointRepo, subRepo: subRepo, + filterRepo: filterRepo, licenser: licenser, tracerBackend: tracerBackend, }) @@ -119,7 +121,7 @@ func ProcessEventCreationByChannel(channel EventChannel, endpointRepo datastore. } } -func MatchSubscriptionsAndCreateEventDeliveries(channels map[string]EventChannel, endpointRepo datastore.EndpointRepository, eventRepo datastore.EventRepository, projectRepo datastore.ProjectRepository, eventDeliveryRepo datastore.EventDeliveryRepository, eventQueue queue.Queuer, subRepo datastore.SubscriptionRepository, deviceRepo datastore.DeviceRepository, licenser license.Licenser, tracerBackend tracer.Backend) func(context.Context, *asynq.Task) error { +func MatchSubscriptionsAndCreateEventDeliveries(channels map[string]EventChannel, endpointRepo datastore.EndpointRepository, eventRepo datastore.EventRepository, projectRepo datastore.ProjectRepository, eventDeliveryRepo datastore.EventDeliveryRepository, eventQueue queue.Queuer, subRepo datastore.SubscriptionRepository, filterRepo datastore.FilterRepository, deviceRepo datastore.DeviceRepository, licenser license.Licenser, tracerBackend tracer.Backend) func(context.Context, *asynq.Task) error { return func(ctx context.Context, t *asynq.Task) error { // Start a new trace span for subscription matching and event delivery creation startTime := time.Now() @@ -152,6 +154,7 @@ func MatchSubscriptionsAndCreateEventDeliveries(channels map[string]EventChannel projectRepo: projectRepo, endpointRepo: endpointRepo, subRepo: subRepo, + filterRepo: filterRepo, licenser: licenser, tracerBackend: tracerBackend, }) diff --git a/worker/task/process_event_creation.go b/worker/task/process_event_creation.go index 42e901518a..2c168ac37e 100644 --- a/worker/task/process_event_creation.go +++ b/worker/task/process_event_creation.go @@ -188,7 +188,7 @@ func (d *DefaultEventChannel) MatchSubscriptions(ctx context.Context, metadata E createSubscription = !util.IsStringEmpty(cs) && cs == "true" } - subscriptions, err := findSubscriptions(ctx, args.endpointRepo, args.subRepo, args.licenser, project, event, createSubscription) + subscriptions, err := findSubscriptions(ctx, args.endpointRepo, args.subRepo, args.filterRepo, args.licenser, project, event, createSubscription) if err != nil { return nil, &EndpointError{Err: err, delay: defaultDelay} } @@ -201,8 +201,10 @@ func (d *DefaultEventChannel) MatchSubscriptions(ctx context.Context, metadata E return &response, nil } -func ProcessEventCreation(ch *DefaultEventChannel, endpointRepo datastore.EndpointRepository, eventRepo datastore.EventRepository, projectRepo datastore.ProjectRepository, eventDeliveryRepo datastore.EventDeliveryRepository, eventQueue queue.Queuer, subRepo datastore.SubscriptionRepository, deviceRepo datastore.DeviceRepository, licenser license.Licenser, tracerBackend tracer.Backend) func(context.Context, *asynq.Task) error { - return ProcessEventCreationByChannel(ch, endpointRepo, eventRepo, projectRepo, eventQueue, subRepo, licenser, tracerBackend) +func ProcessEventCreation(endpointRepo datastore.EndpointRepository, eventRepo datastore.EventRepository, projectRepo datastore.ProjectRepository, eventQueue queue.Queuer, subRepo datastore.SubscriptionRepository, filterRepo datastore.FilterRepository, licenser license.Licenser, tracerBackend tracer.Backend) func(context.Context, *asynq.Task) error { + ch := &DefaultEventChannel{} + + return ProcessEventCreationByChannel(ch, endpointRepo, eventRepo, projectRepo, eventQueue, subRepo, filterRepo, licenser, tracerBackend) } func writeEventDeliveriesToQueue(ctx context.Context, subscriptions []datastore.Subscription, event *datastore.Event, project *datastore.Project, eventDeliveryRepo datastore.EventDeliveryRepository, eventQueue queue.Queuer, deviceRepo datastore.DeviceRepository, endpointRepo datastore.EndpointRepository, licenser license.Licenser) error { @@ -339,7 +341,7 @@ func writeEventDeliveriesToQueue(ctx context.Context, subscriptions []datastore. } func findSubscriptions(ctx context.Context, endpointRepo datastore.EndpointRepository, - subRepo datastore.SubscriptionRepository, licenser license.Licenser, project *datastore.Project, event *datastore.Event, shouldCreateSubscription bool, + subRepo datastore.SubscriptionRepository, filterRepo datastore.FilterRepository, licenser license.Licenser, project *datastore.Project, event *datastore.Event, shouldCreateSubscription bool, ) ([]datastore.Subscription, error) { var subscriptions []datastore.Subscription var err error @@ -369,14 +371,17 @@ func findSubscriptions(ctx context.Context, endpointRepo datastore.EndpointRepos return subscriptions, nil } - subs = matchSubscriptions(string(event.EventType), subs) + matchedSubs, err := matchSubscriptions(ctx, string(event.EventType), subs, filterRepo) + if err != nil { + return subscriptions, &EndpointError{Err: errors.New("error matching subscriptions for event type"), delay: defaultDelay} + } - subs, err = matchSubscriptionsUsingFilter(ctx, event, subRepo, licenser, subs, false) + matchedSubs, err = matchSubscriptionsUsingFilter(ctx, event, subRepo, filterRepo, licenser, matchedSubs, false) if err != nil { return subscriptions, &EndpointError{Err: errors.New("error fetching subscriptions for event type"), delay: defaultDelay} } - subscriptions = append(subscriptions, subs...) + subscriptions = append(subscriptions, matchedSubs...) } } else if project.Type == datastore.IncomingProject { subscriptions, err = subRepo.FindSubscriptionsBySourceID(ctx, project.UID, event.SourceID) @@ -384,7 +389,21 @@ func findSubscriptions(ctx context.Context, endpointRepo datastore.EndpointRepos return nil, &EndpointError{Err: err, delay: defaultDelay} } - subscriptions, err = matchSubscriptionsUsingFilter(ctx, event, subRepo, licenser, subscriptions, false) + if len(subscriptions) > 0 { + matchedSubs, err := matchSubscriptions(ctx, string(event.EventType), subscriptions, filterRepo) + if err != nil { + return subscriptions, &EndpointError{Err: errors.New("error matching subscriptions for event type"), delay: defaultDelay} + } + + matchedSubs, err = matchSubscriptionsUsingFilter(ctx, event, subRepo, filterRepo, licenser, matchedSubs, false) + if err != nil { + return subscriptions, &EndpointError{Err: errors.New("error fetching subscriptions for event type"), delay: defaultDelay} + } + + subscriptions = matchedSubs + } + + subscriptions, err = matchSubscriptionsUsingFilter(ctx, event, subRepo, filterRepo, licenser, subscriptions, false) if err != nil { log.WithError(err).Error("error find a matching subscription for this source") return subscriptions, &EndpointError{Err: errors.New("error find a matching subscription for this source"), delay: defaultDelay} @@ -394,7 +413,7 @@ func findSubscriptions(ctx context.Context, endpointRepo datastore.EndpointRepos return subscriptions, nil } -func matchSubscriptionsUsingFilter(ctx context.Context, e *datastore.Event, subRepo datastore.SubscriptionRepository, licenser license.Licenser, subscriptions []datastore.Subscription, soft bool) ([]datastore.Subscription, error) { +func matchSubscriptionsUsingFilter(ctx context.Context, e *datastore.Event, subRepo datastore.SubscriptionRepository, filterRepo datastore.FilterRepository, licenser license.Licenser, subscriptions []datastore.Subscription, soft bool) ([]datastore.Subscription, error) { if !licenser.AdvancedSubscriptions() { return subscriptions, nil } @@ -427,25 +446,47 @@ func matchSubscriptionsUsingFilter(ctx context.Context, e *datastore.Event, subR for i := range subscriptions { s = &subscriptions[i] - if len(s.FilterConfig.Filter.Body) == 0 && len(s.FilterConfig.Filter.Headers) == 0 { + + // First check if there's a specific filter for this event type + filter, innerErr := filterRepo.FindFilterBySubscriptionAndEventType(ctx, s.UID, string(e.EventType)) + if innerErr != nil && !errors.Is(innerErr, datastore.ErrFilterNotFound) && soft { + log.WithError(innerErr).Errorf("failed to find filter for subscription (%s) and event type (%s)", s.UID, e.EventType) + continue + } else if innerErr != nil && !errors.Is(innerErr, datastore.ErrFilterNotFound) { + return nil, innerErr + } + + // If no specific filter found, try to find a catch-all filter + if filter == nil { + filter, innerErr = filterRepo.FindFilterBySubscriptionAndEventType(ctx, s.UID, "*") + if innerErr != nil && !errors.Is(innerErr, datastore.ErrFilterNotFound) && soft { + log.WithError(innerErr).Errorf("failed to find catch-all filter for subscription (%s)", s.UID) + continue + } else if innerErr != nil && !errors.Is(innerErr, datastore.ErrFilterNotFound) { + return nil, innerErr + } + } + + // If no filter found at all, or filter has no conditions, match the subscription + if filter == nil || (len(filter.Body) == 0 && len(filter.Headers) == 0) { matched = append(matched, *s) continue } - isBodyMatched, err := subRepo.CompareFlattenedPayload(ctx, flatPayload, s.FilterConfig.Filter.Body, s.FilterConfig.Filter.IsFlattened) - if err != nil && soft { - log.WithError(err).Errorf("subcription (%s) failed to match body", s.UID) + isBodyMatched, innerErr := subRepo.CompareFlattenedPayload(ctx, flatPayload, filter.Body, true) + if innerErr != nil && soft { + log.WithError(innerErr).Errorf("subcription (%s) failed to match body", s.UID) continue - } else if err != nil { - return nil, err + } else if innerErr != nil { + return nil, innerErr } - isHeaderMatched, err := subRepo.CompareFlattenedPayload(ctx, headers, s.FilterConfig.Filter.Headers, s.FilterConfig.Filter.IsFlattened) - if err != nil && soft { - log.WithError(err).Errorf("subscription (%s) failed to match header", s.UID) + isHeaderMatched, innerErr := subRepo.CompareFlattenedPayload(ctx, headers, filter.Headers, true) + if innerErr != nil && soft { + log.WithError(innerErr).Errorf("subscription (%s) failed to match header", s.UID) continue - } else if err != nil { - return nil, err + } else if innerErr != nil { + return nil, innerErr } isMatched := isHeaderMatched && isBodyMatched @@ -458,17 +499,34 @@ func matchSubscriptionsUsingFilter(ctx context.Context, e *datastore.Event, subR return matched, nil } -func matchSubscriptions(eventType string, subscriptions []datastore.Subscription) []datastore.Subscription { +func matchSubscriptions(ctx context.Context, eventType string, subscriptions []datastore.Subscription, filterRepo datastore.FilterRepository) ([]datastore.Subscription, error) { var matched []datastore.Subscription for _, sub := range subscriptions { - for _, ev := range sub.FilterConfig.EventTypes { - if ev == eventType || ev == "*" { // if this event type matches, or is *, add the subscription to matched - matched = append(matched, sub) - } + // Check if there's a specific filter for this event type + filter, err := filterRepo.FindFilterBySubscriptionAndEventType(ctx, sub.UID, eventType) + if err != nil && !errors.Is(err, datastore.ErrFilterNotFound) { + return nil, err + } + + // If a specific filter exists, add the subscription + if filter != nil { + matched = append(matched, sub) + continue + } + + // Check for a catch-all filter + filter, err = filterRepo.FindFilterBySubscriptionAndEventType(ctx, sub.UID, "*") + if err != nil && !errors.Is(err, datastore.ErrFilterNotFound) { + return nil, err + } + + // If a catch-all filter exists, add the subscription + if filter != nil { + matched = append(matched, sub) } } - return matched + return matched, nil } func getEventDeliveryStatus(ctx context.Context, subscription *datastore.Subscription, endpoint *datastore.Endpoint, From 7c23645938defd148a6e725c77b6d4a37affbe3d Mon Sep 17 00:00:00 2001 From: Raymond Tukpe Date: Wed, 26 Feb 2025 21:02:46 +0100 Subject: [PATCH 2/4] update event creation code --- api/handlers/filter.go | 38 ++++++- database/postgres/event_types.go | 19 +++- datastore/repository.go | 1 + mocks/repository.go | 154 ++++++++++++++++++++++++++ pkg/compare/compare.go | 18 ++- sql/1740483415.sql | 2 +- worker/task/process_event_creation.go | 57 +++++----- 7 files changed, 250 insertions(+), 39 deletions(-) diff --git a/api/handlers/filter.go b/api/handlers/filter.go index 3a496de45c..6508d7a933 100644 --- a/api/handlers/filter.go +++ b/api/handlers/filter.go @@ -48,6 +48,7 @@ func (h *Handler) CreateFilter(w http.ResponseWriter, r *http.Request) { subRepo := postgres.NewSubscriptionRepo(h.A.DB) filterRepo := postgres.NewFilterRepo(h.A.DB) + eventTypeRepo := postgres.NewEventTypesRepo(h.A.DB) // Check if subscription exists _, err = subRepo.FindSubscriptionByID(r.Context(), projectID, subscriptionID) @@ -60,7 +61,19 @@ func (h *Handler) CreateFilter(w http.ResponseWriter, r *http.Request) { return } - // Check if filter with same event type already exists + // check if the event type exists in the project + exists, err := eventTypeRepo.CheckEventTypeExists(r.Context(), newFilter.EventType, projectID) + if err != nil { + _ = render.Render(w, r, util.NewErrorResponse(err.Error(), http.StatusNotFound)) + return + } + + if !exists { + _ = render.Render(w, r, util.NewErrorResponse("event type does not exist", http.StatusNotFound)) + return + } + + // Check if a filter with the same event type already exists existingFilter, err := filterRepo.FindFilterBySubscriptionAndEventType(r.Context(), subscriptionID, newFilter.EventType) if err != nil && err.Error() != datastore.ErrFilterNotFound.Error() { _ = render.Render(w, r, util.NewErrorResponse("failed to check for existing filter", http.StatusBadRequest)) @@ -141,7 +154,7 @@ func (h *Handler) GetFilter(w http.ResponseWriter, r *http.Request) { return } - // Check if filter belongs to the subscription + // Check if the filter belongs to the subscription if filter.SubscriptionID != subscriptionID { _ = render.Render(w, r, util.NewErrorResponse("filter does not belong to this subscription", http.StatusNotFound)) return @@ -244,6 +257,7 @@ func (h *Handler) UpdateFilter(w http.ResponseWriter, r *http.Request) { return } + eventTypeRepo := postgres.NewEventTypesRepo(h.A.DB) subRepo := postgres.NewSubscriptionRepo(h.A.DB) filterRepo := postgres.NewFilterRepo(h.A.DB) @@ -258,6 +272,18 @@ func (h *Handler) UpdateFilter(w http.ResponseWriter, r *http.Request) { return } + // check if the event-type exists in the project + exists, err := eventTypeRepo.CheckEventTypeExists(r.Context(), updateFilter.EventType, projectID) + if err != nil { + _ = render.Render(w, r, util.NewErrorResponse(err.Error(), http.StatusNotFound)) + return + } + + if !exists { + _ = render.Render(w, r, util.NewErrorResponse("event type does not exist", http.StatusNotFound)) + return + } + // Get the filter filter, err := filterRepo.FindFilterByID(r.Context(), filterID) if err != nil { @@ -357,7 +383,7 @@ func (h *Handler) DeleteFilter(w http.ResponseWriter, r *http.Request) { return } - // Check if filter belongs to the subscription + // Check if the filter belongs to the subscription if filter.SubscriptionID != subscriptionID { _ = render.Render(w, r, util.NewErrorResponse("filter does not belong to this subscription", http.StatusNotFound)) return @@ -381,9 +407,9 @@ func (h *Handler) DeleteFilter(w http.ResponseWriter, r *http.Request) { // @Tags Filters // @Accept json // @Produce json -// @Param projectID path string true "Project ID" -// @Param subscriptionID path string true "Subscription ID" -// @Param eventType path string true "Event Type" +// @Param projectID path string true "Project ID" +// @Param subscriptionID path string true "Subscription ID" +// @Param eventType path string true "Event Type" // @Param payload body models.TestFilterRequest true "Payload to test" // @Success 200 {object} util.ServerResponse{data=models.TestFilterResponse} // @Failure 400,401,404 {object} util.ServerResponse{data=Stub} diff --git a/database/postgres/event_types.go b/database/postgres/event_types.go index 3f0a8def63..ef658c25d3 100644 --- a/database/postgres/event_types.go +++ b/database/postgres/event_types.go @@ -42,6 +42,11 @@ const ( WHERE id = $1 and project_id = $2; ` + checkEventTypeExists = ` + SELECT exists(SELECT 1 FROM convoy.event_types + WHERE name = $1 and project_id = $2); + ` + fetchAllEventTypes = ` SELECT * FROM convoy.event_types where project_id = $1; ` @@ -147,7 +152,6 @@ func (e *eventTypesRepo) DeprecateEventType(ctx context.Context, id, projectId s return eventType, nil } -// FetchEventTypeById to update func (e *eventTypesRepo) FetchEventTypeById(ctx context.Context, id, projectId string) (*datastore.ProjectEventType, error) { eventType := &datastore.ProjectEventType{} err := e.db.GetDB().QueryRowxContext(ctx, fetchEventTypeById, id, projectId).StructScan(eventType) @@ -161,6 +165,19 @@ func (e *eventTypesRepo) FetchEventTypeById(ctx context.Context, id, projectId s return eventType, nil } +func (e *eventTypesRepo) CheckEventTypeExists(ctx context.Context, name, projectId string) (bool, error) { + var exists bool + err := e.db.GetDB().QueryRowxContext(ctx, checkEventTypeExists, name, projectId).Scan(&exists) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return false, nil + } + return false, err + } + + return exists, nil +} + func (e *eventTypesRepo) FetchAllEventTypes(ctx context.Context, projectId string) ([]datastore.ProjectEventType, error) { var eventTypes []datastore.ProjectEventType rows, err := e.db.GetReadDB().QueryxContext(ctx, fetchAllEventTypes, projectId) diff --git a/datastore/repository.go b/datastore/repository.go index 62906b65f7..8466d61a20 100644 --- a/datastore/repository.go +++ b/datastore/repository.go @@ -239,6 +239,7 @@ type EventTypesRepository interface { DeprecateEventType(context.Context, string, string) (*ProjectEventType, error) FetchEventTypeById(context.Context, string, string) (*ProjectEventType, error) FetchAllEventTypes(context.Context, string) ([]ProjectEventType, error) + CheckEventTypeExists(context.Context, string, string) (bool, error) } // Filter errors diff --git a/mocks/repository.go b/mocks/repository.go index 6a55dde8ac..6b8c5b0208 100644 --- a/mocks/repository.go +++ b/mocks/repository.go @@ -1739,6 +1739,145 @@ func (mr *MockSubscriptionRepositoryMockRecorder) UpdateSubscription(ctx, projec return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateSubscription", reflect.TypeOf((*MockSubscriptionRepository)(nil).UpdateSubscription), ctx, projectID, subscription) } +// MockFilterRepository is a mock of FilterRepository interface. +type MockFilterRepository struct { + ctrl *gomock.Controller + recorder *MockFilterRepositoryMockRecorder +} + +// MockFilterRepositoryMockRecorder is the mock recorder for MockFilterRepository. +type MockFilterRepositoryMockRecorder struct { + mock *MockFilterRepository +} + +// NewMockFilterRepository creates a new mock instance. +func NewMockFilterRepository(ctrl *gomock.Controller) *MockFilterRepository { + mock := &MockFilterRepository{ctrl: ctrl} + mock.recorder = &MockFilterRepositoryMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockFilterRepository) EXPECT() *MockFilterRepositoryMockRecorder { + return m.recorder +} + +// CreateFilter mocks base method. +func (m *MockFilterRepository) CreateFilter(ctx context.Context, filter *datastore.EventTypeFilter) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateFilter", ctx, filter) + ret0, _ := ret[0].(error) + return ret0 +} + +// CreateFilter indicates an expected call of CreateFilter. +func (mr *MockFilterRepositoryMockRecorder) CreateFilter(ctx, filter any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateFilter", reflect.TypeOf((*MockFilterRepository)(nil).CreateFilter), ctx, filter) +} + +// CreateFilters mocks base method. +func (m *MockFilterRepository) CreateFilters(ctx context.Context, filters []datastore.EventTypeFilter) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CreateFilters", ctx, filters) + ret0, _ := ret[0].(error) + return ret0 +} + +// CreateFilters indicates an expected call of CreateFilters. +func (mr *MockFilterRepositoryMockRecorder) CreateFilters(ctx, filters any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateFilters", reflect.TypeOf((*MockFilterRepository)(nil).CreateFilters), ctx, filters) +} + +// DeleteFilter mocks base method. +func (m *MockFilterRepository) DeleteFilter(ctx context.Context, filterID string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "DeleteFilter", ctx, filterID) + ret0, _ := ret[0].(error) + return ret0 +} + +// DeleteFilter indicates an expected call of DeleteFilter. +func (mr *MockFilterRepositoryMockRecorder) DeleteFilter(ctx, filterID any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteFilter", reflect.TypeOf((*MockFilterRepository)(nil).DeleteFilter), ctx, filterID) +} + +// FindFilterByID mocks base method. +func (m *MockFilterRepository) FindFilterByID(ctx context.Context, filterID string) (*datastore.EventTypeFilter, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FindFilterByID", ctx, filterID) + ret0, _ := ret[0].(*datastore.EventTypeFilter) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FindFilterByID indicates an expected call of FindFilterByID. +func (mr *MockFilterRepositoryMockRecorder) FindFilterByID(ctx, filterID any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindFilterByID", reflect.TypeOf((*MockFilterRepository)(nil).FindFilterByID), ctx, filterID) +} + +// FindFilterBySubscriptionAndEventType mocks base method. +func (m *MockFilterRepository) FindFilterBySubscriptionAndEventType(ctx context.Context, subscriptionID, eventType string) (*datastore.EventTypeFilter, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FindFilterBySubscriptionAndEventType", ctx, subscriptionID, eventType) + ret0, _ := ret[0].(*datastore.EventTypeFilter) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FindFilterBySubscriptionAndEventType indicates an expected call of FindFilterBySubscriptionAndEventType. +func (mr *MockFilterRepositoryMockRecorder) FindFilterBySubscriptionAndEventType(ctx, subscriptionID, eventType any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindFilterBySubscriptionAndEventType", reflect.TypeOf((*MockFilterRepository)(nil).FindFilterBySubscriptionAndEventType), ctx, subscriptionID, eventType) +} + +// FindFiltersBySubscriptionID mocks base method. +func (m *MockFilterRepository) FindFiltersBySubscriptionID(ctx context.Context, subscriptionID string) ([]datastore.EventTypeFilter, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FindFiltersBySubscriptionID", ctx, subscriptionID) + ret0, _ := ret[0].([]datastore.EventTypeFilter) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FindFiltersBySubscriptionID indicates an expected call of FindFiltersBySubscriptionID. +func (mr *MockFilterRepositoryMockRecorder) FindFiltersBySubscriptionID(ctx, subscriptionID any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindFiltersBySubscriptionID", reflect.TypeOf((*MockFilterRepository)(nil).FindFiltersBySubscriptionID), ctx, subscriptionID) +} + +// TestFilter mocks base method. +func (m *MockFilterRepository) TestFilter(ctx context.Context, subscriptionID, eventType string, payload any) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TestFilter", ctx, subscriptionID, eventType, payload) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// TestFilter indicates an expected call of TestFilter. +func (mr *MockFilterRepositoryMockRecorder) TestFilter(ctx, subscriptionID, eventType, payload any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TestFilter", reflect.TypeOf((*MockFilterRepository)(nil).TestFilter), ctx, subscriptionID, eventType, payload) +} + +// UpdateFilter mocks base method. +func (m *MockFilterRepository) UpdateFilter(ctx context.Context, filter *datastore.EventTypeFilter) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UpdateFilter", ctx, filter) + ret0, _ := ret[0].(error) + return ret0 +} + +// UpdateFilter indicates an expected call of UpdateFilter. +func (mr *MockFilterRepositoryMockRecorder) UpdateFilter(ctx, filter any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateFilter", reflect.TypeOf((*MockFilterRepository)(nil).UpdateFilter), ctx, filter) +} + // MockSourceRepository is a mock of SourceRepository interface. type MockSourceRepository struct { ctrl *gomock.Controller @@ -2760,6 +2899,21 @@ func (m *MockEventTypesRepository) EXPECT() *MockEventTypesRepositoryMockRecorde return m.recorder } +// CheckEventTypeExists mocks base method. +func (m *MockEventTypesRepository) CheckEventTypeExists(ctx context.Context, name, projectId string) (bool, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "CheckEventTypeExists", ctx, name, projectId) + ret0, _ := ret[0].(bool) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// CheckEventTypeExists indicates an expected call of CheckEventTypeExists. +func (mr *MockEventTypesRepositoryMockRecorder) CheckEventTypeExists(ctx, name, projectId any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckEventTypeExists", reflect.TypeOf((*MockEventTypesRepository)(nil).CheckEventTypeExists), ctx, name, projectId) +} + // CreateDefaultEventType mocks base method. func (m *MockEventTypesRepository) CreateDefaultEventType(ctx context.Context, projectId string) error { m.ctrl.T.Helper() diff --git a/pkg/compare/compare.go b/pkg/compare/compare.go index ed978dac5a..a70afd4461 100644 --- a/pkg/compare/compare.go +++ b/pkg/compare/compare.go @@ -376,7 +376,7 @@ func exist(payload, filter interface{}) (bool, error) { return b == want, nil } -// toFloat64 converts interface{} value to float64 if value is numeric else return false +// toFloat64 converts interface{} value to float64 if the value is numeric, else return false func toFloat64(v interface{}) (float64, bool) { var f float64 flag := true @@ -397,7 +397,14 @@ func toFloat64(v interface{}) (float64, bool) { case float64: f = u default: - flag = false + // finally, try to convert it to a numeric type + ff, err := strconv.ParseFloat(fmt.Sprintf("%+v", v), 64) + if err != nil { + flag = false + return 0, flag + } + + return ff, true } return f, flag } @@ -463,9 +470,10 @@ func genCombos(payload map[string]interface{}, s string) ([]string, error) { } // generateCombinations takes an array of strings representing a combination of -// non-replaced segments and "$" characters, a current index, and a maximum integer value n, -// generates all possible combinations of integers from 0 to n for each "$" character, -// and returns a slice of strings representing all possible combinations. +// non-replaced segments and "$" characters, a current index, and a maximum +// integer value n, generates all possible combinations of integers from 0 to n +// for each "$" character, and returns a slice of strings representing all +// possible combinations. func generateCombinations(combinations []string, index int, n int) []string { if index >= len(combinations) { return []string{strings.Join(combinations, "")} diff --git a/sql/1740483415.sql b/sql/1740483415.sql index e5cfd2b685..c27847b63c 100644 --- a/sql/1740483415.sql +++ b/sql/1740483415.sql @@ -17,7 +17,7 @@ CREATE TABLE IF NOT EXISTS convoy.filters ( CREATE INDEX idx_filters_subscription_id ON convoy.filters(subscription_id); CREATE INDEX idx_filters_event_type ON convoy.filters(event_type); -CREATE INDEX idx_filters_subscription_event_type ON convoy.filters(subscription_id, event_type); +CREATE UNIQUE INDEX idx_filters_subscription_event_type ON convoy.filters(subscription_id, event_type); -- Migrate existing subscription filters to the new filters table. For each subscription event type, create a filter. INSERT INTO convoy.filters ( diff --git a/worker/task/process_event_creation.go b/worker/task/process_event_creation.go index 2c168ac37e..9f8b9be3a7 100644 --- a/worker/task/process_event_creation.go +++ b/worker/task/process_event_creation.go @@ -355,30 +355,30 @@ func findSubscriptions(ctx context.Context, endpointRepo datastore.EndpointRepos return subscriptions, &EndpointError{Err: err, delay: defaultDelay} } - subs, err := subRepo.FindSubscriptionsByEndpointID(ctx, project.UID, endpoint.UID) - if err != nil { - return subscriptions, &EndpointError{Err: errors.New("error fetching subscriptions for event type"), delay: defaultDelay} + subs, innerErr := subRepo.FindSubscriptionsByEndpointID(ctx, project.UID, endpoint.UID) + if innerErr != nil { + return subscriptions, &EndpointError{Err: fmt.Errorf("error fetching subscriptions for event type"), delay: defaultDelay} } if len(subs) == 0 && shouldCreateSubscription { - subs := generateSubscription(project, endpoint) - err := subRepo.CreateSubscription(ctx, project.UID, subs) - if err != nil { - return subscriptions, &EndpointError{Err: errors.New("error creating subscription for endpoint"), delay: defaultDelay} + genSubs := generateSubscription(project, endpoint) + createSubErr := subRepo.CreateSubscription(ctx, project.UID, genSubs) + if createSubErr != nil { + return subscriptions, &EndpointError{Err: fmt.Errorf("error creating subscription for endpoint: %v", createSubErr), delay: defaultDelay} } - subscriptions = append(subscriptions, *subs) + subscriptions = append(subscriptions, *genSubs) return subscriptions, nil } - matchedSubs, err := matchSubscriptions(ctx, string(event.EventType), subs, filterRepo) - if err != nil { - return subscriptions, &EndpointError{Err: errors.New("error matching subscriptions for event type"), delay: defaultDelay} + matchedSubs, innerErr := matchSubscriptions(ctx, string(event.EventType), subs, filterRepo) + if innerErr != nil { + return subscriptions, &EndpointError{Err: fmt.Errorf("error matching subscriptions for event type: %v", innerErr), delay: defaultDelay} } - matchedSubs, err = matchSubscriptionsUsingFilter(ctx, event, subRepo, filterRepo, licenser, matchedSubs, false) - if err != nil { - return subscriptions, &EndpointError{Err: errors.New("error fetching subscriptions for event type"), delay: defaultDelay} + matchedSubs, innerErr = matchSubscriptionsUsingFilter(ctx, event, subRepo, filterRepo, licenser, matchedSubs, false) + if innerErr != nil { + return subscriptions, &EndpointError{Err: fmt.Errorf("error fetching subscriptions for event type: %v", innerErr), delay: defaultDelay} } subscriptions = append(subscriptions, matchedSubs...) @@ -390,14 +390,14 @@ func findSubscriptions(ctx context.Context, endpointRepo datastore.EndpointRepos } if len(subscriptions) > 0 { - matchedSubs, err := matchSubscriptions(ctx, string(event.EventType), subscriptions, filterRepo) - if err != nil { - return subscriptions, &EndpointError{Err: errors.New("error matching subscriptions for event type"), delay: defaultDelay} + matchedSubs, innerErr := matchSubscriptions(ctx, string(event.EventType), subscriptions, filterRepo) + if innerErr != nil { + return subscriptions, &EndpointError{Err: fmt.Errorf("error matching subscriptions for event type: %v", innerErr), delay: defaultDelay} } - matchedSubs, err = matchSubscriptionsUsingFilter(ctx, event, subRepo, filterRepo, licenser, matchedSubs, false) - if err != nil { - return subscriptions, &EndpointError{Err: errors.New("error fetching subscriptions for event type"), delay: defaultDelay} + matchedSubs, innerErr = matchSubscriptionsUsingFilter(ctx, event, subRepo, filterRepo, licenser, matchedSubs, false) + if innerErr != nil { + return subscriptions, &EndpointError{Err: fmt.Errorf("error fetching subscriptions for event type: %v", innerErr), delay: defaultDelay} } subscriptions = matchedSubs @@ -406,7 +406,7 @@ func findSubscriptions(ctx context.Context, endpointRepo datastore.EndpointRepos subscriptions, err = matchSubscriptionsUsingFilter(ctx, event, subRepo, filterRepo, licenser, subscriptions, false) if err != nil { log.WithError(err).Error("error find a matching subscription for this source") - return subscriptions, &EndpointError{Err: errors.New("error find a matching subscription for this source"), delay: defaultDelay} + return subscriptions, &EndpointError{Err: fmt.Errorf("error find a matching subscription for this source: %v", err), delay: defaultDelay} } } @@ -418,6 +418,8 @@ func matchSubscriptionsUsingFilter(ctx context.Context, e *datastore.Event, subR return subscriptions, nil } + // fmt.Printf("matched %+v\n", subscriptions) + var matched []datastore.Subscription // payload is interface{} and not map[string]interface{} because @@ -449,17 +451,17 @@ func matchSubscriptionsUsingFilter(ctx context.Context, e *datastore.Event, subR // First check if there's a specific filter for this event type filter, innerErr := filterRepo.FindFilterBySubscriptionAndEventType(ctx, s.UID, string(e.EventType)) - if innerErr != nil && !errors.Is(innerErr, datastore.ErrFilterNotFound) && soft { + if innerErr != nil && innerErr.Error() != datastore.ErrFilterNotFound.Error() && soft { log.WithError(innerErr).Errorf("failed to find filter for subscription (%s) and event type (%s)", s.UID, e.EventType) continue - } else if innerErr != nil && !errors.Is(innerErr, datastore.ErrFilterNotFound) { + } else if innerErr != nil && innerErr.Error() != datastore.ErrFilterNotFound.Error() { return nil, innerErr } // If no specific filter found, try to find a catch-all filter if filter == nil { filter, innerErr = filterRepo.FindFilterBySubscriptionAndEventType(ctx, s.UID, "*") - if innerErr != nil && !errors.Is(innerErr, datastore.ErrFilterNotFound) && soft { + if innerErr != nil && innerErr.Error() != datastore.ErrFilterNotFound.Error() && soft { log.WithError(innerErr).Errorf("failed to find catch-all filter for subscription (%s)", s.UID) continue } else if innerErr != nil && !errors.Is(innerErr, datastore.ErrFilterNotFound) { @@ -470,6 +472,7 @@ func matchSubscriptionsUsingFilter(ctx context.Context, e *datastore.Event, subR // If no filter found at all, or filter has no conditions, match the subscription if filter == nil || (len(filter.Body) == 0 && len(filter.Headers) == 0) { matched = append(matched, *s) + fmt.Printf("empty match: %+v %s\n", filter.EventType, s.UID) continue } @@ -492,8 +495,10 @@ func matchSubscriptionsUsingFilter(ctx context.Context, e *datastore.Event, subR isMatched := isHeaderMatched && isBodyMatched if isMatched { + fmt.Printf("m_bool: %v ?? b_bool: %v, h_bool: %v ?? h: %v hh: %v\n", isMatched, isBodyMatched, isHeaderMatched, headers, filter.Headers) matched = append(matched, *s) } + } return matched, nil @@ -504,7 +509,7 @@ func matchSubscriptions(ctx context.Context, eventType string, subscriptions []d for _, sub := range subscriptions { // Check if there's a specific filter for this event type filter, err := filterRepo.FindFilterBySubscriptionAndEventType(ctx, sub.UID, eventType) - if err != nil && !errors.Is(err, datastore.ErrFilterNotFound) { + if err != nil && err.Error() != datastore.ErrFilterNotFound.Error() { return nil, err } @@ -516,7 +521,7 @@ func matchSubscriptions(ctx context.Context, eventType string, subscriptions []d // Check for a catch-all filter filter, err = filterRepo.FindFilterBySubscriptionAndEventType(ctx, sub.UID, "*") - if err != nil && !errors.Is(err, datastore.ErrFilterNotFound) { + if err != nil && err.Error() != datastore.ErrFilterNotFound.Error() { return nil, err } From 7980ad2ee03be420b1a23e0a70f3006ab5f54d5c Mon Sep 17 00:00:00 2001 From: Raymond Tukpe Date: Thu, 27 Feb 2025 12:24:16 +0100 Subject: [PATCH 3/4] update migration down query --- sql/1740483415.sql | 22 +++++++++++++++++++++- worker/task/process_event_creation.go | 6 ++---- 2 files changed, 23 insertions(+), 5 deletions(-) diff --git a/sql/1740483415.sql b/sql/1740483415.sql index c27847b63c..0a87622607 100644 --- a/sql/1740483415.sql +++ b/sql/1740483415.sql @@ -41,5 +41,25 @@ FROM convoy.subscriptions WHERE deleted_at IS NULL; -- +migrate Down -DROP TABLE IF EXISTS convoy.filters; +WITH catch_all_filters AS ( + SELECT + subscription_id, + headers, + body, + raw_headers, + raw_body + FROM convoy.filters + WHERE event_type = '*' +) +UPDATE convoy.subscriptions s +SET + filter_config_filter_headers = c.headers, + filter_config_filter_body = c.body, + filter_config_filter_raw_headers = c.raw_headers, + filter_config_filter_raw_body = c.raw_body +FROM catch_all_filters c +WHERE s.id = c.subscription_id + AND s.deleted_at IS NULL; +-- +migrate Down +DROP TABLE IF EXISTS convoy.filters; diff --git a/worker/task/process_event_creation.go b/worker/task/process_event_creation.go index 9f8b9be3a7..b52712a7bd 100644 --- a/worker/task/process_event_creation.go +++ b/worker/task/process_event_creation.go @@ -80,7 +80,7 @@ func (d *DefaultEventChannel) CreateEvent(ctx context.Context, t *asynq.Task, ch err := msgpack.DecodeMsgPack(t.Payload(), &createEvent) if err != nil { - err := json.Unmarshal(t.Payload(), &createEvent) + err = json.Unmarshal(t.Payload(), &createEvent) if err != nil { args.tracerBackend.Capture(ctx, "event.creation.error", attributes, startTime, time.Now()) return nil, &EndpointError{Err: err, delay: defaultDelay} @@ -115,7 +115,7 @@ func (d *DefaultEventChannel) CreateEvent(ctx context.Context, t *asynq.Task, ch _, err = args.eventRepo.FindEventByID(ctx, project.UID, event.UID) if err != nil { // 404 - err := updateEventMetadata(channel, event, createEvent.CreateSubscription) + err = updateEventMetadata(channel, event, createEvent.CreateSubscription) if err != nil { args.tracerBackend.Capture(ctx, "event.creation.error", attributes, startTime, time.Now()) return nil, err @@ -472,7 +472,6 @@ func matchSubscriptionsUsingFilter(ctx context.Context, e *datastore.Event, subR // If no filter found at all, or filter has no conditions, match the subscription if filter == nil || (len(filter.Body) == 0 && len(filter.Headers) == 0) { matched = append(matched, *s) - fmt.Printf("empty match: %+v %s\n", filter.EventType, s.UID) continue } @@ -495,7 +494,6 @@ func matchSubscriptionsUsingFilter(ctx context.Context, e *datastore.Event, subR isMatched := isHeaderMatched && isBodyMatched if isMatched { - fmt.Printf("m_bool: %v ?? b_bool: %v, h_bool: %v ?? h: %v hh: %v\n", isMatched, isBodyMatched, isHeaderMatched, headers, filter.Headers) matched = append(matched, *s) } From 8c91c0ad3ff942a031b145e8bd5d738483b44f38 Mon Sep 17 00:00:00 2001 From: Raymond Tukpe Date: Fri, 28 Feb 2025 14:26:17 +0100 Subject: [PATCH 4/4] update tests --- mocks/repository.go | 8 +- .../process_broadcast_event_creation_test.go | 2 +- .../process_dynamic_event_creation_test.go | 2 +- worker/task/process_event_channel.go | 5 +- worker/task/process_event_creation_test.go | 136 +++++++++++++++--- 5 files changed, 129 insertions(+), 24 deletions(-) diff --git a/mocks/repository.go b/mocks/repository.go index 6b8c5b0208..1d29d6e712 100644 --- a/mocks/repository.go +++ b/mocks/repository.go @@ -2900,18 +2900,18 @@ func (m *MockEventTypesRepository) EXPECT() *MockEventTypesRepositoryMockRecorde } // CheckEventTypeExists mocks base method. -func (m *MockEventTypesRepository) CheckEventTypeExists(ctx context.Context, name, projectId string) (bool, error) { +func (m *MockEventTypesRepository) CheckEventTypeExists(arg0 context.Context, arg1, arg2 string) (bool, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CheckEventTypeExists", ctx, name, projectId) + ret := m.ctrl.Call(m, "CheckEventTypeExists", arg0, arg1, arg2) ret0, _ := ret[0].(bool) ret1, _ := ret[1].(error) return ret0, ret1 } // CheckEventTypeExists indicates an expected call of CheckEventTypeExists. -func (mr *MockEventTypesRepositoryMockRecorder) CheckEventTypeExists(ctx, name, projectId any) *gomock.Call { +func (mr *MockEventTypesRepositoryMockRecorder) CheckEventTypeExists(arg0, arg1, arg2 any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckEventTypeExists", reflect.TypeOf((*MockEventTypesRepository)(nil).CheckEventTypeExists), ctx, name, projectId) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckEventTypeExists", reflect.TypeOf((*MockEventTypesRepository)(nil).CheckEventTypeExists), arg0, arg1, arg2) } // CreateDefaultEventType mocks base method. diff --git a/worker/task/process_broadcast_event_creation_test.go b/worker/task/process_broadcast_event_creation_test.go index 5a4331dc0f..179fb61a55 100644 --- a/worker/task/process_broadcast_event_creation_test.go +++ b/worker/task/process_broadcast_event_creation_test.go @@ -90,7 +90,7 @@ func TestProcessBroadcastEventCreation(t *testing.T) { task := asynq.NewTask(string(convoy.EventProcessor), job.Payload, asynq.Queue(string(convoy.EventQueue)), asynq.ProcessIn(job.Delay)) - fn := ProcessBroadcastEventCreation(NewBroadcastEventChannel(args.subTable), args.endpointRepo, args.eventRepo, args.projectRepo, args.eventDeliveryRepo, args.eventQueue, args.subRepo, args.deviceRepo, args.licenser, args.tracer) + fn := ProcessBroadcastEventCreation(NewBroadcastEventChannel(args.subTable), args.endpointRepo, args.eventRepo, args.projectRepo, args.eventQueue, args.subRepo, args.filterRepo, args.licenser, args.tracer) err = fn(context.Background(), task) if tt.wantErr { require.NotNil(t, err) diff --git a/worker/task/process_dynamic_event_creation_test.go b/worker/task/process_dynamic_event_creation_test.go index ab47ec0002..a2fc846a6d 100644 --- a/worker/task/process_dynamic_event_creation_test.go +++ b/worker/task/process_dynamic_event_creation_test.go @@ -130,7 +130,7 @@ func TestProcessDynamicEventCreation(t *testing.T) { task := asynq.NewTask(string(convoy.EventProcessor), job.Payload, asynq.Queue(string(convoy.EventQueue)), asynq.ProcessIn(job.Delay)) - fn := ProcessDynamicEventCreation(NewDynamicEventChannel(), args.endpointRepo, args.eventRepo, args.projectRepo, args.eventDeliveryRepo, args.eventQueue, args.subRepo, args.deviceRepo, args.licenser, args.tracer) + fn := ProcessDynamicEventCreation(args.endpointRepo, args.eventRepo, args.projectRepo, args.eventQueue, args.subRepo, args.filterRepo, args.licenser, args.tracer) err = fn(context.Background(), task) if tt.wantErr { require.NotNil(t, err) diff --git a/worker/task/process_event_channel.go b/worker/task/process_event_channel.go index 48df62a5a7..b8bad2a6df 100644 --- a/worker/task/process_event_channel.go +++ b/worker/task/process_event_channel.go @@ -52,7 +52,10 @@ type EventChannel interface { MatchSubscriptions(context.Context, EventChannelMetadata, EventChannelArgs) (*EventChannelSubResponse, error) } -func ProcessEventCreationByChannel(channel EventChannel, endpointRepo datastore.EndpointRepository, eventRepo datastore.EventRepository, projectRepo datastore.ProjectRepository, eventQueue queue.Queuer, subRepo datastore.SubscriptionRepository, filterRepo datastore.FilterRepository, licenser license.Licenser, tracerBackend tracer.Backend) func(context.Context, *asynq.Task) error { +func ProcessEventCreationByChannel(channel EventChannel, endpointRepo datastore.EndpointRepository, + eventRepo datastore.EventRepository, projectRepo datastore.ProjectRepository, + eventQueue queue.Queuer, subRepo datastore.SubscriptionRepository, filterRepo datastore.FilterRepository, + licenser license.Licenser, tracerBackend tracer.Backend) func(context.Context, *asynq.Task) error { return func(ctx context.Context, t *asynq.Task) error { cfg := channel.GetConfig() diff --git a/worker/task/process_event_creation_test.go b/worker/task/process_event_creation_test.go index 63c3ec69e3..183fcedeb4 100644 --- a/worker/task/process_event_creation_test.go +++ b/worker/task/process_event_creation_test.go @@ -34,6 +34,7 @@ type testArgs struct { cache cache.Cache eventQueue queue.Queuer subRepo datastore.SubscriptionRepository + filterRepo datastore.FilterRepository deviceRepo datastore.DeviceRepository subTable memorystore.ITable licenser license.Licenser @@ -51,6 +52,7 @@ func provideArgs(ctrl *gomock.Controller) *testArgs { subRepo := mocks.NewMockSubscriptionRepository(ctrl) db := mocks.NewMockDatabase(ctrl) subTable := mocks.NewMockITable(ctrl) + filterRepo := mocks.NewMockFilterRepository(ctrl) mockTracer := mocks.NewMockBackend(ctrl) return &testArgs{ @@ -64,6 +66,7 @@ func provideArgs(ctrl *gomock.Controller) *testArgs { eventQueue: mockQueuer, subRepo: subRepo, subTable: subTable, + filterRepo: filterRepo, licenser: mocks.NewMockLicenser(ctrl), tracer: mockTracer, } @@ -343,7 +346,9 @@ func TestProcessEventCreated(t *testing.T) { task := asynq.NewTask(string(convoy.EventProcessor), job.Payload, asynq.Queue(string(convoy.EventQueue)), asynq.ProcessIn(job.Delay)) - fn := ProcessEventCreation(NewDefaultEventChannel(), args.endpointRepo, args.eventRepo, args.projectRepo, args.eventDeliveryRepo, args.eventQueue, args.subRepo, args.deviceRepo, args.licenser, args.tracer) + fn := ProcessEventCreation(args.endpointRepo, args.eventRepo, + args.projectRepo, args.eventQueue, args.subRepo, + args.filterRepo, args.licenser, args.tracer) err = fn(context.Background(), task) if tt.wantErr { require.NotNil(t, err) @@ -376,7 +381,14 @@ func TestMatchSubscriptionsUsingFilter(t *testing.T) { }, dbFn: func(args *testArgs) { s, _ := args.subRepo.(*mocks.MockSubscriptionRepository) - s.EXPECT().CompareFlattenedPayload(gomock.Any(), gomock.Any(), gomock.Any(), false).Times(2).Return(true, nil) + s.EXPECT().CompareFlattenedPayload(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2).Return(true, nil) + + fe, _ := args.filterRepo.(*mocks.MockFilterRepository) + fe.EXPECT().FindFilterBySubscriptionAndEventType(gomock.Any(), gomock.Any(), gomock.Any()).Times(1). + Return(&datastore.EventTypeFilter{Body: map[string]interface{}{"person.age": 10}}, nil) + + fe.EXPECT().FindFilterBySubscriptionAndEventType(gomock.Any(), gomock.Any(), gomock.Any()).Times(1). + Return(&datastore.EventTypeFilter{Body: map[string]interface{}{}}, nil) licenser, _ := args.licenser.(*mocks.MockLicenser) licenser.EXPECT().AdvancedSubscriptions().Times(1).Return(true) @@ -455,8 +467,15 @@ func TestMatchSubscriptionsUsingFilter(t *testing.T) { }, dbFn: func(args *testArgs) { s, _ := args.subRepo.(*mocks.MockSubscriptionRepository) - s.EXPECT().CompareFlattenedPayload(gomock.Any(), gomock.Any(), gomock.Any(), false).Times(2).Return(true, nil) - s.EXPECT().CompareFlattenedPayload(gomock.Any(), gomock.Any(), gomock.Any(), false).Times(2).Return(false, nil) + s.EXPECT().CompareFlattenedPayload(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2).Return(true, nil) + s.EXPECT().CompareFlattenedPayload(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2).Return(false, nil) + + fe, _ := args.filterRepo.(*mocks.MockFilterRepository) + fe.EXPECT().FindFilterBySubscriptionAndEventType(gomock.Any(), gomock.Any(), gomock.Any()).Times(1). + Return(&datastore.EventTypeFilter{Body: map[string]interface{}{"person.age": 10}}, nil) + + fe.EXPECT().FindFilterBySubscriptionAndEventType(gomock.Any(), gomock.Any(), gomock.Any()).Times(1). + Return(&datastore.EventTypeFilter{Body: map[string]interface{}{"person.age": 5}}, nil) licenser, _ := args.licenser.(*mocks.MockLicenser) licenser.EXPECT().AdvancedSubscriptions().Times(1).Return(true) @@ -490,8 +509,15 @@ func TestMatchSubscriptionsUsingFilter(t *testing.T) { }, dbFn: func(args *testArgs) { s, _ := args.subRepo.(*mocks.MockSubscriptionRepository) - s.EXPECT().CompareFlattenedPayload(gomock.Any(), gomock.Any(), gomock.Any(), false).Times(2).Return(true, nil) - s.EXPECT().CompareFlattenedPayload(gomock.Any(), gomock.Any(), gomock.Any(), false).Times(2).Return(false, nil) + s.EXPECT().CompareFlattenedPayload(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2).Return(true, nil) + s.EXPECT().CompareFlattenedPayload(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2).Return(false, nil) + + fe, _ := args.filterRepo.(*mocks.MockFilterRepository) + fe.EXPECT().FindFilterBySubscriptionAndEventType(gomock.Any(), gomock.Any(), gomock.Any()).Times(1). + Return(&datastore.EventTypeFilter{Body: map[string]interface{}{"person.age": map[string]interface{}{"$eq": 10}}}, nil) + + fe.EXPECT().FindFilterBySubscriptionAndEventType(gomock.Any(), gomock.Any(), gomock.Any()).Times(1). + Return(&datastore.EventTypeFilter{Body: map[string]interface{}{"person.age": 5}}, nil) licenser, _ := args.licenser.(*mocks.MockLicenser) licenser.EXPECT().AdvancedSubscriptions().Times(1).Return(true) @@ -531,8 +557,17 @@ func TestMatchSubscriptionsUsingFilter(t *testing.T) { }, dbFn: func(args *testArgs) { s, _ := args.subRepo.(*mocks.MockSubscriptionRepository) - s.EXPECT().CompareFlattenedPayload(gomock.Any(), gomock.Any(), gomock.Any(), false).Times(2).Return(false, nil) - s.EXPECT().CompareFlattenedPayload(gomock.Any(), gomock.Any(), gomock.Any(), false).Times(2).Return(true, nil) + s.EXPECT().CompareFlattenedPayload(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2).Return(false, nil) + s.EXPECT().CompareFlattenedPayload(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2).Return(true, nil) + + fe, _ := args.filterRepo.(*mocks.MockFilterRepository) + fe.EXPECT().FindFilterBySubscriptionAndEventType(gomock.Any(), gomock.Any(), gomock.Any()).Times(1). + Return(&datastore.EventTypeFilter{Body: map[string]interface{}{"person.age": map[string]interface{}{ + "$neq": 10, + }}}, nil) + + fe.EXPECT().FindFilterBySubscriptionAndEventType(gomock.Any(), gomock.Any(), gomock.Any()).Times(1). + Return(&datastore.EventTypeFilter{Body: map[string]interface{}{"person.age": 10}}, nil) licenser, _ := args.licenser.(*mocks.MockLicenser) licenser.EXPECT().AdvancedSubscriptions().Times(1).Return(true) @@ -572,8 +607,23 @@ func TestMatchSubscriptionsUsingFilter(t *testing.T) { }, dbFn: func(args *testArgs) { s, _ := args.subRepo.(*mocks.MockSubscriptionRepository) - s.EXPECT().CompareFlattenedPayload(gomock.Any(), gomock.Any(), gomock.Any(), false).Times(2).Return(true, nil) - s.EXPECT().CompareFlattenedPayload(gomock.Any(), gomock.Any(), gomock.Any(), false).Times(2).Return(false, nil) + s.EXPECT().CompareFlattenedPayload(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2).Return(true, nil) + s.EXPECT().CompareFlattenedPayload(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2).Return(false, nil) + + fe, _ := args.filterRepo.(*mocks.MockFilterRepository) + fe.EXPECT().FindFilterBySubscriptionAndEventType(gomock.Any(), gomock.Any(), gomock.Any()).Times(1). + Return(&datastore.EventTypeFilter{Body: map[string]interface{}{ + "person.age": map[string]interface{}{ + "$gt": 10, + }, + }}, nil) + + fe.EXPECT().FindFilterBySubscriptionAndEventType(gomock.Any(), gomock.Any(), gomock.Any()).Times(1). + Return(&datastore.EventTypeFilter{Body: map[string]interface{}{ + "person.age": map[string]interface{}{ + "$gte": 10, + }, + }}, nil) licenser, _ := args.licenser.(*mocks.MockLicenser) licenser.EXPECT().AdvancedSubscriptions().Times(1).Return(true) @@ -619,7 +669,22 @@ func TestMatchSubscriptionsUsingFilter(t *testing.T) { }, dbFn: func(args *testArgs) { s, _ := args.subRepo.(*mocks.MockSubscriptionRepository) - s.EXPECT().CompareFlattenedPayload(gomock.Any(), gomock.Any(), gomock.Any(), false).Times(4).Return(true, nil) + s.EXPECT().CompareFlattenedPayload(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(4).Return(true, nil) + + fe, _ := args.filterRepo.(*mocks.MockFilterRepository) + fe.EXPECT().FindFilterBySubscriptionAndEventType(gomock.Any(), gomock.Any(), gomock.Any()).Times(1). + Return(&datastore.EventTypeFilter{Body: map[string]interface{}{ + "person.age": map[string]interface{}{ + "$lt": 10, + }, + }}, nil) + + fe.EXPECT().FindFilterBySubscriptionAndEventType(gomock.Any(), gomock.Any(), gomock.Any()).Times(1). + Return(&datastore.EventTypeFilter{Body: map[string]interface{}{ + "person.age": map[string]interface{}{ + "$lte": 10, + }, + }}, nil) licenser, _ := args.licenser.(*mocks.MockLicenser) licenser.EXPECT().AdvancedSubscriptions().Times(1).Return(true) @@ -668,8 +733,30 @@ func TestMatchSubscriptionsUsingFilter(t *testing.T) { }, dbFn: func(args *testArgs) { s, _ := args.subRepo.(*mocks.MockSubscriptionRepository) - s.EXPECT().CompareFlattenedPayload(gomock.Any(), gomock.Any(), gomock.Any(), false).Times(2).Return(true, nil) - s.EXPECT().CompareFlattenedPayload(gomock.Any(), gomock.Any(), gomock.Any(), false).Times(4).Return(false, nil) + s.EXPECT().CompareFlattenedPayload(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2).Return(true, nil) + s.EXPECT().CompareFlattenedPayload(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(4).Return(false, nil) + + fe, _ := args.filterRepo.(*mocks.MockFilterRepository) + fe.EXPECT().FindFilterBySubscriptionAndEventType(gomock.Any(), gomock.Any(), gomock.Any()).Times(1). + Return(&datastore.EventTypeFilter{Body: map[string]interface{}{ + "person.age": map[string]interface{}{ + "$in": []int{10, 1}, + }, + }}, nil) + + fe.EXPECT().FindFilterBySubscriptionAndEventType(gomock.Any(), gomock.Any(), gomock.Any()).Times(1). + Return(&datastore.EventTypeFilter{Body: map[string]interface{}{ + "person.age": map[string]interface{}{ + "$in": []int{10, 1}, + }, + }}, nil) + + fe.EXPECT().FindFilterBySubscriptionAndEventType(gomock.Any(), gomock.Any(), gomock.Any()).Times(1). + Return(&datastore.EventTypeFilter{Body: map[string]interface{}{ + "person.age": map[string]interface{}{ + "$in": []int{10, 1}, + }, + }}, nil) licenser, _ := args.licenser.(*mocks.MockLicenser) licenser.EXPECT().AdvancedSubscriptions().Times(1).Return(true) @@ -681,7 +768,7 @@ func TestMatchSubscriptionsUsingFilter(t *testing.T) { Filter: datastore.FilterSchema{ Body: map[string]interface{}{ "person.age": map[string]interface{}{ - "$in": []int{10, 1}, + "$gt": 10, }, }, }, @@ -727,8 +814,23 @@ func TestMatchSubscriptionsUsingFilter(t *testing.T) { }, dbFn: func(args *testArgs) { s, _ := args.subRepo.(*mocks.MockSubscriptionRepository) - s.EXPECT().CompareFlattenedPayload(gomock.Any(), gomock.Any(), gomock.Any(), false).Times(2).Return(false, nil) - s.EXPECT().CompareFlattenedPayload(gomock.Any(), gomock.Any(), gomock.Any(), false).Times(2).Return(true, nil) + s.EXPECT().CompareFlattenedPayload(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2).Return(false, nil) + s.EXPECT().CompareFlattenedPayload(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Times(2).Return(true, nil) + + fe, _ := args.filterRepo.(*mocks.MockFilterRepository) + fe.EXPECT().FindFilterBySubscriptionAndEventType(gomock.Any(), gomock.Any(), gomock.Any()).Times(1). + Return(&datastore.EventTypeFilter{Body: map[string]interface{}{ + "event.action": map[string]interface{}{ + "$nin": []string{"update", "delete"}, + }, + }}, nil) + + fe.EXPECT().FindFilterBySubscriptionAndEventType(gomock.Any(), gomock.Any(), gomock.Any()).Times(1). + Return(&datastore.EventTypeFilter{Body: map[string]interface{}{ + "event.action": map[string]interface{}{ + "$nin": []string{"read", "delete"}, + }, + }}, nil) licenser, _ := args.licenser.(*mocks.MockLicenser) licenser.EXPECT().AdvancedSubscriptions().Times(1).Return(true) @@ -779,7 +881,7 @@ func TestMatchSubscriptionsUsingFilter(t *testing.T) { payload, err := json.Marshal(tt.payload) require.NoError(t, err) - subs, err := matchSubscriptionsUsingFilter(context.Background(), &datastore.Event{Data: payload}, args.subRepo, args.licenser, tt.inputSubs, false) + subs, err := matchSubscriptionsUsingFilter(context.Background(), &datastore.Event{Data: payload}, args.subRepo, args.filterRepo, args.licenser, tt.inputSubs, false) if tt.wantErr { require.NotNil(t, err) return