Skip to content

Commit

Permalink
WIP - Implement branchable collections
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewSisley committed Nov 6, 2024
1 parent b9abd21 commit f90f6ae
Show file tree
Hide file tree
Showing 32 changed files with 1,328 additions and 50 deletions.
4 changes: 4 additions & 0 deletions client/collection_description.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ type CollectionDescription struct {
// At the moment this can only be set to `false` if this collection sources its data from
// another collection/query (is a View).
IsMaterialized bool

IsBranchable bool
}

// QuerySource represents a collection data source from a query.
Expand Down Expand Up @@ -189,6 +191,7 @@ type collectionDescription struct {
RootID uint32
SchemaVersionID string
IsMaterialized bool
IsBranchable bool
Policy immutable.Option[PolicyDescription]
Indexes []IndexDescription
Fields []CollectionFieldDescription
Expand All @@ -209,6 +212,7 @@ func (c *CollectionDescription) UnmarshalJSON(bytes []byte) error {
c.RootID = descMap.RootID
c.SchemaVersionID = descMap.SchemaVersionID
c.IsMaterialized = descMap.IsMaterialized
c.IsBranchable = descMap.IsBranchable
c.Indexes = descMap.Indexes
c.Fields = descMap.Fields
c.Sources = make([]any, len(descMap.Sources))
Expand Down
1 change: 1 addition & 0 deletions internal/core/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func init() {
&crdt.LWWRegDelta{},
&crdt.CompositeDAGDelta{},
&crdt.CounterDelta{},
&crdt.CollectionDelta{},
)

EncryptionSchema, EncryptionSchemaPrototype = mustSetSchema(
Expand Down
74 changes: 74 additions & 0 deletions internal/core/crdt/collection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package crdt

import (
"context"

"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/internal/core"
"github.com/sourcenetwork/defradb/internal/keys"
)

type Collection struct {
store datastore.DSReaderWriter
// schemaVersionKey is the schema version datastore key at the time of commit.
//
// It can be used to identify the collection datastructure state at the time of commit.
schemaVersionKey keys.CollectionSchemaVersionKey
}

var _ core.ReplicatedData = (*Collection)(nil)

func NewCollection(store datastore.DSReaderWriter, schemaVersionKey keys.CollectionSchemaVersionKey) *Collection {
return &Collection{
store: store,
schemaVersionKey: schemaVersionKey,
}
}

func (c *Collection) Merge(ctx context.Context, other core.Delta) error {
// Collection merges don't actually need to do anything, as the delta is empty,
// and doc-level merges are handled by the document commits.

// todo - make sure this is well tested, you might be wrong!
return nil
}

func (c *Collection) Append() *CollectionDelta {
return &CollectionDelta{
SchemaVersionID: c.schemaVersionKey.SchemaVersionID,
}
}

type CollectionDelta struct {
Priority uint64
// todo: This is temporary pending a global collection id (link to ticket)
SchemaVersionID string
}

var _ core.Delta = (*CollectionDelta)(nil)

func (delta *CollectionDelta) IPLDSchemaBytes() []byte {
return []byte(`
type CollectionDelta struct {
priority Int
schemaVersionID String
}`)
}

func (d *CollectionDelta) GetPriority() uint64 {
return d.Priority
}

func (d *CollectionDelta) SetPriority(priority uint64) {
d.Priority = priority
}
22 changes: 22 additions & 0 deletions internal/core/crdt/ipld_union.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type CRDT struct {
LWWRegDelta *LWWRegDelta
CompositeDAGDelta *CompositeDAGDelta
CounterDelta *CounterDelta
CollectionDelta *CollectionDelta
}

// NewCRDT returns a new CRDT.
Expand All @@ -28,6 +29,8 @@ func NewCRDT(delta core.Delta) CRDT {
return CRDT{CompositeDAGDelta: d}
case *CounterDelta:
return CRDT{CounterDelta: d}
case *CollectionDelta:
return CRDT{CollectionDelta: d}
}
return CRDT{}
}
Expand All @@ -41,6 +44,7 @@ func (c CRDT) IPLDSchemaBytes() []byte {
| LWWRegDelta "lww"
| CompositeDAGDelta "composite"
| CounterDelta "counter"
| CollectionDelta "collection"
} representation keyed`)
}

Expand All @@ -53,6 +57,8 @@ func (c CRDT) GetDelta() core.Delta {
return c.CompositeDAGDelta
case c.CounterDelta != nil:
return c.CounterDelta
case c.CollectionDelta != nil:
return c.CollectionDelta
}
return nil
}
Expand All @@ -66,6 +72,8 @@ func (c CRDT) GetPriority() uint64 {
return c.CompositeDAGDelta.GetPriority()
case c.CounterDelta != nil:
return c.CounterDelta.GetPriority()
case c.CollectionDelta != nil:
return c.CollectionDelta.GetPriority()
}
return 0
}
Expand All @@ -90,6 +98,8 @@ func (c CRDT) GetDocID() []byte {
return c.CompositeDAGDelta.DocID
case c.CounterDelta != nil:
return c.CounterDelta.DocID
case c.CollectionDelta != nil:
return nil
}
return nil
}
Expand All @@ -103,6 +113,8 @@ func (c CRDT) GetSchemaVersionID() string {
return c.CompositeDAGDelta.SchemaVersionID
case c.CounterDelta != nil:
return c.CounterDelta.SchemaVersionID
case c.CollectionDelta != nil:
return c.CollectionDelta.SchemaVersionID
}
return ""
}
Expand Down Expand Up @@ -135,6 +147,11 @@ func (c CRDT) Clone() CRDT {
Nonce: c.CounterDelta.Nonce,
Data: c.CounterDelta.Data,
}
case c.CollectionDelta != nil:
cloned.CollectionDelta = &CollectionDelta{
Priority: c.CollectionDelta.Priority,
SchemaVersionID: c.CollectionDelta.SchemaVersionID,
}
}
return cloned
}
Expand Down Expand Up @@ -172,3 +189,8 @@ func (c CRDT) SetData(data []byte) {
func (c CRDT) IsComposite() bool {
return c.CompositeDAGDelta != nil
}

// IsCollection returns true if the CRDT is a collection CRDT.
func (c CRDT) IsCollection() bool {
return c.CollectionDelta != nil
}
23 changes: 23 additions & 0 deletions internal/db/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,29 @@ func (c *collection) save(
doc.SetHead(link.Cid)
})

if c.def.Description.IsBranchable {
collectionCRDT := merklecrdt.NewMerkleCollection(
txn,
keys.NewCollectionSchemaVersionKey(c.Schema().VersionID, c.ID()),
keys.NewHeadstoreColKey(c.def.Description.RootID),
)

link, headNode, err := collectionCRDT.Save(ctx, []coreblock.DAGLink{{Link: link}})
if err != nil {
return err
}

updateEvent := event.Update{
Cid: link.Cid,
SchemaRoot: c.Schema().Root,
Block: headNode,
}

txn.OnSuccess(func() {
c.db.events.Publish(event.NewMessage(event.UpdateName, updateEvent))
})
}

return nil
}

Expand Down
17 changes: 17 additions & 0 deletions internal/db/definition_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ var collectionUpdateValidators = append(
validateIDExists,
validateSchemaVersionIDNotMutated,
validateCollectionNotRemoved,
validateCollectionIsBranchable,
),
globalValidators...,
)
Expand Down Expand Up @@ -1036,3 +1037,19 @@ func validateCollectionFieldDefaultValue(

return nil
}
func validateCollectionIsBranchable(
ctx context.Context,
db *db,
newState *definitionState,
oldState *definitionState,
) error {
for _, newCol := range newState.collections {
oldCol := oldState.collectionsByID[newCol.ID]

if newCol.IsBranchable != oldCol.IsBranchable {
return NewErrColMutatingIsBranchable(newCol.Name.Value())
}
}

return nil
}
9 changes: 9 additions & 0 deletions internal/db/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ const (
errFailedToHandleEncKeysReceivedEvent string = "failed to handle encryption-keys-received event"
errSelfReferenceWithoutSelf string = "must specify 'Self' kind for self referencing relations"
errColNotMaterialized string = "non-materialized collections are not supported"
errColMutatingIsBranchable string = "mutating IsBranchable is not supported"
errMaterializedViewAndACPNotSupported string = "materialized views do not support ACP"
errInvalidDefaultFieldValue string = "default field value is invalid"
)
Expand Down Expand Up @@ -152,6 +153,7 @@ var (
ErrContextDone = errors.New("context done")
ErrFailedToRetryDoc = errors.New("failed to retry doc")
ErrTimeoutDocRetry = errors.New("timeout while retrying doc")
ErrColMutatingIsBranchable = errors.New(errColMutatingIsBranchable)
)

// NewErrFailedToGetHeads returns a new error indicating that the heads of a document
Expand Down Expand Up @@ -676,6 +678,13 @@ func NewErrColNotMaterialized(collection string) error {
)
}

func NewErrColMutatingIsBranchable(collection string) error {
return errors.New(
errColMutatingIsBranchable,
errors.NewKV("Collection", collection),
)
}

func NewErrMaterializedViewAndACPNotSupported(collection string) error {
return errors.New(
errMaterializedViewAndACPNotSupported,
Expand Down
34 changes: 27 additions & 7 deletions internal/db/fetcher/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,18 @@ type HeadFetcher struct {
func (hf *HeadFetcher) Start(
ctx context.Context,
txn datastore.Txn,
prefix keys.HeadstoreDocKey,
prefix immutable.Option[keys.HeadstoreKey],
fieldId immutable.Option[string],
) error {
hf.fieldId = fieldId

var prefixString string
if prefix.HasValue() {
prefixString = prefix.Value().ToString()
}

q := dsq.Query{
Prefix: prefix.ToString(),
Prefix: prefixString,
Orders: []dsq.Order{dsq.OrderByKey{}},
}

Expand All @@ -64,17 +69,32 @@ func (hf *HeadFetcher) FetchNext() (*cid.Cid, error) {
return nil, nil
}

headStoreKey, err := keys.NewHeadstoreDocKey(res.Key)
headStoreKey, err := keys.NewHeadstoreKey(res.Key)
if err != nil {
return nil, err
}

if hf.fieldId.HasValue() && hf.fieldId.Value() != headStoreKey.FieldID {
// FieldIds do not match, continue to next row
return hf.FetchNext()
if hf.fieldId.HasValue() {
switch typedHeadStoreKey := headStoreKey.(type) {
case keys.HeadstoreDocKey:
if hf.fieldId.Value() != typedHeadStoreKey.FieldID {
// FieldIds do not match, continue to next row
return hf.FetchNext()
}

return &typedHeadStoreKey.Cid, nil

case keys.HeadstoreColKey:
if hf.fieldId.Value() == "" {
return &typedHeadStoreKey.Cid, nil
} else {
return nil, nil
}
}
}

return &headStoreKey.Cid, nil
cid := headStoreKey.GetCid()
return &cid, nil
}

func (hf *HeadFetcher) Close() error {
Expand Down
2 changes: 1 addition & 1 deletion internal/db/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,7 @@ func decryptBlock(
) (*coreblock.Block, error) {
_, encryptor := encryption.EnsureContextWithEncryptor(ctx)

if block.Delta.IsComposite() {
if block.Delta.IsComposite() || block.Delta.IsCollection() {
// for composite blocks there is nothing to decrypt
return block, nil
}
Expand Down
24 changes: 24 additions & 0 deletions internal/keys/headstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,30 @@

package keys

import (
"strings"

"github.com/ipfs/go-cid"
)

const (
HEADSTORE_DOC = "/d"
HEADSTORE_COL = "/c"
)

type HeadstoreKey interface {
Walkable
GetCid() cid.Cid
WithCid(c cid.Cid) HeadstoreKey
}

func NewHeadstoreKey(key string) (HeadstoreKey, error) {
switch {
case strings.HasPrefix(key, HEADSTORE_DOC):
return NewHeadstoreDocKey(key)
case strings.HasPrefix(key, HEADSTORE_COL):
return NewHeadstoreColKeyFromString(key)
default:
return nil, ErrInvalidKey
}
}
Loading

0 comments on commit f90f6ae

Please sign in to comment.