diff --git a/contracts/swap/swap.go b/contracts/swap/swap.go
index fec90fd99a..e8d2bf7926 100644
--- a/contracts/swap/swap.go
+++ b/contracts/swap/swap.go
@@ -22,7 +22,9 @@ package swap
import (
+ "strings"
+ "github.com/ethereum/go-ethereum/accounts/abi"
@@ -37,8 +39,8 @@ type Contract interface {
Withdraw(auth *bind.TransactOpts, amount *big.Int) (*types.Receipt, error)
// Deposit sends a raw transaction to the chequebook, triggering the fallback—depositing amount
Deposit(auth *bind.TransactOpts, amout *big.Int) (*types.Receipt, error)
- // CashChequeBeneficiaryStart sends the transaction to cash a cheque as the beneficiary
- CashChequeBeneficiaryStart(opts *bind.TransactOpts, beneficiary common.Address, cumulativePayout *uint256.Uint256, ownerSig []byte) (*types.Transaction, error)
+ // CashChequeBeneficiaryRequest generates a TxRequest for a CashChequeBeneficiary transaction
+ CashChequeBeneficiaryRequest(beneficiary common.Address, cumulativePayout *uint256.Uint256, ownerSig []byte) (*chain.TxRequest, error)
// CashChequeBeneficiaryResult processes the receipt from a CashChequeBeneficiary transaction
CashChequeBeneficiaryResult(receipt *types.Receipt) *CashChequeResult
// LiquidBalance returns the LiquidBalance (total balance in ERC20-token - total hard deposits in ERC20-token) of the chequebook
@@ -75,19 +77,30 @@ type Params struct {
type simpleContract struct {
instance *contract.ERC20SimpleSwap
+ abi abi.ABI
address common.Address
backend chain.Backend
// InstanceAt creates a new instance of a contract at a specific address.
-// It assumes that there is an existing contract instance at the given address, or an error is returned
+// It assumes that there is an existing contract instance at the given address
// This function is needed to communicate with remote Swap contracts (e.g. sending a cheque)
func InstanceAt(address common.Address, backend chain.Backend) (Contract, error) {
instance, err := contract.NewERC20SimpleSwap(address, backend)
if err != nil {
return nil, err
- c := simpleContract{instance: instance, address: address, backend: backend}
+ contractABI, err := abi.JSON(strings.NewReader(contract.ERC20SimpleSwapABI))
+ if err != nil {
+ return nil, err
+ }
+ c := simpleContract{
+ abi: contractABI,
+ instance: instance,
+ address: address,
+ backend: backend,
+ }
return c, err
@@ -130,15 +143,19 @@ func (s simpleContract) Deposit(auth *bind.TransactOpts, amount *big.Int) (*type
return chain.WaitMined(auth.Context, s.backend, tx.Hash())
-// CashChequeBeneficiaryStart sends the transaction to cash a cheque as the beneficiary
-func (s simpleContract) CashChequeBeneficiaryStart(opts *bind.TransactOpts, beneficiary common.Address, cumulativePayout *uint256.Uint256, ownerSig []byte) (*types.Transaction, error) {
+// CashChequeBeneficiaryRequest generates a TxRequest for a CashChequeBeneficiary transaction
+func (s simpleContract) CashChequeBeneficiaryRequest(beneficiary common.Address, cumulativePayout *uint256.Uint256, ownerSig []byte) (*chain.TxRequest, error) {
payout := cumulativePayout.Value()
- // send a copy of cumulativePayout to instance as it modifies the supplied big int internally
- tx, err := s.instance.CashChequeBeneficiary(opts, beneficiary, big.NewInt(0).Set(&payout), ownerSig)
+ callData, err := s.abi.Pack("cashChequeBeneficiary", beneficiary, big.NewInt(0).Set(&payout), ownerSig)
if err != nil {
return nil, err
- return tx, nil
+ return &chain.TxRequest{
+ To: s.address,
+ Value: big.NewInt(0),
+ Data: callData,
+ }, nil
// CashChequeBeneficiaryResult processes the receipt from a CashChequeBeneficiary transaction
diff --git a/swap/cashout.go b/swap/cashout.go
index 83a74f9fde..5f81d598ce 100644
--- a/swap/cashout.go
+++ b/swap/cashout.go
@@ -22,6 +22,7 @@ import (
+ "github.com/ethereum/go-ethereum/core/types"
contract "github.com/ethersphere/swarm/contracts/swap"
@@ -31,58 +32,117 @@ import (
// CashChequeBeneficiaryTransactionCost is the expected gas cost of a CashChequeBeneficiary transaction
const CashChequeBeneficiaryTransactionCost = 50000
-// CashoutProcessor holds all relevant fields needed for processing cashouts
-type CashoutProcessor struct {
- backend chain.Backend // ethereum backend to use
- privateKey *ecdsa.PrivateKey // private key to use
- Logger Logger
+// CashoutRequestHandlerID is the handlerID used by the CashoutProcessor for CashoutRequests
+const CashoutRequestHandlerID = "CashoutProcessor_CashoutRequest"
// CashoutRequest represents a request for a cashout operation
type CashoutRequest struct {
Cheque Cheque // cheque to be cashed
Destination common.Address // destination for the payout
- Logger Logger
-// ActiveCashout stores the necessary information for a cashout in progess
-type ActiveCashout struct {
- Request CashoutRequest // the request that caused this cashout
- TransactionHash common.Hash // the hash of the current transaction for this request
- Logger Logger
+// CashoutProcessor holds all relevant fields needed for processing cashouts
+type CashoutProcessor struct {
+ backend chain.Backend // ethereum backend to use
+ txScheduler chain.TxScheduler // transaction queue to use
+ cashoutResultHandler CashoutResultHandler
+ logger Logger
+// CashoutResultHandler is an interface which accepts CashChequeResults from a CashoutProcessor
+type CashoutResultHandler interface {
+ // Called by the CashoutProcessor when a CashoutRequest was successfully executed
+ // It will be called again if an error is returned
+ HandleCashoutResult(request *CashoutRequest, result *contract.CashChequeResult, receipt *types.Receipt) error
// newCashoutProcessor creates a new instance of CashoutProcessor
-func newCashoutProcessor(backend chain.Backend, privateKey *ecdsa.PrivateKey) *CashoutProcessor {
- return &CashoutProcessor{
- backend: backend,
- privateKey: privateKey,
+func newCashoutProcessor(txScheduler chain.TxScheduler, backend chain.Backend, privateKey *ecdsa.PrivateKey, cashoutResultHandler CashoutResultHandler, logger Logger) *CashoutProcessor {
+ c := &CashoutProcessor{
+ backend: backend,
+ txScheduler: txScheduler,
+ cashoutResultHandler: cashoutResultHandler,
+ logger: logger,
-// cashCheque tries to cash the cheque specified in the request
-// after the transaction is sent it waits on its success
-func (c *CashoutProcessor) cashCheque(ctx context.Context, request *CashoutRequest) error {
- cheque := request.Cheque
- opts := bind.NewKeyedTransactor(c.privateKey)
- opts.Context = ctx
+ txScheduler.SetHandlers(CashoutRequestHandlerID, &chain.TxRequestHandlers{
+ NotifyReceipt: func(ctx context.Context, id uint64, notification *chain.TxReceiptNotification) error {
+ var request *CashoutRequest
+ err := c.txScheduler.GetExtraData(id, &request)
+ if err != nil {
+ return err
+ }
+ otherSwap, err := contract.InstanceAt(request.Cheque.Contract, c.backend)
+ if err != nil {
+ return err
+ }
+ receipt := ¬ification.Receipt
+ if receipt.Status == 0 {
+ c.logger.Error(CashChequeAction, "cheque cashing transaction reverted", "tx", receipt.TxHash)
+ return nil
+ }
+ result := otherSwap.CashChequeBeneficiaryResult(receipt)
+ return c.cashoutResultHandler.HandleCashoutResult(request, result, receipt)
+ },
+ NotifyPending: func(ctx context.Context, id uint64, notification *chain.TxPendingNotification) error {
+ c.logger.Debug(CashChequeAction, "cheque cashing transaction sent", "hash", notification.Transaction.Hash())
+ return nil
+ },
+ NotifyCancelled: func(ctx context.Context, id uint64, notification *chain.TxCancelledNotification) error {
+ c.logger.Warn(CashChequeAction, "cheque cashing transaction cancelled", "reason", notification.Reason)
+ return nil
+ },
+ NotifyStatusUnknown: func(ctx context.Context, id uint64, notification *chain.TxStatusUnknownNotification) error {
+ c.logger.Error(CashChequeAction, "cheque cashing transaction status unknown", "reason", notification.Reason)
+ return nil
+ },
+ })
+ return c
- otherSwap, err := contract.InstanceAt(cheque.Contract, c.backend)
+// submitCheque submits a cheque for cashout
+// the cheque might not be cashed if it is not deemed profitable
+func (c *CashoutProcessor) submitCheque(ctx context.Context, request *CashoutRequest) {
+ expectedPayout, transactionCosts, err := c.estimatePayout(ctx, &request.Cheque)
if err != nil {
- return err
+ c.logger.Error(CashChequeAction, "could not estimate payout", "error", err)
+ return
- tx, err := otherSwap.CashChequeBeneficiaryStart(opts, request.Destination, cheque.CumulativePayout, cheque.Signature)
+ costsMultiplier := uint256.FromUint64(2)
+ costThreshold, err := uint256.New().Mul(transactionCosts, costsMultiplier)
if err != nil {
- return err
+ c.logger.Error(CashChequeAction, "overflow in transaction fee", "error", err)
+ return
- // this blocks until the cashout has been successfully processed
- return c.waitForAndProcessActiveCashout(&ActiveCashout{
- Request: *request,
- TransactionHash: tx.Hash(),
- Logger: request.Logger,
- })
+ // do a payout transaction if we get more than 2 times the gas costs
+ if expectedPayout.Cmp(costThreshold) == 1 {
+ c.logger.Info(CashChequeAction, "queueing cashout", "cheque", &request.Cheque)
+ cheque := request.Cheque
+ otherSwap, err := contract.InstanceAt(cheque.Contract, c.backend)
+ if err != nil {
+ c.logger.Error(CashChequeAction, "could not get swap instance", "error", err)
+ return
+ }
+ txRequest, err := otherSwap.CashChequeBeneficiaryRequest(cheque.Beneficiary, cheque.CumulativePayout, cheque.Signature)
+ if err != nil {
+ metrics.GetOrRegisterCounter("swap/cheques/cashed/errors", nil).Inc(1)
+ c.logger.Error(CashChequeAction, "cashing cheque:", "error", err)
+ return
+ }
+ _, err = c.txScheduler.ScheduleRequest(CashoutRequestHandlerID, *txRequest, request)
+ if err != nil {
+ metrics.GetOrRegisterCounter("swap/cheques/cashed/errors", nil).Inc(1)
+ c.logger.Error(CashChequeAction, "cashing cheque:", "error", err)
+ }
+ }
// estimatePayout estimates the payout for a given cheque as well as the transaction cost
@@ -128,31 +188,3 @@ func (c *CashoutProcessor) estimatePayout(ctx context.Context, cheque *Cheque) (
return expectedPayout, transactionCosts, nil
-// waitForAndProcessActiveCashout waits for activeCashout to complete
-func (c *CashoutProcessor) waitForAndProcessActiveCashout(activeCashout *ActiveCashout) error {
- ctx, cancel := context.WithTimeout(context.Background(), DefaultTransactionTimeout)
- defer cancel()
- receipt, err := chain.WaitMined(ctx, c.backend, activeCashout.TransactionHash)
- if err != nil {
- return err
- }
- otherSwap, err := contract.InstanceAt(activeCashout.Request.Cheque.Contract, c.backend)
- if err != nil {
- return err
- }
- result := otherSwap.CashChequeBeneficiaryResult(receipt)
- metrics.GetOrRegisterCounter("swap/cheques/cashed/honey", nil).Inc(result.TotalPayout.Int64())
- if result.Bounced {
- metrics.GetOrRegisterCounter("swap/cheques/cashed/bounced", nil).Inc(1)
- activeCashout.Logger.Warn(CashChequeAction, "cheque bounced", "tx", receipt.TxHash)
- }
- activeCashout.Logger.Info(CashChequeAction, "cheque cashed", "honey", activeCashout.Request.Cheque.Honey)
- return nil
diff --git a/swap/cashout_test.go b/swap/cashout_test.go
index e6370472c9..e2d4a7a11b 100644
--- a/swap/cashout_test.go
+++ b/swap/cashout_test.go
@@ -19,10 +19,11 @@ package swap
import (
+ "time"
- "github.com/ethereum/go-ethereum/log"
+ "github.com/ethersphere/swarm/state"
@@ -34,11 +35,13 @@ import (
// afterwards it attempts to cash-in a bouncing cheque
func TestContractIntegration(t *testing.T) {
backend := newTestBackend(t)
- reset := setupContractTest()
- defer reset()
+ defer backend.Close()
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+ defer cancel()
payout := uint256.FromUint64(42)
- chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout)
+ chequebook, err := testDeployWithPrivateKey(ctx, backend, ownerKey, ownerAddress, payout)
if err != nil {
@@ -50,20 +53,43 @@ func TestContractIntegration(t *testing.T) {
opts := bind.NewKeyedTransactor(beneficiaryKey)
- tx, err := chequebook.CashChequeBeneficiaryStart(opts, beneficiaryAddress, payout, cheque.Signature)
+ txRequest, err := chequebook.CashChequeBeneficiaryRequest(beneficiaryAddress, payout, cheque.Signature)
if err != nil {
- receipt, err := chain.WaitMined(nil, backend, tx.Hash())
+ txRequest.GasLimit, err = txRequest.EstimateGas(ctx, backend, opts.From)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if err != nil {
+ t.Fatal(err)
+ }
+ nonce, err := backend.PendingNonceAt(ctx, opts.From)
+ if err != nil {
+ t.Fatal(err)
+ }
+ tx, err := txRequest.ToSignedTx(nonce, opts)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = backend.SendTransaction(ctx, tx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ receipt, err := chain.WaitMined(ctx, backend, tx.Hash())
if err != nil {
- cashResult := chequebook.CashChequeBeneficiaryResult(receipt)
if receipt.Status != 1 {
t.Fatalf("Bad status %d", receipt.Status)
+ cashResult := chequebook.CashChequeBeneficiaryResult(receipt)
if cashResult.Bounced {
t.Fatal("cashing bounced")
@@ -81,7 +107,6 @@ func TestContractIntegration(t *testing.T) {
if !cheque.CumulativePayout.Equals(paidOut) {
t.Fatalf("Wrong cumulative payout %v", paidOut)
- log.Debug("cheques result", "result", result)
// create a cheque that will bounce
_, err = payout.Add(payout, uint256.FromUint64(10000*RetrieveRequestPrice))
@@ -94,7 +119,27 @@ func TestContractIntegration(t *testing.T) {
- tx, err = chequebook.CashChequeBeneficiaryStart(opts, beneficiaryAddress, bouncingCheque.CumulativePayout, bouncingCheque.Signature)
+ txRequest, err = chequebook.CashChequeBeneficiaryRequest(beneficiaryAddress, bouncingCheque.CumulativePayout, bouncingCheque.Signature)
+ if err != nil {
+ t.Fatal(err)
+ }
+ txRequest.GasLimit, err = txRequest.EstimateGas(ctx, backend, opts.From)
+ if err != nil {
+ t.Fatal(err)
+ }
+ nonce, err = backend.PendingNonceAt(ctx, opts.From)
+ if err != nil {
+ t.Fatal(err)
+ }
+ tx, err = txRequest.ToSignedTx(nonce, opts)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = backend.SendTransaction(ctx, tx)
if err != nil {
@@ -111,17 +156,26 @@ func TestContractIntegration(t *testing.T) {
if !cashResult.Bounced {
t.Fatal("cheque did not bounce")
-// TestCashCheque creates a valid cheque and feeds it to cashoutProcessor.cashCheque
+// TestCashCheque creates a valid cheque and feeds it to cashoutProcessor.submitCheque
func TestCashCheque(t *testing.T) {
backend := newTestBackend(t)
- reset := setupContractTest()
- defer reset()
+ defer backend.Close()
- cashoutProcessor := newCashoutProcessor(backend, ownerKey)
- payout := uint256.FromUint64(42)
+ store := state.NewInmemoryStore()
+ defer store.Close()
+ transactionQueue := chain.NewTxQueue(store, "queue", &chain.DefaultTxSchedulerBackend{
+ Backend: backend,
+ }, ownerKey)
+ transactionQueue.Start()
+ defer transactionQueue.Stop()
+ cashoutHandler := newTestCashoutResultHandler(nil)
+ swapLog := newSwapLogger(emptyLogPath, DefaultSwapLogLevel, &network.BzzAddr{OAddr: ownerAddress.Bytes(), UAddr: ownerAddress.Bytes()})
+ cashoutProcessor := newCashoutProcessor(transactionQueue, backend, ownerKey, cashoutHandler, swapLog)
+ payout := uint256.FromUint64(CashChequeBeneficiaryTransactionCost*2 + 1)
chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout)
if err != nil {
@@ -132,15 +186,16 @@ func TestCashCheque(t *testing.T) {
if err != nil {
- swapLog := newSwapLogger(emptyLogPath, DefaultSwapLogLevel, &network.BzzAddr{OAddr: ownerAddress.Bytes(), UAddr: ownerAddress.Bytes()})
- err = cashoutProcessor.cashCheque(context.Background(), &CashoutRequest{
+ cashoutProcessor.submitCheque(context.Background(), &CashoutRequest{
Cheque: *testCheque,
Destination: ownerAddress,
- Logger: swapLog,
- if err != nil {
- t.Fatal(err)
+ select {
+ case <-cashoutHandler.cashChequeDone:
+ case <-time.After(5 * time.Second):
+ t.Fatal("cheque was not cashed within timeout")
paidOut, err := chequebook.PaidOut(nil, ownerAddress)
@@ -157,12 +212,21 @@ func TestCashCheque(t *testing.T) {
// TestEstimatePayout creates a valid cheque and feeds it to cashoutProcessor.estimatePayout
func TestEstimatePayout(t *testing.T) {
backend := newTestBackend(t)
- reset := setupContractTest()
- defer reset()
+ defer backend.Close()
- cashoutProcessor := newCashoutProcessor(backend, ownerKey)
- payout := uint256.FromUint64(42)
+ store := state.NewInmemoryStore()
+ defer store.Close()
+ transactionQueue := chain.NewTxQueue(store, "queue", &chain.DefaultTxSchedulerBackend{
+ Backend: backend,
+ }, ownerKey)
+ transactionQueue.Start()
+ defer transactionQueue.Stop()
+ swapLog := newSwapLogger(emptyLogPath, DefaultSwapLogLevel, &network.BzzAddr{OAddr: ownerAddress.Bytes(), UAddr: ownerAddress.Bytes()})
+ cashoutProcessor := newCashoutProcessor(transactionQueue, backend, ownerKey, &testCashoutResultHandler{}, swapLog)
+ payout := uint256.FromUint64(42)
chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout)
if err != nil {
diff --git a/swap/chain/backend.go b/swap/chain/backend.go
index 54ad6b55b1..7091f4c204 100644
--- a/swap/chain/backend.go
+++ b/swap/chain/backend.go
@@ -1,8 +1,23 @@
+// Copyright 2020 The Swarm Authors
+// This file is part of the Swarm library.
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// GNU Lesser General Public License for more details.
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see .
package chain
import (
- "errors"
@@ -12,11 +27,6 @@ import (
-var (
- // ErrTransactionReverted is given when the transaction that cashes a cheque is reverted
- ErrTransactionReverted = errors.New("Transaction reverted")
// Backend is the minimum amount of functionality required by the underlying ethereum backend
type Backend interface {
@@ -30,12 +40,10 @@ func WaitMined(ctx context.Context, b Backend, hash common.Hash) (*types.Receipt
for {
receipt, err := b.TransactionReceipt(ctx, hash)
if err != nil {
- log.Error("receipt retrieval failed", "err", err)
+ // some clients treat an unconfirmed transaction as an error, other simply return null
+ log.Trace("receipt retrieval failed", "err", err)
if receipt != nil {
- if receipt.Status != types.ReceiptStatusSuccessful {
- return nil, ErrTransactionReverted
- }
return receipt, nil
diff --git a/swap/chain/common_test.go b/swap/chain/common_test.go
new file mode 100644
index 0000000000..06d954b9da
--- /dev/null
+++ b/swap/chain/common_test.go
@@ -0,0 +1,23 @@
+// Copyright 2020 The Swarm Authors
+// This file is part of the Swarm library.
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// GNU Lesser General Public License for more details.
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see .
+package chain
+import "github.com/ethersphere/swarm/testutil"
+func init() {
+ testutil.Init()
diff --git a/swap/chain/mock/testbackend.go b/swap/chain/mock/testbackend.go
index 40b64c4b46..0f8c88e091 100644
--- a/swap/chain/mock/testbackend.go
+++ b/swap/chain/mock/testbackend.go
@@ -1,3 +1,19 @@
+// Copyright 2020 The Swarm Authors
+// This file is part of the Swarm library.
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// GNU Lesser General Public License for more details.
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see .
package mock
import (
diff --git a/swap/chain/persistentqueue.go b/swap/chain/persistentqueue.go
new file mode 100644
index 0000000000..bac966cb9b
--- /dev/null
+++ b/swap/chain/persistentqueue.go
@@ -0,0 +1,137 @@
+// Copyright 2020 The Swarm Authors
+// This file is part of the Swarm library.
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// GNU Lesser General Public License for more details.
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see .
+package chain
+import (
+ "context"
+ "encoding"
+ "encoding/json"
+ "fmt"
+ "strings"
+ "sync"
+ "time"
+ "github.com/ethersphere/swarm/state"
+ persistentQueue represents a queue stored in a state store
+ Items are enqueued by writing them to the state store with the timestamp as prefix and a nonce so that two items can be queued at the same time
+ It provides a (blocking) Next function to wait for a new item to be available. Only a single call to Next may be active at any time
+ To allow atomic operations with other state store operations all functions only write to batches instead of writing to the store directly
+ The user must ensure that all functions (except Next) are called with the same lock held which is provided externally so multiple queues can use the same
+ The queue provides no dequeue function. Instead an item must be deleted by its key
+// persistentQueue represents a queue stored in a state store
+type persistentQueue struct {
+ store state.Store // the store backing this queue
+ prefix string // the prefix for the keys for this queue
+ trigger chan struct{} // channel to notify the queue that a new item is available
+ nonce uint64 // increasing nonce. starts with 0 on every startup
+// NewPersistentQueue creates a structure to interact with a queue with the given prefix
+func newPersistentQueue(store state.Store, prefix string) *persistentQueue {
+ return &persistentQueue{
+ store: store,
+ prefix: prefix,
+ trigger: make(chan struct{}, 1),
+ nonce: 0,
+ }
+// enqueue puts the necessary database operations for enqueueing a new item into the supplied batch
+// It returns the generated key and a trigger function which must be called once the batch was successfully written
+// This only returns an error if the encoding fails which is an unrecoverable error
+// A lock must be held and kept until after the trigger function was called or the batch write failed
+func (pq *persistentQueue) enqueue(b *state.StoreBatch, v interface{}) (key string, trigger func(), err error) {
+ // the nonce guarantees keys don't collide if multiple transactions are queued in the same second
+ pq.nonce++
+ key = fmt.Sprintf("%d_%08d", time.Now().Unix(), pq.nonce)
+ if err = b.Put(pq.prefix+key, v); err != nil {
+ return "", nil, err
+ }
+ return key, func() {
+ select {
+ case pq.trigger <- struct{}{}:
+ default:
+ }
+ }, nil
+// peek looks at the next item in the queue
+// The error returned is either a decode or an io error
+// A lock must be held when this is called and should be held afterwards to prevent the item from being removed while processing
+func (pq *persistentQueue) peek(i interface{}) (key string, exists bool, err error) {
+ err = pq.store.Iterate(pq.prefix, func(k, data []byte) (bool, error) {
+ key = string(k)
+ unmarshaler, ok := i.(encoding.BinaryUnmarshaler)
+ if !ok {
+ return true, json.Unmarshal(data, i)
+ }
+ return true, unmarshaler.UnmarshalBinary(data)
+ })
+ if err != nil {
+ return "", false, err
+ }
+ if key == "" {
+ return "", false, nil
+ }
+ return strings.TrimPrefix(key, pq.prefix), true, nil
+// Next looks at the next item in the queue and blocks until an item is available if there is none
+// The error returned is either an decode error, an io error or a cancelled context
+// No lock should not be held when this is called. Only a single call to next may be active at any time
+// If the the key is not "", the value exists, the supplied lock was acquired and must be released by the caller after processing the item
+// The supplied lock should be the same that is used for the other functions
+func (pq *persistentQueue) next(ctx context.Context, i interface{}, lock *sync.Mutex) (key string, err error) {
+ lock.Lock()
+ key, exists, err := pq.peek(i)
+ if exists {
+ return key, nil
+ }
+ lock.Unlock()
+ if err != nil {
+ return "", err
+ }
+ for {
+ select {
+ case <-pq.trigger:
+ lock.Lock()
+ key, exists, err = pq.peek(i)
+ if exists {
+ return key, nil
+ }
+ lock.Unlock()
+ if err != nil {
+ return "", err
+ }
+ case <-ctx.Done():
+ return "", ctx.Err()
+ }
+ }
+// Delete adds the batch operation to delete the queue element with the given key
+// A lock must be held when the batch is written
+func (pq *persistentQueue) delete(b *state.StoreBatch, key string) {
+ b.Delete(pq.prefix + key)
diff --git a/swap/chain/persistentqueue_test.go b/swap/chain/persistentqueue_test.go
new file mode 100644
index 0000000000..426b4dfa5c
--- /dev/null
+++ b/swap/chain/persistentqueue_test.go
@@ -0,0 +1,123 @@
+// Copyright 2020 The Swarm Authors
+// This file is part of the Swarm library.
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// GNU Lesser General Public License for more details.
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see .
+package chain
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+ "testing"
+ "time"
+ "github.com/ethersphere/swarm/state"
+// TestNewPersistentQueue adds 200 elements in one routine and waits for them and then deletes them in another
+func TestNewPersistentQueue(t *testing.T) {
+ store := state.NewInmemoryStore()
+ defer store.Close()
+ queue := newPersistentQueue(store, "testq")
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+ defer cancel()
+ var lock sync.Mutex // lock for the queue
+ var wg sync.WaitGroup // wait group to wait for both routines to terminate
+ wg.Add(2)
+ count := 200
+ var errlock sync.Mutex
+ var errout error // stores the last error that occurred in one of the routines
+ go func() {
+ defer wg.Done()
+ for i := 0; i < count; i++ {
+ func() { // this is a function so we can use defer with the right scope
+ var value uint64
+ key, err := queue.next(ctx, &value, &lock)
+ if err != nil {
+ errlock.Lock()
+ errout = fmt.Errorf("failed to get next item: %v", err)
+ errlock.Unlock()
+ return
+ }
+ defer lock.Unlock()
+ if key == "" {
+ errlock.Lock()
+ errout = errors.New("key is empty")
+ errlock.Unlock()
+ return
+ }
+ if value != uint64(i) {
+ errlock.Lock()
+ errout = fmt.Errorf("values don't match: got %v, expected %v", value, i)
+ errlock.Unlock()
+ return
+ }
+ batch := new(state.StoreBatch)
+ queue.delete(batch, key)
+ err = store.WriteBatch(batch)
+ if err != nil {
+ errlock.Lock()
+ errout = fmt.Errorf("could not write batch: %v", err)
+ errlock.Unlock()
+ return
+ }
+ }()
+ }
+ }()
+ go func() {
+ defer wg.Done()
+ for i := 0; i < count; i++ {
+ func() { // this is a function so we can use defer with the right scope
+ lock.Lock()
+ defer lock.Unlock()
+ var value = uint64(i)
+ batch := new(state.StoreBatch)
+ _, trigger, err := queue.enqueue(batch, value)
+ if err != nil {
+ errlock.Lock()
+ errout = fmt.Errorf("failed to queue item: %v", err)
+ errlock.Unlock()
+ return
+ }
+ err = store.WriteBatch(batch)
+ if err != nil {
+ errlock.Lock()
+ errout = fmt.Errorf("failed to write batch: %v", err)
+ errlock.Unlock()
+ return
+ }
+ trigger()
+ }()
+ }
+ }()
+ wg.Wait()
+ if errout != nil {
+ t.Fatal(errout)
+ }
diff --git a/swap/chain/txqueue.go b/swap/chain/txqueue.go
new file mode 100644
index 0000000000..028d0a8603
--- /dev/null
+++ b/swap/chain/txqueue.go
@@ -0,0 +1,662 @@
+// Copyright 2020 The Swarm Authors
+// This file is part of the Swarm library.
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// GNU Lesser General Public License for more details.
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see .
+package chain
+import (
+ "context"
+ "crypto/ecdsa"
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/log"
+ "github.com/ethereum/go-ethereum/accounts/abi/bind"
+ "github.com/ethersphere/swarm/state"
+// TxQueue is a TxScheduler which sends transactions in sequence
+// A new transaction is only sent after the previous one confirmed
+// This is done to minimize the chance of wrong nonce use
+type TxQueue struct {
+ lock sync.Mutex // lock for the entire queue
+ ctx context.Context // context used for all network requests and waiting operations to ensure the queue can be stopped at any point
+ cancel context.CancelFunc // function to cancel the above context
+ wg sync.WaitGroup // used to ensure that all background go routines have finished before Stop returns
+ startedChan chan struct{} // channel to be closed when the queue has started processing
+ started bool // bool indicating that the queue has been started. used to ensure it does not run multiple times simultaneously
+ errorChan chan error // channel to stop the queue in case of errors
+ store state.Store // state store to use as the db backend
+ prefix string // all keys in the state store are prefixed with this
+ requestQueue *persistentQueue // queue for all future requests
+ handlers map[string]*TxRequestHandlers // map from handlerIDs to their registered handlers
+ notificationQueues map[string]*persistentQueue // map from handlerIDs to the notification queue of that handler
+ backend TxSchedulerBackend // ethereum backend to use
+ privateKey *ecdsa.PrivateKey // private key used to sign transactions
+// txRequestData is the metadata the queue saves for every request
+// the extra data is stored at a different key
+type txRequestData struct {
+ ID uint64 // id of the request
+ Request TxRequest // the request itself
+ HandlerID string // the type id of this request
+ State TxRequestState // the state this request is in
+ Transaction *types.Transaction // the generated transaction for this request or nil if not yet signed
+// notificationQueueItem is the metadata the queue saves for every pending notification
+// the actual notification content is stored at a different key
+type notificationQueueItem struct {
+ NotificationType string // the type of the notification
+ RequestID uint64 // the request this notification is for
+const (
+ txReceiptNotificationType = "TxReceiptNotification"
+ txPendingNotificationType = "TxPendingNotification"
+ txCancelledNotificationType = "TxCancelledNotification"
+ txStatusUnknownNotificationType = "TxStatusUnknownNotification"
+// NewTxQueue creates a new TxQueue
+func NewTxQueue(store state.Store, prefix string, backend TxSchedulerBackend, privateKey *ecdsa.PrivateKey) *TxQueue {
+ txq := &TxQueue{
+ store: store,
+ prefix: prefix,
+ handlers: make(map[string]*TxRequestHandlers),
+ notificationQueues: make(map[string]*persistentQueue),
+ backend: backend,
+ privateKey: privateKey,
+ requestQueue: newPersistentQueue(store, prefix+"_requestQueue_"),
+ errorChan: make(chan error, 1),
+ startedChan: make(chan struct{}),
+ }
+ // we create the context here already because handlers can be set before the queue starts
+ txq.ctx, txq.cancel = context.WithCancel(context.Background())
+ return txq
+// requestKey returns the database key for the txRequestData for the given id
+func (txq *TxQueue) requestKey(id uint64) string {
+ return fmt.Sprintf("%s_requests_%d", txq.prefix, id)
+// extraDataKey returns the database key for the extra data stored alongside the request
+func (txq *TxQueue) extraDataKey(id uint64) string {
+ return fmt.Sprintf("%s_data", txq.requestKey(id))
+// activeRequestKey returns the database key used for the currently active request
+func (txq *TxQueue) activeRequestKey() string {
+ return fmt.Sprintf("%s_active", txq.prefix)
+// notificationKey returns the database key for a notification
+func (txq *TxQueue) notificationKey(key string) string {
+ return fmt.Sprintf("%s_notification_%s", txq.prefix, key)
+// idKey returns the database key for the last used id value
+func (txq *TxQueue) idKey() string {
+ return fmt.Sprintf("%s_request_id", txq.prefix)
+// stopWithError sends the error to the error channel
+// this is used to stop the queue from notification handlers
+func (txq *TxQueue) stopWithError(err error) {
+ select {
+ case txq.errorChan <- err:
+ default:
+ log.Error("failed to write error to txqueue error channel", "error", err)
+ }
+// ScheduleRequest adds a new request to be processed
+// The request is assigned an id which is returned
+func (txq *TxQueue) ScheduleRequest(handlerID string, request TxRequest, extraData interface{}) (id uint64, err error) {
+ txq.lock.Lock()
+ defer txq.lock.Unlock()
+ // get the last id
+ err = txq.store.Get(txq.idKey(), &id)
+ if err != nil && err != state.ErrNotFound {
+ return 0, err
+ }
+ // increment existing id, starting with an initial value of 1
+ id++
+ // in a single batch this
+ // * stores the request data
+ // * stores the request extraData
+ // * adds it to the queue
+ batch := new(state.StoreBatch)
+ err = batch.Put(txq.idKey(), id)
+ if err != nil {
+ return 0, err
+ }
+ err = batch.Put(txq.extraDataKey(id), extraData)
+ if err != nil {
+ return 0, err
+ }
+ err = batch.Put(txq.requestKey(id), &txRequestData{
+ ID: id,
+ Request: request,
+ HandlerID: handlerID,
+ State: TxRequestStateScheduled,
+ })
+ if err != nil {
+ return 0, err
+ }
+ _, triggerQueue, err := txq.requestQueue.enqueue(batch, id)
+ if err != nil {
+ return 0, err
+ }
+ // persist to disk
+ err = txq.store.WriteBatch(batch)
+ if err != nil {
+ return 0, err
+ }
+ triggerQueue()
+ return id, nil
+// GetExtraData load the serialized extra data for this request from disk and tries to decode it
+func (txq *TxQueue) GetExtraData(id uint64, request interface{}) error {
+ return txq.store.Get(txq.extraDataKey(id), &request)
+// GetRequestState gets the state the request is currently in
+func (txq *TxQueue) GetRequestState(id uint64) (TxRequestState, error) {
+ var requestMetadata *txRequestData
+ err := txq.store.Get(txq.requestKey(id), &requestMetadata)
+ if err != nil {
+ return 0, err
+ }
+ return requestMetadata.State, nil
+// Start starts processing transactions if it is not already doing so
+func (txq *TxQueue) Start() {
+ txq.lock.Lock()
+ defer txq.lock.Unlock()
+ if txq.started {
+ return
+ }
+ txq.started = true
+ txq.wg.Add(2)
+ go func() {
+ defer txq.wg.Done()
+ // run the actual loop
+ err := txq.processQueue()
+ if err != nil && !errors.Is(err, context.Canceled) {
+ log.Error("transaction queue terminated with an error", "queue", txq.prefix, "error", err)
+ }
+ }()
+ go func() {
+ defer txq.wg.Done()
+ // listen on the error channel and stop the queue on error
+ select {
+ case err := <-txq.errorChan:
+ log.Error("unrecoverable transaction queue error (transaction processing disabled)", "error", err)
+ txq.Stop()
+ case <-txq.ctx.Done():
+ }
+ }()
+ close(txq.startedChan)
+// Stop stops processing transactions if it is running
+// It will block until processing has terminated
+func (txq *TxQueue) Stop() {
+ txq.lock.Lock()
+ if !txq.started {
+ txq.lock.Unlock()
+ return
+ }
+ // we cancel the context that all long running operations in the queue use
+ txq.cancel()
+ txq.lock.Unlock()
+ // wait until all routines have finished
+ txq.wg.Wait()
+// getNotificationQueue gets the notification queue for a handler
+// it initializes the struct if it does not yet exist
+// the TxQueue lock must be held
+func (txq *TxQueue) getNotificationQueue(handlerID string) *persistentQueue {
+ queue, ok := txq.notificationQueues[handlerID]
+ if !ok {
+ queue = newPersistentQueue(txq.store, fmt.Sprintf("%s_notify_%s", txq.prefix, handlerID))
+ txq.notificationQueues[handlerID] = queue
+ }
+ return queue
+// SetHandlers registers the handlers for the given handlerID
+// This starts the delivery of notifications for this handlerID
+func (txq *TxQueue) SetHandlers(handlerID string, handlers *TxRequestHandlers) error {
+ txq.lock.Lock()
+ defer txq.lock.Unlock()
+ if txq.handlers[handlerID] != nil {
+ return fmt.Errorf("handlers for %s already set", handlerID)
+ }
+ txq.handlers[handlerID] = handlers
+ notifyQueue := txq.getNotificationQueue(handlerID)
+ // go routine processing the notification queue for this handler
+ txq.wg.Add(1)
+ go func() {
+ defer txq.wg.Done()
+ // only start sending notification once the loop started
+ select {
+ case <-txq.startedChan:
+ case <-txq.ctx.Done():
+ return
+ }
+ for {
+ var item notificationQueueItem
+ // get the next notification item
+ key, err := notifyQueue.next(txq.ctx, &item, &txq.lock)
+ if err != nil {
+ if !errors.Is(err, context.Canceled) {
+ txq.stopWithError(fmt.Errorf("could not read from notification queue: %v", err))
+ }
+ return
+ }
+ // since this is the only function which deletes this item from notifyQueue we can already unlock here
+ txq.lock.Unlock()
+ // load and decode the notification
+ var notification interface{}
+ switch item.NotificationType {
+ case txReceiptNotificationType:
+ notification = &TxReceiptNotification{}
+ case txPendingNotificationType:
+ notification = &TxPendingNotification{}
+ case txCancelledNotificationType:
+ notification = &TxCancelledNotification{}
+ case txStatusUnknownNotificationType:
+ notification = &TxStatusUnknownNotification{}
+ }
+ err = txq.store.Get(txq.notificationKey(key), notification)
+ if err != nil {
+ txq.stopWithError(fmt.Errorf("could not read notification: %v", err))
+ return
+ }
+ switch item.NotificationType {
+ case txReceiptNotificationType:
+ if handlers.NotifyReceipt != nil {
+ err = handlers.NotifyReceipt(txq.ctx, item.RequestID, notification.(*TxReceiptNotification))
+ }
+ case txPendingNotificationType:
+ if handlers.NotifyPending != nil {
+ err = handlers.NotifyPending(txq.ctx, item.RequestID, notification.(*TxPendingNotification))
+ }
+ case txCancelledNotificationType:
+ if handlers.NotifyCancelled != nil {
+ err = handlers.NotifyCancelled(txq.ctx, item.RequestID, notification.(*TxCancelledNotification))
+ }
+ case txStatusUnknownNotificationType:
+ if handlers.NotifyStatusUnknown != nil {
+ err = handlers.NotifyStatusUnknown(txq.ctx, item.RequestID, notification.(*TxStatusUnknownNotification))
+ }
+ }
+ // if a handler failed we will try again in 10 seconds
+ if err != nil {
+ log.Error("transaction request handler failed", "type", item.NotificationType, "request", item.RequestID, "error", err)
+ select {
+ case <-txq.ctx.Done():
+ return
+ case <-time.After(10 * time.Second):
+ continue
+ }
+ }
+ // once the notification was handled delete it from the queue
+ txq.lock.Lock()
+ batch := new(state.StoreBatch)
+ notifyQueue.delete(batch, key)
+ err = txq.store.WriteBatch(batch)
+ txq.lock.Unlock()
+ if err != nil {
+ txq.stopWithError(fmt.Errorf("could not delete notification: %v", err))
+ return
+ }
+ }
+ }()
+ return nil
+// helper function to trigger a notification
+// the returned trigger function must be called once the batch has been written
+// must be called with the txqueue lock held
+func (txq *TxQueue) notify(batch *state.StoreBatch, id uint64, handlerID string, notificationType string, notification interface{}) (triggerNotifyQueue func(), err error) {
+ notifyQueue := txq.getNotificationQueue(handlerID)
+ key, triggerNotifyQueue, err := notifyQueue.enqueue(batch, ¬ificationQueueItem{
+ RequestID: id,
+ NotificationType: notificationType,
+ })
+ if err != nil {
+ return nil, fmt.Errorf("could not serialize notification queue item: %v", err)
+ }
+ err = batch.Put(txq.notificationKey(key), notification)
+ if err != nil {
+ return nil, fmt.Errorf("could not serialize notification: %v", err)
+ }
+ return triggerNotifyQueue, nil
+// waitForNextRequest waits for the next request and sets it as the active request
+// the txqueue lock must not be held
+func (txq *TxQueue) waitForNextRequest() (requestMetadata *txRequestData, err error) {
+ var id uint64
+ // get the id of the next request in the queue
+ key, err := txq.requestQueue.next(txq.ctx, &id, &txq.lock)
+ if err != nil {
+ return nil, err
+ }
+ defer txq.lock.Unlock()
+ err = txq.store.Get(txq.requestKey(id), &requestMetadata)
+ if err != nil {
+ return nil, err
+ }
+ // if the request was successfully decoded it is removed from the queue and set as the active request
+ batch := new(state.StoreBatch)
+ err = batch.Put(txq.activeRequestKey(), requestMetadata.ID)
+ if err != nil {
+ return nil, fmt.Errorf("could not put id write into batch: %v", err)
+ }
+ txq.requestQueue.delete(batch, key)
+ err = txq.store.WriteBatch(batch)
+ if err != nil {
+ return nil, err
+ }
+ return requestMetadata, nil
+// helper function to set a request state and remove it as the active request in a single batch
+// the txqueue lock must be held
+func (txq *TxQueue) finalizeRequest(batch *state.StoreBatch, requestMetadata *txRequestData, state TxRequestState) error {
+ requestMetadata.State = state
+ err := batch.Put(txq.requestKey(requestMetadata.ID), requestMetadata.ID)
+ if err != nil {
+ return err
+ }
+ batch.Delete(txq.activeRequestKey())
+ return txq.store.WriteBatch(batch)
+// helper function to set a request as cancelled and emit the appropriate notification
+// the txqueue lock must be held
+func (txq *TxQueue) finalizeRequestCancelled(requestMetadata *txRequestData, err error) error {
+ batch := new(state.StoreBatch)
+ trigger, err := txq.notify(batch, requestMetadata.ID, requestMetadata.HandlerID, "TxCancelledNotification", &TxCancelledNotification{
+ Reason: err.Error(),
+ })
+ if err != nil {
+ return err
+ }
+ err = txq.finalizeRequest(batch, requestMetadata, TxRequestStateCancelled)
+ if err != nil {
+ return err
+ }
+ trigger()
+ return nil
+// helper function to set a request as status unknown and emit the appropriate notification
+// the txqueue lock must be held
+func (txq *TxQueue) finalizeRequestStatusUnknown(requestMetadata *txRequestData, reason string) error {
+ batch := new(state.StoreBatch)
+ trigger, err := txq.notify(batch, requestMetadata.ID, requestMetadata.HandlerID, txStatusUnknownNotificationType, &TxStatusUnknownNotification{
+ Reason: reason,
+ })
+ if err != nil {
+ return err
+ }
+ err = txq.finalizeRequest(batch, requestMetadata, TxRequestStateStatusUnknown)
+ if err != nil {
+ return err
+ }
+ trigger()
+ return nil
+// helper function to set a request as confirmed and emit the appropriate notification
+// the txqueue lock must be held
+func (txq *TxQueue) finalizeRequestConfirmed(requestMetadata *txRequestData, receipt types.Receipt) error {
+ batch := new(state.StoreBatch)
+ trigger, err := txq.notify(batch, requestMetadata.ID, requestMetadata.HandlerID, txReceiptNotificationType, &TxReceiptNotification{
+ Receipt: receipt,
+ })
+ if err != nil {
+ return err
+ }
+ err = txq.finalizeRequest(batch, requestMetadata, TxRequestStateConfirmed)
+ if err != nil {
+ return err
+ }
+ trigger()
+ return nil
+// processRequest continues processing the provided request
+func (txq *TxQueue) processRequest(requestMetadata *txRequestData) error {
+ switch requestMetadata.State {
+ case TxRequestStateScheduled:
+ err := txq.generateTransaction(requestMetadata)
+ if err != nil {
+ return err
+ }
+ fallthrough
+ case TxRequestStateSigned:
+ err := txq.sendTransaction(requestMetadata)
+ if err != nil {
+ return err
+ }
+ fallthrough
+ case TxRequestStatePending:
+ return txq.waitForActiveTransaction(requestMetadata)
+ default:
+ return fmt.Errorf("trying to process transaction in unknown state: %d", requestMetadata.State)
+ }
+// generateTransaction assigns the nonce, signs the resulting transaction and saves it
+func (txq *TxQueue) generateTransaction(requestMetadata *txRequestData) error {
+ opts := bind.NewKeyedTransactor(txq.privateKey)
+ opts.Context = txq.ctx
+ nonce, err := txq.backend.PendingNonceAt(txq.ctx, opts.From)
+ if err != nil {
+ return txq.finalizeRequestCancelled(requestMetadata, err)
+ }
+ request := requestMetadata.Request
+ if request.GasLimit == 0 {
+ gasLimit, err := request.EstimateGas(txq.ctx, txq.backend, opts.From)
+ if err != nil {
+ return txq.finalizeRequestCancelled(requestMetadata, err)
+ }
+ request.GasLimit = gasLimit
+ }
+ if request.GasPrice == nil {
+ request.GasPrice, err = txq.backend.SuggestGasPrice(txq.ctx)
+ if err != nil {
+ return txq.finalizeRequestCancelled(requestMetadata, err)
+ }
+ }
+ tx := types.NewTransaction(
+ nonce,
+ request.To,
+ request.Value,
+ request.GasLimit,
+ request.GasPrice,
+ request.Data,
+ )
+ requestMetadata.Transaction, err = opts.Signer(&types.HomesteadSigner{}, opts.From, tx)
+ if err != nil {
+ return txq.finalizeRequestCancelled(requestMetadata, err)
+ }
+ requestMetadata.State = TxRequestStateSigned
+ return txq.store.Put(txq.requestKey(requestMetadata.ID), requestMetadata)
+// sendTransaction sends the signed transaction to the ethereum backend
+func (txq *TxQueue) sendTransaction(requestMetadata *txRequestData) error {
+ err := txq.backend.SendTransactionWithID(txq.ctx, requestMetadata.ID, requestMetadata.Transaction)
+ txq.lock.Lock()
+ defer txq.lock.Unlock()
+ if err != nil {
+ // even if SendTransactionRequest returns an error there are still certain rare edge cases where the transaction might still be sent so we mark it as status unknown
+ return txq.finalizeRequestStatusUnknown(requestMetadata, err.Error())
+ }
+ // if we have a hash we mark the transaction as pending
+ batch := new(state.StoreBatch)
+ requestMetadata.State = TxRequestStatePending
+ err = batch.Put(txq.requestKey(requestMetadata.ID), requestMetadata)
+ if err != nil {
+ return err
+ }
+ trigger, err := txq.notify(batch, requestMetadata.ID, requestMetadata.HandlerID, txPendingNotificationType, &TxPendingNotification{
+ Transaction: requestMetadata.Transaction,
+ })
+ if err != nil {
+ return err
+ }
+ err = txq.store.WriteBatch(batch)
+ if err != nil {
+ return err
+ }
+ trigger()
+ return nil
+// processActiveRequest continues monitoring the active request if there is one
+// this is called on startup before the queue begins normal operation
+func (txq *TxQueue) processActiveRequest() error {
+ // get the stored active request key
+ // if nothing is stored id will remain 0 (which is invalid as ids start with 1)
+ var id uint64
+ err := txq.store.Get(txq.activeRequestKey(), &id)
+ if err == state.ErrNotFound {
+ return nil
+ }
+ if err != nil {
+ return err
+ }
+ // load the request metadata
+ var requestMetadata txRequestData
+ err = txq.store.Get(txq.requestKey(id), &requestMetadata)
+ if err != nil {
+ return err
+ }
+ // continue processing as regular
+ return txq.processRequest(&requestMetadata)
+// waitForActiveTransaction waits for requestMetadata to be mined and resets the active transaction afterwards
+// the transaction will also be considered mined once the notification was queued successfully
+// this only returns an error if the encoding fails which is an unrecoverable error
+// the txqueue lock must not be held
+func (txq *TxQueue) waitForActiveTransaction(requestMetadata *txRequestData) error {
+ ctx, cancel := context.WithTimeout(txq.ctx, 20*time.Minute)
+ defer cancel()
+ // an error here means the context was cancelled
+ receipt, err := WaitMined(ctx, txq.backend, requestMetadata.Transaction.Hash())
+ txq.lock.Lock()
+ defer txq.lock.Unlock()
+ if err != nil {
+ // if the main context of the TxQueue was cancelled we log and return
+ if txq.ctx.Err() != nil {
+ log.Info("terminating transaction queue while waiting for a transaction", "hash", requestMetadata.Transaction.Hash())
+ return nil
+ }
+ // if the timeout context expired we mark the transaction status as unknown
+ // future versions of the queue (with local nonce-tracking) should keep note of that and reuse the nonce for the next request
+ log.Warn("transaction timeout reached", "hash", requestMetadata.Transaction.Hash())
+ return txq.finalizeRequestStatusUnknown(requestMetadata, "transaction timed out")
+ }
+ return txq.finalizeRequestConfirmed(requestMetadata, *receipt)
+// processQueue is the main transaction processing function of the TxQueue
+// first it checks if there already is an active request. If so it processes this first
+// then it will take requests from the queue in a loop and execute those
+func (txq *TxQueue) processQueue() error {
+ err := txq.processActiveRequest()
+ if err != nil {
+ return err
+ }
+ for {
+ select {
+ case <-txq.ctx.Done():
+ return nil
+ default:
+ }
+ requestMetadata, err := txq.waitForNextRequest()
+ if err != nil {
+ return err
+ }
+ err = txq.processRequest(requestMetadata)
+ if err != nil {
+ return err
+ }
+ }
diff --git a/swap/chain/txqueue_test.go b/swap/chain/txqueue_test.go
new file mode 100644
index 0000000000..c50789bdcd
--- /dev/null
+++ b/swap/chain/txqueue_test.go
@@ -0,0 +1,470 @@
+// Copyright 2020 The Swarm Authors
+// This file is part of the Swarm library.
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// GNU Lesser General Public License for more details.
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see .
+package chain
+import (
+ "bytes"
+ "context"
+ "errors"
+ "fmt"
+ "math/big"
+ "reflect"
+ "sync"
+ "testing"
+ "time"
+ "github.com/ethereum/go-ethereum/accounts/abi/bind/backends"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/crypto"
+ "github.com/ethersphere/swarm/state"
+var (
+ senderKey, _ = crypto.HexToECDSA("634fb5a872396d9693e5c9f9d7233cfa93f395c093371017ff44aa9ae6564cdd")
+ senderAddress = crypto.PubkeyToAddress(senderKey.PublicKey)
+var defaultBackend = backends.NewSimulatedBackend(core.GenesisAlloc{
+ senderAddress: {Balance: big.NewInt(1000000000000000000)},
+}, 8000000)
+// backend.SendTransaction outcome associated with a request id
+type testRequestOutcome struct {
+ noCommit bool // the backend should not automatically mine the transaction
+ sendError error // SendTransaction should return with this error
+// testTxSchedulerBackend wraps a SimulatedBackend and provides a way to determine the result of SendTransaction
+type testTxSchedulerBackend struct {
+ *backends.SimulatedBackend
+ requestOutcomes map[uint64]testRequestOutcome // map of request id to outcome
+ lock sync.Mutex // lock for map access and blocking SendTransactionWithID
+func newTestTxSchedulerBackend(backend *backends.SimulatedBackend) *testTxSchedulerBackend {
+ return &testTxSchedulerBackend{
+ SimulatedBackend: backend,
+ requestOutcomes: make(map[uint64]testRequestOutcome),
+ }
+func (b *testTxSchedulerBackend) SendTransactionWithID(ctx context.Context, id uint64, tx *types.Transaction) error {
+ b.lock.Lock()
+ defer b.lock.Unlock()
+ outcome, ok := b.requestOutcomes[id]
+ if ok {
+ if outcome.sendError != nil {
+ return outcome.sendError
+ }
+ err := b.SimulatedBackend.SendTransaction(ctx, tx)
+ if err == nil && !outcome.noCommit {
+ b.SimulatedBackend.Commit()
+ }
+ return err
+ }
+ err := b.SimulatedBackend.SendTransaction(ctx, tx)
+ if err == nil {
+ b.SimulatedBackend.Commit()
+ }
+ return err
+const testHandlerID = "test_TestRequest"
+// txSchedulerTester is a helper used for testing TxScheduler implementations
+// it saves received notifications to channels so they can easily be checked in tests
+// furthermore it can trigger certain errors depending on flags set in the requests
+type txSchedulerTester struct {
+ lock sync.Mutex
+ txScheduler TxScheduler
+ chans map[uint64]*txSchedulerTesterRequestData // map from request id to channels
+ backend *testTxSchedulerBackend
+// txSchedulerTesterRequestData is the data txSchedulerTester saves for every request
+type txSchedulerTesterRequestData struct {
+ ReceiptNotification chan *TxReceiptNotification
+ CancelledNotification chan *TxCancelledNotification
+ PendingNotification chan *TxPendingNotification
+ StatusUnknownNotification chan *TxStatusUnknownNotification
+ request TxRequest
+type txSchedulerTesterRequestExtraData struct {
+func newTxSchedulerTester(backend *testTxSchedulerBackend, txScheduler TxScheduler) (*txSchedulerTester, error) {
+ tc := &txSchedulerTester{
+ txScheduler: txScheduler,
+ backend: backend,
+ chans: make(map[uint64]*txSchedulerTesterRequestData),
+ }
+ err := tc.setHandlers(txScheduler)
+ if err != nil {
+ return nil, err
+ }
+ return tc, nil
+// hooks up the TxScheduler handlers to the txSchedulerTester channels
+func (tc *txSchedulerTester) setHandlers(txScheduler TxScheduler) error {
+ return txScheduler.SetHandlers(testHandlerID, &TxRequestHandlers{
+ NotifyReceipt: func(ctx context.Context, id uint64, notification *TxReceiptNotification) error {
+ select {
+ case tc.getRequest(id).ReceiptNotification <- notification:
+ return nil
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ },
+ NotifyCancelled: func(ctx context.Context, id uint64, notification *TxCancelledNotification) error {
+ select {
+ case tc.getRequest(id).CancelledNotification <- notification:
+ return nil
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ },
+ NotifyPending: func(ctx context.Context, id uint64, notification *TxPendingNotification) error {
+ select {
+ case tc.getRequest(id).PendingNotification <- notification:
+ return nil
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ },
+ NotifyStatusUnknown: func(ctx context.Context, id uint64, notification *TxStatusUnknownNotification) error {
+ select {
+ case tc.getRequest(id).StatusUnknownNotification <- notification:
+ return nil
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ },
+ })
+// schedule request with the provided extra data and the transaction outcome
+func (tc *txSchedulerTester) schedule(request TxRequest, requestExtraData interface{}, outcome *testRequestOutcome) (uint64, error) {
+ // this lock here is crucial as it blocks SendTransaction until the requestOutcomes has been set
+ tc.backend.lock.Lock()
+ defer tc.backend.lock.Unlock()
+ id, err := tc.txScheduler.ScheduleRequest(testHandlerID, request, requestExtraData)
+ if err != nil {
+ return 0, err
+ }
+ if outcome != nil {
+ tc.backend.requestOutcomes[id] = *outcome
+ }
+ tc.getRequest(id).request = request
+ return id, nil
+// getRequest gets the txSchedulerTesterRequestData for this id or initializes it if it does not yet exist
+func (tc *txSchedulerTester) getRequest(id uint64) *txSchedulerTesterRequestData {
+ tc.lock.Lock()
+ defer tc.lock.Unlock()
+ c, ok := tc.chans[id]
+ if !ok {
+ tc.chans[id] = &txSchedulerTesterRequestData{
+ ReceiptNotification: make(chan *TxReceiptNotification),
+ PendingNotification: make(chan *TxPendingNotification),
+ CancelledNotification: make(chan *TxCancelledNotification),
+ StatusUnknownNotification: make(chan *TxStatusUnknownNotification),
+ }
+ return tc.chans[id]
+ }
+ return c
+// expectStateChangedNotification waits for a StateChangedNotification with the given parameters
+func (tc *txSchedulerTester) expectStatusUnknownNotification(ctx context.Context, id uint64, reason string) error {
+ var notification *TxStatusUnknownNotification
+ request := tc.getRequest(id)
+ select {
+ case notification = <-request.StatusUnknownNotification:
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ if notification.Reason != reason {
+ return fmt.Errorf("reason mismatch. got %s, expected %s", notification.Reason, reason)
+ }
+ return nil
+func (tc *txSchedulerTester) expectPendingNotification(ctx context.Context, id uint64) error {
+ var notification *TxPendingNotification
+ request := tc.getRequest(id)
+ select {
+ case notification = <-request.PendingNotification:
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ tx := notification.Transaction
+ if !bytes.Equal(tx.Data(), request.request.Data) {
+ return fmt.Errorf("transaction data mismatch. got %v, expected %v", tx.Data(), request.request.Data)
+ }
+ if *tx.To() != request.request.To {
+ return fmt.Errorf("transaction to mismatch. got %v, expected %v", tx.To(), request.request.To)
+ }
+ if tx.Value().Cmp(request.request.Value) != 0 {
+ return fmt.Errorf("transaction value mismatch. got %v, expected %v", tx.Value(), request.request.Value)
+ }
+ return nil
+// expectStateChangedNotification waits for a ReceiptNotification for the given request id and verifies its hash
+func (tc *txSchedulerTester) expectReceiptNotification(ctx context.Context, id uint64) error {
+ var notification *TxReceiptNotification
+ request := tc.getRequest(id)
+ select {
+ case notification = <-request.ReceiptNotification:
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ tx, pending, err := tc.backend.TransactionByHash(ctx, notification.Receipt.TxHash)
+ if err != nil {
+ return err
+ }
+ if pending {
+ return errors.New("received a receipt notification for a pending transaction")
+ }
+ if tx == nil {
+ return errors.New("transaction not found")
+ }
+ if !bytes.Equal(tx.Data(), request.request.Data) {
+ return fmt.Errorf("transaction data mismatch. got %v, expected %v", tx.Data(), request.request.Data)
+ }
+ if *tx.To() != request.request.To {
+ return fmt.Errorf("transaction to mismatch. got %v, expected %v", tx.To(), request.request.To)
+ }
+ if tx.Value().Cmp(request.request.Value) != 0 {
+ return fmt.Errorf("transaction value mismatch. got %v, expected %v", tx.Value(), request.request.Value)
+ }
+ return nil
+// makeTestRequest creates a simple test request to the 0x0 address
+func makeTestRequest() TxRequest {
+ return TxRequest{
+ To: common.Address{},
+ Value: big.NewInt(0),
+ Data: []byte{},
+ }
+// helper function for queue tests which sets up everything and provides a cleanup function
+// if run is true the queue starts processing requests and cleanup function will wait for proper termination
+func setupTxQueueTest(run bool) (*TxQueue, *testTxSchedulerBackend, func()) {
+ backend := defaultBackend
+ backend.Commit()
+ testBackend := newTestTxSchedulerBackend(backend)
+ store := state.NewInmemoryStore()
+ txq := NewTxQueue(store, "test", testBackend, senderKey)
+ if run {
+ txq.Start()
+ }
+ return txq, testBackend, func() {
+ if run {
+ txq.Stop()
+ }
+ store.Close()
+ }
+// TestTxQueueScheduleRequest tests scheduling a single request when the queue is not running
+// Afterwards the queue is started and the correct sequence of notifications is expected
+func TestTxQueueScheduleRequest(t *testing.T) {
+ txq, backend, clean := setupTxQueueTest(false)
+ defer clean()
+ tc, err := newTxSchedulerTester(backend, txq)
+ if err != nil {
+ t.Fatal(err)
+ }
+ testRequest := &txSchedulerTesterRequestExtraData{}
+ id, err := tc.schedule(makeTestRequest(), testRequest, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if id != 1 {
+ t.Fatal("expected id to be 1")
+ }
+ var testRequestRetrieved *txSchedulerTesterRequestExtraData
+ err = txq.GetExtraData(id, &testRequestRetrieved)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !reflect.DeepEqual(testRequest, testRequestRetrieved) {
+ t.Fatalf("got request %v, expected %v", testRequestRetrieved, testRequest)
+ }
+ txq.Start()
+ defer txq.Stop()
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+ defer cancel()
+ if err = tc.expectPendingNotification(ctx, id); err != nil {
+ t.Fatal(err)
+ }
+ if err = tc.expectReceiptNotification(ctx, id); err != nil {
+ t.Fatal(err)
+ }
+// TestTxQueueManyRequests schedules many requests and expects all of them to be successful
+func TestTxQueueManyRequests(t *testing.T) {
+ txq, backend, clean := setupTxQueueTest(true)
+ defer clean()
+ tc, err := newTxSchedulerTester(backend, txq)
+ if err != nil {
+ t.Fatal(err)
+ }
+ var ids []uint64
+ count := 200
+ for i := 0; i < count; i++ {
+ id, err := tc.schedule(makeTestRequest(), &txSchedulerTesterRequestExtraData{}, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ ids = append(ids, id)
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+ for _, id := range ids {
+ err = tc.expectPendingNotification(ctx, id)
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = tc.expectReceiptNotification(ctx, id)
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+// TestTxQueueActiveTransaction tests that the queue continues to monitor the last pending transaction
+func TestTxQueueActiveTransaction(t *testing.T) {
+ txq, backend, clean := setupTxQueueTest(false)
+ defer clean()
+ tc, err := newTxSchedulerTester(backend, txq)
+ if err != nil {
+ t.Fatal(err)
+ }
+ txq.Start()
+ id, err := tc.schedule(makeTestRequest(), 5, &testRequestOutcome{
+ noCommit: true,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+ defer cancel()
+ err = tc.expectPendingNotification(ctx, id)
+ if err != nil {
+ t.Fatal(err)
+ }
+ txq.Stop()
+ state, err := txq.GetRequestState(id)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if state != TxRequestStatePending {
+ t.Fatalf("state not pending, was %d", state)
+ }
+ // start a new queue with the same store and backend
+ txq2 := NewTxQueue(txq.store, txq.prefix, txq.backend, txq.privateKey)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // reuse the tester so it maintains state about the tx hash and id
+ tc.setHandlers(txq2)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // the transaction confirmed in the meantime
+ backend.Commit()
+ txq2.Start()
+ defer txq2.Stop()
+ err = tc.expectReceiptNotification(ctx, id)
+ if err != nil {
+ t.Fatal(err)
+ }
+// TestTxQueueErrorDuringSend tests that a request is marked as TxRequestStateStatusUnknown if the send fails
+func TestTxQueueErrorDuringSend(t *testing.T) {
+ txq, backend, clean := setupTxQueueTest(true)
+ defer clean()
+ tc, err := newTxSchedulerTester(backend, txq)
+ if err != nil {
+ t.Fatal(err)
+ }
+ id, err := tc.schedule(makeTestRequest(), 5, &testRequestOutcome{
+ sendError: errors.New("test error"),
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+ defer cancel()
+ err = tc.expectStatusUnknownNotification(ctx, id, "test error")
+ if err != nil {
+ t.Fatal(err)
+ }
diff --git a/swap/chain/txscheduler.go b/swap/chain/txscheduler.go
new file mode 100644
index 0000000000..3fa1e4d60e
--- /dev/null
+++ b/swap/chain/txscheduler.go
@@ -0,0 +1,159 @@
+// Copyright 2020 The Swarm Authors
+// This file is part of the Swarm library.
+// The Swarm library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+// The Swarm library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// GNU Lesser General Public License for more details.
+// You should have received a copy of the GNU Lesser General Public License
+// along with the Swarm library. If not, see .
+package chain
+import (
+ "context"
+ "math/big"
+ ethereum "github.com/ethereum/go-ethereum"
+ "github.com/ethereum/go-ethereum/accounts/abi/bind"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core/types"
+// TxSchedulerBackend is an extension of the normal Backend interface
+type TxSchedulerBackend interface {
+ Backend
+ // SendTransactionWithID is the same as SendTransaction but with the ID of the associated request passed alongside
+ // This is primarily used so the backend can react with the expected result during testing
+ SendTransactionWithID(ctx context.Context, id uint64, tx *types.Transaction) error
+// DefaultTxSchedulerBackend is the standard backend that should be used
+// It simply wraps another Backend
+type DefaultTxSchedulerBackend struct {
+ Backend
+// SendTransactionWithID in the default Backend calls the underlying SendTransaction function
+func (b *DefaultTxSchedulerBackend) SendTransactionWithID(ctx context.Context, id uint64, tx *types.Transaction) error {
+ return b.Backend.SendTransaction(ctx, tx)
+// TxRequest describes a request for a transaction that can be scheduled
+type TxRequest struct {
+ To common.Address // recipient of the transaction
+ Data []byte // transaction data
+ GasPrice *big.Int // gas price or nil if suggested gas price should be used
+ GasLimit uint64 // gas limit or 0 if it should be estimated
+ Value *big.Int // amount of wei to send
+// ToSignedTx returns a signed types.Transaction for the given request and nonce
+func (request *TxRequest) ToSignedTx(nonce uint64, opts *bind.TransactOpts) (*types.Transaction, error) {
+ tx := types.NewTransaction(
+ nonce,
+ request.To,
+ request.Value,
+ request.GasLimit,
+ request.GasPrice,
+ request.Data,
+ )
+ return opts.Signer(&types.HomesteadSigner{}, opts.From, tx)
+// EstimateGas estimates the gas usage if this request was send from the supplied sender
+func (request *TxRequest) EstimateGas(ctx context.Context, backend Backend, from common.Address) (uint64, error) {
+ gasLimit, err := backend.EstimateGas(ctx, ethereum.CallMsg{
+ From: from,
+ To: &request.To,
+ Data: request.Data,
+ })
+ if err != nil {
+ return 0, err
+ }
+ return gasLimit, nil
+// TxScheduler represents a central sender for all transactions from a single ethereum account
+// its purpose is to ensure there are no nonce issues and that transaction initiators are notified of the result
+// notifications are guaranteed to happen even across node restarts and disconnects from the ethereum backend
+// the account managed by this scheduler must not be used from anywhere else
+type TxScheduler interface {
+ // SetHandlers registers the handlers for the given handlerID
+ // This starts the delivery of notifications for this handlerID
+ SetHandlers(handlerID string, handlers *TxRequestHandlers) error
+ // ScheduleRequest adds a new request to be processed
+ // The request is assigned an id which is returned
+ ScheduleRequest(handlerID string, request TxRequest, requestExtraData interface{}) (id uint64, err error)
+ // GetExtraData loads the serialized extra data for this request from disk and tries to decode it
+ GetExtraData(id uint64, request interface{}) error
+ // GetRequestState gets the state the request is currently in
+ GetRequestState(id uint64) (TxRequestState, error)
+ // Start starts processing transactions if it is not already doing so
+ // This cannot be used to restart the queue once stopped
+ Start()
+ // Stop stops processing transactions if it is running
+ // It will block until processing has terminated
+ Stop()
+// TxRequestHandlers holds all the callbacks for a given string
+// Any of the functions may be nil
+// Notify functions are called by the transaction queue when a notification for a transaction occurs
+// If the handler returns an error the notification will be resent in the future (including across restarts)
+type TxRequestHandlers struct {
+ // NotifyReceipt is called the first time a receipt is observed for a transaction
+ // This happens the first time a transaction was included in a block
+ NotifyReceipt func(ctx context.Context, id uint64, notification *TxReceiptNotification) error
+ // NotifyPending is called after the transaction was successfully sent to the backend
+ NotifyPending func(ctx context.Context, id uint64, notification *TxPendingNotification) error
+ // NotifyCancelled is called when it is certain that this transaction will never be sent
+ NotifyCancelled func(ctx context.Context, id uint64, notification *TxCancelledNotification) error
+ // NotifyStatusUnknown is called if it cannot be determined if the transaction might be confirmed
+ NotifyStatusUnknown func(ctx context.Context, id uint64, notification *TxStatusUnknownNotification) error
+// TxReceiptNotification is the notification emitted when the receipt is available
+type TxReceiptNotification struct {
+ Receipt types.Receipt // the receipt of the included transaction
+// TxCancelledNotification is the notification emitted when it is certain that a transaction will never be sent
+type TxCancelledNotification struct {
+ Reason string // The reason behind the cancellation
+// TxStatusUnknownNotification is the notification emitted if it cannot be determined if the transaction might be confirmed
+type TxStatusUnknownNotification struct {
+ Reason string // The reason why it is unknown
+// TxPendingNotification is the notification emitted after the transaction was successfully sent to the backend
+type TxPendingNotification struct {
+ Transaction *types.Transaction // The transaction that was sent
+// TxRequestState is the type used to indicate which state the transaction is in
+type TxRequestState uint8
+const (
+ // TxRequestStateScheduled is the initial state for all requests that enter the queue
+ TxRequestStateScheduled TxRequestState = 0
+ // TxRequestStateSigned means the transaction has been generated and signed but not yet sent
+ TxRequestStateSigned TxRequestState = 1
+ // TxRequestStatePending means the transaction has been sent but is not yet confirmed
+ TxRequestStatePending TxRequestState = 2
+ // TxRequestStateConfirmed is entered the first time a confirmation is received
+ TxRequestStateConfirmed TxRequestState = 3
+ // TxRequestStateStatusUnknown is used for all cases where it is unclear wether the transaction was broadcast or not. This is also used for timed-out transactions.
+ TxRequestStateStatusUnknown TxRequestState = 4
+ // TxRequestStateCancelled is used for all cases where it is certain the transaction was and never will be sent
+ TxRequestStateCancelled TxRequestState = 5
diff --git a/swap/common_test.go b/swap/common_test.go
index 5eff2714e2..263e082862 100644
--- a/swap/common_test.go
+++ b/swap/common_test.go
@@ -15,10 +15,12 @@ import (
+ "github.com/ethereum/go-ethereum/core/types"
contractFactory "github.com/ethersphere/go-sw3/contracts-v0-2-0/simpleswapfactory"
+ contract "github.com/ethersphere/swarm/contracts/swap"
cswap "github.com/ethersphere/swarm/contracts/swap"
@@ -34,8 +36,6 @@ type swapTestBackend struct {
factoryAddress common.Address // address of the SimpleSwapFactory in the simulated network
tokenAddress common.Address // address of the token in the simulated network
- // the async cashing go routine needs synchronization for tests
- cashDone chan struct{}
var defaultBackend = backends.NewSimulatedBackend(core.GenesisAlloc{
@@ -67,7 +67,6 @@ func newTestBackend(t *testing.T) *swapTestBackend {
TestBackend: backend,
factoryAddress: factoryAddress,
tokenAddress: tokenAddress,
- cashDone: make(chan struct{}),
@@ -106,7 +105,11 @@ func newBaseTestSwapWithParams(t *testing.T, key *ecdsa.PrivateKey, params *Para
if err != nil {
- swap := newSwapInstance(stateStore, owner, backend, 10, params, factory, swapLogger)
+ txqueue := chain.NewTxQueue(stateStore, "chain", &chain.DefaultTxSchedulerBackend{
+ Backend: backend,
+ }, owner.privateKey)
+ swap := newSwapInstance(stateStore, owner, backend, 10, params, factory, txqueue, swapLogger)
return swap, dir
@@ -127,6 +130,7 @@ func newTestSwap(t *testing.T, key *ecdsa.PrivateKey, backend *swapTestBackend)
usedBackend = newTestBackend(t)
swap, dir := newBaseTestSwap(t, key, usedBackend)
+ swap.txScheduler.Start()
clean := func() {
// only close if created by newTestSwap to avoid double close
@@ -207,32 +211,6 @@ func newRandomTestCheque() *Cheque {
return cheque
-// During tests, because the cashing in of cheques is async, we should wait for the function to be returned
-// Otherwise if we call `handleEmitChequeMsg` manually, it will return before the TX has been committed to the `SimulatedBackend`,
-// causing subsequent TX to possibly fail due to nonce mismatch
-func testCashCheque(s *Swap, cheque *Cheque) {
- cashCheque(s, cheque)
- // send to the channel, signals to clients that this function actually finished
- if stb, ok := s.backend.(*swapTestBackend); ok {
- if stb.cashDone != nil {
- stb.cashDone <- struct{}{}
- }
- }
-// setupContractTest is a helper function for setting up the
-// blockchain wait function for testing
-func setupContractTest() func() {
- // we also need to store the previous cashCheque function in case this is called multiple times
- currentCashCheque := defaultCashCheque
- defaultCashCheque = testCashCheque
- // overwrite only for the duration of the test, so...
- return func() {
- // ...we need to set it back to original when done
- defaultCashCheque = currentCashCheque
- }
// deploy for testing (needs simulated backend commit)
func testDeployWithPrivateKey(ctx context.Context, backend chain.Backend, privateKey *ecdsa.PrivateKey, ownerAddress common.Address, depositAmount *uint256.Uint256) (cswap.Contract, error) {
opts := bind.NewKeyedTransactor(privateKey)
@@ -249,9 +227,6 @@ func testDeployWithPrivateKey(ctx context.Context, backend chain.Backend, privat
return nil, err
- // setup the wait for mined transaction function for testing
- cleanup := setupContractTest()
- defer cleanup()
contract, err := factory.DeploySimpleSwap(opts, ownerAddress, big.NewInt(int64(defaultHarddepositTimeoutDuration)))
if err != nil {
return nil, err
@@ -316,3 +291,45 @@ func (d *dummyMsgRW) ReadMsg() (p2p.Msg, error) {
func (d *dummyMsgRW) WriteMsg(msg p2p.Msg) error {
return nil
+// struct used by the testCashoutResultHandler
+type cashChequeDoneData struct {
+ request *CashoutRequest
+ result *contract.CashChequeResult
+ receipt *types.Receipt
+// testCashoutResultHandler is a CashoutResultHandler which writes to a channel after forwarding the result to swap
+type testCashoutResultHandler struct {
+ swap *Swap
+ cashChequeDone chan cashChequeDoneData
+func newTestCashoutResultHandler(swap *Swap) *testCashoutResultHandler {
+ return &testCashoutResultHandler{
+ swap: swap,
+ cashChequeDone: make(chan cashChequeDoneData),
+ }
+// HandleCashoutResult forwards the result to swap if set and afterwards sends it to its channel
+func (h *testCashoutResultHandler) HandleCashoutResult(request *CashoutRequest, result *contract.CashChequeResult, receipt *types.Receipt) error {
+ if h.swap != nil {
+ if err := h.swap.HandleCashoutResult(request, result, receipt); err != nil {
+ return err
+ }
+ }
+ h.cashChequeDone <- cashChequeDoneData{
+ request: request,
+ result: result,
+ receipt: receipt,
+ }
+ return nil
+// helper function to override the cashoutHandler for a cashout processor of swap instance
+func overrideCashoutResultHandler(swap *Swap) *testCashoutResultHandler {
+ cashoutResultHandler := newTestCashoutResultHandler(swap)
+ swap.cashoutProcessor.cashoutResultHandler = cashoutResultHandler
+ return cashoutResultHandler
diff --git a/swap/protocol_test.go b/swap/protocol_test.go
index ca120ae9d6..56f2a59b99 100644
--- a/swap/protocol_test.go
+++ b/swap/protocol_test.go
@@ -49,7 +49,6 @@ type swapTester struct {
swap *Swap
-// creates a new protocol tester for swap with a deployed chequebook
func newSwapTester(t *testing.T, backend *swapTestBackend, depositAmount *uint256.Uint256) (*swapTester, func(), error) {
swap, clean := newTestSwap(t, ownerKey, backend)
@@ -229,14 +228,11 @@ func TestEmitCheque(t *testing.T) {
creditorSwap := protocolTester.swap
+ cashoutHandler := overrideCashoutResultHandler(creditorSwap)
debitorSwap, cleanDebitorSwap := newTestSwap(t, beneficiaryKey, testBackend)
defer cleanDebitorSwap()
- // setup the wait for mined transaction function for testing
- cleanup := setupContractTest()
- defer cleanup()
log.Debug("deploy to simulated backend")
// cashCheque cashes a cheque when the reward of doing so is twice the transaction costs.
@@ -317,7 +313,7 @@ func TestEmitCheque(t *testing.T) {
// we wait until the cashCheque is actually terminated (ensures proper nonce count)
select {
- case <-testBackend.cashDone:
+ case <-cashoutHandler.cashChequeDone:
log.Debug("cash transaction completed and committed")
case <-time.After(4 * time.Second):
t.Fatalf("Timeout waiting for cash transaction to complete")
@@ -340,9 +336,6 @@ func TestTriggerPaymentThreshold(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
- // setup the wait for mined transaction function for testing
- cleanup := setupContractTest()
- defer cleanup()
if err = protocolTester.testHandshake(
diff --git a/swap/simulations_test.go b/swap/simulations_test.go
index a822b1927f..0ef33e3934 100644
--- a/swap/simulations_test.go
+++ b/swap/simulations_test.go
@@ -47,6 +47,7 @@ import (
+ "github.com/ethersphere/swarm/swap/chain"
mock "github.com/ethersphere/swarm/swap/chain/mock"
@@ -62,6 +63,7 @@ For integration tests, run test cluster deployments with all integration moduele
(blockchains, oracles, etc.)
// swapSimulationParams allows to avoid global variables for the test
type swapSimulationParams struct {
swaps map[int]*Swap
dirs map[int]string
@@ -165,9 +167,10 @@ func newSimServiceMap(params *swapSimulationParams) map[string]simulation.Servic
if err != nil {
return nil, nil, err
+ ts.swap.cashoutProcessor.txScheduler.Start()
cleanup = func() {
- ts.swap.store.Close()
+ ts.swap.Close()
@@ -238,7 +241,6 @@ func newSharedBackendSwaps(t *testing.T, nodeCount int) (*swapSimulationParams,
TestBackend: mock.NewTestBackend(defaultBackend),
factoryAddress: factoryAddress,
tokenAddress: tokenAddress,
- cashDone: make(chan struct{}),
// finally, create all Swap instances for each node, which share the same backend
var owner *Owner
@@ -249,8 +251,11 @@ func newSharedBackendSwaps(t *testing.T, nodeCount int) (*swapSimulationParams,
if err != nil {
+ txqueue := chain.NewTxQueue(stores[i], "chain", &chain.DefaultTxSchedulerBackend{
+ Backend: testBackend,
+ }, owner.privateKey)
swapLogger := newSwapLogger(defParams.LogPath, defParams.LogLevel, defParams.BaseAddrs)
- params.swaps[i] = newSwapInstance(stores[i], owner, testBackend, 10, defParams, factory, swapLogger)
+ params.swaps[i] = newSwapInstance(stores[i], owner, testBackend, 10, defParams, factory, txqueue, swapLogger)
params.backend = testBackend
@@ -272,10 +277,6 @@ func TestMultiChequeSimulation(t *testing.T) {
// cleanup backend
defer params.backend.Close()
- // setup the wait for mined transaction function for testing
- cleanup := setupContractTest()
- defer cleanup()
// initialize the simulation
sim := simulation.NewBzzInProc(newSimServiceMap(params), false)
defer sim.Close()
@@ -307,6 +308,8 @@ func TestMultiChequeSimulation(t *testing.T) {
// get the testService for the creditor
creditorSvc := sim.Service("swap", creditor).(*testService)
+ cashoutHandler := overrideCashoutResultHandler(creditorSvc.swap)
var debLen, credLen, debSwapLen, credSwapLen int
timeout := time.After(10 * time.Second)
for {
@@ -385,7 +388,7 @@ func TestMultiChequeSimulation(t *testing.T) {
balanceAfterMessage := debitorBalance - int64(msgPrice)
if balanceAfterMessage <= -paymentThreshold {
// we need to wait a bit in order to give time for the cheque to be processed
- if err := waitForChequeProcessed(t, params.backend, counter, lastCount, debitorSvc.swap.peers[creditor], expectedPayout); err != nil {
+ if err := waitForChequeProcessed(t, params.backend, counter, lastCount, debitorSvc.swap.peers[creditor], expectedPayout, cashoutHandler.cashChequeDone); err != nil {
expectedPayout += uint64(-balanceAfterMessage)
@@ -621,7 +624,7 @@ func TestBasicSwapSimulation(t *testing.T) {
log.Info("Simulation ended")
-func waitForChequeProcessed(t *testing.T, backend *swapTestBackend, counter metrics.Counter, lastCount int64, p *Peer, expectedLastPayout uint64) error {
+func waitForChequeProcessed(t *testing.T, backend *swapTestBackend, counter metrics.Counter, lastCount int64, p *Peer, expectedLastPayout uint64, cashChequeDone chan cashChequeDoneData) error {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
@@ -644,7 +647,7 @@ func waitForChequeProcessed(t *testing.T, backend *swapTestBackend, counter metr
- case <-backend.cashDone:
+ case <-cashChequeDone:
diff --git a/swap/swap.go b/swap/swap.go
index e7844e2958..0a4e0c6709 100644
--- a/swap/swap.go
+++ b/swap/swap.go
@@ -29,6 +29,7 @@ import (
+ "github.com/ethereum/go-ethereum/core/types"
@@ -52,6 +53,7 @@ var ErrSkipDeposit = errors.New("swap-deposit-amount non-zero, but swap-skip-dep
// a peer to peer micropayment system
// A node maintains an individual balance with every peer
// Only messages which have a price will be accounted for
+// Swap implements the CashoutResultHandler interface
type Swap struct {
store state.Store // store is needed in order to keep balances and cheques across sessions
peers map[enode.ID]*Peer // map of all swap Peers
@@ -64,6 +66,7 @@ type Swap struct {
chequebookFactory contract.SimpleSwapFactory // the chequebook factory used
honeyPriceOracle HoneyOracle // oracle which resolves the price of honey (in Wei)
cashoutProcessor *CashoutProcessor // processor for cashing out
+ txScheduler chain.TxScheduler // transaction scheduler to use
logger Logger //Swap Logger
@@ -84,8 +87,8 @@ type Params struct {
// newSwapInstance is a swap constructor function without integrity checks
-func newSwapInstance(stateStore state.Store, owner *Owner, backend chain.Backend, chainID uint64, params *Params, chequebookFactory contract.SimpleSwapFactory, logger Logger) *Swap {
- return &Swap{
+func newSwapInstance(stateStore state.Store, owner *Owner, backend chain.Backend, chainID uint64, params *Params, chequebookFactory contract.SimpleSwapFactory, txScheduler chain.TxScheduler, logger Logger) *Swap {
+ s := &Swap{
store: stateStore,
peers: make(map[enode.ID]*Peer),
backend: backend,
@@ -94,9 +97,11 @@ func newSwapInstance(stateStore state.Store, owner *Owner, backend chain.Backend
chequebookFactory: chequebookFactory,
honeyPriceOracle: NewHoneyPriceOracle(),
chainID: chainID,
- cashoutProcessor: newCashoutProcessor(backend, owner.privateKey),
+ txScheduler: txScheduler,
logger: logger,
+ s.cashoutProcessor = newCashoutProcessor(txScheduler, backend, owner.privateKey, s, logger)
+ return s
// New prepares and creates all fields to create a swap instance:
@@ -158,6 +163,9 @@ func New(dbPath string, prvkey *ecdsa.PrivateKey, backendURL string, params *Par
+ chain.NewTxQueue(stateStore, "chain", &chain.DefaultTxSchedulerBackend{
+ Backend: backend,
+ }, owner.privateKey),
// start the chequebook
@@ -165,6 +173,8 @@ func New(dbPath string, prvkey *ecdsa.PrivateKey, backendURL string, params *Par
return nil, err
+ swap.txScheduler.Start()
// deposit money in the chequebook if desired
if !skipDepositFlag {
// prompt the user for a depositAmount
@@ -342,8 +352,6 @@ func (s *Swap) handleMsg(p *Peer) func(ctx context.Context, msg interface{}) err
-var defaultCashCheque = cashCheque
// handleEmitChequeMsg should be handled by the creditor when it receives
// a cheque from a debitor
func (s *Swap) handleEmitChequeMsg(ctx context.Context, p *Peer, msg *EmitChequeMsg) error {
@@ -386,21 +394,10 @@ func (s *Swap) handleEmitChequeMsg(ctx context.Context, p *Peer, msg *EmitCheque
return protocols.Break(err)
- expectedPayout, transactionCosts, err := s.cashoutProcessor.estimatePayout(context.TODO(), cheque)
- if err != nil {
- return protocols.Break(err)
- }
- costsMultiplier := uint256.FromUint64(2)
- costThreshold, err := uint256.New().Mul(transactionCosts, costsMultiplier)
- if err != nil {
- return err
- }
- // do a payout transaction if we get 2 times the gas costs
- if expectedPayout.Cmp(costThreshold) == 1 {
- go defaultCashCheque(s, cheque)
- }
+ s.cashoutProcessor.submitCheque(ctx, &CashoutRequest{
+ Cheque: *cheque,
+ Destination: s.GetParams().ContractAddress,
+ })
return nil
@@ -440,21 +437,6 @@ func (s *Swap) handleConfirmChequeMsg(ctx context.Context, p *Peer, msg *Confirm
return nil
-// cashCheque should be called async as it blocks until the transaction(s) are mined
-// The function cashes the cheque by sending it to the blockchain
-func cashCheque(s *Swap, cheque *Cheque) {
- err := s.cashoutProcessor.cashCheque(context.Background(), &CashoutRequest{
- Cheque: *cheque,
- Destination: s.GetParams().ContractAddress,
- Logger: s.logger,
- })
- if err != nil {
- metrics.GetOrRegisterCounter("swap/cheques/cashed/errors", nil).Inc(1)
- s.logger.Error(CashChequeAction, "cashing cheque:", err)
- }
// processAndVerifyCheque verifies the cheque and compares it with the last received cheque
// if the cheque is valid it will also be saved as the new last cheque
// the caller is expected to hold p.lock
@@ -565,6 +547,7 @@ func (s *Swap) saveBalance(p enode.ID, balance int64) error {
// Close cleans up swap
func (s *Swap) Close() error {
+ s.txScheduler.Stop()
return s.store.Close()
@@ -689,3 +672,16 @@ func (s *Swap) loadChequebook() (common.Address, error) {
func (s *Swap) saveChequebook(chequebook common.Address) error {
return s.store.Put(connectedChequebookKey, chequebook)
+// HandleCashoutResult is the handler function called by the CashoutProcessor in case of a successful cashing transaction
+func (s *Swap) HandleCashoutResult(request *CashoutRequest, result *contract.CashChequeResult, receipt *types.Receipt) error {
+ metrics.GetOrRegisterCounter("swap/cheques/cashed/honey", nil).Inc(result.TotalPayout.Int64())
+ if result.Bounced {
+ metrics.GetOrRegisterCounter("swap/cheques/cashed/bounced", nil).Inc(1)
+ s.logger.Warn(CashChequeAction, "cheque bounced", "tx", receipt.TxHash)
+ }
+ s.logger.Info(CashChequeAction, "cheque cashed", "cheque", &request.Cheque)
+ return nil
diff --git a/swap/swap_test.go b/swap/swap_test.go
index 42f04caebb..9ac45bbe61 100644
--- a/swap/swap_test.go
+++ b/swap/swap_test.go
@@ -602,6 +602,7 @@ func TestResetBalance(t *testing.T) {
defer testBackend.Close()
// create both test swap accounts
creditorSwap, clean1 := newTestSwap(t, beneficiaryKey, testBackend)
+ cashoutHandler := overrideCashoutResultHandler(creditorSwap)
debitorSwap, clean2 := newTestSwap(t, ownerKey, testBackend)
defer clean1()
defer clean2()
@@ -640,10 +641,6 @@ func TestResetBalance(t *testing.T) {
- // setup the wait for mined transaction function for testing
- cleanup := setupContractTest()
- defer cleanup()
// now simulate sending the cheque to the creditor from the debitor
if err = creditor.sendCheque(); err != nil {
@@ -662,20 +659,18 @@ func TestResetBalance(t *testing.T) {
if cheque == nil {
t.Fatal("expected to find a cheque, but it was empty")
- // ...create a message...
- msg := &EmitChequeMsg{
- Cheque: cheque,
- }
// ...and trigger message handling on the receiver side (creditor)
// remember that debitor is the model of the remote node for the creditor...
- err = creditorSwap.handleEmitChequeMsg(ctx, debitor, msg)
+ err = creditorSwap.handleEmitChequeMsg(ctx, debitor, &EmitChequeMsg{
+ Cheque: cheque,
+ })
if err != nil {
// ...on which we wait until the cashCheque is actually terminated (ensures proper nonce count)
select {
- case <-testBackend.cashDone:
+ case <-cashoutHandler.cashChequeDone:
creditorSwap.logger.Debug(CashChequeAction, "cash transaction completed and committed")
case <-time.After(4 * time.Second):
t.Fatalf("Timeout waiting for cash transactions to complete")
@@ -691,8 +686,6 @@ func TestResetBalance(t *testing.T) {
func TestDebtCheques(t *testing.T) {
testBackend := newTestBackend(t)
defer testBackend.Close()
- cleanup := setupContractTest()
- defer cleanup()
creditorSwap, cleanup := newTestSwap(t, beneficiaryKey, testBackend)
defer cleanup()
@@ -744,14 +737,6 @@ func TestDebtCheques(t *testing.T) {
if err != nil {
- // ...on which we wait until the cashCheque is actually terminated (ensures proper nonce count)
- select {
- case <-testBackend.cashDone:
- log.Debug("cash transaction completed and committed")
- case <-time.After(4 * time.Second):
- t.Fatalf("Timeout waiting for cash transactions to complete")
- }
// generate bookings based on parameters, apply them to a Swap struct and verify the result
@@ -1302,10 +1287,6 @@ func TestSwapLogToFile(t *testing.T) {
- // setup the wait for mined transaction function for testing
- cleanup := setupContractTest()
- defer cleanup()
// now simulate sending the cheque to the creditor from the debitor
if err = creditor.sendCheque(); err != nil {
@@ -1433,8 +1414,6 @@ func TestAvailableBalance(t *testing.T) {
defer testBackend.Close()
swap, clean := newTestSwap(t, ownerKey, testBackend)
defer clean()
- cleanup := setupContractTest()
- defer cleanup()
depositAmount := uint256.FromUint64(9000 * RetrieveRequestPrice)