Skip to content

Commit

Permalink
feat: billing subscription handler
Browse files Browse the repository at this point in the history
  • Loading branch information
turip committed Dec 16, 2024
1 parent dc00372 commit 245fdd5
Show file tree
Hide file tree
Showing 27 changed files with 2,459 additions and 218 deletions.
9 changes: 9 additions & 0 deletions openmeter/billing/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -149,3 +149,12 @@ The entity's `ChildrenWithIDReuse` call can be used to facilitate the line reuse
Then the adapter layer will use those IDs to make decisions if they want to persist or recreate the records.

We could do the same logic in the adapter layer, but this approach makes it more flexible on the calculation layer if we want to generate new lines or not. If this becomes a burden we can do the same matching logic as part of the upsert logic in adapter.

## Subscription adapter

The subscription adapter is responsible for feeding the billing with line items during the subscription's lifecycle. The generation of items is event-driven, new items are yielded when:
- A subscription is created
- A new invoice is created
- A subscription is modified

TODO: for new versions the previous line will be split and will not get the stuff from the other side -> this means that the unique id should encode the version of the item
1 change: 1 addition & 0 deletions openmeter/billing/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type InvoiceLineAdapter interface {
ListInvoiceLines(ctx context.Context, input ListInvoiceLinesAdapterInput) ([]*Line, error)
AssociateLinesToInvoice(ctx context.Context, input AssociateLinesToInvoiceAdapterInput) ([]*Line, error)
GetInvoiceLine(ctx context.Context, input GetInvoiceLineAdapterInput) (*Line, error)
GetLinesForSubscription(ctx context.Context, input GetLinesForSubscriptionInput) ([]*Line, error)

GetInvoiceLineOwnership(ctx context.Context, input GetInvoiceLineOwnershipAdapterInput) (GetOwnershipAdapterResponse, error)
}
Expand Down
19 changes: 14 additions & 5 deletions openmeter/billing/adapter/invoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (a *adapter) GetInvoiceById(ctx context.Context, in billing.GetInvoiceByIdI
WithBillingWorkflowConfig()

if in.Expand.Lines {
query = tx.expandInvoiceLineItems(query, in.Expand.DeletedLines)
query = tx.expandInvoiceLineItems(query, in.Expand)
}

invoice, err := query.Only(ctx)
Expand All @@ -63,15 +63,20 @@ func (a *adapter) GetInvoiceById(ctx context.Context, in billing.GetInvoiceByIdI
})
}

