From 5b3d383948f78b958c7fb813aa7bf6bf57502191 Mon Sep 17 00:00:00 2001 From: Debendra Oli Date: Wed, 12 Jun 2024 18:09:23 +0545 Subject: [PATCH 1/7] fix: goto last saved for icon ws error, rf: use plain http rpc instead of ws on evm --- relayer/chains/evm/client.go | 9 +++++++-- relayer/chains/icon/client.go | 3 ++- relayer/chains/icon/listener.go | 1 + 3 files changed, 10 insertions(+), 3 deletions(-) diff --git a/relayer/chains/evm/client.go b/relayer/chains/evm/client.go index 465f4fc7..7bc3dbd9 100644 --- a/relayer/chains/evm/client.go +++ b/relayer/chains/evm/client.go @@ -27,12 +27,17 @@ func newClient(ctx context.Context, connectionContract, XcallContract common.Add return nil, err } - connection, err := bridgeContract.NewConnection(connectionContract, eth) + ethRpc, err := ethclient.DialContext(ctx, rpcUrl) + if err != nil { + return nil, err + } + + connection, err := bridgeContract.NewConnection(connectionContract, ethRpc) if err != nil { return nil, fmt.Errorf("error occured when creating connection cobtract: %v ", err) } - xcall, err := bridgeContract.NewXcall(XcallContract, eth) + xcall, err := bridgeContract.NewXcall(XcallContract, ethRpc) if err != nil { return nil, fmt.Errorf("error occured when creating eth client: %v ", err) } diff --git a/relayer/chains/icon/client.go b/relayer/chains/icon/client.go index 4f8edf9b..c7765bb3 100644 --- a/relayer/chains/icon/client.go +++ b/relayer/chains/icon/client.go @@ -265,12 +265,13 @@ func (c *Client) Monitor(ctx context.Context, reqUrl string, reqPtr, respPtr int return err } defer func() { - c.log.Debug(fmt.Sprintf("Monitor finish %s", conn.LocalAddr().String())) + c.log.Debug(fmt.Sprintf("Monitor finish %s", conn.RemoteAddr().String())) c.wsClose(conn) }() if err = c.wsRequest(conn, reqPtr); err != nil { return err } + c.log.Info("Monitoring started", zap.String("address", conn.RemoteAddr().String())) conn.SetPongHandler(func(string) error { return conn.SetReadDeadline(time.Now().Add(15 * time.Second)) }) diff --git a/relayer/chains/icon/listener.go b/relayer/chains/icon/listener.go index 7cc8cb7e..197529ab 100644 --- a/relayer/chains/icon/listener.go +++ b/relayer/chains/icon/listener.go @@ -102,6 +102,7 @@ func (p *Provider) Listener(ctx context.Context, lastSavedHeight uint64, incomin if errors.Is(err, context.Canceled) { return } + eventReq.Height = types.NewHexInt(int64(p.GetLastSavedBlockHeight())) time.Sleep(time.Second * 3) reconnect() p.log.Warn("error occured during monitor event", zap.Error(err)) From 943cf3cdc8e784c100f06667629c5745a0ea377c Mon Sep 17 00:00:00 2001 From: Debendra Oli Date: Wed, 12 Jun 2024 21:16:49 +0545 Subject: [PATCH 2/7] fix: reconnect with new client when subs errors --- relayer/chains/evm/client.go | 27 ++++++++++++++++++++------- relayer/chains/evm/listener.go | 5 +++++ relayer/relay.go | 5 ++--- 3 files changed, 27 insertions(+), 10 deletions(-) diff --git a/relayer/chains/evm/client.go b/relayer/chains/evm/client.go index 7bc3dbd9..b594c483 100644 --- a/relayer/chains/evm/client.go +++ b/relayer/chains/evm/client.go @@ -27,17 +27,12 @@ func newClient(ctx context.Context, connectionContract, XcallContract common.Add return nil, err } - ethRpc, err := ethclient.DialContext(ctx, rpcUrl) - if err != nil { - return nil, err - } - - connection, err := bridgeContract.NewConnection(connectionContract, ethRpc) + connection, err := bridgeContract.NewConnection(connectionContract, eth) if err != nil { return nil, fmt.Errorf("error occured when creating connection cobtract: %v ", err) } - xcall, err := bridgeContract.NewXcall(XcallContract, ethRpc) + xcall, err := bridgeContract.NewXcall(XcallContract, eth) if err != nil { return nil, fmt.Errorf("error occured when creating eth client: %v ", err) } @@ -48,12 +43,22 @@ func newClient(ctx context.Context, connectionContract, XcallContract common.Add return nil, err } + reconnectFunc := func() (IClient, error) { + eth.Close() + newClient, err := newClient(ctx, connectionContract, XcallContract, rpcUrl, websocketUrl, l) + if err != nil { + return nil, err + } + return newClient, nil + } + return &Client{ log: l, eth: eth, EVMChainID: evmChainId, connection: connection, xcall: xcall, + reconnect: reconnectFunc, }, nil } @@ -61,9 +66,11 @@ func newClient(ctx context.Context, connectionContract, XcallContract common.Add type Client struct { log *zap.Logger eth *ethclient.Client + ethRpc *ethclient.Client EVMChainID *big.Int connection *bridgeContract.Connection xcall *bridgeContract.Xcall + reconnect func() (IClient, error) } type IClient interface { @@ -83,6 +90,7 @@ type IClient interface { EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) SendTransaction(ctx context.Context, tx *ethTypes.Transaction) error Subscribe(ctx context.Context, q ethereum.FilterQuery, ch chan<- ethTypes.Log) (ethereum.Subscription, error) + Reconnect() (IClient, error) // abiContract for connection ParseConnectionMessage(log ethTypes.Log) (*bridgeContract.ConnectionMessage, error) @@ -224,3 +232,8 @@ func (c *Client) ExecuteRollback(opts *bind.TransactOpts, sn *big.Int) (*ethType func (c *Client) Subscribe(ctx context.Context, q ethereum.FilterQuery, ch chan<- ethTypes.Log) (ethereum.Subscription, error) { return c.eth.SubscribeFilterLogs(ctx, q, ch) } + +// Reconnect +func (c *Client) Reconnect() (IClient, error) { + return c.reconnect() +} diff --git a/relayer/chains/evm/listener.go b/relayer/chains/evm/listener.go index 4037b136..d359d7a4 100644 --- a/relayer/chains/evm/listener.go +++ b/relayer/chains/evm/listener.go @@ -59,6 +59,11 @@ func (p *Provider) Listener(ctx context.Context, lastSavedHeight uint64, blockIn resetFunc = func() { isSubError = true subscribeStart.Reset(time.Second * 3) + client, err := p.client.Reconnect() + if err != nil { + p.log.Error("failed to reconnect", zap.Error(err)) + } + p.client = client } ) diff --git a/relayer/relay.go b/relayer/relay.go index 7024f3fe..ece1e10e 100644 --- a/relayer/relay.go +++ b/relayer/relay.go @@ -225,8 +225,7 @@ func (r *Relayer) processMessages(ctx context.Context) { } if ok := dst.shouldSendMessage(ctx, message, src); !ok { - // debug log - r.log.Debug("message not sent to destination", zap.Any("message", message)) + r.log.Debug("processing", zap.Any("message", message)) continue } @@ -235,7 +234,7 @@ func (r *Relayer) processMessages(ctx context.Context) { // if message reached delete the message messageReceived, err := dst.Provider.MessageReceived(ctx, &key) if err != nil { - dst.log.Error("error occured when checking message received", zap.Error(err)) + dst.log.Error("error occured when checking message received", zap.String("src", message.Src), zap.Uint64("sn", message.Sn.Uint64()), zap.Error(err)) message.ToggleProcessing() continue } From f08155007675d4bf405900e51e379e86ce7f5c7d Mon Sep 17 00:00:00 2001 From: Debendra Oli Date: Thu, 13 Jun 2024 11:39:40 +0545 Subject: [PATCH 3/7] fix: reconnect retry --- relayer/chains/evm/listener.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/relayer/chains/evm/listener.go b/relayer/chains/evm/listener.go index d359d7a4..68503705 100644 --- a/relayer/chains/evm/listener.go +++ b/relayer/chains/evm/listener.go @@ -62,8 +62,9 @@ func (p *Provider) Listener(ctx context.Context, lastSavedHeight uint64, blockIn client, err := p.client.Reconnect() if err != nil { p.log.Error("failed to reconnect", zap.Error(err)) + } else { + p.client = client } - p.client = client } ) @@ -107,11 +108,13 @@ func (p *Provider) Listener(ctx context.Context, lastSavedHeight uint64, blockIn Addresses: p.blockReq.Addresses, Topics: p.blockReq.Topics, } + p.log.Info("syncing", zap.Uint64("start", br.start), zap.Uint64("end", br.end), zap.Uint64("latest", latestHeight)) logs, err := p.getLogsRetry(ctx, filter, br.retry) if err != nil { p.log.Warn("failed to fetch blocks", zap.Uint64("from", br.start), zap.Uint64("to", br.end), zap.Error(err)) continue } + p.log.Info("synced", zap.Uint64("start", br.start), zap.Uint64("end", br.end), zap.Uint64("latest", latestHeight)) for _, log := range logs { message, err := p.getRelayMessageFromLog(log) if err != nil { From e32852a3ca9f92bc70f99d9a451cdcc9866d56e5 Mon Sep 17 00:00:00 2001 From: Debendra Oli Date: Thu, 13 Jun 2024 16:12:14 +0545 Subject: [PATCH 4/7] fix: check connection and retry --- relayer/chains/evm/listener.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/relayer/chains/evm/listener.go b/relayer/chains/evm/listener.go index 68503705..0900fca3 100644 --- a/relayer/chains/evm/listener.go +++ b/relayer/chains/evm/listener.go @@ -256,6 +256,12 @@ func (p *Provider) Subscribe(ctx context.Context, blockInfoChan chan *relayertyp Height: log.BlockNumber, Messages: []*relayertypes.Message{message}, } + case <-time.After(time.Minute): + if _, err := p.client.GetHeaderByHeight(ctx, big.NewInt(1)); err != nil { + p.log.Error("connection error", zap.Error(err)) + resetFunc() + return err + } } } } From 9d63e804ed6ca4fb6b7b25970e668cf4fa4d3a65 Mon Sep 17 00:00:00 2001 From: Debendra Oli Date: Thu, 13 Jun 2024 16:14:42 +0545 Subject: [PATCH 5/7] rf: timeout for connection --- relayer/chains/evm/listener.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/relayer/chains/evm/listener.go b/relayer/chains/evm/listener.go index 0900fca3..d62580eb 100644 --- a/relayer/chains/evm/listener.go +++ b/relayer/chains/evm/listener.go @@ -256,7 +256,9 @@ func (p *Provider) Subscribe(ctx context.Context, blockInfoChan chan *relayertyp Height: log.BlockNumber, Messages: []*relayertypes.Message{message}, } - case <-time.After(time.Minute): + case <-time.After(time.Minute * 2): + ctx, cancel := context.WithTimeout(ctx, defaultReadTimeout) + defer cancel() if _, err := p.client.GetHeaderByHeight(ctx, big.NewInt(1)); err != nil { p.log.Error("connection error", zap.Error(err)) resetFunc() From d8f1474e283b38277e615efb48d3a25e61fc6039 Mon Sep 17 00:00:00 2001 From: Debendra Oli Date: Thu, 13 Jun 2024 16:54:09 +0545 Subject: [PATCH 6/7] fix: poll batch size --- relayer/chains/evm/listener.go | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/relayer/chains/evm/listener.go b/relayer/chains/evm/listener.go index d62580eb..bbdab809 100644 --- a/relayer/chains/evm/listener.go +++ b/relayer/chains/evm/listener.go @@ -82,13 +82,9 @@ func (p *Provider) Listener(ctx context.Context, lastSavedHeight uint64, blockIn } var blockReqs []*blockReq - for i := startHeight; i < latestHeight; i += p.cfg.BlockBatchSize { - end := i + p.cfg.BlockBatchSize - if end > latestHeight { - end = latestHeight - } - blockReqs = append(blockReqs, &blockReq{start: i, end: end, retry: maxBlockQueryFailedRetry}) - i = end + 1 + for start := startHeight; start <= latestHeight; start += p.cfg.BlockBatchSize { + end := min(start+p.cfg.BlockBatchSize-1, latestHeight) + blockReqs = append(blockReqs, &blockReq{start, end, nil, maxBlockQueryFailedRetry}) } totalReqs := len(blockReqs) // Calculate the size of each chunk From 7a389652f61c7b8dfcf15f4f60eae7ddc3dfd28b Mon Sep 17 00:00:00 2001 From: Debendra Oli Date: Thu, 13 Jun 2024 18:10:22 +0545 Subject: [PATCH 7/7] rf: time checker for dead eth client --- relayer/chains/evm/listener.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/relayer/chains/evm/listener.go b/relayer/chains/evm/listener.go index bbdab809..563b4a95 100644 --- a/relayer/chains/evm/listener.go +++ b/relayer/chains/evm/listener.go @@ -58,7 +58,7 @@ func (p *Provider) Listener(ctx context.Context, lastSavedHeight uint64, blockIn concurrency = p.GetConcurrency(ctx, startHeight, latestHeight) resetFunc = func() { isSubError = true - subscribeStart.Reset(time.Second * 3) + subscribeStart.Reset(time.Second * 1) client, err := p.client.Reconnect() if err != nil { p.log.Error("failed to reconnect", zap.Error(err)) @@ -253,8 +253,6 @@ func (p *Provider) Subscribe(ctx context.Context, blockInfoChan chan *relayertyp Messages: []*relayertypes.Message{message}, } case <-time.After(time.Minute * 2): - ctx, cancel := context.WithTimeout(ctx, defaultReadTimeout) - defer cancel() if _, err := p.client.GetHeaderByHeight(ctx, big.NewInt(1)); err != nil { p.log.Error("connection error", zap.Error(err)) resetFunc()