From 8a75c24af43ae7a11432dde301f8b67bd5b0b650 Mon Sep 17 00:00:00 2001 From: zhangkai Date: Mon, 29 Apr 2024 17:54:20 +0800 Subject: [PATCH] Count ready tx (#196) * add transaction counter use set instead of increase add balance issue situation * tidy code * fix tx counter remove lock * add setReadyTxCount to close block --- docs/config-file/node-config-doc.html | 8 +- docs/config-file/node-config-doc.md | 97 ----------- docs/config-file/node-config-schema.json | 38 ----- jsonrpc/dynamic_gas_price_xlayer.go | 19 +-- jsonrpc/endpoints_eth_xlayer.go | 20 ++- jsonrpc/mocks/mock_pool_xlayer.go | 23 ++- jsonrpc/types/interfaces.go | 2 +- pool/apollo_xlayer.go | 11 -- pool/config.go | 3 - pool/interfaces.go | 8 +- pool/pendingstat_xlayer.go | 190 --------------------- pool/pendingstatcache_xlayer.go | 71 -------- pool/pgpoolstorage/pgpoolstorage_xlayer.go | 111 ++---------- pool/pool.go | 2 - pool/pool_xlayer.go | 5 - sequencer/addrqueue_xlayer.go | 16 ++ sequencer/interfaces.go | 2 + sequencer/l2block.go | 2 + sequencer/mock_pool_xlayer.go | 4 + sequencer/mock_worker_xlayer.go | 8 + sequencer/pooltx_counter_xlayer.go | 31 ++++ sequencer/sequencer.go | 2 + sequencer/sequencer_xlayer.go | 12 ++ sequencer/worker.go | 11 +- sequencer/worker_xlayer.go | 30 ++++ 25 files changed, 172 insertions(+), 554 deletions(-) delete mode 100644 pool/pendingstat_xlayer.go delete mode 100644 pool/pendingstatcache_xlayer.go create mode 100644 sequencer/addrqueue_xlayer.go create mode 100644 sequencer/mock_worker_xlayer.go create mode 100644 sequencer/pooltx_counter_xlayer.go create mode 100644 sequencer/worker_xlayer.go diff --git a/docs/config-file/node-config-doc.html b/docs/config-file/node-config-doc.html index 982fb7455b..8561f49ee2 100644 --- a/docs/config-file/node-config-doc.html +++ b/docs/config-file/node-config-doc.html @@ -14,13 +14,7 @@
"300ms"
 

Default: "15s"Type: string

PollMinAllowedGasPriceInterval is the interval to poll the suggested min gas price for a tx


Examples:

"1m"
 
"300ms"
-

Default: 64Type: integer

AccountQueue represents the maximum number of non-executable transaction slots permitted per account


Default: 1024Type: integer

GlobalQueue represents the maximum number of non-executable transaction slots for all accounts


EffectiveGasPrice is the config for the effective gas price calculation
Default: falseType: boolean

Enabled is a flag to enable/disable the effective gas price


Default: 0.25Type: number

L1GasPriceFactor is the percentage of the L1 gas price that will be used as the L2 min gas price


Default: 16Type: integer

ByteGasCost is the gas cost per byte that is not 0


Default: 4Type: integer

ZeroByteGasCost is the gas cost per byte that is 0


Default: 1Type: number

NetProfit is the profit margin to apply to the calculated breakEvenGasPrice


Default: 1.1Type: number

BreakEvenFactor is the factor to apply to the calculated breakevenGasPrice when comparing it with the gasPriceSigned of a tx


Default: 10Type: integer

FinalDeviationPct is the max allowed deviation percentage BreakEvenGasPrice on re-calculation


Default: 0Type: integer

EthTransferGasPrice is the fixed gas price returned as effective gas price for txs tha are ETH transfers (0 means disabled)
Only one of EthTransferGasPrice or EthTransferL1GasPriceFactor params can be different than 0. If both params are set to 0, the sequencer will halt and log an error


Default: 0Type: number

EthTransferL1GasPriceFactor is the percentage of L1 gas price returned as effective gas price for txs tha are ETH transfers (0 means disabled)
Only one of EthTransferGasPrice or EthTransferL1GasPriceFactor params can be different than 0. If both params are set to 0, the sequencer will halt and log an error


Default: 0.5Type: number

L2GasPriceSuggesterFactor is the factor to apply to L1 gas price to get the suggested L2 gas price used in the
calculations when the effective gas price is disabled (testing/metrics purposes)


Default: 0Type: integer

ForkID is the current fork ID of the chain


Default: ["0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266"]Type: array of string

XLayer config
FreeGasAddress is the default free gas address

Each item of this array must be:


Default: 150000Type: integer

FreeClaimGasLimit is the max gas allowed use to do a free claim


Type: array of string

BridgeClaimMethodSignature for tracking BridgeClaimMethodSignature method

Each item of this array must be:


PendingStat is the configuration for the pending statistics
Default: falseType: boolean

Default: "0s"Type: string

Examples:

"1m"
-
"300ms"
-

Default: "0s"Type: string

Examples:

"1m"
-
"300ms"
-

Default: "0s"Type: string

Examples:

"1m"
-
"300ms"
-

Configuration for RPC service. THis one offers a extended Ethereum JSON-RPC API interface to interact with the node
Default: "0.0.0.0"Type: string

Host defines the network adapter that will be used to serve the HTTP requests


Default: 8545Type: integer

Port defines the port to serve the endpoints via HTTP


Default: "1m0s"Type: string

ReadTimeout is the HTTP server read timeout
check net/http.server.ReadTimeout and net/http.server.ReadHeaderTimeout


Examples:

"1m"
+

Default: 64Type: integer

AccountQueue represents the maximum number of non-executable transaction slots permitted per account


Default: 1024Type: integer

GlobalQueue represents the maximum number of non-executable transaction slots for all accounts


EffectiveGasPrice is the config for the effective gas price calculation
Default: falseType: boolean

Enabled is a flag to enable/disable the effective gas price


Default: 0.25Type: number

L1GasPriceFactor is the percentage of the L1 gas price that will be used as the L2 min gas price


Default: 16Type: integer

ByteGasCost is the gas cost per byte that is not 0


Default: 4Type: integer

ZeroByteGasCost is the gas cost per byte that is 0


Default: 1Type: number

NetProfit is the profit margin to apply to the calculated breakEvenGasPrice


Default: 1.1Type: number

BreakEvenFactor is the factor to apply to the calculated breakevenGasPrice when comparing it with the gasPriceSigned of a tx


Default: 10Type: integer

FinalDeviationPct is the max allowed deviation percentage BreakEvenGasPrice on re-calculation


Default: 0Type: integer

EthTransferGasPrice is the fixed gas price returned as effective gas price for txs tha are ETH transfers (0 means disabled)
Only one of EthTransferGasPrice or EthTransferL1GasPriceFactor params can be different than 0. If both params are set to 0, the sequencer will halt and log an error


Default: 0Type: number

EthTransferL1GasPriceFactor is the percentage of L1 gas price returned as effective gas price for txs tha are ETH transfers (0 means disabled)
Only one of EthTransferGasPrice or EthTransferL1GasPriceFactor params can be different than 0. If both params are set to 0, the sequencer will halt and log an error


Default: 0.5Type: number

L2GasPriceSuggesterFactor is the factor to apply to L1 gas price to get the suggested L2 gas price used in the
calculations when the effective gas price is disabled (testing/metrics purposes)


Default: 0Type: integer

ForkID is the current fork ID of the chain


Default: ["0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266"]Type: array of string

XLayer config
FreeGasAddress is the default free gas address

Each item of this array must be:


Default: 150000Type: integer

FreeClaimGasLimit is the max gas allowed use to do a free claim


Type: array of string

BridgeClaimMethodSignature for tracking BridgeClaimMethodSignature method

Each item of this array must be:


Configuration for RPC service. THis one offers a extended Ethereum JSON-RPC API interface to interact with the node
Default: "0.0.0.0"Type: string

Host defines the network adapter that will be used to serve the HTTP requests


Default: 8545Type: integer

Port defines the port to serve the endpoints via HTTP


Default: "1m0s"Type: string

ReadTimeout is the HTTP server read timeout
check net/http.server.ReadTimeout and net/http.server.ReadHeaderTimeout


Examples:

"1m"
 
"300ms"
 

Default: "1m0s"Type: string

WriteTimeout is the HTTP server write timeout
check net/http.server.WriteTimeout


Examples:

"1m"
 
"300ms"
diff --git a/docs/config-file/node-config-doc.md b/docs/config-file/node-config-doc.md
index d141187a2c..e2df90a6f2 100644
--- a/docs/config-file/node-config-doc.md
+++ b/docs/config-file/node-config-doc.md
@@ -709,7 +709,6 @@ SecretKey=""
 | - [FreeGasAddress](#Pool_FreeGasAddress )                                       | No      | array of string | No         | -          | XLayer config
FreeGasAddress is the default free gas address | | - [FreeClaimGasLimit](#Pool_FreeClaimGasLimit ) | No | integer | No | - | FreeClaimGasLimit is the max gas allowed use to do a free claim | | - [BridgeClaimMethodSigs](#Pool_BridgeClaimMethodSigs ) | No | array of string | No | - | BridgeClaimMethodSignature for tracking BridgeClaimMethodSignature method | -| - [PendingStat](#Pool_PendingStat ) | No | object | No | - | PendingStat is the configuration for the pending statistics | ### 7.1. `Pool.IntervalToRefreshBlockedAddresses` @@ -1249,102 +1248,6 @@ FreeClaimGasLimit=150000 **Type:** : `array of string` **Description:** BridgeClaimMethodSignature for tracking BridgeClaimMethodSignature method -### 7.18. `[Pool.PendingStat]` - -**Type:** : `object` -**Description:** PendingStat is the configuration for the pending statistics - -| Property | Pattern | Type | Deprecated | Definition | Title/Description | -| --------------------------------------------------- | ------- | ------- | ---------- | ---------- | ----------------- | -| - [Enable](#Pool_PendingStat_Enable ) | No | boolean | No | - | - | -| - [Interval](#Pool_PendingStat_Interval ) | No | string | No | - | Duration | -| - [StaleInterval](#Pool_PendingStat_StaleInterval ) | No | string | No | - | Duration | -| - [CacheInternal](#Pool_PendingStat_CacheInternal ) | No | string | No | - | Duration | - -#### 7.18.1. `Pool.PendingStat.Enable` - -**Type:** : `boolean` - -**Default:** `false` - -**Example setting the default value** (false): -``` -[Pool.PendingStat] -Enable=false -``` - -#### 7.18.2. `Pool.PendingStat.Interval` - -**Title:** Duration - -**Type:** : `string` - -**Default:** `"0s"` - -**Examples:** - -```json -"1m" -``` - -```json -"300ms" -``` - -**Example setting the default value** ("0s"): -``` -[Pool.PendingStat] -Interval="0s" -``` - -#### 7.18.3. `Pool.PendingStat.StaleInterval` - -**Title:** Duration - -**Type:** : `string` - -**Default:** `"0s"` - -**Examples:** - -```json -"1m" -``` - -```json -"300ms" -``` - -**Example setting the default value** ("0s"): -``` -[Pool.PendingStat] -StaleInterval="0s" -``` - -#### 7.18.4. `Pool.PendingStat.CacheInternal` - -**Title:** Duration - -**Type:** : `string` - -**Default:** `"0s"` - -**Examples:** - -```json -"1m" -``` - -```json -"300ms" -``` - -**Example setting the default value** ("0s"): -``` -[Pool.PendingStat] -CacheInternal="0s" -``` - ## 8. `[RPC]` **Type:** : `object` diff --git a/docs/config-file/node-config-schema.json b/docs/config-file/node-config-schema.json index 5aebce0147..12d15f76de 100644 --- a/docs/config-file/node-config-schema.json +++ b/docs/config-file/node-config-schema.json @@ -471,44 +471,6 @@ }, "type": "array", "description": "BridgeClaimMethodSignature for tracking BridgeClaimMethodSignature method" - }, - "PendingStat": { - "properties": { - "Enable": { - "type": "boolean", - "default": false - }, - "Interval": { - "type": "string", - "title": "Duration", - "default": "0s", - "examples": [ - "1m", - "300ms" - ] - }, - "StaleInterval": { - "type": "string", - "title": "Duration", - "default": "0s", - "examples": [ - "1m", - "300ms" - ] - }, - "CacheInternal": { - "type": "string", - "title": "Duration", - "default": "0s", - "examples": [ - "1m", - "300ms" - ] - } - }, - "additionalProperties": false, - "type": "object", - "description": "PendingStat is the configuration for the pending statistics" } }, "additionalProperties": false, diff --git a/jsonrpc/dynamic_gas_price_xlayer.go b/jsonrpc/dynamic_gas_price_xlayer.go index c90182e638..172da898c7 100644 --- a/jsonrpc/dynamic_gas_price_xlayer.go +++ b/jsonrpc/dynamic_gas_price_xlayer.go @@ -10,7 +10,6 @@ import ( zktypes "github.com/0xPolygonHermez/zkevm-node/config/types" "github.com/0xPolygonHermez/zkevm-node/jsonrpc/metrics" "github.com/0xPolygonHermez/zkevm-node/log" - "github.com/0xPolygonHermez/zkevm-node/pool" "github.com/ethereum/go-ethereum/core/types" ) @@ -213,21 +212,11 @@ func (e *EthEndpoints) getL2BatchTxsTips(ctx context.Context, l2BlockNumber uint } func (e *EthEndpoints) isCongested(ctx context.Context) (bool, error) { - var txCount uint64 - if e.pool != nil && e.pool.IsPendingStatEnabled(ctx) { - stat := pool.GetPendingStat() - if stat.Total < stat.SkipNonce+stat.BalanceIssue+stat.ErrorNonce { - txCount = 0 - } else { - txCount = stat.Total - stat.SkipNonce - stat.BalanceIssue - stat.ErrorNonce - } - } else { - cnt, err := e.pool.CountPendingTransactions(ctx) - if err != nil { - return false, err - } - txCount = cnt + txCount, err := e.pool.GetReadyTxCount(ctx) + if err != nil { + return false, err } + if txCount >= e.cfg.DynamicGP.CongestionTxThreshold { return true, nil } diff --git a/jsonrpc/endpoints_eth_xlayer.go b/jsonrpc/endpoints_eth_xlayer.go index bfde6a4356..c4f977c29f 100644 --- a/jsonrpc/endpoints_eth_xlayer.go +++ b/jsonrpc/endpoints_eth_xlayer.go @@ -16,7 +16,6 @@ import ( "github.com/0xPolygonHermez/zkevm-node/jsonrpc/metrics" "github.com/0xPolygonHermez/zkevm-node/jsonrpc/types" "github.com/0xPolygonHermez/zkevm-node/log" - "github.com/0xPolygonHermez/zkevm-node/pool" "github.com/0xPolygonHermez/zkevm-node/state" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -401,8 +400,23 @@ func (e *EthEndpoints) getMinPriceFromSequencerNode() (interface{}, types.Error) // GetPendingStat returns the pending stat func (e *EthEndpoints) GetPendingStat() (interface{}, types.Error) { - if e.isDisabled("eth_getPendingStat") || (e.pool != nil && !e.pool.IsPendingStatEnabled(context.Background())) { + if e.isDisabled("eth_getPendingStat") { return RPCErrorResponse(types.DefaultErrorCode, "not supported yet", nil, true) } - return pool.GetPendingStat(), nil + + pendingTotal, err := e.pool.CountPendingTransactions(context.Background()) + if err != nil { + return RPCErrorResponse(types.DefaultErrorCode, "failed to get pending transactions count", err, true) + } + readyTxCount, err := e.pool.GetReadyTxCount(context.Background()) + if err != nil { + return RPCErrorResponse(types.DefaultErrorCode, "failed to get ready tx count", err, true) + } + return struct { + Total uint64 `json:"total"` + ReadyTxCount uint64 `json:"readyTxCount"` + }{ + Total: pendingTotal, + ReadyTxCount: readyTxCount, + }, nil } diff --git a/jsonrpc/mocks/mock_pool_xlayer.go b/jsonrpc/mocks/mock_pool_xlayer.go index 2b785d244f..96b0069083 100644 --- a/jsonrpc/mocks/mock_pool_xlayer.go +++ b/jsonrpc/mocks/mock_pool_xlayer.go @@ -85,25 +85,32 @@ func (_m *PoolMock) GetMinSuggestedGasPriceWithDelta(ctx context.Context, delta return r0, r1 } -// IsPendingStatEnabled provides a mock function with given fields: ctx -func (_m *PoolMock) IsPendingStatEnabled(ctx context.Context) bool { +// GetReadyTxCount provides a mock function with given fields: ctx +func (_m *PoolMock) GetReadyTxCount(ctx context.Context) (uint64, error) { ret := _m.Called(ctx) if len(ret) == 0 { - panic("no return value specified for IsPendingStatEnabled") + panic("no return value specified for GetReadyTxCount") } - var r0 bool - if rf, ok := ret.Get(0).(func(context.Context) bool); ok { + var r0 uint64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context) (uint64, error)); ok { return rf(ctx) } - if rf, ok := ret.Get(0).(func(context.Context) bool); ok { + if rf, ok := ret.Get(0).(func(context.Context) uint64); ok { r0 = rf(ctx) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(bool) + r0 = ret.Get(0).(uint64) } } - return r0 + if rf, ok := ret.Get(1).(func(context.Context) error); ok { + r1 = rf(ctx) + } else { + r1 = ret.Error(1) + } + + return r0, r1 } diff --git a/jsonrpc/types/interfaces.go b/jsonrpc/types/interfaces.go index 07aef05a26..f56127ac44 100644 --- a/jsonrpc/types/interfaces.go +++ b/jsonrpc/types/interfaces.go @@ -28,7 +28,7 @@ type PoolInterface interface { AddInnerTx(ctx context.Context, txHash common.Hash, innerTx []byte) error GetInnerTx(ctx context.Context, txHash common.Hash) (string, error) GetMinSuggestedGasPriceWithDelta(ctx context.Context, delta time.Duration) (uint64, error) - IsPendingStatEnabled(ctx context.Context) bool + GetReadyTxCount(ctx context.Context) (uint64, error) } // StateInterface gathers the methods required to interact with the state. diff --git a/pool/apollo_xlayer.go b/pool/apollo_xlayer.go index a5e885de0d..d912a4a2e6 100644 --- a/pool/apollo_xlayer.go +++ b/pool/apollo_xlayer.go @@ -66,7 +66,6 @@ func UpdateConfig(apolloConfig Config) { getApolloConfig().setFreeGasAddresses(apolloConfig.FreeGasAddress) getApolloConfig().EnableWhitelist = apolloConfig.EnableWhitelist getApolloConfig().setBridgeClaimMethods(apolloConfig.BridgeClaimMethodSigs) - getApolloConfig().EnablePendingStat = apolloConfig.PendingStat.Enable getApolloConfig().Unlock() } @@ -125,13 +124,3 @@ func getEnableWhitelist(enableWhitelist bool) bool { return enableWhitelist } - -func getEnablePendingStat(enablePendingStat bool) bool { - if getApolloConfig().enable() { - getApolloConfig().RLock() - defer getApolloConfig().RUnlock() - return getApolloConfig().EnablePendingStat - } - - return enablePendingStat -} diff --git a/pool/config.go b/pool/config.go index 73372bf335..b6733f1a48 100644 --- a/pool/config.go +++ b/pool/config.go @@ -57,9 +57,6 @@ type Config struct { FreeClaimGasLimit uint64 `mapstructure:"FreeClaimGasLimit"` // BridgeClaimMethodSignature for tracking BridgeClaimMethodSignature method BridgeClaimMethodSigs []string `mapstructure:"BridgeClaimMethodSigs"` - - // PendingStat is the configuration for the pending statistics - PendingStat PendingStatCfg `mapstructure:"PendingStat"` } // EffectiveGasPriceCfg contains the configuration properties for the effective gas price diff --git a/pool/interfaces.go b/pool/interfaces.go index 45089e0a54..9008a31754 100644 --- a/pool/interfaces.go +++ b/pool/interfaces.go @@ -42,12 +42,8 @@ type storage interface { GetEarliestProcessedTx(ctx context.Context) (common.Hash, error) AddInnerTx(ctx context.Context, txHash common.Hash, innerTx []byte) error GetInnerTx(ctx context.Context, txHash common.Hash) (string, error) - GetPendingFromAndMinNonceBefore(ctx context.Context, timeDuration time.Duration) ([]common.Address, []uint64, error) - LockStat(ctx context.Context, timeDuration time.Duration) (bool, error) - UnLockStat(ctx context.Context) error - UpdateStatAndUnlock(ctx context.Context, totoal, skip, balanceIssue, nonceIssue uint64) error - GetStat(ctx context.Context) (uint64, uint64, uint64, uint64, error) - CountTransactionsByFromStatusAndNonce(ctx context.Context, from common.Address, nonce uint64, status ...TxStatus) (uint64, error) + UpdateReadyTxCount(ctx context.Context, count uint64) error + GetReadyTxCount(ctx context.Context) (uint64, error) } type stateInterface interface { diff --git a/pool/pendingstat_xlayer.go b/pool/pendingstat_xlayer.go deleted file mode 100644 index 5c65d987fe..0000000000 --- a/pool/pendingstat_xlayer.go +++ /dev/null @@ -1,190 +0,0 @@ -package pool - -import ( - "context" - "fmt" - - "github.com/0xPolygonHermez/zkevm-node/config/types" - "github.com/0xPolygonHermez/zkevm-node/log" - "github.com/0xPolygonHermez/zkevm-node/pool/trace" - "github.com/0xPolygonHermez/zkevm-node/state" - "github.com/ethereum/go-ethereum/common" - "github.com/google/uuid" -) - -// PendingStatCfg is the configuration for the pending stat -type PendingStatCfg struct { - Enable bool `mapstructure:"Enable"` - Interval types.Duration `mapstructure:"Interval"` - StaleInterval types.Duration `mapstructure:"StaleInterval"` - CacheInternal types.Duration `mapstructure:"CacheInternal"` -} - -func (p *Pool) startPendingStat() { - if p.cfg.PendingStat.Enable { - go state.InfiniteSafeRun(p.updatePendingStat, "error updating pending stat", p.cfg.PendingStat.Interval.Duration) - go state.InfiniteSafeRun(p.updatePendingStatCache, "error updating pending stat cache", p.cfg.PendingStat.CacheInternal.Duration) - } -} - -// updatePendingStat updates the pending statistics -// 1. find all pending transactions count -// 2. find all pending address and min nonce received before the stale interval -// 3. find all pending address that skip nonce -// 4. find all pending address that have balance issue -func (p *Pool) updatePendingStat() { - if !getEnablePendingStat(p.cfg.PendingStat.Enable) { - return - } - ctx := context.WithValue(context.Background(), trace.ID, uuid.New().String()) - locked, err := p.storage.LockStat(ctx, p.cfg.PendingStat.Interval.Duration) - if err != nil { - return - } - if !locked { - return - } - defer func() { - err = p.storage.UnLockStat(ctx) - if err != nil { - log.WithFields(trace.GetID(ctx)).Error("error unlocking stat", "err", err) - } - }() - - mLog := log.WithFields(trace.GetID(ctx)) - mLog.Infof("updating pending stat") - - totalCount, err := p.CountTransactionsByStatus(ctx, TxStatusPending) - if err != nil { - mLog.Error("error getting pending transactions count", "err", err) - return - } - mLog.Infof("total pending transactions %v", totalCount) - - address, nonces, err := p.storage.GetPendingFromAndMinNonceBefore(ctx, p.cfg.PendingStat.StaleInterval.Duration) - if err != nil { - mLog.Error("error getting pending address and min nonce before ", err) - return - } - mLog.Infof("pending address count %v before %v", len(address), p.cfg.PendingStat.StaleInterval.Duration) - - skipNonceAddress, continueNonceAddress, continueNonces, errNonceAddress, noncesNormal, err := p.filterAddress(ctx, address, nonces) - if err != nil { - mLog.Error("error filtering skip nonce address", "err", err) - return - } - mLog.Infof("skip nonce address count %v, continue nonce address count %v, err nonce address count %v", len(skipNonceAddress), len(continueNonceAddress), len(errNonceAddress)) - - totalSkipNonceTransactions, err := p.countSkipNonceTransactions(ctx, skipNonceAddress) - if err != nil { - mLog.Error("error counting skip nonce transactions", "err", err) - return - } - mLog.Infof("total skip nonce transactions %v", totalSkipNonceTransactions) - - totalBalanceIssueTransactions, err := p.countBalanceIssueTransactions(ctx, continueNonceAddress, continueNonces) - if err != nil { - mLog.Error("error counting balance issue transactions", "err", err) - return - } - mLog.Infof("total balance issue transactions %v", totalBalanceIssueTransactions) - - totalErrNonceTransactions, err := p.countErrNonceTransactions(ctx, errNonceAddress, noncesNormal) - if err != nil { - mLog.Error("error counting nonce issue transactions", "err", err) - return - } - mLog.Infof("total nonce issue transactions %v", totalErrNonceTransactions) - - err = p.storage.UpdateStatAndUnlock(ctx, totalCount, totalSkipNonceTransactions, totalBalanceIssueTransactions, totalErrNonceTransactions) - if err != nil { - mLog.Error("error updating stat and unlock", "err", err) - } else { - mLog.Infof("total %v, skip nonce %v, balance issue %v, nonce issue %v", totalCount, totalSkipNonceTransactions, totalBalanceIssueTransactions, totalErrNonceTransactions) - } -} - -func (p *Pool) filterAddress(ctx context.Context, addresses []common.Address, nonces []uint64) ([]common.Address, []common.Address, []uint64, []common.Address, []uint64, error) { - var skipNonceAddresses []common.Address - var continueNonceAddresses []common.Address - var continueNonces []uint64 - var errNonceAddresses []common.Address - var noncesNormal []uint64 - - lastL2Block, err := p.state.GetLastL2Block(ctx, nil) - if err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("failed to load last l2 block while adding tx to the pool: %w", err) - } - for i, addr := range addresses { - nonce, err := p.state.GetNonce(ctx, addr, lastL2Block.Root()) - if err != nil { - return nil, nil, nil, nil, nil, fmt.Errorf("failed to load nonce while adding tx to the pool: %w", err) - } - if nonces[i]-nonce > 0 { - skipNonceAddresses = append(skipNonceAddresses, addr) - } else if (nonces[i] - nonce) == 0 { - continueNonceAddresses = append(continueNonceAddresses, addr) - continueNonces = append(continueNonces, nonces[i]) - } else { - errNonceAddresses = append(errNonceAddresses, addr) - noncesNormal = append(noncesNormal, nonce) - } - } - return skipNonceAddresses, continueNonceAddresses, continueNonces, errNonceAddresses, noncesNormal, nil -} - -func (p *Pool) countSkipNonceTransactions(ctx context.Context, addresses []common.Address) (uint64, error) { - var totalSkipNonceTransactions uint64 - for _, addr := range addresses { - count, err := p.storage.CountTransactionsByFromAndStatus(ctx, addr, TxStatusPending) - if err != nil { - return 0, err - } - totalSkipNonceTransactions += count - } - return totalSkipNonceTransactions, nil -} - -func (p *Pool) countBalanceIssueTransactions(ctx context.Context, addresses []common.Address, nonces []uint64) (uint64, error) { - var totalBalanceIssueTransactions uint64 - mLog := log.WithFields(trace.GetID(ctx)) - - lastL2Block, err := p.state.GetLastL2Block(ctx, nil) - if err != nil { - return 0, fmt.Errorf("failed to load last l2 block while adding tx to the pool: %w", err) - } - - for i, addr := range addresses { - txs, err := p.storage.GetTxsByFromAndNonce(ctx, addr, nonces[i]) - if err != nil || len(txs) > 1 { - mLog.Warnf("error getting transactions by from %v and nonce %v tx count %v, err %v", addr, nonces[i], len(txs), err) - continue - } - balance, err := p.state.GetBalance(ctx, addr, lastL2Block.Root()) - if err != nil { - mLog.Warnf("error getting balance for address %v, l2block %v, err %v", addr, err, lastL2Block.Root()) - continue - } - if balance.Cmp(txs[0].Cost()) < 0 { - count, err := p.storage.CountTransactionsByFromAndStatus(ctx, addr, TxStatusPending) - if err != nil { - mLog.Warnf("error getting transactions count by from %v, err %v", addr, err) - } - totalBalanceIssueTransactions += count - } - } - - return totalBalanceIssueTransactions, nil -} - -func (p *Pool) countErrNonceTransactions(ctx context.Context, addresses []common.Address, nonces []uint64) (uint64, error) { - var totalErrNonceTransactions uint64 - for i, addr := range addresses { - count, err := p.storage.CountTransactionsByFromStatusAndNonce(ctx, addr, nonces[i], TxStatusPending) - if err != nil { - return 0, err - } - totalErrNonceTransactions += count - } - return totalErrNonceTransactions, nil -} diff --git a/pool/pendingstatcache_xlayer.go b/pool/pendingstatcache_xlayer.go deleted file mode 100644 index 0ca6a2134d..0000000000 --- a/pool/pendingstatcache_xlayer.go +++ /dev/null @@ -1,71 +0,0 @@ -package pool - -import ( - "context" - "sync" - "sync/atomic" - - "github.com/0xPolygonHermez/zkevm-node/log" - "github.com/0xPolygonHermez/zkevm-node/pool/trace" - "github.com/google/uuid" -) - -// PendingStat is the pending stat -type PendingStat struct { - // Total is the total number of pending transactions - Total uint64 `json:"total"` - - // SkipNonce is the number of transactions that skipped nonce - SkipNonce uint64 `json:"skipNonce"` - - // BalanceIssue is the number of transactions that have balance issue - BalanceIssue uint64 `json:"balanceIssue"` - - // ErrorNonce is the number of transactions that have nonce issue - ErrorNonce uint64 `json:"errorNonce"` -} - -var pendingStatInst *PendingStat -var pendingStatOnce sync.Once - -// GetPendingStat returns the singleton instance -func GetPendingStat() *PendingStat { - pendingStatOnce.Do(func() { - pendingStatInst = &PendingStat{} - }) - return pendingStatInst -} - -func (ps *PendingStat) setStat(stat PendingStat) { - atomic.StoreUint64(&ps.Total, stat.Total) - atomic.StoreUint64(&ps.SkipNonce, stat.SkipNonce) - atomic.StoreUint64(&ps.BalanceIssue, stat.BalanceIssue) -} - -// GetStat returns the pending stat -func (ps *PendingStat) GetStat() PendingStat { - return PendingStat{ - Total: atomic.LoadUint64(&ps.Total), - SkipNonce: atomic.LoadUint64(&ps.SkipNonce), - BalanceIssue: atomic.LoadUint64(&ps.BalanceIssue), - } -} - -func (p *Pool) updatePendingStatCache() { - if !getEnablePendingStat(p.cfg.PendingStat.Enable) { - return - } - ctx := context.WithValue(context.Background(), trace.ID, uuid.New().String()) - mLog := log.WithFields(trace.GetID(ctx)) - total, skip, balanceIssue, nonceIssue, err := p.storage.GetStat(ctx) - if err != nil { - mLog.Errorf("error getting stat: %v", err) - return - } - GetPendingStat().setStat(PendingStat{ - Total: total, - SkipNonce: skip, - BalanceIssue: balanceIssue, - ErrorNonce: nonceIssue, - }) -} diff --git a/pool/pgpoolstorage/pgpoolstorage_xlayer.go b/pool/pgpoolstorage/pgpoolstorage_xlayer.go index 0d69cdebb0..40cf651f97 100644 --- a/pool/pgpoolstorage/pgpoolstorage_xlayer.go +++ b/pool/pgpoolstorage/pgpoolstorage_xlayer.go @@ -5,9 +5,6 @@ import ( "errors" "time" - "github.com/0xPolygonHermez/zkevm-node/log" - "github.com/0xPolygonHermez/zkevm-node/pool" - "github.com/0xPolygonHermez/zkevm-node/pool/trace" "github.com/ethereum/go-ethereum/common" "github.com/jackc/pgx/v4" ) @@ -70,85 +67,18 @@ func (p *PostgresPoolStorage) GetInnerTx(ctx context.Context, txHash common.Hash return innerTx, nil } -// GetPendingFromAndMinNonceBefore get pending from and min nonce before timeDuration -func (p *PostgresPoolStorage) GetPendingFromAndMinNonceBefore(ctx context.Context, timeDuration time.Duration) ([]common.Address, []uint64, error) { - sql := `SELECT from_address, MIN(nonce) FROM pool."transaction" where status='pending' and "received_at" < $1 GROUP BY from_address` - - mLog := log.WithFields(trace.GetID(ctx)) - timeStamp := time.Now().Add(-timeDuration) - rows, err := p.db.Query(ctx, sql, timeStamp) - if err != nil { - if errors.Is(err, pgx.ErrNoRows) { - mLog.Infof("no pending transactions before %v", timeStamp) - return nil, nil, nil - } else { - return nil, nil, err - } - } - defer rows.Close() - - var addresses []common.Address - var nonces []uint64 - for rows.Next() { - var address string - var nonce uint64 - err := rows.Scan(&address, &nonce) - if err != nil { - return nil, nil, err - } - addresses = append(addresses, common.HexToAddress(address)) - nonces = append(nonces, nonce) - } - mLog.Infof("pending address count %v before %v", len(addresses), timeStamp) - - return addresses, nonces, nil -} - -// CREATE TABLE pool.stat ( -// id INT PRIMARY KEY NOT NULL, -// total INT, -// skip_nonce INT, -// balance_issue INT, -// nonce_issue INT, -// locked INT, // 1 for unlocked 2 for locked -// created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, +// CREATE TABLE pool.readytx( +// id SERIAL PRIMARY KEY NOT NULL, +// count INT, // updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP // ); -// insert into pool.stat(id, total, skip_nonce, balance_issue, nonce_issue, locked) values(1, 0, 0, 0, 0, 1); - -// LockStat lock stat -func (p *PostgresPoolStorage) LockStat(ctx context.Context, timeDuration time.Duration) (bool, error) { - timeStamp := time.Now().Add(-timeDuration) - sql := `UPDATE pool.stat SET locked = 2 WHERE locked = 1 and updated_at < $1 and id=1` - - stat, err := p.db.Exec(ctx, sql, timeStamp) - if err != nil { - return false, err - } - if stat.RowsAffected() > 0 { - return true, nil - } - - return false, nil -} - -// UnLockStat unlock stat -func (p *PostgresPoolStorage) UnLockStat(ctx context.Context) error { - sql := `UPDATE pool.stat SET locked = 1 WHERE locked = 2 and id=1` - - _, err := p.db.Exec(ctx, sql) - if err != nil { - return err - } +// insert into pool.readytx(id, count) values(1, 0); - return nil -} - -// UpdateStatAndUnlock update stat and unlock -func (p *PostgresPoolStorage) UpdateStatAndUnlock(ctx context.Context, totoal, skip, balanceIssue, nonceIssue uint64) error { - sql := `UPDATE pool.stat SET total = $1, skip_nonce = $2, balance_issue = $3, nonce_issue = $4, locked = 1, updated_at = CURRENT_TIMESTAMP WHERE id=1` +// UpdateReadyTxCount update ready tx count +func (p *PostgresPoolStorage) UpdateReadyTxCount(ctx context.Context, count uint64) error { + sql := `UPDATE pool.readytx SET count = $1, updated_at = $2 WHERE id=1` - _, err := p.db.Exec(ctx, sql, totoal, skip, balanceIssue, nonceIssue) + _, err := p.db.Exec(ctx, sql, count, time.Now()) if err != nil { return err } @@ -156,26 +86,15 @@ func (p *PostgresPoolStorage) UpdateStatAndUnlock(ctx context.Context, totoal, s return nil } -// GetStat get stat -func (p *PostgresPoolStorage) GetStat(ctx context.Context) (uint64, uint64, uint64, uint64, error) { - sql := `SELECT total, skip_nonce, balance_issue, nonce_issue FROM pool.stat WHERE id=1` +// GetReadyTxCount get ready tx count +func (p *PostgresPoolStorage) GetReadyTxCount(ctx context.Context) (uint64, error) { + sql := `SELECT count FROM pool.readytx where id=1` - var total, skip, balanceIssue, nonceIssue uint64 - err := p.db.QueryRow(ctx, sql).Scan(&total, &skip, &balanceIssue, &nonceIssue) - if err != nil { - return 0, 0, 0, 0, err - } - - return total, skip, balanceIssue, nonceIssue, nil -} - -// CountTransactionsByFromStatusAndNonce count transactions by from status and nonce -func (p *PostgresPoolStorage) CountTransactionsByFromStatusAndNonce(ctx context.Context, from common.Address, nonce uint64, status ...pool.TxStatus) (uint64, error) { - sql := "SELECT COUNT(*) FROM pool.transaction WHERE from_address = $1 AND nonce <= $2 AND status = ANY ($3)" - var counter uint64 - err := p.db.QueryRow(ctx, sql, from.String(), nonce, status).Scan(&counter) + var count uint64 + err := p.db.QueryRow(ctx, sql).Scan(&count) if err != nil { return 0, err } - return counter, nil + + return count, nil } diff --git a/pool/pool.go b/pool/pool.go index 37cd2b5739..8ac72acce6 100644 --- a/pool/pool.go +++ b/pool/pool.go @@ -96,8 +96,6 @@ func NewPool(cfg Config, batchConstraintsCfg state.BatchConstraintsCfg, s storag } }(&cfg, p) - p.startPendingStat() - return p } diff --git a/pool/pool_xlayer.go b/pool/pool_xlayer.go index a2bd2600fb..6944dbcd14 100644 --- a/pool/pool_xlayer.go +++ b/pool/pool_xlayer.go @@ -84,8 +84,3 @@ func (p *Pool) GetMinSuggestedGasPriceWithDelta(ctx context.Context, delta time. return p.storage.MinL2GasPriceSince(ctx, fromTimestamp) } - -// IsPendingStatEnabled checks if the pending stat is enabled -func (p *Pool) IsPendingStatEnabled(ctx context.Context) bool { - return getEnablePendingStat(p.cfg.PendingStat.Enable) -} diff --git a/sequencer/addrqueue_xlayer.go b/sequencer/addrqueue_xlayer.go new file mode 100644 index 0000000000..c700850d53 --- /dev/null +++ b/sequencer/addrqueue_xlayer.go @@ -0,0 +1,16 @@ +package sequencer + +func (a *addrQueue) GetTxCount() uint64 { + if a == nil { + return 0 + } + var readyTxCount uint64 + if a.readyTx != nil { + readyTxCount = 1 + } + notReadyTxCount := uint64(len(a.notReadyTxs)) + forcedTxCount := uint64(len(a.forcedTxs)) + pendingTxsToStoreCount := uint64(len(a.pendingTxsToStore)) + + return readyTxCount + notReadyTxCount + forcedTxCount + pendingTxsToStoreCount +} diff --git a/sequencer/interfaces.go b/sequencer/interfaces.go index a67839bd6d..36faf5891a 100644 --- a/sequencer/interfaces.go +++ b/sequencer/interfaces.go @@ -29,6 +29,7 @@ type txPool interface { GetL1AndL2GasPrice() (uint64, uint64) GetEarliestProcessedTx(ctx context.Context) (common.Hash, error) CountPendingTransactions(ctx context.Context) (uint64, error) + UpdateReadyTxCount(ctx context.Context, count uint64) error } // etherman contains the methods required to interact with ethereum. @@ -93,4 +94,5 @@ type workerInterface interface { NewTxTracker(tx pool.Transaction, usedZKcounters state.ZKCounters, reservedZKCouners state.ZKCounters, ip string) (*TxTracker, error) AddForcedTx(txHash common.Hash, addr common.Address) DeleteForcedTx(txHash common.Hash, addr common.Address) + CountReadyTx() uint64 } diff --git a/sequencer/l2block.go b/sequencer/l2block.go index bf56d64326..f953266a9d 100644 --- a/sequencer/l2block.go +++ b/sequencer/l2block.go @@ -478,6 +478,8 @@ func (f *finalizer) closeWIPL2Block(ctx context.Context) { f.addPendingL2BlockToProcess(ctx, f.wipL2Block) } + getPoolReadyTxCounter().setReadyTxCount(f.workerIntf.CountReadyTx()) + f.wipL2Block = nil } diff --git a/sequencer/mock_pool_xlayer.go b/sequencer/mock_pool_xlayer.go index 4da8a4b90f..8391e52dd9 100644 --- a/sequencer/mock_pool_xlayer.go +++ b/sequencer/mock_pool_xlayer.go @@ -9,4 +9,8 @@ import ( // DeleteFailedTransactionsOlderThan provides a mock function with given fields: ctx, date func (_m *PoolMock) CountPendingTransactions(ctx context.Context) (uint64, error) { return 0, nil +} + +func (_m *PoolMock) UpdateReadyTxCount(ctx context.Context, count uint64) error { + return nil } \ No newline at end of file diff --git a/sequencer/mock_worker_xlayer.go b/sequencer/mock_worker_xlayer.go new file mode 100644 index 0000000000..efb4563946 --- /dev/null +++ b/sequencer/mock_worker_xlayer.go @@ -0,0 +1,8 @@ +package sequencer + +// CountReadyTx provides a mock function with given fields: +func (_m *WorkerMock) CountReadyTx() uint64 { + _m.Called() + ret := _m.Called() + return ret.Get(0).(uint64) +} diff --git a/sequencer/pooltx_counter_xlayer.go b/sequencer/pooltx_counter_xlayer.go new file mode 100644 index 0000000000..b9cfdf1a87 --- /dev/null +++ b/sequencer/pooltx_counter_xlayer.go @@ -0,0 +1,31 @@ +package sequencer + +import ( + "sync" + "sync/atomic" +) + +// PoolReadyTxCounter is the struct that holds the ready tx counter +type PoolReadyTxCounter struct { + // Count is the number of ready transactions + Count uint64 +} + +var poolReadyTxCounterInst *PoolReadyTxCounter +var poolReadyTxCounterOnce sync.Once + +func getPoolReadyTxCounter() *PoolReadyTxCounter { + poolReadyTxCounterOnce.Do(func() { + poolReadyTxCounterInst = &PoolReadyTxCounter{} + }) + return poolReadyTxCounterInst +} + +func (ptx *PoolReadyTxCounter) setReadyTxCount(count uint64) { + atomic.StoreUint64(&ptx.Count, count) +} + +// Sum returns the sum of the ready tx counter +func (ptx *PoolReadyTxCounter) getReadyTxCount() uint64 { + return atomic.LoadUint64(&ptx.Count) +} diff --git a/sequencer/sequencer.go b/sequencer/sequencer.go index dcabff3aa8..25a24c54ad 100644 --- a/sequencer/sequencer.go +++ b/sequencer/sequencer.go @@ -104,6 +104,8 @@ func (s *Sequencer) Start(ctx context.Context) { go s.countPendingTx() + go s.countReadyTx() + if s.streamServer != nil { go s.sendDataToStreamer(s.cfg.StreamServer.ChainID) } diff --git a/sequencer/sequencer_xlayer.go b/sequencer/sequencer_xlayer.go index 6e72ae57da..50d5749ddf 100644 --- a/sequencer/sequencer_xlayer.go +++ b/sequencer/sequencer_xlayer.go @@ -6,6 +6,7 @@ import ( "github.com/0xPolygonHermez/zkevm-node/log" pmetric "github.com/0xPolygonHermez/zkevm-node/sequencer/metrics" + "github.com/0xPolygonHermez/zkevm-node/state" ) var countinterval = 10 @@ -21,3 +22,14 @@ func (s *Sequencer) countPendingTx() { pmetric.PendingTxCount(int(transactions)) } } + +func (s *Sequencer) updateReadyTxCount() { + err := s.pool.UpdateReadyTxCount(context.Background(), getPoolReadyTxCounter().getReadyTxCount()) + if err != nil { + log.Errorf("error adding ready tx count: %v", err) + } +} + +func (s *Sequencer) countReadyTx() { + state.InfiniteSafeRun(s.updateReadyTxCount, "error counting ready tx", time.Second) +} diff --git a/sequencer/worker.go b/sequencer/worker.go index 9b502b09fb..10e6ed64ed 100644 --- a/sequencer/worker.go +++ b/sequencer/worker.go @@ -23,6 +23,8 @@ type Worker struct { batchConstraints state.BatchConstraintsCfg readyTxsCond *timeoutCond claimGp *big.Int + + readyTxCounter map[string]uint64 } // NewWorker creates an init a worker @@ -33,6 +35,7 @@ func NewWorker(state stateInterface, constraints state.BatchConstraintsCfg, read state: state, batchConstraints: constraints, readyTxsCond: readyTxsCond, + readyTxCounter: make(map[string]uint64), claimGp: new(big.Int), } @@ -108,10 +111,12 @@ func (w *Worker) AddTxTracker(ctx context.Context, tx *TxTracker) (replacedTx *T if prevReadyTx != nil { log.Debugf("prevReadyTx %s (nonce: %d, gasPrice: %d, addr: %s) deleted from TxSortedList", prevReadyTx.HashStr, prevReadyTx.Nonce, prevReadyTx.GasPrice, tx.FromStr) w.txSortedList.delete(prevReadyTx) + w.deleteReadyTxCounter(prevReadyTx.FromStr) } if newReadyTx != nil { log.Debugf("newReadyTx %s (nonce: %d, gasPrice: %d, addr: %s) added to TxSortedList", newReadyTx.HashStr, newReadyTx.Nonce, newReadyTx.GasPrice, tx.FromStr) w.addTxToSortedList(newReadyTx) + w.setReadyTxCounter(addr.fromStr, addr.GetTxCount()) } if repTx != nil { @@ -132,10 +137,12 @@ func (w *Worker) applyAddressUpdate(from common.Address, fromNonce *uint64, from if prevReadyTx != nil { log.Debugf("prevReadyTx %s (nonce: %d, gasPrice: %d) deleted from TxSortedList", prevReadyTx.Hash.String(), prevReadyTx.Nonce, prevReadyTx.GasPrice) w.txSortedList.delete(prevReadyTx) + w.deleteReadyTxCounter(prevReadyTx.FromStr) } if newReadyTx != nil { log.Debugf("newReadyTx %s (nonce: %d, gasPrice: %d) added to TxSortedList", newReadyTx.Hash.String(), newReadyTx.Nonce, newReadyTx.GasPrice) w.addTxToSortedList(newReadyTx) + w.setReadyTxCounter(addrQueue.fromStr, addrQueue.GetTxCount()) } return newReadyTx, prevReadyTx, txsToDelete @@ -202,6 +209,7 @@ func (w *Worker) DeleteTx(txHash common.Hash, addr common.Address) { if deletedReadyTx != nil { log.Debugf("tx %s deleted from TxSortedList", deletedReadyTx.Hash.String()) w.txSortedList.delete(deletedReadyTx) + w.deleteReadyTxCounter(deletedReadyTx.FromStr) } } else { log.Warnf("addrQueue %s not found", addr.String()) @@ -342,7 +350,7 @@ func (w *Worker) GetBestFittingTx(resources state.BatchResources) (*TxTracker, e wg.Wait() if foundAt != -1 { - log.Debugf("best fitting tx %s found at index %d with gasPrice %d", tx.HashStr, foundAt, tx.GasPrice) + log.Infof("best fitting tx %s found at index %d with gasPrice %d", tx.HashStr, foundAt, tx.GasPrice) if !tx.IsClaimTx { w.claimGp = tx.GasPrice } @@ -366,6 +374,7 @@ func (w *Worker) ExpireTransactions(maxTime time.Duration) []*TxTracker { if prevReadyTx != nil { w.txSortedList.delete(prevReadyTx) + w.deleteReadyTxCounter(prevReadyTx.FromStr) } /*if addrQueue.IsEmpty() { diff --git a/sequencer/worker_xlayer.go b/sequencer/worker_xlayer.go new file mode 100644 index 0000000000..3286771bbc --- /dev/null +++ b/sequencer/worker_xlayer.go @@ -0,0 +1,30 @@ +package sequencer + +func (w *Worker) deleteReadyTxCounter(addr string) { + if w == nil || w.readyTxCounter == nil { + return + } + delete(w.readyTxCounter, addr) +} + +func (w *Worker) setReadyTxCounter(addr string, count uint64) { + if w == nil || w.readyTxCounter == nil { + return + } + w.readyTxCounter[addr] = count +} + +// CountReadyTx returns the number of ready transactions +func (w *Worker) CountReadyTx() uint64 { + if w == nil { + return 0 + } + w.workerMutex.Lock() + defer w.workerMutex.Unlock() + + var count uint64 + for _, c := range w.readyTxCounter { + count += c + } + return count +}