Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: billing subscription handler #1943

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions openmeter/billing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,12 @@ The entity's `ChildrenWithIDReuse` call can be used to facilitate the line reuse
Then the adapter layer will use those IDs to make decisions if they want to persist or recreate the records.

We could do the same logic in the adapter layer, but this approach makes it more flexible on the calculation layer if we want to generate new lines or not. If this becomes a burden we can do the same matching logic as part of the upsert logic in adapter.

## Subscription adapter

The subscription adapter is responsible for feeding the billing with line items during the subscription's lifecycle. The generation of items is event-driven, new items are yielded when:
- A subscription is created
- A new invoice is created
- A subscription is modified

TODO: for new versions the previous line will be split and will not get the stuff from the other side -> this means that the unique id should encode the version of the item
98 changes: 98 additions & 0 deletions openmeter/billing/worker/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package billingworker

import (
"context"
"fmt"
"slices"
"time"

"github.com/openmeterio/openmeter/openmeter/billing"
"github.com/openmeterio/openmeter/openmeter/subscription"
)

type GetUpcomingLineItemsInput struct {
SubscriptionView subscription.SubscriptionView
StartFrom *time.Time

Customer billing.ProfileWithCustomerDetails
}

func GetUpcomingLineItems(ctx context.Context, in GetUpcomingLineItemsInput) ([]billing.Line, error) {
// Given we are event-driven we should at least yield one line item. If that's assigned to
// a new invoice, this function can be triggered again.

// TODO: there's a builtin for this
slices.SortFunc(in.SubscriptionView.Phases, func(i, j subscription.SubscriptionPhaseView) int {
return timeCMP(i.SubscriptionPhase.ActiveFrom, j.SubscriptionPhase.ActiveFrom)
})

// Let's identify the first phase that is invoicable
firstInvoicablePhaseIdx, found := findFirstInvoicablePhase(in.SubscriptionView.Phases, in.StartFrom)
if !found {
// There are no invoicable items in the subscription, so we can return an empty list
// If the subscription has changed we will just recalculate the line items with the updated
// contents.
return nil, nil
}

// Let's find out the limit of the generation. As a rule of thumb, we should have at least one line per
// invoicable item.

switch in.Customer.Profile.WorkflowConfig.Collection.Alignment {
case billing.AlignmentKindSubscription:
// In this case, the end of the generation end will be
default:
return nil, fmt.Errorf("unsupported alignment type: %s", in.Customer.Profile.WorkflowConfig.Collection.Alignment)
}

for i := firstInvoicablePhaseIdx; i < len(in.SubscriptionView.Phases); i++ {
}

return nil, nil
}

func findFirstInvoicablePhase(phases []subscription.SubscriptionPhaseView, startFrom *time.Time) (int, bool) {
// A phase is invoicable, if it has any items that has price objects set, and if it's activeFrom is before or equal startFrom
// and the next phase's activeFrom is after startFrom.

for i, phase := range phases {
isBillable := false
// TODO: maybe forEachRateCard or similar
for _, items := range phase.ItemsByKey {
for _, item := range items {
if item.Spec.RateCard.Price != nil {
isBillable = true
break
}
}
}

if !isBillable {
continue
}

if !phase.SubscriptionPhase.ActiveFrom.After(*startFrom) {
if i == len(phases)-1 {
return i, true
}

if phases[i+1].SubscriptionPhase.ActiveFrom.After(*startFrom) {
return i, true
}
}
}

return -1, false
}

func timeCMP(a, b time.Time) int {
if a.Before(b) {
return -1
}

if a.After(b) {
return 1
}

return 0
}
196 changes: 196 additions & 0 deletions openmeter/billing/worker/phaseiterator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,196 @@
package billingworker

import (
"fmt"
"slices"
"strings"
"time"

"github.com/openmeterio/openmeter/openmeter/billing"
"github.com/openmeterio/openmeter/openmeter/subscription"
)

type PhaseIterator struct {
subscriptionID string
phase subscription.SubscriptionPhaseView
iterationEnd time.Time
currentPhaseEnd *time.Time

items []subscriptionItemView
periodIndexByKey map[string]int

currentItem int
}

type rateCardWithPeriod struct {
RateCard subscription.RateCard
Period billing.Period
UniqueID string
}

type subscriptionItemView struct {
view []subscription.SubscriptionItemView

lastGenerationTime time.Time

// done is true if the item will not yield more periods
done bool
}

