From 495a66cc6ef8d2987620d20a251dae531507b1a2 Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Mon, 28 Oct 2024 13:51:53 -0400 Subject: [PATCH] WIP - Implement branchable collections --- client/collection_description.go | 2 + internal/core/block/block.go | 1 + internal/core/crdt/collection.go | 74 +++++++++++++++++++++++++++ internal/keys/headstore.go | 10 ++++ internal/keys/headstore_collection.go | 52 +++++++++++++++++++ internal/keys/headstore_doc.go | 4 +- internal/merkle/clock/clock.go | 2 +- internal/merkle/clock/heads.go | 6 +-- internal/merkle/crdt/collection.go | 60 ++++++++++++++++++++++ 9 files changed, 205 insertions(+), 6 deletions(-) create mode 100644 internal/core/crdt/collection.go create mode 100644 internal/keys/headstore_collection.go create mode 100644 internal/merkle/crdt/collection.go diff --git a/client/collection_description.go b/client/collection_description.go index d86a252644..819431a8f3 100644 --- a/client/collection_description.go +++ b/client/collection_description.go @@ -88,6 +88,8 @@ type CollectionDescription struct { // At the moment this can only be set to `false` if this collection sources its data from // another collection/query (is a View). IsMaterialized bool + + TrackChanges bool } // QuerySource represents a collection data source from a query. diff --git a/internal/core/block/block.go b/internal/core/block/block.go index e930816030..3b9b4815be 100644 --- a/internal/core/block/block.go +++ b/internal/core/block/block.go @@ -44,6 +44,7 @@ func init() { &crdt.LWWRegDelta{}, &crdt.CompositeDAGDelta{}, &crdt.CounterDelta{}, + &crdt.CollectionDelta{}, ) EncryptionSchema, EncryptionSchemaPrototype = mustSetSchema( diff --git a/internal/core/crdt/collection.go b/internal/core/crdt/collection.go new file mode 100644 index 0000000000..33d6f30c4f --- /dev/null +++ b/internal/core/crdt/collection.go @@ -0,0 +1,74 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package crdt + +import ( + "context" + + "github.com/sourcenetwork/defradb/datastore" + "github.com/sourcenetwork/defradb/internal/core" + "github.com/sourcenetwork/defradb/internal/keys" +) + +type Collection struct { + store datastore.DSReaderWriter + // schemaVersionKey is the schema version datastore key at the time of commit. + // + // It can be used to identify the collection datastructure state at the time of commit. + schemaVersionKey keys.CollectionSchemaVersionKey +} + +var _ core.ReplicatedData = (*Collection)(nil) + +func NewCollection(store datastore.DSReaderWriter, schemaVersionKey keys.CollectionSchemaVersionKey) *Collection { + return &Collection{ + store: store, + schemaVersionKey: schemaVersionKey, + } +} + +func (c *Collection) Merge(ctx context.Context, other core.Delta) error { + // Collection merges don't actually need to do anything, as the delta is empty, + // and doc-level merges are handled by the document commits. + + // todo - make sure this is well tested, you might be wrong! + return nil +} + +func (c *Collection) Append() *CollectionDelta { + return &CollectionDelta{ + SchemaVersionID: c.schemaVersionKey.SchemaVersionID, + } +} + +type CollectionDelta struct { + Priority uint64 + // todo: This is temporary pending a global collection id (link to ticket) + SchemaVersionID string +} + +var _ core.Delta = (*CollectionDelta)(nil) + +func (delta *CollectionDelta) IPLDSchemaBytes() []byte { + return []byte(` + type CollectionDelta struct { + priority Int + schemaVersionID String + }`) +} + +func (d *CollectionDelta) GetPriority() uint64 { + return d.Priority +} + +func (d *CollectionDelta) SetPriority(priority uint64) { + d.Priority = priority +} diff --git a/internal/keys/headstore.go b/internal/keys/headstore.go index b0db874169..3458227c7a 100644 --- a/internal/keys/headstore.go +++ b/internal/keys/headstore.go @@ -10,6 +10,16 @@ package keys +import ( + "github.com/ipfs/go-cid" +) + const ( HEADSTORE_DOC = "/d" + HEADSTORE_COL = "/c" ) + +type HeadstoreKey interface { + Key + WithCid(c cid.Cid) HeadstoreKey +} diff --git a/internal/keys/headstore_collection.go b/internal/keys/headstore_collection.go new file mode 100644 index 0000000000..c71e7f4dad --- /dev/null +++ b/internal/keys/headstore_collection.go @@ -0,0 +1,52 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package keys + +import ( + "strconv" + + "github.com/ipfs/go-cid" + ds "github.com/ipfs/go-datastore" +) + +type HeadstoreColKey struct { + CollectionRootID uint32 + Cid cid.Cid +} + +var _ HeadstoreKey = (*HeadstoreColKey)(nil) + +func (k HeadstoreColKey) WithCid(c cid.Cid) HeadstoreKey { + newKey := k + newKey.Cid = c + return newKey +} + +func (k HeadstoreColKey) ToString() string { + result := HEADSTORE_COL + + if k.CollectionRootID != 0 { + result = result + "/" + strconv.Itoa(int(k.CollectionRootID)) + } + if k.Cid.Defined() { + result = result + "/" + k.Cid.String() + } + + return result +} + +func (k HeadstoreColKey) Bytes() []byte { + return []byte(k.ToString()) +} + +func (k HeadstoreColKey) ToDS() ds.Key { + return ds.NewKey(k.ToString()) +} diff --git a/internal/keys/headstore_doc.go b/internal/keys/headstore_doc.go index ad40984663..75f45eca4b 100644 --- a/internal/keys/headstore_doc.go +++ b/internal/keys/headstore_doc.go @@ -23,7 +23,7 @@ type HeadstoreDocKey struct { Cid cid.Cid } -var _ Key = (*HeadstoreDocKey)(nil) +var _ HeadstoreKey = (*HeadstoreDocKey)(nil) // Creates a new HeadstoreDocKey from a string as best as it can, // splitting the input using '/' as a field deliminator. It assumes @@ -57,7 +57,7 @@ func (k HeadstoreDocKey) WithDocID(docID string) HeadstoreDocKey { return newKey } -func (k HeadstoreDocKey) WithCid(c cid.Cid) HeadstoreDocKey { +func (k HeadstoreDocKey) WithCid(c cid.Cid) HeadstoreKey { newKey := k newKey.Cid = c return newKey diff --git a/internal/merkle/clock/clock.go b/internal/merkle/clock/clock.go index cc2f7f58ec..342ca335df 100644 --- a/internal/merkle/clock/clock.go +++ b/internal/merkle/clock/clock.go @@ -48,7 +48,7 @@ func NewMerkleClock( headstore datastore.DSReaderWriter, blockstore datastore.Blockstore, encstore datastore.Blockstore, - namespace keys.HeadStoreDocKey, + namespace keys.HeadstoreKey, crdt core.ReplicatedData, ) *MerkleClock { return &MerkleClock{ diff --git a/internal/merkle/clock/heads.go b/internal/merkle/clock/heads.go index 03cc41d04f..c1e0e89a37 100644 --- a/internal/merkle/clock/heads.go +++ b/internal/merkle/clock/heads.go @@ -27,17 +27,17 @@ import ( // heads manages the current Merkle-CRDT heads. type heads struct { store datastore.DSReaderWriter - namespace keys.HeadStoreDocKey + namespace keys.HeadstoreKey } -func NewHeadSet(store datastore.DSReaderWriter, namespace keys.HeadStoreDocKey) *heads { +func NewHeadSet(store datastore.DSReaderWriter, namespace keys.HeadstoreKey) *heads { return &heads{ store: store, namespace: namespace, } } -func (hh *heads) key(c cid.Cid) keys.HeadStoreDocKey { +func (hh *heads) key(c cid.Cid) keys.HeadstoreKey { return hh.namespace.WithCid(c) } diff --git a/internal/merkle/crdt/collection.go b/internal/merkle/crdt/collection.go new file mode 100644 index 0000000000..469514e360 --- /dev/null +++ b/internal/merkle/crdt/collection.go @@ -0,0 +1,60 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package merklecrdt + +import ( + "context" + + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/sourcenetwork/defradb/client" + coreblock "github.com/sourcenetwork/defradb/internal/core/block" + "github.com/sourcenetwork/defradb/internal/core/crdt" + "github.com/sourcenetwork/defradb/internal/keys" + "github.com/sourcenetwork/defradb/internal/merkle/clock" +) + +type MerkleCollection struct { + clock *clock.MerkleClock + reg *crdt.Collection +} + +var _ MerkleCRDT = (*MerkleCollection)(nil) + +func NewMerkleCollection( + store Stores, + schemaVersionKey keys.CollectionSchemaVersionKey, + key keys.HeadstoreColKey, +) *MerkleCollection { + register := crdt.NewCollection(store.Datastore(), schemaVersionKey) + + clk := clock.NewMerkleClock(store.Headstore(), store.Blockstore(), store.Encstore(), key, register) + + return &MerkleCollection{ + clock: clk, + reg: register, + } +} + +func (m *MerkleCollection) Clock() *clock.MerkleClock { + return m.clock +} + +// Save the value of the Counter to the DAG. +func (m *MerkleCollection) Save(ctx context.Context, data any) (cidlink.Link, []byte, error) { + links, ok := data.([]coreblock.DAGLink) + if !ok { + return cidlink.Link{}, nil, NewErrUnexpectedValueType(client.COMPOSITE, []coreblock.DAGLink{}, data) + } + + delta := m.reg.Append() + + return m.clock.AddDelta(ctx, delta, links...) +}