diff --git a/tests/functional/modules/test_vault_pki.py b/tests/functional/modules/test_vault_pki.py new file mode 100644 index 00000000..0022db67 --- /dev/null +++ b/tests/functional/modules/test_vault_pki.py @@ -0,0 +1,519 @@ +from datetime import datetime +from datetime import timedelta +from datetime import timezone + +import pytest +from cryptography import x509 +from cryptography.hazmat.primitives import serialization +from salt.utils.x509 import generate_rsa_privkey +from salt.utils.x509 import load_cert + +from saltext.vault.utils.vault.exceptions import VaultException +from saltext.vault.utils.vault.pki import dec2hex +from tests.support.vault import vault_delete +from tests.support.vault import vault_disable_secret_engine +from tests.support.vault import vault_enable_secret_engine +from tests.support.vault import vault_list +from tests.support.vault import vault_list_detailed +from tests.support.vault import vault_read +from tests.support.vault import vault_write + +pytest.importorskip("docker") + +pytestmark = [ + pytest.mark.slow_test, + pytest.mark.skip_if_binaries_missing("vault", "getent"), + pytest.mark.usefixtures("vault_container_version"), +] + + +@pytest.fixture(scope="module") +def minion_config_overrides(vault_port): + return { + "vault": { + "auth": { + "method": "token", + "token": "testsecret", + }, + "cache": { + "backend": "disk", # ensure a persistent cache is available for get_creds + }, + "server": { + "url": f"http://127.0.0.1:{vault_port}", + }, + } + } + + +@pytest.fixture +def testrole(): + return {"ttl": 3600, "max_ttl": 86400, "allow_any_name": True, "issuer_ref": "testissuer"} + + +@pytest.fixture +def testissuer(): + return {"issuer_name": "testissuer", "common_name": "Test Issuer CA"} + + +@pytest.fixture +def testissuer2(): + return {"issuer_name": "testissuer2", "common_name": "Test Issuer CA 2"} + + +@pytest.fixture +def private_key(): + pk = generate_rsa_privkey(2048) + pk_encoding = getattr(serialization.Encoding, "PEM") + pk_bytes = pk.private_bytes( + pk_encoding, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + return pk_bytes.decode() + + +@pytest.fixture(scope="module", autouse=True) +def pki_engine(vault_container_version): # pylint: disable=unused-argument + assert vault_enable_secret_engine("pki") + yield + assert vault_disable_secret_engine("pki") + + +@pytest.fixture(params=[["testrole"]]) +def roles_setup(request): # pylint: disable=unused-argument + try: + for role_name in request.param: + role_args = request.getfixturevalue(role_name) + # role_args.update(role_args_common) + vault_write(f"pki/roles/{role_name}", **role_args) + assert role_name in vault_list("pki/roles") + yield + finally: + for role_name in request.param: + if role_name in vault_list("pki/roles"): + vault_delete(f"pki/roles/{role_name}") + assert role_name not in vault_list("pki/roles") + + +@pytest.fixture() +def root_issuer_setup(request): # pylint: disable=unused-argument + try: + root_ca_args = {"issuer_name": "test-issuer-root", "common_name": "Test Issuer Root CA"} + + vault_write("pki/root/generate/internal", **root_ca_args) + assert vault_read(f"pki/issuer/{root_ca_args['issuer_name']}") + + return True + except VaultException: + return False + + +@pytest.fixture(params=[["testissuer"]]) +def issuers_setup(request, root_issuer_setup): # pylint: disable=unused-argument + try: + assert root_issuer_setup + for issuer_name in request.param: + issuer_args = request.getfixturevalue(issuer_name) + + csr_resp = vault_write("pki/intermediate/generate/internal", **issuer_args)["data"] + sign_resp = vault_write( + "/pki/root/sign-intermediate", csr=csr_resp["csr"], **issuer_args + )["data"] + resp = vault_write( + "/pki/intermediate/set-signed", certificate=sign_resp["certificate"] + )["data"] + for issuer in resp["imported_issuers"]: + vault_write(f"/pki/issuer/{issuer}", **issuer_args) + assert vault_read(f"pki/issuer/{issuer_name}") + yield + finally: + all_issuers = vault_list_detailed("pki/issuers") + issuers_names = [] + if len(all_issuers) > 0: + issuers_names = [v["issuer_name"] for k, v in all_issuers["key_info"].items()] + for issuer_name in request.param: + if issuer_name in issuers_names: + vault_delete(f"pki/issuer/{issuer_name}") + + +@pytest.fixture +def vault_pki(modules): + try: + yield modules.vault_pki + finally: + if "testrole" in vault_list("pki/roles"): + vault_delete("pki/roles/testrole") + assert "testrole" not in vault_list("pki/roles") + + vault_delete("pki/issuer/test-issuer-root") + + +@pytest.fixture +def generated_root(): + yield + vault_delete("pki/issuer/generated-root") + + +@pytest.mark.usefixtures("roles_setup") +def test_list_roles(vault_pki): + ret = vault_pki.list_roles() + assert ret == ["testrole"] + + +def test_list_empty_roles(vault_pki): + ret = vault_pki.list_roles() + assert ret == [] + + +@pytest.mark.usefixtures("roles_setup") +def test_delete_role(vault_pki): + ret = vault_pki.delete_role("testrole") + assert ret + assert "testrole" not in vault_list("pki/roles") + + +def test_write_role(vault_pki): + assert vault_pki.write_role("testrole2", ttl="360h") is True + assert "testrole2" in vault_list("pki/roles") + + +@pytest.mark.usefixtures("roles_setup") +def test_update_role(vault_pki): + assert vault_pki.write_role("testrole", ttl="4h", max_ttl="24h") is True + assert vault_read("pki/roles/testrole")["data"]["ttl"] == 14400 # 4 hours in seconds + + +@pytest.mark.usefixtures("issuers_setup") +def test_list_issuers(vault_pki): + ret = [info["issuer_name"] for id, info in vault_pki.list_issuers().items()] + diff = set(ret) ^ {"test-issuer-root", "testissuer"} + + assert not diff + + +@pytest.mark.usefixtures("issuers_setup") +def test_read_issuer(vault_pki): + ret = vault_pki.read_issuer("testissuer") + assert ret["issuer_name"] == "testissuer" + assert "ca_chain" in ret + assert "certificate" in ret + + +@pytest.mark.usefixtures("issuers_setup") +@pytest.mark.usefixtures("roles_setup") +def test_issue_certificate(vault_pki): + # Certificate expiration is always in UTC, so we need to compare with UTC. + run_time = datetime.now(tz=timezone.utc).replace(tzinfo=None) + + ret = vault_pki.issue_certificate( + role_name="testrole", + common_name="test.example.com", + ttl="2h", + alt_names=["DNS:test2.example.com"], + ) + assert "certificate" in ret + certificate = load_cert(ret["certificate"]) + + assert certificate.issuer.rfc4514_string() == "CN=Test Issuer CA" + + assert certificate.subject.rfc4514_string() == "CN=test.example.com" + san = certificate.extensions.get_extension_for_class(x509.SubjectAlternativeName) + dns_sans = san.value.get_values_for_type(x509.DNSName) + assert certificate.not_valid_after - run_time > timedelta(hours=1) + assert "test2.example.com" in dns_sans + assert "test.example.com" in dns_sans + + +@pytest.mark.usefixtures("issuers_setup") +@pytest.mark.usefixtures("roles_setup") +def test_sign_certificate_with_private_key(vault_pki, private_key): + # Certificate expiration is always in UTC, so we need to compare with UTC. + run_time = datetime.now(tz=timezone.utc).replace(tzinfo=None) + + ret = vault_pki.sign_certificate( + "testrole", + common_name="test.example.com", + private_key=private_key, + ttl="2h", + alt_names=["dns:test2.example.com"], + ) + assert "certificate" in ret + certificate = load_cert(ret["certificate"]) + + assert certificate.issuer.rfc4514_string() == "CN=Test Issuer CA" + + assert certificate.subject.rfc4514_string() == "CN=test.example.com" + san = certificate.extensions.get_extension_for_class(x509.SubjectAlternativeName) + dns_sans = san.value.get_values_for_type(x509.DNSName) + assert certificate.not_valid_after - run_time > timedelta(hours=1) + assert "test2.example.com" in dns_sans + assert "test.example.com" in dns_sans + + +@pytest.mark.usefixtures("issuers_setup") +@pytest.mark.usefixtures("roles_setup") +def test_sign_certificate_with_sign_verbatim(vault_pki, private_key): + ret = vault_pki.sign_certificate( + "testrole", + common_name="test.example.com", + private_key=private_key, + ttl="2h", + sign_verbatim=True, + alt_names=["dns:test2.example.com"], + L="Boston", + C="US", + ) + assert "certificate" in ret + certificate = load_cert(ret["certificate"]) + assert certificate.subject.get_attributes_for_oid(x509.OID_COUNTRY_NAME)[0].value == "US" + assert certificate.subject.get_attributes_for_oid(x509.OID_LOCALITY_NAME)[0].value == "Boston" + + +@pytest.mark.usefixtures("issuers_setup") +@pytest.mark.usefixtures("roles_setup") +@pytest.mark.parametrize("issuers_setup", [["testissuer", "testissuer2"]], indirect=True) +def test_sign_certificate_with_alternative_issuer(vault_pki, private_key): + ret = vault_pki.sign_certificate( + "testrole", + common_name="test.example.com", + private_key=private_key, + ttl="2h", + sign_verbatim=True, + issuer="testissuer2", + alt_names=["dns:test2.example.com"], + L="Boston", + C="US", + ) + assert "certificate" in ret + certificate = load_cert(ret["certificate"]) + assert certificate.issuer.rfc4514_string() == "CN=Test Issuer CA 2" + + +@pytest.mark.usefixtures("issuers_setup") +@pytest.mark.usefixtures("roles_setup") +def test_sign_certificate_with_der_encoding(vault_pki, private_key): + ret = vault_pki.sign_certificate( + "testrole", + common_name="test.example.com", + private_key=private_key, + ttl="2h", + encoding="der", + alt_names=["dns:test2.example.com"], + ) + + assert "certificate" in ret + _, encoding, _, _ = load_cert(ret["certificate"], get_encoding=True) + assert encoding.lower() == "der" + + +@pytest.mark.usefixtures("issuers_setup") +def test_read_issuer_certificate(vault_pki): + ret = vault_pki.read_issuer_certificate("testissuer") + + certificate = load_cert(ret) + + assert certificate.subject.rfc4514_string() == "CN=Test Issuer CA" + + +@pytest.mark.usefixtures("issuers_setup") +def test_read_issuer_certificate_with_chain(vault_pki): + ret = vault_pki.read_issuer_certificate("testissuer", include_chain=True) + + certificate, chain = load_cert(ret, load_chain=True) + + assert certificate.subject.rfc4514_string() == "CN=Test Issuer CA" + assert len(chain) == 1 + + assert chain[0].subject.rfc4514_string() == "CN=Test Issuer Root CA" + + +@pytest.mark.usefixtures("issuers_setup") +def test_delete_issuer(vault_pki): + ret = [info["issuer_name"] for id, info in vault_pki.list_issuers().items()] + assert "testissuer" in ret + + ret = vault_pki.delete_issuer("testissuer") + assert ret + + ret = [info["issuer_name"] for id, info in vault_pki.list_issuers().items()] + assert "testissuer" not in ret + + +@pytest.mark.usefixtures("issuers_setup") +def test_delete_issuer_with_private_key(vault_pki): + ret = vault_pki.read_issuer("testissuer") + assert ret["key_id"] + private_key_id = ret["key_id"] + + ret = vault_pki.delete_issuer("testissuer", include_key=True) + assert ret + + ret = [info["issuer_name"] for id, info in vault_pki.list_issuers().items()] + assert "testissuer" not in ret + + keys = vault_list("pki/keys") + assert private_key_id not in keys + + +@pytest.mark.usefixtures("issuers_setup") +def test_update_issuer(vault_pki): + ret = vault_pki.update_issuer( + "testissuer", + aia_urls=["http://aia.example.com/ca.list"], + ) + assert ret + + # Now update OCSP endpoints. + ret = vault_pki.update_issuer( + "testissuer", + ocsp_servers=["http://ocsp.example.com"], + ) + + # Now update CRL endpoints. + ret = vault_pki.update_issuer( + "testissuer", + crl_endpoints=["http://crl.example.com/ca.crl"], + ) + + ret = vault_pki.read_issuer("testissuer") + + assert "http://crl.example.com/ca.crl" in ret["crl_distribution_points"] + assert "http://aia.example.com/ca.list" in ret["issuing_certificates"] + assert "http://ocsp.example.com" in ret["ocsp_servers"] + + +@pytest.mark.usefixtures("issuers_setup") +def test_read_issuer_crl(vault_pki): + ret_complete = vault_pki.read_issuer_crl("testissuer") + + crl_complete = x509.load_pem_x509_crl(str.encode(ret_complete)) + assert crl_complete.issuer.rfc4514_string() == "CN=Test Issuer CA" + + ret_delta = vault_pki.read_issuer_crl("testissuer", delta=True) + crl_delta = x509.load_pem_x509_crl(str.encode(ret_complete)) + assert crl_delta.issuer.rfc4514_string() == "CN=Test Issuer CA" + + assert ret_delta != ret_complete + + +@pytest.mark.usefixtures("issuers_setup") +@pytest.mark.usefixtures("roles_setup") +def test_revoke_certificate(vault_pki, private_key): + ret = vault_pki.sign_certificate( + "testrole", + common_name="test.example.com", + private_key=private_key, + ) + assert "certificate" in ret + certificate = load_cert(ret["certificate"]) + + # all_certs = vault_pki.list_certificates() + # assert certificate.serial_number in all_certs + serial = dec2hex(certificate.serial_number) + + ret = vault_pki.revoke_certificate(serial=serial) + assert ret + + revoked_certs = [serial.upper() for serial in vault_pki.list_revoked_certificates()] + assert serial in revoked_certs + + +@pytest.mark.usefixtures("issuers_setup") +def test_get_default_issuer(vault_pki): + before = vault_pki.get_default_issuer() + assert before + + # Now, delete default issuer + vault_pki.delete_issuer(before) + + after = vault_pki.get_default_issuer() + assert after is None + + +@pytest.mark.usefixtures("issuers_setup") +@pytest.mark.parametrize("issuers_setup", [["testissuer", "testissuer2"]], indirect=True) +def test_set_default_issuer(vault_pki): + before = vault_pki.get_default_issuer() + + assert before + ret = vault_pki.set_default_issuer(name="testissuer2") + assert ret + + after = vault_pki.get_default_issuer() + assert not after == before + + +@pytest.mark.usefixtures("generated_root") +def test_generate_root(vault_pki): + ret = vault_pki.list_issuers() + assert len(ret) == 0 + + ret = vault_pki.generate_root( + common_name="generated root", + issuer_name="generated-root", + ) + + assert "certificate" in ret + + certificate = load_cert(ret["certificate"]) + assert certificate.subject.rfc4514_string() == "CN=generated root" + + ret = vault_pki.list_issuers() + assert len(ret) == 1 + + +@pytest.mark.usefixtures("generated_root") +def test_generate_root_exported(vault_pki): + ret = vault_pki.list_issuers() + assert len(ret) == 0 + + ret = vault_pki.generate_root( + common_name="generated root", + issuer_name="generated-root", + type="exported", + ) + + assert "certificate" in ret + assert "private_key" in ret + + +@pytest.mark.usefixtures("issuers_setup") +@pytest.mark.usefixtures("roles_setup") +def test_list_certificates(vault_pki, private_key): + ret = vault_pki.sign_certificate( + "testrole", + common_name="test.example.com", + private_key=private_key, + ) + assert "certificate" in ret + certificate = load_cert(ret["certificate"]) + + # all_certs = vault_pki.list_certificates() + # assert certificate.serial_number in all_certs + serial = dec2hex(certificate.serial_number) + + all_certs = [serial.upper() for serial in vault_pki.list_certificates()] + + assert serial in all_certs + + +@pytest.mark.usefixtures("issuers_setup") +@pytest.mark.usefixtures("roles_setup") +def test_read_certificate(vault_pki, private_key): + ret = vault_pki.sign_certificate( + "testrole", + common_name="test.example.com", + private_key=private_key, + ) + assert "certificate" in ret + signed_certificate = load_cert(ret["certificate"]) + + # all_certs = vault_pki.list_certificates() + # assert certificate.serial_number in all_certs + serial = dec2hex(signed_certificate.serial_number) + + ret = vault_pki.read_certificate(serial) + + read_certificate = load_cert(ret) + + assert read_certificate.serial_number == signed_certificate.serial_number diff --git a/tests/support/vault.py b/tests/support/vault.py index 70bffa11..850666d3 100644 --- a/tests/support/vault.py +++ b/tests/support/vault.py @@ -241,6 +241,16 @@ def vault_list(path): return json.loads(ret.stdout) +def vault_list_detailed(path): + ret = _vault_cmd(["list", "-detailed", "-format=json", path], raw=True) + if ret.returncode != 0: + if ret.returncode == 2: + return [] + log.debug("Failed to list path at `%s`:\n%s\nSTDERR: %s", path, ret, ret.stderr) + pytest.fail(f"Failed to list path at `{path}`: {ret.stderr or ret.stdout}") + return json.loads(ret.stdout)["data"] + + def vault_read(path): try: ret = _vault_cmd(["read", "-format=json", path]) diff --git a/tests/unit/modules/test_vault_pki.py b/tests/unit/modules/test_vault_pki.py new file mode 100644 index 00000000..fa26c9b0 --- /dev/null +++ b/tests/unit/modules/test_vault_pki.py @@ -0,0 +1,208 @@ +from unittest.mock import patch + +import pytest +from salt.exceptions import CommandExecutionError + +import saltext.vault.utils.vault as vaultutil +from saltext.vault.modules import vault_pki + + +@pytest.fixture +def configure_loader_modules(): + return { + vault_pki: { + "__grains__": {"id": "test-minion"}, + } + } + + +@pytest.fixture +def data(): + return {"foo": "bar"} + + +@pytest.fixture +def data_role(): + return { + "dummy": { + "data": { + "allow_any_name": True, + } + }, + "no-subddomains": { + "data": { + "allow_subdomains": False, + } + }, + "no-serverflag": { + "data": { + "server_flag": True, + } + }, + } + + +@pytest.fixture +def data_roles_list(): + return {"data": {"keys": ["foo"]}} + + +@pytest.fixture +def role_not_found(query): + query.side_effect = vaultutil.VaultNotFoundError + yield query + + +@pytest.fixture +def list_roles(data_roles_list): + with patch("saltext.vault.utils.vault.query", autospec=True) as _list: + _list.return_value = data_roles_list + yield _list + + +@pytest.fixture +def read_role(data_role, role_name): + with patch("saltext.vault.utils.vault.query", autospec=True) as _data: + _data.return_value = data_role[role_name] + yield _data + + +# @pytest.fixture +# def delete_role(): +# with patch("saltext.vault.utils.vault.query", autospec=True) as delete_role: +# yield delete_role + + +@pytest.fixture +def query(): + with patch("saltext.vault.utils.vault.query", autospec=True) as _query: + yield _query + + +@pytest.mark.usefixtures("list_roles") +@pytest.mark.parametrize( + "expected", + [["foo"]], +) +def test_list_roles(expected): + res = vault_pki.list_roles() + assert res == expected + + +@pytest.mark.usefixtures("read_role") +@pytest.mark.parametrize( + "role_name, expected", + [ + ("dummy", {"allow_any_name": True}), + ("no-subddomains", {"allow_subdomains": False}), + ("no-serverflag", {"server_flag": True}), + ], +) +def test_read_role(role_name, expected): + ret = vault_pki.read_role(role_name) + assert ret == expected + + +@pytest.mark.parametrize("issuer", [None, "default", "someother"]) +def test_write_role_payload(query, issuer): + args = { + "ttl": "300h", + "max_ttl": "360h", + "server_flag": True, + "allow_subdomains": False, + "allowed_domains": ["example.com", "saltproject.io"], + "client_flag": True, + "allow_localhost": True, + "require_cn": False, + } + + assert vault_pki.write_role("role", mount="mount", issuer_ref=issuer, **args) is True + endpoint = query.call_args[0][1] + payload = query.call_args[1]["payload"] + assert endpoint == "mount/roles/role" + expected_payload = args.copy() + if issuer is not None: + expected_payload["issuer_ref"] = issuer + assert payload == expected_payload + + +def test_delete_role_payload(query): + assert vault_pki.delete_role("role", mount="mount") is True + endpoint = query.call_args[0][1] + assert endpoint == "mount/roles/role" + + +@pytest.mark.usefixtures("role_not_found") +def test_write_role_raises_err(): + with pytest.raises(CommandExecutionError, match=".*VaultNotFoundError.*"): + vault_pki.write_role("some/path") + + +@pytest.mark.usefixtures("role_not_found") +def test_delete_role_return_false_if_not_found(): + ret = vault_pki.delete_role("some/path") + assert not ret + + +@pytest.mark.usefixtures("role_not_found") +def test_read_role_return_none_if_not_found(): + ret = vault_pki.read_role("some/path") + assert ret is None + + +@pytest.mark.usefixtures("role_not_found") +def test_list_roles_return_empty_array_if_not_found(): + ret = vault_pki.list_roles("some/path") + assert ret == [] + + +@pytest.mark.parametrize( + "common_name,root_type,args", + [ + ( + "root_ca", + "exported", + { + "issuer_name": "root-ca", + "key-name": "root-ca-key", + "max_path_length": 4, + "key_bits": 384, + "key_type": "ec", + }, + ), + ( + "root_ca", + "internal", + { + "key_bits": 384, + "key_type": "ec", + }, + ), + ("root_ca", "internal", {}), + ], +) +def test_generate_root_payload(query, common_name, root_type, args): + vault_pki.generate_root(common_name=common_name, type=root_type, mount="mount", **args) + + endpoint = query.call_args[0][1] + payload = query.call_args[1]["payload"] + assert endpoint == f"mount/root/generate/{root_type}" + expected_payload = payload.copy() + expected_payload["common_name"] = common_name + assert payload == expected_payload + + +def test_generate_root_raise_err_with_default_name(): + with pytest.raises(CommandExecutionError): + vault_pki.generate_root("my root", issuer_name="default") + + with pytest.raises(CommandExecutionError): + vault_pki.generate_root("my root", key_name="default") + + +@pytest.mark.parametrize( + "args", [{"serial": "00:11:22:33:44:55", "certificate": "-----BEGIN CERTIFICATE..."}, {}] +) +def test_revoke_certificate_raise_err(args): + with pytest.raises(CommandExecutionError): + vault_pki.revoke_certificate(**args) diff --git a/tests/unit/utils/vault/test_helpers.py b/tests/unit/utils/vault/test_helpers.py index 1604512a..22692e8b 100644 --- a/tests/unit/utils/vault/test_helpers.py +++ b/tests/unit/utils/vault/test_helpers.py @@ -104,6 +104,26 @@ def test_timestring_map(inpt, expected): assert hlp.timestring_map(inpt) == expected +@pytest.mark.parametrize( + "inpt,expected", + [ + (60.0, 60), + (60, 60), + ("60", 60), + ("60s", 60), + ("2m", 120), + ("1h", 3600), + ("1d", 86400), + ("1.5s", 1), + ("1.5m", 90), + ("1.5h", 5400), + ("7.5d", 648000), + ], +) +def test_timestring_map_with_int(inpt, expected): + assert hlp.timestring_map(inpt, cast=int) == expected + + @pytest.mark.parametrize( "creation_time,expected", [