From 7c0d643565bb9f732720c9f0cdc67a6815829264 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Tue, 10 Dec 2024 23:32:44 +0200 Subject: [PATCH] Brings the async_substrate_interface up to date with that in SDK --- .../bittensor/async_substrate_interface.py | 712 ++++++++++-------- .../src/bittensor/subtensor_interface.py | 3 +- .../src/commands/stake/children_hotkeys.py | 6 +- 3 files changed, 414 insertions(+), 307 deletions(-) diff --git a/bittensor_cli/src/bittensor/async_substrate_interface.py b/bittensor_cli/src/bittensor/async_substrate_interface.py index af54b531..148f99be 100644 --- a/bittensor_cli/src/bittensor/async_substrate_interface.py +++ b/bittensor_cli/src/bittensor/async_substrate_interface.py @@ -1,14 +1,23 @@ +""" +This library comprises the asyncio-compatible version of the subtensor interface commands we use in bittensor, as +well as its helper functions and classes. The docstring for the `AsyncSubstrateInterface` class goes more in-depth in +regard to how to instantiate and use it. +""" + import asyncio +import inspect import json import random +import ssl +import time from collections import defaultdict from dataclasses import dataclass from hashlib import blake2b from typing import Optional, Any, Union, Callable, Awaitable, cast, TYPE_CHECKING from async_property import async_property -from bt_decode import PortableRegistry, decode as decode_by_type_string, MetadataV15 from bittensor_wallet import Keypair +from bt_decode import PortableRegistry, decode as decode_by_type_string, MetadataV15 from scalecodec import GenericExtrinsic from scalecodec.base import ScaleBytes, ScaleType, RuntimeConfigurationObject from scalecodec.type_registry import load_type_registry_preset @@ -22,7 +31,7 @@ from websockets.asyncio.client import connect from websockets.exceptions import ConnectionClosed -from bittensor_cli.src.bittensor.utils import hex_to_bytes +from bittensor_cli.src.bittensor.utils import hex_to_bytes, err_console if TYPE_CHECKING: from websockets.asyncio.client import ClientConnection @@ -30,14 +39,6 @@ ResultHandler = Callable[[dict, Any], Awaitable[tuple[dict, bool]]] -class TimeoutException(Exception): - pass - - -def timeout_handler(signum, frame): - raise TimeoutException("Operation timed out") - - class ExtrinsicReceipt: """ Object containing information of submitted extrinsic. Block hash where extrinsic is included is required @@ -57,12 +58,11 @@ def __init__( Object containing information of submitted extrinsic. Block hash where extrinsic is included is required when retrieving triggered events or determine if extrinsic was successful - Parameters - ---------- - substrate - extrinsic_hash - block_hash - finalized + Args: + substrate: the AsyncSubstrateInterface instance + extrinsic_hash: the hash of the extrinsic + block_hash: the hash of the block on which this extrinsic exists + finalized: whether the extrinsic is finalized """ self.substrate = substrate self.extrinsic_hash = extrinsic_hash @@ -486,15 +486,11 @@ def reload_type_registry( Reload type registry and preset used to instantiate the SubstrateInterface object. Useful to periodically apply changes in type definitions when a runtime upgrade occurred - Parameters - ---------- - use_remote_preset: When True preset is downloaded from Github master, otherwise use files from local installed - scalecodec package - auto_discover - - Returns - ------- - + Args: + use_remote_preset: When True preset is downloaded from Github master, otherwise use files from local installed + scalecodec package + auto_discover: Whether to automatically discover the type registry presets based on the chain name and the + type registry """ self.runtime_config.clear_type_registry() @@ -513,8 +509,10 @@ def apply_type_registry_presets( ): """ Applies type registry presets to the runtime - :param use_remote_preset: bool, whether to use presets from remote - :param auto_discover: bool, whether to use presets from local installed scalecodec package + + Args: + use_remote_preset: whether to use presets from remote + auto_discover: whether to use presets from local installed scalecodec package """ if self.type_registry_preset is not None: # Load type registry according to preset @@ -622,10 +620,11 @@ def __init__( Websocket manager object. Allows for the use of a single websocket connection by multiple calls. - :param ws_url: Websocket URL to connect to - :param max_subscriptions: Maximum number of subscriptions per websocket connection - :param max_connections: Maximum number of connections total - :param shutdown_timer: Number of seconds to shut down websocket connection after last use + Args: + ws_url: Websocket URL to connect to + max_subscriptions: Maximum number of subscriptions per websocket connection + max_connections: Maximum number of connections total + shutdown_timer: Number of seconds to shut down websocket connection after last use """ # TODO allow setting max concurrent connections and rpc subscriptions per connection # TODO reconnection logic @@ -644,20 +643,32 @@ def __init__( self._exit_task = None self._open_subscriptions = 0 self._options = options if options else {} + self.last_received = time.time() async def __aenter__(self): async with self._lock: self._in_use += 1 - if self._exit_task: - self._exit_task.cancel() - if not self._initialized: - self._initialized = True - self.ws = await asyncio.wait_for( - connect(self.ws_url, **self._options), timeout=10 - ) - self._receiving_task = asyncio.create_task(self._start_receiving()) + await self.connect() return self + async def connect(self, force=False): + if self._exit_task: + self._exit_task.cancel() + if not self._initialized or force: + self._initialized = True + try: + self._receiving_task.cancel() + await self._receiving_task + await self.ws.close() + except (AttributeError, asyncio.CancelledError): + pass + self.ws = await asyncio.wait_for( + connect(self.ws_url, **self._options), timeout=10 + ) + self._receiving_task = asyncio.create_task(self._start_receiving()) + if force: + self.id = 100 + async def __aexit__(self, exc_type, exc_val, exc_tb): async with self._lock: self._in_use -= 1 @@ -699,7 +710,10 @@ async def shutdown(self): async def _recv(self) -> None: try: response = json.loads(await self.ws.recv()) + self.last_received = time.time() async with self._lock: + # note that these 'subscriptions' are all waiting sent messages which have not received + # responses, and thus are not the same as RPC 'subscriptions', which are unique self._open_subscriptions -= 1 if "id" in response: self._received[response["id"]] = response @@ -707,10 +721,10 @@ async def _recv(self) -> None: self._received[response["params"]["subscription"]] = response else: raise KeyError(response) - except ConnectionClosed: + except ssl.SSLError: + raise ConnectionClosed + except (ConnectionClosed, KeyError): raise - except KeyError as e: - raise e async def _start_receiving(self): try: @@ -719,14 +733,18 @@ async def _start_receiving(self): except asyncio.CancelledError: pass except ConnectionClosed: - # TODO try reconnect, but only if it's needed - raise + async with self._lock: + await self.connect(force=True) async def send(self, payload: dict) -> int: """ Sends a payload to the websocket connection. - :param payload: payload, generate a payload with the AsyncSubstrateInterface.make_payload method + Args: + payload: payload, generate a payload with the AsyncSubstrateInterface.make_payload method + + Returns: + id: the internal ID of the request (incremented int) """ async with self._lock: original_id = self.id @@ -735,22 +753,25 @@ async def send(self, payload: dict) -> int: try: await self.ws.send(json.dumps({**payload, **{"id": original_id}})) return original_id - except ConnectionClosed: - raise + except (ConnectionClosed, ssl.SSLError, EOFError): + async with self._lock: + await self.connect(force=True) async def retrieve(self, item_id: int) -> Optional[dict]: """ Retrieves a single item from received responses dict queue - :param item_id: id of the item to retrieve + Args: + item_id: id of the item to retrieve - :return: retrieved item + Returns: + retrieved item """ - while True: - async with self._lock: - if item_id in self._received: - return self._received.pop(item_id) + try: + return self._received.pop(item_id) + except KeyError: await asyncio.sleep(0.1) + return None class AsyncSubstrateInterface: @@ -760,16 +781,32 @@ class AsyncSubstrateInterface: def __init__( self, chain_endpoint: str, - use_remote_preset=False, - auto_discover=True, - auto_reconnect=True, - ss58_format=None, - type_registry=None, - chain_name=None, + use_remote_preset: bool = False, + auto_discover: bool = True, + ss58_format: Optional[int] = None, + type_registry: Optional[dict] = None, + chain_name: Optional[str] = None, + max_retries: int = 5, + retry_timeout: float = 20.0, ): """ - The asyncio-compatible version of the subtensor interface commands we use in bittensor + The asyncio-compatible version of the subtensor interface commands we use in bittensor. It is important to + initialise this class asynchronously in an async context manager using `async with AsyncSubstrateInterface()`. + Otherwise, some (most) methods will not work properly, and may raise exceptions. + + Args: + chain_endpoint: the URI of the chain to connect to + use_remote_preset: whether to pull the preset from GitHub + auto_discover: whether to automatically pull the presets based on the chain name and type registry + ss58_format: the specific SS58 format to use + type_registry: a dict of custom types + chain_name: the name of the chain (the result of the rpc request for "system_chain") + max_retries: number of times to retry RPC requests before giving up + retry_timeout: how to long wait since the last ping to retry the RPC request + """ + self.max_retries = max_retries + self.retry_timeout = retry_timeout self.chain_endpoint = chain_endpoint self.__chain = chain_name self.ws = Websocket( @@ -784,7 +821,6 @@ def __init__( self.config = { "use_remote_preset": use_remote_preset, "auto_discover": auto_discover, - "auto_reconnect": auto_reconnect, "rpc_methods": None, "strict_scale_decode": True, } @@ -795,12 +831,15 @@ def __init__( self.runtime_cache = RuntimeCache() self.block_id: Optional[int] = None self.runtime_version = None - self.runtime_config = RuntimeConfigurationObject() + self.runtime_config = RuntimeConfigurationObject( + ss58_format=self.ss58_format, implements_scale_info=True + ) self.__metadata_cache = {} self.type_registry_preset = None self.transaction_version = None - self.metadata = None + self.__metadata = None self.metadata_version_hex = "0x0f000000" # v15 + self.event_loop = asyncio.get_event_loop() async def __aenter__(self): await self.initialize() @@ -828,10 +867,20 @@ def chain(self): """ return self.__chain + @property + def metadata(self): + if self.__metadata is None: + raise AttributeError( + "Metadata not found. This generally indicates that the AsyncSubstrateInterface object " + "is not properly async initialized." + ) + else: + return self.__metadata + async def get_storage_item(self, module: str, storage_function: str): - if not self.metadata: + if not self.__metadata: await self.init_runtime() - metadata_pallet = self.metadata.get_metadata_pallet(module) + metadata_pallet = self.__metadata.get_metadata_pallet(module) storage_item = metadata_pallet.get_storage_function(storage_function) return storage_item @@ -856,23 +905,18 @@ async def load_registry(self): metadata_v15 = MetadataV15.decode_from_metadata_option(metadata_option_bytes) self.registry = PortableRegistry.from_metadata_v15(metadata_v15) - async def decode_scale( - self, type_string, scale_bytes: bytes, return_scale_obj=False - ): + async def decode_scale(self, type_string, scale_bytes: bytes) -> Any: """ Helper function to decode arbitrary SCALE-bytes (e.g. 0x02000000) according to given RUST type_string (e.g. BlockNumber). The relevant versioning information of the type (if defined) will be applied if block_hash is set - Parameters - ---------- - type_string - scale_bytes - block_hash - return_scale_obj: if True the SCALE object itself is returned, otherwise the serialized dict value of the object + Args: + type_string: the type string of the SCALE object for decoding + scale_bytes: the SCALE-bytes representation of the SCALE object to decode - Returns - ------- + Returns: + Decoded object """ if scale_bytes == b"\x00": @@ -892,10 +936,12 @@ async def init_runtime( Because parsing of metadata and type registry is quite heavy, the result will be cached per runtime id. In the future there could be support for caching backends like Redis to make this cache more persistent. - :param block_hash: optional block hash, should not be specified if block_id is - :param block_id: optional block id, should not be specified if block_hash is + Args: + block_hash: optional block hash, should not be specified if block_id is + block_id: optional block id, should not be specified if block_hash is - :returns: Runtime object + Returns: + Runtime object """ async def get_runtime(block_hash, block_id) -> Runtime: @@ -903,11 +949,11 @@ async def get_runtime(block_hash, block_id) -> Runtime: if ( (block_hash and block_hash == self.last_block_hash) or (block_id and block_id == self.block_id) - ) and self.metadata is not None: + ) and self.__metadata is not None: return Runtime( self.chain, self.runtime_config, - self.metadata, + self.__metadata, self.type_registry, ) @@ -930,7 +976,6 @@ async def get_runtime(block_hash, block_id) -> Runtime: raise SubstrateRequestException( f'Block not found for "{self.last_block_hash}"' ) - parent_block_hash: str = block_header["result"]["parentHash"] if ( @@ -952,41 +997,41 @@ async def get_runtime(block_hash, block_id) -> Runtime: # Check if runtime state already set to current block if ( runtime_info.get("specVersion") == self.runtime_version - and self.metadata is not None + and self.__metadata is not None ): return Runtime( self.chain, self.runtime_config, - self.metadata, + self.__metadata, self.type_registry, ) self.runtime_version = runtime_info.get("specVersion") self.transaction_version = runtime_info.get("transactionVersion") - if not self.metadata: + if not self.__metadata: if self.runtime_version in self.__metadata_cache: # Get metadata from cache # self.debug_message('Retrieved metadata for {} from memory'.format(self.runtime_version)) - metadata = self.metadata = self.__metadata_cache[ + metadata = self.__metadata = self.__metadata_cache[ self.runtime_version ] else: - metadata = self.metadata = await self.get_block_metadata( + metadata = self.__metadata = await self.get_block_metadata( block_hash=runtime_block_hash, decode=True ) # self.debug_message('Retrieved metadata for {} from Substrate node'.format(self.runtime_version)) # Update metadata cache - self.__metadata_cache[self.runtime_version] = self.metadata + self.__metadata_cache[self.runtime_version] = self.__metadata else: - metadata = self.metadata + metadata = self.__metadata # Update type registry self.reload_type_registry(use_remote_preset=False, auto_discover=True) if self.implements_scaleinfo: # self.debug_message('Add PortableRegistry from metadata to type registry') - self.runtime_config.add_portable_registry(self.metadata) + self.runtime_config.add_portable_registry(metadata) # Set active runtime version self.runtime_config.set_active_spec_version_id(self.runtime_version) @@ -1014,7 +1059,7 @@ async def get_runtime(block_hash, block_id) -> Runtime: return Runtime( self.chain, self.runtime_config, - self.metadata, + metadata, self.type_registry, ) @@ -1033,17 +1078,14 @@ def reload_type_registry( self, use_remote_preset: bool = True, auto_discover: bool = True ): """ - Reload type registry and preset used to instantiate the SubtrateInterface object. Useful to periodically apply - changes in type definitions when a runtime upgrade occurred - - Parameters - ---------- - use_remote_preset: When True preset is downloaded from Github master, otherwise use files from local installed scalecodec package - auto_discover - - Returns - ------- + Reload type registry and preset used to instantiate the `AsyncSubstrateInterface` object. Useful to + periodically apply changes in type definitions when a runtime upgrade occurred + Args: + use_remote_preset: When True preset is downloaded from Github master, + otherwise use files from local installed scalecodec package + auto_discover: Whether to automatically discover the type_registry + presets based on the chain name and typer registry """ self.runtime_config.clear_type_registry() @@ -1110,8 +1152,8 @@ def implements_scaleinfo(self) -> Optional[bool]: ------- bool """ - if self.metadata: - return self.metadata.portable_registry is not None + if self.__metadata: + return self.__metadata.portable_registry is not None else: return None @@ -1125,15 +1167,14 @@ async def create_storage_key( """ Create a `StorageKey` instance providing storage function details. See `subscribe_storage()`. - Parameters - ---------- - pallet: name of pallet - storage_function: name of storage function - params: Optional list of parameters in case of a Mapped storage function + Args: + pallet: name of pallet + storage_function: name of storage function + params: list of parameters in case of a Mapped storage function + block_hash: the hash of the blockchain block whose runtime to use - Returns - ------- - StorageKey + Returns: + StorageKey """ await self.init_runtime(block_hash=block_hash) @@ -1142,7 +1183,7 @@ async def create_storage_key( storage_function, params, runtime_config=self.runtime_config, - metadata=self.metadata, + metadata=self.__metadata, ) async def _get_block_handler( @@ -1152,14 +1193,14 @@ async def _get_block_handler( include_author: bool = False, header_only: bool = False, finalized_only: bool = False, - subscription_handler: Optional[Callable] = None, + subscription_handler: Optional[Callable[[dict], Awaitable[Any]]] = None, ): try: await self.init_runtime(block_hash=block_hash) except BlockNotFound: return None - async def decode_block(block_data, block_data_hash=None): + async def decode_block(block_data, block_data_hash=None) -> dict[str, Any]: if block_data: if block_data_hash: block_data["header"]["hash"] = block_data_hash @@ -1174,16 +1215,16 @@ async def decode_block(block_data, block_data_hash=None): if "extrinsics" in block_data: for idx, extrinsic_data in enumerate(block_data["extrinsics"]): - extrinsic_decoder = extrinsic_cls( - data=ScaleBytes(extrinsic_data), - metadata=self.metadata, - runtime_config=self.runtime_config, - ) try: + extrinsic_decoder = extrinsic_cls( + data=ScaleBytes(extrinsic_data), + metadata=self.__metadata, + runtime_config=self.runtime_config, + ) extrinsic_decoder.decode(check_remaining=True) block_data["extrinsics"][idx] = extrinsic_decoder - except Exception as e: + except Exception: if not ignore_decoding_errors: raise block_data["extrinsics"][idx] = None @@ -1295,23 +1336,29 @@ async def decode_block(block_data, block_data_hash=None): if callable(subscription_handler): rpc_method_prefix = "Finalized" if finalized_only else "New" - async def result_handler(message, update_nr, subscription_id): - new_block = await decode_block({"header": message["params"]["result"]}) + async def result_handler( + message: dict, subscription_id: str + ) -> tuple[Any, bool]: + reached = False + subscription_result = None + if "params" in message: + new_block = await decode_block( + {"header": message["params"]["result"]} + ) - subscription_result = subscription_handler( - new_block, update_nr, subscription_id - ) + subscription_result = await subscription_handler(new_block) - if subscription_result is not None: - # Handler returned end result: unsubscribe from further updates - self._forgettable_task = asyncio.create_task( - self.rpc_request( - f"chain_unsubscribe{rpc_method_prefix}Heads", - [subscription_id], + if subscription_result is not None: + reached = True + # Handler returned end result: unsubscribe from further updates + self._forgettable_task = asyncio.create_task( + self.rpc_request( + f"chain_unsubscribe{rpc_method_prefix}Heads", + [subscription_id], + ) ) - ) - return subscription_result + return subscription_result, reached result = await self._make_rpc_request( [ @@ -1324,7 +1371,7 @@ async def result_handler(message, update_nr, subscription_id): result_handler=result_handler, ) - return result + return result["_get_block_handler"][-1] else: if header_only: @@ -1353,17 +1400,15 @@ async def get_block( Either `block_hash` or `block_number` should be set, or both omitted. - Parameters - ---------- - block_hash: the hash of the block to be retrieved - block_number: the block number to retrieved - ignore_decoding_errors: When set this will catch all decoding errors, set the item to None and continue decoding - include_author: This will retrieve the block author from the validator set and add to the result - finalized_only: when no `block_hash` or `block_number` is set, this will retrieve the finalized head + Args: + block_hash: the hash of the block to be retrieved + block_number: the block number to retrieved + ignore_decoding_errors: When set this will catch all decoding errors, set the item to None and continue decoding + include_author: This will retrieve the block author from the validator set and add to the result + finalized_only: when no `block_hash` or `block_number` is set, this will retrieve the finalized head - Returns - ------- - A dict containing the extrinsic and digest logs data + Returns: + A dict containing the extrinsic and digest logs data """ if block_hash and block_number: raise ValueError("Either block_hash or block_number should be be set") @@ -1397,13 +1442,11 @@ async def get_events(self, block_hash: Optional[str] = None) -> list: """ Convenience method to get events for a certain block (storage call for module 'System' and function 'Events') - Parameters - ---------- - block_hash + Args: + block_hash: the hash of the block to be retrieved - Returns - ------- - list + Returns: + list of events """ def convert_event_data(data): @@ -1452,9 +1495,7 @@ def convert_event_data(data): ) if storage_obj: for item in list(storage_obj): - # print("item!", item) events.append(convert_event_data(item)) - # events += list(storage_obj) return events async def get_block_runtime_version(self, block_hash: str) -> dict: @@ -1470,14 +1511,12 @@ async def get_block_metadata( """ A pass-though to existing JSONRPC method `state_getMetadata`. - Parameters - ---------- - block_hash - decode: True for decoded version - - Returns - ------- + Args: + block_hash: the hash of the block to be queried against + decode: Whether to decode the metadata or present it raw + Returns: + metadata, either as a dict (not decoded) or ScaleType (decoded) """ params = None if decode and not self.runtime_config: @@ -1514,7 +1553,7 @@ async def _preprocess( """ params = query_for if query_for else [] # Search storage call in metadata - metadata_pallet = self.metadata.get_metadata_pallet(module) + metadata_pallet = self.__metadata.get_metadata_pallet(module) if not metadata_pallet: raise SubstrateRequestException(f'Pallet "{module}" not found') @@ -1540,7 +1579,7 @@ async def _preprocess( storage_item.value["name"], params, runtime_config=self.runtime_config, - metadata=self.metadata, + metadata=self.__metadata, ) method = "state_getStorageAt" return Preprocessed( @@ -1564,14 +1603,16 @@ async def _process_response( Processes the RPC call response by decoding it, returning it as is, or setting a handler for subscriptions, depending on the specific call. - :param response: the RPC call response - :param subscription_id: the subscription id for subscriptions, used only for subscriptions with a result handler - :param value_scale_type: Scale Type string used for decoding ScaleBytes results - :param storage_item: The ScaleType object used for decoding ScaleBytes results - :param runtime: the runtime object, used for decoding ScaleBytes results - :param result_handler: the result handler coroutine used for handling longer-running subscriptions + Args: + response: the RPC call response + subscription_id: the subscription id for subscriptions, used only for subscriptions with a result handler + value_scale_type: Scale Type string used for decoding ScaleBytes results + storage_item: The ScaleType object used for decoding ScaleBytes results + runtime: the runtime object, used for decoding ScaleBytes results + result_handler: the result handler coroutine used for handling longer-running subscriptions - :return: (decoded response, completion) + Returns: + (decoded response, completion) """ result: Union[dict, ScaleType] = response if value_scale_type and isinstance(storage_item, ScaleType): @@ -1580,7 +1621,7 @@ async def _process_response( runtime = Runtime( self.chain, self.runtime_config, - self.metadata, + self.__metadata, self.type_registry, ) if response.get("result") is not None: @@ -1598,11 +1639,10 @@ async def _process_response( q = bytes(query_value) else: q = query_value - obj = await self.decode_scale(value_scale_type, q, True) - result = obj + result = await self.decode_scale(value_scale_type, q) if asyncio.iscoroutinefunction(result_handler): # For multipart responses as a result of subscriptions. - message, bool_result = await result_handler(response, subscription_id) + message, bool_result = await result_handler(result, subscription_id) return message, bool_result return result, True @@ -1613,6 +1653,7 @@ async def _make_rpc_request( storage_item: Optional[ScaleType] = None, runtime: Optional[Runtime] = None, result_handler: Optional[ResultHandler] = None, + attempt: int = 1, ) -> RequestManager.RequestResults: request_manager = RequestManager(payloads) @@ -1624,7 +1665,7 @@ async def _make_rpc_request( request_manager.add_request(item_id, item["id"]) while True: - for item_id in request_manager.response_map.keys(): + for item_id in list(request_manager.response_map.keys()): if ( item_id not in request_manager.responses or asyncio.iscoroutinefunction(result_handler) @@ -1640,6 +1681,7 @@ async def _make_rpc_request( item_id = request_manager.overwrite_request( item_id, response["result"] ) + subscription_added = True except KeyError: raise SubstrateRequestException(str(response)) decoded_response, complete = await self._process_response( @@ -1653,15 +1695,30 @@ async def _make_rpc_request( request_manager.add_response( item_id, decoded_response, complete ) - if ( - asyncio.iscoroutinefunction(result_handler) - and not subscription_added - ): - subscription_added = True - break if request_manager.is_complete: break + if time.time() - self.ws.last_received >= self.retry_timeout: + if attempt >= self.max_retries: + err_console.print( + f"Timed out waiting for RPC requests {attempt} times. Exiting." + ) + raise SubstrateRequestException("Max retries reached.") + else: + self.ws.last_received = time.time() + await self.ws.connect(force=True) + err_console.print( + f"Timed out waiting for RPC requests. " + f"Retrying attempt {attempt + 1} of {self.max_retries}" + ) + return await self._make_rpc_request( + payloads, + value_scale_type, + storage_item, + runtime, + result_handler, + attempt + 1, + ) return request_manager.get_results() @@ -1670,11 +1727,13 @@ def make_payload(id_: str, method: str, params: list) -> dict: """ Creates a payload for making an rpc_request with _make_rpc_request - :param id_: a unique name you would like to give to this request - :param method: the method in the RPC request - :param params: the params in the RPC request + Args: + id_: a unique name you would like to give to this request + method: the method in the RPC request + params: the params in the RPC request - :return: the payload dict + Returns: + the payload dict """ return { "id": id_, @@ -1689,17 +1748,19 @@ async def rpc_request( reuse_block_hash: bool = False, ) -> Any: """ - Makes an RPC request to the subtensor. Use this only if ``self.query`` and ``self.query_multiple`` and - ``self.query_map`` do not meet your needs. + Makes an RPC request to the subtensor. Use this only if `self.query`` and `self.query_multiple` and + `self.query_map` do not meet your needs. - :param method: str the method in the RPC request - :param params: list of the params in the RPC request - :param block_hash: optional str, the hash of the block — only supply this if not supplying the block - hash in the params, and not reusing the block hash - :param reuse_block_hash: optional bool, whether to reuse the block hash in the params — only mark as True - if not supplying the block hash in the params, or via the `block_hash` parameter + Args: + method: str the method in the RPC request + params: list of the params in the RPC request + block_hash: the hash of the block — only supply this if not supplying the block + hash in the params, and not reusing the block hash + reuse_block_hash: whether to reuse the block hash in the params — only mark as True + if not supplying the block hash in the params, or via the `block_hash` parameter - :return: the response from the RPC request + Returns: + the response from the RPC request """ block_hash = await self._get_current_block_hash(block_hash, reuse_block_hash) params = params or [] @@ -1714,7 +1775,7 @@ async def rpc_request( runtime = Runtime( self.chain, self.runtime_config, - self.metadata, + self.__metadata, self.type_registry, ) result = await self._make_rpc_request(payloads, runtime=runtime) @@ -1740,7 +1801,7 @@ async def get_chain_head(self) -> str: runtime=Runtime( self.chain, self.runtime_config, - self.metadata, + self.__metadata, self.type_registry, ), ) @@ -1757,20 +1818,23 @@ async def compose_call( """ Composes a call payload which can be used in an extrinsic. - :param call_module: Name of the runtime module e.g. Balances - :param call_function: Name of the call function e.g. transfer - :param call_params: This is a dict containing the params of the call. e.g. - `{'dest': 'EaG2CRhJWPb7qmdcJvy3LiWdh26Jreu9Dx6R1rXxPmYXoDk', 'value': 1000000000000}` - :param block_hash: Use metadata at given block_hash to compose call + Args: + call_module: Name of the runtime module e.g. Balances + call_function: Name of the call function e.g. transfer + call_params: This is a dict containing the params of the call. e.g. + `{'dest': 'EaG2CRhJWPb7qmdcJvy3LiWdh26Jreu9Dx6R1rXxPmYXoDk', 'value': 1000000000000}` + block_hash: Use metadata at given block_hash to compose call - :return: A composed call + Returns: + A composed call """ if call_params is None: call_params = {} await self.init_runtime(block_hash=block_hash) + call = self.runtime_config.create_scale_object( - type_string="Call", metadata=self.metadata + type_string="Call", metadata=self.__metadata ) call.encode( @@ -1843,14 +1907,12 @@ async def query_multi( result = substrate.query_multi(storage_keys) ``` - Parameters - ---------- - storage_keys: list of StorageKey objects - block_hash: Optional block_hash of state snapshot + Args: + storage_keys: list of StorageKey objects + block_hash: hash of the block to query against - Returns - ------- - list of `(storage_key, scale_obj)` tuples + Returns: + list of `(storage_key, scale_obj)` tuples """ await self.init_runtime(block_hash=block_hash) @@ -1897,12 +1959,14 @@ async def create_scale_object( Convenience method to create a SCALE object of type `type_string`, this will initialize the runtime automatically at moment of `block_hash`, or chain tip if omitted. - :param type_string: str Name of SCALE type to create - :param data: ScaleBytes Optional ScaleBytes to decode - :param block_hash: Optional block hash for moment of decoding, when omitted the chain tip will be used - :param kwargs: keyword args for the Scale Type constructor + Args: + type_string: Name of SCALE type to create + data: ScaleBytes: ScaleBytes to decode + block_hash: block hash for moment of decoding, when omitted the chain tip will be used + kwargs: keyword args for the Scale Type constructor - :return: The created Scale Type object + Returns: + The created Scale Type object """ runtime = await self.init_runtime(block_hash=block_hash) if "metadata" not in kwargs: @@ -1950,12 +2014,12 @@ async def generate_signature_payload( ) # Process signed extensions in metadata - if "signed_extensions" in self.metadata[1][1]["extrinsic"]: + if "signed_extensions" in self.__metadata[1][1]["extrinsic"]: # Base signature payload signature_payload.type_mapping = [["call", "CallBytes"]] # Add signed extensions to payload - signed_extensions = self.metadata.get_signed_extensions() + signed_extensions = self.__metadata.get_signed_extensions() if "CheckMortality" in signed_extensions: signature_payload.type_mapping.append( @@ -2075,28 +2139,29 @@ async def create_signed_extrinsic( """ Creates an extrinsic signed by given account details - :param call: GenericCall to create extrinsic for - :param keypair: Keypair used to sign the extrinsic - :param era: Specify mortality in blocks in follow format: - {'period': [amount_blocks]} If omitted the extrinsic is immortal - :param nonce: nonce to include in extrinsics, if omitted the current nonce is retrieved on-chain - :param tip: The tip for the block author to gain priority during network congestion - :param tip_asset_id: Optional asset ID with which to pay the tip - :param signature: Optionally provide signature if externally signed + Args: + call: GenericCall to create extrinsic for + keypair: Keypair used to sign the extrinsic + era: Specify mortality in blocks in follow format: + {'period': [amount_blocks]} If omitted the extrinsic is immortal + nonce: nonce to include in extrinsics, if omitted the current nonce is retrieved on-chain + tip: The tip for the block author to gain priority during network congestion + tip_asset_id: Optional asset ID with which to pay the tip + signature: Optionally provide signature if externally signed - :return: The signed Extrinsic + Returns: + The signed Extrinsic """ - if not self.metadata: - await self.init_runtime() + await self.init_runtime() # Check requirements if not isinstance(call, GenericCall): raise TypeError("'call' must be of type Call") # Check if extrinsic version is supported - if self.metadata[1][1]["extrinsic"]["version"] != 4: # type: ignore + if self.__metadata[1][1]["extrinsic"]["version"] != 4: # type: ignore raise NotImplementedError( - f"Extrinsic version {self.metadata[1][1]['extrinsic']['version']} not supported" # type: ignore + f"Extrinsic version {self.__metadata[1][1]['extrinsic']['version']} not supported" # type: ignore ) # Retrieve nonce @@ -2138,8 +2203,9 @@ async def create_signed_extrinsic( # Create extrinsic extrinsic = self.runtime_config.create_scale_object( - type_string="Extrinsic", metadata=self.metadata + type_string="Extrinsic", metadata=self.__metadata ) + value = { "account_id": f"0x{keypair.public_key.hex()}", "signature": f"0x{signature.hex()}", @@ -2157,7 +2223,9 @@ async def create_signed_extrinsic( signature_cls = self.runtime_config.get_decoder_class("ExtrinsicSignature") if issubclass(signature_cls, self.runtime_config.get_decoder_class("Enum")): value["signature_version"] = signature_version + extrinsic.encode(value) + return extrinsic async def get_chain_finalised_head(self): @@ -2186,12 +2254,14 @@ async def runtime_call( """ Calls a runtime API method - :param api: Name of the runtime API e.g. 'TransactionPaymentApi' - :param method: Name of the method e.g. 'query_fee_details' - :param params: List of parameters needed to call the runtime API - :param block_hash: Hash of the block at which to make the runtime API call + Args: + api: Name of the runtime API e.g. 'TransactionPaymentApi' + method: Name of the method e.g. 'query_fee_details' + params: List of parameters needed to call the runtime API + block_hash: Hash of the block at which to make the runtime API call - :return: ScaleType from the runtime call + Returns: + ScaleType from the runtime call """ await self.init_runtime() @@ -2219,7 +2289,7 @@ async def runtime_call( runtime = Runtime( self.chain, self.runtime_config, - self.metadata, + self.__metadata, self.type_registry, ) @@ -2256,9 +2326,11 @@ async def get_account_nonce(self, account_address: str) -> int: """ Returns current nonce for given account address - :param account_address: SS58 formatted address + Args: + account_address: SS58 formatted address - :return: Nonce for given account address + Returns: + Nonce for given account address """ nonce_obj = await self.runtime_call( "AccountNonceApi", "account_nonce", [account_address] @@ -2270,20 +2342,18 @@ async def get_metadata_constant(self, module_name, constant_name, block_hash=Non Retrieves the details of a constant for given module name, call function name and block_hash (or chaintip if block_hash is omitted) - Parameters - ---------- - module_name - constant_name - block_hash + Args: + module_name: name of the module you are querying + constant_name: name of the constant you are querying + block_hash: hash of the block at which to make the runtime API call - Returns - ------- - MetadataModuleConstants + Returns: + MetadataModuleConstants """ await self.init_runtime(block_hash=block_hash) - for module in self.metadata.pallets: + for module in self.__metadata.pallets: if module_name == module.name and module.constants: for constant in module.constants: if constant_name == constant.value["name"]: @@ -2300,14 +2370,14 @@ async def get_constant( Returns the decoded `ScaleType` object of the constant for given module name, call function name and block_hash (or chaintip if block_hash is omitted) - Parameters - ---------- - :param module_name: Name of the module to query - :param constant_name: Name of the constant to query - :param block_hash: Hash of the block at which to make the runtime API call - :param reuse_block_hash: Reuse last-used block hash if set to true + Args: + module_name: Name of the module to query + constant_name: Name of the constant to query + block_hash: Hash of the block at which to make the runtime API call + reuse_block_hash: Reuse last-used block hash if set to true - :return: ScaleType from the runtime call + Returns: + ScaleType from the runtime call """ block_hash = await self._get_current_block_hash(block_hash, reuse_block_hash) constant = await self.get_metadata_constant( @@ -2318,7 +2388,6 @@ async def get_constant( return await self.decode_scale( constant.type, bytes(constant.constant_value), - return_scale_obj=True, ) else: return None @@ -2329,16 +2398,14 @@ async def get_payment_info( """ Retrieves fee estimation via RPC for given extrinsic - Parameters - ---------- - call: Call object to estimate fees for - keypair: Keypair of the sender, does not have to include private key because no valid signature is required - - Returns - ------- - Dict with payment info + Args: + call: Call object to estimate fees for + keypair: Keypair of the sender, does not have to include private key because no valid signature is + required - E.g. `{'class': 'normal', 'partialFee': 151000000, 'weight': {'ref_time': 143322000}}` + Returns: + Dict with payment info + E.g. `{'class': 'normal', 'partialFee': 151000000, 'weight': {'ref_time': 143322000}}` """ @@ -2431,20 +2498,22 @@ async def query_map( Note: it is important that you do not use `for x in result.records`, as this will sidestep possible pagination. You must do `async for x in result`. - :param module: The module name in the metadata, e.g. System or Balances. - :param storage_function: The storage function name, e.g. Account or Locks. - :param params: The input parameters in case of for example a `DoubleMap` storage function - :param block_hash: Optional block hash for result at given block, when left to None the chain tip will be used. - :param max_results: the maximum of results required, if set the query will stop fetching results when number is - reached - :param start_key: The storage key used as offset for the results, for pagination purposes - :param page_size: The results are fetched from the node RPC in chunks of this size - :param ignore_decoding_errors: When set this will catch all decoding errors, set the item to None and continue - decoding - :param reuse_block_hash: use True if you wish to make the query using the last-used block hash. Do not mark True - if supplying a block_hash - - :return: QueryMapResult object + Args: + module: The module name in the metadata, e.g. System or Balances. + storage_function: The storage function name, e.g. Account or Locks. + params: The input parameters in case of for example a `DoubleMap` storage function + block_hash: Optional block hash for result at given block, when left to None the chain tip will be used. + max_results: the maximum of results required, if set the query will stop fetching results when number is + reached + start_key: The storage key used as offset for the results, for pagination purposes + page_size: The results are fetched from the node RPC in chunks of this size + ignore_decoding_errors: When set this will catch all decoding errors, set the item to None and continue + decoding + reuse_block_hash: use True if you wish to make the query using the last-used block hash. Do not mark True + if supplying a block_hash + + Returns: + QueryMapResult object """ params = params or [] block_hash = await self._get_current_block_hash(block_hash, reuse_block_hash) @@ -2543,7 +2612,6 @@ def concat_hash_len(key_hasher: str) -> int: item_key_obj = await self.decode_scale( type_string=f"({', '.join(key_type_string)})", scale_bytes=bytes.fromhex(item[0][len(prefix) :]), - return_scale_obj=True, ) # strip key_hashers to use as item key @@ -2564,9 +2632,7 @@ def concat_hash_len(key_hasher: str) -> int: item_bytes = hex_to_bytes(item[1]) item_value = await self.decode_scale( - type_string=value_type, - scale_bytes=item_bytes, - return_scale_obj=True, + type_string=value_type, scale_bytes=item_bytes ) except Exception as _: if not ignore_decoding_errors: @@ -2599,18 +2665,14 @@ async def submit_extrinsic( in a block and/or the block is finalized. The receipt returned provided information about the block and triggered events - Parameters - ---------- - extrinsic: Extrinsic The extrinsic to be sent to the network - wait_for_inclusion: wait until extrinsic is included in a block (only works for websocket connections) - wait_for_finalization: wait until extrinsic is finalized (only works for websocket connections) - - Returns - ------- - ExtrinsicReceipt + Args: + extrinsic: Extrinsic The extrinsic to be sent to the network + wait_for_inclusion: wait until extrinsic is included in a block (only works for websocket connections) + wait_for_finalization: wait until extrinsic is finalized (only works for websocket connections) + Returns: + ExtrinsicReceipt object of your submitted extrinsic """ - # Check requirements if not isinstance(extrinsic, GenericExtrinsic): raise TypeError("'extrinsic' must be of type Extrinsics") @@ -2621,11 +2683,13 @@ async def result_handler(message: dict, subscription_id) -> tuple[dict, bool]: to handle the results of the extrinsic rpc call, which are multipart, and require subscribing to the message - :param message: message received from the rpc call - :param subscription_id: subscription id received from the initial rpc call for the subscription + Args: + message: message received from the rpc call + subscription_id: subscription id received from the initial rpc call for the subscription - :returns: tuple containing the dict of the block info for the subscription, and bool for whether - the subscription is completed. + Returns: + tuple containing the dict of the block info for the subscription, and bool for whether + the subscription is completed. """ # Check if extrinsic is included and finalized if "params" in message and isinstance(message["params"]["result"], dict): @@ -2712,11 +2776,13 @@ async def get_metadata_call_function( Retrieves a list of all call functions in metadata active for given block_hash (or chaintip if block_hash is omitted) - :param module_name: name of the module - :param call_function_name: name of the call function - :param block_hash: optional block hash + Args: + module_name: name of the module + call_function_name: name of the call function + block_hash: optional block hash - :return: list of call functions + Returns: + list of call functions """ runtime = await self.init_runtime(block_hash=block_hash) @@ -2727,7 +2793,7 @@ async def get_metadata_call_function( return call return None - async def get_block_number(self, block_hash: Optional[str]) -> int: + async def get_block_number(self, block_hash: Optional[str] = None) -> int: """Async version of `substrateinterface.base.get_block_number` method.""" response = await self.rpc_request("chain_getHeader", [block_hash]) @@ -2746,3 +2812,49 @@ async def close(self): await self.ws.shutdown() except AttributeError: pass + + async def wait_for_block( + self, + block: int, + result_handler: Callable[[dict], Awaitable[Any]], + task_return: bool = True, + ) -> Union[asyncio.Task, Union[bool, Any]]: + """ + Executes the result_handler when the chain has reached the block specified. + + Args: + block: block number + result_handler: coroutine executed upon reaching the block number. This can be basically anything, but + must accept one single arg, a dict with the block data; whether you use this data or not is entirely + up to you. + task_return: True to immediately return the result of wait_for_block as an asyncio Task, False to wait + for the block to be reached, and return the result of the result handler. + + Returns: + Either an asyncio.Task (which contains the running subscription, and whose `result()` will contain the + return of the result_handler), or the result itself, depending on `task_return` flag. + Note that if your result_handler returns `None`, this method will return `True`, otherwise + the return will be the result of your result_handler. + """ + + async def _handler(block_data: dict[str, Any]): + required_number = block + number = block_data["header"]["number"] + if number >= required_number: + return ( + r if (r := await result_handler(block_data)) is not None else True + ) + + args = inspect.getfullargspec(result_handler).args + if len(args) != 1: + raise ValueError( + "result_handler must take exactly one arg: the dict block data." + ) + + co = self._get_block_handler( + self.last_block_hash, subscription_handler=_handler + ) + if task_return is True: + return asyncio.create_task(co) + else: + return await co diff --git a/bittensor_cli/src/bittensor/subtensor_interface.py b/bittensor_cli/src/bittensor/subtensor_interface.py index 8f97e58d..1c9e75b8 100644 --- a/bittensor_cli/src/bittensor/subtensor_interface.py +++ b/bittensor_cli/src/bittensor/subtensor_interface.py @@ -14,7 +14,6 @@ from bittensor_cli.src.bittensor.async_substrate_interface import ( AsyncSubstrateInterface, - TimeoutException, ) from bittensor_cli.src.bittensor.chain_data import ( DelegateInfo, @@ -118,7 +117,7 @@ async def __aenter__(self): ): async with self.substrate: return self - except TimeoutException: + except TimeoutError: err_console.print( "\n[red]Error[/red]: Timeout occurred connecting to substrate. " f"Verify your chain and network settings: {self}" diff --git a/bittensor_cli/src/commands/stake/children_hotkeys.py b/bittensor_cli/src/commands/stake/children_hotkeys.py index fc913faa..66f25d3e 100644 --- a/bittensor_cli/src/commands/stake/children_hotkeys.py +++ b/bittensor_cli/src/commands/stake/children_hotkeys.py @@ -299,11 +299,7 @@ async def get_total_stake_for_hk(hotkey: str, parent: bool = False): params=[hotkey], reuse_block_hash=True, ) - stake = ( - Balance.from_rao(_result) - if _result is not None - else Balance(0) - ) + stake = Balance.from_rao(_result) if _result is not None else Balance(0) if parent: console.print( f"\nYour Hotkey: [bright_magenta]{hotkey}[/bright_magenta] | Total Stake: [dark_orange]{stake}t[/dark_orange]\n",