Skip to content

Commit

Permalink
WIP - Simplify VersionedFetcher
Browse files Browse the repository at this point in the history
This simplifies InstanceWithStore, which in turn will allow future simplifications of crdt stuff
  • Loading branch information
AndrewSisley committed Oct 31, 2024
1 parent 9b15f24 commit 7e32155
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 54 deletions.
29 changes: 0 additions & 29 deletions internal/db/base/collection_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@
package base

import (
"fmt"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/internal/core"
"github.com/sourcenetwork/defradb/internal/keys"
)

Expand All @@ -35,29 +32,3 @@ func MakeDataStoreKeyWithCollectionAndDocID(
DocID: docID,
}
}

func MakePrimaryIndexKeyForCRDT(
c client.CollectionDefinition,
ctype client.CType,
key keys.DataStoreKey,
fieldName string,
) (keys.DataStoreKey, error) {
switch ctype {
case client.COMPOSITE:
return MakeDataStoreKeyWithCollectionDescription(c.Description).
WithInstanceInfo(key).
WithFieldID(core.COMPOSITE_NAMESPACE),
nil
case client.LWW_REGISTER, client.PN_COUNTER, client.P_COUNTER:
field, ok := c.GetFieldByName(fieldName)
if !ok {
return keys.DataStoreKey{}, client.NewErrFieldNotExist(fieldName)
}

return MakeDataStoreKeyWithCollectionDescription(c.Description).
WithInstanceInfo(key).
WithFieldID(fmt.Sprint(field.ID)),
nil
}
return keys.DataStoreKey{}, ErrInvalidCrdtType
}
41 changes: 22 additions & 19 deletions internal/db/fetcher/versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package fetcher
import (
"container/list"
"context"
"fmt"

"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
Expand All @@ -27,7 +28,6 @@ import (
"github.com/sourcenetwork/defradb/datastore/memory"
"github.com/sourcenetwork/defradb/internal/core"
coreblock "github.com/sourcenetwork/defradb/internal/core/block"
"github.com/sourcenetwork/defradb/internal/db/base"
"github.com/sourcenetwork/defradb/internal/keys"
merklecrdt "github.com/sourcenetwork/defradb/internal/merkle/crdt"
"github.com/sourcenetwork/defradb/internal/planner/mapper"
Expand Down Expand Up @@ -99,7 +99,7 @@ type VersionedFetcher struct {

col client.Collection
// @todo index *client.IndexDescription
mCRDTs map[uint32]merklecrdt.MerkleCRDT
mCRDTs map[client.FieldID]merklecrdt.MerkleCRDT
}

// Init initializes the VersionedFetcher.
Expand All @@ -118,7 +118,7 @@ func (vf *VersionedFetcher) Init(
vf.acp = acp
vf.col = col
vf.queuedCids = list.New()
vf.mCRDTs = make(map[uint32]merklecrdt.MerkleCRDT)
vf.mCRDTs = make(map[client.FieldID]merklecrdt.MerkleCRDT)
vf.txn = txn

// create store
Expand Down Expand Up @@ -182,7 +182,7 @@ func (vf *VersionedFetcher) Start(ctx context.Context, spans core.Spans) error {
}

vf.ctx = ctx
vf.dsKey = dk
vf.dsKey = dk.WithCollectionRoot(vf.col.Description().RootID)
vf.version = c

if err := vf.seekTo(vf.version); err != nil {
Expand Down Expand Up @@ -352,7 +352,17 @@ func (vf *VersionedFetcher) merge(c cid.Cid) error {
}

// first arg 0 is the index for the composite DAG in the mCRDTs cache
if err := vf.processBlock(0, block, link, client.COMPOSITE, client.FieldKind_None, ""); err != nil {
mcrdt, exists := vf.mCRDTs[0]
if !exists {
mcrdt = merklecrdt.NewMerkleCompositeDAG(
vf.store,
keys.CollectionSchemaVersionKey{},
vf.dsKey.WithFieldID(core.COMPOSITE_NAMESPACE),
)
vf.mCRDTs[0] = mcrdt
}
err = mcrdt.Clock().ProcessBlock(vf.ctx, block, link)
if err != nil {
return err
}

Expand All @@ -368,7 +378,7 @@ func (vf *VersionedFetcher) merge(c cid.Cid) error {
if !ok {
return client.NewErrFieldNotExist(l.Name)
}
if err := vf.processBlock(uint32(field.ID), subBlock, l.Link, field.Typ, field.Kind, l.Name); err != nil {
if err := vf.processBlock(subBlock, l.Link, field, l.Name); err != nil {
return err
}
}
Expand All @@ -377,32 +387,25 @@ func (vf *VersionedFetcher) merge(c cid.Cid) error {
}

func (vf *VersionedFetcher) processBlock(
crdtIndex uint32,
block *coreblock.Block,
blockLink cidlink.Link,
ctype client.CType,
kind client.FieldKind,
field client.FieldDefinition,
fieldName string,
) (err error) {
// handle CompositeDAG
mcrdt, exists := vf.mCRDTs[crdtIndex]
mcrdt, exists := vf.mCRDTs[field.ID]
if !exists {
dsKey, err := base.MakePrimaryIndexKeyForCRDT(vf.col.Definition(), ctype, vf.dsKey, fieldName)
if err != nil {
return err
}
mcrdt, err = merklecrdt.InstanceWithStore(
vf.store,
keys.CollectionSchemaVersionKey{},
ctype,
kind,
dsKey,
field.Typ,
field.Kind,
vf.dsKey.WithFieldID(fmt.Sprint(field.ID)),
fieldName,
)
if err != nil {
return err
}
vf.mCRDTs[crdtIndex] = mcrdt
vf.mCRDTs[field.ID] = mcrdt
}

return mcrdt.Clock().ProcessBlock(vf.ctx, block, blockLink)
Expand Down
6 changes: 6 additions & 0 deletions internal/keys/datastore_doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ func (k DataStoreKey) WithDeletedFlag() DataStoreKey {
return newKey
}

func (k DataStoreKey) WithCollectionRoot(colRoot uint32) DataStoreKey {
newKey := k
newKey.CollectionRootID = colRoot
return newKey
}

func (k DataStoreKey) WithDocID(docID string) DataStoreKey {
newKey := k
newKey.DocID = docID
Expand Down
6 changes: 0 additions & 6 deletions internal/merkle/crdt/merklecrdt.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,6 @@ func InstanceWithStore(
cType == client.PN_COUNTER,
kind.(client.ScalarKind),
), nil
case client.COMPOSITE:
return NewMerkleCompositeDAG(
store,
schemaVersionKey,
key,
), nil
}
return nil, client.NewErrUnknownCRDT(cType)
}

0 comments on commit 7e32155

Please sign in to comment.