Skip to content

Commit

Permalink
soak dev to see how it behaves
Browse files Browse the repository at this point in the history
  • Loading branch information
Farber98 committed Jan 4, 2025
1 parent cc50224 commit 9810dad
Show file tree
Hide file tree
Showing 7 changed files with 297 additions and 612 deletions.
6 changes: 3 additions & 3 deletions pkg/solana/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@ var defaultConfigSet = Chain{
TxTimeout: config.MustNewDuration(time.Minute), // timeout for send tx method in client
TxRetryTimeout: config.MustNewDuration(10 * time.Second), // duration for tx rebroadcasting to RPC node
TxConfirmTimeout: config.MustNewDuration(30 * time.Second), // duration before discarding tx as unconfirmed. Set to 0 to disable discarding tx.
TxExpirationRebroadcast: ptr(false), // to enable rebroadcasting of expired transactions
TxExpirationRebroadcast: ptr(true), // to enable rebroadcasting of expired transactions
TxRetentionTimeout: config.MustNewDuration(0 * time.Second), // duration to retain transactions after being marked as finalized or errored. Set to 0 to immediately drop transactions.
SkipPreflight: ptr(true), // to enable or disable preflight checks
Commitment: ptr(string(rpc.CommitmentConfirmed)),
MaxRetries: ptr(int64(0)), // max number of retries (default = 0). when config.MaxRetries < 0), interpreted as MaxRetries = nil and rpc node will do a reasonable number of retries

// fee estimator
FeeEstimatorMode: ptr("fixed"),
FeeEstimatorMode: ptr("blockhistory"),
ComputeUnitPriceMax: ptr(uint64(1_000)),
ComputeUnitPriceMin: ptr(uint64(0)),
ComputeUnitPriceDefault: ptr(uint64(0)),
FeeBumpPeriod: config.MustNewDuration(3 * time.Second), // set to 0 to disable fee bumping
BlockHistoryPollPeriod: config.MustNewDuration(5 * time.Second),
BlockHistorySize: ptr(uint64(1)), // 1: uses latest block; >1: Uses multiple blocks, where n is number of blocks. DISCLAIMER: 1:1 ratio between n and RPC calls.
BlockHistorySize: ptr(uint64(10)), // 1: uses latest block; >1: Uses multiple blocks, where n is number of blocks. DISCLAIMER: 1:1 ratio between n and RPC calls.
ComputeUnitLimitDefault: ptr(uint32(200_000)), // set to 0 to disable adding compute unit limit
EstimateComputeUnitLimit: ptr(false), // set to false to disable compute unit limit estimation
}
Expand Down
180 changes: 27 additions & 153 deletions pkg/solana/txm/pendingtx.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,10 @@ type PendingTxContext interface {
GetTxState(id string) (TxState, error)
// TrimFinalizedErroredTxs removes transactions that have reached their retention time
TrimFinalizedErroredTxs() int
// GetSignatureInfo returns the transaction ID and TxState for the provided signature
GetSignatureInfo(sig solana.Signature) (txInfo, error)
// UpdateSignatureStatus updates the status of a signature in the SigToTxInfo map
UpdateSignatureStatus(sig solana.Signature, status TxState) error
// TxHasReorg determines whether a reorg has occurred for a given tx.
// It achieves this by comparing the highest aggregated state across all associated signatures with the current state of the transaction.
// If the highest aggregated state is less than the current state, a reorg has occurred and we need to handle it.
TxHasReorg(id string) bool
// OnReorg resets state to Broadcasted for given transaction ID
OnReorg(id string) error
TxHasReorg(sig solana.Signature, currentState TxState) (string, bool)
// GetPendingTx returns the pendingTx for the given ID if it exists
GetPendingTx(id string) (pendingTx, error)
}
Expand All @@ -80,7 +74,9 @@ type finishedTx struct {
}

type txInfo struct {
id string
// id of the transaction
id string
// state of the signature
state TxState
}

