From 6ffe8935df395d065548cf66c6589cc40b36de59 Mon Sep 17 00:00:00 2001 From: Peter Turi Date: Tue, 10 Dec 2024 11:31:49 +0100 Subject: [PATCH] feat: billing subscription handler --- openmeter/billing/README.md | 9 + .../subscriptionhandler/scanario_test.go | 133 ------- openmeter/billing/worker/handler.go | 98 ++++++ openmeter/billing/worker/phaseiterator.go | 196 +++++++++++ .../billing/worker/phaseiterator_test.go | 266 ++++++++++++++ openmeter/billing/worker/scanario_test.go | 327 ++++++++++++++++++ test/billing/suite.go | 1 + 7 files changed, 897 insertions(+), 133 deletions(-) delete mode 100644 openmeter/billing/subscriptionhandler/scanario_test.go create mode 100644 openmeter/billing/worker/handler.go create mode 100644 openmeter/billing/worker/phaseiterator.go create mode 100644 openmeter/billing/worker/phaseiterator_test.go create mode 100644 openmeter/billing/worker/scanario_test.go diff --git a/openmeter/billing/README.md b/openmeter/billing/README.md index 0f0f2aa74..64b3e39eb 100644 --- a/openmeter/billing/README.md +++ b/openmeter/billing/README.md @@ -149,3 +149,12 @@ The entity's `ChildrenWithIDReuse` call can be used to facilitate the line reuse Then the adapter layer will use those IDs to make decisions if they want to persist or recreate the records. We could do the same logic in the adapter layer, but this approach makes it more flexible on the calculation layer if we want to generate new lines or not. If this becomes a burden we can do the same matching logic as part of the upsert logic in adapter. + +## Subscription adapter + +The subscription adapter is responsible for feeding the billing with line items during the subscription's lifecycle. The generation of items is event-driven, new items are yielded when: +- A subscription is created +- A new invoice is created +- A subscription is modified + +TODO: for new versions the previous line will be split and will not get the stuff from the other side -> this means that the unique id should encode the version of the item diff --git a/openmeter/billing/subscriptionhandler/scanario_test.go b/openmeter/billing/subscriptionhandler/scanario_test.go deleted file mode 100644 index 0e83ef50a..000000000 --- a/openmeter/billing/subscriptionhandler/scanario_test.go +++ /dev/null @@ -1,133 +0,0 @@ -package subscriptionhandler - -import ( - "log/slog" - "testing" - "time" - - "github.com/openmeterio/openmeter/openmeter/credit" - grantrepo "github.com/openmeterio/openmeter/openmeter/credit/adapter" - enttx "github.com/openmeterio/openmeter/openmeter/ent/tx" - "github.com/openmeterio/openmeter/openmeter/entitlement" - entitlementrepo "github.com/openmeterio/openmeter/openmeter/entitlement/adapter" - booleanentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/boolean" - meteredentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/metered" - staticentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/static" - "github.com/openmeterio/openmeter/openmeter/productcatalog/plan" - planadapter "github.com/openmeterio/openmeter/openmeter/productcatalog/plan/adapter" - planservice "github.com/openmeterio/openmeter/openmeter/productcatalog/plan/service" - "github.com/openmeterio/openmeter/openmeter/subscription" - subscriptionentitlementadatapter "github.com/openmeterio/openmeter/openmeter/subscription/adapters/entitlement" - subscriptionrepo "github.com/openmeterio/openmeter/openmeter/subscription/repo" - subscriptionservice "github.com/openmeterio/openmeter/openmeter/subscription/service" - "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" - billingtest "github.com/openmeterio/openmeter/test/billing" - "github.com/stretchr/testify/suite" -) - -type SubscriptionHandlerTestSuite struct { - billingtest.BaseSuite - - PlanService plan.Service - SubscriptionService subscription.Service -} - -func (s *SubscriptionHandlerTestSuite) SetupSuite() { - s.BaseSuite.SetupSuite() - - planAdapter, err := planadapter.New(planadapter.Config{ - Client: s.DBClient, - Logger: slog.Default(), - }) - - planService, err := planservice.New(planservice.Config{ - Feature: s.FeatureService, - Adapter: planAdapter, - Logger: slog.Default(), - }) - s.NoError(err) - - s.PlanService = planService - - subsRepo := subscriptionrepo.NewSubscriptionRepo(s.DBClient) - subsItemRepo := subscriptionrepo.NewSubscriptionItemRepo(s.DBClient) - - s.SubscriptionService = subscriptionservice.New(subscriptionservice.ServiceConfig{ - SubscriptionRepo: subsRepo, - SubscriptionPhaseRepo: subscriptionrepo.NewSubscriptionPhaseRepo(s.DBClient), - SubscriptionItemRepo: subsItemRepo, - // connectors - CustomerService: s.CustomerService, - // adapters - EntitlementAdapter: subscriptionentitlementadatapter.NewEntitlementSubscriptionAdapter( - s.SetupEntitlements(), - subsItemRepo, - subsRepo, - ), - // framework - TransactionManager: subsRepo, - }) -} - -func (s *SubscriptionHandlerTestSuite) SetupEntitlements() entitlement.Connector { - // Init grants/credit - grantRepo := grantrepo.NewPostgresGrantRepo(s.DBClient) - balanceSnapshotRepo := grantrepo.NewPostgresBalanceSnapshotRepo(s.DBClient) - - // Init entitlements - entitlementRepo := entitlementrepo.NewPostgresEntitlementRepo(s.DBClient) - usageResetRepo := entitlementrepo.NewPostgresUsageResetRepo(s.DBClient) - - mockPublisher := eventbus.NewMock(s.T()) - - owner := meteredentitlement.NewEntitlementGrantOwnerAdapter( - s.FeatureRepo, - entitlementRepo, - usageResetRepo, - s.MeterRepo, - slog.Default(), - ) - - transactionManager := enttx.NewCreator(s.DBClient) - - creditConnector := credit.NewCreditConnector( - grantRepo, - balanceSnapshotRepo, - owner, - s.MockStreamingConnector, - slog.Default(), - time.Minute, - mockPublisher, - transactionManager, - ) - - meteredEntitlementConnector := meteredentitlement.NewMeteredEntitlementConnector( - s.MockStreamingConnector, - owner, - creditConnector, - creditConnector, - grantRepo, - entitlementRepo, - mockPublisher, - ) - - staticEntitlementConnector := staticentitlement.NewStaticEntitlementConnector() - booleanEntitlementConnector := booleanentitlement.NewBooleanEntitlementConnector() - - return entitlement.NewEntitlementConnector( - entitlementRepo, - s.FeatureService, - s.MeterRepo, - meteredEntitlementConnector, - staticEntitlementConnector, - booleanEntitlementConnector, - mockPublisher, - ) -} - -func TestSubscriptionHandlerScenarios(t *testing.T) { - suite.Run(t, new(SubscriptionHandlerTestSuite)) -} - -func (t *SubscriptionHandlerTestSuite) TestSubscriptionHappyPath() { -} diff --git a/openmeter/billing/worker/handler.go b/openmeter/billing/worker/handler.go new file mode 100644 index 000000000..fc6b691d3 --- /dev/null +++ b/openmeter/billing/worker/handler.go @@ -0,0 +1,98 @@ +package billingworker + +import ( + "context" + "fmt" + "slices" + "time" + + "github.com/openmeterio/openmeter/openmeter/billing" + "github.com/openmeterio/openmeter/openmeter/subscription" +) + +type GetUpcomingLineItemsInput struct { + SubscriptionView subscription.SubscriptionView + StartFrom *time.Time + + Customer billing.ProfileWithCustomerDetails +} + +func GetUpcomingLineItems(ctx context.Context, in GetUpcomingLineItemsInput) ([]billing.Line, error) { + // Given we are event-driven we should at least yield one line item. If that's assigned to + // a new invoice, this function can be triggered again. + + // TODO: there's a builtin for this + slices.SortFunc(in.SubscriptionView.Phases, func(i, j subscription.SubscriptionPhaseView) int { + return timeCMP(i.SubscriptionPhase.ActiveFrom, j.SubscriptionPhase.ActiveFrom) + }) + + // Let's identify the first phase that is invoicable + firstInvoicablePhaseIdx, found := findFirstInvoicablePhase(in.SubscriptionView.Phases, in.StartFrom) + if !found { + // There are no invoicable items in the subscription, so we can return an empty list + // If the subscription has changed we will just recalculate the line items with the updated + // contents. + return nil, nil + } + + // Let's find out the limit of the generation. As a rule of thumb, we should have at least one line per + // invoicable item. + + switch in.Customer.Profile.WorkflowConfig.Collection.Alignment { + case billing.AlignmentKindSubscription: + // In this case, the end of the generation end will be + default: + return nil, fmt.Errorf("unsupported alignment type: %s", in.Customer.Profile.WorkflowConfig.Collection.Alignment) + } + + for i := firstInvoicablePhaseIdx; i < len(in.SubscriptionView.Phases); i++ { + } + + return nil, nil +} + +func findFirstInvoicablePhase(phases []subscription.SubscriptionPhaseView, startFrom *time.Time) (int, bool) { + // A phase is invoicable, if it has any items that has price objects set, and if it's activeFrom is before or equal startFrom + // and the next phase's activeFrom is after startFrom. + + for i, phase := range phases { + isBillable := false + // TODO: maybe forEachRateCard or similar + for _, items := range phase.ItemsByKey { + for _, item := range items { + if item.Spec.RateCard.Price != nil { + isBillable = true + break + } + } + } + + if !isBillable { + continue + } + + if !phase.SubscriptionPhase.ActiveFrom.After(*startFrom) { + if i == len(phases)-1 { + return i, true + } + + if phases[i+1].SubscriptionPhase.ActiveFrom.After(*startFrom) { + return i, true + } + } + } + + return -1, false +} + +func timeCMP(a, b time.Time) int { + if a.Before(b) { + return -1 + } + + if a.After(b) { + return 1 + } + + return 0 +} diff --git a/openmeter/billing/worker/phaseiterator.go b/openmeter/billing/worker/phaseiterator.go new file mode 100644 index 000000000..8bb02fb6f --- /dev/null +++ b/openmeter/billing/worker/phaseiterator.go @@ -0,0 +1,196 @@ +package billingworker + +import ( + "fmt" + "slices" + "strings" + "time" + + "github.com/openmeterio/openmeter/openmeter/billing" + "github.com/openmeterio/openmeter/openmeter/subscription" +) + +type PhaseIterator struct { + subscriptionID string + phase subscription.SubscriptionPhaseView + iterationEnd time.Time + currentPhaseEnd *time.Time + + items []subscriptionItemView + periodIndexByKey map[string]int + + currentItem int +} + +type rateCardWithPeriod struct { + RateCard subscription.RateCard + Period billing.Period + UniqueID string +} + +type subscriptionItemView struct { + view []subscription.SubscriptionItemView + + lastGenerationTime time.Time + + // done is true if the item will not yield more periods + done bool +} + +// NewPhaseIterator creates a new PhaseIterator for the given phase and subscription +// +// It is guaranteed that all items that are starting before phaseEnd are returned. The call +// might return more items if needed, but it always honors the phase's end. +func NewPhaseIterator(phase subscription.SubscriptionPhaseView, subs subscription.SubscriptionView, end time.Time, currentPhaseEnd *time.Time) *PhaseIterator { + it := &PhaseIterator{ + phase: phase, + subscriptionID: subs.Subscription.ID, + iterationEnd: end, + // TODO: cleanup! + periodIndexByKey: make(map[string]int, len(phase.ItemsByKey)), + currentPhaseEnd: currentPhaseEnd, + } + + it.items = make([]subscriptionItemView, 0, len(phase.ItemsByKey)) + for _, items := range phase.ItemsByKey { + slices.SortFunc(items, func(i, j subscription.SubscriptionItemView) int { + return timeCMP(i.SubscriptionItem.ActiveFrom, j.SubscriptionItem.ActiveFrom) + }) + + it.items = append(it.items, subscriptionItemView{ + view: items, + }) + } + + return it +} + +func (it *PhaseIterator) GetMinimumPeriodEndAfter(t time.Time) time.Time { + panic("TODO") +} + +func (it *PhaseIterator) Generate() []rateCardWithPeriod { + out := []rateCardWithPeriod{} + iterationStartEpoch := it.phase.SubscriptionPhase.ActiveFrom + + for i := range it.items { + it.items[i].lastGenerationTime = iterationStartEpoch + } + + defer it.Reset() + + for i := range it.items { + itemsByKey := &it.items[i] + + slices.SortFunc(itemsByKey.view, func(i, j subscription.SubscriptionItemView) int { + return timeCMP(i.SubscriptionItem.ActiveFrom, j.SubscriptionItem.ActiveFrom) + }) + + for versionID, item := range itemsByKey.view { + start := item.SubscriptionItem.ActiveFrom + periodID := 0 + + stopAt := it.iterationEnd + if it.currentPhaseEnd != nil && it.currentPhaseEnd.Before(stopAt) { + stopAt = *it.currentPhaseEnd + } + + for { + end, _ := item.Spec.RateCard.BillingCadence.AddTo(start) + + if item.SubscriptionItem.ActiveTo != nil && item.SubscriptionItem.ActiveTo.Before(end) { + end = *item.SubscriptionItem.ActiveTo + } + + if it.currentPhaseEnd != nil && end.After(*it.currentPhaseEnd) { + end = *it.currentPhaseEnd + } + + generatedItem := rateCardWithPeriod{ + RateCard: item.Spec.RateCard, + Period: billing.Period{ + Start: start, + End: end, + }, + + // TODO: let's have a stable sorting on the items in case there are more than one in the subscriptionphaseview + // so that we are not changing the liens for each generation + UniqueID: strings.Join([]string{ + it.subscriptionID, + it.phase.SubscriptionPhase.Key, + item.Spec.ItemKey, + fmt.Sprintf("v[%d]", versionID), + fmt.Sprintf("period[%d]", periodID), + }, "/"), + } + + out = append(out, generatedItem) + + periodID++ + start = end + + // Either we have reached the end of the phase + if it.currentPhaseEnd != nil && !start.Before(*it.currentPhaseEnd) { + break + } + + // We have reached the end of the active range + if item.SubscriptionItem.ActiveTo != nil && !start.Before(*item.SubscriptionItem.ActiveTo) { + break + } + + // Or we have reached the iteration end + if !start.Before(it.iterationEnd) { + break + } + } + + } + + } + return out +} + +/* +// shouldYield generates an item with a period to compensate for any active from/active to overrides +// it returns true if the item should be yielded, false otherwise + + func (i *PhaseIterator) shouldYield(generatedItem rateCardWithPeriod, item *subscriptionItemView) (rateCardWithPeriod, bool) { + // Stage 1: Filtering + if !generatedItem.Period.End.After(item.SubscriptionItem.ActiveFrom) { + // This item is not really present in the phase, let's just skip it + return generatedItem, false + } + + if item.SubscriptionItem.ActiveTo != nil && !generatedItem.Period.Start.Before(*item.SubscriptionItem.ActiveTo) { + // This item is not active yet, let's skip it + return generatedItem, false + } + + // Let's compensate for any active from/active to overrides + if item.SubscriptionItem.ActiveFrom.After(generatedItem.Period.Start) { + generatedItem.Period.Start = item.SubscriptionItem.ActiveFrom + } + + if item.SubscriptionItem.ActiveTo != nil && item.SubscriptionItem.ActiveTo.Before(generatedItem.Period.End) { + generatedItem.Period.End = *item.SubscriptionItem.ActiveTo + } + + return generatedItem, true + } +*/ +func (i *PhaseIterator) Reset() { + // TODO +} + +/* +func (i *PhaseIterator) areAllItemsDone() bool { + for _, item := range i.flattenedItems { + if !item.done { + return false + } + } + + return true +} +*/ diff --git a/openmeter/billing/worker/phaseiterator_test.go b/openmeter/billing/worker/phaseiterator_test.go new file mode 100644 index 000000000..95ebade12 --- /dev/null +++ b/openmeter/billing/worker/phaseiterator_test.go @@ -0,0 +1,266 @@ +package billingworker + +import ( + "fmt" + "testing" + "time" + + "github.com/openmeterio/openmeter/openmeter/productcatalog" + "github.com/openmeterio/openmeter/openmeter/subscription" + "github.com/openmeterio/openmeter/pkg/datex" + "github.com/openmeterio/openmeter/pkg/models" + "github.com/samber/lo" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +const NotSet = "" + +type PhaseIteratorTestSuite struct { + *require.Assertions + suite.Suite +} + +func TestPhaseIterator(t *testing.T) { + suite.Run(t, new(PhaseIteratorTestSuite)) +} + +func (s *PhaseIteratorTestSuite) SetupSuite() { + s.Assertions = require.New(s.T()) +} + +func (s *PhaseIteratorTestSuite) newSIWWithBillingCadence(itemKey string, cadence string) subscription.SubscriptionItemView { + return subscription.SubscriptionItemView{ + Spec: subscription.SubscriptionItemSpec{ + CreateSubscriptionItemInput: subscription.CreateSubscriptionItemInput{ + CreateSubscriptionItemPlanInput: subscription.CreateSubscriptionItemPlanInput{ + ItemKey: itemKey, + RateCard: subscription.RateCard{ + Price: productcatalog.NewPriceFrom(productcatalog.FlatPrice{}), + BillingCadence: lo.ToPtr(datex.MustParse(s.T(), cadence)), + }, + }, + }, + }, + } +} + +func (s *PhaseIteratorTestSuite) newSIWWithBillingCadenceActiveFromTo(itemKey string, cadence string, activeFrom string, activeTo string) subscription.SubscriptionItemView { + out := s.newSIWWithBillingCadence(itemKey, cadence) + + // TODO: validate if subscription fills this properly + if activeFrom != "" { + out.SubscriptionItem.ActiveFrom = lo.Must(time.Parse(time.RFC3339, activeFrom)) + } + + if activeTo != "" { + out.SubscriptionItem.ActiveTo = lo.ToPtr(lo.Must(time.Parse(time.RFC3339, activeTo))) + } + + return out +} + +type expectedIterations struct { + Start time.Time + End time.Time + Key string +} + +func (s *PhaseIteratorTestSuite) mustParseTime(t string) time.Time { + return lo.Must(time.Parse(time.RFC3339, t)) +} + +func (s *PhaseIteratorTestSuite) TestPhaseIterator() { + tcs := []struct { + name string + items []subscription.SubscriptionItemView + end time.Time + expected []expectedIterations + phaseEnd *time.Time + }{ + { + name: "empty", + items: []subscription.SubscriptionItemView{}, + end: s.mustParseTime("2021-01-01T00:00:00Z"), + expected: []expectedIterations{}, + }, + { + name: "sanity", + items: []subscription.SubscriptionItemView{ + s.newSIWWithBillingCadence("item-key", "P1D"), + }, + end: s.mustParseTime("2021-01-03T00:00:00Z"), + expected: []expectedIterations{ + { + Start: s.mustParseTime("2021-01-01T00:00:00Z"), + End: s.mustParseTime("2021-01-02T00:00:00Z"), + Key: "subID/phase-test/item-key/v[0]/period[0]", + }, + { + Start: s.mustParseTime("2021-01-02T00:00:00Z"), + End: s.mustParseTime("2021-01-03T00:00:00Z"), + Key: "subID/phase-test/item-key/v[0]/period[1]", + }, + }, + }, + { + name: "sanity-phase-end", + items: []subscription.SubscriptionItemView{ + s.newSIWWithBillingCadence("item-key", "P1D"), + }, + end: s.mustParseTime("2021-01-03T00:00:00Z"), + expected: []expectedIterations{ + { + Start: s.mustParseTime("2021-01-01T00:00:00Z"), + End: s.mustParseTime("2021-01-02T00:00:00Z"), + Key: "subID/phase-test/item-key/v[0]/period[0]", + }, + { + Start: s.mustParseTime("2021-01-02T00:00:00Z"), + End: s.mustParseTime("2021-01-02T15:00:00Z"), + Key: "subID/phase-test/item-key/v[0]/period[1]", + }, + }, + phaseEnd: lo.ToPtr(s.mustParseTime("2021-01-02T15:00:00Z")), + }, + { + name: "different cadence", + items: []subscription.SubscriptionItemView{ + s.newSIWWithBillingCadence("item-key-1d", "P1D"), + s.newSIWWithBillingCadence("item-key-2d", "P2D"), + }, + end: s.mustParseTime("2021-01-04T00:00:00Z"), + expected: []expectedIterations{ + { + Start: s.mustParseTime("2021-01-01T00:00:00Z"), + End: s.mustParseTime("2021-01-02T00:00:00Z"), + Key: "subID/phase-test/item-key-1d/v[0]/period[0]", + }, + { + Start: s.mustParseTime("2021-01-02T00:00:00Z"), + End: s.mustParseTime("2021-01-03T00:00:00Z"), + Key: "subID/phase-test/item-key-1d/v[0]/period[1]", + }, + { + Start: s.mustParseTime("2021-01-03T00:00:00Z"), + End: s.mustParseTime("2021-01-04T00:00:00Z"), + Key: "subID/phase-test/item-key-1d/v[0]/period[2]", + }, + { + Start: s.mustParseTime("2021-01-01T00:00:00Z"), + End: s.mustParseTime("2021-01-03T00:00:00Z"), + Key: "subID/phase-test/item-key-2d/v[0]/period[0]", + }, + { + Start: s.mustParseTime("2021-01-03T00:00:00Z"), + End: s.mustParseTime("2021-01-05T00:00:00Z"), + Key: "subID/phase-test/item-key-2d/v[0]/period[1]", + }, + }, + }, + { + // Note: this happens on subscription updates, but the active to/from is always disjunct + name: "active-from-to-matching-period", + items: []subscription.SubscriptionItemView{ + s.newSIWWithBillingCadenceActiveFromTo("item-key", "P1D", NotSet, "2021-01-02T00:00:00Z"), + s.newSIWWithBillingCadenceActiveFromTo("item-key", "P1D", "2021-01-02T00:00:00Z", NotSet), + }, + end: s.mustParseTime("2021-01-03T00:00:00Z"), + expected: []expectedIterations{ + { + Start: s.mustParseTime("2021-01-01T00:00:00Z"), + End: s.mustParseTime("2021-01-02T00:00:00Z"), + Key: "subID/phase-test/item-key/v[0]/period[0]", + }, + { + Start: s.mustParseTime("2021-01-02T00:00:00Z"), + End: s.mustParseTime("2021-01-03T00:00:00Z"), + Key: "subID/phase-test/item-key/v[1]/period[0]", + }, + }, + }, + { + // Note: this happens on subscription updates, but the active to/from is always disjunct + name: "active-from-to-matching-period", + items: []subscription.SubscriptionItemView{ + s.newSIWWithBillingCadenceActiveFromTo("item-key", "P1D", "2021-01-02T20:00:00Z", NotSet), + s.newSIWWithBillingCadenceActiveFromTo("item-key", "P1D", NotSet, "2021-01-02T20:00:00Z"), + }, + end: s.mustParseTime("2021-01-03T00:00:00Z"), + expected: []expectedIterations{ + { + Start: s.mustParseTime("2021-01-01T00:00:00Z"), + End: s.mustParseTime("2021-01-02T00:00:00Z"), + Key: "subID/phase-test/item-key/v[0]/period[0]", + }, + { + Start: s.mustParseTime("2021-01-02T00:00:00Z"), + End: s.mustParseTime("2021-01-02T20:00:00Z"), + Key: "subID/phase-test/item-key/v[0]/period[1]", + }, + { + Start: s.mustParseTime("2021-01-02T20:00:00Z"), + End: s.mustParseTime("2021-01-03T20:00:00Z"), + Key: "subID/phase-test/item-key/v[1]/period[0]", + }, + }, + }, + // TODO: let's add flat fee tests + // - flat fee with cadence (recurring) + // - flat fee without cadence (one-time, in arrears) => only if we have phase end set + // - flat fee without cadence (one-time, in advance) => ok + } + + for _, tc := range tcs { + s.Run(tc.name, func() { + phase := subscription.SubscriptionPhaseView{ + SubscriptionPhase: subscription.SubscriptionPhase{ + ActiveFrom: lo.Must(time.Parse(time.RFC3339, "2021-01-01T00:00:00Z")), + Key: "phase-test", + }, + ItemsByKey: map[string][]subscription.SubscriptionItemView{}, + } + + for _, item := range tc.items { + if item.SubscriptionItem.ActiveFrom.IsZero() { + item.SubscriptionItem.ActiveFrom = phase.SubscriptionPhase.ActiveFrom + } + + phase.ItemsByKey[item.Spec.ItemKey] = append(phase.ItemsByKey[item.Spec.ItemKey], item) + } + + it := NewPhaseIterator( + phase, + subscription.SubscriptionView{ + Subscription: subscription.Subscription{ + NamespacedID: models.NamespacedID{ + ID: "subID", + }, + }, + }, + tc.end, + tc.phaseEnd, + ) + + out := it.Generate() + + outAsExpect := make([]expectedIterations, 0, len(out)) + for i, item := range out { + + fmt.Printf("out[%d]: [%s..%s] %s\n", i, item.Period.Start, item.Period.End, item.UniqueID) + + outAsExpect = append(outAsExpect, expectedIterations{ + Start: item.Period.Start, + End: item.Period.End, + Key: item.UniqueID, + }) + } + + for i, item := range tc.expected { + fmt.Printf("expected[%d]: [%s..%s] %s\n", i, item.Start, item.End, item.Key) + } + + s.ElementsMatch(tc.expected, outAsExpect) + }) + } +} diff --git a/openmeter/billing/worker/scanario_test.go b/openmeter/billing/worker/scanario_test.go new file mode 100644 index 000000000..087b8b194 --- /dev/null +++ b/openmeter/billing/worker/scanario_test.go @@ -0,0 +1,327 @@ +package billingworker + +import ( + "context" + "fmt" + "log/slog" + "testing" + "time" + + "github.com/alpacahq/alpacadecimal" + "github.com/invopop/gobl/currency" + "github.com/openmeterio/openmeter/openmeter/credit" + grantrepo "github.com/openmeterio/openmeter/openmeter/credit/adapter" + customerentity "github.com/openmeterio/openmeter/openmeter/customer/entity" + enttx "github.com/openmeterio/openmeter/openmeter/ent/tx" + "github.com/openmeterio/openmeter/openmeter/entitlement" + entitlementrepo "github.com/openmeterio/openmeter/openmeter/entitlement/adapter" + booleanentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/boolean" + meteredentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/metered" + staticentitlement "github.com/openmeterio/openmeter/openmeter/entitlement/static" + "github.com/openmeterio/openmeter/openmeter/productcatalog" + "github.com/openmeterio/openmeter/openmeter/productcatalog/feature" + "github.com/openmeterio/openmeter/openmeter/productcatalog/plan" + planadapter "github.com/openmeterio/openmeter/openmeter/productcatalog/plan/adapter" + planservice "github.com/openmeterio/openmeter/openmeter/productcatalog/plan/service" + plansubscription "github.com/openmeterio/openmeter/openmeter/productcatalog/subscription" + productcatalogsubscription "github.com/openmeterio/openmeter/openmeter/productcatalog/subscription" + "github.com/openmeterio/openmeter/openmeter/subscription" + subscriptionentitlementadatapter "github.com/openmeterio/openmeter/openmeter/subscription/adapters/entitlement" + subscriptionrepo "github.com/openmeterio/openmeter/openmeter/subscription/repo" + subscriptionservice "github.com/openmeterio/openmeter/openmeter/subscription/service" + "github.com/openmeterio/openmeter/openmeter/watermill/eventbus" + "github.com/openmeterio/openmeter/pkg/currencyx" + "github.com/openmeterio/openmeter/pkg/datex" + "github.com/openmeterio/openmeter/pkg/models" + billingtest "github.com/openmeterio/openmeter/test/billing" + "github.com/samber/lo" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + "gopkg.in/yaml.v3" +) + +type SubscriptionHandlerTestSuite struct { + billingtest.BaseSuite + + PlanService plan.Service + SubscriptionService subscription.Service + SubscrpiptionPlanAdapter plansubscription.Adapter + SubscriptionWorkflowService subscription.WorkflowService +} + +func (s *SubscriptionHandlerTestSuite) SetupSuite() { + s.BaseSuite.SetupSuite() + + planAdapter, err := planadapter.New(planadapter.Config{ + Client: s.DBClient, + Logger: slog.Default(), + }) + s.NoError(err) + + planService, err := planservice.New(planservice.Config{ + Feature: s.FeatureService, + Adapter: planAdapter, + Logger: slog.Default(), + }) + s.NoError(err) + + s.PlanService = planService + + subsRepo := subscriptionrepo.NewSubscriptionRepo(s.DBClient) + subsItemRepo := subscriptionrepo.NewSubscriptionItemRepo(s.DBClient) + + s.SubscriptionService = subscriptionservice.New(subscriptionservice.ServiceConfig{ + SubscriptionRepo: subsRepo, + SubscriptionPhaseRepo: subscriptionrepo.NewSubscriptionPhaseRepo(s.DBClient), + SubscriptionItemRepo: subsItemRepo, + // connectors + CustomerService: s.CustomerService, + // adapters + EntitlementAdapter: subscriptionentitlementadatapter.NewSubscriptionEntitlementAdapter( + s.SetupEntitlements(), + subsItemRepo, + subsRepo, + ), + // framework + TransactionManager: subsRepo, + }) + + s.SubscrpiptionPlanAdapter = plansubscription.NewPlanSubscriptionAdapter(plansubscription.PlanSubscriptionAdapterConfig{ + PlanService: planService, + Logger: slog.Default(), + }) + + s.SubscriptionWorkflowService = subscriptionservice.NewWorkflowService(subscriptionservice.WorkflowServiceConfig{ + Service: s.SubscriptionService, + CustomerService: s.CustomerService, + TransactionManager: subsRepo, + }) +} + +func (s *SubscriptionHandlerTestSuite) SetupEntitlements() entitlement.Connector { + // Init grants/credit + grantRepo := grantrepo.NewPostgresGrantRepo(s.DBClient) + balanceSnapshotRepo := grantrepo.NewPostgresBalanceSnapshotRepo(s.DBClient) + + // Init entitlements + entitlementRepo := entitlementrepo.NewPostgresEntitlementRepo(s.DBClient) + usageResetRepo := entitlementrepo.NewPostgresUsageResetRepo(s.DBClient) + + mockPublisher := eventbus.NewMock(s.T()) + + owner := meteredentitlement.NewEntitlementGrantOwnerAdapter( + s.FeatureRepo, + entitlementRepo, + usageResetRepo, + s.MeterRepo, + slog.Default(), + ) + + transactionManager := enttx.NewCreator(s.DBClient) + + creditConnector := credit.NewCreditConnector( + grantRepo, + balanceSnapshotRepo, + owner, + s.MockStreamingConnector, + slog.Default(), + time.Minute, + mockPublisher, + transactionManager, + ) + + meteredEntitlementConnector := meteredentitlement.NewMeteredEntitlementConnector( + s.MockStreamingConnector, + owner, + creditConnector, + creditConnector, + grantRepo, + entitlementRepo, + mockPublisher, + ) + + staticEntitlementConnector := staticentitlement.NewStaticEntitlementConnector() + booleanEntitlementConnector := booleanentitlement.NewBooleanEntitlementConnector() + + return entitlement.NewEntitlementConnector( + entitlementRepo, + s.FeatureService, + s.MeterRepo, + meteredEntitlementConnector, + staticEntitlementConnector, + booleanEntitlementConnector, + mockPublisher, + ) +} + +func TestSubscriptionHandlerScenarios(t *testing.T) { + suite.Run(t, new(SubscriptionHandlerTestSuite)) +} + +func (s *SubscriptionHandlerTestSuite) TestSubscriptionHappyPath() { + ctx := context.Background() + namespace := "test-subs-happy-path" + start := time.Now() + + s.MeterRepo.ReplaceMeters(ctx, []models.Meter{ + { + Namespace: namespace, + Slug: "api-requests-total", + WindowSize: models.WindowSizeMinute, + Aggregation: models.MeterAggregationSum, + }, + }) + + apiRequestsTotalFeatureKey := "api-requests-total" + + apiRequestsTotalFeature, err := s.FeatureService.CreateFeature(ctx, feature.CreateFeatureInputs{ + Namespace: namespace, + Name: "api-requests-total", + Key: apiRequestsTotalFeatureKey, + MeterSlug: lo.ToPtr("api-requests-total"), + }) + s.NoError(err) + + customerEntity, err := s.CustomerService.CreateCustomer(ctx, customerentity.CreateCustomerInput{ + Namespace: namespace, + + CustomerMutate: customerentity.CustomerMutate{ + Name: "Test Customer", + PrimaryEmail: lo.ToPtr("test@test.com"), + BillingAddress: &models.Address{ + Country: lo.ToPtr(models.CountryCode("US")), + }, + Currency: lo.ToPtr(currencyx.Code(currency.USD)), + UsageAttribution: customerentity.CustomerUsageAttribution{ + SubjectKeys: []string{"test"}, + }, + }, + }) + require.NoError(s.T(), err) + require.NotNil(s.T(), customerEntity) + require.NotEmpty(s.T(), customerEntity.ID) + + plan, err := s.PlanService.CreatePlan(ctx, plan.CreatePlanInput{ + NamespacedModel: models.NamespacedModel{ + Namespace: namespace, + }, + Plan: productcatalog.Plan{ + PlanMeta: productcatalog.PlanMeta{ + Name: "Test Plan", + Key: "test-plan", + Version: 1, + Currency: currency.USD, + }, + + Phases: []productcatalog.Phase{ + { + PhaseMeta: productcatalog.PhaseMeta{ + Name: "free trial", + Key: "free-trial", + StartAfter: datex.MustParse(s.T(), "P0D"), + }, + // TODO: let's add discount handling (as this could be a 100% discount for the first month) + RateCards: productcatalog.RateCards{ + &productcatalog.UsageBasedRateCard{ + RateCardMeta: productcatalog.RateCardMeta{ + Key: apiRequestsTotalFeatureKey, + Name: apiRequestsTotalFeatureKey, + Feature: &apiRequestsTotalFeature, + }, + BillingCadence: datex.MustParse(s.T(), "P1M"), + }, + }, + }, + { + PhaseMeta: productcatalog.PhaseMeta{ + Name: "discounted phase", + Key: "discounted phase", + StartAfter: datex.MustParse(s.T(), "P1M"), + }, + // TODO: 50% discount + RateCards: productcatalog.RateCards{ + &productcatalog.UsageBasedRateCard{ + RateCardMeta: productcatalog.RateCardMeta{ + Key: apiRequestsTotalFeatureKey, + Name: apiRequestsTotalFeatureKey, + Feature: &apiRequestsTotalFeature, + Price: productcatalog.NewPriceFrom(productcatalog.UnitPrice{ + Amount: alpacadecimal.NewFromFloat(5), + }), + }, + BillingCadence: datex.MustParse(s.T(), "P1M"), + }, + }, + }, + { + PhaseMeta: productcatalog.PhaseMeta{ + Name: "final phase", + Key: "final phase", + StartAfter: datex.MustParse(s.T(), "P3M"), + }, + RateCards: productcatalog.RateCards{ + &productcatalog.UsageBasedRateCard{ + RateCardMeta: productcatalog.RateCardMeta{ + Key: apiRequestsTotalFeatureKey, + Name: apiRequestsTotalFeatureKey, + Feature: &apiRequestsTotalFeature, + Price: productcatalog.NewPriceFrom(productcatalog.UnitPrice{ + Amount: alpacadecimal.NewFromFloat(10), + }), + }, + BillingCadence: datex.MustParse(s.T(), "P1M"), + }, + }, + }, + }, + }, + }) + + s.NoError(err) + s.NotNil(plan) + + subscriptionPlan, err := s.SubscrpiptionPlanAdapter.GetVersion(ctx, namespace, productcatalogsubscription.PlanRefInput{ + Key: plan.Key, + Version: lo.ToPtr(1), // TODO: what is the expected behavior if version is nil?, right now it just throws an + }) + + subsView, err := s.SubscriptionWorkflowService.CreateFromPlan(ctx, subscription.CreateSubscriptionWorkflowInput{ + Namespace: namespace, + ActiveFrom: start, + CustomerID: customerEntity.ID, + Name: "subs-1", + }, subscriptionPlan) + + s.NoError(err) + s.NotNil(subsView) + + freeTierPhase := getPhraseByKey(s.T(), subsView, "free-trial") + s.Equal(lo.ToPtr(datex.MustParse(s.T(), "P1M")), freeTierPhase.ItemsByKey[apiRequestsTotalFeatureKey][0].Spec.RateCard.BillingCadence) + + upcomingLineItems, err := GetUpcomingLineItems(ctx, GetUpcomingLineItemsInput{ + SubscriptionView: subsView, + // StartFrom: start, + }) + + linesString, err := yaml.Marshal(upcomingLineItems) + s.NoError(err) + fmt.Println(string(linesString)) + + // TODO: remove, this is just debugging output + json, err := yaml.Marshal(subsView) + s.NoError(err) + fmt.Println(string(json)) + + s.T().Fail() +} + +func getPhraseByKey(t *testing.T, subsView subscription.SubscriptionView, key string) subscription.SubscriptionPhaseView { + for _, phase := range subsView.Phases { + if phase.SubscriptionPhase.Key == key { + return phase + } + } + + t.Fatalf("phase with key %s not found", key) + return subscription.SubscriptionPhaseView{} +} diff --git a/test/billing/suite.go b/test/billing/suite.go index 00eeabd8e..b537d25b1 100644 --- a/test/billing/suite.go +++ b/test/billing/suite.go @@ -62,6 +62,7 @@ func (s *BaseSuite) SetupSuite() { // init db dbClient := db.NewClient(db.Driver(s.TestDB.EntDriver.Driver())) + s.DBClient = dbClient if os.Getenv("TEST_DISABLE_ATLAS") != "" { s.Require().NoError(dbClient.Schema.Create(context.Background()))