Skip to content

Commit

Permalink
Merge pull request #252 from icon-project/fix/rpc-timeout
Browse files Browse the repository at this point in the history
fix: icon ws error and use plain http rpc
  • Loading branch information
debendraoli authored Jun 14, 2024
2 parents 5f1ab86 + 7a38965 commit 92fc521
Show file tree
Hide file tree
Showing 5 changed files with 41 additions and 12 deletions.
18 changes: 18 additions & 0 deletions relayer/chains/evm/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,22 +43,34 @@ func newClient(ctx context.Context, connectionContract, XcallContract common.Add
return nil, err
}

reconnectFunc := func() (IClient, error) {
eth.Close()
newClient, err := newClient(ctx, connectionContract, XcallContract, rpcUrl, websocketUrl, l)
if err != nil {
return nil, err
}
return newClient, nil
}

return &Client{
log: l,
eth: eth,
EVMChainID: evmChainId,
connection: connection,
xcall: xcall,
reconnect: reconnectFunc,
}, nil
}

// grouped rpc api clients
type Client struct {
log *zap.Logger
eth *ethclient.Client
ethRpc *ethclient.Client
EVMChainID *big.Int
connection *bridgeContract.Connection
xcall *bridgeContract.Xcall
reconnect func() (IClient, error)
}

type IClient interface {
Expand All @@ -78,6 +90,7 @@ type IClient interface {
EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error)
SendTransaction(ctx context.Context, tx *ethTypes.Transaction) error
Subscribe(ctx context.Context, q ethereum.FilterQuery, ch chan<- ethTypes.Log) (ethereum.Subscription, error)
Reconnect() (IClient, error)

// abiContract for connection
ParseConnectionMessage(log ethTypes.Log) (*bridgeContract.ConnectionMessage, error)
Expand Down Expand Up @@ -219,3 +232,8 @@ func (c *Client) ExecuteRollback(opts *bind.TransactOpts, sn *big.Int) (*ethType
func (c *Client) Subscribe(ctx context.Context, q ethereum.FilterQuery, ch chan<- ethTypes.Log) (ethereum.Subscription, error) {
return c.eth.SubscribeFilterLogs(ctx, q, ch)
}

// Reconnect
func (c *Client) Reconnect() (IClient, error) {
return c.reconnect()
}
26 changes: 18 additions & 8 deletions relayer/chains/evm/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,13 @@ func (p *Provider) Listener(ctx context.Context, lastSavedHeight uint64, blockIn
concurrency = p.GetConcurrency(ctx, startHeight, latestHeight)
resetFunc = func() {
isSubError = true
subscribeStart.Reset(time.Second * 3)
subscribeStart.Reset(time.Second * 1)
client, err := p.client.Reconnect()
if err != nil {
p.log.Error("failed to reconnect", zap.Error(err))
} else {
p.client = client
}
}
)

Expand All @@ -76,13 +82,9 @@ func (p *Provider) Listener(ctx context.Context, lastSavedHeight uint64, blockIn
}

var blockReqs []*blockReq
for i := startHeight; i < latestHeight; i += p.cfg.BlockBatchSize {
end := i + p.cfg.BlockBatchSize
if end > latestHeight {
end = latestHeight
}
blockReqs = append(blockReqs, &blockReq{start: i, end: end, retry: maxBlockQueryFailedRetry})
i = end + 1
for start := startHeight; start <= latestHeight; start += p.cfg.BlockBatchSize {
end := min(start+p.cfg.BlockBatchSize-1, latestHeight)
blockReqs = append(blockReqs, &blockReq{start, end, nil, maxBlockQueryFailedRetry})
}
totalReqs := len(blockReqs)
// Calculate the size of each chunk
Expand All @@ -102,11 +104,13 @@ func (p *Provider) Listener(ctx context.Context, lastSavedHeight uint64, blockIn
Addresses: p.blockReq.Addresses,
Topics: p.blockReq.Topics,
}
p.log.Info("syncing", zap.Uint64("start", br.start), zap.Uint64("end", br.end), zap.Uint64("latest", latestHeight))
logs, err := p.getLogsRetry(ctx, filter, br.retry)
if err != nil {
p.log.Warn("failed to fetch blocks", zap.Uint64("from", br.start), zap.Uint64("to", br.end), zap.Error(err))
continue
}
p.log.Info("synced", zap.Uint64("start", br.start), zap.Uint64("end", br.end), zap.Uint64("latest", latestHeight))
for _, log := range logs {
message, err := p.getRelayMessageFromLog(log)
if err != nil {
Expand Down Expand Up @@ -248,6 +252,12 @@ func (p *Provider) Subscribe(ctx context.Context, blockInfoChan chan *relayertyp
Height: log.BlockNumber,
Messages: []*relayertypes.Message{message},
}
case <-time.After(time.Minute * 2):
if _, err := p.client.GetHeaderByHeight(ctx, big.NewInt(1)); err != nil {
p.log.Error("connection error", zap.Error(err))
resetFunc()
return err
}
}
}
}
3 changes: 2 additions & 1 deletion relayer/chains/icon/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,12 +265,13 @@ func (c *Client) Monitor(ctx context.Context, reqUrl string, reqPtr, respPtr int
return err
}
defer func() {
c.log.Debug(fmt.Sprintf("Monitor finish %s", conn.LocalAddr().String()))
c.log.Debug(fmt.Sprintf("Monitor finish %s", conn.RemoteAddr().String()))
c.wsClose(conn)
}()
if err = c.wsRequest(conn, reqPtr); err != nil {
return err
}
c.log.Info("Monitoring started", zap.String("address", conn.RemoteAddr().String()))
conn.SetPongHandler(func(string) error {
return conn.SetReadDeadline(time.Now().Add(15 * time.Second))
})
Expand Down
1 change: 1 addition & 0 deletions relayer/chains/icon/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func (p *Provider) Listener(ctx context.Context, lastSavedHeight uint64, incomin
if errors.Is(err, context.Canceled) {
return
}
eventReq.Height = types.NewHexInt(int64(p.GetLastSavedBlockHeight()))
time.Sleep(time.Second * 3)
reconnect()
p.log.Warn("error occured during monitor event", zap.Error(err))
Expand Down
5 changes: 2 additions & 3 deletions relayer/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,7 @@ func (r *Relayer) processMessages(ctx context.Context) {
}

if ok := dst.shouldSendMessage(ctx, message, src); !ok {
// debug log
r.log.Debug("message not sent to destination", zap.Any("message", message))
r.log.Debug("processing", zap.Any("message", message))
continue
}

Expand All @@ -235,7 +234,7 @@ func (r *Relayer) processMessages(ctx context.Context) {
// if message reached delete the message
messageReceived, err := dst.Provider.MessageReceived(ctx, &key)
if err != nil {
dst.log.Error("error occured when checking message received", zap.Error(err))
dst.log.Error("error occured when checking message received", zap.String("src", message.Src), zap.Uint64("sn", message.Sn.Uint64()), zap.Error(err))
message.ToggleProcessing()
continue
}
Expand Down

0 comments on commit 92fc521

Please sign in to comment.