Skip to content

Commit

Permalink
Implement branchable collections
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewSisley committed Nov 7, 2024
1 parent b5a94b1 commit 2540b3c
Show file tree
Hide file tree
Showing 37 changed files with 1,646 additions and 50 deletions.
17 changes: 17 additions & 0 deletions client/collection_description.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,21 @@ type CollectionDescription struct {
// At the moment this can only be set to `false` if this collection sources its data from
// another collection/query (is a View).
IsMaterialized bool

// IsMaterialized defines whether the history of this collection is tracked as a single,
// verifiable entity.
//
// If set to `true` any change to the contents of this set will be linked to a collection
// level commit via the document(s) composite commit.
//
// This enables multiple nodes to verify that they have the same state/history.
//
// The history may be queried like a document history can be queried, for example via 'commits'
// GQL queries.
//
// Currently this property is immutable and can only be set on collection creation, however
// that will change in the future.
IsBranchable bool
}

// QuerySource represents a collection data source from a query.
Expand Down Expand Up @@ -189,6 +204,7 @@ type collectionDescription struct {
RootID uint32
SchemaVersionID string
IsMaterialized bool
IsBranchable bool
Policy immutable.Option[PolicyDescription]
Indexes []IndexDescription
Fields []CollectionFieldDescription
Expand All @@ -209,6 +225,7 @@ func (c *CollectionDescription) UnmarshalJSON(bytes []byte) error {
c.RootID = descMap.RootID
c.SchemaVersionID = descMap.SchemaVersionID
c.IsMaterialized = descMap.IsMaterialized
c.IsBranchable = descMap.IsBranchable
c.Indexes = descMap.Indexes
c.Fields = descMap.Fields
c.Sources = make([]any, len(descMap.Sources))
Expand Down
1 change: 1 addition & 0 deletions internal/core/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func init() {
&crdt.LWWRegDelta{},
&crdt.CompositeDAGDelta{},
&crdt.CounterDelta{},
&crdt.CollectionDelta{},
)

EncryptionSchema, EncryptionSchemaPrototype = mustSetSchema(
Expand Down
78 changes: 78 additions & 0 deletions internal/core/crdt/collection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package crdt

import (
"context"

"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/internal/core"
"github.com/sourcenetwork/defradb/internal/keys"
)

// Collection is a simple CRDT type that tracks changes to the contents of a
// collection in a similar way to a document composite commit.
type Collection struct {
store datastore.DSReaderWriter

// schemaVersionKey is the schema version datastore key at the time of commit.
//
// It can be used to identify the collection datastructure state at the time of commit.
schemaVersionKey keys.CollectionSchemaVersionKey
}

var _ core.ReplicatedData = (*Collection)(nil)

func NewCollection(store datastore.DSReaderWriter, schemaVersionKey keys.CollectionSchemaVersionKey) *Collection {
return &Collection{
store: store,
schemaVersionKey: schemaVersionKey,
}
}

func (c *Collection) Merge(ctx context.Context, other core.Delta) error {
// Collection merges don't actually need to do anything, as the delta is empty,
// and doc-level merges are handled by the document commits.
return nil
}

func (c *Collection) Append() *CollectionDelta {
return &CollectionDelta{
SchemaVersionID: c.schemaVersionKey.SchemaVersionID,
}
}

type CollectionDelta struct {
Priority uint64

// As we do not yet have a global collection id we temporarily rely on the schema
// version id for tracking which collection this belongs to. See:
// https://github.com/sourcenetwork/defradb/issues/3215
SchemaVersionID string
}

var _ core.Delta = (*CollectionDelta)(nil)

func (delta *CollectionDelta) IPLDSchemaBytes() []byte {
return []byte(`
type CollectionDelta struct {
priority Int
schemaVersionID String
}`)
}

func (d *CollectionDelta) GetPriority() uint64 {
return d.Priority
}

func (d *CollectionDelta) SetPriority(priority uint64) {
d.Priority = priority
}
22 changes: 22 additions & 0 deletions internal/core/crdt/ipld_union.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type CRDT struct {
LWWRegDelta *LWWRegDelta
CompositeDAGDelta *CompositeDAGDelta
CounterDelta *CounterDelta
CollectionDelta *CollectionDelta
}

// NewCRDT returns a new CRDT.
Expand All @@ -28,6 +29,8 @@ func NewCRDT(delta core.Delta) CRDT {
return CRDT{CompositeDAGDelta: d}
case *CounterDelta:
return CRDT{CounterDelta: d}
case *CollectionDelta:
return CRDT{CollectionDelta: d}
}
return CRDT{}
}
Expand All @@ -41,6 +44,7 @@ func (c CRDT) IPLDSchemaBytes() []byte {
| LWWRegDelta "lww"
| CompositeDAGDelta "composite"
| CounterDelta "counter"
| CollectionDelta "collection"
} representation keyed`)
}

Expand All @@ -53,6 +57,8 @@ func (c CRDT) GetDelta() core.Delta {
return c.CompositeDAGDelta
case c.CounterDelta != nil:
return c.CounterDelta
case c.CollectionDelta != nil:
return c.CollectionDelta
}
return nil
}
Expand All @@ -66,6 +72,8 @@ func (c CRDT) GetPriority() uint64 {
return c.CompositeDAGDelta.GetPriority()
case c.CounterDelta != nil:
return c.CounterDelta.GetPriority()
case c.CollectionDelta != nil:
return c.CollectionDelta.GetPriority()
}
return 0
}
Expand All @@ -90,6 +98,8 @@ func (c CRDT) GetDocID() []byte {
return c.CompositeDAGDelta.DocID
case c.CounterDelta != nil:
return c.CounterDelta.DocID
case c.CollectionDelta != nil:
return nil
}
return nil
}
Expand All @@ -103,6 +113,8 @@ func (c CRDT) GetSchemaVersionID() string {
return c.CompositeDAGDelta.SchemaVersionID
case c.CounterDelta != nil:
return c.CounterDelta.SchemaVersionID
case c.CollectionDelta != nil:
return c.CollectionDelta.SchemaVersionID
}
return ""
}
Expand Down Expand Up @@ -135,6 +147,11 @@ func (c CRDT) Clone() CRDT {
Nonce: c.CounterDelta.Nonce,
Data: c.CounterDelta.Data,
}
case c.CollectionDelta != nil:
cloned.CollectionDelta = &CollectionDelta{
Priority: c.CollectionDelta.Priority,
SchemaVersionID: c.CollectionDelta.SchemaVersionID,
}
}
return cloned
}
Expand Down Expand Up @@ -172,3 +189,8 @@ func (c CRDT) SetData(data []byte) {
func (c CRDT) IsComposite() bool {
return c.CompositeDAGDelta != nil
}

// IsCollection returns true if the CRDT is a collection CRDT.
func (c CRDT) IsCollection() bool {
return c.CollectionDelta != nil
}
23 changes: 23 additions & 0 deletions internal/db/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,29 @@ func (c *collection) save(
doc.SetHead(link.Cid)
})

if c.def.Description.IsBranchable {
collectionCRDT := merklecrdt.NewMerkleCollection(
txn,
keys.NewCollectionSchemaVersionKey(c.Schema().VersionID, c.ID()),
keys.NewHeadstoreColKey(c.def.Description.RootID),
)

link, headNode, err := collectionCRDT.Save(ctx, []coreblock.DAGLink{{Link: link}})
if err != nil {
return err
}

updateEvent := event.Update{
Cid: link.Cid,
SchemaRoot: c.Schema().Root,
Block: headNode,
}

txn.OnSuccess(func() {
c.db.events.Publish(event.NewMessage(event.UpdateName, updateEvent))
})
}

return nil
}

Expand Down
24 changes: 24 additions & 0 deletions internal/db/collection_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/event"
"github.com/sourcenetwork/defradb/internal/core"
coreblock "github.com/sourcenetwork/defradb/internal/core/block"
"github.com/sourcenetwork/defradb/internal/keys"
merklecrdt "github.com/sourcenetwork/defradb/internal/merkle/crdt"
)
Expand Down Expand Up @@ -162,5 +163,28 @@ func (c *collection) applyDelete(
c.db.events.Publish(event.NewMessage(event.UpdateName, updateEvent))
})

if c.def.Description.IsBranchable {
collectionCRDT := merklecrdt.NewMerkleCollection(
txn,
keys.NewCollectionSchemaVersionKey(c.Schema().VersionID, c.ID()),
keys.NewHeadstoreColKey(c.def.Description.RootID),
)

link, headNode, err := collectionCRDT.Save(ctx, []coreblock.DAGLink{{Link: link}})
if err != nil {
return err
}

updateEvent := event.Update{
Cid: link.Cid,
SchemaRoot: c.Schema().Root,
Block: headNode,
}

txn.OnSuccess(func() {
c.db.events.Publish(event.NewMessage(event.UpdateName, updateEvent))
})
}

return nil
}
20 changes: 20 additions & 0 deletions internal/db/definition_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ var collectionUpdateValidators = append(
validateIDExists,
validateSchemaVersionIDNotMutated,
validateCollectionNotRemoved,
validateCollectionIsBranchableNotMutated,
),
globalValidators...,
)
Expand Down Expand Up @@ -1036,3 +1037,22 @@ func validateCollectionFieldDefaultValue(

return nil
}

// validateCollectionIsBranchableNotMutated is a temporary restriction that prevents users from toggling
// whether or not a collection is branchable.
func validateCollectionIsBranchableNotMutated(
ctx context.Context,
db *db,
newState *definitionState,
oldState *definitionState,
) error {
for _, newCol := range newState.collections {
oldCol := oldState.collectionsByID[newCol.ID]

if newCol.IsBranchable != oldCol.IsBranchable {
return NewErrColMutatingIsBranchable(newCol.Name.Value())
}
}

return nil
}
9 changes: 9 additions & 0 deletions internal/db/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ const (
errFailedToHandleEncKeysReceivedEvent string = "failed to handle encryption-keys-received event"
errSelfReferenceWithoutSelf string = "must specify 'Self' kind for self referencing relations"
errColNotMaterialized string = "non-materialized collections are not supported"
errColMutatingIsBranchable string = "mutating IsBranchable is not supported"
errMaterializedViewAndACPNotSupported string = "materialized views do not support ACP"
errInvalidDefaultFieldValue string = "default field value is invalid"
)
Expand Down Expand Up @@ -152,6 +153,7 @@ var (
ErrContextDone = errors.New("context done")
ErrFailedToRetryDoc = errors.New("failed to retry doc")
ErrTimeoutDocRetry = errors.New("timeout while retrying doc")
ErrColMutatingIsBranchable = errors.New(errColMutatingIsBranchable)
)

// NewErrFailedToGetHeads returns a new error indicating that the heads of a document
Expand Down Expand Up @@ -676,6 +678,13 @@ func NewErrColNotMaterialized(collection string) error {
)
}

func NewErrColMutatingIsBranchable(collection string) error {
return errors.New(
errColMutatingIsBranchable,
errors.NewKV("Collection", collection),
)
}

func NewErrMaterializedViewAndACPNotSupported(collection string) error {
return errors.New(
errMaterializedViewAndACPNotSupported,
Expand Down
34 changes: 27 additions & 7 deletions internal/db/fetcher/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,18 @@ type HeadFetcher struct {
func (hf *HeadFetcher) Start(
ctx context.Context,
txn datastore.Txn,
prefix keys.HeadstoreDocKey,
prefix immutable.Option[keys.HeadstoreKey],
fieldId immutable.Option[string],
) error {
hf.fieldId = fieldId

var prefixString string
if prefix.HasValue() {
prefixString = prefix.Value().ToString()
}

q := dsq.Query{
Prefix: prefix.ToString(),
Prefix: prefixString,
Orders: []dsq.Order{dsq.OrderByKey{}},
}

Expand All @@ -64,17 +69,32 @@ func (hf *HeadFetcher) FetchNext() (*cid.Cid, error) {
return nil, nil
}

headStoreKey, err := keys.NewHeadstoreDocKey(res.Key)
headStoreKey, err := keys.NewHeadstoreKey(res.Key)
if err != nil {
return nil, err
}

if hf.fieldId.HasValue() && hf.fieldId.Value() != headStoreKey.FieldID {
// FieldIds do not match, continue to next row
return hf.FetchNext()
if hf.fieldId.HasValue() {
switch typedHeadStoreKey := headStoreKey.(type) {
case keys.HeadstoreDocKey:
if hf.fieldId.Value() != typedHeadStoreKey.FieldID {
// FieldIds do not match, continue to next row
return hf.FetchNext()
}

return &typedHeadStoreKey.Cid, nil

case keys.HeadstoreColKey:
if hf.fieldId.Value() == "" {
return &typedHeadStoreKey.Cid, nil
} else {
return nil, nil
}
}
}

return &headStoreKey.Cid, nil
cid := headStoreKey.GetCid()
return &cid, nil
}

func (hf *HeadFetcher) Close() error {
Expand Down
Loading

0 comments on commit 2540b3c

Please sign in to comment.