diff --git a/controllers/account/api/v1/account_types.go b/controllers/account/api/v1/account_types.go index 9d3911168ec..7d3fc498fe6 100644 --- a/controllers/account/api/v1/account_types.go +++ b/controllers/account/api/v1/account_types.go @@ -34,8 +34,9 @@ type ( const ( // Consumption 消费 Consumption common.Type = iota - // Recharge 充值 - Recharge + //Subconsumption 子消费 + SubConsumption + TransferIn TransferOut ActivityGiving diff --git a/controllers/pkg/database/cockroach/accountv2.go b/controllers/pkg/database/cockroach/accountv2.go index 121046d5328..f331269e41d 100644 --- a/controllers/pkg/database/cockroach/accountv2.go +++ b/controllers/pkg/database/cockroach/accountv2.go @@ -222,40 +222,56 @@ func (c *Cockroach) getFirstRechargePayments(ops *types.UserQueryOpts) ([]types. } func (c *Cockroach) ProcessPendingTaskRewards() error { - userTasks, err := c.getPendingRewardUserTask() - if err != nil { - return fmt.Errorf("failed to get pending reward user task: %w", err) - } - tasks, err := c.getTask() - if err != nil { - return fmt.Errorf("failed to get tasks: %w", err) - } - for i := range userTasks { - err = c.DB.Transaction(func(tx *gorm.DB) error { - task := tasks[userTasks[i].TaskID] + for { + var userTask types.UserTask + err := c.DB.Transaction(func(tx *gorm.DB) error { + if err := tx.Clauses(clause.Locking{ + Strength: "UPDATE", + Options: "SKIP LOCKED", + }).Where(&types.UserTask{ + Status: types.TaskStatusCompleted, + RewardStatus: types.TaskStatusNotCompleted, + }).First(&userTask).Error; err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return err + } + return fmt.Errorf("failed to get pending reward user task: %w", err) + } + tasks, err := c.getTask() + if err != nil { + return fmt.Errorf("failed to get tasks: %w", err) + } + + task := tasks[userTask.TaskID] if task.Reward == 0 { - fmt.Printf("usertask %v reward is 0, skip\n", userTasks[i]) + fmt.Printf("usertask %v reward is 0, skip\n", userTask) return nil } - if err = c.updateBalanceRaw(tx, &types.UserQueryOpts{UID: userTasks[i].UserUID}, task.Reward, false, true, true); err != nil { + if err = c.updateBalanceRaw(tx, &types.UserQueryOpts{UID: userTask.UserUID}, task.Reward, false, true, true); err != nil { return fmt.Errorf("failed to update balance: %w", err) } msg := fmt.Sprintf("task %s reward", task.Title) transaction := types.AccountTransaction{ Balance: task.Reward, Type: string(task.TaskType) + "_Reward", - UserUID: userTasks[i].UserUID, + UserUID: userTask.UserUID, ID: uuid.New(), Message: &msg, - BillingID: userTasks[i].ID, + BillingID: userTask.ID, } - if err = tx.Save(&transaction).Error; err != nil { + if err = tx.Create(&transaction).Error; err != nil { return fmt.Errorf("failed to save transaction: %w", err) } - return c.completeRewardUserTask(tx, &userTasks[i]) + if err = tx.Model(&userTask).Update("rewardStatus", types.TaskStatusCompleted).Error; err != nil { + return fmt.Errorf("failed to update user task status: %w", err) + } + return nil }) + if errors.Is(err, gorm.ErrRecordNotFound) { + break + } if err != nil { - return fmt.Errorf("failed to process reward pending user task %v rewards: %w", userTasks[i], err) + return err } } return nil @@ -276,17 +292,6 @@ func (c *Cockroach) getTask() (map[uuid.UUID]types.Task, error) { return c.tasks, nil } -func (c *Cockroach) getPendingRewardUserTask() ([]types.UserTask, error) { - var userTasks []types.UserTask - return userTasks, c.DB.Where(&types.UserTask{Status: types.TaskStatusCompleted, RewardStatus: types.TaskStatusNotCompleted}). - Find(&userTasks).Error -} - -func (c *Cockroach) completeRewardUserTask(tx *gorm.DB, userTask *types.UserTask) error { - userTask.RewardStatus = types.TaskStatusCompleted - return tx.Model(userTask).Update("rewardStatus", types.TaskStatusCompleted).Error -} - func (c *Cockroach) GetUserCr(ops *types.UserQueryOpts) (*types.RegionUserCr, error) { if ops.UID == uuid.Nil && ops.Owner == "" { if ops.ID == "" { @@ -311,6 +316,46 @@ func (c *Cockroach) GetUserCr(ops *types.UserQueryOpts) (*types.RegionUserCr, er return &userCr, nil } +func (c *Cockroach) GetAccountWithWorkspace(workspace string) (*types.Account, error) { + if workspace == "" { + return nil, fmt.Errorf("empty workspace") + } + var userUIDString string + err := c.Localdb.Table("Workspace"). + Select(`"UserCr"."userUid"`). + Joins(`JOIN "UserWorkspace" ON "Workspace".uid = "UserWorkspace"."workspaceUid"`). + Joins(`JOIN "UserCr" ON "UserWorkspace"."userCrUid" = "UserCr".uid`). + Where(`"Workspace".id = ?`, workspace). + Where(`"UserWorkspace".role = ?`, "OWNER"). + Limit(1). + Scan(&userUIDString).Error + + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, fmt.Errorf("not found user uid with workspace %s", workspace) + } + return nil, fmt.Errorf("failed to get user uid with workspace %s: %v", workspace, err) + } + + userUID, err := uuid.Parse(userUIDString) + if err != nil { + return nil, fmt.Errorf("failed to parse user uid %s: %v", userUIDString, err) + } + if userUID == uuid.Nil { + return nil, fmt.Errorf("empty user uid") + } + + var account types.Account + err = c.DB.Where(&types.Account{UserUID: userUID}).First(&account).Error + if err != nil { + if errors.Is(err, gorm.ErrRecordNotFound) { + return nil, fmt.Errorf("not found account with user uid %s", userUID) + } + return nil, fmt.Errorf("failed to get account with user uid %s: %v", userUID, err) + } + return &account, nil +} + func (c *Cockroach) GetUserUID(ops *types.UserQueryOpts) (uuid.UUID, error) { if ops.UID != uuid.Nil { return ops.UID, nil @@ -492,37 +537,33 @@ func (c *Cockroach) updateBalanceRaw(tx *gorm.DB, ops *types.UserQueryOpts, amou } ops.UID = user.UserUID } - var account = &types.Account{} - if err := tx.Where(&types.Account{UserUID: ops.UID}).First(account).Error; err != nil { - // if not found, create a new account and retry - if !errors.Is(err, gorm.ErrRecordNotFound) { - return fmt.Errorf("failed to get account: %w", err) - } - if account, err = c.NewAccount(ops); err != nil { - return fmt.Errorf("failed to create account: %v", err) - } + return c.updateWithAccount(ops.UID, isDeduction, add, isActive, amount, tx) +} + +func (c *Cockroach) updateWithAccount(userUID uuid.UUID, isDeduction, add, isActive bool, amount int64, db *gorm.DB) error { + exprs := map[string]interface{}{} + control := "-" + if add { + control = "+" } - if err := c.updateWithAccount(isDeduction, add, account, amount); err != nil { - return err + if isDeduction { + exprs["deduction_balance"] = gorm.Expr("deduction_balance "+control+" ?", amount) + } else { + exprs["balance"] = gorm.Expr("balance "+control+" ?", amount) } if isActive { - account.ActivityBonus = account.ActivityBonus + amount - } - if err := tx.Save(account).Error; err != nil { - return fmt.Errorf("failed to update account balance: %w", err) + exprs[`"activityBonus"`] = gorm.Expr(`"activityBonus" + ?`, amount) } - return nil + result := db.Model(&types.Account{}).Where(`"userUid" = ?`, userUID).Updates(exprs) + return HandleUpdateResult(result, types.Account{}.TableName()) } -func (c *Cockroach) updateWithAccount(isDeduction bool, add bool, account *types.Account, amount int64) error { - balancePtr := &account.Balance - if isDeduction { - balancePtr = &account.DeductionBalance +func HandleUpdateResult(result *gorm.DB, entityName string) error { + if result.Error != nil { + return fmt.Errorf("failed to update %s: %w", entityName, result.Error) } - if add { - *balancePtr += amount - } else { - *balancePtr -= amount + if result.RowsAffected == 0 { + return fmt.Errorf("no %s updated", entityName) } return nil } @@ -557,6 +598,10 @@ func (c *Cockroach) AddDeductionBalance(ops *types.UserQueryOpts, amount int64) }) } +func (c *Cockroach) AddDeductionBalanceWithDB(ops *types.UserQueryOpts, amount int64, tx *gorm.DB) error { + return c.updateBalance(tx, ops, amount, true, true) +} + func (c *Cockroach) AddDeductionBalanceWithFunc(ops *types.UserQueryOpts, amount int64, preDo, postDo func() error) error { return c.DB.Transaction(func(tx *gorm.DB) error { if err := preDo(); err != nil { @@ -996,14 +1041,24 @@ func NewCockRoach(globalURI, localURI string) (*Cockroach, error) { IgnoreRecordNotFoundError: true, Colorful: true, }) - db, err := gorm.Open(postgres.Open(globalURI), &gorm.Config{ - Logger: dbLogger, + db, err := gorm.Open(postgres.New(postgres.Config{ + DSN: globalURI, + PreferSimpleProtocol: true, + }), &gorm.Config{ + Logger: dbLogger, + PrepareStmt: true, + TranslateError: true, }) if err != nil { return nil, fmt.Errorf("failed to open global url %s : %v", globalURI, err) } - localdb, err := gorm.Open(postgres.Open(localURI), &gorm.Config{ - Logger: dbLogger, + localdb, err := gorm.Open(postgres.New(postgres.Config{ + DSN: localURI, + PreferSimpleProtocol: true, + }), &gorm.Config{ + Logger: dbLogger, + PrepareStmt: true, + TranslateError: true, }) if err != nil { return nil, fmt.Errorf("failed to open local url %s : %v", localURI, err) diff --git a/controllers/pkg/database/cockroach/accountv2_test.go b/controllers/pkg/database/cockroach/accountv2_test.go index 15abd30817f..751b584a01e 100644 --- a/controllers/pkg/database/cockroach/accountv2_test.go +++ b/controllers/pkg/database/cockroach/accountv2_test.go @@ -47,3 +47,20 @@ func TestCockroach_GetUserOauthProvider(t *testing.T) { } t.Logf("provider: %+v", provider) } + +func TestCockroach_GetAccountWithWorkspace(t *testing.T) { + ck, err := NewCockRoach(os.Getenv("GLOBAL_COCKROACH_URI"), os.Getenv("LOCAL_COCKROACH_URI")) + if err != nil { + t.Errorf("NewCockRoach() error = %v", err) + return + } + defer ck.Close() + + account, err := ck.GetAccountWithWorkspace("ns-1c6gn6e0") + + if err != nil { + t.Errorf("GetAccountWithWorkspace() error = %v", err) + return + } + t.Logf("account: %+v", account) +} diff --git a/controllers/pkg/database/interface.go b/controllers/pkg/database/interface.go index 53bd72d7284..36e474a41ea 100644 --- a/controllers/pkg/database/interface.go +++ b/controllers/pkg/database/interface.go @@ -18,6 +18,8 @@ import ( "context" "time" + "gorm.io/gorm" + "go.mongodb.org/mongo-driver/bson/primitive" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -109,6 +111,7 @@ type AccountV2 interface { TransferAccount(from, to *types.UserQueryOpts, amount int64) error TransferAccountAll(from, to *types.UserQueryOpts) error AddDeductionBalance(user *types.UserQueryOpts, balance int64) error + AddDeductionBalanceWithDB(ops *types.UserQueryOpts, amount int64, tx *gorm.DB) error AddDeductionBalanceWithFunc(ops *types.UserQueryOpts, amount int64, preDo, postDo func() error) error } diff --git a/controllers/pkg/resources/resources.go b/controllers/pkg/resources/resources.go index ffe94d3267d..4452acd08e9 100644 --- a/controllers/pkg/resources/resources.go +++ b/controllers/pkg/resources/resources.go @@ -18,6 +18,10 @@ import ( "strings" "time" + "go.mongodb.org/mongo-driver/bson/primitive" + + "github.com/google/uuid" + "github.com/labring/sealos/controllers/pkg/common" "github.com/labring/sealos/controllers/pkg/gpu" @@ -84,6 +88,29 @@ type Monitor struct { Property string `json:"property,omitempty" bson:"property,omitempty"` } +type ActiveBilling struct { + ID primitive.ObjectID `json:"_id,omitempty" bson:"_id,omitempty"` + Time time.Time `json:"time,omitempty" bson:"time"` + Namespace string `json:"namespace" bson:"namespace"` + AppType string `json:"app_type" bson:"app_type"` + AppName string `json:"app_name" bson:"app_name"` + Used UsedMap `json:"used,omitempty" bson:"used,omitempty"` + Amount int64 `json:"amount" bson:"amount,omitempty"` + Owner string `json:"owner" bson:"owner,omitempty"` + UserUID uuid.UUID `json:"user_uid" bson:"user_uid"` + Status ConsumptionStatus `json:"status" bson:"status"` + //Rule string `json:"rule" bson:"rule,omitempty"` +} + +type ConsumptionStatus string + +const ( + Consumed ConsumptionStatus = "consumed" + Processing ConsumptionStatus = "processing" + Unconsumed ConsumptionStatus = "unconsumed" + ErrorConsumed ConsumptionStatus = "error_consumed" +) + type BillingType int type Billing struct { @@ -102,12 +129,13 @@ type Billing struct { Amount int64 `json:"amount" bson:"amount,omitempty"` Owner string `json:"owner" bson:"owner,omitempty"` // 0: 未结算 1: 已结算 - Status BillingStatus `json:"status" bson:"status,omitempty"` + Status BillingStatus `json:"status" bson:"status"` // if type = Consumption, then payment is not nil Payment *Payment `json:"payment" bson:"payment,omitempty"` // if type = Transfer, then transfer is not nil Transfer *Transfer `json:"transfer" bson:"transfer,omitempty"` Detail string `json:"detail" bson:"detail,omitempty"` + UserUID uuid.UUID `json:"user_uid" bson:"user_uid,omitempty"` } type Payment struct { @@ -165,6 +193,7 @@ const ( appStore dbBackup devBox + llmToken ) const ( @@ -178,19 +207,22 @@ const ( AppStore = "APP-STORE" DBBackup = "DB-BACKUP" DevBox = "DEV-BOX" + LLMToken = "LLM-TOKEN" ) var AppType = map[string]uint8{ - DB: db, APP: app, TERMINAL: terminal, JOB: job, OTHER: other, ObjectStorage: objectStorage, CVM: cvm, AppStore: appStore, DBBackup: dbBackup, DevBox: devBox, + DB: db, APP: app, TERMINAL: terminal, JOB: job, OTHER: other, ObjectStorage: objectStorage, CVM: cvm, AppStore: appStore, DBBackup: dbBackup, DevBox: devBox, LLMToken: llmToken, } var AppTypeReverse = map[uint8]string{ - db: DB, app: APP, terminal: TERMINAL, job: JOB, other: OTHER, objectStorage: ObjectStorage, cvm: CVM, appStore: AppStore, dbBackup: DBBackup, devBox: DevBox, + db: DB, app: APP, terminal: TERMINAL, job: JOB, other: OTHER, objectStorage: ObjectStorage, cvm: CVM, appStore: AppStore, dbBackup: DBBackup, devBox: DevBox, llmToken: LLMToken, } // resource consumption type EnumUsedMap map[uint8]int64 +type UsedMap map[string]float64 + type PropertyType struct { // For the monitoring storage enumeration type, use uint 8 to save memory // 0 cpu, 1 memory, 2 storage, 3 network ... expandable diff --git a/controllers/pkg/types/dbquery.go b/controllers/pkg/types/dbquery.go index 9d057f48a06..61cf6dc9dae 100644 --- a/controllers/pkg/types/dbquery.go +++ b/controllers/pkg/types/dbquery.go @@ -23,6 +23,7 @@ import ( type UserQueryOpts struct { UID uuid.UUID ID string + Namespace string Owner string IgnoreEmpty bool } diff --git a/service/account/Makefile b/service/account/Makefile index 9754adf8fb8..5a784626190 100644 --- a/service/account/Makefile +++ b/service/account/Makefile @@ -44,8 +44,7 @@ clean: .PHONY: build build: ## Build service-hub binary. LD_FLAGS="-s -w"; \ - [ -n "$(CRYPTOKEY)" ] && LD_FLAGS+=" -X github.com/labring/sealos/controllers/pkg/crypto.encryptionKey=${CRYPTOKEY}"; \ - CGO_ENABLED=0 GOOS=linux go build -ldflags "$${LD_FLAGS}" -trimpath -o bin/manager main.go + CGO_ENABLED=0 GOOS=linux go build -tags=jsoniter -ldflags "$${LD_FLAGS}" -trimpath -o bin/manager main.go .PHONY: docker-build docker-build: build diff --git a/service/account/api/admin.go b/service/account/api/admin.go new file mode 100644 index 00000000000..04d6a319ac4 --- /dev/null +++ b/service/account/api/admin.go @@ -0,0 +1,117 @@ +package api + +import ( + "fmt" + "net/http" + + "github.com/gin-gonic/gin" + "github.com/labring/sealos/service/account/dao" + "github.com/labring/sealos/service/account/helper" +) + +// GetAccount +// @Summary Get user account +// @Description Get user account +// @Tags Account +// @Accept json +// @Produce json +// @Success 200 {object} map[string]interface{} "successfully retrieved user account" +// @Failure 401 {object} map[string]interface{} "authenticate error" +// @Failure 500 {object} map[string]interface{} "failed to get user account" +// @Router /admin/v1alpha1/account [get] +func AdminGetAccountWithWorkspaceID(c *gin.Context) { + err := authenticateAdminRequest(c) + if err != nil { + c.JSON(http.StatusUnauthorized, helper.ErrorMessage{Error: fmt.Sprintf("authenticate error : %v", err)}) + return + } + workspace, exist := c.GetQuery("namespace") + if !exist || workspace == "" { + c.JSON(http.StatusBadRequest, helper.ErrorMessage{Error: "empty workspace"}) + return + } + account, err := dao.DBClient.GetAccountWithWorkspace(workspace) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("failed to get account : %v", err)}) + return + } + c.JSON(http.StatusOK, gin.H{ + "userUID": account.UserUID, + "balance": account.Balance - account.DeductionBalance, + }) +} + +// ChargeBilling +// @Summary Charge billing +// @Description Charge billing +// @Tags Account +// @Accept json +// @Produce json +// @Success 200 {object} map[string]interface{} "successfully charged billing" +// @Failure 401 {object} map[string]interface{} "authenticate error" +// @Failure 500 {object} map[string]interface{} "failed to charge billing" +// @Router /admin/v1alpha1/charge [post] +func AdminChargeBilling(c *gin.Context) { + err := authenticateAdminRequest(c) + if err != nil { + c.JSON(http.StatusUnauthorized, helper.ErrorMessage{Error: fmt.Sprintf("authenticate error : %v", err)}) + return + } + billingReq, err := helper.ParseAdminChargeBillingReq(c) + if err != nil { + c.JSON(http.StatusBadRequest, helper.ErrorMessage{Error: fmt.Sprintf("failed to parse request : %v", err)}) + return + } + helper.CallCounter.WithLabelValues("ChargeBilling", billingReq.UserUID.String()).Inc() + err = dao.DBClient.ChargeBilling(billingReq) + if err != nil { + c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("failed to charge billing : %v", err)}) + return + } + c.JSON(http.StatusOK, gin.H{ + "message": "successfully charged billing", + }) +} + +// ActiveBilling +// @Summary Active billing +// @Description Active billing +// @Tags Account +// @Accept json +// @Produce json +// @Success 200 {object} map[string]interface{} "successfully activated billing" +// @Failure 401 {object} map[string]interface{} "authenticate error" +// @Failure 500 {object} map[string]interface{} "failed to activate billing" +// @Router /admin/v1alpha1/active [post] +//func AdminActiveBilling(c *gin.Context) { +// err := authenticateAdminRequest(c) +// if err != nil { +// c.JSON(http.StatusUnauthorized, helper.ErrorMessage{Error: fmt.Sprintf("authenticate error : %v", err)}) +// return +// } +// billingReq, err := dao.ParseAdminActiveBillingReq(c) +// if err != nil { +// c.JSON(http.StatusBadRequest, helper.ErrorMessage{Error: fmt.Sprintf("failed to parse request : %v", err)}) +// return +// } +// dao.ActiveBillingTask.AddTask(billingReq) +// c.JSON(http.StatusOK, gin.H{ +// "message": "successfully activated billing", +// }) +//} + +const AdminUserName = "sealos-admin" + +func authenticateAdminRequest(c *gin.Context) error { + user, err := dao.JwtMgr.ParseUser(c) + if err != nil { + return fmt.Errorf("failed to parse user: %v", err) + } + if user == nil { + return fmt.Errorf("user not found") + } + if user.Requester != AdminUserName { + return fmt.Errorf("user is not admin") + } + return nil +} diff --git a/service/account/dao/active.go b/service/account/dao/active.go new file mode 100644 index 00000000000..eba82e10037 --- /dev/null +++ b/service/account/dao/active.go @@ -0,0 +1,50 @@ +package dao + +import ( + "fmt" + "time" + + "github.com/gin-gonic/gin" + "github.com/labring/sealos/controllers/pkg/resources" +) + +type ActiveBillingReq struct { + resources.ActiveBilling +} + +func (a *ActiveBillingReq) Execute() error { + return DBClient.ActiveBilling(a.ActiveBilling) +} + +func ParseAdminActiveBillingReq(c *gin.Context) (*ActiveBillingReq, error) { + activeBilling := &ActiveBillingReq{} + if activeBilling.Time.IsZero() { + activeBilling.Time = time.Now().UTC() + } + if err := c.ShouldBindJSON(activeBilling); err != nil { + return nil, fmt.Errorf("bind json error: %v", err) + } + return activeBilling, nil +} + +type ActiveBillingReconcile struct { + StartTime, EndTime time.Time +} + +func (a *ActiveBillingReconcile) Execute() error { + if err := DBClient.ReconcileActiveBilling(a.StartTime, a.EndTime); err != nil { + return fmt.Errorf("reconcile active billing error: %v", err) + } + return nil +} + +type ArchiveBillingReconcile struct { + StartTime time.Time +} + +func (a *ArchiveBillingReconcile) Execute() error { + if err := DBClient.ArchiveHourlyBilling(a.StartTime, a.StartTime.Add(time.Hour)); err != nil { + return fmt.Errorf("archive hourly billing error: %v", err) + } + return nil +} diff --git a/service/account/dao/init.go b/service/account/dao/init.go index e95f519f7d1..7b2ff2eeda0 100644 --- a/service/account/dao/init.go +++ b/service/account/dao/init.go @@ -1,10 +1,13 @@ package dao import ( + "context" "fmt" "os" "time" + "github.com/labring/sealos/controllers/pkg/utils/env" + "github.com/goccy/go-json" "github.com/labring/sealos/service/account/helper" @@ -25,13 +28,15 @@ type Region struct { } var ( - DBClient Interface - JwtMgr *helper.JWTManager - Cfg *Config - Debug bool + DBClient Interface + JwtMgr *helper.JWTManager + Cfg *Config + BillingTask *helper.TaskQueue + Debug bool ) -func InitDB() error { +func Init(ctx context.Context) error { + BillingTask = helper.NewTaskQueue(ctx, env.GetIntEnvWithDefault("ACTIVE_BILLING_TASK_WORKER_COUNT", 10), env.GetIntEnvWithDefault("ACTIVE_BILLING_TASK_QUEUE_SIZE", 10000)) var err error globalCockroach := os.Getenv(helper.ENVGlobalCockroach) if globalCockroach == "" { diff --git a/service/account/dao/interface.go b/service/account/dao/interface.go index 4aa2af419bd..ebf798c9112 100644 --- a/service/account/dao/interface.go +++ b/service/account/dao/interface.go @@ -7,6 +7,10 @@ import ( "strings" "time" + "github.com/sirupsen/logrus" + + accountv1 "github.com/labring/sealos/controllers/account/api/v1" + "gorm.io/gorm" gonanoid "github.com/matoous/go-nanoid/v2" @@ -23,6 +27,7 @@ import ( "go.mongodb.org/mongo-driver/mongo/options" + "github.com/google/uuid" "github.com/labring/sealos/service/account/helper" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" @@ -30,9 +35,11 @@ import ( type Interface interface { GetBillingHistoryNamespaceList(req *helper.NamespaceBillingHistoryReq) ([][]string, error) + GetAccountWithWorkspace(workspace string) (*types.Account, error) GetProperties() ([]common.PropertyQuery, error) GetCosts(req helper.ConsumptionRecordReq) (common.TimeCostsMap, error) GetAppCosts(req *helper.AppCostsReq) (*common.AppCosts, error) + ChargeBilling(req *helper.AdminChargeBillingReq) error GetAppCostTimeRange(req helper.GetCostAppListReq) (helper.TimeRange, error) GetCostOverview(req helper.GetCostAppListReq) (helper.CostOverviewResp, error) GetBasicCostDistribution(req helper.GetCostAppListReq) (map[string]int64, error) @@ -60,6 +67,10 @@ type Interface interface { GetRechargeDiscount(req helper.AuthReq) (helper.RechargeDiscountResp, error) ProcessPendingTaskRewards() error GetUserRealNameInfo(req *helper.GetRealNameInfoReq) (*types.UserRealNameInfo, error) + ReconcileUnsettledLLMBilling(startTime, endTime time.Time) error + ReconcileActiveBilling(startTime, endTime time.Time) error + ArchiveHourlyBilling(hourStart, hourEnd time.Time) error + ActiveBilling(req resources.ActiveBilling) error } type Account struct { @@ -68,11 +79,12 @@ type Account struct { } type MongoDB struct { - Client *mongo.Client - AccountDBName string - BillingConn string - PropertiesConn string - Properties *resources.PropertyTypeLS + Client *mongo.Client + AccountDBName string + BillingConn string + ActiveBillingConn string + PropertiesConn string + Properties *resources.PropertyTypeLS } type Cockroach struct { @@ -87,6 +99,10 @@ func (g *Cockroach) GetAccount(ops types.UserQueryOpts) (*types.Account, error) return account, nil } +func (g *Cockroach) GetAccountWithWorkspace(workspace string) (*types.Account, error) { + return g.ck.GetAccountWithWorkspace(workspace) +} + func (g *Cockroach) GetWorkspaceName(namespaces []string) ([][]string, error) { workspaceList := make([][]string, 0) workspaces, err := g.ck.GetWorkspace(namespaces...) @@ -254,6 +270,52 @@ func (m *MongoDB) GetCosts(req helper.ConsumptionRecordReq) (common.TimeCostsMap return costsMap, nil } +func (m *Account) InitDB() error { + if err := m.ck.InitTables(); err != nil { + return fmt.Errorf("failed to init tables: %v", err) + } + return m.MongoDB.initTables() +} + +func (m *MongoDB) initTables() error { + if exist, err := m.collectionExist(m.AccountDBName, m.ActiveBillingConn); exist || err != nil { + return err + } + indexModel := mongo.IndexModel{ + Keys: bson.D{{Key: "time", Value: 1}}, + Options: options.Index().SetExpireAfterSeconds(30 * 24 * 60 * 60), + } + _, err := m.getActiveBillingCollection().Indexes().CreateOne(context.Background(), indexModel) + if err != nil { + return fmt.Errorf("failed to create index: %v", err) + } + return nil +} + +func (m *MongoDB) collectionExist(dbName, collectionName string) (bool, error) { + // Check if the collection already exists + collections, err := m.Client.Database(dbName).ListCollectionNames(context.Background(), bson.M{"name": collectionName}) + return len(collections) > 0, err +} + +func (m *MongoDB) SaveActiveBillings(billing ...*resources.ActiveBilling) error { + billings := make([]interface{}, len(billing)) + for i, b := range billing { + billings[i] = b + } + _, err := m.getActiveBillingCollection().InsertMany(context.Background(), billings) + return err +} + +func (m *MongoDB) SaveBillings(billing ...*resources.Billing) error { + billings := make([]interface{}, len(billing)) + for i, b := range billing { + billings[i] = b + } + _, err := m.getBillingCollection().InsertMany(context.Background(), billings) + return err +} + func (m *MongoDB) GetAppCosts(req *helper.AppCostsReq) (results *common.AppCosts, rErr error) { if req.Page <= 0 { req.Page = 1 @@ -308,23 +370,11 @@ func (m *MongoDB) GetAppCosts(req *helper.AppCostsReq) (results *common.AppCosts pipeline := mongo.Pipeline{ {{Key: "$match", Value: match}}, {{Key: "$facet", Value: bson.D{ - {Key: "totalRecords", Value: bson.A{ + {Key: "withAppCosts", Value: bson.A{ + bson.D{{Key: "$match", Value: bson.D{{Key: "app_costs", Value: bson.M{"$exists": true}}}}}, bson.D{{Key: "$unwind", Value: "$app_costs"}}, bson.D{{Key: "$match", Value: matchConditions}}, - bson.D{{Key: "$count", Value: "count"}}, - }}, - {Key: "costs", Value: bson.A{ - bson.D{{Key: "$unwind", Value: "$app_costs"}}, - bson.D{{Key: "$match", Value: matchConditions}}, - bson.D{{Key: "$sort", Value: bson.D{ - {Key: "time", Value: -1}, - {Key: "app_costs.name", Value: 1}, - {Key: "_id", Value: 1}, - }}}, - bson.D{{Key: "$skip", Value: (req.Page - 1) * pageSize}}, - bson.D{{Key: "$limit", Value: pageSize}}, bson.D{{Key: "$project", Value: bson.D{ - {Key: "_id", Value: 0}, {Key: "time", Value: 1}, {Key: "order_id", Value: 1}, {Key: "namespace", Value: 1}, @@ -335,6 +385,42 @@ func (m *MongoDB) GetAppCosts(req *helper.AppCostsReq) (results *common.AppCosts {Key: "app_type", Value: "$app_type"}, }}}, }}, + {Key: "withoutAppCosts", Value: bson.A{ + bson.D{{Key: "$match", Value: bson.D{ + {Key: "app_costs", Value: bson.M{"$exists": false}}, + {Key: "app_name", Value: bson.M{"$exists": true}}, + }}}, + bson.D{{Key: "$match", Value: matchConditions}}, + bson.D{{Key: "$project", Value: bson.D{ + {Key: "time", Value: 1}, + {Key: "order_id", Value: 1}, + {Key: "namespace", Value: 1}, + {Key: "used", Value: nil}, + {Key: "used_amount", Value: nil}, + {Key: "amount", Value: 1}, + {Key: "app_name", Value: 1}, + {Key: "app_type", Value: 1}, + }}}, + }}, + }}}, + {{Key: "$project", Value: bson.D{ + {Key: "combined", Value: bson.D{{Key: "$concatArrays", Value: bson.A{"$withAppCosts", "$withoutAppCosts"}}}}, + }}}, + {{Key: "$unwind", Value: "$combined"}}, + {{Key: "$replaceRoot", Value: bson.D{{Key: "newRoot", Value: "$combined"}}}}, + {{Key: "$sort", Value: bson.D{ + {Key: "time", Value: -1}, + {Key: "app_name", Value: 1}, + {Key: "_id", Value: 1}, + }}}, + {{Key: "$facet", Value: bson.D{ + {Key: "totalRecords", Value: bson.A{ + bson.D{{Key: "$count", Value: "count"}}, + }}, + {Key: "costs", Value: bson.A{ + bson.D{{Key: "$skip", Value: (req.Page - 1) * pageSize}}, + bson.D{{Key: "$limit", Value: pageSize}}, + }}, }}}, {{Key: "$project", Value: bson.D{ {Key: "total_records", Value: bson.D{{Key: "$arrayElemAt", Value: bson.A{"$totalRecords.count", 0}}}}, @@ -550,7 +636,7 @@ func (m *MongoDB) getTotalAppCost(req helper.GetCostAppListReq, app helper.CostA req.StartTime = time.Now().UTC().Add(-time.Hour * 24 * 30) req.EndTime = time.Now().UTC() } - match := bson.M{ + subConsumptionMatch := bson.M{ "owner": owner, "namespace": namespace, "app_costs.name": appName, @@ -560,7 +646,7 @@ func (m *MongoDB) getTotalAppCost(req helper.GetCostAppListReq, app helper.CostA "$lte": req.EndTime, }, } - appStoreMatch := bson.M{ + consumptionMatch := bson.M{ "owner": owner, "namespace": namespace, "app_name": appName, @@ -572,10 +658,10 @@ func (m *MongoDB) getTotalAppCost(req helper.GetCostAppListReq, app helper.CostA } var pipeline mongo.Pipeline - if appType == resources.AppType[resources.AppStore] { - // If appType is 8, match app_name and app_type directly + if appType == resources.AppType[resources.AppStore] || appType == resources.AppType[resources.LLMToken] { + // If appType is app-store || llm-token, match app_name and app_type directly pipeline = mongo.Pipeline{ - {{Key: "$match", Value: appStoreMatch}}, + {{Key: "$match", Value: consumptionMatch}}, {{Key: "$group", Value: bson.D{ {Key: "_id", Value: nil}, {Key: "totalAmount", Value: bson.D{{Key: "$sum", Value: "$amount"}}}, @@ -584,7 +670,7 @@ func (m *MongoDB) getTotalAppCost(req helper.GetCostAppListReq, app helper.CostA } else { // Otherwise, match inside app_costs pipeline = mongo.Pipeline{ - {{Key: "$match", Value: match}}, + {{Key: "$match", Value: subConsumptionMatch}}, {{Key: "$unwind", Value: "$app_costs"}}, {{Key: "$match", Value: bson.D{ {Key: "app_costs.name", Value: appName}, @@ -618,9 +704,6 @@ func (m *MongoDB) getTotalAppCost(req helper.GetCostAppListReq, app helper.CostA } func (m *MongoDB) GetCostAppList(req helper.GetCostAppListReq) (resp helper.CostAppListResp, rErr error) { - var ( - result []helper.CostApp - ) if req.PageSize <= 0 { req.PageSize = 10 } @@ -630,8 +713,8 @@ func (m *MongoDB) GetCostAppList(req helper.GetCostAppListReq) (resp helper.Cost pageSize := req.PageSize if strings.ToUpper(req.AppType) != resources.AppStore { match := bson.M{ - "owner": req.Owner, - // Exclude app store + "owner": req.Owner, + "type": accountv1.Consumption, "app_type": bson.M{"$ne": resources.AppType[resources.AppStore]}, } if req.Namespace != "" { @@ -640,9 +723,6 @@ func (m *MongoDB) GetCostAppList(req helper.GetCostAppListReq) (resp helper.Cost if req.AppType != "" { match["app_type"] = resources.AppType[strings.ToUpper(req.AppType)] } - if req.AppName != "" { - match["app_costs.name"] = req.AppName - } if req.StartTime.IsZero() { req.StartTime = time.Now().UTC().Add(-time.Hour * 24 * 30) req.EndTime = time.Now().UTC() @@ -651,52 +731,71 @@ func (m *MongoDB) GetCostAppList(req helper.GetCostAppListReq) (resp helper.Cost "$gte": req.StartTime, "$lte": req.EndTime, } - sort := bson.E{Key: "$sort", Value: bson.D{ - {Key: "time", Value: -1}, - }} + pipeline := mongo.Pipeline{ {{Key: "$match", Value: match}}, - {{Key: "$unwind", Value: "$app_costs"}}, - {{Key: "$match", Value: bson.D{ - {Key: "app_costs.name", Value: req.AppName}, + {{Key: "$facet", Value: bson.D{ + {Key: "withAppCosts", Value: bson.A{ + bson.D{{Key: "$match", Value: bson.D{{Key: "app_costs", Value: bson.M{"$exists": true}}}}}, + bson.D{{Key: "$unwind", Value: "$app_costs"}}, + bson.D{{Key: "$group", Value: bson.D{ + {Key: "_id", Value: bson.D{ + {Key: "app_type", Value: "$app_type"}, + {Key: "app_name", Value: "$app_costs.name"}, + {Key: "namespace", Value: "$namespace"}, + {Key: "owner", Value: "$owner"}, + }}, + }}}, + }}, + {Key: "withoutAppCosts", Value: bson.A{ + bson.D{{Key: "$match", Value: bson.D{ + {Key: "app_costs", Value: bson.M{"$exists": false}}, + {Key: "app_name", Value: bson.M{"$exists": true}}, + }}}, + bson.D{{Key: "$group", Value: bson.D{ + {Key: "_id", Value: bson.D{ + {Key: "app_type", Value: "$app_type"}, + {Key: "app_name", Value: "$app_name"}, + {Key: "namespace", Value: "$namespace"}, + {Key: "owner", Value: "$owner"}, + }}, + }}}, + }}, + }}}, + {{Key: "$project", Value: bson.D{ + {Key: "combined", Value: bson.D{{Key: "$concatArrays", Value: bson.A{"$withAppCosts", "$withoutAppCosts"}}}}, }}}, - {sort}, + {{Key: "$unwind", Value: "$combined"}}, + {{Key: "$replaceRoot", Value: bson.D{{Key: "newRoot", Value: "$combined._id"}}}}, } - if req.AppName == "" { - pipeline = mongo.Pipeline{ - {{Key: "$match", Value: match}}, - {{Key: "$unwind", Value: "$app_costs"}}, - {sort}, - } + + if req.AppName != "" { + pipeline = append(pipeline, bson.D{{Key: "$match", Value: bson.D{ + {Key: "app_name", Value: req.AppName}, + }}}) } - pipeline = append(pipeline, mongo.Pipeline{ - {{Key: "$group", Value: bson.D{ - {Key: "_id", Value: bson.D{ - {Key: "app_type", Value: "$app_type"}, - {Key: "app_name", Value: "$app_costs.name"}, - {Key: "namespace", Value: "$namespace"}, - {Key: "owner", Value: "$owner"}, - }}, - {Key: "count", Value: bson.D{{Key: "$sum", Value: 1}}}, // 添加一个计数字段 - }}}, - {{Key: "$project", Value: bson.D{ + pipeline = append(pipeline, bson.D{ + {Key: "$project", Value: bson.D{ {Key: "_id", Value: 0}, - {Key: "namespace", Value: "$_id.namespace"}, - {Key: "appType", Value: "$_id.app_type"}, - {Key: "owner", Value: "$_id.owner"}, - {Key: "appName", Value: "$_id.app_name"}, - }}}, - }...) + {Key: "namespace", Value: "$namespace"}, + {Key: "appType", Value: "$app_type"}, + {Key: "owner", Value: "$owner"}, + {Key: "appName", Value: "$app_name"}, + }}, + }) - limitPipeline := append(pipeline, bson.D{{Key: "$skip", Value: (req.Page - 1) * req.PageSize}}, bson.D{{Key: "$limit", Value: req.PageSize}}) + pipeline = append(pipeline, bson.D{{Key: "$sort", Value: bson.D{ + {Key: "appName", Value: 1}, + {Key: "appType", Value: -1}, + {Key: "namespace", Value: 1}, + {Key: "amount", Value: 1}, + }}}) countPipeline := append(pipeline, bson.D{{Key: "$count", Value: "total"}}) - countCursor, err := m.getBillingCollection().Aggregate(context.Background(), countPipeline) if err != nil { - rErr = fmt.Errorf("failed to execute count aggregate query: %w", err) - return + return resp, fmt.Errorf("failed to execute count aggregate query: %w", err) } defer countCursor.Close(context.Background()) @@ -705,43 +804,45 @@ func (m *MongoDB) GetCostAppList(req helper.GetCostAppListReq) (resp helper.Cost Total int64 `bson:"total"` } if err := countCursor.Decode(&countResult); err != nil { - rErr = fmt.Errorf("failed to decode count result: %w", err) - return + return resp, fmt.Errorf("failed to decode count result: %w", err) } resp.Total = countResult.Total } + pipeline = append(pipeline, + bson.D{{Key: "$skip", Value: (req.Page - 1) * pageSize}}, + bson.D{{Key: "$limit", Value: pageSize}}, + ) - cursor, err := m.getBillingCollection().Aggregate(context.Background(), limitPipeline) + cursor, err := m.getBillingCollection().Aggregate(context.Background(), pipeline) if err != nil { - rErr = fmt.Errorf("failed to execute aggregate query: %w", err) - return + return resp, fmt.Errorf("failed to execute aggregate query: %w", err) } defer cursor.Close(context.Background()) + var result []helper.CostApp if err := cursor.All(context.Background(), &result); err != nil { - rErr = fmt.Errorf("failed to decode all billing record: %w", err) - return + return resp, fmt.Errorf("failed to decode all billing record: %w", err) } + resp.Apps = result } appStoreTotal, err := m.getAppStoreTotal(req) if err != nil { - rErr = fmt.Errorf("failed to get app store total: %w", err) - return + return resp, fmt.Errorf("failed to get app store total: %w", err) } if req.AppType == "" || strings.ToUpper(req.AppType) == resources.AppStore { - currentAppPageIsFull := len(result) == req.PageSize + currentAppPageIsFull := len(resp.Apps) == req.PageSize maxAppPageSize := (resp.Total + int64(req.PageSize) - 1) / int64(req.PageSize) completedNum := calculateComplement(int(resp.Total), req.PageSize) appPageSize := (resp.Total + int64(req.PageSize) - 1) / int64(req.PageSize) + if req.Page == int(maxAppPageSize) { if !currentAppPageIsFull { appStoreResp, err := m.getAppStoreList(req, 0, completedNum) if err != nil { - rErr = fmt.Errorf("failed to get app store list: %w", err) - return + return resp, fmt.Errorf("failed to get app store list: %w", err) } - result = append(result, appStoreResp.Apps...) + resp.Apps = append(resp.Apps, appStoreResp.Apps...) } } else if req.Page > int(maxAppPageSize) { skipPageSize := (req.Page - int(appPageSize) - 1) * req.PageSize @@ -750,16 +851,14 @@ func (m *MongoDB) GetCostAppList(req helper.GetCostAppListReq) (resp helper.Cost } appStoreResp, err := m.getAppStoreList(req, completedNum+skipPageSize, req.PageSize) if err != nil { - rErr = fmt.Errorf("failed to get app store list: %w", err) - return + return resp, fmt.Errorf("failed to get app store list: %w", err) } - result = append(result, appStoreResp.Apps...) + resp.Apps = append(resp.Apps, appStoreResp.Apps...) } resp.Total += appStoreTotal } resp.TotalPage = (resp.Total + int64(pageSize) - 1) / int64(pageSize) - resp.Apps = result return resp, nil } @@ -1220,19 +1319,20 @@ func NewAccountInterface(mongoURI, globalCockRoachURI, localCockRoachURI string) return nil, fmt.Errorf("failed to ping mongodb: %v", err) } mongodb := &MongoDB{ - Client: client, - AccountDBName: "sealos-resources", - BillingConn: "billing", - PropertiesConn: "properties", + Client: client, + AccountDBName: "sealos-resources", + BillingConn: "billing", + ActiveBillingConn: "active-billing", + PropertiesConn: "properties", } ck, err := cockroach.NewCockRoach(globalCockRoachURI, localCockRoachURI) if err != nil { return nil, fmt.Errorf("failed to connect cockroach: %v", err) } - if err = ck.InitTables(); err != nil { + account := &Account{MongoDB: mongodb, Cockroach: &Cockroach{ck: ck}} + if err = account.InitDB(); err != nil { return nil, fmt.Errorf("failed to init tables: %v", err) } - account := &Account{MongoDB: mongodb, Cockroach: &Cockroach{ck: ck}} return account, nil } @@ -1247,10 +1347,11 @@ func newAccountForTest(mongoURI, globalCockRoachURI, localCockRoachURI string) ( return nil, fmt.Errorf("failed to ping mongodb: %v", err) } account.MongoDB = &MongoDB{ - Client: client, - AccountDBName: "sealos-resources", - BillingConn: "billing", - PropertiesConn: "properties", + Client: client, + AccountDBName: "sealos-resources", + BillingConn: "billing", + ActiveBillingConn: "active-billing", + PropertiesConn: "properties", } } else { fmt.Printf("mongoURI is empty, skip connecting to mongodb\n") @@ -1338,6 +1439,10 @@ func (m *MongoDB) getMonitorCollection(collTime time.Time) *mongo.Collection { return m.Client.Database(m.AccountDBName).Collection(m.getMonitorCollectionName(collTime)) } +func (m *MongoDB) getActiveBillingCollection() *mongo.Collection { + return m.Client.Database(m.AccountDBName).Collection(m.ActiveBillingConn) +} + func (m *MongoDB) getMonitorCollectionName(collTime time.Time) string { // Calculate the suffix by day, for example, the suffix on the first day of 202012 is 20201201 return fmt.Sprintf("%s_%s", "monitor", collTime.Format("20060102")) @@ -1474,3 +1579,350 @@ func (m *Account) GetUserRealNameInfo(req *helper.GetRealNameInfoReq) (*types.Us return userRealNameInfo, nil } + +func (m *Account) ReconcileActiveBilling(startTime, endTime time.Time) error { + ctx := context.Background() + billings := make(map[uuid.UUID]*billingBatch) + + // Process billings in batches + if err := m.processBillingBatches(ctx, startTime, endTime, billings); err != nil { + helper.ErrorCounter.WithLabelValues("ReconcileActiveBilling", "processBillingBatches", "").Inc() + return fmt.Errorf("failed to process billing batches: %w", err) + } + + // Handle each user's billings + for uid, batch := range billings { + if err := m.reconcileUserBilling(ctx, uid, batch); err != nil { + helper.ErrorCounter.WithLabelValues("ReconcileActiveBilling", "reconcileUserBilling", uid.String()).Inc() + logrus.Errorf("failed to reconcile billing for user %s: %v", uid, err) + continue + } + } + + return nil +} + +type billingBatch struct { + IDs []primitive.ObjectID + Amount int64 +} + +func (m *Account) processBillingBatches(ctx context.Context, startTime, endTime time.Time, billings map[uuid.UUID]*billingBatch) error { + filter := bson.M{ + "time": bson.M{ + "$gte": startTime, + "$lte": endTime, + }, + "status": bson.M{"$nin": []resources.ConsumptionStatus{ + resources.Processing, + resources.Consumed, + }}, + } + + for { + var billing resources.ActiveBilling + err := m.MongoDB.getActiveBillingCollection().FindOneAndUpdate( + ctx, + filter, + bson.M{"$set": bson.M{"status": resources.Processing}}, + options.FindOneAndUpdate(). + SetReturnDocument(options.After). + SetSort(bson.M{"time": 1}), + ).Decode(&billing) + + if err == mongo.ErrNoDocuments { + break + } + // TODO error handling + if err != nil { + logrus.Errorf("failed to find and update billing: %v", err) + continue + } + + batch, ok := billings[billing.UserUID] + if !ok { + batch = &billingBatch{ + IDs: make([]primitive.ObjectID, 0), + Amount: 0, + } + billings[billing.UserUID] = batch + } + batch.IDs = append(batch.IDs, billing.ID) + batch.Amount += billing.Amount + } + + return nil +} + +func (m *Account) reconcileUserBilling(ctx context.Context, uid uuid.UUID, batch *billingBatch) error { + return m.ck.DB.Transaction(func(tx *gorm.DB) error { + // Deduct balance + if err := m.ck.AddDeductionBalanceWithDB(&types.UserQueryOpts{UID: uid}, batch.Amount, tx); err != nil { + return fmt.Errorf("failed to deduct balance: %w", err) + } + + // Update billing status + _, err := m.MongoDB.getActiveBillingCollection().UpdateMany( + ctx, + bson.M{"_id": bson.M{"$in": batch.IDs}}, + bson.M{"$set": bson.M{"status": resources.Consumed}}, + ) + if err != nil { + return fmt.Errorf("failed to update billing status: %w", err) + } + + return nil + }) +} + +func (m *Account) ChargeBilling(req *helper.AdminChargeBillingReq) error { + billing := &resources.ActiveBilling{ + Namespace: req.Namespace, + AppType: req.AppType, + AppName: req.AppName, + Amount: req.Amount, + //Owner: userCr.CrName, + Time: time.Now().UTC(), + Status: resources.Unconsumed, + UserUID: req.UserUID, + } + err := m.MongoDB.SaveActiveBillings(billing) + if err != nil { + return fmt.Errorf("save active monitor failed: %v", err) + } + return nil +} + +func (m *Account) ActiveBilling(req resources.ActiveBilling) error { + return m.ck.DB.Transaction(func(tx *gorm.DB) error { + if err := m.ck.AddDeductionBalanceWithDB(&types.UserQueryOpts{UID: req.UserUID}, req.Amount, tx); err != nil { + helper.ErrorCounter.WithLabelValues("ActiveBilling", "AddDeductionBalanceWithDB", req.UserUID.String()).Inc() + return fmt.Errorf("failed to deduct balance: %v", err) + } + req.Status = resources.Consumed + _, err := m.getActiveBillingCollection().InsertOne(context.Background(), req) + if err != nil { + helper.ErrorCounter.WithLabelValues("ActiveBilling", "InsertOne", req.UserUID.String()).Inc() + return fmt.Errorf("failed to insert (%v) monitor: %v", req, err) + } + return nil + }) +} + +func (m *MongoDB) UpdateBillingStatus(orderID string, status resources.BillingStatus) error { + filter := bson.M{"order_id": orderID} + update := bson.M{ + "$set": bson.M{ + "status": status, + }, + } + _, err := m.getBillingCollection().UpdateOne(context.Background(), filter, update) + if err != nil { + return fmt.Errorf("update error: %v", err) + } + return nil +} + +func (m *Account) ReconcileUnsettledLLMBilling(startTime, endTime time.Time) error { + unsettledAmounts, err := m.MongoDB.reconcileUnsettledLLMBilling(startTime, endTime) + if err != nil { + return fmt.Errorf("failed to get unsettled billing: %v", err) + } + for userUID, amount := range unsettledAmounts { + err = m.ck.DB.Transaction(func(tx *gorm.DB) error { + // 1. deduct balance + if err := m.ck.AddDeductionBalanceWithDB(&types.UserQueryOpts{UID: userUID}, amount, tx); err != nil { + return fmt.Errorf("failed to deduct balance: %v", err) + } + // 2. update billing status + filter := bson.M{ + "user_uid": userUID, + "type": accountv1.SubConsumption, + "status": resources.Unsettled, + "app_type": resources.AppType[resources.LLMToken], + "time": bson.M{ + "$gte": startTime, + "$lte": endTime, + }, + } + update := bson.M{ + "$set": bson.M{ + "status": resources.Settled, + }, + } + + _, err = m.MongoDB.getBillingCollection().UpdateMany(context.Background(), filter, update) + if err != nil { + return fmt.Errorf("failed to update billing status: %v", err) + } + + return nil + }) + + // If the transaction fails, roll back the billing state + //if err != nil { + // err = fmt.Errorf("failed to reconcile billing for user %s: %v", userUID, err) + // filter := bson.M{ + // "user_uid": userUID, + // "app_type": resources.AppType["LLM-TOKEN"], + // "time": bson.M{ + // "$gte": time.Now().Add(-time.Hour), + // }, + // } + // update := bson.M{ + // "$set": bson.M{ + // "status": resources.Unsettled, + // }, + // } + // if _, rollBackErr := m.MongoDB.getBillingCollection().UpdateMany(context.Background(), filter, update); rollBackErr != nil { + // return fmt.Errorf("%v; And failed to rollback billing status: %v", err, rollBackErr) + // } + // return err + //} + if err != nil { + return fmt.Errorf("failed to reconcile billing for user %s: %v", userUID, err) + } + } + return nil +} + +func (m *Account) ArchiveHourlyBilling(hourStart, hourEnd time.Time) error { + pipeline := mongo.Pipeline{ + {{Key: "$match", Value: bson.M{ + "time": bson.M{ + "$gte": hourStart, + "$lt": hourEnd, + }, + "status": resources.Consumed, + }}}, + {{Key: "$group", Value: bson.M{ + "_id": bson.M{ + "user_uid": "$user_uid", + "app_type": "$app_type", + "app_name": "$app_name", + //"owner": "$owner", + "namespace": "$namespace", + }, + "total_amount": bson.M{"$sum": "$amount"}, + }}}, + } + + cursor, err := m.MongoDB.getActiveBillingCollection().Aggregate(context.Background(), pipeline) + if err != nil { + helper.ErrorCounter.WithLabelValues("ArchiveHourlyBilling", "Aggregate", "").Inc() + return fmt.Errorf("failed to aggregate hourly billing: %v", err) + } + defer cursor.Close(context.Background()) + + var errs []error + for cursor.Next(context.Background()) { + var result struct { + ID struct { + UserUID uuid.UUID `bson:"user_uid"` + AppName string `bson:"app_name"` + AppType string `bson:"app_type"` + Owner string `bson:"owner,omitempty"` + Namespace string `bson:"namespace"` + } `bson:"_id"` + TotalAmount int64 `bson:"total_amount"` + } + + if err := cursor.Decode(&result); err != nil { + errs = append(errs, fmt.Errorf("failed to decode document: %v", err)) + continue + } + if result.ID.Owner == "" { + userCr, err := m.ck.GetUserCr(&types.UserQueryOpts{UID: result.ID.UserUID}) + if err != nil { + helper.ErrorCounter.WithLabelValues("ArchiveHourlyBilling", "GetUserCr", result.ID.UserUID.String()).Inc() + errs = append(errs, fmt.Errorf("failed to get user cr: %v", err)) + continue + } + result.ID.Owner = userCr.CrName + } + + filter := bson.M{ + "app_type": resources.AppType[result.ID.AppType], + "app_name": result.ID.AppName, + "namespace": result.ID.Namespace, + "owner": result.ID.Owner, + "time": hourStart, + "type": accountv1.Consumption, + } + + billing := bson.M{ + "order_id": gonanoid.Must(12), + "type": accountv1.Consumption, + "namespace": result.ID.Namespace, + "app_type": resources.AppType[result.ID.AppType], + "app_name": result.ID.AppName, + "amount": result.TotalAmount, + "owner": result.ID.Owner, + "time": hourStart, + "status": resources.Settled, + "user_uid": result.ID.UserUID, + } + + update := bson.M{ + "$setOnInsert": billing, + } + + opts := options.Update().SetUpsert(true) + _, err = m.MongoDB.getBillingCollection().UpdateOne( + context.Background(), + filter, + update, + opts, + ) + if err != nil { + helper.ErrorCounter.WithLabelValues("ArchiveHourlyBilling", "UpdateOne", result.ID.UserUID.String()).Inc() + errs = append(errs, fmt.Errorf("failed to upsert billing for user %s, app %s: %v", + result.ID.UserUID, result.ID.AppName, err)) + continue + } + } + if err = cursor.Err(); err != nil { + errs = append(errs, fmt.Errorf("cursor error: %v", err)) + } + if len(errs) > 0 { + return fmt.Errorf("encountered %d errors during archiving: %v", len(errs), errs) + } + return nil +} + +func (m *MongoDB) reconcileUnsettledLLMBilling(startTime, endTime time.Time) (map[uuid.UUID]int64, error) { + pipeline := mongo.Pipeline{ + {{Key: "$match", Value: bson.M{ + "time": bson.M{ + "$gte": startTime, + "$lte": endTime, + }, + "status": resources.Unsettled, + "app_type": resources.AppType[resources.LLMToken], + }}}, + {{Key: "$group", Value: bson.M{ + "_id": "$user_uid", + "total_amount": bson.M{"$sum": "$amount"}, + }}}, + } + cursor, err := m.getBillingCollection().Aggregate(context.Background(), pipeline) + if err != nil { + return nil, fmt.Errorf("failed to aggregate billing: %v", err) + } + defer cursor.Close(context.Background()) + result := make(map[uuid.UUID]int64) + for cursor.Next(context.Background()) { + var doc struct { + ID uuid.UUID `bson:"_id"` + Amount int64 `bson:"total_amount"` + } + if err := cursor.Decode(&doc); err != nil { + return nil, fmt.Errorf("failed to decode document: %v", err) + } + result[doc.ID] = doc.Amount + } + if err := cursor.Err(); err != nil { + return nil, fmt.Errorf("cursor error: %v", err) + } + return result, nil +} diff --git a/service/account/dao/interface_test.go b/service/account/dao/interface_test.go index 01ef8b704c2..cf6259a2071 100644 --- a/service/account/dao/interface_test.go +++ b/service/account/dao/interface_test.go @@ -668,3 +668,28 @@ func TestMongoDB_GetMonitorUniqueValues(t *testing.T) { t.Logf("monitor = %+v\n", monitor) } } + +func TestAccount_ReconcileUnsettledLLMBilling(t *testing.T) { + type fields struct { + MongoDB *MongoDB + Cockroach *Cockroach + } + tests := []struct { + name string + fields fields + wantErr bool + }{ + // TODO: Add test cases. + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + m := &Account{ + MongoDB: tt.fields.MongoDB, + Cockroach: tt.fields.Cockroach, + } + if err := m.ReconcileUnsettledLLMBilling(time.Now().Add(-1*time.Hour), time.Now()); (err != nil) != tt.wantErr { + t.Errorf("ReconcileUnsettledLLMBilling() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/service/account/deploy/manifests/deploy.yaml.tmpl b/service/account/deploy/manifests/deploy.yaml.tmpl index 6e9b3622fda..2b4e0b797d9 100644 --- a/service/account/deploy/manifests/deploy.yaml.tmpl +++ b/service/account/deploy/manifests/deploy.yaml.tmpl @@ -43,6 +43,17 @@ spec: containers: - name: account-service image: ghcr.io/labring/sealos-account-service:latest + env: + - name: POD_NAME + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.name + - name: POD_NAMESPACE + valueFrom: + fieldRef: + apiVersion: v1 + fieldPath: metadata.namespace envFrom: - configMapRef: name: account-manager-env diff --git a/service/account/helper/common.go b/service/account/helper/common.go index ba88c53a52c..2a1634876e7 100644 --- a/service/account/helper/common.go +++ b/service/account/helper/common.go @@ -32,6 +32,13 @@ const ( GetUserRealNameInfo = "/real-name-info" ) +const ( + AdminGroup = "/admin/v1alpha1" + AdminGetAccountWithWorkspace = "/account-with-workspace" + AdminChargeBilling = "/charge-billing" + AdminActiveBilling = "/active-billing" +) + // env const ( ConfigPath = "/config/config.json" diff --git a/service/account/helper/jwt.go b/service/account/helper/jwt.go index dfef164d3e9..c47a05e3007 100644 --- a/service/account/helper/jwt.go +++ b/service/account/helper/jwt.go @@ -23,6 +23,7 @@ type UserClaims struct { } type JwtUser struct { + Requester string `json:"requester,omitempty"` UserUID uuid.UUID `json:"userUid,omitempty"` UserCrUID string `json:"userCrUid,omitempty"` UserCrName string `json:"userCrName,omitempty"` diff --git a/service/account/helper/metrics.go b/service/account/helper/metrics.go new file mode 100644 index 00000000000..61d1080905b --- /dev/null +++ b/service/account/helper/metrics.go @@ -0,0 +1,35 @@ +package helper + +import ( + "os" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +var ( + podName = os.Getenv("POD_NAME") + namespace = os.Getenv("POD_NAMESPACE") + domain = os.Getenv("DOMAIN") + + registry = prometheus.WrapRegistererWith( + prometheus.Labels{"pod_name": podName, "namespace": namespace, "domain": domain}, + prometheus.DefaultRegisterer, + ) + // ErrorCounter is a counter for account service errors + ErrorCounter = promauto.With(registry).NewCounterVec( + prometheus.CounterOpts{ + Name: "sealos_account_errors_total", + Help: "account service error counter", + }, + []string{"error_type", "function", "user_uid"}, + ) + // CallCounter is a counter for account service calls + CallCounter = promauto.With(registry).NewCounterVec( + prometheus.CounterOpts{ + Name: "sealos_account_calls_total", + Help: "account service call counter", + }, + []string{"function", "user_uid"}, + ) +) diff --git a/service/account/helper/request.go b/service/account/helper/request.go index 4d2994de617..7b35b956718 100644 --- a/service/account/helper/request.go +++ b/service/account/helper/request.go @@ -594,3 +594,20 @@ func ParseUserUsageReq(c *gin.Context) (*UserUsageReq, error) { } return userUsage, nil } + +type AdminChargeBillingReq struct { + Amount int64 `json:"amount" bson:"amount" example:"100000000"` + Namespace string `json:"namespace" bson:"namespace" example:"ns-admin"` + Owner string `json:"owner" bson:"owner" example:"admin"` + AppType string `json:"appType" bson:"appType"` + AppName string `json:"appName" bson:"appName"` + UserUID uuid.UUID `json:"userUID" bson:"userUID"` +} + +func ParseAdminChargeBillingReq(c *gin.Context) (*AdminChargeBillingReq, error) { + rechargeBilling := &AdminChargeBillingReq{} + if err := c.ShouldBindJSON(rechargeBilling); err != nil { + return nil, fmt.Errorf("bind json error: %v", err) + } + return rechargeBilling, nil +} diff --git a/service/account/helper/task.go b/service/account/helper/task.go new file mode 100644 index 00000000000..6dfe23e54b8 --- /dev/null +++ b/service/account/helper/task.go @@ -0,0 +1,83 @@ +package helper + +import ( + "context" + "sync" + + "github.com/sirupsen/logrus" +) + +type Task interface { + Execute() error +} + +type TaskQueue struct { + ctx context.Context + cancel context.CancelFunc + queue chan Task + workers int + wg sync.WaitGroup + //mu sync.Mutex + started bool +} + +func NewTaskQueue(ctx context.Context, workerCount, queueSize int) *TaskQueue { + ctx, cancel := context.WithCancel(ctx) + return &TaskQueue{ + ctx: ctx, + queue: make(chan Task, queueSize), + workers: workerCount, + cancel: cancel, + } +} + +func (tq *TaskQueue) AddTask(task Task) { + //tq.mu.Lock() + //defer tq.mu.Unlock() + //if tq.started { + // tq.queue <- task + //} else { + // fmt.Println("TaskQueue has not been started yet") + //} + select { + case <-tq.ctx.Done(): + logrus.Info("TaskQueue has been stopped.") + case tq.queue <- task: + } +} + +func (tq *TaskQueue) Start() { + if tq.started { + return + } + tq.started = true + for i := 0; i < tq.workers; i++ { + tq.wg.Add(1) + go tq.worker(i) + } +} + +func (tq *TaskQueue) Stop() { + tq.cancel() + tq.wg.Wait() + close(tq.queue) + logrus.Info("TaskQueue stopped") +} + +func (tq *TaskQueue) worker(id int) { + defer tq.wg.Done() + for { + select { + case <-tq.ctx.Done(): + return + case task, ok := <-tq.queue: + if !ok { + return + } + if err := task.Execute(); err != nil { + // TODO handle task execution failures + logrus.Errorf("Worker %d failed to process task: %v", id, err) + } + } + } +} diff --git a/service/account/router/router.go b/service/account/router/router.go index 4c875f5447a..60e5c7e30b8 100644 --- a/service/account/router/router.go +++ b/service/account/router/router.go @@ -4,13 +4,19 @@ import ( "context" "fmt" "log" + "net/http" "os" "os/signal" + "sync/atomic" "syscall" "time" + "github.com/sirupsen/logrus" + "github.com/labring/sealos/controllers/pkg/utils/env" + "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/labring/sealos/service/account/docs" "github.com/labring/sealos/service/account/dao" @@ -27,16 +33,16 @@ import ( func RegisterPayRouter() { router := gin.Default() - - if err := dao.InitDB(); err != nil { + ctx := context.Background() + if err := dao.Init(ctx); err != nil { log.Fatalf("Error initializing database: %v", err) } - ctx := context.Background() defer func() { if err := dao.DBClient.Disconnect(ctx); err != nil { log.Fatalf("Error disconnecting database: %v", err) } }() + router.GET("/metrics", gin.WrapH(promhttp.Handler())) // /account/v1alpha1/{/namespaces | /properties | {/costs | /costs/recharge | /costs/consumption | /costs/properties}} router.Group(helper.GROUP). POST(helper.GetHistoryNamespaces, api.GetBillingHistoryNamespaceList). @@ -67,19 +73,25 @@ func RegisterPayRouter() { POST(helper.UserUsage, api.UserUsage). POST(helper.GetRechargeDiscount, api.GetRechargeDiscount). POST(helper.GetUserRealNameInfo, api.GetUserRealNameInfo) + router.Group(helper.AdminGroup). + GET(helper.AdminGetAccountWithWorkspace, api.AdminGetAccountWithWorkspaceID). + POST(helper.AdminChargeBilling, api.AdminChargeBilling) + //POST(helper.AdminActiveBilling, api.AdminActiveBilling) docs.SwaggerInfo.Host = env.GetEnvWithDefault("SWAGGER_HOST", "localhost:2333") router.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerfiles.Handler)) // Create a buffered channel interrupt and use the signal. - interrupt := make(chan os.Signal, 1) - signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM) + rootCtx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer stop() // Start the HTTP server to listen on port 2333. + srv := &http.Server{ + Addr: ":2333", + Handler: router, + } go func() { - err := router.Run(":2333") - fmt.Println("account service is running on port 2333") - if err != nil { - log.Fatalf("Error running server: %v", err) + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatalf("listen: %s\n", err) } }() @@ -88,10 +100,25 @@ func RegisterPayRouter() { fmt.Println("Start reward processing timer") go startRewardProcessingTimer(ctx) } + dao.BillingTask.Start() + // process llm task + go startReconcileBilling(ctx) + + // process hourly archive + go startHourlyBillingActiveArchive(ctx) // Wait for interrupt signal. - <-interrupt + <-rootCtx.Done() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + if err := srv.Shutdown(ctx); err != nil { + log.Fatal("Server forced to shutdown: ", err) + } + dao.BillingTask.Stop() + + log.Println("Server exiting") // Terminate procedure. os.Exit(0) } @@ -111,3 +138,70 @@ func startRewardProcessingTimer(ctx context.Context) { } } } + +var lastReconcileTime atomic.Value + +func startReconcileBilling(ctx context.Context) { + // initialize to one hour ago + lastReconcileTime.Store(time.Now().UTC().Add(-time.Hour)) + + tickerTime, err := time.ParseDuration(env.GetEnvWithDefault("BILLING_RECONCILE_INTERVAL", "5s")) + if err != nil { + logrus.Errorf("Failed to parse LLM_BILLING_RECONCILE_INTERVAL: %v", err) + tickerTime = 5 * time.Second + } + // create a timer and execute it once every minute + ticker := time.NewTicker(tickerTime) + defer ticker.Stop() + + logrus.Info("Starting LLM billing reconciliation service, interval: ", tickerTime.String()) + + // This command is executed for the first time to process the data within the last hour + startTime, endTime := time.Now().UTC().Add(-time.Hour), time.Now().UTC() + dao.BillingTask.AddTask(&dao.ActiveBillingReconcile{ + StartTime: startTime, + EndTime: endTime, + }) + lastReconcileTime.Store(endTime) + + for { + select { + case <-ctx.Done(): + logrus.Info("Stopping LLM billing reconciliation service") + return + case t := <-ticker.C: + currentTime := t.UTC() + lastTime := lastReconcileTime.Load().(time.Time) + //doBillingReconcile(lastTime, currentTime) + dao.BillingTask.AddTask(&dao.ActiveBillingReconcile{ + StartTime: lastTime, + EndTime: currentTime, + }) + lastReconcileTime.Store(currentTime) + } + } +} + +func startHourlyBillingActiveArchive(ctx context.Context) { + logrus.Info("Starting hourly billing active archive service") + now := time.Now().UTC() + lastHourStart := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()-1, 0, 0, 0, now.Location()) + + dao.BillingTask.AddTask(&dao.ArchiveBillingReconcile{ + StartTime: lastHourStart, + }) + nextHour := time.Date(now.Year(), now.Month(), now.Day(), now.Hour()+1, 0, 0, 0, now.Location()) + for { + select { + case <-ctx.Done(): + logrus.Info("Stopping hourly billing archive service") + return + case <-time.After(time.Until(nextHour)): + currentHour := nextHour.Add(-time.Hour) + dao.BillingTask.AddTask(&dao.ArchiveBillingReconcile{ + StartTime: currentHour, + }) + nextHour = nextHour.Add(time.Hour) + } + } +} diff --git a/service/go.work.sum b/service/go.work.sum index 893f229853f..dce39ff6b4c 100644 --- a/service/go.work.sum +++ b/service/go.work.sum @@ -1167,6 +1167,7 @@ github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b h1:0LFwY6Q3g github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/pquerna/cachecontrol v0.1.0/go.mod h1:NrUG3Z7Rdu85UNR3vm7SOsl1nFIeSiQnrHV5K9mBcUI= github.com/prometheus/client_golang v1.15.1/go.mod h1:e9yaBhRPU2pPNsZwE+JdQl0KEt1N9XgF6zxWmaC0xOk= +github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY= github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA= github.com/prometheus/client_golang v1.19.0 h1:ygXvpU1AoN1MhdzckN+PyD9QJOSD4x7kmXYlnfbA6JU= github.com/prometheus/client_golang v1.19.0/go.mod h1:ZRM9uEAypZakd+q/x7+gmsvXdURP+DABIEIjnmDdp+k=