Skip to content

Commit

Permalink
WIP - Move fetcher from desc to col
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewSisley committed Oct 2, 2023
1 parent d827838 commit 293a6b5
Show file tree
Hide file tree
Showing 9 changed files with 46 additions and 82 deletions.
2 changes: 1 addition & 1 deletion db/collection_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (c *collection) get(
df := c.newFetcher()
desc := &c.desc
// initialize it with the primary index
err := df.Init(ctx, txn, &c.desc, fields, nil, nil, false, showDeleted)
err := df.Init(ctx, txn, c, fields, nil, nil, false, showDeleted)
if err != nil {
_ = df.Close()
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion db/collection_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func (c *collection) iterateAllDocs(
exec func(doc *client.Document) error,
) error {
df := c.newFetcher()
err := df.Init(ctx, txn, &c.desc, fields, nil, nil, false, false)
err := df.Init(ctx, txn, c, fields, nil, nil, false, false)
if err != nil {
_ = df.Close()
return err
Expand Down
16 changes: 8 additions & 8 deletions db/fetcher/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Fetcher interface {
Init(
ctx context.Context,
txn datastore.Txn,
col *client.CollectionDescription,
col client.Collection,
fields []client.FieldDescription,
filter *mapper.Filter,
docmapper *core.DocumentMapping,
Expand All @@ -77,7 +77,7 @@ var (

// DocumentFetcher is a utility to incrementally fetch all the documents.
type DocumentFetcher struct {
col *client.CollectionDescription
col client.Collection
reverse bool
deletedDocs bool

Expand Down Expand Up @@ -133,7 +133,7 @@ type DocumentFetcher struct {
func (df *DocumentFetcher) Init(
ctx context.Context,
txn datastore.Txn,
col *client.CollectionDescription,
col client.Collection,
fields []client.FieldDescription,
filter *mapper.Filter,
docmapper *core.DocumentMapping,
Expand All @@ -159,7 +159,7 @@ func (df *DocumentFetcher) Init(
}

func (df *DocumentFetcher) init(
col *client.CollectionDescription,
col client.Collection,
fields []client.FieldDescription,
filter *mapper.Filter,
docMapper *core.DocumentMapping,
Expand Down Expand Up @@ -195,7 +195,7 @@ func (df *DocumentFetcher) init(
// get them all
var targetFields []client.FieldDescription
if len(fields) == 0 {
targetFields = df.col.Schema.Fields
targetFields = df.col.Schema().Fields
} else {
targetFields = fields
}
Expand All @@ -206,12 +206,12 @@ func (df *DocumentFetcher) init(

if df.filter != nil {
conditions := df.filter.ToMap(df.mapping)
parsedfilterFields, err := parser.ParseFilterFieldsForDescription(conditions, df.col.Schema)
parsedfilterFields, err := parser.ParseFilterFieldsForDescription(conditions, df.col.Schema())
if err != nil {
return err
}
df.filterFields = make(map[uint32]client.FieldDescription, len(parsedfilterFields))
df.filterSet = bitset.New(uint(len(col.Schema.Fields)))
df.filterSet = bitset.New(uint(len(col.Schema().Fields)))
for _, field := range parsedfilterFields {
df.filterFields[uint32(field.ID)] = field
df.filterSet.Set(uint(field.ID))
Expand Down Expand Up @@ -246,7 +246,7 @@ func (df *DocumentFetcher) start(ctx context.Context, spans core.Spans, withDele
df.deletedDocs = withDeleted

if !spans.HasValue { // no specified spans so create a prefix scan key for the entire collection
start := base.MakeCollectionKey(*df.col)
start := base.MakeCollectionKey(df.col.Description())
if withDeleted {
start = start.WithDeletedFlag()
} else {
Expand Down
12 changes: 6 additions & 6 deletions db/fetcher/mocks/fetcher.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 5 additions & 4 deletions db/fetcher/versioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ type VersionedFetcher struct {

queuedCids *list.List

col *client.CollectionDescription
col client.Collection
// @todo index *client.IndexDescription
mCRDTs map[uint32]crdt.MerkleCRDT
}
Expand All @@ -101,7 +101,7 @@ type VersionedFetcher struct {
func (vf *VersionedFetcher) Init(
ctx context.Context,
txn datastore.Txn,
col *client.CollectionDescription,
col client.Collection,
fields []client.FieldDescription,
filter *mapper.Filter,
docmapper *core.DocumentMapping,
Expand Down Expand Up @@ -357,7 +357,8 @@ func (vf *VersionedFetcher) merge(c cid.Cid) error {
return err
}

field, ok := vf.col.GetFieldByName(l.Name, &vf.col.Schema)
schema := vf.col.Schema()
field, ok := vf.col.Description().GetFieldByName(l.Name, &schema)
if !ok {
return client.NewErrFieldNotExist(l.Name)
}
Expand All @@ -380,7 +381,7 @@ func (vf *VersionedFetcher) processNode(
// handle CompositeDAG
mcrdt, exists := vf.mCRDTs[crdtIndex]
if !exists {
key, err := base.MakePrimaryIndexKeyForCRDT(*vf.col, vf.col.Schema, ctype, vf.key, fieldName)
key, err := base.MakePrimaryIndexKeyForCRDT(vf.col.Description(), vf.col.Schema(), ctype, vf.key, fieldName)
if err != nil {
return err
}
Expand Down
41 changes: 2 additions & 39 deletions db/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/core"
"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/db/base"
"github.com/sourcenetwork/defradb/db/fetcher"
)
Expand Down Expand Up @@ -51,40 +50,6 @@ func newTestCollectionDescription() client.CollectionDescription {
}
}

func newTestFetcher(ctx context.Context, txn datastore.Txn) (*fetcher.DocumentFetcher, error) {
df := new(fetcher.DocumentFetcher)
desc := newTestCollectionDescription()
err := df.Init(ctx, txn, &desc, desc.Schema.Fields, nil, nil, false, false)
if err != nil {
return nil, err
}
return df, nil
}

func TestFetcherInit(t *testing.T) {
_, err := newTestFetcher(context.Background(), nil)
assert.NoError(t, err)
}

func TestFetcherStart(t *testing.T) {
ctx := context.Background()
db, err := newMemoryDB(ctx)
if err != nil {
t.Error(err)
return
}
txn, err := db.NewTxn(ctx, true)
if err != nil {
t.Error(err)
return
}
df, err := newTestFetcher(ctx, txn)
assert.NoError(t, err)

err = df.Start(ctx, core.Spans{})
assert.NoError(t, err)
}

func TestFetcherStartWithoutInit(t *testing.T) {
ctx := context.Background()
df := new(fetcher.DocumentFetcher)
Expand Down Expand Up @@ -123,8 +88,7 @@ func TestFetcherGetAllPrimaryIndexEncodedDocSingle(t *testing.T) {
// db.printDebugDB()

df := new(fetcher.DocumentFetcher)
desc := col.Description()
err = df.Init(ctx, txn, &desc, desc.Schema.Fields, nil, nil, false, false)
err = df.Init(ctx, txn, col, col.Schema().Fields, nil, nil, false, false)
assert.NoError(t, err)

err = df.Start(ctx, core.Spans{})
Expand Down Expand Up @@ -168,8 +132,7 @@ func TestFetcherGetAllPrimaryIndexEncodedDocMultiple(t *testing.T) {
// db.printDebugDB()

df := new(fetcher.DocumentFetcher)
desc := col.Description()
err = df.Init(ctx, txn, &desc, desc.Schema.Fields, nil, nil, false, false)
err = df.Init(ctx, txn, col, col.Schema().Fields, nil, nil, false, false)
assert.NoError(t, err)

err = df.Start(ctx, core.Spans{})
Expand Down
2 changes: 1 addition & 1 deletion db/indexed_docs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ func TestNonUniqueUpdate_ShouldPassToFetcherOnlyRelevantFields(t *testing.T) {
RunAndReturn(func(
ctx context.Context,
txn datastore.Txn,
col *client.CollectionDescription,
col client.Collection,
fields []client.FieldDescription,
filter *mapper.Filter,
mapping *core.DocumentMapping,
Expand Down
20 changes: 10 additions & 10 deletions lens/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type lensedFetcher struct {

txn datastore.Txn

col *client.CollectionDescription
col client.Collection
// Cache the fieldDescriptions mapped by name to allow for cheaper access within the fetcher loop
fieldDescriptionsByName map[string]client.FieldDescription

Expand All @@ -58,7 +58,7 @@ func NewFetcher(source fetcher.Fetcher, registry client.LensRegistry) fetcher.Fe
func (f *lensedFetcher) Init(
ctx context.Context,
txn datastore.Txn,
col *client.CollectionDescription,
col client.Collection,
fields []client.FieldDescription,
filter *mapper.Filter,
docmapper *core.DocumentMapping,
Expand All @@ -67,12 +67,12 @@ func (f *lensedFetcher) Init(
) error {
f.col = col

f.fieldDescriptionsByName = make(map[string]client.FieldDescription, len(col.Schema.Fields))
f.fieldDescriptionsByName = make(map[string]client.FieldDescription, len(col.Schema().Fields))
// Cache the field descriptions in reverse, allowing smaller-index fields to overwrite any later
// ones. This should never really happen here, but it ensures the result is consistent with col.GetField
// which returns the first one it finds with a matching name.
for i := len(col.Schema.Fields) - 1; i >= 0; i-- {
field := col.Schema.Fields[i]
for i := len(col.Schema().Fields) - 1; i >= 0; i-- {
field := col.Schema().Fields[i]
f.fieldDescriptionsByName[field.Name] = field
}

Expand All @@ -81,11 +81,11 @@ func (f *lensedFetcher) Init(
return err
}

history, err := getTargetedSchemaHistory(ctx, txn, cfg, f.col.Schema.SchemaID, f.col.Schema.VersionID)
history, err := getTargetedSchemaHistory(ctx, txn, cfg, f.col.Schema().SchemaID, f.col.Schema().VersionID)
if err != nil {
return err
}
f.lens = new(ctx, f.registry, f.col.Schema.VersionID, history)
f.lens = new(ctx, f.registry, f.col.Schema().VersionID, history)
f.txn = txn

for schemaVersionID := range history {
Expand All @@ -100,7 +100,7 @@ func (f *lensedFetcher) Init(
}
}

f.targetVersionID = col.Schema.VersionID
f.targetVersionID = col.Schema().VersionID

var innerFetcherFields []client.FieldDescription
if f.hasMigrations {
Expand Down Expand Up @@ -238,7 +238,7 @@ func (f *lensedFetcher) lensDocToEncodedDoc(docAsMap LensDoc) (fetcher.EncodedDo

return &lensEncodedDocument{
key: []byte(key),
schemaVersionID: f.col.Schema.VersionID,
schemaVersionID: f.col.Schema().VersionID,
status: status,
properties: properties,
}, nil
Expand Down Expand Up @@ -283,7 +283,7 @@ func (f *lensedFetcher) updateDataStore(ctx context.Context, original map[string
}

datastoreKeyBase := core.DataStoreKey{
CollectionID: f.col.IDString(),
CollectionID: f.col.Description().IDString(),
DocKey: dockey,
InstanceType: core.ValueKey,
}
Expand Down
Loading

0 comments on commit 293a6b5

Please sign in to comment.