Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add details from request context to mempool messages #262

Merged
merged 1 commit into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cmd/api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ func main() {
if err != nil {
log.Fatal("storage init", zap.Error(err))
}
// mempool receives a copy of any payload that goes through our API method /v2/blockchain/message
mempool := sources.NewMemPool(log)
// mempoolChannel receives a copy of any payload that goes through our API method /v2/blockchain/message
mempoolChannel, emulationCh := mempool.Run(context.TODO())
mempoolCh := mempool.Run(context.TODO())

msgSender, err := blockchain.NewMsgSender(cfg.App.LiteServers, []chan []byte{mempoolChannel})
msgSender, err := blockchain.NewMsgSender(cfg.App.LiteServers, []chan<- blockchain.ExtInMsgCopy{mempoolCh})
if err != nil {
log.Fatal("failed to create msg sender", zap.Error(err))
}
Expand All @@ -57,7 +57,7 @@ func main() {
api.WithExecutor(storage),
api.WithMessageSender(msgSender),
api.WithSpamFilter(spamFilter),
api.WithEmulationChannel(emulationCh),
api.WithEmulationChannel(mempoolCh),
api.WithTonConnectSecret(cfg.TonConnect.Secret),
)
if err != nil {
Expand Down
2 changes: 0 additions & 2 deletions pkg/api/decode_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package api

import (
"context"
"fmt"
"testing"

"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -44,7 +43,6 @@ func TestHandler_DecodeMessage(t *testing.T) {
return
}
require.Nil(t, err)
fmt.Printf("response: %v\n", response)
pkgTesting.CompareResults(t, response, tt.filenamePrefix)
})
}
Expand Down
50 changes: 27 additions & 23 deletions pkg/api/event_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"time"

"github.com/shopspring/decimal"
"github.com/tonkeeper/opentonapi/pkg/pusher/sources"
"github.com/tonkeeper/opentonapi/pkg/blockchain"
"github.com/tonkeeper/tongo"
"github.com/tonkeeper/tongo/abi"
"github.com/tonkeeper/tongo/boc"
Expand All @@ -38,8 +38,16 @@ func (h *Handler) SendBlockchainMessage(ctx context.Context, request *oas.SendBl
return toError(http.StatusBadRequest, fmt.Errorf("boc not found"))
}
if request.Boc.IsSet() {
payload, err := sendMessage(ctx, request.Boc.Value, h.msgSender)
payload, err := base64.StdEncoding.DecodeString(request.Boc.Value)
if err != nil {
return toError(http.StatusBadRequest, fmt.Errorf("boc must be a base64 encoded string"))
}
msgCopy := blockchain.ExtInMsgCopy{
MsgBoc: request.Boc.Value,
Payload: payload,
Details: h.ctxToDetails(ctx),
}
if err := h.msgSender.SendMessage(ctx, msgCopy); err != nil {
sentry.Send("sending message", sentry.SentryInfoData{"payload": request.Boc}, sentry.LevelError)
return toError(http.StatusInternalServerError, err)
}
Expand All @@ -49,25 +57,33 @@ func (h *Handler) SendBlockchainMessage(ctx context.Context, request *oas.SendBl
sentry.Send("addToMempool", sentry.SentryInfoData{"payload": request.Boc}, sentry.LevelError)
}
}()
h.addToMempool(payload, nil)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
h.addToMempool(ctx, payload, nil)
}()
return nil
}
var (
batchOfBoc []string
copies []blockchain.ExtInMsgCopy
shardAccount = map[tongo.AccountID]tlb.ShardAccount{}
)
for _, msgBoc := range request.Batch {
payload, err := base64.StdEncoding.DecodeString(msgBoc)
if err != nil {
return toError(http.StatusBadRequest, err)
}
shardAccount, err = h.addToMempool(payload, shardAccount)
shardAccount, err = h.addToMempool(ctx, payload, shardAccount)
if err != nil {
continue
}
batchOfBoc = append(batchOfBoc, msgBoc)
msgCopy := blockchain.ExtInMsgCopy{
MsgBoc: msgBoc,
Payload: payload,
Details: h.ctxToDetails(ctx),
}
copies = append(copies, msgCopy)
}
h.msgSender.MsgsBocAddToMempool(batchOfBoc)
h.msgSender.SendMultipleMessages(ctx, copies)
return nil
}

