Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Make peers sync secondary index #2390

Merged
merged 6 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions client/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,21 @@ type Collection interface {

// GetIndexes returns all the indexes that exist on the collection.
GetIndexes(ctx context.Context) ([]IndexDescription, error)

// CreateDocIndex creates an index for the given document.
// WARNING: This method is only for internal use and is not supposed to be called by the client
// as it might compromise the integrity of the database. This method will be removed in the future
CreateDocIndex(context.Context, *Document) error

// UpdateDocIndex updates the index for the given document.
// WARNING: This method is only for internal use and is not supposed to be called by the client
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

praise: These are suitably scary - thanks 😁

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I second that 👍

// as it might compromise the integrity of the database. This method will be removed in the future
UpdateDocIndex(ctx context.Context, oldDoc, newDoc *Document) error

// DeleteDocIndex deletes the index for the given document.
// WARNING: This method is only for internal use and is not supposed to be called by the client
// as it might compromise the integrity of the database. This method will be removed in the future
DeleteDocIndex(context.Context, *Document) error
}

// DocIDResult wraps the result of an attempt at a DocID retrieval operation.
Expand Down
20 changes: 20 additions & 0 deletions client/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,12 @@

package client

import (
"context"

"github.com/sourcenetwork/defradb/datastore"
)

