Skip to content

Commit

Permalink
Simplify VersionedFetcher
Browse files Browse the repository at this point in the history
This simplifies InstanceWithStore, which in turn will allow future simplifications of crdt stuff
  • Loading branch information
AndrewSisley committed Oct 31, 2024
1 parent dde5b81 commit 0d9e3e2
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 54 deletions.
29 changes: 0 additions & 29 deletions internal/db/base/collection_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@
package base

import (
"fmt"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/internal/core"
"github.com/sourcenetwork/defradb/internal/keys"
)

Expand All @@ -35,29 +32,3 @@ func MakeDataStoreKeyWithCollectionAndDocID(
DocID: docID,
}
}

func MakePrimaryIndexKeyForCRDT(
c client.CollectionDefinition,
ctype client.CType,
key keys.DataStoreKey,
fieldName string,
) (keys.DataStoreKey, error) {
switch ctype {
case client.COMPOSITE:
return MakeDataStoreKeyWithCollectionDescription(c.Description).
WithInstanceInfo(key).
WithFieldID(core.COMPOSITE_NAMESPACE),
nil
case client.LWW_REGISTER, client.PN_COUNTER, client.P_COUNTER:
field, ok := c.GetFieldByName(fieldName)
if !ok {
return keys.DataStoreKey{}, client.NewErrFieldNotExist(fieldName)
}

return MakeDataStoreKeyWithCollectionDescription(c.Description).
WithInstanceInfo(key).
WithFieldID(fmt.Sprint(field.ID)),
nil
}
return keys.DataStoreKey{}, ErrInvalidCrdtType
}
41 changes: 22 additions & 19 deletions internal/db/fetcher/versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package fetcher
import (
"container/list"
"context"
"fmt"

"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
Expand All @@ -27,7 +28,6 @@ import (
"github.com/sourcenetwork/defradb/datastore/memory"
"github.com/sourcenetwork/defradb/internal/core"
coreblock "github.com/sourcenetwork/defradb/internal/core/block"
"github.com/sourcenetwork/defradb/internal/db/base"
"github.com/sourcenetwork/defradb/internal/keys"
merklecrdt "github.com/sourcenetwork/defradb/internal/merkle/crdt"
"github.com/sourcenetwork/defradb/internal/planner/mapper"
Expand Down Expand Up @@ -99,7 +99,7 @@ type VersionedFetcher struct {

col client.Collection
// @todo index *client.IndexDescription
mCRDTs map[uint32]merklecrdt.MerkleCRDT
mCRDTs map[client.FieldID]merklecrdt.MerkleCRDT
}

// Init initializes the VersionedFetcher.
Expand All @@ -118,7 +118,7 @@ func (vf *VersionedFetcher) Init(
vf.acp = acp
vf.col = col
vf.queuedCids = list.New()
vf.mCRDTs = make(map[uint32]merklecrdt.MerkleCRDT)
vf.mCRDTs = make(map[client.FieldID]merklecrdt.MerkleCRDT)
vf.txn = txn

// create store
Expand Down Expand Up @@ -182,7 +182,7 @@ func (vf *VersionedFetcher) Start(ctx context.Context, spans core.Spans) error {
}

vf.ctx = ctx
vf.dsKey = dk
vf.dsKey = dk.WithCollectionRoot(vf.col.Description().RootID)
vf.version = c

if err := vf.seekTo(vf.version); err != nil {
Expand Down Expand Up @@ -352,7 +352,17 @@ func (vf *VersionedFetcher) merge(c cid.Cid) error {
}

// first arg 0 is the index for the composite DAG in the mCRDTs cache
if err := vf.processBlock(0, block, link, client.COMPOSITE, client.FieldKind_None, ""); err != nil {
mcrdt, exists := vf.mCRDTs[0]
if !exists {
mcrdt = merklecrdt.NewMerkleCompositeDAG(
vf.store,
keys.CollectionSchemaVersionKey{},
vf.dsKey.WithFieldID(core.COMPOSITE_NAMESPACE),
)
vf.mCRDTs[0] = mcrdt
}
err = mcrdt.Clock().ProcessBlock(vf.ctx, block, link)
if err != nil {
return err
}

Expand All @@ -368,7 +378,7 @@ func (vf *VersionedFetcher) merge(c cid.Cid) error {
if !ok {
return client.NewErrFieldNotExist(l.Name)
}
if err := vf.processBlock(uint32(field.ID), subBlock, l.Link, field.Typ, field.Kind, l.Name); err != nil {
if err := vf.processBlock(subBlock, l.Link, field, l.Name); err != nil {
return err
}
}
Expand All @@ -377,32 +387,25 @@ func (vf *VersionedFetcher) merge(c cid.Cid) error {
}

func (vf *VersionedFetcher) processBlock(
crdtIndex uint32,
block *coreblock.Block,
blockLink cidlink.Link,
ctype client.CType,
kind client.FieldKind,
field client.FieldDefinition,
fieldName string,
) (err error) {
// handle CompositeDAG
mcrdt, exists := vf.mCRDTs[crdtIndex]
mcrdt, exists := vf.mCRDTs[field.ID]
if !exists {
dsKey, err := base.MakePrimaryIndexKeyForCRDT(vf.col.Definition(), ctype, vf.dsKey, fieldName)
if err != nil {
return err
}
mcrdt, err = merklecrdt.InstanceWithStore(
vf.store,
keys.CollectionSchemaVersionKey{},
ctype,
kind,
dsKey,
field.Typ,
field.Kind,
vf.dsKey.WithFieldID(fmt.Sprint(field.ID)),
fieldName,
)
if err != nil {
return err
}
vf.mCRDTs[crdtIndex] = mcrdt
vf.mCRDTs[field.ID] = mcrdt
}

return mcrdt.Clock().ProcessBlock(vf.ctx, block, blockLink)
Expand Down
6 changes: 6 additions & 0 deletions internal/keys/datastore_doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ func (k DataStoreKey) WithDeletedFlag() DataStoreKey {
return newKey
}

func (k DataStoreKey) WithCollectionRoot(colRoot uint32) DataStoreKey {
newKey := k
newKey.CollectionRootID = colRoot
return newKey
}

func (k DataStoreKey) WithDocID(docID string) DataStoreKey {
newKey := k
newKey.DocID = docID
Expand Down
6 changes: 0 additions & 6 deletions internal/merkle/crdt/merklecrdt.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,6 @@ func InstanceWithStore(
cType == client.PN_COUNTER,
kind.(client.ScalarKind),
), nil
case client.COMPOSITE:
return NewMerkleCompositeDAG(
store,
schemaVersionKey,
key,
), nil
}
return nil, client.NewErrUnknownCRDT(cType)
}

0 comments on commit 0d9e3e2

Please sign in to comment.