From 07e431d399510d9501a9d498ce8e26a0650ae486 Mon Sep 17 00:00:00 2001 From: Keenan Nemetz Date: Mon, 17 Jun 2024 10:40:03 -0700 Subject: [PATCH] fix: Merge retry logic (#2719) ## Relevant issue(s) Resolves #2718 Resolves #2721 ## Description This PR fixes issues with merge retry and DAG sync processes. It also moves the `docQueue` from the `net` package into the `db` package. ## Tasks - [x] I made sure the code is well commented, particularly hard-to-understand areas. - [x] I made sure the repository-held documentation is changed accordingly. - [x] I made sure the pull request title adheres to the conventional commit style (the subset used in the project can be found in [tools/configs/chglog/config.yml](tools/configs/chglog/config.yml)). - [x] I made sure to discuss its limitations such as threats to validity, vulnerability to mistake and misuse, robustness to invalidation of assumptions, resource requirements, ... ## How has this been tested? `make test` Specify the platform(s) on which this was tested: - MacOS --- events/dag_sync.go | 3 +- go.mod | 1 + go.sum | 4 ++ internal/db/merge.go | 124 +++++++++++++++++++++++--------------- internal/db/merge_test.go | 28 +++++++++ net/process.go | 124 -------------------------------------- net/server.go | 76 +++-------------------- net/server_test.go | 25 -------- net/sync_dag.go | 91 ++++++++++++++++++++++++++++ 9 files changed, 209 insertions(+), 267 deletions(-) delete mode 100644 net/process.go create mode 100644 net/sync_dag.go diff --git a/events/dag_sync.go b/events/dag_sync.go index 4ab568b7d0..d6150c9118 100644 --- a/events/dag_sync.go +++ b/events/dag_sync.go @@ -14,7 +14,6 @@ import ( "sync" "github.com/ipfs/go-cid" - "github.com/sourcenetwork/immutable" ) @@ -23,6 +22,8 @@ type DAGMergeChannel = immutable.Option[Channel[DAGMerge]] // DAGMerge is a notification that a merge can be performed up to the provided CID. type DAGMerge struct { + // DocID is the unique identifier for the document being merged. 
+ DocID string // Cid is the id of the composite commit that formed this update in the DAG. Cid cid.Cid // SchemaRoot is the root identifier of the schema that defined the shape of the document that was updated. diff --git a/go.mod b/go.mod index fc838114a6..de665e784f 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/ipfs/go-log/v2 v2.5.1 github.com/ipld/go-ipld-prime v0.21.0 github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20240322071758-198d7dba8fb8 + github.com/ipld/go-ipld-prime/storage/bsrvadapter v0.0.0-20240322071758-198d7dba8fb8 github.com/jbenet/goprocess v0.1.4 github.com/lens-vm/lens/host-go v0.0.0-20231127204031-8d858ed2926c github.com/lestrrat-go/jwx/v2 v2.0.21 diff --git a/go.sum b/go.sum index 23410e3394..eade42f4c3 100644 --- a/go.sum +++ b/go.sum @@ -636,6 +636,8 @@ github.com/ipld/go-ipld-prime v0.21.0 h1:n4JmcpOlPDIxBcY037SVfpd1G+Sj1nKZah0m6QH github.com/ipld/go-ipld-prime v0.21.0/go.mod h1:3RLqy//ERg/y5oShXXdx5YIp50cFGOanyMctpPjsvxQ= github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20240322071758-198d7dba8fb8 h1:WQVfplCGOHtFNyZH7eOaEqGsbbje3NP8EFeGggUvEQs= github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20240322071758-198d7dba8fb8/go.mod h1:PVDd/V/Zz9IW+Diz9LEhD+ZYS9pKzawmtVQhVd0hcgQ= +github.com/ipld/go-ipld-prime/storage/bsrvadapter v0.0.0-20240322071758-198d7dba8fb8 h1:adq3fTx2YXmpTPNvBRIM0Zi5lX4JjQTRjdLYKhXMkQg= +github.com/ipld/go-ipld-prime/storage/bsrvadapter v0.0.0-20240322071758-198d7dba8fb8/go.mod h1:ej/GTRX+HjlHMs/M3zg9fM8mUlQXgHqRvPJjtp+atHw= github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus= github.com/jackpal/go-nat-pmp v1.0.2/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc= github.com/jbenet/go-cienv v0.1.0/go.mod h1:TqNnHUmJgXau0nCzC7kXWeotg3J9W34CUv5Djy1+FlA= @@ -1166,6 +1168,8 @@ github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49u github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM= 
github.com/vito/go-sse v1.0.0 h1:e6/iTrrvy8BRrOwJwmQmlndlil+TLdxXvHi55ZDzH6M= github.com/vito/go-sse v1.0.0/go.mod h1:2wkcaQ+jtlZ94Uve8gYZjFpL68luAjssTINA2hpgcZs= +github.com/warpfork/go-testmark v0.12.1 h1:rMgCpJfwy1sJ50x0M0NgyphxYYPMOODIJHhsXyEHU0s= +github.com/warpfork/go-testmark v0.12.1/go.mod h1:kHwy7wfvGSPh1rQJYKayD4AbtNaeyZdcGi9tNJTaa5Y= github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0 h1:GDDkbFiaK8jsSDJfjId/PEGEShv6ugrt4kYsC5UIDaQ= github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= github.com/wasmerio/wasmer-go v1.0.4 h1:MnqHoOGfiQ8MMq2RF6wyCeebKOe84G88h5yv+vmxJgs= diff --git a/internal/db/merge.go b/internal/db/merge.go index 323f7ae92c..7618d31309 100644 --- a/internal/db/merge.go +++ b/internal/db/merge.go @@ -13,6 +13,7 @@ package db import ( "container/list" "context" + "sync" "github.com/ipfs/go-cid" "github.com/ipld/go-ipld-prime/linking" @@ -34,6 +35,7 @@ import ( ) func (db *db) handleMerges(ctx context.Context, merges events.Subscription[events.DAGMerge]) { + queue := newMergeQueue() for { select { case <-ctx.Done(): @@ -43,15 +45,33 @@ func (db *db) handleMerges(ctx context.Context, merges events.Subscription[event return } go func() { - err := db.executeMerge(ctx, merge) + // ensure only one merge per docID + queue.add(merge.DocID) + defer queue.done(merge.DocID) + + // retry the merge process if a conflict occurs + // + // conflicts occur when a user updates a document + // while a merge is in progress. 
+ var err error + for i := 0; i < db.MaxTxnRetries(); i++ { + err = db.executeMerge(ctx, merge) + if errors.Is(err, badger.ErrTxnConflict) { + continue // retry merge + } + break // merge success or error + } + if err != nil { log.ErrorContextE( ctx, "Failed to execute merge", err, - corelog.String("CID", merge.Cid.String()), - corelog.String("Error", err.Error()), - ) + corelog.Any("Error", err), + corelog.Any("Event", merge)) + } + if merge.Wg != nil { + merge.Wg.Done() } }() } @@ -59,12 +79,6 @@ func (db *db) handleMerges(ctx context.Context, merges events.Subscription[event } func (db *db) executeMerge(ctx context.Context, dagMerge events.DAGMerge) error { - defer func() { - // Notify the caller that the merge is complete. - if dagMerge.Wg != nil { - dagMerge.Wg.Done() - } - }() ctx, txn, err := ensureContextTxn(ctx, db, false) if err != nil { return err @@ -79,7 +93,7 @@ func (db *db) executeMerge(ctx context.Context, dagMerge events.DAGMerge) error ls := cidlink.DefaultLinkSystem() ls.SetReadStorage(txn.DAGstore().AsIPLDStorage()) - docID, err := getDocIDFromBlock(ctx, ls, dagMerge.Cid) + docID, err := client.NewDocIDFromString(dagMerge.DocID) if err != nil { return err } @@ -100,35 +114,57 @@ func (db *db) executeMerge(ctx context.Context, dagMerge events.DAGMerge) error return err } - for retry := 0; retry < db.MaxTxnRetries(); retry++ { - err := mp.mergeComposites(ctx) - if err != nil { - return err - } - err = syncIndexedDoc(ctx, docID, col) - if err != nil { - return err - } - err = txn.Commit(ctx) - if err != nil { - if errors.Is(err, badger.ErrTxnConflict) { - txn, err = db.NewTxn(ctx, false) - if err != nil { - return err - } - ctx = SetContextTxn(ctx, txn) - mp.txn = txn - mp.lsys.SetReadStorage(txn.DAGstore().AsIPLDStorage()) - // Reset the CRDTs to avoid reusing the old transaction. 
- mp.mCRDTs = make(map[string]merklecrdt.MerkleCRDT) - continue - } - return err - } - break + err = mp.mergeComposites(ctx) + if err != nil { + return err } - return nil + err = syncIndexedDoc(ctx, docID, col) + if err != nil { + return err + } + + return txn.Commit(ctx) +} + +// mergeQueue is a synchronization source to ensure that concurrent +// document merges do not cause transaction conflicts. +type mergeQueue struct { + docs map[string]chan struct{} + mutex sync.Mutex +} + +func newMergeQueue() *mergeQueue { + return &mergeQueue{ + docs: make(map[string]chan struct{}), + } +} + +// add adds a docID to the queue. If the docID is already in the queue, it will +// wait for the docID to be removed from the queue. For every add call, done must +// be called to remove the docID from the queue. Otherwise, subsequent add calls will +// block forever. +func (m *mergeQueue) add(docID string) { + m.mutex.Lock() + done, ok := m.docs[docID] + if !ok { + m.docs[docID] = make(chan struct{}) + } + m.mutex.Unlock() + if ok { + <-done + m.add(docID) + } +} + +func (m *mergeQueue) done(docID string) { + m.mutex.Lock() + defer m.mutex.Unlock() + done, ok := m.docs[docID] + if ok { + delete(m.docs, docID) + close(done) + } } type mergeProcessor struct { @@ -333,18 +369,6 @@ func (mp *mergeProcessor) initCRDTForType( return mcrdt, nil } -func getDocIDFromBlock(ctx context.Context, ls linking.LinkSystem, cid cid.Cid) (client.DocID, error) { - nd, err := ls.Load(linking.LinkContext{Ctx: ctx}, cidlink.Link{Cid: cid}, coreblock.SchemaPrototype) - if err != nil { - return client.DocID{}, err - } - block, err := coreblock.GetFromNode(nd) - if err != nil { - return client.DocID{}, err - } - return client.NewDocIDFromString(string(block.Delta.GetDocID())) -} - func getCollectionFromRootSchema(ctx context.Context, db *db, rootSchema string) (*collection, error) { cols, err := db.getCollections( ctx, diff --git a/internal/db/merge_test.go b/internal/db/merge_test.go index 
b8671a5171..9f0e0b34af 100644 --- a/internal/db/merge_test.go +++ b/internal/db/merge_test.go @@ -13,6 +13,7 @@ package db import ( "context" "testing" + "time" "github.com/fxamacker/cbor/v2" "github.com/ipld/go-ipld-prime" @@ -59,6 +60,7 @@ func TestMerge_SingleBranch_NoError(t *testing.T) { require.NoError(t, err) err = db.executeMerge(ctx, events.DAGMerge{ + DocID: docID.String(), Cid: compInfo2.link.Cid, SchemaRoot: col.SchemaRoot(), }) @@ -103,6 +105,7 @@ func TestMerge_DualBranch_NoError(t *testing.T) { require.NoError(t, err) err = db.executeMerge(ctx, events.DAGMerge{ + DocID: docID.String(), Cid: compInfo2.link.Cid, SchemaRoot: col.SchemaRoot(), }) @@ -112,6 +115,7 @@ func TestMerge_DualBranch_NoError(t *testing.T) { require.NoError(t, err) err = db.executeMerge(ctx, events.DAGMerge{ + DocID: docID.String(), Cid: compInfo3.link.Cid, SchemaRoot: col.SchemaRoot(), }) @@ -159,6 +163,7 @@ func TestMerge_DualBranchWithOneIncomplete_CouldNotFindCID(t *testing.T) { require.NoError(t, err) err = db.executeMerge(ctx, events.DAGMerge{ + DocID: docID.String(), Cid: compInfo2.link.Cid, SchemaRoot: col.SchemaRoot(), }) @@ -177,6 +182,7 @@ func TestMerge_DualBranchWithOneIncomplete_CouldNotFindCID(t *testing.T) { require.NoError(t, err) err = db.executeMerge(ctx, events.DAGMerge{ + DocID: docID.String(), Cid: compInfo3.link.Cid, SchemaRoot: col.SchemaRoot(), }) @@ -292,3 +298,25 @@ func encodeValue(val any) []byte { } return b } + +func TestMergeQueue(t *testing.T) { + q := newMergeQueue() + + testDocID := "test" + + q.add(testDocID) + go q.add(testDocID) + // give time for the goroutine to block + time.Sleep(10 * time.Millisecond) + require.Len(t, q.docs, 1) + q.done(testDocID) + // give time for the goroutine to add the docID + time.Sleep(10 * time.Millisecond) + q.mutex.Lock() + require.Len(t, q.docs, 1) + q.mutex.Unlock() + q.done(testDocID) + q.mutex.Lock() + require.Len(t, q.docs, 0) + q.mutex.Unlock() +} diff --git a/net/process.go b/net/process.go deleted file 
mode 100644 index b4f85134fb..0000000000 --- a/net/process.go +++ /dev/null @@ -1,124 +0,0 @@ -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package net - -import ( - "context" - "sync" - "time" - - "github.com/ipfs/boxo/blockservice" - "github.com/ipfs/go-cid" - "github.com/ipld/go-ipld-prime/linking" - cidlink "github.com/ipld/go-ipld-prime/linking/cid" - "github.com/sourcenetwork/corelog" - - coreblock "github.com/sourcenetwork/defradb/internal/core/block" -) - -var ( - dagSyncTimeout = time.Second * 60 -) - -type blockProcessor struct { - *Peer - wg *sync.WaitGroup - bsSession *blockservice.Session - queuedChildren *sync.Map -} - -func newBlockProcessor( - ctx context.Context, - p *Peer, -) *blockProcessor { - return &blockProcessor{ - Peer: p, - wg: &sync.WaitGroup{}, - bsSession: blockservice.NewSession(ctx, p.bserv), - queuedChildren: &sync.Map{}, - } -} - -// processRemoteBlock stores the block in the DAG store and initiates a sync of the block's children. 
-func (bp *blockProcessor) processRemoteBlock( - ctx context.Context, - block *coreblock.Block, -) error { - // Store the block in the DAG store - lsys := cidlink.DefaultLinkSystem() - lsys.SetWriteStorage(bp.db.Blockstore().AsIPLDStorage()) - _, err := lsys.Store(linking.LinkContext{Ctx: ctx}, coreblock.GetLinkPrototype(), block.GenerateNode()) - if err != nil { - return err - } - // Initiate a sync of the block's children - bp.wg.Add(1) - bp.handleChildBlocks(ctx, block) - - return nil -} - -func (bp *blockProcessor) handleChildBlocks( - ctx context.Context, - block *coreblock.Block, -) { - defer bp.wg.Done() - - if len(block.Links) == 0 { - return - } - - links := make([]cid.Cid, 0, len(block.Links)) - for _, link := range block.Links { - exists, err := bp.db.Blockstore().Has(ctx, link.Cid) - if err != nil { - log.ErrorContextE( - ctx, - "Failed to check if block exists", - err, - corelog.Any("CID", link.Cid), - ) - continue - } - if exists { - continue - } - if _, loaded := bp.queuedChildren.LoadOrStore(link.Cid, struct{}{}); !loaded { - links = append(links, link.Cid) - } - } - - getCtx, cancel := context.WithTimeout(ctx, dagSyncTimeout) - defer cancel() - - childBlocks := bp.bsSession.GetBlocks(getCtx, links) - - for rawBlock := range childBlocks { - block, err := coreblock.GetFromBytes(rawBlock.RawData()) - if err != nil { - log.ErrorContextE( - ctx, - "Failed to get block from bytes", - err, - corelog.Any("CID", rawBlock.Cid()), - ) - continue - } - bp.wg.Add(1) - go bp.handleChildBlocks(ctx, block) - } - - for _, link := range links { - bp.queuedChildren.Delete(link) - } -} diff --git a/net/server.go b/net/server.go index 94d791854f..2d0fcdaaa3 100644 --- a/net/server.go +++ b/net/server.go @@ -17,7 +17,7 @@ import ( "fmt" "sync" - "github.com/ipfs/go-cid" + cid "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p/core/event" libpeer "github.com/libp2p/go-libp2p/core/peer" "github.com/sourcenetwork/corelog" @@ -51,11 +51,6 @@ type server struct { 
pubSubEmitter event.Emitter pushLogEmitter event.Emitter - // docQueue is used to track which documents are currently being processed. - // This is used to prevent multiple concurrent processing of the same document and - // limit unecessary transaction conflicts. - docQueue *docQueue - pb.UnimplementedServiceServer } @@ -73,9 +68,6 @@ func newServer(p *Peer, opts ...grpc.DialOption) (*server, error) { peer: p, conns: make(map[libpeer.ID]*grpc.ClientConn), topics: make(map[string]pubsubTopic), - docQueue: &docQueue{ - docs: make(map[string]chan struct{}), - }, } cred := insecure.NewCredentials() @@ -152,45 +144,13 @@ func (s *server) GetLog(ctx context.Context, req *pb.GetLogRequest) (*pb.GetLogR return nil, nil } -type docQueue struct { - docs map[string]chan struct{} - mu sync.Mutex -} - -// add adds a docID to the queue. If the docID is already in the queue, it will -// wait for the docID to be removed from the queue. For every add call, done must -// be called to remove the docID from the queue. Otherwise, subsequent add calls will -// block forever. 
-func (dq *docQueue) add(docID string) { - dq.mu.Lock() - done, ok := dq.docs[docID] - if !ok { - dq.docs[docID] = make(chan struct{}) - } - dq.mu.Unlock() - if ok { - <-done - dq.add(docID) - } -} - -func (dq *docQueue) done(docID string) { - dq.mu.Lock() - defer dq.mu.Unlock() - done, ok := dq.docs[docID] - if ok { - delete(dq.docs, docID) - close(done) - } -} - // PushLog receives a push log request func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushLogReply, error) { pid, err := peerIDFromContext(ctx) if err != nil { return nil, err } - cid, err := cid.Cast(req.Body.Cid) + headCID, err := cid.Cast(req.Body.Cid) if err != nil { return nil, err } @@ -198,10 +158,12 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL if err != nil { return nil, err } + block, err := coreblock.GetFromBytes(req.Body.Log.Block) + if err != nil { + return nil, err + } - s.docQueue.add(docID.String()) defer func() { - s.docQueue.done(docID.String()) if s.pushLogEmitter != nil { byPeer, err := libpeer.Decode(req.Body.Creator) if err != nil { @@ -219,37 +181,17 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL } }() - // check if we already have this block - exists, err := s.peer.db.Blockstore().Has(ctx, cid) - if err != nil { - return nil, NewErrCheckingForExistingBlock(err, cid.String()) - } - if exists { - return &pb.PushLogReply{}, nil - } - - block, err := coreblock.GetFromBytes(req.Body.Log.Block) + err = syncDAG(ctx, s.peer.bserv, block) if err != nil { return nil, err } - bp := newBlockProcessor(ctx, s.peer) - err = bp.processRemoteBlock(ctx, block) - if err != nil { - log.ErrorContextE( - ctx, - "Failed to process remote block", - err, - corelog.String("DocID", docID.String()), - corelog.Any("CID", cid), - ) - } - bp.wg.Wait() if s.peer.db.Events().DAGMerges.HasValue() { wg := &sync.WaitGroup{} wg.Add(1) s.peer.db.Events().DAGMerges.Value().Publish(events.DAGMerge{ - Cid: cid, + DocID: 
docID.String(), + Cid: headCID, SchemaRoot: string(req.Body.SchemaRoot), Wg: wg, }) diff --git a/net/server_test.go b/net/server_test.go index 47d6a68aa8..d17705b404 100644 --- a/net/server_test.go +++ b/net/server_test.go @@ -13,7 +13,6 @@ package net import ( "context" "testing" - "time" "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore/query" @@ -227,30 +226,6 @@ func TestGetHeadLog(t *testing.T) { require.Nil(t, err) } -func TestDocQueue(t *testing.T) { - q := docQueue{ - docs: make(map[string]chan struct{}), - } - - testDocID := "test" - - q.add(testDocID) - go q.add(testDocID) - // give time for the goroutine to block - time.Sleep(10 * time.Millisecond) - require.Len(t, q.docs, 1) - q.done(testDocID) - // give time for the goroutine to add the docID - time.Sleep(10 * time.Millisecond) - q.mu.Lock() - require.Len(t, q.docs, 1) - q.mu.Unlock() - q.done(testDocID) - q.mu.Lock() - require.Len(t, q.docs, 0) - q.mu.Unlock() -} - func getHead(ctx context.Context, db client.DB, docID client.DocID) (cid.Cid, error) { prefix := core.DataStoreKeyFromDocID(docID).ToHeadStoreKey().WithFieldId(core.COMPOSITE_NAMESPACE).ToString() results, err := db.Headstore().Query(ctx, query.Query{Prefix: prefix}) diff --git a/net/sync_dag.go b/net/sync_dag.go new file mode 100644 index 0000000000..6e9801ebd7 --- /dev/null +++ b/net/sync_dag.go @@ -0,0 +1,91 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package net + +import ( + "context" + "time" + + "github.com/ipfs/boxo/blockservice" + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/datamodel" + "github.com/ipld/go-ipld-prime/linking" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/linking/preload" + basicnode "github.com/ipld/go-ipld-prime/node/basic" + "github.com/ipld/go-ipld-prime/schema" + "github.com/ipld/go-ipld-prime/storage/bsrvadapter" + "github.com/ipld/go-ipld-prime/traversal" + "github.com/ipld/go-ipld-prime/traversal/selector" + "github.com/ipld/go-ipld-prime/traversal/selector/builder" + + coreblock "github.com/sourcenetwork/defradb/internal/core/block" +) + +// syncDAGTimeout is the maximum amount of time +// to wait for a dag to be fetched. +var syncDAGTimeout = 60 * time.Second + +// syncDAG synchronizes the DAG starting with the given block +// using the blockservice to fetch remote blocks. +// +// This process walks the entire DAG until the issue below is resolved. 
+// https://github.com/sourcenetwork/defradb/issues/2722 +func syncDAG(ctx context.Context, bserv blockservice.BlockService, block *coreblock.Block) error { + ctx, cancel := context.WithTimeout(ctx, syncDAGTimeout) + defer cancel() + + store := &bsrvadapter.Adapter{Wrapped: bserv} + lsys := cidlink.DefaultLinkSystem() + lsys.SetWriteStorage(store) + lsys.SetReadStorage(store) + lsys.TrustedStorage = true + + // Store the block in the DAG store + _, err := lsys.Store(linking.LinkContext{Ctx: ctx}, coreblock.GetLinkPrototype(), block.GenerateNode()) + if err != nil { + return err + } + + ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) + matchAllSelector, err := ssb.ExploreRecursive(selector.RecursionLimitNone(), ssb.ExploreUnion( + ssb.Matcher(), + ssb.ExploreAll(ssb.ExploreRecursiveEdge()), + )).Selector() + if err != nil { + return err + } + + // prototypeChooser returns the node prototype to use when traversing + prototypeChooser := func(lnk ipld.Link, lnkCtx ipld.LinkContext) (ipld.NodePrototype, error) { + if tlnkNd, ok := lnkCtx.LinkNode.(schema.TypedLinkNode); ok { + return tlnkNd.LinkTargetNodePrototype(), nil + } + return basicnode.Prototype.Any, nil + } + // preloader is used to asynchronously load blocks before traversing + // + // any errors encountered during preload are ignored + preloader := func(pctx preload.PreloadContext, l preload.Link) { + go lsys.Load(linking.LinkContext{Ctx: pctx.Ctx}, l.Link, basicnode.Prototype.Any) //nolint:errcheck + } + config := traversal.Config{ + Ctx: ctx, + LinkSystem: lsys, + LinkVisitOnlyOnce: true, + LinkTargetNodePrototypeChooser: prototypeChooser, + Preloader: preloader, + } + visit := func(p traversal.Progress, n datamodel.Node) error { + return nil + } + return traversal.Progress{Cfg: &config}.WalkMatching(block.GenerateNode(), matchAllSelector, visit) +}