// NewPhaseIterator creates a new PhaseIterator for the given phase and subscription
//
// It is guaranteed that all items that are starting before phaseEnd are returned. The call
// might return more items if needed, but it always honors the phase's end.
func NewPhaseIterator(phase subscription.SubscriptionPhaseView, subs subscription.SubscriptionView, end time.Time, currentPhaseEnd *time.Time) *PhaseIterator {
it := &PhaseIterator{
phase: phase,
subscriptionID: subs.Subscription.ID,
iterationEnd: end,
// TODO: cleanup!
periodIndexByKey: make(map[string]int, len(phase.ItemsByKey)),
currentPhaseEnd: currentPhaseEnd,
}

it.items = make([]subscriptionItemView, 0, len(phase.ItemsByKey))
for _, items := range phase.ItemsByKey {
slices.SortFunc(items, func(i, j subscription.SubscriptionItemView) int {
return timeCMP(i.SubscriptionItem.ActiveFrom, j.SubscriptionItem.ActiveFrom)
})

it.items = append(it.items, subscriptionItemView{
view: items,
})
}

return it
}

func (it *PhaseIterator) GetMinimumPeriodEndAfter(t time.Time) time.Time {
panic("TODO")
}

func (it *PhaseIterator) Generate() []rateCardWithPeriod {
out := []rateCardWithPeriod{}
iterationStartEpoch := it.phase.SubscriptionPhase.ActiveFrom

for i := range it.items {
it.items[i].lastGenerationTime = iterationStartEpoch
}

defer it.Reset()

for i := range it.items {
itemsByKey := &it.items[i]

slices.SortFunc(itemsByKey.view, func(i, j subscription.SubscriptionItemView) int {
return timeCMP(i.SubscriptionItem.ActiveFrom, j.SubscriptionItem.ActiveFrom)
})

for versionID, item := range itemsByKey.view {
start := item.SubscriptionItem.ActiveFrom
periodID := 0

stopAt := it.iterationEnd
if it.currentPhaseEnd != nil && it.currentPhaseEnd.Before(stopAt) {
stopAt = *it.currentPhaseEnd
}

for {
end, _ := item.Spec.RateCard.BillingCadence.AddTo(start)

if item.SubscriptionItem.ActiveTo != nil && item.SubscriptionItem.ActiveTo.Before(end) {
end = *item.SubscriptionItem.ActiveTo
}

if it.currentPhaseEnd != nil && end.After(*it.currentPhaseEnd) {
end = *it.currentPhaseEnd
}

generatedItem := rateCardWithPeriod{
RateCard: item.Spec.RateCard,
Period: billing.Period{
Start: start,
End: end,
},

// TODO: let's have a stable sorting on the items in case there are more than one in the subscriptionphaseview
// so that we are not changing the liens for each generation
UniqueID: strings.Join([]string{
it.subscriptionID,
it.phase.SubscriptionPhase.Key,
item.Spec.ItemKey,
fmt.Sprintf("v[%d]", versionID),
fmt.Sprintf("period[%d]", periodID),
}, "/"),
}

out = append(out, generatedItem)

periodID++
start = end

// Either we have reached the end of the phase
if it.currentPhaseEnd != nil && !start.Before(*it.currentPhaseEnd) {
break
}

// We have reached the end of the active range
if item.SubscriptionItem.ActiveTo != nil && !start.Before(*item.SubscriptionItem.ActiveTo) {
break
}

// Or we have reached the iteration end
if !start.Before(it.iterationEnd) {
break
}
}

}

}
return out
}

/*
// shouldYield generates an item with a period to compensate for any active from/active to overrides
// it returns true if the item should be yielded, false otherwise

func (i *PhaseIterator) shouldYield(generatedItem rateCardWithPeriod, item *subscriptionItemView) (rateCardWithPeriod, bool) {
// Stage 1: Filtering
if !generatedItem.Period.End.After(item.SubscriptionItem.ActiveFrom) {
// This item is not really present in the phase, let's just skip it
return generatedItem, false
}

if item.SubscriptionItem.ActiveTo != nil && !generatedItem.Period.Start.Before(*item.SubscriptionItem.ActiveTo) {
// This item is not active yet, let's skip it
return generatedItem, false
}

// Let's compensate for any active from/active to overrides
if item.SubscriptionItem.ActiveFrom.After(generatedItem.Period.Start) {
generatedItem.Period.Start = item.SubscriptionItem.ActiveFrom
}

if item.SubscriptionItem.ActiveTo != nil && item.SubscriptionItem.ActiveTo.Before(generatedItem.Period.End) {
generatedItem.Period.End = *item.SubscriptionItem.ActiveTo
}

return generatedItem, true
}
*/
func (i *PhaseIterator) Reset() {
// TODO
}

/*
func (i *PhaseIterator) areAllItemsDone() bool {
for _, item := range i.flattenedItems {
if !item.done {
return false
}
}

return true
}
*/
Loading
Loading