Skip to content

Commit

Permalink
fix(p2p): set gossipsub buffersize to avoid missed blocks (#975)
Browse files Browse the repository at this point in the history
  • Loading branch information
srene authored Jul 22, 2024
1 parent a9cbe2d commit 0d3be11
Show file tree
Hide file tree
Showing 15 changed files with 60 additions and 60 deletions.
6 changes: 3 additions & 3 deletions block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,9 @@ func TestInitialState(t *testing.T) {
// Init p2p client
privKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)
p2pClient, err := p2p.NewClient(config.P2PConfig{
ListenAddress: config.DefaultListenAddress,
GossipedBlocksCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
ListenAddress: config.DefaultListenAddress,
GossipSubCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
}, privKey, "TestChain", pubsubServer, logger)
assert.NoError(err)
assert.NotNil(p2pClient)
Expand Down
8 changes: 4 additions & 4 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,10 @@ func fullNodeConfig() config.NodeConfig {
Port: 9090,
},
P2PConfig: config.P2PConfig{
GossipedBlocksCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
ListenAddress: config.DefaultListenAddress,
BootstrapNodes: "",
GossipSubCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
ListenAddress: config.DefaultListenAddress,
BootstrapNodes: "",
},
}
}
10 changes: 5 additions & 5 deletions config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ func DefaultConfig(home, chainId string) *NodeConfig {
PrometheusListenAddr: ":2112",
},
P2PConfig: P2PConfig{
GossipedBlocksCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
ListenAddress: DefaultListenAddress,
BootstrapNodes: "",
AdvertisingEnabled: true,
GossipSubCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
ListenAddress: DefaultListenAddress,
BootstrapNodes: "",
AdvertisingEnabled: true,
},
DBConfig: DBConfig{
SyncWrites: true,
Expand Down
2 changes: 1 addition & 1 deletion config/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func AddNodeFlags(cmd *cobra.Command) {
cmd.Flags().String(FlagP2PListenAddress, def.P2PConfig.ListenAddress, "P2P listen address")
cmd.Flags().String(FlagP2PBootstrapNodes, def.P2PConfig.BootstrapNodes, "P2P bootstrap nodes")
cmd.Flags().Duration(FlagP2PBootstrapRetryTime, def.P2PConfig.BootstrapRetryTime, "P2P bootstrap time")
cmd.Flags().Uint64(FlagP2PGossipCacheSize, uint64(def.P2PConfig.GossipedBlocksCacheSize), "P2P Gossiped blocks cache size")
cmd.Flags().Uint64(FlagP2PGossipCacheSize, uint64(def.P2PConfig.GossipSubCacheSize), "P2P Gossiped blocks cache size")
}

func BindDymintFlags(cmd *cobra.Command, v *viper.Viper) error {
Expand Down
4 changes: 2 additions & 2 deletions config/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ type P2PConfig struct {
// List of nodes persistent P2P nodes
PersistentNodes string `mapstructure:"p2p_persistent_nodes"`
// Size of the Gossipsub router cache
GossipedBlocksCacheSize int `mapstructure:"p2p_gossiped_blocks_cache_size"`
GossipSubCacheSize int `mapstructure:"p2p_gossip_cache_size"`
// Time interval a node tries to bootstrap again, in case no nodes connected
BootstrapRetryTime time.Duration `mapstructure:"p2p_bootstrap_retry_time"`
// Param used to enable the advertisement of the node to be part of the P2P network in the DHT
Expand All @@ -23,7 +23,7 @@ type P2PConfig struct {

// Validate P2PConfig
func (c P2PConfig) Validate() error {
if c.GossipedBlocksCacheSize < 0 {
if c.GossipSubCacheSize < 0 {
return fmt.Errorf("gossipsub cache size cannot be negative")
}
if c.BootstrapRetryTime <= 0 {
Expand Down
2 changes: 1 addition & 1 deletion config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ p2p_bootstrap_nodes = "{{ .P2PConfig.BootstrapNodes }}"
p2p_persistent_nodes = "{{ .P2PConfig.PersistentNodes }}"
# max number of cached messages by gossipsub protocol
p2p_gossiped_blocks_cache_size = {{ .P2PConfig.GossipedBlocksCacheSize }}
p2p_gossip_cache_size = {{ .P2PConfig.GossipSubCacheSize }}
# time interval to check if no p2p nodes are connected to bootstrap again
p2p_bootstrap_retry_time = "{{ .P2PConfig.BootstrapRetryTime }}"
Expand Down
8 changes: 4 additions & 4 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,10 @@ func TestMempoolDirectly(t *testing.T) {
RootDir: "",
DBPath: "",
P2PConfig: config.P2PConfig{
ListenAddress: config.DefaultListenAddress,
GossipedBlocksCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
BootstrapNodes: "",
ListenAddress: config.DefaultListenAddress,
GossipSubCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
BootstrapNodes: "",
},
RPC: config.RPCConfig{},
MempoolConfig: *tmcfg.DefaultMempoolConfig(),
Expand Down
5 changes: 2 additions & 3 deletions p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,10 +322,9 @@ func (c *Client) tryConnect(ctx context.Context, peer peer.AddrInfo) {
}

func (c *Client) setupGossiping(ctx context.Context) error {
pubsub.GossipSubHistoryGossip = c.conf.GossipedBlocksCacheSize
pubsub.GossipSubHistoryLength = c.conf.GossipedBlocksCacheSize
pubsub.GossipSubHistoryGossip = c.conf.GossipSubCacheSize
pubsub.GossipSubHistoryLength = c.conf.GossipSubCacheSize

// We add WithSeenMessagesTTL (with 1 year time) option to avoid ever requesting already seen blocks
ps, err := pubsub.NewGossipSub(ctx, c.Host)
if err != nil {
return err
Expand Down
10 changes: 5 additions & 5 deletions p2p/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ func TestClientStartup(t *testing.T) {
err := pubsubServer.Start()
require.NoError(t, err)
client, err := p2p.NewClient(config.P2PConfig{
ListenAddress: config.DefaultListenAddress,
GossipedBlocksCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
ListenAddress: config.DefaultListenAddress,
GossipSubCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
}, privKey, "TestChain", pubsubServer, log.TestingLogger())
assert := assert.New(t)
assert.NoError(err)
Expand Down Expand Up @@ -179,8 +179,8 @@ func TestSeedStringParsing(t *testing.T) {
require := require.New(t)
logger := &testutil.MockLogger{}
client, err := p2p.NewClient(config.P2PConfig{
GossipedBlocksCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
GossipSubCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
}, privKey, "TestNetwork", pubsubServer, logger)
require.NoError(err)
require.NotNil(client)
Expand Down
6 changes: 4 additions & 2 deletions p2p/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ import (
"github.com/dymensionxyz/dymint/types"
)

// buffer size used by gossipSub router to consume received packets (blocks or txs). packets are dropped in case buffer overflows. in case of blocks, it can buffer up to 5 minutes (assuming 200ms block rate)
const pubsubBufferSize = 3000

// GossipMessage represents message gossiped via P2P network (e.g. transaction, Block etc).
type GossipMessage struct {
Data []byte
Expand Down Expand Up @@ -50,8 +53,7 @@ func NewGossiper(host host.Host, ps *pubsub.PubSub, topicStr string, msgHandler
if err != nil {
return nil, err
}

subscription, err := topic.Subscribe()
subscription, err := topic.Subscribe(pubsub.WithBufferSize(max(pubsub.GossipSubHistoryGossip, pubsubBufferSize)))
if err != nil {
return nil, err
}
Expand Down
1 change: 0 additions & 1 deletion rpc/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,6 @@ func (c *Client) Status(ctx context.Context) (*ctypes.ResultStatus, error) {
if validator == nil {
return nil, fmt.Errorf("find proposer %s in the valSet", string(latest.Header.ProposerAddress))
}

state, err := c.node.Store.LoadState()
if err != nil {
return nil, fmt.Errorf("load the last saved state: %w", err)
Expand Down
40 changes: 20 additions & 20 deletions rpc/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,10 @@ func TestGenesisChunked(t *testing.T) {
RootDir: "",
DBPath: "",
P2PConfig: config.P2PConfig{
ListenAddress: config.DefaultListenAddress,
BootstrapNodes: "",
GossipedBlocksCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
ListenAddress: config.DefaultListenAddress,
BootstrapNodes: "",
GossipSubCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
},
RPC: config.RPCConfig{},
BlockManagerConfig: config.BlockManagerConfig{
Expand Down Expand Up @@ -704,10 +704,10 @@ func TestValidatorSetHandling(t *testing.T) {
DALayer: "mock",
SettlementLayer: "mock",
P2PConfig: config.P2PConfig{
ListenAddress: config.DefaultListenAddress,
BootstrapNodes: "",
GossipedBlocksCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
ListenAddress: config.DefaultListenAddress,
BootstrapNodes: "",
GossipSubCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
},
BlockManagerConfig: config.BlockManagerConfig{
BlockTime: 10 * time.Millisecond,
Expand Down Expand Up @@ -861,10 +861,10 @@ func getRPCInternal(t *testing.T, sequencer bool) (*tmmocks.MockApplication, *cl
RootDir: "",
DBPath: "",
P2PConfig: config.P2PConfig{
ListenAddress: config.DefaultListenAddress,
BootstrapNodes: "",
GossipedBlocksCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
ListenAddress: config.DefaultListenAddress,
BootstrapNodes: "",
GossipSubCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
},
RPC: config.RPCConfig{},
MempoolConfig: *tmcfg.DefaultMempoolConfig(),
Expand Down Expand Up @@ -972,10 +972,10 @@ func TestMempool2Nodes(t *testing.T) {
RollappID: rollappID,
},
P2PConfig: config.P2PConfig{
ListenAddress: "/ip4/127.0.0.1/tcp/9001",
BootstrapNodes: "",
GossipedBlocksCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
ListenAddress: "/ip4/127.0.0.1/tcp/9001",
BootstrapNodes: "",
GossipSubCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
},
BlockManagerConfig: config.BlockManagerConfig{
BlockTime: 100 * time.Millisecond,
Expand All @@ -1002,10 +1002,10 @@ func TestMempool2Nodes(t *testing.T) {
MaxSupportedBatchSkew: 10,
},
P2PConfig: config.P2PConfig{
ListenAddress: "/ip4/127.0.0.1/tcp/9002",
BootstrapNodes: "/ip4/127.0.0.1/tcp/9001/p2p/" + id1.String(),
BootstrapRetryTime: 30 * time.Second,
GossipedBlocksCacheSize: 50,
ListenAddress: "/ip4/127.0.0.1/tcp/9002",
BootstrapNodes: "/ip4/127.0.0.1/tcp/9001/p2p/" + id1.String(),
BootstrapRetryTime: 30 * time.Second,
GossipSubCacheSize: 50,
},
MempoolConfig: *tmcfg.DefaultMempoolConfig(),
}, key2, signingKey2, proxy.NewLocalClientCreator(app), &tmtypes.GenesisDoc{ChainID: rollappID}, log.TestingLogger(), mempool.NopMetrics())
Expand Down
6 changes: 3 additions & 3 deletions rpc/json/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,9 +313,9 @@ func getRPC(t *testing.T) (*tmmocks.MockApplication, *client.Client) {
RollappID: rollappID,
},
P2PConfig: config.P2PConfig{
ListenAddress: config.DefaultListenAddress,
GossipedBlocksCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
ListenAddress: config.DefaultListenAddress,
GossipSubCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
},
}
node, err := node.NewNode(
Expand Down
4 changes: 2 additions & 2 deletions testutil/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ func GetManagerWithProposerKey(conf config.BlockManagerConfig, proposerKey crypt
// Init p2p client and validator
p2pKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)
p2pClient, err := p2p.NewClient(config.P2PConfig{
GossipedBlocksCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
GossipSubCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
}, p2pKey, "TestChain", pubsubServer, logger)
if err != nil {
return nil, err
Expand Down
8 changes: 4 additions & 4 deletions testutil/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,10 @@ func StartTestNetwork(ctx context.Context, t *testing.T, n int, conf map[int]Hos
clients := make([]*p2p.Client, n)
for i := 0; i < n; i++ {
client, err := p2p.NewClient(config.P2PConfig{
BootstrapNodes: seeds[i],
GossipedBlocksCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
ListenAddress: config.DefaultListenAddress,
BootstrapNodes: seeds[i],
GossipSubCacheSize: 50,
BootstrapRetryTime: 30 * time.Second,
ListenAddress: config.DefaultListenAddress,
},
mnet.Hosts()[i].Peerstore().PrivKey(mnet.Hosts()[i].ID()),
conf[i].ChainID,
Expand Down

0 comments on commit 0d3be11

Please sign in to comment.