Skip to content

Commit

Permalink
Merge branch 'develop' into nasdf/fix/merge-retry-logic
Browse files Browse the repository at this point in the history
  • Loading branch information
nasdf authored Jun 17, 2024
2 parents 01f5157 + 6626441 commit d9bbd87
Show file tree
Hide file tree
Showing 33 changed files with 846 additions and 501 deletions.
7 changes: 2 additions & 5 deletions datastore/badger/v4/datastore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1130,7 +1130,7 @@ func TestTxnWithConflict(t *testing.T) {
require.ErrorIs(t, err, ErrTxnConflict)
}

func TestTxnWithConflictAfterDelete(t *testing.T) {
func TestTxnWithNoConflictAfterDelete(t *testing.T) {
ctx := context.Background()
s := newLoadedDatastore(ctx, t)
defer func() {
Expand All @@ -1144,9 +1144,6 @@ func TestTxnWithConflictAfterDelete(t *testing.T) {
tx2, err := s.NewTransaction(ctx, false)
require.NoError(t, err)

_, err = tx.GetSize(ctx, testKey2)
require.NoError(t, err)

err = tx.Put(ctx, testKey2, testValue3)
require.NoError(t, err)

Expand All @@ -1157,7 +1154,7 @@ func TestTxnWithConflictAfterDelete(t *testing.T) {
require.NoError(t, err)

err = tx.Commit(ctx)
require.ErrorIs(t, err, ErrTxnConflict)
require.NoError(t, err)
}

func TestTxnWithNoConflictAfterGet(t *testing.T) {
Expand Down
21 changes: 16 additions & 5 deletions datastore/memory/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ func (t *basicTxn) get(ctx context.Context, key ds.Key) dsItem {
if result.key == "" {
result = t.ds.get(ctx, key, t.getDSVersion())
result.isGet = true
if result.key == "" {
// If the datastore doesn't have the item, we still need to track it
// to check for merge conflicts.
result.key = key.String()
}
t.ops.Set(result)
}
return result
Expand All @@ -97,7 +102,7 @@ func (t *basicTxn) Get(ctx context.Context, key ds.Key) ([]byte, error) {
return nil, ErrTxnDiscarded
}
result := t.get(ctx, key)
if result.key == "" || result.isDeleted {
if result.version == 0 || result.isDeleted {
return nil, ds.ErrNotFound
}
return result.val, nil
Expand All @@ -115,7 +120,7 @@ func (t *basicTxn) GetSize(ctx context.Context, key ds.Key) (size int, err error
return 0, ErrTxnDiscarded
}
result := t.get(ctx, key)
if result.key == "" || result.isDeleted {
if result.version == 0 || result.isDeleted {
return 0, ds.ErrNotFound
}
return len(result.val), nil
Expand All @@ -133,7 +138,7 @@ func (t *basicTxn) Has(ctx context.Context, key ds.Key) (exists bool, err error)
return false, ErrTxnDiscarded
}
result := t.get(ctx, key)
if result.key == "" || result.isDeleted {
if result.version == 0 || result.isDeleted {
return false, nil
}
return true, nil
Expand Down Expand Up @@ -270,8 +275,14 @@ func (t *basicTxn) checkForConflicts(ctx context.Context) error {
iter := t.ops.Iter()
defer iter.Release()
for iter.Next() {
expectedItem := t.ds.get(ctx, ds.NewKey(iter.Item().key), t.getDSVersion())
latestItem := t.ds.get(ctx, ds.NewKey(iter.Item().key), t.ds.getVersion())
item := iter.Item()
if !item.isGet {
// Conflict should only occur if an item has been updated
// after we've read it within the transaction.
continue
}
expectedItem := t.ds.get(ctx, ds.NewKey(item.key), t.getDSVersion())
latestItem := t.ds.get(ctx, ds.NewKey(item.key), t.ds.getVersion())
if latestItem.version != expectedItem.version {
return ErrTxnConflict
}
Expand Down
15 changes: 10 additions & 5 deletions datastore/memory/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -707,11 +707,16 @@ func TestTxnWithConflict(t *testing.T) {
require.NoError(t, err)
}()

tx := s.newTransaction(false)
tx, err := s.NewTransaction(ctx, false)
require.NoError(t, err)

tx2 := s.newTransaction(false)
tx2, err := s.NewTransaction(ctx, false)
require.NoError(t, err)

err := tx.Put(ctx, testKey3, testValue3)
_, err = tx.GetSize(ctx, testKey3)
require.ErrorIs(t, err, ds.ErrNotFound)

err = tx.Put(ctx, testKey3, testValue3)
require.NoError(t, err)

err = tx2.Put(ctx, testKey3, testValue4)
Expand All @@ -724,7 +729,7 @@ func TestTxnWithConflict(t *testing.T) {
require.ErrorIs(t, err, ErrTxnConflict)
}

func TestTxnWithConflictAfterDelete(t *testing.T) {
func TestTxnWithNoConflictAfterDelete(t *testing.T) {
ctx := context.Background()
s := newLoadedDatastore(ctx)
defer func() {
Expand All @@ -746,7 +751,7 @@ func TestTxnWithConflictAfterDelete(t *testing.T) {
require.NoError(t, err)

err = tx.Commit(ctx)
require.ErrorIs(t, err, ErrTxnConflict)
require.NoError(t, err)
}

func TestTxnWithConflictAfterGet(t *testing.T) {
Expand Down
17 changes: 4 additions & 13 deletions datastore/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,7 @@ func TestShimTxnStoreClose(t *testing.T) {
require.NoError(t, err)
}

// This test documents https://github.com/sourcenetwork/defradb/issues/2673
func TestMemoryStoreTxn_TwoTransactionsWithPutConflict_ShouldErrorWithConflict(t *testing.T) {
func TestMemoryStoreTxn_TwoTransactionsWithPutConflict_ShouldSucceed(t *testing.T) {
ctx := context.Background()
rootstore := memory.NewDatastore(ctx)

Expand All @@ -223,7 +222,7 @@ func TestMemoryStoreTxn_TwoTransactionsWithPutConflict_ShouldErrorWithConflict(t
require.NoError(t, err)

err = txn1.Commit(ctx)
require.ErrorIs(t, err, badger.ErrConflict)
require.NoError(t, err)
}

func TestMemoryStoreTxn_TwoTransactionsWithGetPutConflict_ShouldErrorWithConflict(t *testing.T) {
Expand Down Expand Up @@ -284,8 +283,7 @@ func TestMemoryStoreTxn_TwoTransactionsWithHasPutConflict_ShouldErrorWithConflic
require.ErrorIs(t, err, badger.ErrConflict)
}

// This test documents https://github.com/sourcenetwork/defradb/issues/2673
func TestBadgerMemoryStoreTxn_TwoTransactionsWithPutConflict_ShouldErrorWithConflict(t *testing.T) {
func TestBadgerMemoryStoreTxn_TwoTransactionsWithPutConflict_ShouldSucceed(t *testing.T) {
ctx := context.Background()
opts := badgerds.Options{Options: badger.DefaultOptions("").WithInMemory(true)}
rootstore, err := badgerds.NewDatastore("", &opts)
Expand All @@ -308,9 +306,6 @@ func TestBadgerMemoryStoreTxn_TwoTransactionsWithPutConflict_ShouldErrorWithConf
require.NoError(t, err)

err = txn1.Commit(ctx)
// We are expecting this to fail because of the conflict but badger does not return an error.
// Conflicts in badger only occurs when the value of a key was changed between the time you read and you rewrite it.
// require.ErrorIs(t, err, badger.ErrConflict)
require.NoError(t, err)
}

Expand Down Expand Up @@ -376,8 +371,7 @@ func TestBadgerMemoryStoreTxn_TwoTransactionsWithHasPutConflict_ShouldErrorWithC
require.ErrorIs(t, err, badger.ErrConflict)
}

// This test documents https://github.com/sourcenetwork/defradb/issues/2673
func TestBadgerFileStoreTxn_TwoTransactionsWithPutConflict_ShouldErrorWithConflict(t *testing.T) {
func TestBadgerFileStoreTxn_TwoTransactionsWithPutConflict_ShouldSucceed(t *testing.T) {
ctx := context.Background()
opts := badgerds.Options{Options: badger.DefaultOptions("")}
rootstore, err := badgerds.NewDatastore(t.TempDir(), &opts)
Expand All @@ -400,9 +394,6 @@ func TestBadgerFileStoreTxn_TwoTransactionsWithPutConflict_ShouldErrorWithConfli
require.NoError(t, err)

err = txn1.Commit(ctx)
// We are expecting this to fail because of the conflict but badger does not return an error.
// Conflicts in badger only occurs when the value of a key was changed between the time you read and you rewrite it.
// require.ErrorIs(t, err, badger.ErrConflict)
require.NoError(t, err)
}

Expand Down
169 changes: 91 additions & 78 deletions internal/db/collection_define.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,113 +25,126 @@ import (
"github.com/sourcenetwork/defradb/internal/db/description"
)

func (db *db) createCollection(
func (db *db) createCollections(
ctx context.Context,
def client.CollectionDefinition,
newDefinitions []client.CollectionDefinition,
) (client.Collection, error) {
schema := def.Schema
desc := def.Description
txn := mustGetContextTxn(ctx)

if desc.Name.HasValue() {
exists, err := description.HasCollectionByName(ctx, txn, desc.Name.Value())
if err != nil {
return nil, err
}
if exists {
return nil, ErrCollectionAlreadyExists
}
}
) ([]client.CollectionDefinition, error) {
returnDescriptions := make([]client.CollectionDefinition, len(newDefinitions))

existingDefinitions, err := db.getAllActiveDefinitions(ctx)
if err != nil {
return nil, err
}

schemaByName := map[string]client.SchemaDescription{}
for _, existingDefinition := range existingDefinitions {
schemaByName[existingDefinition.Schema.Name] = existingDefinition.Schema
}
for _, newDefinition := range newDefinitions {
schemaByName[newDefinition.Schema.Name] = newDefinition.Schema
}
txn := mustGetContextTxn(ctx)

_, err = validateUpdateSchemaFields(schemaByName, client.SchemaDescription{}, schema)
if err != nil {
return nil, err
}
for i, def := range newDefinitions {
schemaByName := map[string]client.SchemaDescription{}
for _, existingDefinition := range existingDefinitions {
schemaByName[existingDefinition.Schema.Name] = existingDefinition.Schema
}
for _, newDefinition := range newDefinitions {
schemaByName[newDefinition.Schema.Name] = newDefinition.Schema
}

definitionsByName := map[string]client.CollectionDefinition{}
for _, existingDefinition := range existingDefinitions {
definitionsByName[existingDefinition.GetName()] = existingDefinition
}
for _, newDefinition := range newDefinitions {
definitionsByName[newDefinition.GetName()] = newDefinition
}
err = db.validateNewCollection(def, definitionsByName)
if err != nil {
return nil, err
schema, err := description.CreateSchemaVersion(ctx, txn, def.Schema)
if err != nil {
return nil, err
}
newDefinitions[i].Description.SchemaVersionID = schema.VersionID
newDefinitions[i].Schema = schema
}

colSeq, err := db.getSequence(ctx, core.CollectionIDSequenceKey{})
if err != nil {
return nil, err
}
colID, err := colSeq.next(ctx)
if err != nil {
return nil, err
}
for i, def := range newDefinitions {
if len(def.Description.Fields) == 0 {
// This is a schema-only definition, we should not create a collection for it
continue
}

fieldSeq, err := db.getSequence(ctx, core.NewFieldIDSequenceKey(uint32(colID)))
if err != nil {
return nil, err
}
colSeq, err := db.getSequence(ctx, core.CollectionIDSequenceKey{})
if err != nil {
return nil, err
}
colID, err := colSeq.next(ctx)
if err != nil {
return nil, err
}

desc.ID = uint32(colID)
desc.RootID = desc.ID
fieldSeq, err := db.getSequence(ctx, core.NewFieldIDSequenceKey(uint32(colID)))
if err != nil {
return nil, err
}

schema, err = description.CreateSchemaVersion(ctx, txn, schema)
if err != nil {
return nil, err
}
desc.SchemaVersionID = schema.VersionID
for _, localField := range desc.Fields {
var fieldID uint64
if localField.Name == request.DocIDFieldName {
// There is no hard technical requirement for this, we just think it looks nicer
// if the doc id is at the zero index. It makes it look a little nicer in commit
// queries too.
fieldID = 0
} else {
fieldID, err = fieldSeq.next(ctx)
if err != nil {
return nil, err
newDefinitions[i].Description.ID = uint32(colID)
newDefinitions[i].Description.RootID = newDefinitions[i].Description.ID

for _, localField := range def.Description.Fields {
var fieldID uint64
if localField.Name == request.DocIDFieldName {
// There is no hard technical requirement for this, we just think it looks nicer
// if the doc id is at the zero index. It makes it look a little nicer in commit
// queries too.
fieldID = 0
} else {
fieldID, err = fieldSeq.next(ctx)
if err != nil {
return nil, err
}
}
}

for i := range desc.Fields {
if desc.Fields[i].Name == localField.Name {
desc.Fields[i].ID = client.FieldID(fieldID)
break
for j := range def.Description.Fields {
if def.Description.Fields[j].Name == localField.Name {
newDefinitions[i].Description.Fields[j].ID = client.FieldID(fieldID)
break
}
}
}
}

desc, err = description.SaveCollection(ctx, txn, desc)
err = db.validateNewCollection(
ctx,
append(
append(
[]client.CollectionDefinition{},
newDefinitions...,
),
existingDefinitions...,
),
existingDefinitions,
)
if err != nil {
return nil, err
}

col := db.newCollection(desc, schema)
for _, def := range newDefinitions {
if len(def.Description.Fields) == 0 {
// This is a schema-only definition, we should not create a collection for it
returnDescriptions = append(returnDescriptions, def)
continue
}

desc, err := description.SaveCollection(ctx, txn, def.Description)
if err != nil {
return nil, err
}

col := db.newCollection(desc, def.Schema)

for _, index := range desc.Indexes {
if _, err := col.createIndex(ctx, index); err != nil {
return nil, err
}
}

for _, index := range desc.Indexes {
if _, err := col.createIndex(ctx, index); err != nil {
result, err := db.getCollectionByID(ctx, desc.ID)
if err != nil {
return nil, err
}

returnDescriptions = append(returnDescriptions, result.Definition())
}

return db.getCollectionByID(ctx, desc.ID)
return returnDescriptions, nil
}

func (db *db) patchCollection(
Expand Down Expand Up @@ -171,7 +184,7 @@ func (db *db) patchCollection(
return err
}

err = db.validateCollectionChanges(existingColsByID, newColsByID)
err = db.validateCollectionChanges(ctx, cols, newColsByID)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit d9bbd87

Please sign in to comment.