From f7fc5929ac1de2e364e297fa0894b68a347be1f0 Mon Sep 17 00:00:00 2001 From: Andrew Chow Date: Mon, 24 Jul 2023 15:19:27 -0400 Subject: [PATCH] test: Add functional test for wallets with encrypted db --- test/functional/test_framework/test_node.py | 4 +- test/functional/test_runner.py | 1 + test/functional/wallet_createwallet.py | 8 +- test/functional/wallet_db_encryption.py | 157 ++++++++++++++++++++ 4 files changed, 165 insertions(+), 5 deletions(-) create mode 100755 test/functional/wallet_db_encryption.py diff --git a/test/functional/test_framework/test_node.py b/test/functional/test_framework/test_node.py index 90c1213deb30ad..bbeda442db425c 100755 --- a/test/functional/test_framework/test_node.py +++ b/test/functional/test_framework/test_node.py @@ -846,10 +846,10 @@ def __getattr__(self, name): def createwallet_passthrough(self, *args, **kwargs): return self.__getattr__("createwallet")(*args, **kwargs) - def createwallet(self, wallet_name, disable_private_keys=None, blank=None, passphrase='', avoid_reuse=None, descriptors=None, load_on_startup=None, external_signer=None): + def createwallet(self, wallet_name, disable_private_keys=None, blank=None, passphrase='', avoid_reuse=None, descriptors=None, load_on_startup=None, external_signer=None, db_passphrase=''): if descriptors is None: descriptors = self.descriptors - return self.__getattr__('createwallet')(wallet_name, disable_private_keys, blank, passphrase, avoid_reuse, descriptors, load_on_startup, external_signer) + return self.__getattr__('createwallet')(wallet_name, disable_private_keys, blank, passphrase, avoid_reuse, descriptors, load_on_startup, external_signer, db_passphrase) def importprivkey(self, privkey, label=None, rescan=None): wallet_info = self.getwalletinfo() diff --git a/test/functional/test_runner.py b/test/functional/test_runner.py index 6016a482f8d1f3..8bcec6c96a97d3 100755 --- a/test/functional/test_runner.py +++ b/test/functional/test_runner.py @@ -312,6 +312,7 @@ 'p2p_leak.py', 'wallet_encryption.py --legacy-wallet', 'wallet_encryption.py --descriptors', + 'wallet_db_encryption.py --descriptors', 'feature_dersig.py', 'feature_cltv.py', 'rpc_uptime.py', diff --git a/test/functional/wallet_createwallet.py b/test/functional/wallet_createwallet.py index 8e07021e036857..83507e281ce820 100755 --- a/test/functional/wallet_createwallet.py +++ b/test/functional/wallet_createwallet.py @@ -16,6 +16,8 @@ EMPTY_PASSPHRASE_MSG = "Empty string given as passphrase, wallet will not be encrypted." +EMPTY_DB_PASSPHRASE_MSG = "Empty string given as database passphrase, wallet database will not be encrypted." +EMPTY_PASSPHRASE_MSGS = [EMPTY_PASSPHRASE_MSG, EMPTY_DB_PASSPHRASE_MSG] LEGACY_WALLET_MSG = "Wallet created successfully. The legacy wallet type is being deprecated and support for creating and opening legacy wallets will be removed in the future." @@ -161,7 +163,7 @@ def run_test(self): assert_equal(walletinfo['keypoolsize_hd_internal'], keys) # Allow empty passphrase, but there should be a warning resp = self.nodes[0].createwallet(wallet_name='w7', disable_private_keys=False, blank=False, passphrase='') - assert_equal(resp["warnings"], [EMPTY_PASSPHRASE_MSG] if self.options.descriptors else [EMPTY_PASSPHRASE_MSG, LEGACY_WALLET_MSG]) + assert_equal(resp["warnings"], EMPTY_PASSPHRASE_MSGS if self.options.descriptors else EMPTY_PASSPHRASE_MSGS + [LEGACY_WALLET_MSG]) w7 = node.get_wallet_rpc('w7') assert_raises_rpc_error(-15, 'Error: running with an unencrypted wallet, but walletpassphrase was called.', w7.walletpassphrase, '', 60) @@ -180,12 +182,12 @@ def run_test(self): result = self.nodes[0].createwallet(wallet_name="legacy_w0", descriptors=False, passphrase=None) assert_equal(result, { "name": "legacy_w0", - "warnings": [LEGACY_WALLET_MSG], + "warnings": [EMPTY_DB_PASSPHRASE_MSG, LEGACY_WALLET_MSG], }) result = self.nodes[0].createwallet(wallet_name="legacy_w1", descriptors=False, passphrase="") assert_equal(result, { "name": "legacy_w1", - "warnings": [EMPTY_PASSPHRASE_MSG, LEGACY_WALLET_MSG], + "warnings": EMPTY_PASSPHRASE_MSGS + [LEGACY_WALLET_MSG], }) diff --git a/test/functional/wallet_db_encryption.py b/test/functional/wallet_db_encryption.py new file mode 100755 index 00000000000000..7ea632277e2f36 --- /dev/null +++ b/test/functional/wallet_db_encryption.py @@ -0,0 +1,157 @@ +#!/usr/bin/env python3 +# Copyright (c) 2023 The Bitcoin Core developers +# Distributed under the MIT software license, see the accompanying +# file COPYING or https://www.opensource.org/licenses/mit-license.php. +"""Test Wallets with encrypted database""" + +from test_framework.test_framework import BitcoinTestFramework +from test_framework.util import ( + assert_raises_rpc_error, + assert_equal, + assert_greater_than, +) + + +class WalletDBEncryptionTest(BitcoinTestFramework): + PASSPHRASE = "WalletPassphrase" + PASSPHRASE2 = "SecondWalletPassphrase" + WRONG_PASSPHRASE = "NotTheRightPassphrase" + PASSPHRASE_WITH_NULLS = "Passphrase\0With\0Nulls" + + def add_options(self, parser): + self.add_wallet_options(parser, descriptors=True) + + def set_test_params(self): + self.setup_clean_chain = True + self.num_nodes = 1 + + def skip_test_if_missing_module(self): + self.skip_if_no_wallet() + + def test_no_legacy(self): + if not self.is_bdb_compiled(): + return + self.log.info("Test that legacy wallets do not support encrypted databases") + assert_raises_rpc_error(-4, "Database encryption is only supported for descriptor wallets", self.nodes[0].createwallet, wallet_name="legacy_encdb", db_passphrase=self.PASSPHRASE, descriptors=False) + + def test_create_and_load(self): + self.log.info("Testing that a wallet with encrypted database can be created") + self.nodes[0].createwallet(wallet_name="basic_encrypted_db", db_passphrase=self.PASSPHRASE) + wallet = self.nodes[0].get_wallet_rpc("basic_encrypted_db") + info = wallet.getwalletinfo() + assert_equal("encrypted_sqlite", info["format"]) + + # Add some data to the wallet that we should see persisted + addr = wallet.getnewaddress() + txid_in = self.default_wallet.sendtoaddress(addr, 10) + self.generate(self.nodes[0], 1) + txid_out = wallet.sendtoaddress(self.default_wallet.getnewaddress(), 5) + self.generate(self.nodes[0], 1) + + wallet.unloadwallet() + + self.log.info("Testing loading of a wallet with encrypted database") + assert_raises_rpc_error(-4, "Database is encrypted but passphrase was not provided", self.nodes[0].loadwallet, filename="basic_encrypted_db") + assert_raises_rpc_error(-4, "Unable to decrypt database, are you sure the passphrase is correct?", self.nodes[0].loadwallet, filename="basic_encrypted_db", db_passphrase=self.WRONG_PASSPHRASE) + self.nodes[0].loadwallet(filename="basic_encrypted_db", db_passphrase=self.PASSPHRASE) + info = wallet.getwalletinfo() + assert_equal("encrypted_sqlite", info["format"]) + + # Make sure that our persisted data is still here + addr_info = wallet.getaddressinfo(addr) + assert_equal(addr_info["ismine"], True) + tx_in = wallet.gettransaction(txid_in) + assert_equal(tx_in["amount"], 10) + tx_out = wallet.gettransaction(txid_out) + assert_equal(tx_out["amount"], -5) + wallet.sendtoaddress(self.default_wallet.getnewaddress(), 2) + + wallet.unloadwallet() + + self.log.info("Test that listwalletdir lists wallets with encrypted dbs") + wallets = [w["name"] for w in self.nodes[0].listwalletdir()["wallets"]] + assert "basic_encrypted_db" in wallets + + def test_passphrases_with_nulls(self): + self.log.info("Testing passphrases with nulls for wallets with encrypted databases") + self.nodes[0].createwallet(wallet_name="encdb_with_nulls", db_passphrase=self.PASSPHRASE_WITH_NULLS) + wallet = self.nodes[0].get_wallet_rpc("encdb_with_nulls") + info = wallet.getwalletinfo() + assert_equal("encrypted_sqlite", info["format"]) + wallet.unloadwallet() + + assert_raises_rpc_error(-4, "Unable to decrypt database, are you sure the passphrase is correct?", self.nodes[0].loadwallet, filename="encdb_with_nulls", db_passphrase=self.PASSPHRASE_WITH_NULLS.partition("\0")[0]) + self.nodes[0].loadwallet(filename="encdb_with_nulls", db_passphrase=self.PASSPHRASE_WITH_NULLS) + + def test_on_start(self): + self.log.info("Test that wallets with encrypted db are ignored on startup") + self.nodes[0].createwallet(wallet_name="startup_encdb", db_passphrase=self.PASSPHRASE) + with self.nodes[0].assert_debug_log(expected_msgs=["Warning: Skipping -wallet path to encrypted wallet, use loadwallet to load it."]): + self.stop_node(0) + self.start_node(0, extra_args=["-wallet=startup_encdb"]) + # Need to clear stderr file so that test shutdown works + self.nodes[0].stderr.seek(0) + self.nodes[0].stderr.truncate(0) + assert_equal(self.nodes[0].listwallets(), [self.default_wallet_name]) + self.nodes[0].loadwallet(filename="startup_encdb", db_passphrase=self.PASSPHRASE) + + def test_double_encrypted(self): + self.log.info("Test that wallet encryption is not db encryption") + self.nodes[0].createwallet(wallet_name="enc_wallet_not_db", passphrase=self.PASSPHRASE) + wallet = self.nodes[0].get_wallet_rpc("enc_wallet_not_db") + info = wallet.getwalletinfo() + assert_equal(info["format"], "sqlite") + assert_equal(info["unlocked_until"], 0) + + self.nodes[0].createwallet(wallet_name="enc_wallet_not_db2") + wallet = self.nodes[0].get_wallet_rpc("enc_wallet_not_db2") + wallet.encryptwallet(self.PASSPHRASE) + info = wallet.getwalletinfo() + assert_equal(info["format"], "sqlite") + assert_equal(info["unlocked_until"], 0) + + self.log.info("Test that wallets with encrypted db can also be encrypted normally") + self.nodes[0].createwallet(wallet_name="double_enc", db_passphrase=self.PASSPHRASE, passphrase=self.PASSPHRASE2) + wallet = self.nodes[0].get_wallet_rpc("double_enc") + info = wallet.getwalletinfo() + assert_equal(info["format"], "encrypted_sqlite") + assert_equal(info["unlocked_until"], 0) + wallet.walletpassphrase(self.PASSPHRASE2, 10) + assert_greater_than(wallet.getwalletinfo()["unlocked_until"], 0) + wallet.walletlock() + + self.nodes[0].createwallet(wallet_name="double_enc2", db_passphrase=self.PASSPHRASE) + wallet = self.nodes[0].get_wallet_rpc("double_enc2") + wallet.encryptwallet(self.PASSPHRASE2) + info = wallet.getwalletinfo() + assert_equal(info["format"], "encrypted_sqlite") + assert_equal(info["unlocked_until"], 0) + wallet.walletpassphrase(self.PASSPHRASE2, 10) + assert_greater_than(wallet.getwalletinfo()["unlocked_until"], 0) + wallet.walletlock() + + self.log.info("Test that changing wallet passphrase does not affect database passphrase") + wallet.walletpassphrase(self.PASSPHRASE2, 10) + wallet.walletpassphrasechange(self.PASSPHRASE2, self.PASSPHRASE_WITH_NULLS) + wallet.walletlock() + assert_raises_rpc_error(-14, "Error: The wallet passphrase entered was incorrect.", wallet.walletpassphrase, self.PASSPHRASE, 10) + assert_raises_rpc_error(-14, "Error: The wallet passphrase entered was incorrect.", wallet.walletpassphrase, self.PASSPHRASE2, 10) + wallet.walletpassphrase(self.PASSPHRASE_WITH_NULLS, 10) + wallet.unloadwallet() + + assert_raises_rpc_error(-4, "Wallet file verification failed. Unable to decrypt database, are you sure the passphrase is correct?", self.nodes[0].loadwallet, filename="double_enc2", db_passphrase=self.PASSPHRASE_WITH_NULLS) + assert_raises_rpc_error(-4, "Wallet file verification failed. Unable to decrypt database, are you sure the passphrase is correct?", self.nodes[0].loadwallet, filename="double_enc2", db_passphrase=self.PASSPHRASE2) + self.nodes[0].loadwallet(filename="double_enc2", db_passphrase=self.PASSPHRASE) + + def run_test(self): + self.default_wallet = self.nodes[0].get_wallet_rpc(self.default_wallet_name) + self.generate(self.nodes[0], 101) + + self.test_no_legacy() + self.test_create_and_load() + self.test_passphrases_with_nulls() + self.test_on_start() + self.test_double_encrypted() + +if __name__ == '__main__': + WalletDBEncryptionTest().main()