Skip to content

Commit

Permalink
feat(txpool): add consomptional fee model (#1572)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ja7ad authored Nov 1, 2024
1 parent 9711f31 commit d8f993d
Show file tree
Hide file tree
Showing 13 changed files with 197 additions and 57 deletions.
9 changes: 5 additions & 4 deletions config/example_config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,15 @@
fixed_fee = 0.01

# The `daily_limit` is the number of bytes an account can send each day without paying a fee.
# The `daily_limit` is part of he consumptional fee model.
# The `daily_limit` is part of the consumptional fee model.
# To undesrstand how condumptional fee model works, you can refer to
# PIP-31: Consumptional Fee Mode (https://pips.pactus.org/PIPs/pip-31)
# Default is `280` bytes.
daily_limit = 280
# Default is `360` bytes.
daily_limit = 360

# The `unit_price` defines the fee per byte in PAC.
# The `unit_price` is part of he consumptional fee model.
# The `unit_price` is part of the consumptional fee model.
# If it is zero, the consumptional fee will be ignored.
# To undesrstand how condumptional fee model works, you can refer to
# PIP-31: Consumptional Fee Mode (https://pips.pactus.org/PIPs/pip-31)
# Default is `0.0` PAC.
Expand Down
4 changes: 1 addition & 3 deletions state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,9 +438,7 @@ func (st *state) CommitBlock(blk *block.Block, cert *certificate.BlockCertificat
}

// Remove transactions from pool and update consumption
if err := st.txPool.HandleCommittedBlock(blk); err != nil {
return err
}
st.txPool.HandleCommittedBlock(blk)

st.logger.Info("new block committed", "block", blk, "round", cert.Round())

Expand Down
1 change: 1 addition & 0 deletions store/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
)

func blockKey(height uint32) []byte { return append(blockPrefix, util.Uint32ToSlice(height)...) }

