diff --git a/eth_account/_utils/legacy_transactions.py b/eth_account/_utils/legacy_transactions.py index aa60cb4..3c48ec9 100644 --- a/eth_account/_utils/legacy_transactions.py +++ b/eth_account/_utils/legacy_transactions.py @@ -37,7 +37,7 @@ def serializable_unsigned_transaction_from_dict(transaction_dict): transaction_dict = set_transaction_type_if_needed(transaction_dict) - if 'type' in transaction_dict: + if "type" in transaction_dict: # We delegate to TypedTransaction, which will carry out validation & formatting. return TypedTransaction.from_dict(transaction_dict) @@ -49,7 +49,7 @@ def serializable_unsigned_transaction_from_dict(transaction_dict): chain_id_to_v, apply_formatters_to_dict(LEGACY_TRANSACTION_FORMATTERS), ) - if 'v' in filled_transaction: + if "v" in filled_transaction: serializer = Transaction else: serializer = UnsignedTransaction @@ -58,12 +58,13 @@ def serializable_unsigned_transaction_from_dict(transaction_dict): def encode_transaction(unsigned_transaction, vrs): (v, r, s) = vrs - chain_naive_transaction = dissoc(unsigned_transaction.as_dict(), 'v', 'r', 's') + chain_naive_transaction = dissoc(unsigned_transaction.as_dict(), "v", "r", "s") if isinstance(unsigned_transaction, TypedTransaction): - # Typed transaction have their own encoding format, so we must delegate the encoding. - chain_naive_transaction['v'] = v - chain_naive_transaction['r'] = r - chain_naive_transaction['s'] = s + # Typed transaction have their own encoding format, + # so we must delegate the encoding. + chain_naive_transaction["v"] = v + chain_naive_transaction["r"] = r + chain_naive_transaction["s"] = s signed_typed_transaction = TypedTransaction.from_dict(chain_naive_transaction) return signed_typed_transaction.encode() signed_transaction = Transaction(v=v, r=r, s=s, **chain_naive_transaction) @@ -71,23 +72,26 @@ def encode_transaction(unsigned_transaction, vrs): TRANSACTION_DEFAULTS = { - 'to': b'', - 'value': 0, - 'data': b'', - 'chainId': None, + "to": b"", + "value": 0, + "data": b"", + "chainId": None, } ALLOWED_TRANSACTION_KEYS = { - 'nonce', - 'gasPrice', - 'gas', - 'to', - 'value', - 'data', - 'chainId', # set chainId to None if you want a transaction that can be replayed across networks + "nonce", + "gasPrice", + "gas", + "to", + "value", + "data", + # set chainId to None if you want a transaction that can be replayed across networks + "chainId", } -REQUIRED_TRANSACTION_KEYS = ALLOWED_TRANSACTION_KEYS.difference(TRANSACTION_DEFAULTS.keys()) +REQUIRED_TRANSACTION_KEYS = ALLOWED_TRANSACTION_KEYS.difference( + TRANSACTION_DEFAULTS.keys() +) def assert_valid_fields(transaction_dict): @@ -99,19 +103,27 @@ def assert_valid_fields(transaction_dict): # check if any extra keys were specified superfluous_keys = set(transaction_dict.keys()).difference(ALLOWED_TRANSACTION_KEYS) if superfluous_keys: - raise TypeError("Transaction must not include unrecognized fields: %r" % superfluous_keys) + raise TypeError( + "Transaction must not include unrecognized fields: %r" % superfluous_keys + ) # check for valid types in each field valid_fields: Dict[str, bool] - valid_fields = apply_formatters_to_dict(LEGACY_TRANSACTION_VALID_VALUES, transaction_dict) + valid_fields = apply_formatters_to_dict( + LEGACY_TRANSACTION_VALID_VALUES, transaction_dict + ) if not all(valid_fields.values()): - invalid = {key: transaction_dict[key] for key, valid in valid_fields.items() if not valid} + invalid = { + key: transaction_dict[key] + for key, valid in valid_fields.items() + if not valid + } raise TypeError("Transaction had invalid fields: %r" % invalid) def chain_id_to_v(transaction_dict): # See EIP 155 - chain_id = transaction_dict.pop('chainId') + chain_id = transaction_dict.pop("chainId") if chain_id is None: return transaction_dict else: @@ -124,20 +136,20 @@ def fill_transaction_defaults(transaction): UNSIGNED_TRANSACTION_FIELDS = ( - ('nonce', big_endian_int), - ('gasPrice', big_endian_int), - ('gas', big_endian_int), - ('to', Binary.fixed_length(20, allow_empty=True)), - ('value', big_endian_int), - ('data', binary), + ("nonce", big_endian_int), + ("gasPrice", big_endian_int), + ("gas", big_endian_int), + ("to", Binary.fixed_length(20, allow_empty=True)), + ("value", big_endian_int), + ("data", binary), ) class Transaction(HashableRLP): fields = UNSIGNED_TRANSACTION_FIELDS + ( - ('v', big_endian_int), - ('r', big_endian_int), - ('s', big_endian_int), + ("v", big_endian_int), + ("r", big_endian_int), + ("s", big_endian_int), ) @@ -154,4 +166,4 @@ def strip_signature(txn): def vrs_from(transaction): - return (getattr(transaction, part) for part in 'vrs') + return (getattr(transaction, part) for part in "vrs") diff --git a/eth_account/_utils/signing.py b/eth_account/_utils/signing.py index 1078aa4..0a12cfb 100644 --- a/eth_account/_utils/signing.py +++ b/eth_account/_utils/signing.py @@ -22,9 +22,9 @@ V_OFFSET = 27 # signature versions -PERSONAL_SIGN_VERSION = b'E' # Hex value 0x45 -INTENDED_VALIDATOR_SIGN_VERSION = b'\x00' # Hex value 0x00 -STRUCTURED_DATA_SIGN_VERSION = b'\x01' # Hex value 0x01 +PERSONAL_SIGN_VERSION = b"E" # Hex value 0x45 +INTENDED_VALIDATOR_SIGN_VERSION = b"\x00" # Hex value 0x00 +STRUCTURED_DATA_SIGN_VERSION = b"\x01" # Hex value 0x01 def sign_transaction_dict(eth_key, transaction_dict): @@ -42,8 +42,9 @@ def sign_transaction_dict(eth_key, transaction_dict): (v, r, s) = sign_transaction_hash(eth_key, transaction_hash, chain_id) elif isinstance(unsigned_transaction, TypedTransaction): # Each transaction type dictates its payload, and consequently, - # all the funky logic around the `v` signature field is both obsolete && incorrect. - # We want to obtain the raw `v` and delegate to the transaction type itself. + # all the funky logic around the `v` signature field is both obsolete && + # incorrect. We want to obtain the raw `v` and delegate + # to the transaction type itself. (v, r, s) = eth_key.sign_msg_hash(transaction_hash).vrs else: # Cannot happen, but better for code to be defensive + self-documenting. @@ -128,7 +129,7 @@ def sign_transaction_hash(account, transaction_hash, chain_id): def _pad_to_eth_word(bytes_val): - return bytes_val.rjust(32, b'\0') + return bytes_val.rjust(32, b"\0") def to_bytes32(val): diff --git a/eth_account/_utils/structured_data/hashing.py b/eth_account/_utils/structured_data/hashing.py index a0e7673..cacbd75 100644 --- a/eth_account/_utils/structured_data/hashing.py +++ b/eth_account/_utils/structured_data/hashing.py @@ -7,7 +7,7 @@ ) from eth_abi import ( - encode_abi, + encode, is_encodable, is_encodable_type, ) @@ -15,10 +15,8 @@ parse, ) from eth_utils import ( - ValidationError, keccak, to_tuple, - toolz, ) from .validation import ( @@ -39,16 +37,25 @@ def get_dependencies(primary_type, types): deps.add(struct_name) fields = types[struct_name] for field in fields: - if field["type"] not in types: + field_type = field["type"] + + # Handle array types + if is_array_type(field_type): + field_type = field_type[: field_type.index("[")] + + if field_type not in types: # We don't need to expand types that are not user defined (customized) continue - elif field["type"] in deps: + elif field_type not in deps: + # Custom Struct Type + struct_names_yet_to_be_expanded.append(field_type) + elif field_type in deps: # skip types that we have already encountered continue else: - # Custom Struct Type - struct_names_yet_to_be_expanded.append(field["type"]) - + raise TypeError( + f"Unable to determine type dependencies with type `{field_type}`." + ) # Don't need to make a struct as dependency of itself deps.remove(primary_type) @@ -68,7 +75,7 @@ def field_identifier(field): def encode_struct(struct_name, struct_field_types): return "{0}({1})".format( struct_name, - ','.join(map(field_identifier, struct_field_types)), + ",".join(map(field_identifier, struct_field_types)), ) @@ -76,18 +83,16 @@ def encode_type(primary_type, types): """ Serialize types into an encoded string. - The type of a struct is encoded as name ‖ "(" ‖ member₁ ‖ "," ‖ member₂ ‖ "," ‖ … ‖ memberₙ ")" + The type of a struct is encoded as: + name ‖ "(" ‖ member₁ ‖ "," ‖ member₂ ‖ "," ‖ … ‖ memberₙ ")" where each member is written as type ‖ " " ‖ name. """ # Getting the dependencies and sorting them alphabetically as per EIP712 deps = get_dependencies(primary_type, types) sorted_deps = (primary_type,) + tuple(sorted(deps)) - result = ''.join( - [ - encode_struct(struct_name, types[struct_name]) - for struct_name in sorted_deps - ] + result = "".join( + [encode_struct(struct_name, types[struct_name]) for struct_name in sorted_deps] ) return result @@ -97,9 +102,7 @@ def hash_struct_type(primary_type, types): def is_array_type(type): - # Identify if type such as "person[]" or "person[2]" is an array - abi_type = parse(type) - return abi_type.is_array + return type.endswith("]") @to_tuple @@ -121,147 +124,114 @@ def get_depths_and_dimensions(data, depth): def get_array_dimensions(data): """ - Given an array type data item, check that it is an array and return the dimensions as a tuple. + Given an array type data item, check that it is an array and return the dimensions + as a tuple, in order from inside to outside. - Ex: get_array_dimensions([[1, 2, 3], [4, 5, 6]]) returns (2, 3) + Ex: get_array_dimensions([[1, 2, 3], [4, 5, 6]]) returns (3, 2) """ depths_and_dimensions = get_depths_and_dimensions(data, 0) - # re-form as a dictionary with `depth` as key, and all of the dimensions found at that depth. + + # re-form as a dictionary with `depth` as key, and all of the dimensions + # found at that depth. grouped_by_depth = { depth: tuple(dimension for depth, dimension in group) for depth, group in groupby(depths_and_dimensions, itemgetter(0)) } - # validate that there is only one dimension for any given depth. - invalid_depths_dimensions = tuple( - (depth, dimensions) - for depth, dimensions in grouped_by_depth.items() - if len(set(dimensions)) != 1 - ) - if invalid_depths_dimensions: - raise ValidationError( - '\n'.join( - [ - "Depth {0} of array data has more than one dimensions: {1}". - format(depth, dimensions) - for depth, dimensions in invalid_depths_dimensions - ] - ) - ) - dimensions = tuple( - toolz.first(set(dimensions)) - for depth, dimensions in sorted(grouped_by_depth.items()) + # check that all dimensions are the same, else use "dynamic" + dimensions[0] if all(dim == dimensions[0] for dim in dimensions) else "dynamic" + for _depth, dimensions in sorted(grouped_by_depth.items(), reverse=True) ) return dimensions -@to_tuple -def flatten_multidimensional_array(array): - for item in array: - if isinstance(item, (list, tuple)): - # Not checking for Iterable instance, because even Dictionaries and strings - # are considered as iterables, but that's not what we want the condition to be. - yield from flatten_multidimensional_array(item) - else: - yield item +def encode_field(types, name, field_type, value): + if value is None: + raise ValueError(f"Missing value for field {name} of type {field_type}") + if field_type in types: + return ("bytes32", keccak(encode_data(field_type, types, value))) -@to_tuple -def _encode_data(primary_type, types, data): - # Add typehash - yield "bytes32", hash_struct_type(primary_type, types) + if field_type == "bytes": + if not isinstance(value, bytes): + raise TypeError( + f"Value of field `{name}` ({value}) is of the type `{type(value)}`, " + f"but expected bytes value" + ) - # Add field contents - for field in types[primary_type]: - value = data[field["name"]] - if field["type"] == "string": - if not isinstance(value, str): - raise TypeError( - "Value of `{0}` ({2}) in the struct `{1}` is of the type `{3}`, but expected " - "string value".format( - field["name"], - primary_type, - value, - type(value), - ) - ) - # Special case where the values need to be keccak hashed before they are encoded - hashed_value = keccak(text=value) - yield "bytes32", hashed_value - elif field["type"] == "bytes": - if not isinstance(value, bytes): + return ("bytes32", keccak(value)) + + if field_type == "string": + if not isinstance(value, str): + raise TypeError( + f"Value of field `{name}` ({value}) is of the type `{type(value)}`, " + f"but expected string value" + ) + + return ("bytes32", keccak(text=value)) + + if is_array_type(field_type): + # Get the dimensions from the value + array_dimensions = get_array_dimensions(value) + # Get the dimensions from what was declared in the schema + parsed_field_type = parse(field_type) + + for i in range(len(array_dimensions)): + if len(parsed_field_type.arrlist[i]) == 0: + # Skip empty or dynamically declared dimensions + continue + if array_dimensions[i] != parsed_field_type.arrlist[i][0]: + # Dimensions should match with declared schema raise TypeError( - "Value of `{0}` ({2}) in the struct `{1}` is of the type `{3}`, but expected " - "bytes value".format( - field["name"], - primary_type, - value, - type(value), - ) + f"Array data `{value}` has dimensions `{array_dimensions}`" + f" whereas the schema has dimensions " + f"`{tuple(map(lambda x: x[0] if x else 'dynamic', parsed_field_type.arrlist))}`" # noqa: E501 ) - # Special case where the values need to be keccak hashed before they are encoded - hashed_value = keccak(primitive=value) - yield "bytes32", hashed_value - elif field["type"] in types: - # This means that this type is a user defined type - hashed_value = keccak(primitive=encode_data(field["type"], types, value)) - yield "bytes32", hashed_value - elif is_array_type(field["type"]): - # Get the dimensions from the value - array_dimensions = get_array_dimensions(value) - # Get the dimensions from what was declared in the schema - parsed_type = parse(field["type"]) - for i in range(len(array_dimensions)): - if len(parsed_type.arrlist[i]) == 0: - # Skip empty or dynamically declared dimensions - continue - if array_dimensions[i] != parsed_type.arrlist[i][0]: - # Dimensions should match with declared schema - raise TypeError( - "Array data `{0}` has dimensions `{1}` whereas the " - "schema has dimensions `{2}`".format( - value, - array_dimensions, - tuple(map(lambda x: x[0], parsed_type.arrlist)), - ) - ) - - array_items = flatten_multidimensional_array(value) - array_items_encoding = [ - encode_data(parsed_type.base, types, array_item) - for array_item in array_items - ] - concatenated_array_encodings = b''.join(array_items_encoding) - hashed_value = keccak(concatenated_array_encodings) - yield "bytes32", hashed_value + + field_type_of_inside_array = field_type[: field_type.rindex("[")] + field_type_value_pairs = [ + encode_field(types, name, field_type_of_inside_array, item) + for item in value + ] + + # handle empty array + if value: + data_types, data_hashes = zip(*field_type_value_pairs) else: - # First checking to see if type is valid as per abi - if not is_encodable_type(field["type"]): - raise TypeError( - "Received Invalid type `{0}` in the struct `{1}`".format( - field["type"], - primary_type, - ) - ) + data_types, data_hashes = [], [] + + return ("bytes32", keccak(encode(data_types, data_hashes))) + + # First checking to see if field_type is valid as per abi + if not is_encodable_type(field_type): + raise TypeError(f"Received Invalid type `{field_type}` in field `{name}`") + + # Next, see if the value is encodable as the specified field_type + if is_encodable(field_type, value): + # field_type is a valid type and the provided value is encodable as that type + return (field_type, value) + else: + raise TypeError( + f"Value of `{name}` ({value}) is not encodable as type `{field_type}`. " + f"If the base type is correct, verify that the value does not " + f"exceed the specified size for the type." + ) - # Next, see if the value is encodable as the specified type - if is_encodable(field["type"], value): - # field["type"] is a valid type and the provided value is encodable as that type - yield field["type"], value - else: - raise TypeError( - f"Value of `{field['name']}` ({value}) in the struct `{primary_type}` is not " - f"encodable as the specified type `{field['type']}`. If the base type is " - "correct, make sure the value does not exceed the specified size for the type." - ) +def encode_data(primary_type, types, data): + encoded_types = ["bytes32"] + encoded_values = [hash_struct_type(primary_type, types)] + + for field in types[primary_type]: + type, value = encode_field( + types, field["name"], field["type"], data[field["name"]] + ) + encoded_types.append(type) + encoded_values.append(value) -def encode_data(primaryType, types, data): - data_types_and_hashes = _encode_data(primaryType, types, data) - data_types, data_hashes = zip(*data_types_and_hashes) - return encode_abi(data_types, data_hashes) + return encode(encoded_types, encoded_values) def load_and_validate_structured_message(structured_json_string_data): @@ -273,11 +243,7 @@ def load_and_validate_structured_message(structured_json_string_data): def hash_domain(structured_data): return keccak( - encode_data( - "EIP712Domain", - structured_data["types"], - structured_data["domain"] - ) + encode_data("EIP712Domain", structured_data["types"], structured_data["domain"]) ) @@ -286,6 +252,6 @@ def hash_message(structured_data): encode_data( structured_data["primaryType"], structured_data["types"], - structured_data["message"] + structured_data["message"], ) ) diff --git a/eth_account/_utils/structured_data/validation.py b/eth_account/_utils/structured_data/validation.py index e5b6fa2..5d064e1 100644 --- a/eth_account/_utils/structured_data/validation.py +++ b/eth_account/_utils/structured_data/validation.py @@ -6,15 +6,12 @@ # Regexes IDENTIFIER_REGEX = r"^[a-zA-Z_$][a-zA-Z_$0-9]*$" -TYPE_REGEX = r"^[a-zA-Z_$][a-zA-Z_$0-9]*(\[([1-9]\d*)*\])*$" +TYPE_REGEX = r"^[a-zA-Z_$][a-zA-Z_$0-9]*(\[([1-9]\d*\b)*\])*$" def validate_has_attribute(attr_name, dict_data): if attr_name not in dict_data: - raise ValidationError( - "Attribute `{0}` not found in the JSON string". - format(attr_name) - ) + raise ValidationError(f"Attribute `{attr_name}` not found in the JSON string") def validate_types_attribute(structured_data): @@ -27,39 +24,39 @@ def validate_types_attribute(structured_data): # Check that `struct_name` is of the type string if not isinstance(struct_name, str): raise ValidationError( - "Struct Name of `types` attribute should be a string, but got type `{0}`". - format(type(struct_name)) + "Struct Name of `types` attribute should be a string, " + f"but got type `{type(struct_name)}`" ) for field in structured_data["types"][struct_name]: # Check that `field["name"]` is of the type string if not isinstance(field["name"], str): raise ValidationError( - "Field Name `{0}` of struct `{1}` should be a string, but got type `{2}`". - format(field["name"], struct_name, type(field["name"])) + f"Field Name `{field['name']}` of struct `{struct_name}` " + f"should be a string, but got type `{type(field['name'])}`" ) # Check that `field["type"]` is of the type string if not isinstance(field["type"], str): raise ValidationError( - "Field Type `{0}` of struct `{1}` should be a string, but got type `{2}`". - format(field["type"], struct_name, type(field["type"])) + f"Field Type `{field['type']}` of struct `{struct_name}` " + f"should be a string, but got type `{type(field['name'])}`" ) # Check that field["name"] matches with IDENTIFIER_REGEX if not re.match(IDENTIFIER_REGEX, field["name"]): raise ValidationError( - "Invalid Identifier `{0}` in `{1}`".format(field["name"], struct_name) + f"Invalid Identifier `{field['name']}` in `{struct_name}`" ) # Check that field["type"] matches with TYPE_REGEX if not re.match(TYPE_REGEX, field["type"]): raise ValidationError( - "Invalid Type `{0}` in `{1}`".format(field["type"], struct_name) + f"Invalid Type `{field['type']}` in `{struct_name}`" ) def validate_field_declared_only_once_in_struct(field_name, struct_data, struct_name): if len([field for field in struct_data if field["name"] == field_name]) != 1: raise ValidationError( - "Attribute `{0}` not declared or declared more than once in {1}". - format(field_name, struct_name) + f"Attribute `{field_name}` not declared or declared more " + f"than once in {struct_name}" ) @@ -72,38 +69,48 @@ def validate_field_declared_only_once_in_struct(field_name, struct_data, struct_ def used_header_fields(EIP712Domain_data): - return [field["name"] for field in EIP712Domain_data if field["name"] in EIP712_DOMAIN_FIELDS] + return [ + field["name"] + for field in EIP712Domain_data + if field["name"] in EIP712_DOMAIN_FIELDS + ] def validate_EIP712Domain_schema(structured_data): # Check that the `types` attribute contains `EIP712Domain` schema declaration if "EIP712Domain" not in structured_data["types"]: raise ValidationError("`EIP712Domain struct` not found in types attribute") - # Check that the names and types in `EIP712Domain` are what are mentioned in the EIP-712 - # and they are declared only once (if defined at all) + # Check that the names and types in `EIP712Domain` are what are mentioned in the + # EIP-712 and they are declared only once (if defined at all) EIP712Domain_data = structured_data["types"]["EIP712Domain"] header_fields = used_header_fields(EIP712Domain_data) if len(header_fields) == 0: - raise ValidationError(f"One of {EIP712_DOMAIN_FIELDS} must be defined in {structured_data}") + raise ValidationError( + f"One of {EIP712_DOMAIN_FIELDS} must be defined in {structured_data}" + ) for field in header_fields: - validate_field_declared_only_once_in_struct(field, EIP712Domain_data, "EIP712Domain") + validate_field_declared_only_once_in_struct( + field, EIP712Domain_data, "EIP712Domain" + ) def validate_primaryType_attribute(structured_data): # Check that `primaryType` attribute is present if "primaryType" not in structured_data: - raise ValidationError("The Structured Data needs to have a `primaryType` attribute") + raise ValidationError( + "The Structured Data needs to have a `primaryType` attribute" + ) # Check that `primaryType` value is a string if not isinstance(structured_data["primaryType"], str): raise ValidationError( - "Value of attribute `primaryType` should be `string`, but got type `{0}`". - format(type(structured_data["primaryType"])) + "Value of attribute `primaryType` should be `string`, " + f"but got type `{type(structured_data['primaryType'])}`" ) # Check that the value of `primaryType` is present in the `types` attribute if not structured_data["primaryType"] in structured_data["types"]: raise ValidationError( - "The Primary Type `{0}` is not present in the `types` attribute". - format(structured_data["primaryType"]) + f"The Primary Type `{structured_data['primaryType']}` is not " + "present in the `types` attribute" ) diff --git a/eth_account/_utils/transaction_utils.py b/eth_account/_utils/transaction_utils.py index 86e9ead..674d4f2 100644 --- a/eth_account/_utils/transaction_utils.py +++ b/eth_account/_utils/transaction_utils.py @@ -16,13 +16,16 @@ def set_transaction_type_if_needed(transaction_dict: Dict[str, Any]) -> Dict[str, Any]: - if 'type' not in transaction_dict: - if 'gasPrice' in transaction_dict and 'accessList' in transaction_dict: + if "type" not in transaction_dict: + if "gasPrice" in transaction_dict and "accessList" in transaction_dict: # access list txn - type 1 - transaction_dict = assoc(transaction_dict, 'type', '0x1') - elif 'maxFeePerGas' in transaction_dict and 'maxPriorityFeePerGas' in transaction_dict: + transaction_dict = assoc(transaction_dict, "type", "0x1") + elif ( + "maxFeePerGas" in transaction_dict + and "maxPriorityFeePerGas" in transaction_dict + ): # dynamic fee txn - type 2 - transaction_dict = assoc(transaction_dict, 'type', '0x2') + transaction_dict = assoc(transaction_dict, "type", "0x2") return transaction_dict @@ -31,24 +34,26 @@ def transaction_rpc_to_rlp_structure(dictionary: Dict[str, Any]) -> Dict[str, An """ Convert a JSON-RPC-structured transaction to an rlp-structured transaction. """ - access_list = dictionary.get('accessList') + access_list = dictionary.get("accessList") if access_list: - dictionary = dissoc(dictionary, 'accessList') + dictionary = dissoc(dictionary, "accessList") rlp_structured_access_list = _access_list_rpc_to_rlp_structure(access_list) - dictionary = assoc(dictionary, 'accessList', rlp_structured_access_list) + dictionary = assoc(dictionary, "accessList", rlp_structured_access_list) return dictionary def _access_list_rpc_to_rlp_structure(access_list: Sequence) -> Sequence: if not is_rpc_structured_access_list(access_list): - raise ValueError("provided object not formatted as JSON-RPC-structured access list") + raise ValueError( + "provided object not formatted as JSON-RPC-structured access list" + ) rlp_structured_access_list = [] for d in access_list: # flatten each dict into a tuple of its values rlp_structured_access_list.append( ( - d['address'], # value of address - tuple(_ for _ in d['storageKeys']) # tuple of storage key values + d["address"], # value of address + tuple(_ for _ in d["storageKeys"]), # tuple of storage key values ) ) return tuple(rlp_structured_access_list) @@ -59,11 +64,11 @@ def transaction_rlp_to_rpc_structure(dictionary: Dict[str, Any]) -> Dict[str, An """ Convert an rlp-structured transaction to a JSON-RPC-structured transaction. """ - access_list = dictionary.get('accessList') + access_list = dictionary.get("accessList") if access_list: - dictionary = dissoc(dictionary, 'accessList') + dictionary = dissoc(dictionary, "accessList") rpc_structured_access_list = _access_list_rlp_to_rpc_structure(access_list) - dictionary = assoc(dictionary, 'accessList', rpc_structured_access_list) + dictionary = assoc(dictionary, "accessList", rpc_structured_access_list) return dictionary @@ -73,10 +78,5 @@ def _access_list_rlp_to_rpc_structure(access_list: Sequence) -> Sequence: rpc_structured_access_list = [] for t in access_list: # build a dictionary with appropriate keys for each tuple - rpc_structured_access_list.append( - { - 'address': t[0], - 'storageKeys': t[1] - } - ) + rpc_structured_access_list.append({"address": t[0], "storageKeys": t[1]}) return tuple(rpc_structured_access_list) diff --git a/eth_account/_utils/typed_transactions.py b/eth_account/_utils/typed_transactions.py index c79577e..1f34395 100644 --- a/eth_account/_utils/typed_transactions.py +++ b/eth_account/_utils/typed_transactions.py @@ -6,6 +6,7 @@ Any, Dict, Tuple, + Union, cast, ) @@ -58,32 +59,38 @@ ) TYPED_TRANSACTION_FORMATTERS = merge( - LEGACY_TRANSACTION_FORMATTERS, { - 'chainId': hexstr_if_str(to_int), - 'type': hexstr_if_str(to_int), - 'accessList': apply_formatter_to_array( + LEGACY_TRANSACTION_FORMATTERS, + { + "chainId": hexstr_if_str(to_int), + "type": hexstr_if_str(to_int), + "accessList": apply_formatter_to_array( apply_formatters_to_dict( { - "address": apply_one_of_formatters(( - (is_string, hexstr_if_str(to_bytes)), - (is_bytes, identity), - )), - "storageKeys": apply_formatter_to_array(hexstr_if_str(to_int)) + "address": apply_one_of_formatters( + ( + (is_string, hexstr_if_str(to_bytes)), + (is_bytes, identity), + ) + ), + "storageKeys": apply_formatter_to_array(hexstr_if_str(to_int)), } ), ), - 'maxPriorityFeePerGas': hexstr_if_str(to_int), - 'maxFeePerGas': hexstr_if_str(to_int), + "maxPriorityFeePerGas": hexstr_if_str(to_int), + "maxFeePerGas": hexstr_if_str(to_int), }, ) # Define typed transaction common sedes. -# [[{20 bytes}, [{32 bytes}...]]...], where ... means “zero or more of the thing to the left”. +# [[{20 bytes}, [{32 bytes}...]]...], where ... means +# “zero or more of the thing to the left”. access_list_sede_type = CountableList( - List([ - Binary.fixed_length(20, allow_empty=False), - CountableList(BigEndianInt(32)), - ]), + List( + [ + Binary.fixed_length(20, allow_empty=False), + CountableList(BigEndianInt(32)), + ] + ), ) @@ -92,6 +99,7 @@ class _TypedTransactionImplementation(ABC): Abstract class that every typed transaction must implement. Should not be imported or used by clients of the library. """ + @abstractmethod def hash(self) -> bytes: pass @@ -116,23 +124,31 @@ class TypedTransaction: * EIP-2930's AccessListTransaction * EIP-1559's DynamicFeeTransaction """ - def __init__(self, transaction_type: int, transaction: _TypedTransactionImplementation): + + def __init__( + self, transaction_type: int, transaction: _TypedTransactionImplementation + ): """Should not be called directly. Use instead the 'from_dict' method.""" if not isinstance(transaction, _TypedTransactionImplementation): - raise TypeError("expected _TypedTransactionImplementation, got %s" % type(transaction)) + raise TypeError( + "expected _TypedTransactionImplementation, got %s" % type(transaction) + ) if not isinstance(transaction_type, int): raise TypeError("expected int, got %s" % type(transaction_type)) self.transaction_type = transaction_type self.transaction = transaction @classmethod - def from_dict(cls, dictionary: Dict[str, Any]): - """Builds a TypedTransaction from a dictionary. Verifies the dictionary is well formed.""" + def from_dict(cls, dictionary: Dict[str, Any]) -> "TypedTransaction": + """ + Builds a TypedTransaction from a dictionary. + Verifies the dictionary is well formed. + """ dictionary = set_transaction_type_if_needed(dictionary) - if not ('type' in dictionary and is_int_or_prefixed_hexstr(dictionary['type'])): + if not ("type" in dictionary and is_int_or_prefixed_hexstr(dictionary["type"])): raise ValueError("missing or incorrect transaction type") # Switch on the transaction type to choose the correct constructor. - transaction_type = pipe(dictionary['type'], hexstr_if_str(to_int)) + transaction_type = pipe(dictionary["type"], hexstr_if_str(to_int)) transaction: Any if transaction_type == AccessListTransaction.transaction_type: transaction = AccessListTransaction @@ -146,12 +162,13 @@ def from_dict(cls, dictionary: Dict[str, Any]): ) @classmethod - def from_bytes(cls, encoded_transaction: HexBytes): + def from_bytes(cls, encoded_transaction: HexBytes) -> "TypedTransaction": """Builds a TypedTransaction from a signed encoded transaction.""" if not isinstance(encoded_transaction, HexBytes): raise TypeError("expected Hexbytes, got %s" % type(encoded_transaction)) - if not (len(encoded_transaction) > 0 and encoded_transaction[0] <= 0x7f): + if not (len(encoded_transaction) > 0 and encoded_transaction[0] <= 0x7F): raise ValueError("unexpected input") + transaction: Union["DynamicFeeTransaction", "AccessListTransaction"] if encoded_transaction[0] == AccessListTransaction.transaction_type: transaction_type = AccessListTransaction.transaction_type transaction = AccessListTransaction.from_bytes(encoded_transaction) @@ -160,7 +177,9 @@ def from_bytes(cls, encoded_transaction: HexBytes): transaction = DynamicFeeTransaction.from_bytes(encoded_transaction) else: # The only known transaction types should be explicit if/elif branches. - raise TypeError("typed transaction has unknown type: %s" % encoded_transaction[0]) + raise TypeError( + "typed transaction has unknown type: %s" % encoded_transaction[0] + ) return cls( transaction_type=transaction_type, transaction=transaction, @@ -171,7 +190,8 @@ def hash(self) -> bytes: Hashes this TypedTransaction to prepare it for signing. As per the EIP-2718 specifications, - the hashing format is dictated by the transaction type itself, and so we delegate the call. + the hashing format is dictated by the transaction type itself, + and so we delegate the call. Note that the return type will be bytes. """ return self.transaction.hash() @@ -180,10 +200,11 @@ def encode(self) -> bytes: """ Encodes this TypedTransaction and returns it as bytes. - The transaction format follows - EIP-2718's typed transaction format (TransactionType || TransactionPayload). - Note that we delegate to a transaction type's payload() method as the EIP-2718 does not - prescribe a TransactionPayload format, leaving types free to implement their own encoding. + The transaction format follows EIP-2718's typed transaction + format (TransactionType || TransactionPayload). + Note that we delegate to a transaction type's payload() method as + the EIP-2718 does not prescribe a TransactionPayload format, + leaving types free to implement their own encoding. """ return bytes([self.transaction_type]) + self.transaction.payload() @@ -200,43 +221,48 @@ class AccessListTransaction(_TypedTransactionImplementation): """ Represents an access list transaction per EIP-2930. """ + # This is the first transaction to implement the EIP-2718 typed transaction. transaction_type = 1 # '0x01' unsigned_transaction_fields = ( - ('chainId', big_endian_int), - ('nonce', big_endian_int), - ('gasPrice', big_endian_int), - ('gas', big_endian_int), - ('to', Binary.fixed_length(20, allow_empty=True)), - ('value', big_endian_int), - ('data', binary), - ('accessList', access_list_sede_type), + ("chainId", big_endian_int), + ("nonce", big_endian_int), + ("gasPrice", big_endian_int), + ("gas", big_endian_int), + ("to", Binary.fixed_length(20, allow_empty=True)), + ("value", big_endian_int), + ("data", binary), + ("accessList", access_list_sede_type), ) signature_fields = ( - ('v', big_endian_int), - ('r', big_endian_int), - ('s', big_endian_int), + ("v", big_endian_int), + ("r", big_endian_int), + ("s", big_endian_int), ) transaction_field_defaults = { - 'type': b'0x1', - 'chainId': 0, - 'to': b'', - 'value': 0, - 'data': b'', - 'accessList': [], + "type": b"0x1", + "chainId": 0, + "to": b"", + "value": 0, + "data": b"", + "accessList": [], } _unsigned_transaction_serializer = type( - "_unsigned_transaction_serializer", (HashableRLP, ), { + "_unsigned_transaction_serializer", + (HashableRLP,), + { "fields": unsigned_transaction_fields, }, ) _signed_transaction_serializer = type( - "_signed_transaction_serializer", (HashableRLP, ), { + "_signed_transaction_serializer", + (HashableRLP,), + { "fields": unsigned_transaction_fields + signature_fields, }, ) @@ -245,27 +271,33 @@ def __init__(self, dictionary: Dict[str, Any]): self.dictionary = dictionary @classmethod - def assert_valid_fields(cls, dictionary: Dict[str, Any]): - transaction_valid_values = merge(LEGACY_TRANSACTION_VALID_VALUES, { - 'type': is_int_or_prefixed_hexstr, - 'accessList': is_rpc_structured_access_list, - }) + def assert_valid_fields(cls, dictionary: Dict[str, Any]) -> None: + transaction_valid_values = merge( + LEGACY_TRANSACTION_VALID_VALUES, + { + "type": is_int_or_prefixed_hexstr, + "accessList": is_rpc_structured_access_list, + }, + ) - if 'v' in dictionary and dictionary['v'] == 0: + if "v" in dictionary and dictionary["v"] == 0: # This is insane logic that is required because the way we evaluate # correct types is in the `if not all()` branch below, and 0 obviously - # maps to the int(0), which maps to False... This was not an issue in non-typed - # transaction because v=0, couldn't exist with the chain offset. - dictionary['v'] = '0x0' + # maps to the int(0), which maps to False... This was not an issue in + # non-typed transaction because v=0, couldn't exist with the chain offset. + dictionary["v"] = "0x0" valid_fields = apply_formatters_to_dict( - transaction_valid_values, dictionary, + transaction_valid_values, + dictionary, ) # type: Dict[str, Any] if not all(valid_fields.values()): - invalid = {key: dictionary[key] for key, valid in valid_fields.items() if not valid} + invalid = { + key: dictionary[key] for key, valid in valid_fields.items() if not valid + } raise TypeError("Transaction had invalid fields: %r" % invalid) @classmethod - def from_dict(cls, dictionary: Dict[str, Any]): + def from_dict(cls, dictionary: Dict[str, Any]) -> "AccessListTransaction": """ Builds an AccessListTransaction from a dictionary. Verifies that the dictionary is well formed. @@ -281,55 +313,65 @@ def from_dict(cls, dictionary: Dict[str, Any]): # We have verified the type, we can safely remove it from the dictionary, # given that it is not to be included within the RLP payload. - transaction_type = sanitized_dictionary.pop('type') + transaction_type = sanitized_dictionary.pop("type") if transaction_type != cls.transaction_type: raise ValueError( - "expected transaction type %s, got %s" % (cls.transaction_type, transaction_type), + "expected transaction type %s, got %s" + % (cls.transaction_type, transaction_type), ) return cls( dictionary=sanitized_dictionary, ) @classmethod - def from_bytes(cls, encoded_transaction: HexBytes): + def from_bytes(cls, encoded_transaction: HexBytes) -> "AccessListTransaction": """Builds an AccesslistTransaction from a signed encoded transaction.""" if not isinstance(encoded_transaction, HexBytes): - raise TypeError("expected Hexbytes, got type: %s" % type(encoded_transaction)) - if not (len(encoded_transaction) > 0 and encoded_transaction[0] == cls.transaction_type): + raise TypeError( + "expected Hexbytes, got type: %s" % type(encoded_transaction) + ) + if not ( + len(encoded_transaction) > 0 + and encoded_transaction[0] == cls.transaction_type + ): raise ValueError("unexpected input") # Format is (0x01 || TransactionPayload) - # We strip the prefix, and RLP unmarshal the payload into our signed transaction serializer. + # We strip the prefix, and RLP unmarshal the payload into our + # signed transaction serializer. transaction_payload = encoded_transaction[1:] rlp_serializer = cls._signed_transaction_serializer - dictionary = rlp_serializer.from_bytes(transaction_payload).as_dict() # type: ignore + dictionary = rlp_serializer.from_bytes( # type: ignore + transaction_payload + ).as_dict() rpc_structured_dict = transaction_rlp_to_rpc_structure(dictionary) - rpc_structured_dict['type'] = cls.transaction_type + rpc_structured_dict["type"] = cls.transaction_type return cls.from_dict(rpc_structured_dict) def as_dict(self) -> Dict[str, Any]: """Returns this transaction as a dictionary.""" dictionary = self.dictionary.copy() - dictionary['type'] = self.__class__.transaction_type + dictionary["type"] = self.__class__.transaction_type return dictionary def hash(self) -> bytes: """ Hashes this AccessListTransaction to prepare it for signing. As per the EIP-2930 specifications, the signature is a secp256k1 signature over - keccak256(0x01 || rlp([chainId, nonce, gasPrice, gasLimit, to, value, data, accessList])). + keccak256(0x01 || rlp([chainId, nonce, gasPrice, gasLimit, to, value, data, accessList])). # noqa E501 Here, we compute the keccak256(...) hash. """ # Remove signature fields. - transaction_without_signature_fields = dissoc(self.dictionary, 'v', 'r', 's') + transaction_without_signature_fields = dissoc(self.dictionary, "v", "r", "s") # RPC-structured transaction to rlp-structured transaction rlp_structured_txn_without_sig_fields = transaction_rpc_to_rlp_structure( transaction_without_signature_fields ) rlp_serializer = self.__class__._unsigned_transaction_serializer hash = pipe( - rlp_serializer.from_dict(rlp_structured_txn_without_sig_fields), # type: ignore + rlp_serializer.from_dict(rlp_structured_txn_without_sig_fields), # type: ignore # noqa: E501 lambda val: rlp.encode(val), # rlp([...]) - lambda val: bytes([self.__class__.transaction_type]) + val, # (0x01 || rlp([...])) + lambda val: bytes([self.__class__.transaction_type]) + + val, # (0x01 || rlp([...])) keccak, # keccak256(0x01 || rlp([...])) ) return cast(bytes, hash) @@ -339,65 +381,72 @@ def payload(self) -> bytes: Returns this transaction's payload as bytes. Here, the TransactionPayload = rlp([chainId, - nonce, gasPrice, gasLimit, to, value, data, accessList, signatureYParity, signatureR, - signatureS]) + nonce, gasPrice, gasLimit, to, value, data, accessList, + signatureYParity, signatureR, signatureS]) """ - if not all(k in self.dictionary for k in 'vrs'): + if not all(k in self.dictionary for k in "vrs"): raise ValueError("attempting to encode an unsigned transaction") rlp_serializer = self.__class__._signed_transaction_serializer rlp_structured_dict = transaction_rpc_to_rlp_structure(self.dictionary) - payload = rlp.encode(rlp_serializer.from_dict(rlp_structured_dict)) # type: ignore + payload = rlp.encode( + rlp_serializer.from_dict(rlp_structured_dict) # type: ignore + ) return cast(bytes, payload) def vrs(self) -> Tuple[int, int, int]: """Returns (v, r, s) if they exist.""" - if not all(k in self.dictionary for k in 'vrs'): + if not all(k in self.dictionary for k in "vrs"): raise ValueError("attempting to encode an unsigned transaction") - return (self.dictionary['v'], self.dictionary['r'], self.dictionary['s']) + return (self.dictionary["v"], self.dictionary["r"], self.dictionary["s"]) class DynamicFeeTransaction(_TypedTransactionImplementation): """ Represents a dynamic fee transaction access per EIP-1559. """ + # This is the second transaction to implement the EIP-2718 typed transaction. transaction_type = 2 # '0x02' unsigned_transaction_fields = ( - ('chainId', big_endian_int), - ('nonce', big_endian_int), - ('maxPriorityFeePerGas', big_endian_int), - ('maxFeePerGas', big_endian_int), - ('gas', big_endian_int), - ('to', Binary.fixed_length(20, allow_empty=True)), - ('value', big_endian_int), - ('data', binary), - ('accessList', access_list_sede_type), + ("chainId", big_endian_int), + ("nonce", big_endian_int), + ("maxPriorityFeePerGas", big_endian_int), + ("maxFeePerGas", big_endian_int), + ("gas", big_endian_int), + ("to", Binary.fixed_length(20, allow_empty=True)), + ("value", big_endian_int), + ("data", binary), + ("accessList", access_list_sede_type), ) signature_fields = ( - ('v', big_endian_int), - ('r', big_endian_int), - ('s', big_endian_int), + ("v", big_endian_int), + ("r", big_endian_int), + ("s", big_endian_int), ) transaction_field_defaults = { - 'type': b'0x2', - 'chainId': 0, - 'to': b'', - 'value': 0, - 'data': b'', - 'accessList': [], + "type": b"0x2", + "chainId": 0, + "to": b"", + "value": 0, + "data": b"", + "accessList": [], } _unsigned_transaction_serializer = type( - "_unsigned_transaction_serializer", (HashableRLP, ), { + "_unsigned_transaction_serializer", + (HashableRLP,), + { "fields": unsigned_transaction_fields, }, ) _signed_transaction_serializer = type( - "_signed_transaction_serializer", (HashableRLP, ), { + "_signed_transaction_serializer", + (HashableRLP,), + { "fields": unsigned_transaction_fields + signature_fields, }, ) @@ -406,29 +455,35 @@ def __init__(self, dictionary: Dict[str, Any]): self.dictionary = dictionary @classmethod - def assert_valid_fields(cls, dictionary: Dict[str, Any]): - transaction_valid_values = merge(LEGACY_TRANSACTION_VALID_VALUES, { - 'type': is_int_or_prefixed_hexstr, - 'maxPriorityFeePerGas': is_int_or_prefixed_hexstr, - 'maxFeePerGas': is_int_or_prefixed_hexstr, - 'accessList': is_rpc_structured_access_list, - }) - - if 'v' in dictionary and dictionary['v'] == 0: + def assert_valid_fields(cls, dictionary: Dict[str, Any]) -> None: + transaction_valid_values = merge( + LEGACY_TRANSACTION_VALID_VALUES, + { + "type": is_int_or_prefixed_hexstr, + "maxPriorityFeePerGas": is_int_or_prefixed_hexstr, + "maxFeePerGas": is_int_or_prefixed_hexstr, + "accessList": is_rpc_structured_access_list, + }, + ) + + if "v" in dictionary and dictionary["v"] == 0: # This is insane logic that is required because the way we evaluate # correct types is in the `if not all()` branch below, and 0 obviously - # maps to the int(0), which maps to False... This was not an issue in non-typed - # transaction because v=0, couldn't exist with the chain offset. - dictionary['v'] = '0x0' + # maps to the int(0), which maps to False... This was not an issue in + # non-typed transaction because v=0, couldn't exist with the chain offset. + dictionary["v"] = "0x0" valid_fields = apply_formatters_to_dict( - transaction_valid_values, dictionary, + transaction_valid_values, + dictionary, ) # type: Dict[str, Any] if not all(valid_fields.values()): - invalid = {key: dictionary[key] for key, valid in valid_fields.items() if not valid} + invalid = { + key: dictionary[key] for key, valid in valid_fields.items() if not valid + } raise TypeError("Transaction had invalid fields: %r" % invalid) @classmethod - def from_dict(cls, dictionary: Dict[str, Any]): + def from_dict(cls, dictionary: Dict[str, Any]) -> "DynamicFeeTransaction": """ Builds a DynamicFeeTransaction from a dictionary. Verifies that the dictionary is well formed. @@ -444,55 +499,66 @@ def from_dict(cls, dictionary: Dict[str, Any]): # We have verified the type, we can safely remove it from the dictionary, # given that it is not to be included within the RLP payload. - transaction_type = sanitized_dictionary.pop('type') + transaction_type = sanitized_dictionary.pop("type") if transaction_type != cls.transaction_type: raise ValueError( - "expected transaction type %s, got %s" % (cls.transaction_type, transaction_type), + "expected transaction type %s, got %s" + % (cls.transaction_type, transaction_type), ) return cls( dictionary=sanitized_dictionary, ) @classmethod - def from_bytes(cls, encoded_transaction: HexBytes): + def from_bytes(cls, encoded_transaction: HexBytes) -> "DynamicFeeTransaction": """Builds a DynamicFeeTransaction from a signed encoded transaction.""" if not isinstance(encoded_transaction, HexBytes): - raise TypeError("expected Hexbytes, got type: %s" % type(encoded_transaction)) - if not (len(encoded_transaction) > 0 and encoded_transaction[0] == cls.transaction_type): + raise TypeError( + "expected Hexbytes, got type: %s" % type(encoded_transaction) + ) + if not ( + len(encoded_transaction) > 0 + and encoded_transaction[0] == cls.transaction_type + ): raise ValueError("unexpected input") # Format is (0x02 || TransactionPayload) - # We strip the prefix, and RLP unmarshal the payload into our signed transaction serializer. + # We strip the prefix, and RLP unmarshal the payload into our + # signed transaction serializer. transaction_payload = encoded_transaction[1:] rlp_serializer = cls._signed_transaction_serializer - dictionary = rlp_serializer.from_bytes(transaction_payload).as_dict() # type: ignore + dictionary = rlp_serializer.from_bytes( # type: ignore + transaction_payload + ).as_dict() rpc_structured_dict = transaction_rlp_to_rpc_structure(dictionary) - rpc_structured_dict['type'] = cls.transaction_type + rpc_structured_dict["type"] = cls.transaction_type return cls.from_dict(rpc_structured_dict) def as_dict(self) -> Dict[str, Any]: """Returns this transaction as a dictionary.""" dictionary = self.dictionary.copy() - dictionary['type'] = self.__class__.transaction_type + dictionary["type"] = self.__class__.transaction_type return dictionary def hash(self) -> bytes: """ Hashes this DynamicFeeTransaction to prepare it for signing. As per the EIP-1559 specifications, the signature is a secp256k1 signature over - keccak256(0x02 || rlp([chainId, nonce, maxPriorityFeePerGas, maxFeePerGas, gasLimit, to, - value, data, accessList])). Here, we compute the keccak256(...) hash. + keccak256(0x02 || rlp([chainId, nonce, maxPriorityFeePerGas, + maxFeePerGas, gasLimit, to, value, data, accessList])). + Here, we compute the keccak256(...) hash. """ # Remove signature fields. - transaction_without_signature_fields = dissoc(self.dictionary, 'v', 'r', 's') + transaction_without_signature_fields = dissoc(self.dictionary, "v", "r", "s") # RPC-structured transaction to rlp-structured transaction rlp_structured_txn_without_sig_fields = transaction_rpc_to_rlp_structure( transaction_without_signature_fields ) rlp_serializer = self.__class__._unsigned_transaction_serializer hash = pipe( - rlp_serializer.from_dict(rlp_structured_txn_without_sig_fields), # type: ignore + rlp_serializer.from_dict(rlp_structured_txn_without_sig_fields), # type: ignore # noqa: E501 lambda val: rlp.encode(val), # rlp([...]) - lambda val: bytes([self.__class__.transaction_type]) + val, # (0x02 || rlp([...])) + lambda val: bytes([self.__class__.transaction_type]) + + val, # (0x02 || rlp([...])) keccak, # keccak256(0x02 || rlp([...])) ) return cast(bytes, hash) @@ -502,18 +568,20 @@ def payload(self) -> bytes: Returns this transaction's payload as bytes. Here, the TransactionPayload = rlp([chainId, - nonce, maxPriorityFeePerGas, maxFeePerGas, gasLimit, to, value, data, accessList, - signatureYParity, signatureR, signatureS]) + nonce, maxPriorityFeePerGas, maxFeePerGas, gasLimit, to, value, data, + accessList, signatureYParity, signatureR, signatureS]) """ - if not all(k in self.dictionary for k in 'vrs'): + if not all(k in self.dictionary for k in "vrs"): raise ValueError("attempting to encode an unsigned transaction") rlp_serializer = self.__class__._signed_transaction_serializer rlp_structured_dict = transaction_rpc_to_rlp_structure(self.dictionary) - payload = rlp.encode(rlp_serializer.from_dict(rlp_structured_dict)) # type: ignore + payload = rlp.encode( + rlp_serializer.from_dict(rlp_structured_dict) # type: ignore + ) return cast(bytes, payload) def vrs(self) -> Tuple[int, int, int]: """Returns (v, r, s) if they exist.""" - if not all(k in self.dictionary for k in 'vrs'): + if not all(k in self.dictionary for k in "vrs"): raise ValueError("attempting to encode an unsigned transaction") - return (self.dictionary['v'], self.dictionary['r'], self.dictionary['s']) + return (self.dictionary["v"], self.dictionary["r"], self.dictionary["s"]) diff --git a/eth_account/_utils/validation.py b/eth_account/_utils/validation.py index dcb27c8..2ba22a4 100644 --- a/eth_account/_utils/validation.py +++ b/eth_account/_utils/validation.py @@ -19,7 +19,7 @@ to_int, ) -VALID_EMPTY_ADDRESSES = {None, b'', ''} +VALID_EMPTY_ADDRESSES = {None, b"", ""} def is_none(val): @@ -27,12 +27,7 @@ def is_none(val): def is_valid_address(value): - if is_binary_address(value): - return True - elif is_checksum_address(value): - return True - else: - return False + return is_binary_address(value) or is_checksum_address(value) def is_int_or_prefixed_hexstr(val): @@ -60,8 +55,8 @@ def is_rpc_structured_access_list(val): return False if len(d) != 2: return False - address = d.get('address') - storage_keys = d.get('storageKeys') + address = d.get("address") + storage_keys = d.get("storageKeys") if any(_ is None for _ in (address, storage_keys)): return False if not is_address(address): @@ -93,27 +88,29 @@ def is_rlp_structured_access_list(val): LEGACY_TRANSACTION_FORMATTERS = { - 'nonce': hexstr_if_str(to_int), - 'gasPrice': hexstr_if_str(to_int), - 'gas': hexstr_if_str(to_int), - 'to': apply_one_of_formatters(( - (is_string, hexstr_if_str(to_bytes)), - (is_bytes, identity), - (is_none, lambda val: b''), - )), - 'value': hexstr_if_str(to_int), - 'data': hexstr_if_str(to_bytes), - 'v': hexstr_if_str(to_int), - 'r': hexstr_if_str(to_int), - 's': hexstr_if_str(to_int), + "nonce": hexstr_if_str(to_int), + "gasPrice": hexstr_if_str(to_int), + "gas": hexstr_if_str(to_int), + "to": apply_one_of_formatters( + ( + (is_string, hexstr_if_str(to_bytes)), + (is_bytes, identity), + (is_none, lambda val: b""), + ) + ), + "value": hexstr_if_str(to_int), + "data": hexstr_if_str(to_bytes), + "v": hexstr_if_str(to_int), + "r": hexstr_if_str(to_int), + "s": hexstr_if_str(to_int), } LEGACY_TRANSACTION_VALID_VALUES = { - 'nonce': is_int_or_prefixed_hexstr, - 'gasPrice': is_int_or_prefixed_hexstr, - 'gas': is_int_or_prefixed_hexstr, - 'to': is_empty_or_checksum_address, - 'value': is_int_or_prefixed_hexstr, - 'data': lambda val: isinstance(val, (int, str, bytes, bytearray)), - 'chainId': lambda val: val is None or is_int_or_prefixed_hexstr(val), + "nonce": is_int_or_prefixed_hexstr, + "gasPrice": is_int_or_prefixed_hexstr, + "gas": is_int_or_prefixed_hexstr, + "to": is_empty_or_checksum_address, + "value": is_int_or_prefixed_hexstr, + "data": lambda val: isinstance(val, (int, str, bytes, bytearray)), + "chainId": lambda val: val is None or is_int_or_prefixed_hexstr(val), } diff --git a/eth_account/account.py b/eth_account/account.py index 459979b..8d78a5f 100644 --- a/eth_account/account.py +++ b/eth_account/account.py @@ -3,6 +3,13 @@ ) import json import os +from typing import ( + Optional, + Tuple, + TypeVar, + Union, + cast, +) import warnings from cytoolz import ( @@ -19,6 +26,11 @@ from eth_keys.exceptions import ( ValidationError, ) +from eth_typing import ( + ChecksumAddress, + Hash32, + HexStr, +) from eth_utils.curried import ( combomethod, hexstr_if_str, @@ -64,16 +76,19 @@ LocalAccount, ) +VRS = TypeVar("VRS", bytes, HexStr, int) -class Account(object): + +class Account: """ The primary entry point for working with Ethereum private keys. It does **not** require a connection to an Ethereum node. """ + _keys = keys - _default_kdf = os.getenv('ETH_ACCOUNT_KDF', 'scrypt') + _default_kdf = os.getenv("ETH_ACCOUNT_KDF", "scrypt") # Enable unaudited features (off by default) _use_unaudited_hdwallet_features = False @@ -86,11 +101,13 @@ def enable_unaudited_hdwallet_features(cls): cls._use_unaudited_hdwallet_features = True @combomethod - def create(self, extra_entropy=''): + def create(self, extra_entropy=""): r""" - Creates a new private key, and returns it as a :class:`~eth_account.local.LocalAccount`. + Creates a new private key, and returns it as a + :class:`~eth_account.local.LocalAccount`. - :param extra_entropy: Add extra randomness to whatever randomness your OS can provide + :param extra_entropy: Add extra randomness to whatever randomness your OS + can provide :type extra_entropy: str or bytes or int :returns: an object with private key and convenience methods @@ -103,7 +120,8 @@ def create(self, extra_entropy=''): >>> acct.key HexBytes('0x8676e9a8c86c8921e922e61e0bb6e9e9689aad4c99082620610b00140e5f21b8') - # These methods are also available: sign_message(), sign_transaction(), encrypt() + # These methods are also available: sign_message(), sign_transaction(), + # encrypt(). # They correspond to the same-named methods in Account.* # but without the private key argument """ @@ -116,7 +134,8 @@ def decrypt(keyfile_json, password): """ Decrypts a private key. - The key may have been encrypted using an Ethereum client or :meth:`~Account.encrypt`. + The key may have been encrypted using an Ethereum client or + :meth:`~Account.encrypt`. :param keyfile_json: The encrypted key :type keyfile_json: dict or str @@ -130,7 +149,7 @@ def decrypt(keyfile_json, password): ... 'address': '5ce9454909639d2d17a3f753ce7d93fa0b9ab12e', ... 'crypto': {'cipher': 'aes-128-ctr', ... 'cipherparams': {'iv': '482ef54775b0cc59f25717711286f5c8'}, - ... 'ciphertext': 'cb636716a9fd46adbb31832d964df2082536edd5399a3393327dc89b0193a2be', + ... 'ciphertext': 'cb636716a9fd46adbb31832d964df2082536edd5399a3393327dc89b0193a2be', # noqa: E501 ... 'kdf': 'scrypt', ... 'kdfparams': {}, ... 'kdfparams': {'dklen': 32, @@ -138,7 +157,7 @@ def decrypt(keyfile_json, password): ... 'p': 8, ... 'r': 1, ... 'salt': 'd3c9a9945000fcb6c9df0f854266d573'}, - ... 'mac': '4f626ec5e7fea391b2229348a65bfef532c2a4e8372c0a6a814505a350a7689d'}, + ... 'mac': '4f626ec5e7fea391b2229348a65bfef532c2a4e8372c0a6a814505a350a7689d'}, # noqa: E501 ... 'id': 'b812f3f9-78cc-462a-9e89-74418aa27cb0', ... 'version': 3} >>> Account.decrypt(encrypted, 'password') @@ -150,7 +169,9 @@ def decrypt(keyfile_json, password): elif is_dict(keyfile_json): keyfile = keyfile_json else: - raise TypeError("The keyfile should be supplied as a JSON string, or a dictionary.") + raise TypeError( + "The keyfile should be supplied as a JSON string, or a dictionary." + ) password_bytes = text_if_str(to_bytes, password) return HexBytes(decode_keyfile_json(keyfile, password_bytes)) @@ -164,8 +185,10 @@ def encrypt(cls, private_key, password, kdf=None, iterations=None): :param private_key: The raw private key :type private_key: hex str, bytes, int or :class:`eth_keys.datatypes.PrivateKey` - :param str password: The password which you will need to unlock the account in your client - :param str kdf: The key derivation function to use when encrypting your private key + :param str password: The password which you will need to unlock the account + in your client + :param str kdf: The key derivation function to use when encrypting your + private key :param int iterations: The work factor for the key derivation function :returns: The data to use in your encrypted file :rtype: dict @@ -210,19 +233,9 @@ def encrypt(cls, private_key, password, kdf=None, iterations=None): password_bytes = text_if_str(to_bytes, password) assert len(key_bytes) == 32 - return create_keyfile_json(key_bytes, password_bytes, kdf=kdf, iterations=iterations) - - @combomethod - def privateKeyToAccount(self, private_key): - """ - .. CAUTION:: Deprecated for :meth:`~eth_account.account.Account.from_key`. - This method will be removed in v0.5 - """ - warnings.warn( - "privateKeyToAccount is deprecated in favor of from_key", - category=DeprecationWarning, + return create_keyfile_json( + key_bytes, password_bytes, kdf=kdf, iterations=iterations ) - return self.from_key(private_key) @combomethod def from_key(self, private_key): @@ -243,18 +256,20 @@ def from_key(self, private_key): >>> acct.key HexBytes('0xb25c7db31feed9122727bf0939dc769a96564b2de4c4726d035b36ecf1e5b364') - # These methods are also available: sign_message(), sign_transaction(), encrypt() - # They correspond to the same-named methods in Account.* + # These methods are also available: sign_message(), sign_transaction(), + # encrypt(). They correspond to the same-named methods in Account.* # but without the private key argument """ key = self._parsePrivateKey(private_key) return LocalAccount(key, self) @combomethod - def from_mnemonic(self, - mnemonic: str, - passphrase: str = "", - account_path: str = ETHEREUM_DEFAULT_PATH): + def from_mnemonic( + self, + mnemonic: str, + passphrase: str = "", + account_path: str = ETHEREUM_DEFAULT_PATH, + ) -> LocalAccount: """ Generate an account from a mnemonic. @@ -262,8 +277,8 @@ def from_mnemonic(self, :param str mnemonic: space-separated list of BIP39 mnemonic seed words :param str passphrase: Optional passphrase used to encrypt the mnemonic - :param str account_path: Specify an alternate HD path for deriving the seed using - BIP32 HD wallet key derivation. + :param str account_path: Specify an alternate HD path for deriving the seed + using BIP32 HD wallet key derivation. :return: object with methods for signing and encrypting :rtype: LocalAccount @@ -272,20 +287,49 @@ def from_mnemonic(self, >>> from eth_account import Account >>> Account.enable_unaudited_hdwallet_features() >>> acct = Account.from_mnemonic( - ... "coral allow abandon recipe top tray caught video climb similar prepare bracket " - ... "antenna rubber announce gauge volume hub hood burden skill immense add acid") + ... "coral allow abandon recipe top tray caught video climb similar " + ... "prepare bracket antenna rubber announce gauge volume " + ... "hub hood burden skill immense add acid") >>> acct.address '0x9AdA5dAD14d925f4df1378409731a9B71Bc8569d' - # These methods are also available: sign_message(), sign_transaction(), encrypt() - # They correspond to the same-named methods in Account.* + # These methods are also available: sign_message(), sign_transaction(), + # encrypt(). They correspond to the same-named methods in Account.* # but without the private key argument + + Or, generate multiple accounts from a mnemonic. + + >>> from eth_account import Account + >>> Account.enable_unaudited_hdwallet_features() + >>> iterator = 0 + >>> for i in range(10): + ... acct = Account.from_mnemonic( + ... "health embark april buyer eternal leopard " + ... "want before nominee head thing tackle", + ... account_path=f"m/44'/60'/0'/0/{iterator}") + ... iterator = iterator + 1 + ... acct.address + '0x61Cc15522D06983Ac7aADe23f9d5433d38e78195' + '0x1240460F6E370f28079E5F9B52f9DcB759F051b7' + '0xd30dC9f996539826C646Eb48bb45F6ee1D1474af' + '0x47e64beb58c9A469c5eD086aD231940676b44e7C' + '0x6D39032ffEF9987988a069F52EFe4d95D0770555' + '0x3836A6530D1889853b047799Ecd8827255072e77' + '0xed5490dEfF8d8FfAe45cb4066C3daC7C6BFF6a22' + '0xf04F9Ff322799253bcC6B12762AD127570a092c5' + '0x900F7fa9fbe85BB25b6cdB94Da24D807f7feb213' + '0xa248e118b0D19010387b1B768686cd9B473FA137' + + .. CAUTION:: For the love of Bob please do not use this mnemonic, + it is for testing purposes only. + """ if not self._use_unaudited_hdwallet_features: raise AttributeError( - "The use of the Mnemonic features of Account is disabled by default until " - "its API stabilizes. To use these features, please enable them by running " - "`Account.enable_unaudited_hdwallet_features()` and try again." + "The use of the Mnemonic features of Account is disabled by " + "default until its API stabilizes. To use these features, please " + "enable them by running `Account.enable_unaudited_hdwallet_features()` " + "and try again." ) seed = seed_from_mnemonic(mnemonic, passphrase) private_key = key_from_seed(seed, account_path) @@ -293,27 +337,32 @@ def from_mnemonic(self, return LocalAccount(key, self) @combomethod - def create_with_mnemonic(self, - passphrase: str = "", - num_words: int = 12, - language: str = "english", - account_path: str = ETHEREUM_DEFAULT_PATH): + def create_with_mnemonic( + self, + passphrase: str = "", + num_words: int = 12, + language: str = "english", + account_path: str = ETHEREUM_DEFAULT_PATH, + ) -> Tuple[LocalAccount, str]: r""" Create a new private key and related mnemonic. .. CAUTION:: This feature is experimental, unaudited, and likely to change soon - Creates a new private key, and returns it as a :class:`~eth_account.local.LocalAccount`, - alongside the mnemonic that can used to regenerate it using any BIP39-compatible wallet. + Creates a new private key, and returns it as a + :class:`~eth_account.local.LocalAccount`, alongside the mnemonic that can + used to regenerate it using any BIP39-compatible wallet. :param str passphrase: Extra passphrase to encrypt the seed phrase - :param int num_words: Number of words to use with seed phrase. Default is 12 words. + :param int num_words: Number of words to use with seed phrase. + Default is 12 words. Must be one of [12, 15, 18, 21, 24]. :param str language: Language to use for BIP39 mnemonic seed phrase. - :param str account_path: Specify an alternate HD path for deriving the seed using - BIP32 HD wallet key derivation. - :returns: A tuple consisting of an object with private key and convenience methods, - and the mnemonic seed phrase that can be used to restore the account. + :param str account_path: Specify an alternate HD path for deriving the + seed using BIP32 HD wallet key derivation. + :returns: A tuple consisting of an object with private key and + convenience methods, and the mnemonic seed phrase that can be + used to restore the account. :rtype: (LocalAccount, str) .. doctest:: python @@ -326,21 +375,28 @@ def create_with_mnemonic(self, >>> acct == Account.from_mnemonic(mnemonic) True - # These methods are also available: sign_message(), sign_transaction(), encrypt() + # These methods are also available: + # sign_message(), sign_transaction(), encrypt() # They correspond to the same-named methods in Account.* # but without the private key argument """ if not self._use_unaudited_hdwallet_features: raise AttributeError( - "The use of the Mnemonic features of Account is disabled by default until " - "its API stabilizes. To use these features, please enable them by running " - "`Account.enable_unaudited_hdwallet_features()` and try again." + "The use of the Mnemonic features of Account is disabled by " + "default until its API stabilizes. To use these features, please " + "enable them by running `Account.enable_unaudited_hdwallet_features()` " + "and try again." ) mnemonic = generate_mnemonic(num_words, language) return self.from_mnemonic(mnemonic, passphrase, account_path), mnemonic @combomethod - def recover_message(self, signable_message: SignableMessage, vrs=None, signature=None): + def recover_message( + self, + signable_message: SignableMessage, + vrs: Optional[Tuple[VRS, VRS, VRS]] = None, + signature: bytes = None, + ) -> ChecksumAddress: r""" Get the address of the account that signed the given message. You must specify exactly one of: vrs or signature @@ -404,34 +460,15 @@ def recover_message(self, signable_message: SignableMessage, vrs=None, signature '0x5ce9454909639D2D17A3F753ce7d93fa0b9aB12E' """ message_hash = _hash_eip191_message(signable_message) - return self._recover_hash(message_hash, vrs, signature) - - @combomethod - def recoverHash(self, message_hash, vrs=None, signature=None): - """ - Get the address of the account that signed the message with the given hash. - You must specify exactly one of: vrs or signature - - .. CAUTION:: Deprecated for :meth:`~eth_account.account.Account.recover_message`. - This method might be removed as early as v0.5 - - :param message_hash: the hash of the message that you want to verify - :type message_hash: hex str or bytes or int - :param vrs: the three pieces generated by an elliptic curve signature - :type vrs: tuple(v, r, s), each element is hex str, bytes or int - :param signature: signature bytes concatenated as r+s+v - :type signature: hex str or bytes or int - :returns: address of signer, hex-encoded & checksummed - :rtype: str - """ - warnings.warn( - "recoverHash is deprecated in favor of recover_message", - category=DeprecationWarning, - ) - return self._recover_hash(message_hash, vrs, signature) + return cast(ChecksumAddress, self._recover_hash(message_hash, vrs, signature)) @combomethod - def _recover_hash(self, message_hash, vrs=None, signature=None): + def _recover_hash( + self, + message_hash: Hash32, + vrs: Optional[Tuple[VRS, VRS, VRS]] = None, + signature: bytes = None, + ) -> ChecksumAddress: hash_bytes = HexBytes(message_hash) if len(hash_bytes) != 32: raise ValueError("The message hash must be exactly 32-bytes") @@ -442,23 +479,13 @@ def _recover_hash(self, message_hash, vrs=None, signature=None): elif signature is not None: signature_bytes = HexBytes(signature) signature_bytes_standard = to_standard_signature_bytes(signature_bytes) - signature_obj = self._keys.Signature(signature_bytes=signature_bytes_standard) + signature_obj = self._keys.Signature( + signature_bytes=signature_bytes_standard + ) else: raise TypeError("You must supply the vrs tuple or the signature bytes") pubkey = signature_obj.recover_public_key_from_msg_hash(hash_bytes) - return pubkey.to_checksum_address() - - @combomethod - def recoverTransaction(self, serialized_transaction): - """ - .. CAUTION:: Deprecated for :meth:`~eth_account.account.Account.recover_transaction`. - This method will be removed in v0.5 - """ - warnings.warn( - "recoverTransaction is deprecated in favor of recover_transaction", - category=DeprecationWarning, - ) - return self.recover_transaction(serialized_transaction) + return cast(ChecksumAddress, pubkey.to_checksum_address()) @combomethod def recover_transaction(self, serialized_transaction): @@ -477,7 +504,7 @@ def recover_transaction(self, serialized_transaction): '0x2c7536E3605D9C16a7a3D7b1898e529396a65c23' """ txn_bytes = HexBytes(serialized_transaction) - if len(txn_bytes) > 0 and txn_bytes[0] <= 0x7f: + if len(txn_bytes) > 0 and txn_bytes[0] <= 0x7F: # We are dealing with a typed transaction. typed_transaction = TypedTransaction.from_bytes(txn_bytes) msg_hash = typed_transaction.hash() @@ -488,17 +515,6 @@ def recover_transaction(self, serialized_transaction): msg_hash = hash_of_signed_transaction(txn) return self._recover_hash(msg_hash, vrs=vrs_from(txn)) - def setKeyBackend(self, backend): - """ - .. CAUTION:: Deprecated for :meth:`~eth_account.account.Account.set_key_backend`. - This method will be removed in v0.5 - """ - warnings.warn( - "setKeyBackend is deprecated in favor of set_key_backend", - category=DeprecationWarning, - ) - self.set_key_backend(backend) - def set_key_backend(self, backend): """ Change the backend used by the underlying eth-keys library. @@ -506,31 +522,35 @@ def set_key_backend(self, backend): *(The default is fine for most users)* :param backend: any backend that works in - `eth_keys.KeyApi(backend) `_ + `eth_keys.KeyApi(backend) + `_ """ self._keys = KeyAPI(backend) @combomethod - def sign_message(self, signable_message: SignableMessage, private_key): + def sign_message( + self, + signable_message: SignableMessage, + private_key: Union[bytes, HexStr, int, keys.PrivateKey], + ) -> SignedMessage: r""" Sign the provided message. - This API supports any messaging format that will encode to EIP-191_ messages. + This API supports any messaging format that will encode to EIP-191 messages. - If you would like historical compatibility with - :meth:`w3.eth.sign() ` + If you would like historical compatibility with :meth:`w3.eth.sign() ` you can use :meth:`~eth_account.messages.encode_defunct`. - Other options are the "validator", or "structured data" standards. (Both of these - are in *DRAFT* status currently, so be aware that the implementation is not - guaranteed to be stable). You can import all supported message encoders in + Other options are the "validator", or "structured data" standards. + You can import all supported message encoders in ``eth_account.messages``. :param signable_message: the encoded message for signing :param private_key: the key to sign the message with :type private_key: hex str, bytes, int or :class:`eth_keys.datatypes.PrivateKey` - :returns: Various details about the signature - most importantly the fields: v, r, and s + :returns: Various details about the signature - most importantly the + fields: v, r, and s :rtype: ~eth_account.datastructures.SignedMessage .. doctest:: python @@ -556,7 +576,7 @@ def sign_message(self, signable_message: SignableMessage, private_key): .. _EIP-191: https://eips.ethereum.org/EIPS/eip-191 """ message_hash = _hash_eip191_message(signable_message) - return self._sign_hash(message_hash, private_key) + return cast(SignedMessage, self._sign_hash(message_hash, private_key)) @combomethod def signHash(self, message_hash, private_key): @@ -583,11 +603,16 @@ def signHash(self, message_hash, private_key): warnings.warn( "signHash is deprecated in favor of sign_message", category=DeprecationWarning, + stacklevel=2, ) return self._sign_hash(message_hash, private_key) @combomethod - def _sign_hash(self, message_hash, private_key): + def _sign_hash( + self, + message_hash: Hash32, + private_key: Union[bytes, HexStr, int, keys.PrivateKey], + ) -> SignedMessage: msg_hash_bytes = HexBytes(message_hash) if len(msg_hash_bytes) != 32: raise ValueError("The message hash must be exactly 32-bytes") @@ -603,37 +628,27 @@ def _sign_hash(self, message_hash, private_key): signature=HexBytes(eth_signature_bytes), ) - @combomethod - def signTransaction(self, transaction_dict, private_key): - """ - .. CAUTION:: Deprecated for :meth:`~eth_account.account.Account.sign_transaction`. - This method will be removed in v0.5 - """ - warnings.warn( - "signTransaction is deprecated in favor of sign_transaction", - category=DeprecationWarning, - ) - return self.sign_transaction(transaction_dict, private_key) - @combomethod def sign_transaction(self, transaction_dict, private_key): """ Sign a transaction using a local private key. - It produces signature details and the hex-encoded transaction suitable for broadcast using - :meth:`w3.eth.sendRawTransaction() `. + It produces signature details and the hex-encoded transaction suitable for + broadcast using :meth:`w3.eth.sendRawTransaction() + `. To create the transaction dict that calls a contract, use contract object: `my_contract.functions.my_function().buildTransaction() `_ - Note: For non-legacy (typed) transactions, if the transaction type is not explicitly - provided, it may be determined from the transaction parameters of a well-formed - transaction. See below for examples on how to sign with different transaction types. + Note: For non-legacy (typed) transactions, if the transaction type is not + explicitly provided, it may be determined from the transaction parameters of + a well-formed transaction. See below for examples on how to sign with + different transaction types. - :param dict transaction_dict: the transaction with available keys, depending on the type of - transaction: nonce, chainId, to, data, value, gas, gasPrice, type, accessList, - maxFeePerGas, and maxPriorityFeePerGas + :param dict transaction_dict: the transaction with available keys, depending + on the type of transaction: nonce, chainId, to, data, value, gas, gasPrice, + type, accessList, maxFeePerGas, and maxPriorityFeePerGas :param private_key: the private key to sign the data with :type private_key: hex str, bytes, int or :class:`eth_keys.datatypes.PrivateKey` :returns: Various details about the signature - most @@ -642,7 +657,7 @@ def sign_transaction(self, transaction_dict, private_key): .. code-block:: python - >>> # EIP-1559 dynamic fee transaction (more efficient and preferred over legacy txn) + >>> # EIP-1559 dynamic fee transaction (more efficient and preferred over legacy txn) # noqa: E501 >>> dynamic_fee_transaction = { "type": 2, # optional - can be implicitly determined based on max fee params # noqa: E501 "gas": 100000, @@ -665,9 +680,11 @@ def sign_transaction(self, transaction_dict, private_key): >>> key = '0x4c0883a69102937d6231471b5dbb6204fe5129617082792ae468d01a3f362318' >>> signed = Account.sign_transaction(dynamic_fee_transaction, key) {'hash': HexBytes('0x126431f2a7fda003aada7c2ce52b0ce3cbdbb1896230d3333b9eea24f42d15b0'), + 'r': 110093478023675319011132687961420618950720745285952062287904334878381994888509, - 'rawTransaction': HexBytes('0x02f8b282076c2284773594008477359400830186a09409616c3d61b3331fc4109a9e41a8bdb7d9776609865af3107a400086616263646566f838f7940000000000000000000000000000000000000001e1a0010000000000000000000000000000000000000000000000000000000000000080a0f366b34a5c206859b9778b4c909207e53443cca9e0b82e0b94bc4b47e6434d3da04a731eda413a944d4ea2d2236671e586e57388d0e9d40db53044ae4089f2aec8'), # noqa: E501 + 'rawTransaction': HexBytes('0x02f8b282076c2284773594008477359400830186a09409616c3d61b3331fc4109a9e41a8bdb7d9776609865af3107a400086616263646566f838f7940000000000000000000000000000000000000001e1a0010000000000000000000000000000000000000000000000000000000000000080a0f366b34a5c206859b9778b4c909207e53443cca9e0b82e0b94bc4b47e6434d3da04a731eda413a944d4ea2d2236671e586e57388d0e9d40db53044ae4089f2aec8'), 's': 33674551144139401179914073499472892825822542092106065756005379322302694600392, + 'v': 0} >>> w3.eth.sendRawTransaction(signed.rawTransaction) @@ -687,7 +704,7 @@ def sign_transaction(self, transaction_dict, private_key): >>> signed = Account.sign_transaction(legacy_transaction, key) {'hash': HexBytes('0x6893a6ee8df79b0f5d64a180cd1ef35d030f3e296a5361cf04d02ce720d32ec5'), 'r': 4487286261793418179817841024889747115779324305375823110249149479905075174044, - 'rawTransaction': HexBytes('0xf86a8086d55698372431831e848094f0109fc8df283027b6285cc889f5aa624eac1f55843b9aca008025a009ebb6ca057a0535d6186462bc0b465b561c94a295bdb0621fc19208ab149a9ca0440ffd775ce91a833ab410777204d5341a6f9fa91216a6f3ee2c051fea6a0428'), # noqa: E501 + 'rawTransaction': HexBytes('0xf86a8086d55698372431831e848094f0109fc8df283027b6285cc889f5aa624eac1f55843b9aca008025a009ebb6ca057a0535d6186462bc0b465b561c94a295bdb0621fc19208ab149a9ca0440ffd775ce91a833ab410777204d5341a6f9fa91216a6f3ee2c051fea6a0428'), 's': 30785525769477805655994251009256770582792548537338581640010273753578382951464, 'v': 37} >>> w3.eth.sendRawTransaction(signed.rawTransaction) @@ -722,19 +739,24 @@ def sign_transaction(self, transaction_dict, private_key): >>> w3.eth.sendRawTransaction(signed.rawTransaction) """ if not isinstance(transaction_dict, Mapping): - raise TypeError("transaction_dict must be dict-like, got %r" % transaction_dict) + raise TypeError( + "transaction_dict must be dict-like, got %r" % transaction_dict + ) account = self.from_key(private_key) # allow from field, *only* if it matches the private key - if 'from' in transaction_dict: - if transaction_dict['from'] == account.address: - sanitized_transaction = dissoc(transaction_dict, 'from') + if "from" in transaction_dict: + if transaction_dict["from"] == account.address: + sanitized_transaction = dissoc(transaction_dict, "from") else: - raise TypeError("from field must match key's %s, but it was %s" % ( - account.address, - transaction_dict['from'], - )) + raise TypeError( + "from field must match key's %s, but it was %s" + % ( + account.address, + transaction_dict["from"], + ) + ) else: sanitized_transaction = transaction_dict @@ -760,12 +782,14 @@ def _parsePrivateKey(self, key): """ Generate a :class:`eth_keys.datatypes.PrivateKey` from the provided key. - If the key is already of type :class:`eth_keys.datatypes.PrivateKey`, return the key. + If the key is already of type :class:`eth_keys.datatypes.PrivateKey`, + return the key. :param key: the private key from which a :class:`eth_keys.datatypes.PrivateKey` will be generated :type key: hex str, bytes, int or :class:`eth_keys.datatypes.PrivateKey` - :returns: the provided key represented as a :class:`eth_keys.datatypes.PrivateKey` + :returns: the provided key represented as a + :class:`eth_keys.datatypes.PrivateKey` """ if isinstance(key, self._keys.PrivateKey): return key diff --git a/eth_account/hdaccount/__init__.py b/eth_account/hdaccount/__init__.py index 94af87c..98b9d35 100644 --- a/eth_account/hdaccount/__init__.py +++ b/eth_account/hdaccount/__init__.py @@ -21,10 +21,11 @@ def seed_from_mnemonic(words: str, passphrase: str) -> bytes: expanded_words = Mnemonic(lang).expand(words) if not Mnemonic(lang).is_mnemonic_valid(expanded_words): raise ValidationError( - f"Provided words: '{expanded_words}', are not a valid BIP39 mnemonic phrase!" + f"Provided words: '{expanded_words}', are not a " + "valid BIP39 mnemonic phrase!" ) return Mnemonic.to_seed(expanded_words, passphrase) -def key_from_seed(seed: bytes, account_path: str): +def key_from_seed(seed: bytes, account_path: str) -> bytes: return HDPath(account_path).derive(seed) diff --git a/eth_account/hdaccount/_utils.py b/eth_account/hdaccount/_utils.py index 7179078..385973e 100644 --- a/eth_account/hdaccount/_utils.py +++ b/eth_account/hdaccount/_utils.py @@ -16,7 +16,9 @@ ) PBKDF2_ROUNDS = 2048 -SECP256K1_N = int("FFFFFFFF_FFFFFFFF_FFFFFFFF_FFFFFFFE_BAAEDCE6_AF48A03B_BFD25E8C_D0364141", 16) +SECP256K1_N = int( + "FFFFFFFF_FFFFFFFF_FFFFFFFF_FFFFFFFE_BAAEDCE6_AF48A03B_BFD25E8C_D0364141", 16 +) def normalize_string(txt: Union[str, bytes]) -> str: @@ -56,4 +58,4 @@ def ec_point(pkey: bytes) -> bytes: Note: Result is ecdsa public key serialized to compressed form """ - return keys.PrivateKey(HexBytes(pkey)).public_key.to_compressed_bytes() # type: ignore + return keys.PrivateKey(HexBytes(pkey)).public_key.to_compressed_bytes() # type: ignore # noqa: E501 diff --git a/eth_account/hdaccount/deterministic.py b/eth_account/hdaccount/deterministic.py index 56a3223..733380f 100644 --- a/eth_account/hdaccount/deterministic.py +++ b/eth_account/hdaccount/deterministic.py @@ -12,7 +12,8 @@ ----- * Integers are modulo the order of the curve (referred to as n). -* Addition (+) of two coordinate pair is defined as application of the EC group operation. +* Addition (+) of two coordinate pair is defined as application of + the EC group operation. * Concatenation (||) is the operation of appending one byte sequence onto another. @@ -24,21 +25,25 @@ with the integer p. * ser_32(i): serialize a 32-bit unsigned integer i as a 4-byte sequence, most significant byte first. -* ser_256(p): serializes the integer p as a 32-byte sequence, most significant byte first. -* ser_P(P): serializes the coordinate pair P = (x,y) as a byte sequence using SEC1's compressed - form: (0x02 or 0x03) || ser_256(x), where the header byte depends on the parity of the - omitted y coordinate. -* parse_256(p): interprets a 32-byte sequence as a 256-bit number, most significant byte first. +* ser_256(p): serializes the integer p as a 32-byte sequence, most significant + byte first. +* ser_P(P): serializes the coordinate pair P = (x,y) as a byte sequence using SEC1's + compressed form: (0x02 or 0x03) || ser_256(x), where the header byte depends on the + parity of the omitted y coordinate. +* parse_256(p): interprets a 32-byte sequence as a 256-bit number, most significant + byte first. """ # Additional notes: -# - This module currently only implements private parent key => private child key CKD function, -# as it is not necessary to the HD key derivation functions used in this library to implement -# the other functions yet (as this module is only used for derivation of private keys). That -# could change, but wasn't deemed necessary at the time this module was introduced. -# - Unlike other libraries, this library does not use Bitcoin key serialization, because it is -# not intended to be ultimately used for Bitcoin key derivations. This presents a simplified -# API, and no expectation is given for `xpub/xpriv` key derivation. +# - This module currently only implements private parent key => private child key +# CKD function, as it is not necessary to the HD key derivation functions used +# in this library to implement the other functions yet (as this module is only +# used for derivation of private keys). That could change, but wasn't deemed +# necessary at the time this module was introduced. +# - Unlike other libraries, this library does not use Bitcoin key serialization, +# because it is not intended to be ultimately used for Bitcoin key derivations. +# This presents a simplified API, and no expectation is given for `xpub/xpriv` +# key derivation. from typing import ( Tuple, Type, @@ -71,19 +76,16 @@ class Node(int): def __new__(cls, index): if 0 > index or index > 2**31: - raise ValidationError( - f"{cls} cannot be initialized with value {index}" - ) + raise ValidationError(f"{cls} cannot be initialized with value {index}") - # mypy/typeshed bug requires type ignore: https://github.com/python/typeshed/issues/2686 - obj = int.__new__(cls, index + cls.OFFSET) # type: ignore + obj = int.__new__(cls, index + cls.OFFSET) obj.index = index return obj def __repr__(self): return f"{self.__class__.__name__}({self.index})" - def __add__(self, other: int): + def __add__(self, other: int) -> "Node": return self.__class__(self.index + other) def serialize(self) -> bytes: @@ -117,6 +119,7 @@ class SoftNode(Node): """ Soft node (unhardened), where value = index . """ + TAG = "" # No tag OFFSET = 0x0 # No offset @@ -125,6 +128,7 @@ class HardNode(Node): """ Hard node, where value = index + BIP32_HARDENED_CONSTANT . """ + TAG = "H" # "H" (or "'") means hard node (but use "H" for clarity) OFFSET = 0x80000000 # 2**31, BIP32 "Hardening constant" @@ -142,7 +146,8 @@ def derive_child_key( The function CKDpriv((k_par, c_par), i) → (k_i, c_i) computes a child extended private key from the parent extended private key: - 1. Check whether the child is a hardened key (i ≥ 2**31). If the child is a hardened key, + 1. Check whether the child is a hardened key (i ≥ 2**31). + If the child is a hardened key, let I = HMAC-SHA512(Key = c_par, Data = 0x00 || ser_256(k_par) || ser_32(i)). (Note: The 0x00 pads the private key to make it 33 bytes long.) If it is not a hardened key, then @@ -206,12 +211,12 @@ def __init__(self, path: str): if len(path) < 1: raise ValidationError("Cannot parse path from empty string.") - nodes = path.split('/') # Should at least make 1 entry in resulting list + nodes = path.split("/") # Should at least make 1 entry in resulting list if nodes[0] not in BASE_NODE_IDENTIFIERS: raise ValidationError(f'Path is not valid: "{path}". Must start with "m"') decoded_path = [] - for idx, node in enumerate(nodes[1:]): # We don't need the root node 'm' + for _idx, node in enumerate(nodes[1:]): # We don't need the root node 'm' try: decoded_path.append(Node.decode(node)) except ValidationError as err: @@ -228,8 +233,8 @@ def encode(self) -> str: """ Encodes this class to a string (reversing the decoding in the constructor). """ - encoded_path = ('m',) + tuple(node.encode() for node in self._path) - return '/'.join(encoded_path) + encoded_path = ("m",) + tuple(node.encode() for node in self._path) + return "/".join(encoded_path) def derive(self, seed: bytes) -> bytes: """ diff --git a/eth_account/hdaccount/mnemonic.py b/eth_account/hdaccount/mnemonic.py index fc4c8c0..a65acc3 100644 --- a/eth_account/hdaccount/mnemonic.py +++ b/eth_account/hdaccount/mnemonic.py @@ -72,7 +72,7 @@ def get_wordlist(language): class Mnemonic: def __init__(self, raw_language="english"): - language = raw_language.lower().replace(' ', '_') + language = raw_language.lower().replace(" ", "_") languages = Mnemonic.list_languages() if language not in languages: raise ValidationError( @@ -100,20 +100,26 @@ def detect_language(cls, raw_mnemonic): if len(matching_languages) < 1: raise ValidationError(f"Language not detected for word(s): {raw_mnemonic}") - # If both chinese simplified and chinese traditional match (because one is a subset of the - # other) then return simplified. This doesn't hold for other languages. - if len(matching_languages) == 2 and all("chinese" in lang for lang in matching_languages): + # If both chinese simplified and chinese traditional match (because one is a + # subset of the other) then return simplified. This doesn't hold for + # other languages. + if len(matching_languages) == 2 and all( + "chinese" in lang for lang in matching_languages + ): return "chinese_simplified" - # Because certain wordlists share some similar words, if we detect multiple languages - # that the provided mnemonic word(s) could be valid in, we have to throw + # Because certain wordlists share some similar words, if we detect multiple + # languages that the provided mnemonic word(s) could be valid in, we have + # to throw if len(matching_languages) > 1: - raise ValidationError(f"Word(s) are valid in multiple languages: {raw_mnemonic}") + raise ValidationError( + f"Word(s) are valid in multiple languages: {raw_mnemonic}" + ) (language,) = matching_languages return language - def generate(self, num_words=12) -> str: + def generate(self, num_words: int = 12) -> str: if num_words not in VALID_WORD_COUNTS: raise ValidationError( f"Invalid choice for number of words: {num_words}, should be one of " @@ -121,7 +127,7 @@ def generate(self, num_words=12) -> str: ) return self.to_mnemonic(os.urandom(4 * num_words // 3)) # 4/3 bytes per word - def to_mnemonic(self, entropy) -> str: + def to_mnemonic(self, entropy: bytes) -> str: entropy_size = len(entropy) if entropy_size not in VALID_ENTROPY_SIZES: raise ValidationError( @@ -136,12 +142,14 @@ def to_mnemonic(self, entropy) -> str: checksum.frombytes(sha256(entropy)) # Add enough bits from the checksum to make it modulo 11 (2**11 = 2048) - bits.extend(checksum[:entropy_size // 4]) - indices = tuple(ba2int(bits[i * 11: (i + 1) * 11]) for i in range(len(bits) // 11)) + bits.extend(checksum[: entropy_size // 4]) + indices = tuple( + ba2int(bits[i * 11 : (i + 1) * 11]) for i in range(len(bits) // 11) + ) words = tuple(self.wordlist[idx] for idx in indices) if self.language == "japanese": # Japanese must be joined by ideographic space. - phrase = u"\u3000".join(words) + phrase = "\u3000".join(words) else: phrase = " ".join(words) return phrase @@ -167,11 +175,11 @@ def is_mnemonic_valid(self, mnemonic): # Checksum the raw entropy bits checksum = bitarray() - checksum.frombytes(sha256(encoded_seed[:entropy_size * 8].tobytes())) - computed_checksum = checksum[:len(encoded_seed) - entropy_size * 8].tobytes() + checksum.frombytes(sha256(encoded_seed[: entropy_size * 8].tobytes())) + computed_checksum = checksum[: len(encoded_seed) - entropy_size * 8].tobytes() # Extract the stored checksum bits - stored_checksum = encoded_seed[entropy_size * 8:].tobytes() + stored_checksum = encoded_seed[entropy_size * 8 :].tobytes() # Check that the stored matches the relevant slice of the actual checksum # NOTE: Use secrets.compare_digest for protection again timing attacks @@ -195,17 +203,18 @@ def expand(self, mnemonic): @classmethod def to_seed(cls, checked_mnemonic: str, passphrase: str = "") -> bytes: """ - :param str checked_mnemonic: Must be a correct, fully-expanded BIP39 seed phrase. - :param str passphrase: Encryption passphrase used to secure the mnemonic. + :param str checked_mnemonic: Must be a correct, fully-expanded BIP39 seed phrase + :param str passphrase: Encryption passphrase used to secure the mnemonic :returns bytes: 64 bytes of raw seed material from PRNG """ mnemonic = normalize_string(checked_mnemonic) - # NOTE: This domain separater ("mnemonic") is added per BIP39 spec to the passphrase - # https://github.com/bitcoin/bips/blob/master/bip-0039.mediawiki#from-mnemonic-to-seed + # NOTE: This domain separater ("mnemonic") is added per BIP39 spec + # to the passphrase + # https://github.com/bitcoin/bips/blob/master/bip-0039.mediawiki#from-mnemonic-to-seed # noqa: E501 salt = "mnemonic" + normalize_string(passphrase) # From BIP39: # To create a binary seed from the mnemonic, we use the PBKDF2 function with a - # mnemonic sentence (in UTF-8 NFKD) used as the password and the string "mnemonic" - # and passphrase (again in UTF-8 NFKD) used as the salt. + # mnemonic sentence (in UTF-8 NFKD) used as the password and the string + # "mnemonic" and passphrase (again in UTF-8 NFKD) used as the salt. stretched = pbkdf2_hmac_sha512(mnemonic, salt) return stretched[:64] diff --git a/eth_account/messages.py b/eth_account/messages.py index d9927fc..b085259 100644 --- a/eth_account/messages.py +++ b/eth_account/messages.py @@ -43,8 +43,8 @@ class SignableMessage(NamedTuple): A message compatible with EIP-191_ that is ready to be signed. The properties are components of an EIP-191_ signable message. Other message formats - can be encoded into this format for easy signing. This data structure doesn't need to - know about the original message format. For example, you can think of + can be encoded into this format for easy signing. This data structure doesn't need + to know about the original message format. For example, you can think of EIP-712 as compiling down to an EIP-191 message. In typical usage, you should never need to create these by hand. Instead, use @@ -56,6 +56,7 @@ class SignableMessage(NamedTuple): .. _EIP-191: https://eips.ethereum.org/EIPS/eip-191 """ + version: bytes # must be length 1 header: bytes # aka "version specific data" body: bytes # aka "data to sign" @@ -69,28 +70,29 @@ def _hash_eip191_message(signable_message: SignableMessage) -> Hash32: "The EIP-191 signable message standard only supports one-byte versions." ) - joined = b'\x19' + version + signable_message.header + signable_message.body + joined = b"\x19" + version + signable_message.header + signable_message.body return Hash32(keccak(joined)) # watch for updates to signature format def encode_intended_validator( - validator_address: Union[Address, str], - primitive: bytes = None, - *, - hexstr: str = None, - text: str = None) -> SignableMessage: + validator_address: Union[Address, str], + primitive: bytes = None, + *, + hexstr: str = None, + text: str = None, +) -> SignableMessage: """ - Encode a message using the "intended validator" approach (ie~ version 0) defined in EIP-191_. + Encode a message using the "intended validator" approach (ie~ version 0) + defined in EIP-191_. Supply the message as exactly one of these three arguments: bytes as a primitive, a hex string, or a unicode string. .. WARNING:: Note that this code has not gone through an external audit. - Also, watch for updates to the format, as the EIP is still in DRAFT. - :param validator_address: which on-chain contract is capable of validating this message, - provided as a checksummed address or in native bytes. + :param validator_address: which on-chain contract is capable of validating this + message, provided as a checksummed address or in native bytes. :param primitive: the binary message to be signed :type primitive: bytes or int :param str hexstr: the message encoded as hex @@ -105,21 +107,23 @@ def encode_intended_validator( "It must be a checksum address, or an address converted to bytes." ) # The validator_address is a str or Address (which is a subtype of bytes). Both of - # these are AnyStr, which includes str and bytes. Not sure why mypy complains here... + # these are AnyStr, which includes str and bytes. + # Not sure why mypy complains here... canonical_address = to_canonical_address(validator_address) message_bytes = to_bytes(primitive, hexstr=hexstr, text=text) return SignableMessage( - HexBytes(b'\x00'), # version 0, as defined in EIP-191 + HexBytes(b"\x00"), # version 0, as defined in EIP-191 canonical_address, message_bytes, ) def encode_structured_data( - primitive: Union[bytes, int, Mapping] = None, - *, - hexstr: str = None, - text: str = None) -> SignableMessage: + primitive: Union[bytes, int, Mapping] = None, + *, + hexstr: str = None, + text: str = None, +) -> SignableMessage: """ Encode an EIP-712_ message. @@ -134,7 +138,6 @@ def encode_structured_data( .. WARNING:: Note that this code has not gone through an external audit, and the test cases are incomplete. - Also, watch for updates to the format, as the EIP is still in DRAFT. :param primitive: the binary message to be signed :type primitive: bytes or int or Mapping (eg~ dict ) @@ -151,17 +154,15 @@ def encode_structured_data( message_string = to_text(primitive, hexstr=hexstr, text=text) structured_data = load_and_validate_structured_message(message_string) return SignableMessage( - HexBytes(b'\x01'), + HexBytes(b"\x01"), hash_domain(structured_data), hash_eip712_message(structured_data), ) def encode_defunct( - primitive: bytes = None, - *, - hexstr: str = None, - text: str = None) -> SignableMessage: + primitive: bytes = None, *, hexstr: str = None, text: str = None +) -> SignableMessage: r""" Encode a message for signing, using an old, unrecommended approach. @@ -170,11 +171,12 @@ def encode_defunct( EIP-191 defines this as "version ``E``". - .. NOTE: This standard includes the number of bytes in the message as a part of the header. - Awkwardly, the number of bytes in the message is encoded in decimal ascii. - So if the message is 'abcde', then the length is encoded as the ascii - character '5'. This is one of the reasons that this message format is not preferred. - There is ambiguity when the message '00' is encoded, for example. + .. NOTE: This standard includes the number of bytes in the message as a part of + the header. Awkwardly, the number of bytes in the message is encoded in + decimal ascii. So if the message is 'abcde', then the length is encoded + as the ascii character '5'. This is one of the reasons that this message + format is not preferred. There is ambiguity when the message '00' is + encoded, for example. Supply exactly one of the three arguments: bytes, a hex string, or a unicode string. @@ -191,43 +193,52 @@ def encode_defunct( >>> message_text = "I♥SF" >>> encode_defunct(text=message_text) - SignableMessage(version=b'E', header=b'thereum Signed Message:\n6', body=b'I\xe2\x99\xa5SF') + SignableMessage(version=b'E', + header=b'thereum Signed Message:\n6', + body=b'I\xe2\x99\xa5SF') These four also produce the same hash: >>> encode_defunct(to_bytes(text=message_text)) - SignableMessage(version=b'E', header=b'thereum Signed Message:\n6', body=b'I\xe2\x99\xa5SF') + SignableMessage(version=b'E', + header=b'thereum Signed Message:\n6', + body=b'I\xe2\x99\xa5SF') >>> encode_defunct(bytes(message_text, encoding='utf-8')) - SignableMessage(version=b'E', header=b'thereum Signed Message:\n6', body=b'I\xe2\x99\xa5SF') + SignableMessage(version=b'E', + header=b'thereum Signed Message:\n6', + body=b'I\xe2\x99\xa5SF') >>> to_hex(text=message_text) '0x49e299a55346' >>> encode_defunct(hexstr='0x49e299a55346') - SignableMessage(version=b'E', header=b'thereum Signed Message:\n6', body=b'I\xe2\x99\xa5SF') + SignableMessage(version=b'E', + header=b'thereum Signed Message:\n6', + body=b'I\xe2\x99\xa5SF') >>> encode_defunct(0x49e299a55346) - SignableMessage(version=b'E', header=b'thereum Signed Message:\n6', body=b'I\xe2\x99\xa5SF') + SignableMessage(version=b'E', + header=b'thereum Signed Message:\n6', + body=b'I\xe2\x99\xa5SF') """ message_bytes = to_bytes(primitive, hexstr=hexstr, text=text) - msg_length = str(len(message_bytes)).encode('utf-8') + msg_length = str(len(message_bytes)).encode("utf-8") # Encoding version E defined by EIP-191 return SignableMessage( - b'E', - b'thereum Signed Message:\n' + msg_length, + b"E", + b"thereum Signed Message:\n" + msg_length, message_bytes, ) def defunct_hash_message( - primitive: bytes = None, - *, - hexstr: str = None, - text: str = None) -> HexBytes: + primitive: bytes = None, *, hexstr: str = None, text: str = None +) -> HexBytes: """ Convert the provided message into a message hash, to be signed. - .. CAUTION:: Intented for use with the deprecated :meth:`eth_account.account.Account.signHash`. + .. CAUTION:: Intended for use with the deprecated + :meth:`eth_account.account.Account.signHash`. This is for backwards compatibility only. All new implementations should use :meth:`encode_defunct` instead. diff --git a/eth_account/signers/base.py b/eth_account/signers/base.py index 58d3ade..6bd5a84 100644 --- a/eth_account/signers/base.py +++ b/eth_account/signers/base.py @@ -3,6 +3,9 @@ abstractmethod, ) +from eth_account.datastructures import ( + SignedMessage, +) from eth_account.messages import ( SignableMessage, ) @@ -28,7 +31,7 @@ def address(self): pass @abstractmethod - def sign_message(self, signable_message: SignableMessage): + def sign_message(self, signable_message: SignableMessage) -> SignedMessage: """ Sign the EIP-191_ message. @@ -51,7 +54,8 @@ def signHash(self, message_hash): as in :meth:`~eth_account.account.Account.signHash` but without specifying the private key. - .. CAUTION:: Deprecated for :meth:`~eth_account.signers.base.BaseAccount.sign_message`. + .. CAUTION:: Deprecated for + :meth:`~eth_account.signers.base.BaseAccount.sign_message`. To be removed in v0.6 :param bytes message_hash: 32 byte hash of the message to sign @@ -67,7 +71,8 @@ def signTransaction(self, transaction_dict): :meth:`~eth_account.account.Account.sign_transaction` but without specifying the private key. - .. CAUTION:: Deprecated for :meth:`~eth_account.account.signers.local.sign_transaction`. + .. CAUTION:: Deprecated for + :meth:`~eth_account.account.signers.local.sign_transaction`. This method will be removed in v0.6 :param dict transaction_dict: transaction with all fields specified diff --git a/eth_account/signers/local.py b/eth_account/signers/local.py index ce78b15..8543040 100644 --- a/eth_account/signers/local.py +++ b/eth_account/signers/local.py @@ -7,27 +7,29 @@ class LocalAccount(BaseAccount): r""" - A collection of convenience methods to sign and encrypt, with an embedded private key. + A collection of convenience methods to sign and encrypt, with an + embedded private key. :var bytes key: the 32-byte private key data .. code-block:: python - >>> my_local_account.address # doctest: +SKIP + >>> my_local_account.address "0xF0109fC8DF283027b6285cc889F5aA624EaC1F55" - >>> my_local_account.key # doctest: +SKIP + >>> my_local_account.key b"\x01\x23..." You can also get the private key by casting the account to :class:`bytes`: .. code-block:: python - >>> bytes(my_local_account) # doctest: +SKIP + >>> bytes(my_local_account) b"\\x01\\x23..." """ + def __init__(self, key, account): """ - Initialize a new account with the the given private key. + Initialize a new account with the given private key. :param eth_keys.PrivateKey key: to prefill in private key execution :param ~eth_account.account.Account account: the key-unaware management API @@ -45,18 +47,6 @@ def __init__(self, key, account): def address(self): return self._address - @property - def privateKey(self): - """ - .. CAUTION:: Deprecated for :meth:`~eth_account.signers.local.LocalAccount.key`. - This attribute will be removed in v0.5 - """ - warnings.warn( - "privateKey is deprecated in favor of key", - category=DeprecationWarning, - ) - return self._private_key - @property def key(self): """ @@ -69,9 +59,12 @@ def encrypt(self, password, kdf=None, iterations=None): Generate a string with the encrypted key. This uses the same structure as in - :meth:`~eth_account.account.Account.encrypt`, but without a private key argument. + :meth:`~eth_account.account.Account.encrypt`, but without a + private key argument. """ - return self._publicapi.encrypt(self.key, password, kdf=kdf, iterations=iterations) + return self._publicapi.encrypt( + self.key, password, kdf=kdf, iterations=iterations + ) def signHash(self, message_hash): return self._publicapi.signHash( @@ -84,7 +77,8 @@ def sign_message(self, signable_message): Generate a string with the encrypted key. This uses the same structure as in - :meth:`~eth_account.account.Account.sign_message`, but without a private key argument. + :meth:`~eth_account.account.Account.sign_message`, but without a + private key argument. """ return self._publicapi.sign_message(signable_message, private_key=self.key) @@ -92,6 +86,7 @@ def signTransaction(self, transaction_dict): warnings.warn( "signTransaction is deprecated in favor of sign_transaction", category=DeprecationWarning, + stacklevel=2, ) return self.sign_transaction(transaction_dict)