diff --git a/txpool/txpool.go b/txpool/txpool.go index 03b7ee9bf..4b4f18ef3 100644 --- a/txpool/txpool.go +++ b/txpool/txpool.go @@ -34,10 +34,10 @@ type txPool struct { // The transaction pool also maintains a consumption map for tracking byte usage per address. func NewTxPool(conf *Config, storeReader store.Reader, broadcastCh chan message.Message) TxPool { pools := make(map[payload.Type]pool) - pools[payload.TypeTransfer] = newPool(conf.transferPoolSize(), conf.minFee()) - pools[payload.TypeBond] = newPool(conf.bondPoolSize(), conf.minFee()) + pools[payload.TypeTransfer] = newPool(conf.transferPoolSize(), conf.fixedFee()) + pools[payload.TypeBond] = newPool(conf.bondPoolSize(), conf.fixedFee()) pools[payload.TypeUnbond] = newPool(conf.unbondPoolSize(), 0) - pools[payload.TypeWithdraw] = newPool(conf.withdrawPoolSize(), conf.minFee()) + pools[payload.TypeWithdraw] = newPool(conf.withdrawPoolSize(), conf.fixedFee()) pools[payload.TypeSortition] = newPool(conf.sortitionPoolSize(), 0) pool := &txPool{ @@ -112,11 +112,13 @@ func (p *txPool) appendTx(trx *tx.Tx) error { } if !trx.IsFreeTx() { - if trx.Fee() < payloadPool.estimatedFee() { - p.logger.Warn("low fee transaction", "tx", trx, "minFee", payloadPool.estimatedFee()) + minFee := p.estimatedMinimumFee(trx) + + if trx.Fee() < minFee { + p.logger.Warn("low fee transaction", "txs", trx, "minFee", minFee) return AppendError{ - Err: fmt.Errorf("low fee transaction, expected to be more than %s", payloadPool.estimatedFee()), + Err: fmt.Errorf("low fee transaction, expected to be more than %s", minFee), } } } @@ -128,14 +130,14 @@ func (p *txPool) appendTx(trx *tx.Tx) error { } payloadPool.list.PushBack(trx.ID(), trx) - p.logger.Debug("transaction appended into pool", "tx", trx) + p.logger.Debug("transaction appended into pool", "txs", trx) return nil } func (p *txPool) checkTx(trx *tx.Tx) error { if err := execution.CheckAndExecute(trx, p.sbx, false); err != nil { - p.logger.Debug("invalid transaction", "tx", trx, "error", err) + p.logger.Debug("invalid transaction", "txs", trx, "error", err) return err } @@ -143,17 +145,29 @@ func (p *txPool) checkTx(trx *tx.Tx) error { return nil } -func (p *txPool) HandleCommittedBlock(blk *block.Block) error { +func (p *txPool) EstimatedFee(_ amount.Amount, payloadType payload.Type) amount.Amount { + selectedPool, ok := p.pools[payloadType] + if !ok { + return 0 + } + + return selectedPool.estimatedFee() +} + +func (p *txPool) HandleCommittedBlock(blk *block.Block) { p.lk.Lock() defer p.lk.Unlock() for _, trx := range blk.Transactions() { p.removeTx(trx.ID()) - - p.handleIncreaseConsumption(trx) } - return p.handleDecreaseConsumption(blk.Height()) + if p.config.CalculateConsumption() { + for _, trx := range blk.Transactions() { + p.handleIncreaseConsumption(trx) + } + p.handleDecreaseConsumption(blk.Height()) + } } func (p *txPool) handleIncreaseConsumption(trx *tx.Tx) { @@ -164,26 +178,30 @@ func (p *txPool) handleIncreaseConsumption(trx *tx.Tx) { } } -func (p *txPool) handleDecreaseConsumption(height uint32) error { +func (p *txPool) handleDecreaseConsumption(height uint32) { // If height is less than or equal to ConsumptionWindow, nothing to do. if height <= p.config.ConsumptionWindow { - return nil + return } // Calculate the block height that has passed out of the consumption window. windowedBlockHeight := height - p.config.ConsumptionWindow committedBlock, err := p.store.Block(windowedBlockHeight) if err != nil { - return err + p.logger.Error("failed to read block", "height", windowedBlockHeight, "err", err) + + return } blk, err := committedBlock.ToBlock() if err != nil { - return err + p.logger.Error("failed to parse block", "height", windowedBlockHeight, "err", err) + + return } for _, trx := range blk.Transactions() { - if trx.IsTransferTx() || trx.IsBondTx() || trx.IsWithdrawTx() { + if !trx.IsFreeTx() { signer := trx.Payload().Signer() if consumption, ok := p.consumptionMap[signer]; ok { // Decrease the consumption by the size of the transaction @@ -199,8 +217,6 @@ func (p *txPool) handleDecreaseConsumption(height uint32) error { } } } - - return nil } func (p *txPool) removeTx(txID tx.ID) { @@ -291,16 +307,31 @@ func (p *txPool) Size() int { return size } -func (p *txPool) EstimatedFee(_ amount.Amount, payloadType payload.Type) amount.Amount { - p.lk.RLock() - defer p.lk.RUnlock() +func (p *txPool) estimatedMinimumFee(trx *tx.Tx) amount.Amount { + return p.fixedFee() + p.consumptionalFee(trx) +} - payloadPool, ok := p.pools[payloadType] - if !ok { - return 0 +func (p *txPool) fixedFee() amount.Amount { + return p.config.fixedFee() +} + +// consumptionalFee calculates based on the amount of data each address consumes daily. +func (p *txPool) consumptionalFee(trx *tx.Tx) amount.Amount { + var consumption uint32 + signer := trx.Payload().Signer() + txSize := uint32(trx.SerializeSize()) + + if !p.store.HasPublicKey(signer) { + consumption = p.config.Fee.DailyLimit + } else { + consumption = p.consumptionMap[signer] + txSize + p.getPendingConsumption(signer) } - return payloadPool.estimatedFee() + coefficient := consumption / p.config.Fee.DailyLimit + + consumptionalFee, _ := amount.NewAmount(float64(coefficient) * float64(consumption) * p.config.Fee.UnitPrice) + + return consumptionalFee } func (p *txPool) AllPendingTxs() []*tx.Tx { @@ -322,6 +353,25 @@ func (p *txPool) AllPendingTxs() []*tx.Tx { return txs } +func (p *txPool) getPendingConsumption(signer crypto.Address) uint32 { + totalSize := uint32(0) + + // TODO: big o is "o(n * m)" + var next *linkedlist.Element[linkedmap.Pair[tx.ID, *tx.Tx]] + for ptype, pool := range p.pools { + if ptype == payload.TypeTransfer || ptype == payload.TypeBond || ptype == payload.TypeWithdraw { + for e := pool.list.HeadNode(); e != nil; e = next { + next = e.Next + if e.Data.Value.Payload().Signer() == signer { + totalSize += uint32(e.Data.Value.SerializeSize()) + } + } + } + } + + return totalSize +} + func (p *txPool) String() string { return fmt.Sprintf("{๐Ÿ’ธ %v ๐Ÿ” %v ๐Ÿ”“ %v ๐ŸŽฏ %v ๐Ÿงพ %v}", p.pools[payload.TypeTransfer].list.Size(), diff --git a/txpool/txpool_test.go b/txpool/txpool_test.go index 88cbb6794..0b1b4e854 100644 --- a/txpool/txpool_test.go +++ b/txpool/txpool_test.go @@ -11,6 +11,7 @@ import ( "github.com/pactus-project/pactus/store" "github.com/pactus-project/pactus/sync/bundle/message" "github.com/pactus-project/pactus/types/account" + "github.com/pactus-project/pactus/types/amount" "github.com/pactus-project/pactus/types/tx" "github.com/pactus-project/pactus/types/validator" "github.com/pactus-project/pactus/util/logger" @@ -33,14 +34,14 @@ func testConfig() *Config { MaxSize: 10, ConsumptionWindow: 3, Fee: &FeeConfig{ - FixedFee: 0.000001, + FixedFee: 0.01, DailyLimit: 280, - UnitPrice: 0.0, + UnitPrice: 0.00005, }, } } -func setup(t *testing.T) *testData { +func setup(t *testing.T, cfg *Config) *testData { t.Helper() ts := testsuite.NewTestSuite(t) @@ -48,6 +49,9 @@ func setup(t *testing.T) *testData { broadcastCh := make(chan message.Message, 10) sbx := sandbox.MockingSandbox(ts) config := testConfig() + if cfg != nil { + config = cfg + } mockStore := store.MockingStore(ts) poolInt := NewTxPool(config, mockStore, broadcastCh) poolInt.SetNewSandboxAndRecheck(sbx) @@ -89,7 +93,7 @@ func (td *testData) shouldPublishTransaction(t *testing.T, txID tx.ID) { } func TestAppendAndRemove(t *testing.T) { - td := setup(t) + td := setup(t, nil) height := td.RandHeight() td.sbx.TestStore.AddTestBlock(height) @@ -108,7 +112,7 @@ func TestAppendAndRemove(t *testing.T) { } func TestCalculatingConsumption(t *testing.T) { - td := setup(t) + td := setup(t, nil) // Generate keys for different transaction signers _, prv1 := td.RandEd25519KeyPair() @@ -145,21 +149,86 @@ func TestCalculatingConsumption(t *testing.T) { for _, tt := range tests { // Generate a block with the transactions for the given height - blk, cert := td.TestSuite.GenerateTestBlock(tt.height, func(bm *testsuite.BlockMaker) { + blk, cert := td.GenerateTestBlock(tt.height, func(bm *testsuite.BlockMaker) { bm.Txs = tt.txs }) td.store.SaveBlock(blk, cert) // Handle the block in the transaction pool - err := td.pool.HandleCommittedBlock(blk) - require.NoError(t, err) + td.pool.HandleCommittedBlock(blk) } require.Equal(t, expected, td.pool.consumptionMap) } +func TestEstimatedConsumptionalFee(t *testing.T) { + td := setup(t, &Config{ + MaxSize: 10, + ConsumptionWindow: 3, + Fee: &FeeConfig{ + FixedFee: 0, + DailyLimit: 360, + UnitPrice: 0.000005, + }, + }) + + t.Run("Test indexed signer", func(t *testing.T) { + accPub, accPrv := td.RandEd25519KeyPair() + acc1Addr := accPub.AccountAddress() + acc1 := account.NewAccount(0) + acc1.AddToBalance(1000e9) + td.sbx.UpdateAccount(acc1Addr, acc1) + + txr := td.GenerateTestTransferTx(testsuite.TransactionWithEd25519Signer(accPrv), testsuite.TransactionWithAmount(1e9)) + + blk, cert := td.GenerateTestBlock(td.RandHeight(), testsuite.BlockWithTransactions([]*tx.Tx{txr})) + td.store.SaveBlock(blk, cert) + + tests := []struct { + value amount.Amount + fee amount.Amount + withErr bool + }{ + {1e9, 0, false}, + {1e9, 0, false}, + {1e9, 89800000, false}, + {1e9, 90000000, false}, + {1e9, 7000000000, false}, + {1e9, 0, true}, + } + + for _, tt := range tests { + trx := td.GenerateTestTransferTx( + testsuite.TransactionWithEd25519Signer(accPrv), + testsuite.TransactionWithAmount(tt.value), + testsuite.TransactionWithFee(tt.fee), + ) + + err := td.pool.AppendTx(trx) + if tt.withErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + } + }) + + t.Run("Test non-indexed signer", func(t *testing.T) { + _, accPrv := td.RandEd25519KeyPair() + + trx := td.GenerateTestTransferTx( + testsuite.TransactionWithEd25519Signer(accPrv), + testsuite.TransactionWithAmount(1e9), + testsuite.TransactionWithFee(0), + ) + + err := td.pool.AppendTx(trx) + assert.Error(t, err) + }) +} + func TestAppendInvalidTransaction(t *testing.T) { - td := setup(t) + td := setup(t, nil) invTrx := td.GenerateTestTransferTx() assert.Error(t, td.pool.AppendTx(invTrx)) @@ -167,7 +236,7 @@ func TestAppendInvalidTransaction(t *testing.T) { // TestFullPool tests if the pool prunes the old transactions when it is full. func TestFullPool(t *testing.T) { - td := setup(t) + td := setup(t, nil) randHeight := td.RandHeight() _ = td.sbx.TestStore.AddTestBlock(randHeight) @@ -194,13 +263,13 @@ func TestFullPool(t *testing.T) { } func TestEmptyPool(t *testing.T) { - td := setup(t) + td := setup(t, nil) assert.Empty(t, td.pool.PrepareBlockTransactions(), "pool should be empty") } func TestPrepareBlockTransactions(t *testing.T) { - td := setup(t) + td := setup(t, nil) randHeight := td.RandHeight() + td.sbx.TestParams.UnbondInterval _ = td.sbx.TestStore.AddTestBlock(randHeight) @@ -255,7 +324,7 @@ func TestPrepareBlockTransactions(t *testing.T) { } func TestAppendAndBroadcast(t *testing.T) { - td := setup(t) + td := setup(t, nil) height := td.RandHeight() td.sbx.TestStore.AddTestBlock(height) @@ -269,7 +338,7 @@ func TestAppendAndBroadcast(t *testing.T) { } func TestAddSubsidyTransactions(t *testing.T) { - td := setup(t) + td := setup(t, nil) randHeight := td.RandHeight() td.sbx.TestStore.AddTestBlock(randHeight)