diff --git a/admin/admin.go b/admin/admin.go index abe678757..b5568becb 100644 --- a/admin/admin.go +++ b/admin/admin.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum-optimism/supersim/config" "github.com/ethereum-optimism/supersim/interop" + "github.com/ethereum-optimism/supersim/opsimulator" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/log" @@ -26,6 +27,7 @@ type AdminServer struct { networkConfig *config.NetworkConfig l2ToL2MsgIndexer *interop.L2ToL2MessageIndexer + l1ToL2MsgIndexer *opsimulator.L1ToL2MessageIndexer port uint64 } @@ -34,6 +36,7 @@ type RPCMethods struct { log log.Logger networkConfig *config.NetworkConfig l2ToL2MsgIndexer *interop.L2ToL2MessageIndexer + l1ToL2MsgIndexer *opsimulator.L1ToL2MessageIndexer } type JSONRPCError struct { @@ -50,6 +53,34 @@ type JSONL2ToL2Message struct { Message hexutil.Bytes `json:"Message"` } +type JSONDepositTx struct { + SourceHash common.Hash `json:"SourceHash"` + From common.Address `json:"From"` + To *common.Address `json:"To"` + Mint *big.Int `json:"Mint"` + Value *big.Int `json:"Value"` + Gas uint64 `json:"Gas"` + IsSystemTransaction bool `json:"IsSystemTransaction"` + Data hexutil.Bytes `json:"Data"` +} + +type JSONDepositLog struct { + Address common.Address `json:"Address"` + Topics []common.Hash `json:"Topics"` + Data hexutil.Bytes `json:"Data"` + BlockNumber uint64 `json:"BlockNumber"` + TxHash common.Hash `json:"TxHash"` + TxIndex uint `json:"TxIndex"` + BlockHash common.Hash `json:"BlockHash"` + Index uint `json:"Index"` + Removed bool `json:"Removed"` +} + +type JSONDepositMessage struct { + DepositTxn JSONDepositTx + DepositLog JSONDepositLog +} + func (e *JSONRPCError) Error() string { return e.Message } @@ -58,12 +89,12 @@ func (err *JSONRPCError) ErrorCode() int { return err.Code } -func NewAdminServer(log log.Logger, port uint64, networkConfig *config.NetworkConfig, indexer *interop.L2ToL2MessageIndexer) *AdminServer { +func NewAdminServer(log log.Logger, port uint64, networkConfig *config.NetworkConfig, l2ToL2MsgIndexer *interop.L2ToL2MessageIndexer, l1ToL2MsgIndexer *opsimulator.L1ToL2MessageIndexer) *AdminServer { - adminServer := &AdminServer{log: log, port: port, networkConfig: networkConfig} + adminServer := &AdminServer{log: log, port: port, networkConfig: networkConfig, l1ToL2MsgIndexer: l1ToL2MsgIndexer} - if networkConfig.InteropEnabled && indexer != nil { - adminServer.l2ToL2MsgIndexer = indexer + if networkConfig.InteropEnabled && l2ToL2MsgIndexer != nil { + adminServer.l2ToL2MsgIndexer = l2ToL2MsgIndexer } return adminServer @@ -140,6 +171,7 @@ func (s *AdminServer) setupRouter() *gin.Engine { log: s.log, networkConfig: s.networkConfig, l2ToL2MsgIndexer: s.l2ToL2MsgIndexer, + l1ToL2MsgIndexer: s.l1ToL2MsgIndexer, } if err := rpcServer.RegisterName("admin", rpcMethods); err != nil { @@ -223,3 +255,56 @@ func (m *RPCMethods) GetL2ToL2MessageByMsgHash(args *common.Hash) (*JSONL2ToL2Me Message: msg.Message, }, nil } + +func (m *RPCMethods) GetL1ToL2MessageByTxnHash(args *common.Hash) (*JSONDepositMessage, error) { + if m.l1ToL2MsgIndexer == nil { + return nil, &JSONRPCError{ + Code: -32601, + Message: "L1ToL2MsgIndexer is not initialized.", + } + } + + if (args == nil || args == &common.Hash{}) { + return nil, &JSONRPCError{ + Code: -32602, + Message: "Valid msg hash not provided", + } + } + + storeEntry, err := m.l1ToL2MsgIndexer.Get(*args) + + if err != nil { + return nil, &JSONRPCError{ + Code: -32603, + Message: fmt.Sprintf("Failed to get message: %v", err), + } + } + + depositTxn := JSONDepositTx{ + SourceHash: storeEntry.DepositTxn.SourceHash, + From: storeEntry.DepositTxn.From, + To: storeEntry.DepositTxn.To, + Mint: storeEntry.DepositTxn.Mint, + Value: storeEntry.DepositTxn.Value, + Gas: storeEntry.DepositTxn.Gas, + IsSystemTransaction: storeEntry.DepositTxn.IsSystemTransaction, + Data: storeEntry.DepositTxn.Data, + } + + depositLog := JSONDepositLog{ + Address: storeEntry.DepositLog.Address, + Topics: storeEntry.DepositLog.Topics, + Data: storeEntry.DepositLog.Data, + BlockNumber: storeEntry.DepositLog.BlockNumber, + TxHash: storeEntry.DepositLog.TxHash, + TxIndex: storeEntry.DepositLog.TxIndex, + BlockHash: storeEntry.DepositLog.BlockHash, + Index: storeEntry.DepositLog.Index, + Removed: storeEntry.DepositLog.Removed, + } + + return &JSONDepositMessage{ + DepositTxn: depositTxn, + DepositLog: depositLog, + }, nil +} diff --git a/admin/admin_test.go b/admin/admin_test.go index 4e0e2f296..61e0572d9 100644 --- a/admin/admin_test.go +++ b/admin/admin_test.go @@ -23,7 +23,7 @@ func TestAdminServerBasicFunctionality(t *testing.T) { testlog := testlog.Logger(t, log.LevelInfo) ctx, cancel := context.WithCancel(context.Background()) - adminServer := NewAdminServer(testlog, 0, &networkConfig, nil) + adminServer := NewAdminServer(testlog, 0, &networkConfig, nil, nil) t.Cleanup(func() { cancel() }) require.NoError(t, adminServer.Start(ctx)) @@ -46,7 +46,7 @@ func TestGetL1AddressesRPC(t *testing.T) { testlog := testlog.Logger(t, log.LevelInfo) ctx, cancel := context.WithCancel(context.Background()) - adminServer := NewAdminServer(testlog, 0, &networkConfig, nil) + adminServer := NewAdminServer(testlog, 0, &networkConfig, nil, nil) t.Cleanup(func() { cancel() }) require.NoError(t, adminServer.Start(ctx)) diff --git a/contracts/lib/optimism b/contracts/lib/optimism index 3f43f039a..7b119c533 160000 --- a/contracts/lib/optimism +++ b/contracts/lib/optimism @@ -1 +1 @@ -Subproject commit 3f43f039a9e68b777045d7e2446947acbd9b0592 +Subproject commit 7b119c533f22bd5ef86bed2455b945987ca319a9 diff --git a/interop/indexer.go b/interop/indexer.go index 3117c86e5..494f8378b 100644 --- a/interop/indexer.go +++ b/interop/indexer.go @@ -49,6 +49,7 @@ func (i *L2ToL2MessageIndexer) Start(ctx context.Context, clients map[uint64]*et for chainID, client := range i.clients { i.tasks.Go(func() error { + logCh := make(chan types.Log) fq := ethereum.FilterQuery{Addresses: []common.Address{predeploys.L2toL2CrossDomainMessengerAddr}} sub, err := client.SubscribeFilterLogs(i.tasksCtx, fq, logCh) diff --git a/opsimulator/deposits.go b/opsimulator/deposits.go deleted file mode 100644 index fd14ce3fc..000000000 --- a/opsimulator/deposits.go +++ /dev/null @@ -1,87 +0,0 @@ -package opsimulator - -import ( - "context" - "errors" - "fmt" - - "github.com/ethereum-optimism/optimism/op-node/rollup/derive" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" - - "github.com/ethereum/go-ethereum" -) - -var _ ethereum.Subscription = &depositTxSubscription{} - -type depositTxSubscription struct { - logSubscription ethereum.Subscription - logCh chan types.Log - errCh chan error - doneCh chan struct{} -} - -func (d *depositTxSubscription) Unsubscribe() { - d.logSubscription.Unsubscribe() - d.doneCh <- struct{}{} -} - -func (d *depositTxSubscription) Err() <-chan error { - return d.errCh -} - -type LogSubscriber interface { - SubscribeFilterLogs(context.Context, ethereum.FilterQuery, chan<- types.Log) (ethereum.Subscription, error) -} - -// transforms Deposit event logs into DepositTx -func SubscribeDepositTx(ctx context.Context, logSub LogSubscriber, depositContractAddr common.Address, ch chan<- *types.DepositTx) (ethereum.Subscription, error) { - logCh := make(chan types.Log) - filterQuery := ethereum.FilterQuery{Addresses: []common.Address{depositContractAddr}, Topics: [][]common.Hash{{derive.DepositEventABIHash}}} - logSubscription, err := logSub.SubscribeFilterLogs(ctx, filterQuery, logCh) - if err != nil { - return nil, fmt.Errorf("failed to create log subscription: %w", err) - } - - errCh := make(chan error) - doneCh := make(chan struct{}) - logErrCh := logSubscription.Err() - - go func() { - defer close(logCh) - defer close(errCh) - defer close(doneCh) - for { - select { - case log := <-logCh: - dep, err := logToDepositTx(&log) - if err != nil { - errCh <- err - continue - } - ch <- dep - case err := <-logErrCh: - errCh <- fmt.Errorf("log subscription error: %w", err) - case <-ctx.Done(): - return - case <-doneCh: - return - } - } - }() - - return &depositTxSubscription{logSubscription, logCh, errCh, doneCh}, nil -} - -func logToDepositTx(log *types.Log) (*types.DepositTx, error) { - if len(log.Topics) > 0 && log.Topics[0] == derive.DepositEventABIHash { - dep, err := derive.UnmarshalDepositLogEvent(log) - if err != nil { - return nil, err - } - return dep, nil - } else { - return nil, errors.New("log is not a deposit event") - } -} diff --git a/opsimulator/indexer.go b/opsimulator/indexer.go new file mode 100644 index 000000000..49bafc75e --- /dev/null +++ b/opsimulator/indexer.go @@ -0,0 +1,234 @@ +package opsimulator + +import ( + "context" + "errors" + "fmt" + + "github.com/asaskevich/EventBus" + "github.com/ethereum-optimism/optimism/op-node/rollup/derive" + "github.com/ethereum-optimism/optimism/op-service/tasks" + "github.com/ethereum-optimism/supersim/config" + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/ethereum/go-ethereum/log" +) + +var _ ethereum.Subscription = &depositTxSubscription{} + +type L1ToL2MessageIndexer struct { + log log.Logger + storeManager *L1DepositStoreManager + eb EventBus.Bus + l2Chain config.Chain + tasks tasks.Group + tasksCtx context.Context + tasksCancel context.CancelFunc + ethClient *ethclient.Client + chains map[uint64]config.Chain +} + +type depositTxSubscription struct { + logSubscription ethereum.Subscription + logCh chan types.Log + errCh chan error + doneCh chan struct{} +} + +type DepositChannels struct { + DepositTxCh chan<- *types.DepositTx + LogCh chan<- types.Log +} + +func (d *depositTxSubscription) Unsubscribe() { + // since multiple opsims run subcription to indexer multiple times, a select needs to be added to avoid any race condition leading to a panic + select { + case <-d.doneCh: + return + default: + d.logSubscription.Unsubscribe() + close(d.doneCh) + } +} + +func (d *depositTxSubscription) Err() <-chan error { + return d.errCh +} + +type LogSubscriber interface { + SubscribeFilterLogs(context.Context, ethereum.FilterQuery, chan<- types.Log) (ethereum.Subscription, error) +} + +func NewL1ToL2MessageIndexer(log log.Logger, storeManager *L1DepositStoreManager) *L1ToL2MessageIndexer { + tasksCtx, tasksCancel := context.WithCancel(context.Background()) + + return &L1ToL2MessageIndexer{ + log: log, + storeManager: storeManager, + eb: EventBus.New(), + tasks: tasks.Group{ + HandleCrit: func(err error) { + fmt.Printf("unhandled indexer error: %v\n", err) + }, + }, + tasksCtx: tasksCtx, + tasksCancel: tasksCancel, + } +} + +func (i *L1ToL2MessageIndexer) Start(ctx context.Context, client *ethclient.Client, l2Chains map[uint64]config.Chain) error { + + i.chains = l2Chains + + for _, chain := range i.chains { + if err := i.startForChain(client, chain); err != nil { + return fmt.Errorf("Failed to start L1 to L2 indexer") + } + } + + return nil +} + +func (i *L1ToL2MessageIndexer) startForChain(client *ethclient.Client, chain config.Chain) error { + i.tasks.Go(func() error { + depositTxCh := make(chan *types.DepositTx) + logCh := make(chan types.Log) + + defer close(depositTxCh) + defer close(logCh) + + channels := DepositChannels{ + DepositTxCh: depositTxCh, + LogCh: logCh, + } + + portalAddress := common.Address(chain.Config().L2Config.L1Addresses.OptimismPortalProxy) + sub, err := SubscribeDepositTx(i.tasksCtx, client, portalAddress, channels) + + if err != nil { + return fmt.Errorf("failed to subscribe to deposit tx: %w", err) + } + + chainID := chain.Config().ChainID + + for { + select { + case dep := <-depositTxCh: + log := <-logCh + i.log.Info("observed deposit event on L1", "deposit:", dep) + if err := i.ProcessEvent(dep, log, chainID); err != nil { + fmt.Printf("failed to process log: %v\n", err) + } + + case <-i.tasksCtx.Done(): + sub.Unsubscribe() + } + } + }) + + return nil +} + +func (i *L1ToL2MessageIndexer) Stop(ctx context.Context) error { + i.tasksCancel() + return nil +} + +func depositMessageInfoKey(destinationChainID uint64) string { + return fmt.Sprintf("DepositMessageKey:destination:%d", destinationChainID) +} + +func (i *L1ToL2MessageIndexer) SubscribeDepositMessage(destinationChainID uint64, depositMessageChan chan<- *types.Transaction) (func(), error) { + return i.createSubscription(depositMessageInfoKey(destinationChainID), depositMessageChan) +} + +func (i *L1ToL2MessageIndexer) createSubscription(key string, depositMessageChan chan<- *types.Transaction) (func(), error) { + handler := func(e *types.Transaction) { + depositMessageChan <- e + } + + if err := i.eb.Subscribe(key, handler); err != nil { + return nil, fmt.Errorf("failed to create subscription %s: %w", key, err) + } + + return func() { + _ = i.eb.Unsubscribe(key, handler) + }, nil +} + +func (i *L1ToL2MessageIndexer) Get(msgHash common.Hash) (*L1DepositMessage, error) { + return i.storeManager.Get(msgHash) +} + +func (i *L1ToL2MessageIndexer) ProcessEvent(dep *types.DepositTx, log types.Log, chainID uint64) error { + + depTx := types.NewTx(dep) + i.log.Info("observed deposit event on L1", "hash", depTx.Hash().String(), "SourceHash", dep.SourceHash.String()) + + depositMessage := L1DepositMessage{ + DepositTxn: dep, + DepositLog: log, + } + + if err := i.storeManager.Set(dep.SourceHash, &depositMessage); err != nil { + i.log.Error("failed to store deposit tx to chain: %w", "chain.id", chainID, "err", err) + return err + } + + i.eb.Publish(depositMessageInfoKey(chainID), depTx) + return nil +} + +// transforms Deposit event logs into DepositTx +func SubscribeDepositTx(ctx context.Context, logSub LogSubscriber, depositContractAddr common.Address, channels DepositChannels) (ethereum.Subscription, error) { + logCh := make(chan types.Log) + filterQuery := ethereum.FilterQuery{Addresses: []common.Address{depositContractAddr}, Topics: [][]common.Hash{{derive.DepositEventABIHash}}} + logSubscription, err := logSub.SubscribeFilterLogs(ctx, filterQuery, logCh) + if err != nil { + return nil, fmt.Errorf("failed to create log subscription: %w", err) + } + + errCh := make(chan error) + doneCh := make(chan struct{}) + logErrCh := logSubscription.Err() + + go func() { + defer close(logCh) + defer close(errCh) + for { + select { + case log := <-logCh: + dep, err := logToDepositTx(&log) + if err != nil { + errCh <- err + continue + } + + channels.DepositTxCh <- dep + channels.LogCh <- log + case err := <-logErrCh: + errCh <- fmt.Errorf("log subscription error: %w", err) + case <-ctx.Done(): + return + case <-doneCh: + return + } + } + }() + + return &depositTxSubscription{logSubscription, logCh, errCh, doneCh}, nil +} + +func logToDepositTx(log *types.Log) (*types.DepositTx, error) { + if len(log.Topics) > 0 && log.Topics[0] == derive.DepositEventABIHash { + dep, err := derive.UnmarshalDepositLogEvent(log) + if err != nil { + return nil, err + } + return dep, nil + } else { + return nil, errors.New("log is not a deposit event") + } +} diff --git a/opsimulator/deposits_test.go b/opsimulator/indexer_test.go similarity index 55% rename from opsimulator/deposits_test.go rename to opsimulator/indexer_test.go index ee4258b1d..42596fb42 100644 --- a/opsimulator/deposits_test.go +++ b/opsimulator/indexer_test.go @@ -7,6 +7,7 @@ import ( "testing" "github.com/ethereum-optimism/optimism/op-node/rollup/derive" + oplog "github.com/ethereum-optimism/optimism/op-service/log" optestutils "github.com/ethereum-optimism/optimism/op-service/testutils" "github.com/ethereum-optimism/supersim/config" @@ -62,8 +63,14 @@ func TestSubscribeDepositTx(t *testing.T) { ctx := context.Background() depositTxCh := make(chan *types.DepositTx, len(mockDepositTxs)) + logCh := make(chan types.Log, len(mockDepositTxs)) - sub, err := SubscribeDepositTx(ctx, &chain, common.HexToAddress(""), depositTxCh) + channels := DepositChannels{ + DepositTxCh: depositTxCh, + LogCh: logCh, + } + + sub, err := SubscribeDepositTx(ctx, &chain, common.HexToAddress(""), channels) if err != nil { require.NoError(t, err) } @@ -86,3 +93,62 @@ func TestSubscribeDepositTx(t *testing.T) { close(depositTxCh) } + +func TestSubscribePublishTx(t *testing.T) { + + depositStoreMngr := NewL1DepositStoreManager() + indexer := NewL1ToL2MessageIndexer(oplog.NewLogger(oplog.AppOut(nil), oplog.DefaultCLIConfig()), depositStoreMngr) + + mockDepositTxs := createMockDepositTxs() + chain := MockChainWithSubscriptions{testutils.NewMockChain(), mockDepositTxs} + + depositPubTxCh := make(chan *types.Transaction, len(mockDepositTxs)) + + unsubscribe, err := indexer.SubscribeDepositMessage(chain.Config().ChainID, depositPubTxCh) + + require.NoError(t, err, "Should subscribe via chainId") + + ctx := context.Background() + + depositTxCh := make(chan *types.DepositTx, len(mockDepositTxs)) + logCh := make(chan types.Log, len(mockDepositTxs)) + + channels := DepositChannels{ + DepositTxCh: depositTxCh, + LogCh: logCh, + } + + sub, err := SubscribeDepositTx(ctx, &chain, common.HexToAddress(""), channels) + if err != nil { + require.NoError(t, err) + } + + initiatedDepTxn := make([]*types.Transaction, len(mockDepositTxs)) + + for i := 0; i < len(mockDepositTxs); i++ { + dep := <-depositTxCh + log := <-logCh + depTx := types.NewTx(dep) + + initiatedDepTxn[i] = depTx + + err := indexer.ProcessEvent(dep, log, chain.Config().ChainID) + require.NoError(t, err, "Should send valid details") + } + + for i := 0; i < len(initiatedDepTxn); i++ { + dep := <-depositPubTxCh + depTx := initiatedDepTxn[i] + + require.Equal(t, dep.To(), depTx.To()) + require.Equal(t, dep.IsDepositTx(), depTx.IsDepositTx()) + require.Equal(t, dep.Mint(), depTx.Mint()) + require.Equal(t, dep.SourceHash(), depTx.SourceHash()) + require.Equal(t, dep.Cost(), depTx.Cost()) + require.Equal(t, dep.Value(), depTx.Value()) + } + + unsubscribe() + sub.Unsubscribe() + close(depositTxCh) +} diff --git a/opsimulator/opsimulator.go b/opsimulator/opsimulator.go index d607f9df7..e0a77e4cc 100644 --- a/opsimulator/opsimulator.go +++ b/opsimulator/opsimulator.go @@ -58,6 +58,8 @@ type OpSimulator struct { ethClient *ethclient.Client stopped atomic.Bool + + indexer *L1ToL2MessageIndexer } // OpSimulator wraps around the l2 chain. By embedding `Chain`, it also implements the same inteface @@ -92,7 +94,7 @@ func New(log log.Logger, closeApp context.CancelCauseFunc, port uint64, host str } } -func (opSim *OpSimulator) Start(ctx context.Context) error { +func (opSim *OpSimulator) Start(ctx context.Context, indexer *L1ToL2MessageIndexer) error { mux := http.NewServeMux() mux.Handle("/", corsHandler(opSim.handler(ctx))) @@ -105,6 +107,8 @@ func (opSim *OpSimulator) Start(ctx context.Context) error { opSim.log.Debug("started opsimulator", "name", cfg.Name, "chain.id", cfg.ChainID, "addr", hs.Addr()) opSim.httpServer = hs + opSim.indexer = indexer + if opSim.port == 0 { _, portStr, err := net.SplitHostPort(hs.Addr().String()) if err != nil { @@ -151,19 +155,18 @@ func (opSim *OpSimulator) EthClient() *ethclient.Client { func (opSim *OpSimulator) startBackgroundTasks() { // Relay deposit tx from L1 to L2 opSim.bgTasks.Go(func() error { - depositTxCh := make(chan *types.DepositTx) - portalAddress := common.Address(opSim.Config().L2Config.L1Addresses.OptimismPortalProxy) - sub, err := SubscribeDepositTx(context.Background(), opSim.l1Chain.EthClient(), portalAddress, depositTxCh) + depositTxCh := make(chan *types.Transaction) + unsubscribe, err := opSim.indexer.SubscribeDepositMessage(opSim.Config().ChainID, depositTxCh) + if err != nil { - return fmt.Errorf("failed to subscribe to deposit tx: %w", err) + opSim.log.Error("Failed to subscribe to indexer") } chainId := opSim.Config().ChainID for { select { - case dep := <-depositTxCh: - depTx := types.NewTx(dep) + case depTx := <-depositTxCh: opSim.log.Debug("observed deposit event on L1", "hash", depTx.Hash().String()) clnt := opSim.Chain.EthClient() @@ -174,7 +177,7 @@ func (opSim *OpSimulator) startBackgroundTasks() { opSim.log.Info("OptimismPortal#depositTransaction", "l2TxHash", depTx.Hash().String()) case <-opSim.bgTasksCtx.Done(): - sub.Unsubscribe() + unsubscribe() close(depositTxCh) return nil } diff --git a/opsimulator/store.go b/opsimulator/store.go new file mode 100644 index 000000000..4007a883d --- /dev/null +++ b/opsimulator/store.go @@ -0,0 +1,68 @@ +package opsimulator + +import ( + "fmt" + "sync" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" +) + +type L1DepositMessage struct { + DepositTxn *types.DepositTx + DepositLog types.Log +} + +type L1DepositStore struct { + entryByHash map[common.Hash]*L1DepositMessage + mu sync.RWMutex +} + +type L1DepositStoreManager struct { + store *L1DepositStore +} + +func NewL1DepositStore() *L1DepositStore { + return &L1DepositStore{ + entryByHash: make(map[common.Hash]*L1DepositMessage), + } +} + +func NewL1DepositStoreManager() *L1DepositStoreManager { + return &L1DepositStoreManager{ + store: NewL1DepositStore(), + } +} + +func (s *L1DepositStore) Set(txnHash common.Hash, entry *L1DepositMessage) error { + s.mu.Lock() + defer s.mu.Unlock() + + s.entryByHash[txnHash] = entry + return nil +} + +func (s *L1DepositStore) Get(txnHash common.Hash) (*L1DepositMessage, error) { + s.mu.RLock() + defer s.mu.RUnlock() + + entry, exists := s.entryByHash[txnHash] + + if !exists { + return nil, fmt.Errorf("Deposit txn not found") + } + + return entry, nil +} + +func (s *L1DepositStoreManager) Get(txnHash common.Hash) (*L1DepositMessage, error) { + return s.store.Get(txnHash) +} + +func (s *L1DepositStoreManager) Set(txnHash common.Hash, entry *L1DepositMessage) error { + if err := s.store.Set(txnHash, entry); err != nil { + return fmt.Errorf("failed to store message: %w", err) + } + + return nil +} diff --git a/opsimulator/store_test.go b/opsimulator/store_test.go new file mode 100644 index 000000000..e3e516920 --- /dev/null +++ b/opsimulator/store_test.go @@ -0,0 +1,36 @@ +package opsimulator + +import ( + "math/rand" + "testing" + + "github.com/ethereum-optimism/optimism/op-service/testutils" + optestutils "github.com/ethereum-optimism/optimism/op-service/testutils" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/stretchr/testify/assert" +) + +func TestL1DepositStore_SetAndGet(t *testing.T) { + sm := NewL1DepositStoreManager() + + rng := rand.New(rand.NewSource(int64(0))) + sourceHash := common.Hash{} + depInput := optestutils.GenerateDeposit(sourceHash, rng) + depLog := optestutils.GenerateLog(testutils.RandomAddress(rng), nil, nil) + depTx := types.NewTx(depInput) + txnHash := depTx.SourceHash() + + depositMessage := L1DepositMessage{ + DepositTxn: depInput, + DepositLog: *depLog, + } + + err := sm.store.Set(txnHash, &depositMessage) + assert.NoError(t, err, "expect no error while store deposit txn ref") + + retrievedEntry, err := sm.store.Get(txnHash) + assert.NoError(t, err, "expected no error when getting entry from store") + assert.Equal(t, depositMessage.DepositTxn, retrievedEntry.DepositTxn, "expected retrieved depositTxn to equal stored depositTxn") + assert.Equal(t, depositMessage.DepositLog, retrievedEntry.DepositLog, "expected retrieved depositLog to equal stored depositLog") +} diff --git a/orchestrator/orchestrator.go b/orchestrator/orchestrator.go index 888c52609..9f3e9818f 100644 --- a/orchestrator/orchestrator.go +++ b/orchestrator/orchestrator.go @@ -30,6 +30,7 @@ type Orchestrator struct { l2ToL2MsgIndexer *interop.L2ToL2MessageIndexer l2ToL2MsgRelayer *interop.L2ToL2MessageRelayer + l1ToL2MsgIndexer *opsimulator.L1ToL2MessageIndexer AdminServer *admin.AdminServer } @@ -58,9 +59,10 @@ func NewOrchestrator(log log.Logger, closeApp context.CancelCauseFunc, cliConfig l2Anvils[cfg.ChainID] = l2Anvil } - // Spin up OpSim to front the L2 instances + // Sping up OpSim to front the L2 instances for i := range networkConfig.L2Configs { cfg := networkConfig.L2Configs[i] + l2OpSims[cfg.ChainID] = opsimulator.New(log, closeApp, nextL2Port, cfg.Host, l1Anvil, l2Anvils[cfg.ChainID], l2Anvils, networkConfig.InteropDelay) // only increment expected port if it has been specified @@ -71,6 +73,9 @@ func NewOrchestrator(log log.Logger, closeApp context.CancelCauseFunc, cliConfig o := Orchestrator{log: log, config: networkConfig, l1Chain: l1Anvil, l2Chains: l2Anvils, l2OpSims: l2OpSims} + depositStoreMngr := opsimulator.NewL1DepositStoreManager() + o.l1ToL2MsgIndexer = opsimulator.NewL1ToL2MessageIndexer(log, depositStoreMngr) + // Interop Setup if networkConfig.InteropEnabled { o.l2ToL2MsgIndexer = interop.NewL2ToL2MessageIndexer(log) @@ -79,13 +84,20 @@ func NewOrchestrator(log log.Logger, closeApp context.CancelCauseFunc, cliConfig } } - o.AdminServer = admin.NewAdminServer(log, adminPort, networkConfig, o.l2ToL2MsgIndexer) + a := admin.NewAdminServer(log, adminPort, networkConfig, o.l2ToL2MsgIndexer, o.l1ToL2MsgIndexer) + + o.AdminServer = a + return &o, nil } func (o *Orchestrator) Start(ctx context.Context) error { o.log.Debug("starting orchestrator") + // TODO: hack until opsim proxy supports websocket connections. + // We need websocket connections to make subscriptions. + // We should try to use make RPC through opsim not directly to the underlying chain + // Start Chains if err := o.l1Chain.Start(ctx); err != nil { return fmt.Errorf("l1 chain %s failed to start: %w", o.l1Chain.Config().Name, err) @@ -96,7 +108,7 @@ func (o *Orchestrator) Start(ctx context.Context) error { } } for _, opSim := range o.l2OpSims { - if err := opSim.Start(ctx); err != nil { + if err := opSim.Start(ctx, o.l1ToL2MsgIndexer); err != nil { return fmt.Errorf("op simulator instance %s failed to start: %w", opSim.Config().Name, err) } } @@ -106,14 +118,18 @@ func (o *Orchestrator) Start(ctx context.Context) error { return fmt.Errorf("unable to start mining: %w", err) } - // TODO: hack until opsim proxy supports websocket connections. - // We need websocket connections to make subscriptions. - // We should try to use make RPC through opsim not directly to the underlying chain l2ChainClientByChainId := make(map[uint64]*ethclient.Client) l2OpSimClientByChainId := make(map[uint64]*ethclient.Client) + l2ChainByChainId := make(map[uint64]config.Chain) + for chainID, opSim := range o.l2OpSims { l2ChainClientByChainId[chainID] = opSim.Chain.EthClient() l2OpSimClientByChainId[chainID] = opSim.EthClient() + l2ChainByChainId[chainID] = opSim.Chain + } + + if err := o.l1ToL2MsgIndexer.Start(ctx, o.l1Chain.EthClient(), l2ChainByChainId); err != nil { + return fmt.Errorf("l1 to l2 message indexer failed to start: %w", err) } // Configure Interop (if applicable) @@ -175,6 +191,10 @@ func (o *Orchestrator) Stop(ctx context.Context) error { } } + if err := o.l1ToL2MsgIndexer.Stop(ctx); err != nil { + errs = append(errs, fmt.Errorf("l1 to l2 message indexer failed to stop: %w", err)) + } + for _, opSim := range o.l2OpSims { o.log.Debug("stopping op simulator", "chain.id", opSim.Config().ChainID) if err := opSim.Stop(ctx); err != nil { @@ -228,7 +248,7 @@ func (o *Orchestrator) L1Chain() config.Chain { func (o *Orchestrator) L2Chains() []config.Chain { var chains []config.Chain - for _, chain := range o.l2OpSims { + for _, chain := range o.l2Chains { chains = append(chains, chain) } return chains diff --git a/supersim_test.go b/supersim_test.go index be5a4aca9..2999c6131 100644 --- a/supersim_test.go +++ b/supersim_test.go @@ -10,10 +10,13 @@ import ( "github.com/ethereum-optimism/optimism/op-chain-ops/devkeys" opbindings "github.com/ethereum-optimism/optimism/op-e2e/bindings" + "github.com/ethereum-optimism/optimism/op-e2e/e2eutils/receipts" "github.com/ethereum-optimism/optimism/op-e2e/e2eutils/wait" + "github.com/ethereum-optimism/optimism/op-node/rollup/derive" "github.com/ethereum-optimism/optimism/op-service/predeploys" "github.com/ethereum-optimism/optimism/op-service/testlog" registry "github.com/ethereum-optimism/superchain-registry/superchain" + "github.com/ethereum-optimism/supersim/admin" "github.com/ethereum-optimism/supersim/bindings" "github.com/ethereum-optimism/supersim/config" "github.com/ethereum-optimism/supersim/interop" @@ -64,15 +67,6 @@ type TestSuite struct { Supersim *Supersim } -type JSONL2ToL2Message struct { - Destination uint64 `json:"Destination"` - Source uint64 `json:"Source"` - Nonce *big.Int `json:"Nonce"` - Sender common.Address `json:"Sender"` - Target common.Address `json:"Target"` - Message hexutil.Bytes `json:"Message"` -} - type InteropTestSuite struct { t *testing.T @@ -1120,8 +1114,7 @@ func TestAdminGetL2ToL2MessageByMsgHash(t *testing.T) { return diff.Cmp(valueToTransfer) == 0, nil })) - var message *JSONL2ToL2Message - + var message *admin.JSONL2ToL2Message // msgHash for the above sendERC20 txn msgHash := "0x3656fd893944321663b2877d10db2895fb68e2346fd7e3f648ce5b986c200166" rpcErr := client.CallContext(context.Background(), &message, "admin_getL2ToL2MessageByMsgHash", msgHash) @@ -1132,3 +1125,73 @@ func TestAdminGetL2ToL2MessageByMsgHash(t *testing.T) { assert.Equal(t, tx.To().String(), message.Target.String()) assert.Equal(t, tx.To().String(), message.Sender.String()) } + +func TestAdminGetL1ToL2MessageByTxnHash(t *testing.T) { + t.Parallel() + + testSuite := createTestSuite(t) + + l1Chain := testSuite.Supersim.Orchestrator.L1Chain() + l1EthClient, _ := ethclient.Dial(l1Chain.Endpoint()) + + var wg sync.WaitGroup + var l1TxMutex sync.Mutex + + l2Chains := testSuite.Supersim.Orchestrator.L2Chains() + wg.Add(len(l2Chains)) + for i, chain := range l2Chains { + go func() { + defer wg.Done() + + l2EthClient, _ := ethclient.Dial(chain.Endpoint()) + privateKey, _ := testSuite.DevKeys.Secret(devkeys.UserKey(i)) + senderAddress, _ := testSuite.DevKeys.Address(devkeys.UserKey(i)) + adminRPCClient, _ := rpc.Dial(testSuite.Supersim.Orchestrator.AdminServer.Endpoint()) + + oneEth := big.NewInt(1e18) + prevBalance, _ := l2EthClient.BalanceAt(context.Background(), senderAddress, nil) + + transactor, _ := bind.NewKeyedTransactorWithChainID(privateKey, big.NewInt(int64(l1Chain.Config().ChainID))) + transactor.Value = oneEth + optimismPortal, _ := opbindings.NewOptimismPortal(common.Address(chain.Config().L2Config.L1Addresses.OptimismPortalProxy), l1EthClient) + + // needs a lock because the gas estimation can be outdated between transactions + l1TxMutex.Lock() + tx, err := optimismPortal.DepositTransaction(transactor, senderAddress, oneEth, 100000, false, make([]byte, 0)) + l1TxMutex.Unlock() + require.NoError(t, err) + + txReceipt, _ := bind.WaitMined(context.Background(), l1EthClient, tx) + require.NoError(t, err) + + require.True(t, txReceipt.Status == 1, "Deposit transaction failed") + require.NotEmpty(t, txReceipt.Logs, "Deposit transaction failed") + + postBalance, postBalanceCheckErr := wait.ForBalanceChange( + context.Background(), + l2EthClient, + senderAddress, + prevBalance, + ) + require.NoError(t, postBalanceCheckErr) + + // check that balance was increased + require.Equal(t, oneEth, postBalance.Sub(postBalance, prevBalance), "Recipient balance is incorrect") + + depositEvent, err := receipts.FindLog(txReceipt.Logs, optimismPortal.ParseTransactionDeposited) + require.NoError(t, err, "Should emit deposit event") + depositTx, err := derive.UnmarshalDepositLogEvent(&depositEvent.Raw) + require.NoError(t, err) + + var message *admin.JSONDepositMessage + rpcErr := adminRPCClient.CallContext(context.Background(), &message, "admin_getL1ToL2MessageByTxnHash", depositTx.SourceHash) + require.NoError(t, rpcErr) + + assert.Equal(t, oneEth.String(), message.DepositTxn.Value.String()) + assert.Equal(t, oneEth.String(), message.DepositTxn.Mint.String()) + assert.Equal(t, false, message.DepositTxn.IsSystemTransaction) + }() + } + + wg.Wait() +}