From 293a6b5878003edd244ce6638e0da9fbfdea4f12 Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Mon, 2 Oct 2023 13:09:46 -0400 Subject: [PATCH] WIP - Move fetcher from desc to col --- db/collection_get.go | 2 +- db/collection_index.go | 2 +- db/fetcher/fetcher.go | 16 +++++++-------- db/fetcher/mocks/fetcher.go | 12 +++++------ db/fetcher/versioned.go | 9 ++++---- db/fetcher_test.go | 41 ++----------------------------------- db/indexed_docs_test.go | 2 +- lens/fetcher.go | 20 +++++++++--------- planner/scan.go | 24 +++++++++++----------- 9 files changed, 46 insertions(+), 82 deletions(-) diff --git a/db/collection_get.go b/db/collection_get.go index 8262ff44ba..6440f088e1 100644 --- a/db/collection_get.go +++ b/db/collection_get.go @@ -55,7 +55,7 @@ func (c *collection) get( df := c.newFetcher() desc := &c.desc // initialize it with the primary index - err := df.Init(ctx, txn, &c.desc, fields, nil, nil, false, showDeleted) + err := df.Init(ctx, txn, c, fields, nil, nil, false, showDeleted) if err != nil { _ = df.Close() return nil, err diff --git a/db/collection_index.go b/db/collection_index.go index 44f221a78c..a173c7c1db 100644 --- a/db/collection_index.go +++ b/db/collection_index.go @@ -252,7 +252,7 @@ func (c *collection) iterateAllDocs( exec func(doc *client.Document) error, ) error { df := c.newFetcher() - err := df.Init(ctx, txn, &c.desc, fields, nil, nil, false, false) + err := df.Init(ctx, txn, c, fields, nil, nil, false, false) if err != nil { _ = df.Close() return err diff --git a/db/fetcher/fetcher.go b/db/fetcher/fetcher.go index 9723300f7d..5edf24ba67 100644 --- a/db/fetcher/fetcher.go +++ b/db/fetcher/fetcher.go @@ -53,7 +53,7 @@ type Fetcher interface { Init( ctx context.Context, txn datastore.Txn, - col *client.CollectionDescription, + col client.Collection, fields []client.FieldDescription, filter *mapper.Filter, docmapper *core.DocumentMapping, @@ -77,7 +77,7 @@ var ( // DocumentFetcher is a utility to incrementally fetch all the documents. type DocumentFetcher struct { - col *client.CollectionDescription + col client.Collection reverse bool deletedDocs bool @@ -133,7 +133,7 @@ type DocumentFetcher struct { func (df *DocumentFetcher) Init( ctx context.Context, txn datastore.Txn, - col *client.CollectionDescription, + col client.Collection, fields []client.FieldDescription, filter *mapper.Filter, docmapper *core.DocumentMapping, @@ -159,7 +159,7 @@ func (df *DocumentFetcher) Init( } func (df *DocumentFetcher) init( - col *client.CollectionDescription, + col client.Collection, fields []client.FieldDescription, filter *mapper.Filter, docMapper *core.DocumentMapping, @@ -195,7 +195,7 @@ func (df *DocumentFetcher) init( // get them all var targetFields []client.FieldDescription if len(fields) == 0 { - targetFields = df.col.Schema.Fields + targetFields = df.col.Schema().Fields } else { targetFields = fields } @@ -206,12 +206,12 @@ func (df *DocumentFetcher) init( if df.filter != nil { conditions := df.filter.ToMap(df.mapping) - parsedfilterFields, err := parser.ParseFilterFieldsForDescription(conditions, df.col.Schema) + parsedfilterFields, err := parser.ParseFilterFieldsForDescription(conditions, df.col.Schema()) if err != nil { return err } df.filterFields = make(map[uint32]client.FieldDescription, len(parsedfilterFields)) - df.filterSet = bitset.New(uint(len(col.Schema.Fields))) + df.filterSet = bitset.New(uint(len(col.Schema().Fields))) for _, field := range parsedfilterFields { df.filterFields[uint32(field.ID)] = field df.filterSet.Set(uint(field.ID)) @@ -246,7 +246,7 @@ func (df *DocumentFetcher) start(ctx context.Context, spans core.Spans, withDele df.deletedDocs = withDeleted if !spans.HasValue { // no specified spans so create a prefix scan key for the entire collection - start := base.MakeCollectionKey(*df.col) + start := base.MakeCollectionKey(df.col.Description()) if withDeleted { start = start.WithDeletedFlag() } else { diff --git a/db/fetcher/mocks/fetcher.go b/db/fetcher/mocks/fetcher.go index 79eefefc2b..1597b13b2e 100644 --- a/db/fetcher/mocks/fetcher.go +++ b/db/fetcher/mocks/fetcher.go @@ -134,11 +134,11 @@ func (_c *Fetcher_FetchNext_Call) RunAndReturn(run func(context.Context) (fetche } // Init provides a mock function with given fields: ctx, txn, col, fields, filter, docmapper, reverse, showDeleted -func (_m *Fetcher) Init(ctx context.Context, txn datastore.Txn, col *client.CollectionDescription, fields []client.FieldDescription, filter *mapper.Filter, docmapper *core.DocumentMapping, reverse bool, showDeleted bool) error { +func (_m *Fetcher) Init(ctx context.Context, txn datastore.Txn, col client.Collection, fields []client.FieldDescription, filter *mapper.Filter, docmapper *core.DocumentMapping, reverse bool, showDeleted bool) error { ret := _m.Called(ctx, txn, col, fields, filter, docmapper, reverse, showDeleted) var r0 error - if rf, ok := ret.Get(0).(func(context.Context, datastore.Txn, *client.CollectionDescription, []client.FieldDescription, *mapper.Filter, *core.DocumentMapping, bool, bool) error); ok { + if rf, ok := ret.Get(0).(func(context.Context, datastore.Txn, client.Collection, []client.FieldDescription, *mapper.Filter, *core.DocumentMapping, bool, bool) error); ok { r0 = rf(ctx, txn, col, fields, filter, docmapper, reverse, showDeleted) } else { r0 = ret.Error(0) @@ -155,7 +155,7 @@ type Fetcher_Init_Call struct { // Init is a helper method to define mock.On call // - ctx context.Context // - txn datastore.Txn -// - col *client.CollectionDescription +// - col client.Collection // - fields []client.FieldDescription // - filter *mapper.Filter // - docmapper *core.DocumentMapping @@ -165,9 +165,9 @@ func (_e *Fetcher_Expecter) Init(ctx interface{}, txn interface{}, col interface return &Fetcher_Init_Call{Call: _e.mock.On("Init", ctx, txn, col, fields, filter, docmapper, reverse, showDeleted)} } -func (_c *Fetcher_Init_Call) Run(run func(ctx context.Context, txn datastore.Txn, col *client.CollectionDescription, fields []client.FieldDescription, filter *mapper.Filter, docmapper *core.DocumentMapping, reverse bool, showDeleted bool)) *Fetcher_Init_Call { +func (_c *Fetcher_Init_Call) Run(run func(ctx context.Context, txn datastore.Txn, col client.Collection, fields []client.FieldDescription, filter *mapper.Filter, docmapper *core.DocumentMapping, reverse bool, showDeleted bool)) *Fetcher_Init_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(datastore.Txn), args[2].(*client.CollectionDescription), args[3].([]client.FieldDescription), args[4].(*mapper.Filter), args[5].(*core.DocumentMapping), args[6].(bool), args[7].(bool)) + run(args[0].(context.Context), args[1].(datastore.Txn), args[2].(client.Collection), args[3].([]client.FieldDescription), args[4].(*mapper.Filter), args[5].(*core.DocumentMapping), args[6].(bool), args[7].(bool)) }) return _c } @@ -177,7 +177,7 @@ func (_c *Fetcher_Init_Call) Return(_a0 error) *Fetcher_Init_Call { return _c } -func (_c *Fetcher_Init_Call) RunAndReturn(run func(context.Context, datastore.Txn, *client.CollectionDescription, []client.FieldDescription, *mapper.Filter, *core.DocumentMapping, bool, bool) error) *Fetcher_Init_Call { +func (_c *Fetcher_Init_Call) RunAndReturn(run func(context.Context, datastore.Txn, client.Collection, []client.FieldDescription, *mapper.Filter, *core.DocumentMapping, bool, bool) error) *Fetcher_Init_Call { _c.Call.Return(run) return _c } diff --git a/db/fetcher/versioned.go b/db/fetcher/versioned.go index db6956a9a1..da670b1c27 100644 --- a/db/fetcher/versioned.go +++ b/db/fetcher/versioned.go @@ -92,7 +92,7 @@ type VersionedFetcher struct { queuedCids *list.List - col *client.CollectionDescription + col client.Collection // @todo index *client.IndexDescription mCRDTs map[uint32]crdt.MerkleCRDT } @@ -101,7 +101,7 @@ type VersionedFetcher struct { func (vf *VersionedFetcher) Init( ctx context.Context, txn datastore.Txn, - col *client.CollectionDescription, + col client.Collection, fields []client.FieldDescription, filter *mapper.Filter, docmapper *core.DocumentMapping, @@ -357,7 +357,8 @@ func (vf *VersionedFetcher) merge(c cid.Cid) error { return err } - field, ok := vf.col.GetFieldByName(l.Name, &vf.col.Schema) + schema := vf.col.Schema() + field, ok := vf.col.Description().GetFieldByName(l.Name, &schema) if !ok { return client.NewErrFieldNotExist(l.Name) } @@ -380,7 +381,7 @@ func (vf *VersionedFetcher) processNode( // handle CompositeDAG mcrdt, exists := vf.mCRDTs[crdtIndex] if !exists { - key, err := base.MakePrimaryIndexKeyForCRDT(*vf.col, vf.col.Schema, ctype, vf.key, fieldName) + key, err := base.MakePrimaryIndexKeyForCRDT(vf.col.Description(), vf.col.Schema(), ctype, vf.key, fieldName) if err != nil { return err } diff --git a/db/fetcher_test.go b/db/fetcher_test.go index e2c3647792..f9edf8fdb1 100644 --- a/db/fetcher_test.go +++ b/db/fetcher_test.go @@ -18,7 +18,6 @@ import ( "github.com/sourcenetwork/defradb/client" "github.com/sourcenetwork/defradb/core" - "github.com/sourcenetwork/defradb/datastore" "github.com/sourcenetwork/defradb/db/base" "github.com/sourcenetwork/defradb/db/fetcher" ) @@ -51,40 +50,6 @@ func newTestCollectionDescription() client.CollectionDescription { } } -func newTestFetcher(ctx context.Context, txn datastore.Txn) (*fetcher.DocumentFetcher, error) { - df := new(fetcher.DocumentFetcher) - desc := newTestCollectionDescription() - err := df.Init(ctx, txn, &desc, desc.Schema.Fields, nil, nil, false, false) - if err != nil { - return nil, err - } - return df, nil -} - -func TestFetcherInit(t *testing.T) { - _, err := newTestFetcher(context.Background(), nil) - assert.NoError(t, err) -} - -func TestFetcherStart(t *testing.T) { - ctx := context.Background() - db, err := newMemoryDB(ctx) - if err != nil { - t.Error(err) - return - } - txn, err := db.NewTxn(ctx, true) - if err != nil { - t.Error(err) - return - } - df, err := newTestFetcher(ctx, txn) - assert.NoError(t, err) - - err = df.Start(ctx, core.Spans{}) - assert.NoError(t, err) -} - func TestFetcherStartWithoutInit(t *testing.T) { ctx := context.Background() df := new(fetcher.DocumentFetcher) @@ -123,8 +88,7 @@ func TestFetcherGetAllPrimaryIndexEncodedDocSingle(t *testing.T) { // db.printDebugDB() df := new(fetcher.DocumentFetcher) - desc := col.Description() - err = df.Init(ctx, txn, &desc, desc.Schema.Fields, nil, nil, false, false) + err = df.Init(ctx, txn, col, col.Schema().Fields, nil, nil, false, false) assert.NoError(t, err) err = df.Start(ctx, core.Spans{}) @@ -168,8 +132,7 @@ func TestFetcherGetAllPrimaryIndexEncodedDocMultiple(t *testing.T) { // db.printDebugDB() df := new(fetcher.DocumentFetcher) - desc := col.Description() - err = df.Init(ctx, txn, &desc, desc.Schema.Fields, nil, nil, false, false) + err = df.Init(ctx, txn, col, col.Schema().Fields, nil, nil, false, false) assert.NoError(t, err) err = df.Start(ctx, core.Spans{}) diff --git a/db/indexed_docs_test.go b/db/indexed_docs_test.go index b62cb992d6..cd3f0b3fea 100644 --- a/db/indexed_docs_test.go +++ b/db/indexed_docs_test.go @@ -938,7 +938,7 @@ func TestNonUniqueUpdate_ShouldPassToFetcherOnlyRelevantFields(t *testing.T) { RunAndReturn(func( ctx context.Context, txn datastore.Txn, - col *client.CollectionDescription, + col client.Collection, fields []client.FieldDescription, filter *mapper.Filter, mapping *core.DocumentMapping, diff --git a/lens/fetcher.go b/lens/fetcher.go index ee01aa7983..23adc8671d 100644 --- a/lens/fetcher.go +++ b/lens/fetcher.go @@ -34,7 +34,7 @@ type lensedFetcher struct { txn datastore.Txn - col *client.CollectionDescription + col client.Collection // Cache the fieldDescriptions mapped by name to allow for cheaper access within the fetcher loop fieldDescriptionsByName map[string]client.FieldDescription @@ -58,7 +58,7 @@ func NewFetcher(source fetcher.Fetcher, registry client.LensRegistry) fetcher.Fe func (f *lensedFetcher) Init( ctx context.Context, txn datastore.Txn, - col *client.CollectionDescription, + col client.Collection, fields []client.FieldDescription, filter *mapper.Filter, docmapper *core.DocumentMapping, @@ -67,12 +67,12 @@ func (f *lensedFetcher) Init( ) error { f.col = col - f.fieldDescriptionsByName = make(map[string]client.FieldDescription, len(col.Schema.Fields)) + f.fieldDescriptionsByName = make(map[string]client.FieldDescription, len(col.Schema().Fields)) // Add cache the field descriptions in reverse, allowing smaller-index fields to overwrite any later // ones. This should never really happen here, but it ensures the result is consistent with col.GetField // which returns the first one it finds with a matching name. - for i := len(col.Schema.Fields) - 1; i >= 0; i-- { - field := col.Schema.Fields[i] + for i := len(col.Schema().Fields) - 1; i >= 0; i-- { + field := col.Schema().Fields[i] f.fieldDescriptionsByName[field.Name] = field } @@ -81,11 +81,11 @@ func (f *lensedFetcher) Init( return err } - history, err := getTargetedSchemaHistory(ctx, txn, cfg, f.col.Schema.SchemaID, f.col.Schema.VersionID) + history, err := getTargetedSchemaHistory(ctx, txn, cfg, f.col.Schema().SchemaID, f.col.Schema().VersionID) if err != nil { return err } - f.lens = new(ctx, f.registry, f.col.Schema.VersionID, history) + f.lens = new(ctx, f.registry, f.col.Schema().VersionID, history) f.txn = txn for schemaVersionID := range history { @@ -100,7 +100,7 @@ func (f *lensedFetcher) Init( } } - f.targetVersionID = col.Schema.VersionID + f.targetVersionID = col.Schema().VersionID var innerFetcherFields []client.FieldDescription if f.hasMigrations { @@ -238,7 +238,7 @@ func (f *lensedFetcher) lensDocToEncodedDoc(docAsMap LensDoc) (fetcher.EncodedDo return &lensEncodedDocument{ key: []byte(key), - schemaVersionID: f.col.Schema.VersionID, + schemaVersionID: f.col.Schema().VersionID, status: status, properties: properties, }, nil @@ -283,7 +283,7 @@ func (f *lensedFetcher) updateDataStore(ctx context.Context, original map[string } datastoreKeyBase := core.DataStoreKey{ - CollectionID: f.col.IDString(), + CollectionID: f.col.Description().IDString(), DocKey: dockey, InstanceType: core.ValueKey, } diff --git a/planner/scan.go b/planner/scan.go index 256711b34e..7f5d933a13 100644 --- a/planner/scan.go +++ b/planner/scan.go @@ -35,8 +35,8 @@ type scanNode struct { documentIterator docMapper - p *Planner - desc client.CollectionDescription + p *Planner + col client.Collection fields []client.FieldDescription @@ -62,7 +62,7 @@ func (n *scanNode) Init() error { if err := n.fetcher.Init( n.p.ctx, n.p.txn, - &n.desc, + n.col, n.fields, n.filter, n.slct.DocumentMapping, @@ -74,8 +74,8 @@ func (n *scanNode) Init() error { return n.initScan() } -func (n *scanNode) initCollection(desc client.CollectionDescription) error { - n.desc = desc +func (n *scanNode) initCollection(col client.Collection) error { + n.col = col return n.initFields(n.slct.Fields) } @@ -101,7 +101,7 @@ func (n *scanNode) initFields(fields []mapper.Requestable) error { if target.Filter != nil { fieldDescs, err := parser.ParseFilterFieldsForDescription( target.Filter.ExternalConditions, - n.desc.Schema, + n.col.Schema(), ) if err != nil { return err @@ -122,7 +122,7 @@ func (n *scanNode) initFields(fields []mapper.Requestable) error { } func (n *scanNode) tryAddField(fieldName string) bool { - fd, ok := n.desc.Schema.GetField(fieldName) + fd, ok := n.col.Schema().GetField(fieldName) if !ok { // skip fields that are not part of the // schema description. The scanner (and fetcher) @@ -141,7 +141,7 @@ func (n *scanNode) Start() error { func (n *scanNode) initScan() error { if !n.spans.HasValue { - start := base.MakeCollectionKey(n.desc) + start := base.MakeCollectionKey(n.col.Description()) n.spans = core.NewSpans(core.NewSpan(start, start.PrefixEnd())) } @@ -223,8 +223,8 @@ func (n *scanNode) simpleExplain() (map[string]any, error) { } // Add the collection attributes. - simpleExplainMap[collectionNameLabel] = n.desc.Name - simpleExplainMap[collectionIDLabel] = n.desc.IDString() + simpleExplainMap[collectionNameLabel] = n.col.Name() + simpleExplainMap[collectionIDLabel] = n.col.Description().IDString() // Add the spans attribute. simpleExplainMap[spansLabel] = n.explainSpans() @@ -273,11 +273,11 @@ func (p *Planner) Scan(parsed *mapper.Select) (*scanNode, error) { docMapper: docMapper{parsed.DocumentMapping}, } - colDesc, err := p.getCollectionDesc(parsed.CollectionName) + col, err := p.db.GetCollectionByName(p.ctx, parsed.CollectionName) if err != nil { return nil, err } - err = scan.initCollection(colDesc) + err = scan.initCollection(col) if err != nil { return nil, err }