Expand Down Expand Up @@ -499,9 +515,7 @@ func (h *Handler) EmulateMessageToWallet(ctx context.Context, request *oas.Emula
return &consequences, nil
}

func (h *Handler) addToMempool(bytesBoc []byte, shardAccount map[tongo.AccountID]tlb.ShardAccount) (map[tongo.AccountID]tlb.ShardAccount, error) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
func (h *Handler) addToMempool(ctx context.Context, bytesBoc []byte, shardAccount map[tongo.AccountID]tlb.ShardAccount) (map[tongo.AccountID]tlb.ShardAccount, error) {
if shardAccount == nil {
shardAccount = map[tongo.AccountID]tlb.ShardAccount{}
}
Expand Down Expand Up @@ -554,7 +568,9 @@ func (h *Handler) addToMempool(bytesBoc []byte, shardAccount map[tongo.AccountID
traces = slices.Insert(traces, 0, hex.EncodeToString(hash))
h.mempoolEmulate.accountsTraces.Set(account, traces, cache.WithExpiration(time.Second*time.Duration(ttl)))
}
h.emulationCh <- sources.PayloadAndEmulationResults{
h.emulationCh <- blockchain.ExtInMsgCopy{
MsgBoc: base64.StdEncoding.EncodeToString(bytesBoc),
Details: h.ctxToDetails(ctx),
Payload: bytesBoc,
Accounts: accounts,
}
Expand Down Expand Up @@ -704,15 +720,3 @@ func emulatedTreeToTrace(ctx context.Context, executor executor, resolver core.L
}
return t, nil
}

func sendMessage(ctx context.Context, msgBoc string, msgSender messageSender) ([]byte, error) {
payload, err := base64.StdEncoding.DecodeString(msgBoc)
if err != nil {
return nil, err
}
err = msgSender.SendMessage(ctx, payload)
if err != nil {
return nil, err
}
return payload, nil
}
51 changes: 35 additions & 16 deletions pkg/api/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"sync"

"github.com/go-faster/errors"
"github.com/tonkeeper/opentonapi/pkg/blockchain"
"github.com/tonkeeper/opentonapi/pkg/chainstate"
"github.com/tonkeeper/opentonapi/pkg/core"
"github.com/tonkeeper/opentonapi/pkg/pusher/sources"
"github.com/tonkeeper/opentonapi/pkg/rates"
"github.com/tonkeeper/opentonapi/pkg/spam"
"github.com/tonkeeper/tongo"
Expand All @@ -26,6 +26,9 @@ import (
// Compile-time check for Handler.
var _ oas.Handler = (*Handler)(nil)

// ctxToDetails converts a request context to a details instance.
type ctxToDetails func(ctx context.Context) any

type Handler struct {
logger *zap.Logger

Expand All @@ -43,7 +46,9 @@ type Handler struct {
// mempoolEmulate contains results of emulation of messages that are in the mempool.
mempoolEmulate mempoolEmulate
// emulationCh is used to send emulation results to mempool subscribers.
emulationCh chan sources.PayloadAndEmulationResults
emulationCh chan<- blockchain.ExtInMsgCopy
// ctxToDetails converts a request context to a details instance.
ctxToDetails ctxToDetails

// mu protects "dns".
mu sync.Mutex
Expand All @@ -65,7 +70,8 @@ type Options struct {
spamFilter spamFilter
ratesSource ratesSource
tonConnectSecret string
emulationCh chan sources.PayloadAndEmulationResults
emulationCh chan<- blockchain.ExtInMsgCopy
ctxToDetails ctxToDetails
}

type Option func(o *Options)
Expand All @@ -88,7 +94,7 @@ func WithAddressBook(book addressBook) Option {
}

// WithEmulationChannel configures a channel that will be used to send emulation results to mempool subscribers.
func WithEmulationChannel(ch chan sources.PayloadAndEmulationResults) Option {
func WithEmulationChannel(ch chan<- blockchain.ExtInMsgCopy) Option {
return func(o *Options) {
o.emulationCh = ch
}
Expand Down Expand Up @@ -130,6 +136,12 @@ func WithTonConnectSecret(tonConnectSecret string) Option {
}
}

func WithContextToDetails(ctxToDetails ctxToDetails) Option {
return func(o *Options) {
o.ctxToDetails = ctxToDetails
}
}

func NewHandler(logger *zap.Logger, opts ...Option) (*Handler, error) {
options := &Options{}
for _, o := range opts {
Expand Down Expand Up @@ -157,31 +169,38 @@ func NewHandler(logger *zap.Logger, opts ...Option) (*Handler, error) {
return nil, fmt.Errorf("executor is not configured")
}
if options.emulationCh == nil {
options.emulationCh = make(chan sources.PayloadAndEmulationResults, 100)
emulationCh := make(chan blockchain.ExtInMsgCopy, 100)
options.emulationCh = emulationCh
go func() {
for {
select {
case <-options.emulationCh:
case <-emulationCh:
// drop it
}
}
}()
}
if options.ctxToDetails == nil {
options.ctxToDetails = func(ctx context.Context) any {
return nil
}
}
tonConnect, err := tonconnect.NewTonConnect(options.executor, options.tonConnectSecret)
if err != nil {
return nil, fmt.Errorf("failed to init tonconnect")
}
return &Handler{
logger: logger,
storage: options.storage,
state: options.chainState,
addressBook: options.addressBook,
msgSender: options.msgSender,
executor: options.executor,
limits: options.limits,
spamFilter: options.spamFilter,
emulationCh: options.emulationCh,
ratesSource: rates.InitCalculator(options.ratesSource),
logger: logger,
storage: options.storage,
state: options.chainState,
addressBook: options.addressBook,
msgSender: options.msgSender,
executor: options.executor,
limits: options.limits,
spamFilter: options.spamFilter,
emulationCh: options.emulationCh,
ctxToDetails: options.ctxToDetails,
ratesSource: rates.InitCalculator(options.ratesSource),
metaCache: metadataCache{
collectionsCache: cache.NewLRUCache[tongo.AccountID, tep64.Metadata](10000, "nft_metadata_cache"),
jettonsCache: cache.NewLRUCache[tongo.AccountID, tep64.Metadata](10000, "jetton_metadata_cache"),
Expand Down
7 changes: 5 additions & 2 deletions pkg/api/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/ed25519"
"sync"

"github.com/tonkeeper/opentonapi/pkg/blockchain"
rules "github.com/tonkeeper/scam_backoffice_rules"
"github.com/tonkeeper/tongo"
"github.com/tonkeeper/tongo/abi"
Expand Down Expand Up @@ -124,8 +125,10 @@ type chainState interface {

// messageSender provides a method to send a raw message to the blockchain.
type messageSender interface {
SendMessage(ctx context.Context, payload []byte) error // SendMessage sends the given payload(a message) to the blockchain.
MsgsBocAddToMempool(bocMsgs []string) // MsgsBocAddToMempool sends a list of boc to the cache for later sending
// SendMessage sends the given message to the blockchain.
SendMessage(ctx context.Context, msgCopy blockchain.ExtInMsgCopy) error
// SendMultipleMessages sends a list of messages to the cache for later sending.
SendMultipleMessages(ctx context.Context, copies []blockchain.ExtInMsgCopy)
}

// executor runs any get methods
Expand Down
Loading