Skip to content

Commit

Permalink
Limit mempool batch size to 5 msgs at once
Browse files Browse the repository at this point in the history
  • Loading branch information
aleksej-paschenko committed Dec 5, 2023
1 parent 67ea9d1 commit 68cc1e3
Showing 1 changed file with 25 additions and 0 deletions.
25 changes: 25 additions & 0 deletions pkg/api/event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"net/http"
"time"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/shopspring/decimal"
"github.com/tonkeeper/opentonapi/pkg/blockchain"
"github.com/tonkeeper/tongo"
Expand All @@ -30,6 +32,20 @@ import (
"github.com/tonkeeper/opentonapi/pkg/wallet"
)

const maxBatchSize = 5

var (
mempoolBatchSize = promauto.NewHistogram(prometheus.HistogramOpts{
Name: "mempool_messages_batch_size",
Help: "Sizes of mempool batches",
Buckets: []float64{2, 3, 4, 5, 6, 7, 8, 9, 10},
})
mempoolMessageCounter = promauto.NewCounter(prometheus.CounterOpts{
Name: "mempool_messages_counter",
Help: "The total number of mempool messages",
})
)

func (h *Handler) SendBlockchainMessage(ctx context.Context, request *oas.SendBlockchainMessageReq) error {
if h.msgSender == nil {
return toError(http.StatusBadRequest, fmt.Errorf("msg sender is not configured"))
Expand All @@ -38,6 +54,7 @@ func (h *Handler) SendBlockchainMessage(ctx context.Context, request *oas.SendBl
return toError(http.StatusBadRequest, fmt.Errorf("boc not found"))
}
if request.Boc.IsSet() {

payload, err := base64.StdEncoding.DecodeString(request.Boc.Value)
if err != nil {
return toError(http.StatusBadRequest, fmt.Errorf("boc must be a base64 encoded string"))
Expand All @@ -47,6 +64,7 @@ func (h *Handler) SendBlockchainMessage(ctx context.Context, request *oas.SendBl
Payload: payload,
Details: h.ctxToDetails(ctx),
}
mempoolMessageCounter.Inc()
if err := h.msgSender.SendMessage(ctx, msgCopy); err != nil {
sentry.Send("sending message", sentry.SentryInfoData{"payload": request.Boc}, sentry.LevelError)
return toError(http.StatusInternalServerError, err)
Expand All @@ -67,6 +85,9 @@ func (h *Handler) SendBlockchainMessage(ctx context.Context, request *oas.SendBl
copies []blockchain.ExtInMsgCopy
shardAccount = map[tongo.AccountID]tlb.ShardAccount{}
)
if len(request.Batch) > maxBatchSize {
return toError(http.StatusBadRequest, fmt.Errorf("batch size must be less than %v", maxBatchSize))
}
for _, msgBoc := range request.Batch {
payload, err := base64.StdEncoding.DecodeString(msgBoc)
if err != nil {
Expand All @@ -83,6 +104,10 @@ func (h *Handler) SendBlockchainMessage(ctx context.Context, request *oas.SendBl
}
copies = append(copies, msgCopy)
}

mempoolMessageCounter.Add(float64(len(copies)))
mempoolBatchSize.Observe(float64(len(copies)))

h.msgSender.SendMultipleMessages(ctx, copies)
return nil
}
Expand Down

0 comments on commit 68cc1e3

Please sign in to comment.