Skip to content

Commit

Permalink
feat: reconnect full node to sequencer with configurable time check (#…
Browse files Browse the repository at this point in the history
…556)

Co-authored-by: alex <[email protected]>
Co-authored-by: Michael Tsitrin <[email protected]>
  • Loading branch information
3 people authored Jan 15, 2024
1 parent 379b93e commit 9129983
Show file tree
Hide file tree
Showing 11 changed files with 69 additions and 20 deletions.
4 changes: 3 additions & 1 deletion block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ func TestInitialState(t *testing.T) {

// Init p2p client
privKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)
p2pClient, err := p2p.NewClient(config.P2PConfig{}, privKey, "TestChain", 50, logger)
p2pClient, err := p2p.NewClient(config.P2PConfig{
GossipCacheSize: 50,
BoostrapTime: 30 * time.Second}, privKey, "TestChain", logger)
assert.NoError(err)
assert.NotNil(p2pClient)

Expand Down
4 changes: 3 additions & 1 deletion block/testutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,9 @@ func getManager(conf config.BlockManagerConfig, settlementlc settlement.LayerI,

// Init p2p client and validator
p2pKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)
p2pClient, err := p2p.NewClient(config.P2PConfig{}, p2pKey, "TestChain", 50, logger)
p2pClient, err := p2p.NewClient(config.P2PConfig{
GossipCacheSize: 50,
BoostrapTime: 30 * time.Second}, p2pKey, "TestChain", logger)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ type NodeConfig struct {
SettlementLayer string `mapstructure:"settlement_layer"`
SettlementConfig settlement.Config `mapstructure:",squash"`
Instrumentation *InstrumentationConfig `mapstructure:"instrumentation"`
BootstrapTime time.Duration `mapstructure:"bootstrap_time"`
}

// BlockManagerConfig consists of all parameters required by BlockManagerConfig
Expand Down
1 change: 1 addition & 0 deletions config/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ func DefaultConfig(home, chainId string) *NodeConfig {
Prometheus: false,
PrometheusListenAddr: ":2112",
},
BootstrapTime: 30 * time.Second,
}