func (a *adapter) expandInvoiceLineItems(query *db.BillingInvoiceQuery, includeDeleted bool) *db.BillingInvoiceQuery {
func (a *adapter) expandInvoiceLineItems(query *db.BillingInvoiceQuery, expand billing.InvoiceExpand) *db.BillingInvoiceQuery {
return query.WithBillingInvoiceLines(func(q *db.BillingInvoiceLineQuery) {
if !includeDeleted {
if !expand.DeletedLines {
q = q.Where(billinginvoiceline.DeletedAtIsNil())
}

requestedStatuses := []billing.InvoiceLineStatus{billing.InvoiceLineStatusValid}
if expand.SplitLines {
requestedStatuses = append(requestedStatuses, billing.InvoiceLineStatusSplit)
}

q = q.Where(
// Detailed lines are sub-lines of a line and should not be included in the top-level invoice
billinginvoiceline.StatusIn(billing.InvoiceLineStatusValid),
billinginvoiceline.StatusIn(requestedStatuses...),
)

a.expandLineItems(q)
Expand Down Expand Up @@ -170,6 +175,10 @@ func (a *adapter) ListInvoices(ctx context.Context, input billing.ListInvoicesIn
query = query.Where(billinginvoice.StatusIn(input.ExtendedStatuses...))
}

if len(input.IDs) > 0 {
query = query.Where(billinginvoice.IDIn(input.IDs...))
}

if len(input.Statuses) > 0 {
query = query.Where(func(s *sql.Selector) {
s.Where(sql.Or(
Expand All @@ -190,7 +199,7 @@ func (a *adapter) ListInvoices(ctx context.Context, input billing.ListInvoicesIn
}

if input.Expand.Lines {
query = tx.expandInvoiceLineItems(query, input.Expand.DeletedLines)
query = tx.expandInvoiceLineItems(query, input.Expand)
}

switch input.OrderBy {
Expand Down
1 change: 1 addition & 0 deletions openmeter/billing/adapter/invoicelinemapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func (a *adapter) mapInvoiceLineFromDB(ctx context.Context, invoiceLines []*db.B
if err != nil {
return nil, err
}

mappedEntities[dbLine.ID] = &entity
}

Expand Down
28 changes: 27 additions & 1 deletion openmeter/billing/adapter/invoicelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (a *adapter) UpsertInvoiceLines(ctx context.Context, inputIn billing.Upsert
SetNillableInvoicingAppExternalID(lo.EmptyableToPtr(line.ExternalIDs.Invoicing))

if line.Subscription != nil {
create = create.SetSubscriptionID(line.Subscription.ItemID).
create = create.SetSubscriptionID(line.Subscription.SubscriptionID).
SetSubscriptionPhaseID(line.Subscription.PhaseID).
SetSubscriptionItemID(line.Subscription.ItemID)
}
Expand Down Expand Up @@ -492,3 +492,29 @@ func (a *adapter) GetInvoiceLineOwnership(ctx context.Context, in billing.GetInv
}, nil
})
}

func (a *adapter) GetLinesForSubscription(ctx context.Context, in billing.GetLinesForSubscriptionInput) ([]*billing.Line, error) {
if err := in.Validate(); err != nil {
return nil, billing.ValidationError{
Err: err,
}
}

return entutils.TransactingRepo(ctx, a, func(ctx context.Context, tx *adapter) ([]*billing.Line, error) {
query := tx.db.BillingInvoiceLine.Query().
Where(billinginvoiceline.Namespace(in.Namespace)).
Where(billinginvoiceline.SubscriptionID(in.SubscriptionID)).
// TODO: document issues with deleted lines
// Where(billinginvoiceline.DeletedAtIsNil()). TBD
Where(billinginvoiceline.ParentLineIDIsNil()) // This one is required so that we are not fetching split line's children directly, the mapper will handle that

query = tx.expandLineItems(query)

dbLines, err := query.All(ctx)
if err != nil {
return nil, fmt.Errorf("fetching lines: %w", err)
}

return tx.mapInvoiceLineFromDB(ctx, dbLines)
})
}
4 changes: 4 additions & 0 deletions openmeter/billing/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,8 @@ import "time"

const (
DefaultMeterResolution = time.Minute
// DefaultProratePercision is the default number of decimal places to round to when prorating, this should be
// at least one digit more than the smallest unit represented in the currency (e.g. 3 for USD)
// TODO[later]: This should come from the currency
DefaultProratePercision = 3
)
26 changes: 26 additions & 0 deletions openmeter/billing/invoice.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,15 @@ type InvoiceExpand struct {
WorkflowApps bool
Lines bool
DeletedLines bool
SplitLines bool
}

var InvoiceExpandAll = InvoiceExpand{
Preceding: true,
WorkflowApps: true,
Lines: true,
DeletedLines: false,
SplitLines: false,
}

func (e InvoiceExpand) Validate() error {
Expand All @@ -160,6 +162,11 @@ func (e InvoiceExpand) SetDeletedLines(v bool) InvoiceExpand {
return e
}

func (e InvoiceExpand) SetSplitLines(v bool) InvoiceExpand {
e.SplitLines = v
return e
}

type InvoiceBase struct {
Namespace string `json:"namespace"`
ID string `json:"id"`
Expand Down Expand Up @@ -373,6 +380,7 @@ type ListInvoicesInput struct {
pagination.Page

Namespace string
IDs []string
Customers []string
// Statuses searches by short InvoiceStatus (e.g. draft, issued)
Statuses []string
Expand Down Expand Up @@ -497,3 +505,21 @@ type GetOwnershipAdapterResponse struct {
}

type DeleteInvoiceInput = InvoiceID

type UpdateInvoiceLinesInternalInput struct {
Namespace string
CustomerID string
Lines []*Line
}

func (i UpdateInvoiceLinesInternalInput) Validate() error {
if i.Namespace == "" {
return errors.New("namespace is required")
}

if i.CustomerID == "" {
return errors.New("customer ID is required")
}

return nil
}
57 changes: 57 additions & 0 deletions openmeter/billing/invoiceline.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ func (p Period) Contains(t time.Time) bool {
return t.After(p.Start) && t.Before(p.End)
}

func (p Period) Duration() time.Duration {
return p.End.Sub(p.Start)
}

// LineBase represents the common fields for an invoice item.
type LineBase struct {
Namespace string `json:"namespace"`
Expand Down Expand Up @@ -409,6 +413,38 @@ func (i Line) ValidateUsageBased() error {
return nil
}

// DissacociateChildren removes the Children both from the DBState and the current line, so that the
// line can be safely persisted/managed without the children.
//
// The childrens receive DBState objects, so that they can be safely persisted/managed without the parent.
func (i *Line) DisassociateChildren() {
if i.Children.IsAbsent() {
return
}

// originalChildren := i.Children

i.Children = LineChildren{}
if i.DBState != nil {
i.DBState.Children = LineChildren{}
}
/*
TODO
if i.DBState != nil {
dbChildrenByID := lo.GroupBy(i.DBState.Children.OrEmpty(), func(l *Line) string {
return l.ID
})
currentChildren := originalChildren.OrEmpty()
for _, child := range currentChildren {
if dbChild, ok := dbChildrenByID[child.ID]; ok {
child.DBState = dbChild[0]
}
}
}
*/
}

// TODO[OM-1016]: For events we need a json marshaler
type LineChildren struct {
mo.Option[[]*Line]
Expand Down Expand Up @@ -471,7 +507,11 @@ func (c *LineChildren) ReplaceByID(id string, newLine *Line) bool {

for i, line := range lines {
if line.ID == id {
// Let's preserve the DB state of the original line (as we are only replacing the current state)
originalDBState := line.DBState

lines[i] = newLine
lines[i].DBState = originalDBState
return true
}
}
Expand Down Expand Up @@ -952,3 +992,20 @@ type GetInvoiceLineInput = LineID
type GetInvoiceLineOwnershipAdapterInput = LineID

type DeleteInvoiceLineInput = LineID

type GetLinesForSubscriptionInput struct {
Namespace string
SubscriptionID string
}

func (i GetLinesForSubscriptionInput) Validate() error {
if i.Namespace == "" {
return errors.New("namespace is required")
}

if i.SubscriptionID == "" {
return errors.New("subscription id is required")
}

return nil
}
9 changes: 9 additions & 0 deletions openmeter/billing/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ type CustomerOverrideService interface {
type InvoiceLineService interface {
CreatePendingInvoiceLines(ctx context.Context, input CreateInvoiceLinesInput) ([]*Line, error)
GetInvoiceLine(ctx context.Context, input GetInvoiceLineInput) (*Line, error)
GetLinesForSubscription(ctx context.Context, input GetLinesForSubscriptionInput) ([]*Line, error)
UpdateInvoiceLine(ctx context.Context, input UpdateInvoiceLineInput) (*Line, error)

DeleteInvoiceLine(ctx context.Context, input DeleteInvoiceLineInput) error
}

Expand All @@ -48,4 +50,11 @@ type InvoiceService interface {
ApproveInvoice(ctx context.Context, input ApproveInvoiceInput) (Invoice, error)
RetryInvoice(ctx context.Context, input RetryInvoiceInput) (Invoice, error)
DeleteInvoice(ctx context.Context, input DeleteInvoiceInput) error

// UpdateInvoiceLinesInternal updates the specified invoice lines and ensures that invoice states are properly syncronized
// This method is intended to be used by OpenMeter internal services only, as it allows for updating invoice line values,
// that are not allowed to be updated by external services.
//
// The call also ensures that the invoice's state is properly updated and invoice immutability is also considered.
UpdateInvoiceLinesInternal(ctx context.Context, input UpdateInvoiceLinesInternalInput) error
}
Loading

0 comments on commit 245fdd5

Please sign in to comment.