func publicKeyKey(addr crypto.Address) []byte {
return append(publicKeyPrefix, addr.Bytes()...)
}
Expand Down
1 change: 1 addition & 0 deletions store/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ type Reader interface {
Transaction(txID tx.ID) (*CommittedTx, error)
RecentTransaction(txID tx.ID) bool
PublicKey(addr crypto.Address) (crypto.PublicKey, error)
HasPublicKey(addr crypto.Address) bool
HasAccount(crypto.Address) bool
Account(addr crypto.Address) (*account.Account, error)
TotalAccounts() int32
Expand Down
6 changes: 6 additions & 0 deletions store/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,12 @@ func (m *MockStore) PublicKey(addr crypto.Address) (crypto.PublicKey, error) {
return nil, ErrNotFound
}

func (m *MockStore) HasPublicKey(addr crypto.Address) bool {
pub, _ := m.PublicKey(addr)

return pub != nil
}

func (m *MockStore) Transaction(txID tx.ID) (*CommittedTx, error) {
for height, blk := range m.Blocks {
for _, trx := range blk.Transactions() {
Expand Down
7 changes: 7 additions & 0 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,13 @@ func (s *store) PublicKey(addr crypto.Address) (crypto.PublicKey, error) {
return s.blockStore.publicKey(addr)
}

func (s *store) HasPublicKey(addr crypto.Address) bool {
s.lk.RLock()
defer s.lk.RUnlock()

return tryHas(s.db, publicKeyKey(addr))
}

func (s *store) Transaction(txID tx.ID) (*CommittedTx, error) {
s.lk.Lock()
defer s.lk.Unlock()
Expand Down
7 changes: 7 additions & 0 deletions store/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,13 +138,20 @@ func TestIndexingPublicKeys(t *testing.T) {
pub, err := td.store.PublicKey(addr)
assert.NoError(t, err)

ok := td.store.HasPublicKey(addr)
assert.True(t, ok)

assert.True(t, trx.PublicKey().EqualsTo(pub))
}
})

t.Run("Query non existing public key", func(t *testing.T) {
randValAddress := td.RandValAddress()
pubKey, err := td.store.PublicKey(randValAddress)

ok := td.store.HasPublicKey(randValAddress)
assert.False(t, ok)

assert.Error(t, err)
assert.Nil(t, pubKey)
})
Expand Down
12 changes: 7 additions & 5 deletions txpool/config.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package txpool

import (
"github.com/pactus-project/pactus/types/amount"
)
import "github.com/pactus-project/pactus/types/amount"

type Config struct {
MaxSize int `toml:"max_size"`
Expand All @@ -29,7 +27,7 @@ func DefaultConfig() *Config {
func DefaultFeeConfig() *FeeConfig {
return &FeeConfig{
FixedFee: 0.01,
DailyLimit: 280,
DailyLimit: 360,
UnitPrice: 0,
}
}
Expand All @@ -51,7 +49,11 @@ func (conf *Config) BasicCheck() error {
return nil
}

func (conf *Config) minFee() amount.Amount {
func (conf *Config) CalculateConsumption() bool {
return conf.Fee.UnitPrice > 0
}

func (conf *Config) fixedFee() amount.Amount {
amt, _ := amount.NewAmount(conf.Fee.FixedFee)

return amt
Expand Down
2 changes: 1 addition & 1 deletion txpool/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func TestDefaultConfig(t *testing.T) {
assert.Equal(t, 100, conf.unbondPoolSize())
assert.Equal(t, 100, conf.withdrawPoolSize())
assert.Equal(t, 100, conf.sortitionPoolSize())
assert.Equal(t, amount.Amount(0.1e8), conf.minFee())
assert.Equal(t, amount.Amount(0.1e8), conf.fixedFee())

assert.Equal(t,
conf.transferPoolSize()+
Expand Down
2 changes: 1 addition & 1 deletion txpool/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@ type TxPool interface {
SetNewSandboxAndRecheck(sbx sandbox.Sandbox)
AppendTxAndBroadcast(trx *tx.Tx) error
AppendTx(trx *tx.Tx) error
HandleCommittedBlock(blk *block.Block) error
HandleCommittedBlock(blk *block.Block)
}
4 changes: 1 addition & 3 deletions txpool/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,7 @@ func (m *MockTxPool) RemoveTx(id hash.Hash) {
}
}

func (*MockTxPool) HandleCommittedBlock(_ *block.Block) error {
return nil
}
func (*MockTxPool) HandleCommittedBlock(_ *block.Block) {}

func (m *MockTxPool) PrepareBlockTransactions() block.Txs {
txs := make([]*tx.Tx, m.Size())
Expand Down
102 changes: 76 additions & 26 deletions txpool/txpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ type txPool struct {
// The transaction pool also maintains a consumption map for tracking byte usage per address.
func NewTxPool(conf *Config, storeReader store.Reader, broadcastCh chan message.Message) TxPool {
pools := make(map[payload.Type]pool)
pools[payload.TypeTransfer] = newPool(conf.transferPoolSize(), conf.minFee())
pools[payload.TypeBond] = newPool(conf.bondPoolSize(), conf.minFee())
pools[payload.TypeTransfer] = newPool(conf.transferPoolSize(), conf.fixedFee())
pools[payload.TypeBond] = newPool(conf.bondPoolSize(), conf.fixedFee())
pools[payload.TypeUnbond] = newPool(conf.unbondPoolSize(), 0)
pools[payload.TypeWithdraw] = newPool(conf.withdrawPoolSize(), conf.minFee())
pools[payload.TypeWithdraw] = newPool(conf.withdrawPoolSize(), conf.fixedFee())
pools[payload.TypeSortition] = newPool(conf.sortitionPoolSize(), 0)

pool := &txPool{
Expand Down Expand Up @@ -112,11 +112,13 @@ func (p *txPool) appendTx(trx *tx.Tx) error {
}

if !trx.IsFreeTx() {
if trx.Fee() < payloadPool.estimatedFee() {
p.logger.Warn("low fee transaction", "tx", trx, "minFee", payloadPool.estimatedFee())
minFee := p.estimatedMinimumFee(trx)

if trx.Fee() < minFee {
p.logger.Warn("low fee transaction", "txs", trx, "minFee", minFee)

return AppendError{
Err: fmt.Errorf("low fee transaction, expected to be more than %s", payloadPool.estimatedFee()),
Err: fmt.Errorf("low fee transaction, expected to be more than %s", minFee),
}
}
}
Expand All @@ -128,32 +130,44 @@ func (p *txPool) appendTx(trx *tx.Tx) error {
}

payloadPool.list.PushBack(trx.ID(), trx)
p.logger.Debug("transaction appended into pool", "tx", trx)
p.logger.Debug("transaction appended into pool", "trx", trx)

return nil
}

func (p *txPool) checkTx(trx *tx.Tx) error {
if err := execution.CheckAndExecute(trx, p.sbx, false); err != nil {
p.logger.Debug("invalid transaction", "tx", trx, "error", err)
p.logger.Debug("invalid transaction", "trx", trx, "error", err)

return err
}

return nil
}

func (p *txPool) HandleCommittedBlock(blk *block.Block) error {
func (p *txPool) EstimatedFee(_ amount.Amount, payloadType payload.Type) amount.Amount {
selectedPool, ok := p.pools[payloadType]
if !ok {
return 0
}

return selectedPool.estimatedFee()
}

func (p *txPool) HandleCommittedBlock(blk *block.Block) {
p.lk.Lock()
defer p.lk.Unlock()

for _, trx := range blk.Transactions() {
p.removeTx(trx.ID())

p.handleIncreaseConsumption(trx)
}

return p.handleDecreaseConsumption(blk.Height())
if p.config.CalculateConsumption() {
for _, trx := range blk.Transactions() {
p.handleIncreaseConsumption(trx)
}
p.handleDecreaseConsumption(blk.Height())
}
}

func (p *txPool) handleIncreaseConsumption(trx *tx.Tx) {
Expand All @@ -164,26 +178,30 @@ func (p *txPool) handleIncreaseConsumption(trx *tx.Tx) {
}
}

func (p *txPool) handleDecreaseConsumption(height uint32) error {
func (p *txPool) handleDecreaseConsumption(height uint32) {
// If height is less than or equal to ConsumptionWindow, nothing to do.
if height <= p.config.ConsumptionWindow {
return nil
return
}

// Calculate the block height that has passed out of the consumption window.
windowedBlockHeight := height - p.config.ConsumptionWindow
committedBlock, err := p.store.Block(windowedBlockHeight)
if err != nil {
return err
p.logger.Error("failed to read block", "height", windowedBlockHeight, "err", err)

return
}

blk, err := committedBlock.ToBlock()
if err != nil {
return err
p.logger.Error("failed to parse block", "height", windowedBlockHeight, "err", err)

return
}

for _, trx := range blk.Transactions() {
if trx.IsTransferTx() || trx.IsBondTx() || trx.IsWithdrawTx() {
if !trx.IsFreeTx() {
signer := trx.Payload().Signer()
if consumption, ok := p.consumptionMap[signer]; ok {
// Decrease the consumption by the size of the transaction
Expand All @@ -199,8 +217,6 @@ func (p *txPool) handleDecreaseConsumption(height uint32) error {
}
}
}

return nil
}

func (p *txPool) removeTx(txID tx.ID) {
Expand Down Expand Up @@ -291,16 +307,31 @@ func (p *txPool) Size() int {
return size
}

func (p *txPool) EstimatedFee(_ amount.Amount, payloadType payload.Type) amount.Amount {
p.lk.RLock()
defer p.lk.RUnlock()
func (p *txPool) estimatedMinimumFee(trx *tx.Tx) amount.Amount {
return p.fixedFee() + p.consumptionalFee(trx)
}

payloadPool, ok := p.pools[payloadType]
if !ok {
return 0
func (p *txPool) fixedFee() amount.Amount {
return p.config.fixedFee()
}

// consumptionalFee calculates based on the amount of data each address consumes daily.
func (p *txPool) consumptionalFee(trx *tx.Tx) amount.Amount {
var consumption uint32
signer := trx.Payload().Signer()
txSize := uint32(trx.SerializeSize())

if !p.store.HasPublicKey(signer) {
consumption = p.config.Fee.DailyLimit
} else {
consumption = p.consumptionMap[signer] + txSize + p.getPendingConsumption(signer)
}

return payloadPool.estimatedFee()
coefficient := consumption / p.config.Fee.DailyLimit

consumptionalFee, _ := amount.NewAmount(float64(coefficient) * float64(consumption) * p.config.Fee.UnitPrice)

return consumptionalFee
}

func (p *txPool) AllPendingTxs() []*tx.Tx {
Expand All @@ -322,6 +353,25 @@ func (p *txPool) AllPendingTxs() []*tx.Tx {
return txs
}

func (p *txPool) getPendingConsumption(signer crypto.Address) uint32 {
totalSize := uint32(0)

// TODO: big o is "o(n * m)"
var next *linkedlist.Element[linkedmap.Pair[tx.ID, *tx.Tx]]
for ptype, pool := range p.pools {
if ptype == payload.TypeTransfer || ptype == payload.TypeBond || ptype == payload.TypeWithdraw {
for e := pool.list.HeadNode(); e != nil; e = next {
next = e.Next
if e.Data.Value.Payload().Signer() == signer {
totalSize += uint32(e.Data.Value.SerializeSize())
}
}
}
}

return totalSize
}

func (p *txPool) String() string {
return fmt.Sprintf("{💸 %v 🔐 %v 🔓 %v 🎯 %v 🧾 %v}",
p.pools[payload.TypeTransfer].list.Size(),
Expand Down
Loading

0 comments on commit d8f993d

Please sign in to comment.