diff --git a/cmd/api/main.go b/cmd/api/main.go index ecdfc2ac..3c7fc0f9 100644 --- a/cmd/api/main.go +++ b/cmd/api/main.go @@ -42,11 +42,11 @@ func main() { if err != nil { log.Fatal("storage init", zap.Error(err)) } + // mempool receives a copy of any payload that goes through our API method /v2/blockchain/message mempool := sources.NewMemPool(log) - // mempoolChannel receives a copy of any payload that goes through our API method /v2/blockchain/message - mempoolChannel, emulationCh := mempool.Run(context.TODO()) + mempoolCh := mempool.Run(context.TODO()) - msgSender, err := blockchain.NewMsgSender(cfg.App.LiteServers, []chan []byte{mempoolChannel}) + msgSender, err := blockchain.NewMsgSender(cfg.App.LiteServers, []chan<- blockchain.ExtInMsgCopy{mempoolCh}) if err != nil { log.Fatal("failed to create msg sender", zap.Error(err)) } @@ -57,7 +57,7 @@ func main() { api.WithExecutor(storage), api.WithMessageSender(msgSender), api.WithSpamFilter(spamFilter), - api.WithEmulationChannel(emulationCh), + api.WithEmulationChannel(mempoolCh), api.WithTonConnectSecret(cfg.TonConnect.Secret), ) if err != nil { diff --git a/pkg/api/decode_message_test.go b/pkg/api/decode_message_test.go index b2d29e05..cda3a862 100644 --- a/pkg/api/decode_message_test.go +++ b/pkg/api/decode_message_test.go @@ -2,7 +2,6 @@ package api import ( "context" - "fmt" "testing" "github.com/stretchr/testify/require" @@ -44,7 +43,6 @@ func TestHandler_DecodeMessage(t *testing.T) { return } require.Nil(t, err) - fmt.Printf("response: %v\n", response) pkgTesting.CompareResults(t, response, tt.filenamePrefix) }) } diff --git a/pkg/api/event_handlers.go b/pkg/api/event_handlers.go index a358b374..e2a8077c 100644 --- a/pkg/api/event_handlers.go +++ b/pkg/api/event_handlers.go @@ -11,7 +11,7 @@ import ( "time" "github.com/shopspring/decimal" - "github.com/tonkeeper/opentonapi/pkg/pusher/sources" + "github.com/tonkeeper/opentonapi/pkg/blockchain" "github.com/tonkeeper/tongo" "github.com/tonkeeper/tongo/abi" "github.com/tonkeeper/tongo/boc" @@ -38,8 +38,16 @@ func (h *Handler) SendBlockchainMessage(ctx context.Context, request *oas.SendBl return toError(http.StatusBadRequest, fmt.Errorf("boc not found")) } if request.Boc.IsSet() { - payload, err := sendMessage(ctx, request.Boc.Value, h.msgSender) + payload, err := base64.StdEncoding.DecodeString(request.Boc.Value) if err != nil { + return toError(http.StatusBadRequest, fmt.Errorf("boc must be a base64 encoded string")) + } + msgCopy := blockchain.ExtInMsgCopy{ + MsgBoc: request.Boc.Value, + Payload: payload, + Details: h.ctxToDetails(ctx), + } + if err := h.msgSender.SendMessage(ctx, msgCopy); err != nil { sentry.Send("sending message", sentry.SentryInfoData{"payload": request.Boc}, sentry.LevelError) return toError(http.StatusInternalServerError, err) } @@ -49,11 +57,14 @@ func (h *Handler) SendBlockchainMessage(ctx context.Context, request *oas.SendBl sentry.Send("addToMempool", sentry.SentryInfoData{"payload": request.Boc}, sentry.LevelError) } }() - h.addToMempool(payload, nil) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + h.addToMempool(ctx, payload, nil) }() + return nil } var ( - batchOfBoc []string + copies []blockchain.ExtInMsgCopy shardAccount = map[tongo.AccountID]tlb.ShardAccount{} ) for _, msgBoc := range request.Batch { @@ -61,13 +72,18 @@ func (h *Handler) SendBlockchainMessage(ctx context.Context, request *oas.SendBl if err != nil { return toError(http.StatusBadRequest, err) } - shardAccount, err = h.addToMempool(payload, shardAccount) + shardAccount, err = h.addToMempool(ctx, payload, shardAccount) if err != nil { continue } - batchOfBoc = append(batchOfBoc, msgBoc) + msgCopy := blockchain.ExtInMsgCopy{ + MsgBoc: msgBoc, + Payload: payload, + Details: h.ctxToDetails(ctx), + } + copies = append(copies, msgCopy) } - h.msgSender.MsgsBocAddToMempool(batchOfBoc) + h.msgSender.SendMultipleMessages(ctx, copies) return nil } @@ -499,9 +515,7 @@ func (h *Handler) EmulateMessageToWallet(ctx context.Context, request *oas.Emula return &consequences, nil } -func (h *Handler) addToMempool(bytesBoc []byte, shardAccount map[tongo.AccountID]tlb.ShardAccount) (map[tongo.AccountID]tlb.ShardAccount, error) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) - defer cancel() +func (h *Handler) addToMempool(ctx context.Context, bytesBoc []byte, shardAccount map[tongo.AccountID]tlb.ShardAccount) (map[tongo.AccountID]tlb.ShardAccount, error) { if shardAccount == nil { shardAccount = map[tongo.AccountID]tlb.ShardAccount{} } @@ -554,7 +568,9 @@ func (h *Handler) addToMempool(bytesBoc []byte, shardAccount map[tongo.AccountID traces = slices.Insert(traces, 0, hex.EncodeToString(hash)) h.mempoolEmulate.accountsTraces.Set(account, traces, cache.WithExpiration(time.Second*time.Duration(ttl))) } - h.emulationCh <- sources.PayloadAndEmulationResults{ + h.emulationCh <- blockchain.ExtInMsgCopy{ + MsgBoc: base64.StdEncoding.EncodeToString(bytesBoc), + Details: h.ctxToDetails(ctx), Payload: bytesBoc, Accounts: accounts, } @@ -704,15 +720,3 @@ func emulatedTreeToTrace(ctx context.Context, executor executor, resolver core.L } return t, nil } - -func sendMessage(ctx context.Context, msgBoc string, msgSender messageSender) ([]byte, error) { - payload, err := base64.StdEncoding.DecodeString(msgBoc) - if err != nil { - return nil, err - } - err = msgSender.SendMessage(ctx, payload) - if err != nil { - return nil, err - } - return payload, nil -} diff --git a/pkg/api/handler.go b/pkg/api/handler.go index 1ba890d6..e4124870 100644 --- a/pkg/api/handler.go +++ b/pkg/api/handler.go @@ -6,9 +6,9 @@ import ( "sync" "github.com/go-faster/errors" + "github.com/tonkeeper/opentonapi/pkg/blockchain" "github.com/tonkeeper/opentonapi/pkg/chainstate" "github.com/tonkeeper/opentonapi/pkg/core" - "github.com/tonkeeper/opentonapi/pkg/pusher/sources" "github.com/tonkeeper/opentonapi/pkg/rates" "github.com/tonkeeper/opentonapi/pkg/spam" "github.com/tonkeeper/tongo" @@ -26,6 +26,9 @@ import ( // Compile-time check for Handler. var _ oas.Handler = (*Handler)(nil) +// ctxToDetails converts a request context to a details instance. +type ctxToDetails func(ctx context.Context) any + type Handler struct { logger *zap.Logger @@ -43,7 +46,9 @@ type Handler struct { // mempoolEmulate contains results of emulation of messages that are in the mempool. mempoolEmulate mempoolEmulate // emulationCh is used to send emulation results to mempool subscribers. - emulationCh chan sources.PayloadAndEmulationResults + emulationCh chan<- blockchain.ExtInMsgCopy + // ctxToDetails converts a request context to a details instance. + ctxToDetails ctxToDetails // mu protects "dns". mu sync.Mutex @@ -65,7 +70,8 @@ type Options struct { spamFilter spamFilter ratesSource ratesSource tonConnectSecret string - emulationCh chan sources.PayloadAndEmulationResults + emulationCh chan<- blockchain.ExtInMsgCopy + ctxToDetails ctxToDetails } type Option func(o *Options) @@ -88,7 +94,7 @@ func WithAddressBook(book addressBook) Option { } // WithEmulationChannel configures a channel that will be used to send emulation results to mempool subscribers. -func WithEmulationChannel(ch chan sources.PayloadAndEmulationResults) Option { +func WithEmulationChannel(ch chan<- blockchain.ExtInMsgCopy) Option { return func(o *Options) { o.emulationCh = ch } @@ -130,6 +136,12 @@ func WithTonConnectSecret(tonConnectSecret string) Option { } } +func WithContextToDetails(ctxToDetails ctxToDetails) Option { + return func(o *Options) { + o.ctxToDetails = ctxToDetails + } +} + func NewHandler(logger *zap.Logger, opts ...Option) (*Handler, error) { options := &Options{} for _, o := range opts { @@ -157,31 +169,38 @@ func NewHandler(logger *zap.Logger, opts ...Option) (*Handler, error) { return nil, fmt.Errorf("executor is not configured") } if options.emulationCh == nil { - options.emulationCh = make(chan sources.PayloadAndEmulationResults, 100) + emulationCh := make(chan blockchain.ExtInMsgCopy, 100) + options.emulationCh = emulationCh go func() { for { select { - case <-options.emulationCh: + case <-emulationCh: // drop it } } }() } + if options.ctxToDetails == nil { + options.ctxToDetails = func(ctx context.Context) any { + return nil + } + } tonConnect, err := tonconnect.NewTonConnect(options.executor, options.tonConnectSecret) if err != nil { return nil, fmt.Errorf("failed to init tonconnect") } return &Handler{ - logger: logger, - storage: options.storage, - state: options.chainState, - addressBook: options.addressBook, - msgSender: options.msgSender, - executor: options.executor, - limits: options.limits, - spamFilter: options.spamFilter, - emulationCh: options.emulationCh, - ratesSource: rates.InitCalculator(options.ratesSource), + logger: logger, + storage: options.storage, + state: options.chainState, + addressBook: options.addressBook, + msgSender: options.msgSender, + executor: options.executor, + limits: options.limits, + spamFilter: options.spamFilter, + emulationCh: options.emulationCh, + ctxToDetails: options.ctxToDetails, + ratesSource: rates.InitCalculator(options.ratesSource), metaCache: metadataCache{ collectionsCache: cache.NewLRUCache[tongo.AccountID, tep64.Metadata](10000, "nft_metadata_cache"), jettonsCache: cache.NewLRUCache[tongo.AccountID, tep64.Metadata](10000, "jetton_metadata_cache"), diff --git a/pkg/api/interfaces.go b/pkg/api/interfaces.go index 0fc5bf18..0e39d977 100644 --- a/pkg/api/interfaces.go +++ b/pkg/api/interfaces.go @@ -5,6 +5,7 @@ import ( "crypto/ed25519" "sync" + "github.com/tonkeeper/opentonapi/pkg/blockchain" rules "github.com/tonkeeper/scam_backoffice_rules" "github.com/tonkeeper/tongo" "github.com/tonkeeper/tongo/abi" @@ -124,8 +125,10 @@ type chainState interface { // messageSender provides a method to send a raw message to the blockchain. type messageSender interface { - SendMessage(ctx context.Context, payload []byte) error // SendMessage sends the given payload(a message) to the blockchain. - MsgsBocAddToMempool(bocMsgs []string) // MsgsBocAddToMempool sends a list of boc to the cache for later sending + // SendMessage sends the given message to the blockchain. + SendMessage(ctx context.Context, msgCopy blockchain.ExtInMsgCopy) error + // SendMultipleMessages sends a list of messages to the cache for later sending. + SendMultipleMessages(ctx context.Context, copies []blockchain.ExtInMsgCopy) } // executor runs any get methods diff --git a/pkg/blockchain/msg_sender.go b/pkg/blockchain/msg_sender.go index a5a8b9bc..b1e95cd2 100644 --- a/pkg/blockchain/msg_sender.go +++ b/pkg/blockchain/msg_sender.go @@ -2,26 +2,49 @@ package blockchain import ( "context" - "encoding/base64" "fmt" "sync" "time" + "github.com/tonkeeper/tongo" "github.com/tonkeeper/tongo/config" "github.com/tonkeeper/tongo/liteapi" ) +const ttl = 5 * 60 // in seconds + // MsgSender provides a method to send a message to the blockchain. type MsgSender struct { mu sync.Mutex client *liteapi.Client - // channels is used to send a copy of payload before sending it to the blockchain. - channels []chan []byte - // messages is used as a cache for boc multi-sending - messages map[string]int64 // base64, created unix time + // receivers get a copy of a message before sending it to the blockchain. + receivers []chan<- ExtInMsgCopy + // batches is used as a cache for boc multi-sending. + batches []batchOfMessages +} + +type batchOfMessages struct { + Copies []ExtInMsgCopy + RecvAt int64 +} + +// ExtInMsgCopy represents an external message we receive on /v2/blockchain/message endpoint. +type ExtInMsgCopy struct { + // MsgBoc is a base64 encoded message boc. + MsgBoc string + // Payload is a decoded message boc. + Payload []byte + // Details contains some optional details from a request context. + Details any + // Accounts is set when the message is emulated. + Accounts map[tongo.AccountID]struct{} +} + +func (m *ExtInMsgCopy) IsEmulation() bool { + return len(m.Accounts) > 0 } -func NewMsgSender(servers []config.LiteServer, channels []chan []byte) (*MsgSender, error) { +func NewMsgSender(servers []config.LiteServer, receivers []chan<- ExtInMsgCopy) (*MsgSender, error) { var ( client *liteapi.Client err error @@ -36,64 +59,83 @@ func NewMsgSender(servers []config.LiteServer, channels []chan []byte) (*MsgSend return nil, err } msgSender := &MsgSender{ - client: client, - channels: channels, - messages: map[string]int64{}, + client: client, + receivers: receivers, } go func() { for { - msgSender.sendMsgsFromMempool() + msgSender.dropExpiredBatches() + msgSender.sendBatches() time.Sleep(time.Second * 5) } }() return msgSender, nil } -func (ms *MsgSender) payloadsFromTheQueue() [][]byte { +func (ms *MsgSender) dropExpiredBatches() { now := time.Now().Unix() - ms.mu.Lock() defer ms.mu.Unlock() - - var msgs [][]byte - for boc, createdTime := range ms.messages { - payload, err := base64.StdEncoding.DecodeString(boc) - if err != nil || now-createdTime > 5*60 { // ttl is 5 min - delete(ms.messages, boc) + var batches []batchOfMessages + for _, batch := range ms.batches { + if now-batch.RecvAt > ttl { continue } - msgs = append(msgs, payload) + batches = append(batches, batch) } - return msgs + ms.batches = batches } -func (ms *MsgSender) sendMsgsFromMempool() { - payloads := ms.payloadsFromTheQueue() - for _, payload := range payloads { - if err := ms.SendMessage(context.Background(), payload); err != nil { - continue +func (ms *MsgSender) batchesReadyForSending() []batchOfMessages { + ms.mu.Lock() + defer ms.mu.Unlock() + return ms.batches +} + +func (ms *MsgSender) sendBatches() { + batches := ms.batchesReadyForSending() + for _, batch := range batches { + for _, msgCopy := range batch.Copies { + if err := ms.sendMessageFromBatch(msgCopy); err != nil { + // TODO: remove from the queue on success? log error? + continue + } } } } -// SendMessage sends the given payload(a message) to the blockchain. -func (ms *MsgSender) SendMessage(ctx context.Context, payload []byte) error { - if err := liteapi.VerifySendMessagePayload(payload); err != nil { +// SendMessage sends the given a message to the blockchain. +func (ms *MsgSender) SendMessage(ctx context.Context, msgCopy ExtInMsgCopy) error { + if err := liteapi.VerifySendMessagePayload(msgCopy.Payload); err != nil { return err } - for _, ch := range ms.channels { - ch <- payload + for _, ch := range ms.receivers { + ch <- msgCopy } - _, err := ms.client.SendMessage(ctx, payload) + _, err := ms.client.SendMessage(ctx, msgCopy.Payload) return err } -func (ms *MsgSender) MsgsBocAddToMempool(bocMsgs []string) { +func (ms *MsgSender) sendMessageFromBatch(msgCopy ExtInMsgCopy) error { + if err := liteapi.VerifySendMessagePayload(msgCopy.Payload); err != nil { + return err + } + for _, ch := range ms.receivers { + ch <- msgCopy + } + _, err := ms.client.SendMessage(context.TODO(), msgCopy.Payload) + return err +} + +func (ms *MsgSender) SendMultipleMessages(ctx context.Context, copies []ExtInMsgCopy) { + if len(copies) == 0 { + return + } now := time.Now().Unix() ms.mu.Lock() defer ms.mu.Unlock() - - for _, boc := range bocMsgs { - ms.messages[boc] = now - } + ms.batches = append(ms.batches, batchOfMessages{ + Copies: copies, + RecvAt: now, + }) } diff --git a/pkg/blockchain/msg_sender_test.go b/pkg/blockchain/msg_sender_test.go new file mode 100644 index 00000000..ff9aba04 --- /dev/null +++ b/pkg/blockchain/msg_sender_test.go @@ -0,0 +1,59 @@ +package blockchain + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestMsgSender_dropExpiredBatches(t *testing.T) { + tests := []struct { + name string + batches []batchOfMessages + wantBocs []string + }{ + { + name: "expire some batches", + batches: []batchOfMessages{ + { + Copies: []ExtInMsgCopy{ + {MsgBoc: "1"}, + {MsgBoc: "2"}, + }, + RecvAt: time.Now().Unix(), + }, + { + Copies: []ExtInMsgCopy{ + {MsgBoc: "3"}, + {MsgBoc: "4"}, + }, + RecvAt: time.Now().Add(-6 * time.Minute).Unix(), + }, + { + Copies: []ExtInMsgCopy{ + {MsgBoc: "5"}, + }, + RecvAt: time.Now().Add(-4 * time.Minute).Unix(), + }, + }, + wantBocs: []string{"1", "2", "5"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ms := &MsgSender{ + batches: tt.batches, + } + ms.dropExpiredBatches() + + var gotBocs []string + for _, b := range ms.batchesReadyForSending() { + for _, c := range b.Copies { + gotBocs = append(gotBocs, c.MsgBoc) + } + } + require.Equal(t, tt.wantBocs, gotBocs) + }) + } +} diff --git a/pkg/pusher/sources/mem_pool.go b/pkg/pusher/sources/mem_pool.go index 6d331744..824fe9d5 100644 --- a/pkg/pusher/sources/mem_pool.go +++ b/pkg/pusher/sources/mem_pool.go @@ -8,6 +8,8 @@ import ( "github.com/tonkeeper/tongo" "go.uber.org/zap" + + "github.com/tonkeeper/opentonapi/pkg/blockchain" ) // MemPool implements "MemPoolSource" interface @@ -80,30 +82,25 @@ func (m *MemPool) SubscribeToMessages(ctx context.Context, deliveryFn DeliveryFn } -// PayloadAndEmulationResults contains a message payload and a list of accounts that are involved in the corresponding trace. -type PayloadAndEmulationResults struct { - Payload []byte - Accounts map[tongo.AccountID]struct{} -} - // Run runs a goroutine with a fan-out event-loop that resends an incoming payload to all subscribers. -func (m *MemPool) Run(ctx context.Context) (chan []byte, chan PayloadAndEmulationResults) { +func (m *MemPool) Run(ctx context.Context) chan blockchain.ExtInMsgCopy { // TODO: replace with elastic channel - ch := make(chan []byte, 100) - emulationCh := make(chan PayloadAndEmulationResults, 100) + ch := make(chan blockchain.ExtInMsgCopy, 100) go func() { for { select { case <-ctx.Done(): return - case payload := <-ch: - m.sendPayloadToSubscribers(payload) - case payload := <-emulationCh: - m.sendPayloadToEmulationSubscribers(payload) + case msgCopy := <-ch: + if msgCopy.IsEmulation() { + m.sendPayloadToEmulationSubscribers(msgCopy) + continue + } + m.sendPayloadToSubscribers(msgCopy.Payload) } } }() - return ch, emulationCh + return ch } func (m *MemPool) sendPayloadToSubscribers(payload []byte) { @@ -124,12 +121,12 @@ func (m *MemPool) sendPayloadToSubscribers(payload []byte) { } } -func (m *MemPool) sendPayloadToEmulationSubscribers(payload PayloadAndEmulationResults) { +func (m *MemPool) sendPayloadToEmulationSubscribers(msgCopy blockchain.ExtInMsgCopy) { msg := EmulationMessageEventData{ - BOC: payload.Payload, - InvolvedAccounts: make([]tongo.AccountID, 0, len(payload.Accounts)), + BOC: msgCopy.Payload, + InvolvedAccounts: make([]tongo.AccountID, 0, len(msgCopy.Accounts)), } - for account := range payload.Accounts { + for account := range msgCopy.Accounts { msg.InvolvedAccounts = append(msg.InvolvedAccounts, account) } sort.Slice(msg.InvolvedAccounts, func(i, j int) bool { @@ -145,6 +142,6 @@ func (m *MemPool) sendPayloadToEmulationSubscribers(payload PayloadAndEmulationR m.mu.Lock() defer m.mu.Unlock() for _, fn := range m.emulationSubscribers { - fn(eventData, payload.Accounts) + fn(eventData, msgCopy.Accounts) } } diff --git a/pkg/pusher/sources/mem_pool_test.go b/pkg/pusher/sources/mem_pool_test.go index ebbdca2a..c93b44e1 100644 --- a/pkg/pusher/sources/mem_pool_test.go +++ b/pkg/pusher/sources/mem_pool_test.go @@ -2,6 +2,7 @@ package sources import ( "context" + "encoding/base64" "encoding/json" "fmt" "sort" @@ -13,6 +14,8 @@ import ( "github.com/tonkeeper/tongo/ton" "go.uber.org/zap" "golang.org/x/exp/maps" + + "github.com/tonkeeper/opentonapi/pkg/blockchain" ) var testAccount1 = ton.MustParseAccountID("0:0a95e1d4ebe7860d051f8b861730dbdee1440fd11180211914e0089146580351") @@ -24,14 +27,14 @@ func TestMemPool_Run(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - ch, emulationCh := mempool.Run(ctx) + ch := mempool.Run(ctx) const eventsNumber = 10 var wg sync.WaitGroup wg.Add(eventsNumber * 2) // subscribe to mempool events - eventDataCh := make(chan []byte, eventsNumber) + eventDataCh := make(chan []byte, eventsNumber*2) cancelFn, err := mempool.SubscribeToMessages(context.Background(), func(eventData []byte) { eventDataCh <- eventData wg.Done() @@ -54,13 +57,20 @@ func TestMemPool_Run(t *testing.T) { for i := 0; i < eventsNumber; i++ { payload := []byte(fmt.Sprintf("payload-%d", i)) - ch <- payload + ch <- blockchain.ExtInMsgCopy{ + MsgBoc: base64.StdEncoding.EncodeToString(payload), + Payload: payload, + } eventData, err := json.Marshal(MessageEventData{BOC: payload}) require.Nil(t, err) expected = append(expected, eventData) emPayload := []byte(fmt.Sprintf("emulation-payload-%d", i)) - emulationCh <- PayloadAndEmulationResults{Payload: emPayload, Accounts: map[tongo.AccountID]struct{}{testAccount1: {}}} + ch <- blockchain.ExtInMsgCopy{ + MsgBoc: base64.StdEncoding.EncodeToString(emPayload), + Payload: emPayload, + Accounts: map[tongo.AccountID]struct{}{testAccount1: {}}, + } eventData, err = json.Marshal(EmulationMessageEventData{BOC: emPayload, InvolvedAccounts: []tongo.AccountID{testAccount1}}) require.Nil(t, err) emulationExpected = append(emulationExpected, eventData)