From 5ccea09fb5203c7841a67d3d243803351676cc8d Mon Sep 17 00:00:00 2001 From: Keenan Nemetz Date: Mon, 10 Jun 2024 14:57:51 -0700 Subject: [PATCH] add async versions of txn callbacks --- datastore/concurrent_txn.go | 31 ++----- datastore/mocks/txn.go | 99 +++++++++++++++++++++++ datastore/txn.go | 125 ++++++++++++++++------------- datastore/txn_test.go | 87 +++++++++++++++++++- http/client_tx.go | 12 +++ tests/bench/query/planner/utils.go | 3 + tests/clients/cli/wrapper_tx.go | 12 +++ tests/clients/http/wrapper_tx.go | 12 +++ 8 files changed, 296 insertions(+), 85 deletions(-) diff --git a/datastore/concurrent_txn.go b/datastore/concurrent_txn.go index f46637e99d..409a26223c 100644 --- a/datastore/concurrent_txn.go +++ b/datastore/concurrent_txn.go @@ -15,8 +15,6 @@ import ( "sync" ds "github.com/ipfs/go-datastore" - - "github.com/sourcenetwork/defradb/datastore/iterable" ) type concurrentTxn struct { @@ -32,31 +30,16 @@ type concurrentTxn struct { // NewConcurrentTxnFrom creates a new Txn from rootstore that supports concurrent API calls func NewConcurrentTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, id uint64, readonly bool) (Txn, error) { - var rootTxn ds.Txn - var err error - - // check if our datastore natively supports iterable transaction, transactions or batching - if iterableTxnStore, ok := rootstore.(iterable.IterableTxnDatastore); ok { - rootTxn, err = iterableTxnStore.NewIterableTransaction(ctx, readonly) - if err != nil { - return nil, err - } - } else { - rootTxn, err = rootstore.NewTransaction(ctx, readonly) - if err != nil { - return nil, err - } + rootTxn, err := newTxnFrom(ctx, rootstore, readonly) + if err != nil { + return nil, err } - rootConcurentTxn := &concurrentTxn{Txn: rootTxn} multistore := MultiStoreFrom(rootConcurentTxn) return &txn{ - rootConcurentTxn, - multistore, - id, - []func(){}, - []func(){}, - []func(){}, + t: rootConcurentTxn, + MultiStore: multistore, + id: id, }, nil } @@ -90,7 +73,7 @@ func (t *concurrentTxn) Put(ctx context.Context, key ds.Key, value []byte) error // Sync executes the transaction. func (t *concurrentTxn) Sync(ctx context.Context, prefix ds.Key) error { - return t.Txn.Commit(ctx) + return t.Commit(ctx) } // Close discards the transaction. diff --git a/datastore/mocks/txn.go b/datastore/mocks/txn.go index 0dc71cb46f..711464dc12 100644 --- a/datastore/mocks/txn.go +++ b/datastore/mocks/txn.go @@ -300,6 +300,39 @@ func (_c *Txn_OnDiscard_Call) RunAndReturn(run func(func())) *Txn_OnDiscard_Call return _c } +// OnDiscardAsync provides a mock function with given fields: fn +func (_m *Txn) OnDiscardAsync(fn func()) { + _m.Called(fn) +} + +// Txn_OnDiscardAsync_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OnDiscardAsync' +type Txn_OnDiscardAsync_Call struct { + *mock.Call +} + +// OnDiscardAsync is a helper method to define mock.On call +// - fn func() +func (_e *Txn_Expecter) OnDiscardAsync(fn interface{}) *Txn_OnDiscardAsync_Call { + return &Txn_OnDiscardAsync_Call{Call: _e.mock.On("OnDiscardAsync", fn)} +} + +func (_c *Txn_OnDiscardAsync_Call) Run(run func(fn func())) *Txn_OnDiscardAsync_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(func())) + }) + return _c +} + +func (_c *Txn_OnDiscardAsync_Call) Return() *Txn_OnDiscardAsync_Call { + _c.Call.Return() + return _c +} + +func (_c *Txn_OnDiscardAsync_Call) RunAndReturn(run func(func())) *Txn_OnDiscardAsync_Call { + _c.Call.Return(run) + return _c +} + // OnError provides a mock function with given fields: fn func (_m *Txn) OnError(fn func()) { _m.Called(fn) @@ -333,6 +366,39 @@ func (_c *Txn_OnError_Call) RunAndReturn(run func(func())) *Txn_OnError_Call { return _c } +// OnErrorAsync provides a mock function with given fields: fn +func (_m *Txn) OnErrorAsync(fn func()) { + _m.Called(fn) +} + +// Txn_OnErrorAsync_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OnErrorAsync' +type Txn_OnErrorAsync_Call struct { + *mock.Call +} + +// OnErrorAsync is a helper method to define mock.On call +// - fn func() +func (_e *Txn_Expecter) OnErrorAsync(fn interface{}) *Txn_OnErrorAsync_Call { + return &Txn_OnErrorAsync_Call{Call: _e.mock.On("OnErrorAsync", fn)} +} + +func (_c *Txn_OnErrorAsync_Call) Run(run func(fn func())) *Txn_OnErrorAsync_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(func())) + }) + return _c +} + +func (_c *Txn_OnErrorAsync_Call) Return() *Txn_OnErrorAsync_Call { + _c.Call.Return() + return _c +} + +func (_c *Txn_OnErrorAsync_Call) RunAndReturn(run func(func())) *Txn_OnErrorAsync_Call { + _c.Call.Return(run) + return _c +} + // OnSuccess provides a mock function with given fields: fn func (_m *Txn) OnSuccess(fn func()) { _m.Called(fn) @@ -366,6 +432,39 @@ func (_c *Txn_OnSuccess_Call) RunAndReturn(run func(func())) *Txn_OnSuccess_Call return _c } +// OnSuccessAsync provides a mock function with given fields: fn +func (_m *Txn) OnSuccessAsync(fn func()) { + _m.Called(fn) +} + +// Txn_OnSuccessAsync_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'OnSuccessAsync' +type Txn_OnSuccessAsync_Call struct { + *mock.Call +} + +// OnSuccessAsync is a helper method to define mock.On call +// - fn func() +func (_e *Txn_Expecter) OnSuccessAsync(fn interface{}) *Txn_OnSuccessAsync_Call { + return &Txn_OnSuccessAsync_Call{Call: _e.mock.On("OnSuccessAsync", fn)} +} + +func (_c *Txn_OnSuccessAsync_Call) Run(run func(fn func())) *Txn_OnSuccessAsync_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(func())) + }) + return _c +} + +func (_c *Txn_OnSuccessAsync_Call) Return() *Txn_OnSuccessAsync_Call { + _c.Call.Return() + return _c +} + +func (_c *Txn_OnSuccessAsync_Call) RunAndReturn(run func(func())) *Txn_OnSuccessAsync_Call { + _c.Call.Return(run) + return _c +} + // Peerstore provides a mock function with given fields: func (_m *Txn) Peerstore() datastore.DSBatching { ret := _m.Called() diff --git a/datastore/txn.go b/datastore/txn.go index acc7a53193..da052123c3 100644 --- a/datastore/txn.go +++ b/datastore/txn.go @@ -43,105 +43,116 @@ type Txn interface { // OnDiscard registers a function to be called when the transaction is discarded. OnDiscard(fn func()) + + // OnSuccessAsync registers a function to be called asynchronously when the transaction is committed. + OnSuccessAsync(fn func()) + + // OnErrorAsync registers a function to be called asynchronously when the transaction is rolled back. + OnErrorAsync(fn func()) + + // OnDiscardAsync registers a function to be called asynchronously when the transaction is discarded. + OnDiscardAsync(fn func()) } type txn struct { - t ds.Txn MultiStore - + t ds.Txn id uint64 successFns []func() errorFns []func() discardFns []func() + + successAsyncFns []func() + errorAsyncFns []func() + discardAsyncFns []func() } var _ Txn = (*txn)(nil) -// NewTxnFrom returns a new Txn from the rootstore. -func NewTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, id uint64, readonly bool) (Txn, error) { +func newTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, readonly bool) (ds.Txn, error) { // check if our datastore natively supports iterable transaction, transactions or batching - if iterableTxnStore, ok := rootstore.(iterable.IterableTxnDatastore); ok { - rootTxn, err := iterableTxnStore.NewIterableTransaction(ctx, readonly) - if err != nil { - return nil, err - } - multistore := MultiStoreFrom(ShimTxnStore{rootTxn}) - return &txn{ - rootTxn, - multistore, - id, - []func(){}, - []func(){}, - []func(){}, - }, nil + switch t := rootstore.(type) { + case iterable.IterableTxnDatastore: + return t.NewIterableTransaction(ctx, readonly) + + default: + return rootstore.NewTransaction(ctx, readonly) } +} - rootTxn, err := rootstore.NewTransaction(ctx, readonly) +// NewTxnFrom returns a new Txn from the rootstore. +func NewTxnFrom(ctx context.Context, rootstore ds.TxnDatastore, id uint64, readonly bool) (Txn, error) { + rootTxn, err := newTxnFrom(ctx, rootstore, readonly) if err != nil { return nil, err } - multistore := MultiStoreFrom(ShimTxnStore{rootTxn}) return &txn{ - rootTxn, - multistore, - id, - []func(){}, - []func(){}, - []func(){}, + t: rootTxn, + MultiStore: multistore, + id: id, }, nil } -// ID returns the unique immutable identifier for this transaction. func (t *txn) ID() uint64 { return t.id } -// Commit finalizes a transaction, attempting to commit it to the Datastore. func (t *txn) Commit(ctx context.Context) error { - if err := t.t.Commit(ctx); err != nil { - runFns(t.errorFns) - return err + var fns []func() + var asyncFns []func() + + err := t.t.Commit(ctx) + if err != nil { + fns = t.errorFns + asyncFns = t.errorAsyncFns + } else { + fns = t.successFns + asyncFns = t.successAsyncFns } - runFns(t.successFns) - return nil + + for _, fn := range fns { + fn() + } + for _, fn := range asyncFns { + go fn() + } + return err } -// Discard throws away changes recorded in a transaction without committing. func (t *txn) Discard(ctx context.Context) { t.t.Discard(ctx) - runFns(t.discardFns) + for _, fn := range t.discardFns { + fn() + } + for _, fn := range t.discardAsyncFns { + go fn() + } } -// OnSuccess registers a function to be called when the transaction is committed. -func (txn *txn) OnSuccess(fn func()) { - if fn == nil { - return - } - txn.successFns = append(txn.successFns, fn) +func (t *txn) OnSuccess(fn func()) { + t.successFns = append(t.successFns, fn) } -// OnError registers a function to be called when the transaction is rolled back. -func (txn *txn) OnError(fn func()) { - if fn == nil { - return - } - txn.errorFns = append(txn.errorFns, fn) +func (t *txn) OnError(fn func()) { + t.errorFns = append(t.errorFns, fn) } -// OnDiscard registers a function to be called when the transaction is discarded. -func (txn *txn) OnDiscard(fn func()) { - if fn == nil { - return - } - txn.discardFns = append(txn.discardFns, fn) +func (t *txn) OnDiscard(fn func()) { + t.discardFns = append(t.discardFns, fn) } -func runFns(fns []func()) { - for _, fn := range fns { - fn() - } +func (t *txn) OnSuccessAsync(fn func()) { + t.successAsyncFns = append(t.successAsyncFns, fn) +} + +func (t *txn) OnErrorAsync(fn func()) { + t.errorAsyncFns = append(t.errorAsyncFns, fn) +} + +func (t *txn) OnDiscardAsync(fn func()) { + t.discardAsyncFns = append(t.discardAsyncFns, fn) } // Shim to make ds.Txn support ds.Datastore. diff --git a/datastore/txn_test.go b/datastore/txn_test.go index 95c2cf7ef0..1a8623600f 100644 --- a/datastore/txn_test.go +++ b/datastore/txn_test.go @@ -12,6 +12,7 @@ package datastore import ( "context" + "sync" "testing" ds "github.com/ipfs/go-datastore" @@ -57,8 +58,6 @@ func TestOnSuccess(t *testing.T) { txn, err := NewTxnFrom(ctx, rootstore, 0, false) require.NoError(t, err) - txn.OnSuccess(nil) - text := "Source" txn.OnSuccess(func() { text += " Inc" @@ -69,7 +68,7 @@ func TestOnSuccess(t *testing.T) { require.Equal(t, text, "Source Inc") } -func TestOnError(t *testing.T) { +func TestOnSuccessAsync(t *testing.T) { ctx := context.Background() opts := badgerds.Options{Options: badger.DefaultOptions("").WithInMemory(true)} rootstore, err := badgerds.NewDatastore("", &opts) @@ -78,7 +77,25 @@ func TestOnError(t *testing.T) { txn, err := NewTxnFrom(ctx, rootstore, 0, false) require.NoError(t, err) - txn.OnError(nil) + var wg sync.WaitGroup + txn.OnSuccessAsync(func() { + wg.Done() + }) + + wg.Add(1) + err = txn.Commit(ctx) + require.NoError(t, err) + wg.Wait() +} + +func TestOnError(t *testing.T) { + ctx := context.Background() + opts := badgerds.Options{Options: badger.DefaultOptions("").WithInMemory(true)} + rootstore, err := badgerds.NewDatastore("", &opts) + require.NoError(t, err) + + txn, err := NewTxnFrom(ctx, rootstore, 0, false) + require.NoError(t, err) text := "Source" txn.OnError(func() { @@ -94,6 +111,68 @@ func TestOnError(t *testing.T) { require.Equal(t, text, "Source Inc") } +func TestOnErrorAsync(t *testing.T) { + ctx := context.Background() + opts := badgerds.Options{Options: badger.DefaultOptions("").WithInMemory(true)} + rootstore, err := badgerds.NewDatastore("", &opts) + require.NoError(t, err) + + txn, err := NewTxnFrom(ctx, rootstore, 0, false) + require.NoError(t, err) + + var wg sync.WaitGroup + txn.OnErrorAsync(func() { + wg.Done() + }) + + rootstore.Close() + require.NoError(t, err) + + wg.Add(1) + err = txn.Commit(ctx) + require.ErrorIs(t, err, badgerds.ErrClosed) + wg.Wait() +} + +func TestOnDiscard(t *testing.T) { + ctx := context.Background() + opts := badgerds.Options{Options: badger.DefaultOptions("").WithInMemory(true)} + rootstore, err := badgerds.NewDatastore("", &opts) + require.NoError(t, err) + + txn, err := NewTxnFrom(ctx, rootstore, 0, false) + require.NoError(t, err) + + text := "Source" + txn.OnDiscard(func() { + text += " Inc" + }) + txn.Discard(ctx) + require.NoError(t, err) + + require.Equal(t, text, "Source Inc") +} + +func TestOnDiscardAsync(t *testing.T) { + ctx := context.Background() + opts := badgerds.Options{Options: badger.DefaultOptions("").WithInMemory(true)} + rootstore, err := badgerds.NewDatastore("", &opts) + require.NoError(t, err) + + txn, err := NewTxnFrom(ctx, rootstore, 0, false) + require.NoError(t, err) + + var wg sync.WaitGroup + txn.OnDiscardAsync(func() { + wg.Done() + }) + + wg.Add(1) + txn.Discard(ctx) + require.NoError(t, err) + wg.Wait() +} + func TestShimTxnStoreSync(t *testing.T) { ctx := context.Background() opts := badgerds.Options{Options: badger.DefaultOptions("").WithInMemory(true)} diff --git a/http/client_tx.go b/http/client_tx.go index f1f2830006..19e5814b51 100644 --- a/http/client_tx.go +++ b/http/client_tx.go @@ -71,6 +71,18 @@ func (c *Transaction) OnDiscard(fn func()) { panic("client side transaction") } +func (c *Transaction) OnSuccessAsync(fn func()) { + panic("client side transaction") +} + +func (c *Transaction) OnErrorAsync(fn func()) { + panic("client side transaction") +} + +func (c *Transaction) OnDiscardAsync(fn func()) { + panic("client side transaction") +} + func (c *Transaction) Rootstore() datastore.DSReaderWriter { panic("client side transaction") } diff --git a/tests/bench/query/planner/utils.go b/tests/bench/query/planner/utils.go index 967f141357..b91b0aa2a3 100644 --- a/tests/bench/query/planner/utils.go +++ b/tests/bench/query/planner/utils.go @@ -143,4 +143,7 @@ func (*dummyTxn) Discard(ctx context.Context) {} func (*dummyTxn) OnSuccess(fn func()) {} func (*dummyTxn) OnError(fn func()) {} func (*dummyTxn) OnDiscard(fn func()) {} +func (*dummyTxn) OnSuccessAsync(fn func()) {} +func (*dummyTxn) OnErrorAsync(fn func()) {} +func (*dummyTxn) OnDiscardAsync(fn func()) {} func (*dummyTxn) ID() uint64 { return 0 } diff --git a/tests/clients/cli/wrapper_tx.go b/tests/clients/cli/wrapper_tx.go index 33bfe43bee..5b5b2c3ea7 100644 --- a/tests/clients/cli/wrapper_tx.go +++ b/tests/clients/cli/wrapper_tx.go @@ -55,6 +55,18 @@ func (w *Transaction) OnDiscard(fn func()) { w.tx.OnDiscard(fn) } +func (w *Transaction) OnSuccessAsync(fn func()) { + w.tx.OnSuccessAsync(fn) +} + +func (w *Transaction) OnErrorAsync(fn func()) { + w.tx.OnErrorAsync(fn) +} + +func (w *Transaction) OnDiscardAsync(fn func()) { + w.tx.OnDiscardAsync(fn) +} + func (w *Transaction) Rootstore() datastore.DSReaderWriter { return w.tx.Rootstore() } diff --git a/tests/clients/http/wrapper_tx.go b/tests/clients/http/wrapper_tx.go index fe63a9ded5..d53d967b3b 100644 --- a/tests/clients/http/wrapper_tx.go +++ b/tests/clients/http/wrapper_tx.go @@ -49,6 +49,18 @@ func (w *TxWrapper) OnDiscard(fn func()) { w.server.OnDiscard(fn) } +func (w *TxWrapper) OnSuccessAsync(fn func()) { + w.server.OnSuccessAsync(fn) +} + +func (w *TxWrapper) OnErrorAsync(fn func()) { + w.server.OnErrorAsync(fn) +} + +func (w *TxWrapper) OnDiscardAsync(fn func()) { + w.server.OnDiscardAsync(fn) +} + func (w *TxWrapper) Rootstore() datastore.DSReaderWriter { return w.server.Rootstore() }