Skip to content

Commit

Permalink
feat: implement rules for Notification API
Browse files Browse the repository at this point in the history
  • Loading branch information
chrisgacsal committed Aug 13, 2024
1 parent ec150bb commit 42d664f
Show file tree
Hide file tree
Showing 7 changed files with 1,084 additions and 7 deletions.
9 changes: 9 additions & 0 deletions internal/notification/httpdriver/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

type Handler interface {
ChannelHandler
RuleHandler
}

type ChannelHandler interface {
Expand All @@ -23,6 +24,14 @@ type ChannelHandler interface {
UpdateChannel() UpdateChannelHandler
}

type RuleHandler interface {
ListRules() ListRulesHandler
CreateRule() CreateRuleHandler
DeleteRule() DeleteRuleHandler
GetRule() GetRuleHandler
UpdateRule() UpdateRuleHandler
}

var _ Handler = (*handler)(nil)

type handler struct {
Expand Down
272 changes: 272 additions & 0 deletions internal/notification/httpdriver/rule.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,272 @@
package httpdriver

import (
"context"
"fmt"
"net/http"

"github.com/openmeterio/openmeter/api"
"github.com/openmeterio/openmeter/internal/notification"
"github.com/openmeterio/openmeter/pkg/defaultx"
"github.com/openmeterio/openmeter/pkg/framework/commonhttp"
"github.com/openmeterio/openmeter/pkg/framework/transport/httptransport"
"github.com/openmeterio/openmeter/pkg/models"
"github.com/openmeterio/openmeter/pkg/pagination"
"github.com/openmeterio/openmeter/pkg/sortx"
)

type (
ListRulesRequest = notification.ListRulesInput
ListRulesResponse = api.NotificationRulesResponse
ListRulesParams = api.ListNotificationRulesParams
ListRulesHandler httptransport.HandlerWithArgs[ListRulesRequest, ListRulesResponse, ListRulesParams]
)

func (h *handler) ListRules() ListRulesHandler {
return httptransport.NewHandlerWithArgs(
func(ctx context.Context, r *http.Request, params ListRulesParams) (ListRulesRequest, error) {
ns, err := h.resolveNamespace(ctx)
if err != nil {
return ListRulesRequest{}, fmt.Errorf("failed to resolve namespace: %w", err)
}

req := ListRulesRequest{
Namespaces: []string{ns},
IncludeDisabled: defaultx.WithDefault(params.IncludeDisabled, notification.DefaultDisabled),
OrderBy: defaultx.WithDefault(params.OrderBy, api.ListNotificationRulesParamsOrderById),
Order: sortx.Order(defaultx.WithDefault(params.Order, api.SortOrderASC)),
Page: pagination.Page{
PageSize: defaultx.WithDefault(params.PageSize, notification.DefaultPageSize),
PageNumber: defaultx.WithDefault(params.Page, notification.DefaultPageNumber),
},
}

return req, nil
},
func(ctx context.Context, request ListRulesRequest) (ListRulesResponse, error) {
resp, err := h.service.ListRules(ctx, request)
if err != nil {
return ListRulesResponse{}, fmt.Errorf("failed to list rules: %w", err)
}

items := make([]api.NotificationRule, 0, len(resp.Items))

for _, rule := range resp.Items {
var item CreateRuleResponse

item, err = rule.AsNotificationRule()
if err != nil {
return ListRulesResponse{}, fmt.Errorf("failed to cast rule to notification rule: %w", err)
}

items = append(items, item)
}

return ListRulesResponse{
Items: items,
Page: resp.Page.PageNumber,
PageSize: resp.Page.PageSize,
TotalCount: resp.TotalCount,
}, nil
},
commonhttp.JSONResponseEncoderWithStatus[ListRulesResponse](http.StatusOK),
httptransport.AppendOptions(
h.options,
httptransport.WithOperationName("listNotificationRules"),
httptransport.WithErrorEncoder(errorEncoder()),
)...,
)
}

type (
CreateRuleRequest = notification.CreateRuleInput
CreateRuleResponse = api.NotificationRule
CreateRuleHandler httptransport.Handler[CreateRuleRequest, CreateRuleResponse]
)

func (h *handler) CreateRule() CreateRuleHandler {
return httptransport.NewHandler(
func(ctx context.Context, r *http.Request) (CreateRuleRequest, error) {
body := api.NotificationRuleCreateRequest{}
if err := commonhttp.JSONRequestBodyDecoder(r, &body); err != nil {
return CreateRuleRequest{}, fmt.Errorf("field to decode create rule request: %w", err)
}

ns, err := h.resolveNamespace(ctx)
if err != nil {
return CreateRuleRequest{}, fmt.Errorf("failed to resolve namespace: %w", err)
}

value, err := body.ValueByDiscriminator()
if err != nil {
return CreateRuleRequest{}, notification.ValidationError{
Err: err,
}
}

req := CreateRuleRequest{
NamespacedModel: models.NamespacedModel{
Namespace: ns,
},
}

switch v := value.(type) {
case api.NotificationRuleBalanceThresholdCreateRequest:
req = req.FromNotificationRuleBalanceThresholdCreateRequest(v)
default:
return CreateRuleRequest{}, notification.ValidationError{
Err: fmt.Errorf("invalid channel type: %T", v),
}
}

return req, nil
},
func(ctx context.Context, request CreateRuleRequest) (CreateRuleResponse, error) {
rule, err := h.service.CreateRule(ctx, request)
if err != nil {
return CreateRuleResponse{}, fmt.Errorf("failed to create rule: %w", err)
}

return rule.AsNotificationRule()
},
commonhttp.JSONResponseEncoderWithStatus[CreateRuleResponse](http.StatusCreated),
httptransport.AppendOptions(
h.options,
httptransport.WithOperationName("createNotificationRule"),
httptransport.WithErrorEncoder(errorEncoder()),
)...,
)
}

type (
UpdateRuleRequest = notification.UpdateRuleInput
UpdateRuleResponse = api.NotificationRule
UpdateRuleHandler httptransport.HandlerWithArgs[UpdateRuleRequest, UpdateRuleResponse, api.RuleId]
)

func (h *handler) UpdateRule() UpdateRuleHandler {
return httptransport.NewHandlerWithArgs(
func(ctx context.Context, r *http.Request, ruleID api.RuleId) (UpdateRuleRequest, error) {
body := api.NotificationRuleCreateRequest{}
if err := commonhttp.JSONRequestBodyDecoder(r, &body); err != nil {
return UpdateRuleRequest{}, fmt.Errorf("field to decode update rule request: %w", err)
}

ns, err := h.resolveNamespace(ctx)
if err != nil {
return UpdateRuleRequest{}, fmt.Errorf("failed to resolve namespace: %w", err)
}

value, err := body.ValueByDiscriminator()
if err != nil {
return UpdateRuleRequest{}, notification.ValidationError{
Err: err,
}
}

req := UpdateRuleRequest{
NamespacedModel: models.NamespacedModel{
Namespace: ns,
},
ID: ruleID,
}

switch v := value.(type) {
case api.NotificationRuleBalanceThresholdCreateRequest:
req = req.FromNotificationRuleBalanceThresholdCreateRequest(v)
default:
return UpdateRuleRequest{}, notification.ValidationError{
Err: fmt.Errorf("invalid rule type: %T", v),
}
}

return req, nil
},
func(ctx context.Context, request UpdateRuleRequest) (UpdateRuleResponse, error) {
rule, err := h.service.UpdateRule(ctx, request)
if err != nil {
return UpdateRuleResponse{}, fmt.Errorf("failed to update rule: %w", err)
}

return rule.AsNotificationRule()
},
commonhttp.JSONResponseEncoderWithStatus[UpdateRuleResponse](http.StatusOK),
httptransport.AppendOptions(
h.options,
httptransport.WithOperationName("updateNotificationRule"),
httptransport.WithErrorEncoder(errorEncoder()),
)...,
)
}

type (
DeleteRuleRequest = notification.DeleteRuleInput
DeleteRuleResponse = interface{}
DeleteRuleHandler httptransport.HandlerWithArgs[DeleteRuleRequest, DeleteRuleResponse, api.RuleId]
)

func (h *handler) DeleteRule() DeleteRuleHandler {
return httptransport.NewHandlerWithArgs(
func(ctx context.Context, r *http.Request, ruleID api.RuleId) (DeleteRuleRequest, error) {
ns, err := h.resolveNamespace(ctx)
if err != nil {
return DeleteRuleRequest{}, fmt.Errorf("failed to resolve namespace: %w", err)
}

return DeleteRuleRequest{
Namespace: ns,
ID: ruleID,
}, nil
},
func(ctx context.Context, request DeleteRuleRequest) (DeleteRuleResponse, error) {
err := h.service.DeleteRule(ctx, request)
if err != nil {
return nil, fmt.Errorf("failed to delete rule: %w", err)
}

return nil, nil
},
commonhttp.EmptyResponseEncoder[DeleteChannelResponse](http.StatusNoContent),
httptransport.AppendOptions(
h.options,
httptransport.WithOperationName("deleteNotificationRule"),
httptransport.WithErrorEncoder(errorEncoder()),
)...,
)
}

type (
GetRuleRequest = notification.GetRuleInput
GetRuleResponse = api.NotificationRule
GetRuleHandler httptransport.HandlerWithArgs[GetRuleRequest, GetRuleResponse, api.RuleId]
)

func (h *handler) GetRule() GetRuleHandler {
return httptransport.NewHandlerWithArgs(
func(ctx context.Context, r *http.Request, ruleID api.RuleId) (GetRuleRequest, error) {
ns, err := h.resolveNamespace(ctx)
if err != nil {
return GetRuleRequest{}, fmt.Errorf("failed to resolve namespace: %w", err)
}

return GetRuleRequest{
Namespace: ns,
ID: ruleID,
}, nil
},
func(ctx context.Context, request GetRuleRequest) (GetRuleResponse, error) {
rule, err := h.service.GetRule(ctx, request)
if err != nil {
return GetRuleResponse{}, fmt.Errorf("failed to get rule: %w", err)
}

return rule.AsNotificationRule()
},
commonhttp.JSONResponseEncoderWithStatus[GetRuleResponse](http.StatusOK),
httptransport.AppendOptions(
h.options,
httptransport.WithOperationName("getNotificationRule"),
httptransport.WithErrorEncoder(errorEncoder()),
)...,
)
}
9 changes: 9 additions & 0 deletions internal/notification/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

type Repository interface {
ChannelRepository
RuleRepository
}

type ChannelRepository interface {
Expand All @@ -17,3 +18,11 @@ type ChannelRepository interface {
GetChannel(ctx context.Context, params GetChannelInput) (*Channel, error)
UpdateChannel(ctx context.Context, params UpdateChannelInput) (*Channel, error)
}

type RuleRepository interface {
ListRules(ctx context.Context, params ListRulesInput) (pagination.PagedResponse[Rule], error)
CreateRule(ctx context.Context, params CreateRuleInput) (*Rule, error)
DeleteRule(ctx context.Context, params DeleteRuleInput) error
GetRule(ctx context.Context, params GetRuleInput) (*Rule, error)
UpdateRule(ctx context.Context, params UpdateRuleInput) (*Rule, error)
}
38 changes: 38 additions & 0 deletions internal/notification/repository/entitymapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,41 @@ func ChannelFromDBEntity(e db.NotificationChannel) *notification.Channel {
Config: e.Config,
}
}

func RuleFromDBEntity(e db.NotificationRule) *notification.Rule {
var channels []notification.Channel
if len(e.Edges.Channels) > 0 {
for _, channel := range e.Edges.Channels {
if channel == nil {
continue
}

channels = append(channels, *ChannelFromDBEntity(*channel))
}
}

return &notification.Rule{
NamespacedModel: models.NamespacedModel{
Namespace: e.Namespace,
},
ManagedModel: models.ManagedModel{
CreatedAt: e.CreatedAt.UTC(),
UpdatedAt: e.UpdatedAt.UTC(),
DeletedAt: func() *time.Time {
if e.DeletedAt == nil {
return nil
}

deletedAt := e.DeletedAt.UTC()

return &deletedAt
}(),
},
ID: e.ID,
Type: e.Type,
Name: e.Name,
Disabled: e.Disabled,
Config: e.Config,
Channels: channels,
}
}
Loading

0 comments on commit 42d664f

Please sign in to comment.