Skip to content

Commit

Permalink
concurrent get blocks
Browse files Browse the repository at this point in the history
  • Loading branch information
colinlyguo committed Dec 12, 2023
1 parent b52f665 commit 55ff91f
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 20 deletions.
20 changes: 11 additions & 9 deletions bridge-history-api/internal/controller/fetcher/l1_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,13 @@ func (c *L1MessageFetcher) doFetchAndSaveEvents(ctx context.Context, from uint64
log.Info("fetch and save L1 events", "from", from, "to", to)
var l1FailedGatewayRouterTxs []*orm.CrossMessage
blockTimestampsMap := make(map[uint64]uint64)
for number := from; number <= to; number++ {
blockNumber := new(big.Int).SetUint64(number)
block, err := c.client.BlockByNumber(ctx, blockNumber)
if err != nil {
log.Error("failed to get block by number", "number", blockNumber.String(), "err", err)
return err
}
blocks, err := utils.GetL1BlocksInRange(c.ctx, c.client, from, to)
if err != nil {
log.Error("failed to get L1 blocks in range", "from", from, "to", to, "err", err)
return err
}
for i := from; i <= to; i++ {
block := blocks[i-from]
blockTimestampsMap[block.NumberU64()] = block.Time()

for _, tx := range block.Transactions() {
Expand All @@ -143,7 +143,8 @@ func (c *L1MessageFetcher) doFetchAndSaveEvents(ctx context.Context, from uint64
}
toAddress := to.String()
if toAddress == c.cfg.GatewayRouterAddr {
receipt, err := c.client.TransactionReceipt(ctx, tx.Hash())
var receipt *types.Receipt
receipt, err = c.client.TransactionReceipt(ctx, tx.Hash())
if err != nil {
log.Error("Failed to get transaction receipt", "txHash", tx.Hash().String(), "err", err)
return err
Expand All @@ -152,7 +153,8 @@ func (c *L1MessageFetcher) doFetchAndSaveEvents(ctx context.Context, from uint64
// Check if the transaction failed
if receipt.Status == types.ReceiptStatusFailed {
signer := types.NewLondonSigner(new(big.Int).SetUint64(tx.ChainId().Uint64()))
sender, err := signer.Sender(tx)
var sender common.Address
sender, err = signer.Sender(tx)
if err != nil {
log.Error("get sender failed", "chain id", tx.ChainId().Uint64(), "tx hash", tx.Hash().String(), "err", err)
return err
Expand Down
24 changes: 13 additions & 11 deletions bridge-history-api/internal/controller/fetcher/l2_fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/scroll-tech/go-ethereum/crypto"
"github.com/scroll-tech/go-ethereum/ethclient"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/rpc"
"gorm.io/gorm"

backendabi "scroll-tech/bridge-history-api/abi"
Expand Down Expand Up @@ -120,13 +119,13 @@ func (c *L2MessageFetcher) doFetchAndSaveEvents(ctx context.Context, from uint64
var l2FailedGatewayRouterTxs []*orm.CrossMessage
var l2RevertedRelayedMessages []*orm.CrossMessage
blockTimestampsMap := make(map[uint64]uint64)
for number := from; number <= to; number++ {
blockNumber := new(big.Int).SetUint64(number)
block, err := c.client.GetBlockByNumberOrHash(ctx, rpc.BlockNumberOrHashWithNumber(rpc.BlockNumber(number)))
if err != nil {
log.Error("failed to get block by number", "number", blockNumber.String(), "err", err)
return err
}
blocks, err := utils.GetL2BlocksInRange(c.ctx, c.client, from, to)
if err != nil {
log.Error("failed to get L2 blocks in range", "from", from, "to", to, "err", err)
return err
}
for i := from; i <= to; i++ {
block := blocks[i-from]
blockTimestampsMap[block.NumberU64()] = block.Time()

for _, tx := range block.Transactions() {
Expand All @@ -136,7 +135,8 @@ func (c *L2MessageFetcher) doFetchAndSaveEvents(ctx context.Context, from uint64
}
toAddress := to.String()
if toAddress == c.cfg.GatewayRouterAddr {
receipt, err := c.client.TransactionReceipt(ctx, tx.Hash())
var receipt *types.Receipt
receipt, err = c.client.TransactionReceipt(ctx, tx.Hash())
if err != nil {
log.Error("Failed to get transaction receipt", "txHash", tx.Hash().String(), "err", err)
return err
Expand All @@ -145,7 +145,8 @@ func (c *L2MessageFetcher) doFetchAndSaveEvents(ctx context.Context, from uint64
// Check if the transaction failed
if receipt.Status == types.ReceiptStatusFailed {
signer := types.NewLondonSigner(new(big.Int).SetUint64(tx.ChainId().Uint64()))
sender, err := signer.Sender(tx)
var sender common.Address
sender, err = signer.Sender(tx)
if err != nil {
log.Error("get sender failed", "chain id", tx.ChainId().Uint64(), "tx hash", tx.Hash().String(), "err", err)
return err
Expand All @@ -162,7 +163,8 @@ func (c *L2MessageFetcher) doFetchAndSaveEvents(ctx context.Context, from uint64
}
}
if tx.Type() == types.L1MessageTxType {
receipt, err := c.client.TransactionReceipt(ctx, tx.Hash())
var receipt *types.Receipt
receipt, err = c.client.TransactionReceipt(ctx, tx.Hash())
if err != nil {
log.Error("Failed to get transaction receipt", "txHash", tx.Hash().String(), "err", err)
return err
Expand Down
67 changes: 67 additions & 0 deletions bridge-history-api/internal/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ import (
"github.com/scroll-tech/go-ethereum/core/types"
"github.com/scroll-tech/go-ethereum/crypto"
"github.com/scroll-tech/go-ethereum/ethclient"
"github.com/scroll-tech/go-ethereum/log"
"github.com/scroll-tech/go-ethereum/rpc"
"golang.org/x/sync/errgroup"

backendabi "scroll-tech/bridge-history-api/abi"
)
Expand Down Expand Up @@ -111,3 +114,67 @@ func GetBatchRangeFromCalldata(calldata []byte) (uint64, uint64, error) {

return startBlock, finishBlock, err
}

// GetL1BlocksInRange gets a batch of blocks for a block range [start, end] inclusive.
func GetL1BlocksInRange(ctx context.Context, cli *ethclient.Client, start, end uint64) ([]*types.Block, error) {
var (
eg errgroup.Group
blocks = make([]*types.Block, end-start+1)
concurrency = 32
sem = make(chan struct{}, concurrency)
)

for i := start; i <= end; i++ {
sem <- struct{}{} // Acquire a slot in the semaphore
blockNum := int64(i)
index := i - start
eg.Go(func() error {
defer func() { <-sem }() // Release the slot when done
block, err := cli.BlockByNumber(ctx, big.NewInt(blockNum))
if err != nil {
log.Error("Failed to fetch block number", "number", blockNum, "error", err)
return err
}
blocks[index] = block
return nil
})
}

if err := eg.Wait(); err != nil {
log.Error("Error waiting for block fetching routines", "error", err)
return nil, err
}
return blocks, nil
}

// GetL2BlocksInRange gets a batch of blocks for a block range [start, end] inclusive.
func GetL2BlocksInRange(ctx context.Context, cli *ethclient.Client, start, end uint64) ([]*types.BlockWithRowConsumption, error) {
var (
eg errgroup.Group
blocks = make([]*types.BlockWithRowConsumption, end-start+1)
concurrency = 32
sem = make(chan struct{}, concurrency)
)

for i := start; i <= end; i++ {
sem <- struct{}{} // Acquire a slot in the semaphore
blockNum := rpc.BlockNumberOrHashWithNumber(rpc.BlockNumber(i))
index := i - start
eg.Go(func() error {
defer func() { <-sem }() // Release the slot when done
block, err := cli.GetBlockByNumberOrHash(ctx, blockNum)
if err != nil {
log.Error("Failed to fetch block number", "number", blockNum, "error", err)
return err
}
blocks[index] = block
return nil
})
}

if err := eg.Wait(); err != nil {
log.Error("Error waiting for block fetching routines", "error", err)
return nil, err
}
return blocks, nil
}

0 comments on commit 55ff91f

Please sign in to comment.