if home == "" {
Expand Down
8 changes: 6 additions & 2 deletions config/p2p.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
package config

import "time"

// P2PConfig stores configuration related to peer-to-peer networking.
type P2PConfig struct {
ListenAddress string // Address to listen for incoming connections
Seeds string // Comma separated list of seed nodes to connect to
ListenAddress string // Address to listen for incoming connections
Seeds string // Comma separated list of seed nodes to connect to
GossipCacheSize int
BoostrapTime time.Duration
}
3 changes: 3 additions & 0 deletions config/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ block_batch_max_size_bytes = {{ .BlockManagerConfig.BlockBatchMaxSizeBytes }}
# max number of cached messages by gossipsub protocol
gossiped_blocks_cache_size = {{ .BlockManagerConfig.GossipedBlocksCacheSize }}
# time interval to check if no p2p nodes are connected to bootstrap again
bootstrap_time = "{{ .BootstrapTime }}"
#celestia config example:
# da_config = "{\"base_url\": \"http://127.0.0.1:26658\", \"timeout\": 60000000000, \"gas_prices\":0.1, \"gas_adjustment\": 1.3, \"token\":\"TOKEN\"}"
# Avail config example:
Expand Down
5 changes: 4 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,10 @@ func NewNode(ctx context.Context, conf config.NodeConfig, p2pKey crypto.PrivKey,

// Set p2p client and it's validators
p2pValidator := p2p.NewValidator(logger.With("module", "p2p_validator"), pubsubServer)
p2pClient, err := p2p.NewClient(conf.P2P, p2pKey, genesis.ChainID, conf.GossipedBlocksCacheSize, logger.With("module", "p2p"))

conf.P2P.GossipCacheSize = conf.BlockManagerConfig.GossipedBlocksCacheSize
conf.P2P.BoostrapTime = conf.BootstrapTime
p2pClient, err := p2p.NewClient(conf.P2P, p2pKey, genesis.ChainID, logger.With("module", "p2p"))
if err != nil {
return nil, err
}
Expand Down
43 changes: 32 additions & 11 deletions p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,27 +72,24 @@ type Client struct {
cancel context.CancelFunc

logger log.Logger

gossipCacheSize int
}

// NewClient creates new Client object.
//
// Basic checks on parameters are done, and default parameters are provided for unset-configuration
// TODO(tzdybal): consider passing entire config, not just P2P config, to reduce number of arguments
func NewClient(conf config.P2PConfig, privKey crypto.PrivKey, chainID string, gossipCacheSize int, logger log.Logger) (*Client, error) {
func NewClient(conf config.P2PConfig, privKey crypto.PrivKey, chainID string, logger log.Logger) (*Client, error) {
if privKey == nil {
return nil, errNoPrivKey
}
if conf.ListenAddress == "" {
conf.ListenAddress = config.DefaultListenAddress
}
return &Client{
conf: conf,
privKey: privKey,
chainID: chainID,
logger: logger,
gossipCacheSize: gossipCacheSize,
conf: conf,
privKey: privKey,
chainID: chainID,
logger: logger,
}, nil
}

Expand Down Expand Up @@ -266,6 +263,10 @@ func (c *Client) setupDHT(ctx context.Context) error {
return fmt.Errorf("failed to bootstrap DHT: %w", err)
}

if len(seedNodes) > 0 {
go c.bootstrapLoop(ctx)
}

c.host = routedhost.Wrap(c.host, c.dht)

return nil
Expand Down Expand Up @@ -332,9 +333,9 @@ func (c *Client) tryConnect(ctx context.Context, peer peer.AddrInfo) {

func (c *Client) setupGossiping(ctx context.Context) error {

pubsub.GossipSubHistoryGossip = c.gossipCacheSize
pubsub.GossipSubHistoryLength = c.gossipCacheSize
pubsub.GossipSubMaxIHaveMessages = c.gossipCacheSize
pubsub.GossipSubHistoryGossip = c.conf.GossipCacheSize
pubsub.GossipSubHistoryLength = c.conf.GossipCacheSize
pubsub.GossipSubMaxIHaveMessages = c.conf.GossipCacheSize

ps, err := pubsub.NewGossipSub(ctx, c.host)
if err != nil {
Expand Down Expand Up @@ -413,3 +414,23 @@ func (c *Client) NewTxValidator() GossipValidator {
return true
}
}

func (c *Client) bootstrapLoop(ctx context.Context) {
ticker := time.NewTicker(c.conf.BoostrapTime)
defer ticker.Stop()
for {
select {
//Context canceled
case <-ctx.Done():
return
case <-ticker.C:
if len(c.Peers()) == 0 {
err := c.dht.Bootstrap(ctx)
if err != nil {
c.logger.Error("failed to re-bootstrap DHT: %w", err)
}
}

}
}
}
8 changes: 6 additions & 2 deletions p2p/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (

func TestClientStartup(t *testing.T) {
privKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)
client, err := NewClient(config.P2PConfig{}, privKey, "TestChain", 50, log.TestingLogger())
client, err := NewClient(config.P2PConfig{
GossipCacheSize: 50,
BoostrapTime: 30 * time.Second}, privKey, "TestChain", log.TestingLogger())
assert := assert.New(t)
assert.NoError(err)
assert.NotNil(client)
Expand Down Expand Up @@ -165,7 +167,9 @@ func TestSeedStringParsing(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
logger := &test.MockLogger{}
client, err := NewClient(config.P2PConfig{}, privKey, "TestNetwork", 50, logger)
client, err := NewClient(config.P2PConfig{
GossipCacheSize: 50,
BoostrapTime: 30 * time.Second}, privKey, "TestNetwork", logger)
require.NoError(err)
require.NotNil(client)
actual := client.getSeedAddrInfo(c.input)
Expand Down
6 changes: 4 additions & 2 deletions p2p/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net"
"strings"
"testing"
"time"

"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -101,10 +102,11 @@ func startTestNetwork(ctx context.Context, t *testing.T, n int, conf map[int]hos
clients := make([]*Client, n)
for i := 0; i < n; i++ {
client, err := NewClient(config.P2PConfig{
Seeds: seeds[i]},
Seeds: seeds[i],
GossipCacheSize: 50,
BoostrapTime: 30 * time.Second},
mnet.Hosts()[i].Peerstore().PrivKey(mnet.Hosts()[i].ID()),
conf[i].chainID,
50,
logger)
require.NoError(err)
require.NotNil(client)
Expand Down
6 changes: 6 additions & 0 deletions rpc/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ func TestGenesisChunked(t *testing.T) {
BlockBatchMaxSizeBytes: 1000,
GossipedBlocksCacheSize: 50,
},
BootstrapTime: 30 * time.Second,
DALayer: "mock",
DAConfig: "",
SettlementLayer: "mock",
Expand Down Expand Up @@ -441,6 +442,7 @@ func TestTx(t *testing.T) {
BlockBatchMaxSizeBytes: 1000,
GossipedBlocksCacheSize: 50,
},
BootstrapTime: 30 * time.Second,
SettlementConfig: settlement.Config{ProposerPubKey: hex.EncodeToString(pubKeybytes)},
},
key, signingKey, proxy.NewLocalClientCreator(mockApp),
Expand Down Expand Up @@ -709,6 +711,7 @@ func TestValidatorSetHandling(t *testing.T) {
BlockBatchMaxSizeBytes: 1000,
GossipedBlocksCacheSize: 50,
},
BootstrapTime: 30 * time.Second,
SettlementConfig: settlement.Config{ProposerPubKey: hex.EncodeToString(proposerPubKeyBytes)},
}

Expand Down Expand Up @@ -828,6 +831,7 @@ func getRPC(t *testing.T) (*mocks.Application, *Client) {
BlockBatchMaxSizeBytes: 1000,
GossipedBlocksCacheSize: 50,
},
BootstrapTime: 30 * time.Second,
DALayer: "mock",
DAConfig: "",
SettlementLayer: "mock",
Expand Down Expand Up @@ -915,6 +919,7 @@ func TestMempool2Nodes(t *testing.T) {
BlockBatchMaxSizeBytes: 1000,
GossipedBlocksCacheSize: 50,
},
BootstrapTime: 30 * time.Second,
P2P: config.P2PConfig{
ListenAddress: "/ip4/127.0.0.1/tcp/9001",
},
Expand All @@ -933,6 +938,7 @@ func TestMempool2Nodes(t *testing.T) {
BlockBatchMaxSizeBytes: 1000,
GossipedBlocksCacheSize: 50,
},
BootstrapTime: 30 * time.Second,
P2P: config.P2PConfig{
ListenAddress: "/ip4/127.0.0.1/tcp/9002",
Seeds: "/ip4/127.0.0.1/tcp/9001/p2p/" + id1.String(),
Expand Down

0 comments on commit 9129983

Please sign in to comment.