Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Strategy usage #24

Merged
32 commits merged into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
73e3e0e
AsyncTxStrategy should do one of the following during execute: raise …
derekpierre Mar 1, 2024
013279c
Adjust strategies to accomodate latest paradigm for execute i.e. retu…
derekpierre Mar 1, 2024
9c2ec6e
Adjust existing strategy tests since None can now be returned from ex…
derekpierre Mar 1, 2024
39b4b36
Update timeout strategy logging.
derekpierre Mar 3, 2024
cf88810
Update speedup strategy calcs to use math.ceil instead of round
derekpierre Mar 3, 2024
1c01980
Move from a fixed max tip to a max tip factor based on the current su…
derekpierre Mar 4, 2024
352f74a
Simplify legacy transaction speedup logic.
derekpierre Mar 4, 2024
0be4f52
Add tests for constructor parameters and legacy tx speed up functiona…
derekpierre Mar 4, 2024
c023dea
Update logging of gas conditions.
derekpierre Mar 4, 2024
979f3f9
Better annotations about what the strategy is doing.
derekpierre Mar 4, 2024
9589151
Don't modify tx nonce as part of strategy.
derekpierre Mar 4, 2024
b66f5af
Fix outdated comment about value of warn factor.
derekpierre Mar 4, 2024
bf13194
Clean up logging logic since old_tip, old_max_fee may or may not be p…
derekpierre Mar 4, 2024
f7bd316
Add tests for eip1559 transaction speedup.
derekpierre Mar 4, 2024
699d38f
Rename speed up strategy; while the rate value is fixed the updates a…
derekpierre Mar 4, 2024
3755667
Use constant instead of "maxPriorityFeePerGas".
derekpierre Mar 4, 2024
16c2ed3
Use constant for minimum required speedup increase percentage.
derekpierre Mar 4, 2024
1c67055
Use a max between a low default value and warn factor calc in case ti…
derekpierre Mar 4, 2024
f8cf36b
Add todo about best way of setting a cap on the speedup strategy.
derekpierre Mar 4, 2024
1f1fb28
Use reactor to determine when hook is called instead of repeatedly ca…
derekpierre Mar 4, 2024
abb8555
Add TODO to determine whether strategies can be overriden.
derekpierre Mar 4, 2024
5491a6e
Update logging category for monitor when no longer tracking tx.
derekpierre Mar 5, 2024
2a9c0b5
Use enable/disabling of auto mining for better testing.
derekpierre Mar 5, 2024
436ce64
Fix nonce logging message for strategies.
derekpierre Mar 5, 2024
c7851ae
Add the ability to update the tracker's active pending tx after a retry.
derekpierre Mar 5, 2024
7e647c0
Ensure that "from" value in tx params matches/equals signer.
derekpierre Mar 5, 2024
283ba70
Make __fire reusable by both broadcast and strategize - it is now lim…
derekpierre Mar 5, 2024
bd27cbf
Add tests to ensure that strategies are employed when a tx hasn't alr…
derekpierre Mar 5, 2024
25ad0b5
Add testing for "from" tx parameter handling when a tx is being queued.
derekpierre Mar 6, 2024
adac77d
Add test for strategies which do not make updates to parameters.
derekpierre Mar 6, 2024
dc6cda3
Add testing statements to ensure that the broadcast hook is also call…
derekpierre Mar 6, 2024
28f67f4
txhash is not optionally returned from __fire - either it is or an ex…
derekpierre Mar 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions atxm/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,6 @@ class InsufficientFunds(RPCError):
"""raised when a transaction exceeds the spending cap"""


class Wait(Exception):
"""
Raised when a strategy exceeds a limitation.
Used to mark a pending transaction as "wait, don't retry".
"""


class TransactionFaulted(Exception):
"""Raised when a transaction has been faulted."""

Expand Down
103 changes: 62 additions & 41 deletions atxm/machine.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from copy import deepcopy
from copy import copy, deepcopy
from typing import List, Optional, Type

