Skip to content

Commit

Permalink
Feat: llm token account service (#5202)
Browse files Browse the repository at this point in the history
* add get account with workspace id

* add charge billing

* add ReconcileUnsettledLLMBilling auto reconcile

* add HourlyLLMBillingArchive

* custom startReconcileLLMBilling ticker time

* Process timer tasks through queue tasks to prevent task accumulation from causing the timer to miss execution opportunities.

* Create a buffered channel interrupt and use the signal.

* reconcile user active task reward with clause Locking.

* change signal notify to SIGTERM

* add account service metrics
  • Loading branch information
bxy4543 authored Nov 20, 2024
1 parent af26adb commit 3cd3d6d
Show file tree
Hide file tree
Showing 20 changed files with 1,179 additions and 173 deletions.
5 changes: 3 additions & 2 deletions controllers/account/api/v1/account_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ type (
const (
// Consumption 消费
Consumption common.Type = iota
// Recharge 充值
Recharge
//Subconsumption 子消费
SubConsumption

TransferIn
TransferOut
ActivityGiving
Expand Down
169 changes: 112 additions & 57 deletions controllers/pkg/database/cockroach/accountv2.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,40 +222,56 @@ func (c *Cockroach) getFirstRechargePayments(ops *types.UserQueryOpts) ([]types.
}

func (c *Cockroach) ProcessPendingTaskRewards() error {
userTasks, err := c.getPendingRewardUserTask()
if err != nil {
return fmt.Errorf("failed to get pending reward user task: %w", err)
}
tasks, err := c.getTask()
if err != nil {
return fmt.Errorf("failed to get tasks: %w", err)
}
for i := range userTasks {
err = c.DB.Transaction(func(tx *gorm.DB) error {
task := tasks[userTasks[i].TaskID]
for {
var userTask types.UserTask
err := c.DB.Transaction(func(tx *gorm.DB) error {
if err := tx.Clauses(clause.Locking{
Strength: "UPDATE",
Options: "SKIP LOCKED",
}).Where(&types.UserTask{
Status: types.TaskStatusCompleted,
RewardStatus: types.TaskStatusNotCompleted,
}).First(&userTask).Error; err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return err
}
return fmt.Errorf("failed to get pending reward user task: %w", err)
}
tasks, err := c.getTask()
if err != nil {
return fmt.Errorf("failed to get tasks: %w", err)
}

task := tasks[userTask.TaskID]
if task.Reward == 0 {
fmt.Printf("usertask %v reward is 0, skip\n", userTasks[i])
fmt.Printf("usertask %v reward is 0, skip\n", userTask)
return nil
}
if err = c.updateBalanceRaw(tx, &types.UserQueryOpts{UID: userTasks[i].UserUID}, task.Reward, false, true, true); err != nil {
if err = c.updateBalanceRaw(tx, &types.UserQueryOpts{UID: userTask.UserUID}, task.Reward, false, true, true); err != nil {
return fmt.Errorf("failed to update balance: %w", err)
}
msg := fmt.Sprintf("task %s reward", task.Title)
transaction := types.AccountTransaction{
Balance: task.Reward,
Type: string(task.TaskType) + "_Reward",
UserUID: userTasks[i].UserUID,
UserUID: userTask.UserUID,
ID: uuid.New(),
Message: &msg,
BillingID: userTasks[i].ID,
BillingID: userTask.ID,
}
if err = tx.Save(&transaction).Error; err != nil {
if err = tx.Create(&transaction).Error; err != nil {
return fmt.Errorf("failed to save transaction: %w", err)
}
return c.completeRewardUserTask(tx, &userTasks[i])
if err = tx.Model(&userTask).Update("rewardStatus", types.TaskStatusCompleted).Error; err != nil {
return fmt.Errorf("failed to update user task status: %w", err)
}
return nil
})
if errors.Is(err, gorm.ErrRecordNotFound) {
break
}
if err != nil {
return fmt.Errorf("failed to process reward pending user task %v rewards: %w", userTasks[i], err)
return err
}
}
return nil
Expand All @@ -276,17 +292,6 @@ func (c *Cockroach) getTask() (map[uuid.UUID]types.Task, error) {
return c.tasks, nil
}

func (c *Cockroach) getPendingRewardUserTask() ([]types.UserTask, error) {
var userTasks []types.UserTask
return userTasks, c.DB.Where(&types.UserTask{Status: types.TaskStatusCompleted, RewardStatus: types.TaskStatusNotCompleted}).
Find(&userTasks).Error
}

func (c *Cockroach) completeRewardUserTask(tx *gorm.DB, userTask *types.UserTask) error {
userTask.RewardStatus = types.TaskStatusCompleted
return tx.Model(userTask).Update("rewardStatus", types.TaskStatusCompleted).Error
}

func (c *Cockroach) GetUserCr(ops *types.UserQueryOpts) (*types.RegionUserCr, error) {
if ops.UID == uuid.Nil && ops.Owner == "" {
if ops.ID == "" {
Expand All @@ -311,6 +316,46 @@ func (c *Cockroach) GetUserCr(ops *types.UserQueryOpts) (*types.RegionUserCr, er
return &userCr, nil
}

func (c *Cockroach) GetAccountWithWorkspace(workspace string) (*types.Account, error) {
if workspace == "" {
return nil, fmt.Errorf("empty workspace")
}
var userUIDString string
err := c.Localdb.Table("Workspace").
Select(`"UserCr"."userUid"`).
Joins(`JOIN "UserWorkspace" ON "Workspace".uid = "UserWorkspace"."workspaceUid"`).
Joins(`JOIN "UserCr" ON "UserWorkspace"."userCrUid" = "UserCr".uid`).
Where(`"Workspace".id = ?`, workspace).
Where(`"UserWorkspace".role = ?`, "OWNER").
Limit(1).
Scan(&userUIDString).Error

if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, fmt.Errorf("not found user uid with workspace %s", workspace)
}
return nil, fmt.Errorf("failed to get user uid with workspace %s: %v", workspace, err)
}

userUID, err := uuid.Parse(userUIDString)
if err != nil {
return nil, fmt.Errorf("failed to parse user uid %s: %v", userUIDString, err)
}
if userUID == uuid.Nil {
return nil, fmt.Errorf("empty user uid")
}

var account types.Account
err = c.DB.Where(&types.Account{UserUID: userUID}).First(&account).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, fmt.Errorf("not found account with user uid %s", userUID)
}
return nil, fmt.Errorf("failed to get account with user uid %s: %v", userUID, err)
}
return &account, nil
}

func (c *Cockroach) GetUserUID(ops *types.UserQueryOpts) (uuid.UUID, error) {
if ops.UID != uuid.Nil {
return ops.UID, nil
Expand Down Expand Up @@ -492,37 +537,33 @@ func (c *Cockroach) updateBalanceRaw(tx *gorm.DB, ops *types.UserQueryOpts, amou
}
ops.UID = user.UserUID
}
var account = &types.Account{}
if err := tx.Where(&types.Account{UserUID: ops.UID}).First(account).Error; err != nil {
// if not found, create a new account and retry
if !errors.Is(err, gorm.ErrRecordNotFound) {
return fmt.Errorf("failed to get account: %w", err)
}
if account, err = c.NewAccount(ops); err != nil {
return fmt.Errorf("failed to create account: %v", err)
}
return c.updateWithAccount(ops.UID, isDeduction, add, isActive, amount, tx)
}

func (c *Cockroach) updateWithAccount(userUID uuid.UUID, isDeduction, add, isActive bool, amount int64, db *gorm.DB) error {
exprs := map[string]interface{}{}
control := "-"
if add {
control = "+"
}
if err := c.updateWithAccount(isDeduction, add, account, amount); err != nil {
return err
if isDeduction {
exprs["deduction_balance"] = gorm.Expr("deduction_balance "+control+" ?", amount)
} else {
exprs["balance"] = gorm.Expr("balance "+control+" ?", amount)
}
if isActive {
account.ActivityBonus = account.ActivityBonus + amount
}
if err := tx.Save(account).Error; err != nil {
return fmt.Errorf("failed to update account balance: %w", err)
exprs[`"activityBonus"`] = gorm.Expr(`"activityBonus" + ?`, amount)
}
return nil
result := db.Model(&types.Account{}).Where(`"userUid" = ?`, userUID).Updates(exprs)
return HandleUpdateResult(result, types.Account{}.TableName())
}

func (c *Cockroach) updateWithAccount(isDeduction bool, add bool, account *types.Account, amount int64) error {
balancePtr := &account.Balance
if isDeduction {
balancePtr = &account.DeductionBalance
func HandleUpdateResult(result *gorm.DB, entityName string) error {
if result.Error != nil {
return fmt.Errorf("failed to update %s: %w", entityName, result.Error)
}
if add {
*balancePtr += amount
} else {
*balancePtr -= amount
if result.RowsAffected == 0 {
return fmt.Errorf("no %s updated", entityName)
}
return nil
}
Expand Down Expand Up @@ -557,6 +598,10 @@ func (c *Cockroach) AddDeductionBalance(ops *types.UserQueryOpts, amount int64)
})
}

func (c *Cockroach) AddDeductionBalanceWithDB(ops *types.UserQueryOpts, amount int64, tx *gorm.DB) error {
return c.updateBalance(tx, ops, amount, true, true)
}

func (c *Cockroach) AddDeductionBalanceWithFunc(ops *types.UserQueryOpts, amount int64, preDo, postDo func() error) error {
return c.DB.Transaction(func(tx *gorm.DB) error {
if err := preDo(); err != nil {
Expand Down Expand Up @@ -996,14 +1041,24 @@ func NewCockRoach(globalURI, localURI string) (*Cockroach, error) {
IgnoreRecordNotFoundError: true,
Colorful: true,
})
db, err := gorm.Open(postgres.Open(globalURI), &gorm.Config{
Logger: dbLogger,
db, err := gorm.Open(postgres.New(postgres.Config{
DSN: globalURI,
PreferSimpleProtocol: true,
}), &gorm.Config{
Logger: dbLogger,
PrepareStmt: true,
TranslateError: true,
})
if err != nil {
return nil, fmt.Errorf("failed to open global url %s : %v", globalURI, err)
}
localdb, err := gorm.Open(postgres.Open(localURI), &gorm.Config{
Logger: dbLogger,
localdb, err := gorm.Open(postgres.New(postgres.Config{
DSN: localURI,
PreferSimpleProtocol: true,
}), &gorm.Config{
Logger: dbLogger,
PrepareStmt: true,
TranslateError: true,
})
if err != nil {
return nil, fmt.Errorf("failed to open local url %s : %v", localURI, err)
Expand Down
17 changes: 17 additions & 0 deletions controllers/pkg/database/cockroach/accountv2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,20 @@ func TestCockroach_GetUserOauthProvider(t *testing.T) {
}
t.Logf("provider: %+v", provider)
}

func TestCockroach_GetAccountWithWorkspace(t *testing.T) {
ck, err := NewCockRoach(os.Getenv("GLOBAL_COCKROACH_URI"), os.Getenv("LOCAL_COCKROACH_URI"))
if err != nil {
t.Errorf("NewCockRoach() error = %v", err)
return
}
defer ck.Close()

account, err := ck.GetAccountWithWorkspace("ns-1c6gn6e0")

if err != nil {
t.Errorf("GetAccountWithWorkspace() error = %v", err)
return
}
t.Logf("account: %+v", account)
}
3 changes: 3 additions & 0 deletions controllers/pkg/database/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"time"

"gorm.io/gorm"

"go.mongodb.org/mongo-driver/bson/primitive"

v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -109,6 +111,7 @@ type AccountV2 interface {
TransferAccount(from, to *types.UserQueryOpts, amount int64) error
TransferAccountAll(from, to *types.UserQueryOpts) error
AddDeductionBalance(user *types.UserQueryOpts, balance int64) error
AddDeductionBalanceWithDB(ops *types.UserQueryOpts, amount int64, tx *gorm.DB) error
AddDeductionBalanceWithFunc(ops *types.UserQueryOpts, amount int64, preDo, postDo func() error) error
}

Expand Down
38 changes: 35 additions & 3 deletions controllers/pkg/resources/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ import (
"strings"
"time"

"go.mongodb.org/mongo-driver/bson/primitive"

"github.com/google/uuid"

"github.com/labring/sealos/controllers/pkg/common"

"github.com/labring/sealos/controllers/pkg/gpu"
Expand Down Expand Up @@ -84,6 +88,29 @@ type Monitor struct {
Property string `json:"property,omitempty" bson:"property,omitempty"`
}

type ActiveBilling struct {
ID primitive.ObjectID `json:"_id,omitempty" bson:"_id,omitempty"`
Time time.Time `json:"time,omitempty" bson:"time"`
Namespace string `json:"namespace" bson:"namespace"`
AppType string `json:"app_type" bson:"app_type"`
AppName string `json:"app_name" bson:"app_name"`
Used UsedMap `json:"used,omitempty" bson:"used,omitempty"`
Amount int64 `json:"amount" bson:"amount,omitempty"`
Owner string `json:"owner" bson:"owner,omitempty"`
UserUID uuid.UUID `json:"user_uid" bson:"user_uid"`
Status ConsumptionStatus `json:"status" bson:"status"`
//Rule string `json:"rule" bson:"rule,omitempty"`
}

type ConsumptionStatus string

const (
Consumed ConsumptionStatus = "consumed"
Processing ConsumptionStatus = "processing"
Unconsumed ConsumptionStatus = "unconsumed"
ErrorConsumed ConsumptionStatus = "error_consumed"
)

type BillingType int

type Billing struct {
Expand All @@ -102,12 +129,13 @@ type Billing struct {
Amount int64 `json:"amount" bson:"amount,omitempty"`
Owner string `json:"owner" bson:"owner,omitempty"`
// 0: 未结算 1: 已结算
Status BillingStatus `json:"status" bson:"status,omitempty"`
Status BillingStatus `json:"status" bson:"status"`
// if type = Consumption, then payment is not nil
Payment *Payment `json:"payment" bson:"payment,omitempty"`
// if type = Transfer, then transfer is not nil
Transfer *Transfer `json:"transfer" bson:"transfer,omitempty"`
Detail string `json:"detail" bson:"detail,omitempty"`
UserUID uuid.UUID `json:"user_uid" bson:"user_uid,omitempty"`
}

type Payment struct {
Expand Down Expand Up @@ -165,6 +193,7 @@ const (
appStore
dbBackup
devBox
llmToken
)

const (
Expand All @@ -178,19 +207,22 @@ const (
AppStore = "APP-STORE"
DBBackup = "DB-BACKUP"
DevBox = "DEV-BOX"
LLMToken = "LLM-TOKEN"
)

var AppType = map[string]uint8{
DB: db, APP: app, TERMINAL: terminal, JOB: job, OTHER: other, ObjectStorage: objectStorage, CVM: cvm, AppStore: appStore, DBBackup: dbBackup, DevBox: devBox,
DB: db, APP: app, TERMINAL: terminal, JOB: job, OTHER: other, ObjectStorage: objectStorage, CVM: cvm, AppStore: appStore, DBBackup: dbBackup, DevBox: devBox, LLMToken: llmToken,
}

var AppTypeReverse = map[uint8]string{
db: DB, app: APP, terminal: TERMINAL, job: JOB, other: OTHER, objectStorage: ObjectStorage, cvm: CVM, appStore: AppStore, dbBackup: DBBackup, devBox: DevBox,
db: DB, app: APP, terminal: TERMINAL, job: JOB, other: OTHER, objectStorage: ObjectStorage, cvm: CVM, appStore: AppStore, dbBackup: DBBackup, devBox: DevBox, llmToken: LLMToken,
}

// resource consumption
type EnumUsedMap map[uint8]int64

type UsedMap map[string]float64

type PropertyType struct {
// For the monitoring storage enumeration type, use uint 8 to save memory
// 0 cpu, 1 memory, 2 storage, 3 network ... expandable
Expand Down
1 change: 1 addition & 0 deletions controllers/pkg/types/dbquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
type UserQueryOpts struct {
UID uuid.UUID
ID string
Namespace string
Owner string
IgnoreEmpty bool
}
Expand Down
3 changes: 1 addition & 2 deletions service/account/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,7 @@ clean:
.PHONY: build
build: ## Build service-hub binary.
LD_FLAGS="-s -w"; \
[ -n "$(CRYPTOKEY)" ] && LD_FLAGS+=" -X github.com/labring/sealos/controllers/pkg/crypto.encryptionKey=${CRYPTOKEY}"; \
CGO_ENABLED=0 GOOS=linux go build -ldflags "$${LD_FLAGS}" -trimpath -o bin/manager main.go
CGO_ENABLED=0 GOOS=linux go build -tags=jsoniter -ldflags "$${LD_FLAGS}" -trimpath -o bin/manager main.go

.PHONY: docker-build
docker-build: build
Expand Down
Loading

0 comments on commit 3cd3d6d

Please sign in to comment.