Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BCI-2564] - TXM feature to drop stale unstarted transactions #13874

Closed
wants to merge 12 commits into from
5 changes: 5 additions & 0 deletions .changeset/purple-seas-destroy.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

TXM feature to drop stale unstarted txes. #added
34 changes: 34 additions & 0 deletions core/chains/evm/txmgr/evm_tx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type TxStoreWebApi interface {
FindTxAttempt(ctx context.Context, hash common.Hash) (*TxAttempt, error)
FindTxWithAttempts(ctx context.Context, etxID int64) (etx Tx, err error)
FindTxsByStateAndFromAddresses(ctx context.Context, addresses []common.Address, state txmgrtypes.TxState, chainID *big.Int) (txs []*Tx, err error)
DeleteUnstartedTxes(ctx context.Context, chainID *big.Int, subject uuid.UUID) error
}

type TestEvmTxStore interface {
Expand Down Expand Up @@ -1872,6 +1873,39 @@ id < (
return
}

func (o *evmTxStore) DeleteUnstartedTxes(ctx context.Context, chainID *big.Int, subject uuid.UUID) error {
var cancel context.CancelFunc
ctx, cancel = o.stopCh.Ctx(ctx)
defer cancel()

err := o.Transact(ctx, false, func(orm *evmTxStore) error {
var err error
zeroUUID := uuid.UUID{}
if subject == zeroUUID {
// without subject deletes all unstarted txes
_, err = orm.q.ExecContext(ctx, `
DELETE FROM evm.txes
WHERE state = 'unstarted' AND evm_chain_id = $1`, chainID.String())
} else {
// with subject deletes only those subject unstarted txes
_, err = orm.q.ExecContext(ctx, `
DELETE FROM evm.txes
WHERE state = 'unstarted' AND evm_chain_id = $1 AND subject = $2`, chainID.String(), subject)
}

if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil
}
return fmt.Errorf("DeleteUnstartedTxes failed: %w", err)
}

return nil
})

return err
}

func (o *evmTxStore) ReapTxHistory(ctx context.Context, minBlockNumberToKeep int64, timeThreshold time.Time, chainID *big.Int) error {
var cancel context.CancelFunc
ctx, cancel = o.stopCh.Ctx(ctx)
Expand Down
94 changes: 93 additions & 1 deletion core/chains/evm/txmgr/evm_tx_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1843,6 +1843,98 @@ func TestORM_PruneUnstartedTxQueue(t *testing.T) {
})
}

func TestORM_DeleteUnstartedTxes(t *testing.T) {
t.Parallel()

db := pgtest.NewSqlxDB(t)
txStore := txmgr.NewTxStore(db, logger.Test(t))
ethKeyStore := cltest.NewKeyStore(t, db).Eth()
evmtest.NewEthClientMockWithDefaultChain(t)
_, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore)

toAddress := testutils.NewAddress()
gasLimit := uint64(1000)
payload := []byte{1, 2, 3}

txReq := txmgr.TxRequest{
FromAddress: fromAddress,
ToAddress: toAddress,
EncodedPayload: payload,
FeeLimit: gasLimit,
Meta: nil,
Strategy: nil,
SignalCallback: true,
}

strategyWithoutSubject := txmgrcommon.NewDropOldestStrategy(uuid.UUID{}, uint32(5))
randomSubject := uuid.New()
strategyWithSubject := txmgrcommon.NewDropOldestStrategy(randomSubject, uint32(5))

t.Run("no transactions to delete when calling DeleteUnstartedTxes: handled gracefully", func(t *testing.T) {
err := txStore.DeleteUnstartedTxes(tests.Context(t), testutils.FixtureChainID, uuid.UUID{})
assert.NoError(t, err)
})

t.Run("no subject provided to DeleteUnstartedTxes: deletes all unstarted txs.", func(t *testing.T) {
txReq.Strategy = strategyWithoutSubject

// Create and assert 2 unstarted transactions without subject
_, err := txStore.CreateTransaction(tests.Context(t), txReq, testutils.FixtureChainID)
assert.NoError(t, err)
_, err = txStore.CreateTransaction(tests.Context(t), txReq, testutils.FixtureChainID)
assert.NoError(t, err)
AssertCountPerSubject(t, txStore, 2, uuid.UUID{})

// Create and assert 2 unstarted transactions with subject
txReq.Strategy = strategyWithSubject
_, err = txStore.CreateTransaction(tests.Context(t), txReq, testutils.FixtureChainID)
assert.NoError(t, err)
_, err = txStore.CreateTransaction(tests.Context(t), txReq, testutils.FixtureChainID)
assert.NoError(t, err)
AssertCountPerSubject(t, txStore, 2, randomSubject)

// Deletes all unstarted transactions
err = txStore.DeleteUnstartedTxes(tests.Context(t), testutils.FixtureChainID, uuid.UUID{})
assert.NoError(t, err)

// Assert all unstarted transactions have been deleted
AssertCountPerSubject(t, txStore, 0, uuid.UUID{})
})

