From 9b9025c1a14d0263eabad814af0c04ee68e2999d Mon Sep 17 00:00:00 2001 From: kdmukai Date: Fri, 5 Aug 2022 13:51:40 -0500 Subject: [PATCH 01/11] Initial BIP-47 support Currently only implements generating the recipient's shareable payment code. --- .gitignore | 3 +++ src/embit/bip47.py | 30 ++++++++++++++++++++++++++++++ tests/tests/__init__.py | 1 + tests/tests/test_bip47.py | 21 +++++++++++++++++++++ 4 files changed, 55 insertions(+) create mode 100644 src/embit/bip47.py create mode 100644 tests/tests/test_bip47.py diff --git a/.gitignore b/.gitignore index 5fcb4e8..5a1e192 100644 --- a/.gitignore +++ b/.gitignore @@ -124,3 +124,6 @@ dmypy.json # Pyre type checker .pyre/ + +# VSCode config +.vscode/ \ No newline at end of file diff --git a/src/embit/bip47.py b/src/embit/bip47.py new file mode 100644 index 0000000..d22960e --- /dev/null +++ b/src/embit/bip47.py @@ -0,0 +1,30 @@ +from embit import base58, bip32, bip39 +from embit.bip32 import HDKey +from io import BytesIO + +""" + BIP-47: https://github.com/bitcoin/bips/blob/master/bip-0047.mediawiki +""" + +def get_payment_code(root: HDKey): + """ + Generates the recipient's BIP-47 shareable payment code (version 1) + for the input root private key. + """ + bip47_child = root.derive("m/47'/0'/0'") + + buf = BytesIO() + buf.write(b'\x01') # bip47 version + buf.write(b'\x00') # Bitmessage; always zero + buf.write(bip47_child.get_public_key().serialize()) + buf.write(bip47_child.chain_code) + buf.write(b'\00' * 13) # bytes reserved for future expansion + + return base58.encode_check(b"\x47" + buf.getvalue()) + + + +""" + TODO: Methods to support notification address, create notification transactions, + send to payment code, etc. +""" \ No newline at end of file diff --git a/tests/tests/__init__.py b/tests/tests/__init__.py index ed7043c..d69887a 100644 --- a/tests/tests/__init__.py +++ b/tests/tests/__init__.py @@ -5,6 +5,7 @@ from .test_bip32 import * from .test_psbt import * from .test_bip39 import * +from .test_bip47 import * from .test_slip39 import * from .test_descriptor import * from .test_liquid import * diff --git a/tests/tests/test_bip47.py b/tests/tests/test_bip47.py new file mode 100644 index 0000000..831fdf0 --- /dev/null +++ b/tests/tests/test_bip47.py @@ -0,0 +1,21 @@ +from embit import bip32, bip39, bip47 +from unittest import TestCase + + +class Bip47Test(TestCase): + """ + Test vectors from: https://gist.github.com/SamouraiDev/6aad669604c5930864bd + """ + def test_get_payment_code(self): + alice = "response seminar brave tip suit recall often sound stick owner lottery motion" + seed_bytes = bip39.mnemonic_to_seed(alice) + root = bip32.HDKey.from_seed(seed_bytes) + alice_payment_code = bip47.get_payment_code(root) + self.assertEqual(alice_payment_code, "PM8TJTLJbPRGxSbc8EJi42Wrr6QbNSaSSVJ5Y3E4pbCYiTHUskHg13935Ubb7q8tx9GVbh2UuRnBc3WSyJHhUrw8KhprKnn9eDznYGieTzFcwQRya4GA") + + bob = "reward upper indicate eight swift arch injury crystal super wrestle already dentist" + seed_bytes = bip39.mnemonic_to_seed(bob) + root = bip32.HDKey.from_seed(seed_bytes) + bob_payment_code = bip47.get_payment_code(root) + self.assertEqual(bob_payment_code, "PM8TJS2JxQ5ztXUpBBRnpTbcUXbUHy2T1abfrb3KkAAtMEGNbey4oumH7Hc578WgQJhPjBxteQ5GHHToTYHE3A1w6p7tU6KSoFmWBVbFGjKPisZDbP97") + From 4fbecc7ff7d7f8f2ce086b8bee435bf1ac5678c1 Mon Sep 17 00:00:00 2001 From: kdmukai Date: Fri, 5 Aug 2022 14:24:13 -0500 Subject: [PATCH 02/11] Adds convenience method to go straight from mnemonic to payment code --- src/embit/bip47.py | 17 +++++++++++++++-- tests/tests/test_bip47.py | 38 +++++++++++++++++++++++++++----------- 2 files changed, 42 insertions(+), 13 deletions(-) diff --git a/src/embit/bip47.py b/src/embit/bip47.py index d22960e..3e08c38 100644 --- a/src/embit/bip47.py +++ b/src/embit/bip47.py @@ -1,5 +1,8 @@ -from embit import base58, bip32, bip39 -from embit.bip32 import HDKey +from . import base58, bip32, bip39 +from .bip32 import HDKey +from .networks import NETWORKS +from .wordlists.bip39 import WORDLIST + from io import BytesIO """ @@ -24,6 +27,16 @@ def get_payment_code(root: HDKey): +def get_payment_code_from_mnemonic(mnemonic: str, password: str = "", wordlist=WORDLIST, version=NETWORKS["main"]["xprv"]): + """ + Convenience method + """ + seed_bytes = bip39.mnemonic_to_seed(mnemonic, password=password, wordlist=wordlist) + root = bip32.HDKey.from_seed(seed_bytes, version=version) + return get_payment_code(root) + + + """ TODO: Methods to support notification address, create notification transactions, send to payment code, etc. diff --git a/tests/tests/test_bip47.py b/tests/tests/test_bip47.py index 831fdf0..3fe5413 100644 --- a/tests/tests/test_bip47.py +++ b/tests/tests/test_bip47.py @@ -2,20 +2,36 @@ from unittest import TestCase +""" + Test vectors from: https://gist.github.com/SamouraiDev/6aad669604c5930864bd +""" +ALICE_MNEMONIC = "response seminar brave tip suit recall often sound stick owner lottery motion" +ALICE_PAYMENT_CODE = "PM8TJTLJbPRGxSbc8EJi42Wrr6QbNSaSSVJ5Y3E4pbCYiTHUskHg13935Ubb7q8tx9GVbh2UuRnBc3WSyJHhUrw8KhprKnn9eDznYGieTzFcwQRya4GA" + +BOB_MNEMONIC = "reward upper indicate eight swift arch injury crystal super wrestle already dentist" +BOB_PAYMENT_CODE = "PM8TJS2JxQ5ztXUpBBRnpTbcUXbUHy2T1abfrb3KkAAtMEGNbey4oumH7Hc578WgQJhPjBxteQ5GHHToTYHE3A1w6p7tU6KSoFmWBVbFGjKPisZDbP97" + + class Bip47Test(TestCase): - """ - Test vectors from: https://gist.github.com/SamouraiDev/6aad669604c5930864bd - """ + def test_get_payment_code(self): - alice = "response seminar brave tip suit recall often sound stick owner lottery motion" - seed_bytes = bip39.mnemonic_to_seed(alice) + # Provide `root` ourselves + seed_bytes = bip39.mnemonic_to_seed(ALICE_MNEMONIC) root = bip32.HDKey.from_seed(seed_bytes) - alice_payment_code = bip47.get_payment_code(root) - self.assertEqual(alice_payment_code, "PM8TJTLJbPRGxSbc8EJi42Wrr6QbNSaSSVJ5Y3E4pbCYiTHUskHg13935Ubb7q8tx9GVbh2UuRnBc3WSyJHhUrw8KhprKnn9eDznYGieTzFcwQRya4GA") + payment_code = bip47.get_payment_code(root) + self.assertEqual(payment_code, ALICE_PAYMENT_CODE) + + # Use the convenience method + payment_code = bip47.get_payment_code_from_mnemonic(ALICE_MNEMONIC) + self.assertEqual(payment_code, ALICE_PAYMENT_CODE) - bob = "reward upper indicate eight swift arch injury crystal super wrestle already dentist" - seed_bytes = bip39.mnemonic_to_seed(bob) + # Provide `root` ourselves + seed_bytes = bip39.mnemonic_to_seed(BOB_MNEMONIC) root = bip32.HDKey.from_seed(seed_bytes) - bob_payment_code = bip47.get_payment_code(root) - self.assertEqual(bob_payment_code, "PM8TJS2JxQ5ztXUpBBRnpTbcUXbUHy2T1abfrb3KkAAtMEGNbey4oumH7Hc578WgQJhPjBxteQ5GHHToTYHE3A1w6p7tU6KSoFmWBVbFGjKPisZDbP97") + payment_code = bip47.get_payment_code(root) + self.assertEqual(payment_code, BOB_PAYMENT_CODE) + + # Use the convenience method + payment_code = bip47.get_payment_code_from_mnemonic(BOB_MNEMONIC) + self.assertEqual(payment_code, BOB_PAYMENT_CODE) From 3ac4b046cba583d25be3bfb238b9f6968884893d Mon Sep 17 00:00:00 2001 From: kdmukai Date: Fri, 5 Aug 2022 19:54:47 -0500 Subject: [PATCH 03/11] Minor consistency edit --- src/embit/bip47.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/embit/bip47.py b/src/embit/bip47.py index 3e08c38..4381783 100644 --- a/src/embit/bip47.py +++ b/src/embit/bip47.py @@ -23,7 +23,7 @@ def get_payment_code(root: HDKey): buf.write(bip47_child.chain_code) buf.write(b'\00' * 13) # bytes reserved for future expansion - return base58.encode_check(b"\x47" + buf.getvalue()) + return base58.encode_check(b'\x47' + buf.getvalue()) From 95a6a8413a826cbc89ec066985f494fdb5a937ab Mon Sep 17 00:00:00 2001 From: kdmukai Date: Tue, 16 Aug 2022 14:40:48 -0500 Subject: [PATCH 04/11] Adds almost all of the remaining necessary methods to implement BIP-47 --- src/embit/bip47.py | 214 +++++++++++++++++++++++++++++++++++--- src/embit/script.py | 4 + tests/tests/test_bip47.py | 80 ++++++++++++-- 3 files changed, 273 insertions(+), 25 deletions(-) diff --git a/src/embit/bip47.py b/src/embit/bip47.py index 4381783..546f53b 100644 --- a/src/embit/bip47.py +++ b/src/embit/bip47.py @@ -1,20 +1,36 @@ -from . import base58, bip32, bip39 -from .bip32 import HDKey -from .networks import NETWORKS -from .wordlists.bip39 import WORDLIST +import hashlib +import hmac +import sys +from binascii import hexlify, unhexlify from io import BytesIO +from typing import Tuple + +from . import base58, ec, script +from .base import EmbitError +from .bip32 import HDKey +from .script import OPCODES +from .transaction import Transaction +if sys.implementation.name == "micropython": + import secp256k1 +else: + from .util import secp256k1 + """ BIP-47: https://github.com/bitcoin/bips/blob/master/bip-0047.mediawiki """ -def get_payment_code(root: HDKey): +class BIP47Exception(Exception): + pass + + +def get_payment_code(root: HDKey, coin: int = 0, account: int = 0) -> str: """ Generates the recipient's BIP-47 shareable payment code (version 1) for the input root private key. """ - bip47_child = root.derive("m/47'/0'/0'") + bip47_child = root.derive(f"m/47'/{coin}'/{account}'") buf = BytesIO() buf.write(b'\x01') # bip47 version @@ -26,18 +42,186 @@ def get_payment_code(root: HDKey): return base58.encode_check(b'\x47' + buf.getvalue()) +def get_derived_payment_code_node(payment_code: str, derivation_index: int) -> HDKey: + """Returns the nth derived child for the payment_code""" + raw_payment_code = base58.decode_check(payment_code) + # 0x47 0x01 0x00 (sign) (32-byte pubkey) (32-byte chain code) (13 0x00 bytes) + pubkey = ec.PublicKey.from_xonly(raw_payment_code[4:36]) + chain_code = raw_payment_code[36:68] + root = HDKey(key=pubkey, chain_code=chain_code) + payment_code_node = root.derive([derivation_index]) + return payment_code_node -def get_payment_code_from_mnemonic(mnemonic: str, password: str = "", wordlist=WORDLIST, version=NETWORKS["main"]["xprv"]): - """ - Convenience method - """ - seed_bytes = bip39.mnemonic_to_seed(mnemonic, password=password, wordlist=wordlist) - root = bip32.HDKey.from_seed(seed_bytes, version=version) - return get_payment_code(root) +def get_notification_address(payment_code: str, script_type: str = "p2pkh") -> str: + """Returns the BIP-47 notification address associated with the given payment_code""" + # Get the 0th public key derived from the payment_code + pubkey = get_derived_payment_code_node(payment_code, derivation_index=0).get_public_key() + + # TODO: Should we limit to just p2pkh? + if script_type == "p2pkh": + return script.p2pkh(pubkey).address() + elif script_type == "p2wpkh": + return script.p2wpkh(pubkey).address() + else: + raise EmbitError(f"Unsupported script_type: {script_type}") + + +def get_payment_address(payer_root: HDKey, recipient_payment_code: str, index: int, coin: int = 0, account: int = 0, script_type: str = "p2pkh") -> str: + """Called by the payer, generates the nth payment address between the payer and recipient""" + # Alice selects the 0th private key derived from her payment code ("a") + payer_key = payer_root.derive(f"m/47'/{coin}'/{account}'/0") + a = payer_key.secret + + # Alice selects the next unused public key derived from Bob's payment code, starting from zero ("B", where B = bG) + recipient_payment_code_node = get_derived_payment_code_node(recipient_payment_code, index) + B = recipient_payment_code_node.get_public_key() + + # Alice calculates a secret point (S = aB) + S = B._xonly() + secp256k1.ec_pubkey_tweak_mul(S, a) + + # Alice calculates a scalar shared secret using the x value of S (s = SHA256(Sx)) + shared_secret = hashlib.sha256(secp256k1.ec_pubkey_serialize(S)[1:33]).digest() + + # If the value of s is not in the secp256k1 group, Alice MUST increment the index used to derive Bob's public key and try again. + if not secp256k1.ec_seckey_verify(shared_secret): + # TODO: Is this a sufficient test??? + raise BIP47Exception(f"Shared secret was not valid for index {index}. Try again with the next index value.") + + # Alice uses the scalar shared secret to calculate the ephemeral public key used to generate the P2PKH address for this transaction (B' = B + sG) + shared_pubkey = secp256k1.ec_pubkey_create(shared_secret) + pub = secp256k1.ec_pubkey_combine(B._point, shared_pubkey) + shared_node = HDKey(key=ec.PublicKey.parse(secp256k1.ec_pubkey_serialize(pub)), chain_code=recipient_payment_code_node.chain_code) + + if script_type == "p2pkh": + return script.p2pkh(shared_node).address() + elif script_type == "p2wpkh": + return script.p2wpkh(shared_node).address() + else: + raise EmbitError(f"Unsupported script_type: {script_type}") + + +def get_receive_address(recipient_root: HDKey, payer_payment_code: str, index: int, coin: int = 0, account: int = 0, script_type: str = "p2pkh") -> Tuple[str, ec.PrivateKey]: + """Called by the recipient, generates the nth receive address between the payer and recipient. + + Returns the payment address and its associated private key.""" + + # Using the 0th public key derived from Alice's payment code... + payer_payment_code_node = get_derived_payment_code_node(payer_payment_code, 0) + B = payer_payment_code_node.get_public_key() + + # ...Bob calculates the nth shared secret with Alice + recipient_key = recipient_root.derive(f"m/47'/{coin}'/{account}'/{index}") + a = recipient_key.secret + + # Bob calculates a secret point (S = aB) + S = B._xonly() + secp256k1.ec_pubkey_tweak_mul(S, a) + + # Bob calculates a scalar shared secret using the x value of S (s = SHA256(Sx)) + shared_secret = hashlib.sha256(secp256k1.ec_pubkey_serialize(S)[1:33]).digest() + + # If the value of s is not in the secp256k1 group, increment the index and try again. + if not secp256k1.ec_seckey_verify(shared_secret): + # TODO: Is this a sufficient test??? + raise BIP47Exception(f"Shared secret was not valid for index {index}. Try again with the next index value.") + + # Bob uses the scalar shared secret to calculate the ephemeral public key used to generate the P2PKH address for this transaction (B' = B + sG) + shared_pubkey = secp256k1.ec_pubkey_create(shared_secret) + pub = secp256k1.ec_pubkey_combine(recipient_key.get_public_key()._point, shared_pubkey) + shared_node = HDKey(key=ec.PublicKey.parse(secp256k1.ec_pubkey_serialize(pub)), chain_code=payer_payment_code_node.chain_code) + + if script_type == "p2pkh": + payment_address = script.p2pkh(shared_node).address() + elif script_type == "p2wpkh": + payment_address = script.p2wpkh(shared_node).address() + else: + raise EmbitError(f"Unsupported script_type: {script_type}") + + # Bob calculates the private key for each ephemeral address as: b' = b + s + prv_key = secp256k1.ec_privkey_add(recipient_key.secret, shared_secret) + spending_key = ec.PrivateKey(secret=prv_key) + + return (payment_address, spending_key) + + +def get_payment_code_from_notification_tx(tx: Transaction, recipient_root: HDKey, coin: int = 0, account: int = 0) -> str: + """If the tx is a BIP-47 notification tx for the recipient, + return the new payer's embedded payment_code, else None""" + # Notification txs have one output sent to the recipient's notification addr + # and another containing the payer's payment code in an OP_RETURN payload. + if len(tx.vout) < 2: + return False + + recipient_payment_code = get_payment_code(recipient_root, coin, account) + + matches_notification_addr = False + payload = None + for vout in tx.vout: + # Notification txs include a dust payment to the recipient's notification address + if vout.script_pubkey.script_type() is not None and vout.script_pubkey.address() == get_notification_address(recipient_payment_code): + matches_notification_addr = True + continue + + # Payer's payment code will be in an OP_RETURN w/exactly 80 bytes of data + data = vout.script_pubkey.data + if data is not None and len(data) == 83 and data[0] == OPCODES.OP_RETURN and data[1] == OPCODES.OP_PUSHDATA1 and data[2] == 80: + # data = OP_RETURN OP_PUSHDATA1 (len of data) + payload = data[3:] + + if payload[0] != 1: + # Only version 1 currently supported + payload = None + continue + + if not matches_notification_addr or payload is None: + return None + + # Bob selects the designated pubkey ("A") + # (the first tx input that exposes a pubkey in scriptsig or witness) + for vin in tx.vin: + if not vin.is_segwit: + # data = (1byte len of sig) (1byte len of pubkey) + sig_len = vin.script_sig.data[0] + A = ec.PublicKey.from_string(hexlify(vin.script_sig.data[sig_len + 2:])) + break + + else: + # Witness should have [sig, pubkey] + A = ec.PublicKey.from_string(hexlify(vin.witness.items[1])) + break + + if not A or len(A.serialize()) != 33: + return None + + # Bob selects the private key associated with his notification address (0th child) + recipient_notification_node = recipient_root.derive(f"m/47'/{coin}'/{account}'/0") + b = recipient_notification_node.secret + + # Bob calculates a secret point (S = bA) + S = A._xonly() + secp256k1.ec_pubkey_tweak_mul(S, b) + + # Bob calculates the blinding factor (s = HMAC-SHA512(x, o)) + # "x" is the x value of the secret point + # "o" is the outpoint being spent by the designated input + x = secp256k1.ec_pubkey_serialize(S)[1:33] + + # TODO: Is there a better way to get the outpoint? + o = vin.to_string()[:72] + s = unhexlify(hmac.new(unhexlify(o), x, hashlib.sha512).hexdigest()) + # Bob interprets the 80 byte payload as a payment code, except: + # Replace the x (pubkey) value with x' (x' = x XOR (first 32 bytes of s)) + # Replace the chain code with c' (c' = c XOR (last 32 bytes of s)) + # payment code: 0x01 0x00 (sign) (32-byte pubkey) (32-byte chain code) (13 0x00 bytes) + x_prime = b''.join([(a ^ b).to_bytes(1, byteorder='little') for (a,b) in zip(payload[3:35], s[:32])]) + c_prime = b''.join([(a ^ b).to_bytes(1, byteorder='little') for (a,b) in zip(payload[35:67], s[-32:])]) + raw_payment_code = payload[0:3] + x_prime + c_prime + payload[-13:] + return base58.encode_check(b'\x47' + raw_payment_code) + """ - TODO: Methods to support notification address, create notification transactions, - send to payment code, etc. + TODO: Method to create notification transaction, etc. """ \ No newline at end of file diff --git a/src/embit/script.py b/src/embit/script.py index 0524091..fb3a2f1 100644 --- a/src/embit/script.py +++ b/src/embit/script.py @@ -12,6 +12,10 @@ SIGHASH_ALL = 1 +class OPCODES: + OP_RETURN = 106 + OP_PUSHDATA1 = 76 + class Script(EmbitBase): def __init__(self, data=b""): diff --git a/tests/tests/test_bip47.py b/tests/tests/test_bip47.py index 3fe5413..b548e5f 100644 --- a/tests/tests/test_bip47.py +++ b/tests/tests/test_bip47.py @@ -1,37 +1,97 @@ -from embit import bip32, bip39, bip47 from unittest import TestCase +from embit import bip32, bip39, bip47, script +from embit.transaction import Transaction + """ Test vectors from: https://gist.github.com/SamouraiDev/6aad669604c5930864bd """ ALICE_MNEMONIC = "response seminar brave tip suit recall often sound stick owner lottery motion" ALICE_PAYMENT_CODE = "PM8TJTLJbPRGxSbc8EJi42Wrr6QbNSaSSVJ5Y3E4pbCYiTHUskHg13935Ubb7q8tx9GVbh2UuRnBc3WSyJHhUrw8KhprKnn9eDznYGieTzFcwQRya4GA" +ALICE_NOTIFICATION_ADDR = "1JDdmqFLhpzcUwPeinhJbUPw4Co3aWLyzW" +ALICE_PAYS_BOB_ADDRS = [ + "141fi7TY3h936vRUKh1qfUZr8rSBuYbVBK", + "12u3Uued2fuko2nY4SoSFGCoGLCBUGPkk6", + "1FsBVhT5dQutGwaPePTYMe5qvYqqjxyftc", + "1CZAmrbKL6fJ7wUxb99aETwXhcGeG3CpeA", + "1KQvRShk6NqPfpr4Ehd53XUhpemBXtJPTL", + "1KsLV2F47JAe6f8RtwzfqhjVa8mZEnTM7t", + "1DdK9TknVwvBrJe7urqFmaxEtGF2TMWxzD", + "16DpovNuhQJH7JUSZQFLBQgQYS4QB9Wy8e", + "17qK2RPGZMDcci2BLQ6Ry2PDGJErrNojT5", + "1GxfdfP286uE24qLZ9YRP3EWk2urqXgC4s", +] BOB_MNEMONIC = "reward upper indicate eight swift arch injury crystal super wrestle already dentist" BOB_PAYMENT_CODE = "PM8TJS2JxQ5ztXUpBBRnpTbcUXbUHy2T1abfrb3KkAAtMEGNbey4oumH7Hc578WgQJhPjBxteQ5GHHToTYHE3A1w6p7tU6KSoFmWBVbFGjKPisZDbP97" +BOB_NOTIFICATION_ADDR = "1ChvUUvht2hUQufHBXF8NgLhW8SwE2ecGV" + +ALICE_NOTIFICATION_TX_FOR_BOB = "010000000186f411ab1c8e70ae8a0795ab7a6757aea6e4d5ae1826fc7b8f00c597d500609c010000006b483045022100ac8c6dbc482c79e86c18928a8b364923c774bfdbd852059f6b3778f2319b59a7022029d7cc5724e2f41ab1fcfc0ba5a0d4f57ca76f72f19530ba97c860c70a6bf0a801210272d83d8a1fa323feab1c085157a0791b46eba34afb8bfbfaeb3a3fcc3f2c9ad8ffffffff0210270000000000001976a9148066a8e7ee82e5c5b9b7dc1765038340dc5420a988ac1027000000000000536a4c50010002063e4eb95e62791b06c50e1a3a942e1ecaaa9afbbeb324d16ae6821e091611fa96c0cf048f607fe51a0327f5e2528979311c78cb2de0d682c61e1180fc3d543b0000000000000000000000000000000000" class Bip47Test(TestCase): def test_get_payment_code(self): - # Provide `root` ourselves + """Alice & Bob's payment codes should match the test vectors in BIP-47""" + # Generate Alice's payment code seed_bytes = bip39.mnemonic_to_seed(ALICE_MNEMONIC) root = bip32.HDKey.from_seed(seed_bytes) payment_code = bip47.get_payment_code(root) self.assertEqual(payment_code, ALICE_PAYMENT_CODE) - # Use the convenience method - payment_code = bip47.get_payment_code_from_mnemonic(ALICE_MNEMONIC) - self.assertEqual(payment_code, ALICE_PAYMENT_CODE) - - # Provide `root` ourselves + # Generate Bob's payment code seed_bytes = bip39.mnemonic_to_seed(BOB_MNEMONIC) root = bip32.HDKey.from_seed(seed_bytes) payment_code = bip47.get_payment_code(root) self.assertEqual(payment_code, BOB_PAYMENT_CODE) + - # Use the convenience method - payment_code = bip47.get_payment_code_from_mnemonic(BOB_MNEMONIC) - self.assertEqual(payment_code, BOB_PAYMENT_CODE) + def test_get_notification_address(self): + """Alice & Bob's derived notification addresses should match the test vectors in BIP-47""" + self.assertEqual(bip47.get_notification_address(ALICE_PAYMENT_CODE), ALICE_NOTIFICATION_ADDR) + self.assertEqual(bip47.get_notification_address(BOB_PAYMENT_CODE), BOB_NOTIFICATION_ADDR) + + + def test_get_payment_address(self): + """Alice's payment addresses to Bob's payment code should match the test vector addresses in BIP-47""" + seed_bytes = bip39.mnemonic_to_seed(ALICE_MNEMONIC) + payer_root = bip32.HDKey.from_seed(seed_bytes) + + for i, addr in enumerate(ALICE_PAYS_BOB_ADDRS): + payment_addr = bip47.get_payment_address( + payer_root=payer_root, + recipient_payment_code=BOB_PAYMENT_CODE, + index=i) + self.assertEqual(addr, payment_addr) + + + def test_get_receive_address(self): + """Bob (the recipient) should be able to use Alice's payment code to generate the same addresses that Alice (the payer) generated""" + seed_bytes = bip39.mnemonic_to_seed(BOB_MNEMONIC) + recipient_root = bip32.HDKey.from_seed(seed_bytes) + + for i, addr in enumerate(ALICE_PAYS_BOB_ADDRS): + payment_addr, spending_key = bip47.get_receive_address( + recipient_root=recipient_root, + payer_payment_code=ALICE_PAYMENT_CODE, + index=i) + self.assertEqual(addr, payment_addr) + self.assertEqual(addr, script.p2pkh(spending_key.get_public_key()).address()) + + # TODO: Verify that the spending_keys can successfully sign a tx for their associated payment_addr. + + + def test_get_payment_code_from_notification_tx(self): + """Bob (the recipient) should be able to decode Alice's payment code from her notification tx""" + tx = Transaction.from_string(ALICE_NOTIFICATION_TX_FOR_BOB) + seed_bytes = bip39.mnemonic_to_seed(BOB_MNEMONIC) + recipient_root = bip32.HDKey.from_seed(seed_bytes) + + payer_payment_code = bip47.get_payment_code_from_notification_tx(tx, recipient_root) + self.assertEqual(payer_payment_code, ALICE_PAYMENT_CODE) + # Any other root should fail + seed_bytes = bip39.mnemonic_to_seed("abandon " * 11 + "about") + other_root = bip32.HDKey.from_seed(seed_bytes) + self.assertEqual(bip47.get_payment_code_from_notification_tx(tx, other_root), None) From 5cef20a8b2d3f3e69d73f3f71bc40cabecb61664 Mon Sep 17 00:00:00 2001 From: kdmukai Date: Wed, 17 Aug 2022 13:38:13 -0500 Subject: [PATCH 05/11] Adds payer-side generation of blinded payment code OP_RETURN payload --- src/embit/bip47.py | 40 ++++++++++++++++++++++++++++++++++++--- tests/tests/test_bip47.py | 18 +++++++++++++++++- 2 files changed, 54 insertions(+), 4 deletions(-) diff --git a/src/embit/bip47.py b/src/embit/bip47.py index 546f53b..a82d827 100644 --- a/src/embit/bip47.py +++ b/src/embit/bip47.py @@ -146,6 +146,42 @@ def get_receive_address(recipient_root: HDKey, payer_payment_code: str, index: i return (payment_address, spending_key) +def get_blinded_payment_code(payer_payment_code: str, input_utxo_private_key: ec.PrivateKey, input_utxo_outpoint: str, recipient_payment_code: str): + """Called by the payer, returns the blinded payload for the payer's notification tx + that is sent to the recipient while spending the input_utxo. The blinded payload + should be inserted as OP_RETURN data.""" + # TODO: method signature was made to easily match the BIP-47 test vector data, but + # isn't necessarily what might be ideal for real-world usage. + + # Alice selects the private key ("a") corresponding to the designated pubkey + a = input_utxo_private_key.secret + + # Alice selects the public key associated with Bob's notification address (B, where B = bG) + B = get_derived_payment_code_node(recipient_payment_code, 0).get_public_key() + + # Alice calculates a secret point (S = aB) + S = B._xonly() + secp256k1.ec_pubkey_tweak_mul(S, a) + + # Alice calculates a 64 byte blinding factor (s = HMAC-SHA512(x, o)) + # "x" is the x value of the secret point + # "o" is the outpoint being spent by the designated input + x = secp256k1.ec_pubkey_serialize(S)[1:33] + o = input_utxo_outpoint + s = unhexlify(hmac.new(unhexlify(o), x, hashlib.sha512).hexdigest()) + + # Alice serializes her payment code in binary form + raw_payment_code = base58.decode_check(payer_payment_code)[1:] # omit the 0x47 leading byte + + # Alice renders her payment code (P) unreadable to anyone except Bob + # Replace the x (pubkey) value with x' (x' = x XOR (first 32 bytes of s)) + # Replace the chain code with c' (c' = c XOR (last 32 bytes of s)) + # payment code: 0x01 0x00 (sign) (32-byte pubkey) (32-byte chain code) (13 0x00 bytes) + x_prime = b''.join([(a ^ b).to_bytes(1, byteorder='little') for (a,b) in zip(raw_payment_code[3:35], s[:32])]) + c_prime = b''.join([(a ^ b).to_bytes(1, byteorder='little') for (a,b) in zip(raw_payment_code[35:67], s[-32:])]) + return hexlify(raw_payment_code[0:3] + x_prime + c_prime + raw_payment_code[-13:]).decode() + + def get_payment_code_from_notification_tx(tx: Transaction, recipient_root: HDKey, coin: int = 0, account: int = 0) -> str: """If the tx is a BIP-47 notification tx for the recipient, return the new payer's embedded payment_code, else None""" @@ -207,9 +243,7 @@ def get_payment_code_from_notification_tx(tx: Transaction, recipient_root: HDKey # "x" is the x value of the secret point # "o" is the outpoint being spent by the designated input x = secp256k1.ec_pubkey_serialize(S)[1:33] - - # TODO: Is there a better way to get the outpoint? - o = vin.to_string()[:72] + o = vin.to_string()[:72] # TODO: Is there a better way to get the outpoint? s = unhexlify(hmac.new(unhexlify(o), x, hashlib.sha512).hexdigest()) # Bob interprets the 80 byte payload as a payment code, except: diff --git a/tests/tests/test_bip47.py b/tests/tests/test_bip47.py index b548e5f..1d52467 100644 --- a/tests/tests/test_bip47.py +++ b/tests/tests/test_bip47.py @@ -1,7 +1,8 @@ from unittest import TestCase -from embit import bip32, bip39, bip47, script +from embit import bip32, bip39, bip47, script, ec from embit.transaction import Transaction +from binascii import hexlify, unhexlify """ @@ -10,6 +11,9 @@ ALICE_MNEMONIC = "response seminar brave tip suit recall often sound stick owner lottery motion" ALICE_PAYMENT_CODE = "PM8TJTLJbPRGxSbc8EJi42Wrr6QbNSaSSVJ5Y3E4pbCYiTHUskHg13935Ubb7q8tx9GVbh2UuRnBc3WSyJHhUrw8KhprKnn9eDznYGieTzFcwQRya4GA" ALICE_NOTIFICATION_ADDR = "1JDdmqFLhpzcUwPeinhJbUPw4Co3aWLyzW" +ALICE_NOTIFICATION_INPUT_PRIVATE_KEY = "Kx983SRhAZpAhj7Aac1wUXMJ6XZeyJKqCxJJ49dxEbYCT4a1ozRD" +ALICE_NOTIFICATION_INPUT_OUTPOINT = "86f411ab1c8e70ae8a0795ab7a6757aea6e4d5ae1826fc7b8f00c597d500609c01000000" +ALICE_NOTIFICATION_BLINDED_PAYLOAD = "010002063e4eb95e62791b06c50e1a3a942e1ecaaa9afbbeb324d16ae6821e091611fa96c0cf048f607fe51a0327f5e2528979311c78cb2de0d682c61e1180fc3d543b00000000000000000000000000" ALICE_PAYS_BOB_ADDRS = [ "141fi7TY3h936vRUKh1qfUZr8rSBuYbVBK", "12u3Uued2fuko2nY4SoSFGCoGLCBUGPkk6", @@ -82,6 +86,18 @@ def test_get_receive_address(self): # TODO: Verify that the spending_keys can successfully sign a tx for their associated payment_addr. + def test_get_blinded_payment_code(self): + input_utxo_private_key = ec.PrivateKey.from_string(ALICE_NOTIFICATION_INPUT_PRIVATE_KEY) + blinded_payload = bip47.get_blinded_payment_code( + payer_payment_code=ALICE_PAYMENT_CODE, + input_utxo_private_key=input_utxo_private_key, + input_utxo_outpoint=ALICE_NOTIFICATION_INPUT_OUTPOINT, + recipient_payment_code=BOB_PAYMENT_CODE + ) + + self.assertEqual(blinded_payload, ALICE_NOTIFICATION_BLINDED_PAYLOAD) + + def test_get_payment_code_from_notification_tx(self): """Bob (the recipient) should be able to decode Alice's payment code from her notification tx""" tx = Transaction.from_string(ALICE_NOTIFICATION_TX_FOR_BOB) From d4c2ca36a41074de3f3c5ae22e02a3f5439692d3 Mon Sep 17 00:00:00 2001 From: kdmukai Date: Thu, 18 Aug 2022 09:01:41 -0500 Subject: [PATCH 06/11] Bugfix on get_notification_address, updates for test/regtest support Refactored blind/unblind to eliminate redundant code. --- src/embit/bip47.py | 89 ++++++++++++++++++++++------------------------ 1 file changed, 42 insertions(+), 47 deletions(-) diff --git a/src/embit/bip47.py b/src/embit/bip47.py index a82d827..db27a36 100644 --- a/src/embit/bip47.py +++ b/src/embit/bip47.py @@ -9,6 +9,7 @@ from . import base58, ec, script from .base import EmbitError from .bip32 import HDKey +from .networks import NETWORKS from .script import OPCODES from .transaction import Transaction if sys.implementation.name == "micropython": @@ -42,27 +43,28 @@ def get_payment_code(root: HDKey, coin: int = 0, account: int = 0) -> str: return base58.encode_check(b'\x47' + buf.getvalue()) -def get_derived_payment_code_node(payment_code: str, derivation_index: int) -> HDKey: +def get_derived_payment_code_node(payment_code: str, derivation_index: int, version: str = NETWORKS["main"]["xpub"]) -> HDKey: """Returns the nth derived child for the payment_code""" raw_payment_code = base58.decode_check(payment_code) - # 0x47 0x01 0x00 (sign) (32-byte pubkey) (32-byte chain code) (13 0x00 bytes) - pubkey = ec.PublicKey.from_xonly(raw_payment_code[4:36]) + + # 81-byte payment code format: + # 0x47 0x01 0x00 (sign) (32-byte pubkey) (32-byte chain code) (13 0x00 bytes) + pubkey = ec.PublicKey.from_string(hexlify(raw_payment_code[3:36])) chain_code = raw_payment_code[36:68] - root = HDKey(key=pubkey, chain_code=chain_code) - payment_code_node = root.derive([derivation_index]) - return payment_code_node + root = HDKey(key=pubkey, chain_code=chain_code, version=version) + return root.derive([derivation_index]) -def get_notification_address(payment_code: str, script_type: str = "p2pkh") -> str: +def get_notification_address(payment_code: str, script_type: str = "p2pkh", network: str = NETWORKS["main"]) -> str: """Returns the BIP-47 notification address associated with the given payment_code""" # Get the 0th public key derived from the payment_code - pubkey = get_derived_payment_code_node(payment_code, derivation_index=0).get_public_key() + pubkey = get_derived_payment_code_node(payment_code, derivation_index=0, version=network["xpub"]).get_public_key() # TODO: Should we limit to just p2pkh? if script_type == "p2pkh": - return script.p2pkh(pubkey).address() + return script.p2pkh(pubkey).address(network) elif script_type == "p2wpkh": - return script.p2wpkh(pubkey).address() + return script.p2wpkh(pubkey).address(network) else: raise EmbitError(f"Unsupported script_type: {script_type}") @@ -146,6 +148,27 @@ def get_receive_address(recipient_root: HDKey, payer_payment_code: str, index: i return (payment_address, spending_key) +def blinding_function(private_key: str, secret_point: HDKey, utxo_outpoint: str, payload: str): + """Reversible blind/unblind function: blinds plaintext payloads and unblinds blinded payloads""" + S = secret_point._xonly() + secp256k1.ec_pubkey_tweak_mul(S, private_key) + + # Calculate a 64 byte blinding factor (s = HMAC-SHA512(x, o)) + # "x" is the x value of the secret point + # "o" is the outpoint being spent by the designated input + x = secp256k1.ec_pubkey_serialize(S)[1:33] + o = utxo_outpoint + s = unhexlify(hmac.new(unhexlify(o), x, hashlib.sha512).hexdigest()) + + # Replace the x (pubkey) value with x' (x' = x XOR (first 32 bytes of s)) + # Replace the chain code with c' (c' = c XOR (last 32 bytes of s)) + # payment code: 0x01 0x00 (sign) (32-byte pubkey) (32-byte chain code) (13 0x00 bytes) + x_prime = b''.join([(a ^ b).to_bytes(1, byteorder='little') for (a,b) in zip(payload[3:35], s[:32])]) + c_prime = b''.join([(a ^ b).to_bytes(1, byteorder='little') for (a,b) in zip(payload[35:67], s[-32:])]) + return payload[0:3] + x_prime + c_prime + payload[-13:] + + + def get_blinded_payment_code(payer_payment_code: str, input_utxo_private_key: ec.PrivateKey, input_utxo_outpoint: str, recipient_payment_code: str): """Called by the payer, returns the blinded payload for the payer's notification tx that is sent to the recipient while spending the input_utxo. The blinded payload @@ -159,27 +182,13 @@ def get_blinded_payment_code(payer_payment_code: str, input_utxo_private_key: ec # Alice selects the public key associated with Bob's notification address (B, where B = bG) B = get_derived_payment_code_node(recipient_payment_code, 0).get_public_key() - # Alice calculates a secret point (S = aB) - S = B._xonly() - secp256k1.ec_pubkey_tweak_mul(S, a) - - # Alice calculates a 64 byte blinding factor (s = HMAC-SHA512(x, o)) - # "x" is the x value of the secret point - # "o" is the outpoint being spent by the designated input - x = secp256k1.ec_pubkey_serialize(S)[1:33] - o = input_utxo_outpoint - s = unhexlify(hmac.new(unhexlify(o), x, hashlib.sha512).hexdigest()) - # Alice serializes her payment code in binary form - raw_payment_code = base58.decode_check(payer_payment_code)[1:] # omit the 0x47 leading byte + payment_code = base58.decode_check(payer_payment_code)[1:] # omit the 0x47 leading byte + + # Blind the payment code + raw_blinded_payload = blinding_function(a, B, utxo_outpoint=input_utxo_outpoint, payload=payment_code) + return hexlify(raw_blinded_payload).decode() - # Alice renders her payment code (P) unreadable to anyone except Bob - # Replace the x (pubkey) value with x' (x' = x XOR (first 32 bytes of s)) - # Replace the chain code with c' (c' = c XOR (last 32 bytes of s)) - # payment code: 0x01 0x00 (sign) (32-byte pubkey) (32-byte chain code) (13 0x00 bytes) - x_prime = b''.join([(a ^ b).to_bytes(1, byteorder='little') for (a,b) in zip(raw_payment_code[3:35], s[:32])]) - c_prime = b''.join([(a ^ b).to_bytes(1, byteorder='little') for (a,b) in zip(raw_payment_code[35:67], s[-32:])]) - return hexlify(raw_payment_code[0:3] + x_prime + c_prime + raw_payment_code[-13:]).decode() def get_payment_code_from_notification_tx(tx: Transaction, recipient_root: HDKey, coin: int = 0, account: int = 0) -> str: @@ -235,26 +244,12 @@ def get_payment_code_from_notification_tx(tx: Transaction, recipient_root: HDKey recipient_notification_node = recipient_root.derive(f"m/47'/{coin}'/{account}'/0") b = recipient_notification_node.secret - # Bob calculates a secret point (S = bA) - S = A._xonly() - secp256k1.ec_pubkey_tweak_mul(S, b) + utxo_outpoint = vin.to_string()[:72] # TODO: Is there a better way to get the outpoint? - # Bob calculates the blinding factor (s = HMAC-SHA512(x, o)) - # "x" is the x value of the secret point - # "o" is the outpoint being spent by the designated input - x = secp256k1.ec_pubkey_serialize(S)[1:33] - o = vin.to_string()[:72] # TODO: Is there a better way to get the outpoint? - s = unhexlify(hmac.new(unhexlify(o), x, hashlib.sha512).hexdigest()) + # Unblind the payload using the reversible `blinding_function`. + raw_unblinded_payload = blinding_function(b, A, utxo_outpoint=utxo_outpoint, payload=payload) + return base58.encode_check(b'\x47' + raw_unblinded_payload) - # Bob interprets the 80 byte payload as a payment code, except: - # Replace the x (pubkey) value with x' (x' = x XOR (first 32 bytes of s)) - # Replace the chain code with c' (c' = c XOR (last 32 bytes of s)) - # payment code: 0x01 0x00 (sign) (32-byte pubkey) (32-byte chain code) (13 0x00 bytes) - x_prime = b''.join([(a ^ b).to_bytes(1, byteorder='little') for (a,b) in zip(payload[3:35], s[:32])]) - c_prime = b''.join([(a ^ b).to_bytes(1, byteorder='little') for (a,b) in zip(payload[35:67], s[-32:])]) - raw_payment_code = payload[0:3] + x_prime + c_prime + payload[-13:] - return base58.encode_check(b'\x47' + raw_payment_code) - """ TODO: Method to create notification transaction, etc. From 18a6ae0926641f6053e2948349dd98032d3bf01a Mon Sep 17 00:00:00 2001 From: kdmukai Date: Fri, 19 Aug 2022 07:33:12 -0500 Subject: [PATCH 07/11] More support for testnet/regtest --- src/embit/bip47.py | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/src/embit/bip47.py b/src/embit/bip47.py index db27a36..d210be8 100644 --- a/src/embit/bip47.py +++ b/src/embit/bip47.py @@ -69,14 +69,14 @@ def get_notification_address(payment_code: str, script_type: str = "p2pkh", netw raise EmbitError(f"Unsupported script_type: {script_type}") -def get_payment_address(payer_root: HDKey, recipient_payment_code: str, index: int, coin: int = 0, account: int = 0, script_type: str = "p2pkh") -> str: +def get_payment_address(payer_root: HDKey, recipient_payment_code: str, index: int, coin: int = 0, account: int = 0, network=NETWORKS["main"], script_type: str = "p2wpkh") -> str: """Called by the payer, generates the nth payment address between the payer and recipient""" # Alice selects the 0th private key derived from her payment code ("a") payer_key = payer_root.derive(f"m/47'/{coin}'/{account}'/0") a = payer_key.secret # Alice selects the next unused public key derived from Bob's payment code, starting from zero ("B", where B = bG) - recipient_payment_code_node = get_derived_payment_code_node(recipient_payment_code, index) + recipient_payment_code_node = get_derived_payment_code_node(recipient_payment_code, derivation_index=index, version=network["xpub"]) B = recipient_payment_code_node.get_public_key() # Alice calculates a secret point (S = aB) @@ -97,20 +97,22 @@ def get_payment_address(payer_root: HDKey, recipient_payment_code: str, index: i shared_node = HDKey(key=ec.PublicKey.parse(secp256k1.ec_pubkey_serialize(pub)), chain_code=recipient_payment_code_node.chain_code) if script_type == "p2pkh": - return script.p2pkh(shared_node).address() + return script.p2pkh(shared_node).address(network=network) elif script_type == "p2wpkh": - return script.p2wpkh(shared_node).address() + return script.p2wpkh(shared_node).address(network=network) + elif script_type == "p2sh-p2wpkh": + return script.p2sh(script.p2wpkh(shared_node)).address(network=network) else: raise EmbitError(f"Unsupported script_type: {script_type}") -def get_receive_address(recipient_root: HDKey, payer_payment_code: str, index: int, coin: int = 0, account: int = 0, script_type: str = "p2pkh") -> Tuple[str, ec.PrivateKey]: +def get_receive_address(recipient_root: HDKey, payer_payment_code: str, index: int, coin: int = 0, account: int = 0, network=NETWORKS["main"], script_type: str = "p2wpkh") -> Tuple[str, ec.PrivateKey]: """Called by the recipient, generates the nth receive address between the payer and recipient. Returns the payment address and its associated private key.""" # Using the 0th public key derived from Alice's payment code... - payer_payment_code_node = get_derived_payment_code_node(payer_payment_code, 0) + payer_payment_code_node = get_derived_payment_code_node(payer_payment_code, derivation_index=0, version=network["xpub"]) B = payer_payment_code_node.get_public_key() # ...Bob calculates the nth shared secret with Alice @@ -135,9 +137,11 @@ def get_receive_address(recipient_root: HDKey, payer_payment_code: str, index: i shared_node = HDKey(key=ec.PublicKey.parse(secp256k1.ec_pubkey_serialize(pub)), chain_code=payer_payment_code_node.chain_code) if script_type == "p2pkh": - payment_address = script.p2pkh(shared_node).address() + receive_address = script.p2pkh(shared_node).address(network=network) elif script_type == "p2wpkh": - payment_address = script.p2wpkh(shared_node).address() + receive_address = script.p2wpkh(shared_node).address(network=network) + elif script_type == "p2sh-p2wpkh": + receive_address = script.p2sh(script.p2wpkh(shared_node)).address(network=network) else: raise EmbitError(f"Unsupported script_type: {script_type}") @@ -145,7 +149,7 @@ def get_receive_address(recipient_root: HDKey, payer_payment_code: str, index: i prv_key = secp256k1.ec_privkey_add(recipient_key.secret, shared_secret) spending_key = ec.PrivateKey(secret=prv_key) - return (payment_address, spending_key) + return (receive_address, spending_key) def blinding_function(private_key: str, secret_point: HDKey, utxo_outpoint: str, payload: str): @@ -168,8 +172,7 @@ def blinding_function(private_key: str, secret_point: HDKey, utxo_outpoint: str, return payload[0:3] + x_prime + c_prime + payload[-13:] - -def get_blinded_payment_code(payer_payment_code: str, input_utxo_private_key: ec.PrivateKey, input_utxo_outpoint: str, recipient_payment_code: str): +def get_blinded_payment_code(payer_payment_code: str, input_utxo_private_key: ec.PrivateKey, input_utxo_outpoint: str, recipient_payment_code: str, network=NETWORKS["main"]): """Called by the payer, returns the blinded payload for the payer's notification tx that is sent to the recipient while spending the input_utxo. The blinded payload should be inserted as OP_RETURN data.""" @@ -180,7 +183,7 @@ def get_blinded_payment_code(payer_payment_code: str, input_utxo_private_key: ec a = input_utxo_private_key.secret # Alice selects the public key associated with Bob's notification address (B, where B = bG) - B = get_derived_payment_code_node(recipient_payment_code, 0).get_public_key() + B = get_derived_payment_code_node(recipient_payment_code, derivation_index=0, version=network["xpub"]).get_public_key() # Alice serializes her payment code in binary form payment_code = base58.decode_check(payer_payment_code)[1:] # omit the 0x47 leading byte @@ -190,8 +193,7 @@ def get_blinded_payment_code(payer_payment_code: str, input_utxo_private_key: ec return hexlify(raw_blinded_payload).decode() - -def get_payment_code_from_notification_tx(tx: Transaction, recipient_root: HDKey, coin: int = 0, account: int = 0) -> str: +def get_payment_code_from_notification_tx(tx: Transaction, recipient_root: HDKey, coin: int = 0, account: int = 0, network=NETWORKS["main"]) -> str: """If the tx is a BIP-47 notification tx for the recipient, return the new payer's embedded payment_code, else None""" # Notification txs have one output sent to the recipient's notification addr @@ -205,7 +207,7 @@ def get_payment_code_from_notification_tx(tx: Transaction, recipient_root: HDKey payload = None for vout in tx.vout: # Notification txs include a dust payment to the recipient's notification address - if vout.script_pubkey.script_type() is not None and vout.script_pubkey.address() == get_notification_address(recipient_payment_code): + if vout.script_pubkey.script_type() is not None and vout.script_pubkey.address() == get_notification_address(recipient_payment_code, network=network): matches_notification_addr = True continue From 9713d14863f080c4d1fc98486ad99d16e3876fae Mon Sep 17 00:00:00 2001 From: kdmukai Date: Tue, 30 Aug 2022 13:48:44 -0500 Subject: [PATCH 08/11] Fixes from Stepan's comments; fixed & expanded test data/test cases --- src/embit/bip47.py | 28 +++--- tests/tests/test_bip47.py | 197 ++++++++++++++++++++++++++++++++------ 2 files changed, 181 insertions(+), 44 deletions(-) diff --git a/src/embit/bip47.py b/src/embit/bip47.py index d210be8..7c03e11 100644 --- a/src/embit/bip47.py +++ b/src/embit/bip47.py @@ -31,7 +31,7 @@ def get_payment_code(root: HDKey, coin: int = 0, account: int = 0) -> str: Generates the recipient's BIP-47 shareable payment code (version 1) for the input root private key. """ - bip47_child = root.derive(f"m/47'/{coin}'/{account}'") + bip47_child = root.derive("m/47'/{}'/{}'".format(coin, account)) buf = BytesIO() buf.write(b'\x01') # bip47 version @@ -43,7 +43,7 @@ def get_payment_code(root: HDKey, coin: int = 0, account: int = 0) -> str: return base58.encode_check(b'\x47' + buf.getvalue()) -def get_derived_payment_code_node(payment_code: str, derivation_index: int, version: str = NETWORKS["main"]["xpub"]) -> HDKey: +def get_derived_payment_code_node(payment_code: str, derivation_index: int, version: bytes = NETWORKS["main"]["xpub"]) -> HDKey: """Returns the nth derived child for the payment_code""" raw_payment_code = base58.decode_check(payment_code) @@ -66,13 +66,13 @@ def get_notification_address(payment_code: str, script_type: str = "p2pkh", netw elif script_type == "p2wpkh": return script.p2wpkh(pubkey).address(network) else: - raise EmbitError(f"Unsupported script_type: {script_type}") + raise EmbitError("Unsupported script_type: " + script_type) -def get_payment_address(payer_root: HDKey, recipient_payment_code: str, index: int, coin: int = 0, account: int = 0, network=NETWORKS["main"], script_type: str = "p2wpkh") -> str: +def get_payment_address(payer_root: HDKey, recipient_payment_code: str, index: int, coin: int = 0, account: int = 0, network: dict = NETWORKS["main"], script_type: str = "p2wpkh") -> str: """Called by the payer, generates the nth payment address between the payer and recipient""" # Alice selects the 0th private key derived from her payment code ("a") - payer_key = payer_root.derive(f"m/47'/{coin}'/{account}'/0") + payer_key = payer_root.derive("m/47'/{}'/{}'/0".format(coin, account)) a = payer_key.secret # Alice selects the next unused public key derived from Bob's payment code, starting from zero ("B", where B = bG) @@ -89,7 +89,7 @@ def get_payment_address(payer_root: HDKey, recipient_payment_code: str, index: i # If the value of s is not in the secp256k1 group, Alice MUST increment the index used to derive Bob's public key and try again. if not secp256k1.ec_seckey_verify(shared_secret): # TODO: Is this a sufficient test??? - raise BIP47Exception(f"Shared secret was not valid for index {index}. Try again with the next index value.") + raise BIP47Exception("Shared secret was not valid for index {}. Try again with the next index value.".format(index)) # Alice uses the scalar shared secret to calculate the ephemeral public key used to generate the P2PKH address for this transaction (B' = B + sG) shared_pubkey = secp256k1.ec_pubkey_create(shared_secret) @@ -103,10 +103,10 @@ def get_payment_address(payer_root: HDKey, recipient_payment_code: str, index: i elif script_type == "p2sh-p2wpkh": return script.p2sh(script.p2wpkh(shared_node)).address(network=network) else: - raise EmbitError(f"Unsupported script_type: {script_type}") + raise EmbitError("Unsupported script_type: " + script_type) -def get_receive_address(recipient_root: HDKey, payer_payment_code: str, index: int, coin: int = 0, account: int = 0, network=NETWORKS["main"], script_type: str = "p2wpkh") -> Tuple[str, ec.PrivateKey]: +def get_receive_address(recipient_root: HDKey, payer_payment_code: str, index: int, coin: int = 0, account: int = 0, network: dict = NETWORKS["main"], script_type: str = "p2wpkh") -> Tuple[str, ec.PrivateKey]: """Called by the recipient, generates the nth receive address between the payer and recipient. Returns the payment address and its associated private key.""" @@ -116,7 +116,7 @@ def get_receive_address(recipient_root: HDKey, payer_payment_code: str, index: i B = payer_payment_code_node.get_public_key() # ...Bob calculates the nth shared secret with Alice - recipient_key = recipient_root.derive(f"m/47'/{coin}'/{account}'/{index}") + recipient_key = recipient_root.derive("m/47'/{}'/{}'/{}".format(coin, account, index)) a = recipient_key.secret # Bob calculates a secret point (S = aB) @@ -129,7 +129,7 @@ def get_receive_address(recipient_root: HDKey, payer_payment_code: str, index: i # If the value of s is not in the secp256k1 group, increment the index and try again. if not secp256k1.ec_seckey_verify(shared_secret): # TODO: Is this a sufficient test??? - raise BIP47Exception(f"Shared secret was not valid for index {index}. Try again with the next index value.") + raise BIP47Exception("Shared secret was not valid for index {}. Try again with the next index value.".format(index)) # Bob uses the scalar shared secret to calculate the ephemeral public key used to generate the P2PKH address for this transaction (B' = B + sG) shared_pubkey = secp256k1.ec_pubkey_create(shared_secret) @@ -143,7 +143,7 @@ def get_receive_address(recipient_root: HDKey, payer_payment_code: str, index: i elif script_type == "p2sh-p2wpkh": receive_address = script.p2sh(script.p2wpkh(shared_node)).address(network=network) else: - raise EmbitError(f"Unsupported script_type: {script_type}") + raise EmbitError("Unsupported script_type: " + script_type) # Bob calculates the private key for each ephemeral address as: b' = b + s prv_key = secp256k1.ec_privkey_add(recipient_key.secret, shared_secret) @@ -172,7 +172,7 @@ def blinding_function(private_key: str, secret_point: HDKey, utxo_outpoint: str, return payload[0:3] + x_prime + c_prime + payload[-13:] -def get_blinded_payment_code(payer_payment_code: str, input_utxo_private_key: ec.PrivateKey, input_utxo_outpoint: str, recipient_payment_code: str, network=NETWORKS["main"]): +def get_blinded_payment_code(payer_payment_code: str, input_utxo_private_key: ec.PrivateKey, input_utxo_outpoint: str, recipient_payment_code: str, network: dict = NETWORKS["main"]): """Called by the payer, returns the blinded payload for the payer's notification tx that is sent to the recipient while spending the input_utxo. The blinded payload should be inserted as OP_RETURN data.""" @@ -193,7 +193,7 @@ def get_blinded_payment_code(payer_payment_code: str, input_utxo_private_key: ec return hexlify(raw_blinded_payload).decode() -def get_payment_code_from_notification_tx(tx: Transaction, recipient_root: HDKey, coin: int = 0, account: int = 0, network=NETWORKS["main"]) -> str: +def get_payment_code_from_notification_tx(tx: Transaction, recipient_root: HDKey, coin: int = 0, account: int = 0, network: dict = NETWORKS["main"]) -> str: """If the tx is a BIP-47 notification tx for the recipient, return the new payer's embedded payment_code, else None""" # Notification txs have one output sent to the recipient's notification addr @@ -243,7 +243,7 @@ def get_payment_code_from_notification_tx(tx: Transaction, recipient_root: HDKey return None # Bob selects the private key associated with his notification address (0th child) - recipient_notification_node = recipient_root.derive(f"m/47'/{coin}'/{account}'/0") + recipient_notification_node = recipient_root.derive("m/47'/{}'/{}'/0".format(coin, account)) b = recipient_notification_node.secret utxo_outpoint = vin.to_string()[:72] # TODO: Is there a better way to get the outpoint? diff --git a/tests/tests/test_bip47.py b/tests/tests/test_bip47.py index 1d52467..df0e151 100644 --- a/tests/tests/test_bip47.py +++ b/tests/tests/test_bip47.py @@ -1,6 +1,7 @@ from unittest import TestCase from embit import bip32, bip39, bip47, script, ec +from embit.networks import NETWORKS from embit.transaction import Transaction from binascii import hexlify, unhexlify @@ -10,26 +11,115 @@ """ ALICE_MNEMONIC = "response seminar brave tip suit recall often sound stick owner lottery motion" ALICE_PAYMENT_CODE = "PM8TJTLJbPRGxSbc8EJi42Wrr6QbNSaSSVJ5Y3E4pbCYiTHUskHg13935Ubb7q8tx9GVbh2UuRnBc3WSyJHhUrw8KhprKnn9eDznYGieTzFcwQRya4GA" +ALICE_PAYMENT_CODE_REGTEST = "PM8TJcUtZbTqYoGWcNAnaYDkAzA1cLq6gQV4aPJ3N5jydgmTHUr5UFK74CU58mdL6V8pVo3JJ8JsJFJzriZSGMj27ujJ3jxwFUQwi49ox3Cfai4SG5rk" ALICE_NOTIFICATION_ADDR = "1JDdmqFLhpzcUwPeinhJbUPw4Co3aWLyzW" +ALICE_NOTIFICATION_ADDR_REGTEST = "mod1FsW4dsVRod4ZVRR3D3ovY97SxSjJwk" ALICE_NOTIFICATION_INPUT_PRIVATE_KEY = "Kx983SRhAZpAhj7Aac1wUXMJ6XZeyJKqCxJJ49dxEbYCT4a1ozRD" ALICE_NOTIFICATION_INPUT_OUTPOINT = "86f411ab1c8e70ae8a0795ab7a6757aea6e4d5ae1826fc7b8f00c597d500609c01000000" ALICE_NOTIFICATION_BLINDED_PAYLOAD = "010002063e4eb95e62791b06c50e1a3a942e1ecaaa9afbbeb324d16ae6821e091611fa96c0cf048f607fe51a0327f5e2528979311c78cb2de0d682c61e1180fc3d543b00000000000000000000000000" -ALICE_PAYS_BOB_ADDRS = [ - "141fi7TY3h936vRUKh1qfUZr8rSBuYbVBK", - "12u3Uued2fuko2nY4SoSFGCoGLCBUGPkk6", - "1FsBVhT5dQutGwaPePTYMe5qvYqqjxyftc", - "1CZAmrbKL6fJ7wUxb99aETwXhcGeG3CpeA", - "1KQvRShk6NqPfpr4Ehd53XUhpemBXtJPTL", - "1KsLV2F47JAe6f8RtwzfqhjVa8mZEnTM7t", - "1DdK9TknVwvBrJe7urqFmaxEtGF2TMWxzD", - "16DpovNuhQJH7JUSZQFLBQgQYS4QB9Wy8e", - "17qK2RPGZMDcci2BLQ6Ry2PDGJErrNojT5", - "1GxfdfP286uE24qLZ9YRP3EWk2urqXgC4s", -] + +""" + Mainnet p2pkh from BIP-47 test vectors, remaining addrs generated from: + https://bitcoiner.guide/seed +""" +ALICE_PAYS_BOB_ADDRS = { + "main": { + "p2wpkh": [ + "bc1qyyytpxv60e6hwh5jqkj2dcenckdsw6ekn2htfq", + "bc1qzn8a8drxv6ln7rztjsw660gzf3hnrfwupzmsfh", + "bc1q5v84r4dq2vkdku8h7ewfkj6c00eh20gmf0amr5", + "bc1q06ld55yrxrqdfym235h0jvdddvwc72ktsamh7c", + "bc1qe8uxekd8s59szxgnnfd2nxrn3ncnkmxlku83l9", + "bc1qemm4xmwr0fxwysry5mur0r5q5kakkw79fpezx0", + "bc1q3fl6rfkg4f600tlfrtkn6jv6kndg9tfu3hr009", + "bc1q89zc0ptgrcgsrzkfe4fjrlwcwfvny908976vxh", + "bc1qfteug4efvdlhyek9p9mrgwk0kqsq74y8jm5qw7", + "bc1q4ugsxkh69aknjvskm8k2susv9c6dq0pp3476y0" + ], + "p2pkh": [ + "141fi7TY3h936vRUKh1qfUZr8rSBuYbVBK", + "12u3Uued2fuko2nY4SoSFGCoGLCBUGPkk6", + "1FsBVhT5dQutGwaPePTYMe5qvYqqjxyftc", + "1CZAmrbKL6fJ7wUxb99aETwXhcGeG3CpeA", + "1KQvRShk6NqPfpr4Ehd53XUhpemBXtJPTL", + "1KsLV2F47JAe6f8RtwzfqhjVa8mZEnTM7t", + "1DdK9TknVwvBrJe7urqFmaxEtGF2TMWxzD", + "16DpovNuhQJH7JUSZQFLBQgQYS4QB9Wy8e", + "17qK2RPGZMDcci2BLQ6Ry2PDGJErrNojT5", + "1GxfdfP286uE24qLZ9YRP3EWk2urqXgC4s", + ], + "p2sh-p2wpkh": [ + "3QnEFKkpXFYSipn4uqcMNAKWhZq6PD4Gmz", + "38mr84Lrer3j1pTEZpTJ1pQTQJweMcc4YC", + "37Q2nDn2MGPLR2eCSRRnx3EZUv1bgNJpH3", + "38KnaMF7yiGnuUxDuM5AYoU7biYaGEfaRg", + "38A9WgnPYfNwDbovo12sSGF4E8Kq67qHvc", + "3A41gu3kgtqPpiWQwp5fY5VVS5WNgT11nN", + "33prMnukiGDj4vdwD7r3WV7fDuWxWAFEh5", + "38qRxEnED8hMVqQMywJydEmK595gBXi6yQ", + "3QH8LrqkkTnLNcaq5dsBzcj5LCoo5U8pEz", + "3ALkcRwUk1QhkZhcG7t9ooAAu7o12MGQr7", + ], + }, + "test": { + "p2wpkh": [ + "tb1qvlcks6jystdc984whpcqwm0ftuwvk888w3phmk", + "tb1qrzn3xca8ll4v6j65956ywslwzn7mu8d2t00xqa", + "tb1qwynwpawd5t3twd7yepk8v8wz4cewtel5z88tn5", + "tb1q3a6ltk6pycyy4ds5lt67whrglude5l85l0ru43", + "tb1qx630tnvjdx98r6cukv905ltn6ndtyr9zmvdp4l", + "tb1quqe2w3jz334gyadtns0gjzn535dsy8jlrkmdgt", + "tb1q4353mvaglaflcxk65u3t579pn4lgle5vldvpty", + "tb1qj5j0xrujhh0fns4q353q8uf970d6dp5xnacq6a", + "tb1qkqrwshzah97q5dmfz7pr7vfk4saulqsyvnyrth", + "tb1qwvsls5tydc7pw0f0r93ypexcyfundyve5z5t25", + ], + "p2pkh": [ + "mpzZ68EWiTQ3kb8dLoJ8YYd5yH8YaTHYvR", + "mhmJfHR5YJosP3CWm9fYrv1Sm5CJrTZ9Yd", + "mqqFE5EZGwSxAdfGR9NM4BskZbdyjN1h1g", + "mtbWEWuYrhaxZPposbVPBtgsZoEn1ZiczJ", + "mkVr15pHUVEnwi9L9cc61K8A5zcMbv2cox", + "n1xQLV627ei3exa7jXedAFEw2DcdgQSDoS", + "mwEaSdxyqPvTpvytzWkSqcPQBiX8Qmh9hB", + "mu7ZEvtjwqu8kwqkjFqZqi1KEVS3nvfVEu", + "mwZhVmYNhZLfsM7YnLCCABnatS27CtyYyD", + "mr1ihPdNQYqyAxWcV6MEZgwgzW1bmfUyXH", + ], + "p2sh-p2wpkh": [ + "2MxeHXEAqnX45Lc5pW5WJepo1PbaBiGVa4i", + "2N65jANRymkKniZXtdmud2ycLdbC9yPheU3", + "2NE4CDQbkbzJ6HAuYgStwHAhK2WhPQu4yqq", + "2N8f9kvkGoJ1oZdn63pUpewgzmwX9S1sY7K", + "2NB69mKr5v8b4AsgZZrTU5yMEu56yVCRgQQ", + "2NCmmAswkz6nL7Td7KUx9paMLNC1dHyDHhs", + "2Mwp4ufTEoqSgWuGvhp528EwQHT7StY5pDc", + "2NGM11Heusc71BpztycAgmG1yBgXy3D5WKe", + "2MwBYsrDuv4B64otzC5EampPJRmQyZaJQKo", + "2N12k1GuMrywSFGe1wBDk1v2j8eiw7SP77P", + ], + }, + "regtest": { + "p2wpkh": [ + "bcrt1qvlcks6jystdc984whpcqwm0ftuwvk888vcc6vl", + "bcrt1qrzn3xca8ll4v6j65956ywslwzn7mu8d2fxkth5", + "bcrt1qwynwpawd5t3twd7yepk8v8wz4cewtel5qw7xya", + "bcrt1q3a6ltk6pycyy4ds5lt67whrglude5l85ax63zc", + "bcrt1qx630tnvjdx98r6cukv905ltn6ndtyr9ze95vzk", + "bcrt1quqe2w3jz334gyadtns0gjzn535dsy8jlplzqlz", + "bcrt1q4353mvaglaflcxk65u3t579pn4lgle5vay4vud", + "bcrt1qj5j0xrujhh0fns4q353q8uf970d6dp5x35pdd5", + "bcrt1qkqrwshzah97q5dmfz7pr7vfk4saulqsyw6awu7", + "bcrt1qwvsls5tydc7pw0f0r93ypexcyfundyvektdxaa", + ], + }, +} BOB_MNEMONIC = "reward upper indicate eight swift arch injury crystal super wrestle already dentist" BOB_PAYMENT_CODE = "PM8TJS2JxQ5ztXUpBBRnpTbcUXbUHy2T1abfrb3KkAAtMEGNbey4oumH7Hc578WgQJhPjBxteQ5GHHToTYHE3A1w6p7tU6KSoFmWBVbFGjKPisZDbP97" +BOB_PAYMENT_CODE_REGTEST = "PM8TJMJnBXShCFdcGRaGiCrhcCXczikNSyXJeAES6ciFMBv9jNY3ZwEc8fSV8DLmNRqnP9RPP1NPDxUf6vBoUnohPt5bwFFpTvosRw7gV2W4Tr34MULo" BOB_NOTIFICATION_ADDR = "1ChvUUvht2hUQufHBXF8NgLhW8SwE2ecGV" +BOB_NOTIFICATION_ADDR_REGTEST = "mrVYeCNDyrzYwUuZNMWTFL76wdQ3mfXYHL" ALICE_NOTIFICATION_TX_FOR_BOB = "010000000186f411ab1c8e70ae8a0795ab7a6757aea6e4d5ae1826fc7b8f00c597d500609c010000006b483045022100ac8c6dbc482c79e86c18928a8b364923c774bfdbd852059f6b3778f2319b59a7022029d7cc5724e2f41ab1fcfc0ba5a0d4f57ca76f72f19530ba97c860c70a6bf0a801210272d83d8a1fa323feab1c085157a0791b46eba34afb8bfbfaeb3a3fcc3f2c9ad8ffffffff0210270000000000001976a9148066a8e7ee82e5c5b9b7dc1765038340dc5420a988ac1027000000000000536a4c50010002063e4eb95e62791b06c50e1a3a942e1ecaaa9afbbeb324d16ae6821e091611fa96c0cf048f607fe51a0327f5e2528979311c78cb2de0d682c61e1180fc3d543b0000000000000000000000000000000000" @@ -51,42 +141,89 @@ def test_get_payment_code(self): self.assertEqual(payment_code, BOB_PAYMENT_CODE) + def test_get_payment_code_regtest(self): + """Regtest payment codes are different from mainnet and should be properly generated""" + seed_bytes = bip39.mnemonic_to_seed(ALICE_MNEMONIC) + root = bip32.HDKey.from_seed(seed_bytes) + + # coin=1 for test/regtest, per BIP-44 + payment_code = bip47.get_payment_code(root, coin=1) + self.assertEqual(payment_code, ALICE_PAYMENT_CODE_REGTEST) + + seed_bytes = bip39.mnemonic_to_seed(BOB_MNEMONIC) + root = bip32.HDKey.from_seed(seed_bytes) + payment_code = bip47.get_payment_code(root, coin=1) + self.assertEqual(payment_code, BOB_PAYMENT_CODE_REGTEST) + + def test_get_notification_address(self): """Alice & Bob's derived notification addresses should match the test vectors in BIP-47""" self.assertEqual(bip47.get_notification_address(ALICE_PAYMENT_CODE), ALICE_NOTIFICATION_ADDR) self.assertEqual(bip47.get_notification_address(BOB_PAYMENT_CODE), BOB_NOTIFICATION_ADDR) + + + def test_get_notification_address_regtest(self): + """Regtest notification addresses are different from mainnet and should be properly generated""" + self.assertEqual(bip47.get_notification_address(ALICE_PAYMENT_CODE_REGTEST, network=NETWORKS["regtest"]), ALICE_NOTIFICATION_ADDR_REGTEST) + self.assertEqual(bip47.get_notification_address(BOB_PAYMENT_CODE_REGTEST, network=NETWORKS["regtest"]), BOB_NOTIFICATION_ADDR_REGTEST) def test_get_payment_address(self): - """Alice's payment addresses to Bob's payment code should match the test vector addresses in BIP-47""" + """ Alice's payment addresses to Bob's payment code should match the test vector + addresses in BIP-47 and additional ones generated by Seed Tool. """ seed_bytes = bip39.mnemonic_to_seed(ALICE_MNEMONIC) payer_root = bip32.HDKey.from_seed(seed_bytes) - for i, addr in enumerate(ALICE_PAYS_BOB_ADDRS): - payment_addr = bip47.get_payment_address( - payer_root=payer_root, - recipient_payment_code=BOB_PAYMENT_CODE, - index=i) - self.assertEqual(addr, payment_addr) + # Test against all the networks and script types for Alice pays Bob + for network, addrs_dict in ALICE_PAYS_BOB_ADDRS.items(): + if network == "main": + coin = 0 + recipient_payment_code = BOB_PAYMENT_CODE + else: + coin = 1 # for test/regtest, per BIP-44 + recipient_payment_code = BOB_PAYMENT_CODE_REGTEST + for script_type, addrs in addrs_dict.items(): + for i, addr in enumerate(addrs): + payment_addr = bip47.get_payment_address( + payer_root=payer_root, + recipient_payment_code=recipient_payment_code, + coin=coin, + index=i, + network=NETWORKS[network], + script_type=script_type, + ) + self.assertEqual(addr, payment_addr) def test_get_receive_address(self): - """Bob (the recipient) should be able to use Alice's payment code to generate the same addresses that Alice (the payer) generated""" + """ Bob (the recipient) should be able to use Alice's payment code to generate the + same addresses that Alice (the payer) generated. """ + # Test against all the networks and script types for A pays B seed_bytes = bip39.mnemonic_to_seed(BOB_MNEMONIC) recipient_root = bip32.HDKey.from_seed(seed_bytes) - for i, addr in enumerate(ALICE_PAYS_BOB_ADDRS): - payment_addr, spending_key = bip47.get_receive_address( - recipient_root=recipient_root, - payer_payment_code=ALICE_PAYMENT_CODE, - index=i) - self.assertEqual(addr, payment_addr) - self.assertEqual(addr, script.p2pkh(spending_key.get_public_key()).address()) - - # TODO: Verify that the spending_keys can successfully sign a tx for their associated payment_addr. + for network, addr_dict in ALICE_PAYS_BOB_ADDRS.items(): + if network == "main": + coin = 0 + payer_payment_code = ALICE_PAYMENT_CODE + else: + coin = 1 # for test/regtest, per BIP-44 + payer_payment_code = ALICE_PAYMENT_CODE_REGTEST + for script_type, addrs in addr_dict.items(): + for i, addr in enumerate(addrs): + payment_addr, spending_key = bip47.get_receive_address( + recipient_root=recipient_root, + payer_payment_code=payer_payment_code, + coin=coin, + index=i, + network=NETWORKS[network], + script_type=script_type, + ) + self.assertEqual(addr, payment_addr) def test_get_blinded_payment_code(self): + """Alice should be able to blind her payment code for Bob to unblind""" input_utxo_private_key = ec.PrivateKey.from_string(ALICE_NOTIFICATION_INPUT_PRIVATE_KEY) blinded_payload = bip47.get_blinded_payment_code( payer_payment_code=ALICE_PAYMENT_CODE, @@ -110,4 +247,4 @@ def test_get_payment_code_from_notification_tx(self): # Any other root should fail seed_bytes = bip39.mnemonic_to_seed("abandon " * 11 + "about") other_root = bip32.HDKey.from_seed(seed_bytes) - self.assertEqual(bip47.get_payment_code_from_notification_tx(tx, other_root), None) + self.assertTrue(bip47.get_payment_code_from_notification_tx(tx, other_root) is None) From 5c22ecd3f78efff89c4dc593c33b1c19d14b08e4 Mon Sep 17 00:00:00 2001 From: kdmukai Date: Sat, 3 Sep 2022 07:33:45 -0500 Subject: [PATCH 09/11] testnet/regtest bugfix Note: regtest blind/unblind still has a bug I'm trying to work through. But as far as I can tell, mainnet works as expected, as confirmed via the test cases. --- src/embit/bip47.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/embit/bip47.py b/src/embit/bip47.py index 7c03e11..afa8b4e 100644 --- a/src/embit/bip47.py +++ b/src/embit/bip47.py @@ -207,7 +207,7 @@ def get_payment_code_from_notification_tx(tx: Transaction, recipient_root: HDKey payload = None for vout in tx.vout: # Notification txs include a dust payment to the recipient's notification address - if vout.script_pubkey.script_type() is not None and vout.script_pubkey.address() == get_notification_address(recipient_payment_code, network=network): + if vout.script_pubkey.script_type() is not None and vout.script_pubkey.address(network=network) == get_notification_address(recipient_payment_code, network=network): matches_notification_addr = True continue From da21ce371767d93396d0ce69587cada9a358f268 Mon Sep 17 00:00:00 2001 From: kdmukai Date: Sun, 4 Sep 2022 12:44:27 -0500 Subject: [PATCH 10/11] Bugfixes, simplifications --- src/embit/bip47.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/embit/bip47.py b/src/embit/bip47.py index afa8b4e..5dc4fd5 100644 --- a/src/embit/bip47.py +++ b/src/embit/bip47.py @@ -43,7 +43,7 @@ def get_payment_code(root: HDKey, coin: int = 0, account: int = 0) -> str: return base58.encode_check(b'\x47' + buf.getvalue()) -def get_derived_payment_code_node(payment_code: str, derivation_index: int, version: bytes = NETWORKS["main"]["xpub"]) -> HDKey: +def get_derived_payment_code_node(payment_code: str, derivation_index: int) -> HDKey: """Returns the nth derived child for the payment_code""" raw_payment_code = base58.decode_check(payment_code) @@ -51,14 +51,14 @@ def get_derived_payment_code_node(payment_code: str, derivation_index: int, vers # 0x47 0x01 0x00 (sign) (32-byte pubkey) (32-byte chain code) (13 0x00 bytes) pubkey = ec.PublicKey.from_string(hexlify(raw_payment_code[3:36])) chain_code = raw_payment_code[36:68] - root = HDKey(key=pubkey, chain_code=chain_code, version=version) + root = HDKey(key=pubkey, chain_code=chain_code) return root.derive([derivation_index]) def get_notification_address(payment_code: str, script_type: str = "p2pkh", network: str = NETWORKS["main"]) -> str: """Returns the BIP-47 notification address associated with the given payment_code""" # Get the 0th public key derived from the payment_code - pubkey = get_derived_payment_code_node(payment_code, derivation_index=0, version=network["xpub"]).get_public_key() + pubkey = get_derived_payment_code_node(payment_code, derivation_index=0).get_public_key() # TODO: Should we limit to just p2pkh? if script_type == "p2pkh": @@ -76,7 +76,7 @@ def get_payment_address(payer_root: HDKey, recipient_payment_code: str, index: i a = payer_key.secret # Alice selects the next unused public key derived from Bob's payment code, starting from zero ("B", where B = bG) - recipient_payment_code_node = get_derived_payment_code_node(recipient_payment_code, derivation_index=index, version=network["xpub"]) + recipient_payment_code_node = get_derived_payment_code_node(recipient_payment_code, derivation_index=index) B = recipient_payment_code_node.get_public_key() # Alice calculates a secret point (S = aB) @@ -112,7 +112,7 @@ def get_receive_address(recipient_root: HDKey, payer_payment_code: str, index: i Returns the payment address and its associated private key.""" # Using the 0th public key derived from Alice's payment code... - payer_payment_code_node = get_derived_payment_code_node(payer_payment_code, derivation_index=0, version=network["xpub"]) + payer_payment_code_node = get_derived_payment_code_node(payer_payment_code, derivation_index=0) B = payer_payment_code_node.get_public_key() # ...Bob calculates the nth shared secret with Alice @@ -172,7 +172,7 @@ def blinding_function(private_key: str, secret_point: HDKey, utxo_outpoint: str, return payload[0:3] + x_prime + c_prime + payload[-13:] -def get_blinded_payment_code(payer_payment_code: str, input_utxo_private_key: ec.PrivateKey, input_utxo_outpoint: str, recipient_payment_code: str, network: dict = NETWORKS["main"]): +def get_blinded_payment_code(payer_payment_code: str, input_utxo_private_key: ec.PrivateKey, input_utxo_outpoint: str, recipient_payment_code: str): """Called by the payer, returns the blinded payload for the payer's notification tx that is sent to the recipient while spending the input_utxo. The blinded payload should be inserted as OP_RETURN data.""" @@ -183,13 +183,13 @@ def get_blinded_payment_code(payer_payment_code: str, input_utxo_private_key: ec a = input_utxo_private_key.secret # Alice selects the public key associated with Bob's notification address (B, where B = bG) - B = get_derived_payment_code_node(recipient_payment_code, derivation_index=0, version=network["xpub"]).get_public_key() + B = get_derived_payment_code_node(recipient_payment_code, derivation_index=0).get_public_key() # Alice serializes her payment code in binary form payment_code = base58.decode_check(payer_payment_code)[1:] # omit the 0x47 leading byte # Blind the payment code - raw_blinded_payload = blinding_function(a, B, utxo_outpoint=input_utxo_outpoint, payload=payment_code) + raw_blinded_payload = blinding_function(a, B, utxo_outpoint=input_utxo_outpoint[:72], payload=payment_code) return hexlify(raw_blinded_payload).decode() From ff516c964bdddbcf776bd4719ef2a13a54323b11 Mon Sep 17 00:00:00 2001 From: kdmukai Date: Mon, 5 Sep 2022 08:14:08 -0500 Subject: [PATCH 11/11] Adds full psbt-to-tx construction test; final cleanup --- src/embit/bip47.py | 17 ++++------- tests/tests/test_bip47.py | 61 +++++++++++++++++++++++++++++++++++++-- 2 files changed, 64 insertions(+), 14 deletions(-) diff --git a/src/embit/bip47.py b/src/embit/bip47.py index 5dc4fd5..df69e63 100644 --- a/src/embit/bip47.py +++ b/src/embit/bip47.py @@ -152,7 +152,7 @@ def get_receive_address(recipient_root: HDKey, payer_payment_code: str, index: i return (receive_address, spending_key) -def blinding_function(private_key: str, secret_point: HDKey, utxo_outpoint: str, payload: str): +def blinding_function(private_key: bytes, secret_point: HDKey, utxo_outpoint: str, payload: bytes) -> bytes: """Reversible blind/unblind function: blinds plaintext payloads and unblinds blinded payloads""" S = secret_point._xonly() secp256k1.ec_pubkey_tweak_mul(S, private_key) @@ -172,7 +172,7 @@ def blinding_function(private_key: str, secret_point: HDKey, utxo_outpoint: str, return payload[0:3] + x_prime + c_prime + payload[-13:] -def get_blinded_payment_code(payer_payment_code: str, input_utxo_private_key: ec.PrivateKey, input_utxo_outpoint: str, recipient_payment_code: str): +def get_blinded_payment_code(payer_payment_code: str, input_utxo_private_key: ec.PrivateKey, input_utxo_outpoint: str, recipient_payment_code: str) -> str: """Called by the payer, returns the blinded payload for the payer's notification tx that is sent to the recipient while spending the input_utxo. The blinded payload should be inserted as OP_RETURN data.""" @@ -194,8 +194,8 @@ def get_blinded_payment_code(payer_payment_code: str, input_utxo_private_key: ec def get_payment_code_from_notification_tx(tx: Transaction, recipient_root: HDKey, coin: int = 0, account: int = 0, network: dict = NETWORKS["main"]) -> str: - """If the tx is a BIP-47 notification tx for the recipient, - return the new payer's embedded payment_code, else None""" + """If the tx is a BIP-47 notification tx for the recipient, return the new payer's + embedded payment_code, else None.""" # Notification txs have one output sent to the recipient's notification addr # and another containing the payer's payment code in an OP_RETURN payload. if len(tx.vout) < 2: @@ -211,10 +211,10 @@ def get_payment_code_from_notification_tx(tx: Transaction, recipient_root: HDKey matches_notification_addr = True continue - # Payer's payment code will be in an OP_RETURN w/exactly 80 bytes of data + # Payer's blinded payment code will be in an OP_RETURN w/exactly 80 bytes of data + # data = OP_RETURN OP_PUSHDATA1 (len of payload) data = vout.script_pubkey.data if data is not None and len(data) == 83 and data[0] == OPCODES.OP_RETURN and data[1] == OPCODES.OP_PUSHDATA1 and data[2] == 80: - # data = OP_RETURN OP_PUSHDATA1 (len of data) payload = data[3:] if payload[0] != 1: @@ -251,8 +251,3 @@ def get_payment_code_from_notification_tx(tx: Transaction, recipient_root: HDKey # Unblind the payload using the reversible `blinding_function`. raw_unblinded_payload = blinding_function(b, A, utxo_outpoint=utxo_outpoint, payload=payload) return base58.encode_check(b'\x47' + raw_unblinded_payload) - - -""" - TODO: Method to create notification transaction, etc. -""" \ No newline at end of file diff --git a/tests/tests/test_bip47.py b/tests/tests/test_bip47.py index df0e151..bdf34ab 100644 --- a/tests/tests/test_bip47.py +++ b/tests/tests/test_bip47.py @@ -1,9 +1,11 @@ from unittest import TestCase -from embit import bip32, bip39, bip47, script, ec +from embit import bip32, bip39, bip47, ec, compact, finalizer from embit.networks import NETWORKS +from embit.psbt import PSBT, OutputScope +from embit.script import OPCODES, Script, p2wpkh from embit.transaction import Transaction -from binascii import hexlify, unhexlify +from binascii import unhexlify """ @@ -236,7 +238,7 @@ def test_get_blinded_payment_code(self): def test_get_payment_code_from_notification_tx(self): - """Bob (the recipient) should be able to decode Alice's payment code from her notification tx""" + """Bob should be able to decode Alice's payment code from her notification tx""" tx = Transaction.from_string(ALICE_NOTIFICATION_TX_FOR_BOB) seed_bytes = bip39.mnemonic_to_seed(BOB_MNEMONIC) recipient_root = bip32.HDKey.from_seed(seed_bytes) @@ -248,3 +250,56 @@ def test_get_payment_code_from_notification_tx(self): seed_bytes = bip39.mnemonic_to_seed("abandon " * 11 + "about") other_root = bip32.HDKey.from_seed(seed_bytes) self.assertTrue(bip47.get_payment_code_from_notification_tx(tx, other_root) is None) + + + def test_end_to_end_notification_tx(self): + """Alice should be able to construct a psbt that uses the utxo being spent to + blind her payment code and include it as an OP_RETURN output that Bob can + successfully unblind. + + Note: the above test used the already-constructed tx from the BIP-47 test + vectors; this test demonstrates Alice's full psbt-to-tx construction + process. + """ + # Regtest initial psbt that spends one of Alice's utxos to Bob's notification + # address. + ALICE_NOTIFICATION_BASE_PSBT = "cHNidP8BAHQCAAAAAbSCxQCModbVqGCXS5f5q5BbyrIPipQGLzRkUWlvMk41AAAAAAD9////AiICAAAAAAAAGXapFHhlOS7V9B6XfLmRNMuc4ixew5VliKyQFKgEAAAAABYAFMCPedocVwqxQv7WE3dtTEHFcgdeFwQAAAABAGkCAAAAAXVRckaobGUWT22l1cJgRaArprnVqrpKuIxmjoTenYzgAAAAABcWABQAOKoxxLYTkMskJhqVMhXE2L3mW/3///8BQxeoBAAAAAAWABR/yGzYyF9cb44byWc2v+DhQgtggBYEAAABAR9DF6gEAAAAABYAFH/IbNjIX1xvjhvJZza/4OFCC2CAIgYDdwO70Vy+5ob7m/irfdzOCIzKa2NpA+qZc8M4vUb/X/8YREM4d1QAAIABAACAAAAAgAAAAAAAAAAAAAAiAgPOQHgFgYW4Q0pSBPYgwA2DwbTSJsnO+ylBnsDILKIRpxhEQzh3VAAAgAEAAIAAAACAAQAAAAAAAAAA" + psbt = PSBT.from_base64(ALICE_NOTIFICATION_BASE_PSBT) + + # Verify that this psbt is spending to Bob's notification addr + self.assertEqual(psbt.outputs[0].script_pubkey.address(NETWORKS["regtest"]), BOB_NOTIFICATION_ADDR_REGTEST) + + # Extract the utxo that Alice will be spending + outpoint = psbt.inputs[0].vin.to_string() + + # Derive Alice's private key for this utxo + seed_bytes = bip39.mnemonic_to_seed(ALICE_MNEMONIC) + alice_root = bip32.HDKey.from_seed(seed_bytes) + prvkey = alice_root.derive("m/84'/1'/0'/0/0") # This utxo is Alice's first receive addr on regtest + + # Sanity check the prvkey + self.assertEqual(p2wpkh(prvkey.get_public_key()).address(NETWORKS["regtest"]), psbt.inputs[0].script_pubkey.address(NETWORKS["regtest"])) + + # Use the utxo to blind Alice's payment code + blinded_payload = bip47.get_blinded_payment_code(ALICE_PAYMENT_CODE_REGTEST, prvkey, outpoint, BOB_PAYMENT_CODE_REGTEST) + + # Build the additional psbt output with the blinded payload in an OP_RETURN + # data = OP_RETURN OP_PUSHDATA1 (len of payload) + raw_payload_data = unhexlify(blinded_payload) + data = compact.to_bytes(OPCODES.OP_RETURN) + compact.to_bytes(OPCODES.OP_PUSHDATA1) + compact.to_bytes(len(raw_payload_data)) + raw_payload_data + script = Script(data) + output = OutputScope() + output.script_pubkey = script + output.value = 0 + psbt.outputs.append(output) + + # Alice signs and finalizes her psbt into a tx + psbt.sign_with(alice_root) + tx = finalizer.finalize_psbt(psbt) + + # Can Bob decode the payload in the tx? + seed_bytes = bip39.mnemonic_to_seed(BOB_MNEMONIC) + bob_root = bip32.HDKey.from_seed(seed_bytes) + unblinded_payment_code = bip47.get_payment_code_from_notification_tx(tx, bob_root, coin=1, network=NETWORKS["regtest"]) + + self.assertEqual(unblinded_payment_code, ALICE_PAYMENT_CODE_REGTEST)