diff --git a/client/collection_description.go b/client/collection_description.go index d86a252644..ee9ad3679d 100644 --- a/client/collection_description.go +++ b/client/collection_description.go @@ -88,6 +88,8 @@ type CollectionDescription struct { // At the moment this can only be set to `false` if this collection sources its data from // another collection/query (is a View). IsMaterialized bool + + IsBranchable bool } // QuerySource represents a collection data source from a query. @@ -189,6 +191,7 @@ type collectionDescription struct { RootID uint32 SchemaVersionID string IsMaterialized bool + IsBranchable bool Policy immutable.Option[PolicyDescription] Indexes []IndexDescription Fields []CollectionFieldDescription @@ -209,6 +212,7 @@ func (c *CollectionDescription) UnmarshalJSON(bytes []byte) error { c.RootID = descMap.RootID c.SchemaVersionID = descMap.SchemaVersionID c.IsMaterialized = descMap.IsMaterialized + c.IsBranchable = descMap.IsBranchable c.Indexes = descMap.Indexes c.Fields = descMap.Fields c.Sources = make([]any, len(descMap.Sources)) diff --git a/internal/core/block/block.go b/internal/core/block/block.go index e930816030..3b9b4815be 100644 --- a/internal/core/block/block.go +++ b/internal/core/block/block.go @@ -44,6 +44,7 @@ func init() { &crdt.LWWRegDelta{}, &crdt.CompositeDAGDelta{}, &crdt.CounterDelta{}, + &crdt.CollectionDelta{}, ) EncryptionSchema, EncryptionSchemaPrototype = mustSetSchema( diff --git a/internal/core/crdt/collection.go b/internal/core/crdt/collection.go new file mode 100644 index 0000000000..33d6f30c4f --- /dev/null +++ b/internal/core/crdt/collection.go @@ -0,0 +1,74 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package crdt + +import ( + "context" + + "github.com/sourcenetwork/defradb/datastore" + "github.com/sourcenetwork/defradb/internal/core" + "github.com/sourcenetwork/defradb/internal/keys" +) + +type Collection struct { + store datastore.DSReaderWriter + // schemaVersionKey is the schema version datastore key at the time of commit. + // + // It can be used to identify the collection datastructure state at the time of commit. + schemaVersionKey keys.CollectionSchemaVersionKey +} + +var _ core.ReplicatedData = (*Collection)(nil) + +func NewCollection(store datastore.DSReaderWriter, schemaVersionKey keys.CollectionSchemaVersionKey) *Collection { + return &Collection{ + store: store, + schemaVersionKey: schemaVersionKey, + } +} + +func (c *Collection) Merge(ctx context.Context, other core.Delta) error { + // Collection merges don't actually need to do anything, as the delta is empty, + // and doc-level merges are handled by the document commits. + + // todo - make sure this is well tested, you might be wrong! + return nil +} + +func (c *Collection) Append() *CollectionDelta { + return &CollectionDelta{ + SchemaVersionID: c.schemaVersionKey.SchemaVersionID, + } +} + +type CollectionDelta struct { + Priority uint64 + // todo: This is temporary pending a global collection id (link to ticket) + SchemaVersionID string +} + +var _ core.Delta = (*CollectionDelta)(nil) + +func (delta *CollectionDelta) IPLDSchemaBytes() []byte { + return []byte(` + type CollectionDelta struct { + priority Int + schemaVersionID String + }`) +} + +func (d *CollectionDelta) GetPriority() uint64 { + return d.Priority +} + +func (d *CollectionDelta) SetPriority(priority uint64) { + d.Priority = priority +} diff --git a/internal/core/crdt/ipld_union.go b/internal/core/crdt/ipld_union.go index 28c9ccf420..8187b626a9 100644 --- a/internal/core/crdt/ipld_union.go +++ b/internal/core/crdt/ipld_union.go @@ -17,6 +17,7 @@ type CRDT struct { LWWRegDelta *LWWRegDelta CompositeDAGDelta *CompositeDAGDelta CounterDelta *CounterDelta + CollectionDelta *CollectionDelta } // NewCRDT returns a new CRDT. @@ -28,6 +29,8 @@ func NewCRDT(delta core.Delta) CRDT { return CRDT{CompositeDAGDelta: d} case *CounterDelta: return CRDT{CounterDelta: d} + case *CollectionDelta: + return CRDT{CollectionDelta: d} } return CRDT{} } @@ -41,6 +44,7 @@ func (c CRDT) IPLDSchemaBytes() []byte { | LWWRegDelta "lww" | CompositeDAGDelta "composite" | CounterDelta "counter" + | CollectionDelta "collection" } representation keyed`) } @@ -53,6 +57,8 @@ func (c CRDT) GetDelta() core.Delta { return c.CompositeDAGDelta case c.CounterDelta != nil: return c.CounterDelta + case c.CollectionDelta != nil: + return c.CollectionDelta } return nil } @@ -66,6 +72,8 @@ func (c CRDT) GetPriority() uint64 { return c.CompositeDAGDelta.GetPriority() case c.CounterDelta != nil: return c.CounterDelta.GetPriority() + case c.CollectionDelta != nil: + return c.CollectionDelta.GetPriority() } return 0 } @@ -90,6 +98,8 @@ func (c CRDT) GetDocID() []byte { return c.CompositeDAGDelta.DocID case c.CounterDelta != nil: return c.CounterDelta.DocID + case c.CollectionDelta != nil: + return nil } return nil } @@ -103,6 +113,8 @@ func (c CRDT) GetSchemaVersionID() string { return c.CompositeDAGDelta.SchemaVersionID case c.CounterDelta != nil: return c.CounterDelta.SchemaVersionID + case c.CollectionDelta != nil: + return c.CollectionDelta.SchemaVersionID } return "" } @@ -135,6 +147,11 @@ func (c CRDT) Clone() CRDT { Nonce: c.CounterDelta.Nonce, Data: c.CounterDelta.Data, } + case c.CollectionDelta != nil: + cloned.CollectionDelta = &CollectionDelta{ + Priority: c.CollectionDelta.Priority, + SchemaVersionID: c.CollectionDelta.SchemaVersionID, + } } return cloned } @@ -172,3 +189,8 @@ func (c CRDT) SetData(data []byte) { func (c CRDT) IsComposite() bool { return c.CompositeDAGDelta != nil } + +// IsCollection returns true if the CRDT is a collection CRDT. +func (c CRDT) IsCollection() bool { + return c.CollectionDelta != nil +} diff --git a/internal/db/collection.go b/internal/db/collection.go index 8d71c7aff6..1e143e56cd 100644 --- a/internal/db/collection.go +++ b/internal/db/collection.go @@ -711,6 +711,29 @@ func (c *collection) save( doc.SetHead(link.Cid) }) + if c.def.Description.IsBranchable { + collectionCRDT := merklecrdt.NewMerkleCollection( + txn, + keys.NewCollectionSchemaVersionKey(c.Schema().VersionID, c.ID()), + keys.NewHeadstoreColKey(c.def.Description.RootID), + ) + + link, headNode, err := collectionCRDT.Save(ctx, []coreblock.DAGLink{{Link: link}}) + if err != nil { + return err + } + + updateEvent := event.Update{ + Cid: link.Cid, + SchemaRoot: c.Schema().Root, + Block: headNode, + } + + txn.OnSuccess(func() { + c.db.events.Publish(event.NewMessage(event.UpdateName, updateEvent)) + }) + } + return nil } diff --git a/internal/db/fetcher/dag.go b/internal/db/fetcher/dag.go index 897a18c434..e9ceb69b9a 100644 --- a/internal/db/fetcher/dag.go +++ b/internal/db/fetcher/dag.go @@ -31,13 +31,18 @@ type HeadFetcher struct { func (hf *HeadFetcher) Start( ctx context.Context, txn datastore.Txn, - prefix keys.HeadstoreDocKey, + prefix immutable.Option[keys.HeadstoreKey], fieldId immutable.Option[string], ) error { hf.fieldId = fieldId + var prefixString string + if prefix.HasValue() { + prefixString = prefix.Value().ToString() + } + q := dsq.Query{ - Prefix: prefix.ToString(), + Prefix: prefixString, Orders: []dsq.Order{dsq.OrderByKey{}}, } @@ -64,17 +69,28 @@ func (hf *HeadFetcher) FetchNext() (*cid.Cid, error) { return nil, nil } - headStoreKey, err := keys.NewHeadstoreDocKey(res.Key) + headStoreKey, err := keys.NewHeadstoreKey(res.Key) if err != nil { return nil, err } - if hf.fieldId.HasValue() && hf.fieldId.Value() != headStoreKey.FieldID { - // FieldIds do not match, continue to next row - return hf.FetchNext() + if hf.fieldId.HasValue() { + switch typedHeadStoreKey := headStoreKey.(type) { + case keys.HeadstoreDocKey: + if hf.fieldId.Value() != typedHeadStoreKey.FieldID { + // FieldIds do not match, continue to next row + return hf.FetchNext() + } + + return &typedHeadStoreKey.Cid, nil + + case keys.HeadstoreColKey: + return &typedHeadStoreKey.Cid, nil // todo - define/document behaviour if fieldID provided! + } } - return &headStoreKey.Cid, nil + cid := headStoreKey.GetCid() + return &cid, nil } func (hf *HeadFetcher) Close() error { diff --git a/internal/db/merge.go b/internal/db/merge.go index 74db1ad302..9cc6d7dc28 100644 --- a/internal/db/merge.go +++ b/internal/db/merge.go @@ -411,7 +411,7 @@ func decryptBlock( ) (*coreblock.Block, error) { _, encryptor := encryption.EnsureContextWithEncryptor(ctx) - if block.Delta.IsComposite() { + if block.Delta.IsComposite() || block.Delta.IsCollection() { // for composite blocks there is nothing to decrypt return block, nil } diff --git a/internal/keys/headstore.go b/internal/keys/headstore.go index b0db874169..db188a097b 100644 --- a/internal/keys/headstore.go +++ b/internal/keys/headstore.go @@ -10,6 +10,30 @@ package keys +import ( + "strings" + + "github.com/ipfs/go-cid" +) + const ( HEADSTORE_DOC = "/d" + HEADSTORE_COL = "/c" ) + +type HeadstoreKey interface { + Walkable + GetCid() cid.Cid + WithCid(c cid.Cid) HeadstoreKey +} + +func NewHeadstoreKey(key string) (HeadstoreKey, error) { + switch { + case strings.HasPrefix(key, HEADSTORE_DOC): + return NewHeadstoreDocKey(key) + case strings.HasPrefix(key, HEADSTORE_COL): + return NewHeadstoreColKeyFromString(key) + default: + return nil, ErrInvalidKey + } +} diff --git a/internal/keys/headstore_collection.go b/internal/keys/headstore_collection.go new file mode 100644 index 0000000000..eec9eb744f --- /dev/null +++ b/internal/keys/headstore_collection.go @@ -0,0 +1,102 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package keys + +import ( + "strconv" + "strings" + + "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" +) + +type HeadstoreColKey struct { + CollectionRootID uint32 + Cid cid.Cid +} + +var _ HeadstoreKey = (*HeadstoreColKey)(nil) + +func NewHeadstoreColKey(colRootID uint32) HeadstoreColKey { + return HeadstoreColKey{ + CollectionRootID: colRootID, + } +} + +func NewHeadstoreColKeyFromString(key string) (HeadstoreColKey, error) { + elements := strings.Split(key, "/") + if len(elements) != 4 { + return HeadstoreColKey{}, ErrInvalidKey + } + + rootID, err := strconv.Atoi(elements[2]) + if err != nil { + return HeadstoreColKey{}, err + } + + cid, err := cid.Decode(elements[3]) + if err != nil { + return HeadstoreColKey{}, err + } + + return HeadstoreColKey{ + // elements[0] is empty (key has leading '/') + CollectionRootID: uint32(rootID), + Cid: cid, + }, nil +} + +func (k HeadstoreColKey) WithCid(c cid.Cid) HeadstoreKey { + newKey := k + newKey.Cid = c + return newKey +} + +func (k HeadstoreColKey) GetCid() cid.Cid { + return k.Cid +} + +func (k HeadstoreColKey) ToString() string { + result := HEADSTORE_COL + + if k.CollectionRootID != 0 { + result = result + "/" + strconv.Itoa(int(k.CollectionRootID)) + } + if k.Cid.Defined() { + result = result + "/" + k.Cid.String() + } + + return result +} + +func (k HeadstoreColKey) Bytes() []byte { + return []byte(k.ToString()) +} + +func (k HeadstoreColKey) ToDS() ds.Key { + return ds.NewKey(k.ToString()) +} + +func (k HeadstoreColKey) PrefixEnd() Walkable { + newKey := k + + if k.Cid.Defined() { + newKey.Cid = cid.MustParse(bytesPrefixEnd(k.Cid.Bytes())) + return newKey + } + + if k.CollectionRootID != 0 { + newKey.CollectionRootID = k.CollectionRootID + 1 + return newKey + } + + return newKey +} diff --git a/internal/keys/headstore_doc.go b/internal/keys/headstore_doc.go index fc65e2f157..d38dea5fab 100644 --- a/internal/keys/headstore_doc.go +++ b/internal/keys/headstore_doc.go @@ -23,7 +23,7 @@ type HeadstoreDocKey struct { Cid cid.Cid } -var _ Walkable = (*HeadstoreDocKey)(nil) +var _ HeadstoreKey = (*HeadstoreDocKey)(nil) // Creates a new HeadstoreDocKey from a string as best as it can, // splitting the input using '/' as a field deliminator. It assumes @@ -57,12 +57,16 @@ func (k HeadstoreDocKey) WithDocID(docID string) HeadstoreDocKey { return newKey } -func (k HeadstoreDocKey) WithCid(c cid.Cid) HeadstoreDocKey { +func (k HeadstoreDocKey) WithCid(c cid.Cid) HeadstoreKey { newKey := k newKey.Cid = c return newKey } +func (k HeadstoreDocKey) GetCid() cid.Cid { + return k.Cid +} + func (k HeadstoreDocKey) WithFieldID(fieldID string) HeadstoreDocKey { newKey := k newKey.FieldID = fieldID diff --git a/internal/merkle/clock/clock.go b/internal/merkle/clock/clock.go index e7682f1912..15a07acca9 100644 --- a/internal/merkle/clock/clock.go +++ b/internal/merkle/clock/clock.go @@ -48,7 +48,7 @@ func NewMerkleClock( headstore datastore.DSReaderWriter, blockstore datastore.Blockstore, encstore datastore.Blockstore, - namespace keys.HeadstoreDocKey, + namespace keys.HeadstoreKey, crdt core.ReplicatedData, ) *MerkleClock { return &MerkleClock{ @@ -207,7 +207,7 @@ func encryptBlock( block *coreblock.Block, encBlock *coreblock.Encryption, ) (*coreblock.Block, error) { - if block.Delta.IsComposite() { + if block.Delta.IsComposite() || block.Delta.IsCollection() { return block, nil } diff --git a/internal/merkle/clock/heads.go b/internal/merkle/clock/heads.go index c0afa21636..873ba64503 100644 --- a/internal/merkle/clock/heads.go +++ b/internal/merkle/clock/heads.go @@ -27,17 +27,17 @@ import ( // heads manages the current Merkle-CRDT heads. type heads struct { store datastore.DSReaderWriter - namespace keys.HeadstoreDocKey + namespace keys.HeadstoreKey } -func NewHeadSet(store datastore.DSReaderWriter, namespace keys.HeadstoreDocKey) *heads { +func NewHeadSet(store datastore.DSReaderWriter, namespace keys.HeadstoreKey) *heads { return &heads{ store: store, namespace: namespace, } } -func (hh *heads) key(c cid.Cid) keys.HeadstoreDocKey { +func (hh *heads) key(c cid.Cid) keys.HeadstoreKey { return hh.namespace.WithCid(c) } @@ -102,7 +102,7 @@ func (hh *heads) List(ctx context.Context) ([]cid.Cid, uint64, error) { return nil, 0, NewErrFailedToGetNextQResult(r.Error) } - headKey, err := keys.NewHeadstoreDocKey(r.Key) + headKey, err := keys.NewHeadstoreKey(r.Key) if err != nil { return nil, 0, err } @@ -111,7 +111,7 @@ func (hh *heads) List(ctx context.Context) ([]cid.Cid, uint64, error) { if n <= 0 { return nil, 0, ErrDecodingHeight } - heads = append(heads, headKey.Cid) + heads = append(heads, headKey.GetCid()) if height > maxHeight { maxHeight = height } diff --git a/internal/merkle/crdt/collection.go b/internal/merkle/crdt/collection.go new file mode 100644 index 0000000000..5faf348c39 --- /dev/null +++ b/internal/merkle/crdt/collection.go @@ -0,0 +1,53 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package merklecrdt + +import ( + "context" + + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + + coreblock "github.com/sourcenetwork/defradb/internal/core/block" + "github.com/sourcenetwork/defradb/internal/core/crdt" + "github.com/sourcenetwork/defradb/internal/keys" + "github.com/sourcenetwork/defradb/internal/merkle/clock" +) + +type MerkleCollection struct { + clock *clock.MerkleClock + reg *crdt.Collection +} + +var _ MerkleCRDT = (*MerkleCollection)(nil) + +func NewMerkleCollection( + store Stores, + schemaVersionKey keys.CollectionSchemaVersionKey, + key keys.HeadstoreColKey, +) *MerkleCollection { + register := crdt.NewCollection(store.Datastore(), schemaVersionKey) + + clk := clock.NewMerkleClock(store.Headstore(), store.Blockstore(), store.Encstore(), key, register) + + return &MerkleCollection{ + clock: clk, + reg: register, + } +} + +func (m *MerkleCollection) Clock() *clock.MerkleClock { + return m.clock +} + +func (m *MerkleCollection) Save(ctx context.Context, links []coreblock.DAGLink) (cidlink.Link, []byte, error) { + delta := m.reg.Append() + return m.clock.AddDelta(ctx, delta, links...) +} diff --git a/internal/planner/commit.go b/internal/planner/commit.go index d53ddbd581..8c49f4d072 100644 --- a/internal/planner/commit.go +++ b/internal/planner/commit.go @@ -36,7 +36,7 @@ type dagScanNode struct { queuedCids []*cid.Cid fetcher fetcher.HeadFetcher - prefix keys.HeadstoreDocKey + prefix immutable.Option[keys.HeadstoreKey] commitSelect *mapper.CommitSelect execInfo dagScanExecInfo @@ -67,8 +67,7 @@ func (n *dagScanNode) Kind() string { } func (n *dagScanNode) Init() error { - undefined := keys.HeadstoreDocKey{} - if n.prefix == undefined { + if !n.prefix.HasValue() { if n.commitSelect.DocID.HasValue() { key := keys.HeadstoreDocKey{}.WithDocID(n.commitSelect.DocID.Value()) @@ -77,7 +76,7 @@ func (n *dagScanNode) Init() error { key = key.WithFieldID(field) } - n.prefix = key + n.prefix = immutable.Some[keys.HeadstoreKey](key) } } @@ -114,7 +113,7 @@ func (n *dagScanNode) Spans(spans []core.Span) { start = s } - n.prefix = start.WithFieldID(fieldID) + n.prefix = immutable.Some[keys.HeadstoreKey](start.WithFieldID(fieldID)) return } } @@ -144,14 +143,13 @@ func (n *dagScanNode) simpleExplain() (map[string]any, error) { // Build the explanation of the spans attribute. spansExplainer := []map[string]any{} - undefinedHsKey := keys.HeadstoreDocKey{} // Note: n.headset is `nil` for single commit selection query, so must check for it. - if n.prefix != undefinedHsKey { + if n.prefix.HasValue() { spansExplainer = append( spansExplainer, map[string]any{ - "start": n.prefix.ToString(), - "end": n.prefix.PrefixEnd().ToString(), + "start": n.prefix.Value().ToString(), + "end": n.prefix.Value().PrefixEnd().ToString(), }, ) } @@ -305,11 +303,13 @@ func (n *dagScanNode) dagBlockToNodeDoc(block *coreblock.Block) (core.Doc, error n.commitSelect.DocumentMapping.SetFirstOfName(&commit, request.SchemaVersionIDFieldName, schemaVersionId) var fieldName any - - var fieldID string + var fieldID any if block.Delta.CompositeDAGDelta != nil { fieldID = core.COMPOSITE_NAMESPACE fieldName = nil + } else if block.Delta.CollectionDelta != nil { + fieldID = nil + fieldName = nil } else { fName := block.Delta.GetFieldName() fieldName = fName @@ -348,9 +348,13 @@ func (n *dagScanNode) dagBlockToNodeDoc(block *coreblock.Block) (core.Doc, error n.commitSelect.DocumentMapping.SetFirstOfName(&commit, request.FieldIDFieldName, fieldID) docID := block.Delta.GetDocID() - - n.commitSelect.DocumentMapping.SetFirstOfName(&commit, - request.DocIDArgName, string(docID)) + if docID != nil { + n.commitSelect.DocumentMapping.SetFirstOfName( + &commit, + request.DocIDArgName, + string(docID), + ) + } cols, err := n.planner.db.GetCollections( n.planner.ctx, @@ -391,7 +395,9 @@ func (n *dagScanNode) dagBlockToNodeDoc(block *coreblock.Block) (core.Doc, error for _, l := range block.Links { link := linksMapping.NewDoc() - linksMapping.SetFirstOfName(&link, request.LinksNameFieldName, l.Name) + if l.Name != "" { + linksMapping.SetFirstOfName(&link, request.LinksNameFieldName, l.Name) + } linksMapping.SetFirstOfName(&link, request.LinksCidFieldName, l.Link.Cid.String()) links[i] = link diff --git a/internal/request/graphql/schema/collection.go b/internal/request/graphql/schema/collection.go index 7085ba5c97..bfe4e9954a 100644 --- a/internal/request/graphql/schema/collection.go +++ b/internal/request/graphql/schema/collection.go @@ -163,6 +163,7 @@ func collectionFromAstDefinition( }) isMaterialized := immutable.None[bool]() + var isBranchable bool for _, directive := range def.Directives { switch directive.Name.Value { case types.IndexDirectiveLabel: @@ -197,6 +198,22 @@ func collectionFromAstDefinition( } else { isMaterialized = immutable.Some(true) } + + case types.BranchableDirectiveLabel: + if isBranchable { + continue + } + + explicitIsBranchable := immutable.None[bool]() + + for _, arg := range directive.Arguments { + if arg.Name.Value == types.BranchableDirectivePropIf { + explicitIsBranchable = immutable.Some(arg.Value.GetValue().(bool)) + break + } + } + + isBranchable = !explicitIsBranchable.HasValue() || explicitIsBranchable.Value() } } @@ -207,6 +224,7 @@ func collectionFromAstDefinition( Policy: policyDescription, Fields: collectionFieldDescriptions, IsMaterialized: !isMaterialized.HasValue() || isMaterialized.Value(), + IsBranchable: isBranchable, }, Schema: client.SchemaDescription{ Name: def.Name.Value, diff --git a/internal/request/graphql/schema/schema.go b/internal/request/graphql/schema/schema.go index d18911c929..1168687441 100644 --- a/internal/request/graphql/schema/schema.go +++ b/internal/request/graphql/schema/schema.go @@ -105,6 +105,7 @@ func defaultDirectivesType( types.PrimaryDirective(), types.RelationDirective(), types.MaterializedDirective(), + types.BranchableDirective(), } } diff --git a/internal/request/graphql/schema/types/types.go b/internal/request/graphql/schema/types/types.go index 5770b4b579..c3358cb0ea 100644 --- a/internal/request/graphql/schema/types/types.go +++ b/internal/request/graphql/schema/types/types.go @@ -54,6 +54,9 @@ const ( MaterializedDirectiveLabel = "materialized" MaterializedDirectivePropIf = "if" + BranchableDirectiveLabel = "branchable" + BranchableDirectivePropIf = "if" + FieldOrderASC = "ASC" FieldOrderDESC = "DESC" ) @@ -237,6 +240,21 @@ func MaterializedDirective() *gql.Directive { }) } +func BranchableDirective() *gql.Directive { + return gql.NewDirective(gql.DirectiveConfig{ + Name: BranchableDirectiveLabel, + Description: `todo`, + Args: gql.FieldConfigArgument{ + BranchableDirectivePropIf: &gql.ArgumentConfig{ + Type: gql.Boolean, + }, + }, + Locations: []string{ + gql.DirectiveLocationObject, + }, + }) +} + func CRDTEnum() *gql.Enum { return gql.NewEnum(gql.EnumConfig{ Name: "CRDTType", diff --git a/tests/integration/events.go b/tests/integration/events.go index 6129d600ee..5ae04d452d 100644 --- a/tests/integration/events.go +++ b/tests/integration/events.go @@ -178,6 +178,10 @@ func waitForUpdateEvents( require.Fail(s.t, "timeout waiting for update event") } + if evt.DocID == "" { + continue // todo, this might be fine, but probs needs changing when testing p2p cols + } + // make sure the event is expected _, ok := expect[evt.DocID] require.True(s.t, ok, "unexpected document update") diff --git a/tests/integration/query/commits/branchables/create_test.go b/tests/integration/query/commits/branchables/create_test.go new file mode 100644 index 0000000000..5eb9d01392 --- /dev/null +++ b/tests/integration/query/commits/branchables/create_test.go @@ -0,0 +1,117 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package branchables + +import ( + "testing" + + testUtils "github.com/sourcenetwork/defradb/tests/integration" +) + +func TestQueryCommitsBranchables_WithMultipleCreate(t *testing.T) { + test := testUtils.TestCase{ + Actions: []any{ + testUtils.SchemaUpdate{ + Schema: ` + type Users @branchable { + name: String + age: Int + } + `, + }, + testUtils.CreateDoc{ + Doc: `{ + "name": "John", + "age": 21 + }`, + }, + testUtils.CreateDoc{ + Doc: `{ + "name": "Fred", + "age": 25 + }`, + }, + testUtils.Request{ + Request: `query { + commits { + cid + links { + cid + } + } + }`, + Results: map[string]any{ + "commits": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("collection, doc2 create"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("collection, doc1 create"), + }, + { + "cid": testUtils.NewUniqueCid("doc2 create"), + }, + }, + }, + { + "cid": testUtils.NewUniqueCid("collection, doc1 create"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("doc1 create"), + }, + }, + }, + { + "cid": testUtils.NewUniqueCid("doc1 name"), + "links": []map[string]any{}, + }, + { + "cid": testUtils.NewUniqueCid("doc1 age"), + "links": []map[string]any{}, + }, + { + "cid": testUtils.NewUniqueCid("doc1 create"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("doc1 name"), + }, + { + "cid": testUtils.NewUniqueCid("doc1 age"), + }, + }, + }, + { + "cid": testUtils.NewUniqueCid("doc2 name"), + "links": []map[string]any{}, + }, + { + "cid": testUtils.NewUniqueCid("doc2 age"), + "links": []map[string]any{}, + }, + { + "cid": testUtils.NewUniqueCid("doc2 create"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("doc2 name"), + }, + { + "cid": testUtils.NewUniqueCid("doc2 age"), + }, + }, + }, + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} diff --git a/tests/integration/query/commits/branchables/peer_test.go b/tests/integration/query/commits/branchables/peer_test.go new file mode 100644 index 0000000000..c92d1b099f --- /dev/null +++ b/tests/integration/query/commits/branchables/peer_test.go @@ -0,0 +1,132 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package branchables + +import ( + "testing" + + testUtils "github.com/sourcenetwork/defradb/tests/integration" + "github.com/sourcenetwork/immutable" +) + +// TODO: This test documents an unimplemented feature. Tracked by: +// https://github.com/sourcenetwork/defradb/issues/3212 +func TestQueryCommitsBranchables_AcrossPeerConnection(t *testing.T) { + test := testUtils.TestCase{ + Actions: []any{ + testUtils.RandomNetworkingConfig(), + testUtils.RandomNetworkingConfig(), + testUtils.SchemaUpdate{ + Schema: ` + type Users @branchable { + name: String + age: Int + } + `, + }, + testUtils.ConnectPeers{ + SourceNodeID: 1, + TargetNodeID: 0, + }, + testUtils.SubscribeToCollection{ + NodeID: 1, + CollectionIDs: []int{0}, + }, + testUtils.CreateDoc{ + NodeID: immutable.Some(0), + Doc: `{ + "name": "John", + "age": 21 + }`, + }, + testUtils.WaitForSync{}, + testUtils.Request{ + NodeID: immutable.Some(0), + Request: `query { + commits { + cid + links { + cid + } + } + }`, + Results: map[string]any{ + "commits": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("collection"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("composite"), + }, + }, + }, + { + "cid": testUtils.NewUniqueCid("age"), + "links": []map[string]any{}, + }, + { + "cid": testUtils.NewUniqueCid("name"), + "links": []map[string]any{}, + }, + { + "cid": testUtils.NewUniqueCid("composite"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("age"), + }, + { + "cid": testUtils.NewUniqueCid("name"), + }, + }, + }, + }, + }, + }, + testUtils.Request{ + NodeID: immutable.Some(1), + Request: `query { + commits { + cid + links { + cid + } + } + }`, + Results: map[string]any{ + "commits": []map[string]any{ + // Note: The collection commit has not synced. + { + "cid": testUtils.NewUniqueCid("age"), + "links": []map[string]any{}, + }, + { + "cid": testUtils.NewUniqueCid("name"), + "links": []map[string]any{}, + }, + { + "cid": testUtils.NewUniqueCid("composite"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("age"), + }, + { + "cid": testUtils.NewUniqueCid("name"), + }, + }, + }, + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} diff --git a/tests/integration/query/commits/branchables/simple_test.go b/tests/integration/query/commits/branchables/simple_test.go new file mode 100644 index 0000000000..44f0021ca4 --- /dev/null +++ b/tests/integration/query/commits/branchables/simple_test.go @@ -0,0 +1,161 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package branchables + +import ( + "testing" + + testUtils "github.com/sourcenetwork/defradb/tests/integration" +) + +func TestQueryCommitsBranchables(t *testing.T) { + test := testUtils.TestCase{ + Actions: []any{ + testUtils.SchemaUpdate{ + Schema: ` + type Users @branchable { + name: String + age: Int + } + `, + }, + testUtils.CreateDoc{ + Doc: `{ + "name": "John", + "age": 21 + }`, + }, + testUtils.Request{ + Request: `query { + commits { + cid + } + }`, + Results: map[string]any{ + "commits": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("collection"), + }, + { + "cid": testUtils.NewUniqueCid("name"), + }, + { + "cid": testUtils.NewUniqueCid("age"), + }, + { + "cid": testUtils.NewUniqueCid("head"), + }, + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} + +func TestQueryCommitsBranchables_WithAllFields(t *testing.T) { + test := testUtils.TestCase{ + Actions: []any{ + testUtils.SchemaUpdate{ + Schema: ` + type Users @branchable { + name: String + age: Int + } + `, + }, + testUtils.CreateDoc{ + Doc: `{ + "name": "John", + "age": 21 + }`, + }, + testUtils.Request{ + Request: `query { + commits { + cid + collectionID + delta + docID + fieldId + fieldName + height + links { + cid + name + } + } + }`, + Results: map[string]any{ + "commits": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("collection"), + "collectionID": int64(1), + "delta": nil, + "docID": nil, + "fieldId": nil, + "fieldName": nil, + "height": int64(1), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("composite"), + "name": nil, + }, + }, + }, + { + "cid": testUtils.NewUniqueCid("age"), + "collectionID": int64(1), + "delta": testUtils.CBORValue(21), + "docID": "bae-0b2f15e5-bfe7-5cb7-8045-471318d7dbc3", + "fieldId": "1", + "fieldName": "age", + "height": int64(1), + "links": []map[string]any{}, + }, + { + "cid": testUtils.NewUniqueCid("name"), + "collectionID": int64(1), + "delta": testUtils.CBORValue("John"), + "docID": "bae-0b2f15e5-bfe7-5cb7-8045-471318d7dbc3", + "fieldId": "2", + "fieldName": "name", + "height": int64(1), + "links": []map[string]any{}, + }, + { + "cid": testUtils.NewUniqueCid("composite"), + "collectionID": int64(1), + "delta": nil, + "docID": "bae-0b2f15e5-bfe7-5cb7-8045-471318d7dbc3", + "fieldId": "C", + "fieldName": nil, + "height": int64(1), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("age"), + "name": "age", + }, + { + "cid": testUtils.NewUniqueCid("name"), + "name": "name", + }, + }, + }, + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +} diff --git a/tests/integration/query/commits/branchables/update_test.go b/tests/integration/query/commits/branchables/update_test.go new file mode 100644 index 0000000000..fcb7a88106 --- /dev/null +++ b/tests/integration/query/commits/branchables/update_test.go @@ -0,0 +1,116 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package branchables + +import ( + "testing" + + testUtils "github.com/sourcenetwork/defradb/tests/integration" +) + +func TestQueryCommitsBranchables_WithDocUpdate(t *testing.T) { + test := testUtils.TestCase{ + Actions: []any{ + testUtils.SchemaUpdate{ + Schema: ` + type Users @branchable { + name: String + age: Int + } + `, + }, + testUtils.CreateDoc{ + Doc: `{ + "name": "John", + "age": 21 + }`, + }, + testUtils.UpdateDoc{ + Doc: `{ + "name": "Fred" + }`, + }, + testUtils.Request{ + Request: `query { + commits { + cid + links { + cid + } + } + }`, + Results: map[string]any{ + "commits": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("collection, update"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("collection, create"), + }, + { + "cid": testUtils.NewUniqueCid("update"), + }, + }, + }, + { + "cid": testUtils.NewUniqueCid("collection, create"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("create"), + }, + }, + }, + { + "cid": testUtils.NewUniqueCid("age, create"), + "links": []map[string]any{}, + }, + { + "cid": testUtils.NewUniqueCid("name, update"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("name, create"), + }, + }, + }, + { + "cid": testUtils.NewUniqueCid("name, create"), + "links": []map[string]any{}, + }, + { + "cid": testUtils.NewUniqueCid("update"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("create"), + }, + { + "cid": testUtils.NewUniqueCid("name, update"), + }, + }, + }, + { + "cid": testUtils.NewUniqueCid("create"), + "links": []map[string]any{ + { + "cid": testUtils.NewUniqueCid("age, create"), + }, + { + "cid": testUtils.NewUniqueCid("name, create"), + }, + }, + }, + }, + }, + }, + }, + } + + testUtils.ExecuteTestCase(t, test) +}