From 366373b886e49d250d1a2c1e8edcf8ac47d56b5a Mon Sep 17 00:00:00 2001 From: Andrew Sisley Date: Mon, 28 Oct 2024 13:51:53 -0400 Subject: [PATCH] WIP - Implement branchable collections --- internal/core/crdt/collection.go | 69 ++++++++++++++++++++++++++++++ internal/merkle/crdt/collection.go | 60 ++++++++++++++++++++++++++ 2 files changed, 129 insertions(+) create mode 100644 internal/core/crdt/collection.go create mode 100644 internal/merkle/crdt/collection.go diff --git a/internal/core/crdt/collection.go b/internal/core/crdt/collection.go new file mode 100644 index 0000000000..6493b8e526 --- /dev/null +++ b/internal/core/crdt/collection.go @@ -0,0 +1,69 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package crdt + +import ( + "context" + + "github.com/sourcenetwork/defradb/datastore" + "github.com/sourcenetwork/defradb/internal/core" +) + +type Collection struct { + store datastore.DSReaderWriter + // schemaVersionKey is the schema version datastore key at the time of commit. + // + // It can be used to identify the collection datastructure state at the time of commit. + schemaVersionKey core.CollectionSchemaVersionKey +} + +var _ core.ReplicatedData = (*Collection)(nil) + +func NewCollection(store datastore.DSReaderWriter, schemaVersionKey core.CollectionSchemaVersionKey) *Collection { + return &Collection{ + store: store, + schemaVersionKey: schemaVersionKey, + } +} + +func (c *Collection) Merge(ctx context.Context, other core.Delta) error { + panic("todo") +} + +func (c *Collection) Append() *CollectionDelta { + return &CollectionDelta{ + SchemaVersionID: c.schemaVersionKey.SchemaVersionID, + } +} + +type CollectionDelta struct { + Priority uint64 + // todo: This is temporary pending a global collection id (link to ticket) + SchemaVersionID string +} + +var _ core.Delta = (*CollectionDelta)(nil) + +func (delta *CollectionDelta) IPLDSchemaBytes() []byte { + return []byte(` + type CollectionDelta struct { + priority Int + schemaVersionID String + }`) +} + +func (d *CollectionDelta) GetPriority() uint64 { + panic("todo") +} + +func (d *CollectionDelta) SetPriority(uint64) { + panic("todo") +} diff --git a/internal/merkle/crdt/collection.go b/internal/merkle/crdt/collection.go new file mode 100644 index 0000000000..c913245cae --- /dev/null +++ b/internal/merkle/crdt/collection.go @@ -0,0 +1,60 @@ +// Copyright 2024 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package merklecrdt + +import ( + "context" + + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/internal/core" + coreblock "github.com/sourcenetwork/defradb/internal/core/block" + "github.com/sourcenetwork/defradb/internal/core/crdt" + "github.com/sourcenetwork/defradb/internal/merkle/clock" +) + +type MerkleCollection struct { + clock *clock.MerkleClock + reg *crdt.Collection +} + +var _ MerkleCRDT = (*MerkleCollection)(nil) + +func NewMerkleCollection( + store Stores, + schemaVersionKey core.CollectionSchemaVersionKey, +) *MerkleCollection { + register := crdt.NewCollection(store.Datastore(), schemaVersionKey) + + // todo - you can interface HeadStoreKey so that it has a `WithCid` func then it should hopefuly all be good + //clk := clock.NewMerkleClock(store.Headstore(), store.Blockstore(), store.Encstore(), key.ToHeadStoreKey(), register) + + return &MerkleCollection{ + //clock: clk, + reg: register, + } +} + +func (m *MerkleCollection) Clock() *clock.MerkleClock { + return m.clock +} + +// Save the value of the Counter to the DAG. +func (m *MerkleCollection) Save(ctx context.Context, data any) (cidlink.Link, []byte, error) { + links, ok := data.([]coreblock.DAGLink) + if !ok { + return cidlink.Link{}, nil, NewErrUnexpectedValueType(client.COMPOSITE, []coreblock.DAGLink{}, data) + } + + delta := m.reg.Append() + + return m.clock.AddDelta(ctx, delta, links...) +}