Skip to content

Commit

Permalink
Make peers sync secondary index
Browse files Browse the repository at this point in the history
  • Loading branch information
islamaliev committed Mar 7, 2024
1 parent 4e9f6f4 commit 4a2b0f7
Show file tree
Hide file tree
Showing 13 changed files with 539 additions and 41 deletions.
9 changes: 9 additions & 0 deletions client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,15 @@ type Store interface {
// GetAllIndexes returns all the indexes that currently exist within this [Store].
GetAllIndexes(context.Context) (map[CollectionName][]IndexDescription, error)

// CreateDocIndex creates an index for the given document.
CreateDocIndex(context.Context, Collection, *Document) error

// UpdateDocIndex updates the index for the given document.
UpdateDocIndex(ctx context.Context, col Collection, oldDoc, newDoc *Document) error

// DeleteDocIndex deletes the index for the given document.
DeleteDocIndex(context.Context, Collection, *Document) error

// ExecRequest executes the given GQL request against the [Store].
ExecRequest(context.Context, string) *RequestResult
}
Expand Down
20 changes: 20 additions & 0 deletions client/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@

package client

import (
"context"

"github.com/sourcenetwork/defradb/datastore"
)

// IndexFieldDescription describes how a field is being indexed.
type IndexedFieldDescription struct {
// Name contains the name of the field.
Expand All @@ -30,6 +36,20 @@ type IndexDescription struct {
Unique bool
}

// CollectionIndex is an interface for indexing documents in a collection.
type CollectionIndex interface {
// Save indexes a document by storing it
Save(context.Context, datastore.Txn, *Document) error
// Update updates an existing document in the index
Update(context.Context, datastore.Txn, *Document, *Document) error
// Delete deletes an existing document from the index
Delete(context.Context, datastore.Txn, *Document) error
// Name returns the name of the index
Name() string
// Description returns the description of the index
Description() IndexDescription
}

// CollectIndexedFields returns all fields that are indexed by all collection indexes.
func (d CollectionDefinition) CollectIndexedFields() []FieldDefinition {
fieldsMap := make(map[string]bool)
Expand Down
23 changes: 20 additions & 3 deletions db/collection_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ func (db *db) dropCollectionIndex(
return col.DropIndex(ctx, indexName)
}

// getAllIndexes returns all the indexes in the database.
func (db *db) getAllIndexes(
// getAllIndexDescriptions returns all the index descriptions in the database.
func (db *db) getAllIndexDescriptions(
ctx context.Context,
txn datastore.Txn,
) (map[client.CollectionName][]client.IndexDescription, error) {
Expand Down Expand Up @@ -107,6 +107,22 @@ func (db *db) fetchCollectionIndexDescriptions(
return indexDescriptions, nil
}

func (db *db) getCollectionIndexes(ctx context.Context, txn datastore.Txn, col client.Collection) ([]CollectionIndex, error) {

Check failure on line 110 in db/collection_index.go

View workflow job for this annotation

GitHub Actions / Lint GoLang job

line is 126 characters (lll)
indexDescriptions, err := db.fetchCollectionIndexDescriptions(ctx, txn, col.ID())
if err != nil {
return nil, err
}

Check warning on line 114 in db/collection_index.go

View check run for this annotation

Codecov / codecov/patch

db/collection_index.go#L113-L114

Added lines #L113 - L114 were not covered by tests
colIndexes := make([]CollectionIndex, 0, len(indexDescriptions))
for _, indexDesc := range indexDescriptions {
index, err := NewCollectionIndex(col, indexDesc)
if err != nil {
return nil, err
}

Check warning on line 120 in db/collection_index.go

View check run for this annotation

Codecov / codecov/patch

db/collection_index.go#L119-L120

Added lines #L119 - L120 were not covered by tests
colIndexes = append(colIndexes, index)
}
return colIndexes, nil
}

func (c *collection) indexNewDoc(ctx context.Context, txn datastore.Txn, doc *client.Document) error {
err := c.loadIndexes(ctx, txn)
if err != nil {
Expand All @@ -133,7 +149,8 @@ func (c *collection) updateIndexedDoc(
oldDoc, err := c.get(
ctx,
txn,
c.getPrimaryKeyFromDocID(doc.ID()), c.Definition().CollectIndexedFields(),
c.getPrimaryKeyFromDocID(doc.ID()),
c.Definition().CollectIndexedFields(),
false,
)
if err != nil {
Expand Down
25 changes: 17 additions & 8 deletions db/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,9 @@ import (
// It abstracts away common index functionality to be implemented
// by different index types: non-unique, unique, and composite
type CollectionIndex interface {
// Save indexes a document by storing it
Save(context.Context, datastore.Txn, *client.Document) error
// Update updates an existing document in the index
Update(context.Context, datastore.Txn, *client.Document, *client.Document) error
client.CollectionIndex
// RemoveAll removes all documents from the index
RemoveAll(context.Context, datastore.Txn) error
// Name returns the name of the index
Name() string
// Description returns the description of the index
Description() client.IndexDescription
}

func canConvertIndexFieldValue[T any](val any) bool {
Expand Down Expand Up @@ -248,6 +241,14 @@ func (index *collectionSimpleIndex) Update(
return index.Save(ctx, txn, newDoc)
}

func (index *collectionSimpleIndex) Delete(
ctx context.Context,
txn datastore.Txn,
doc *client.Document,
) error {
return index.deleteDocIndex(ctx, txn, doc)
}

func (index *collectionSimpleIndex) deleteDocIndex(
ctx context.Context,
txn datastore.Txn,
Expand Down Expand Up @@ -358,6 +359,14 @@ func (index *collectionUniqueIndex) prepareIndexRecordToStore(
return key, val, nil
}

func (index *collectionUniqueIndex) Delete(
ctx context.Context,
txn datastore.Txn,
doc *client.Document,
) error {
return index.deleteDocIndex(ctx, txn, doc)

Check warning on line 367 in db/index.go

View check run for this annotation

Codecov / codecov/patch

db/index.go#L366-L367

Added lines #L366 - L367 were not covered by tests
}

func (index *collectionUniqueIndex) Update(
ctx context.Context,
txn datastore.Txn,
Expand Down
2 changes: 1 addition & 1 deletion db/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (f *indexTestFixture) createCollectionIndexFor(
}

func (f *indexTestFixture) getAllIndexes() (map[client.CollectionName][]client.IndexDescription, error) {
return f.db.getAllIndexes(f.ctx, f.txn)
return f.db.getAllIndexDescriptions(f.ctx, f.txn)
}

func (f *indexTestFixture) getCollectionIndexes(colID uint32) ([]client.IndexDescription, error) {
Expand Down
144 changes: 142 additions & 2 deletions db/txn_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,14 +174,154 @@ func (db *implicitTxnDB) GetAllIndexes(
}
defer txn.Discard(ctx)

return db.getAllIndexes(ctx, txn)
return db.getAllIndexDescriptions(ctx, txn)
}

// GetAllIndexes gets all the indexes in the database.
func (db *explicitTxnDB) GetAllIndexes(
ctx context.Context,
) (map[client.CollectionName][]client.IndexDescription, error) {
return db.getAllIndexes(ctx, db.txn)
return db.getAllIndexDescriptions(ctx, db.txn)

Check warning on line 184 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L184

Added line #L184 was not covered by tests
}

// CreateDocIndex creates a new index for the given document.
func (db *implicitTxnDB) CreateDocIndex(
ctx context.Context,
col client.Collection,
doc *client.Document,
) error {
txn, err := db.NewTxn(ctx, true)
if err != nil {
return err
}
defer txn.Discard(ctx)

indexes, err := db.getCollectionIndexes(ctx, txn, col)
if err != nil {
return err
}

Check warning on line 202 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L192-L202

Added lines #L192 - L202 were not covered by tests

for _, index := range indexes {
err := index.Save(ctx, txn, doc)
if err != nil {
return err
}

Check warning on line 208 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L204-L208

Added lines #L204 - L208 were not covered by tests
}
return txn.Commit(ctx)

Check warning on line 210 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L210

Added line #L210 was not covered by tests
}

// CreateDocIndex creates a new index for the given document.
func (db *explicitTxnDB) CreateDocIndex(
ctx context.Context,
col client.Collection,
doc *client.Document,
) error {
indexes, err := db.getCollectionIndexes(ctx, db.txn, col)
if err != nil {
return err
}

Check warning on line 222 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L221-L222

Added lines #L221 - L222 were not covered by tests

for _, index := range indexes {
err := index.Save(ctx, db.txn, doc)
if err != nil {
return err
}

Check warning on line 228 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L227-L228

Added lines #L227 - L228 were not covered by tests
}
return nil
}

// UpdateDocIndex updates the indexes for the given document.
func (db *implicitTxnDB) UpdateDocIndex(
ctx context.Context,
col client.Collection,
oldDoc *client.Document,
newDoc *client.Document,
) error {
txn, err := db.NewTxn(ctx, true)
if err != nil {
return err
}
defer txn.Discard(ctx)

indexes, err := db.getCollectionIndexes(ctx, txn, col)
if err != nil {
return err
}

Check warning on line 249 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L239-L249

Added lines #L239 - L249 were not covered by tests

for _, index := range indexes {
err := index.Update(ctx, txn, oldDoc, newDoc)
if err != nil {
return err
}

Check warning on line 255 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L251-L255

Added lines #L251 - L255 were not covered by tests
}
return txn.Commit(ctx)

Check warning on line 257 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L257

Added line #L257 was not covered by tests
}

// UpdateDocIndex updates the indexes for the given document.
func (db *explicitTxnDB) UpdateDocIndex(
ctx context.Context,
col client.Collection,
oldDoc *client.Document,
newDoc *client.Document,
) error {
indexes, err := db.getCollectionIndexes(ctx, db.txn, col)
if err != nil {
return err
}

Check warning on line 270 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L269-L270

Added lines #L269 - L270 were not covered by tests

for _, index := range indexes {
err := index.Update(ctx, db.txn, oldDoc, newDoc)
if err != nil {
return err
}

Check warning on line 276 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L275-L276

Added lines #L275 - L276 were not covered by tests
}
return nil
}

// DeleteDocIndex deletes the indexes for the given document.
func (db *implicitTxnDB) DeleteDocIndex(
ctx context.Context,
col client.Collection,
doc *client.Document,
) error {
txn, err := db.NewTxn(ctx, true)
if err != nil {
return err
}
defer txn.Discard(ctx)

indexes, err := db.getCollectionIndexes(ctx, txn, col)
if err != nil {
return err
}

Check warning on line 296 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L286-L296

Added lines #L286 - L296 were not covered by tests

for _, index := range indexes {
err := index.Delete(ctx, txn, doc)
if err != nil {
return err
}

Check warning on line 302 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L298-L302

Added lines #L298 - L302 were not covered by tests
}
return txn.Commit(ctx)

Check warning on line 304 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L304

Added line #L304 was not covered by tests
}

// DeleteDocIndex deletes the indexes for the given document.
func (db *explicitTxnDB) DeleteDocIndex(
ctx context.Context,
col client.Collection,
doc *client.Document,
) error {
indexes, err := db.getCollectionIndexes(ctx, db.txn, col)
if err != nil {
return err
}

Check warning on line 316 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L315-L316

Added lines #L315 - L316 were not covered by tests

for _, index := range indexes {
err := index.Delete(ctx, db.txn, doc)
if err != nil {
return err
}

Check warning on line 322 in db/txn_db.go

View check run for this annotation

Codecov / codecov/patch

db/txn_db.go#L321-L322

Added lines #L321 - L322 were not covered by tests
}
return nil
}

// AddSchema takes the provided GQL schema in SDL format, and applies it to the database,
Expand Down
29 changes: 27 additions & 2 deletions http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (c *Client) ExecRequest(ctx context.Context, query string) *client.RequestR
return result
}
if res.Header.Get("Content-Type") == "text/event-stream" {
result.Pub = c.execRequestSubscription(ctx, res.Body)
result.Pub = c.execRequestSubscription(res.Body)
return result
}
// ignore close errors because they have
Expand All @@ -367,7 +367,7 @@ func (c *Client) ExecRequest(ctx context.Context, query string) *client.RequestR
return result
}

func (c *Client) execRequestSubscription(ctx context.Context, r io.ReadCloser) *events.Publisher[events.Update] {
func (c *Client) execRequestSubscription(r io.ReadCloser) *events.Publisher[events.Update] {
pubCh := events.New[events.Update](0, 0)
pub, err := events.NewPublisher[events.Update](pubCh, 0)
if err != nil {
Expand Down Expand Up @@ -434,3 +434,28 @@ func (c *Client) Events() events.Events {
func (c *Client) MaxTxnRetries() int {
panic("client side database")
}

func (c *Client) CreateDocIndex(
ctx context.Context,
col client.Collection,
doc *client.Document,
) error {
panic("client side database")

Check warning on line 443 in http/client.go

View check run for this annotation

Codecov / codecov/patch

http/client.go#L442-L443

Added lines #L442 - L443 were not covered by tests
}

func (c *Client) UpdateDocIndex(
ctx context.Context,
col client.Collection,
oldDoc *client.Document,
newDoc *client.Document,
) error {
panic("client side database")

Check warning on line 452 in http/client.go

View check run for this annotation

Codecov / codecov/patch

http/client.go#L451-L452

Added lines #L451 - L452 were not covered by tests
}

func (w *Client) DeleteDocIndex(
ctx context.Context,
col client.Collection,
newDoc *client.Document,
) error {
panic("client side database")

Check warning on line 460 in http/client.go

View check run for this annotation

Codecov / codecov/patch

http/client.go#L459-L460

Added lines #L459 - L460 were not covered by tests
}
Loading

0 comments on commit 4a2b0f7

Please sign in to comment.