diff --git a/internal/db/merge.go b/internal/db/merge.go index f35a3269dd..890e8e7198 100644 --- a/internal/db/merge.go +++ b/internal/db/merge.go @@ -23,6 +23,7 @@ import ( "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/datastore" + "github.com/sourcenetwork/defradb/datastore/badger/v4" "github.com/sourcenetwork/defradb/errors" "github.com/sourcenetwork/defradb/events" "github.com/sourcenetwork/defradb/internal/core" @@ -48,7 +49,8 @@ func (db *db) handleMerges(ctx context.Context, merges events.Subscription[event ctx, "Failed to execute merge", err, - corelog.String("cid", merge.Cid.String()), + corelog.String("CID", merge.Cid.String()), + corelog.String("Error", err.Error()), ) } }() @@ -68,87 +70,86 @@ func (db *db) executeMerge(ctx context.Context, dagMerge events.DAGMerge) error return err } defer txn.Discard(ctx) - mp, err := db.newMergeProcessor(ctx, dagMerge.Cid, dagMerge.SchemaRoot) - if err != nil { - return err - } - mt, err := mp.getHeads(ctx) + + col, err := getCollectionFromRootSchema(ctx, db, dagMerge.SchemaRoot) if err != nil { return err } - err = mp.getComposites(ctx, dagMerge.Cid, mt) + + ls := cidlink.DefaultLinkSystem() + ls.SetReadStorage(txn.DAGstore().AsIPLDStorage()) + + docID, err := getDocIDFromBlock(ctx, ls, dagMerge.Cid) if err != nil { return err } - err = mp.merge(ctx) + dsKey := base.MakeDataStoreKeyWithCollectionAndDocID(col.Description(), docID.String()) + + mt, err := getHeadsAsMergeTarget(ctx, txn, dsKey) if err != nil { return err } - err = mp.syncIndexedDocs(ctx) + + mp, err := db.newMergeProcessor(txn, ls, col, dsKey) if err != nil { return err } - return txn.Commit(ctx) -} - -type mergeProcessor struct { - ctx context.Context - txn datastore.Txn - ls linking.LinkSystem - docID client.DocID - mCRDTs map[uint32]merklecrdt.MerkleCRDT - col *collection - schemaVersionKey core.CollectionSchemaVersionKey - dsKey core.DataStoreKey - composites *list.List -} -func (db *db) newMergeProcessor(ctx context.Context, cid cid.Cid, rootSchema string) (*mergeProcessor, error) { - txn, ok := TryGetContextTxn(ctx) - if !ok { - return nil, ErrNoTransactionInContext - } - - ls := cidlink.DefaultLinkSystem() - ls.SetReadStorage(txn.DAGstore().AsIPLDStorage()) - nd, err := ls.Load(linking.LinkContext{Ctx: ctx}, cidlink.Link{Cid: cid}, coreblock.SchemaPrototype) + err = mp.loadComposites(ctx, dagMerge.Cid, mt) if err != nil { - return nil, err + return err } - block, err := coreblock.GetFromNode(nd) - if err != nil { - return nil, err + for retry := 0; retry < db.MaxTxnRetries(); retry++ { + err := mp.mergeComposites(ctx) + if err != nil { + return err + } + err = syncIndexedDocs(ctx, docID, col) + if err != nil { + return err + } + err = txn.Commit(ctx) + if err != nil { + if errors.Is(err, badger.ErrTxnConflict) { + txn, err = db.NewTxn(ctx, false) + if err != nil { + return err + } + ctx = SetContextTxn(ctx, txn) + mp.txn = txn + mp.ls.SetReadStorage(txn.DAGstore().AsIPLDStorage()) + continue + } + return err + } + break } - cols, err := db.getCollections( - ctx, - client.CollectionFetchOptions{ - SchemaRoot: immutable.Some(rootSchema), - }, - ) - if err != nil { - return nil, err - } + return nil +} - col := cols[0].(*collection) - docID, err := client.NewDocIDFromString(string(block.Delta.GetDocID())) - if err != nil { - return nil, err - } +type mergeProcessor struct { + txn datastore.Txn + ls linking.LinkSystem + mCRDTs map[string]merklecrdt.MerkleCRDT + col *collection + dsKey core.DataStoreKey + composites *list.List +} +func (db *db) newMergeProcessor( + txn datastore.Txn, + ls linking.LinkSystem, + col *collection, + dsKey core.DataStoreKey, +) (*mergeProcessor, error) { return &mergeProcessor{ - ctx: ctx, - txn: txn, - ls: ls, - docID: docID, - mCRDTs: make(map[uint32]merklecrdt.MerkleCRDT), - col: col, - schemaVersionKey: core.CollectionSchemaVersionKey{ - SchemaVersionID: col.Schema().VersionID, - CollectionID: col.ID(), - }, - dsKey: base.MakeDataStoreKeyWithCollectionAndDocID(col.Description(), docID.String()), + txn: txn, + ls: ls, + mCRDTs: make(map[string]merklecrdt.MerkleCRDT), + col: col, + dsKey: dsKey, composites: list.New(), }, nil } @@ -164,9 +165,13 @@ func newMergeTarget() mergeTarget { } } -// getComposites retrieves the composite blocks for the given document until it reaches a -// block that has already been merged or until we reach the genesis block. -func (mp *mergeProcessor) getComposites(ctx context.Context, blockCid cid.Cid, mt mergeTarget) error { +// loadComposites retrieves and stores into the merge processor the composite blocks for the given +// document until it reaches a block that has already been merged or until we reach the genesis block. +func (mp *mergeProcessor) loadComposites( + ctx context.Context, + blockCid cid.Cid, + mt mergeTarget, +) error { nd, err := mp.ls.Load(linking.LinkContext{Ctx: ctx}, cidlink.Link{Cid: blockCid}, coreblock.SchemaPrototype) if err != nil { return err @@ -186,7 +191,7 @@ func (mp *mergeProcessor) getComposites(ctx context.Context, blockCid cid.Cid, m mp.composites.PushFront(block) for _, link := range block.Links { if link.Name == core.HEAD { - err := mp.getComposites(ctx, link.Cid, mt) + err := mp.loadComposites(ctx, link.Cid, mt) if err != nil { return err } @@ -210,43 +215,12 @@ func (mp *mergeProcessor) getComposites(ctx context.Context, blockCid cid.Cid, m newMT.headHeigth = childBlock.Delta.GetPriority() } } - return mp.getComposites(ctx, blockCid, newMT) + return mp.loadComposites(ctx, blockCid, newMT) } return nil } -// getHeads retrieves the heads of the composite DAG for the given document. -func (mp *mergeProcessor) getHeads(ctx context.Context) (mergeTarget, error) { - headset := clock.NewHeadSet( - mp.txn.Headstore(), - mp.dsKey.WithFieldId(core.COMPOSITE_NAMESPACE).ToHeadStoreKey(), - ) - - cids, _, err := headset.List(ctx) - if err != nil { - return mergeTarget{}, err - } - - mt := newMergeTarget() - for _, cid := range cids { - b, err := mp.txn.DAGstore().Get(ctx, cid) - if err != nil { - return mergeTarget{}, err - } - - block, err := coreblock.GetFromBytes(b.RawData()) - if err != nil { - return mergeTarget{}, err - } - - mt.heads[cid] = block - // All heads have the same height so overwriting is ok. - mt.headHeigth = block.Delta.GetPriority() - } - return mt, nil -} - -func (mp *mergeProcessor) merge(ctx context.Context) error { +func (mp *mergeProcessor) mergeComposites(ctx context.Context) error { for e := mp.composites.Front(); e != nil; e = e.Next() { block := e.Value.(*coreblock.Block) link, err := block.GenerateLink() @@ -288,12 +262,12 @@ func (mp *mergeProcessor) processBlock( continue } - b, err := mp.txn.DAGstore().Get(ctx, link.Cid) + nd, err := mp.ls.Load(linking.LinkContext{Ctx: ctx}, link.Link, coreblock.SchemaPrototype) if err != nil { return err } - childBlock, err := coreblock.GetFromBytes(b.RawData()) + childBlock, err := coreblock.GetFromNode(nd) if err != nil { return err } @@ -309,13 +283,25 @@ func (mp *mergeProcessor) processBlock( func (mp *mergeProcessor) initCRDTForType( field string, ) (merklecrdt.MerkleCRDT, error) { + mcrdt, exists := mp.mCRDTs[field] + if exists { + return mcrdt, nil + } + + schemaVersionKey := core.CollectionSchemaVersionKey{ + SchemaVersionID: mp.col.Schema().VersionID, + CollectionID: mp.col.ID(), + } + if field == "" { - return merklecrdt.NewMerkleCompositeDAG( + mcrdt = merklecrdt.NewMerkleCompositeDAG( mp.txn, - mp.schemaVersionKey, + schemaVersionKey, mp.dsKey.WithFieldId(core.COMPOSITE_NAMESPACE), "", - ), nil + ) + mp.mCRDTs[field] = mcrdt + return mcrdt, nil } fd, ok := mp.col.Definition().GetFieldByName(field) @@ -324,39 +310,109 @@ func (mp *mergeProcessor) initCRDTForType( return nil, nil } - return merklecrdt.InstanceWithStore( + mcrdt, err := merklecrdt.InstanceWithStore( mp.txn, - mp.schemaVersionKey, + schemaVersionKey, fd.Typ, fd.Kind, mp.dsKey.WithFieldId(fd.ID.String()), field, ) + if err != nil { + return nil, err + } + + mp.mCRDTs[field] = mcrdt + return mcrdt, nil +} + +func getDocIDFromBlock(ctx context.Context, ls linking.LinkSystem, cid cid.Cid) (client.DocID, error) { + nd, err := ls.Load(linking.LinkContext{Ctx: ctx}, cidlink.Link{Cid: cid}, coreblock.SchemaPrototype) + if err != nil { + return client.DocID{}, err + } + block, err := coreblock.GetFromNode(nd) + if err != nil { + return client.DocID{}, err + } + return client.NewDocIDFromString(string(block.Delta.GetDocID())) +} + +func getCollectionFromRootSchema(ctx context.Context, db *db, rootSchema string) (*collection, error) { + cols, err := db.getCollections( + ctx, + client.CollectionFetchOptions{ + SchemaRoot: immutable.Some(rootSchema), + }, + ) + if err != nil { + return nil, err + } + if len(cols) == 0 { + return nil, client.NewErrCollectionNotFoundForSchema(rootSchema) + } + // We currently only support one active collection per root schema + // so it is safe to return the first one. + return cols[0].(*collection), nil +} + +// getHeadsAsMergeTarget retrieves the heads of the composite DAG for the given document +// and returns them as a merge target. +func getHeadsAsMergeTarget(ctx context.Context, txn datastore.Txn, dsKey core.DataStoreKey) (mergeTarget, error) { + headset := clock.NewHeadSet( + txn.Headstore(), + dsKey.WithFieldId(core.COMPOSITE_NAMESPACE).ToHeadStoreKey(), + ) + + cids, _, err := headset.List(ctx) + if err != nil { + return mergeTarget{}, err + } + + mt := newMergeTarget() + for _, cid := range cids { + b, err := txn.DAGstore().Get(ctx, cid) + if err != nil { + return mergeTarget{}, err + } + + block, err := coreblock.GetFromBytes(b.RawData()) + if err != nil { + return mergeTarget{}, err + } + + mt.heads[cid] = block + // All heads have the same height so overwriting is ok. + mt.headHeigth = block.Delta.GetPriority() + } + return mt, nil } -func (mp *mergeProcessor) syncIndexedDocs( +func syncIndexedDocs( ctx context.Context, + docID client.DocID, + col *collection, ) error { // remove transaction from old context oldCtx := SetContextTxn(ctx, nil) - oldDoc, err := mp.col.Get(oldCtx, mp.docID, false) + oldDoc, err := col.Get(oldCtx, docID, false) isNewDoc := errors.Is(err, client.ErrDocumentNotFoundOrNotAuthorized) if !isNewDoc && err != nil { return err } - doc, err := mp.col.Get(ctx, mp.docID, false) + doc, err := col.Get(ctx, docID, false) isDeletedDoc := errors.Is(err, client.ErrDocumentNotFoundOrNotAuthorized) if !isDeletedDoc && err != nil { return err } if isDeletedDoc { - return mp.col.deleteIndexedDoc(ctx, oldDoc) + return col.deleteIndexedDoc(ctx, oldDoc) } else if isNewDoc { - return mp.col.indexNewDoc(ctx, doc) + return col.indexNewDoc(ctx, doc) } else { - return mp.col.updateDocIndex(ctx, oldDoc, doc) + return col.updateDocIndex(ctx, oldDoc, doc) } }