diff --git a/block/manager_test.go b/block/manager_test.go index 857f1fe54..6c88fee31 100644 --- a/block/manager_test.go +++ b/block/manager_test.go @@ -53,9 +53,9 @@ func TestInitialState(t *testing.T) { // Init p2p client privKey, _, _ := crypto.GenerateEd25519Key(rand.Reader) p2pClient, err := p2p.NewClient(config.P2PConfig{ - ListenAddress: config.DefaultListenAddress, - GossipedBlocksCacheSize: 50, - BootstrapRetryTime: 30 * time.Second, + ListenAddress: config.DefaultListenAddress, + GossipSubCacheSize: 50, + BootstrapRetryTime: 30 * time.Second, }, privKey, "TestChain", pubsubServer, logger) assert.NoError(err) assert.NotNil(p2pClient) diff --git a/config/config_test.go b/config/config_test.go index f1a78d250..c94098b35 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -220,10 +220,10 @@ func fullNodeConfig() config.NodeConfig { Port: 9090, }, P2PConfig: config.P2PConfig{ - GossipedBlocksCacheSize: 50, - BootstrapRetryTime: 30 * time.Second, - ListenAddress: config.DefaultListenAddress, - BootstrapNodes: "", + GossipSubCacheSize: 50, + BootstrapRetryTime: 30 * time.Second, + ListenAddress: config.DefaultListenAddress, + BootstrapNodes: "", }, } } diff --git a/config/defaults.go b/config/defaults.go index 9a8acb1ff..516f3155d 100644 --- a/config/defaults.go +++ b/config/defaults.go @@ -37,11 +37,11 @@ func DefaultConfig(home, chainId string) *NodeConfig { PrometheusListenAddr: ":2112", }, P2PConfig: P2PConfig{ - GossipedBlocksCacheSize: 50, - BootstrapRetryTime: 30 * time.Second, - ListenAddress: DefaultListenAddress, - BootstrapNodes: "", - AdvertisingEnabled: true, + GossipSubCacheSize: 50, + BootstrapRetryTime: 30 * time.Second, + ListenAddress: DefaultListenAddress, + BootstrapNodes: "", + AdvertisingEnabled: true, }, DBConfig: DBConfig{ SyncWrites: true, diff --git a/config/flags.go b/config/flags.go index 8d6ab1d9a..816563ab3 100644 --- a/config/flags.go +++ b/config/flags.go @@ -62,7 +62,7 @@ func AddNodeFlags(cmd *cobra.Command) { cmd.Flags().String(FlagP2PListenAddress, def.P2PConfig.ListenAddress, "P2P listen address") cmd.Flags().String(FlagP2PBootstrapNodes, def.P2PConfig.BootstrapNodes, "P2P bootstrap nodes") cmd.Flags().Duration(FlagP2PBootstrapRetryTime, def.P2PConfig.BootstrapRetryTime, "P2P bootstrap time") - cmd.Flags().Uint64(FlagP2PGossipCacheSize, uint64(def.P2PConfig.GossipedBlocksCacheSize), "P2P Gossiped blocks cache size") + cmd.Flags().Uint64(FlagP2PGossipCacheSize, uint64(def.P2PConfig.GossipSubCacheSize), "P2P Gossiped blocks cache size") } func BindDymintFlags(cmd *cobra.Command, v *viper.Viper) error { diff --git a/config/p2p.go b/config/p2p.go index 24fcc2443..31ef7bec5 100644 --- a/config/p2p.go +++ b/config/p2p.go @@ -14,7 +14,7 @@ type P2PConfig struct { // List of nodes persistent P2P nodes PersistentNodes string `mapstructure:"p2p_persistent_nodes"` // Size of the Gossipsub router cache - GossipedBlocksCacheSize int `mapstructure:"p2p_gossiped_blocks_cache_size"` + GossipSubCacheSize int `mapstructure:"p2p_gossip_cache_size"` // Time interval a node tries to bootstrap again, in case no nodes connected BootstrapRetryTime time.Duration `mapstructure:"p2p_bootstrap_retry_time"` // Param used to enable the advertisement of the node to be part of the P2P network in the DHT @@ -23,7 +23,7 @@ type P2PConfig struct { // Validate P2PConfig func (c P2PConfig) Validate() error { - if c.GossipedBlocksCacheSize < 0 { + if c.GossipSubCacheSize < 0 { return fmt.Errorf("gossipsub cache size cannot be negative") } if c.BootstrapRetryTime <= 0 { diff --git a/config/toml.go b/config/toml.go index 5d289ae88..43af9946c 100644 --- a/config/toml.go +++ b/config/toml.go @@ -102,7 +102,7 @@ p2p_bootstrap_nodes = "{{ .P2PConfig.BootstrapNodes }}" p2p_persistent_nodes = "{{ .P2PConfig.PersistentNodes }}" # max number of cached messages by gossipsub protocol -p2p_gossiped_blocks_cache_size = {{ .P2PConfig.GossipedBlocksCacheSize }} +p2p_gossip_cache_size = {{ .P2PConfig.GossipSubCacheSize }} # time interval to check if no p2p nodes are connected to bootstrap again p2p_bootstrap_retry_time = "{{ .P2PConfig.BootstrapRetryTime }}" diff --git a/node/node_test.go b/node/node_test.go index 697dc9bc7..448ba7ff6 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -65,10 +65,10 @@ func TestMempoolDirectly(t *testing.T) { RootDir: "", DBPath: "", P2PConfig: config.P2PConfig{ - ListenAddress: config.DefaultListenAddress, - GossipedBlocksCacheSize: 50, - BootstrapRetryTime: 30 * time.Second, - BootstrapNodes: "", + ListenAddress: config.DefaultListenAddress, + GossipSubCacheSize: 50, + BootstrapRetryTime: 30 * time.Second, + BootstrapNodes: "", }, RPC: config.RPCConfig{}, MempoolConfig: *tmcfg.DefaultMempoolConfig(), diff --git a/p2p/client.go b/p2p/client.go index 70e7ee084..f5d26f774 100644 --- a/p2p/client.go +++ b/p2p/client.go @@ -322,10 +322,9 @@ func (c *Client) tryConnect(ctx context.Context, peer peer.AddrInfo) { } func (c *Client) setupGossiping(ctx context.Context) error { - pubsub.GossipSubHistoryGossip = c.conf.GossipedBlocksCacheSize - pubsub.GossipSubHistoryLength = c.conf.GossipedBlocksCacheSize + pubsub.GossipSubHistoryGossip = c.conf.GossipSubCacheSize + pubsub.GossipSubHistoryLength = c.conf.GossipSubCacheSize - // We add WithSeenMessagesTTL (with 1 year time) option to avoid ever requesting already seen blocks ps, err := pubsub.NewGossipSub(ctx, c.Host) if err != nil { return err diff --git a/p2p/client_test.go b/p2p/client_test.go index c519996c5..ddf1c0944 100644 --- a/p2p/client_test.go +++ b/p2p/client_test.go @@ -27,9 +27,9 @@ func TestClientStartup(t *testing.T) { err := pubsubServer.Start() require.NoError(t, err) client, err := p2p.NewClient(config.P2PConfig{ - ListenAddress: config.DefaultListenAddress, - GossipedBlocksCacheSize: 50, - BootstrapRetryTime: 30 * time.Second, + ListenAddress: config.DefaultListenAddress, + GossipSubCacheSize: 50, + BootstrapRetryTime: 30 * time.Second, }, privKey, "TestChain", pubsubServer, log.TestingLogger()) assert := assert.New(t) assert.NoError(err) @@ -179,8 +179,8 @@ func TestSeedStringParsing(t *testing.T) { require := require.New(t) logger := &testutil.MockLogger{} client, err := p2p.NewClient(config.P2PConfig{ - GossipedBlocksCacheSize: 50, - BootstrapRetryTime: 30 * time.Second, + GossipSubCacheSize: 50, + BootstrapRetryTime: 30 * time.Second, }, privKey, "TestNetwork", pubsubServer, logger) require.NoError(err) require.NotNil(client) diff --git a/p2p/gossip.go b/p2p/gossip.go index d4e19b479..21ea9ad6c 100644 --- a/p2p/gossip.go +++ b/p2p/gossip.go @@ -13,6 +13,9 @@ import ( "github.com/dymensionxyz/dymint/types" ) +// buffer size used by gossipSub router to consume received packets (blocks or txs). packets are dropped in case buffer overflows. in case of blocks, it can buffer up to 5 minutes (assuming 200ms block rate) +const pubsubBufferSize = 3000 + // GossipMessage represents message gossiped via P2P network (e.g. transaction, Block etc). type GossipMessage struct { Data []byte @@ -50,8 +53,7 @@ func NewGossiper(host host.Host, ps *pubsub.PubSub, topicStr string, msgHandler if err != nil { return nil, err } - - subscription, err := topic.Subscribe() + subscription, err := topic.Subscribe(pubsub.WithBufferSize(max(pubsub.GossipSubHistoryGossip, pubsubBufferSize))) if err != nil { return nil, err } diff --git a/rpc/client/client.go b/rpc/client/client.go index 5ea1e0264..9a6473aca 100644 --- a/rpc/client/client.go +++ b/rpc/client/client.go @@ -718,7 +718,6 @@ func (c *Client) Status(ctx context.Context) (*ctypes.ResultStatus, error) { if validator == nil { return nil, fmt.Errorf("find proposer %s in the valSet", string(latest.Header.ProposerAddress)) } - state, err := c.node.Store.LoadState() if err != nil { return nil, fmt.Errorf("load the last saved state: %w", err) diff --git a/rpc/client/client_test.go b/rpc/client/client_test.go index 99e004970..8a4d72c6e 100644 --- a/rpc/client/client_test.go +++ b/rpc/client/client_test.go @@ -103,10 +103,10 @@ func TestGenesisChunked(t *testing.T) { RootDir: "", DBPath: "", P2PConfig: config.P2PConfig{ - ListenAddress: config.DefaultListenAddress, - BootstrapNodes: "", - GossipedBlocksCacheSize: 50, - BootstrapRetryTime: 30 * time.Second, + ListenAddress: config.DefaultListenAddress, + BootstrapNodes: "", + GossipSubCacheSize: 50, + BootstrapRetryTime: 30 * time.Second, }, RPC: config.RPCConfig{}, BlockManagerConfig: config.BlockManagerConfig{ @@ -704,10 +704,10 @@ func TestValidatorSetHandling(t *testing.T) { DALayer: "mock", SettlementLayer: "mock", P2PConfig: config.P2PConfig{ - ListenAddress: config.DefaultListenAddress, - BootstrapNodes: "", - GossipedBlocksCacheSize: 50, - BootstrapRetryTime: 30 * time.Second, + ListenAddress: config.DefaultListenAddress, + BootstrapNodes: "", + GossipSubCacheSize: 50, + BootstrapRetryTime: 30 * time.Second, }, BlockManagerConfig: config.BlockManagerConfig{ BlockTime: 10 * time.Millisecond, @@ -861,10 +861,10 @@ func getRPCInternal(t *testing.T, sequencer bool) (*tmmocks.MockApplication, *cl RootDir: "", DBPath: "", P2PConfig: config.P2PConfig{ - ListenAddress: config.DefaultListenAddress, - BootstrapNodes: "", - GossipedBlocksCacheSize: 50, - BootstrapRetryTime: 30 * time.Second, + ListenAddress: config.DefaultListenAddress, + BootstrapNodes: "", + GossipSubCacheSize: 50, + BootstrapRetryTime: 30 * time.Second, }, RPC: config.RPCConfig{}, MempoolConfig: *tmcfg.DefaultMempoolConfig(), @@ -972,10 +972,10 @@ func TestMempool2Nodes(t *testing.T) { RollappID: rollappID, }, P2PConfig: config.P2PConfig{ - ListenAddress: "/ip4/127.0.0.1/tcp/9001", - BootstrapNodes: "", - GossipedBlocksCacheSize: 50, - BootstrapRetryTime: 30 * time.Second, + ListenAddress: "/ip4/127.0.0.1/tcp/9001", + BootstrapNodes: "", + GossipSubCacheSize: 50, + BootstrapRetryTime: 30 * time.Second, }, BlockManagerConfig: config.BlockManagerConfig{ BlockTime: 100 * time.Millisecond, @@ -1002,10 +1002,10 @@ func TestMempool2Nodes(t *testing.T) { MaxSupportedBatchSkew: 10, }, P2PConfig: config.P2PConfig{ - ListenAddress: "/ip4/127.0.0.1/tcp/9002", - BootstrapNodes: "/ip4/127.0.0.1/tcp/9001/p2p/" + id1.String(), - BootstrapRetryTime: 30 * time.Second, - GossipedBlocksCacheSize: 50, + ListenAddress: "/ip4/127.0.0.1/tcp/9002", + BootstrapNodes: "/ip4/127.0.0.1/tcp/9001/p2p/" + id1.String(), + BootstrapRetryTime: 30 * time.Second, + GossipSubCacheSize: 50, }, MempoolConfig: *tmcfg.DefaultMempoolConfig(), }, key2, signingKey2, proxy.NewLocalClientCreator(app), &tmtypes.GenesisDoc{ChainID: rollappID}, log.TestingLogger(), mempool.NopMetrics()) diff --git a/rpc/json/service_test.go b/rpc/json/service_test.go index c79f24530..88bbe3a8d 100644 --- a/rpc/json/service_test.go +++ b/rpc/json/service_test.go @@ -313,9 +313,9 @@ func getRPC(t *testing.T) (*tmmocks.MockApplication, *client.Client) { RollappID: rollappID, }, P2PConfig: config.P2PConfig{ - ListenAddress: config.DefaultListenAddress, - GossipedBlocksCacheSize: 50, - BootstrapRetryTime: 30 * time.Second, + ListenAddress: config.DefaultListenAddress, + GossipSubCacheSize: 50, + BootstrapRetryTime: 30 * time.Second, }, } node, err := node.NewNode( diff --git a/testutil/block.go b/testutil/block.go index 3280e6d52..43550e09c 100644 --- a/testutil/block.go +++ b/testutil/block.go @@ -91,8 +91,8 @@ func GetManagerWithProposerKey(conf config.BlockManagerConfig, proposerKey crypt // Init p2p client and validator p2pKey, _, _ := crypto.GenerateEd25519Key(rand.Reader) p2pClient, err := p2p.NewClient(config.P2PConfig{ - GossipedBlocksCacheSize: 50, - BootstrapRetryTime: 30 * time.Second, + GossipSubCacheSize: 50, + BootstrapRetryTime: 30 * time.Second, }, p2pKey, "TestChain", pubsubServer, logger) if err != nil { return nil, err diff --git a/testutil/p2p.go b/testutil/p2p.go index 713a7aada..32ee0e6a1 100644 --- a/testutil/p2p.go +++ b/testutil/p2p.go @@ -108,10 +108,10 @@ func StartTestNetwork(ctx context.Context, t *testing.T, n int, conf map[int]Hos clients := make([]*p2p.Client, n) for i := 0; i < n; i++ { client, err := p2p.NewClient(config.P2PConfig{ - BootstrapNodes: seeds[i], - GossipedBlocksCacheSize: 50, - BootstrapRetryTime: 30 * time.Second, - ListenAddress: config.DefaultListenAddress, + BootstrapNodes: seeds[i], + GossipSubCacheSize: 50, + BootstrapRetryTime: 30 * time.Second, + ListenAddress: config.DefaultListenAddress, }, mnet.Hosts()[i].Peerstore().PrivKey(mnet.Hosts()[i].ID()), conf[i].ChainID,