From f89456b99cf0b2a68727d60703a9fbc0fee1932b Mon Sep 17 00:00:00 2001 From: Keenan Nemetz Date: Fri, 14 Jun 2024 10:13:36 -0700 Subject: [PATCH] handle txn conflict on dag sync --- internal/db/merge_test.go | 5 +++++ net/server.go | 40 +++++++++++++++++++++++++++++++-------- 2 files changed, 37 insertions(+), 8 deletions(-) diff --git a/internal/db/merge_test.go b/internal/db/merge_test.go index 77a6848dbb..3e2bd015a0 100644 --- a/internal/db/merge_test.go +++ b/internal/db/merge_test.go @@ -60,6 +60,7 @@ func TestMerge_SingleBranch_NoError(t *testing.T) { require.NoError(t, err) err = db.executeMerge(ctx, events.DAGMerge{ + DocID: docID.String(), Cid: compInfo2.link.Cid, SchemaRoot: col.SchemaRoot(), }) @@ -104,6 +105,7 @@ func TestMerge_DualBranch_NoError(t *testing.T) { require.NoError(t, err) err = db.executeMerge(ctx, events.DAGMerge{ + DocID: docID.String(), Cid: compInfo2.link.Cid, SchemaRoot: col.SchemaRoot(), }) @@ -113,6 +115,7 @@ func TestMerge_DualBranch_NoError(t *testing.T) { require.NoError(t, err) err = db.executeMerge(ctx, events.DAGMerge{ + DocID: docID.String(), Cid: compInfo3.link.Cid, SchemaRoot: col.SchemaRoot(), }) @@ -160,6 +163,7 @@ func TestMerge_DualBranchWithOneIncomplete_CouldNotFindCID(t *testing.T) { require.NoError(t, err) err = db.executeMerge(ctx, events.DAGMerge{ + DocID: docID.String(), Cid: compInfo2.link.Cid, SchemaRoot: col.SchemaRoot(), }) @@ -178,6 +182,7 @@ func TestMerge_DualBranchWithOneIncomplete_CouldNotFindCID(t *testing.T) { require.NoError(t, err) err = db.executeMerge(ctx, events.DAGMerge{ + DocID: docID.String(), Cid: compInfo3.link.Cid, SchemaRoot: col.SchemaRoot(), }) diff --git a/net/server.go b/net/server.go index a00e0566fd..87753a81a8 100644 --- a/net/server.go +++ b/net/server.go @@ -20,6 +20,7 @@ import ( "github.com/ipfs/boxo/ipld/merkledag" cid "github.com/ipfs/go-cid" + format "github.com/ipfs/go-ipld-format" "github.com/libp2p/go-libp2p/core/event" libpeer "github.com/libp2p/go-libp2p/core/peer" "github.com/sourcenetwork/corelog" @@ -30,14 +31,15 @@ import ( "google.golang.org/protobuf/proto" "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/datastore/badger/v4" "github.com/sourcenetwork/defradb/errors" "github.com/sourcenetwork/defradb/events" pb "github.com/sourcenetwork/defradb/net/pb" ) -// dagFetchTimeout is the maximum amount of time +// syncDAGTimeout is the maximum amount of time // to wait for a dag to be fetched. -var dagFetchTimeout = 60 * time.Second +var syncDAGTimeout = 60 * time.Second // Server is the request/response instance for all P2P RPC communication. // Implements gRPC server. See net/pb/net.proto for corresponding service definitions. @@ -182,12 +184,7 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL } }() - fetchCtx, cancel := context.WithTimeout(ctx, dagFetchTimeout) - defer cancel() - // the merkledag must be fetched every time - // to ensure we have all of the child blocks - dagServ := merkledag.NewDAGService(s.peer.bserv) - err = merkledag.FetchGraph(fetchCtx, headCID, dagServ) + err = s.syncDAG(ctx, headCID) if err != nil { return nil, err } @@ -360,6 +357,33 @@ func (s *server) pubSubEventHandler(from libpeer.ID, topic string, msg []byte) { } } +// syncDAG ensures that the DAG with the given CID is completely synchronized. +func (s *server) syncDAG(ctx context.Context, c cid.Cid) error { + ctx, cancel := context.WithTimeout(ctx, syncDAGTimeout) + defer cancel() + + dserv := merkledag.NewDAGService(s.peer.bserv) + var ng format.NodeGetter = merkledag.NewSession(ctx, dserv) + + set := make(map[cid.Cid]struct{}) + // visit each node only once + visit := func(c cid.Cid) bool { + _, ok := set[c] + set[c] = struct{}{} + return !ok + } + // ignore transaction conflict errors + onError := func(c cid.Cid, err error) error { + if errors.Is(err, badger.ErrTxnConflict) { + return nil + } + return err + } + + opts := []merkledag.WalkOption{merkledag.Concurrent(), merkledag.OnError(onError)} + return merkledag.Walk(ctx, merkledag.GetLinksDirect(ng), c, visit, opts...) +} + // addr implements net.Addr and holds a libp2p peer ID. type addr struct{ id libpeer.ID }