Skip to content

Commit

Permalink
feat: consensus messages (#1093)
Browse files Browse the repository at this point in the history
Co-authored-by: Faulty Tolly <@faulttolerance.net>
Co-authored-by: keruch <[email protected]>
Co-authored-by: omritoptix <[email protected]>
  • Loading branch information
3 people authored Oct 9, 2024
1 parent e0adc4c commit 51d4178
Show file tree
Hide file tree
Showing 14 changed files with 322 additions and 77 deletions.
9 changes: 9 additions & 0 deletions block/consensus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package block

import (
"github.com/gogo/protobuf/proto"
)

type ConsensusMessagesStream interface {
GetConsensusMessages() ([]proto.Message, error)
}
79 changes: 65 additions & 14 deletions block/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import (
"errors"
"time"

proto2 "github.com/gogo/protobuf/proto"
proto "github.com/gogo/protobuf/types"

abci "github.com/tendermint/tendermint/abci/types"
tmcrypto "github.com/tendermint/tendermint/crypto/encoding"
tmstate "github.com/tendermint/tendermint/proto/tendermint/state"
Expand All @@ -21,11 +24,12 @@ const minBlockMaxBytes = 10000

// Executor creates and applies blocks and maintains state.
type Executor struct {
localAddress []byte
chainID string
proxyAppConsensusConn proxy.AppConnConsensus
proxyAppQueryConn proxy.AppConnQuery
mempool mempool.Mempool
localAddress []byte
chainID string
proxyAppConsensusConn proxy.AppConnConsensus
proxyAppQueryConn proxy.AppConnQuery
mempool mempool.Mempool
consensusMessagesStream ConsensusMessagesStream

eventBus *tmtypes.EventBus

Expand All @@ -34,15 +38,24 @@ type Executor struct {

// NewExecutor creates new instance of BlockExecutor.
// localAddress will be used in sequencer mode only.
func NewExecutor(localAddress []byte, chainID string, mempool mempool.Mempool, proxyApp proxy.AppConns, eventBus *tmtypes.EventBus, logger types.Logger) (*Executor, error) {
func NewExecutor(
localAddress []byte,
chainID string,
mempool mempool.Mempool,
proxyApp proxy.AppConns,
eventBus *tmtypes.EventBus,
consensusMessagesStream ConsensusMessagesStream,
logger types.Logger,
) (*Executor, error) {
be := Executor{
localAddress: localAddress,
chainID: chainID,
proxyAppConsensusConn: proxyApp.Consensus(),
proxyAppQueryConn: proxyApp.Query(),
mempool: mempool,
eventBus: eventBus,
logger: logger,
localAddress: localAddress,
chainID: chainID,
proxyAppConsensusConn: proxyApp.Consensus(),
proxyAppQueryConn: proxyApp.Query(),
mempool: mempool,
eventBus: eventBus,
consensusMessagesStream: consensusMessagesStream,
logger: logger,
}
return &be, nil
}
Expand Down Expand Up @@ -92,10 +105,26 @@ func (e *Executor) InitChain(genesis *tmtypes.GenesisDoc, valset []*tmtypes.Vali
}

// CreateBlock reaps transactions from mempool and builds a block.
func (e *Executor) CreateBlock(height uint64, lastCommit *types.Commit, lastHeaderHash, nextSeqHash [32]byte, state *types.State, maxBlockDataSizeBytes uint64) *types.Block {
func (e *Executor) CreateBlock(
height uint64,
lastCommit *types.Commit,
lastHeaderHash, nextSeqHash [32]byte,
state *types.State,
maxBlockDataSizeBytes uint64,
) *types.Block {
maxBlockDataSizeBytes = min(maxBlockDataSizeBytes, uint64(max(minBlockMaxBytes, state.ConsensusParams.Block.MaxBytes)))
mempoolTxs := e.mempool.ReapMaxBytesMaxGas(int64(maxBlockDataSizeBytes), state.ConsensusParams.Block.MaxGas)

var consensusAnyMessages []*proto.Any
if e.consensusMessagesStream != nil {
consensusMessages, err := e.consensusMessagesStream.GetConsensusMessages()
if err != nil {
e.logger.Error("Failed to get consensus messages", "error", err)
}

consensusAnyMessages = fromProtoMsgSliceToAnySlice(consensusMessages)
}

block := &types.Block{
Header: types.Header{
Version: types.Version{
Expand All @@ -116,6 +145,7 @@ func (e *Executor) CreateBlock(height uint64, lastCommit *types.Commit, lastHead
Txs: toDymintTxs(mempoolTxs),
IntermediateStateRoots: types.IntermediateStateRoots{RawRootsList: nil},
Evidence: types.EvidenceData{Evidence: nil},
ConsensusMessages: consensusAnyMessages,
},
LastCommit: *lastCommit,
}
Expand Down Expand Up @@ -209,6 +239,7 @@ func (e *Executor) ExecuteBlock(state *types.State, block *types.Block) (*tmstat
Votes: nil,
},
ByzantineValidators: nil,
ConsensusMessages: block.Data.ConsensusMessages,
})
if err != nil {
return nil, err
Expand Down Expand Up @@ -284,3 +315,23 @@ func fromDymintTxs(optiTxs types.Txs) tmtypes.Txs {
}
return txs
}

func fromProtoMsgToAny(msg proto2.Message) *proto.Any {
theType, err := proto2.Marshal(msg)
if err != nil {
return nil
}

return &proto.Any{
TypeUrl: proto2.MessageName(msg),
Value: theType,
}
}

func fromProtoMsgSliceToAnySlice(msgs []proto2.Message) []*proto.Any {
result := make([]*proto.Any, len(msgs))
for i, msg := range msgs {
result[i] = fromProtoMsgToAny(msg)
}
return result
}
95 changes: 93 additions & 2 deletions block/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@ import (
"testing"
"time"

"github.com/gogo/protobuf/proto"
prototypes "github.com/gogo/protobuf/types"
"github.com/golang/groupcache/testpb"

"github.com/dymensionxyz/dymint/block"

cryptocodec "github.com/cosmos/cosmos-sdk/crypto/codec"
Expand Down Expand Up @@ -47,7 +51,7 @@ func TestCreateBlock(t *testing.T) {
require.NotNil(abciClient)

mpool := mempoolv1.NewTxMempool(logger, cfg.DefaultMempoolConfig(), proxy.NewAppConnMempool(abciClient), 0)
executor, err := block.NewExecutor([]byte("test address"), "test", mpool, proxy.NewAppConns(clientCreator), nil, logger)
executor, err := block.NewExecutor([]byte("test address"), "test", mpool, proxy.NewAppConns(clientCreator), nil, nil, logger)
assert.NoError(err)

maxBytes := uint64(100)
Expand Down Expand Up @@ -86,6 +90,93 @@ func TestCreateBlock(t *testing.T) {
assert.Len(block.Data.Txs, 2)
}

func TestCreateBlockWithConsensusMessages(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
logger := log.TestingLogger()
app := &tmmocks.MockApplication{}
app.On("CheckTx", mock.Anything).Return(abci.ResponseCheckTx{})
clientCreator := proxy.NewLocalClientCreator(app)
abciClient, err := clientCreator.NewABCIClient()
require.NoError(err)
require.NotNil(clientCreator)
require.NotNil(abciClient)
mpool := mempoolv1.NewTxMempool(logger, cfg.DefaultMempoolConfig(), proxy.NewAppConnMempool(abciClient), 0)

name, city := "test1", ""
theMsg1 := &testpb.TestMessage{
Name: &name,
City: &city,
}

name, city = "test2", ""
theMsg2 := &testpb.TestMessage{
Name: &name,
City: &city,
}

// Create a mock ConsensusMessagesStream
mockStream := &MockConsensusMessagesStream{}
mockStream.On("GetConsensusMessages").Return([]proto.Message{
theMsg1,
theMsg2,
}, nil)

executor, err := block.NewExecutor([]byte("test address"), "test", mpool, proxy.NewAppConns(clientCreator), nil, mockStream, logger)
assert.NoError(err)

maxBytes := uint64(1000)
proposerKey := ed25519.GenPrivKey()
tmPubKey, err := cryptocodec.ToTmPubKeyInterface(proposerKey.PubKey())
require.NoError(err)

state := &types.State{}
state.Sequencers.SetProposer(types.NewSequencerFromValidator(*tmtypes.NewValidator(tmPubKey, 1)))
state.ConsensusParams.Block.MaxBytes = int64(maxBytes)
state.ConsensusParams.Block.MaxGas = 100000

block := executor.CreateBlock(1, &types.Commit{}, [32]byte{}, [32]byte(state.Sequencers.ProposerHash()[:]), state, maxBytes)

require.NotNil(block)
assert.Empty(block.Data.Txs)
assert.Equal(uint64(1), block.Header.Height)
assert.Len(block.Data.ConsensusMessages, 2)

// Verify the content of ConsensusMessages
theType, err := proto.Marshal(theMsg1)
require.NoError(err)

anyMsg1 := &prototypes.Any{
TypeUrl: proto.MessageName(theMsg1),
Value: theType,
}
require.NoError(err)

theType, err = proto.Marshal(theMsg2)
require.NoError(err)

anyMsg2 := &prototypes.Any{
TypeUrl: proto.MessageName(theMsg2),
Value: theType,
}
require.NoError(err)

assert.True(proto.Equal(anyMsg1, block.Data.ConsensusMessages[0]))
assert.True(proto.Equal(anyMsg2, block.Data.ConsensusMessages[1]))

mockStream.AssertExpectations(t)
}

// MockConsensusMessagesStream is a mock implementation of ConsensusMessagesStream
type MockConsensusMessagesStream struct {
mock.Mock
}

func (m *MockConsensusMessagesStream) GetConsensusMessages() ([]proto.Message, error) {
args := m.Called()
return args.Get(0).([]proto.Message), args.Error(1)
}

func TestApplyBlock(t *testing.T) {
assert := assert.New(t)
require := require.New(t)
Expand Down Expand Up @@ -138,7 +229,7 @@ func TestApplyBlock(t *testing.T) {
appConns := &tmmocksproxy.MockAppConns{}
appConns.On("Consensus").Return(abciClient)
appConns.On("Query").Return(abciClient)
executor, err := block.NewExecutor([]byte("test address"), chainID, mpool, appConns, eventBus, logger)
executor, err := block.NewExecutor([]byte("test address"), chainID, mpool, appConns, eventBus, nil, logger)
assert.NoError(err)

// Subscribe to tx events
Expand Down
10 changes: 9 additions & 1 deletion block/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,15 @@ func NewManager(
if err != nil {
return nil, err
}
exec, err := NewExecutor(localAddress, genesis.ChainID, mempool, proxyApp, eventBus, logger)
exec, err := NewExecutor(
localAddress,
genesis.ChainID,
mempool,
proxyApp,
eventBus,
nil, // TODO add ConsensusMessagesStream
logger,
)
if err != nil {
return nil, fmt.Errorf("create block executor: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion buf.gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ plugins:
# The relative output directory.
out: proto/pb
# Any options to provide to the plugin.
opt: plugins=grpc,paths=source_relative
opt: plugins=grpc,Mgoogle/protobuf/any.proto=github.com/gogo/protobuf/types,paths=source_relative

4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ require (
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
github.com/godbus/dbus/v5 v5.1.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
github.com/golang/protobuf v1.5.4
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/btree v1.1.2 // indirect
Expand Down Expand Up @@ -301,7 +301,7 @@ replace (
github.com/evmos/evmos/v12 => github.com/dymensionxyz/evmos/v12 v12.1.6-dymension-v0.3
github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.2-alpha.regen.4
github.com/gorilla/rpc => github.com/dymensionxyz/rpc v1.3.1
github.com/tendermint/tendermint => github.com/dymensionxyz/cometbft v0.34.29-0.20240906143736-1e3959c2826e
github.com/tendermint/tendermint => github.com/dymensionxyz/cometbft v0.34.29-0.20241008141942-63af9d24107f
)

replace github.com/osmosis-labs/osmosis/v15 => github.com/dymensionxyz/osmosis/v15 v15.2.0-dymension-v1.1.2
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -457,8 +457,8 @@ github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkp
github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto=
github.com/dvsekhvalnov/jose2go v1.5.0 h1:3j8ya4Z4kMCwT5nXIKFSV84YS+HdqSSO0VsTQxaLAeM=
github.com/dvsekhvalnov/jose2go v1.5.0/go.mod h1:QsHjhyTlD/lAVqn/NSbVZmSCGeDehTB/mPZadG+mhXU=
github.com/dymensionxyz/cometbft v0.34.29-0.20240906143736-1e3959c2826e h1:A5FIvuFPvdxShuf9mSHfDUEL7I/oKVcSr1AtSlmgskA=
github.com/dymensionxyz/cometbft v0.34.29-0.20240906143736-1e3959c2826e/go.mod h1:L9shMfbkZ8B+7JlwANEr+NZbBcn+hBpwdbeYvA5rLCw=
github.com/dymensionxyz/cometbft v0.34.29-0.20241008141942-63af9d24107f h1:CclWJWRydsd8D4/R1IegIkcWtL4wqTA3MWtXrx1a6y4=
github.com/dymensionxyz/cometbft v0.34.29-0.20241008141942-63af9d24107f/go.mod h1:L9shMfbkZ8B+7JlwANEr+NZbBcn+hBpwdbeYvA5rLCw=
github.com/dymensionxyz/cosmosclient v0.4.2-beta.0.20240821081230-b4018b2bac13 h1:u5yeve5jZR6TdRjjR+vYT/8PWKbhwCZxUmAu+/Tnxyg=
github.com/dymensionxyz/cosmosclient v0.4.2-beta.0.20240821081230-b4018b2bac13/go.mod h1:jabDQYXrccscSE0fXkh7eQFYPWJCRiuWKonFGObVq6s=
github.com/dymensionxyz/evmos/v12 v12.1.6-dymension-v0.3 h1:vmAdUGUc4rTIiO3Phezr7vGq+0uPDVKSA4WAe8+yl6w=
Expand Down
2 changes: 1 addition & 1 deletion p2p/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestValidator_BlockValidator(t *testing.T) {
require.NotNil(t, clientCreator)
require.NotNil(t, abciClient)
mpool := mempoolv1.NewTxMempool(logger, cfg.DefaultMempoolConfig(), proxy.NewAppConnMempool(abciClient), 0)
executor, err := block.NewExecutor([]byte("test address"), "test", mpool, proxy.NewAppConns(clientCreator), nil, logger)
executor, err := block.NewExecutor([]byte("test address"), "test", mpool, proxy.NewAppConns(clientCreator), nil, nil, logger)
assert.NoError(t, err)

// Create state
Expand Down
2 changes: 2 additions & 0 deletions proto/types/dymint/dymint.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ option go_package = "github.com/dymensionxyz/dymint/types/pb/dymint";
import "types/tendermint/abci/types.proto";
import "types/tendermint/types/types.proto";
import "types/tendermint/types/validator.proto";
import "google/protobuf/any.proto";

// Version captures the consensus rules for processing a block in the blockchain,
// including all blockchain data structures and the rules of the application's
Expand Down Expand Up @@ -74,6 +75,7 @@ message Data {
repeated bytes txs = 1;
repeated bytes intermediate_state_roots = 2;
repeated tendermint.abci.Evidence evidence = 3;
repeated google.protobuf.Any consensus_messages = 4;
}

message Block {
Expand Down
13 changes: 13 additions & 0 deletions proto/types/tendermint/abci/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import "types/tendermint/crypto/keys.proto";
import "types/tendermint/types/params.proto";
import "google/protobuf/timestamp.proto";
import "gogoproto/gogo.proto";
import "google/protobuf/any.proto";

// This file is copied from http://github.com/tendermint/abci
// NOTE: When using custom types, mind the warnings.
Expand Down Expand Up @@ -79,6 +80,8 @@ message RequestBeginBlock {
tendermint.types.Header header = 2 [(gogoproto.nullable) = false];
LastCommitInfo last_commit_info = 3 [(gogoproto.nullable) = false];
repeated Evidence byzantine_validators = 4 [(gogoproto.nullable) = false];

google.protobuf.Any consensus_messages = 5;
}

enum CheckTxType {
Expand Down Expand Up @@ -200,6 +203,16 @@ message ResponseQuery {
message ResponseBeginBlock {
repeated Event events = 1
[(gogoproto.nullable) = false, (gogoproto.jsontag) = "events,omitempty"];

// Defines responses for consensus messages in order.
repeated ConsensusMessageResponse consensus_messages_responses = 2;
}

message ConsensusMessageResponse {
oneof response {
string error = 1; // Error message if execution fails.
google.protobuf.Any ok = 2; // Success response.
}
}

message ResponseCheckTx {
Expand Down
3 changes: 3 additions & 0 deletions proto/types/tendermint/types/types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package tendermint.types;
option go_package = "github.com/tendermint/tendermint/proto/tendermint/types";

import "gogoproto/gogo.proto";
import "google/protobuf/any.proto";
import "google/protobuf/timestamp.proto";
import "types/tendermint/crypto/proof.proto";
import "types/tendermint/version/types.proto";
Expand Down Expand Up @@ -87,6 +88,8 @@ message Data {
// NOTE: not all txs here are valid. We're just agreeing on the order first.
// This means that block.AppHash does not include these txs.
repeated bytes txs = 1;

repeated google.protobuf.Any consensus_messages = 2;
}

// Vote represents a prevote, precommit, or commit vote from validators for
Expand Down
Loading

0 comments on commit 51d4178

Please sign in to comment.