Skip to content

Commit

Permalink
handle txn conflict on dag sync
Browse files Browse the repository at this point in the history
  • Loading branch information
nasdf committed Jun 14, 2024
1 parent b4b456b commit f89456b
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 8 deletions.
5 changes: 5 additions & 0 deletions internal/db/merge_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func TestMerge_SingleBranch_NoError(t *testing.T) {
require.NoError(t, err)

err = db.executeMerge(ctx, events.DAGMerge{
DocID: docID.String(),
Cid: compInfo2.link.Cid,
SchemaRoot: col.SchemaRoot(),
})
Expand Down Expand Up @@ -104,6 +105,7 @@ func TestMerge_DualBranch_NoError(t *testing.T) {
require.NoError(t, err)

err = db.executeMerge(ctx, events.DAGMerge{
DocID: docID.String(),
Cid: compInfo2.link.Cid,
SchemaRoot: col.SchemaRoot(),
})
Expand All @@ -113,6 +115,7 @@ func TestMerge_DualBranch_NoError(t *testing.T) {
require.NoError(t, err)

err = db.executeMerge(ctx, events.DAGMerge{
DocID: docID.String(),
Cid: compInfo3.link.Cid,
SchemaRoot: col.SchemaRoot(),
})
Expand Down Expand Up @@ -160,6 +163,7 @@ func TestMerge_DualBranchWithOneIncomplete_CouldNotFindCID(t *testing.T) {
require.NoError(t, err)

err = db.executeMerge(ctx, events.DAGMerge{
DocID: docID.String(),
Cid: compInfo2.link.Cid,
SchemaRoot: col.SchemaRoot(),
})
Expand All @@ -178,6 +182,7 @@ func TestMerge_DualBranchWithOneIncomplete_CouldNotFindCID(t *testing.T) {
require.NoError(t, err)

err = db.executeMerge(ctx, events.DAGMerge{
DocID: docID.String(),
Cid: compInfo3.link.Cid,
SchemaRoot: col.SchemaRoot(),
})
Expand Down
40 changes: 32 additions & 8 deletions net/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

"github.com/ipfs/boxo/ipld/merkledag"
cid "github.com/ipfs/go-cid"
format "github.com/ipfs/go-ipld-format"
"github.com/libp2p/go-libp2p/core/event"
libpeer "github.com/libp2p/go-libp2p/core/peer"
"github.com/sourcenetwork/corelog"
Expand All @@ -30,14 +31,15 @@ import (
"google.golang.org/protobuf/proto"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/datastore/badger/v4"
"github.com/sourcenetwork/defradb/errors"
"github.com/sourcenetwork/defradb/events"
pb "github.com/sourcenetwork/defradb/net/pb"
)

// dagFetchTimeout is the maximum amount of time
// syncDAGTimeout is the maximum amount of time
// to wait for a dag to be fetched.
var dagFetchTimeout = 60 * time.Second
var syncDAGTimeout = 60 * time.Second

// Server is the request/response instance for all P2P RPC communication.
// Implements gRPC server. See net/pb/net.proto for corresponding service definitions.
Expand Down Expand Up @@ -182,12 +184,7 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL
}
}()

fetchCtx, cancel := context.WithTimeout(ctx, dagFetchTimeout)
defer cancel()
// the merkledag must be fetched every time
// to ensure we have all of the child blocks
dagServ := merkledag.NewDAGService(s.peer.bserv)
err = merkledag.FetchGraph(fetchCtx, headCID, dagServ)
err = s.syncDAG(ctx, headCID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -360,6 +357,33 @@ func (s *server) pubSubEventHandler(from libpeer.ID, topic string, msg []byte) {
}
}

// syncDAG ensures that the DAG with the given CID is completely synchronized.
func (s *server) syncDAG(ctx context.Context, c cid.Cid) error {
ctx, cancel := context.WithTimeout(ctx, syncDAGTimeout)
defer cancel()

dserv := merkledag.NewDAGService(s.peer.bserv)
var ng format.NodeGetter = merkledag.NewSession(ctx, dserv)

set := make(map[cid.Cid]struct{})
// visit each node only once
visit := func(c cid.Cid) bool {
_, ok := set[c]
set[c] = struct{}{}
return !ok
}
// ignore transaction conflict errors
onError := func(c cid.Cid, err error) error {
if errors.Is(err, badger.ErrTxnConflict) {
return nil

Check warning on line 378 in net/server.go

View check run for this annotation

Codecov / codecov/patch

net/server.go#L377-L378

Added lines #L377 - L378 were not covered by tests
}
return err

Check warning on line 380 in net/server.go

View check run for this annotation

Codecov / codecov/patch

net/server.go#L380

Added line #L380 was not covered by tests
}

opts := []merkledag.WalkOption{merkledag.Concurrent(), merkledag.OnError(onError)}
return merkledag.Walk(ctx, merkledag.GetLinksDirect(ng), c, visit, opts...)
}

// addr implements net.Addr and holds a libp2p peer ID.
type addr struct{ id libpeer.ID }

Expand Down

0 comments on commit f89456b

Please sign in to comment.