Skip to content

Commit

Permalink
feat: billing subscription handler
Browse files Browse the repository at this point in the history
  • Loading branch information
turip committed Dec 10, 2024
1 parent 5e03f46 commit 009abef
Show file tree
Hide file tree
Showing 7 changed files with 748 additions and 4 deletions.
8 changes: 8 additions & 0 deletions openmeter/billing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,11 @@ The entity's `ChildrenWithIDReuse` call can be used to facilitate the line reuse
Then the adapter layer will use those IDs to make decisions if they want to persist or recreate the records.

We could do the same logic in the adapter layer, but this approach makes it more flexible on the calculation layer if we want to generate new lines or not. If this becomes a burden we can do the same matching logic as part of the upsert logic in adapter.

## Subscription adapter

The subscription adapter is responsible for feeding the billing with line items during the subscription's lifecycle. The generation of items is event-driven, new items are yielded when:
- A subscription is created
- A new invoice is created
- A subscription is modified

92 changes: 92 additions & 0 deletions openmeter/billing/subscriptionhandler/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package subscriptionhandler

import (
"context"
"fmt"
"slices"
"time"

"github.com/openmeterio/openmeter/openmeter/billing"
"github.com/openmeterio/openmeter/openmeter/subscription"
)

type GetUpcomingLineItemsInput struct {
SubscriptionView subscription.SubscriptionView
StartFrom *time.Time

Customer billing.ProfileWithCustomerDetails
}

func GetUpcomingLineItems(ctx context.Context, in GetUpcomingLineItemsInput) ([]billing.Line, error) {
// Given we are event-driven we should at least yield one line item. If that's assigned to
// a new invoice, this function can be triggered again.

slices.SortFunc(in.SubscriptionView.Phases, func(i, j subscription.SubscriptionPhaseView) int {
switch {
case i.SubscriptionPhase.ActiveFrom.Before(j.SubscriptionPhase.ActiveFrom):
return -1
case i.SubscriptionPhase.ActiveFrom.After(j.SubscriptionPhase.ActiveFrom):
return 1
default:
return 0
}
})

// Let's identify the first phase that is invoicable
firstInvoicablePhaseIdx, found := findFirstInvoicablePhase(in.SubscriptionView.Phases, in.StartFrom)
if !found {
// There are no invoicable items in the subscription, so we can return an empty list
// If the subscription has changed we will just recalculate the line items with the updated
// contents.
return nil, nil
}

// Let's find out the limit of the generation. As a rule of thumb, we should have at least one line per
// invoicable item.

switch in.Customer.Profile.WorkflowConfig.Collection.Alignment {
case billing.AlignmentKindSubscription:
// In this case, the end of the generation end will be
default:
return nil, fmt.Errorf("unsupported alignment type: %s", in.Customer.Profile.WorkflowConfig.Collection.Alignment)
}

for i := firstInvoicablePhaseIdx; i < len(in.SubscriptionView.Phases); i++ {
}

return nil, nil
}

func findFirstInvoicablePhase(phases []subscription.SubscriptionPhaseView, startFrom *time.Time) (int, bool) {
// A phase is invoicable, if it has any items that has price objects set, and if it's activeFrom is before or equal startFrom
// and the next phase's activeFrom is after startFrom.

for i, phase := range phases {
isBillable := false
// TODO: maybe forEachRateCard or similar
for _, items := range phase.ItemsByKey {
for _, item := range items {
if item.Spec.RateCard.Price != nil {
isBillable = true
break
}
}
}

if !isBillable {
continue
}

if !phase.SubscriptionPhase.ActiveFrom.After(*startFrom) {
if i == len(phases)-1 {
return i, true
}

if phases[i+1].SubscriptionPhase.ActiveFrom.After(*startFrom) {
return i, true
}
}
}

return -1, false
}
198 changes: 198 additions & 0 deletions openmeter/billing/subscriptionhandler/phaseiterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
package subscriptionhandler

import (
"fmt"
"iter"
"strings"
"time"

"github.com/openmeterio/openmeter/openmeter/billing"
"github.com/openmeterio/openmeter/openmeter/subscription"
"github.com/openmeterio/openmeter/pkg/datex"
)

type PhaseIterator struct {
subscriptionID string
phase subscription.SubscriptionPhaseView
iterationEnd time.Time

flattenedItems []subscriptionItemView
periodIndexByKey map[string]int

currentItem int
}

type rateCardWithPeriod struct {
RateCard subscription.RateCard
Period billing.Period
UniqueID string
}

type subscriptionItemView struct {
subscription.SubscriptionItemView

lastGenerationTime time.Time
// index stores the index of the item in the subscriptionPhaseView's ItemsByKey
index int

// done is true if the item will not yield more periods
done bool
}

