Skip to content

Commit

Permalink
feat: add estimated minimum and consumption fee
Browse files Browse the repository at this point in the history
  • Loading branch information
Ja7ad committed Oct 31, 2024
1 parent 7014f41 commit d5bc6f6
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 40 deletions.
102 changes: 76 additions & 26 deletions txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ type txPool struct {
// The transaction pool also maintains a consumption map for tracking byte usage per address.
func NewTxPool(conf *Config, storeReader store.Reader, broadcastCh chan message.Message) TxPool {
pools := make(map[payload.Type]pool)
pools[payload.TypeTransfer] = newPool(conf.transferPoolSize(), conf.minFee())
pools[payload.TypeBond] = newPool(conf.bondPoolSize(), conf.minFee())
pools[payload.TypeTransfer] = newPool(conf.transferPoolSize(), conf.fixedFee())
pools[payload.TypeBond] = newPool(conf.bondPoolSize(), conf.fixedFee())
pools[payload.TypeUnbond] = newPool(conf.unbondPoolSize(), 0)
pools[payload.TypeWithdraw] = newPool(conf.withdrawPoolSize(), conf.minFee())
pools[payload.TypeWithdraw] = newPool(conf.withdrawPoolSize(), conf.fixedFee())
pools[payload.TypeSortition] = newPool(conf.sortitionPoolSize(), 0)

pool := &txPool{
Expand Down Expand Up @@ -112,11 +112,13 @@ func (p *txPool) appendTx(trx *tx.Tx) error {
}

if !trx.IsFreeTx() {
if trx.Fee() < payloadPool.estimatedFee() {
p.logger.Warn("low fee transaction", "tx", trx, "minFee", payloadPool.estimatedFee())
minFee := p.estimatedMinimumFee(trx)

if trx.Fee() < minFee {
p.logger.Warn("low fee transaction", "txs", trx, "minFee", minFee)

return AppendError{
Err: fmt.Errorf("low fee transaction, expected to be more than %s", payloadPool.estimatedFee()),
Err: fmt.Errorf("low fee transaction, expected to be more than %s", minFee),
}
}
}
Expand All @@ -128,32 +130,44 @@ func (p *txPool) appendTx(trx *tx.Tx) error {
}

payloadPool.list.PushBack(trx.ID(), trx)
p.logger.Debug("transaction appended into pool", "tx", trx)
p.logger.Debug("transaction appended into pool", "txs", trx)

return nil
}

func (p *txPool) checkTx(trx *tx.Tx) error {
if err := execution.CheckAndExecute(trx, p.sbx, false); err != nil {
p.logger.Debug("invalid transaction", "tx", trx, "error", err)
p.logger.Debug("invalid transaction", "txs", trx, "error", err)

return err
}

return nil
}

func (p *txPool) HandleCommittedBlock(blk *block.Block) error {
func (p *txPool) EstimatedFee(_ amount.Amount, payloadType payload.Type) amount.Amount {
selectedPool, ok := p.pools[payloadType]
if !ok {
return 0
}

return selectedPool.estimatedFee()
}

func (p *txPool) HandleCommittedBlock(blk *block.Block) {
p.lk.Lock()
defer p.lk.Unlock()

for _, trx := range blk.Transactions() {
p.removeTx(trx.ID())

p.handleIncreaseConsumption(trx)
}

return p.handleDecreaseConsumption(blk.Height())
if p.config.CalculateConsumption() {
for _, trx := range blk.Transactions() {
p.handleIncreaseConsumption(trx)
}
p.handleDecreaseConsumption(blk.Height())
}
}

func (p *txPool) handleIncreaseConsumption(trx *tx.Tx) {
Expand All @@ -164,26 +178,30 @@ func (p *txPool) handleIncreaseConsumption(trx *tx.Tx) {
}
}

func (p *txPool) handleDecreaseConsumption(height uint32) error {
func (p *txPool) handleDecreaseConsumption(height uint32) {
// If height is less than or equal to ConsumptionWindow, nothing to do.
if height <= p.config.ConsumptionWindow {
return nil
return
}

// Calculate the block height that has passed out of the consumption window.
windowedBlockHeight := height - p.config.ConsumptionWindow
committedBlock, err := p.store.Block(windowedBlockHeight)
if err != nil {
return err
p.logger.Error("failed to read block", "height", windowedBlockHeight, "err", err)

return
}

blk, err := committedBlock.ToBlock()
if err != nil {
return err
p.logger.Error("failed to parse block", "height", windowedBlockHeight, "err", err)

return
}

for _, trx := range blk.Transactions() {
if trx.IsTransferTx() || trx.IsBondTx() || trx.IsWithdrawTx() {
if !trx.IsFreeTx() {
signer := trx.Payload().Signer()
if consumption, ok := p.consumptionMap[signer]; ok {
// Decrease the consumption by the size of the transaction
Expand All @@ -199,8 +217,6 @@ func (p *txPool) handleDecreaseConsumption(height uint32) error {
}
}
}

return nil
}

func (p *txPool) removeTx(txID tx.ID) {
Expand Down Expand Up @@ -291,16 +307,31 @@ func (p *txPool) Size() int {
return size
}

func (p *txPool) EstimatedFee(_ amount.Amount, payloadType payload.Type) amount.Amount {
p.lk.RLock()
defer p.lk.RUnlock()
func (p *txPool) estimatedMinimumFee(trx *tx.Tx) amount.Amount {
return p.fixedFee() + p.consumptionalFee(trx)
}

payloadPool, ok := p.pools[payloadType]
if !ok {
return 0
func (p *txPool) fixedFee() amount.Amount {
return p.config.fixedFee()
}

// consumptionalFee calculates based on the amount of data each address consumes daily.
func (p *txPool) consumptionalFee(trx *tx.Tx) amount.Amount {
var consumption uint32
signer := trx.Payload().Signer()
txSize := uint32(trx.SerializeSize())

if !p.store.HasPublicKey(signer) {
consumption = p.config.Fee.DailyLimit
} else {
consumption = p.consumptionMap[signer] + txSize + p.getPendingConsumption(signer)
}

return payloadPool.estimatedFee()
coefficient := consumption / p.config.Fee.DailyLimit

consumptionalFee, _ := amount.NewAmount(float64(coefficient) * float64(consumption) * p.config.Fee.UnitPrice)

return consumptionalFee
}

func (p *txPool) AllPendingTxs() []*tx.Tx {
Expand All @@ -322,6 +353,25 @@ func (p *txPool) AllPendingTxs() []*tx.Tx {
return txs
}

func (p *txPool) getPendingConsumption(signer crypto.Address) uint32 {
totalSize := uint32(0)

// TODO: big o is "o(n * m)"
var next *linkedlist.Element[linkedmap.Pair[tx.ID, *tx.Tx]]
for ptype, pool := range p.pools {
if ptype == payload.TypeTransfer || ptype == payload.TypeBond || ptype == payload.TypeWithdraw {
for e := pool.list.HeadNode(); e != nil; e = next {
next = e.Next
if e.Data.Value.Payload().Signer() == signer {
totalSize += uint32(e.Data.Value.SerializeSize())
}
}
}
}

return totalSize
}

func (p *txPool) String() string {
return fmt.Sprintf("{💸 %v 🔐 %v 🔓 %v 🎯 %v 🧾 %v}",
p.pools[payload.TypeTransfer].list.Size(),
Expand Down
97 changes: 83 additions & 14 deletions txpool/txpool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/pactus-project/pactus/store"
"github.com/pactus-project/pactus/sync/bundle/message"
"github.com/pactus-project/pactus/types/account"
"github.com/pactus-project/pactus/types/amount"
"github.com/pactus-project/pactus/types/tx"
"github.com/pactus-project/pactus/types/validator"
"github.com/pactus-project/pactus/util/logger"
Expand All @@ -33,21 +34,24 @@ func testConfig() *Config {
MaxSize: 10,
ConsumptionWindow: 3,
Fee: &FeeConfig{
FixedFee: 0.000001,
FixedFee: 0.01,
DailyLimit: 280,
UnitPrice: 0.0,
UnitPrice: 0.00005,
},
}
}

func setup(t *testing.T) *testData {
func setup(t *testing.T, cfg *Config) *testData {
t.Helper()

ts := testsuite.NewTestSuite(t)

broadcastCh := make(chan message.Message, 10)
sbx := sandbox.MockingSandbox(ts)
config := testConfig()
if cfg != nil {
config = cfg
}
mockStore := store.MockingStore(ts)
poolInt := NewTxPool(config, mockStore, broadcastCh)
poolInt.SetNewSandboxAndRecheck(sbx)
Expand Down Expand Up @@ -89,7 +93,7 @@ func (td *testData) shouldPublishTransaction(t *testing.T, txID tx.ID) {
}

func TestAppendAndRemove(t *testing.T) {
td := setup(t)
td := setup(t, nil)

height := td.RandHeight()
td.sbx.TestStore.AddTestBlock(height)
Expand All @@ -108,7 +112,7 @@ func TestAppendAndRemove(t *testing.T) {
}

func TestCalculatingConsumption(t *testing.T) {
td := setup(t)
td := setup(t, nil)

// Generate keys for different transaction signers
_, prv1 := td.RandEd25519KeyPair()
Expand Down Expand Up @@ -145,29 +149,94 @@ func TestCalculatingConsumption(t *testing.T) {

for _, tt := range tests {
// Generate a block with the transactions for the given height
blk, cert := td.TestSuite.GenerateTestBlock(tt.height, func(bm *testsuite.BlockMaker) {
blk, cert := td.GenerateTestBlock(tt.height, func(bm *testsuite.BlockMaker) {
bm.Txs = tt.txs
})
td.store.SaveBlock(blk, cert)

// Handle the block in the transaction pool
err := td.pool.HandleCommittedBlock(blk)
require.NoError(t, err)
td.pool.HandleCommittedBlock(blk)
}

require.Equal(t, expected, td.pool.consumptionMap)
}

func TestEstimatedConsumptionalFee(t *testing.T) {
td := setup(t, &Config{
MaxSize: 10,
ConsumptionWindow: 3,
Fee: &FeeConfig{
FixedFee: 0,
DailyLimit: 360,
UnitPrice: 0.000005,
},
})

t.Run("Test indexed signer", func(t *testing.T) {
accPub, accPrv := td.RandEd25519KeyPair()
acc1Addr := accPub.AccountAddress()
acc1 := account.NewAccount(0)
acc1.AddToBalance(1000e9)
td.sbx.UpdateAccount(acc1Addr, acc1)

txr := td.GenerateTestTransferTx(testsuite.TransactionWithEd25519Signer(accPrv), testsuite.TransactionWithAmount(1e9))

blk, cert := td.GenerateTestBlock(td.RandHeight(), testsuite.BlockWithTransactions([]*tx.Tx{txr}))
td.store.SaveBlock(blk, cert)

tests := []struct {
value amount.Amount
fee amount.Amount
withErr bool
}{
{1e9, 0, false},
{1e9, 0, false},
{1e9, 89800000, false},
{1e9, 90000000, false},
{1e9, 7000000000, false},
{1e9, 0, true},
}

for _, tt := range tests {
trx := td.GenerateTestTransferTx(
testsuite.TransactionWithEd25519Signer(accPrv),
testsuite.TransactionWithAmount(tt.value),
testsuite.TransactionWithFee(tt.fee),
)

err := td.pool.AppendTx(trx)
if tt.withErr {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
}
})

t.Run("Test non-indexed signer", func(t *testing.T) {
_, accPrv := td.RandEd25519KeyPair()

trx := td.GenerateTestTransferTx(
testsuite.TransactionWithEd25519Signer(accPrv),
testsuite.TransactionWithAmount(1e9),
testsuite.TransactionWithFee(0),
)

err := td.pool.AppendTx(trx)
assert.Error(t, err)
})
}

func TestAppendInvalidTransaction(t *testing.T) {
td := setup(t)
td := setup(t, nil)

invTrx := td.GenerateTestTransferTx()
assert.Error(t, td.pool.AppendTx(invTrx))
}

// TestFullPool tests if the pool prunes the old transactions when it is full.
func TestFullPool(t *testing.T) {
td := setup(t)
td := setup(t, nil)

randHeight := td.RandHeight()
_ = td.sbx.TestStore.AddTestBlock(randHeight)
Expand All @@ -194,13 +263,13 @@ func TestFullPool(t *testing.T) {
}

func TestEmptyPool(t *testing.T) {
td := setup(t)
td := setup(t, nil)

assert.Empty(t, td.pool.PrepareBlockTransactions(), "pool should be empty")
}

func TestPrepareBlockTransactions(t *testing.T) {
td := setup(t)
td := setup(t, nil)

randHeight := td.RandHeight() + td.sbx.TestParams.UnbondInterval
_ = td.sbx.TestStore.AddTestBlock(randHeight)
Expand Down Expand Up @@ -255,7 +324,7 @@ func TestPrepareBlockTransactions(t *testing.T) {
}

func TestAppendAndBroadcast(t *testing.T) {
td := setup(t)
td := setup(t, nil)

height := td.RandHeight()
td.sbx.TestStore.AddTestBlock(height)
Expand All @@ -269,7 +338,7 @@ func TestAppendAndBroadcast(t *testing.T) {
}

func TestAddSubsidyTransactions(t *testing.T) {
td := setup(t)
td := setup(t, nil)

randHeight := td.RandHeight()
td.sbx.TestStore.AddTestBlock(randHeight)
Expand Down

0 comments on commit d5bc6f6

Please sign in to comment.