diff --git a/app/app.go b/app/app.go index 9183607f..d93e7cb7 100644 --- a/app/app.go +++ b/app/app.go @@ -20,7 +20,6 @@ import ( "github.com/ChainSafe/chainbridge-core/logger" "github.com/ChainSafe/chainbridge-core/lvldb" "github.com/ChainSafe/chainbridge-core/opentelemetry" - "github.com/ChainSafe/chainbridge-core/relayer/message" "github.com/ChainSafe/chainbridge-core/store" "github.com/ChainSafe/sygma-relayer/chains/evm" "github.com/ChainSafe/sygma-relayer/chains/evm/calls/contracts/bridge" @@ -31,8 +30,6 @@ import ( "github.com/ChainSafe/sygma-relayer/chains/substrate" coreSubstrate "github.com/sygmaprotocol/sygma-core/chains/substrate" - "github.com/ChainSafe/sygma-relayer/chains/substrate/client" - "github.com/ChainSafe/sygma-relayer/chains/substrate/connection" substrateExecutor "github.com/ChainSafe/sygma-relayer/chains/substrate/executor" substrate_listener "github.com/ChainSafe/sygma-relayer/chains/substrate/listener" substrate_pallet "github.com/ChainSafe/sygma-relayer/chains/substrate/pallet" @@ -43,6 +40,9 @@ import ( "github.com/sygmaprotocol/sygma-core/chains/evm/listener" "github.com/sygmaprotocol/sygma-core/chains/evm/transactor/monitored" "github.com/sygmaprotocol/sygma-core/chains/evm/transactor/transaction" + "github.com/sygmaprotocol/sygma-core/chains/substrate/client" + "github.com/sygmaprotocol/sygma-core/chains/substrate/connection" + core_substrate_listener "github.com/sygmaprotocol/sygma-core/chains/substrate/listener" "github.com/ChainSafe/sygma-relayer/comm/elector" "github.com/ChainSafe/sygma-relayer/comm/p2p" @@ -233,6 +233,7 @@ func Run() error { if err != nil { panic(err) } + substrateClient := client.NewSubstrateClient(conn, &keyPair, config.ChainID, config.Tip) bridgePallet := substrate_pallet.NewPallet(substrateClient) @@ -240,14 +241,15 @@ func Run() error { l := log.With().Str("chain", fmt.Sprintf("%v", config.GeneralChainConfig.Name)).Uint8("domainID", *config.GeneralChainConfig.Id) depositHandler := substrate_listener.NewSubstrateDepositHandler() - depositHandler.RegisterDepositHandler(message.FungibleTransfer, substrate_listener.FungibleTransferHandler) - eventHandlers := make([]substrate_listener.EventHandler, 0) - eventHandlers = append(eventHandlers, substrate_listener.NewFungibleTransferEventHandler(l, *config.GeneralChainConfig.Id, depositHandler)) - eventHandlers = append(eventHandlers, substrate_listener.NewRetryEventHandler(l, conn, depositHandler, *config.GeneralChainConfig.Id)) - substrateListener := substrate_listener.NewSubstrateListener(conn, eventHandlers, config) + depositHandler.RegisterDepositHandler(substrate.FungibleTransfer, substrate_listener.FungibleTransferHandler) + eventHandlers := make([]core_substrate_listener.EventHandler, 0) + eventHandlers = append(eventHandlers, substrate_listener.NewFungibleTransferEventHandler(l, *config.GeneralChainConfig.Id, depositHandler, make(chan []*coreMessage.Message, 1), conn)) + eventHandlers = append(eventHandlers, substrate_listener.NewRetryEventHandler(l, conn, depositHandler, *config.GeneralChainConfig.Id, make(chan []*coreMessage.Message, 1))) + + substrateListener := core_substrate_listener.NewSubstrateListener(conn, eventHandlers, blockstore, sygmaMetrics, *config.GeneralChainConfig.Id, config.BlockRetryInterval, config.BlockInterval) mh := coreMessage.NewMessageHandler() - mh.RegisterMessageHandler(substrateExecutor.FungibleTransfer, &substrateExecutor.SubstrateMessageHandler{}) + mh.RegisterMessageHandler(substrate.FungibleTransfer, &substrateExecutor.SubstrateMessageHandler{}) sExecutor := substrateExecutor.NewExecutor(host, communication, coordinator, bridgePallet, keyshareStore, conn, exitLock) substrateChain := coreSubstrate.NewSubstrateChain(substrateListener, mh, sExecutor, *config.GeneralChainConfig.Id, config.StartBlock) @@ -281,11 +283,10 @@ func Run() error { log.Info().Msg("Relayer not part of MPC. Waiting for refresh event...") } - select { - case sig := <-sysErr: - log.Info().Msgf("terminating got ` [%v] signal", sig) - return nil - } + sig := <-sysErr + log.Info().Msgf("terminating got ` [%v] signal", sig) + return nil + } func panicOnError(err error) { diff --git a/chains/evm/listener/eventHandlers/event-handler_test.go b/chains/evm/listener/eventHandlers/event-handler_test.go index 21cfe962..630f0751 100644 --- a/chains/evm/listener/eventHandlers/event-handler_test.go +++ b/chains/evm/listener/eventHandlers/event-handler_test.go @@ -40,7 +40,7 @@ func (s *RetryEventHandlerTestSuite) SetupTest() { s.domainID = 1 s.mockEventListener = mock_listener.NewMockEventListener(ctrl) s.mockDepositHandler = mock_listener.NewMockDepositHandler(ctrl) - s.msgChan = make(chan []*message.Message, 2) + s.msgChan = make(chan []*message.Message, 1) s.retryEventHandler = eventHandlers.NewRetryEventHandler(log.With(), s.mockEventListener, s.mockDepositHandler, common.Address{}, s.domainID, big.NewInt(5), s.msgChan) } diff --git a/chains/substrate/client/client.go b/chains/substrate/client/client.go deleted file mode 100644 index 4bf4cf4a..00000000 --- a/chains/substrate/client/client.go +++ /dev/null @@ -1,213 +0,0 @@ -// The Licensed Work is (c) 2022 Sygma -// SPDX-License-Identifier: LGPL-3.0-only - -package client - -import ( - "bytes" - "context" - "fmt" - "math/big" - "sync" - "time" - - "github.com/ChainSafe/sygma-relayer/chains/substrate/connection" - "github.com/ChainSafe/sygma-relayer/chains/substrate/events" - "github.com/centrifuge/go-substrate-rpc-client/v4/rpc/author" - "github.com/centrifuge/go-substrate-rpc-client/v4/scale" - "github.com/centrifuge/go-substrate-rpc-client/v4/signature" - "github.com/centrifuge/go-substrate-rpc-client/v4/types" - "github.com/rs/zerolog/log" -) - -type SubstrateClient struct { - key *signature.KeyringPair // Keyring used for signing - nonceLock sync.Mutex // Locks nonce for updates - nonce types.U32 // Latest account nonce - tip uint64 - Conn *connection.Connection - ChainID *big.Int -} - -func NewSubstrateClient(conn *connection.Connection, key *signature.KeyringPair, chainID *big.Int, tip uint64) *SubstrateClient { - return &SubstrateClient{ - key: key, - Conn: conn, - ChainID: chainID, - tip: tip, - } -} - -// Transact constructs and submits an extrinsic to call the method with the given arguments. -// All args are passed directly into GSRPC. GSRPC types are recommended to avoid serialization inconsistencies. -func (c *SubstrateClient) Transact(method string, args ...interface{}) (types.Hash, *author.ExtrinsicStatusSubscription, error) { - log.Debug().Msgf("Submitting substrate call... method %s, sender %s", method, c.key.Address) - - // Create call and extrinsic - meta := c.Conn.GetMetadata() - call, err := types.NewCall( - &meta, - method, - args..., - ) - if err != nil { - return types.Hash{}, nil, fmt.Errorf("failed to construct call: %w", err) - } - - ext := types.NewExtrinsic(call) - // Get latest runtime version - rv, err := c.Conn.RPC.State.GetRuntimeVersionLatest() - if err != nil { - return types.Hash{}, nil, err - } - - c.nonceLock.Lock() - defer c.nonceLock.Unlock() - - nonce, err := c.nextNonce(&meta) - if err != nil { - return types.Hash{}, nil, err - } - - // Sign the extrinsic - o := types.SignatureOptions{ - BlockHash: c.Conn.GenesisHash, - Era: types.ExtrinsicEra{IsMortalEra: false}, - GenesisHash: c.Conn.GenesisHash, - Nonce: types.NewUCompactFromUInt(uint64(nonce)), - SpecVersion: rv.SpecVersion, - Tip: types.NewUCompactFromUInt(c.tip), - TransactionVersion: rv.TransactionVersion, - } - sub, err := c.submitAndWatchExtrinsic(o, &ext) - if err != nil { - return types.Hash{}, nil, fmt.Errorf("submission of extrinsic failed: %w", err) - } - - hash, err := ExtrinsicHash(ext) - if err != nil { - return types.Hash{}, nil, err - } - - log.Info().Str("extrinsic", hash.Hex()).Msgf("Extrinsic call submitted... method %s, sender %s, nonce %d", method, c.key.Address, nonce) - c.nonce = nonce + 1 - - return hash, sub, nil -} - -func (c *SubstrateClient) TrackExtrinsic(extHash types.Hash, sub *author.ExtrinsicStatusSubscription) error { - ctx, cancel := context.WithTimeout(context.Background(), time.Duration(time.Minute*10)) - defer sub.Unsubscribe() - defer cancel() - subChan := sub.Chan() - for { - select { - case status := <-subChan: - { - if status.IsInBlock { - log.Debug().Str("extrinsic", extHash.Hex()).Msgf("Extrinsic in block with hash: %#x", status.AsInBlock) - } - if status.IsFinalized { - log.Info().Str("extrinsic", extHash.Hex()).Msgf("Extrinsic is finalized in block with hash: %#x", status.AsFinalized) - return c.checkExtrinsicSuccess(extHash, status.AsFinalized) - } - } - case <-ctx.Done(): - return fmt.Errorf("extrinsic has timed out") - } - } -} - -func (c *SubstrateClient) nextNonce(meta *types.Metadata) (types.U32, error) { - key, err := types.CreateStorageKey(meta, "System", "Account", c.key.PublicKey, nil) - if err != nil { - return 0, err - } - - var latestNonce types.U32 - var acct types.AccountInfo - exists, err := c.Conn.RPC.State.GetStorageLatest(key, &acct) - if err != nil { - return 0, err - } - - if !exists { - latestNonce = 0 - } else { - latestNonce = acct.Nonce - } - - if latestNonce < c.nonce { - return c.nonce, nil - } - - return latestNonce, nil -} - -func (c *SubstrateClient) submitAndWatchExtrinsic(opts types.SignatureOptions, ext *types.Extrinsic) (*author.ExtrinsicStatusSubscription, error) { - err := ext.Sign(*c.key, opts) - if err != nil { - return nil, err - } - - sub, err := c.Conn.RPC.Author.SubmitAndWatchExtrinsic(*ext) - if err != nil { - return nil, err - } - - return sub, nil -} - -func (c *SubstrateClient) checkExtrinsicSuccess(extHash types.Hash, blockHash types.Hash) error { - block, err := c.Conn.Chain.GetBlock(blockHash) - if err != nil { - return err - } - - evts, err := c.Conn.GetBlockEvents(blockHash) - if err != nil { - return err - } - - for _, event := range evts { - index := event.Phase.AsApplyExtrinsic - hash, err := ExtrinsicHash(block.Block.Extrinsics[index]) - if err != nil { - return err - } - - if extHash != hash { - continue - } - - if event.Name == events.ExtrinsicFailedEvent { - return fmt.Errorf("extrinsic failed") - } - if event.Name == events.FailedHandlerExecutionEvent { - return fmt.Errorf("extrinsic failed with failed handler execution") - } - if event.Name == events.ExtrinsicSuccessEvent { - return nil - } - } - - return fmt.Errorf("no event found") -} - -func (c *SubstrateClient) LatestBlock() (*big.Int, error) { - block, err := c.Conn.Chain.GetBlockLatest() - if err != nil { - return nil, err - } - return big.NewInt(int64(block.Block.Header.Number)), nil -} - -func ExtrinsicHash(ext types.Extrinsic) (types.Hash, error) { - extHash := bytes.NewBuffer([]byte{}) - encoder := scale.NewEncoder(extHash) - err := ext.Encode(*encoder) - if err != nil { - return types.Hash{}, err - } - return types.NewHash(extHash.Bytes()), nil -} diff --git a/chains/substrate/config.go b/chains/substrate/config.go index 286191a1..96d10e1c 100644 --- a/chains/substrate/config.go +++ b/chains/substrate/config.go @@ -11,10 +11,15 @@ import ( "github.com/centrifuge/go-substrate-rpc-client/v4/signature" "github.com/creasty/defaults" "github.com/mitchellh/mapstructure" + "github.com/sygmaprotocol/sygma-core/relayer/message" "github.com/ChainSafe/chainbridge-core/config/chain" ) +const ( + FungibleTransfer message.MessageType = "FungibleTransfer" +) + type RawSubstrateConfig struct { chain.GeneralChainConfig `mapstructure:",squash"` ChainID int64 `mapstructure:"chainID"` diff --git a/chains/substrate/connection/connection.go b/chains/substrate/connection/connection.go deleted file mode 100644 index 74e478d6..00000000 --- a/chains/substrate/connection/connection.go +++ /dev/null @@ -1,88 +0,0 @@ -// The Licensed Work is (c) 2022 Sygma -// SPDX-License-Identifier: LGPL-3.0-only - -package connection - -import ( - "sync" - - "github.com/centrifuge/go-substrate-rpc-client/v4/client" - "github.com/centrifuge/go-substrate-rpc-client/v4/registry/parser" - "github.com/centrifuge/go-substrate-rpc-client/v4/registry/retriever" - "github.com/centrifuge/go-substrate-rpc-client/v4/registry/state" - - "github.com/centrifuge/go-substrate-rpc-client/v4/rpc" - "github.com/centrifuge/go-substrate-rpc-client/v4/rpc/chain" - "github.com/centrifuge/go-substrate-rpc-client/v4/types" -) - -type Connection struct { - chain.Chain - client.Client - *rpc.RPC - meta types.Metadata // Latest chain metadata - metaLock sync.RWMutex // Lock metadata for updates, allows concurrent reads - GenesisHash types.Hash // Chain genesis hash -} - -func NewSubstrateConnection(url string) (*Connection, error) { - client, err := client.Connect(url) - if err != nil { - return nil, err - } - rpc, err := rpc.NewRPC(client) - if err != nil { - return nil, err - } - - meta, err := rpc.State.GetMetadataLatest() - if err != nil { - return nil, err - } - genesisHash, err := rpc.Chain.GetBlockHash(0) - if err != nil { - return nil, err - } - - return &Connection{ - meta: *meta, - - RPC: rpc, - Chain: rpc.Chain, - Client: client, - GenesisHash: genesisHash, - }, nil -} - -func (c *Connection) GetMetadata() (meta types.Metadata) { - c.metaLock.RLock() - meta = c.meta - c.metaLock.RUnlock() - return meta -} - -func (c *Connection) UpdateMetatdata() error { - c.metaLock.Lock() - meta, err := c.RPC.State.GetMetadataLatest() - if err != nil { - c.metaLock.Unlock() - return err - } - c.meta = *meta - c.metaLock.Unlock() - return nil -} - -func (c *Connection) GetBlockEvents(hash types.Hash) ([]*parser.Event, error) { - provider := state.NewEventProvider(c.State) - eventRetriever, err := retriever.NewDefaultEventRetriever(provider, c.State) - if err != nil { - return nil, err - } - - evts, err := eventRetriever.GetEvents(hash) - if err != nil { - return nil, err - } - return evts, nil -} diff --git a/chains/substrate/executor/executor.go b/chains/substrate/executor/executor.go index 925c7650..94e9c321 100644 --- a/chains/substrate/executor/executor.go +++ b/chains/substrate/executor/executor.go @@ -11,10 +11,9 @@ import ( "time" "github.com/ChainSafe/sygma-relayer/chains" - "github.com/ChainSafe/sygma-relayer/chains/substrate/connection" "github.com/binance-chain/tss-lib/common" "github.com/sourcegraph/conc/pool" - "github.com/sygmaprotocol/sygma-core/relayer/message" + "github.com/sygmaprotocol/sygma-core/chains/substrate/connection" "github.com/sygmaprotocol/sygma-core/relayer/proposal" "github.com/centrifuge/go-substrate-rpc-client/v4/rpc/author" @@ -29,10 +28,6 @@ import ( "github.com/ChainSafe/sygma-relayer/tss/signing" ) -const ( - FungibleTransfer message.MessageType = "FungibleTransfer" -) - var ( executionCheckPeriod = time.Minute signingTimeout = 30 * time.Minute diff --git a/chains/substrate/executor/message-handler.go b/chains/substrate/executor/message-handler.go index d8b3f401..5c25a10f 100644 --- a/chains/substrate/executor/message-handler.go +++ b/chains/substrate/executor/message-handler.go @@ -8,6 +8,7 @@ import ( "math/big" "github.com/ChainSafe/sygma-relayer/chains" + "github.com/ChainSafe/sygma-relayer/chains/substrate" "github.com/ethereum/go-ethereum/common" "github.com/sygmaprotocol/sygma-core/relayer/message" "github.com/sygmaprotocol/sygma-core/relayer/proposal" @@ -23,7 +24,7 @@ func (mh *SubstrateMessageHandler) HandleMessage(m *message.Message) (*proposal. Type: m.Type, } switch transferMessage.Type { - case FungibleTransfer: + case substrate.FungibleTransfer: return fungibleTransferMessageHandler(transferMessage) } return nil, errors.New("wrong message type passed while handling message") diff --git a/chains/substrate/executor/message-handler_test.go b/chains/substrate/executor/message-handler_test.go index f84b2f45..0b788894 100644 --- a/chains/substrate/executor/message-handler_test.go +++ b/chains/substrate/executor/message-handler_test.go @@ -16,6 +16,8 @@ import ( "github.com/sygmaprotocol/sygma-core/relayer/message" "github.com/sygmaprotocol/sygma-core/relayer/proposal" + substrate_chain "github.com/ChainSafe/sygma-relayer/chains/substrate" + "github.com/stretchr/testify/suite" ) @@ -47,7 +49,7 @@ func (s *FungibleTransferHandlerTestSuite) TestFungibleTransferHandleMessage() { }, }, - Type: executor.FungibleTransfer, + Type: substrate_chain.FungibleTransfer, } data, _ := hex.DecodeString("0000000000000000000000000000000000000000000000000000000000000002000000000000000000000000000000000000000000000000000000000000002400010100d43593c715fdd31c61141abd04a99fd6822c8558854ccde39a5684e7a56da27d") expectedProp := &proposal.Proposal{ @@ -80,7 +82,7 @@ func (s *FungibleTransferHandlerTestSuite) TestFungibleTransferHandleMessageInco }, }, - Type: executor.FungibleTransfer, + Type: substrate_chain.FungibleTransfer, } mh := executor.SubstrateMessageHandler{} @@ -104,7 +106,7 @@ func (s *FungibleTransferHandlerTestSuite) TestFungibleTransferHandleMessageInco }, }, - Type: executor.FungibleTransfer, + Type: substrate_chain.FungibleTransfer, } mh := executor.SubstrateMessageHandler{} @@ -128,7 +130,7 @@ func (s *FungibleTransferHandlerTestSuite) TestFungibleTransferHandleMessageInco }, }, - Type: executor.FungibleTransfer, + Type: substrate_chain.FungibleTransfer, } mh := executor.SubstrateMessageHandler{} @@ -155,7 +157,7 @@ func (s *FungibleTransferHandlerTestSuite) TestSuccesfullyRegisterFungibleTransf }, }, - Type: executor.FungibleTransfer, + Type: substrate_chain.FungibleTransfer, } invalidMessageData := &message.Message{ @@ -175,7 +177,7 @@ func (s *FungibleTransferHandlerTestSuite) TestSuccesfullyRegisterFungibleTransf depositMessageHandler := message.NewMessageHandler() // Register FungibleTransferMessageHandler function - depositMessageHandler.RegisterMessageHandler(executor.FungibleTransfer, &executor.SubstrateMessageHandler{}) + depositMessageHandler.RegisterMessageHandler(substrate_chain.FungibleTransfer, &executor.SubstrateMessageHandler{}) prop1, err1 := depositMessageHandler.HandleMessage(messageData) s.Nil(err1) s.NotNil(prop1) diff --git a/chains/substrate/listener/deposit-handler.go b/chains/substrate/listener/deposit-handler.go index 9206d62c..1f36c2c4 100644 --- a/chains/substrate/listener/deposit-handler.go +++ b/chains/substrate/listener/deposit-handler.go @@ -6,14 +6,14 @@ package listener import ( "errors" - "github.com/rs/zerolog/log" - - "github.com/ChainSafe/chainbridge-core/relayer/message" - core_types "github.com/ChainSafe/chainbridge-core/types" + "github.com/ChainSafe/sygma-relayer/chains" + "github.com/ChainSafe/sygma-relayer/chains/substrate" "github.com/centrifuge/go-substrate-rpc-client/v4/types" + "github.com/rs/zerolog/log" + "github.com/sygmaprotocol/sygma-core/relayer/message" ) -type DepositHandlers map[message.TransferType]DepositHandlerFunc +type DepositHandlers map[message.MessageType]DepositHandlerFunc type DepositHandlerFunc func(sourceID uint8, destId types.U8, nonce types.U64, resourceID types.Bytes32, calldata []byte) (*message.Message, error) type SubstrateDepositHandler struct { @@ -28,14 +28,14 @@ const ( // handler functions for processing deposit events func NewSubstrateDepositHandler() *SubstrateDepositHandler { return &SubstrateDepositHandler{ - depositHandlers: make(map[message.TransferType]DepositHandlerFunc), + depositHandlers: make(map[message.MessageType]DepositHandlerFunc), } } func (e *SubstrateDepositHandler) HandleDeposit(sourceID uint8, destID types.U8, depositNonce types.U64, resourceID types.Bytes32, calldata []byte, transferType types.U8) (*message.Message, error) { - var depositType message.TransferType + var depositType message.MessageType if transferType == FungibleTransfer { - depositType = message.FungibleTransfer + depositType = substrate.FungibleTransfer } else { return nil, errors.New("no corresponding deposit handler for this transfer type exists") } @@ -49,7 +49,7 @@ func (e *SubstrateDepositHandler) HandleDeposit(sourceID uint8, destID types.U8, } // matchAddressWithHandlerFunc matches a transfer type with an associated handler function -func (e *SubstrateDepositHandler) matchTransferTypeHandlerFunc(transferType message.TransferType) (DepositHandlerFunc, error) { +func (e *SubstrateDepositHandler) matchTransferTypeHandlerFunc(transferType message.MessageType) (DepositHandlerFunc, error) { hf, ok := e.depositHandlers[transferType] if !ok { return nil, errors.New("no corresponding deposit handler for this transfer type exists") @@ -58,7 +58,7 @@ func (e *SubstrateDepositHandler) matchTransferTypeHandlerFunc(transferType mess } // RegisterDepositHandler registers an event handler by associating a handler function to a transfer type -func (e *SubstrateDepositHandler) RegisterDepositHandler(transferType message.TransferType, handler DepositHandlerFunc) { +func (e *SubstrateDepositHandler) RegisterDepositHandler(transferType message.MessageType, handler DepositHandlerFunc) { if transferType == "" { return } @@ -67,7 +67,7 @@ func (e *SubstrateDepositHandler) RegisterDepositHandler(transferType message.Tr e.depositHandlers[transferType] = handler } -//FungibleTransferHandler converts data pulled from event logs into message +// FungibleTransferHandler converts data pulled from event logs into message // handlerResponse can be an empty slice func FungibleTransferHandler(sourceID uint8, destId types.U8, nonce types.U64, resourceID types.Bytes32, calldata []byte) (*message.Message, error) { if len(calldata) < 84 { @@ -90,7 +90,9 @@ func FungibleTransferHandler(sourceID uint8, destId types.U8, nonce types.U64, r recipientAddress, } - metadata := message.Metadata{} - - return message.NewMessage(uint8(sourceID), uint8(destId), uint64(nonce), core_types.ResourceID(resourceID), message.FungibleTransfer, payload, metadata), nil + return chains.NewMessage(sourceID, uint8(destId), chains.TransferMessageData{ + DepositNonce: uint64(nonce), + ResourceId: resourceID, + Payload: payload, + }, substrate.FungibleTransfer), nil } diff --git a/chains/substrate/listener/deposit-handler_test.go b/chains/substrate/listener/deposit-handler_test.go index d99a76ce..649cce27 100644 --- a/chains/substrate/listener/deposit-handler_test.go +++ b/chains/substrate/listener/deposit-handler_test.go @@ -7,13 +7,15 @@ import ( "errors" "unsafe" - "github.com/ChainSafe/chainbridge-core/relayer/message" - core_types "github.com/ChainSafe/chainbridge-core/types" + "github.com/sygmaprotocol/sygma-core/relayer/message" + "github.com/centrifuge/go-substrate-rpc-client/v4/types" "math/big" "testing" + "github.com/ChainSafe/sygma-relayer/chains" + substrate_chain "github.com/ChainSafe/sygma-relayer/chains/substrate" "github.com/ChainSafe/sygma-relayer/chains/substrate/events" "github.com/ChainSafe/sygma-relayer/chains/substrate/listener" "github.com/ChainSafe/sygma-relayer/e2e/substrate" @@ -56,15 +58,17 @@ func (s *Erc20HandlerTestSuite) TestErc20HandleEvent() { recipientAddressParsed := calldata[64:] expected := &message.Message{ - Source: sourceID, - Destination: uint8(depositLog.DestDomainID), - DepositNonce: uint64(depositLog.DepositNonce), - ResourceId: core_types.ResourceID(depositLog.ResourceID), - Type: message.FungibleTransfer, - Payload: []interface{}{ - amountParsed, - recipientAddressParsed, + Source: sourceID, + Destination: uint8(depositLog.DestDomainID), + Data: chains.TransferMessageData{ + DepositNonce: uint64(depositLog.DepositNonce), + ResourceId: depositLog.ResourceID, + Payload: []interface{}{ + amountParsed, + recipientAddressParsed, + }, }, + Type: substrate_chain.FungibleTransfer, } message, err := listener.FungibleTransferHandler(sourceID, depositLog.DestDomainID, depositLog.DepositNonce, depositLog.ResourceID, depositLog.CallData) @@ -115,7 +119,7 @@ func (s *Erc20HandlerTestSuite) TestSuccesfullyRegisterFungibleTransferHandler() depositHandler := listener.NewSubstrateDepositHandler() // Register FungibleTransferHandler function - depositHandler.RegisterDepositHandler(message.FungibleTransfer, listener.FungibleTransferHandler) + depositHandler.RegisterDepositHandler(substrate_chain.FungibleTransfer, listener.FungibleTransferHandler) message1, err1 := depositHandler.HandleDeposit(1, d1.DestDomainID, d1.DepositNonce, d1.ResourceID, d1.CallData, d1.TransferType) s.Nil(err1) s.NotNil(message1) diff --git a/chains/substrate/listener/event-handlers.go b/chains/substrate/listener/event-handlers.go index 86e664fa..a71106ab 100644 --- a/chains/substrate/listener/event-handlers.go +++ b/chains/substrate/listener/event-handlers.go @@ -6,7 +6,6 @@ package listener import ( "math/big" - "github.com/ChainSafe/chainbridge-core/relayer/message" "github.com/ChainSafe/sygma-relayer/chains/substrate/events" "github.com/centrifuge/go-substrate-rpc-client/v4/registry" "github.com/centrifuge/go-substrate-rpc-client/v4/registry/parser" @@ -14,19 +13,26 @@ import ( "github.com/mitchellh/mapstructure" "github.com/rs/zerolog" "github.com/rs/zerolog/log" + "github.com/sygmaprotocol/sygma-core/relayer/message" ) type SystemUpdateEventHandler struct { - conn ChainConnection + conn Connection } -func NewSystemUpdateEventHandler(conn ChainConnection) *SystemUpdateEventHandler { +func NewSystemUpdateEventHandler(conn Connection) *SystemUpdateEventHandler { return &SystemUpdateEventHandler{ conn: conn, } } -func (eh *SystemUpdateEventHandler) HandleEvents(evts []*parser.Event, msgChan chan []*message.Message) error { +func (eh *SystemUpdateEventHandler) HandleEvents(startBlock *big.Int, endBlock *big.Int) error { + + evts, err := FetchEvents(startBlock, endBlock, eh.conn) + if err != nil { + log.Error().Err(err).Msg("Error fetching events") + return err + } for _, e := range evts { if e.Name == events.ParachainUpdatedEvent { log.Info().Msgf("Updating substrate metadata") @@ -111,17 +117,27 @@ type FungibleTransferEventHandler struct { domainID uint8 depositHandler DepositHandler log zerolog.Logger + msgChan chan []*message.Message + conn Connection } -func NewFungibleTransferEventHandler(logC zerolog.Context, domainID uint8, depositHandler DepositHandler) *FungibleTransferEventHandler { +func NewFungibleTransferEventHandler(logC zerolog.Context, domainID uint8, depositHandler DepositHandler, msgChan chan []*message.Message, conn Connection) *FungibleTransferEventHandler { return &FungibleTransferEventHandler{ depositHandler: depositHandler, domainID: domainID, log: logC.Logger(), + msgChan: msgChan, + conn: conn, } } -func (eh *FungibleTransferEventHandler) HandleEvents(evts []*parser.Event, msgChan chan []*message.Message) error { +func (eh *FungibleTransferEventHandler) HandleEvents(startBlock *big.Int, endBlock *big.Int) error { + evts, err := FetchEvents(startBlock, endBlock, eh.conn) + if err != nil { + log.Error().Err(err).Msg("Error fetching events") + return err + } + domainDeposits := make(map[uint8][]*message.Message) for _, evt := range evts { @@ -153,29 +169,37 @@ func (eh *FungibleTransferEventHandler) HandleEvents(evts []*parser.Event, msgCh for _, deposits := range domainDeposits { go func(d []*message.Message) { - msgChan <- d + eh.msgChan <- d }(deposits) } return nil } type RetryEventHandler struct { - conn ChainConnection + conn Connection domainID uint8 depositHandler DepositHandler log zerolog.Logger + msgChan chan []*message.Message } -func NewRetryEventHandler(logC zerolog.Context, conn ChainConnection, depositHandler DepositHandler, domainID uint8) *RetryEventHandler { +func NewRetryEventHandler(logC zerolog.Context, conn Connection, depositHandler DepositHandler, domainID uint8, msgChan chan []*message.Message) *RetryEventHandler { return &RetryEventHandler{ depositHandler: depositHandler, domainID: domainID, conn: conn, log: logC.Logger(), + msgChan: msgChan, } } -func (rh *RetryEventHandler) HandleEvents(evts []*parser.Event, msgChan chan []*message.Message) error { +func (rh *RetryEventHandler) HandleEvents(startBlock *big.Int, endBlock *big.Int) error { + evts, err := FetchEvents(startBlock, endBlock, rh.conn) + if err != nil { + log.Error().Err(err).Msg("Error fetching events") + return err + } + hash, err := rh.conn.GetFinalizedHead() if err != nil { return err @@ -241,7 +265,7 @@ func (rh *RetryEventHandler) HandleEvents(evts []*parser.Event, msgChan chan []* } for _, deposits := range domainDeposits { - msgChan <- deposits + rh.msgChan <- deposits } return nil } diff --git a/chains/substrate/listener/event-handlers_test.go b/chains/substrate/listener/event-handlers_test.go index 05d0515f..3dd4c6b9 100644 --- a/chains/substrate/listener/event-handlers_test.go +++ b/chains/substrate/listener/event-handlers_test.go @@ -7,10 +7,11 @@ import ( "fmt" "math/big" - "github.com/ChainSafe/chainbridge-core/relayer/message" + "github.com/ChainSafe/sygma-relayer/chains" "github.com/ChainSafe/sygma-relayer/chains/substrate/listener" mock_events "github.com/ChainSafe/sygma-relayer/chains/substrate/listener/mock" "github.com/rs/zerolog" + "github.com/sygmaprotocol/sygma-core/relayer/message" "testing" @@ -23,7 +24,7 @@ import ( type SystemUpdateHandlerTestSuite struct { suite.Suite - conn *mock_events.MockChainConnection + mockConn *mock_events.MockConnection systemUpdateHandler *listener.SystemUpdateEventHandler } @@ -33,46 +34,46 @@ func TestRunSystemUpdateHandlerTestSuite(t *testing.T) { func (s *SystemUpdateHandlerTestSuite) SetupTest() { ctrl := gomock.NewController(s.T()) - s.conn = mock_events.NewMockChainConnection(ctrl) - s.systemUpdateHandler = listener.NewSystemUpdateEventHandler(s.conn) + s.mockConn = mock_events.NewMockConnection(ctrl) + s.systemUpdateHandler = listener.NewSystemUpdateEventHandler(s.mockConn) } func (s *SystemUpdateHandlerTestSuite) Test_UpdateMetadataFails() { - s.conn.EXPECT().UpdateMetatdata().Return(fmt.Errorf("error")) - + s.mockConn.EXPECT().UpdateMetatdata().Return(fmt.Errorf("error")) evts := []*parser.Event{ { Name: "ParachainSystem.ValidationFunctionApplied", }, } - msgChan := make(chan []*message.Message, 1) - err := s.systemUpdateHandler.HandleEvents(evts, msgChan) + s.mockConn.EXPECT().GetBlockHash(gomock.Any()).Return(types.Hash{}, nil) + s.mockConn.EXPECT().GetBlockEvents(gomock.Any()).Return(evts, nil) + + err := s.systemUpdateHandler.HandleEvents(big.NewInt(0), big.NewInt(1)) s.NotNil(err) - s.Equal(len(msgChan), 0) } func (s *SystemUpdateHandlerTestSuite) Test_NoMetadataUpdate() { - evts := []*parser.Event{} - msgChan := make(chan []*message.Message, 1) - err := s.systemUpdateHandler.HandleEvents(evts, msgChan) + s.mockConn.EXPECT().GetBlockHash(gomock.Any()).Return(types.Hash{}, nil) + s.mockConn.EXPECT().GetBlockEvents(gomock.Any()).Return([]*parser.Event{}, nil) + err := s.systemUpdateHandler.HandleEvents(big.NewInt(0), big.NewInt(1)) s.Nil(err) - s.Equal(len(msgChan), 0) } func (s *SystemUpdateHandlerTestSuite) Test_SuccesfullMetadataUpdate() { - s.conn.EXPECT().UpdateMetatdata().Return(nil) + s.mockConn.EXPECT().UpdateMetatdata().Return(nil) evts := []*parser.Event{ { Name: "ParachainSystem.ValidationFunctionApplied", }, } - msgChan := make(chan []*message.Message, 1) - err := s.systemUpdateHandler.HandleEvents(evts, msgChan) + s.mockConn.EXPECT().GetBlockHash(gomock.Any()).Return(types.Hash{}, nil) + s.mockConn.EXPECT().GetBlockEvents(gomock.Any()).Return(evts, nil) + + err := s.systemUpdateHandler.HandleEvents(big.NewInt(0), big.NewInt(1)) s.Nil(err) - s.Equal(len(msgChan), 0) } type DepositHandlerTestSuite struct { @@ -80,6 +81,8 @@ type DepositHandlerTestSuite struct { depositEventHandler *listener.FungibleTransferEventHandler mockDepositHandler *mock_events.MockDepositHandler domainID uint8 + msgChan chan []*message.Message + mockConn *mock_events.MockConnection } func TestRunDepositHandlerTestSuite(t *testing.T) { @@ -90,7 +93,9 @@ func (s *DepositHandlerTestSuite) SetupTest() { ctrl := gomock.NewController(s.T()) s.domainID = 1 s.mockDepositHandler = mock_events.NewMockDepositHandler(ctrl) - s.depositEventHandler = listener.NewFungibleTransferEventHandler(zerolog.Context{}, s.domainID, s.mockDepositHandler) + s.msgChan = make(chan []*message.Message, 2) + s.mockConn = mock_events.NewMockConnection(ctrl) + s.depositEventHandler = listener.NewFungibleTransferEventHandler(zerolog.Context{}, s.domainID, s.mockDepositHandler, s.msgChan, s.mockConn) } func (s *DepositHandlerTestSuite) Test_HandleDepositFails_ExecutionContinue() { @@ -125,13 +130,12 @@ func (s *DepositHandlerTestSuite) Test_HandleDepositFails_ExecutionContinue() { d2["deposit_nonce"], d2["resource_id"], d2["deposit_data"], - d1["sygma_traits_TransferType"], + d2["sygma_traits_TransferType"], ).Return( - &message.Message{DepositNonce: 2}, + &message.Message{Data: chains.TransferMessageData{DepositNonce: 2}}, nil, ) - msgChan := make(chan []*message.Message, 2) evts := []*parser.Event{ { Name: "SygmaBridge.Deposit", @@ -156,11 +160,14 @@ func (s *DepositHandlerTestSuite) Test_HandleDepositFails_ExecutionContinue() { }, }, } - err := s.depositEventHandler.HandleEvents(evts, msgChan) - msgs := <-msgChan + s.mockConn.EXPECT().GetBlockHash(gomock.Any()).Return(types.Hash{}, nil) + s.mockConn.EXPECT().GetBlockEvents(gomock.Any()).Return(evts, nil) + + err := s.depositEventHandler.HandleEvents(big.NewInt(0), big.NewInt(1)) + msgs := <-s.msgChan s.Nil(err) - s.Equal(msgs, []*message.Message{{DepositNonce: 2}}) + s.Equal(msgs, []*message.Message{{Data: chains.TransferMessageData{DepositNonce: 2}}}) } func (s *DepositHandlerTestSuite) Test_SuccessfulHandleDeposit() { @@ -188,7 +195,7 @@ func (s *DepositHandlerTestSuite) Test_SuccessfulHandleDeposit() { d1["deposit_data"], d1["sygma_traits_TransferType"], ).Return( - &message.Message{DepositNonce: 1}, + &message.Message{Data: chains.TransferMessageData{DepositNonce: 1}}, nil, ) s.mockDepositHandler.EXPECT().HandleDeposit( @@ -199,12 +206,10 @@ func (s *DepositHandlerTestSuite) Test_SuccessfulHandleDeposit() { d2["deposit_data"], d2["sygma_traits_TransferType"], ).Return( - &message.Message{DepositNonce: 2}, + &message.Message{Data: chains.TransferMessageData{DepositNonce: 2}}, nil, ) - msgChan := make(chan []*message.Message, 2) - evts := []*parser.Event{ { Name: "SygmaBridge.Deposit", @@ -229,11 +234,14 @@ func (s *DepositHandlerTestSuite) Test_SuccessfulHandleDeposit() { }, }, } - err := s.depositEventHandler.HandleEvents(evts, msgChan) - msgs := <-msgChan + s.mockConn.EXPECT().GetBlockHash(gomock.Any()).Return(types.Hash{}, nil) + s.mockConn.EXPECT().GetBlockEvents(gomock.Any()).Return(evts, nil) + + err := s.depositEventHandler.HandleEvents(big.NewInt(0), big.NewInt(1)) + msgs := <-s.msgChan s.Nil(err) - s.Equal(msgs, []*message.Message{{DepositNonce: 1}, {DepositNonce: 2}}) + s.Equal(msgs, []*message.Message{{Data: chains.TransferMessageData{DepositNonce: 1}}, {Data: chains.TransferMessageData{DepositNonce: 2}}}) } func (s *DepositHandlerTestSuite) Test_HandleDepositPanics_ExecutionContinues() { @@ -271,11 +279,10 @@ func (s *DepositHandlerTestSuite) Test_HandleDepositPanics_ExecutionContinues() d2["deposit_data"], d2["sygma_traits_TransferType"], ).Return( - &message.Message{DepositNonce: 2}, + &message.Message{Data: chains.TransferMessageData{DepositNonce: 2}}, nil, ) - msgChan := make(chan []*message.Message, 2) evts := []*parser.Event{ { Name: "SygmaBridge.Deposit", @@ -300,18 +307,24 @@ func (s *DepositHandlerTestSuite) Test_HandleDepositPanics_ExecutionContinues() }, }, } - err := s.depositEventHandler.HandleEvents(evts, msgChan) - msgs := <-msgChan + + s.mockConn.EXPECT().GetBlockHash(gomock.Any()).Return(types.Hash{}, nil) + s.mockConn.EXPECT().GetBlockEvents(gomock.Any()).Return(evts, nil) + + err := s.depositEventHandler.HandleEvents(big.NewInt(0), big.NewInt(1)) + msgs := <-s.msgChan s.Nil(err) - s.Equal(msgs, []*message.Message{{DepositNonce: 2}}) + s.Equal(msgs, []*message.Message{{Data: chains.TransferMessageData{DepositNonce: 2}}}) } type RetryHandlerTestSuite struct { suite.Suite + retryHandler *listener.RetryEventHandler mockDepositHandler *mock_events.MockDepositHandler - mockConn *mock_events.MockChainConnection + mockConn *mock_events.MockConnection domainID uint8 + msgChan chan []*message.Message } func TestRunRetryHandlerTestSuite(t *testing.T) { @@ -322,15 +335,17 @@ func (s *RetryHandlerTestSuite) SetupTest() { ctrl := gomock.NewController(s.T()) s.domainID = 1 s.mockDepositHandler = mock_events.NewMockDepositHandler(ctrl) - s.mockConn = mock_events.NewMockChainConnection(ctrl) + s.mockConn = mock_events.NewMockConnection(ctrl) + s.msgChan = make(chan []*message.Message, 2) + s.retryHandler = listener.NewRetryEventHandler(zerolog.Context{}, s.mockConn, s.mockDepositHandler, s.domainID, s.msgChan) + } func (s *RetryHandlerTestSuite) Test_CannotFetchLatestBlock() { - s.mockConn.EXPECT().GetFinalizedHead().Return(types.Hash{}, fmt.Errorf("error")) + s.mockConn.EXPECT().GetBlockHash(gomock.Any()).Return(types.Hash{}, nil) + s.mockConn.EXPECT().GetBlockEvents(gomock.Any()).Return([]*parser.Event{}, fmt.Errorf("error")) - retryHandler := listener.NewRetryEventHandler(zerolog.Context{}, s.mockConn, s.mockDepositHandler, s.domainID) - msgChan := make(chan []*message.Message, 2) - err := retryHandler.HandleEvents([]*parser.Event{}, msgChan) + err := s.retryHandler.HandleEvents(big.NewInt(0), big.NewInt(1)) s.NotNil(err) } @@ -343,8 +358,6 @@ func (s *RetryHandlerTestSuite) Test_EventTooNew() { }, }}, nil) - retryHandler := listener.NewRetryEventHandler(zerolog.Context{}, s.mockConn, s.mockDepositHandler, s.domainID) - msgChan := make(chan []*message.Message) rtry := map[string]any{ "deposit_on_block_height": types.NewU128(*big.NewInt(101)), } @@ -356,11 +369,12 @@ func (s *RetryHandlerTestSuite) Test_EventTooNew() { }, }, } + s.mockConn.EXPECT().GetBlockHash(gomock.Any()).Return(types.Hash{}, nil) + s.mockConn.EXPECT().GetBlockEvents(gomock.Any()).Return(evts, nil) - err := retryHandler.HandleEvents(evts, msgChan) - + err := s.retryHandler.HandleEvents(big.NewInt(0), big.NewInt(1)) s.Nil(err) - s.Equal(len(msgChan), 0) + s.Equal(len(s.msgChan), 0) } func (s *RetryHandlerTestSuite) Test_FetchingBlockHashFails() { @@ -370,10 +384,10 @@ func (s *RetryHandlerTestSuite) Test_FetchingBlockHashFails() { Number: types.BlockNumber(uint32(100)), }, }}, nil) + + s.mockConn.EXPECT().GetBlockHash(gomock.Any()).Return(types.Hash{}, nil) s.mockConn.EXPECT().GetBlockHash(uint64(95)).Return(types.Hash{}, fmt.Errorf("error")) - retryHandler := listener.NewRetryEventHandler(zerolog.Context{}, s.mockConn, s.mockDepositHandler, s.domainID) - msgChan := make(chan []*message.Message) rtry := map[string]any{ "deposit_on_block_height": types.NewU128(*big.NewInt(95)), } @@ -385,10 +399,12 @@ func (s *RetryHandlerTestSuite) Test_FetchingBlockHashFails() { }, }, } - err := retryHandler.HandleEvents(evts, msgChan) + s.mockConn.EXPECT().GetBlockEvents(gomock.Any()).Return(evts, nil) + + err := s.retryHandler.HandleEvents(big.NewInt(0), big.NewInt(1)) s.NotNil(err) - s.Equal(len(msgChan), 0) + s.Equal(len(s.msgChan), 0) } func (s *RetryHandlerTestSuite) Test_FetchingBlockEventsFails() { @@ -398,11 +414,9 @@ func (s *RetryHandlerTestSuite) Test_FetchingBlockEventsFails() { Number: types.BlockNumber(uint32(100)), }, }}, nil) + s.mockConn.EXPECT().GetBlockHash(gomock.Any()).Return(types.Hash{}, nil) s.mockConn.EXPECT().GetBlockHash(uint64(95)).Return(types.Hash{}, nil) - s.mockConn.EXPECT().GetBlockEvents(gomock.Any()).Return(nil, fmt.Errorf("error")) - retryHandler := listener.NewRetryEventHandler(zerolog.Context{}, s.mockConn, s.mockDepositHandler, s.domainID) - msgChan := make(chan []*message.Message) rtry := map[string]any{ "deposit_on_block_height": types.NewU128(*big.NewInt(95)), } @@ -414,10 +428,13 @@ func (s *RetryHandlerTestSuite) Test_FetchingBlockEventsFails() { }, }, } - err := retryHandler.HandleEvents(evts, msgChan) + s.mockConn.EXPECT().GetBlockEvents(gomock.Any()).Return(evts, nil) + s.mockConn.EXPECT().GetBlockEvents(gomock.Any()).Return(nil, fmt.Errorf("error")) + + err := s.retryHandler.HandleEvents(big.NewInt(0), big.NewInt(1)) s.NotNil(err) - s.Equal(len(msgChan), 0) + s.Equal(len(s.msgChan), 0) } func (s *RetryHandlerTestSuite) Test_NoEvents() { @@ -427,11 +444,9 @@ func (s *RetryHandlerTestSuite) Test_NoEvents() { Number: types.BlockNumber(uint32(100)), }, }}, nil) + s.mockConn.EXPECT().GetBlockHash(gomock.Any()).Return(types.Hash{}, nil) s.mockConn.EXPECT().GetBlockHash(uint64(95)).Return(types.Hash{}, nil) - s.mockConn.EXPECT().GetBlockEvents(gomock.Any()).Return([]*parser.Event{}, nil) - retryHandler := listener.NewRetryEventHandler(zerolog.Context{}, s.mockConn, s.mockDepositHandler, s.domainID) - msgChan := make(chan []*message.Message) rtry := map[string]any{ "deposit_on_block_height": types.NewU128(*big.NewInt(95)), } @@ -443,10 +458,13 @@ func (s *RetryHandlerTestSuite) Test_NoEvents() { }, }, } - err := retryHandler.HandleEvents(evts, msgChan) + s.mockConn.EXPECT().GetBlockEvents(gomock.Any()).Return(evts, nil) + s.mockConn.EXPECT().GetBlockEvents(gomock.Any()).Return([]*parser.Event{}, nil) + + err := s.retryHandler.HandleEvents(big.NewInt(0), big.NewInt(1)) s.Nil(err) - s.Equal(len(msgChan), 0) + s.Equal(len(s.msgChan), 0) } func (s *RetryHandlerTestSuite) Test_ValidEvents() { @@ -456,6 +474,7 @@ func (s *RetryHandlerTestSuite) Test_ValidEvents() { Number: types.BlockNumber(uint32(100)), }, }}, nil) + s.mockConn.EXPECT().GetBlockHash(gomock.Any()).Return(types.Hash{}, nil) s.mockConn.EXPECT().GetBlockHash(uint64(95)).Return(types.Hash{}, nil) d1 := map[string]any{ "dest_domain_id": types.NewU8(2), @@ -497,7 +516,6 @@ func (s *RetryHandlerTestSuite) Test_ValidEvents() { }, }, } - s.mockConn.EXPECT().GetBlockEvents(gomock.Any()).Return(blockEvts, nil) s.mockDepositHandler.EXPECT().HandleDeposit( s.domainID, d1["dest_domain_id"], @@ -506,7 +524,7 @@ func (s *RetryHandlerTestSuite) Test_ValidEvents() { d1["deposit_data"], d1["sygma_traits_TransferType"], ).Return( - &message.Message{DepositNonce: 1}, + &message.Message{Data: chains.TransferMessageData{DepositNonce: 1}}, nil, ) s.mockDepositHandler.EXPECT().HandleDeposit( @@ -517,12 +535,10 @@ func (s *RetryHandlerTestSuite) Test_ValidEvents() { d2["deposit_data"], d2["sygma_traits_TransferType"], ).Return( - &message.Message{DepositNonce: 2}, + &message.Message{Data: chains.TransferMessageData{DepositNonce: 2}}, nil, ) - retryHandler := listener.NewRetryEventHandler(zerolog.Context{}, s.mockConn, s.mockDepositHandler, s.domainID) - msgChan := make(chan []*message.Message, 2) rtry := map[string]any{ "deposit_on_block_height": types.NewU128(*big.NewInt(95)), } @@ -534,12 +550,15 @@ func (s *RetryHandlerTestSuite) Test_ValidEvents() { }, }, } - err := retryHandler.HandleEvents(evts, msgChan) - msgs := <-msgChan + s.mockConn.EXPECT().GetBlockEvents(gomock.Any()).Return(evts, nil) + s.mockConn.EXPECT().GetBlockEvents(gomock.Any()).Return(blockEvts, nil) + + err := s.retryHandler.HandleEvents(big.NewInt(0), big.NewInt(1)) + msgs := <-s.msgChan s.Nil(err) - s.Equal(len(msgChan), 0) - s.Equal(msgs, []*message.Message{{DepositNonce: 1}, {DepositNonce: 2}}) + s.Equal(len(s.msgChan), 0) + s.Equal(msgs, []*message.Message{{Data: chains.TransferMessageData{DepositNonce: 1}}, {Data: chains.TransferMessageData{DepositNonce: 2}}}) } func (s *RetryHandlerTestSuite) Test_EventPanics() { @@ -549,6 +568,7 @@ func (s *RetryHandlerTestSuite) Test_EventPanics() { Number: types.BlockNumber(uint32(100)), }, }}, nil) + s.mockConn.EXPECT().GetBlockHash(gomock.Any()).Return(types.Hash{}, nil) s.mockConn.EXPECT().GetBlockHash(uint64(95)).Return(types.Hash{}, nil) s.mockConn.EXPECT().GetBlockHash(uint64(95)).Return(types.Hash{}, nil) d1 := map[string]any{ @@ -594,8 +614,7 @@ func (s *RetryHandlerTestSuite) Test_EventPanics() { }, }, } - s.mockConn.EXPECT().GetBlockEvents(gomock.Any()).Return(blockEvts1, nil) - s.mockConn.EXPECT().GetBlockEvents(gomock.Any()).Return(blockEvts2, nil) + s.mockDepositHandler.EXPECT().HandleDeposit( s.domainID, d1["dest_domain_id"], @@ -614,12 +633,10 @@ func (s *RetryHandlerTestSuite) Test_EventPanics() { d2["deposit_data"], d2["sygma_traits_TransferType"], ).Return( - &message.Message{DepositNonce: 2}, + &message.Message{Data: chains.TransferMessageData{DepositNonce: 2}}, nil, ) - retryHandler := listener.NewRetryEventHandler(zerolog.Context{}, s.mockConn, s.mockDepositHandler, s.domainID) - msgChan := make(chan []*message.Message, 1) rtry := map[string]any{ "deposit_on_block_height": types.NewU128(*big.NewInt(95)), } @@ -637,10 +654,14 @@ func (s *RetryHandlerTestSuite) Test_EventPanics() { }, }, } - err := retryHandler.HandleEvents(evts, msgChan) - msgs := <-msgChan + s.mockConn.EXPECT().GetBlockEvents(gomock.Any()).Return(evts, nil) + s.mockConn.EXPECT().GetBlockEvents(gomock.Any()).Return(blockEvts1, nil) + s.mockConn.EXPECT().GetBlockEvents(gomock.Any()).Return(blockEvts2, nil) + + err := s.retryHandler.HandleEvents(big.NewInt(0), big.NewInt(1)) + msgs := <-s.msgChan s.Nil(err) - s.Equal(len(msgChan), 0) - s.Equal(msgs, []*message.Message{{DepositNonce: 2}}) + s.Equal(len(s.msgChan), 0) + s.Equal(msgs, []*message.Message{{Data: chains.TransferMessageData{DepositNonce: 2}}}) } diff --git a/chains/substrate/listener/listener.go b/chains/substrate/listener/listener.go index 12b93a0c..7c62c518 100644 --- a/chains/substrate/listener/listener.go +++ b/chains/substrate/listener/listener.go @@ -4,123 +4,30 @@ package listener import ( - "context" "math/big" - "time" - - "github.com/ChainSafe/chainbridge-core/relayer/message" - "github.com/ChainSafe/chainbridge-core/store" - - "github.com/ChainSafe/sygma-relayer/chains/substrate" "github.com/centrifuge/go-substrate-rpc-client/v4/registry/parser" "github.com/centrifuge/go-substrate-rpc-client/v4/types" - "github.com/rs/zerolog" - "github.com/rs/zerolog/log" ) -type EventHandler interface { - HandleEvents(evts []*parser.Event, msgChan chan []*message.Message) error -} - -type ChainConnection interface { - UpdateMetatdata() error - GetHeaderLatest() (*types.Header, error) - GetBlockHash(blockNumber uint64) (types.Hash, error) - GetBlockEvents(hash types.Hash) ([]*parser.Event, error) +type Connection interface { GetFinalizedHead() (types.Hash, error) GetBlock(blockHash types.Hash) (*types.SignedBlock, error) + GetBlockHash(blockNumber uint64) (types.Hash, error) + GetBlockEvents(hash types.Hash) ([]*parser.Event, error) + UpdateMetatdata() error } -type SubstrateListener struct { - conn ChainConnection - - eventHandlers []EventHandler - blockRetryInterval time.Duration - blockInterval *big.Int - - log zerolog.Logger -} - -func NewSubstrateListener(connection ChainConnection, eventHandlers []EventHandler, config *substrate.SubstrateConfig) *SubstrateListener { - return &SubstrateListener{ - log: log.With().Uint8("domainID", *config.GeneralChainConfig.Id).Logger(), - conn: connection, - eventHandlers: eventHandlers, - blockRetryInterval: config.BlockRetryInterval, - blockInterval: config.BlockInterval, - } -} - -func (l *SubstrateListener) ListenToEvents(ctx context.Context, startBlock *big.Int, domainID uint8, blockstore store.BlockStore, msgChan chan []*message.Message) { - endBlock := big.NewInt(0) - - go func() { - loop: - for { - select { - case <-ctx.Done(): - return - default: - hash, err := l.conn.GetFinalizedHead() - if err != nil { - l.log.Warn().Err(err).Msg("Failed to fetch finalized header") - time.Sleep(l.blockRetryInterval) - continue - } - head, err := l.conn.GetBlock(hash) - if err != nil { - l.log.Warn().Err(err).Msg("Failed to fetch block") - time.Sleep(l.blockRetryInterval) - continue - } - - if startBlock == nil { - startBlock = big.NewInt(int64(head.Block.Header.Number)) - } - endBlock.Add(startBlock, l.blockInterval) - - // Sleep if finalized is less then current block - if big.NewInt(int64(head.Block.Header.Number)).Cmp(endBlock) == -1 { - time.Sleep(l.blockRetryInterval) - continue - } - - evts, err := l.fetchEvents(startBlock, endBlock) - if err != nil { - l.log.Warn().Err(err).Msgf("Failed fetching events for block range %s-%s", startBlock, endBlock) - time.Sleep(l.blockRetryInterval) - continue - } - - for _, handler := range l.eventHandlers { - err := handler.HandleEvents(evts, msgChan) - if err != nil { - l.log.Warn().Err(err).Msg("Error handling substrate events") - continue loop - } - } - err = blockstore.StoreBlock(endBlock, domainID) - if err != nil { - l.log.Error().Str("block", startBlock.String()).Err(err).Msg("Failed to write latest block to blockstore") - } - startBlock.Add(startBlock, l.blockInterval) - } - } - }() -} - -func (l *SubstrateListener) fetchEvents(startBlock *big.Int, endBlock *big.Int) ([]*parser.Event, error) { - l.log.Debug().Msgf("Fetching substrate events for block range %s-%s", startBlock, endBlock) +func FetchEvents(startBlock *big.Int, endBlock *big.Int, conn Connection) ([]*parser.Event, error) { evts := make([]*parser.Event, 0) for i := new(big.Int).Set(startBlock); i.Cmp(endBlock) == -1; i.Add(i, big.NewInt(1)) { - hash, err := l.conn.GetBlockHash(i.Uint64()) + hash, err := conn.GetBlockHash(i.Uint64()) if err != nil { return nil, err } - evt, err := l.conn.GetBlockEvents(hash) + evt, err := conn.GetBlockEvents(hash) if err != nil { return nil, err } diff --git a/chains/substrate/listener/mock/handlers.go b/chains/substrate/listener/mock/handlers.go index bf90e6d7..724f71a8 100644 --- a/chains/substrate/listener/mock/handlers.go +++ b/chains/substrate/listener/mock/handlers.go @@ -7,9 +7,9 @@ package mock_listener import ( reflect "reflect" - message "github.com/ChainSafe/chainbridge-core/relayer/message" types "github.com/centrifuge/go-substrate-rpc-client/v4/types" gomock "github.com/golang/mock/gomock" + message "github.com/sygmaprotocol/sygma-core/relayer/message" ) // MockDepositHandler is a mock of DepositHandler interface. diff --git a/chains/substrate/listener/mock/listener.go b/chains/substrate/listener/mock/listener.go index d6ec1279..cc5c955a 100644 --- a/chains/substrate/listener/mock/listener.go +++ b/chains/substrate/listener/mock/listener.go @@ -7,74 +7,36 @@ package mock_listener import ( reflect "reflect" - message "github.com/ChainSafe/chainbridge-core/relayer/message" parser "github.com/centrifuge/go-substrate-rpc-client/v4/registry/parser" types "github.com/centrifuge/go-substrate-rpc-client/v4/types" gomock "github.com/golang/mock/gomock" ) -// MockEventHandler is a mock of EventHandler interface. -type MockEventHandler struct { +// MockConnection is a mock of Connection interface. +type MockConnection struct { ctrl *gomock.Controller - recorder *MockEventHandlerMockRecorder + recorder *MockConnectionMockRecorder } -// MockEventHandlerMockRecorder is the mock recorder for MockEventHandler. -type MockEventHandlerMockRecorder struct { - mock *MockEventHandler +// MockConnectionMockRecorder is the mock recorder for MockConnection. +type MockConnectionMockRecorder struct { + mock *MockConnection } -// NewMockEventHandler creates a new mock instance. -func NewMockEventHandler(ctrl *gomock.Controller) *MockEventHandler { - mock := &MockEventHandler{ctrl: ctrl} - mock.recorder = &MockEventHandlerMockRecorder{mock} +// NewMockConnection creates a new mock instance. +func NewMockConnection(ctrl *gomock.Controller) *MockConnection { + mock := &MockConnection{ctrl: ctrl} + mock.recorder = &MockConnectionMockRecorder{mock} return mock } // EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockEventHandler) EXPECT() *MockEventHandlerMockRecorder { - return m.recorder -} - -// HandleEvents mocks base method. -func (m *MockEventHandler) HandleEvents(evts []*parser.Event, msgChan chan []*message.Message) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "HandleEvents", evts, msgChan) - ret0, _ := ret[0].(error) - return ret0 -} - -// HandleEvents indicates an expected call of HandleEvents. -func (mr *MockEventHandlerMockRecorder) HandleEvents(evts, msgChan interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "HandleEvents", reflect.TypeOf((*MockEventHandler)(nil).HandleEvents), evts, msgChan) -} - -// MockChainConnection is a mock of ChainConnection interface. -type MockChainConnection struct { - ctrl *gomock.Controller - recorder *MockChainConnectionMockRecorder -} - -// MockChainConnectionMockRecorder is the mock recorder for MockChainConnection. -type MockChainConnectionMockRecorder struct { - mock *MockChainConnection -} - -// NewMockChainConnection creates a new mock instance. -func NewMockChainConnection(ctrl *gomock.Controller) *MockChainConnection { - mock := &MockChainConnection{ctrl: ctrl} - mock.recorder = &MockChainConnectionMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockChainConnection) EXPECT() *MockChainConnectionMockRecorder { +func (m *MockConnection) EXPECT() *MockConnectionMockRecorder { return m.recorder } // GetBlock mocks base method. -func (m *MockChainConnection) GetBlock(blockHash types.Hash) (*types.SignedBlock, error) { +func (m *MockConnection) GetBlock(blockHash types.Hash) (*types.SignedBlock, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetBlock", blockHash) ret0, _ := ret[0].(*types.SignedBlock) @@ -83,13 +45,13 @@ func (m *MockChainConnection) GetBlock(blockHash types.Hash) (*types.SignedBlock } // GetBlock indicates an expected call of GetBlock. -func (mr *MockChainConnectionMockRecorder) GetBlock(blockHash interface{}) *gomock.Call { +func (mr *MockConnectionMockRecorder) GetBlock(blockHash interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBlock", reflect.TypeOf((*MockChainConnection)(nil).GetBlock), blockHash) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBlock", reflect.TypeOf((*MockConnection)(nil).GetBlock), blockHash) } // GetBlockEvents mocks base method. -func (m *MockChainConnection) GetBlockEvents(hash types.Hash) ([]*parser.Event, error) { +func (m *MockConnection) GetBlockEvents(hash types.Hash) ([]*parser.Event, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetBlockEvents", hash) ret0, _ := ret[0].([]*parser.Event) @@ -98,13 +60,13 @@ func (m *MockChainConnection) GetBlockEvents(hash types.Hash) ([]*parser.Event, } // GetBlockEvents indicates an expected call of GetBlockEvents. -func (mr *MockChainConnectionMockRecorder) GetBlockEvents(hash interface{}) *gomock.Call { +func (mr *MockConnectionMockRecorder) GetBlockEvents(hash interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBlockEvents", reflect.TypeOf((*MockChainConnection)(nil).GetBlockEvents), hash) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBlockEvents", reflect.TypeOf((*MockConnection)(nil).GetBlockEvents), hash) } // GetBlockHash mocks base method. -func (m *MockChainConnection) GetBlockHash(blockNumber uint64) (types.Hash, error) { +func (m *MockConnection) GetBlockHash(blockNumber uint64) (types.Hash, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetBlockHash", blockNumber) ret0, _ := ret[0].(types.Hash) @@ -113,13 +75,13 @@ func (m *MockChainConnection) GetBlockHash(blockNumber uint64) (types.Hash, erro } // GetBlockHash indicates an expected call of GetBlockHash. -func (mr *MockChainConnectionMockRecorder) GetBlockHash(blockNumber interface{}) *gomock.Call { +func (mr *MockConnectionMockRecorder) GetBlockHash(blockNumber interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBlockHash", reflect.TypeOf((*MockChainConnection)(nil).GetBlockHash), blockNumber) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetBlockHash", reflect.TypeOf((*MockConnection)(nil).GetBlockHash), blockNumber) } // GetFinalizedHead mocks base method. -func (m *MockChainConnection) GetFinalizedHead() (types.Hash, error) { +func (m *MockConnection) GetFinalizedHead() (types.Hash, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetFinalizedHead") ret0, _ := ret[0].(types.Hash) @@ -128,28 +90,13 @@ func (m *MockChainConnection) GetFinalizedHead() (types.Hash, error) { } // GetFinalizedHead indicates an expected call of GetFinalizedHead. -func (mr *MockChainConnectionMockRecorder) GetFinalizedHead() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFinalizedHead", reflect.TypeOf((*MockChainConnection)(nil).GetFinalizedHead)) -} - -// GetHeaderLatest mocks base method. -func (m *MockChainConnection) GetHeaderLatest() (*types.Header, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetHeaderLatest") - ret0, _ := ret[0].(*types.Header) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetHeaderLatest indicates an expected call of GetHeaderLatest. -func (mr *MockChainConnectionMockRecorder) GetHeaderLatest() *gomock.Call { +func (mr *MockConnectionMockRecorder) GetFinalizedHead() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetHeaderLatest", reflect.TypeOf((*MockChainConnection)(nil).GetHeaderLatest)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetFinalizedHead", reflect.TypeOf((*MockConnection)(nil).GetFinalizedHead)) } // UpdateMetatdata mocks base method. -func (m *MockChainConnection) UpdateMetatdata() error { +func (m *MockConnection) UpdateMetatdata() error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "UpdateMetatdata") ret0, _ := ret[0].(error) @@ -157,7 +104,7 @@ func (m *MockChainConnection) UpdateMetatdata() error { } // UpdateMetatdata indicates an expected call of UpdateMetatdata. -func (mr *MockChainConnectionMockRecorder) UpdateMetatdata() *gomock.Call { +func (mr *MockConnectionMockRecorder) UpdateMetatdata() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateMetatdata", reflect.TypeOf((*MockChainConnection)(nil).UpdateMetatdata)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateMetatdata", reflect.TypeOf((*MockConnection)(nil).UpdateMetatdata)) } diff --git a/chains/substrate/pallet/pallet.go b/chains/substrate/pallet/pallet.go index cc3170b4..72b12918 100644 --- a/chains/substrate/pallet/pallet.go +++ b/chains/substrate/pallet/pallet.go @@ -7,7 +7,8 @@ import ( "strconv" "github.com/ChainSafe/sygma-relayer/chains" - "github.com/ChainSafe/sygma-relayer/chains/substrate/client" + "github.com/sygmaprotocol/sygma-core/chains/substrate/client" + "github.com/centrifuge/go-substrate-rpc-client/v4/rpc/author" "github.com/centrifuge/go-substrate-rpc-client/v4/types" diff --git a/e2e/substrate/substrate_test.go b/e2e/substrate/substrate_test.go index e5c8886e..a8caf70e 100644 --- a/e2e/substrate/substrate_test.go +++ b/e2e/substrate/substrate_test.go @@ -8,11 +8,11 @@ import ( "encoding/binary" "github.com/ChainSafe/chainbridge-core/crypto/secp256k1" - "github.com/ChainSafe/sygma-relayer/chains/substrate/client" - "github.com/ChainSafe/sygma-relayer/chains/substrate/connection" substrateTypes "github.com/centrifuge/go-substrate-rpc-client/v4/types" "github.com/ethereum/go-ethereum/core/types" "github.com/sygmaprotocol/sygma-core/chains/evm/transactor" + "github.com/sygmaprotocol/sygma-core/chains/substrate/client" + "github.com/sygmaprotocol/sygma-core/chains/substrate/connection" "math/big" "testing" diff --git a/e2e/substrate/util.go b/e2e/substrate/util.go index 79205a46..32738415 100644 --- a/e2e/substrate/util.go +++ b/e2e/substrate/util.go @@ -14,8 +14,7 @@ import ( "github.com/centrifuge/go-substrate-rpc-client/v4/scale" "github.com/centrifuge/go-substrate-rpc-client/v4/signature" substrateTypes "github.com/centrifuge/go-substrate-rpc-client/v4/types" - - "github.com/ChainSafe/sygma-relayer/chains/substrate/connection" + "github.com/sygmaprotocol/sygma-core/chains/substrate/connection" "github.com/ChainSafe/chainbridge-core/chains/evm/calls" "github.com/ChainSafe/chainbridge-core/chains/evm/calls/evmgaspricer" diff --git a/example/app/app.go b/example/app/app.go index 1e1de1d3..81147575 100644 --- a/example/app/app.go +++ b/example/app/app.go @@ -15,6 +15,8 @@ import ( "github.com/ChainSafe/chainbridge-core/crypto/secp256k1" "github.com/ChainSafe/chainbridge-core/lvldb" "github.com/ChainSafe/chainbridge-core/opentelemetry" + "github.com/sygmaprotocol/sygma-core/chains/substrate/client" + "github.com/sygmaprotocol/sygma-core/chains/substrate/connection" "github.com/ethereum/go-ethereum/common" "github.com/libp2p/go-libp2p/core/crypto" @@ -37,8 +39,6 @@ import ( "github.com/ChainSafe/sygma-relayer/chains/evm" "github.com/ChainSafe/sygma-relayer/chains/substrate" - "github.com/ChainSafe/sygma-relayer/chains/substrate/client" - "github.com/ChainSafe/sygma-relayer/chains/substrate/connection" substrateExecutor "github.com/ChainSafe/sygma-relayer/chains/substrate/executor" substrate_listener "github.com/ChainSafe/sygma-relayer/chains/substrate/listener" substrate_pallet "github.com/ChainSafe/sygma-relayer/chains/substrate/pallet"