from eth_account.signers.local import LocalAccount
Expand All @@ -9,17 +9,17 @@
from web3 import Web3
from web3.types import TxParams

from atxm.exceptions import TransactionFaulted, TransactionReverted, Wait
from atxm.exceptions import TransactionFaulted, TransactionReverted
from atxm.strategies import (
AsyncTxStrategy,
FixedRateSpeedUp,
ExponentialSpeedupStrategy,
InsufficientFundsPause,
TimeoutStrategy,
)
from atxm.tracker import _TxTracker
from atxm.tx import (
AsyncTx,
FutureTx,
PendingTx,
TxHash,
)
from atxm.utils import (
Expand Down Expand Up @@ -92,7 +92,7 @@ class _Machine(StateMachine):
STRATEGIES: List[Type[AsyncTxStrategy]] = [
InsufficientFundsPause,
TimeoutStrategy,
FixedRateSpeedUp,
ExponentialSpeedupStrategy,
]

class LogObserver:
Expand Down Expand Up @@ -240,7 +240,7 @@ def _sleep(self) -> None:
# Lifecycle
#

def __handle_active_transaction(self) -> bool:
def __handle_active_transaction(self) -> None:
"""
Handles the currently tracked pending transaction.

Expand All @@ -249,7 +249,7 @@ def __handle_active_transaction(self) -> bool:
1. paused
2. reverted (fault)
3. finalized
4. strategize: retry, wait, or fault
4. strategize: retry, do nothing and wait, or fault

Returns True if the next queued transaction can be broadcasted right now.
"""
Expand All @@ -262,7 +262,7 @@ def __handle_active_transaction(self) -> bool:
# Outcome 2: the pending transaction was reverted (final error)
except TransactionReverted as e:
self._tx_tracker.fault(fault_error=e)
return True
return

# Outcome 3: pending transaction is finalized (final success)
if receipt:
Expand All @@ -273,14 +273,13 @@ def __handle_active_transaction(self) -> bool:
f"with {confirmations} confirmation(s) txhash: {final_txhash.hex()}"
)
self._tx_tracker.finalize_active_tx(receipt=receipt)
return True
return

# Outcome 4: re-strategize the pending transaction
pending_tx = self.__strategize()
return pending_tx is not None
self.__strategize()

#
# Broadcast
# Broadcast tx
#

def __get_signer(self, address: str) -> LocalAccount:
Expand All @@ -290,7 +289,7 @@ def __get_signer(self, address: str) -> LocalAccount:
raise ValueError(f"Signer {address} not found")
return signer

def __fire(self, tx: FutureTx, msg: str) -> Optional[PendingTx]:
def __fire(self, tx: AsyncTx, msg: str) -> TxHash:
"""
Signs and broadcasts a transaction, handling RPC errors
and internal state changes.
Expand All @@ -301,57 +300,63 @@ def __fire(self, tx: FutureTx, msg: str) -> Optional[PendingTx]:
Morphs a `FutureTx` into a `PendingTx` and advances it
into the active transaction slot if broadcast is successful.
"""
signer: LocalAccount = self.__get_signer(tx._from)
try:
txhash = self.w3.eth.send_raw_transaction(
signer.sign_transaction(tx.params).rawTransaction
)
except ValueError as e:
_handle_rpc_error(e, tx=tx)
return
signer: LocalAccount = self.__get_signer(tx.params["from"])
txhash = self.w3.eth.send_raw_transaction(
signer.sign_transaction(tx.params).rawTransaction
)
self.log.info(
f"[{msg}] fired transaction #atx-{tx.id}|{tx.params['nonce']}|{txhash.hex()}"
)
pending_tx = self._tx_tracker.morph(tx=tx, txhash=txhash)
if tx.on_broadcast:
fire_hook(hook=tx.on_broadcast, tx=pending_tx)
return pending_tx
return txhash

def __strategize(self) -> Optional[PendingTx]:
def __strategize(self) -> None:
"""Retry the currently tracked pending transaction with the configured strategy."""
if not self._tx_tracker.pending:
raise RuntimeError("No active transaction to strategize")

