Skip to content

Commit

Permalink
feat: aiproxy errorlog and hide get balance error (#5230)
Browse files Browse the repository at this point in the history
* fix: hide get balance error

* fix: err message

* fix: consume no need reqid
  • Loading branch information
zijiren233 authored Nov 23, 2024
1 parent f9dc424 commit 6eab0a8
Show file tree
Hide file tree
Showing 9 changed files with 186 additions and 74 deletions.
28 changes: 14 additions & 14 deletions service/aiproxy/common/balance/sealos.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ func cacheSetGroupBalance(ctx context.Context, group string, balance int64, user
if !common.RedisEnabled || !sealosRedisCacheEnable {
return nil
}
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
pipe := common.RDB.Pipeline()
pipe.HSet(ctx, fmt.Sprintf(sealosGroupBalanceKey, group), sealosCache{
Balance: balance,
Expand All @@ -117,6 +119,8 @@ func cacheGetGroupBalance(ctx context.Context, group string) (*sealosCache, erro
if !common.RedisEnabled || !sealosRedisCacheEnable {
return nil, redis.Nil
}
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
var cache sealosCache
if err := common.RDB.HGetAll(ctx, fmt.Sprintf(sealosGroupBalanceKey, group)).Scan(&cache); err != nil {
return nil, err
Expand Down Expand Up @@ -161,26 +165,25 @@ func (s *Sealos) getGroupRemainBalance(ctx context.Context, group string) (float
return decimal.NewFromInt(cache.Balance).Div(decimalBalancePrecision).InexactFloat64(),
newSealosPostGroupConsumer(s.accountURL, group, cache.UserUID, cache.Balance), nil
} else if err != nil && !errors.Is(err, redis.Nil) {
logger.Errorf(ctx, "get group (%s) balance cache failed: %s", group, err)
logger.SysErrorf("get group (%s) balance cache failed: %s", group, err)
}

ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

balance, userUID, err := s.fetchBalanceFromAPI(ctx, group)
if err != nil {
return 0, nil, err
}

if err := cacheSetGroupBalance(ctx, group, balance, userUID); err != nil {
logger.Errorf(ctx, "set group (%s) balance cache failed: %s", group, err)
logger.SysErrorf("set group (%s) balance cache failed: %s", group, err)
}

return decimal.NewFromInt(balance).Div(decimalBalancePrecision).InexactFloat64(),
newSealosPostGroupConsumer(s.accountURL, group, userUID, balance), nil
}

func (s *Sealos) fetchBalanceFromAPI(ctx context.Context, group string) (balance int64, userUID string, err error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodGet,
fmt.Sprintf("%s/admin/v1alpha1/account-with-workspace?namespace=%s", s.accountURL, group), nil)
if err != nil {
Expand All @@ -200,8 +203,7 @@ func (s *Sealos) fetchBalanceFromAPI(ctx context.Context, group string) (balance
}

if sealosResp.Error != "" {
logger.Errorf(ctx, "get group (%s) balance failed: %s", group, sealosResp.Error)
return 0, "", fmt.Errorf("get group (%s) balance failed", group)
return 0, "", errors.New(sealosResp.Error)
}

if resp.StatusCode != http.StatusOK {
Expand Down Expand Up @@ -235,7 +237,7 @@ func (s *SealosPostGroupConsumer) PostGroupConsume(ctx context.Context, tokenNam
amount := s.calculateAmount(usage)

if err := cacheDecreaseGroupBalance(ctx, s.group, amount.IntPart()); err != nil {
logger.Errorf(ctx, "decrease group (%s) balance cache failed: %s", s.group, err)
logger.SysErrorf("decrease group (%s) balance cache failed: %s", s.group, err)
}

if err := s.postConsume(ctx, amount.IntPart(), tokenName); err != nil {
Expand Down Expand Up @@ -276,19 +278,17 @@ func (s *SealosPostGroupConsumer) postConsume(ctx context.Context, amount int64,
req.Header.Set("Authorization", "Bearer "+jwtToken)
resp, err := sealosHTTPClient.Do(req)
if err != nil {
return fmt.Errorf("post group (%s) consume failed: %w", s.group, err)
return err
}
defer resp.Body.Close()

var sealosResp sealosPostGroupConsumeResp
if err := json.NewDecoder(resp.Body).Decode(&sealosResp); err != nil {
return fmt.Errorf("post group (%s) consume failed: %w", s.group, err)
return err
}

if resp.StatusCode != http.StatusOK {
logger.Errorf(ctx, "group (%s) consume failed with status code %d: %s",
s.group, resp.StatusCode, sealosResp.Error)
return fmt.Errorf("group (%s) consume failed with status code %d", s.group, resp.StatusCode)
if resp.StatusCode != http.StatusOK || sealosResp.Error != "" {
return fmt.Errorf("status code: %d, error: %s", resp.StatusCode, sealosResp.Error)
}

return nil
Expand Down
3 changes: 2 additions & 1 deletion service/aiproxy/controller/channel-billing.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,9 +389,10 @@ func GetSubscription(c *gin.Context) {
group := c.GetString(ctxkey.Group)
b, _, err := balance.Default.GetGroupRemainBalance(c, group)
if err != nil {
logger.Errorf(c, "get group (%s) balance failed: %s", group, err)
c.JSON(http.StatusOK, gin.H{
"success": false,
"message": err.Error(),
"message": fmt.Sprintf("get group (%s) balance failed", group),
})
return
}
Expand Down
2 changes: 1 addition & 1 deletion service/aiproxy/controller/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func shouldRetry(c *gin.Context, statusCode int) bool {
}

func processChannelRelayError(ctx context.Context, group string, channelID int, err *model.ErrorWithStatusCode) {
logger.Errorf(ctx, "relay error (channel id %d, group: %s): %s", channelID, group, err.Message)
logger.Errorf(ctx, "relay error (channel id %d, group: %s): %s", channelID, group, err)
// https://platform.openai.com/docs/guides/error-codes/api-errors
if monitor.ShouldDisableChannel(&err.Error, err.StatusCode) {
_ = dbmodel.DisableChannelByID(channelID)
Expand Down
3 changes: 1 addition & 2 deletions service/aiproxy/relay/adaptor/openai/adaptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ func (a *Adaptor) GetRequestURL(meta *meta.Meta) (string, error) {
requestURL := strings.Split(meta.RequestURLPath, "?")[0]
requestURL = fmt.Sprintf("%s?api-version=%s", requestURL, meta.Config.APIVersion)
task := strings.TrimPrefix(requestURL, "/v1/")
model := meta.ActualModelName
model = strings.Replace(model, ".", "", -1)
model := strings.ReplaceAll(meta.ActualModelName, ".", "")
// https://github.com/labring/sealos/service/aiproxy/issues/1191
// {your endpoint}/openai/deployments/{your azure_model}/chat/completions?api-version={api_version}
requestURL = fmt.Sprintf("/openai/deployments/%s/%s", model, task)
Expand Down
54 changes: 44 additions & 10 deletions service/aiproxy/relay/controller/audio.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"github.com/labring/sealos/service/aiproxy/common"
"github.com/labring/sealos/service/aiproxy/common/balance"
"github.com/labring/sealos/service/aiproxy/common/ctxkey"
"github.com/labring/sealos/service/aiproxy/common/helper"
"github.com/labring/sealos/service/aiproxy/common/logger"
"github.com/labring/sealos/service/aiproxy/relay"
"github.com/labring/sealos/service/aiproxy/relay/adaptor/openai"
"github.com/labring/sealos/service/aiproxy/relay/meta"
Expand Down Expand Up @@ -78,7 +78,12 @@ func RelayAudioHelper(c *gin.Context, relayMode int) *relaymodel.ErrorWithStatus

groupRemainBalance, postGroupConsumer, err := balance.Default.GetGroupRemainBalance(c.Request.Context(), group)
if err != nil {
return openai.ErrorWrapper(err, "get_group_balance_failed", http.StatusInternalServerError)
logger.Errorf(c, "get group (%s) balance failed: %s", group, err)
return openai.ErrorWrapper(
fmt.Errorf("get group (%s) balance failed", group),
"get_group_balance_failed",
http.StatusInternalServerError,
)
}

preConsumedAmount := decimal.NewFromInt(int64(meta.PromptTokens)).
Expand All @@ -92,28 +97,57 @@ func RelayAudioHelper(c *gin.Context, relayMode int) *relaymodel.ErrorWithStatus

resp, err := adaptor.DoRequest(c, meta, body)
if err != nil {
logger.Errorf(c, "do request failed: %s", err.Error())
ConsumeWaitGroup.Add(1)
go postConsumeAmount(context.Background(),
&ConsumeWaitGroup,
postGroupConsumer,
http.StatusInternalServerError,
c.Request.URL.Path,
nil, meta, price, completionPrice, err.Error(),
)
return openai.ErrorWrapper(err, "do_request_failed", http.StatusInternalServerError)
}

consumeCtx := context.WithValue(context.Background(), helper.RequestIDKey, c.Value(helper.RequestIDKey))

if resp.StatusCode != http.StatusOK {
if isErrorHappened(meta, resp) {
err := RelayErrorHandler(resp)
ConsumeWaitGroup.Add(1)
go postConsumeAmount(consumeCtx, &ConsumeWaitGroup, postGroupConsumer, resp.StatusCode, c.Request.URL.Path, &relaymodel.Usage{
PromptTokens: 0,
CompletionTokens: 0,
}, meta, price, completionPrice, err.Message)
go postConsumeAmount(context.Background(),
&ConsumeWaitGroup,
postGroupConsumer,
resp.StatusCode,
c.Request.URL.Path,
nil,
meta,
price,
completionPrice,
err.String(),
)
return err
}

usage, respErr := adaptor.DoResponse(c, resp, meta)
if respErr != nil {
logger.Errorf(c, "do response failed: %s", respErr)
ConsumeWaitGroup.Add(1)
go postConsumeAmount(context.Background(),
&ConsumeWaitGroup,
postGroupConsumer,
respErr.StatusCode,
c.Request.URL.Path,
nil, meta, price, completionPrice, respErr.String(),
)
return respErr
}

ConsumeWaitGroup.Add(1)
go postConsumeAmount(consumeCtx, &ConsumeWaitGroup, postGroupConsumer, resp.StatusCode, c.Request.URL.Path, usage, meta, price, completionPrice, "")
go postConsumeAmount(context.Background(),
&ConsumeWaitGroup,
postGroupConsumer,
resp.StatusCode,
c.Request.URL.Path,
usage, meta, price, completionPrice, "",
)

return nil
}
4 changes: 2 additions & 2 deletions service/aiproxy/relay/controller/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@ func getPreConsumedAmount(textRequest *relaymodel.GeneralOpenAIRequest, promptTo
InexactFloat64()
}

func preCheckGroupBalance(ctx context.Context, textRequest *relaymodel.GeneralOpenAIRequest, promptTokens int, price float64, meta *meta.Meta) (bool, balance.PostGroupConsumer, *relaymodel.ErrorWithStatusCode) {
func preCheckGroupBalance(ctx context.Context, textRequest *relaymodel.GeneralOpenAIRequest, promptTokens int, price float64, meta *meta.Meta) (bool, balance.PostGroupConsumer, error) {
preConsumedAmount := getPreConsumedAmount(textRequest, promptTokens, price)

groupRemainBalance, postGroupConsumer, err := balance.Default.GetGroupRemainBalance(ctx, meta.Group)
if err != nil {
return false, nil, openai.ErrorWrapper(err, "get_group_quota_failed", http.StatusInternalServerError)
return false, nil, err
}
if groupRemainBalance < preConsumedAmount {
return false, nil, nil
Expand Down
85 changes: 57 additions & 28 deletions service/aiproxy/relay/controller/image.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ import (
"github.com/gin-gonic/gin"
"github.com/labring/sealos/service/aiproxy/common"
"github.com/labring/sealos/service/aiproxy/common/balance"
"github.com/labring/sealos/service/aiproxy/common/helper"
"github.com/labring/sealos/service/aiproxy/common/logger"
"github.com/labring/sealos/service/aiproxy/model"
"github.com/labring/sealos/service/aiproxy/relay"
"github.com/labring/sealos/service/aiproxy/relay/adaptor/openai"
"github.com/labring/sealos/service/aiproxy/relay/channeltype"
Expand Down Expand Up @@ -144,51 +142,82 @@ func RelayImageHelper(c *gin.Context, _ int) *relaymodel.ErrorWithStatusCode {

groupRemainBalance, postGroupConsumer, err := balance.Default.GetGroupRemainBalance(ctx, meta.Group)
if err != nil {
return openai.ErrorWrapper(err, "get_group_remain_balance_failed", http.StatusInternalServerError)
logger.Errorf(ctx, "get group (%s) balance failed: %s", meta.Group, err)
return openai.ErrorWrapper(
fmt.Errorf("get group (%s) balance failed", meta.Group),
"get_group_remain_balance_failed",
http.StatusInternalServerError,
)
}

amount := decimal.NewFromFloat(imageCostPrice).Mul(decimal.NewFromInt(int64(imageRequest.N))).InexactFloat64()

if groupRemainBalance-amount < 0 {
return openai.ErrorWrapper(errors.New("group balance is not enough"), "insufficient_group_balance", http.StatusForbidden)
return openai.ErrorWrapper(
errors.New("group balance is not enough"),
"insufficient_group_balance",
http.StatusForbidden,
)
}

// do request
resp, err := adaptor.DoRequest(c, meta, requestBody)
if err != nil {
logger.Errorf(ctx, "DoRequest failed: %s", err.Error())
logger.Errorf(ctx, "do request failed: %s", err.Error())
ConsumeWaitGroup.Add(1)
go postConsumeAmount(context.Background(),
&ConsumeWaitGroup,
postGroupConsumer,
http.StatusInternalServerError,
c.Request.URL.Path, nil, meta, imageCostPrice, 0, err.Error(),
)
return openai.ErrorWrapper(err, "do_request_failed", http.StatusInternalServerError)
}

defer func() {
if resp == nil || resp.StatusCode != http.StatusOK {
_ = model.RecordConsumeLog(ctx, meta.Group, resp.StatusCode, meta.ChannelID, imageRequest.N, 0, imageRequest.Model, meta.TokenID, meta.TokenName, 0, imageCostPrice, 0, c.Request.URL.Path, imageRequest.Size)
return
}

consumeCtx := context.WithValue(context.Background(), helper.RequestIDKey, ctx.Value(helper.RequestIDKey))
_amount, err := postGroupConsumer.PostGroupConsume(consumeCtx, meta.TokenName, amount)
if err != nil {
logger.Error(ctx, "error consuming token remain balance: "+err.Error())
err = model.CreateConsumeError(meta.Group, meta.TokenName, imageRequest.Model, err.Error(), amount, meta.TokenID)
if err != nil {
logger.Error(ctx, "failed to create consume error: "+err.Error())
}
} else {
amount = _amount
}
err = model.BatchRecordConsume(consumeCtx, meta.Group, resp.StatusCode, meta.ChannelID, imageRequest.N, 0, imageRequest.Model, meta.TokenID, meta.TokenName, amount, imageCostPrice, 0, c.Request.URL.Path, imageRequest.Size)
if err != nil {
logger.Error(ctx, "failed to record consume log: "+err.Error())
}
}()
if isErrorHappened(meta, resp) {
err := RelayErrorHandler(resp)
ConsumeWaitGroup.Add(1)
go postConsumeAmount(context.Background(),
&ConsumeWaitGroup,
postGroupConsumer,
resp.StatusCode,
c.Request.URL.Path,
nil,
meta,
imageCostPrice,
0,
err.String(),
)
return err
}

// do response
_, respErr := adaptor.DoResponse(c, resp, meta)
if respErr != nil {
logger.Errorf(ctx, "respErr is not nil: %+v", respErr)
logger.Errorf(ctx, "do response failed: %s", respErr)
ConsumeWaitGroup.Add(1)
go postConsumeAmount(context.Background(),
&ConsumeWaitGroup,
postGroupConsumer,
respErr.StatusCode,
c.Request.URL.Path,
nil,
meta,
imageCostPrice,
0,
respErr.String(),
)
return respErr
}

ConsumeWaitGroup.Add(1)
go postConsumeAmount(context.Background(),
&ConsumeWaitGroup,
postGroupConsumer,
resp.StatusCode,
c.Request.URL.Path,
nil, meta, imageCostPrice, 0, imageRequest.Size,
)

return nil
}
Loading

0 comments on commit 6eab0a8

Please sign in to comment.