From 4a2b0f7053a8539dacdcd494694a19a64be563b7 Mon Sep 17 00:00:00 2001 From: Islam Aleiv Date: Thu, 7 Mar 2024 19:18:27 +0100 Subject: [PATCH 1/6] Make peers sync secondary index --- client/db.go | 9 ++ client/index.go | 20 +++ db/collection_index.go | 23 ++- db/index.go | 25 ++- db/index_test.go | 2 +- db/txn_db.go | 144 ++++++++++++++++- http/client.go | 29 +++- net/server.go | 75 ++++++--- tests/clients/cli/wrapper.go | 31 +++- tests/clients/cli/wrapper_cli.go | 6 +- tests/clients/cli/wrapper_collection.go | 2 +- tests/clients/http/wrapper.go | 25 +++ tests/integration/index/index_p2p_test.go | 189 ++++++++++++++++++++++ 13 files changed, 539 insertions(+), 41 deletions(-) create mode 100644 tests/integration/index/index_p2p_test.go diff --git a/client/db.go b/client/db.go index 7b0cc8060f..0e05cafbef 100644 --- a/client/db.go +++ b/client/db.go @@ -215,6 +215,15 @@ type Store interface { // GetAllIndexes returns all the indexes that currently exist within this [Store]. GetAllIndexes(context.Context) (map[CollectionName][]IndexDescription, error) + // CreateDocIndex creates an index for the given document. + CreateDocIndex(context.Context, Collection, *Document) error + + // UpdateDocIndex updates the index for the given document. + UpdateDocIndex(ctx context.Context, col Collection, oldDoc, newDoc *Document) error + + // DeleteDocIndex deletes the index for the given document. + DeleteDocIndex(context.Context, Collection, *Document) error + // ExecRequest executes the given GQL request against the [Store]. ExecRequest(context.Context, string) *RequestResult } diff --git a/client/index.go b/client/index.go index 9175cf7c0d..6f87626c98 100644 --- a/client/index.go +++ b/client/index.go @@ -10,6 +10,12 @@ package client +import ( + "context" + + "github.com/sourcenetwork/defradb/datastore" +) + // IndexFieldDescription describes how a field is being indexed. type IndexedFieldDescription struct { // Name contains the name of the field. @@ -30,6 +36,20 @@ type IndexDescription struct { Unique bool } +// CollectionIndex is an interface for indexing documents in a collection. +type CollectionIndex interface { + // Save indexes a document by storing it + Save(context.Context, datastore.Txn, *Document) error + // Update updates an existing document in the index + Update(context.Context, datastore.Txn, *Document, *Document) error + // Delete deletes an existing document from the index + Delete(context.Context, datastore.Txn, *Document) error + // Name returns the name of the index + Name() string + // Description returns the description of the index + Description() IndexDescription +} + // CollectIndexedFields returns all fields that are indexed by all collection indexes. func (d CollectionDefinition) CollectIndexedFields() []FieldDefinition { fieldsMap := make(map[string]bool) diff --git a/db/collection_index.go b/db/collection_index.go index b95154097c..ae732477c3 100644 --- a/db/collection_index.go +++ b/db/collection_index.go @@ -57,8 +57,8 @@ func (db *db) dropCollectionIndex( return col.DropIndex(ctx, indexName) } -// getAllIndexes returns all the indexes in the database. -func (db *db) getAllIndexes( +// getAllIndexDescriptions returns all the index descriptions in the database. +func (db *db) getAllIndexDescriptions( ctx context.Context, txn datastore.Txn, ) (map[client.CollectionName][]client.IndexDescription, error) { @@ -107,6 +107,22 @@ func (db *db) fetchCollectionIndexDescriptions( return indexDescriptions, nil } +func (db *db) getCollectionIndexes(ctx context.Context, txn datastore.Txn, col client.Collection) ([]CollectionIndex, error) { + indexDescriptions, err := db.fetchCollectionIndexDescriptions(ctx, txn, col.ID()) + if err != nil { + return nil, err + } + colIndexes := make([]CollectionIndex, 0, len(indexDescriptions)) + for _, indexDesc := range indexDescriptions { + index, err := NewCollectionIndex(col, indexDesc) + if err != nil { + return nil, err + } + colIndexes = append(colIndexes, index) + } + return colIndexes, nil +} + func (c *collection) indexNewDoc(ctx context.Context, txn datastore.Txn, doc *client.Document) error { err := c.loadIndexes(ctx, txn) if err != nil { @@ -133,7 +149,8 @@ func (c *collection) updateIndexedDoc( oldDoc, err := c.get( ctx, txn, - c.getPrimaryKeyFromDocID(doc.ID()), c.Definition().CollectIndexedFields(), + c.getPrimaryKeyFromDocID(doc.ID()), + c.Definition().CollectIndexedFields(), false, ) if err != nil { diff --git a/db/index.go b/db/index.go index ddec525598..319cdeb8a7 100644 --- a/db/index.go +++ b/db/index.go @@ -25,16 +25,9 @@ import ( // It abstracts away common index functionality to be implemented // by different index types: non-unique, unique, and composite type CollectionIndex interface { - // Save indexes a document by storing it - Save(context.Context, datastore.Txn, *client.Document) error - // Update updates an existing document in the index - Update(context.Context, datastore.Txn, *client.Document, *client.Document) error + client.CollectionIndex // RemoveAll removes all documents from the index RemoveAll(context.Context, datastore.Txn) error - // Name returns the name of the index - Name() string - // Description returns the description of the index - Description() client.IndexDescription } func canConvertIndexFieldValue[T any](val any) bool { @@ -248,6 +241,14 @@ func (index *collectionSimpleIndex) Update( return index.Save(ctx, txn, newDoc) } +func (index *collectionSimpleIndex) Delete( + ctx context.Context, + txn datastore.Txn, + doc *client.Document, +) error { + return index.deleteDocIndex(ctx, txn, doc) +} + func (index *collectionSimpleIndex) deleteDocIndex( ctx context.Context, txn datastore.Txn, @@ -358,6 +359,14 @@ func (index *collectionUniqueIndex) prepareIndexRecordToStore( return key, val, nil } +func (index *collectionUniqueIndex) Delete( + ctx context.Context, + txn datastore.Txn, + doc *client.Document, +) error { + return index.deleteDocIndex(ctx, txn, doc) +} + func (index *collectionUniqueIndex) Update( ctx context.Context, txn datastore.Txn, diff --git a/db/index_test.go b/db/index_test.go index 2b0dbdc8b6..44c2e45f52 100644 --- a/db/index_test.go +++ b/db/index_test.go @@ -263,7 +263,7 @@ func (f *indexTestFixture) createCollectionIndexFor( } func (f *indexTestFixture) getAllIndexes() (map[client.CollectionName][]client.IndexDescription, error) { - return f.db.getAllIndexes(f.ctx, f.txn) + return f.db.getAllIndexDescriptions(f.ctx, f.txn) } func (f *indexTestFixture) getCollectionIndexes(colID uint32) ([]client.IndexDescription, error) { diff --git a/db/txn_db.go b/db/txn_db.go index 455b062b93..662273ea96 100644 --- a/db/txn_db.go +++ b/db/txn_db.go @@ -174,14 +174,154 @@ func (db *implicitTxnDB) GetAllIndexes( } defer txn.Discard(ctx) - return db.getAllIndexes(ctx, txn) + return db.getAllIndexDescriptions(ctx, txn) } // GetAllIndexes gets all the indexes in the database. func (db *explicitTxnDB) GetAllIndexes( ctx context.Context, ) (map[client.CollectionName][]client.IndexDescription, error) { - return db.getAllIndexes(ctx, db.txn) + return db.getAllIndexDescriptions(ctx, db.txn) +} + +// CreateDocIndex creates a new index for the given document. +func (db *implicitTxnDB) CreateDocIndex( + ctx context.Context, + col client.Collection, + doc *client.Document, +) error { + txn, err := db.NewTxn(ctx, true) + if err != nil { + return err + } + defer txn.Discard(ctx) + + indexes, err := db.getCollectionIndexes(ctx, txn, col) + if err != nil { + return err + } + + for _, index := range indexes { + err := index.Save(ctx, txn, doc) + if err != nil { + return err + } + } + return txn.Commit(ctx) +} + +// CreateDocIndex creates a new index for the given document. +func (db *explicitTxnDB) CreateDocIndex( + ctx context.Context, + col client.Collection, + doc *client.Document, +) error { + indexes, err := db.getCollectionIndexes(ctx, db.txn, col) + if err != nil { + return err + } + + for _, index := range indexes { + err := index.Save(ctx, db.txn, doc) + if err != nil { + return err + } + } + return nil +} + +// UpdateDocIndex updates the indexes for the given document. +func (db *implicitTxnDB) UpdateDocIndex( + ctx context.Context, + col client.Collection, + oldDoc *client.Document, + newDoc *client.Document, +) error { + txn, err := db.NewTxn(ctx, true) + if err != nil { + return err + } + defer txn.Discard(ctx) + + indexes, err := db.getCollectionIndexes(ctx, txn, col) + if err != nil { + return err + } + + for _, index := range indexes { + err := index.Update(ctx, txn, oldDoc, newDoc) + if err != nil { + return err + } + } + return txn.Commit(ctx) +} + +// UpdateDocIndex updates the indexes for the given document. +func (db *explicitTxnDB) UpdateDocIndex( + ctx context.Context, + col client.Collection, + oldDoc *client.Document, + newDoc *client.Document, +) error { + indexes, err := db.getCollectionIndexes(ctx, db.txn, col) + if err != nil { + return err + } + + for _, index := range indexes { + err := index.Update(ctx, db.txn, oldDoc, newDoc) + if err != nil { + return err + } + } + return nil +} + +// DeleteDocIndex deletes the indexes for the given document. +func (db *implicitTxnDB) DeleteDocIndex( + ctx context.Context, + col client.Collection, + doc *client.Document, +) error { + txn, err := db.NewTxn(ctx, true) + if err != nil { + return err + } + defer txn.Discard(ctx) + + indexes, err := db.getCollectionIndexes(ctx, txn, col) + if err != nil { + return err + } + + for _, index := range indexes { + err := index.Delete(ctx, txn, doc) + if err != nil { + return err + } + } + return txn.Commit(ctx) +} + +// DeleteDocIndex deletes the indexes for the given document. +func (db *explicitTxnDB) DeleteDocIndex( + ctx context.Context, + col client.Collection, + doc *client.Document, +) error { + indexes, err := db.getCollectionIndexes(ctx, db.txn, col) + if err != nil { + return err + } + + for _, index := range indexes { + err := index.Delete(ctx, db.txn, doc) + if err != nil { + return err + } + } + return nil } // AddSchema takes the provided GQL schema in SDL format, and applies it to the database, diff --git a/http/client.go b/http/client.go index b8db184b9b..edfb4acb8e 100644 --- a/http/client.go +++ b/http/client.go @@ -344,7 +344,7 @@ func (c *Client) ExecRequest(ctx context.Context, query string) *client.RequestR return result } if res.Header.Get("Content-Type") == "text/event-stream" { - result.Pub = c.execRequestSubscription(ctx, res.Body) + result.Pub = c.execRequestSubscription(res.Body) return result } // ignore close errors because they have @@ -367,7 +367,7 @@ func (c *Client) ExecRequest(ctx context.Context, query string) *client.RequestR return result } -func (c *Client) execRequestSubscription(ctx context.Context, r io.ReadCloser) *events.Publisher[events.Update] { +func (c *Client) execRequestSubscription(r io.ReadCloser) *events.Publisher[events.Update] { pubCh := events.New[events.Update](0, 0) pub, err := events.NewPublisher[events.Update](pubCh, 0) if err != nil { @@ -434,3 +434,28 @@ func (c *Client) Events() events.Events { func (c *Client) MaxTxnRetries() int { panic("client side database") } + +func (c *Client) CreateDocIndex( + ctx context.Context, + col client.Collection, + doc *client.Document, +) error { + panic("client side database") +} + +func (c *Client) UpdateDocIndex( + ctx context.Context, + col client.Collection, + oldDoc *client.Document, + newDoc *client.Document, +) error { + panic("client side database") +} + +func (w *Client) DeleteDocIndex( + ctx context.Context, + col client.Collection, + newDoc *client.Document, +) error { + panic("client side database") +} diff --git a/net/server.go b/net/server.go index 8bcce0f226..59dff68622 100644 --- a/net/server.go +++ b/net/server.go @@ -247,7 +247,6 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL return &pb.PushLogReply{}, nil } - schemaRoot := string(req.Body.SchemaRoot) dsKey := core.DataStoreKeyFromDocID(docID) var txnErr error @@ -263,24 +262,9 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL // Currently a schema is the best way we have to link a push log request to a collection, // this will change with https://github.com/sourcenetwork/defradb/issues/1085 - cols, err := store.GetCollections( - ctx, - client.CollectionFetchOptions{ - SchemaRoot: immutable.Some(schemaRoot), - }, - ) + col, err := s.getActiveCollection(ctx, store, string(req.Body.SchemaRoot)) if err != nil { - return nil, errors.Wrap(fmt.Sprintf("Failed to get collection from schemaRoot %s", schemaRoot), err) - } - if len(cols) == 0 { - return nil, client.NewErrCollectionNotFoundForSchema(schemaRoot) - } - var col client.Collection - for _, c := range cols { - if col != nil && col.Name().HasValue() && !c.Name().HasValue() { - continue - } - col = c + return nil, err } // Create a new DAG service with the current transaction @@ -311,6 +295,11 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL session.Wait() bp.mergeBlocks(ctx) + err = s.syncIndexedDocs(ctx, col.WithTxn(txn), docID, store) + if err != nil { + return nil, err + } + // dagWorkers specific to the DocID will have been spawned within handleChildBlocks. // Once we are done with the dag syncing process, we can get rid of those workers. if s.peer.closeJob != nil { @@ -338,6 +327,56 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL return &pb.PushLogReply{}, client.NewErrMaxTxnRetries(txnErr) } +func (*server) getActiveCollection(ctx context.Context, store client.Store, schemaRoot string) (client.Collection, error) { + cols, err := store.GetCollections( + ctx, + client.CollectionFetchOptions{ + SchemaRoot: immutable.Some(schemaRoot), + }, + ) + if err != nil { + return nil, errors.Wrap(fmt.Sprintf("Failed to get collection from schemaRoot %s", schemaRoot), err) + } + if len(cols) == 0 { + return nil, client.NewErrCollectionNotFoundForSchema(schemaRoot) + } + var col client.Collection + for _, c := range cols { + if col != nil && col.Name().HasValue() && !c.Name().HasValue() { + continue + } + col = c + } + return col, nil +} + +func (s *server) syncIndexedDocs(ctx context.Context, col client.Collection, docID client.DocID, store client.Store) error { + preTxnCol, err := s.db.GetCollectionByName(ctx, col.Name().Value()) + if err != nil { + return err + } + + oldDoc, err := preTxnCol.Get(ctx, docID, false) + isNewDoc := errors.Is(err, client.ErrDocumentNotFound) + if !isNewDoc && err != nil { + return err + } + + doc, err := col.Get(ctx, docID, false) + isDeletedDoc := errors.Is(err, client.ErrDocumentNotFound) + if !isDeletedDoc && err != nil { + return err + } + + if isDeletedDoc { + return store.DeleteDocIndex(ctx, col, oldDoc) + } else if isNewDoc { + return store.CreateDocIndex(ctx, col, doc) + } else { + return store.UpdateDocIndex(ctx, col, oldDoc, doc) + } +} + // GetHeadLog receives a get head log request func (s *server) GetHeadLog( ctx context.Context, diff --git a/tests/clients/cli/wrapper.go b/tests/clients/cli/wrapper.go index cd20da888d..393acf411d 100644 --- a/tests/clients/cli/wrapper.go +++ b/tests/clients/cli/wrapper.go @@ -365,7 +365,7 @@ func (w *Wrapper) ExecRequest(ctx context.Context, query string) *client.Request result := &client.RequestResult{} - stdOut, stdErr, err := w.cmd.executeStream(ctx, args) + stdOut, stdErr, err := w.cmd.executeStream(args) if err != nil { result.GQL.Errors = []error{err} return result @@ -377,7 +377,7 @@ func (w *Wrapper) ExecRequest(ctx context.Context, query string) *client.Request return result } if header == cli.SUB_RESULTS_HEADER { - result.Pub = w.execRequestSubscription(ctx, buffer) + result.Pub = w.execRequestSubscription(buffer) return result } data, err := io.ReadAll(buffer) @@ -405,7 +405,7 @@ func (w *Wrapper) ExecRequest(ctx context.Context, query string) *client.Request return result } -func (w *Wrapper) execRequestSubscription(ctx context.Context, r io.Reader) *events.Publisher[events.Update] { +func (w *Wrapper) execRequestSubscription(r io.Reader) *events.Publisher[events.Update] { pubCh := events.New[events.Update](0, 0) pub, err := events.NewPublisher[events.Update](pubCh, 0) if err != nil { @@ -522,3 +522,28 @@ func (w *Wrapper) WaitForPushLogByPeerEvent(id peer.ID) error { func (w *Wrapper) WaitForPushLogFromPeerEvent(id peer.ID) error { return w.node.WaitForPushLogFromPeerEvent(id) } + +func (w *Wrapper) CreateDocIndex( + ctx context.Context, + col client.Collection, + doc *client.Document, +) error { + panic("client side database") +} + +func (w *Wrapper) UpdateDocIndex( + ctx context.Context, + col client.Collection, + oldDoc *client.Document, + newDoc *client.Document, +) error { + panic("client side database") +} + +func (w *Wrapper) DeleteDocIndex( + ctx context.Context, + col client.Collection, + newDoc *client.Document, +) error { + panic("client side database") +} diff --git a/tests/clients/cli/wrapper_cli.go b/tests/clients/cli/wrapper_cli.go index e8dc24270c..2a985dcb18 100644 --- a/tests/clients/cli/wrapper_cli.go +++ b/tests/clients/cli/wrapper_cli.go @@ -38,8 +38,8 @@ func (w *cliWrapper) withTxn(tx datastore.Txn) *cliWrapper { } } -func (w *cliWrapper) execute(ctx context.Context, args []string) ([]byte, error) { - stdOut, stdErr, err := w.executeStream(ctx, args) +func (w *cliWrapper) execute(_ context.Context, args []string) ([]byte, error) { + stdOut, stdErr, err := w.executeStream(args) if err != nil { return nil, err } @@ -57,7 +57,7 @@ func (w *cliWrapper) execute(ctx context.Context, args []string) ([]byte, error) return stdOutData, nil } -func (w *cliWrapper) executeStream(ctx context.Context, args []string) (io.ReadCloser, io.ReadCloser, error) { +func (w *cliWrapper) executeStream(args []string) (io.ReadCloser, io.ReadCloser, error) { stdOutRead, stdOutWrite := io.Pipe() stdErrRead, stdErrWrite := io.Pipe() diff --git a/tests/clients/cli/wrapper_collection.go b/tests/clients/cli/wrapper_collection.go index 4d94d7b20f..08e5119312 100644 --- a/tests/clients/cli/wrapper_collection.go +++ b/tests/clients/cli/wrapper_collection.go @@ -364,7 +364,7 @@ func (c *Collection) GetAllDocIDs(ctx context.Context) (<-chan client.DocIDResul args := []string{"client", "collection", "docIDs"} args = append(args, "--name", c.Description().Name.Value()) - stdOut, _, err := c.cmd.executeStream(ctx, args) + stdOut, _, err := c.cmd.executeStream(args) if err != nil { return nil, err } diff --git a/tests/clients/http/wrapper.go b/tests/clients/http/wrapper.go index b45105a7f7..9c52928c13 100644 --- a/tests/clients/http/wrapper.go +++ b/tests/clients/http/wrapper.go @@ -226,3 +226,28 @@ func (w *Wrapper) WaitForPushLogByPeerEvent(id peer.ID) error { func (w *Wrapper) WaitForPushLogFromPeerEvent(id peer.ID) error { return w.node.WaitForPushLogFromPeerEvent(id) } + +func (w *Wrapper) CreateDocIndex( + ctx context.Context, + col client.Collection, + doc *client.Document, +) error { + return w.client.CreateDocIndex(ctx, col, doc) +} + +func (w *Wrapper) UpdateDocIndex( + ctx context.Context, + col client.Collection, + oldDoc *client.Document, + newDoc *client.Document, +) error { + return w.client.UpdateDocIndex(ctx, col, oldDoc, newDoc) +} + +func (w *Wrapper) DeleteDocIndex( + ctx context.Context, + col client.Collection, + doc *client.Document, +) error { + return w.client.DeleteDocIndex(ctx, col, doc) +} diff --git a/tests/integration/index/index_p2p_test.go b/tests/integration/index/index_p2p_test.go new file mode 100644 index 0000000000..6399b64be4 --- /dev/null +++ b/tests/integration/index/index_p2p_test.go @@ -0,0 +1,189 @@ +// Copyright 2023 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package index + +import ( + "testing" + + testUtils "github.com/sourcenetwork/defradb/tests/integration" + "github.com/sourcenetwork/immutable" +) + +func TestIndexP2P_IfPeerCreatedDoc_ListeningPeerShouldIndexIt(t *testing.T) { + test := testUtils.TestCase{ + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.SchemaUpdate{ + Schema: ` + type Users { + name: String + } + `, + }, + testUtils.CreateIndex{ + CollectionID: 0, + FieldName: "name", + }, + testUtils.ConnectPeers{ + SourceNodeID: 1, + TargetNodeID: 0, + }, + testUtils.SubscribeToCollection{ + NodeID: 1, + CollectionIDs: []int{0}, + }, + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Doc: `{ + "name": "Fred" + }`, + }, + testUtils.WaitForSync{}, + testUtils.Request{ + NodeID: immutable.Some(1), + Request: `query { + Users (filter: {name: {_eq: "Fred"}}){ + name + } + }`, + Results: []map[string]any{ + { + "name": "Fred", + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} + +func TestIndexP2P_IfPeerUpdateDoc_ListeningPeerShouldUpdateIndex(t *testing.T) { + test := testUtils.TestCase{ + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.SchemaUpdate{ + Schema: ` + type Users { + name: String + } + `, + }, + testUtils.CreateIndex{ + CollectionID: 0, + FieldName: "name", + }, + testUtils.ConnectPeers{ + SourceNodeID: 1, + TargetNodeID: 0, + }, + testUtils.SubscribeToCollection{ + NodeID: 1, + CollectionIDs: []int{0}, + }, + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Doc: `{ + "name": "Fred" + }`, + }, + testUtils.WaitForSync{}, + testUtils.UpdateDoc{ + NodeID: immutable.Some(0), + Doc: `{ + "name": "Islam" + }`, + }, + testUtils.WaitForSync{}, + testUtils.Request{ + NodeID: immutable.Some(1), + Request: `query { + Users (filter: {name: {_eq: "Islam"}}){ + name + } + }`, + Results: []map[string]any{ + { + "name": "Islam", + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} + +func TestIndexP2P_IfPeerDeleteDoc_ListeningPeerShouldDeleteIndex(t *testing.T) { + test := testUtils.TestCase{ + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.SchemaUpdate{ + Schema: ` + type Users { + name: String + age: Int + } + `, + }, + testUtils.CreateIndex{ + CollectionID: 0, + FieldName: "name", + }, + testUtils.ConnectPeers{ + SourceNodeID: 1, + TargetNodeID: 0, + }, + testUtils.SubscribeToCollection{ + NodeID: 1, + CollectionIDs: []int{0}, + }, + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Doc: `{ + "name": "Fred", + "age": 25 + }`, + }, + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Doc: `{ + "name": "Fred", + "age": 30 + }`, + }, + testUtils.WaitForSync{}, + testUtils.DeleteDoc{ + NodeID: immutable.Some(0), + DocID: 0, + }, + testUtils.WaitForSync{}, + testUtils.Request{ + NodeID: immutable.Some(1), + Request: `query { + Users (filter: {name: {_eq: "Fred"}}){ + age + } + }`, + Results: []map[string]any{ + { + "age": int64(30), + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} From be811d89f8e9cf196a5cf67d1842f0212cf87945 Mon Sep 17 00:00:00 2001 From: Islam Aleiv Date: Thu, 7 Mar 2024 19:29:42 +0100 Subject: [PATCH 2/6] Fix lint --- db/collection_index.go | 6 +++++- net/server.go | 13 +++++++++++-- tests/integration/index/index_p2p_test.go | 3 ++- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/db/collection_index.go b/db/collection_index.go index ae732477c3..621d05153a 100644 --- a/db/collection_index.go +++ b/db/collection_index.go @@ -107,7 +107,11 @@ func (db *db) fetchCollectionIndexDescriptions( return indexDescriptions, nil } -func (db *db) getCollectionIndexes(ctx context.Context, txn datastore.Txn, col client.Collection) ([]CollectionIndex, error) { +func (db *db) getCollectionIndexes( + ctx context.Context, + txn datastore.Txn, + col client.Collection, +) ([]CollectionIndex, error) { indexDescriptions, err := db.fetchCollectionIndexDescriptions(ctx, txn, col.ID()) if err != nil { return nil, err diff --git a/net/server.go b/net/server.go index 59dff68622..39e4b829cd 100644 --- a/net/server.go +++ b/net/server.go @@ -327,7 +327,11 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL return &pb.PushLogReply{}, client.NewErrMaxTxnRetries(txnErr) } -func (*server) getActiveCollection(ctx context.Context, store client.Store, schemaRoot string) (client.Collection, error) { +func (*server) getActiveCollection( + ctx context.Context, + store client.Store, + schemaRoot string, +) (client.Collection, error) { cols, err := store.GetCollections( ctx, client.CollectionFetchOptions{ @@ -350,7 +354,12 @@ func (*server) getActiveCollection(ctx context.Context, store client.Store, sche return col, nil } -func (s *server) syncIndexedDocs(ctx context.Context, col client.Collection, docID client.DocID, store client.Store) error { +func (s *server) syncIndexedDocs( + ctx context.Context, + col client.Collection, + docID client.DocID, + store client.Store, +) error { preTxnCol, err := s.db.GetCollectionByName(ctx, col.Name().Value()) if err != nil { return err diff --git a/tests/integration/index/index_p2p_test.go b/tests/integration/index/index_p2p_test.go index 6399b64be4..820d6d2c5b 100644 --- a/tests/integration/index/index_p2p_test.go +++ b/tests/integration/index/index_p2p_test.go @@ -13,8 +13,9 @@ package index import ( "testing" - testUtils "github.com/sourcenetwork/defradb/tests/integration" "github.com/sourcenetwork/immutable" + + testUtils "github.com/sourcenetwork/defradb/tests/integration" ) func TestIndexP2P_IfPeerCreatedDoc_ListeningPeerShouldIndexIt(t *testing.T) { From 710d8c58056c0c4c14f2228c21468134c904821e Mon Sep 17 00:00:00 2001 From: Islam Aleiv Date: Thu, 7 Mar 2024 20:12:02 +0100 Subject: [PATCH 3/6] Extract non-transaction related code --- db/txn_db.go | 64 ++++++++++++++++++++++++++++++---------------------- 1 file changed, 37 insertions(+), 27 deletions(-) diff --git a/db/txn_db.go b/db/txn_db.go index 662273ea96..8af7db48df 100644 --- a/db/txn_db.go +++ b/db/txn_db.go @@ -196,17 +196,11 @@ func (db *implicitTxnDB) CreateDocIndex( } defer txn.Discard(ctx) - indexes, err := db.getCollectionIndexes(ctx, txn, col) + err = db.createDocIndex(ctx, txn, col, doc) if err != nil { return err } - for _, index := range indexes { - err := index.Save(ctx, txn, doc) - if err != nil { - return err - } - } return txn.Commit(ctx) } @@ -216,13 +210,22 @@ func (db *explicitTxnDB) CreateDocIndex( col client.Collection, doc *client.Document, ) error { - indexes, err := db.getCollectionIndexes(ctx, db.txn, col) + return db.createDocIndex(ctx, db.txn, col, doc) +} + +func (db *db) createDocIndex( + ctx context.Context, + txn datastore.Txn, + col client.Collection, + doc *client.Document, +) error { + indexes, err := db.getCollectionIndexes(ctx, txn, col) if err != nil { return err } for _, index := range indexes { - err := index.Save(ctx, db.txn, doc) + err := index.Save(ctx, txn, doc) if err != nil { return err } @@ -243,17 +246,11 @@ func (db *implicitTxnDB) UpdateDocIndex( } defer txn.Discard(ctx) - indexes, err := db.getCollectionIndexes(ctx, txn, col) + err = db.updateDocIndex(ctx, txn, col, oldDoc, newDoc) if err != nil { return err } - for _, index := range indexes { - err := index.Update(ctx, txn, oldDoc, newDoc) - if err != nil { - return err - } - } return txn.Commit(ctx) } @@ -264,13 +261,23 @@ func (db *explicitTxnDB) UpdateDocIndex( oldDoc *client.Document, newDoc *client.Document, ) error { - indexes, err := db.getCollectionIndexes(ctx, db.txn, col) + return db.updateDocIndex(ctx, db.txn, col, oldDoc, newDoc) +} + +func (db *db) updateDocIndex( + ctx context.Context, + txn datastore.Txn, + col client.Collection, + oldDoc *client.Document, + newDoc *client.Document, +) error { + indexes, err := db.getCollectionIndexes(ctx, txn, col) if err != nil { return err } for _, index := range indexes { - err := index.Update(ctx, db.txn, oldDoc, newDoc) + err := index.Update(ctx, txn, oldDoc, newDoc) if err != nil { return err } @@ -290,17 +297,11 @@ func (db *implicitTxnDB) DeleteDocIndex( } defer txn.Discard(ctx) - indexes, err := db.getCollectionIndexes(ctx, txn, col) + err = db.deleteDocIndex(ctx, txn, col, doc) if err != nil { return err } - for _, index := range indexes { - err := index.Delete(ctx, txn, doc) - if err != nil { - return err - } - } return txn.Commit(ctx) } @@ -310,13 +311,22 @@ func (db *explicitTxnDB) DeleteDocIndex( col client.Collection, doc *client.Document, ) error { - indexes, err := db.getCollectionIndexes(ctx, db.txn, col) + return db.deleteDocIndex(ctx, db.txn, col, doc) +} + +func (db *db) deleteDocIndex( + ctx context.Context, + txn datastore.Txn, + col client.Collection, + doc *client.Document, +) error { + indexes, err := db.getCollectionIndexes(ctx, txn, col) if err != nil { return err } for _, index := range indexes { - err := index.Delete(ctx, db.txn, doc) + err := index.Delete(ctx, txn, doc) if err != nil { return err } From ed5ec478e1d23866a2c46296ff7d6d637595ea08 Mon Sep 17 00:00:00 2001 From: Islam Aleiv Date: Thu, 7 Mar 2024 21:12:02 +0100 Subject: [PATCH 4/6] Move index method from Store to Collection --- client/collection.go | 9 ++ client/db.go | 9 -- db/collection_index.go | 77 +++++++++--- db/txn_db.go | 150 ------------------------ http/client.go | 25 ---- http/client_collection.go | 12 ++ net/server.go | 9 +- tests/clients/cli/wrapper.go | 25 ---- tests/clients/cli/wrapper_collection.go | 12 ++ tests/clients/http/wrapper.go | 25 ---- 10 files changed, 99 insertions(+), 254 deletions(-) diff --git a/client/collection.go b/client/collection.go index 35df2cad33..56bf05a352 100644 --- a/client/collection.go +++ b/client/collection.go @@ -169,6 +169,15 @@ type Collection interface { // GetIndexes returns all the indexes that exist on the collection. GetIndexes(ctx context.Context) ([]IndexDescription, error) + + // CreateDocIndex creates an index for the given document. + CreateDocIndex(context.Context, *Document) error + + // UpdateDocIndex updates the index for the given document. + UpdateDocIndex(ctx context.Context, oldDoc, newDoc *Document) error + + // DeleteDocIndex deletes the index for the given document. + DeleteDocIndex(context.Context, *Document) error } // DocIDResult wraps the result of an attempt at a DocID retrieval operation. diff --git a/client/db.go b/client/db.go index 0e05cafbef..7b0cc8060f 100644 --- a/client/db.go +++ b/client/db.go @@ -215,15 +215,6 @@ type Store interface { // GetAllIndexes returns all the indexes that currently exist within this [Store]. GetAllIndexes(context.Context) (map[CollectionName][]IndexDescription, error) - // CreateDocIndex creates an index for the given document. - CreateDocIndex(context.Context, Collection, *Document) error - - // UpdateDocIndex updates the index for the given document. - UpdateDocIndex(ctx context.Context, col Collection, oldDoc, newDoc *Document) error - - // DeleteDocIndex deletes the index for the given document. - DeleteDocIndex(context.Context, Collection, *Document) error - // ExecRequest executes the given GQL request against the [Store]. ExecRequest(context.Context, string) *RequestResult } diff --git a/db/collection_index.go b/db/collection_index.go index 621d05153a..7fb036498a 100644 --- a/db/collection_index.go +++ b/db/collection_index.go @@ -107,24 +107,53 @@ func (db *db) fetchCollectionIndexDescriptions( return indexDescriptions, nil } -func (db *db) getCollectionIndexes( - ctx context.Context, - txn datastore.Txn, - col client.Collection, -) ([]CollectionIndex, error) { - indexDescriptions, err := db.fetchCollectionIndexDescriptions(ctx, txn, col.ID()) +func (c *collection) CreateDocIndex(ctx context.Context, doc *client.Document) error { + txn, err := c.getTxn(ctx, false) if err != nil { - return nil, err + return err } - colIndexes := make([]CollectionIndex, 0, len(indexDescriptions)) - for _, indexDesc := range indexDescriptions { - index, err := NewCollectionIndex(col, indexDesc) - if err != nil { - return nil, err - } - colIndexes = append(colIndexes, index) + defer c.discardImplicitTxn(ctx, txn) + + err = c.indexNewDoc(ctx, txn, doc) + if err != nil { + return err + } + + return c.commitImplicitTxn(ctx, txn) +} + +func (c *collection) UpdateDocIndex(ctx context.Context, oldDoc, newDoc *client.Document) error { + txn, err := c.getTxn(ctx, false) + if err != nil { + return err + } + defer c.discardImplicitTxn(ctx, txn) + + err = c.deleteIndexedDoc(ctx, txn, oldDoc) + if err != nil { + return err + } + err = c.indexNewDoc(ctx, txn, newDoc) + if err != nil { + return err } - return colIndexes, nil + + return c.commitImplicitTxn(ctx, txn) +} + +func (c *collection) DeleteDocIndex(ctx context.Context, doc *client.Document) error { + txn, err := c.getTxn(ctx, false) + if err != nil { + return err + } + defer c.discardImplicitTxn(ctx, txn) + + err = c.deleteIndexedDoc(ctx, txn, doc) + if err != nil { + return err + } + + return c.commitImplicitTxn(ctx, txn) } func (c *collection) indexNewDoc(ctx context.Context, txn datastore.Txn, doc *client.Document) error { @@ -169,6 +198,24 @@ func (c *collection) updateIndexedDoc( return nil } +func (c *collection) deleteIndexedDoc( + ctx context.Context, + txn datastore.Txn, + doc *client.Document, +) error { + err := c.loadIndexes(ctx, txn) + if err != nil { + return err + } + for _, index := range c.indexes { + err = index.Delete(ctx, txn, doc) + if err != nil { + return err + } + } + return nil +} + // CreateIndex creates a new index on the collection. // // If the index name is empty, a name will be automatically generated. diff --git a/db/txn_db.go b/db/txn_db.go index 8af7db48df..f2fbe7cea3 100644 --- a/db/txn_db.go +++ b/db/txn_db.go @@ -184,156 +184,6 @@ func (db *explicitTxnDB) GetAllIndexes( return db.getAllIndexDescriptions(ctx, db.txn) } -// CreateDocIndex creates a new index for the given document. -func (db *implicitTxnDB) CreateDocIndex( - ctx context.Context, - col client.Collection, - doc *client.Document, -) error { - txn, err := db.NewTxn(ctx, true) - if err != nil { - return err - } - defer txn.Discard(ctx) - - err = db.createDocIndex(ctx, txn, col, doc) - if err != nil { - return err - } - - return txn.Commit(ctx) -} - -// CreateDocIndex creates a new index for the given document. -func (db *explicitTxnDB) CreateDocIndex( - ctx context.Context, - col client.Collection, - doc *client.Document, -) error { - return db.createDocIndex(ctx, db.txn, col, doc) -} - -func (db *db) createDocIndex( - ctx context.Context, - txn datastore.Txn, - col client.Collection, - doc *client.Document, -) error { - indexes, err := db.getCollectionIndexes(ctx, txn, col) - if err != nil { - return err - } - - for _, index := range indexes { - err := index.Save(ctx, txn, doc) - if err != nil { - return err - } - } - return nil -} - -// UpdateDocIndex updates the indexes for the given document. -func (db *implicitTxnDB) UpdateDocIndex( - ctx context.Context, - col client.Collection, - oldDoc *client.Document, - newDoc *client.Document, -) error { - txn, err := db.NewTxn(ctx, true) - if err != nil { - return err - } - defer txn.Discard(ctx) - - err = db.updateDocIndex(ctx, txn, col, oldDoc, newDoc) - if err != nil { - return err - } - - return txn.Commit(ctx) -} - -// UpdateDocIndex updates the indexes for the given document. -func (db *explicitTxnDB) UpdateDocIndex( - ctx context.Context, - col client.Collection, - oldDoc *client.Document, - newDoc *client.Document, -) error { - return db.updateDocIndex(ctx, db.txn, col, oldDoc, newDoc) -} - -func (db *db) updateDocIndex( - ctx context.Context, - txn datastore.Txn, - col client.Collection, - oldDoc *client.Document, - newDoc *client.Document, -) error { - indexes, err := db.getCollectionIndexes(ctx, txn, col) - if err != nil { - return err - } - - for _, index := range indexes { - err := index.Update(ctx, txn, oldDoc, newDoc) - if err != nil { - return err - } - } - return nil -} - -// DeleteDocIndex deletes the indexes for the given document. -func (db *implicitTxnDB) DeleteDocIndex( - ctx context.Context, - col client.Collection, - doc *client.Document, -) error { - txn, err := db.NewTxn(ctx, true) - if err != nil { - return err - } - defer txn.Discard(ctx) - - err = db.deleteDocIndex(ctx, txn, col, doc) - if err != nil { - return err - } - - return txn.Commit(ctx) -} - -// DeleteDocIndex deletes the indexes for the given document. -func (db *explicitTxnDB) DeleteDocIndex( - ctx context.Context, - col client.Collection, - doc *client.Document, -) error { - return db.deleteDocIndex(ctx, db.txn, col, doc) -} - -func (db *db) deleteDocIndex( - ctx context.Context, - txn datastore.Txn, - col client.Collection, - doc *client.Document, -) error { - indexes, err := db.getCollectionIndexes(ctx, txn, col) - if err != nil { - return err - } - - for _, index := range indexes { - err := index.Delete(ctx, txn, doc) - if err != nil { - return err - } - } - return nil -} - // AddSchema takes the provided GQL schema in SDL format, and applies it to the database, // creating the necessary collections, request types, etc. // diff --git a/http/client.go b/http/client.go index edfb4acb8e..142a359c5b 100644 --- a/http/client.go +++ b/http/client.go @@ -434,28 +434,3 @@ func (c *Client) Events() events.Events { func (c *Client) MaxTxnRetries() int { panic("client side database") } - -func (c *Client) CreateDocIndex( - ctx context.Context, - col client.Collection, - doc *client.Document, -) error { - panic("client side database") -} - -func (c *Client) UpdateDocIndex( - ctx context.Context, - col client.Collection, - oldDoc *client.Document, - newDoc *client.Document, -) error { - panic("client side database") -} - -func (w *Client) DeleteDocIndex( - ctx context.Context, - col client.Collection, - newDoc *client.Document, -) error { - panic("client side database") -} diff --git a/http/client_collection.go b/http/client_collection.go index b44f5045fc..85c746cc59 100644 --- a/http/client_collection.go +++ b/http/client_collection.go @@ -457,3 +457,15 @@ func (c *Collection) GetIndexes(ctx context.Context) ([]client.IndexDescription, } return indexes, nil } + +func (c *Collection) CreateDocIndex(context.Context, *client.Document) error { + return nil +} + +func (c *Collection) UpdateDocIndex(ctx context.Context, oldDoc, newDoc *client.Document) error { + return nil +} + +func (c *Collection) DeleteDocIndex(context.Context, *client.Document) error { + return nil +} diff --git a/net/server.go b/net/server.go index 39e4b829cd..206ccb3b53 100644 --- a/net/server.go +++ b/net/server.go @@ -295,7 +295,7 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL session.Wait() bp.mergeBlocks(ctx) - err = s.syncIndexedDocs(ctx, col.WithTxn(txn), docID, store) + err = s.syncIndexedDocs(ctx, col.WithTxn(txn), docID) if err != nil { return nil, err } @@ -358,7 +358,6 @@ func (s *server) syncIndexedDocs( ctx context.Context, col client.Collection, docID client.DocID, - store client.Store, ) error { preTxnCol, err := s.db.GetCollectionByName(ctx, col.Name().Value()) if err != nil { @@ -378,11 +377,11 @@ func (s *server) syncIndexedDocs( } if isDeletedDoc { - return store.DeleteDocIndex(ctx, col, oldDoc) + return preTxnCol.DeleteDocIndex(ctx, oldDoc) } else if isNewDoc { - return store.CreateDocIndex(ctx, col, doc) + return col.CreateDocIndex(ctx, doc) } else { - return store.UpdateDocIndex(ctx, col, oldDoc, doc) + return col.UpdateDocIndex(ctx, oldDoc, doc) } } diff --git a/tests/clients/cli/wrapper.go b/tests/clients/cli/wrapper.go index 393acf411d..89ba2cf3db 100644 --- a/tests/clients/cli/wrapper.go +++ b/tests/clients/cli/wrapper.go @@ -522,28 +522,3 @@ func (w *Wrapper) WaitForPushLogByPeerEvent(id peer.ID) error { func (w *Wrapper) WaitForPushLogFromPeerEvent(id peer.ID) error { return w.node.WaitForPushLogFromPeerEvent(id) } - -func (w *Wrapper) CreateDocIndex( - ctx context.Context, - col client.Collection, - doc *client.Document, -) error { - panic("client side database") -} - -func (w *Wrapper) UpdateDocIndex( - ctx context.Context, - col client.Collection, - oldDoc *client.Document, - newDoc *client.Document, -) error { - panic("client side database") -} - -func (w *Wrapper) DeleteDocIndex( - ctx context.Context, - col client.Collection, - newDoc *client.Document, -) error { - panic("client side database") -} diff --git a/tests/clients/cli/wrapper_collection.go b/tests/clients/cli/wrapper_collection.go index 08e5119312..ed90413e8f 100644 --- a/tests/clients/cli/wrapper_collection.go +++ b/tests/clients/cli/wrapper_collection.go @@ -460,3 +460,15 @@ func (c *Collection) GetIndexes(ctx context.Context) ([]client.IndexDescription, } return indexes, nil } + +func (c *Collection) CreateDocIndex(context.Context, *client.Document) error { + return nil +} + +func (c *Collection) UpdateDocIndex(ctx context.Context, oldDoc, newDoc *client.Document) error { + return nil +} + +func (c *Collection) DeleteDocIndex(context.Context, *client.Document) error { + return nil +} diff --git a/tests/clients/http/wrapper.go b/tests/clients/http/wrapper.go index 9c52928c13..b45105a7f7 100644 --- a/tests/clients/http/wrapper.go +++ b/tests/clients/http/wrapper.go @@ -226,28 +226,3 @@ func (w *Wrapper) WaitForPushLogByPeerEvent(id peer.ID) error { func (w *Wrapper) WaitForPushLogFromPeerEvent(id peer.ID) error { return w.node.WaitForPushLogFromPeerEvent(id) } - -func (w *Wrapper) CreateDocIndex( - ctx context.Context, - col client.Collection, - doc *client.Document, -) error { - return w.client.CreateDocIndex(ctx, col, doc) -} - -func (w *Wrapper) UpdateDocIndex( - ctx context.Context, - col client.Collection, - oldDoc *client.Document, - newDoc *client.Document, -) error { - return w.client.UpdateDocIndex(ctx, col, oldDoc, newDoc) -} - -func (w *Wrapper) DeleteDocIndex( - ctx context.Context, - col client.Collection, - doc *client.Document, -) error { - return w.client.DeleteDocIndex(ctx, col, doc) -} From 669f247b6226c63a64c74463d897be92527aa11b Mon Sep 17 00:00:00 2001 From: Islam Aleiv Date: Thu, 7 Mar 2024 22:34:22 +0100 Subject: [PATCH 5/6] Add documentation to new index method --- client/collection.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/client/collection.go b/client/collection.go index 56bf05a352..58b53c3af0 100644 --- a/client/collection.go +++ b/client/collection.go @@ -171,12 +171,18 @@ type Collection interface { GetIndexes(ctx context.Context) ([]IndexDescription, error) // CreateDocIndex creates an index for the given document. + // WARNING: This method is only for internal use and is not supposed to be called by the client + // as it might compromise the integrity of the database. This method will be removed in the future CreateDocIndex(context.Context, *Document) error // UpdateDocIndex updates the index for the given document. + // WARNING: This method is only for internal use and is not supposed to be called by the client + // as it might compromise the integrity of the database. This method will be removed in the future UpdateDocIndex(ctx context.Context, oldDoc, newDoc *Document) error // DeleteDocIndex deletes the index for the given document. + // WARNING: This method is only for internal use and is not supposed to be called by the client + // as it might compromise the integrity of the database. This method will be removed in the future DeleteDocIndex(context.Context, *Document) error } From b89c0a2211463e576e571aa31201acf90dab3975 Mon Sep 17 00:00:00 2001 From: Islam Aleiv Date: Thu, 7 Mar 2024 22:34:49 +0100 Subject: [PATCH 6/6] Make implementations return not-implemented error --- http/client_collection.go | 6 +++--- http/errors.go | 20 +++++++++++--------- tests/clients/cli/errors.go | 24 ++++++++++++++++++++++++ tests/clients/cli/wrapper_collection.go | 6 +++--- 4 files changed, 41 insertions(+), 15 deletions(-) create mode 100644 tests/clients/cli/errors.go diff --git a/http/client_collection.go b/http/client_collection.go index 85c746cc59..876c175338 100644 --- a/http/client_collection.go +++ b/http/client_collection.go @@ -459,13 +459,13 @@ func (c *Collection) GetIndexes(ctx context.Context) ([]client.IndexDescription, } func (c *Collection) CreateDocIndex(context.Context, *client.Document) error { - return nil + return ErrMethodIsNotImplemented } func (c *Collection) UpdateDocIndex(ctx context.Context, oldDoc, newDoc *client.Document) error { - return nil + return ErrMethodIsNotImplemented } func (c *Collection) DeleteDocIndex(context.Context, *client.Document) error { - return nil + return ErrMethodIsNotImplemented } diff --git a/http/errors.go b/http/errors.go index b78771723f..1510c2e520 100644 --- a/http/errors.go +++ b/http/errors.go @@ -17,7 +17,8 @@ import ( ) const ( - errFailedToLoadKeys string = "failed to load given keys" + errFailedToLoadKeys string = "failed to load given keys" + errMethodIsNotImplemented string = "the method is not implemented" ) // Errors returnable from this package. @@ -25,14 +26,15 @@ const ( // This list is incomplete. Undefined errors may also be returned. // Errors returned from this package may be tested against these errors with errors.Is. var ( - ErrNoListener = errors.New("cannot serve with no listener") - ErrNoEmail = errors.New("email address must be specified for tls with autocert") - ErrInvalidRequestBody = errors.New("invalid request body") - ErrStreamingNotSupported = errors.New("streaming not supported") - ErrMigrationNotFound = errors.New("migration not found") - ErrMissingRequest = errors.New("missing request") - ErrInvalidTransactionId = errors.New("invalid transaction id") - ErrP2PDisabled = errors.New("p2p network is disabled") + ErrNoListener = errors.New("cannot serve with no listener") + ErrNoEmail = errors.New("email address must be specified for tls with autocert") + ErrInvalidRequestBody = errors.New("invalid request body") + ErrStreamingNotSupported = errors.New("streaming not supported") + ErrMigrationNotFound = errors.New("migration not found") + ErrMissingRequest = errors.New("missing request") + ErrInvalidTransactionId = errors.New("invalid transaction id") + ErrP2PDisabled = errors.New("p2p network is disabled") + ErrMethodIsNotImplemented = errors.New(errMethodIsNotImplemented) ) type errorResponse struct { diff --git a/tests/clients/cli/errors.go b/tests/clients/cli/errors.go new file mode 100644 index 0000000000..08915de170 --- /dev/null +++ b/tests/clients/cli/errors.go @@ -0,0 +1,24 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package cli + +import ( + "github.com/sourcenetwork/defradb/errors" +) + +const ( + errMethodIsNotImplemented string = "the method is not implemented" +) + +// Errors returnable from this package. +var ( + ErrMethodIsNotImplemented = errors.New(errMethodIsNotImplemented) +) diff --git a/tests/clients/cli/wrapper_collection.go b/tests/clients/cli/wrapper_collection.go index ed90413e8f..be7c3302ac 100644 --- a/tests/clients/cli/wrapper_collection.go +++ b/tests/clients/cli/wrapper_collection.go @@ -462,13 +462,13 @@ func (c *Collection) GetIndexes(ctx context.Context) ([]client.IndexDescription, } func (c *Collection) CreateDocIndex(context.Context, *client.Document) error { - return nil + return ErrMethodIsNotImplemented } func (c *Collection) UpdateDocIndex(ctx context.Context, oldDoc, newDoc *client.Document) error { - return nil + return ErrMethodIsNotImplemented } func (c *Collection) DeleteDocIndex(context.Context, *client.Document) error { - return nil + return ErrMethodIsNotImplemented }