diff --git a/core/node/libp2p/pubsub.go b/core/node/libp2p/pubsub.go index 072d74ee142..2730df14430 100644 --- a/core/node/libp2p/pubsub.go +++ b/core/node/libp2p/pubsub.go @@ -1,26 +1,75 @@ package libp2p import ( + "context" + "errors" + "fmt" + + datastore "github.com/ipfs/go-datastore" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/discovery" "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/peer" "go.uber.org/fx" "github.com/ipfs/kubo/core/node/helpers" + "github.com/ipfs/kubo/repo" ) +type P2PPubSubIn struct { + fx.In + + Repo repo.Repo + Host host.Host + Discovery discovery.Discovery +} + func FloodSub(pubsubOptions ...pubsub.Option) interface{} { - return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, disc discovery.Discovery) (service *pubsub.PubSub, err error) { - return pubsub.NewFloodSub(helpers.LifecycleCtx(mctx, lc), host, append(pubsubOptions, pubsub.WithDiscovery(disc))...) + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, params P2PPubSubIn) (service *pubsub.PubSub, err error) { + return pubsub.NewFloodSub( + helpers.LifecycleCtx(mctx, lc), + params.Host, + append(pubsubOptions, + pubsub.WithDiscovery(params.Discovery), + pubsub.WithDefaultValidator(pubsub.NewBasicSeqnoValidator(makePubSubMetadataStore(params.Repo.Datastore()))))..., + ) } } func GossipSub(pubsubOptions ...pubsub.Option) interface{} { - return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, host host.Host, disc discovery.Discovery) (service *pubsub.PubSub, err error) { - return pubsub.NewGossipSub(helpers.LifecycleCtx(mctx, lc), host, append( - pubsubOptions, - pubsub.WithDiscovery(disc), - pubsub.WithFloodPublish(true))..., + return func(mctx helpers.MetricsCtx, lc fx.Lifecycle, params P2PPubSubIn) (service *pubsub.PubSub, err error) { + return pubsub.NewGossipSub( + helpers.LifecycleCtx(mctx, lc), + params.Host, + append( + pubsubOptions, + pubsub.WithDiscovery(params.Discovery), + pubsub.WithFloodPublish(true), + pubsub.WithDefaultValidator(pubsub.NewBasicSeqnoValidator(makePubSubMetadataStore(params.Repo.Datastore()))))..., ) } } + +func makePubSubMetadataStore(ds datastore.Datastore) pubsub.PeerMetadataStore { + return &pubsubMetadataStore{ds: ds} +} + +type pubsubMetadataStore struct { + ds datastore.Datastore +} + +func (m *pubsubMetadataStore) Get(ctx context.Context, p peer.ID) ([]byte, error) { + k := datastore.NewKey(fmt.Sprintf("/pubsub/seqno/%s", p)) + + v, err := m.ds.Get(ctx, k) + if err != nil && errors.Is(err, datastore.ErrNotFound) { + return nil, nil + } + + return v, err +} + +func (m *pubsubMetadataStore) Put(ctx context.Context, p peer.ID, v []byte) error { + k := datastore.NewKey(fmt.Sprintf("/pubsub/seqno/%s", p)) + return m.ds.Put(ctx, k, v) +} diff --git a/test/integration/pubsub_msg_seen_cache_test.go b/test/integration/pubsub_msg_seen_cache_test.go index 85cc8ae9f1e..702677ad35a 100644 --- a/test/integration/pubsub_msg_seen_cache_test.go +++ b/test/integration/pubsub_msg_seen_cache_test.go @@ -30,7 +30,7 @@ import ( ) func TestMessageSeenCacheTTL(t *testing.T) { - t.Skip("skipping PubSub seen cache TTL test due to flakiness") + t.Skip("the behaviour of seen cache has changed wrt to timing and this test cannot capture this behaviour now; it also has unit tests in go-libp2p-pubsub.") if err := RunMessageSeenCacheTTLTest(t, "10s"); err != nil { t.Fatal(err) }