Skip to content

Commit

Permalink
WIP - Implement branchable collections
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewSisley committed Oct 31, 2024
1 parent 705ca6f commit 71fb069
Show file tree
Hide file tree
Showing 10 changed files with 233 additions and 6 deletions.
2 changes: 2 additions & 0 deletions client/collection_description.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ type CollectionDescription struct {
// At the moment this can only be set to `false` if this collection sources its data from
// another collection/query (is a View).
IsMaterialized bool

TrackChanges bool
}

// QuerySource represents a collection data source from a query.
Expand Down
1 change: 1 addition & 0 deletions internal/core/block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func init() {
&crdt.LWWRegDelta{},
&crdt.CompositeDAGDelta{},
&crdt.CounterDelta{},
&crdt.CollectionDelta{},
)

EncryptionSchema, EncryptionSchemaPrototype = mustSetSchema(
Expand Down
74 changes: 74 additions & 0 deletions internal/core/crdt/collection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package crdt

import (
"context"

"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/internal/core"
"github.com/sourcenetwork/defradb/internal/keys"
)

// Collection is the collection-level replicated data type. It pairs a
// datastore handle with the schema version key of the collection it describes.
type Collection struct {
// store is the datastore reader/writer handed to NewCollection.
store datastore.DSReaderWriter
// schemaVersionKey is the schema version datastore key at the time of commit.
//
// It can be used to identify the collection datastructure state at the time of commit.
schemaVersionKey keys.CollectionSchemaVersionKey
}

var _ core.ReplicatedData = (*Collection)(nil)

// NewCollection returns a Collection that holds the given datastore handle
// and schema version key.
func NewCollection(store datastore.DSReaderWriter, schemaVersionKey keys.CollectionSchemaVersionKey) *Collection {
	col := new(Collection)
	col.store = store
	col.schemaVersionKey = schemaVersionKey
	return col
}

// Merge is the merge step for a collection-level delta. It is currently a
// no-op: both arguments are ignored and nil is always returned.
func (c *Collection) Merge(ctx context.Context, other core.Delta) error {
// Collection merges don't actually need to do anything, as the delta is empty,
// and doc-level merges are handled by the document commits.

// todo - make sure this is well tested, you might be wrong!
return nil
}

// Append creates a new CollectionDelta stamped with the schema version ID
// taken from this Collection's schema version key. The delta's Priority is
// left at its zero value.
func (c *Collection) Append() *CollectionDelta {
	delta := new(CollectionDelta)
	delta.SchemaVersionID = c.schemaVersionKey.SchemaVersionID
	return delta
}

// CollectionDelta is the delta emitted by Collection.Append.
type CollectionDelta struct {
// Priority is the value read and written by GetPriority and SetPriority.
Priority uint64
// SchemaVersionID is populated by Collection.Append from the collection's
// schema version key.
//
// todo: This is temporary pending a global collection id (link to ticket)
SchemaVersionID string
}

var _ core.Delta = (*CollectionDelta)(nil)

// IPLDSchemaBytes returns the IPLD schema declaration for CollectionDelta as
// raw bytes.
//
// The receiver is named d to match the other CollectionDelta methods. The
// schema text is part of the runtime output and must not be reformatted.
func (d *CollectionDelta) IPLDSchemaBytes() []byte {
	return []byte(`
type CollectionDelta struct {
priority Int
schemaVersionID String
}`)
}

// GetPriority returns the delta's Priority field.
func (d *CollectionDelta) GetPriority() uint64 {
return d.Priority
}

// SetPriority overwrites the delta's Priority field with the given value.
func (d *CollectionDelta) SetPriority(priority uint64) {
d.Priority = priority
}
23 changes: 23 additions & 0 deletions internal/db/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,29 @@ func (c *collection) save(
doc.SetHead(link.Cid)
})

if c.def.Description.TrackChanges {
collectionCRDT := merklecrdt.NewMerkleCollection(
txn,
keys.NewCollectionSchemaVersionKey(c.Schema().VersionID, c.ID()),
keys.NewHeadstoreColKey(c.def.Description.RootID),
)

link, headNode, err := collectionCRDT.Save(ctx, []coreblock.DAGLink{{Link: link}})
if err != nil {
return err
}

updateEvent := event.Update{
Cid: link.Cid,
SchemaRoot: c.Schema().Root,
Block: headNode,
}

txn.OnSuccess(func() {
c.db.events.Publish(event.NewMessage(event.UpdateName, updateEvent))
})
}

return nil
}

Expand Down
10 changes: 10 additions & 0 deletions internal/keys/headstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,16 @@

package keys

import (
"github.com/ipfs/go-cid"
)

// Leading path segments used when building headstore key strings.
const (
// HEADSTORE_DOC is presumably the leading segment for document head keys
// (HeadstoreDocKey); its use is not visible here - verify.
HEADSTORE_DOC = "/d"
// HEADSTORE_COL is the leading segment of every HeadstoreColKey string.
HEADSTORE_COL = "/c"
)

// HeadstoreKey is a Key that can additionally be bound to a CID.
type HeadstoreKey interface {
Key
// WithCid returns a HeadstoreKey that carries the given CID.
WithCid(c cid.Cid) HeadstoreKey
}
58 changes: 58 additions & 0 deletions internal/keys/headstore_collection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package keys

import (
"strconv"

"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
)

// HeadstoreColKey is the headstore key type for collection-level heads.
type HeadstoreColKey struct {
// CollectionRootID is the collection root ID; ToString omits it when it is zero.
CollectionRootID uint32
// Cid is the CID segment of the key; ToString omits it when it is undefined.
Cid cid.Cid
}

var _ HeadstoreKey = (*HeadstoreColKey)(nil)

// NewHeadstoreColKey returns a HeadstoreColKey for the given collection root
// ID. The Cid field is left at its zero value.
func NewHeadstoreColKey(colRootID uint32) HeadstoreColKey {
	var key HeadstoreColKey
	key.CollectionRootID = colRootID
	return key
}

// WithCid returns a copy of the key with its Cid set to c. The caller's key
// is not modified.
func (k HeadstoreColKey) WithCid(c cid.Cid) HeadstoreKey {
	// k is received by value, so assigning to it only changes this local copy.
	k.Cid = c
	return k
}

// ToString renders the key as a slash-separated path of the form
// HEADSTORE_COL[/<CollectionRootID>][/<Cid>].
//
// The collection root ID segment is omitted when the ID is zero, and the CID
// segment is omitted when the CID is undefined, so a partially populated key
// renders as a prefix of the fully populated one.
func (k HeadstoreColKey) ToString() string {
	result := HEADSTORE_COL

	if k.CollectionRootID != 0 {
		// Format the ID as an unsigned value. Converting through int would
		// wrap IDs above MaxInt32 to negative numbers on platforms where int
		// is 32 bits wide.
		result += "/" + strconv.FormatUint(uint64(k.CollectionRootID), 10)
	}
	if k.Cid.Defined() {
		result += "/" + k.Cid.String()
	}

	return result
}

// Bytes returns the byte form of the key's string representation.
func (k HeadstoreColKey) Bytes() []byte {
	str := k.ToString()
	return []byte(str)
}

// ToDS converts the key's string representation into a go-datastore key.
func (k HeadstoreColKey) ToDS() ds.Key {
	str := k.ToString()
	return ds.NewKey(str)
}
4 changes: 2 additions & 2 deletions internal/keys/headstore_doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type HeadstoreDocKey struct {
Cid cid.Cid
}

var _ Key = (*HeadstoreDocKey)(nil)
var _ HeadstoreKey = (*HeadstoreDocKey)(nil)

// Creates a new HeadstoreDocKey from a string as best as it can,
// splitting the input using '/' as a field deliminator. It assumes
Expand Down Expand Up @@ -57,7 +57,7 @@ func (k HeadstoreDocKey) WithDocID(docID string) HeadstoreDocKey {
return newKey
}

func (k HeadstoreDocKey) WithCid(c cid.Cid) HeadstoreDocKey {
func (k HeadstoreDocKey) WithCid(c cid.Cid) HeadstoreKey {
newKey := k
newKey.Cid = c
return newKey
Expand Down
2 changes: 1 addition & 1 deletion internal/merkle/clock/clock.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func NewMerkleClock(
headstore datastore.DSReaderWriter,
blockstore datastore.Blockstore,
encstore datastore.Blockstore,
namespace keys.HeadstoreDocKey,
namespace keys.HeadstoreKey,
crdt core.ReplicatedData,
) *MerkleClock {
return &MerkleClock{
Expand Down
6 changes: 3 additions & 3 deletions internal/merkle/clock/heads.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,17 @@ import (
// heads manages the current Merkle-CRDT heads.
type heads struct {
store datastore.DSReaderWriter
namespace keys.HeadstoreDocKey
namespace keys.HeadstoreKey
}

func NewHeadSet(store datastore.DSReaderWriter, namespace keys.HeadstoreDocKey) *heads {
func NewHeadSet(store datastore.DSReaderWriter, namespace keys.HeadstoreKey) *heads {
return &heads{
store: store,
namespace: namespace,
}
}

func (hh *heads) key(c cid.Cid) keys.HeadstoreDocKey {
func (hh *heads) key(c cid.Cid) keys.HeadstoreKey {
return hh.namespace.WithCid(c)
}

Expand Down
59 changes: 59 additions & 0 deletions internal/merkle/crdt/collection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2024 Democratized Data Foundation
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package merklecrdt

import (
"context"

cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/sourcenetwork/defradb/client"
coreblock "github.com/sourcenetwork/defradb/internal/core/block"
"github.com/sourcenetwork/defradb/internal/core/crdt"
"github.com/sourcenetwork/defradb/internal/keys"
"github.com/sourcenetwork/defradb/internal/merkle/clock"
)

// MerkleCollection couples a collection CRDT with the Merkle clock that its
// deltas are added to.
type MerkleCollection struct {
// clock is the Merkle clock that Save adds deltas to.
clock *clock.MerkleClock
// reg is the underlying collection CRDT that produces the deltas.
reg *crdt.Collection
}

var _ MerkleCRDT = (*MerkleCollection)(nil)

// NewMerkleCollection builds a MerkleCollection from the given stores.
//
// It creates the collection CRDT over the datastore, then a Merkle clock over
// the headstore, blockstore and encstore. The clock is namespaced by key and
// bound to that same CRDT.
func NewMerkleCollection(
	store Stores,
	schemaVersionKey keys.CollectionSchemaVersionKey,
	key keys.HeadstoreColKey,
) *MerkleCollection {
	collection := crdt.NewCollection(store.Datastore(), schemaVersionKey)

	return &MerkleCollection{
		clock: clock.NewMerkleClock(
			store.Headstore(),
			store.Blockstore(),
			store.Encstore(),
			key,
			collection,
		),
		reg: collection,
	}
}

// Clock returns the Merkle clock held by this MerkleCollection.
func (m *MerkleCollection) Clock() *clock.MerkleClock {
return m.clock
}

// Save adds a new collection delta to the clock, linked to the given DAG links.
//
// data must be a []coreblock.DAGLink. Any other dynamic type, including nil,
// yields an unexpected-value-type error and nothing is written. On success
// the results of the clock's AddDelta call are returned unchanged.
func (m *MerkleCollection) Save(ctx context.Context, data any) (cidlink.Link, []byte, error) {
	switch links := data.(type) {
	case []coreblock.DAGLink:
		return m.clock.AddDelta(ctx, m.reg.Append(), links...)
	default:
		return cidlink.Link{}, nil, NewErrUnexpectedValueType(client.COMPOSITE, []coreblock.DAGLink{}, data)
	}
}

0 comments on commit 71fb069

Please sign in to comment.