Skip to content

Commit

Permalink
feat: write entity managers
Browse files Browse the repository at this point in the history
  • Loading branch information
GAlexIHU committed Oct 29, 2024
1 parent 4c264fe commit 7c7067f
Show file tree
Hide file tree
Showing 17 changed files with 355 additions and 42 deletions.
3 changes: 1 addition & 2 deletions openmeter/subscription/adapter/mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/openmeterio/openmeter/openmeter/entitlement"
"github.com/openmeterio/openmeter/openmeter/subscription"
"github.com/openmeterio/openmeter/openmeter/subscription/applieddiscount"
"github.com/openmeterio/openmeter/openmeter/subscription/price"
"github.com/openmeterio/openmeter/pkg/convert"
"github.com/openmeterio/openmeter/pkg/datex"
"github.com/openmeterio/openmeter/pkg/models"
Expand Down Expand Up @@ -127,7 +126,7 @@ func MapDBSubscriptionPatch(patch *db.SubscriptionPatch) (subscription.Subscript
return subscription.SubscriptionPatch{}, fmt.Errorf("both price key and value must be defined if price is defined")
}

p.CreateInput.CreatePriceInput = &price.Spec{
p.CreateInput.CreatePriceInput = &subscription.CreatePriceInput{
Value: *val.CreatePriceValue,
PhaseKey: val.PhaseKey,
ItemKey: val.ItemKey,
Expand Down
19 changes: 19 additions & 0 deletions openmeter/subscription/adapter/subscriptionrepo.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"slices"
"time"

"github.com/samber/lo"

Expand All @@ -28,6 +29,24 @@ func NewSubscriptionRepo(db *db.Client) *subscriptionRepo {
}
}

func (r *subscriptionRepo) EndCadence(ctx context.Context, id string, at time.Time) (*subscription.Subscription, error) {
return entutils.TransactingRepo(ctx, r, func(ctx context.Context, repo *subscriptionRepo) (*subscription.Subscription, error) {
ent, err := repo.db.Subscription.UpdateOneID(id).SetActiveTo(at).Save(ctx)
if db.IsNotFound(err) {
return nil, &subscription.NotFoundError{
ID: id,
}
}
if err != nil {
return nil, err
}

sub, err := MapDBSubscription(ent)

return lo.ToPtr(sub), err
})
}

func (r *subscriptionRepo) GetCustomerSubscription(ctx context.Context, customerID models.NamespacedID) (subscription.Subscription, error) {
return entutils.TransactingRepo(
ctx,
Expand Down
3 changes: 1 addition & 2 deletions openmeter/subscription/adapter/subscriptionrepo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/openmeterio/openmeter/openmeter/entitlement"
"github.com/openmeterio/openmeter/openmeter/subscription"
"github.com/openmeterio/openmeter/openmeter/subscription/price"
subscriptiontestutils "github.com/openmeterio/openmeter/openmeter/subscription/testutils"
"github.com/openmeterio/openmeter/openmeter/testutils"
"github.com/openmeterio/openmeter/pkg/clock"
Expand Down Expand Up @@ -120,7 +119,7 @@ func TestPatchParsing(t *testing.T) {
PhaseKey: "test",
ItemKey: "test",
FeatureKey: lo.ToPtr("feature-1"),
CreatePriceInput: &price.Spec{
CreatePriceInput: &subscription.CreatePriceInput{
PhaseKey: "test",
ItemKey: "test",
Value: "100.0",
Expand Down
23 changes: 15 additions & 8 deletions openmeter/subscription/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ type Connector interface {
}
type commandAndQuery struct {
repo Repository
// managers
entitlementManager EntitlementManager
priceManager PriceManager
subscriptionManager SubscriptionManager
// connectors
priceConnector price.Connector
customerService customer.Service
Expand All @@ -49,6 +53,10 @@ type commandAndQuery struct {

func NewConnector(
repo Repository,
// managers
entitlementManager EntitlementManager,
priceManager PriceManager,
subscriptionManager SubscriptionManager,
// connectors
priceConnector price.Connector,
customerService customer.Service,
Expand Down Expand Up @@ -119,6 +127,7 @@ func (c *commandAndQuery) Create(ctx context.Context, req NewSubscriptionRequest

// Get the default spec based on the Plan
spec, err := SpecFromPlan(plan, CreateSubscriptionCustomerInput{
Namespace: req.Namespace,
Currency: req.Currency,
CustomerId: req.CustomerID,
ActiveFrom: req.ActiveFrom,
Expand Down Expand Up @@ -238,14 +247,11 @@ func (c *commandAndQuery) createPhase(ctx context.Context, sub Subscription, cus
// Create Price
if item.CreatePriceInput != nil {
// Correct linking of price input to item & phase happens during spec validation
_, err := c.priceConnector.Create(ctx, price.CreateInput{
SubscriptionId: models.NamespacedID{
Namespace: sub.Namespace,
ID: sub.ID,
},
Spec: *item.CreatePriceInput,
CadencedModel: cadence,
})
_, err := c.priceConnector.Create(ctx, item.CreatePriceInput.ToCreatePriceSpec(
sub.Namespace,
sub.ID,
cadence,
).CreateInput)
if err != nil {
return fmt.Errorf("failed to create price for item %s: %w", item.ItemKey, err)
}
Expand Down Expand Up @@ -293,6 +299,7 @@ func (q *commandAndQuery) getSpec(ctx context.Context, sub Subscription) (*Subsc

// Get the default spec based on the Plan
spec, err := SpecFromPlan(plan, CreateSubscriptionCustomerInput{
Namespace: sub.Namespace,
Currency: sub.Currency,
CustomerId: sub.CustomerId,
ActiveFrom: sub.ActiveFrom,
Expand Down
3 changes: 1 addition & 2 deletions openmeter/subscription/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
customerentity "github.com/openmeterio/openmeter/openmeter/customer/entity"
"github.com/openmeterio/openmeter/openmeter/subscription"
subscriptionentitlement "github.com/openmeterio/openmeter/openmeter/subscription/entitlement"
"github.com/openmeterio/openmeter/openmeter/subscription/price"
subscriptiontestutils "github.com/openmeterio/openmeter/openmeter/subscription/testutils"
"github.com/openmeterio/openmeter/openmeter/testutils"
"github.com/openmeterio/openmeter/pkg/clock"
Expand Down Expand Up @@ -343,7 +342,7 @@ func TestCreation(t *testing.T) {
CreateSubscriptionItemPlanInput: subscription.CreateSubscriptionItemPlanInput{
PhaseKey: "test-phase-2",
ItemKey: "added-ratecard",
CreatePriceInput: &price.Spec{
CreatePriceInput: &subscription.CreatePriceInput{
PhaseKey: "test-phase-2",
ItemKey: "added-ratecard",
Value: "500",
Expand Down
2 changes: 2 additions & 0 deletions openmeter/subscription/entitlement.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,6 @@ type EntitlementAdapter interface {
// if t1 < t2 < t3, and some entitlement was deleted effective at t2, then
// with at = t1 the entitlement will be returned, while with at = t3 it won't.
GetForSubscription(ctx context.Context, subscriptionID models.NamespacedID, at time.Time) ([]SubscriptionEntitlement, error)

Delete(ctx context.Context, namespace string, ref SubscriptionItemRef) error
}
4 changes: 4 additions & 0 deletions openmeter/subscription/entitlement/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,3 +159,7 @@ func (a *EntitlementSubscriptionAdapter) GetForSubscription(ctx context.Context,

return subEnts, nil
}

func (a *EntitlementSubscriptionAdapter) Delete(ctx context.Context, namespace string, ref subscription.SubscriptionItemRef) error {
panic("not implemented")
}
148 changes: 144 additions & 4 deletions openmeter/subscription/entitymanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,159 @@ package subscription

import (
"context"
"fmt"

"github.com/samber/lo"

"github.com/openmeterio/openmeter/openmeter/subscription/price"
)

// EntityManager is responsible for keeping a certain entity's state in sync with a desired state.
type EntityManager[TView, TSpec, TEntity any] interface {
// SyncState transforms a given entity from the current view to the desired spec.
SyncState(ctx context.Context, current TView, target TSpec) (TEntity, error)
SyncState(ctx context.Context, current *TView, target *TSpec) (*TEntity, error)
}

// TODO: specify types
type (
EntitlementManager = EntityManager[any, any, SubscriptionEntitlement]
PriceManager = EntityManager[any, any, price.Price]
SubscriptionManager = EntityManager[any, any, Subscription]
EntitlementManager = EntityManager[SubscriptionEntitlement, SubscriptionEntitlementSpec, SubscriptionEntitlement]
PriceManager = EntityManager[price.Price, CreatePriceSpec, price.Price]
// SubscriptionManager DOES NOT manage phases and items, only the subscription resource itself.
// To manage phases and items, use connector.Sync!
SubscriptionManager = EntityManager[subscriptionView, SubscriptionSpec, Subscription]
)

// Some local utils to dedupe code
type cud[TView, TSpec, TEntity any] interface {
Create(ctx context.Context, target TSpec) (*TEntity, error)
Update(ctx context.Context, current TView, target TSpec) (*TEntity, error)
Delete(ctx context.Context, current TView) error
}

type syncer[TView, TSpec, TEntity any] struct {
fn func(ctx context.Context, current *TView, target *TSpec) (*TEntity, error)
}

func (s *syncer[TView, TSpec, TEntity]) SyncState(ctx context.Context, current *TView, target *TSpec) (*TEntity, error) {
return s.fn(ctx, current, target)
}

func managerFromCud[TView, TSpec, TEntity any](cudder cud[TView, TSpec, TEntity]) EntityManager[TView, TSpec, TEntity] {
return &syncer[TView, TSpec, TEntity]{
fn: func(ctx context.Context, current *TView, target *TSpec) (*TEntity, error) {
if lo.IsNil(current) && lo.IsNil(target) {
// This is a no-op, nothing was there previously and nothing is desired.
// Alternatively we could return an error here
return nil, nil
}

if lo.IsNil(current) && !lo.IsNil(target) {
return cudder.Create(ctx, *target)
}

if !lo.IsNil(current) && !lo.IsNil(target) {
return cudder.Update(ctx, *current, *target)
}

if !lo.IsNil(current) && lo.IsNil(target) {
err := cudder.Delete(ctx, *current)
return nil, err
}

return nil, fmt.Errorf("unexpected state: current=%v, target=%v, but neither can be nil", current, target)
},
}
}

type entitlementCud struct {
entitlementAdapter EntitlementAdapter
}

var _ cud[SubscriptionEntitlement, SubscriptionEntitlementSpec, SubscriptionEntitlement] = &entitlementCud{}

func NewEntitlementManager(entitlementAdapter EntitlementAdapter) EntitlementManager {
return managerFromCud(&entitlementCud{
entitlementAdapter: entitlementAdapter,
})
}

func (c *entitlementCud) Create(ctx context.Context, target SubscriptionEntitlementSpec) (*SubscriptionEntitlement, error) {
// FIXME: ID wont be present on this?
return c.entitlementAdapter.ScheduleEntitlement(ctx, target.ItemRef, target.EntitlementInputs)
}

func (c *entitlementCud) Update(ctx context.Context, current SubscriptionEntitlement, target SubscriptionEntitlementSpec) (*SubscriptionEntitlement, error) {
// How do we update entitlements?
// Entitlements don't have update methods, the naive approach is that we just delete the current one and create a new one.
// Are there any changes that have to be made?
// For now, let's naively assume no!
err := c.Delete(ctx, current)
if err != nil {
return nil, err
}
return c.Create(ctx, target)
}

func (c *entitlementCud) Delete(ctx context.Context, current SubscriptionEntitlement) error {
return c.entitlementAdapter.Delete(ctx, current.Entitlement.Namespace, current.ItemRef)
}

type priceCud struct {
connector price.Connector
}

var _ cud[price.Price, CreatePriceSpec, price.Price] = &priceCud{}

func NewPriceManager(connector price.Connector) PriceManager {
return managerFromCud(&priceCud{
connector: connector,
})
}

func (c *priceCud) Create(ctx context.Context, target CreatePriceSpec) (*price.Price, error) {
// FIXME: ID wont be present on this?
return c.connector.Create(ctx, target.CreateInput)
}

func (c *priceCud) Update(ctx context.Context, current price.Price, target CreatePriceSpec) (*price.Price, error) {
// FIXME: We cannot update prices if they were already invoiced
// In that case we'd have to unschedul the price and create a new one from the current timestamp
panic("not implemented")
}

func (c *priceCud) Delete(ctx context.Context, current price.Price) error {
// FIXME: We cannot delete prices if they were already invoiced
panic("not implemented")
}

type subscriptionCud struct {
repo Repository
}

var _ cud[subscriptionView, SubscriptionSpec, Subscription] = &subscriptionCud{}

func NewSubscriptionManager(repo Repository) SubscriptionManager {
return managerFromCud(&subscriptionCud{repo: repo})
}

func (c *subscriptionCud) Create(ctx context.Context, target SubscriptionSpec) (*Subscription, error) {
sub, err := c.repo.CreateSubscription(ctx, target.Namespace, target.GetCreateInput())
if err != nil {
return nil, err
}
return &sub, nil
}

func (c *subscriptionCud) Update(ctx context.Context, current subscriptionView, target SubscriptionSpec) (*Subscription, error) {
// Properties we can update are:
// - TODO: PlanRef
// - activeTo
if target.ActiveTo != current.subscription.ActiveTo && target.ActiveTo != nil {
return c.repo.EndCadence(ctx, current.subscription.ID, *target.ActiveTo)
}
panic("not implemented")
}

func (c *subscriptionCud) Delete(ctx context.Context, current subscriptionView) error {
return fmt.Errorf("entitymanager cannot delete a subscription!")
}
3 changes: 1 addition & 2 deletions openmeter/subscription/patch_serialization_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (

"github.com/openmeterio/openmeter/openmeter/subscription"
"github.com/openmeterio/openmeter/openmeter/subscription/applieddiscount"
"github.com/openmeterio/openmeter/openmeter/subscription/price"
"github.com/openmeterio/openmeter/pkg/datex"
)

Expand Down Expand Up @@ -84,7 +83,7 @@ func TestShouldSerializeAndDeserialize(t *testing.T) {
PhaseKey: "asd",
ItemKey: "asd2",
FeatureKey: lo.ToPtr("feature-1"),
CreatePriceInput: &price.Spec{
CreatePriceInput: &subscription.CreatePriceInput{
PhaseKey: "asd",
ItemKey: "asd2",
Value: "1.0",
Expand Down
5 changes: 5 additions & 0 deletions openmeter/subscription/price/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

type Connector interface {
Create(ctx context.Context, input CreateInput) (*Price, error)
Delete(ctx context.Context, id models.NamespacedID) error
GetForSubscription(ctx context.Context, subscriptionID models.NamespacedID) ([]Price, error)
}

Expand All @@ -27,3 +28,7 @@ func (c *connector) GetForSubscription(ctx context.Context, subscriptionID model
// Should we return deleted prices?
return c.repo.GetForSubscription(ctx, subscriptionID, GetPriceFilters{IncludeDeleted: false})
}

func (c *connector) Delete(ctx context.Context, id models.NamespacedID) error {
return c.repo.Delete(ctx, id)
}
17 changes: 17 additions & 0 deletions openmeter/subscription/price/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Repository interface {

GetForSubscription(ctx context.Context, subscriptionId models.NamespacedID, filters GetPriceFilters) ([]Price, error)
Create(ctx context.Context, input CreateInput) (*Price, error)
Delete(ctx context.Context, id models.NamespacedID) error
}

type repository struct {
Expand All @@ -38,6 +39,22 @@ func NewRepository(db *db.Client) *repository {
return &repository{db: db}
}

func (r *repository) Delete(ctx context.Context, id models.NamespacedID) error {
_, err := entutils.TransactingRepo(ctx, r, func(ctx context.Context, repo *repository) (interface{}, error) {
_, err := repo.db.Price.UpdateOneID(id.ID).
SetDeletedAt(clock.Now()).
Save(ctx)
if db.IsNotFound(err) {
return nil, NotFoundError{ID: id.ID}
}
if err != nil {
return nil, fmt.Errorf("failed to delete price: %w", err)
}
return nil, nil
})
return err
}

func (r *repository) GetForSubscription(ctx context.Context, subscriptionId models.NamespacedID, filters GetPriceFilters) ([]Price, error) {
return entutils.TransactingRepo(
ctx,
Expand Down
2 changes: 2 additions & 0 deletions openmeter/subscription/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
)

type Repository interface {
models.CadencedResourceRepo[Subscription]

// Returns the current customer subscription
GetCustomerSubscription(ctx context.Context, customerID models.NamespacedID) (Subscription, error)

Expand Down
Loading

0 comments on commit 7c7067f

Please sign in to comment.