Skip to content

Commit

Permalink
Extract core/keys.go to multiple files
Browse files Browse the repository at this point in the history
Original file had grown way to large and was getting quite painful to find stuff in it, and I need to add new types to it shortly.
  • Loading branch information
AndrewSisley committed Oct 29, 2024
1 parent 6ed8222 commit 1028cc9
Show file tree
Hide file tree
Showing 71 changed files with 2,066 additions and 1,633 deletions.
6 changes: 3 additions & 3 deletions internal/core/crdt/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ import (

"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/errors"
"github.com/sourcenetwork/defradb/internal/core"
"github.com/sourcenetwork/defradb/internal/keys"
)

func setPriority(
ctx context.Context,
store datastore.DSReaderWriter,
key core.DataStoreKey,
key keys.DataStoreKey,
priority uint64,
) error {
prioK := key.WithPriorityFlag()
Expand All @@ -41,7 +41,7 @@ func setPriority(
func getPriority(
ctx context.Context,
store datastore.DSReaderWriter,
key core.DataStoreKey,
key keys.DataStoreKey,
) (uint64, error) {
pKey := key.WithPriorityFlag()
pbuf, err := store.Get(ctx, pKey.ToDS())
Expand Down
10 changes: 5 additions & 5 deletions internal/core/crdt/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,22 @@ import (
ds "github.com/ipfs/go-datastore"

"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/internal/core"
"github.com/sourcenetwork/defradb/internal/keys"
)

func newDS() datastore.DSReaderWriter {
return datastore.AsDSReaderWriter(ds.NewMapDatastore())
}

func TestBaseCRDTvalueKey(t *testing.T) {
vk := core.DataStoreKey{}.WithDocID("mykey").WithValueFlag()
vk := keys.DataStoreKey{}.WithDocID("mykey").WithValueFlag()
if vk.ToString() != "/v/mykey" {
t.Errorf("Incorrect valueKey. Have %v, want %v", vk.ToString(), "/v/mykey")
}
}

func TestBaseCRDTprioryKey(t *testing.T) {
pk := core.DataStoreKey{}.WithDocID("mykey").WithPriorityFlag()
pk := keys.DataStoreKey{}.WithDocID("mykey").WithPriorityFlag()
if pk.ToString() != "/p/mykey" {
t.Errorf("Incorrect priorityKey. Have %v, want %v", pk.ToString(), "/p/mykey")
}
Expand All @@ -42,13 +42,13 @@ func TestBaseCRDTSetGetPriority(t *testing.T) {
store := newDS()

ctx := context.Background()
err := setPriority(ctx, store, core.DataStoreKey{}.WithDocID("mykey"), 10)
err := setPriority(ctx, store, keys.DataStoreKey{}.WithDocID("mykey"), 10)
if err != nil {
t.Errorf("baseCRDT failed to set Priority. err: %v", err)
return
}

priority, err := getPriority(ctx, store, core.DataStoreKey{}.WithDocID("mykey"))
priority, err := getPriority(ctx, store, keys.DataStoreKey{}.WithDocID("mykey"))
if err != nil {
t.Errorf("baseCRDT failed to get priority. err: %v", err)
return
Expand Down
17 changes: 9 additions & 8 deletions internal/core/crdt/composite.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"github.com/sourcenetwork/defradb/errors"
"github.com/sourcenetwork/defradb/internal/core"
"github.com/sourcenetwork/defradb/internal/db/base"
"github.com/sourcenetwork/defradb/internal/keys"
)

// CompositeDAGDelta represents a delta-state update made of sub-MerkleCRDTs.
Expand Down Expand Up @@ -77,20 +78,20 @@ func (delta *CompositeDAGDelta) SetPriority(prio uint64) {
// CompositeDAG is a CRDT structure that is used to track a collection of sub MerkleCRDTs.
type CompositeDAG struct {
store datastore.DSReaderWriter
key core.DataStoreKey
key keys.DataStoreKey

// schemaVersionKey is the schema version datastore key at the time of commit.
//
// It can be used to identify the collection datastructure state at the time of commit.
schemaVersionKey core.CollectionSchemaVersionKey
schemaVersionKey keys.CollectionSchemaVersionKey
}

var _ core.ReplicatedData = (*CompositeDAG)(nil)

func NewCompositeDAG(
store datastore.DSReaderWriter,
schemaVersionKey core.CollectionSchemaVersionKey,
key core.DataStoreKey,
schemaVersionKey keys.CollectionSchemaVersionKey,
key keys.DataStoreKey,
) CompositeDAG {
return CompositeDAG{
store: store,
Expand Down Expand Up @@ -125,7 +126,7 @@ func (c CompositeDAG) Merge(ctx context.Context, delta core.Delta) error {
// We cannot rely on the dagDelta.Status here as it may have been deleted locally, this is not
// reflected in `dagDelta.Status` if sourced via P2P. Updates synced via P2P should not undelete
// the local representation of the document.
versionKey := c.key.WithValueFlag().WithFieldID(core.DATASTORE_DOC_VERSION_FIELD_ID)
versionKey := c.key.WithValueFlag().WithFieldID(keys.DATASTORE_DOC_VERSION_FIELD_ID)
objectMarker, err := c.store.Get(ctx, c.key.ToPrimaryDataStoreKey().ToDS())
hasObjectMarker := !errors.Is(err, ds.ErrNotFound)
if err != nil && hasObjectMarker {
Expand Down Expand Up @@ -159,7 +160,7 @@ func (c CompositeDAG) Merge(ctx context.Context, delta core.Delta) error {
return nil
}

func (c CompositeDAG) deleteWithPrefix(ctx context.Context, key core.DataStoreKey) error {
func (c CompositeDAG) deleteWithPrefix(ctx context.Context, key keys.DataStoreKey) error {
q := query.Query{
Prefix: key.ToString(),
}
Expand All @@ -168,12 +169,12 @@ func (c CompositeDAG) deleteWithPrefix(ctx context.Context, key core.DataStoreKe
if e.Error != nil {
return err
}
dsKey, err := core.NewDataStoreKey(e.Key)
dsKey, err := keys.NewDataStoreKey(e.Key)
if err != nil {
return err
}

if dsKey.InstanceType == core.ValueKey {
if dsKey.InstanceType == keys.ValueKey {
err = c.store.Put(ctx, dsKey.WithDeletedFlag().ToDS(), e.Value)
if err != nil {
return err
Expand Down
13 changes: 7 additions & 6 deletions internal/core/crdt/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/sourcenetwork/defradb/errors"
"github.com/sourcenetwork/defradb/internal/core"
"github.com/sourcenetwork/defradb/internal/db/base"
"github.com/sourcenetwork/defradb/internal/keys"
)

type Incrementable interface {
Expand Down Expand Up @@ -78,12 +79,12 @@ func (delta *CounterDelta) SetPriority(prio uint64) {
// of an Int and Float data types that ensures convergence.
type Counter struct {
store datastore.DSReaderWriter
key core.DataStoreKey
key keys.DataStoreKey

// schemaVersionKey is the schema version datastore key at the time of commit.
//
// It can be used to identify the collection datastructure state at the time of commit.
schemaVersionKey core.CollectionSchemaVersionKey
schemaVersionKey keys.CollectionSchemaVersionKey

// fieldName holds the name of the field hosting this CRDT, if this is a field level
// commit.
Expand All @@ -98,8 +99,8 @@ var _ core.ReplicatedData = (*Counter)(nil)
// NewCounter returns a new instance of the Counter with the given ID.
func NewCounter(
store datastore.DSReaderWriter,
schemaVersionKey core.CollectionSchemaVersionKey,
key core.DataStoreKey,
schemaVersionKey keys.CollectionSchemaVersionKey,
key keys.DataStoreKey,
fieldName string,
allowDecrement bool,
kind client.ScalarKind,
Expand Down Expand Up @@ -205,7 +206,7 @@ func (c Counter) CType() client.CType {
func validateAndIncrement[T Incrementable](
ctx context.Context,
store datastore.DSReaderWriter,
key core.DataStoreKey,
key keys.DataStoreKey,
valueAsBytes []byte,
allowDecrement bool,
) ([]byte, error) {
Expand All @@ -230,7 +231,7 @@ func validateAndIncrement[T Incrementable](
func getCurrentValue[T Incrementable](
ctx context.Context,
store datastore.DSReaderWriter,
key core.DataStoreKey,
key keys.DataStoreKey,
) (T, error) {
curValue, err := store.Get(ctx, key.ToDS())
if err != nil {
Expand Down
9 changes: 5 additions & 4 deletions internal/core/crdt/lwwreg.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/sourcenetwork/defradb/errors"
"github.com/sourcenetwork/defradb/internal/core"
"github.com/sourcenetwork/defradb/internal/db/base"
"github.com/sourcenetwork/defradb/internal/keys"
)

// LWWRegDelta is a single delta operation for an LWWRegister
Expand Down Expand Up @@ -66,12 +67,12 @@ func (delta *LWWRegDelta) SetPriority(prio uint64) {
// of an arbitrary data type that ensures convergence.
type LWWRegister struct {
store datastore.DSReaderWriter
key core.DataStoreKey
key keys.DataStoreKey

// schemaVersionKey is the schema version datastore key at the time of commit.
//
// It can be used to identify the collection datastructure state at the time of commit.
schemaVersionKey core.CollectionSchemaVersionKey
schemaVersionKey keys.CollectionSchemaVersionKey

// fieldName holds the name of the field hosting this CRDT, if this is a field level
// commit.
Expand All @@ -83,8 +84,8 @@ var _ core.ReplicatedData = (*LWWRegister)(nil)
// NewLWWRegister returns a new instance of the LWWReg with the given ID.
func NewLWWRegister(
store datastore.DSReaderWriter,
schemaVersionKey core.CollectionSchemaVersionKey,
key core.DataStoreKey,
schemaVersionKey keys.CollectionSchemaVersionKey,
key keys.DataStoreKey,
fieldName string,
) LWWRegister {
return LWWRegister{
Expand Down
5 changes: 3 additions & 2 deletions internal/core/crdt/lwwreg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/sourcenetwork/defradb/datastore"
"github.com/sourcenetwork/defradb/internal/core"
"github.com/sourcenetwork/defradb/internal/keys"
)

func newMockStore() datastore.DSReaderWriter {
Expand All @@ -26,8 +27,8 @@ func newMockStore() datastore.DSReaderWriter {

func setupLWWRegister() LWWRegister {
store := newMockStore()
key := core.DataStoreKey{DocID: "AAAA-BBBB"}
return NewLWWRegister(store, core.CollectionSchemaVersionKey{}, key, "")
key := keys.DataStoreKey{DocID: "AAAA-BBBB"}
return NewLWWRegister(store, keys.CollectionSchemaVersionKey{}, key, "")
}

func TestLWWRegisterAddDelta(t *testing.T) {
Expand Down
22 changes: 13 additions & 9 deletions internal/core/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,40 +10,44 @@

package core

import "strings"
import (
"strings"

"github.com/sourcenetwork/defradb/internal/keys"
)

// Span is a range of keys from [Start, End).
type Span interface {
// Start returns the starting key of the Span.
Start() DataStoreKey
Start() keys.DataStoreKey
// End returns the ending key of the Span.
End() DataStoreKey
End() keys.DataStoreKey
// Compare returns -1 if the provided span is less, 0 if it is equal, and 1 if its greater.
Compare(Span) SpanComparisonResult
}

type span struct {
start DataStoreKey
end DataStoreKey
start keys.DataStoreKey
end keys.DataStoreKey
}

var _ Span = span{}

// NewSpan creates a new Span from the provided start and end keys.
func NewSpan(start, end DataStoreKey) Span {
func NewSpan(start, end keys.DataStoreKey) Span {
return span{
start: start,
end: end,
}
}

// Start returns the starting key of the Span.
func (s span) Start() DataStoreKey {
func (s span) Start() keys.DataStoreKey {
return s.start
}

// End returns the ending key of the Span.
func (s span) End() DataStoreKey {
func (s span) End() keys.DataStoreKey {
return s.end
}

Expand Down Expand Up @@ -136,7 +140,7 @@ func (this span) Compare(other Span) SpanComparisonResult {
return After
}

func isAdjacent(this DataStoreKey, other DataStoreKey) bool {
func isAdjacent(this keys.DataStoreKey, other keys.DataStoreKey) bool {
return len(this.ToString()) == len(other.ToString()) &&
(this.PrefixEnd().ToString() == other.ToString() ||
this.ToString() == other.PrefixEnd().ToString())
Expand Down
Loading

0 comments on commit 1028cc9

Please sign in to comment.