From 6abdd5d9d71a5e6557d0c93a4257e4dc6d7e6a89 Mon Sep 17 00:00:00 2001
From: Andrew Gouin
Date: Mon, 20 Nov 2023 15:58:14 -0700
Subject: [PATCH] tm websocket first pass

---
 .../chains/cosmos/cosmos_chain_processor.go | 163 ++++++++++++++++++
 relayer/chains/cosmos/provider.go           |  32 +++-
 relayer/chains/cosmos/tx.go                 |  15 +-
 relayer/processor/path_end_runtime.go       |  10 +-
 4 files changed, 201 insertions(+), 19 deletions(-)

diff --git a/relayer/chains/cosmos/cosmos_chain_processor.go b/relayer/chains/cosmos/cosmos_chain_processor.go
index 14ab75a0b8..83c3c1ac31 100644
--- a/relayer/chains/cosmos/cosmos_chain_processor.go
+++ b/relayer/chains/cosmos/cosmos_chain_processor.go
@@ -18,6 +18,7 @@ import (
 	"github.com/cosmos/relayer/v2/relayer/provider"
 	ctypes "github.com/cometbft/cometbft/rpc/core/types"
+	comettypes "github.com/cometbft/cometbft/types"
 	"github.com/cosmos/relayer/v2/relayer/chains"
 	"go.uber.org/zap"
 	"golang.org/x/sync/errgroup"
 )
@@ -272,6 +273,10 @@ func (ccp *CosmosChainProcessor) Run(ctx context.Context, initialBlockHistory ui
 
 	ccp.log.Debug("Entering main query loop")
 
+	if err := ccp.subscribe(ctx); err != nil {
+		ccp.log.Warn("Error subscribing to websocket, falling back to rpc polling", zap.Error(err))
+	}
+
 	ticker := time.NewTicker(persistence.minQueryLoopDuration)
 	defer ticker.Stop()
 
@@ -288,6 +293,164 @@ func (ccp *CosmosChainProcessor) Run(ctx context.Context, initialBlockHistory ui
 	}
 }
 
+func (ccp *CosmosChainProcessor) periodicTMVersionCheck(ctx context.Context) {
+	ticker := time.NewTicker(5 * time.Second)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			status, err := ccp.nodeStatusWithRetry(ctx)
+			if err != nil {
+				ccp.log.Error(
+					"Failed to query node status after max attempts",
+					zap.Uint("attempts", latestHeightQueryRetries),
+					zap.Error(err),
+				)
+				continue
+			}
+
+			ccp.chainProvider.setCometVersion(ccp.log, status.NodeInfo.Version)
+		}
+	}
+}
+
+func (ccp *CosmosChainProcessor) subscribe(ctx context.Context) error {
+	// latestHeight, err := ccp.latestHeightWithRetry(ctx)
+	// if err != nil {
+	// 	return err
+	// }
+	// ibcHeader, err := ccp.chainProvider.QueryIBCHeader(ctx, latestHeight)
+	// if err != nil {
+	// 	return err
+	// }
+
+	// valSet := ibcHeader.(provider.TendermintIBCHeader).ValidatorSet
+
+	headerChan, err := ccp.chainProvider.RPCClient.Subscribe(ctx, "header", comettypes.QueryForEvent(comettypes.EventNewBlockHeader).String())
+	if err != nil {
+		return fmt.Errorf("failed to subscribe to block headers over websocket: %w", err)
+	}
+
+	txChan, err := ccp.chainProvider.RPCClient.Subscribe(ctx, "tx", comettypes.QueryForEvent(comettypes.EventTx).String())
+	if err != nil {
+		return fmt.Errorf("failed to subscribe to txs over websocket: %w", err)
+	}
+
+	// periodically check the tendermint version of the node
+	go ccp.periodicTMVersionCheck(ctx)
+
+	// valSetChan, err := ccp.chainProvider.RPCClient.Subscribe(ctx, "vals", comettypes.QueryForEvent(comettypes.EventValidatorSetUpdates).String())
+	// if err != nil {
+	// 	return fmt.Errorf("failed to subscribe to validator set updates over websocket: %w", err)
+	// }
+
+	chainID := ccp.chainProvider.ChainId()
+
+	for {
+		// fresh caches each iteration so stale data is never re-sent to the path processors
+		ibcMessagesCache := processor.NewIBCMessagesCache()
+		ibcHeaderCache := make(processor.IBCHeaderCache)
+
+		// nil unless a header event arrives in this iteration; the path end
+		// runtime skips nil headers when merging cache data.
+		var latestHeader provider.IBCHeader
+
+		select {
+		case <-ctx.Done():
+			return nil
+		case event := <-headerChan:
+			headerEvent := event.Data.(comettypes.EventDataNewBlockHeader)
+
+			ccp.log.Debug("Received new block header event", zap.Int64("height", headerEvent.Header.Height))
+
+			// TODO try to avoid this query by getting the SignedHeader via websocket.
+			// If we can't, add retry logic.
+			ibcHeader, err := ccp.chainProvider.QueryIBCHeader(ctx, headerEvent.Header.Height)
+			if err != nil {
+				ccp.log.Error("Error querying IBC header",
+					zap.Int64("height", headerEvent.Header.Height),
+					zap.Error(err),
+				)
+				continue
+			}
+
+			latestHeader = ibcHeader.(provider.TendermintIBCHeader)
+
+			heightUint64 := uint64(headerEvent.Header.Height)
+
+			blockMsgs := ccp.ibcMessagesFromBlockEvents(
+				headerEvent.ResultBeginBlock.Events,
+				headerEvent.ResultEndBlock.Events,
+				heightUint64,
+				ccp.chainProvider.cometLegacyEncoding,
+			)
+			for _, m := range blockMsgs {
+				ccp.handleMessage(ctx, m, ibcMessagesCache)
+			}
+
+			ccp.latestBlock = provider.LatestBlock{
+				Height: heightUint64,
+				Time:   headerEvent.Header.Time,
+			}
+
+			// latestHeader = provider.TendermintIBCHeader{
+			// 	ValidatorSet: valSet,
+			// 	SignedHeader: &comettypes.SignedHeader{
+			// 		Header: &headerEvent.Header,
+			// 		// TODO: how do we get Commit without querying light block?
+			// 		// Commit: ,
+			// 	},
+			// }
+
+			ibcHeaderCache[heightUint64] = latestHeader
+		case event := <-txChan:
+			ccp.log.Debug("Received new tx event")
+
+			txEvent := event.Data.(comettypes.EventDataTx)
+
+			tx := txEvent.Result
+			if tx.Code != 0 {
+				// tx was not successful
+				continue
+			}
+			messages := chains.IbcMessagesFromEvents(ccp.log, tx.Events, chainID, uint64(txEvent.Height), ccp.chainProvider.cometLegacyEncoding)
+
+			for _, m := range messages {
+				ccp.handleMessage(ctx, m, ibcMessagesCache)
+			}
+			// case event := <-valSetChan:
+			// 	ccp.log.Debug("Received validator set change event")
+			// 	vsEvent := event.Data.(comettypes.EventDataValidatorSetUpdates)
+			// 	valSet.UpdateWithChangeSet(vsEvent.ValidatorUpdates)
+			// 	continue
+		}
+
+		for _, pp := range ccp.pathProcessors {
+			clientID := pp.RelevantClientID(chainID)
+			clientState, err := ccp.clientState(ctx, clientID)
+			if err != nil {
+				ccp.log.Error("Error fetching client state",
+					zap.String("client_id", clientID),
+					zap.Error(err),
+				)
+				continue
+			}
+
+			pp.HandleNewData(chainID, processor.ChainProcessorCacheData{
+				LatestBlock:          ccp.latestBlock,
+				LatestHeader:         latestHeader,
+				IBCMessagesCache:     ibcMessagesCache.Clone(),
+				InSync:               ccp.inSync,
+				ClientState:          clientState,
+				ConnectionStateCache: ccp.connectionStateCache.FilterForClient(clientID),
+				ChannelStateCache:    ccp.channelStateCache.FilterForClient(clientID, ccp.channelConnections, ccp.connectionClients),
+				IBCHeaderCache:       ibcHeaderCache.Clone(),
+			})
+		}
+	}
+}
+
 // initializeConnectionState will bootstrap the connectionStateCache with the open connection state.
 func (ccp *CosmosChainProcessor) initializeConnectionState(ctx context.Context) error {
 	ctx, cancel := context.WithTimeout(ctx, queryStateTimeout)
diff --git a/relayer/chains/cosmos/provider.go b/relayer/chains/cosmos/provider.go
index f7c82b9dd3..69458a561e 100644
--- a/relayer/chains/cosmos/provider.go
+++ b/relayer/chains/cosmos/provider.go
@@ -162,8 +162,30 @@ type CosmosProvider struct {
 }
 
 type WalletState struct {
-	NextAccountSequence uint64
-	Mu                  sync.Mutex
+	nextAccountSequence uint64
+	mu                  sync.RWMutex
+}
+
+// UpdateNextAccountSequence advances the tracked sequence, never moving it backward.
+func (ws *WalletState) UpdateNextAccountSequence(seq uint64) {
+	ws.mu.Lock()
+	defer ws.mu.Unlock()
+	if seq > ws.nextAccountSequence {
+		ws.nextAccountSequence = seq
+	}
+}
+
+// SetNextAccountSequence overwrites the tracked sequence unconditionally, e.g.
+// when a mismatch error reports the sequence the chain actually expects.
+func (ws *WalletState) SetNextAccountSequence(seq uint64) {
+	ws.mu.Lock()
+	defer ws.mu.Unlock()
+	ws.nextAccountSequence = seq
+}
+
+// NextAccountSequence returns the tracked sequence under a read lock.
+func (ws *WalletState) NextAccountSequence() uint64 {
+	ws.mu.RLock()
+	defer ws.mu.RUnlock()
+	return ws.nextAccountSequence
 }
 
 func (cc *CosmosProvider) ProviderConfig() provider.ProviderConfig {
@@ -367,12 +389,6 @@ func (cc *CosmosProvider) SetMetrics(m *processor.PrometheusMetrics) {
 	cc.metrics = m
 }
 
-func (cc *CosmosProvider) updateNextAccountSequence(sequenceGuard *WalletState, seq uint64) {
-	if seq > sequenceGuard.NextAccountSequence {
-		sequenceGuard.NextAccountSequence = seq
-	}
-}
-
 func (cc *CosmosProvider) setCometVersion(log *zap.Logger, version string) {
 	cc.cometLegacyEncoding = cc.legacyEncodedEvents(log, version)
 	cc.cometLegacyBlockResults = cc.legacyBlockResults(version)
diff --git a/relayer/chains/cosmos/tx.go b/relayer/chains/cosmos/tx.go
index d0306bf0a5..e799db234f 100644
--- a/relayer/chains/cosmos/tx.go
+++ b/relayer/chains/cosmos/tx.go
@@ -169,8 +169,6 @@ func (cc *CosmosProvider) SendMessagesToMempool(
 	}
 
 	sequenceGuard := ensureSequenceGuard(cc, txSignerKey)
-	sequenceGuard.Mu.Lock()
-	defer sequenceGuard.Mu.Unlock()
 
 	txBytes, sequence, fees, err := cc.buildMessages(ctx, msgs, memo, 0, txSignerKey, feegranterKey, sequenceGuard)
 	if err != nil {
@@ -190,8 +188,8 @@ func (cc *CosmosProvider) SendMessagesToMempool(
 		return err
 	}
 
-	// we had a successful tx broadcast with this sequence, so update it to the next
-	cc.updateNextAccountSequence(sequenceGuard, sequence+1)
+	sequenceGuard.UpdateNextAccountSequence(sequence + 1)
+
 	return nil
 }
 
@@ -617,9 +615,10 @@ func (cc *CosmosProvider) buildMessages(
 	}
 
 	sequence = txf.Sequence()
-	cc.updateNextAccountSequence(sequenceGuard, sequence)
-	if sequence < sequenceGuard.NextAccountSequence {
-		sequence = sequenceGuard.NextAccountSequence
+	sequenceGuard.UpdateNextAccountSequence(sequence)
+	nas := sequenceGuard.NextAccountSequence()
+	if sequence < nas {
+		sequence = nas
 		txf = txf.WithSequence(sequence)
 	}
 
@@ -684,7 +683,7 @@ func (cc *CosmosProvider) handleAccountSequenceMismatchError(sequenceGuard *Wall
 	if err != nil {
 		return
 	}
-	sequenceGuard.NextAccountSequence = nextSeq
+	sequenceGuard.SetNextAccountSequence(nextSeq)
 }
 
 // MsgCreateClient creates an sdk.Msg to update the client on src with consensus state from dst
diff --git a/relayer/processor/path_end_runtime.go b/relayer/processor/path_end_runtime.go
index d91307f121..6c6e5cff8b 100644
--- a/relayer/processor/path_end_runtime.go
+++ b/relayer/processor/path_end_runtime.go
@@ -375,7 +375,9 @@ func (pathEnd *pathEndRuntime) mergeCacheData(ctx context.Context, cancel func()
 	pathEnd.lastClientUpdateHeightMu.Unlock()
 
 	pathEnd.inSync = d.InSync
-	pathEnd.latestHeader = d.LatestHeader
+	if d.LatestHeader != nil {
+		pathEnd.latestHeader = d.LatestHeader
+	}
 	pathEnd.clientState = d.ClientState
 
 	terminate, err := pathEnd.checkForMisbehaviour(ctx, pathEnd.clientState, counterParty)
@@ -407,8 +409,10 @@ func (pathEnd *pathEndRuntime) mergeCacheData(ctx context.Context, cancel func()
 
 	pathEnd.mergeMessageCache(d.IBCMessagesCache, counterpartyChainID, pathEnd.inSync && counterpartyInSync) // Merge incoming packet IBC messages into the backlog
 
-	pathEnd.ibcHeaderCache.Merge(d.IBCHeaderCache)  // Update latest IBC header state
-	pathEnd.ibcHeaderCache.Prune(ibcHeadersToCache) // Only keep most recent IBC headers
+	if d.IBCHeaderCache != nil {
+		pathEnd.ibcHeaderCache.Merge(d.IBCHeaderCache)  // Update latest IBC header state
+		pathEnd.ibcHeaderCache.Prune(ibcHeadersToCache) // Only keep most recent IBC headers
+	}
 }
 
 // shouldSendPacketMessage determines if the packet flow message should be sent now.
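
Reviewer note: for anyone unfamiliar with the CometBFT event machinery this
patch leans on, below is a minimal standalone sketch of the same Subscribe
pattern. It is not the patch's exact wiring: the relayer constructs its own
RPCClient, the node address here is a placeholder, and the sketch assumes
cometbft v0.37-era APIs (matching the ResultBeginBlock/ResultEndBlock fields
used above).

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"

	rpchttp "github.com/cometbft/cometbft/rpc/client/http"
	comettypes "github.com/cometbft/cometbft/types"
)

func main() {
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	defer cancel()

	// Placeholder local node address; replace with a real RPC endpoint.
	client, err := rpchttp.New("tcp://localhost:26657", "/websocket")
	if err != nil {
		panic(err)
	}
	// The websocket event machinery only runs after Start.
	if err := client.Start(); err != nil {
		panic(err)
	}
	defer client.Stop()

	// Same query string the patch builds: tm.event='NewBlockHeader'.
	headerChan, err := client.Subscribe(ctx, "example",
		comettypes.QueryForEvent(comettypes.EventNewBlockHeader).String())
	if err != nil {
		panic(err)
	}

	for {
		select {
		case <-ctx.Done():
			return
		case event := <-headerChan:
			// Each event carries the typed header, so no polling query is needed.
			header := event.Data.(comettypes.EventDataNewBlockHeader)
			fmt.Println("new block header at height", header.Header.Height)
		}
	}
}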
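
A second note, on the WalletState change: the two setters are intentionally
different. UpdateNextAccountSequence only moves the counter forward (the
optimistic "sequence+1 after a successful broadcast" path), while
SetNextAccountSequence overwrites it so handleAccountSequenceMismatchError can
move the counter backward to whatever the chain says it expects. A
self-contained sketch of that behavior, copying the pattern rather than
importing the relayer package:

package main

import (
	"fmt"
	"sync"
)

// WalletState mirrors the guard added in provider.go, for illustration only.
type WalletState struct {
	nextAccountSequence uint64
	mu                  sync.RWMutex
}

// UpdateNextAccountSequence is monotonic: stale values are ignored.
func (ws *WalletState) UpdateNextAccountSequence(seq uint64) {
	ws.mu.Lock()
	defer ws.mu.Unlock()
	if seq > ws.nextAccountSequence {
		ws.nextAccountSequence = seq
	}
}

// SetNextAccountSequence overwrites, even if the new value is lower.
func (ws *WalletState) SetNextAccountSequence(seq uint64) {
	ws.mu.Lock()
	defer ws.mu.Unlock()
	ws.nextAccountSequence = seq
}

// NextAccountSequence reads the counter under the read lock, so concurrent
// buildMessages calls can check it without serializing on the write lock.
func (ws *WalletState) NextAccountSequence() uint64 {
	ws.mu.RLock()
	defer ws.mu.RUnlock()
	return ws.nextAccountSequence
}

func main() {
	ws := &WalletState{}

	ws.UpdateNextAccountSequence(10)
	ws.UpdateNextAccountSequence(7)       // ignored: would move the counter backward
	fmt.Println(ws.NextAccountSequence()) // 10

	ws.SetNextAccountSequence(7)          // mismatch recovery: overwrite unconditionally
	fmt.Println(ws.NextAccountSequence()) // 7
}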