diff --git a/claimtxman/claimtxman.go b/claimtxman/claimtxman.go index cb86863b..d14201d3 100644 --- a/claimtxman/claimtxman.go +++ b/claimtxman/claimtxman.go @@ -182,7 +182,7 @@ func (tm *ClaimTxManager) processDepositStatus(ger *etherman.GlobalExitRoot, dbT log.Errorf("error BuildSendClaim tx for deposit %d. Error: %v", deposit.DepositCount, err) return err } - if err = tm.addClaimTx(deposit.DepositCount, deposit.BlockID, tm.auth.From, tx.To(), nil, tx.Data(), dbTx); err != nil { + if err = tm.addClaimTx(deposit.DepositCount, tm.auth.From, tx.To(), nil, tx.Data(), dbTx); err != nil { log.Errorf("error adding claim tx for deposit %d. Error: %v", deposit.DepositCount, err) return err } @@ -194,11 +194,11 @@ func (tm *ClaimTxManager) processDepositStatus(ger *etherman.GlobalExitRoot, dbT func (tm *ClaimTxManager) isDepositMessageAllowed(deposit *etherman.Deposit) bool { for _, addr := range tm.cfg.AuthorizedClaimMessageAddresses { if deposit.OriginalAddress == addr { - log.Debugf("MessageBridge from authorized account detected: %+v, account: %s", deposit, addr.String()) + log.Infof("MessageBridge from authorized account detected: %+v, account: %s", deposit, addr.String()) return true } } - log.Debugf("MessageBridge Not authorized: %+v", deposit) + log.Infof("MessageBridge Not authorized. DepositCount: %d", deposit.DepositCount) return false } @@ -216,7 +216,7 @@ func (tm *ClaimTxManager) getNextNonce(from common.Address) (uint64, error) { return nonce, nil } -func (tm *ClaimTxManager) addClaimTx(depositCount uint, blockID uint64, from common.Address, to *common.Address, value *big.Int, data []byte, dbTx pgx.Tx) error { +func (tm *ClaimTxManager) addClaimTx(depositCount uint, from common.Address, to *common.Address, value *big.Int, data []byte, dbTx pgx.Tx) error { // get gas tx := ethereum.CallMsg{ From: from, @@ -244,7 +244,7 @@ func (tm *ClaimTxManager) addClaimTx(depositCount uint, blockID uint64, from com // create monitored tx mTx := ctmtypes.MonitoredTx{ - ID: depositCount, BlockID: blockID, From: from, To: to, + DepositID: depositCount, From: from, To: to, Nonce: nonce, Value: value, Data: data, Gas: gas, Status: ctmtypes.MonitoredTxStatusCreated, } @@ -283,7 +283,7 @@ func (tm *ClaimTxManager) monitorTxs(ctx context.Context) error { log.Infof("found %v monitored tx to process", len(mTxs)) for _, mTx := range mTxs { mTx := mTx // force variable shadowing to avoid pointer conflicts - mTxLog := log.WithFields("monitoredTx", mTx.ID) + mTxLog := log.WithFields("monitoredTx", mTx.DepositID) mTxLog.Infof("processing tx with nonce %d", mTx.Nonce) // check if any of the txs in the history was mined @@ -294,7 +294,7 @@ func (tm *ClaimTxManager) monitorTxs(ctx context.Context) error { receiptSuccessful := false for txHash := range mTx.History { - mTxLog.Infof("Checking if tx %s is mined", txHash) + mTxLog.Infof("Checking if tx %s is mined", txHash.String()) mined, receipt, err = tm.l2Node.CheckTxWasMined(ctx, txHash) if err != nil { mTxLog.Errorf("failed to check if tx %s was mined: %v", txHash.String(), err) @@ -332,22 +332,6 @@ func (tm *ClaimTxManager) monitorTxs(ctx context.Context) error { if receipt.Status == types.ReceiptStatusSuccessful { mTxLog.Infof("tx %s was mined successfully", txHash.String()) receiptSuccessful = true - block, err := tm.l2Node.BlockByNumber(ctx, receipt.BlockNumber) - if err != nil { - mTxLog.Errorf("failed to get receipt block: %v", err) - continue - } - mTx.BlockID, err = tm.storage.AddBlock(ctx, ðerman.Block{ - NetworkID: tm.l2NetworkID, - BlockNumber: block.Number().Uint64(), - BlockHash: block.Hash(), - ParentHash: block.ParentHash(), - ReceivedAt: block.ReceivedAt, - }, dbTx) - if err != nil { - mTxLog.Errorf("failed to add receipt block: %v", err) - continue - } mTx.Status = ctmtypes.MonitoredTxStatusConfirmed // update monitored tx changes into storage err = tm.storage.UpdateClaimTx(ctx, mTx, dbTx) @@ -486,7 +470,7 @@ func (tm *ClaimTxManager) monitorTxs(ctx context.Context) error { // accordingly to the current information stored and the current // state of the blockchain func (tm *ClaimTxManager) ReviewMonitoredTx(ctx context.Context, mTx *ctmtypes.MonitoredTx, reviewNonce bool) error { - mTxLog := log.WithFields("monitoredTx", mTx.ID) + mTxLog := log.WithFields("monitoredTx", mTx.DepositID) mTxLog.Debug("reviewing") // get gas tx := ethereum.CallMsg{ diff --git a/claimtxman/claimtxman_test.go b/claimtxman/claimtxman_test.go index f37abfce..0a4d1103 100644 --- a/claimtxman/claimtxman_test.go +++ b/claimtxman/claimtxman_test.go @@ -27,16 +27,6 @@ func TestMonitoredTxStorage(t *testing.T) { tx, err := pg.BeginDBTransaction(ctx) require.NoError(t, err) - block := ðerman.Block{ - BlockNumber: 1, - BlockHash: common.HexToHash("0x29e885edaf8e4b51e1d2e05f9da28161d2fb4f6b1d53827d9b80a23cf2d7d9f1"), - ParentHash: common.HexToHash("0x29e885edaf8e4b51e1d2e05f9da28161d2fb4f6b1d53827d9b80a23cf2d7d9f2"), - NetworkID: 0, - ReceivedAt: time.Now(), - } - blockID, err := pg.AddBlock(ctx, block, tx) - require.NoError(t, err) - deposit := ðerman.Deposit{ NetworkID: 0, OriginalNetwork: 0, @@ -45,7 +35,6 @@ func TestMonitoredTxStorage(t *testing.T) { DestinationNetwork: 1, DestinationAddress: common.HexToAddress("0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266"), BlockNumber: 1, - BlockID: blockID, DepositCount: 1, Metadata: common.FromHex("0x0"), } @@ -54,16 +43,15 @@ func TestMonitoredTxStorage(t *testing.T) { toAdr := common.HexToAddress("0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266") mTx := ctmtypes.MonitoredTx{ - ID: 1, - BlockID: blockID, - From: common.HexToAddress("0x6B175474E89094C44Da98b954EedeAC495271d0F"), - To: &toAdr, - Nonce: 1, - Value: big.NewInt(1000000), - Data: common.FromHex("0x0"), - Gas: 1000000, - Status: ctmtypes.MonitoredTxStatusCreated, - History: make(map[common.Hash]bool), + DepositID: 1, + From: common.HexToAddress("0x6B175474E89094C44Da98b954EedeAC495271d0F"), + To: &toAdr, + Nonce: 1, + Value: big.NewInt(1000000), + Data: common.FromHex("0x0"), + Gas: 1000000, + Status: ctmtypes.MonitoredTxStatusCreated, + History: make(map[common.Hash]bool), } err = pg.AddClaimTx(ctx, mTx, tx) require.NoError(t, err) @@ -79,16 +67,15 @@ func TestMonitoredTxStorage(t *testing.T) { require.NoError(t, err) mTx = ctmtypes.MonitoredTx{ - ID: 2, - BlockID: blockID, - From: common.HexToAddress("0x6B175474E89094C44Da98b954EedeAC495271d0F"), - To: &toAdr, - Nonce: 1, - Value: big.NewInt(1000000), - Data: common.FromHex("0x0"), - Gas: 1000000, - Status: ctmtypes.MonitoredTxStatusConfirmed, - History: make(map[common.Hash]bool), + DepositID: 2, + From: common.HexToAddress("0x6B175474E89094C44Da98b954EedeAC495271d0F"), + To: &toAdr, + Nonce: 1, + Value: big.NewInt(1000000), + Data: common.FromHex("0x0"), + Gas: 1000000, + Status: ctmtypes.MonitoredTxStatusConfirmed, + History: make(map[common.Hash]bool), } err = pg.AddClaimTx(ctx, mTx, tx) require.NoError(t, err) diff --git a/claimtxman/types/monitoredtx.go b/claimtxman/types/monitoredtx.go index ba20258f..55bf698b 100644 --- a/claimtxman/types/monitoredtx.go +++ b/claimtxman/types/monitoredtx.go @@ -39,8 +39,8 @@ func (s MonitoredTxStatus) String() string { // MonitoredTx represents a set of information used to build tx // plus information to monitor if the transactions was sent successfully type MonitoredTx struct { - // Id is the tx identifier controller by the caller - ID uint + // DepositID is the tx identifier controller by the caller + DepositID uint // From is a sender of the tx, used to identify which private key should be used to sing the tx From common.Address @@ -66,11 +66,6 @@ type MonitoredTx struct { // Status of this monitoring Status MonitoredTxStatus - // BlockID represents the block where the tx was identified - // to be mined, it's the same as the block id found in the - // tx receipt, this is used to control reorged monitored txs - BlockID uint64 - // History represent all transaction hashes from // transactions created using this struct data and // sent to the network diff --git a/db/pgstorage/migrations/0006.sql b/db/pgstorage/migrations/0006.sql index b61a79d5..a40d1110 100644 --- a/db/pgstorage/migrations/0006.sql +++ b/db/pgstorage/migrations/0006.sql @@ -6,9 +6,13 @@ DROP TABLE IF EXISTS mt.rht_temp; CREATE INDEX IF NOT EXISTS claim_block_id ON sync.claim USING btree (block_id); CREATE INDEX IF NOT EXISTS deposit_block_id ON sync.deposit USING btree (block_id); -CREATE INDEX IF NOT EXISTS monitored_txs_block_id ON sync.monitored_txs USING btree (block_id); CREATE INDEX IF NOT EXISTS token_wrapped_block_id ON sync.token_wrapped USING btree (block_id); +ALTER TABLE sync.monitored_txs +DROP COLUMN IF EXISTS block_id; +ALTER TABLE sync.monitored_txs +RENAME COLUMN id TO deposit_id; + -- +migrate Down ALTER TABLE mt.rht DROP CONSTRAINT rht_pkey; ALTER TABLE mt.root DROP CONSTRAINT root_pkey; @@ -17,5 +21,9 @@ CREATE TABLE IF NOT EXISTS mt.rht_temp AS (SELECT key, min(value), max(deposit_i DROP INDEX IF EXISTS sync.claim_block_id; DROP INDEX IF EXISTS sync.deposit_block_id; -DROP INDEX IF EXISTS sync.monitored_txs_block_id; -DROP INDEX IF EXISTS sync.token_wrapped_block_id; \ No newline at end of file +DROP INDEX IF EXISTS sync.token_wrapped_block_id; + +ALTER TABLE sync.monitored_txs +ADD COLUMN block_id BIGINT DEFAULT 0 REFERENCES sync.block (id) ON DELETE CASCADE; +ALTER TABLE sync.monitored_txs +RENAME COLUMN deposit_id TO id; diff --git a/db/pgstorage/migrations/0006_test.go b/db/pgstorage/migrations/0006_test.go index 28dad50f..fef8c389 100644 --- a/db/pgstorage/migrations/0006_test.go +++ b/db/pgstorage/migrations/0006_test.go @@ -12,6 +12,14 @@ import ( type migrationTest0006 struct{} func (m migrationTest0006) InsertData(db *sql.DB) error { + block := "INSERT INTO sync.block (id, block_num, block_hash, parent_hash, network_id, received_at) VALUES(609636, 2803824, decode('27474F16174BBE50C294FE13C190B92E42B2368A6D4AEB8A4A015F52816296C3','hex'), decode('C9B5033799ADF3739383A0489EFBE8A0D4D5E4478778A4F4304562FD51AE4C07','hex'), 1, '0001-01-01 01:00:00.000');" + if _, err := db.Exec(block); err != nil { + return err + } + insert := "INSERT INTO sync.monitored_txs (id, block_id, from_addr, to_addr, nonce, value, data, gas, status, history, created_at, updated_at) VALUES(130161, 609636, decode('34353AC3B4EB2F4DE26845ECE44733453F74CAAF','hex'), decode('F6BEEEBB578E214CA9E23B0E9683454FF88ED2A7','hex'), 45155, '', decode('2CFFD02E2C42C143213FD0E36D843D9D40866CE7BE02C671BEEC0EAE3FFD3D2638ACC87CAD3228B676F7D3CD4284A5443F17F1962B36E491B30A40B2405849E597BA5FB5B4C11951957C6F8F642C4AF61CD6B24640FEC6DC7FC607EE8206A99E92410D3021DDB9A356815C3FAC1026B6DEC5DF3124AFBADB485C9BA5A3E3398A04B7BA85560772B348ED365DB06F0733574CD1EEB40C499F589E6CB697F4C5013EFDF41131BB4E597286B6408422BC670ACCAE918EFFC64D836DFD7659A090C54A4CB8E8E198CC9405CDC50C64C07BD4C21BBD3B717540B448AA9CA6DE1CDD8A3A475CD6FFD70157E48063FC33C97A050F7F640233BF646CC98D9524C6B92BCF3AB56F839867CC5F7F196B93BAE1E27E6320742445D290F2263827498B54FEC539F756AFCEFAD4E508C098B9A7E1D8FEB19955FB02BA9675585078710969D3440F5054E0B36BE68D55E47EF092DD266AC32C93CD026408B38F973E7E2B878399AA36F64A729B2A4D1D8A59F8EF0337F976980B3A71AE487BB7AAD320B74598DE3E8359B1B12B53E2846D04411D8E3FAF24C5831C3FFE8BC07B2E620B1791A49D2C5DCEDAC7478ACEB09309F31467A671AAF20E095E63F7F3DDBFAC998A572588A35C95819F833381E59F86473AB1AC4BE6ECCE76C9FDBC30161665D8DFB120B344C968ED845E4D97C17BFFFE24471BE7DD959E3F12AD65BE053FB3D574F1E640269C13D8645B9978AD76C57DB977FE48EFB0B43110E3EBE78F9BADD9047AAC6BCD8D66A5E1D3B5C807B281E4683CC6D6315CF95B9ADE8641DEFCB32372F1C126E398EF7A5A2DCE0A8A7F68BB74560F8F71837C2C2EBBCBF7FFFB42AE1896F13F7C7479A0B46A28B6F55540F89444F63DE0378E3D121BE09E06CC9DED1C20E65876D36AA0C65E9645644786B620E2DD2AD648DDFCBF4A7E5B1A3A4ECFE7F64667A3F0B7E2F4418588ED35A2458CFFEB39B93D26F18D2AB13BDCE6AEE58E7B99359EC2DFD95A9C16DC00D6EF18B7933A6F8DC65CCB55667138776F7DEA101070DC8796E3774DF84F40AE0C8229D0D6069E5C8F39A7C299677A09D367FC7B05E3BC380EE652CDC72595F74C7B1043D0E1FFBAB734648C838DFB0527D971B602BC216C9619EF0ABF5AC974A1ED57F4050AA510DD9C74F508277B39D7973BB2DFCCC5EEB0618DB8CD74046FF337F0A7BF2C8E03E10F642C1886798D71806AB1E888D9E5EE87D0838C5655CB21C6CB83313B5A631175DFF4963772CCE9108188B34AC87C81C41E662EE4DD2DD7B2BC707961B1E646C4047669DCB6584F0D8D770DAF5D7E7DEB2E388AB20E2573D171A88108E79D820E98F26C0B84AA8B2F4AA4968DBB818EA32293237C50BA75EE485F4C22ADF2F741400BDF8D6A9CC7DF7ECAE576221665D7358448818BB4AE4562849E949E17AC16E0BE16688E156B5CF15E098C627C0056A9000000000000000000000000000000000000000000000000000000000001FC714C39479BB3A50DAC1EF581C50DFDCBAC84C5DEF27CFC2F4E311DAB8E365EA780B59D3B536A4C3F03F4892F84474FE2851E42BC60B8A47807702BCFEF7CECC7A90000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000010000000000000000000000009AF3049DD15616FD627A35563B5282BEA5C32E2000000000000000000000000000000000000000000000000000005AF3107A400000000000000000000000000000000000000000000000000000000000000005200000000000000000000000000000000000000000000000000000000000000000','hex'), 101786, 'confirmed', '{decode(''5C7838373762353766343562386466626462653832396666646562343164626333333938303835653333626461343664613338613332393738323333316562303061'',''hex'')}', '2023-10-03 10:29:08.283', '2023-10-03 10:29:09.491');" + if _, err := db.Exec(insert); err != nil { + return err + } return nil } @@ -40,7 +48,7 @@ func (m migrationTest0006) RunAssertsAfterMigrationUp(t *testing.T, db *sql.DB) var count4 int assert.Error(t, row4.Scan(&count4)) - indexes := []string{"claim_block_id", "deposit_block_id", "monitored_txs_block_id", "token_wrapped_block_id"} + indexes := []string{"claim_block_id", "deposit_block_id", "token_wrapped_block_id"} // Check indexes adding for _, idx := range indexes { // getIndex @@ -50,6 +58,17 @@ func (m migrationTest0006) RunAssertsAfterMigrationUp(t *testing.T, db *sql.DB) assert.NoError(t, row.Scan(&result)) assert.Equal(t, 1, result) } + + checkBlockID := "select block_id from sync.monitored_txs;" + row5 := db.QueryRow(checkBlockID) + var blockID int + assert.Error(t, row5.Scan(&blockID)) + + checkDepositID := "select deposit_id from sync.monitored_txs;" + row6 := db.QueryRow(checkDepositID) + var depositID int + assert.NoError(t, row6.Scan(&depositID)) + assert.Equal(t, 130161, depositID) } func (m migrationTest0006) RunAssertsAfterMigrationDown(t *testing.T, db *sql.DB) { @@ -78,7 +97,7 @@ func (m migrationTest0006) RunAssertsAfterMigrationDown(t *testing.T, db *sql.DB assert.NoError(t, row4.Scan(&count4)) assert.Equal(t, 0, count4) - indexes := []string{"claim_block_id", "deposit_block_id", "monitored_txs_block_id", "token_wrapped_block_id"} + indexes := []string{"claim_block_id", "deposit_block_id", "token_wrapped_block_id"} // Check indexes removing for _, idx := range indexes { // getIndex @@ -88,6 +107,23 @@ func (m migrationTest0006) RunAssertsAfterMigrationDown(t *testing.T, db *sql.DB assert.NoError(t, row.Scan(&result)) assert.Equal(t, 0, result) } + + checkBlockID := "select block_id from sync.monitored_txs;" + row5 := db.QueryRow(checkBlockID) + var blockID int + assert.NoError(t, row5.Scan(&blockID)) + assert.Equal(t, 0, blockID) + + checkDepositID := "select deposit_id from sync.monitored_txs;" + row6 := db.QueryRow(checkDepositID) + var depositID int + assert.Error(t, row6.Scan(&depositID)) + + checkID := "select id from sync.monitored_txs;" + row7 := db.QueryRow(checkID) + var ID int + assert.NoError(t, row7.Scan(&ID)) + assert.Equal(t, 130161, ID) } func TestMigration0006(t *testing.T) { diff --git a/db/pgstorage/pgstorage.go b/db/pgstorage/pgstorage.go index 3237f99d..35cc2430 100644 --- a/db/pgstorage/pgstorage.go +++ b/db/pgstorage/pgstorage.go @@ -494,33 +494,32 @@ func (p *PostgresStorage) UpdateL2DepositsStatus(ctx context.Context, exitRoot [ // AddClaimTx adds a claim monitored transaction to the storage. func (p *PostgresStorage) AddClaimTx(ctx context.Context, mTx ctmtypes.MonitoredTx, dbTx pgx.Tx) error { const addMonitoredTxSQL = `INSERT INTO sync.monitored_txs - (id, block_id, from_addr, to_addr, nonce, value, data, gas, status, history, created_at, updated_at) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)` - _, err := p.getExecQuerier(dbTx).Exec(ctx, addMonitoredTxSQL, mTx.ID, mTx.BlockID, mTx.From, mTx.To, mTx.Nonce, mTx.Value.String(), mTx.Data, mTx.Gas, mTx.Status, pq.Array(mTx.HistoryHashSlice()), time.Now().UTC(), time.Now().UTC()) + (deposit_id, from_addr, to_addr, nonce, value, data, gas, status, history, created_at, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)` + _, err := p.getExecQuerier(dbTx).Exec(ctx, addMonitoredTxSQL, mTx.DepositID, mTx.From, mTx.To, mTx.Nonce, mTx.Value.String(), mTx.Data, mTx.Gas, mTx.Status, pq.Array(mTx.HistoryHashSlice()), time.Now().UTC(), time.Now().UTC()) return err } // UpdateClaimTx updates a claim monitored transaction in the storage. func (p *PostgresStorage) UpdateClaimTx(ctx context.Context, mTx ctmtypes.MonitoredTx, dbTx pgx.Tx) error { const updateMonitoredTxSQL = `UPDATE sync.monitored_txs - SET block_id = $2 - , from_addr = $3 - , to_addr = $4 - , nonce = $5 - , value = $6 - , data = $7 - , gas = $8 - , status = $9 - , history = $10 - , updated_at = $11 - WHERE id = $1` - _, err := p.getExecQuerier(dbTx).Exec(ctx, updateMonitoredTxSQL, mTx.ID, mTx.BlockID, mTx.From, mTx.To, mTx.Nonce, mTx.Value.String(), mTx.Data, mTx.Gas, mTx.Status, pq.Array(mTx.HistoryHashSlice()), time.Now().UTC()) + SET from_addr = $2 + , to_addr = $3 + , nonce = $4 + , value = $5 + , data = $6 + , gas = $7 + , status = $8 + , history = $9 + , updated_at = $10 + WHERE deposit_id = $1` + _, err := p.getExecQuerier(dbTx).Exec(ctx, updateMonitoredTxSQL, mTx.DepositID, mTx.From, mTx.To, mTx.Nonce, mTx.Value.String(), mTx.Data, mTx.Gas, mTx.Status, pq.Array(mTx.HistoryHashSlice()), time.Now().UTC()) return err } // GetClaimTxsByStatus gets the monitored transactions by status. func (p *PostgresStorage) GetClaimTxsByStatus(ctx context.Context, statuses []ctmtypes.MonitoredTxStatus, dbTx pgx.Tx) ([]ctmtypes.MonitoredTx, error) { - const getMonitoredTxsSQL = "SELECT * FROM sync.monitored_txs WHERE status = ANY($1) ORDER BY created_at ASC" + const getMonitoredTxsSQL = "SELECT deposit_id, from_addr, to_addr, nonce, value, data, gas, status, history, created_at, updated_at FROM sync.monitored_txs WHERE status = ANY($1) ORDER BY created_at ASC" rows, err := p.getExecQuerier(dbTx).Query(ctx, getMonitoredTxsSQL, pq.Array(statuses)) if errors.Is(err, pgx.ErrNoRows) { return []ctmtypes.MonitoredTx{}, nil @@ -535,7 +534,7 @@ func (p *PostgresStorage) GetClaimTxsByStatus(ctx context.Context, statuses []ct history [][]byte ) mTx := ctmtypes.MonitoredTx{} - err = rows.Scan(&mTx.ID, &mTx.BlockID, &mTx.From, &mTx.To, &mTx.Nonce, &value, &mTx.Data, &mTx.Gas, &mTx.Status, pq.Array(&history), &mTx.CreatedAt, &mTx.UpdatedAt) + err = rows.Scan(&mTx.DepositID, &mTx.From, &mTx.To, &mTx.Nonce, &value, &mTx.Data, &mTx.Gas, &mTx.Status, pq.Array(&history), &mTx.CreatedAt, &mTx.UpdatedAt) if err != nil { return mTxs, err }