Skip to content

Commit

Permalink
moved the dynamic sql generating logic from prepared statements to re…
Browse files Browse the repository at this point in the history
…gular statements
  • Loading branch information
ice-ares committed Mar 27, 2023
1 parent 2104dc6 commit 624f577
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 70 deletions.
8 changes: 4 additions & 4 deletions application.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -173,31 +173,31 @@ tokenomics: &tokenomics
- 20
tokenomics1:
messageBroker:
consumerGroup: freezer-balance-recalculation-trigger-stream
consumerGroup: freezer-refrigerant-balance-recalculation-trigger-stream
urls:
- localhost:9092
consumingTopics:
- name: balance-recalculation-trigger-stream
oneGoroutinePerPartition: true
tokenomics2:
messageBroker:
consumerGroup: freezer-mining-rates-recalculation-trigger-stream
consumerGroup: freezer-refrigerant-mining-rates-recalculation-trigger-stream
urls:
- localhost:9092
consumingTopics:
- name: mining-rates-recalculation-trigger-stream
oneGoroutinePerPartition: true
tokenomics3:
messageBroker:
consumerGroup: freezer-blockchain-balance-synchronization-trigger-stream
consumerGroup: freezer-refrigerant-blockchain-balance-synchronization-trigger-stream
urls:
- localhost:9092
consumingTopics:
- name: blockchain-balance-synchronization-trigger-stream
oneGoroutinePerPartition: true
tokenomics4:
messageBroker:
consumerGroup: freezer-extra-bonus-processing-trigger-stream
consumerGroup: freezer-refrigerant-extra-bonus-processing-trigger-stream
urls:
- localhost:9092
consumingTopics:
Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ require (
github.com/goccy/go-json v0.10.2
github.com/google/uuid v1.3.0
github.com/hashicorp/go-multierror v1.1.1
github.com/ice-blockchain/eskimo v1.103.0
github.com/ice-blockchain/go-tarantool-client v0.0.0-20230322193140-81ac2079df0c
github.com/ice-blockchain/eskimo v1.105.0
github.com/ice-blockchain/go-tarantool-client v0.0.0-20230327200757-4fc71fa3f7bb
github.com/ice-blockchain/wintr v1.101.0
github.com/pkg/errors v0.9.1
github.com/stretchr/testify v1.8.2
Expand Down Expand Up @@ -66,7 +66,7 @@ require (
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/imdario/mergo v0.3.15 // indirect
github.com/imroc/req/v3 v3.33.1 // indirect
github.com/imroc/req/v3 v3.33.2 // indirect
github.com/ip2location/ip2location-go/v9 v9.5.0 // indirect
github.com/joho/godotenv v1.5.1 // indirect
github.com/josharian/intern v1.0.0 // indirect
Expand Down Expand Up @@ -131,7 +131,7 @@ require (
google.golang.org/api v0.114.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/appengine/v2 v2.0.2 // indirect
google.golang.org/genproto v0.0.0-20230323212658-478b75c54725 // indirect
google.golang.org/genproto v0.0.0-20230327152035-dc694ad2151e // indirect
google.golang.org/grpc v1.54.0 // indirect
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
Expand Down
16 changes: 8 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -268,16 +268,16 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ice-blockchain/eskimo v1.103.0 h1:Q8b5udJLYjyKBre4whunpLiWTSQvr0S2Gp0szTcvlAY=
github.com/ice-blockchain/eskimo v1.103.0/go.mod h1:gFqV1dqTJsGRAAwPWi0h2/s/HYo8zRYxLOynbTAXQq0=
github.com/ice-blockchain/go-tarantool-client v0.0.0-20230322193140-81ac2079df0c h1:c0fZ+DTu1zlEggEGckFzdGx0tuWQDPf4PjfAbQkHFAU=
github.com/ice-blockchain/go-tarantool-client v0.0.0-20230322193140-81ac2079df0c/go.mod h1:ZsQU7i3mxhgBBu43Oev7WPFbIjP4TniN/b1UPNGbrq8=
github.com/ice-blockchain/eskimo v1.105.0 h1:T4+Sl0xXZ8iC+5k84JkBzzb6XLs6ZhUwiUbhgaPgbVE=
github.com/ice-blockchain/eskimo v1.105.0/go.mod h1:ckOx9UtAIICYBJ2UYRauf9puNnArZAE7T4ei/cZ7GLc=
github.com/ice-blockchain/go-tarantool-client v0.0.0-20230327200757-4fc71fa3f7bb h1:8TnFP3mc7O+tc44kv2e0/TpZKnEVUaKH+UstwfBwRkk=
github.com/ice-blockchain/go-tarantool-client v0.0.0-20230327200757-4fc71fa3f7bb/go.mod h1:ZsQU7i3mxhgBBu43Oev7WPFbIjP4TniN/b1UPNGbrq8=
github.com/ice-blockchain/wintr v1.101.0 h1:84dJ+1TnlU4jLPnGO/om2BgigvOXhQhxDDNX1lGh/MY=
github.com/ice-blockchain/wintr v1.101.0/go.mod h1:YElO+8IEByLZPSr5/zSmI9H49Dg5GnjcouAgUv2zEC4=
github.com/imdario/mergo v0.3.15 h1:M8XP7IuFNsqUx6VPK2P9OSmsYsI/YFaGil0uD21V3dM=
github.com/imdario/mergo v0.3.15/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
github.com/imroc/req/v3 v3.33.1 h1:BZnyl+K0hXcJlZBHY2CqbPgmVc1pPJDzjn6aJfB6shI=
github.com/imroc/req/v3 v3.33.1/go.mod h1:cZ+7C3L/AYOr4tLGG16hZF90F1WzAdAdzt1xFSlizXY=
github.com/imroc/req/v3 v3.33.2 h1:mqphLIo++p+IPYdjgP/Wd5rqXUjKvuEIst2U+EsLIwQ=
github.com/imroc/req/v3 v3.33.2/go.mod h1:cZ+7C3L/AYOr4tLGG16hZF90F1WzAdAdzt1xFSlizXY=
github.com/ip2location/ip2location-go/v9 v9.5.0 h1:7gqKncm4MhBrpJIK0PmV8o6Bf8YbbSAPjORzyjAv1iM=
github.com/ip2location/ip2location-go/v9 v9.5.0/go.mod h1:s5SV6YZL10TpfPpXw//7fEJC65G/yH7Oh+Tjq9JcQEQ=
github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0=
Expand Down Expand Up @@ -795,8 +795,8 @@ google.golang.org/genproto v0.0.0-20201210142538-e3217bee35cc/go.mod h1:FWY/as6D
google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20210108203827-ffc7fda8c3d7/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20210226172003-ab064af71705/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20230323212658-478b75c54725 h1:VmCWItVXcKboEMCwZaWge+1JLiTCQSngZeINF+wzO+g=
google.golang.org/genproto v0.0.0-20230323212658-478b75c54725/go.mod h1:UUQDJDOlWu4KYeJZffbWgBkS1YFobzKbLVfK69pe0Ak=
google.golang.org/genproto v0.0.0-20230327152035-dc694ad2151e h1:rRGPYd0STm9H4Ci+iGrSLG35mkAKY41/nzCcG7PQADw=
google.golang.org/genproto v0.0.0-20230327152035-dc694ad2151e/go.mod h1:UUQDJDOlWu4KYeJZffbWgBkS1YFobzKbLVfK69pe0Ak=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38=
google.golang.org/grpc v1.21.1/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM=
Expand Down
36 changes: 14 additions & 22 deletions tokenomics/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -426,18 +426,15 @@ func (r *repository) insertOrReplaceBalances( //nolint:revive // Alot of SQL par
if ctx.Err() != nil || len(balances) == 0 {
return errors.Wrap(ctx.Err(), "unexpected deadline")
}
const balanceFields = 5
params := make(map[string]any, 1+(balanceFields*len(balances)))
params["updated_at"] = updatedAt
values := make([]string, 0, len(balances))
for ix, bal := range balances {
params[fmt.Sprintf("user_id_%v", ix)] = bal.UserID
params[fmt.Sprintf("type_%v", ix)] = bal.Type
params[fmt.Sprintf("type_detail_%v", ix)] = bal.TypeDetail
params[fmt.Sprintf("negative_%v", ix)] = bal.Negative
params[fmt.Sprintf("amount_%v", ix)] = bal.Amount
value := fmt.Sprintf(`(:updated_at,:user_id_%[1]v,:type_%[1]v,:type_detail_%[1]v,:negative_%[1]v,:amount_%[1]v)`, ix)
values = append(values, value)
for _, bal := range balances {
if bal.Amount.IsNil() {
bal.Amount = coin.ZeroICEFlakes()
}
amount, err := bal.Amount.Uint.Marshal()
log.Panic(err)
values = append(values, fmt.Sprintf(`(%[1]v,'%[2]v',%[3]v,'%[4]v',%[5]v,'%[6]v')`,
updatedAt.UnixNano(), bal.UserID, bal.Type, bal.TypeDetail, bal.Negative, string(amount)))
}
insertOrReplace := "REPLACE"
if insert {
Expand All @@ -451,7 +448,7 @@ func (r *repository) insertOrReplaceBalances( //nolint:revive // Alot of SQL par
log.Info(fmt.Sprintf("[response]insert:%v replace balances SQL took: %v", insertOrReplace, elapsed))
}
}()
if _, err := storage.CheckSQLDMLResponse(r.db.PrepareExecute(sql, params)); err != nil {
if _, err := storage.CheckSQLDMLResponse(r.db.Execute(sql)); err != nil {
return errors.Wrapf(err, "failed at %v to %v balances:%#v", updatedAt, insertOrReplace, balances)
}

Expand All @@ -462,19 +459,14 @@ func (r *repository) deleteBalances(ctx context.Context, workerIndex uint64, bal
if ctx.Err() != nil || len(balances) == 0 {
return errors.Wrap(ctx.Err(), "context failed")
}
const fields = 4
params := make(map[string]any, fields*len(balances))
values := make([]string, 0, len(balances))
for ix, bal := range balances {
params[fmt.Sprintf("user_id_%v", ix)] = bal.UserID
params[fmt.Sprintf("type_%v", ix)] = bal.Type
params[fmt.Sprintf("type_detail_%v", ix)] = bal.TypeDetail
params[fmt.Sprintf("negative_%v", ix)] = bal.Negative
values = append(values, fmt.Sprintf(`(user_id = :user_id_%[1]v AND negative = :negative_%[1]v AND type = :type_%[1]v AND type_detail = :type_detail_%[1]v)`, ix)) //nolint:lll // .
for _, bal := range balances {
values = append(values, fmt.Sprintf(`(user_id = '%[1]v' AND negative = %[2]v AND type = %[3]v AND type_detail = '%[4]v')`,
bal.UserID, bal.Negative, bal.Type, bal.TypeDetail))
}
sql := fmt.Sprintf(`DELETE FROM balances_%v WHERE %v`, workerIndex, strings.Join(values, " OR "))
if _, err := storage.CheckSQLDMLResponse(r.db.PrepareExecute(sql, params)); err != nil {
return errors.Wrapf(err, "failed to DELETE from balances for params:%#v", params)
if _, err := storage.CheckSQLDMLResponse(r.db.Execute(sql)); err != nil {
return errors.Wrapf(err, "failed to DELETE from balances for values:%#v", values)
}

return nil
Expand Down
11 changes: 3 additions & 8 deletions tokenomics/balance_recalculation.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,20 +555,15 @@ func (s *balanceRecalculationTriggerStreamSource) stopWorkerForUsers(
if ctx.Err() != nil || len(lastMiningEndedAtPerUserID) == 0 {
return errors.Wrap(ctx.Err(), "unexpected deadline")
}
params := make(map[string]any, (1+1)*len(lastMiningEndedAtPerUserID))
conditions := make([]string, 0, len(lastMiningEndedAtPerUserID))
ix := 0
for userID, lastMiningEndedAt := range lastMiningEndedAtPerUserID {
params[fmt.Sprintf("user_id%v", ix)] = userID
params[fmt.Sprintf("last_mining_ended_at%v", ix)] = lastMiningEndedAt
conditions = append(conditions, fmt.Sprintf("(user_id = :user_id%[1]v AND last_mining_ended_at = :last_mining_ended_at%[1]v)", ix))
ix++
conditions = append(conditions, fmt.Sprintf("(user_id = '%[1]v' AND last_mining_ended_at = %[2]v)", userID, lastMiningEndedAt.UnixNano()))
}
sql := fmt.Sprintf(`UPDATE balance_recalculation_worker_%[1]v
SET enabled = FALSE
WHERE %v`, workerIndex, strings.Join(conditions, " OR "))
if _, err := storage.CheckSQLDMLResponse(s.db.PrepareExecute(sql, params)); err != nil {
return errors.Wrapf(err, "failed to update balance_recalculation_worker_%v SET enabled = FALSE for params:%#v", workerIndex, params)
if _, err := storage.CheckSQLDMLResponse(s.db.Execute(sql)); err != nil {
return errors.Wrapf(err, "failed to update balance_recalculation_worker_%v SET enabled = FALSE for conditions:%#v", workerIndex, conditions)
}

return nil
Expand Down
19 changes: 7 additions & 12 deletions tokenomics/blockchain_balance_synchronization.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,24 +229,19 @@ func (s *blockchainBalanceSynchronizationTriggerStreamSource) updateBalances( //
PreStakingICEFlake string
}
values := make([]string, 0, len(bs))
const balanceFields = 6
params := make(map[string]any, len(bs)*balanceFields)
blockchainMessages := make([]*blockchainMessage, 0, len(bs))
for ix, bal := range bs {
for _, bal := range bs {
if bal.Standard.IsNil() {
bal.Standard = coin.ZeroICEFlakes()
}
if bal.PreStaking.IsNil() {
bal.PreStaking = coin.ZeroICEFlakes()
}
total := coin.New(bal.Standard.Add(bal.PreStaking))
params[fmt.Sprintf("amount%v", ix)] = total.Amount
params[fmt.Sprintf("amount_w0%v", ix)] = total.AmountWord0
params[fmt.Sprintf("amount_w1%v", ix)] = total.AmountWord1
params[fmt.Sprintf("amount_w2%v", ix)] = total.AmountWord2
params[fmt.Sprintf("amount_w3%v", ix)] = total.AmountWord3
params[fmt.Sprintf("user_id%v", ix)] = bal.UserID
values = append(values, fmt.Sprintf("(:amount%[1]v,:amount_w0%[1]v,:amount_w1%[1]v,:amount_w2%[1]v,:amount_w3%[1]v,:user_id%[1]v)", ix))
totalAmount, err := total.Amount.Uint.Marshal()
log.Panic(err)
values = append(values, fmt.Sprintf("('%[1]v',%[2]v,%[3]v,%[4]v,%[5]v,'%[6]v')",
string(totalAmount), total.AmountWord0, total.AmountWord1, total.AmountWord2, total.AmountWord3, bal.UserID))
if bal.miningBlockchainAccountAddress != "" {
blockchainMessages = append(blockchainMessages, &blockchainMessage{
AccountAddress: bal.miningBlockchainAccountAddress,
Expand All @@ -256,8 +251,8 @@ func (s *blockchainBalanceSynchronizationTriggerStreamSource) updateBalances( //
}
}
sql := fmt.Sprintf(`REPLACE INTO balances (amount,amount_w0,amount_w1,amount_w2,amount_w3,user_id) VALUES %v`, strings.Join(values, ","))
if _, err := storage.CheckSQLDMLResponse(s.db.PrepareExecute(sql, params)); err != nil {
return errors.Wrapf(err, "failed to replace into balances, params:%#v", params)
if _, err := storage.CheckSQLDMLResponse(s.db.Execute(sql)); err != nil {
return errors.Wrapf(err, "failed to replace into balances, values:%#v", values)
}
if len(blockchainMessages) != 0 { //nolint:revive,staticcheck // .
//nolint:godox // .
Expand Down
29 changes: 17 additions & 12 deletions tokenomics/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pkg/errors"

"github.com/ice-blockchain/wintr/connectors/storage"
"github.com/ice-blockchain/wintr/time"
)

func (r *repository) initializeWorker(ctx context.Context, table, userID string, workerIndex uint64) (err error) {
Expand All @@ -32,26 +33,30 @@ func (r *repository) updateWorkerFields(
if ctx.Err() != nil || len(userIDs) == 0 {
return errors.Wrap(ctx.Err(), "context failed")
}
values := make([]string, 0, len(userIDs))
fields := make([]string, 0, len(updateKV))
params := make(map[string]any, len(userIDs)+len(updateKV))
for key, value := range updateKV {
if value == nil {
fields = append(fields, fmt.Sprintf("%[1]v = null", key))
} else {
params[key] = value
fields = append(fields, fmt.Sprintf("%[1]v = :%[1]v", key))
switch typedValue := value.(type) {
case *time.Time:
fields = append(fields, fmt.Sprintf("%[1]v = %[2]v", key, typedValue.UnixNano()))
case string:
fields = append(fields, fmt.Sprintf("%[1]v = '%[2]v'", key, typedValue))
default:
if typedValue == nil {
fields = append(fields, fmt.Sprintf("%[1]v = null", key))
} else {
fields = append(fields, fmt.Sprintf("%[1]v = %[2]v", key, typedValue))
}
}
}
for i := range userIDs {
params[fmt.Sprintf("user_id%v", i)] = userIDs[i]
values = append(values, fmt.Sprintf(":user_id%v", i))
values := make([]string, 0, len(userIDs))
for _, userID := range userIDs {
values = append(values, fmt.Sprintf("'%v'", userID))
}
sql := fmt.Sprintf(`UPDATE %[1]v%[2]v
SET %[3]v
WHERE user_id in (%[4]v)`, table, workerIndex, strings.Join(fields, ","), strings.Join(values, ","))
if _, uErr := storage.CheckSQLDMLResponse(r.db.PrepareExecute(sql, params)); uErr != nil {
return errors.Wrapf(uErr, "failed to UPDATE %v%v params :%#v, for userIDs:%#v", table, workerIndex, params, userIDs)
if _, uErr := storage.CheckSQLDMLResponse(r.db.Execute(sql)); uErr != nil {
return errors.Wrapf(uErr, "failed to UPDATE %v%v updateKV :%#v, for userIDs:%#v", table, workerIndex, updateKV, userIDs)
}

return nil
Expand Down

0 comments on commit 624f577

Please sign in to comment.