Expand Down Expand Up @@ -110,10 +106,6 @@ func newPendingTxContext() *pendingTxContext {

func (c *pendingTxContext) New(tx pendingTx) error {
err := c.withReadLock(func() error {
// Check if ID already exists in any of the maps
if _, exists := c.broadcastedProcessedTxs[tx.id]; exists {
return ErrIDAlreadyExists
}
// Check if ID already exists in any of the maps
if _, exists := c.broadcastedProcessedTxs[tx.id]; exists {
return ErrIDAlreadyExists
Expand All @@ -130,7 +122,7 @@ func (c *pendingTxContext) New(tx pendingTx) error {
return err
}

// upgrade to write lock if id do not exist
// upgrade to write lock if id does not exist
_, err = c.withWriteLock(func() (string, error) {
// Check if ID already exists in any of the maps
if _, exists := c.broadcastedProcessedTxs[tx.id]; exists {
Expand Down Expand Up @@ -579,142 +571,36 @@ func (c *pendingTxContext) TrimFinalizedErroredTxs() int {
return len(expiredIDs)
}

func (c *pendingTxContext) GetSignatureInfo(sig solana.Signature) (txInfo, error) {
// TxHasReorg determines whether a reorg has occurred for a tx given a signature and it's current on-chain state.
// A regression is identified when the state of a signature transitions as follows:
// - Confirmed -> Processed || Broadcasted || Not Found
// - Processed -> Broadcasted || Not Found
// Returns the id of the transaction and a boolean indicating if a reorg has occurred.
func (c *pendingTxContext) TxHasReorg(sig solana.Signature, sigOnChainState TxState) (string, bool) {
c.lock.RLock()
defer c.lock.RUnlock()

info, exists := c.sigToTxInfo[sig]
txInfo, exists := c.sigToTxInfo[sig]
if !exists {
return txInfo{}, ErrSigDoesNotExist
}
return info, nil
}

func (c *pendingTxContext) OnReorg(id string) error {
err := c.withReadLock(func() error {
// Check if the transaction is still in a non finalized/errored state
var broadcastedExists, confirmedExists bool
_, broadcastedExists = c.broadcastedProcessedTxs[id]
_, confirmedExists = c.confirmedTxs[id]
if !broadcastedExists && !confirmedExists {
return ErrTransactionNotFound
}
return nil
})
if err != nil {
// If transaction or sig are not found, return
return err
return "", false
}

var pTx pendingTx
// Acquire a write lock to perform the state reset if needed
_, err = c.withWriteLock(func() (string, error) {
// Retrieve tx again inside the write lock
tx, broadcastedProcessedExists := c.broadcastedProcessedTxs[id]
if broadcastedProcessedExists {
pTx = tx
// Compare our in-memory state of the sig with the current on-chain state to determine if a reorg has occurred
sigInMemoryState := txInfo.state
var hasReorg bool
switch sigInMemoryState {
case Confirmed:
if sigOnChainState == Processed || sigOnChainState == Broadcasted || sigOnChainState == NotFound {
hasReorg = true
}
tx, confirmedExists := c.confirmedTxs[id]
if confirmedExists {
pTx = tx
case Processed:
if sigOnChainState == Broadcasted || sigOnChainState == NotFound {
hasReorg = true
}

if !broadcastedProcessedExists && !confirmedExists {
// transaction does not exist in any non finalized/errored maps
return "", ErrTransactionNotFound
}

// Reset the tx state to 'Broadcasted' upon detecting a reorg.
// Even if the tx might have already progressed to 'Processed' after the reorg, we reset it to 'Broadcasted' for simplicity here.
// Any state advancements (e.g., moving to 'Processed') will be picked up by the current status polling cycle after handling the reorg.
// This does not introduce risks with the expiration logic since we check for status changes before considering a tx for expiration.
pTx.state = Broadcasted
c.broadcastedProcessedTxs[id] = pTx

// If the transaction regressed from confirmed state, we also need to remove it from the confirmed map
if confirmedExists {
delete(c.confirmedTxs, id)
}

return "", nil
})
if err != nil {
// If transaction was not found
return err
default: // No reorg if the signature is not in a state that can be reorged
}

return nil
}

// TxHasReorg determines whether a reorg has occurred for a given tx.
// It achieves this by comparing the highest aggregated state across all associated signatures with the current state of the transaction.
// If the highest aggregated state is less than the current state, a reorg has occurred and we need to handle it.
func (c *pendingTxContext) TxHasReorg(id string) bool {
var pTx pendingTx
var broadcastedExists, confirmedExists bool
highestSigAggState := Broadcasted

c.lock.RLock()
defer c.lock.RUnlock()
// Check if the transaction is still in a non finalized/errored state
tx, broadcastedExists := c.broadcastedProcessedTxs[id]
if broadcastedExists {
pTx = tx
}
tx, confirmedExists = c.confirmedTxs[id]
if confirmedExists {
pTx = tx
}
if !broadcastedExists && !confirmedExists {
return false
}

// Get the highest state among all signatures
for _, sig := range pTx.signatures {
info, exists := c.sigToTxInfo[sig]
if !exists {
continue
}
highestSigAggState = max(highestSigAggState, info.state)
}

// If the highest state among all signatures is less than the transaction state, then a reorg has occurred
return highestSigAggState < pTx.state
}

func (c *pendingTxContext) UpdateSignatureStatus(sig solana.Signature, status TxState) error {
// Acquire a read lock to check if the signature exists and needs to be reset
err := c.withReadLock(func() error {
// Check if the signature is still being tracked
_, exists := c.sigToTxInfo[sig]
if !exists {
return ErrSigDoesNotExist
}
return nil
})
if err != nil {
// If sig not found, return
return err
}

// Acquire a write lock to perform the state reset
_, err = c.withWriteLock(func() (string, error) {
// Retrieve sig again inside the write lock
info, exists := c.sigToTxInfo[sig]
if !exists {
return "", ErrSigDoesNotExist
}
// Update the status of the signature
info.state = status
c.sigToTxInfo[sig] = info
return "", nil
})
if err != nil {
// If sig was not found
return err
}

return nil
return txInfo.id, hasReorg
}

func (c *pendingTxContext) GetPendingTx(id string) (pendingTx, error) {
Expand Down Expand Up @@ -862,20 +748,8 @@ func (c *pendingTxContextWithProm) TrimFinalizedErroredTxs() int {
return c.pendingTx.TrimFinalizedErroredTxs()
}

func (c *pendingTxContextWithProm) GetSignatureInfo(sig solana.Signature) (txInfo, error) {
return c.pendingTx.GetSignatureInfo(sig)
}

func (c *pendingTxContextWithProm) OnReorg(id string) error {
return c.pendingTx.OnReorg(id)
}

func (c *pendingTxContextWithProm) TxHasReorg(id string) bool {
return c.pendingTx.TxHasReorg(id)
}

func (c *pendingTxContextWithProm) UpdateSignatureStatus(sig solana.Signature, status TxState) error {
return c.pendingTx.UpdateSignatureStatus(sig, status)
func (c *pendingTxContextWithProm) TxHasReorg(sig solana.Signature, currentSigState TxState) (string, bool) {
return c.pendingTx.TxHasReorg(sig, currentSigState)
}

func (c *pendingTxContextWithProm) GetPendingTx(id string) (pendingTx, error) {
Expand Down
Loading

0 comments on commit 9810dad

Please sign in to comment.