From e831ee347ae5c1e35eb1928c7bf5fd5c2978b248 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Sat, 2 Nov 2024 23:24:57 +0400 Subject: [PATCH 01/32] feat!: use blocks streaming API instead of RPC in fetcher --- core/client.go | 20 +-- core/exchange_test.go | 11 +- core/fetcher.go | 234 +++++++++++++++++++++++++++-------- core/fetcher_no_race_test.go | 10 +- core/fetcher_test.go | 9 +- core/header_test.go | 8 +- core/listener.go | 12 +- core/testing.go | 20 +-- go.mod | 4 +- go.sum | 4 - nodebuilder/core/module.go | 6 - 11 files changed, 218 insertions(+), 120 deletions(-) diff --git a/core/client.go b/core/client.go index 9636619b02..18ca64101b 100644 --- a/core/client.go +++ b/core/client.go @@ -2,25 +2,13 @@ package core import ( "fmt" - - retryhttp "github.com/hashicorp/go-retryablehttp" - "github.com/tendermint/tendermint/rpc/client" - "github.com/tendermint/tendermint/rpc/client/http" + coregrpc "github.com/tendermint/tendermint/rpc/grpc" ) // Client is an alias to Core Client. -type Client = client.Client +type Client = coregrpc.BlockAPIClient -// NewRemote creates a new Client that communicates with a remote Core endpoint over HTTP. +// NewRemote creates a new Client that communicates with a remote Core endpoint over gRPC. func NewRemote(ip, port string) (Client, error) { - httpClient := retryhttp.NewClient() - httpClient.RetryMax = 2 - // suppress logging - httpClient.Logger = nil - - return http.NewWithClient( - fmt.Sprintf("tcp://%s:%s", ip, port), - "/websocket", - httpClient.StandardClient(), - ) + return coregrpc.StartBlockAPIGRPCClient(fmt.Sprintf("tcp://%s:%s", ip, port)) } diff --git a/core/exchange_test.go b/core/exchange_test.go index c0b925b51d..e4a6374f05 100644 --- a/core/exchange_test.go +++ b/core/exchange_test.go @@ -3,6 +3,7 @@ package core import ( "bytes" "context" + "net" "testing" "time" @@ -118,7 +119,11 @@ func createCoreFetcher(t *testing.T, cfg *testnode.Config) (*BlockFetcher, testn // flakiness with accessing account state) _, err := cctx.WaitForHeightWithTimeout(2, time.Second*2) // TODO @renaynay: configure? require.NoError(t, err) - return NewBlockFetcher(cctx.Client), cctx + host, port, err := net.SplitHostPort(cctx.GRPCClient.Target()) + require.NoError(t, err) + blockAPIClient, err := NewRemote(host, port) + require.NoError(t, err) + return NewBlockFetcher(blockAPIClient), cctx } // fillBlocks fills blocks until the context is canceled. @@ -153,10 +158,6 @@ func generateNonEmptyBlocks( sub, err := fetcher.SubscribeNewBlockEvent(ctx) require.NoError(t, err) - defer func() { - err = fetcher.UnsubscribeNewBlockEvent(ctx) - require.NoError(t, err) - }() go fillBlocks(t, generateCtx, cfg, cctx) diff --git a/core/fetcher.go b/core/fetcher.go index f2b160e108..efe0adbdf3 100644 --- a/core/fetcher.go +++ b/core/fetcher.go @@ -2,8 +2,11 @@ package core import ( "context" - "errors" "fmt" + "github.com/gogo/protobuf/proto" + tmproto "github.com/tendermint/tendermint/proto/tendermint/types" + coregrpc "github.com/tendermint/tendermint/rpc/grpc" + "io" logging "github.com/ipfs/go-log/v2" coretypes "github.com/tendermint/tendermint/rpc/core/types" @@ -54,41 +57,87 @@ func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height *int64) (*types. } // GetBlock queries Core for a `Block` at the given height. +// if the height is nil, use the latest height func (f *BlockFetcher) GetBlock(ctx context.Context, height *int64) (*types.Block, error) { - res, err := f.client.Block(ctx, height) + blockHeight, err := f.resolveHeight(ctx, height) if err != nil { return nil, err } - if res != nil && res.Block == nil { - return nil, fmt.Errorf("core/fetcher: block not found, height: %d", height) + stream, err := f.client.BlockByHeight(ctx, &coregrpc.BlockByHeightRequest{Height: blockHeight}) + if err != nil { + return nil, err } - - return res.Block, nil + block, _, _, _, err := receiveBlockByHeight(stream) + if err != nil { + return nil, err + } + return block, nil } func (f *BlockFetcher) GetBlockByHash(ctx context.Context, hash libhead.Hash) (*types.Block, error) { - res, err := f.client.BlockByHash(ctx, hash) + if hash == nil { + return nil, fmt.Errorf("cannot get block with nil hash") + } + stream, err := f.client.BlockByHash(ctx, &coregrpc.BlockByHashRequest{Hash: hash}) if err != nil { return nil, err } - - if res != nil && res.Block == nil { - return nil, fmt.Errorf("core/fetcher: block not found, hash: %s", hash.String()) + block, err := receiveBlockByHash(stream) + if err != nil { + return nil, err } - return res.Block, nil + return block, nil +} + +// resolveHeight takes a height pointer and returns its value if it's not nil. +// otherwise, returns the latest height. +func (f *BlockFetcher) resolveHeight(ctx context.Context, height *int64) (int64, error) { + if height != nil { + return *height, nil + } else { + status, err := f.client.Status(ctx, &coregrpc.StatusRequest{}) + if err != nil { + return 0, err + } + return status.SyncInfo.LatestBlockHeight, nil + } } // GetSignedBlock queries Core for a `Block` at the given height. +// if the height is nil, use the latest height. func (f *BlockFetcher) GetSignedBlock(ctx context.Context, height *int64) (*coretypes.ResultSignedBlock, error) { - return f.client.SignedBlock(ctx, height) + blockHeight, err := f.resolveHeight(ctx, height) + if err != nil { + return nil, err + } + + stream, err := f.client.BlockByHeight(ctx, &coregrpc.BlockByHeightRequest{Height: blockHeight}) + if err != nil { + return nil, err + } + block, _, commit, validatorSet, err := receiveBlockByHeight(stream) + if err != nil { + return nil, err + } + return &coretypes.ResultSignedBlock{ + Header: block.Header, + Commit: *commit, + Data: block.Data, + ValidatorSet: *validatorSet, + }, nil } // Commit queries Core for a `Commit` from the block at // the given height. +// If the height is nil, use the latest height. func (f *BlockFetcher) Commit(ctx context.Context, height *int64) (*types.Commit, error) { - res, err := f.client.Commit(ctx, height) + blockHeight, err := f.resolveHeight(ctx, height) + if err != nil { + return nil, err + } + res, err := f.client.Commit(ctx, &coregrpc.CommitRequest{Height: blockHeight}) if err != nil { return nil, err } @@ -97,45 +146,47 @@ func (f *BlockFetcher) Commit(ctx context.Context, height *int64) (*types.Commit return nil, fmt.Errorf("core/fetcher: commit not found at height %d", height) } - return res.Commit, nil + commit, err := types.CommitFromProto(res.Commit) + if err != nil { + return nil, err + } + + return commit, nil } // ValidatorSet queries Core for the ValidatorSet from the // block at the given height. +// If the height is nil, use the latest height. func (f *BlockFetcher) ValidatorSet(ctx context.Context, height *int64) (*types.ValidatorSet, error) { - perPage := 100 - - vals, total := make([]*types.Validator, 0), -1 - for page := 1; len(vals) != total; page++ { - res, err := f.client.Validators(ctx, height, &page, &perPage) - if err != nil { - return nil, err - } + blockHeight, err := f.resolveHeight(ctx, height) + if err != nil { + return nil, err + } + res, err := f.client.ValidatorSet(ctx, &coregrpc.ValidatorSetRequest{Height: blockHeight}) + if err != nil { + return nil, err + } - if res != nil && len(res.Validators) == 0 { - return nil, fmt.Errorf("core/fetcher: validator set not found at height %d", height) - } + if res != nil && res.ValidatorSet == nil { + return nil, fmt.Errorf("core/fetcher: validator set not found at height %d", height) + } - total = res.Total - vals = append(vals, res.Validators...) + validatorSet, err := types.ValidatorSetFromProto(res.ValidatorSet) + if err != nil { + return nil, err } - return types.NewValidatorSet(vals), nil + return validatorSet, nil } // SubscribeNewBlockEvent subscribes to new block events from Core, returning // a new block event channel on success. func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types.EventDataSignedBlock, error) { - // start the client if not started yet - if !f.client.IsRunning() { - return nil, errors.New("client not running") - } - ctx, cancel := context.WithCancel(ctx) f.cancel = cancel f.doneCh = make(chan struct{}) - eventChan, err := f.client.Subscribe(ctx, newBlockSubscriber, newDataSignedBlockQuery) + subscription, err := f.client.SubscribeNewHeights(ctx, &coregrpc.SubscribeNewHeightsRequest{}) if err != nil { return nil, err } @@ -148,14 +199,24 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types select { case <-ctx.Done(): return - case newEvent, ok := <-eventChan: - if !ok { - log.Errorw("fetcher: new blocks subscription channel closed unexpectedly") + default: + resp, err := subscription.Recv() + if err != nil { + log.Errorw("fetcher: error receiving new height", "err", err.Error()) + return + } + signedBlock, err := f.GetSignedBlock(ctx, &resp.Height) + if err != nil { + log.Errorw("fetcher: error receiving signed block", "height", resp.Height, "err", err.Error()) return } - signedBlock := newEvent.Data.(types.EventDataSignedBlock) select { - case signedBlockCh <- signedBlock: + case signedBlockCh <- types.EventDataSignedBlock{ + Header: signedBlock.Header, + Commit: signedBlock.Commit, + ValidatorSet: signedBlock.ValidatorSet, + Data: signedBlock.Data, + }: case <-ctx.Done(): return } @@ -166,24 +227,95 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types return signedBlockCh, nil } -// UnsubscribeNewBlockEvent stops the subscription to new block events from Core. -func (f *BlockFetcher) UnsubscribeNewBlockEvent(ctx context.Context) error { - f.cancel() - select { - case <-f.doneCh: - case <-ctx.Done(): - return fmt.Errorf("fetcher: unsubscribe from new block events: %w", ctx.Err()) - } - return f.client.Unsubscribe(ctx, newBlockSubscriber, newDataSignedBlockQuery) -} - // IsSyncing returns the sync status of the Core connection: true for // syncing, and false for already caught up. It can also return an error // in the case of a failed status request. func (f *BlockFetcher) IsSyncing(ctx context.Context) (bool, error) { - resp, err := f.client.Status(ctx) + resp, err := f.client.Status(ctx, &coregrpc.StatusRequest{}) if err != nil { return false, err } return resp.SyncInfo.CatchingUp, nil } + +func receiveBlockByHeight(streamer coregrpc.BlockAPI_BlockByHeightClient) (*types.Block, *types.BlockMeta, *types.Commit, *types.ValidatorSet, error) { + parts := make([]*tmproto.Part, 0) + + // receive the first part to get the block meta, commit, and validator set + resp, err := streamer.Recv() + if err != nil { + return nil, nil, nil, nil, err + } + blockMeta, err := types.BlockMetaFromProto(resp.BlockMeta) + if err != nil { + return nil, nil, nil, nil, err + } + commit, err := types.CommitFromProto(resp.Commit) + if err != nil { + return nil, nil, nil, nil, err + } + validatorSet, err := types.ValidatorSetFromProto(resp.ValidatorSet) + if err != nil { + return nil, nil, nil, nil, err + } + parts = append(parts, resp.BlockPart) + + // receive the rest of the block + isLast := resp.IsLast + for !isLast { + resp, err := streamer.Recv() + if err != nil { + return nil, nil, nil, nil, err + } + parts = append(parts, resp.BlockPart) + isLast = resp.IsLast + } + block, err := partsToBlock(parts) + if err != nil { + return nil, nil, nil, nil, err + } + return block, blockMeta, commit, validatorSet, nil +} + +func receiveBlockByHash(streamer coregrpc.BlockAPI_BlockByHashClient) (*types.Block, error) { + parts := make([]*tmproto.Part, 0) + isLast := false + for !isLast { + resp, err := streamer.Recv() + if err != nil { + return nil, err + } + parts = append(parts, resp.BlockPart) + isLast = resp.IsLast + } + return partsToBlock(parts) +} + +func partsToBlock(parts []*tmproto.Part) (*types.Block, error) { + partSet := types.NewPartSetFromHeader(types.PartSetHeader{ + Total: uint32(len(parts)), + }) + for _, part := range parts { + ok, err := partSet.AddPartWithoutProof(&types.Part{Index: part.Index, Bytes: part.Bytes}) + if err != nil { + return nil, err + } + if !ok { + return nil, err + } + } + pbb := new(tmproto.Block) + bz, err := io.ReadAll(partSet.GetReader()) + if err != nil { + return nil, err + } + err = proto.Unmarshal(bz, pbb) + if err != nil { + return nil, err + } + block, err := types.BlockFromProto(pbb) + if err != nil { + return nil, err + } + return block, nil +} diff --git a/core/fetcher_no_race_test.go b/core/fetcher_no_race_test.go index 8b3af8e5e1..8ec2fdbc0f 100644 --- a/core/fetcher_no_race_test.go +++ b/core/fetcher_no_race_test.go @@ -4,6 +4,7 @@ package core import ( "context" + "net" "testing" "time" @@ -18,8 +19,12 @@ func TestBlockFetcherHeaderValues(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) t.Cleanup(cancel) - client := StartTestNode(t).Client - fetcher := NewBlockFetcher(client) + node := StartTestNode(t) + host, port, err := net.SplitHostPort(node.GRPCClient.Target()) + require.NoError(t, err) + blockAPIClient, err := NewRemote(host, port) + require.NoError(t, err) + fetcher := NewBlockFetcher(blockAPIClient) // generate some blocks newBlockChan, err := fetcher.SubscribeNewBlockEvent(ctx) @@ -51,5 +56,4 @@ func TestBlockFetcherHeaderValues(t *testing.T) { // compare ValidatorSet hash to the ValidatorsHash from first block height hexBytes := valSet.Hash() assert.Equal(t, nextBlock.ValidatorSet.Hash(), hexBytes) - require.NoError(t, fetcher.UnsubscribeNewBlockEvent(ctx)) } diff --git a/core/fetcher_test.go b/core/fetcher_test.go index 42afa42bcd..e6f4f3b967 100644 --- a/core/fetcher_test.go +++ b/core/fetcher_test.go @@ -2,6 +2,7 @@ package core import ( "context" + "net" "testing" "time" @@ -13,8 +14,11 @@ func TestBlockFetcher_GetBlock_and_SubscribeNewBlockEvent(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*15) t.Cleanup(cancel) - client := StartTestNode(t).Client - fetcher := NewBlockFetcher(client) + host, port, err := net.SplitHostPort(StartTestNode(t).GRPCClient.Target()) + require.NoError(t, err) + blockAPIClient, err := NewRemote(host, port) + require.NoError(t, err) + fetcher := NewBlockFetcher(blockAPIClient) // generate some blocks newBlockChan, err := fetcher.SubscribeNewBlockEvent(ctx) @@ -35,5 +39,4 @@ func TestBlockFetcher_GetBlock_and_SubscribeNewBlockEvent(t *testing.T) { require.NoError(t, ctx.Err()) } } - require.NoError(t, fetcher.UnsubscribeNewBlockEvent(ctx)) } diff --git a/core/header_test.go b/core/header_test.go index 7b7eb3a7b7..951aa4c943 100644 --- a/core/header_test.go +++ b/core/header_test.go @@ -3,6 +3,7 @@ package core import ( "context" "fmt" + "net" "testing" "github.com/stretchr/testify/assert" @@ -20,8 +21,11 @@ func TestMakeExtendedHeaderForEmptyBlock(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - client := StartTestNode(t).Client - fetcher := NewBlockFetcher(client) + host, port, err := net.SplitHostPort(StartTestNode(t).GRPCClient.Target()) + require.NoError(t, err) + blockAPIClient, err := NewRemote(host, port) + require.NoError(t, err) + fetcher := NewBlockFetcher(blockAPIClient) sub, err := fetcher.SubscribeNewBlockEvent(ctx) require.NoError(t, err) diff --git a/core/listener.go b/core/listener.go index 67e532d970..ec397bfdc8 100644 --- a/core/listener.go +++ b/core/listener.go @@ -110,11 +110,6 @@ func (cl *Listener) Start(context.Context) error { // Stop stops the listener loop. func (cl *Listener) Stop(ctx context.Context) error { - err := cl.fetcher.UnsubscribeNewBlockEvent(ctx) - if err != nil { - log.Warnw("listener: unsubscribing from new block event", "err", err) - } - cl.cancel() select { case <-cl.closed: @@ -124,7 +119,7 @@ func (cl *Listener) Stop(ctx context.Context) error { return ctx.Err() } - err = cl.metrics.Close() + err := cl.metrics.Close() if err != nil { log.Warnw("listener: closing metrics", "err", err) } @@ -155,11 +150,6 @@ func (cl *Listener) runSubscriber(ctx context.Context, sub <-chan types.EventDat } func (cl *Listener) resubscribe(ctx context.Context) <-chan types.EventDataSignedBlock { - err := cl.fetcher.UnsubscribeNewBlockEvent(ctx) - if err != nil { - log.Warnw("listener: unsubscribe", "err", err) - } - ticker := time.NewTicker(retrySubscriptionDelay) defer ticker.Stop() for { diff --git a/core/testing.go b/core/testing.go index 6d2aa8cc36..0544339c42 100644 --- a/core/testing.go +++ b/core/testing.go @@ -6,7 +6,6 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" tmconfig "github.com/tendermint/tendermint/config" tmrand "github.com/tendermint/tendermint/libs/rand" @@ -50,22 +49,9 @@ func StartTestNode(t *testing.T) testnode.Context { func StartTestNodeWithConfig(t *testing.T, cfg *testnode.Config) testnode.Context { cctx, _, _ := testnode.NewNetwork(t, cfg) // we want to test over remote http client, - // so we are as close to the real environment as possible - // however, it might be useful to use local tendermint client - // if you need to debug something inside of it - ip, port, err := getEndpoint(cfg.TmConfig) - require.NoError(t, err) - client, err := NewRemote(ip, port) - require.NoError(t, err) - - err = client.Start() - require.NoError(t, err) - t.Cleanup(func() { - err := client.Stop() - require.NoError(t, err) - }) - - cctx.WithClient(client) + // so we are as close to the real environment as possible, + // however, it might be useful to use a local tendermint client + // if you need to debug something inside it return cctx } diff --git a/go.mod b/go.mod index 8142b7fe58..7fe92b8be6 100644 --- a/go.mod +++ b/go.mod @@ -354,10 +354,10 @@ require ( ) replace ( - github.com/cosmos/cosmos-sdk => github.com/celestiaorg/cosmos-sdk v1.25.0-sdk-v0.46.16 + github.com/cosmos/cosmos-sdk => ../cosmos-sdk github.com/filecoin-project/dagstore => github.com/celestiaorg/dagstore v0.0.0-20230824094345-537c012aa403 github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1 // broken goleveldb needs to be replaced for the cosmos-sdk and celestia-app github.com/syndtr/goleveldb => github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 - github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35 + github.com/tendermint/tendermint => ../celestia-core ) diff --git a/go.sum b/go.sum index ad08fc77bd..4ac7d6a65e 100644 --- a/go.sum +++ b/go.sum @@ -347,10 +347,6 @@ github.com/celestiaorg/blobstream-contracts/v3 v3.1.0 h1:h1Y4V3EMQ2mFmNtWt2sIhZI github.com/celestiaorg/blobstream-contracts/v3 v3.1.0/go.mod h1:x4DKyfKOSv1ZJM9NwV+Pw01kH2CD7N5zTFclXIVJ6GQ= github.com/celestiaorg/celestia-app/v3 v3.0.0-rc0 h1:cfZVxldi5u/vGZPdFvW95quUmcg307v44PndjYwEOR4= github.com/celestiaorg/celestia-app/v3 v3.0.0-rc0/go.mod h1:K8U6TRHgofz0y5UcvlOL+CuNLbx4jeZrZF7HZdf+Rgs= -github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35 h1:L4GTm+JUXhB0a/nGPMq6jEqqe6THuYSQ8m2kUCtZYqw= -github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35/go.mod h1:bFr0lAGwaJ0mOHSBmib5/ca5pbBf1yKWGPs93Td0HPw= -github.com/celestiaorg/cosmos-sdk v1.25.0-sdk-v0.46.16 h1:f+fTe7GGk0/qgdzyqB8kk8EcDf9d6MC22khBTQiDXsU= -github.com/celestiaorg/cosmos-sdk v1.25.0-sdk-v0.46.16/go.mod h1:07Z8HJqS8Rw4XlZ+ok3D3NM/X/in8mvcGLvl0Zb5wrA= github.com/celestiaorg/go-fraud v0.2.1 h1:oYhxI0gM/EpGRgbVQdRI/LSlqyT65g/WhQGSVGfx09w= github.com/celestiaorg/go-fraud v0.2.1/go.mod h1:lNY1i4K6kUeeE60Z2VK8WXd+qXb8KRzfBhvwPkK6aUc= github.com/celestiaorg/go-header v0.6.3 h1:VI+fsNxFLeUS7cNn0LgHP6Db66uslnKp/fgMg5nxqHg= diff --git a/nodebuilder/core/module.go b/nodebuilder/core/module.go index 441907ce32..964719e926 100644 --- a/nodebuilder/core/module.go +++ b/nodebuilder/core/module.go @@ -76,12 +76,6 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option )), fx.Provide(fx.Annotate( remote, - fx.OnStart(func(_ context.Context, client core.Client) error { - return client.Start() - }), - fx.OnStop(func(_ context.Context, client core.Client) error { - return client.Stop() - }), )), ) default: From ea9a2d73103e7102c167d24fed2ea1a27dbc52d6 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Mon, 4 Nov 2024 12:24:14 +0400 Subject: [PATCH 02/32] test: node builder test using the new grpc endpoints --- nodebuilder/node_bridge_test.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/nodebuilder/node_bridge_test.go b/nodebuilder/node_bridge_test.go index d2b7ebaf4e..878473f96e 100644 --- a/nodebuilder/node_bridge_test.go +++ b/nodebuilder/node_bridge_test.go @@ -19,8 +19,11 @@ func TestBridge_WithMockedCoreClient(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) - client := core.StartTestNode(t).Client - node, err := New(node.Bridge, p2p.Private, repo, coremodule.WithClient(client)) + host, port, err := net.SplitHostPort(core.StartTestNode(t).GRPCClient.Target()) + require.NoError(t, err) + blockAPIClient, err := core.NewRemote(host, port) + require.NoError(t, err) + node, err := New(node.Bridge, p2p.Private, repo, coremodule.WithClient(blockAPIClient)) require.NoError(t, err) require.NotNil(t, node) err = node.Start(ctx) From 5cd2e948e51989cb6c45439974deaac305edb605 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Mon, 4 Nov 2024 12:24:22 +0400 Subject: [PATCH 03/32] chore: import --- nodebuilder/node_bridge_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/nodebuilder/node_bridge_test.go b/nodebuilder/node_bridge_test.go index 878473f96e..e45b70ffd6 100644 --- a/nodebuilder/node_bridge_test.go +++ b/nodebuilder/node_bridge_test.go @@ -2,6 +2,7 @@ package nodebuilder import ( "context" + "net" "testing" "github.com/stretchr/testify/require" From c30fdbcce70fd3e42f40aea4ea5373888863d8dc Mon Sep 17 00:00:00 2001 From: sweexordious Date: Mon, 4 Nov 2024 14:09:13 +0400 Subject: [PATCH 04/32] chore: add stop function for fetcher --- core/exchange_test.go | 4 ++++ core/fetcher.go | 14 +++++++++++++- core/fetcher_no_race_test.go | 1 + core/fetcher_test.go | 1 + core/listener.go | 12 +++++++++++- nodebuilder/tests/swamp/swamp.go | 8 +++++++- 6 files changed, 37 insertions(+), 3 deletions(-) diff --git a/core/exchange_test.go b/core/exchange_test.go index e4a6374f05..167afb376c 100644 --- a/core/exchange_test.go +++ b/core/exchange_test.go @@ -158,6 +158,10 @@ func generateNonEmptyBlocks( sub, err := fetcher.SubscribeNewBlockEvent(ctx) require.NoError(t, err) + defer func() { + err = fetcher.Stop(ctx) + require.NoError(t, err) + }() go fillBlocks(t, generateCtx, cfg, cctx) diff --git a/core/fetcher.go b/core/fetcher.go index efe0adbdf3..c8e556356c 100644 --- a/core/fetcher.go +++ b/core/fetcher.go @@ -36,6 +36,16 @@ func NewBlockFetcher(client Client) *BlockFetcher { } } +func (f *BlockFetcher) Stop(ctx context.Context) error { + f.cancel() + select { + case <-f.doneCh: + return nil + case <-ctx.Done(): + return fmt.Errorf("fetcher: unsubscribe from new block events: %w", ctx.Err()) + } +} + // GetBlockInfo queries Core for additional block information, like Commit and ValidatorSet. func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height *int64) (*types.Commit, *types.ValidatorSet, error) { commit, err := f.Commit(ctx, height) @@ -202,7 +212,9 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types default: resp, err := subscription.Recv() if err != nil { - log.Errorw("fetcher: error receiving new height", "err", err.Error()) + if ctx.Err() == nil { + log.Errorw("fetcher: error receiving new height", "err", err.Error()) + } return } signedBlock, err := f.GetSignedBlock(ctx, &resp.Height) diff --git a/core/fetcher_no_race_test.go b/core/fetcher_no_race_test.go index 8ec2fdbc0f..3b8a7660e4 100644 --- a/core/fetcher_no_race_test.go +++ b/core/fetcher_no_race_test.go @@ -56,4 +56,5 @@ func TestBlockFetcherHeaderValues(t *testing.T) { // compare ValidatorSet hash to the ValidatorsHash from first block height hexBytes := valSet.Hash() assert.Equal(t, nextBlock.ValidatorSet.Hash(), hexBytes) + require.NoError(t, fetcher.Stop(ctx)) } diff --git a/core/fetcher_test.go b/core/fetcher_test.go index e6f4f3b967..ecde519dbc 100644 --- a/core/fetcher_test.go +++ b/core/fetcher_test.go @@ -39,4 +39,5 @@ func TestBlockFetcher_GetBlock_and_SubscribeNewBlockEvent(t *testing.T) { require.NoError(t, ctx.Err()) } } + require.NoError(t, fetcher.Stop(ctx)) } diff --git a/core/listener.go b/core/listener.go index ec397bfdc8..178688994d 100644 --- a/core/listener.go +++ b/core/listener.go @@ -110,6 +110,11 @@ func (cl *Listener) Start(context.Context) error { // Stop stops the listener loop. func (cl *Listener) Stop(ctx context.Context) error { + err := cl.fetcher.Stop(ctx) + if err != nil { + log.Warnw("listener: stopping gRPC block event", "err", err) + } + cl.cancel() select { case <-cl.closed: @@ -119,7 +124,7 @@ func (cl *Listener) Stop(ctx context.Context) error { return ctx.Err() } - err := cl.metrics.Close() + err = cl.metrics.Close() if err != nil { log.Warnw("listener: closing metrics", "err", err) } @@ -150,6 +155,11 @@ func (cl *Listener) runSubscriber(ctx context.Context, sub <-chan types.EventDat } func (cl *Listener) resubscribe(ctx context.Context) <-chan types.EventDataSignedBlock { + err := cl.fetcher.Stop(ctx) + if err != nil { + log.Warnw("listener: unsubscribe", "err", err) + } + ticker := time.NewTicker(retrySubscriptionDelay) defer ticker.Stop() for { diff --git a/nodebuilder/tests/swamp/swamp.go b/nodebuilder/tests/swamp/swamp.go index 57d46bb64b..6c8404b513 100644 --- a/nodebuilder/tests/swamp/swamp.go +++ b/nodebuilder/tests/swamp/swamp.go @@ -179,8 +179,14 @@ func (s *Swamp) setupGenesis() { store, err := store.NewStore(store.DefaultParameters(), s.t.TempDir()) require.NoError(s.t, err) + host, port, err := net.SplitHostPort(s.ClientContext.GRPCClient.Target()) + require.NoError(t, err) + blockAPIClient, err := core.NewRemote(host, port) + require.NoError(t, err) + fetcher := core.NewBlockFetcher(blockAPIClient) + ex, err := core.NewExchange( - core.NewBlockFetcher(s.ClientContext.Client), + fetcher, store, header.MakeExtendedHeader, ) From 9a194b23d9336b9059ca898a3824c88ad8e69136 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Mon, 4 Nov 2024 15:08:45 +0400 Subject: [PATCH 05/32] feat!: remove rpc port --- nodebuilder/core/config.go | 10 ---------- nodebuilder/core/config_test.go | 16 +--------------- nodebuilder/core/constructors.go | 2 +- nodebuilder/core/flags.go | 17 +++-------------- nodebuilder/core/flags_test.go | 13 ------------- 5 files changed, 5 insertions(+), 53 deletions(-) diff --git a/nodebuilder/core/config.go b/nodebuilder/core/config.go index b82be1d809..d60494431f 100644 --- a/nodebuilder/core/config.go +++ b/nodebuilder/core/config.go @@ -8,7 +8,6 @@ import ( ) const ( - DefaultRPCPort = "26657" DefaultGRPCPort = "9090" ) @@ -17,7 +16,6 @@ var MetricsEnabled bool // Config combines all configuration fields for managing the relationship with a Core node. type Config struct { IP string - RPCPort string GRPCPort string } @@ -26,7 +24,6 @@ type Config struct { func DefaultConfig() Config { return Config{ IP: "", - RPCPort: DefaultRPCPort, GRPCPort: DefaultGRPCPort, } } @@ -37,9 +34,6 @@ func (cfg *Config) Validate() error { return nil } - if cfg.RPCPort == "" { - return fmt.Errorf("nodebuilder/core: rpc port is not set") - } if cfg.GRPCPort == "" { return fmt.Errorf("nodebuilder/core: grpc port is not set") } @@ -49,10 +43,6 @@ func (cfg *Config) Validate() error { return err } cfg.IP = ip - _, err = strconv.Atoi(cfg.RPCPort) - if err != nil { - return fmt.Errorf("nodebuilder/core: invalid rpc port: %s", err.Error()) - } _, err = strconv.Atoi(cfg.GRPCPort) if err != nil { return fmt.Errorf("nodebuilder/core: invalid grpc port: %s", err.Error()) diff --git a/nodebuilder/core/config_test.go b/nodebuilder/core/config_test.go index 6535e28807..b136c4f6a7 100644 --- a/nodebuilder/core/config_test.go +++ b/nodebuilder/core/config_test.go @@ -16,7 +16,6 @@ func TestValidate(t *testing.T) { name: "valid config", cfg: Config{ IP: "127.0.0.1", - RPCPort: DefaultRPCPort, GRPCPort: DefaultGRPCPort, }, expectErr: false, @@ -30,7 +29,6 @@ func TestValidate(t *testing.T) { name: "hostname preserved", cfg: Config{ IP: "celestia.org", - RPCPort: DefaultRPCPort, GRPCPort: DefaultGRPCPort, }, expectErr: false, @@ -46,8 +44,7 @@ func TestValidate(t *testing.T) { { name: "missing GRPC port", cfg: Config{ - IP: "127.0.0.1", - RPCPort: DefaultRPCPort, + IP: "127.0.0.1", }, expectErr: true, }, @@ -55,25 +52,14 @@ func TestValidate(t *testing.T) { name: "invalid IP, but will be accepted as host and not raise an error", cfg: Config{ IP: "invalid-ip", - RPCPort: DefaultRPCPort, GRPCPort: DefaultGRPCPort, }, expectErr: false, }, - { - name: "invalid RPC port", - cfg: Config{ - IP: "127.0.0.1", - RPCPort: "invalid-port", - GRPCPort: DefaultGRPCPort, - }, - expectErr: true, - }, { name: "invalid GRPC port", cfg: Config{ IP: "127.0.0.1", - RPCPort: DefaultRPCPort, GRPCPort: "invalid-port", }, expectErr: true, diff --git a/nodebuilder/core/constructors.go b/nodebuilder/core/constructors.go index 53c914a041..ba14f95d0e 100644 --- a/nodebuilder/core/constructors.go +++ b/nodebuilder/core/constructors.go @@ -5,5 +5,5 @@ import ( ) func remote(cfg Config) (core.Client, error) { - return core.NewRemote(cfg.IP, cfg.RPCPort) + return core.NewRemote(cfg.IP, cfg.GRPCPort) } diff --git a/nodebuilder/core/flags.go b/nodebuilder/core/flags.go index 127ee5ee60..b68d90d9af 100644 --- a/nodebuilder/core/flags.go +++ b/nodebuilder/core/flags.go @@ -9,7 +9,6 @@ import ( var ( coreFlag = "core.ip" - coreRPCFlag = "core.rpc.port" coreGRPCFlag = "core.grpc.port" ) @@ -22,12 +21,7 @@ func Flags() *flag.FlagSet { "", "Indicates node to connect to the given core node. "+ "Example: , 127.0.0.1. , subdomain.domain.tld "+ - "Assumes RPC port 26657 and gRPC port 9090 as default unless otherwise specified.", - ) - flags.String( - coreRPCFlag, - DefaultRPCPort, - "Set a custom RPC port for the core node connection. The --core.ip flag must also be provided.", + "Assumes gRPC port 9090 as default unless otherwise specified.", ) flags.String( coreGRPCFlag, @@ -44,17 +38,12 @@ func ParseFlags( ) error { coreIP := cmd.Flag(coreFlag).Value.String() if coreIP == "" { - if cmd.Flag(coreGRPCFlag).Changed || cmd.Flag(coreRPCFlag).Changed { - return fmt.Errorf("cannot specify RPC/gRPC ports without specifying an IP address for --core.ip") + if cmd.Flag(coreGRPCFlag).Changed { + return fmt.Errorf("cannot specify gRPC port without specifying an IP address for --core.ip") } return nil } - if cmd.Flag(coreRPCFlag).Changed { - rpc := cmd.Flag(coreRPCFlag).Value.String() - cfg.RPCPort = rpc - } - if cmd.Flag(coreGRPCFlag).Changed { grpc := cmd.Flag(coreGRPCFlag).Value.String() cfg.GRPCPort = grpc diff --git a/nodebuilder/core/flags_test.go b/nodebuilder/core/flags_test.go index ce906de037..f20b74c5bb 100644 --- a/nodebuilder/core/flags_test.go +++ b/nodebuilder/core/flags_test.go @@ -21,7 +21,6 @@ func TestParseFlags(t *testing.T) { inputCfg: Config{}, expectedCfg: Config{ IP: "", - RPCPort: "", GRPCPort: "", }, expectError: false, @@ -30,12 +29,10 @@ func TestParseFlags(t *testing.T) { name: "only core.ip", args: []string{"--core.ip=127.0.0.1"}, inputCfg: Config{ - RPCPort: DefaultRPCPort, GRPCPort: DefaultGRPCPort, }, expectedCfg: Config{ IP: "127.0.0.1", - RPCPort: DefaultRPCPort, GRPCPort: DefaultGRPCPort, }, expectError: false, @@ -46,7 +43,6 @@ func TestParseFlags(t *testing.T) { inputCfg: Config{}, expectedCfg: Config{ IP: "127.0.0.1", - RPCPort: DefaultRPCPort, GRPCPort: DefaultGRPCPort, }, expectError: true, @@ -56,12 +52,10 @@ func TestParseFlags(t *testing.T) { args: []string{}, inputCfg: Config{ IP: "127.162.36.1", - RPCPort: "1234", GRPCPort: "5678", }, expectedCfg: Config{ IP: "127.162.36.1", - RPCPort: "1234", GRPCPort: "5678", }, expectError: false, @@ -70,12 +64,10 @@ func TestParseFlags(t *testing.T) { name: "only core.ip, with config.toml overridden defaults for ports", args: []string{"--core.ip=127.0.0.1"}, inputCfg: Config{ - RPCPort: "1234", GRPCPort: "5678", }, expectedCfg: Config{ IP: "127.0.0.1", - RPCPort: "1234", GRPCPort: "5678", }, expectError: false, @@ -84,12 +76,10 @@ func TestParseFlags(t *testing.T) { name: "core.ip and core.rpc.port", args: []string{"--core.ip=127.0.0.1", "--core.rpc.port=12345"}, inputCfg: Config{ - RPCPort: DefaultRPCPort, GRPCPort: DefaultGRPCPort, }, expectedCfg: Config{ IP: "127.0.0.1", - RPCPort: "12345", GRPCPort: DefaultGRPCPort, }, expectError: false, @@ -98,12 +88,10 @@ func TestParseFlags(t *testing.T) { name: "core.ip and core.grpc.port", args: []string{"--core.ip=127.0.0.1", "--core.grpc.port=54321"}, inputCfg: Config{ - RPCPort: DefaultRPCPort, GRPCPort: DefaultGRPCPort, }, expectedCfg: Config{ IP: "127.0.0.1", - RPCPort: DefaultRPCPort, GRPCPort: "54321", }, expectError: false, @@ -113,7 +101,6 @@ func TestParseFlags(t *testing.T) { args: []string{"--core.ip=127.0.0.1", "--core.rpc.port=12345", "--core.grpc.port=54321"}, expectedCfg: Config{ IP: "127.0.0.1", - RPCPort: "12345", GRPCPort: "54321", }, expectError: false, From 8360531b166e74f9b9b296a45107eeb8be6bb5ab Mon Sep 17 00:00:00 2001 From: sweexordious Date: Mon, 4 Nov 2024 15:14:10 +0400 Subject: [PATCH 06/32] chore: remove core rpc tests --- nodebuilder/core/flags_test.go | 27 --------------------------- 1 file changed, 27 deletions(-) diff --git a/nodebuilder/core/flags_test.go b/nodebuilder/core/flags_test.go index f20b74c5bb..a2cdccc309 100644 --- a/nodebuilder/core/flags_test.go +++ b/nodebuilder/core/flags_test.go @@ -72,18 +72,6 @@ func TestParseFlags(t *testing.T) { }, expectError: false, }, - { - name: "core.ip and core.rpc.port", - args: []string{"--core.ip=127.0.0.1", "--core.rpc.port=12345"}, - inputCfg: Config{ - GRPCPort: DefaultGRPCPort, - }, - expectedCfg: Config{ - IP: "127.0.0.1", - GRPCPort: DefaultGRPCPort, - }, - expectError: false, - }, { name: "core.ip and core.grpc.port", args: []string{"--core.ip=127.0.0.1", "--core.grpc.port=54321"}, @@ -96,21 +84,6 @@ func TestParseFlags(t *testing.T) { }, expectError: false, }, - { - name: "core.ip, core.rpc.port, and core.grpc.port", - args: []string{"--core.ip=127.0.0.1", "--core.rpc.port=12345", "--core.grpc.port=54321"}, - expectedCfg: Config{ - IP: "127.0.0.1", - GRPCPort: "54321", - }, - expectError: false, - }, - { - name: "core.rpc.port without core.ip", - args: []string{"--core.rpc.port=12345"}, - expectedCfg: Config{}, - expectError: true, - }, { name: "core.grpc.port without core.ip", args: []string{"--core.grpc.port=54321"}, From 2b71210315e252cd2141be3ff37efdac1b8d2f42 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Mon, 4 Nov 2024 15:17:37 +0400 Subject: [PATCH 07/32] chore: remove core rpc tests --- nodebuilder/core/config_test.go | 8 -------- 1 file changed, 8 deletions(-) diff --git a/nodebuilder/core/config_test.go b/nodebuilder/core/config_test.go index b136c4f6a7..60ee30262c 100644 --- a/nodebuilder/core/config_test.go +++ b/nodebuilder/core/config_test.go @@ -33,14 +33,6 @@ func TestValidate(t *testing.T) { }, expectErr: false, }, - { - name: "missing RPC port", - cfg: Config{ - IP: "127.0.0.1", - GRPCPort: DefaultGRPCPort, - }, - expectErr: true, - }, { name: "missing GRPC port", cfg: Config{ From 13bddc3d0a2c7b671998fda7bc8a70bdf7773a84 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Mon, 4 Nov 2024 17:36:04 +0400 Subject: [PATCH 08/32] fix: test fill blocks failing after node stop --- core/exchange_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/exchange_test.go b/core/exchange_test.go index 167afb376c..88c50eac77 100644 --- a/core/exchange_test.go +++ b/core/exchange_test.go @@ -63,6 +63,7 @@ func TestCoreExchange_RequestHeaders(t *testing.T) { require.NoError(t, err) assert.True(t, has) } + require.NoError(t, fetcher.Stop(ctx)) } // TestExchange_DoNotStoreHistoric tests that the CoreExchange will not @@ -141,7 +142,9 @@ func fillBlocks( } _, err := cctx.FillBlock(16, cfg.Genesis.Accounts()[0].Name, flags.BroadcastBlock) - require.NoError(t, err) + if err != nil && ctx.Err() == nil { + require.NoError(t, err) + } } } From a56859879e8633d99cc82dc06760024bd63038f1 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Mon, 4 Nov 2024 17:51:00 +0400 Subject: [PATCH 09/32] chore: use commits for deps --- go.mod | 5 ++--- go.sum | 10 ++++------ 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index 7fe92b8be6..7b701761e9 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,6 @@ require ( github.com/gorilla/mux v1.8.1 github.com/grafana/otel-profiling-go v0.5.1 github.com/grafana/pyroscope-go v1.1.2 - github.com/hashicorp/go-retryablehttp v0.7.7 github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/imdario/mergo v0.3.16 github.com/ipfs/boxo v0.24.0 @@ -354,10 +353,10 @@ require ( ) replace ( - github.com/cosmos/cosmos-sdk => ../cosmos-sdk + github.com/cosmos/cosmos-sdk => github.com/celestiaorg/cosmos-sdk v1.25.0-sdk-v0.46.16.0.20241009080849-0cd36203b354 github.com/filecoin-project/dagstore => github.com/celestiaorg/dagstore v0.0.0-20230824094345-537c012aa403 github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1 // broken goleveldb needs to be replaced for the cosmos-sdk and celestia-app github.com/syndtr/goleveldb => github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 - github.com/tendermint/tendermint => ../celestia-core + github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35.0.20241104132041-3d2b32b7ddf2 ) diff --git a/go.sum b/go.sum index 4ac7d6a65e..657d588b15 100644 --- a/go.sum +++ b/go.sum @@ -347,6 +347,10 @@ github.com/celestiaorg/blobstream-contracts/v3 v3.1.0 h1:h1Y4V3EMQ2mFmNtWt2sIhZI github.com/celestiaorg/blobstream-contracts/v3 v3.1.0/go.mod h1:x4DKyfKOSv1ZJM9NwV+Pw01kH2CD7N5zTFclXIVJ6GQ= github.com/celestiaorg/celestia-app/v3 v3.0.0-rc0 h1:cfZVxldi5u/vGZPdFvW95quUmcg307v44PndjYwEOR4= github.com/celestiaorg/celestia-app/v3 v3.0.0-rc0/go.mod h1:K8U6TRHgofz0y5UcvlOL+CuNLbx4jeZrZF7HZdf+Rgs= +github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35.0.20241104132041-3d2b32b7ddf2 h1:V0Dga4zDWv6e8xl2nOXWTZr8QFbTXLasZGrziBYnygY= +github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35.0.20241104132041-3d2b32b7ddf2/go.mod h1:bFr0lAGwaJ0mOHSBmib5/ca5pbBf1yKWGPs93Td0HPw= +github.com/celestiaorg/cosmos-sdk v1.25.0-sdk-v0.46.16.0.20241009080849-0cd36203b354 h1:3lqz1cEs0wx1PWQQezEaYttAMYMsdxU677Jh58keyyc= +github.com/celestiaorg/cosmos-sdk v1.25.0-sdk-v0.46.16.0.20241009080849-0cd36203b354/go.mod h1:xDj0DMkHYv8u4oMOV88QU7g2f9nbHjVYlJ3S/6HeTEs= github.com/celestiaorg/go-fraud v0.2.1 h1:oYhxI0gM/EpGRgbVQdRI/LSlqyT65g/WhQGSVGfx09w= github.com/celestiaorg/go-fraud v0.2.1/go.mod h1:lNY1i4K6kUeeE60Z2VK8WXd+qXb8KRzfBhvwPkK6aUc= github.com/celestiaorg/go-header v0.6.3 h1:VI+fsNxFLeUS7cNn0LgHP6Db66uslnKp/fgMg5nxqHg= @@ -594,8 +598,6 @@ github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 h1:7HZCaLC5+BZpm github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4/go.mod h1:5tD+neXqOorC30/tWg0LCSkrqj/AR6gu8yY8/fpw1q0= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= -github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/filecoin-project/go-jsonrpc v0.6.0 h1:/fFJIAN/k6EgY90m7qbyfY28woMwyseZmh2gVs5sYjY= @@ -913,8 +915,6 @@ github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9n github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= github.com/hashicorp/go-getter v1.7.5 h1:dT58k9hQ/vbxNMwoI5+xFYAJuv6152UNvdHokfI5wE4= github.com/hashicorp/go-getter v1.7.5/go.mod h1:W7TalhMmbPmsSMdNjD0ZskARur/9GJ17cfHTRtXV744= -github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k= -github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= github.com/hashicorp/go-immutable-radix v1.3.1 h1:DKHmCUm2hRBK510BaiZlwvpD40f8bJFeZnpfm2KLowc= github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= @@ -923,8 +923,6 @@ github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHh github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= -github.com/hashicorp/go-retryablehttp v0.7.7 h1:C8hUCYzor8PIfXHa4UrZkU4VvK8o9ISHxT2Q8+VepXU= -github.com/hashicorp/go-retryablehttp v0.7.7/go.mod h1:pkQpWZeYWskR+D1tR2O5OcBFOxfA7DoAO6xtkuQnHTk= github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU= github.com/hashicorp/go-safetemp v1.0.0 h1:2HR189eFNrjHQyENnQMMpCiBAsRxzbTMIgBhEyExpmo= github.com/hashicorp/go-safetemp v1.0.0/go.mod h1:oaerMy3BhqiTbVye6QuFhFtIceqFoDHxNAB65b+Rj1I= From 291952badf6bf224be33468beb9bb2e19548fa74 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Mon, 4 Nov 2024 17:55:26 +0400 Subject: [PATCH 10/32] chore: fmt --- core/client.go | 1 + core/fetcher.go | 6 +++--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/core/client.go b/core/client.go index 18ca64101b..40d0d952a0 100644 --- a/core/client.go +++ b/core/client.go @@ -2,6 +2,7 @@ package core import ( "fmt" + coregrpc "github.com/tendermint/tendermint/rpc/grpc" ) diff --git a/core/fetcher.go b/core/fetcher.go index c8e556356c..1002f6bdf4 100644 --- a/core/fetcher.go +++ b/core/fetcher.go @@ -3,13 +3,13 @@ package core import ( "context" "fmt" - "github.com/gogo/protobuf/proto" - tmproto "github.com/tendermint/tendermint/proto/tendermint/types" - coregrpc "github.com/tendermint/tendermint/rpc/grpc" "io" + "github.com/gogo/protobuf/proto" logging "github.com/ipfs/go-log/v2" + tmproto "github.com/tendermint/tendermint/proto/tendermint/types" coretypes "github.com/tendermint/tendermint/rpc/core/types" + coregrpc "github.com/tendermint/tendermint/rpc/grpc" "github.com/tendermint/tendermint/types" libhead "github.com/celestiaorg/go-header" From 3087a3a9f657a9cfe2ed4663c3d6dd4910d5c86c Mon Sep 17 00:00:00 2001 From: sweexordious Date: Tue, 5 Nov 2024 13:20:40 +0400 Subject: [PATCH 11/32] chore: fix tests --- nodebuilder/node_test.go | 29 +++++++++++++++++++++++++---- nodebuilder/tests/swamp/swamp.go | 14 +++++++++++--- 2 files changed, 36 insertions(+), 7 deletions(-) diff --git a/nodebuilder/node_test.go b/nodebuilder/node_test.go index bcf5f01942..5dce6acbf1 100644 --- a/nodebuilder/node_test.go +++ b/nodebuilder/node_test.go @@ -4,6 +4,8 @@ package nodebuilder import ( "context" + "github.com/celestiaorg/celestia-node/core" + "net" "net/http" "net/http/httptest" "strconv" @@ -29,7 +31,16 @@ func TestLifecycle(t *testing.T) { for i, tt := range test { t.Run(strconv.Itoa(i), func(t *testing.T) { - node := TestNode(t, tt.tp) + // we're also creating a test node because the gRPC connection + // is started automatically when starting the node. + host, port, err := net.SplitHostPort(core.StartTestNode(t).GRPCClient.Target()) + require.NoError(t, err) + + cfg := DefaultConfig(tt.tp) + cfg.Core.IP = host + cfg.Core.GRPCPort = port + + node := TestNodeWithConfig(t, tt.tp, cfg) require.NotNil(t, node) require.NotNil(t, node.Config) require.NotNil(t, node.Host) @@ -41,7 +52,7 @@ func TestLifecycle(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := node.Start(ctx) + err = node.Start(ctx) require.NoError(t, err) err = node.Stop(ctx) @@ -67,9 +78,19 @@ func TestLifecycle_WithMetrics(t *testing.T) { for i, tt := range test { t.Run(strconv.Itoa(i), func(t *testing.T) { - node := TestNode( + // we're also creating a test node because the gRPC connection + // is started automatically when starting the node. + host, port, err := net.SplitHostPort(core.StartTestNode(t).GRPCClient.Target()) + require.NoError(t, err) + + cfg := DefaultConfig(tt.tp) + cfg.Core.IP = host + cfg.Core.GRPCPort = port + + node := TestNodeWithConfig( t, tt.tp, + cfg, WithMetrics( []otlpmetrichttp.Option{ otlpmetrichttp.WithEndpoint(otelCollectorURL), @@ -88,7 +109,7 @@ func TestLifecycle_WithMetrics(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - err := node.Start(ctx) + err = node.Start(ctx) require.NoError(t, err) err = node.Stop(ctx) diff --git a/nodebuilder/tests/swamp/swamp.go b/nodebuilder/tests/swamp/swamp.go index 6c8404b513..f3299ca80c 100644 --- a/nodebuilder/tests/swamp/swamp.go +++ b/nodebuilder/tests/swamp/swamp.go @@ -180,9 +180,9 @@ func (s *Swamp) setupGenesis() { require.NoError(s.t, err) host, port, err := net.SplitHostPort(s.ClientContext.GRPCClient.Target()) - require.NoError(t, err) + require.NoError(s.t, err) blockAPIClient, err := core.NewRemote(host, port) - require.NoError(t, err) + require.NoError(s.t, err) fetcher := core.NewBlockFetcher(blockAPIClient) ex, err := core.NewExchange( @@ -284,8 +284,16 @@ func (s *Swamp) NewNodeWithStore( switch tp { case node.Bridge: + host, port, err := net.SplitHostPort(s.ClientContext.GRPCClient.Target()) + if err != nil { + return nil, err + } + blockAPIClient, err := core.NewRemote(host, port) + if err != nil { + return nil, err + } options = append(options, - coremodule.WithClient(s.ClientContext.Client), + coremodule.WithClient(blockAPIClient), ) default: } From cc81362cfe38edef9195f421c137d30f55400f23 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Tue, 5 Nov 2024 13:22:46 +0400 Subject: [PATCH 12/32] chore: fmt --- nodebuilder/node_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nodebuilder/node_test.go b/nodebuilder/node_test.go index 5dce6acbf1..2f5274ceaa 100644 --- a/nodebuilder/node_test.go +++ b/nodebuilder/node_test.go @@ -4,7 +4,6 @@ package nodebuilder import ( "context" - "github.com/celestiaorg/celestia-node/core" "net" "net/http" "net/http/httptest" @@ -17,6 +16,7 @@ import ( collectormetricpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" "google.golang.org/protobuf/proto" + "github.com/celestiaorg/celestia-node/core" "github.com/celestiaorg/celestia-node/nodebuilder/node" ) From 7d89608be47739079fbcee1f07ac4bb34533ab74 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Tue, 5 Nov 2024 13:24:33 +0400 Subject: [PATCH 13/32] chore: fmt --- core/fetcher.go | 19 ++++++++++++------- core/testing.go | 15 --------------- 2 files changed, 12 insertions(+), 22 deletions(-) diff --git a/core/fetcher.go b/core/fetcher.go index 1002f6bdf4..63adcfa47f 100644 --- a/core/fetcher.go +++ b/core/fetcher.go @@ -106,13 +106,12 @@ func (f *BlockFetcher) GetBlockByHash(ctx context.Context, hash libhead.Hash) (* func (f *BlockFetcher) resolveHeight(ctx context.Context, height *int64) (int64, error) { if height != nil { return *height, nil - } else { - status, err := f.client.Status(ctx, &coregrpc.StatusRequest{}) - if err != nil { - return 0, err - } - return status.SyncInfo.LatestBlockHeight, nil } + status, err := f.client.Status(ctx, &coregrpc.StatusRequest{}) + if err != nil { + return 0, err + } + return status.SyncInfo.LatestBlockHeight, nil } // GetSignedBlock queries Core for a `Block` at the given height. @@ -250,7 +249,13 @@ func (f *BlockFetcher) IsSyncing(ctx context.Context) (bool, error) { return resp.SyncInfo.CatchingUp, nil } -func receiveBlockByHeight(streamer coregrpc.BlockAPI_BlockByHeightClient) (*types.Block, *types.BlockMeta, *types.Commit, *types.ValidatorSet, error) { +func receiveBlockByHeight(streamer coregrpc.BlockAPI_BlockByHeightClient) ( + *types.Block, + *types.BlockMeta, + *types.Commit, + *types.ValidatorSet, + error, +) { parts := make([]*tmproto.Part, 0) // receive the first part to get the block meta, commit, and validator set diff --git a/core/testing.go b/core/testing.go index 0544339c42..d4b5f6334b 100644 --- a/core/testing.go +++ b/core/testing.go @@ -1,12 +1,9 @@ package core import ( - "net" - "net/url" "testing" "time" - tmconfig "github.com/tendermint/tendermint/config" tmrand "github.com/tendermint/tendermint/libs/rand" "github.com/celestiaorg/celestia-app/v3/test/util/genesis" @@ -55,18 +52,6 @@ func StartTestNodeWithConfig(t *testing.T, cfg *testnode.Config) testnode.Contex return cctx } -func getEndpoint(cfg *tmconfig.Config) (string, string, error) { - url, err := url.Parse(cfg.RPC.ListenAddress) - if err != nil { - return "", "", err - } - host, _, err := net.SplitHostPort(url.Host) - if err != nil { - return "", "", err - } - return host, url.Port(), nil -} - // generateRandomAccounts generates n random account names. func generateRandomAccounts(n int) []string { accounts := make([]string, n) From 51096af579bfc4814887a891fdc3b76389e2b53b Mon Sep 17 00:00:00 2001 From: sweexordious Date: Thu, 7 Nov 2024 01:12:17 +0400 Subject: [PATCH 14/32] chore: memory optimisation --- core/fetcher.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/core/fetcher.go b/core/fetcher.go index 63adcfa47f..6673db3632 100644 --- a/core/fetcher.go +++ b/core/fetcher.go @@ -308,11 +308,14 @@ func receiveBlockByHash(streamer coregrpc.BlockAPI_BlockByHashClient) (*types.Bl return partsToBlock(parts) } +// partsToBlock takes a slice of parts and generates the corresponding block. +// It empties the slice to optimize the memory usage. +// TODO(@rach-id): decide whether to keep the memory optimisation func partsToBlock(parts []*tmproto.Part) (*types.Block, error) { partSet := types.NewPartSetFromHeader(types.PartSetHeader{ Total: uint32(len(parts)), }) - for _, part := range parts { + for i, part := range parts { ok, err := partSet.AddPartWithoutProof(&types.Part{Index: part.Index, Bytes: part.Bytes}) if err != nil { return nil, err @@ -320,16 +323,22 @@ func partsToBlock(parts []*tmproto.Part) (*types.Block, error) { if !ok { return nil, err } + // free up memory by clearing reference + parts[i] = nil } pbb := new(tmproto.Block) bz, err := io.ReadAll(partSet.GetReader()) if err != nil { return nil, err } + // free up memory by clearing reference + partSet = nil err = proto.Unmarshal(bz, pbb) if err != nil { return nil, err } + // free up memory by clearing reference + bz = nil block, err := types.BlockFromProto(pbb) if err != nil { return nil, err From fe541379b63f20c9ae74e50905c2a676a78e3a05 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Fri, 8 Nov 2024 13:43:08 +0400 Subject: [PATCH 15/32] chore: rename to first part --- core/fetcher.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/core/fetcher.go b/core/fetcher.go index 6673db3632..522ef67641 100644 --- a/core/fetcher.go +++ b/core/fetcher.go @@ -17,6 +17,9 @@ import ( const newBlockSubscriber = "NewBlock/Events" +type SignedBlock struct { +} + var ( log = logging.Logger("core") newDataSignedBlockQuery = types.QueryForEvent(types.EventSignedBlock).String() @@ -259,26 +262,26 @@ func receiveBlockByHeight(streamer coregrpc.BlockAPI_BlockByHeightClient) ( parts := make([]*tmproto.Part, 0) // receive the first part to get the block meta, commit, and validator set - resp, err := streamer.Recv() + firstPart, err := streamer.Recv() if err != nil { return nil, nil, nil, nil, err } - blockMeta, err := types.BlockMetaFromProto(resp.BlockMeta) + blockMeta, err := types.BlockMetaFromProto(firstPart.BlockMeta) if err != nil { return nil, nil, nil, nil, err } - commit, err := types.CommitFromProto(resp.Commit) + commit, err := types.CommitFromProto(firstPart.Commit) if err != nil { return nil, nil, nil, nil, err } - validatorSet, err := types.ValidatorSetFromProto(resp.ValidatorSet) + validatorSet, err := types.ValidatorSetFromProto(firstPart.ValidatorSet) if err != nil { return nil, nil, nil, nil, err } - parts = append(parts, resp.BlockPart) + parts = append(parts, firstPart.BlockPart) // receive the rest of the block - isLast := resp.IsLast + isLast := firstPart.IsLast for !isLast { resp, err := streamer.Recv() if err != nil { From 62c1dc716f3f7378c780cab2fd9902d2ce642c23 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Fri, 8 Nov 2024 13:46:18 +0400 Subject: [PATCH 16/32] feat!: use a local signed block --- core/exchange.go | 4 ++-- core/fetcher.go | 25 ++++++++++++++----------- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/core/exchange.go b/core/exchange.go index 3f55bbf990..84099e81f6 100644 --- a/core/exchange.go +++ b/core/exchange.go @@ -173,12 +173,12 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64 } log.Debugw("fetched signed block from core", "height", b.Header.Height) - eds, err := extendBlock(b.Data, b.Header.Version.App) + eds, err := extendBlock(*b.Data, b.Header.Version.App) if err != nil { return nil, fmt.Errorf("extending block data for height %d: %w", b.Header.Height, err) } // create extended header - eh, err := ce.construct(&b.Header, &b.Commit, &b.ValidatorSet, eds) + eh, err := ce.construct(b.Header, b.Commit, b.ValidatorSet, eds) if err != nil { panic(fmt.Errorf("constructing extended header for height %d: %w", b.Header.Height, err)) } diff --git a/core/fetcher.go b/core/fetcher.go index 522ef67641..954c90fe37 100644 --- a/core/fetcher.go +++ b/core/fetcher.go @@ -8,7 +8,6 @@ import ( "github.com/gogo/protobuf/proto" logging "github.com/ipfs/go-log/v2" tmproto "github.com/tendermint/tendermint/proto/tendermint/types" - coretypes "github.com/tendermint/tendermint/rpc/core/types" coregrpc "github.com/tendermint/tendermint/rpc/grpc" "github.com/tendermint/tendermint/types" @@ -18,6 +17,10 @@ import ( const newBlockSubscriber = "NewBlock/Events" type SignedBlock struct { + Header *types.Header `json:"header"` + Commit *types.Commit `json:"commit"` + Data *types.Data `json:"data"` + ValidatorSet *types.ValidatorSet `json:"validator_set"` } var ( @@ -119,7 +122,7 @@ func (f *BlockFetcher) resolveHeight(ctx context.Context, height *int64) (int64, // GetSignedBlock queries Core for a `Block` at the given height. // if the height is nil, use the latest height. -func (f *BlockFetcher) GetSignedBlock(ctx context.Context, height *int64) (*coretypes.ResultSignedBlock, error) { +func (f *BlockFetcher) GetSignedBlock(ctx context.Context, height *int64) (*SignedBlock, error) { blockHeight, err := f.resolveHeight(ctx, height) if err != nil { return nil, err @@ -133,11 +136,11 @@ func (f *BlockFetcher) GetSignedBlock(ctx context.Context, height *int64) (*core if err != nil { return nil, err } - return &coretypes.ResultSignedBlock{ - Header: block.Header, - Commit: *commit, - Data: block.Data, - ValidatorSet: *validatorSet, + return &SignedBlock{ + Header: &block.Header, + Commit: commit, + Data: &block.Data, + ValidatorSet: validatorSet, }, nil } @@ -226,10 +229,10 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types } select { case signedBlockCh <- types.EventDataSignedBlock{ - Header: signedBlock.Header, - Commit: signedBlock.Commit, - ValidatorSet: signedBlock.ValidatorSet, - Data: signedBlock.Data, + Header: *signedBlock.Header, + Commit: *signedBlock.Commit, + ValidatorSet: *signedBlock.ValidatorSet, + Data: *signedBlock.Data, }: case <-ctx.Done(): return From 184391b56cc810addda515c0c235ec5f25e4ce7b Mon Sep 17 00:00:00 2001 From: sweexordious Date: Sat, 9 Nov 2024 00:27:19 +0400 Subject: [PATCH 17/32] chore: remove unnecessary select --- core/fetcher.go | 45 ++++++++++++++++++++------------------------- 1 file changed, 20 insertions(+), 25 deletions(-) diff --git a/core/fetcher.go b/core/fetcher.go index 954c90fe37..8e3eaf1397 100644 --- a/core/fetcher.go +++ b/core/fetcher.go @@ -3,13 +3,12 @@ package core import ( "context" "fmt" - "io" - "github.com/gogo/protobuf/proto" logging "github.com/ipfs/go-log/v2" tmproto "github.com/tendermint/tendermint/proto/tendermint/types" coregrpc "github.com/tendermint/tendermint/rpc/grpc" "github.com/tendermint/tendermint/types" + "io" libhead "github.com/celestiaorg/go-header" ) @@ -211,32 +210,28 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types defer close(f.doneCh) defer close(signedBlockCh) for { + resp, err := subscription.Recv() + if err != nil { + // case where the context was not canceled but still received an error + if ctx.Err() == nil { + log.Errorw("fetcher: error receiving new height", "err", err.Error()) + } + return + } + signedBlock, err := f.GetSignedBlock(ctx, &resp.Height) + if err != nil { + log.Errorw("fetcher: error receiving signed block", "height", resp.Height, "err", err.Error()) + return + } select { + case signedBlockCh <- types.EventDataSignedBlock{ + Header: *signedBlock.Header, + Commit: *signedBlock.Commit, + ValidatorSet: *signedBlock.ValidatorSet, + Data: *signedBlock.Data, + }: case <-ctx.Done(): return - default: - resp, err := subscription.Recv() - if err != nil { - if ctx.Err() == nil { - log.Errorw("fetcher: error receiving new height", "err", err.Error()) - } - return - } - signedBlock, err := f.GetSignedBlock(ctx, &resp.Height) - if err != nil { - log.Errorw("fetcher: error receiving signed block", "height", resp.Height, "err", err.Error()) - return - } - select { - case signedBlockCh <- types.EventDataSignedBlock{ - Header: *signedBlock.Header, - Commit: *signedBlock.Commit, - ValidatorSet: *signedBlock.ValidatorSet, - Data: *signedBlock.Data, - }: - case <-ctx.Done(): - return - } } } }() From e11d3336fa7f46baceb73874c2420ef902e2e671 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Sat, 9 Nov 2024 00:27:36 +0400 Subject: [PATCH 18/32] chore: fmt --- core/fetcher.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/fetcher.go b/core/fetcher.go index 8e3eaf1397..51188e1121 100644 --- a/core/fetcher.go +++ b/core/fetcher.go @@ -3,12 +3,13 @@ package core import ( "context" "fmt" + "io" + "github.com/gogo/protobuf/proto" logging "github.com/ipfs/go-log/v2" tmproto "github.com/tendermint/tendermint/proto/tendermint/types" coregrpc "github.com/tendermint/tendermint/rpc/grpc" "github.com/tendermint/tendermint/types" - "io" libhead "github.com/celestiaorg/go-header" ) From b14cb16fb14874d89aa942ac5099e3e0180238da Mon Sep 17 00:00:00 2001 From: sweexordious Date: Mon, 11 Nov 2024 19:39:15 +0400 Subject: [PATCH 19/32] chore: go mod tidy --- go.sum | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/go.sum b/go.sum index 7a1f140965..785ffcd780 100644 --- a/go.sum +++ b/go.sum @@ -347,10 +347,10 @@ github.com/celestiaorg/blobstream-contracts/v3 v3.1.0 h1:h1Y4V3EMQ2mFmNtWt2sIhZI github.com/celestiaorg/blobstream-contracts/v3 v3.1.0/go.mod h1:x4DKyfKOSv1ZJM9NwV+Pw01kH2CD7N5zTFclXIVJ6GQ= github.com/celestiaorg/celestia-app/v3 v3.0.0-mocha h1:9tdQDaNgOfU56BueKq8i0Qte4FRmJJzG7woPTm6HHhk= github.com/celestiaorg/celestia-app/v3 v3.0.0-mocha/go.mod h1:K8U6TRHgofz0y5UcvlOL+CuNLbx4jeZrZF7HZdf+Rgs= -github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35 h1:L4GTm+JUXhB0a/nGPMq6jEqqe6THuYSQ8m2kUCtZYqw= -github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35/go.mod h1:bFr0lAGwaJ0mOHSBmib5/ca5pbBf1yKWGPs93Td0HPw= -github.com/celestiaorg/cosmos-sdk v1.25.0-sdk-v0.46.16 h1:f+fTe7GGk0/qgdzyqB8kk8EcDf9d6MC22khBTQiDXsU= -github.com/celestiaorg/cosmos-sdk v1.25.0-sdk-v0.46.16/go.mod h1:07Z8HJqS8Rw4XlZ+ok3D3NM/X/in8mvcGLvl0Zb5wrA= +github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35.0.20241104132041-3d2b32b7ddf2 h1:V0Dga4zDWv6e8xl2nOXWTZr8QFbTXLasZGrziBYnygY= +github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35.0.20241104132041-3d2b32b7ddf2/go.mod h1:bFr0lAGwaJ0mOHSBmib5/ca5pbBf1yKWGPs93Td0HPw= +github.com/celestiaorg/cosmos-sdk v1.25.0-sdk-v0.46.16.0.20241009080849-0cd36203b354 h1:3lqz1cEs0wx1PWQQezEaYttAMYMsdxU677Jh58keyyc= +github.com/celestiaorg/cosmos-sdk v1.25.0-sdk-v0.46.16.0.20241009080849-0cd36203b354/go.mod h1:xDj0DMkHYv8u4oMOV88QU7g2f9nbHjVYlJ3S/6HeTEs= github.com/celestiaorg/go-fraud v0.2.1 h1:oYhxI0gM/EpGRgbVQdRI/LSlqyT65g/WhQGSVGfx09w= github.com/celestiaorg/go-fraud v0.2.1/go.mod h1:lNY1i4K6kUeeE60Z2VK8WXd+qXb8KRzfBhvwPkK6aUc= github.com/celestiaorg/go-header v0.6.3 h1:VI+fsNxFLeUS7cNn0LgHP6Db66uslnKp/fgMg5nxqHg= @@ -598,8 +598,6 @@ github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4 h1:7HZCaLC5+BZpm github.com/facebookgo/subset v0.0.0-20200203212716-c811ad88dec4/go.mod h1:5tD+neXqOorC30/tWg0LCSkrqj/AR6gu8yY8/fpw1q0= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= -github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM= -github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE= github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/filecoin-project/go-jsonrpc v0.6.0 h1:/fFJIAN/k6EgY90m7qbyfY28woMwyseZmh2gVs5sYjY= @@ -917,8 +915,6 @@ github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9n github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48= github.com/hashicorp/go-getter v1.7.5 h1:dT58k9hQ/vbxNMwoI5+xFYAJuv6152UNvdHokfI5wE4= github.com/hashicorp/go-getter v1.7.5/go.mod h1:W7TalhMmbPmsSMdNjD0ZskARur/9GJ17cfHTRtXV744= -github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k= -github.com/hashicorp/go-hclog v1.6.3/go.mod h1:W4Qnvbt70Wk/zYJryRzDRU/4r0kIg0PVHBcfoyhpF5M= github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= github.com/hashicorp/go-immutable-radix v1.3.1 h1:DKHmCUm2hRBK510BaiZlwvpD40f8bJFeZnpfm2KLowc= github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= @@ -927,8 +923,6 @@ github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHh github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= github.com/hashicorp/go-retryablehttp v0.5.3/go.mod h1:9B5zBasrRhHXnJnui7y6sL7es7NDiJgTc6Er0maI1Xs= -github.com/hashicorp/go-retryablehttp v0.7.7 h1:C8hUCYzor8PIfXHa4UrZkU4VvK8o9ISHxT2Q8+VepXU= -github.com/hashicorp/go-retryablehttp v0.7.7/go.mod h1:pkQpWZeYWskR+D1tR2O5OcBFOxfA7DoAO6xtkuQnHTk= github.com/hashicorp/go-rootcerts v1.0.0/go.mod h1:K6zTfqpRlCUIjkwsN4Z+hiSfzSTQa6eBIzfwKfwNnHU= github.com/hashicorp/go-safetemp v1.0.0 h1:2HR189eFNrjHQyENnQMMpCiBAsRxzbTMIgBhEyExpmo= github.com/hashicorp/go-safetemp v1.0.0/go.mod h1:oaerMy3BhqiTbVye6QuFhFtIceqFoDHxNAB65b+Rj1I= From e51f2370eb35e4fc961a78c0df27fe714a8448ad Mon Sep 17 00:00:00 2001 From: sweexordious Date: Tue, 12 Nov 2024 11:09:47 +0400 Subject: [PATCH 20/32] chore: remove unnecessary =nil --- core/fetcher.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/core/fetcher.go b/core/fetcher.go index 51188e1121..5084cc0c7d 100644 --- a/core/fetcher.go +++ b/core/fetcher.go @@ -317,7 +317,7 @@ func partsToBlock(parts []*tmproto.Part) (*types.Block, error) { partSet := types.NewPartSetFromHeader(types.PartSetHeader{ Total: uint32(len(parts)), }) - for i, part := range parts { + for _, part := range parts { ok, err := partSet.AddPartWithoutProof(&types.Part{Index: part.Index, Bytes: part.Bytes}) if err != nil { return nil, err @@ -325,22 +325,16 @@ func partsToBlock(parts []*tmproto.Part) (*types.Block, error) { if !ok { return nil, err } - // free up memory by clearing reference - parts[i] = nil } pbb := new(tmproto.Block) bz, err := io.ReadAll(partSet.GetReader()) if err != nil { return nil, err } - // free up memory by clearing reference - partSet = nil err = proto.Unmarshal(bz, pbb) if err != nil { return nil, err } - // free up memory by clearing reference - bz = nil block, err := types.BlockFromProto(pbb) if err != nil { return nil, err From 221253f4945d9ed2fce31dae0995085885824c9a Mon Sep 17 00:00:00 2001 From: sweexordious Date: Tue, 12 Nov 2024 11:13:49 +0400 Subject: [PATCH 21/32] chore: fix test --- core/fetcher_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/fetcher_test.go b/core/fetcher_test.go index ecde519dbc..fe02b21073 100644 --- a/core/fetcher_test.go +++ b/core/fetcher_test.go @@ -30,10 +30,10 @@ func TestBlockFetcher_GetBlock_and_SubscribeNewBlockEvent(t *testing.T) { h := newBlockFromChan.Header.Height block, err := fetcher.GetSignedBlock(ctx, &h) require.NoError(t, err) - assert.Equal(t, newBlockFromChan.Data, block.Data) - assert.Equal(t, newBlockFromChan.Header, block.Header) - assert.Equal(t, newBlockFromChan.Commit, block.Commit) - assert.Equal(t, newBlockFromChan.ValidatorSet, block.ValidatorSet) + assert.Equal(t, newBlockFromChan.Data, *block.Data) + assert.Equal(t, newBlockFromChan.Header, *block.Header) + assert.Equal(t, newBlockFromChan.Commit, *block.Commit) + assert.Equal(t, newBlockFromChan.ValidatorSet, *block.ValidatorSet) require.GreaterOrEqual(t, newBlockFromChan.Header.Height, int64(i)) case <-ctx.Done(): require.NoError(t, ctx.Err()) From fe20093b27c1dbf8ee1c3678b8e43aaf301fa238 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Wed, 13 Nov 2024 00:02:54 +0400 Subject: [PATCH 22/32] chore: rename grpc port to port --- nodebuilder/config_test.go | 3 +-- nodebuilder/core/config.go | 14 ++++++------- nodebuilder/core/config_test.go | 18 ++++++++--------- nodebuilder/core/constructors.go | 2 +- nodebuilder/core/flags.go | 4 ++-- nodebuilder/core/flags_test.go | 34 ++++++++++++++++---------------- nodebuilder/node_test.go | 4 ++-- nodebuilder/state/core.go | 2 +- nodebuilder/tests/swamp/swamp.go | 2 +- state/core_access.go | 8 ++++---- 10 files changed, 45 insertions(+), 46 deletions(-) diff --git a/nodebuilder/config_test.go b/nodebuilder/config_test.go index 519466d88e..227fefd3c7 100644 --- a/nodebuilder/config_test.go +++ b/nodebuilder/config_test.go @@ -61,8 +61,7 @@ func TestUpdateConfig(t *testing.T) { var outdatedConfig = ` [Core] IP = "0.0.0.0" - RPCPort = "0" - GRPCPort = "0" + Port = "0" [State] DefaultKeyName = "thisshouldnthavechanged" diff --git a/nodebuilder/core/config.go b/nodebuilder/core/config.go index d60494431f..d13ab45f66 100644 --- a/nodebuilder/core/config.go +++ b/nodebuilder/core/config.go @@ -8,23 +8,23 @@ import ( ) const ( - DefaultGRPCPort = "9090" + DefaultPort = "9090" ) var MetricsEnabled bool // Config combines all configuration fields for managing the relationship with a Core node. type Config struct { - IP string - GRPCPort string + IP string + Port string } // DefaultConfig returns default configuration for managing the // node's connection to a Celestia-Core endpoint. func DefaultConfig() Config { return Config{ - IP: "", - GRPCPort: DefaultGRPCPort, + IP: "", + Port: DefaultPort, } } @@ -34,7 +34,7 @@ func (cfg *Config) Validate() error { return nil } - if cfg.GRPCPort == "" { + if cfg.Port == "" { return fmt.Errorf("nodebuilder/core: grpc port is not set") } @@ -43,7 +43,7 @@ func (cfg *Config) Validate() error { return err } cfg.IP = ip - _, err = strconv.Atoi(cfg.GRPCPort) + _, err = strconv.Atoi(cfg.Port) if err != nil { return fmt.Errorf("nodebuilder/core: invalid grpc port: %s", err.Error()) } diff --git a/nodebuilder/core/config_test.go b/nodebuilder/core/config_test.go index 60ee30262c..90994d6e15 100644 --- a/nodebuilder/core/config_test.go +++ b/nodebuilder/core/config_test.go @@ -15,8 +15,8 @@ func TestValidate(t *testing.T) { { name: "valid config", cfg: Config{ - IP: "127.0.0.1", - GRPCPort: DefaultGRPCPort, + IP: "127.0.0.1", + Port: DefaultPort, }, expectErr: false, }, @@ -28,8 +28,8 @@ func TestValidate(t *testing.T) { { name: "hostname preserved", cfg: Config{ - IP: "celestia.org", - GRPCPort: DefaultGRPCPort, + IP: "celestia.org", + Port: DefaultPort, }, expectErr: false, }, @@ -43,16 +43,16 @@ func TestValidate(t *testing.T) { { name: "invalid IP, but will be accepted as host and not raise an error", cfg: Config{ - IP: "invalid-ip", - GRPCPort: DefaultGRPCPort, + IP: "invalid-ip", + Port: DefaultPort, }, expectErr: false, }, { - name: "invalid GRPC port", + name: "invalid port", cfg: Config{ - IP: "127.0.0.1", - GRPCPort: "invalid-port", + IP: "127.0.0.1", + Port: "invalid-port", }, expectErr: true, }, diff --git a/nodebuilder/core/constructors.go b/nodebuilder/core/constructors.go index ba14f95d0e..f46f8ced02 100644 --- a/nodebuilder/core/constructors.go +++ b/nodebuilder/core/constructors.go @@ -5,5 +5,5 @@ import ( ) func remote(cfg Config) (core.Client, error) { - return core.NewRemote(cfg.IP, cfg.GRPCPort) + return core.NewRemote(cfg.IP, cfg.Port) } diff --git a/nodebuilder/core/flags.go b/nodebuilder/core/flags.go index b68d90d9af..ffe608f54c 100644 --- a/nodebuilder/core/flags.go +++ b/nodebuilder/core/flags.go @@ -25,7 +25,7 @@ func Flags() *flag.FlagSet { ) flags.String( coreGRPCFlag, - DefaultGRPCPort, + DefaultPort, "Set a custom gRPC port for the core node connection. The --core.ip flag must also be provided.", ) return flags @@ -46,7 +46,7 @@ func ParseFlags( if cmd.Flag(coreGRPCFlag).Changed { grpc := cmd.Flag(coreGRPCFlag).Value.String() - cfg.GRPCPort = grpc + cfg.Port = grpc } cfg.IP = coreIP diff --git a/nodebuilder/core/flags_test.go b/nodebuilder/core/flags_test.go index a2cdccc309..a1a11a9a23 100644 --- a/nodebuilder/core/flags_test.go +++ b/nodebuilder/core/flags_test.go @@ -20,8 +20,8 @@ func TestParseFlags(t *testing.T) { args: []string{}, inputCfg: Config{}, expectedCfg: Config{ - IP: "", - GRPCPort: "", + IP: "", + Port: "", }, expectError: false, }, @@ -29,11 +29,11 @@ func TestParseFlags(t *testing.T) { name: "only core.ip", args: []string{"--core.ip=127.0.0.1"}, inputCfg: Config{ - GRPCPort: DefaultGRPCPort, + Port: DefaultPort, }, expectedCfg: Config{ - IP: "127.0.0.1", - GRPCPort: DefaultGRPCPort, + IP: "127.0.0.1", + Port: DefaultPort, }, expectError: false, }, @@ -42,8 +42,8 @@ func TestParseFlags(t *testing.T) { args: []string{"--core.ip=127.0.0.1"}, inputCfg: Config{}, expectedCfg: Config{ - IP: "127.0.0.1", - GRPCPort: DefaultGRPCPort, + IP: "127.0.0.1", + Port: DefaultPort, }, expectError: true, }, @@ -51,12 +51,12 @@ func TestParseFlags(t *testing.T) { name: "no flags, values from input config.toml ", args: []string{}, inputCfg: Config{ - IP: "127.162.36.1", - GRPCPort: "5678", + IP: "127.162.36.1", + Port: "5678", }, expectedCfg: Config{ - IP: "127.162.36.1", - GRPCPort: "5678", + IP: "127.162.36.1", + Port: "5678", }, expectError: false, }, @@ -64,11 +64,11 @@ func TestParseFlags(t *testing.T) { name: "only core.ip, with config.toml overridden defaults for ports", args: []string{"--core.ip=127.0.0.1"}, inputCfg: Config{ - GRPCPort: "5678", + Port: "5678", }, expectedCfg: Config{ - IP: "127.0.0.1", - GRPCPort: "5678", + IP: "127.0.0.1", + Port: "5678", }, expectError: false, }, @@ -76,11 +76,11 @@ func TestParseFlags(t *testing.T) { name: "core.ip and core.grpc.port", args: []string{"--core.ip=127.0.0.1", "--core.grpc.port=54321"}, inputCfg: Config{ - GRPCPort: DefaultGRPCPort, + Port: DefaultPort, }, expectedCfg: Config{ - IP: "127.0.0.1", - GRPCPort: "54321", + IP: "127.0.0.1", + Port: "54321", }, expectError: false, }, diff --git a/nodebuilder/node_test.go b/nodebuilder/node_test.go index 2f5274ceaa..d78be08903 100644 --- a/nodebuilder/node_test.go +++ b/nodebuilder/node_test.go @@ -38,7 +38,7 @@ func TestLifecycle(t *testing.T) { cfg := DefaultConfig(tt.tp) cfg.Core.IP = host - cfg.Core.GRPCPort = port + cfg.Core.Port = port node := TestNodeWithConfig(t, tt.tp, cfg) require.NotNil(t, node) @@ -85,7 +85,7 @@ func TestLifecycle_WithMetrics(t *testing.T) { cfg := DefaultConfig(tt.tp) cfg.Core.IP = host - cfg.Core.GRPCPort = port + cfg.Core.Port = port node := TestNodeWithConfig( t, diff --git a/nodebuilder/state/core.go b/nodebuilder/state/core.go index 39ab732368..c7c89b3ae2 100644 --- a/nodebuilder/state/core.go +++ b/nodebuilder/state/core.go @@ -30,7 +30,7 @@ func coreAccessor( *modfraud.ServiceBreaker[*state.CoreAccessor, *header.ExtendedHeader], error, ) { - ca, err := state.NewCoreAccessor(keyring, string(keyname), sync, corecfg.IP, corecfg.GRPCPort, + ca, err := state.NewCoreAccessor(keyring, string(keyname), sync, corecfg.IP, corecfg.Port, network.String(), opts...) sBreaker := &modfraud.ServiceBreaker[*state.CoreAccessor, *header.ExtendedHeader]{ diff --git a/nodebuilder/tests/swamp/swamp.go b/nodebuilder/tests/swamp/swamp.go index f3299ca80c..2fdaf825a4 100644 --- a/nodebuilder/tests/swamp/swamp.go +++ b/nodebuilder/tests/swamp/swamp.go @@ -205,7 +205,7 @@ func (s *Swamp) DefaultTestConfig(tp node.Type) *nodebuilder.Config { require.NoError(s.t, err) cfg.Core.IP = ip - cfg.Core.GRPCPort = port + cfg.Core.Port = port return cfg } diff --git a/state/core_access.go b/state/core_access.go index a363577b1c..cdbe03f775 100644 --- a/state/core_access.go +++ b/state/core_access.go @@ -69,7 +69,7 @@ type CoreAccessor struct { coreConn *grpc.ClientConn coreIP string - grpcPort string + port string network string // these fields are mutatable and thus need to be protected by a mutex @@ -91,7 +91,7 @@ func NewCoreAccessor( keyname string, getter libhead.Head[*header.ExtendedHeader], coreIP, - grpcPort string, + port string, network string, options ...Option, ) (*CoreAccessor, error) { @@ -105,7 +105,7 @@ func NewCoreAccessor( defaultSignerAccount: keyname, getter: getter, coreIP: coreIP, - grpcPort: grpcPort, + port: port, prt: prt, network: network, } @@ -123,7 +123,7 @@ func (ca *CoreAccessor) Start(ctx context.Context) error { ca.ctx, ca.cancel = context.WithCancel(context.Background()) // dial given celestia-core endpoint - endpoint := fmt.Sprintf("%s:%s", ca.coreIP, ca.grpcPort) + endpoint := fmt.Sprintf("%s:%s", ca.coreIP, ca.port) client, err := grpc.NewClient( endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()), From f371c33329d21daf3ee0ebc84d363fcfd778ea82 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Wed, 20 Nov 2024 21:01:07 +0100 Subject: [PATCH 23/32] chore: use net.JoinHostPort as suggested by @cristaloleg --- state/core_access.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/state/core_access.go b/state/core_access.go index cdbe03f775..1c127e20ce 100644 --- a/state/core_access.go +++ b/state/core_access.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "net" "sync" "time" @@ -123,7 +124,7 @@ func (ca *CoreAccessor) Start(ctx context.Context) error { ca.ctx, ca.cancel = context.WithCancel(context.Background()) // dial given celestia-core endpoint - endpoint := fmt.Sprintf("%s:%s", ca.coreIP, ca.port) + endpoint := net.JoinHostPort(ca.coreIP, ca.port) client, err := grpc.NewClient( endpoint, grpc.WithTransportCredentials(insecure.NewCredentials()), From 92c4f942f62503ef41cb40af50a835f463cea216 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Wed, 20 Nov 2024 21:17:46 +0100 Subject: [PATCH 24/32] chore: context timeout as suggested by @walldiss --- core/fetcher.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/core/fetcher.go b/core/fetcher.go index 5084cc0c7d..acfe72b876 100644 --- a/core/fetcher.go +++ b/core/fetcher.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "time" "github.com/gogo/protobuf/proto" logging "github.com/ipfs/go-log/v2" @@ -219,7 +220,9 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types } return } - signedBlock, err := f.GetSignedBlock(ctx, &resp.Height) + withTimeout, ctxCancel := context.WithTimeout(ctx, 10*time.Second) + signedBlock, err := f.GetSignedBlock(withTimeout, &resp.Height) + ctxCancel() if err != nil { log.Errorw("fetcher: error receiving signed block", "height", resp.Height, "err", err.Error()) return From af7f2bedf48e714ffc93b6c326cb5c0fe2f52c03 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Thu, 21 Nov 2024 09:41:34 +0100 Subject: [PATCH 25/32] chore: remove unnecessary comment --- core/fetcher.go | 1 - 1 file changed, 1 deletion(-) diff --git a/core/fetcher.go b/core/fetcher.go index acfe72b876..cb0a3ad566 100644 --- a/core/fetcher.go +++ b/core/fetcher.go @@ -315,7 +315,6 @@ func receiveBlockByHash(streamer coregrpc.BlockAPI_BlockByHashClient) (*types.Bl // partsToBlock takes a slice of parts and generates the corresponding block. // It empties the slice to optimize the memory usage. -// TODO(@rach-id): decide whether to keep the memory optimisation func partsToBlock(parts []*tmproto.Part) (*types.Block, error) { partSet := types.NewPartSetFromHeader(types.PartSetHeader{ Total: uint32(len(parts)), From 7a6257044bf23f4c88e6a243ba8bb31a5da6def4 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Thu, 21 Nov 2024 10:52:59 +0100 Subject: [PATCH 26/32] feat!: remove pointers in requests --- core/fetcher.go | 53 +++++++++----------------------------------- core/fetcher_test.go | 2 +- go.mod | 2 +- go.sum | 4 ++-- 4 files changed, 15 insertions(+), 46 deletions(-) diff --git a/core/fetcher.go b/core/fetcher.go index cb0a3ad566..0e19c96204 100644 --- a/core/fetcher.go +++ b/core/fetcher.go @@ -54,7 +54,7 @@ func (f *BlockFetcher) Stop(ctx context.Context) error { } // GetBlockInfo queries Core for additional block information, like Commit and ValidatorSet. -func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height *int64) (*types.Commit, *types.ValidatorSet, error) { +func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height int64) (*types.Commit, *types.ValidatorSet, error) { commit, err := f.Commit(ctx, height) if err != nil { return nil, nil, fmt.Errorf("core/fetcher: getting commit at height %d: %w", height, err) @@ -65,7 +65,7 @@ func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height *int64) (*types. // commit and getting the latest validator set. Therefore, it is // best to get the validator set at the latest commit's height to // prevent this potential inconsistency. - valSet, err := f.ValidatorSet(ctx, &commit.Height) + valSet, err := f.ValidatorSet(ctx, commit.Height) if err != nil { return nil, nil, fmt.Errorf("core/fetcher: getting validator set at height %d: %w", height, err) } @@ -75,13 +75,8 @@ func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height *int64) (*types. // GetBlock queries Core for a `Block` at the given height. // if the height is nil, use the latest height -func (f *BlockFetcher) GetBlock(ctx context.Context, height *int64) (*types.Block, error) { - blockHeight, err := f.resolveHeight(ctx, height) - if err != nil { - return nil, err - } - - stream, err := f.client.BlockByHeight(ctx, &coregrpc.BlockByHeightRequest{Height: blockHeight}) +func (f *BlockFetcher) GetBlock(ctx context.Context, height int64) (*types.Block, error) { + stream, err := f.client.BlockByHeight(ctx, &coregrpc.BlockByHeightRequest{Height: height}) if err != nil { return nil, err } @@ -108,28 +103,10 @@ func (f *BlockFetcher) GetBlockByHash(ctx context.Context, hash libhead.Hash) (* return block, nil } -// resolveHeight takes a height pointer and returns its value if it's not nil. -// otherwise, returns the latest height. -func (f *BlockFetcher) resolveHeight(ctx context.Context, height *int64) (int64, error) { - if height != nil { - return *height, nil - } - status, err := f.client.Status(ctx, &coregrpc.StatusRequest{}) - if err != nil { - return 0, err - } - return status.SyncInfo.LatestBlockHeight, nil -} - // GetSignedBlock queries Core for a `Block` at the given height. // if the height is nil, use the latest height. -func (f *BlockFetcher) GetSignedBlock(ctx context.Context, height *int64) (*SignedBlock, error) { - blockHeight, err := f.resolveHeight(ctx, height) - if err != nil { - return nil, err - } - - stream, err := f.client.BlockByHeight(ctx, &coregrpc.BlockByHeightRequest{Height: blockHeight}) +func (f *BlockFetcher) GetSignedBlock(ctx context.Context, height int64) (*SignedBlock, error) { + stream, err := f.client.BlockByHeight(ctx, &coregrpc.BlockByHeightRequest{Height: height}) if err != nil { return nil, err } @@ -148,12 +125,8 @@ func (f *BlockFetcher) GetSignedBlock(ctx context.Context, height *int64) (*Sign // Commit queries Core for a `Commit` from the block at // the given height. // If the height is nil, use the latest height. -func (f *BlockFetcher) Commit(ctx context.Context, height *int64) (*types.Commit, error) { - blockHeight, err := f.resolveHeight(ctx, height) - if err != nil { - return nil, err - } - res, err := f.client.Commit(ctx, &coregrpc.CommitRequest{Height: blockHeight}) +func (f *BlockFetcher) Commit(ctx context.Context, height int64) (*types.Commit, error) { + res, err := f.client.Commit(ctx, &coregrpc.CommitRequest{Height: height}) if err != nil { return nil, err } @@ -173,12 +146,8 @@ func (f *BlockFetcher) Commit(ctx context.Context, height *int64) (*types.Commit // ValidatorSet queries Core for the ValidatorSet from the // block at the given height. // If the height is nil, use the latest height. -func (f *BlockFetcher) ValidatorSet(ctx context.Context, height *int64) (*types.ValidatorSet, error) { - blockHeight, err := f.resolveHeight(ctx, height) - if err != nil { - return nil, err - } - res, err := f.client.ValidatorSet(ctx, &coregrpc.ValidatorSetRequest{Height: blockHeight}) +func (f *BlockFetcher) ValidatorSet(ctx context.Context, height int64) (*types.ValidatorSet, error) { + res, err := f.client.ValidatorSet(ctx, &coregrpc.ValidatorSetRequest{Height: height}) if err != nil { return nil, err } @@ -221,7 +190,7 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types return } withTimeout, ctxCancel := context.WithTimeout(ctx, 10*time.Second) - signedBlock, err := f.GetSignedBlock(withTimeout, &resp.Height) + signedBlock, err := f.GetSignedBlock(withTimeout, resp.Height) ctxCancel() if err != nil { log.Errorw("fetcher: error receiving signed block", "height", resp.Height, "err", err.Error()) diff --git a/core/fetcher_test.go b/core/fetcher_test.go index fe02b21073..ea3c78cd16 100644 --- a/core/fetcher_test.go +++ b/core/fetcher_test.go @@ -28,7 +28,7 @@ func TestBlockFetcher_GetBlock_and_SubscribeNewBlockEvent(t *testing.T) { select { case newBlockFromChan := <-newBlockChan: h := newBlockFromChan.Header.Height - block, err := fetcher.GetSignedBlock(ctx, &h) + block, err := fetcher.GetSignedBlock(ctx, h) require.NoError(t, err) assert.Equal(t, newBlockFromChan.Data, *block.Data) assert.Equal(t, newBlockFromChan.Header, *block.Header) diff --git a/go.mod b/go.mod index 68106733c4..c53b10e1ba 100644 --- a/go.mod +++ b/go.mod @@ -358,5 +358,5 @@ replace ( github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1 // broken goleveldb needs to be replaced for the cosmos-sdk and celestia-app github.com/syndtr/goleveldb => github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 - github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35.0.20241104132041-3d2b32b7ddf2 + github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.42.0-tm-v0.34.35.0.20241121094233-6291264239f8 ) diff --git a/go.sum b/go.sum index 785ffcd780..83856d82c1 100644 --- a/go.sum +++ b/go.sum @@ -347,8 +347,8 @@ github.com/celestiaorg/blobstream-contracts/v3 v3.1.0 h1:h1Y4V3EMQ2mFmNtWt2sIhZI github.com/celestiaorg/blobstream-contracts/v3 v3.1.0/go.mod h1:x4DKyfKOSv1ZJM9NwV+Pw01kH2CD7N5zTFclXIVJ6GQ= github.com/celestiaorg/celestia-app/v3 v3.0.0-mocha h1:9tdQDaNgOfU56BueKq8i0Qte4FRmJJzG7woPTm6HHhk= github.com/celestiaorg/celestia-app/v3 v3.0.0-mocha/go.mod h1:K8U6TRHgofz0y5UcvlOL+CuNLbx4jeZrZF7HZdf+Rgs= -github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35.0.20241104132041-3d2b32b7ddf2 h1:V0Dga4zDWv6e8xl2nOXWTZr8QFbTXLasZGrziBYnygY= -github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35.0.20241104132041-3d2b32b7ddf2/go.mod h1:bFr0lAGwaJ0mOHSBmib5/ca5pbBf1yKWGPs93Td0HPw= +github.com/celestiaorg/celestia-core v1.42.0-tm-v0.34.35.0.20241121094233-6291264239f8 h1:yb4wsYRCMFgZDG4VuIuOcscw+A6L+gJrDzoBtaM4BFU= +github.com/celestiaorg/celestia-core v1.42.0-tm-v0.34.35.0.20241121094233-6291264239f8/go.mod h1:/fK0n3ps09t5uErBQe1QZbrE81L81MNUzWpFyWQLDT0= github.com/celestiaorg/cosmos-sdk v1.25.0-sdk-v0.46.16.0.20241009080849-0cd36203b354 h1:3lqz1cEs0wx1PWQQezEaYttAMYMsdxU677Jh58keyyc= github.com/celestiaorg/cosmos-sdk v1.25.0-sdk-v0.46.16.0.20241009080849-0cd36203b354/go.mod h1:xDj0DMkHYv8u4oMOV88QU7g2f9nbHjVYlJ3S/6HeTEs= github.com/celestiaorg/go-fraud v0.2.1 h1:oYhxI0gM/EpGRgbVQdRI/LSlqyT65g/WhQGSVGfx09w= From a863f6b9a9dcc376b085157f7785f07187b68a23 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Thu, 21 Nov 2024 11:21:53 +0100 Subject: [PATCH 27/32] chore: bump version --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index c53b10e1ba..237d1f8259 100644 --- a/go.mod +++ b/go.mod @@ -358,5 +358,5 @@ replace ( github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1 // broken goleveldb needs to be replaced for the cosmos-sdk and celestia-app github.com/syndtr/goleveldb => github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 - github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.42.0-tm-v0.34.35.0.20241121094233-6291264239f8 + github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35.0.20241121101943-d2e0e6e29b52 ) diff --git a/go.sum b/go.sum index 83856d82c1..501bae485e 100644 --- a/go.sum +++ b/go.sum @@ -347,8 +347,8 @@ github.com/celestiaorg/blobstream-contracts/v3 v3.1.0 h1:h1Y4V3EMQ2mFmNtWt2sIhZI github.com/celestiaorg/blobstream-contracts/v3 v3.1.0/go.mod h1:x4DKyfKOSv1ZJM9NwV+Pw01kH2CD7N5zTFclXIVJ6GQ= github.com/celestiaorg/celestia-app/v3 v3.0.0-mocha h1:9tdQDaNgOfU56BueKq8i0Qte4FRmJJzG7woPTm6HHhk= github.com/celestiaorg/celestia-app/v3 v3.0.0-mocha/go.mod h1:K8U6TRHgofz0y5UcvlOL+CuNLbx4jeZrZF7HZdf+Rgs= -github.com/celestiaorg/celestia-core v1.42.0-tm-v0.34.35.0.20241121094233-6291264239f8 h1:yb4wsYRCMFgZDG4VuIuOcscw+A6L+gJrDzoBtaM4BFU= -github.com/celestiaorg/celestia-core v1.42.0-tm-v0.34.35.0.20241121094233-6291264239f8/go.mod h1:/fK0n3ps09t5uErBQe1QZbrE81L81MNUzWpFyWQLDT0= +github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35.0.20241121101943-d2e0e6e29b52 h1:oq2/YZa8EnQGk9dH4ggpFn0dpeevDLIgu/sA08Qonf4= +github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35.0.20241121101943-d2e0e6e29b52/go.mod h1:bFr0lAGwaJ0mOHSBmib5/ca5pbBf1yKWGPs93Td0HPw= github.com/celestiaorg/cosmos-sdk v1.25.0-sdk-v0.46.16.0.20241009080849-0cd36203b354 h1:3lqz1cEs0wx1PWQQezEaYttAMYMsdxU677Jh58keyyc= github.com/celestiaorg/cosmos-sdk v1.25.0-sdk-v0.46.16.0.20241009080849-0cd36203b354/go.mod h1:xDj0DMkHYv8u4oMOV88QU7g2f9nbHjVYlJ3S/6HeTEs= github.com/celestiaorg/go-fraud v0.2.1 h1:oYhxI0gM/EpGRgbVQdRI/LSlqyT65g/WhQGSVGfx09w= From b5a545f1f09d5ba8be2c2466afbfd864739c3913 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Thu, 21 Nov 2024 11:39:04 +0100 Subject: [PATCH 28/32] chore: fix remaining implementation to support removing pointers --- core/exchange.go | 14 +++++--------- core/exchange_test.go | 4 ++-- core/fetcher_no_race_test.go | 4 ++-- core/header_test.go | 4 ++-- 4 files changed, 11 insertions(+), 15 deletions(-) diff --git a/core/exchange.go b/core/exchange.go index 84099e81f6..b3f1132375 100644 --- a/core/exchange.go +++ b/core/exchange.go @@ -60,8 +60,7 @@ func NewExchange( func (ce *Exchange) GetByHeight(ctx context.Context, height uint64) (*header.ExtendedHeader, error) { log.Debugw("requesting header", "height", height) - intHeight := int64(height) - return ce.getExtendedHeaderByHeight(ctx, &intHeight) + return ce.getExtendedHeaderByHeight(ctx, int64(height)) } func (ce *Exchange) GetRangeByHeight( @@ -127,7 +126,7 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende return nil, fmt.Errorf("fetching block by hash %s: %w", hash.String(), err) } - comm, vals, err := ce.fetcher.GetBlockInfo(ctx, &block.Height) + comm, vals, err := ce.fetcher.GetBlockInfo(ctx, block.Height) if err != nil { return nil, fmt.Errorf("fetching block info for height %d: %w", &block.Height, err) } @@ -160,16 +159,13 @@ func (ce *Exchange) Head( _ ...libhead.HeadOption[*header.ExtendedHeader], ) (*header.ExtendedHeader, error) { log.Debug("requesting head") - return ce.getExtendedHeaderByHeight(ctx, nil) + return ce.getExtendedHeaderByHeight(ctx, 0) } -func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64) (*header.ExtendedHeader, error) { +func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height int64) (*header.ExtendedHeader, error) { b, err := ce.fetcher.GetSignedBlock(ctx, height) if err != nil { - if height == nil { - return nil, fmt.Errorf("fetching signed block for head from core: %w", err) - } - return nil, fmt.Errorf("fetching signed block at height %d from core: %w", *height, err) + return nil, fmt.Errorf("fetching signed block at height %d from core: %w", height, err) } log.Debugw("fetched signed block from core", "height", b.Header.Height) diff --git a/core/exchange_test.go b/core/exchange_test.go index 88c50eac77..6cffd62854 100644 --- a/core/exchange_test.go +++ b/core/exchange_test.go @@ -36,7 +36,7 @@ func TestCoreExchange_RequestHeaders(t *testing.T) { // initialize store with genesis block genHeight := int64(1) - genBlock, err := fetcher.GetBlock(ctx, &genHeight) + genBlock, err := fetcher.GetBlock(ctx, genHeight) require.NoError(t, err) genHeader, err := ce.Get(ctx, genBlock.Header.Hash().Bytes()) require.NoError(t, err) @@ -90,7 +90,7 @@ func TestExchange_DoNotStoreHistoric(t *testing.T) { // initialize store with genesis block genHeight := int64(1) - genBlock, err := fetcher.GetBlock(ctx, &genHeight) + genBlock, err := fetcher.GetBlock(ctx, genHeight) require.NoError(t, err) genHeader, err := ce.Get(ctx, genBlock.Header.Hash().Bytes()) require.NoError(t, err) diff --git a/core/fetcher_no_race_test.go b/core/fetcher_no_race_test.go index 3b8a7660e4..c8bdb77aff 100644 --- a/core/fetcher_no_race_test.go +++ b/core/fetcher_no_race_test.go @@ -38,10 +38,10 @@ func TestBlockFetcherHeaderValues(t *testing.T) { require.NoError(t, ctx.Err()) } // get Commit from current height - commit, err := fetcher.Commit(ctx, &h) + commit, err := fetcher.Commit(ctx, h) require.NoError(t, err) // get ValidatorSet from current height - valSet, err := fetcher.ValidatorSet(ctx, &h) + valSet, err := fetcher.ValidatorSet(ctx, h) require.NoError(t, err) // get next block var nextBlock types.EventDataSignedBlock diff --git a/core/header_test.go b/core/header_test.go index 951aa4c943..6fb51cf3f3 100644 --- a/core/header_test.go +++ b/core/header_test.go @@ -32,10 +32,10 @@ func TestMakeExtendedHeaderForEmptyBlock(t *testing.T) { <-sub height := int64(1) - b, err := fetcher.GetBlock(ctx, &height) + b, err := fetcher.GetBlock(ctx, height) require.NoError(t, err) - comm, val, err := fetcher.GetBlockInfo(ctx, &height) + comm, val, err := fetcher.GetBlockInfo(ctx, height) require.NoError(t, err) eds, err := extendBlock(b.Data, b.Header.Version.App) From a10ac73306c81e6dc36a1e4d6770f7cab80fcb7e Mon Sep 17 00:00:00 2001 From: sweexordious Date: Thu, 21 Nov 2024 11:44:44 +0100 Subject: [PATCH 29/32] chore: lint --- header/header.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/header/header.go b/header/header.go index b44447ce42..122b46558f 100644 --- a/header/header.go +++ b/header/header.go @@ -230,7 +230,7 @@ func (eh *ExtendedHeader) UnmarshalBinary(data []byte) error { // Uses tendermint encoder for tendermint compatibility. func (eh *ExtendedHeader) MarshalJSON() ([]byte, error) { // alias the type to avoid going into recursion loop - // because tmjson.Marshal invokes custom json marshalling + // because tmjson.Marshal invokes custom json marshaling type Alias ExtendedHeader return tmjson.Marshal((*Alias)(eh)) } From d4243fa6f9cf29c4a798bc1e2da245f459cc9f13 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Thu, 21 Nov 2024 11:48:09 +0100 Subject: [PATCH 30/32] chore: check if fetcher is listening for new blocks --- core/fetcher.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/core/fetcher.go b/core/fetcher.go index 0e19c96204..bf6ad649d2 100644 --- a/core/fetcher.go +++ b/core/fetcher.go @@ -32,8 +32,9 @@ var ( type BlockFetcher struct { client Client - doneCh chan struct{} - cancel context.CancelFunc + doneCh chan struct{} + cancel context.CancelFunc + isListeningForBlocks bool } // NewBlockFetcher returns a new `BlockFetcher`. @@ -167,9 +168,14 @@ func (f *BlockFetcher) ValidatorSet(ctx context.Context, height int64) (*types.V // SubscribeNewBlockEvent subscribes to new block events from Core, returning // a new block event channel on success. func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types.EventDataSignedBlock, error) { + if f.isListeningForBlocks { + return nil, fmt.Errorf("already subscribed to new blocks") + } ctx, cancel := context.WithCancel(ctx) f.cancel = cancel f.doneCh = make(chan struct{}) + f.isListeningForBlocks = true + defer func() { f.isListeningForBlocks = false }() subscription, err := f.client.SubscribeNewHeights(ctx, &coregrpc.SubscribeNewHeightsRequest{}) if err != nil { From 511068ad5ea3911e7238a924aba9035325897d63 Mon Sep 17 00:00:00 2001 From: sweexordious Date: Fri, 22 Nov 2024 21:51:05 +0100 Subject: [PATCH 31/32] fix: correctly set the is listening for blocks to false --- core/fetcher.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/fetcher.go b/core/fetcher.go index bf6ad649d2..ee2da7e65b 100644 --- a/core/fetcher.go +++ b/core/fetcher.go @@ -175,7 +175,6 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types f.cancel = cancel f.doneCh = make(chan struct{}) f.isListeningForBlocks = true - defer func() { f.isListeningForBlocks = false }() subscription, err := f.client.SubscribeNewHeights(ctx, &coregrpc.SubscribeNewHeightsRequest{}) if err != nil { @@ -186,6 +185,7 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types go func() { defer close(f.doneCh) defer close(signedBlockCh) + defer func() { f.isListeningForBlocks = false }() for { resp, err := subscription.Recv() if err != nil { From a033feb98bbe57abad444a34d63e8f72369fd0cd Mon Sep 17 00:00:00 2001 From: sweexordious Date: Sat, 23 Nov 2024 00:05:02 +0100 Subject: [PATCH 32/32] feat: support stopping the gRPC connection + subscription retry --- core/client.go | 62 +++++++++++++++++++-- core/exchange_test.go | 6 +- core/fetcher.go | 94 ++++++++++++++++++++++++-------- core/fetcher_no_race_test.go | 5 +- core/fetcher_test.go | 5 +- core/header_test.go | 5 +- go.mod | 2 +- go.sum | 4 +- nodebuilder/core/constructors.go | 4 +- nodebuilder/core/module.go | 6 ++ nodebuilder/core/opts.go | 6 +- nodebuilder/node_bridge_test.go | 6 +- nodebuilder/testing.go | 2 +- nodebuilder/tests/swamp/swamp.go | 11 ++-- 14 files changed, 164 insertions(+), 54 deletions(-) diff --git a/core/client.go b/core/client.go index 40d0d952a0..2bef6dba0c 100644 --- a/core/client.go +++ b/core/client.go @@ -4,12 +4,64 @@ import ( "fmt" coregrpc "github.com/tendermint/tendermint/rpc/grpc" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" ) -// Client is an alias to Core Client. -type Client = coregrpc.BlockAPIClient +// Client is a core gRPC client. +type Client struct { + coregrpc.BlockAPIClient + host, port string + conn *grpc.ClientConn +} + +// NewClient creates a new Client that communicates with a remote Core endpoint over gRPC. +// The connection is not started when creating the client. +// Use the Start method to start the connection. +func NewClient(host, port string) *Client { + return &Client{ + host: host, + port: port, + } +} + +// Start created the Client's gRPC connection with optional dial options. +// If the connection is already started, it does nothing. +func (c *Client) Start(opts ...grpc.DialOption) error { + if c.IsRunning() { + return nil + } + if len(opts) == 0 { + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + } + conn, err := grpc.NewClient( + fmt.Sprintf("%s:%s", c.host, c.port), + opts..., + ) + if err != nil { + return err + } + c.conn = conn + + c.BlockAPIClient = coregrpc.NewBlockAPIClient(conn) + return nil +} + +// IsRunning checks if the client's connection is established and ready for use. +// It returns true if the connection is active, false otherwise. +func (c *Client) IsRunning() bool { + return c.conn != nil && c.BlockAPIClient != nil +} -// NewRemote creates a new Client that communicates with a remote Core endpoint over gRPC. -func NewRemote(ip, port string) (Client, error) { - return coregrpc.StartBlockAPIGRPCClient(fmt.Sprintf("tcp://%s:%s", ip, port)) +// Stop terminates the Client's gRPC connection and releases all related resources. +// If the connection is already stopped, it does nothing. +func (c *Client) Stop() error { + if !c.IsRunning() { + return nil + } + defer func() { + c.conn = nil + c.BlockAPIClient = nil + }() + return c.conn.Close() } diff --git a/core/exchange_test.go b/core/exchange_test.go index 6cffd62854..d71e3ffaf7 100644 --- a/core/exchange_test.go +++ b/core/exchange_test.go @@ -122,9 +122,11 @@ func createCoreFetcher(t *testing.T, cfg *testnode.Config) (*BlockFetcher, testn require.NoError(t, err) host, port, err := net.SplitHostPort(cctx.GRPCClient.Target()) require.NoError(t, err) - blockAPIClient, err := NewRemote(host, port) + client := NewClient(host, port) + require.NoError(t, client.Start()) + fetcher, err := NewBlockFetcher(client) require.NoError(t, err) - return NewBlockFetcher(blockAPIClient), cctx + return fetcher, cctx } // fillBlocks fills blocks until the context is canceled. diff --git a/core/fetcher.go b/core/fetcher.go index ee2da7e65b..287c647dc6 100644 --- a/core/fetcher.go +++ b/core/fetcher.go @@ -2,6 +2,7 @@ package core import ( "context" + "errors" "fmt" "io" "time" @@ -30,7 +31,7 @@ var ( ) type BlockFetcher struct { - client Client + client *Client doneCh chan struct{} cancel context.CancelFunc @@ -38,12 +39,14 @@ type BlockFetcher struct { } // NewBlockFetcher returns a new `BlockFetcher`. -func NewBlockFetcher(client Client) *BlockFetcher { +func NewBlockFetcher(client *Client) (*BlockFetcher, error) { return &BlockFetcher{ client: client, - } + }, nil } +// Stop stops the block fetcher. +// The underlying gRPC connection needs to be stopped separately. func (f *BlockFetcher) Stop(ctx context.Context) error { f.cancel() select { @@ -56,6 +59,10 @@ func (f *BlockFetcher) Stop(ctx context.Context) error { // GetBlockInfo queries Core for additional block information, like Commit and ValidatorSet. func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height int64) (*types.Commit, *types.ValidatorSet, error) { + // return error if the client is still not started + if !f.client.IsRunning() { + return nil, nil, errors.New("client not running") + } commit, err := f.Commit(ctx, height) if err != nil { return nil, nil, fmt.Errorf("core/fetcher: getting commit at height %d: %w", height, err) @@ -77,6 +84,10 @@ func (f *BlockFetcher) GetBlockInfo(ctx context.Context, height int64) (*types.C // GetBlock queries Core for a `Block` at the given height. // if the height is nil, use the latest height func (f *BlockFetcher) GetBlock(ctx context.Context, height int64) (*types.Block, error) { + // return error if the client is still not started + if !f.client.IsRunning() { + return nil, errors.New("client not running") + } stream, err := f.client.BlockByHeight(ctx, &coregrpc.BlockByHeightRequest{Height: height}) if err != nil { return nil, err @@ -89,6 +100,10 @@ func (f *BlockFetcher) GetBlock(ctx context.Context, height int64) (*types.Block } func (f *BlockFetcher) GetBlockByHash(ctx context.Context, hash libhead.Hash) (*types.Block, error) { + // return error if the client is still not started + if !f.client.IsRunning() { + return nil, errors.New("client not running") + } if hash == nil { return nil, fmt.Errorf("cannot get block with nil hash") } @@ -107,6 +122,10 @@ func (f *BlockFetcher) GetBlockByHash(ctx context.Context, hash libhead.Hash) (* // GetSignedBlock queries Core for a `Block` at the given height. // if the height is nil, use the latest height. func (f *BlockFetcher) GetSignedBlock(ctx context.Context, height int64) (*SignedBlock, error) { + // return error if the client is still not started + if !f.client.IsRunning() { + return nil, errors.New("client not running") + } stream, err := f.client.BlockByHeight(ctx, &coregrpc.BlockByHeightRequest{Height: height}) if err != nil { return nil, err @@ -127,6 +146,10 @@ func (f *BlockFetcher) GetSignedBlock(ctx context.Context, height int64) (*Signe // the given height. // If the height is nil, use the latest height. func (f *BlockFetcher) Commit(ctx context.Context, height int64) (*types.Commit, error) { + // return error if the client is still not started + if !f.client.IsRunning() { + return nil, errors.New("client not running") + } res, err := f.client.Commit(ctx, &coregrpc.CommitRequest{Height: height}) if err != nil { return nil, err @@ -148,6 +171,10 @@ func (f *BlockFetcher) Commit(ctx context.Context, height int64) (*types.Commit, // block at the given height. // If the height is nil, use the latest height. func (f *BlockFetcher) ValidatorSet(ctx context.Context, height int64) (*types.ValidatorSet, error) { + // return error if the client is still not started + if !f.client.IsRunning() { + return nil, errors.New("client not running") + } res, err := f.client.ValidatorSet(ctx, &coregrpc.ValidatorSetRequest{Height: height}) if err != nil { return nil, err @@ -168,6 +195,10 @@ func (f *BlockFetcher) ValidatorSet(ctx context.Context, height int64) (*types.V // SubscribeNewBlockEvent subscribes to new block events from Core, returning // a new block event channel on success. func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types.EventDataSignedBlock, error) { + // return error if the client is still not started + if !f.client.IsRunning() { + return nil, errors.New("client not running") + } if f.isListeningForBlocks { return nil, fmt.Errorf("already subscribed to new blocks") } @@ -187,30 +218,41 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types defer close(signedBlockCh) defer func() { f.isListeningForBlocks = false }() for { - resp, err := subscription.Recv() - if err != nil { - // case where the context was not canceled but still received an error - if ctx.Err() == nil { - log.Errorw("fetcher: error receiving new height", "err", err.Error()) - } - return - } - withTimeout, ctxCancel := context.WithTimeout(ctx, 10*time.Second) - signedBlock, err := f.GetSignedBlock(withTimeout, resp.Height) - ctxCancel() - if err != nil { - log.Errorw("fetcher: error receiving signed block", "height", resp.Height, "err", err.Error()) - return - } select { - case signedBlockCh <- types.EventDataSignedBlock{ - Header: *signedBlock.Header, - Commit: *signedBlock.Commit, - ValidatorSet: *signedBlock.ValidatorSet, - Data: *signedBlock.Data, - }: case <-ctx.Done(): return + default: + resp, err := subscription.Recv() + if err != nil { + // case where the context was not canceled but still received an error + if ctx.Err() == nil { + log.Errorw("fetcher: error receiving new height", "err", err.Error()) + // sleeping a bit to avoid retrying instantly and give time for the gRPC connection + // to recover automatically. + time.Sleep(time.Second) + } + continue + } + withTimeout, ctxCancel := context.WithTimeout(ctx, 10*time.Second) + signedBlock, err := f.GetSignedBlock(withTimeout, resp.Height) + ctxCancel() + if err != nil { + log.Errorw("fetcher: error receiving signed block", "height", resp.Height, "err", err.Error()) + // sleeping a bit to avoid retrying instantly and give time for the gRPC connection + // to recover automatically. + time.Sleep(time.Second) + continue + } + select { + case signedBlockCh <- types.EventDataSignedBlock{ + Header: *signedBlock.Header, + Commit: *signedBlock.Commit, + ValidatorSet: *signedBlock.ValidatorSet, + Data: *signedBlock.Data, + }: + case <-ctx.Done(): + return + } } } }() @@ -222,6 +264,10 @@ func (f *BlockFetcher) SubscribeNewBlockEvent(ctx context.Context) (<-chan types // syncing, and false for already caught up. It can also return an error // in the case of a failed status request. func (f *BlockFetcher) IsSyncing(ctx context.Context) (bool, error) { + // return error if the client is still not started + if !f.client.IsRunning() { + return false, errors.New("client not running") + } resp, err := f.client.Status(ctx, &coregrpc.StatusRequest{}) if err != nil { return false, err diff --git a/core/fetcher_no_race_test.go b/core/fetcher_no_race_test.go index c8bdb77aff..2f34e4bc05 100644 --- a/core/fetcher_no_race_test.go +++ b/core/fetcher_no_race_test.go @@ -22,9 +22,10 @@ func TestBlockFetcherHeaderValues(t *testing.T) { node := StartTestNode(t) host, port, err := net.SplitHostPort(node.GRPCClient.Target()) require.NoError(t, err) - blockAPIClient, err := NewRemote(host, port) + client := NewClient(host, port) + require.NoError(t, client.Start()) + fetcher, err := NewBlockFetcher(client) require.NoError(t, err) - fetcher := NewBlockFetcher(blockAPIClient) // generate some blocks newBlockChan, err := fetcher.SubscribeNewBlockEvent(ctx) diff --git a/core/fetcher_test.go b/core/fetcher_test.go index ea3c78cd16..14eeab0bd7 100644 --- a/core/fetcher_test.go +++ b/core/fetcher_test.go @@ -16,9 +16,10 @@ func TestBlockFetcher_GetBlock_and_SubscribeNewBlockEvent(t *testing.T) { host, port, err := net.SplitHostPort(StartTestNode(t).GRPCClient.Target()) require.NoError(t, err) - blockAPIClient, err := NewRemote(host, port) + client := NewClient(host, port) + require.NoError(t, client.Start()) + fetcher, err := NewBlockFetcher(client) require.NoError(t, err) - fetcher := NewBlockFetcher(blockAPIClient) // generate some blocks newBlockChan, err := fetcher.SubscribeNewBlockEvent(ctx) diff --git a/core/header_test.go b/core/header_test.go index 6fb51cf3f3..a8a5209f66 100644 --- a/core/header_test.go +++ b/core/header_test.go @@ -23,9 +23,10 @@ func TestMakeExtendedHeaderForEmptyBlock(t *testing.T) { host, port, err := net.SplitHostPort(StartTestNode(t).GRPCClient.Target()) require.NoError(t, err) - blockAPIClient, err := NewRemote(host, port) + client := NewClient(host, port) + require.NoError(t, client.Start()) + fetcher, err := NewBlockFetcher(client) require.NoError(t, err) - fetcher := NewBlockFetcher(blockAPIClient) sub, err := fetcher.SubscribeNewBlockEvent(ctx) require.NoError(t, err) diff --git a/go.mod b/go.mod index 237d1f8259..6df0b02a47 100644 --- a/go.mod +++ b/go.mod @@ -358,5 +358,5 @@ replace ( github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1 // broken goleveldb needs to be replaced for the cosmos-sdk and celestia-app github.com/syndtr/goleveldb => github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 - github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35.0.20241121101943-d2e0e6e29b52 + github.com/tendermint/tendermint => github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35.0.20241122130856-072b5eb2ca43 ) diff --git a/go.sum b/go.sum index 501bae485e..2c2b026736 100644 --- a/go.sum +++ b/go.sum @@ -347,8 +347,8 @@ github.com/celestiaorg/blobstream-contracts/v3 v3.1.0 h1:h1Y4V3EMQ2mFmNtWt2sIhZI github.com/celestiaorg/blobstream-contracts/v3 v3.1.0/go.mod h1:x4DKyfKOSv1ZJM9NwV+Pw01kH2CD7N5zTFclXIVJ6GQ= github.com/celestiaorg/celestia-app/v3 v3.0.0-mocha h1:9tdQDaNgOfU56BueKq8i0Qte4FRmJJzG7woPTm6HHhk= github.com/celestiaorg/celestia-app/v3 v3.0.0-mocha/go.mod h1:K8U6TRHgofz0y5UcvlOL+CuNLbx4jeZrZF7HZdf+Rgs= -github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35.0.20241121101943-d2e0e6e29b52 h1:oq2/YZa8EnQGk9dH4ggpFn0dpeevDLIgu/sA08Qonf4= -github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35.0.20241121101943-d2e0e6e29b52/go.mod h1:bFr0lAGwaJ0mOHSBmib5/ca5pbBf1yKWGPs93Td0HPw= +github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35.0.20241122130856-072b5eb2ca43 h1:ueNQhrkVx/d17j4XoEq5H0JBmTs6KEtmq7ajTfdnNOU= +github.com/celestiaorg/celestia-core v1.43.0-tm-v0.34.35.0.20241122130856-072b5eb2ca43/go.mod h1:bFr0lAGwaJ0mOHSBmib5/ca5pbBf1yKWGPs93Td0HPw= github.com/celestiaorg/cosmos-sdk v1.25.0-sdk-v0.46.16.0.20241009080849-0cd36203b354 h1:3lqz1cEs0wx1PWQQezEaYttAMYMsdxU677Jh58keyyc= github.com/celestiaorg/cosmos-sdk v1.25.0-sdk-v0.46.16.0.20241009080849-0cd36203b354/go.mod h1:xDj0DMkHYv8u4oMOV88QU7g2f9nbHjVYlJ3S/6HeTEs= github.com/celestiaorg/go-fraud v0.2.1 h1:oYhxI0gM/EpGRgbVQdRI/LSlqyT65g/WhQGSVGfx09w= diff --git a/nodebuilder/core/constructors.go b/nodebuilder/core/constructors.go index f46f8ced02..ae598a6029 100644 --- a/nodebuilder/core/constructors.go +++ b/nodebuilder/core/constructors.go @@ -4,6 +4,6 @@ import ( "github.com/celestiaorg/celestia-node/core" ) -func remote(cfg Config) (core.Client, error) { - return core.NewRemote(cfg.IP, cfg.Port) +func remote(cfg Config) *core.Client { + return core.NewClient(cfg.IP, cfg.Port) } diff --git a/nodebuilder/core/module.go b/nodebuilder/core/module.go index 964719e926..61a4e3468c 100644 --- a/nodebuilder/core/module.go +++ b/nodebuilder/core/module.go @@ -76,6 +76,12 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option )), fx.Provide(fx.Annotate( remote, + fx.OnStart(func(_ context.Context, client *core.Client) error { + return client.Start() + }), + fx.OnStop(func(_ context.Context, client *core.Client) error { + return client.Stop() + }), )), ) default: diff --git a/nodebuilder/core/opts.go b/nodebuilder/core/opts.go index 56347a5cb6..6a0bc68d55 100644 --- a/nodebuilder/core/opts.go +++ b/nodebuilder/core/opts.go @@ -8,9 +8,9 @@ import ( "github.com/celestiaorg/celestia-node/libs/fxutil" ) -// WithClient sets custom client for core process -func WithClient(client core.Client) fx.Option { - return fxutil.ReplaceAs(client, new(core.Client)) +// WithClient sets a custom client for core process +func WithClient(client *core.Client) fx.Option { + return fxutil.ReplaceAs(client, new(*core.Client)) } // WithHeaderConstructFn sets custom func that creates extended header diff --git a/nodebuilder/node_bridge_test.go b/nodebuilder/node_bridge_test.go index e45b70ffd6..a4cac93c99 100644 --- a/nodebuilder/node_bridge_test.go +++ b/nodebuilder/node_bridge_test.go @@ -22,9 +22,9 @@ func TestBridge_WithMockedCoreClient(t *testing.T) { host, port, err := net.SplitHostPort(core.StartTestNode(t).GRPCClient.Target()) require.NoError(t, err) - blockAPIClient, err := core.NewRemote(host, port) - require.NoError(t, err) - node, err := New(node.Bridge, p2p.Private, repo, coremodule.WithClient(blockAPIClient)) + client := core.NewClient(host, port) + require.NoError(t, client.Start()) + node, err := New(node.Bridge, p2p.Private, repo, coremodule.WithClient(client)) require.NoError(t, err) require.NotNil(t, node) err = node.Start(ctx) diff --git a/nodebuilder/testing.go b/nodebuilder/testing.go index 0f2e046882..67f82056d5 100644 --- a/nodebuilder/testing.go +++ b/nodebuilder/testing.go @@ -74,7 +74,7 @@ func TestNodeWithConfig(t *testing.T, tp node.Type, cfg *Config, opts ...fx.Opti if tp == node.Bridge { cctx := core.StartTestNode(t) opts = append(opts, - fxutil.ReplaceAs(cctx.Client, new(core.Client)), + fxutil.ReplaceAs(cctx.Client, new(*core.Client)), ) } diff --git a/nodebuilder/tests/swamp/swamp.go b/nodebuilder/tests/swamp/swamp.go index 2fdaf825a4..dd0c7137ce 100644 --- a/nodebuilder/tests/swamp/swamp.go +++ b/nodebuilder/tests/swamp/swamp.go @@ -181,9 +181,10 @@ func (s *Swamp) setupGenesis() { host, port, err := net.SplitHostPort(s.ClientContext.GRPCClient.Target()) require.NoError(s.t, err) - blockAPIClient, err := core.NewRemote(host, port) + client := core.NewClient(host, port) + require.NoError(s.t, client.Start()) + fetcher, err := core.NewBlockFetcher(client) require.NoError(s.t, err) - fetcher := core.NewBlockFetcher(blockAPIClient) ex, err := core.NewExchange( fetcher, @@ -288,12 +289,12 @@ func (s *Swamp) NewNodeWithStore( if err != nil { return nil, err } - blockAPIClient, err := core.NewRemote(host, port) - if err != nil { + client := core.NewClient(host, port) + if err := client.Start(); err != nil { return nil, err } options = append(options, - coremodule.WithClient(blockAPIClient), + coremodule.WithClient(client), ) default: }