Skip to content

Commit

Permalink
refactor: DAG sync and move merge outside of net package (#2658)
Browse files Browse the repository at this point in the history
## Relevant issue(s)

Resolves #2624 

## Description

This PR simplifies the DAG sync process within the `net` package and
moves the merge functionality to the `db` package. The merge is now
initiated via an event channel.

Note: I did a search and replace for `SchemaVersionId` to
`SchemaVersionID`. It's in its own commit. I've also removed the
`tests/integration/net/order` tests as they are now annoying to maintain
and will become even more irrelevant when we refactor the WaitForSync
functionality of our test framework.

Another note: I've reduced the severity of the race condition on my Mac.
We had a lot of leaking goroutines. What is left of them is
WaitForSync methods that sometimes seem to leak, plus the badger cache
and the libp2p transport, which both seem to leak goroutines on close;
I'm not sure how to handle these last two.
  • Loading branch information
fredcarle authored Jun 7, 2024
1 parent a7004b2 commit 0c134e5
Show file tree
Hide file tree
Showing 69 changed files with 1,035 additions and 1,575 deletions.
1 change: 1 addition & 0 deletions cli/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ func MakeStartCommand() *cobra.Command {
node.WithPeers(peers...),
// db options
db.WithUpdateEvents(),
db.WithDAGMergeEvents(),
db.WithMaxRetries(cfg.GetInt("datastore.MaxTxnRetries")),
// net node options
net.WithListenAddresses(cfg.GetStringSlice("net.p2pAddresses")...),
Expand Down
15 changes: 0 additions & 15 deletions client/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,21 +122,6 @@ type Collection interface {

// GetIndexes returns all the indexes that exist on the collection.
GetIndexes(ctx context.Context) ([]IndexDescription, error)

// CreateDocIndex creates an index for the given document.
// WARNING: This method is only for internal use and is not supposed to be called by the client
// as it might compromise the integrity of the database. This method will be removed in the future
CreateDocIndex(context.Context, *Document) error

// UpdateDocIndex updates the index for the given document.
// WARNING: This method is only for internal use and is not supposed to be called by the client
// as it might compromise the integrity of the database. This method will be removed in the future
UpdateDocIndex(ctx context.Context, oldDoc, newDoc *Document) error

// DeleteDocIndex deletes the index for the given document.
// WARNING: This method is only for internal use and is not supposed to be called by the client
// as it might compromise the integrity of the database. This method will be removed in the future
DeleteDocIndex(context.Context, *Document) error
}

// DocIDResult wraps the result of an attempt at a DocID retrieval operation.
Expand Down
130 changes: 0 additions & 130 deletions client/mocks/collection.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 8 additions & 4 deletions datastore/memory/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,10 +347,14 @@ func (d *Datastore) executePurge(ctx context.Context) {
}

func (d *Datastore) handleContextDone(ctx context.Context) {
<-ctx.Done()
// It is safe to ignore the error since the only error that could occur is if the
// datastore is already closed, in which case the purpose of the `Close` call is already covered.
_ = d.Close()
select {
case <-d.closing:
return
case <-ctx.Done():
// It is safe to ignore the error since the only error that could occur is if the
// datastore is already closed, in which case the purpose of the `Close` call is already covered.
_ = d.Close()
}
}

// commit commits the given transaction to the datastore.
Expand Down
33 changes: 33 additions & 0 deletions events/dag_sync.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package events

import (
"sync"

"github.com/ipfs/go-cid"

"github.com/sourcenetwork/immutable"
)

// DAGMergeChannel is the bus onto which DAG merges are published.
type DAGMergeChannel = immutable.Option[Channel[DAGMerge]]

// DAGMerge is a notification that a merge can be performed up to the provided CID.
type DAGMerge struct {
// Cid is the id of the composite commit that formed this update in the DAG.
Cid cid.Cid
// SchemaRoot is the root identifier of the schema that defined the shape of the document that was updated.
SchemaRoot string
// Wg is a wait group that can be used to synchronize the merge,
// allowing the caller to optionally block until the merge is complete.
// NOTE(review): presumably nil when the publisher does not need to wait — confirm
// that subscribers check for nil before using it.
Wg *sync.WaitGroup
}
3 changes: 3 additions & 0 deletions events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,7 @@ func New[T any](commandBufferSize int, eventBufferSize int) Channel[T] {
type Events struct {
// Updates publishes an `Update` for each document written to in the database.
Updates UpdateChannel

// DAGMerges publishes a `DAGMerge` for each completed DAG sync process over P2P,
// notifying subscribers that a merge can be performed. The channel is optional
// (an `immutable.Option`), so it may hold no value.
DAGMerges DAGMergeChannel
}
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1166,8 +1166,6 @@ github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49u
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
github.com/vito/go-sse v1.0.0 h1:e6/iTrrvy8BRrOwJwmQmlndlil+TLdxXvHi55ZDzH6M=
github.com/vito/go-sse v1.0.0/go.mod h1:2wkcaQ+jtlZ94Uve8gYZjFpL68luAjssTINA2hpgcZs=
github.com/warpfork/go-testmark v0.12.1 h1:rMgCpJfwy1sJ50x0M0NgyphxYYPMOODIJHhsXyEHU0s=
github.com/warpfork/go-testmark v0.12.1/go.mod h1:kHwy7wfvGSPh1rQJYKayD4AbtNaeyZdcGi9tNJTaa5Y=
github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0 h1:GDDkbFiaK8jsSDJfjId/PEGEShv6ugrt4kYsC5UIDaQ=
github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw=
github.com/wasmerio/wasmer-go v1.0.4 h1:MnqHoOGfiQ8MMq2RF6wyCeebKOe84G88h5yv+vmxJgs=
Expand Down
12 changes: 0 additions & 12 deletions http/client_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,15 +413,3 @@ func (c *Collection) GetIndexes(ctx context.Context) ([]client.IndexDescription,
}
return indexes, nil
}

func (c *Collection) CreateDocIndex(context.Context, *client.Document) error {
return ErrMethodIsNotImplemented
}

func (c *Collection) UpdateDocIndex(ctx context.Context, oldDoc, newDoc *client.Document) error {
return ErrMethodIsNotImplemented
}

func (c *Collection) DeleteDocIndex(context.Context, *client.Document) error {
return ErrMethodIsNotImplemented
}
4 changes: 2 additions & 2 deletions internal/core/crdt/composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func (c CompositeDAG) Set(status client.DocumentStatus) *CompositeDAGDelta {
return &CompositeDAGDelta{
DocID: []byte(c.key.DocID),
FieldName: c.fieldName,
SchemaVersionID: c.schemaVersionKey.SchemaVersionId,
SchemaVersionID: c.schemaVersionKey.SchemaVersionID,
Status: status,
}
}
Expand Down Expand Up @@ -130,7 +130,7 @@ func (c CompositeDAG) Merge(ctx context.Context, delta core.Delta) error {
// been migrated yet locally.
schemaVersionId = dagDelta.SchemaVersionID
} else {
schemaVersionId = c.schemaVersionKey.SchemaVersionId
schemaVersionId = c.schemaVersionKey.SchemaVersionID
}

err = c.store.Put(ctx, versionKey.ToDS(), []byte(schemaVersionId))
Expand Down
2 changes: 1 addition & 1 deletion internal/core/crdt/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func (c Counter[T]) Increment(ctx context.Context, value T) (*CounterDelta[T], e
DocID: []byte(c.key.DocID),
FieldName: c.fieldName,
Data: value,
SchemaVersionID: c.schemaVersionKey.SchemaVersionId,
SchemaVersionID: c.schemaVersionKey.SchemaVersionID,
Nonce: nonce,
}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion internal/core/crdt/lwwreg.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ func (reg LWWRegister) Set(value []byte) *LWWRegDelta {
Data: value,
DocID: []byte(reg.key.DocID),
FieldName: reg.fieldName,
SchemaVersionID: reg.schemaVersionKey.SchemaVersionId,
SchemaVersionID: reg.schemaVersionKey.SchemaVersionID,
}
}

Expand Down
10 changes: 5 additions & 5 deletions internal/core/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ var _ Key = (*CollectionNameKey)(nil)
//
// This key should be removed in https://github.com/sourcenetwork/defradb/issues/1085
type CollectionSchemaVersionKey struct {
SchemaVersionId string
SchemaVersionID string
CollectionID uint32
}

Expand Down Expand Up @@ -296,7 +296,7 @@ func NewCollectionNameKey(name string) CollectionNameKey {

func NewCollectionSchemaVersionKey(schemaVersionId string, collectionID uint32) CollectionSchemaVersionKey {
return CollectionSchemaVersionKey{
SchemaVersionId: schemaVersionId,
SchemaVersionID: schemaVersionId,
CollectionID: collectionID,
}
}
Expand All @@ -309,7 +309,7 @@ func NewCollectionSchemaVersionKeyFromString(key string) (CollectionSchemaVersio
}

return CollectionSchemaVersionKey{
SchemaVersionId: elements[len(elements)-2],
SchemaVersionID: elements[len(elements)-2],
CollectionID: uint32(colID),
}, nil
}
Expand Down Expand Up @@ -591,8 +591,8 @@ func (k CollectionNameKey) ToDS() ds.Key {
func (k CollectionSchemaVersionKey) ToString() string {
result := COLLECTION_SCHEMA_VERSION

if k.SchemaVersionId != "" {
result = result + "/" + k.SchemaVersionId
if k.SchemaVersionID != "" {
result = result + "/" + k.SchemaVersionID
}

if k.CollectionID != 0 {
Expand Down
46 changes: 3 additions & 43 deletions internal/db/collection_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,53 +109,13 @@ func (db *db) fetchCollectionIndexDescriptions(
return indexDescriptions, nil
}

func (c *collection) CreateDocIndex(ctx context.Context, doc *client.Document) error {
ctx, txn, err := ensureContextTxn(ctx, c.db, false)
if err != nil {
return err
}
defer txn.Discard(ctx)

err = c.indexNewDoc(ctx, doc)
if err != nil {
return err
}

return txn.Commit(ctx)
}

func (c *collection) UpdateDocIndex(ctx context.Context, oldDoc, newDoc *client.Document) error {
ctx, txn, err := ensureContextTxn(ctx, c.db, false)
if err != nil {
return err
}
defer txn.Discard(ctx)

err = c.deleteIndexedDoc(ctx, oldDoc)
if err != nil {
return err
}
err = c.indexNewDoc(ctx, newDoc)
if err != nil {
return err
}

return txn.Commit(ctx)
}

func (c *collection) DeleteDocIndex(ctx context.Context, doc *client.Document) error {
ctx, txn, err := ensureContextTxn(ctx, c.db, false)
func (c *collection) updateDocIndex(ctx context.Context, oldDoc, newDoc *client.Document) error {
err := c.deleteIndexedDoc(ctx, oldDoc)
if err != nil {
return err
}
defer txn.Discard(ctx)

err = c.deleteIndexedDoc(ctx, doc)
if err != nil {
return err
}

return txn.Commit(ctx)
return c.indexNewDoc(ctx, newDoc)
}

func (c *collection) indexNewDoc(ctx context.Context, doc *client.Document) error {
Expand Down
Loading

0 comments on commit 0c134e5

Please sign in to comment.