// NewPhaseIterator creates a new PhaseIterator for the given phase and subscription
//
// It is guaranteed that all items that are starting before phaseEnd are returned. The call
// might return more items if needed, but it always honors the phase's end.
func NewPhaseIterator(phase subscription.SubscriptionPhaseView, subs subscription.SubscriptionView, end time.Time) *PhaseIterator {
it := &PhaseIterator{
phase: phase,
subscriptionID: subs.Subscription.ID,
iterationEnd: end,
periodIndexByKey: make(map[string]int, len(phase.ItemsByKey)),
}

it.flattenedItems = make([]subscriptionItemView, 0, len(phase.ItemsByKey))
for _, items := range phase.ItemsByKey {
for i, item := range items {
it.flattenedItems = append(it.flattenedItems, subscriptionItemView{
SubscriptionItemView: item,
index: i,
})
}
}

return it
}

func (it *PhaseIterator) GetMinimumPeriodEndAfter(t time.Time) time.Time {
panic("TODO")
}

func (it *PhaseIterator) Seq() iter.Seq[rateCardWithPeriod] {
// Let's find the maximum billing cadence of an item, this will be the limit of a single pass
// of generation per item

maxCadence := datex.Period{}
for _, item := range it.flattenedItems {
if item.Spec.RateCard.BillingCadence.DurationApprox() > maxCadence.DurationApprox() {
// TODO: When can this be nil?
// TODO: What about fee items or recurring fee items?
maxCadence = *item.Spec.RateCard.BillingCadence
}
}

if maxCadence.IsZero() {
// We cannot generate anything, as there is no cadence and the algorithm would just
// loop infinitely
return func(yield func(rateCardWithPeriod) bool) {
return
}
}

return func(yield func(rateCardWithPeriod) bool) {
iterationStartEpoch := it.phase.SubscriptionPhase.ActiveFrom

for i := range it.flattenedItems {
it.flattenedItems[i].lastGenerationTime = iterationStartEpoch
}

defer it.Reset()

for {
for i := range it.flattenedItems {
item := &it.flattenedItems[i]
haltAfter, _ := maxCadence.AddTo(item.lastGenerationTime)

// TODO: active from to overrides
for {
itemPeriodStart := item.lastGenerationTime
itemPeriodEnd, _ := item.Spec.RateCard.BillingCadence.AddTo(itemPeriodStart)

if !itemPeriodStart.Before(it.iterationEnd) {
// Phase ended we should stop

item.done = true
break
}

generatedItem := rateCardWithPeriod{
RateCard: item.Spec.RateCard,
Period: billing.Period{
Start: item.lastGenerationTime,
End: itemPeriodEnd,
},

// TODO: let's have a stable sorting on the items in case there are more than one in the subscriptionphaseview
// so that we are not changing the liens for each generation
UniqueID: strings.Join([]string{
it.subscriptionID,
it.phase.SubscriptionPhase.Key,
item.Spec.ItemKey,
fmt.Sprintf("period[%d]", it.periodIndexByKey[item.Spec.ItemKey]),
}, "/"),
}

// Let's compensate for any active from/active to overrides
generatedItem, shouldYield := it.shouldYield(generatedItem, item)
if shouldYield {
if !yield(generatedItem) {
return
}

it.periodIndexByKey[item.Spec.ItemKey]++
}

item.lastGenerationTime = itemPeriodEnd

if !itemPeriodEnd.Before(haltAfter) {
break
}
}
}

if it.areAllItemsDone() {
return
}
}
}
}

// shouldYield generates an item with a period to compensate for any active from/active to overrides
// it returns true if the item should be yielded, false otherwise
func (i *PhaseIterator) shouldYield(generatedItem rateCardWithPeriod, item *subscriptionItemView) (rateCardWithPeriod, bool) {
// Stage 1: Filtering
if !generatedItem.Period.End.After(item.SubscriptionItem.ActiveFrom) {
// This item is not really present in the phase, let's just skip it
return generatedItem, false
}

if item.SubscriptionItem.ActiveTo != nil && !generatedItem.Period.Start.Before(*item.SubscriptionItem.ActiveTo) {
// This item is not active yet, let's skip it
return generatedItem, false
}

// Let's compensate for any active from/active to overrides
if item.SubscriptionItem.ActiveFrom.After(generatedItem.Period.Start) {
generatedItem.Period.Start = item.SubscriptionItem.ActiveFrom
}

if item.SubscriptionItem.ActiveTo != nil && item.SubscriptionItem.ActiveTo.Before(generatedItem.Period.End) {
generatedItem.Period.End = *item.SubscriptionItem.ActiveTo
}

return generatedItem, true
}

func (i *PhaseIterator) Reset() {
// TODO
}

func (i *PhaseIterator) areAllItemsDone() bool {
for _, item := range i.flattenedItems {
if !item.done {
return false
}
}

return true
}
Loading

0 comments on commit 009abef

Please sign in to comment.