WIP - Handle collection commits over P2P
AndrewSisley committed Nov 13, 2024
1 parent dae2820 commit c4e24c8
Showing 7 changed files with 307 additions and 118 deletions.
161 changes: 91 additions & 70 deletions internal/db/merge.go
@@ -28,37 +28,43 @@ import (
 	"github.com/sourcenetwork/defradb/event"
 	"github.com/sourcenetwork/defradb/internal/core"
 	coreblock "github.com/sourcenetwork/defradb/internal/core/block"
+	"github.com/sourcenetwork/defradb/internal/core/crdt"
 	"github.com/sourcenetwork/defradb/internal/db/base"
 	"github.com/sourcenetwork/defradb/internal/encryption"
 	"github.com/sourcenetwork/defradb/internal/keys"
 	"github.com/sourcenetwork/defradb/internal/merkle/clock"
 	merklecrdt "github.com/sourcenetwork/defradb/internal/merkle/crdt"
 )
 
-func (db *db) executeMerge(ctx context.Context, dagMerge event.Merge) error {
+func (db *db) executeMerge(ctx context.Context, col *collection, dagMerge event.Merge) error {
 	ctx, txn, err := ensureContextTxn(ctx, db, false)
 	if err != nil {
 		return err
 	}
 	defer txn.Discard(ctx)
 
-	col, err := getCollectionFromRootSchema(ctx, db, dagMerge.SchemaRoot)
-	if err != nil {
-		return err
-	}
-
-	docID, err := client.NewDocIDFromString(dagMerge.DocID)
-	if err != nil {
-		return err
-	}
-	dsKey := base.MakeDataStoreKeyWithCollectionAndDocID(col.Description(), docID.String())
-
-	mp, err := db.newMergeProcessor(txn, col, dsKey)
-	if err != nil {
-		return err
-	}
-
-	mt, err := getHeadsAsMergeTarget(ctx, txn, dsKey.WithFieldID(core.COMPOSITE_NAMESPACE))
+	var mt mergeTarget
+	if dagMerge.DocID != "" {
+		_, err := client.NewDocIDFromString(dagMerge.DocID)
+		if err != nil {
+			return err
+		}
+
+		mt, err = getHeadsAsMergeTarget(ctx, txn, keys.HeadstoreDocKey{
+			DocID: dagMerge.DocID,
+			FieldID: core.COMPOSITE_NAMESPACE,
+		})
+		if err != nil {
+			return err
+		}
+	} else {
+		mt, err = getHeadsAsMergeTarget(ctx, txn, keys.NewHeadstoreColKey(col.Description().RootID))
+		if err != nil {
+			return err
+		}
+	}
+
+	mp, err := db.newMergeProcessor(txn, col)
 	if err != nil {
 		return err
 	}
@@ -73,9 +79,15 @@ func (db *db) executeMerge(ctx context.Context, dagMerge event.Merge) error {
 		return err
 	}
 
-	err = syncIndexedDoc(ctx, docID, col)
-	if err != nil {
-		return err
+	for docID := range mp.docIDs {
+		docID, err := client.NewDocIDFromString(docID)
+		if err != nil {
+			return err
+		}
+		err = syncIndexedDoc(ctx, docID, col)
+		if err != nil {
+			return err
+		}
 	}
 
 	err = txn.Commit(ctx)
@@ -94,39 +106,39 @@ func (db *db) executeMerge(ctx context.Context, dagMerge event.Merge) error {
 // mergeQueue is a synchronization source to ensure that concurrent
 // document merges do not cause transaction conflicts.
 type mergeQueue struct {
-	docs map[string]chan struct{}
+	keys map[string]chan struct{}
 	mutex sync.Mutex
 }
 
 func newMergeQueue() *mergeQueue {
 	return &mergeQueue{
-		docs: make(map[string]chan struct{}),
+		keys: make(map[string]chan struct{}),
 	}
 }
 
-// add adds a docID to the queue. If the docID is already in the queue, it will
-// wait for the docID to be removed from the queue. For every add call, done must
-// be called to remove the docID from the queue. Otherwise, subsequent add calls will
+// add adds a key to the queue. If the key is already in the queue, it will
+// wait for the key to be removed from the queue. For every add call, done must
+// be called to remove the key from the queue. Otherwise, subsequent add calls will
 // block forever.
-func (m *mergeQueue) add(docID string) {
+func (m *mergeQueue) add(key string) {
 	m.mutex.Lock()
-	done, ok := m.docs[docID]
+	done, ok := m.keys[key]
 	if !ok {
-		m.docs[docID] = make(chan struct{})
+		m.keys[key] = make(chan struct{})
 	}
 	m.mutex.Unlock()
 	if ok {
 		<-done
-		m.add(docID)
+		m.add(key)
 	}
 }
 
-func (m *mergeQueue) done(docID string) {
+func (m *mergeQueue) done(key string) {
 	m.mutex.Lock()
 	defer m.mutex.Unlock()
-	done, ok := m.docs[docID]
+	done, ok := m.keys[key]
 	if ok {
-		delete(m.docs, docID)
+		delete(m.keys, key)
 		close(done)
 	}
 }
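
The add/done contract above behaves as a per-key lock: add blocks while another goroutine holds the same key, and every add must be paired with exactly one done. A minimal caller sketch, assuming only the mergeQueue type from this diff (handleSerially is a hypothetical helper, not part of the commit):

	// handleSerially runs work while holding the mergeQueue entry for key,
	// so concurrent calls with the same key execute one at a time.
	func handleSerially(q *mergeQueue, key string, work func() error) error {
		q.add(key)        // blocks until any current holder of key calls done
		defer q.done(key) // closing the channel wakes all waiters, which retry add
		return work()
	}
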
@@ -135,9 +147,11 @@ type mergeProcessor struct {
 	txn datastore.Txn
 	blockLS linking.LinkSystem
 	encBlockLS linking.LinkSystem
-	mCRDTs map[string]merklecrdt.MerkleCRDT
 	col *collection
-	dsKey keys.DataStoreKey
+
+	// docIDs contains all docIDs that have been merged so far by the mergeProcessor
+	docIDs map[string]struct{}
 
 	// composites is a list of composites that need to be merged.
 	composites *list.List
 	// missingEncryptionBlocks is a list of blocks that we failed to fetch
@@ -149,7 +163,6 @@ func (db *db) newMergeProcessor(
 func (db *db) newMergeProcessor(
 	txn datastore.Txn,
 	col *collection,
-	dsKey keys.DataStoreKey,
 ) (*mergeProcessor, error) {
 	blockLS := cidlink.DefaultLinkSystem()
 	blockLS.SetReadStorage(txn.Blockstore().AsIPLDStorage())
@@ -161,9 +174,8 @@ func (db *db) newMergeProcessor(
 		txn: txn,
 		blockLS: blockLS,
 		encBlockLS: encBlockLS,
-		mCRDTs: make(map[string]merklecrdt.MerkleCRDT),
 		col: col,
-		dsKey: dsKey,
+		docIDs: make(map[string]struct{}),
 		composites: list.New(),
 		missingEncryptionBlocks: make(map[cidlink.Link]struct{}),
 		availableEncryptionBlocks: make(map[cidlink.Link]*coreblock.Encryption),
@@ -375,7 +387,7 @@ func (mp *mergeProcessor) processBlock(
 	}
 
 	if canRead {
-		crdt, err := mp.initCRDTForType(dagBlock.Delta.GetFieldName())
+		crdt, err := mp.initCRDTForType(dagBlock.Delta)
 		if err != nil {
 			return err
 		}
@@ -435,50 +447,59 @@ func decryptBlock(
 	return newBlock, nil
 }
 
-func (mp *mergeProcessor) initCRDTForType(field string) (merklecrdt.MerkleCRDT, error) {
-	mcrdt, exists := mp.mCRDTs[field]
-	if exists {
-		return mcrdt, nil
-	}
-
+func (mp *mergeProcessor) initCRDTForType(crdt crdt.CRDT) (merklecrdt.MerkleCRDT, error) {
 	schemaVersionKey := keys.CollectionSchemaVersionKey{
 		SchemaVersionID: mp.col.Schema().VersionID,
 		CollectionID: mp.col.ID(),
 	}
 
-	if field == "" {
-		mcrdt = merklecrdt.NewMerkleCompositeDAG(
+	switch {
+	case crdt.IsComposite():
+		docID := string(crdt.GetDocID())
+		mp.docIDs[docID] = struct{}{}
+
+		return merklecrdt.NewMerkleCompositeDAG(
 			mp.txn,
 			schemaVersionKey,
-			mp.dsKey.WithFieldID(core.COMPOSITE_NAMESPACE),
-		)
-		mp.mCRDTs[field] = mcrdt
-		return mcrdt, nil
-	}
+			base.MakeDataStoreKeyWithCollectionAndDocID(mp.col.Description(), docID).WithFieldID(core.COMPOSITE_NAMESPACE),
+		), nil
 
-	fd, ok := mp.col.Definition().GetFieldByName(field)
-	if !ok {
-		// If the field is not part of the schema, we can safely ignore it.
-		return nil, nil
+	case crdt.IsCollection():
+		return merklecrdt.NewMerkleCollection(
+			mp.txn,
+			schemaVersionKey,
+			keys.NewHeadstoreColKey(mp.col.Description().RootID),
+		), nil
+
+	default:
+		docID := string(crdt.GetDocID())
+		mp.docIDs[docID] = struct{}{}
+
+		field := crdt.GetFieldName()
+		fd, ok := mp.col.Definition().GetFieldByName(field)
+		if !ok {
+			// If the field is not part of the schema, we can safely ignore it.
+			return nil, nil
+		}
+
+		return merklecrdt.FieldLevelCRDTWithStore(
+			mp.txn,
+			schemaVersionKey,
+			fd.Typ,
+			fd.Kind,
+			base.MakeDataStoreKeyWithCollectionAndDocID(mp.col.Description(), docID).WithFieldID(fd.ID.String()),
+			field,
+		)
 	}
+}
 
-	mcrdt, err := merklecrdt.FieldLevelCRDTWithStore(
-		mp.txn,
-		schemaVersionKey,
-		fd.Typ,
-		fd.Kind,
-		mp.dsKey.WithFieldID(fd.ID.String()),
-		field,
-	)
+func getCollectionFromRootSchema(ctx context.Context, db *db, rootSchema string) (*collection, error) {
+	ctx, txn, err := ensureContextTxn(ctx, db, false)
 	if err != nil {
 		return nil, err
 	}
+	defer txn.Discard(ctx)
 
-	mp.mCRDTs[field] = mcrdt
-	return mcrdt, nil
-}
-
-func getCollectionFromRootSchema(ctx context.Context, db *db, rootSchema string) (*collection, error) {
 	cols, err := db.getCollections(
 		ctx,
 		client.CollectionFetchOptions{
@@ -498,8 +519,8 @@ func getCollectionFromRootSchema(ctx context.Context, db *db, rootSchema string)
 
 // getHeadsAsMergeTarget retrieves the heads of the composite DAG for the given document
 // and returns them as a merge target.
-func getHeadsAsMergeTarget(ctx context.Context, txn datastore.Txn, dsKey keys.DataStoreKey) (mergeTarget, error) {
-	cids, err := getHeads(ctx, txn, dsKey)
+func getHeadsAsMergeTarget(ctx context.Context, txn datastore.Txn, key keys.HeadstoreKey) (mergeTarget, error) {
+	cids, err := getHeads(ctx, txn, key)
 
 	if err != nil {
 		return mergeTarget{}, err
@@ -520,8 +541,8 @@ func getHeadsAsMergeTarget(ctx context.Context, txn datastore.Txn, dsKey keys.Da
 }
 
 // getHeads retrieves the heads associated with the given datastore key.
-func getHeads(ctx context.Context, txn datastore.Txn, dsKey keys.DataStoreKey) ([]cid.Cid, error) {
-	headset := clock.NewHeadSet(txn.Headstore(), dsKey.ToHeadStoreKey())
+func getHeads(ctx context.Context, txn datastore.Txn, key keys.HeadstoreKey) ([]cid.Cid, error) {
+	headset := clock.NewHeadSet(txn.Headstore(), key)
 
 	cids, _, err := headset.List(ctx)
 	if err != nil {
16 changes: 8 additions & 8 deletions internal/db/merge_test.go
@@ -58,7 +58,7 @@ func TestMerge_SingleBranch_NoError(t *testing.T) {
 	compInfo2, err := d.generateCompositeUpdate(&lsys, map[string]any{"name": "Johny"}, compInfo)
 	require.NoError(t, err)
 
-	err = db.executeMerge(ctx, event.Merge{
+	err = db.executeMerge(ctx, col.(*collection), event.Merge{
 		DocID: docID.String(),
 		Cid: compInfo2.link.Cid,
 		SchemaRoot: col.SchemaRoot(),
@@ -103,7 +103,7 @@ func TestMerge_DualBranch_NoError(t *testing.T) {
 	compInfo2, err := d.generateCompositeUpdate(&lsys, map[string]any{"name": "Johny"}, compInfo)
 	require.NoError(t, err)
 
-	err = db.executeMerge(ctx, event.Merge{
+	err = db.executeMerge(ctx, col.(*collection), event.Merge{
 		DocID: docID.String(),
 		Cid: compInfo2.link.Cid,
 		SchemaRoot: col.SchemaRoot(),
@@ -113,7 +113,7 @@ func TestMerge_DualBranch_NoError(t *testing.T) {
 	compInfo3, err := d.generateCompositeUpdate(&lsys, map[string]any{"age": 30}, compInfo)
 	require.NoError(t, err)
 
-	err = db.executeMerge(ctx, event.Merge{
+	err = db.executeMerge(ctx, col.(*collection), event.Merge{
 		DocID: docID.String(),
 		Cid: compInfo3.link.Cid,
 		SchemaRoot: col.SchemaRoot(),
@@ -161,7 +161,7 @@ func TestMerge_DualBranchWithOneIncomplete_CouldNotFindCID(t *testing.T) {
 	compInfo2, err := d.generateCompositeUpdate(&lsys, map[string]any{"name": "Johny"}, compInfo)
 	require.NoError(t, err)
 
-	err = db.executeMerge(ctx, event.Merge{
+	err = db.executeMerge(ctx, col.(*collection), event.Merge{
 		DocID: docID.String(),
 		Cid: compInfo2.link.Cid,
 		SchemaRoot: col.SchemaRoot(),
@@ -180,7 +180,7 @@ func TestMerge_DualBranchWithOneIncomplete_CouldNotFindCID(t *testing.T) {
 	compInfo3, err := d.generateCompositeUpdate(&lsys, map[string]any{"name": "Johny"}, compInfoUnkown)
 	require.NoError(t, err)
 
-	err = db.executeMerge(ctx, event.Merge{
+	err = db.executeMerge(ctx, col.(*collection), event.Merge{
 		DocID: docID.String(),
 		Cid: compInfo3.link.Cid,
 		SchemaRoot: col.SchemaRoot(),
@@ -304,15 +304,15 @@ func TestMergeQueue(t *testing.T) {
 	go q.add(testDocID)
 	// give time for the goroutine to block
 	time.Sleep(10 * time.Millisecond)
-	require.Len(t, q.docs, 1)
+	require.Len(t, q.keys, 1)
 	q.done(testDocID)
 	// give time for the goroutine to add the docID
 	time.Sleep(10 * time.Millisecond)
 	q.mutex.Lock()
-	require.Len(t, q.docs, 1)
+	require.Len(t, q.keys, 1)
 	q.mutex.Unlock()
 	q.done(testDocID)
 	q.mutex.Lock()
-	require.Len(t, q.docs, 0)
+	require.Len(t, q.keys, 0)
 	q.mutex.Unlock()
 }
31 changes: 25 additions & 6 deletions internal/db/messages.go
@@ -22,7 +22,9 @@ import (
 )
 
 func (db *db) handleMessages(ctx context.Context, sub *event.Subscription) {
-	queue := newMergeQueue()
+	docIdQueue := newMergeQueue()
+	schemaRootQueue := newMergeQueue()
+
 	// This is used to ensure we only trigger loadAndPublishP2PCollections and loadAndPublishReplicators
 	// once per db instantiation.
 	loadOnce := sync.Once{}
@@ -37,17 +39,34 @@ func (db *db) handleMessages(ctx context.Context, sub *event.Subscription) {
 		switch evt := msg.Data.(type) {
 		case event.Merge:
 			go func() {
-				// ensure only one merge per docID
-				queue.add(evt.DocID)
-				defer queue.done(evt.DocID)
+				col, err := getCollectionFromRootSchema(ctx, db, evt.SchemaRoot)
+				if err != nil {
+					log.ErrorContextE(
+						ctx,
+						"Failed to execute merge",
+						err,
+						corelog.Any("Event", evt))
+					return
+				}
+
+				if col.Description().IsBranchable {
+					// As collection commits link to document composite commits, all events
+					// received for branchable collections must be processed serially, else
+					// they may cause a transaction conflict.
+					schemaRootQueue.add(evt.SchemaRoot)
+					defer schemaRootQueue.done(evt.SchemaRoot)
+				} else {
+					// ensure only one merge per docID
+					docIdQueue.add(evt.DocID)
+					defer docIdQueue.done(evt.DocID)
+				}
 
 				// retry the merge process if a conflict occurs
 				//
 				// conflicts occur when a user updates a document
 				// while a merge is in progress.
-				var err error
 				for i := 0; i < db.MaxTxnRetries(); i++ {
-					err = db.executeMerge(ctx, evt)
+					err = db.executeMerge(ctx, col, evt)
 					if errors.Is(err, datastore.ErrTxnConflict) {
 						continue // retry merge
 					}
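
The branch above chooses a serialization domain per merge event: branchable collections serialize on the schema root, because collection-level commits link to per-document composite commits and concurrent merges would conflict, while ordinary collections only need per-document ordering. That key choice in isolation, as a sketch (mergeQueueKeyFor is a hypothetical helper, not part of the commit):

	// mergeQueueKeyFor returns the key a merge event should serialize on:
	// the schema root for branchable collections, otherwise the docID.
	func mergeQueueKeyFor(isBranchable bool, schemaRoot, docID string) string {
		if isBranchable {
			return schemaRoot
		}
		return docID
	}
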
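The retry comment describes optimistic concurrency: a merge transaction that loses a race with a local document update fails with datastore.ErrTxnConflict and is simply re-run, up to db.MaxTxnRetries() attempts. The loop's shape in isolation, as a sketch (retryOnConflict and isConflict are hypothetical names standing in for the errors.Is check against datastore.ErrTxnConflict):

	// retryOnConflict re-runs fn until it succeeds, fails with a
	// non-conflict error, or exhausts the retry budget.
	func retryOnConflict(maxRetries int, isConflict func(error) bool, fn func() error) error {
		var err error
		for i := 0; i < maxRetries; i++ {
			err = fn()
			if err != nil && isConflict(err) {
				continue // lost a race with a concurrent local update; retry
			}
			break
		}
		return err
	}
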