Skip to content

Commit

Permalink
add async versions of txn callbacks
Browse files Browse the repository at this point in the history
  • Loading branch information
nasdf committed Jun 10, 2024
1 parent 05a0932 commit 5ccea09
Show file tree
Hide file tree
Showing 8 changed files with 296 additions and 85 deletions.
31 changes: 7 additions & 24 deletions datastore/concurrent_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ import (
"sync"

ds "github.com/ipfs/go-datastore"

"github.com/sourcenetwork/defradb/datastore/iterable"
)

type concurrentTxn struct {
Expand All @@ -32,31 +30,16 @@ type concurrentTxn struct {

// NewConcurrentTxnFrom creates a new Txn from rootstore that supports concurrent API calls
func NewConcurrentTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, id uint64, readonly bool) (Txn, error) {
var rootTxn ds.Txn
var err error

// check if our datastore natively supports iterable transaction, transactions or batching
if iterableTxnStore, ok := rootstore.(iterable.IterableTxnDatastore); ok {
rootTxn, err = iterableTxnStore.NewIterableTransaction(ctx, readonly)
if err != nil {
return nil, err
}
} else {
rootTxn, err = rootstore.NewTransaction(ctx, readonly)
if err != nil {
return nil, err
}
rootTxn, err := newTxnFrom(ctx, rootstore, readonly)
if err != nil {
return nil, err
}

rootConcurentTxn := &concurrentTxn{Txn: rootTxn}
multistore := MultiStoreFrom(rootConcurentTxn)
return &txn{
rootConcurentTxn,
multistore,
id,
[]func(){},
[]func(){},
[]func(){},
t: rootConcurentTxn,
MultiStore: multistore,
id: id,
}, nil
}

Expand Down Expand Up @@ -90,7 +73,7 @@ func (t *concurrentTxn) Put(ctx context.Context, key ds.Key, value []byte) error

// Sync executes the transaction.
func (t *concurrentTxn) Sync(ctx context.Context, prefix ds.Key) error {
return t.Txn.Commit(ctx)
return t.Commit(ctx)
}

// Close discards the transaction.
Expand Down
99 changes: 99 additions & 0 deletions datastore/mocks/txn.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

125 changes: 68 additions & 57 deletions datastore/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,105 +43,116 @@ type Txn interface {

// OnDiscard registers a function to be called when the transaction is discarded.
OnDiscard(fn func())

// OnSuccessAsync registers a function to be called asynchronously when the transaction is committed.
OnSuccessAsync(fn func())

// OnErrorAsync registers a function to be called asynchronously when the transaction is rolled back.
OnErrorAsync(fn func())

// OnDiscardAsync registers a function to be called asynchronously when the transaction is discarded.
OnDiscardAsync(fn func())
}

type txn struct {
t ds.Txn
MultiStore

t ds.Txn
id uint64

successFns []func()
errorFns []func()
discardFns []func()

successAsyncFns []func()
errorAsyncFns []func()
discardAsyncFns []func()
}

var _ Txn = (*txn)(nil)

// NewTxnFrom returns a new Txn from the rootstore.
func NewTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, id uint64, readonly bool) (Txn, error) {
func newTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, readonly bool) (ds.Txn, error) {
// check if our datastore natively supports iterable transaction, transactions or batching
if iterableTxnStore, ok := rootstore.(iterable.IterableTxnDatastore); ok {
rootTxn, err := iterableTxnStore.NewIterableTransaction(ctx, readonly)
if err != nil {
return nil, err
}
multistore := MultiStoreFrom(ShimTxnStore{rootTxn})
return &txn{
rootTxn,
multistore,
id,
[]func(){},
[]func(){},
[]func(){},
}, nil
switch t := rootstore.(type) {
case iterable.IterableTxnDatastore:
return t.NewIterableTransaction(ctx, readonly)

default:
return rootstore.NewTransaction(ctx, readonly)
}
}

rootTxn, err := rootstore.NewTransaction(ctx, readonly)
// NewTxnFrom returns a new Txn from the rootstore.
func NewTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, id uint64, readonly bool) (Txn, error) {
rootTxn, err := newTxnFrom(ctx, rootstore, readonly)
if err != nil {
return nil, err
}

multistore := MultiStoreFrom(ShimTxnStore{rootTxn})
return &txn{
rootTxn,
multistore,
id,
[]func(){},
[]func(){},
[]func(){},
t: rootTxn,
MultiStore: multistore,
id: id,
}, nil
}

// ID returns the unique immutable identifier for this transaction.
func (t *txn) ID() uint64 {
return t.id
}

// Commit finalizes a transaction, attempting to commit it to the Datastore.
func (t *txn) Commit(ctx context.Context) error {
if err := t.t.Commit(ctx); err != nil {
runFns(t.errorFns)
return err
var fns []func()
var asyncFns []func()

err := t.t.Commit(ctx)
if err != nil {
fns = t.errorFns
asyncFns = t.errorAsyncFns
} else {
fns = t.successFns
asyncFns = t.successAsyncFns
}
runFns(t.successFns)
return nil

for _, fn := range fns {
fn()
}
for _, fn := range asyncFns {
go fn()
}
return err
}

// Discard throws away changes recorded in a transaction without committing.
func (t *txn) Discard(ctx context.Context) {
t.t.Discard(ctx)
runFns(t.discardFns)
for _, fn := range t.discardFns {
fn()
}
for _, fn := range t.discardAsyncFns {
go fn()
}
}

// OnSuccess registers a function to be called when the transaction is committed.
func (txn *txn) OnSuccess(fn func()) {
if fn == nil {
return
}
txn.successFns = append(txn.successFns, fn)
func (t *txn) OnSuccess(fn func()) {
t.successFns = append(t.successFns, fn)
}

// OnError registers a function to be called when the transaction is rolled back.
func (txn *txn) OnError(fn func()) {
if fn == nil {
return
}
txn.errorFns = append(txn.errorFns, fn)
func (t *txn) OnError(fn func()) {
t.errorFns = append(t.errorFns, fn)
}

// OnDiscard registers a function to be called when the transaction is discarded.
func (txn *txn) OnDiscard(fn func()) {
if fn == nil {
return
}
txn.discardFns = append(txn.discardFns, fn)
func (t *txn) OnDiscard(fn func()) {
t.discardFns = append(t.discardFns, fn)
}

func runFns(fns []func()) {
for _, fn := range fns {
fn()
}
func (t *txn) OnSuccessAsync(fn func()) {
t.successAsyncFns = append(t.successAsyncFns, fn)
}

func (t *txn) OnErrorAsync(fn func()) {
t.errorAsyncFns = append(t.errorAsyncFns, fn)
}

func (t *txn) OnDiscardAsync(fn func()) {
t.discardAsyncFns = append(t.discardAsyncFns, fn)
}

// Shim to make ds.Txn support ds.Datastore.
Expand Down
Loading

0 comments on commit 5ccea09

Please sign in to comment.