Skip to content
This repository has been archived by the owner on Aug 2, 2021. It is now read-only.

swap, swap/chain: add transaction queue (txqueue part 1) #2089

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 80 additions & 56 deletions swap/cashout.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/metrics"
contract "github.com/ethersphere/swarm/contracts/swap"
"github.com/ethersphere/swarm/swap/chain"
Expand All @@ -30,10 +31,9 @@ import (
// CashChequeBeneficiaryTransactionCost is the expected gas cost of a CashChequeBeneficiary transaction
const CashChequeBeneficiaryTransactionCost = 50000

// CashoutProcessor holds all relevant fields needed for processing cashouts
type CashoutProcessor struct {
backend chain.Backend // ethereum backend to use
privateKey *ecdsa.PrivateKey // private key to use
var CashoutRequestTypeID = chain.TxRequestTypeID{
Handler: "cashout",
RequestType: "CashoutRequest",
}

// CashoutRequest represents a request for a cashout operation
Expand All @@ -42,42 +42,94 @@ type CashoutRequest struct {
Destination common.Address // destination for the payout
}

// ActiveCashout stores the necessary information for a cashout in progess
type ActiveCashout struct {
Request CashoutRequest // the request that caused this cashout
TransactionHash common.Hash // the hash of the current transaction for this request
// CashoutProcessor holds all relevant fields needed for processing cashouts
type CashoutProcessor struct {
backend chain.Backend // ethereum backend to use
txScheduler chain.TxScheduler // transaction queue to use
cashoutResultHandler CashoutResultHandler
cashoutDone chan *CashoutRequest
}

type CashoutResultHandler interface {
HandleCashoutResult(request *CashoutRequest, result *contract.CashChequeResult, receipt *types.Receipt) error
}

// newCashoutProcessor creates a new instance of CashoutProcessor
func newCashoutProcessor(backend chain.Backend, privateKey *ecdsa.PrivateKey) *CashoutProcessor {
return &CashoutProcessor{
backend: backend,
privateKey: privateKey,
func newCashoutProcessor(txScheduler chain.TxScheduler, backend chain.Backend, privateKey *ecdsa.PrivateKey, cashoutResultHandler CashoutResultHandler) *CashoutProcessor {
c := &CashoutProcessor{
backend: backend,
txScheduler: txScheduler,
cashoutResultHandler: cashoutResultHandler,
}
}

// cashCheque tries to cash the cheque specified in the request
// after the transaction is sent it waits on its success
func (c *CashoutProcessor) cashCheque(ctx context.Context, request *CashoutRequest) error {
cheque := request.Cheque
opts := bind.NewKeyedTransactor(c.privateKey)
opts.Context = ctx
txScheduler.SetHandlers(CashoutRequestTypeID, &chain.TxRequestHandlers{
Send: func(id uint64, backend chain.Backend, opts *bind.TransactOpts) (common.Hash, error) {
var request CashoutRequest
if err := c.txScheduler.GetRequest(id, &request); err != nil {
return common.Hash{}, err
}

cheque := request.Cheque

otherSwap, err := contract.InstanceAt(cheque.Contract, backend)
if err != nil {
return common.Hash{}, err
}

tx, err := otherSwap.CashChequeBeneficiaryStart(opts, request.Destination, cheque.CumulativePayout, cheque.Signature)
if err != nil {
return common.Hash{}, err
}
return tx.Hash(), nil
},
NotifyReceipt: func(ctx context.Context, id uint64, notification *chain.TxReceiptNotification) error {
var request *CashoutRequest
err := c.txScheduler.GetRequest(id, &request)
if err != nil {
return err
}

otherSwap, err := contract.InstanceAt(request.Cheque.Contract, c.backend)
if err != nil {
return err
}

receipt := &notification.Receipt
if receipt.Status == 0 {
swapLog.Error("cheque cashing transaction reverted", "tx", receipt.TxHash)
return nil
}

result := otherSwap.CashChequeBeneficiaryResult(receipt)
return c.cashoutResultHandler.HandleCashoutResult(request, result, receipt)
},
})
return c
}

otherSwap, err := contract.InstanceAt(cheque.Contract, c.backend)
func (c *CashoutProcessor) submitCheque(ctx context.Context, request *CashoutRequest) {
expectedPayout, transactionCosts, err := c.estimatePayout(ctx, &request.Cheque)
if err != nil {
return err
swapLog.Error("could not estimate payout", "error", err)
return
}

tx, err := otherSwap.CashChequeBeneficiaryStart(opts, request.Destination, cheque.CumulativePayout, cheque.Signature)
costsMultiplier := uint256.FromUint64(2)
costThreshold, err := uint256.New().Mul(transactionCosts, costsMultiplier)
if err != nil {
return err
swapLog.Error("overflow in transaction fee", "error", err)
return
}

// this blocks until the cashout has been successfully processed
return c.waitForAndProcessActiveCashout(&ActiveCashout{
Request: *request,
TransactionHash: tx.Hash(),
})
// do a payout transaction if we get 2 times the gas costs
if expectedPayout.Cmp(costThreshold) == 1 {
swapLog.Info("queueing cashout", "cheque", &request.Cheque)
_, err := c.txScheduler.ScheduleRequest(CashoutRequestTypeID, request)
if err != nil {
metrics.GetOrRegisterCounter("swap.cheques.cashed.errors", nil).Inc(1)
swapLog.Error("cashing cheque:", "error", err)
}
}
}

// estimatePayout estimates the payout for a given cheque as well as the transaction cost
Expand Down Expand Up @@ -123,31 +175,3 @@ func (c *CashoutProcessor) estimatePayout(ctx context.Context, cheque *Cheque) (

return expectedPayout, transactionCosts, nil
}

// waitForAndProcessActiveCashout waits for activeCashout to complete
func (c *CashoutProcessor) waitForAndProcessActiveCashout(activeCashout *ActiveCashout) error {
ctx, cancel := context.WithTimeout(context.Background(), DefaultTransactionTimeout)
defer cancel()

receipt, err := chain.WaitMined(ctx, c.backend, activeCashout.TransactionHash)
if err != nil {
return err
}

otherSwap, err := contract.InstanceAt(activeCashout.Request.Cheque.Contract, c.backend)
if err != nil {
return err
}

result := otherSwap.CashChequeBeneficiaryResult(receipt)

metrics.GetOrRegisterCounter("swap.cheques.cashed.honey", nil).Inc(result.TotalPayout.Int64())

if result.Bounced {
metrics.GetOrRegisterCounter("swap.cheques.cashed.bounced", nil).Inc(1)
swapLog.Warn("cheque bounced", "tx", receipt.TxHash)
}

swapLog.Info("cheque cashed", "honey", activeCashout.Request.Cheque.Honey)
return nil
}
42 changes: 29 additions & 13 deletions swap/cashout_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,11 @@ package swap
import (
"context"
"testing"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/log"
"github.com/ethersphere/swarm/state"
"github.com/ethersphere/swarm/swap/chain"
"github.com/ethersphere/swarm/uint256"
)
Expand All @@ -33,8 +35,7 @@ import (
// afterwards it attempts to cash-in a bouncing cheque
func TestContractIntegration(t *testing.T) {
backend := newTestBackend(t)
reset := setupContractTest()
defer reset()
defer backend.Close()

payout := uint256.FromUint64(42)
chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout)
Expand Down Expand Up @@ -116,11 +117,18 @@ func TestContractIntegration(t *testing.T) {
// TestCashCheque creates a valid cheque and feeds it to cashoutProcessor.cashCheque
func TestCashCheque(t *testing.T) {
backend := newTestBackend(t)
reset := setupContractTest()
defer reset()
defer backend.Close()

cashoutProcessor := newCashoutProcessor(backend, ownerKey)
payout := uint256.FromUint64(42)
store := state.NewInmemoryStore()
defer store.Close()

transactionQueue := chain.NewTxQueue(store, "queue", backend, ownerKey)
transactionQueue.Start()
defer transactionQueue.Stop()

cashoutHandler := newTestCashoutResultHandler(nil)
cashoutProcessor := newCashoutProcessor(transactionQueue, backend, ownerKey, cashoutHandler)
payout := uint256.FromUint64(CashChequeBeneficiaryTransactionCost*2 + 1)

chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout)
if err != nil {
Expand All @@ -132,12 +140,14 @@ func TestCashCheque(t *testing.T) {
t.Fatal(err)
}

err = cashoutProcessor.cashCheque(context.Background(), &CashoutRequest{
cashoutProcessor.submitCheque(context.Background(), &CashoutRequest{
Cheque: *testCheque,
Destination: ownerAddress,
})
if err != nil {
t.Fatal(err)

select {
case <-cashoutHandler.cashChequeDone:
case <-time.After(5 * time.Second):
}

paidOut, err := chequebook.PaidOut(nil, ownerAddress)
Expand All @@ -154,12 +164,18 @@ func TestCashCheque(t *testing.T) {
// TestEstimatePayout creates a valid cheque and feeds it to cashoutProcessor.estimatePayout
func TestEstimatePayout(t *testing.T) {
backend := newTestBackend(t)
reset := setupContractTest()
defer reset()
defer backend.Close()

cashoutProcessor := newCashoutProcessor(backend, ownerKey)
payout := uint256.FromUint64(42)
store := state.NewInmemoryStore()
defer store.Close()

transactionQueue := chain.NewTxQueue(store, "queue", backend, ownerKey)
transactionQueue.Start()
defer transactionQueue.Stop()

cashoutProcessor := newCashoutProcessor(transactionQueue, backend, ownerKey, &testCashoutResultHandler{})

payout := uint256.FromUint64(42)
chequebook, err := testDeployWithPrivateKey(context.Background(), backend, ownerKey, ownerAddress, payout)
if err != nil {
t.Fatal(err)
Expand Down
11 changes: 1 addition & 10 deletions swap/chain/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package chain

import (
"context"
"errors"
"time"

"github.com/ethereum/go-ethereum/log"
Expand All @@ -12,11 +11,6 @@ import (
"github.com/ethereum/go-ethereum/core/types"
)

var (
// ErrTransactionReverted is given when the transaction that cashes a cheque is reverted
ErrTransactionReverted = errors.New("Transaction reverted")
)

// Backend is the minimum amount of functionality required by the underlying ethereum backend
type Backend interface {
bind.ContractBackend
Expand All @@ -30,12 +24,9 @@ func WaitMined(ctx context.Context, b Backend, hash common.Hash) (*types.Receipt
for {
receipt, err := b.TransactionReceipt(ctx, hash)
if err != nil {
log.Error("receipt retrieval failed", "err", err)
log.Trace("receipt retrieval failed", "err", err)
}
if receipt != nil {
if receipt.Status != types.ReceiptStatusSuccessful {
return nil, ErrTransactionReverted
}
return receipt, nil
}

Expand Down
7 changes: 7 additions & 0 deletions swap/chain/common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package chain

import "github.com/ethersphere/swarm/testutil"

func init() {
testutil.Init()
}
5 changes: 5 additions & 0 deletions swap/chain/mock/testbackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@ func (b *TestBackend) SendTransaction(ctx context.Context, tx *types.Transaction
return err
}

// SendTransactionNoCommit provides access to the underlying SendTransaction function without the auto commit
func (b *TestBackend) SendTransactionNoCommit(ctx context.Context, tx *types.Transaction) (err error) {
return b.SimulatedBackend.SendTransaction(ctx, tx)
}

// Close overrides the Close function of the underlying SimulatedBackend so that it does nothing
// This allows the same SimulatedBackend backend to be reused across tests
// This is necessary due to some memory leakage issues with the used version of the SimulatedBackend
Expand Down
Loading