diff --git a/client/db.go b/client/db.go index 7b0cc8060f..0e05cafbef 100644 --- a/client/db.go +++ b/client/db.go @@ -215,6 +215,15 @@ type Store interface { // GetAllIndexes returns all the indexes that currently exist within this [Store]. GetAllIndexes(context.Context) (map[CollectionName][]IndexDescription, error) + // CreateDocIndex creates an index for the given document. + CreateDocIndex(context.Context, Collection, *Document) error + + // UpdateDocIndex updates the index for the given document. + UpdateDocIndex(ctx context.Context, col Collection, oldDoc, newDoc *Document) error + + // DeleteDocIndex deletes the index for the given document. + DeleteDocIndex(context.Context, Collection, *Document) error + // ExecRequest executes the given GQL request against the [Store]. ExecRequest(context.Context, string) *RequestResult } diff --git a/client/index.go b/client/index.go index 9175cf7c0d..6f87626c98 100644 --- a/client/index.go +++ b/client/index.go @@ -10,6 +10,12 @@ package client +import ( + "context" + + "github.com/sourcenetwork/defradb/datastore" +) + // IndexFieldDescription describes how a field is being indexed. type IndexedFieldDescription struct { // Name contains the name of the field. @@ -30,6 +36,20 @@ type IndexDescription struct { Unique bool } +// CollectionIndex is an interface for indexing documents in a collection. +type CollectionIndex interface { + // Save indexes a document by storing it + Save(context.Context, datastore.Txn, *Document) error + // Update updates an existing document in the index + Update(context.Context, datastore.Txn, *Document, *Document) error + // Delete deletes an existing document from the index + Delete(context.Context, datastore.Txn, *Document) error + // Name returns the name of the index + Name() string + // Description returns the description of the index + Description() IndexDescription +} + // CollectIndexedFields returns all fields that are indexed by all collection indexes. 
func (d CollectionDefinition) CollectIndexedFields() []FieldDefinition { fieldsMap := make(map[string]bool) diff --git a/db/collection_index.go b/db/collection_index.go index b95154097c..ae732477c3 100644 --- a/db/collection_index.go +++ b/db/collection_index.go @@ -57,8 +57,8 @@ func (db *db) dropCollectionIndex( return col.DropIndex(ctx, indexName) } -// getAllIndexes returns all the indexes in the database. -func (db *db) getAllIndexes( +// getAllIndexDescriptions returns all the index descriptions in the database. +func (db *db) getAllIndexDescriptions( ctx context.Context, txn datastore.Txn, ) (map[client.CollectionName][]client.IndexDescription, error) { @@ -107,6 +107,22 @@ func (db *db) fetchCollectionIndexDescriptions( return indexDescriptions, nil } +func (db *db) getCollectionIndexes(ctx context.Context, txn datastore.Txn, col client.Collection) ([]CollectionIndex, error) { + indexDescriptions, err := db.fetchCollectionIndexDescriptions(ctx, txn, col.ID()) + if err != nil { + return nil, err + } + colIndexes := make([]CollectionIndex, 0, len(indexDescriptions)) + for _, indexDesc := range indexDescriptions { + index, err := NewCollectionIndex(col, indexDesc) + if err != nil { + return nil, err + } + colIndexes = append(colIndexes, index) + } + return colIndexes, nil +} + func (c *collection) indexNewDoc(ctx context.Context, txn datastore.Txn, doc *client.Document) error { err := c.loadIndexes(ctx, txn) if err != nil { @@ -133,7 +149,8 @@ func (c *collection) updateIndexedDoc( oldDoc, err := c.get( ctx, txn, - c.getPrimaryKeyFromDocID(doc.ID()), c.Definition().CollectIndexedFields(), + c.getPrimaryKeyFromDocID(doc.ID()), + c.Definition().CollectIndexedFields(), false, ) if err != nil { diff --git a/db/index.go b/db/index.go index ddec525598..319cdeb8a7 100644 --- a/db/index.go +++ b/db/index.go @@ -25,16 +25,9 @@ import ( // It abstracts away common index functionality to be implemented // by different index types: non-unique, unique, and composite type 
CollectionIndex interface { - // Save indexes a document by storing it - Save(context.Context, datastore.Txn, *client.Document) error - // Update updates an existing document in the index - Update(context.Context, datastore.Txn, *client.Document, *client.Document) error + client.CollectionIndex // RemoveAll removes all documents from the index RemoveAll(context.Context, datastore.Txn) error - // Name returns the name of the index - Name() string - // Description returns the description of the index - Description() client.IndexDescription } func canConvertIndexFieldValue[T any](val any) bool { @@ -248,6 +241,14 @@ func (index *collectionSimpleIndex) Update( return index.Save(ctx, txn, newDoc) } +func (index *collectionSimpleIndex) Delete( + ctx context.Context, + txn datastore.Txn, + doc *client.Document, +) error { + return index.deleteDocIndex(ctx, txn, doc) +} + func (index *collectionSimpleIndex) deleteDocIndex( ctx context.Context, txn datastore.Txn, @@ -358,6 +359,14 @@ func (index *collectionUniqueIndex) prepareIndexRecordToStore( return key, val, nil } +func (index *collectionUniqueIndex) Delete( + ctx context.Context, + txn datastore.Txn, + doc *client.Document, +) error { + return index.deleteDocIndex(ctx, txn, doc) +} + func (index *collectionUniqueIndex) Update( ctx context.Context, txn datastore.Txn, diff --git a/db/index_test.go b/db/index_test.go index 2b0dbdc8b6..44c2e45f52 100644 --- a/db/index_test.go +++ b/db/index_test.go @@ -263,7 +263,7 @@ func (f *indexTestFixture) createCollectionIndexFor( } func (f *indexTestFixture) getAllIndexes() (map[client.CollectionName][]client.IndexDescription, error) { - return f.db.getAllIndexes(f.ctx, f.txn) + return f.db.getAllIndexDescriptions(f.ctx, f.txn) } func (f *indexTestFixture) getCollectionIndexes(colID uint32) ([]client.IndexDescription, error) { diff --git a/db/txn_db.go b/db/txn_db.go index 455b062b93..662273ea96 100644 --- a/db/txn_db.go +++ b/db/txn_db.go @@ -174,14 +174,154 @@ func (db 
*implicitTxnDB) GetAllIndexes( } defer txn.Discard(ctx) - return db.getAllIndexes(ctx, txn) + return db.getAllIndexDescriptions(ctx, txn) } // GetAllIndexes gets all the indexes in the database. func (db *explicitTxnDB) GetAllIndexes( ctx context.Context, ) (map[client.CollectionName][]client.IndexDescription, error) { - return db.getAllIndexes(ctx, db.txn) + return db.getAllIndexDescriptions(ctx, db.txn) +} + +// CreateDocIndex creates a new index for the given document. +func (db *implicitTxnDB) CreateDocIndex( + ctx context.Context, + col client.Collection, + doc *client.Document, +) error { + txn, err := db.NewTxn(ctx, false) + if err != nil { + return err + } + defer txn.Discard(ctx) + + indexes, err := db.getCollectionIndexes(ctx, txn, col) + if err != nil { + return err + } + + for _, index := range indexes { + err := index.Save(ctx, txn, doc) + if err != nil { + return err + } + } + return txn.Commit(ctx) +} + +// CreateDocIndex creates a new index for the given document. +func (db *explicitTxnDB) CreateDocIndex( + ctx context.Context, + col client.Collection, + doc *client.Document, +) error { + indexes, err := db.getCollectionIndexes(ctx, db.txn, col) + if err != nil { + return err + } + + for _, index := range indexes { + err := index.Save(ctx, db.txn, doc) + if err != nil { + return err + } + } + return nil +} + +// UpdateDocIndex updates the indexes for the given document. +func (db *implicitTxnDB) UpdateDocIndex( + ctx context.Context, + col client.Collection, + oldDoc *client.Document, + newDoc *client.Document, +) error { + txn, err := db.NewTxn(ctx, false) + if err != nil { + return err + } + defer txn.Discard(ctx) + + indexes, err := db.getCollectionIndexes(ctx, txn, col) + if err != nil { + return err + } + + for _, index := range indexes { + err := index.Update(ctx, txn, oldDoc, newDoc) + if err != nil { + return err + } + } + return txn.Commit(ctx) +} + +// UpdateDocIndex updates the indexes for the given document.
+func (db *explicitTxnDB) UpdateDocIndex( + ctx context.Context, + col client.Collection, + oldDoc *client.Document, + newDoc *client.Document, +) error { + indexes, err := db.getCollectionIndexes(ctx, db.txn, col) + if err != nil { + return err + } + + for _, index := range indexes { + err := index.Update(ctx, db.txn, oldDoc, newDoc) + if err != nil { + return err + } + } + return nil +} + +// DeleteDocIndex deletes the indexes for the given document. +func (db *implicitTxnDB) DeleteDocIndex( + ctx context.Context, + col client.Collection, + doc *client.Document, +) error { + txn, err := db.NewTxn(ctx, false) + if err != nil { + return err + } + defer txn.Discard(ctx) + + indexes, err := db.getCollectionIndexes(ctx, txn, col) + if err != nil { + return err + } + + for _, index := range indexes { + err := index.Delete(ctx, txn, doc) + if err != nil { + return err + } + } + return txn.Commit(ctx) +} + +// DeleteDocIndex deletes the indexes for the given document. +func (db *explicitTxnDB) DeleteDocIndex( + ctx context.Context, + col client.Collection, + doc *client.Document, +) error { + indexes, err := db.getCollectionIndexes(ctx, db.txn, col) + if err != nil { + return err + } + + for _, index := range indexes { + err := index.Delete(ctx, db.txn, doc) + if err != nil { + return err + } + } + return nil } // AddSchema takes the provided GQL schema in SDL format, and applies it to the database, diff --git a/http/client.go b/http/client.go index b8db184b9b..edfb4acb8e 100644 --- a/http/client.go +++ b/http/client.go @@ -344,7 +344,7 @@ func (c *Client) ExecRequest(ctx context.Context, query string) *client.RequestR return result } if res.Header.Get("Content-Type") == "text/event-stream" { - result.Pub = c.execRequestSubscription(ctx, res.Body) + result.Pub = c.execRequestSubscription(res.Body) return result } // ignore close errors because they have @@ -367,7 +367,7 @@ func (c *Client) ExecRequest(ctx context.Context, query string) *client.RequestR return result }
-func (c *Client) execRequestSubscription(ctx context.Context, r io.ReadCloser) *events.Publisher[events.Update] { +func (c *Client) execRequestSubscription(r io.ReadCloser) *events.Publisher[events.Update] { pubCh := events.New[events.Update](0, 0) pub, err := events.NewPublisher[events.Update](pubCh, 0) if err != nil { @@ -434,3 +434,28 @@ func (c *Client) Events() events.Events { func (c *Client) MaxTxnRetries() int { panic("client side database") } + +func (c *Client) CreateDocIndex( + ctx context.Context, + col client.Collection, + doc *client.Document, +) error { + panic("client side database") +} + +func (c *Client) UpdateDocIndex( + ctx context.Context, + col client.Collection, + oldDoc *client.Document, + newDoc *client.Document, +) error { + panic("client side database") +} + +func (c *Client) DeleteDocIndex( + ctx context.Context, + col client.Collection, + doc *client.Document, +) error { + panic("client side database") +} diff --git a/net/server.go b/net/server.go index 8bcce0f226..59dff68622 100644 --- a/net/server.go +++ b/net/server.go @@ -247,7 +247,6 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL return &pb.PushLogReply{}, nil } - schemaRoot := string(req.Body.SchemaRoot) dsKey := core.DataStoreKeyFromDocID(docID) var txnErr error @@ -263,24 +262,9 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL // Currently a schema is the best way we have to link a push log request to a collection, // this will change with https://github.com/sourcenetwork/defradb/issues/1085 - cols, err := store.GetCollections( - ctx, - client.CollectionFetchOptions{ - SchemaRoot: immutable.Some(schemaRoot), - }, - ) + col, err := s.getActiveCollection(ctx, store, string(req.Body.SchemaRoot)) if err != nil { - return nil, errors.Wrap(fmt.Sprintf("Failed to get collection from schemaRoot %s", schemaRoot), err) - } - if len(cols) == 0 { - return nil, client.NewErrCollectionNotFoundForSchema(schemaRoot) - }
- var col client.Collection - for _, c := range cols { - if col != nil && col.Name().HasValue() && !c.Name().HasValue() { - continue - } - col = c + return nil, err } // Create a new DAG service with the current transaction @@ -311,6 +295,11 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL session.Wait() bp.mergeBlocks(ctx) + err = s.syncIndexedDocs(ctx, col.WithTxn(txn), docID, store) + if err != nil { + return nil, err + } + // dagWorkers specific to the DocID will have been spawned within handleChildBlocks. // Once we are done with the dag syncing process, we can get rid of those workers. if s.peer.closeJob != nil { @@ -338,6 +327,56 @@ func (s *server) PushLog(ctx context.Context, req *pb.PushLogRequest) (*pb.PushL return &pb.PushLogReply{}, client.NewErrMaxTxnRetries(txnErr) } +func (*server) getActiveCollection(ctx context.Context, store client.Store, schemaRoot string) (client.Collection, error) { + cols, err := store.GetCollections( + ctx, + client.CollectionFetchOptions{ + SchemaRoot: immutable.Some(schemaRoot), + }, + ) + if err != nil { + return nil, errors.Wrap(fmt.Sprintf("Failed to get collection from schemaRoot %s", schemaRoot), err) + } + if len(cols) == 0 { + return nil, client.NewErrCollectionNotFoundForSchema(schemaRoot) + } + var col client.Collection + for _, c := range cols { + if col != nil && col.Name().HasValue() && !c.Name().HasValue() { + continue + } + col = c + } + return col, nil +} + +func (s *server) syncIndexedDocs(ctx context.Context, col client.Collection, docID client.DocID, store client.Store) error { + preTxnCol, err := s.db.GetCollectionByName(ctx, col.Name().Value()) + if err != nil { + return err + } + + oldDoc, err := preTxnCol.Get(ctx, docID, false) + isNewDoc := errors.Is(err, client.ErrDocumentNotFound) + if !isNewDoc && err != nil { + return err + } + + doc, err := col.Get(ctx, docID, false) + isDeletedDoc := errors.Is(err, client.ErrDocumentNotFound) + if !isDeletedDoc && err != nil { 
+ return err + } + + if isDeletedDoc { + return store.DeleteDocIndex(ctx, col, oldDoc) + } else if isNewDoc { + return store.CreateDocIndex(ctx, col, doc) + } else { + return store.UpdateDocIndex(ctx, col, oldDoc, doc) + } +} + // GetHeadLog receives a get head log request func (s *server) GetHeadLog( ctx context.Context, diff --git a/tests/clients/cli/wrapper.go b/tests/clients/cli/wrapper.go index cd20da888d..393acf411d 100644 --- a/tests/clients/cli/wrapper.go +++ b/tests/clients/cli/wrapper.go @@ -365,7 +365,7 @@ func (w *Wrapper) ExecRequest(ctx context.Context, query string) *client.Request result := &client.RequestResult{} - stdOut, stdErr, err := w.cmd.executeStream(ctx, args) + stdOut, stdErr, err := w.cmd.executeStream(args) if err != nil { result.GQL.Errors = []error{err} return result @@ -377,7 +377,7 @@ func (w *Wrapper) ExecRequest(ctx context.Context, query string) *client.Request return result } if header == cli.SUB_RESULTS_HEADER { - result.Pub = w.execRequestSubscription(ctx, buffer) + result.Pub = w.execRequestSubscription(buffer) return result } data, err := io.ReadAll(buffer) @@ -405,7 +405,7 @@ func (w *Wrapper) ExecRequest(ctx context.Context, query string) *client.Request return result } -func (w *Wrapper) execRequestSubscription(ctx context.Context, r io.Reader) *events.Publisher[events.Update] { +func (w *Wrapper) execRequestSubscription(r io.Reader) *events.Publisher[events.Update] { pubCh := events.New[events.Update](0, 0) pub, err := events.NewPublisher[events.Update](pubCh, 0) if err != nil { @@ -522,3 +522,28 @@ func (w *Wrapper) WaitForPushLogByPeerEvent(id peer.ID) error { func (w *Wrapper) WaitForPushLogFromPeerEvent(id peer.ID) error { return w.node.WaitForPushLogFromPeerEvent(id) } + +func (w *Wrapper) CreateDocIndex( + ctx context.Context, + col client.Collection, + doc *client.Document, +) error { + panic("client side database") +} + +func (w *Wrapper) UpdateDocIndex( + ctx context.Context, + col client.Collection, + oldDoc 
*client.Document, + newDoc *client.Document, +) error { + panic("client side database") +} + +func (w *Wrapper) DeleteDocIndex( + ctx context.Context, + col client.Collection, + doc *client.Document, +) error { + panic("client side database") +} diff --git a/tests/clients/cli/wrapper_cli.go b/tests/clients/cli/wrapper_cli.go index e8dc24270c..2a985dcb18 100644 --- a/tests/clients/cli/wrapper_cli.go +++ b/tests/clients/cli/wrapper_cli.go @@ -38,8 +38,8 @@ func (w *cliWrapper) withTxn(tx datastore.Txn) *cliWrapper { } } -func (w *cliWrapper) execute(ctx context.Context, args []string) ([]byte, error) { - stdOut, stdErr, err := w.executeStream(ctx, args) +func (w *cliWrapper) execute(_ context.Context, args []string) ([]byte, error) { + stdOut, stdErr, err := w.executeStream(args) if err != nil { return nil, err } @@ -57,7 +57,7 @@ func (w *cliWrapper) execute(ctx context.Context, args []string) ([]byte, error) return stdOutData, nil } -func (w *cliWrapper) executeStream(ctx context.Context, args []string) (io.ReadCloser, io.ReadCloser, error) { +func (w *cliWrapper) executeStream(args []string) (io.ReadCloser, io.ReadCloser, error) { stdOutRead, stdOutWrite := io.Pipe() stdErrRead, stdErrWrite := io.Pipe() diff --git a/tests/clients/cli/wrapper_collection.go b/tests/clients/cli/wrapper_collection.go index 4d94d7b20f..08e5119312 100644 --- a/tests/clients/cli/wrapper_collection.go +++ b/tests/clients/cli/wrapper_collection.go @@ -364,7 +364,7 @@ func (c *Collection) GetAllDocIDs(ctx context.Context) (<-chan client.DocIDResul args := []string{"client", "collection", "docIDs"} args = append(args, "--name", c.Description().Name.Value()) - stdOut, _, err := c.cmd.executeStream(ctx, args) + stdOut, _, err := c.cmd.executeStream(args) if err != nil { return nil, err } diff --git a/tests/clients/http/wrapper.go b/tests/clients/http/wrapper.go index b45105a7f7..9c52928c13 100644 @@ -226,3 +226,28 @@
func (w *Wrapper) WaitForPushLogByPeerEvent(id peer.ID) error { func (w *Wrapper) WaitForPushLogFromPeerEvent(id peer.ID) error { return w.node.WaitForPushLogFromPeerEvent(id) } + +func (w *Wrapper) CreateDocIndex( + ctx context.Context, + col client.Collection, + doc *client.Document, +) error { + return w.client.CreateDocIndex(ctx, col, doc) +} + +func (w *Wrapper) UpdateDocIndex( + ctx context.Context, + col client.Collection, + oldDoc *client.Document, + newDoc *client.Document, +) error { + return w.client.UpdateDocIndex(ctx, col, oldDoc, newDoc) +} + +func (w *Wrapper) DeleteDocIndex( + ctx context.Context, + col client.Collection, + doc *client.Document, +) error { + return w.client.DeleteDocIndex(ctx, col, doc) +} diff --git a/tests/integration/index/index_p2p_test.go b/tests/integration/index/index_p2p_test.go new file mode 100644 index 0000000000..6399b64be4 --- /dev/null +++ b/tests/integration/index/index_p2p_test.go @@ -0,0 +1,189 @@ +// Copyright 2023 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package index + +import ( + "testing" + + testUtils "github.com/sourcenetwork/defradb/tests/integration" + "github.com/sourcenetwork/immutable" +) + +func TestIndexP2P_IfPeerCreatedDoc_ListeningPeerShouldIndexIt(t *testing.T) { + test := testUtils.TestCase{ + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.SchemaUpdate{ + Schema: ` + type Users { + name: String + } + `, + }, + testUtils.CreateIndex{ + CollectionID: 0, + FieldName: "name", + }, + testUtils.ConnectPeers{ + SourceNodeID: 1, + TargetNodeID: 0, + }, + testUtils.SubscribeToCollection{ + NodeID: 1, + CollectionIDs: []int{0}, + }, + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Doc: `{ + "name": "Fred" + }`, + }, + testUtils.WaitForSync{}, + testUtils.Request{ + NodeID: immutable.Some(1), + Request: `query { + Users (filter: {name: {_eq: "Fred"}}){ + name + } + }`, + Results: []map[string]any{ + { + "name": "Fred", + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} + +func TestIndexP2P_IfPeerUpdateDoc_ListeningPeerShouldUpdateIndex(t *testing.T) { + test := testUtils.TestCase{ + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.SchemaUpdate{ + Schema: ` + type Users { + name: String + } + `, + }, + testUtils.CreateIndex{ + CollectionID: 0, + FieldName: "name", + }, + testUtils.ConnectPeers{ + SourceNodeID: 1, + TargetNodeID: 0, + }, + testUtils.SubscribeToCollection{ + NodeID: 1, + CollectionIDs: []int{0}, + }, + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Doc: `{ + "name": "Fred" + }`, + }, + testUtils.WaitForSync{}, + testUtils.UpdateDoc{ + NodeID: immutable.Some(0), + Doc: `{ + "name": "Islam" + }`, + }, + testUtils.WaitForSync{}, + testUtils.Request{ + NodeID: immutable.Some(1), + Request: `query { + Users (filter: {name: {_eq: "Islam"}}){ + name + } + }`, + Results: []map[string]any{ + { + "name": "Islam", + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, 
test) +} + +func TestIndexP2P_IfPeerDeleteDoc_ListeningPeerShouldDeleteIndex(t *testing.T) { + test := testUtils.TestCase{ + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.SchemaUpdate{ + Schema: ` + type Users { + name: String + age: Int + } + `, + }, + testUtils.CreateIndex{ + CollectionID: 0, + FieldName: "name", + }, + testUtils.ConnectPeers{ + SourceNodeID: 1, + TargetNodeID: 0, + }, + testUtils.SubscribeToCollection{ + NodeID: 1, + CollectionIDs: []int{0}, + }, + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Doc: `{ + "name": "Fred", + "age": 25 + }`, + }, + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Doc: `{ + "name": "Fred", + "age": 30 + }`, + }, + testUtils.WaitForSync{}, + testUtils.DeleteDoc{ + NodeID: immutable.Some(0), + DocID: 0, + }, + testUtils.WaitForSync{}, + testUtils.Request{ + NodeID: immutable.Some(1), + Request: `query { + Users (filter: {name: {_eq: "Fred"}}){ + age + } + }`, + Results: []map[string]any{ + { + "age": int64(30), + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +}