Skip to content

Commit

Permalink
refactor: Remove indirection from crdt packages (sourcenetwork#3192)
Browse files Browse the repository at this point in the history
## Relevant issue(s)

Resolves sourcenetwork#3191

## Description

Removes various items of indirection from the merkle/crdt and core/crdt
packages that was making the code quite a lot harder to follow than it
need to be.
  • Loading branch information
AndrewSisley authored Oct 29, 2024
1 parent 3a3baac commit b4b2bf2
Show file tree
Hide file tree
Showing 12 changed files with 107 additions and 284 deletions.
45 changes: 9 additions & 36 deletions internal/core/crdt/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,40 +21,9 @@ import (
"github.com/sourcenetwork/defradb/internal/core"
)

// baseCRDT is embedded as a base layer into all
// the core CRDT implementations to reduce code
// duplication, and better manage the overhead
// tasks that all the CRDTs need to implement anyway
type baseCRDT struct {
store datastore.DSReaderWriter
key core.DataStoreKey

// schemaVersionKey is the schema version datastore key at the time of commit.
//
// It can be used to identify the collection datastructure state at the time of commit.
schemaVersionKey core.CollectionSchemaVersionKey

// fieldName holds the name of the field hosting this CRDT, if this is a field level
// commit.
fieldName string
}

func newBaseCRDT(
store datastore.DSReaderWriter,
key core.DataStoreKey,
schemaVersionKey core.CollectionSchemaVersionKey,
fieldName string,
) baseCRDT {
return baseCRDT{
store: store,
key: key,
schemaVersionKey: schemaVersionKey,
fieldName: fieldName,
}
}

func (base baseCRDT) setPriority(
func setPriority(
ctx context.Context,
store datastore.DSReaderWriter,
key core.DataStoreKey,
priority uint64,
) error {
Expand All @@ -65,13 +34,17 @@ func (base baseCRDT) setPriority(
return ErrEncodingPriority
}

return base.store.Put(ctx, prioK.ToDS(), buf[0:n])
return store.Put(ctx, prioK.ToDS(), buf[0:n])
}

// get the current priority for given key
func (base baseCRDT) getPriority(ctx context.Context, key core.DataStoreKey) (uint64, error) {
func getPriority(
ctx context.Context,
store datastore.DSReaderWriter,
key core.DataStoreKey,
) (uint64, error) {
pKey := key.WithPriorityFlag()
pbuf, err := base.store.Get(ctx, pKey.ToDS())
pbuf, err := store.Get(ctx, pKey.ToDS())
if err != nil {
if errors.Is(err, ds.ErrNotFound) {
return 0, nil
Expand Down
28 changes: 6 additions & 22 deletions internal/core/crdt/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,47 +24,31 @@ func newDS() datastore.DSReaderWriter {
return datastore.AsDSReaderWriter(ds.NewMapDatastore())
}

func newSeededDS() datastore.DSReaderWriter {
return newDS()
}

func exampleBaseCRDT() baseCRDT {
return newBaseCRDT(newSeededDS(), core.DataStoreKey{}, core.CollectionSchemaVersionKey{}, "")
}

func TestBaseCRDTNew(t *testing.T) {
base := newBaseCRDT(newDS(), core.DataStoreKey{}, core.CollectionSchemaVersionKey{}, "")
if base.store == nil {
t.Error("newBaseCRDT needs to init store")
}
}

func TestBaseCRDTvalueKey(t *testing.T) {
base := exampleBaseCRDT()
vk := base.key.WithDocID("mykey").WithValueFlag()
vk := core.DataStoreKey{}.WithDocID("mykey").WithValueFlag()
if vk.ToString() != "/v/mykey" {
t.Errorf("Incorrect valueKey. Have %v, want %v", vk.ToString(), "/v/mykey")
}
}

func TestBaseCRDTprioryKey(t *testing.T) {
base := exampleBaseCRDT()
pk := base.key.WithDocID("mykey").WithPriorityFlag()
pk := core.DataStoreKey{}.WithDocID("mykey").WithPriorityFlag()
if pk.ToString() != "/p/mykey" {
t.Errorf("Incorrect priorityKey. Have %v, want %v", pk.ToString(), "/p/mykey")
}
}

func TestBaseCRDTSetGetPriority(t *testing.T) {
base := exampleBaseCRDT()
store := newDS()

ctx := context.Background()
err := base.setPriority(ctx, base.key.WithDocID("mykey"), 10)
err := setPriority(ctx, store, core.DataStoreKey{}.WithDocID("mykey"), 10)
if err != nil {
t.Errorf("baseCRDT failed to set Priority. err: %v", err)
return
}

priority, err := base.getPriority(ctx, base.key.WithDocID("mykey"))
priority, err := getPriority(ctx, store, core.DataStoreKey{}.WithDocID("mykey"))
if err != nil {
t.Errorf("baseCRDT failed to get priority. err: %v", err)
return
Expand Down
19 changes: 12 additions & 7 deletions internal/core/crdt/composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,13 @@ func (delta *CompositeDAGDelta) SetPriority(prio uint64) {

// CompositeDAG is a CRDT structure that is used to track a collection of sub MerkleCRDTs.
type CompositeDAG struct {
baseCRDT
store datastore.DSReaderWriter
key core.DataStoreKey

// schemaVersionKey is the schema version datastore key at the time of commit.
//
// It can be used to identify the collection datastructure state at the time of commit.
schemaVersionKey core.CollectionSchemaVersionKey
}

var _ core.ReplicatedData = (*CompositeDAG)(nil)
Expand All @@ -86,12 +92,11 @@ func NewCompositeDAG(
schemaVersionKey core.CollectionSchemaVersionKey,
key core.DataStoreKey,
) CompositeDAG {
return CompositeDAG{newBaseCRDT(store, key, schemaVersionKey, "")}
}

// Value is a no-op for a CompositeDAG.
func (c CompositeDAG) Value(ctx context.Context) ([]byte, error) {
return nil, nil
return CompositeDAG{
store: store,
key: key,
schemaVersionKey: schemaVersionKey,
}
}

// Set returns a new composite DAG delta CRDT with the given status.
Expand Down
32 changes: 20 additions & 12 deletions internal/core/crdt/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,18 @@ func (delta *CounterDelta) SetPriority(prio uint64) {
// Counter, is a simple CRDT type that allows increment/decrement
// of an Int and Float data types that ensures convergence.
type Counter struct {
baseCRDT
store datastore.DSReaderWriter
key core.DataStoreKey

// schemaVersionKey is the schema version datastore key at the time of commit.
//
// It can be used to identify the collection datastructure state at the time of commit.
schemaVersionKey core.CollectionSchemaVersionKey

// fieldName holds the name of the field hosting this CRDT, if this is a field level
// commit.
fieldName string

AllowDecrement bool
Kind client.ScalarKind
}
Expand All @@ -93,17 +104,14 @@ func NewCounter(
allowDecrement bool,
kind client.ScalarKind,
) Counter {
return Counter{newBaseCRDT(store, key, schemaVersionKey, fieldName), allowDecrement, kind}
}

// Value gets the current counter value
func (c Counter) Value(ctx context.Context) ([]byte, error) {
valueK := c.key.WithValueFlag()
buf, err := c.store.Get(ctx, valueK.ToDS())
if err != nil {
return nil, err
return Counter{
store: store,
key: key,
schemaVersionKey: schemaVersionKey,
fieldName: fieldName,
AllowDecrement: allowDecrement,
Kind: kind,
}
return buf, nil
}

// Set generates a new delta with the supplied value.
Expand Down Expand Up @@ -184,7 +192,7 @@ func (c Counter) incrementValue(
return NewErrFailedToStoreValue(err)
}

return c.setPriority(ctx, c.key, priority)
return setPriority(ctx, c.store, c.key, priority)
}

func (c Counter) CType() client.CType {
Expand Down
32 changes: 18 additions & 14 deletions internal/core/crdt/lwwreg.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,17 @@ func (delta *LWWRegDelta) SetPriority(prio uint64) {
// LWWRegister, Last-Writer-Wins Register, is a simple CRDT type that allows set/get
// of an arbitrary data type that ensures convergence.
type LWWRegister struct {
baseCRDT
store datastore.DSReaderWriter
key core.DataStoreKey

// schemaVersionKey is the schema version datastore key at the time of commit.
//
// It can be used to identify the collection datastructure state at the time of commit.
schemaVersionKey core.CollectionSchemaVersionKey

// fieldName holds the name of the field hosting this CRDT, if this is a field level
// commit.
fieldName string
}

var _ core.ReplicatedData = (*LWWRegister)(nil)
Expand All @@ -77,18 +87,12 @@ func NewLWWRegister(
key core.DataStoreKey,
fieldName string,
) LWWRegister {
return LWWRegister{newBaseCRDT(store, key, schemaVersionKey, fieldName)}
}

// Value gets the current register value
// RETURN STATE
func (reg LWWRegister) Value(ctx context.Context) ([]byte, error) {
valueK := reg.key.WithValueFlag()
buf, err := reg.store.Get(ctx, valueK.ToDS())
if err != nil {
return nil, err
return LWWRegister{
store: store,
key: key,
schemaVersionKey: schemaVersionKey,
fieldName: fieldName,
}
return buf, nil
}

// Set generates a new delta with the supplied value
Expand Down Expand Up @@ -116,7 +120,7 @@ func (reg LWWRegister) Merge(ctx context.Context, delta core.Delta) error {
}

func (reg LWWRegister) setValue(ctx context.Context, val []byte, priority uint64) error {
curPrio, err := reg.getPriority(ctx, reg.key)
curPrio, err := getPriority(ctx, reg.store, reg.key)
if err != nil {
return NewErrFailedToGetPriority(err)
}
Expand Down Expand Up @@ -161,5 +165,5 @@ func (reg LWWRegister) setValue(ctx context.Context, val []byte, priority uint64
}
}

return reg.setPriority(ctx, reg.key, priority)
return setPriority(ctx, reg.store, reg.key, priority)
}
70 changes: 0 additions & 70 deletions internal/core/crdt/lwwreg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,10 @@
package crdt

import (
"context"
"reflect"
"testing"

ds "github.com/ipfs/go-datastore"
"github.com/stretchr/testify/require"

"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/internal/core"
Expand All @@ -32,15 +30,6 @@ func setupLWWRegister() LWWRegister {
return NewLWWRegister(store, core.CollectionSchemaVersionKey{}, key, "")
}

func setupLoadedLWWRegister(t *testing.T, ctx context.Context) LWWRegister {
lww := setupLWWRegister()
addDelta := lww.Set([]byte("test"))
addDelta.SetPriority(1)
err := lww.Merge(ctx, addDelta)
require.NoError(t, err)
return lww
}

func TestLWWRegisterAddDelta(t *testing.T) {
lww := setupLWWRegister()
addDelta := lww.Set([]byte("test"))
Expand All @@ -50,65 +39,6 @@ func TestLWWRegisterAddDelta(t *testing.T) {
}
}

func TestLWWRegisterInitialMerge(t *testing.T) {
ctx := context.Background()
lww := setupLWWRegister()
addDelta := lww.Set([]byte("test"))
addDelta.SetPriority(1)
err := lww.Merge(ctx, addDelta)
if err != nil {
t.Errorf("Unexpected error: %s\n", err)
return
}

val, err := lww.Value(ctx)
if err != nil {
t.Errorf("Unexpected error: %s", err)
return
}

expectedVal := []byte("test")
if string(val) != string(expectedVal) {
t.Errorf("Mismatch value for LWWRegister, was %s want %s", val, expectedVal)
}
}

func TestLWWRegisterFollowupMerge(t *testing.T) {
ctx := context.Background()
lww := setupLoadedLWWRegister(t, ctx)
addDelta := lww.Set([]byte("test2"))
addDelta.SetPriority(2)
err := lww.Merge(ctx, addDelta)
require.NoError(t, err)

val, err := lww.Value(ctx)
if err != nil {
t.Error(err)
}

if string(val) != string([]byte("test2")) {
t.Errorf("Incorrect merge state, want %s, have %s", []byte("test2"), val)
}
}

func TestLWWRegisterOldMerge(t *testing.T) {
ctx := context.Background()
lww := setupLoadedLWWRegister(t, ctx)
addDelta := lww.Set([]byte("test-1"))
addDelta.SetPriority(0)
err := lww.Merge(ctx, addDelta)
require.NoError(t, err)

val, err := lww.Value(ctx)
if err != nil {
t.Error(err)
}

if string(val) != string([]byte("test")) {
t.Errorf("Incorrect merge state, want %s, have %s", []byte("test"), val)
}
}

func TestLWWRegisterDeltaInit(t *testing.T) {
delta := &LWWRegDelta{}

Expand Down
9 changes: 0 additions & 9 deletions internal/core/replicated.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,10 @@ package core

import (
"context"

cid "github.com/ipld/go-ipld-prime/linking/cid"
)

// ReplicatedData is a data type that allows concurrent writers to deterministically merge other
// replicated data so as to converge on the same state.
type ReplicatedData interface {
Merge(ctx context.Context, other Delta) error
Value(ctx context.Context) ([]byte, error)
}

// PersistedReplicatedData persists a ReplicatedData to an underlying datastore.
type PersistedReplicatedData interface {
ReplicatedData
Publish(Delta) (cid.Link, error)
}
Loading

0 comments on commit b4b2bf2

Please sign in to comment.