Skip to content

Commit

Permalink
feat(p2p): moving gossiped blocks reception from validator to p2p cli…
Browse files Browse the repository at this point in the history
…ent (#811)

Co-authored-by: Daniel T <[email protected]>
  • Loading branch information
srene and danwt authored May 8, 2024
1 parent 0cd09f3 commit 0215717
Show file tree
Hide file tree
Showing 10 changed files with 74 additions and 45 deletions.
2 changes: 1 addition & 1 deletion block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,10 +241,10 @@ func (m *Manager) onNodeHealthStatus(event pubsub.Message) {
// onNewGossippedBlock will take a block and apply it
func (m *Manager) onNewGossipedBlock(event pubsub.Message) {
m.retrieverMutex.Lock() // needed to protect blockCache access
m.logger.Debug("Received new block via gossip", "n cachedBlocks", len(m.blockCache))
eventData := event.Data().(p2p.GossipedBlock)
block := eventData.Block
commit := eventData.Commit
m.logger.Debug("Received new block via gossip", "height", block.Header.Height, "n cachedBlocks", len(m.blockCache))

nextHeight := m.Store.NextHeight()
if block.Header.Height >= nextHeight {
Expand Down
2 changes: 1 addition & 1 deletion block/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func TestInitialState(t *testing.T) {
p2pClient, err := p2p.NewClient(config.P2PConfig{
GossipCacheSize: 50,
BoostrapTime: 30 * time.Second,
}, privKey, "TestChain", logger)
}, privKey, "TestChain", pubsubServer, logger)
assert.NoError(err)
assert.NotNil(p2pClient)

Expand Down
4 changes: 2 additions & 2 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,11 @@ func NewNode(
mpIDs := nodemempool.NewMempoolIDs()

// Set p2p client and it's validators
p2pValidator := p2p.NewValidator(logger.With("module", "p2p_validator"), pubsubServer, settlementlc)
p2pValidator := p2p.NewValidator(logger.With("module", "p2p_validator"), settlementlc)

conf.P2P.GossipCacheSize = conf.BlockManagerConfig.GossipedBlocksCacheSize
conf.P2P.BoostrapTime = conf.BootstrapTime
p2pClient, err := p2p.NewClient(conf.P2P, p2pKey, genesis.ChainID, logger.With("module", "p2p"))
p2pClient, err := p2p.NewClient(conf.P2P, p2pKey, genesis.ChainID, pubsubServer, logger.With("module", "p2p"))
if err != nil {
return nil, err
}
Expand Down
30 changes: 23 additions & 7 deletions p2p/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
discutil "github.com/libp2p/go-libp2p/p2p/discovery/util"
routedhost "github.com/libp2p/go-libp2p/p2p/host/routed"
"github.com/multiformats/go-multiaddr"
tmpubsub "github.com/tendermint/tendermint/libs/pubsub"
"github.com/tendermint/tendermint/p2p"
"go.uber.org/multierr"

Expand Down Expand Up @@ -65,25 +66,28 @@ type Client struct {
// it's required because of discovery.Advertise call
cancel context.CancelFunc

localPubsubServer *tmpubsub.Server

logger types.Logger
}

// NewClient creates new Client object.
//
// Basic checks on parameters are done, and default parameters are provided for unset-configuration
// TODO(tzdybal): consider passing entire config, not just P2P config, to reduce number of arguments
func NewClient(conf config.P2PConfig, privKey crypto.PrivKey, chainID string, logger types.Logger) (*Client, error) {
func NewClient(conf config.P2PConfig, privKey crypto.PrivKey, chainID string, localPubsubServer *tmpubsub.Server, logger types.Logger) (*Client, error) {
if privKey == nil {
return nil, errNoPrivKey
}
if conf.ListenAddress == "" {
conf.ListenAddress = config.DefaultListenAddress
}
return &Client{
conf: conf,
privKey: privKey,
chainID: chainID,
logger: logger,
conf: conf,
privKey: privKey,
chainID: chainID,
logger: logger,
localPubsubServer: localPubsubServer,
}, nil
}

Expand Down Expand Up @@ -323,13 +327,14 @@ func (c *Client) setupGossiping(ctx context.Context) error {
return err
}

c.txGossiper, err = NewGossiper(c.Host, ps, c.getTxTopic(), c.logger, WithValidator(c.txValidator))
//tx gossiper receives the tx to add to the mempool through validation process, since it is a joint process
c.txGossiper, err = NewGossiper(c.Host, ps, c.getTxTopic(), nil, c.logger, WithValidator(c.txValidator))
if err != nil {
return err
}
go c.txGossiper.ProcessMessages(ctx)

c.blockGossiper, err = NewGossiper(c.Host, ps, c.getBlockTopic(), c.logger,
c.blockGossiper, err = NewGossiper(c.Host, ps, c.getBlockTopic(), c.gossipedBlockReceived, c.logger,
WithValidator(c.blockValidator))
if err != nil {
return err
Expand Down Expand Up @@ -385,6 +390,17 @@ func (c *Client) NewTxValidator() GossipValidator {
}
}

func (c *Client) gossipedBlockReceived(msg *GossipMessage) {
var gossipedBlock GossipedBlock
if err := gossipedBlock.UnmarshalBinary(msg.Data); err != nil {
c.logger.Error("deserialize gossiped block", "error", err)
}
err := c.localPubsubServer.PublishWithEvents(context.Background(), gossipedBlock, map[string][]string{EventTypeKey: {EventNewGossipedBlock}})
if err != nil {
c.logger.Error("publishing event", "err", err)
}
}

func (c *Client) bootstrapLoop(ctx context.Context) {
ticker := time.NewTicker(c.conf.BoostrapTime)
defer ticker.Stop()
Expand Down
12 changes: 10 additions & 2 deletions p2p/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/libs/pubsub"

"github.com/dymensionxyz/dymint/config"
"github.com/dymensionxyz/dymint/p2p"
Expand All @@ -22,10 +23,13 @@ import (

func TestClientStartup(t *testing.T) {
privKey, _, _ := crypto.GenerateEd25519Key(rand.Reader)
pubsubServer := pubsub.NewServer()
err := pubsubServer.Start()
require.NoError(t, err)
client, err := p2p.NewClient(config.P2PConfig{
GossipCacheSize: 50,
BoostrapTime: 30 * time.Second,
}, privKey, "TestChain", log.TestingLogger())
}, privKey, "TestChain", pubsubServer, log.TestingLogger())
assert := assert.New(t)
assert.NoError(err)
assert.NotNil(client)
Expand Down Expand Up @@ -149,6 +153,10 @@ func TestSeedStringParsing(t *testing.T) {
// this one is a valid multiaddr, but can't be converted to PeerID (because there is no ID)
seed3 := "/ip4/127.0.0.1/tcp/12345"

pubsubServer := pubsub.NewServer()
err = pubsubServer.Start()
require.NoError(t, err)

cases := []struct {
name string
input string
Expand All @@ -172,7 +180,7 @@ func TestSeedStringParsing(t *testing.T) {
client, err := p2p.NewClient(config.P2PConfig{
GossipCacheSize: 50,
BoostrapTime: 30 * time.Second,
}, privKey, "TestNetwork", logger)
}, privKey, "TestNetwork", pubsubServer, logger)
require.NoError(err)
require.NotNil(client)
actual := client.GetSeedAddrInfo(c.input)
Expand Down
34 changes: 21 additions & 13 deletions p2p/gossip.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ type GossipMessage struct {
// GossiperOption sets optional parameters of Gossiper.
type GossiperOption func(*Gossiper) error

type GossipMessageHandler func(msg *GossipMessage)

// WithValidator options registers topic validator for Gossiper.
func WithValidator(validator GossipValidator) GossiperOption {
return func(g *Gossiper) error {
Expand All @@ -33,17 +35,17 @@ func WithValidator(validator GossipValidator) GossiperOption {
type Gossiper struct {
ownID peer.ID

ps *pubsub.PubSub
topic *pubsub.Topic
sub *pubsub.Subscription

logger types.Logger
ps *pubsub.PubSub
topic *pubsub.Topic
sub *pubsub.Subscription
msgHandler GossipMessageHandler
logger types.Logger
}

// NewGossiper creates new, ready to use instance of Gossiper.
//
// Returned Gossiper object can be used for sending (Publishing) and receiving messages in topic identified by topicStr.
func NewGossiper(host host.Host, ps *pubsub.PubSub, topicStr string, logger types.Logger, options ...GossiperOption) (*Gossiper, error) {
func NewGossiper(host host.Host, ps *pubsub.PubSub, topicStr string, msgHandler GossipMessageHandler, logger types.Logger, options ...GossiperOption) (*Gossiper, error) {
topic, err := ps.Join(topicStr)
if err != nil {
return nil, err
Expand All @@ -54,11 +56,12 @@ func NewGossiper(host host.Host, ps *pubsub.PubSub, topicStr string, logger type
return nil, err
}
g := &Gossiper{
ownID: host.ID(),
ps: ps,
topic: topic,
sub: subscription,
logger: logger,
ownID: host.ID(),
ps: ps,
topic: topic,
sub: subscription,
logger: logger,
msgHandler: msgHandler,
}

for _, option := range options {
Expand Down Expand Up @@ -89,15 +92,20 @@ func (g *Gossiper) Publish(ctx context.Context, data []byte) error {
// ProcessMessages waits for messages published in the topic and execute handler.
func (g *Gossiper) ProcessMessages(ctx context.Context) {
for {
_, err := g.sub.Next(ctx)
msg, err := g.sub.Next(ctx)
if errors.Is(err, context.Canceled) {
return
}
if err != nil {
g.logger.Error("read message", "error", err)
return
}
// Logic is handled in validator
if g.msgHandler != nil {
g.msgHandler(&GossipMessage{
Data: msg.Data,
From: msg.GetFrom(),
})
}
}
}

Expand Down
21 changes: 6 additions & 15 deletions p2p/validator.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package p2p

import (
"context"
"errors"

"github.com/dymensionxyz/dymint/mempool"
nodemempool "github.com/dymensionxyz/dymint/node/mempool"
"github.com/dymensionxyz/dymint/settlement"
"github.com/dymensionxyz/dymint/types"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/libs/pubsub"
corep2p "github.com/tendermint/tendermint/p2p"
)

Expand All @@ -25,19 +23,17 @@ type IValidator interface {

// Validator is a validator for messages gossiped in the p2p network.
type Validator struct {
logger types.Logger
localPubsubServer *pubsub.Server
slClient settlement.LayerI
logger types.Logger
slClient settlement.LayerI
}

var _ IValidator = (*Validator)(nil)

// NewValidator creates a new Validator.
func NewValidator(logger types.Logger, pusbsubServer *pubsub.Server, slClient settlement.LayerI) *Validator {
func NewValidator(logger types.Logger, slClient settlement.LayerI) *Validator {
return &Validator{
logger: logger,
localPubsubServer: pusbsubServer,
slClient: slClient,
logger: logger,
slClient: slClient,
}
}

Expand Down Expand Up @@ -82,15 +78,10 @@ func (v *Validator) BlockValidator() GossipValidator {
return false
}
if err := gossipedBlock.Validate(v.slClient.GetProposer()); err != nil {
v.logger.Error("Invalid gossiped block", "error", err)
v.logger.Error("Invalid gossiped block.", "height", gossipedBlock.Block.Header.Height, "error", err)
return false
}

err := v.localPubsubServer.PublishWithEvents(context.Background(), gossipedBlock, map[string][]string{EventTypeKey: {EventNewGossipedBlock}})
if err != nil {
v.logger.Error("publishing event", "err", err)
return false
}
return true
}
}
4 changes: 2 additions & 2 deletions p2p/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func TestValidator_TxValidator(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logger := log.TestingLogger()
validateTx := p2p.NewValidator(logger, nil, nil).TxValidator(tt.args.mp, nodemempool.NewMempoolIDs())
validateTx := p2p.NewValidator(logger, nil).TxValidator(tt.args.mp, nodemempool.NewMempoolIDs())
valid := validateTx(txMsg)
assert.Equalf(t, tt.want, valid, "validateTx() = %v, want %v", valid, tt.want)
})
Expand Down Expand Up @@ -179,7 +179,7 @@ func TestValidator_BlockValidator(t *testing.T) {
}

//Check block validity
validateBlock := p2p.NewValidator(logger, pubsubServer, client).BlockValidator()
validateBlock := p2p.NewValidator(logger, client).BlockValidator()
valid := validateBlock(blockMsg)
require.Equal(t, tt.valid, valid)
})
Expand Down
4 changes: 2 additions & 2 deletions testutil/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,11 @@ func GetManagerWithProposerKey(conf config.BlockManagerConfig, proposerKey crypt
p2pClient, err := p2p.NewClient(config.P2PConfig{
GossipCacheSize: 50,
BoostrapTime: 30 * time.Second,
}, p2pKey, "TestChain", logger)
}, p2pKey, "TestChain", pubsubServer, logger)
if err != nil {
return nil, err
}
p2pValidator := p2p.NewValidator(logger, pubsubServer, settlementlc)
p2pValidator := p2p.NewValidator(logger, settlementlc)
p2pClient.SetTxValidator(p2pValidator.TxValidator(mp, mpIDs))
p2pClient.SetBlockValidator(p2pValidator.BlockValidator())

Expand Down
6 changes: 6 additions & 0 deletions testutil/p2p.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
mocknet "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/pubsub"
"go.uber.org/multierr"

"github.com/dymensionxyz/dymint/config"
Expand Down Expand Up @@ -100,6 +101,10 @@ func StartTestNetwork(ctx context.Context, t *testing.T, n int, conf map[int]Hos
seeds[src] = strings.TrimSuffix(seeds[src], ",")
}

pubsubServer := pubsub.NewServer()
err = pubsubServer.Start()
require.NoError(err)

clients := make([]*p2p.Client, n)
for i := 0; i < n; i++ {
client, err := p2p.NewClient(config.P2PConfig{
Expand All @@ -109,6 +114,7 @@ func StartTestNetwork(ctx context.Context, t *testing.T, n int, conf map[int]Hos
},
mnet.Hosts()[i].Peerstore().PrivKey(mnet.Hosts()[i].ID()),
conf[i].ChainID,
pubsubServer,
logger)
require.NoError(err)
require.NotNil(client)
Expand Down

0 comments on commit 0215717

Please sign in to comment.