t.Run("subject provided to DeleteUnstartedTxes: deletes only those subject unstarted txs.", func(t *testing.T) {
txReq.Strategy = strategyWithoutSubject

// Create and assert 2 unstarted transactions without subject
_, err := txStore.CreateTransaction(tests.Context(t), txReq, testutils.FixtureChainID)
assert.NoError(t, err)
_, err = txStore.CreateTransaction(tests.Context(t), txReq, testutils.FixtureChainID)
assert.NoError(t, err)
AssertCountPerSubject(t, txStore, 2, uuid.UUID{})

// Create and assert 2 unstarted transactions with subject
txReq.Strategy = strategyWithSubject
_, err = txStore.CreateTransaction(tests.Context(t), txReq, testutils.FixtureChainID)
assert.NoError(t, err)
_, err = txStore.CreateTransaction(tests.Context(t), txReq, testutils.FixtureChainID)
assert.NoError(t, err)
AssertCountPerSubject(t, txStore, 2, randomSubject)

// Deletes only unstarted transactions whose subject is randomSubject
err = txStore.DeleteUnstartedTxes(tests.Context(t), testutils.FixtureChainID, randomSubject)
assert.NoError(t, err)

// Get txCount of unstarted transactions with and without subject.
txCountWithoutSubject, err := txStore.CountTxesByStateAndSubject(tests.Context(t), txmgrcommon.TxUnstarted, uuid.UUID{})
assert.NoError(t, err)
txCountWithSubject, err := txStore.CountTxesByStateAndSubject(tests.Context(t), txmgrcommon.TxUnstarted, randomSubject)
assert.NoError(t, err)

// Assert that the only transactions that have been deleted are those whose subject is randomSubject
require.Equal(t, txCountWithoutSubject, 2)
require.Equal(t, txCountWithSubject, 0)
})
}

func TestORM_FindTxesWithAttemptsAndReceiptsByIdsAndState(t *testing.T) {
t.Parallel()

Expand All @@ -1867,7 +1959,7 @@ func TestORM_FindTxesWithAttemptsAndReceiptsByIdsAndState(t *testing.T) {

func AssertCountPerSubject(t *testing.T, txStore txmgr.TestEvmTxStore, expected int64, subject uuid.UUID) {
t.Helper()
count, err := txStore.CountTxesByStateAndSubject(tests.Context(t), "unstarted", subject)
count, err := txStore.CountTxesByStateAndSubject(tests.Context(t), txmgrcommon.TxUnstarted, subject)
require.NoError(t, err)
require.Equal(t, int(expected), count)
}
48 changes: 48 additions & 0 deletions core/chains/evm/txmgr/mocks/evm_tx_store.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

57 changes: 42 additions & 15 deletions core/web/eth_keys_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"strconv"
"strings"

"github.com/google/uuid"

commonassets "github.com/smartcontractkit/chainlink-common/pkg/assets"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets"
ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
Expand All @@ -25,7 +27,6 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/gin-gonic/gin"
"github.com/pkg/errors"
"go.uber.org/multierr"
)

// ETHKeysController manages account keys
Expand Down Expand Up @@ -260,7 +261,6 @@ func (ekc *ETHKeysController) Export(c *gin.Context) {

// Chain updates settings for a given chain for the key
func (ekc *ETHKeysController) Chain(c *gin.Context) {
var err error
kst := ekc.app.GetKeyStore().Eth()
defer ekc.app.GetLogger().ErrorIfFn(c.Request.Body.Close, "Error closing Import request body")

Expand All @@ -277,34 +277,61 @@ func (ekc *ETHKeysController) Chain(c *gin.Context) {
return
}

abandon := false
// Parse abandon flag
if abandonStr := c.Query("abandon"); abandonStr != "" {
abandon, err = strconv.ParseBool(abandonStr)
abandon, err := strconv.ParseBool(abandonStr)
if err != nil {
jsonAPIError(c, http.StatusBadRequest, errors.Wrapf(err, "invalid value for abandon: expected boolean, got: %s", abandonStr))
return
}

// If flag is set, marks as error='abandoned' and state='fatal_error'
// txes that were in 'unconfirmed', 'in_progress', 'unstarted' states for given address.
if abandon {
err = chain.TxManager().Reset(address, abandon)
if err != nil {
if strings.Contains(err.Error(), "key state not found with address") {
jsonAPIError(c, http.StatusNotFound, err)
return
}
jsonAPIError(c, http.StatusInternalServerError, err)
return
}
}
}

// Reset the chain
if abandon {
var resetErr error
err = chain.TxManager().Reset(address, abandon)
err = multierr.Combine(err, resetErr)
// Parse abandonUnstarted flag
if abandonUnstartedStr := c.Query("abandonUnstarted"); abandonUnstartedStr != "" {
abandonUnstarted, err := strconv.ParseBool(abandonUnstartedStr)
if err != nil {
if strings.Contains(err.Error(), "key state not found with address") {
jsonAPIError(c, http.StatusNotFound, err)
jsonAPIError(c, http.StatusBadRequest, errors.Wrapf(err, "invalid value for abandonUnstarted: expected boolean, got: %s", abandonUnstartedStr))
return
}

// If flag set, it deletes txes with 'unstarted' state.
if abandonUnstarted {
// Parse optional subject.
subject := uuid.Nil
if subjectStr := c.Query("subject"); subjectStr != "" {
subject, err = uuid.Parse(subjectStr)
if err != nil {
jsonAPIError(c, http.StatusBadRequest, errors.Wrapf(err, "invalid value for subject: expected uuid, got: %s", subject))
return
}
}

// If a subject is given, it will only delete 'unstarted' txes for given subject.
// If no subject is provided, it will delete all 'unstarted' txes.
if err = ekc.app.TxmStorageService().DeleteUnstartedTxes(c, chain.ID(), subject); err != nil {
jsonAPIError(c, http.StatusInternalServerError, err)
return
}
jsonAPIError(c, http.StatusInternalServerError, err)
return
}
}

enabledStr := c.Query("enabled")
if enabledStr != "" {
var enabled bool
enabled, err = strconv.ParseBool(enabledStr)
enabled, err := strconv.ParseBool(enabledStr)
if err != nil {
jsonAPIError(c, http.StatusBadRequest, errors.Wrap(err, "enabled must be bool"))
return
Expand Down
Loading
Loading