_active_copy = deepcopy(self._tx_tracker.pending)
params_updated = False
for strategy in self._strategies:
try:
params = strategy.execute(pending=_active_copy)
except Wait as e:
log.info(f"[wait] strategy {strategy.__class__} signalled wait: {e}")
return
except TransactionFaulted as e:
self._tx_tracker.fault(fault_error=e)
return
if params:
# in case the strategy accidentally returns None
# keep the parameters as they are.
_active_copy.params.update(params)
params_updated = True

if not params_updated:
log.info(
f"[wait] strategies made no suggested updates to pending tx #{_active_copy.id} - skipping retry round"
)
return

# (!) retry the transaction with the new parameters
retry_params = TxParams(_active_copy.params)
_names = " -> ".join(s.name for s in self._strategies)
pending_tx = self.__fire(tx=retry_params, msg=_names)
self.log.info(f"[retry] transaction #{pending_tx.id} has been re-broadcasted")

return pending_tx
# TODO try-except needed here (similar to broadcast) #14, #18, #20
txhash = self.__fire(tx=_active_copy, msg=_names)

_active_copy.txhash = txhash
self._tx_tracker.update_after_retry(_active_copy)

def __broadcast(self) -> Optional[TxHash]:
pending_tx = self._tx_tracker.pending
self.log.info(f"[retry] transaction #{pending_tx.id} has been re-broadcasted")
if pending_tx.on_broadcast:
fire_hook(hook=pending_tx.on_broadcast, tx=pending_tx)

def __broadcast(self):
"""
Attempts to broadcast the next `FutureTx` in the queue.
If the broadcast is not successful, it is re-queued.
"""
future_tx = self._tx_tracker._pop() # popleft
future_tx.params = _make_tx_params(future_tx.params)

# update nonce as necessary
signer = self.__get_signer(future_tx._from)
nonce = self.w3.eth.get_transaction_count(signer.address, "latest")
if nonce > future_tx.params["nonce"]:
Expand All @@ -360,11 +365,19 @@ def __broadcast(self) -> Optional[TxHash]:
f"by another transaction. Updating queued tx nonce {future_tx.params['nonce']} -> {nonce}"
)
future_tx.params["nonce"] = nonce
pending_tx = self.__fire(tx=future_tx, msg="broadcast")
if not pending_tx:

try:
txhash = self.__fire(tx=future_tx, msg="broadcast")
except ValueError as e:
_handle_rpc_error(e, tx=future_tx)
# TODO don't requeue forever #12, #20
self._tx_tracker._requeue(future_tx)
return
return pending_tx.txhash

self._tx_tracker.morph(tx=future_tx, txhash=txhash)
pending_tx = self._tx_tracker.pending
if pending_tx.on_broadcast:
fire_hook(hook=pending_tx.on_broadcast, tx=pending_tx)

#
# Monitoring
Expand All @@ -379,7 +392,7 @@ def __monitor_finalized(self) -> None:
if tx in self._tx_tracker.finalized:
self._tx_tracker.finalized.remove(tx)
self.log.info(
f"[clear] stopped tracking {tx.txhash.hex()} after {confirmations} confirmations"
f"[monitor] stopped tracking {tx.txhash.hex()} after {confirmations} confirmations"
)
continue
self.log.info(
Expand Down Expand Up @@ -418,9 +431,17 @@ def queue_transaction(
if signer.address not in self.signers:
self.signers[signer.address] = signer

tx = self._tx_tracker._queue(
_from=signer.address, params=params, *args, **kwargs
)
params_copy = copy(params)

from_param = params_copy.get("from")
if from_param is None:
params_copy["from"] = signer.address
if from_param and from_param != signer.address:
raise ValueError(
f"Mismatched 'from' value ({from_param}) and 'signer' account ({signer.address})"
)

tx = self._tx_tracker._queue(params=params_copy, *args, **kwargs)
if not previously_busy_or_paused:
self._wake()

Expand Down
Loading
Loading