// IndexFieldDescription describes how a field is being indexed.
type IndexedFieldDescription struct {
// Name contains the name of the field.
Expand All @@ -30,6 +36,20 @@ type IndexDescription struct {
Unique bool
}

// CollectionIndex is an interface for indexing documents in a collection.
type CollectionIndex interface {
// Save indexes a document by storing it
Save(context.Context, datastore.Txn, *Document) error
// Update updates an existing document in the index
Update(context.Context, datastore.Txn, *Document, *Document) error
// Delete deletes an existing document from the index
Delete(context.Context, datastore.Txn, *Document) error
// Name returns the name of the index
Name() string
// Description returns the description of the index
Description() IndexDescription
}

// CollectIndexedFields returns all fields that are indexed by all collection indexes.
func (d CollectionDefinition) CollectIndexedFields() []FieldDefinition {
fieldsMap := make(map[string]bool)
Expand Down
74 changes: 71 additions & 3 deletions db/collection_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ func (db *db) dropCollectionIndex(
return col.DropIndex(ctx, indexName)
}

// getAllIndexes returns all the indexes in the database.
func (db *db) getAllIndexes(
// getAllIndexDescriptions returns all the index descriptions in the database.
func (db *db) getAllIndexDescriptions(
ctx context.Context,
txn datastore.Txn,
) (map[client.CollectionName][]client.IndexDescription, error) {
Expand Down Expand Up @@ -107,6 +107,55 @@ func (db *db) fetchCollectionIndexDescriptions(
return indexDescriptions, nil
}

func (c *collection) CreateDocIndex(ctx context.Context, doc *client.Document) error {
txn, err := c.getTxn(ctx, false)
if err != nil {
return err
}
defer c.discardImplicitTxn(ctx, txn)

err = c.indexNewDoc(ctx, txn, doc)
if err != nil {
return err
}

return c.commitImplicitTxn(ctx, txn)
}

func (c *collection) UpdateDocIndex(ctx context.Context, oldDoc, newDoc *client.Document) error {
txn, err := c.getTxn(ctx, false)
if err != nil {
return err
}
defer c.discardImplicitTxn(ctx, txn)

err = c.deleteIndexedDoc(ctx, txn, oldDoc)
if err != nil {
return err
}
err = c.indexNewDoc(ctx, txn, newDoc)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nitpick: I do see delete+new as still being business logic, but only at a nitpick level - so feel free to merge as is :)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you expand on what is your nitpick concern here Andy?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have none and thought I was in the txn_db.go file :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah I see I how I got confused - they have moved to collection, and I new that they had moved, but my brain still thought of the implementation as being in the txn_db file when I saw the txn logic. I wonder if we can do a similar job to collection that we did to client.Store and split out the txn boilerplate to another file sometime

if err != nil {
return err
}

return c.commitImplicitTxn(ctx, txn)
}

func (c *collection) DeleteDocIndex(ctx context.Context, doc *client.Document) error {
txn, err := c.getTxn(ctx, false)
if err != nil {
return err
}
defer c.discardImplicitTxn(ctx, txn)

err = c.deleteIndexedDoc(ctx, txn, doc)
if err != nil {
return err
}

return c.commitImplicitTxn(ctx, txn)
}

func (c *collection) indexNewDoc(ctx context.Context, txn datastore.Txn, doc *client.Document) error {
err := c.loadIndexes(ctx, txn)
if err != nil {
Expand All @@ -133,7 +182,8 @@ func (c *collection) updateIndexedDoc(
oldDoc, err := c.get(
ctx,
txn,
c.getPrimaryKeyFromDocID(doc.ID()), c.Definition().CollectIndexedFields(),
c.getPrimaryKeyFromDocID(doc.ID()),
c.Definition().CollectIndexedFields(),
false,
)
if err != nil {
Expand All @@ -148,6 +198,24 @@ func (c *collection) updateIndexedDoc(
return nil
}

func (c *collection) deleteIndexedDoc(
ctx context.Context,
txn datastore.Txn,
doc *client.Document,
) error {
err := c.loadIndexes(ctx, txn)
if err != nil {
return err
}
for _, index := range c.indexes {
err = index.Delete(ctx, txn, doc)
if err != nil {
return err
}
}
return nil
}

// CreateIndex creates a new index on the collection.
//
// If the index name is empty, a name will be automatically generated.
Expand Down
25 changes: 17 additions & 8 deletions db/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,9 @@ import (
// It abstracts away common index functionality to be implemented
// by different index types: non-unique, unique, and composite
type CollectionIndex interface {
// Save indexes a document by storing it
Save(context.Context, datastore.Txn, *client.Document) error
// Update updates an existing document in the index
Update(context.Context, datastore.Txn, *client.Document, *client.Document) error
client.CollectionIndex
// RemoveAll removes all documents from the index
RemoveAll(context.Context, datastore.Txn) error
// Name returns the name of the index
Name() string
// Description returns the description of the index
Description() client.IndexDescription
}

func canConvertIndexFieldValue[T any](val any) bool {
Expand Down Expand Up @@ -248,6 +241,14 @@ func (index *collectionSimpleIndex) Update(
return index.Save(ctx, txn, newDoc)
}

func (index *collectionSimpleIndex) Delete(
ctx context.Context,
txn datastore.Txn,
doc *client.Document,
) error {
return index.deleteDocIndex(ctx, txn, doc)
}

func (index *collectionSimpleIndex) deleteDocIndex(
ctx context.Context,
txn datastore.Txn,
Expand Down Expand Up @@ -358,6 +359,14 @@ func (index *collectionUniqueIndex) prepareIndexRecordToStore(
return key, val, nil
}

func (index *collectionUniqueIndex) Delete(
ctx context.Context,
txn datastore.Txn,
doc *client.Document,
) error {
return index.deleteDocIndex(ctx, txn, doc)
}

func (index *collectionUniqueIndex) Update(
ctx context.Context,
txn datastore.Txn,
Expand Down
2 changes: 1 addition & 1 deletion db/index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ func (f *indexTestFixture) createCollectionIndexFor(
}

func (f *indexTestFixture) getAllIndexes() (map[client.CollectionName][]client.IndexDescription, error) {
return f.db.getAllIndexes(f.ctx, f.txn)
return f.db.getAllIndexDescriptions(f.ctx, f.txn)
}

func (f *indexTestFixture) getCollectionIndexes(colID uint32) ([]client.IndexDescription, error) {
Expand Down
4 changes: 2 additions & 2 deletions db/txn_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,14 +174,14 @@ func (db *implicitTxnDB) GetAllIndexes(
}
defer txn.Discard(ctx)

return db.getAllIndexes(ctx, txn)
return db.getAllIndexDescriptions(ctx, txn)
}

// GetAllIndexes gets all the indexes in the database.
func (db *explicitTxnDB) GetAllIndexes(
ctx context.Context,
) (map[client.CollectionName][]client.IndexDescription, error) {
return db.getAllIndexes(ctx, db.txn)
return db.getAllIndexDescriptions(ctx, db.txn)
}

// AddSchema takes the provided GQL schema in SDL format, and applies it to the database,
Expand Down
4 changes: 2 additions & 2 deletions http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func (c *Client) ExecRequest(ctx context.Context, query string) *client.RequestR
return result
}
if res.Header.Get("Content-Type") == "text/event-stream" {
result.Pub = c.execRequestSubscription(ctx, res.Body)
result.Pub = c.execRequestSubscription(res.Body)
return result
}
// ignore close errors because they have
Expand All @@ -367,7 +367,7 @@ func (c *Client) ExecRequest(ctx context.Context, query string) *client.RequestR
return result
}

func (c *Client) execRequestSubscription(ctx context.Context, r io.ReadCloser) *events.Publisher[events.Update] {
func (c *Client) execRequestSubscription(r io.ReadCloser) *events.Publisher[events.Update] {
pubCh := events.New[events.Update](0, 0)
pub, err := events.NewPublisher[events.Update](pubCh, 0)
if err != nil {
Expand Down
12 changes: 12 additions & 0 deletions http/client_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,3 +457,15 @@ func (c *Collection) GetIndexes(ctx context.Context) ([]client.IndexDescription,
}
return indexes, nil
}

func (c *Collection) CreateDocIndex(context.Context, *client.Document) error {
return ErrMethodIsNotImplemented
}

func (c *Collection) UpdateDocIndex(ctx context.Context, oldDoc, newDoc *client.Document) error {
return ErrMethodIsNotImplemented
}

func (c *Collection) DeleteDocIndex(context.Context, *client.Document) error {
return ErrMethodIsNotImplemented
}
20 changes: 11 additions & 9 deletions http/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,24 @@ import (
)

const (
errFailedToLoadKeys string = "failed to load given keys"
errFailedToLoadKeys string = "failed to load given keys"
errMethodIsNotImplemented string = "the method is not implemented"
)

// Errors returnable from this package.
//
// This list is incomplete. Undefined errors may also be returned.
// Errors returned from this package may be tested against these errors with errors.Is.
var (
ErrNoListener = errors.New("cannot serve with no listener")
ErrNoEmail = errors.New("email address must be specified for tls with autocert")
ErrInvalidRequestBody = errors.New("invalid request body")
ErrStreamingNotSupported = errors.New("streaming not supported")
ErrMigrationNotFound = errors.New("migration not found")
ErrMissingRequest = errors.New("missing request")
ErrInvalidTransactionId = errors.New("invalid transaction id")
ErrP2PDisabled = errors.New("p2p network is disabled")
ErrNoListener = errors.New("cannot serve with no listener")
ErrNoEmail = errors.New("email address must be specified for tls with autocert")
ErrInvalidRequestBody = errors.New("invalid request body")
ErrStreamingNotSupported = errors.New("streaming not supported")
ErrMigrationNotFound = errors.New("migration not found")
ErrMissingRequest = errors.New("missing request")
ErrInvalidTransactionId = errors.New("invalid transaction id")
ErrP2PDisabled = errors.New("p2p network is disabled")
ErrMethodIsNotImplemented = errors.New(errMethodIsNotImplemented)
)

type errorResponse struct {
Expand Down
Loading
Loading