Skip to content

Commit

Permalink
enhance msg id:
Browse files Browse the repository at this point in the history
- added config and types
- added md5 func
- added tests
  • Loading branch information
amirylm committed Nov 11, 2023
1 parent f78c8e4 commit 5a6e32f
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 5 deletions.
6 changes: 6 additions & 0 deletions commons/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type PubsubConfig struct {
MsgValidationTimeout time.Duration `json:"msgValidationTimeout,omitempty" yaml:"msgValidationTimeout,omitempty"`
Scoring *ScoringParams `json:"scoring,omitempty" yaml:"scoring,omitempty"`
MsgValidator *MsgValidationConfig `json:"msgValidator,omitempty" yaml:"msgValidator,omitempty"`
MsgIDFnConfig *MsgIDFnConfig `json:"msgIDFn,omitempty" yaml:"msgIDFn,omitempty"`
Trace bool `json:"trace,omitempty" yaml:"trace,omitempty"`
}

Expand All @@ -103,6 +104,11 @@ func (psc PubsubConfig) GetTopicConfig(name string) (TopicConfig, bool) {
return TopicConfig{}, false
}

type MsgIDFnConfig struct {
Type string `json:"type,omitempty" yaml:"type,omitempty"`
Size int `json:"size,omitempty" yaml:"size,omitempty"`
}

type MsgValidationConfig struct {
Timeout time.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"`
Concurrency int `json:"concurrency,omitempty" yaml:"concurrency,omitempty"`
Expand Down
45 changes: 43 additions & 2 deletions core/gossip/msg_id.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,63 @@
package gossip

import (
"crypto/md5"
"crypto/sha256"
"encoding/hex"

pubsub "github.com/libp2p/go-libp2p-pubsub"
pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
)

// msgIDSha256 uses sha256 hash of the message content
func MsgIDSha256(size int) pubsub.MsgIdFunction {
type MsgIDSize int
type MsgIDFuncType string

const (
MsgIDSha256Type MsgIDFuncType = "sha256"
MsgIDMD5Type MsgIDFuncType = "md5"
)

var DefaultMsgIDFn = MsgIDSha256(20)

func MsgIDFn(tp MsgIDFuncType, size MsgIDSize) pubsub.MsgIdFunction {
switch tp {
case MsgIDSha256Type:
return MsgIDSha256(size)
case MsgIDMD5Type:
return MsgIDMD5(size)
default:
return DefaultMsgIDFn
}
}

// MsgIDSha256 uses sha256 hash of the message content
func MsgIDSha256(size MsgIDSize) pubsub.MsgIdFunction {
return func(pmsg *pubsub_pb.Message) string {
msg := pmsg.GetData()
if len(msg) == 0 {
return ""
}
// TODO: optimize, e.g. by using a pool of hashers
h := sha256.Sum256(msg)
if msgSize := MsgIDSize(len(h)); size > msgSize {
size = msgSize
}
return hex.EncodeToString(h[:size])
}
}

// MsgIDSMD5 uses md5 hash of the message content
func MsgIDMD5(size MsgIDSize) pubsub.MsgIdFunction {
return func(pmsg *pubsub_pb.Message) string {
msg := pmsg.GetData()
if len(msg) == 0 {
return ""
}
// TODO: optimize, e.g. by using a pool of hashers
h := md5.Sum(msg)
if msgSize := MsgIDSize(len(h)); size > msgSize {
size = msgSize
}
return hex.EncodeToString(h[:size])
}
}
57 changes: 57 additions & 0 deletions core/gossip/msg_id_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package gossip

import (
"testing"

pubsub_pb "github.com/libp2p/go-libp2p-pubsub/pb"
"github.com/stretchr/testify/require"
)

func TestMsgID(t *testing.T) {
tests := []struct {
name string
msgID MsgIDFuncType
size MsgIDSize
input []byte
want string
}{
{
name: "sha256",
msgID: MsgIDSha256Type,
size: 20,
input: []byte("hello world"),
want: "b94d27b9934d3e08a52e52d7da7dabfac484efe3",
},
{
name: "md5",
msgID: MsgIDMD5Type,
size: 10,
input: []byte("hello world"),
want: "5eb63bbbe01eeed093cb",
},
{
name: "default",
msgID: "",
size: 0,
input: []byte("hello world"),
want: "b94d27b9934d3e08a52e52d7da7dabfac484efe3",
},
{
name: "size overflow",
msgID: MsgIDMD5Type,
size: 100,
input: []byte("hello world"),
want: "5eb63bbbe01eeed093cb22bb8f5acdc3",
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
msgIDFn := MsgIDFn(tc.msgID, tc.size)
got := msgIDFn(&pubsub_pb.Message{
Data: tc.input,
})
require.Equal(t, tc.want, got)
})
}
}
3 changes: 2 additions & 1 deletion core/pubsub.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ var (
)

func (c *Controller) setupPubsubRouter(ctx context.Context, cfg commons.Config) error {
msgID := gossip.MsgIDFn(gossip.MsgIDFuncType(cfg.Pubsub.MsgIDFnConfig.Type), gossip.MsgIDSize(cfg.Pubsub.MsgIDFnConfig.Size))
opts := []pubsub.Option{
pubsub.WithMessageSigning(false),
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign),
pubsub.WithGossipSubParams(gossip.GossipSubParams(cfg.Pubsub.Overlay)),
pubsub.WithMessageIdFn(gossip.MsgIDSha256(20)),
pubsub.WithMessageIdFn(msgID),
}

if cfg.Pubsub.Scoring != nil {
Expand Down
4 changes: 2 additions & 2 deletions core/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,11 @@ func SetupTestControllers(ctx context.Context, t *testing.T, n int, routingFn fu
}
msgRouter := NewMsgRouter(1024, 4, func(mw *MsgWrapper[error]) {
routingFn(mw.Msg)
}, gossip.MsgIDSha256(20))
}, gossip.DefaultMsgIDFn)
valRouter := NewMsgRouter(1024, 4, func(mw *MsgWrapper[pubsub.ValidationResult]) {
res := valFn(mw.Peer, mw.Msg)
mw.Result = res
}, gossip.MsgIDSha256(20))
}, gossip.DefaultMsgIDFn)
c, err := NewController(ctx, cfg, msgRouter, valRouter, fmt.Sprintf("peer-%d", i+1))
require.NoError(t, err)
controllers[i] = c
Expand Down

0 comments on commit 5a6e32f

Please sign in to comment.