diff --git a/client/collection_description.go b/client/collection_description.go index d86a252644..819431a8f3 100644 --- a/client/collection_description.go +++ b/client/collection_description.go @@ -88,6 +88,8 @@ type CollectionDescription struct { // At the moment this can only be set to `false` if this collection sources its data from // another collection/query (is a View). IsMaterialized bool + + TrackChanges bool } // QuerySource represents a collection data source from a query. diff --git a/internal/core/block/block.go b/internal/core/block/block.go index e930816030..3b9b4815be 100644 --- a/internal/core/block/block.go +++ b/internal/core/block/block.go @@ -44,6 +44,7 @@ func init() { &crdt.LWWRegDelta{}, &crdt.CompositeDAGDelta{}, &crdt.CounterDelta{}, + &crdt.CollectionDelta{}, ) EncryptionSchema, EncryptionSchemaPrototype = mustSetSchema( diff --git a/internal/core/crdt/collection.go b/internal/core/crdt/collection.go new file mode 100644 index 0000000000..33d6f30c4f --- /dev/null +++ b/internal/core/crdt/collection.go @@ -0,0 +1,74 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package crdt + +import ( + "context" + + "github.com/sourcenetwork/defradb/datastore" + "github.com/sourcenetwork/defradb/internal/core" + "github.com/sourcenetwork/defradb/internal/keys" +) + +type Collection struct { + store datastore.DSReaderWriter + // schemaVersionKey is the schema version datastore key at the time of commit. + // + // It can be used to identify the collection datastructure state at the time of commit. + schemaVersionKey keys.CollectionSchemaVersionKey +} + +var _ core.ReplicatedData = (*Collection)(nil) + +func NewCollection(store datastore.DSReaderWriter, schemaVersionKey keys.CollectionSchemaVersionKey) *Collection { + return &Collection{ + store: store, + schemaVersionKey: schemaVersionKey, + } +} + +func (c *Collection) Merge(ctx context.Context, other core.Delta) error { + // Collection merges don't actually need to do anything, as the delta is empty, + // and doc-level merges are handled by the document commits. + + // todo - make sure this is well tested, you might be wrong! + return nil +} + +func (c *Collection) Append() *CollectionDelta { + return &CollectionDelta{ + SchemaVersionID: c.schemaVersionKey.SchemaVersionID, + } +} + +type CollectionDelta struct { + Priority uint64 + // todo: This is temporary pending a global collection id (link to ticket) + SchemaVersionID string +} + +var _ core.Delta = (*CollectionDelta)(nil) + +func (delta *CollectionDelta) IPLDSchemaBytes() []byte { + return []byte(` + type CollectionDelta struct { + priority Int + schemaVersionID String + }`) +} + +func (d *CollectionDelta) GetPriority() uint64 { + return d.Priority +} + +func (d *CollectionDelta) SetPriority(priority uint64) { + d.Priority = priority +} diff --git a/internal/db/collection.go b/internal/db/collection.go index 8d71c7aff6..12f79043d1 100644 --- a/internal/db/collection.go +++ b/internal/db/collection.go @@ -711,6 +711,29 @@ func (c *collection) save( doc.SetHead(link.Cid) }) + if c.def.Description.TrackChanges { + collectionCRDT := merklecrdt.NewMerkleCollection( + txn, + keys.NewCollectionSchemaVersionKey(c.Schema().VersionID, c.ID()), + keys.NewHeadstoreColKey(c.def.Description.RootID), + ) + + link, headNode, err := collectionCRDT.Save(ctx, []coreblock.DAGLink{{Link: link}}) + if err != nil { + return err + } + + updateEvent := event.Update{ + Cid: link.Cid, + SchemaRoot: c.Schema().Root, + Block: headNode, + } + + txn.OnSuccess(func() { + c.db.events.Publish(event.NewMessage(event.UpdateName, updateEvent)) + }) + } + return nil } diff --git a/internal/keys/headstore.go b/internal/keys/headstore.go index b0db874169..3458227c7a 100644 --- a/internal/keys/headstore.go +++ b/internal/keys/headstore.go @@ -10,6 +10,16 @@ package keys +import ( + "github.com/ipfs/go-cid" +) + const ( HEADSTORE_DOC = "/d" + HEADSTORE_COL = "/c" ) + +type HeadstoreKey interface { + Key + WithCid(c cid.Cid) HeadstoreKey +} diff --git a/internal/keys/headstore_collection.go b/internal/keys/headstore_collection.go new file mode 100644 index 0000000000..fe6c70fd3e --- /dev/null +++ b/internal/keys/headstore_collection.go @@ -0,0 +1,58 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package keys + +import ( + "strconv" + + "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" +) + +type HeadstoreColKey struct { + CollectionRootID uint32 + Cid cid.Cid +} + +var _ HeadstoreKey = (*HeadstoreColKey)(nil) + +func NewHeadstoreColKey(colRootID uint32) HeadstoreColKey { + return HeadstoreColKey{ + CollectionRootID: colRootID, + } +} + +func (k HeadstoreColKey) WithCid(c cid.Cid) HeadstoreKey { + newKey := k + newKey.Cid = c + return newKey +} + +func (k HeadstoreColKey) ToString() string { + result := HEADSTORE_COL + + if k.CollectionRootID != 0 { + result = result + "/" + strconv.Itoa(int(k.CollectionRootID)) + } + if k.Cid.Defined() { + result = result + "/" + k.Cid.String() + } + + return result +} + +func (k HeadstoreColKey) Bytes() []byte { + return []byte(k.ToString()) +} + +func (k HeadstoreColKey) ToDS() ds.Key { + return ds.NewKey(k.ToString()) +} diff --git a/internal/keys/headstore_doc.go b/internal/keys/headstore_doc.go index ad40984663..75f45eca4b 100644 --- a/internal/keys/headstore_doc.go +++ b/internal/keys/headstore_doc.go @@ -23,7 +23,7 @@ type HeadstoreDocKey struct { Cid cid.Cid } -var _ Key = (*HeadstoreDocKey)(nil) +var _ HeadstoreKey = (*HeadstoreDocKey)(nil) // Creates a new HeadstoreDocKey from a string as best as it can, // splitting the input using '/' as a field deliminator. It assumes @@ -57,7 +57,7 @@ func (k HeadstoreDocKey) WithDocID(docID string) HeadstoreDocKey { return newKey } -func (k HeadstoreDocKey) WithCid(c cid.Cid) HeadstoreDocKey { +func (k HeadstoreDocKey) WithCid(c cid.Cid) HeadstoreKey { newKey := k newKey.Cid = c return newKey diff --git a/internal/merkle/clock/clock.go b/internal/merkle/clock/clock.go index e7682f1912..342ca335df 100644 --- a/internal/merkle/clock/clock.go +++ b/internal/merkle/clock/clock.go @@ -48,7 +48,7 @@ func NewMerkleClock( headstore datastore.DSReaderWriter, blockstore datastore.Blockstore, encstore datastore.Blockstore, - namespace keys.HeadstoreDocKey, + namespace keys.HeadstoreKey, crdt core.ReplicatedData, ) *MerkleClock { return &MerkleClock{ diff --git a/internal/merkle/clock/heads.go b/internal/merkle/clock/heads.go index c0afa21636..c1e0e89a37 100644 --- a/internal/merkle/clock/heads.go +++ b/internal/merkle/clock/heads.go @@ -27,17 +27,17 @@ import ( // heads manages the current Merkle-CRDT heads. type heads struct { store datastore.DSReaderWriter - namespace keys.HeadstoreDocKey + namespace keys.HeadstoreKey } -func NewHeadSet(store datastore.DSReaderWriter, namespace keys.HeadstoreDocKey) *heads { +func NewHeadSet(store datastore.DSReaderWriter, namespace keys.HeadstoreKey) *heads { return &heads{ store: store, namespace: namespace, } } -func (hh *heads) key(c cid.Cid) keys.HeadstoreDocKey { +func (hh *heads) key(c cid.Cid) keys.HeadstoreKey { return hh.namespace.WithCid(c) } diff --git a/internal/merkle/crdt/collection.go b/internal/merkle/crdt/collection.go new file mode 100644 index 0000000000..71c2775f92 --- /dev/null +++ b/internal/merkle/crdt/collection.go @@ -0,0 +1,52 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package merklecrdt + +import ( + "context" + + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + coreblock "github.com/sourcenetwork/defradb/internal/core/block" + "github.com/sourcenetwork/defradb/internal/core/crdt" + "github.com/sourcenetwork/defradb/internal/keys" + "github.com/sourcenetwork/defradb/internal/merkle/clock" +) + +type MerkleCollection struct { + clock *clock.MerkleClock + reg *crdt.Collection +} + +var _ MerkleCRDT = (*MerkleCollection)(nil) + +func NewMerkleCollection( + store Stores, + schemaVersionKey keys.CollectionSchemaVersionKey, + key keys.HeadstoreColKey, +) *MerkleCollection { + register := crdt.NewCollection(store.Datastore(), schemaVersionKey) + + clk := clock.NewMerkleClock(store.Headstore(), store.Blockstore(), store.Encstore(), key, register) + + return &MerkleCollection{ + clock: clk, + reg: register, + } +} + +func (m *MerkleCollection) Clock() *clock.MerkleClock { + return m.clock +} + +func (m *MerkleCollection) Save(ctx context.Context, links []coreblock.DAGLink) (cidlink.Link, []byte, error) { + delta := m.reg.Append() + return m.clock.AddDelta(ctx, delta, links...) +}