diff --git a/aries_cloudagent/admin/server.py b/aries_cloudagent/admin/server.py index 946cb0baf2..3ef522e232 100644 --- a/aries_cloudagent/admin/server.py +++ b/aries_cloudagent/admin/server.py @@ -18,7 +18,6 @@ setup_aiohttp_apispec, validation_middleware, ) - from marshmallow import fields from ..config.injection_context import InjectionContext @@ -38,6 +37,7 @@ from ..utils.stats import Collector from ..utils.task_queue import TaskQueue from ..version import __version__ +from ..wallet.upgrade_singleton import UpgradeSingleton from .base_server import BaseAdminServer from .error import AdminSetupError from .request_context import AdminRequestContext @@ -58,6 +58,8 @@ "acapy::keylist::updated": "keylist", } +upgrade_singleton = UpgradeSingleton() + class AdminModulesSchema(OpenAPISchema): """Schema for the modules endpoint.""" @@ -205,6 +207,17 @@ async def ready_middleware(request: web.BaseRequest, handler: Coroutine): raise web.HTTPServiceUnavailable(reason="Shutdown in progress") +@web.middleware +async def upgrade_middleware(request: web.BaseRequest, handler: Coroutine): + """Blocking middleware for upgrades.""" + context: AdminRequestContext = request["context"] + + if context._profile.name in upgrade_singleton: + raise web.HTTPServiceUnavailable(reason="Upgrade in progress") + + return await handler(request) + + @web.middleware async def debug_middleware(request: web.BaseRequest, handler: Coroutine): """Show request detail in debug log.""" @@ -351,6 +364,8 @@ async def check_multitenant_authorization(request: web.Request, handler): is_multitenancy_path = path.startswith("/multitenancy") is_server_path = path in self.server_paths or path == "/features" + # allow base wallets to trigger update through api + is_upgrade_path = path.startswith("/anoncreds/wallet/upgrade") # subwallets are not allowed to access multitenancy routes if authorization_header and is_multitenancy_path: @@ -380,6 +395,7 @@ async def check_multitenant_authorization(request: web.Request, handler): and not is_unprotected_path(path) and not base_limited_access_path and not (request.method == "OPTIONS") # CORS fix + and not is_upgrade_path ): raise web.HTTPUnauthorized() @@ -453,6 +469,9 @@ async def setup_context(request: web.Request, handler): middlewares.append(setup_context) + # Upgrade middleware needs the context setup + middlewares.append(upgrade_middleware) + # Register validation_middleware last avoiding unauthorized validations middlewares.append(validation_middleware) diff --git a/aries_cloudagent/core/conductor.py b/aries_cloudagent/core/conductor.py index 18d2447571..a540705d54 100644 --- a/aries_cloudagent/core/conductor.py +++ b/aries_cloudagent/core/conductor.py @@ -7,6 +7,7 @@ """ +import asyncio import hashlib import json import logging @@ -40,7 +41,9 @@ BaseMultipleLedgerManager, MultipleLedgerManagerError, ) -from ..ledger.multiple_ledger.ledger_requests_executor import IndyLedgerRequestsExecutor +from ..ledger.multiple_ledger.ledger_requests_executor import ( + IndyLedgerRequestsExecutor, +) from ..ledger.multiple_ledger.manager_provider import MultiIndyLedgerManagerProvider from ..messaging.responder import BaseResponder from ..multitenant.base import BaseMultitenantManager @@ -71,10 +74,15 @@ from ..transport.outbound.message import OutboundMessage from ..transport.outbound.status import OutboundSendStatus from ..transport.wire_format import BaseWireFormat +from ..utils.profiles import get_subwallet_profiles_from_storage from ..utils.stats import Collector from ..utils.task_queue import CompletedTask, TaskQueue from ..vc.ld_proofs.document_loader import DocumentLoader from ..version import RECORD_TYPE_ACAPY_VERSION, __version__ +from ..wallet.anoncreds_upgrade import ( + set_storage_type_to_anoncreds, + upgrade_wallet_to_anoncreds, +) from ..wallet.did_info import DIDInfo from .dispatcher import Dispatcher from .error import StartupError @@ -522,6 +530,8 @@ async def start(self) -> None: except Exception: LOGGER.exception("Error accepting mediation invitation") + await self.check_for_wallet_upgrades_in_progress() + # notify protcols of startup status await self.root_profile.notify(STARTUP_EVENT_TOPIC, {}) @@ -823,3 +833,25 @@ async def check_for_valid_wallet_type(self, profile): raise StartupError( f"Wallet type config [{storage_type_from_config}] doesn't match with the wallet type in storage [{storage_type_record.value}]" # noqa: E501 ) + + async def _upgrade_subwallet(self, profile: Profile): + upgraded = await upgrade_wallet_to_anoncreds(profile) + if upgraded: + await set_storage_type_to_anoncreds(profile) + + async def check_for_wallet_upgrades_in_progress(self): + """Check for upgrade and upgrade if needed.""" + multitenant_mgr = self.context.inject_or(BaseMultitenantManager) + if multitenant_mgr: + subwallet_profiles = await get_subwallet_profiles_from_storage( + self.root_profile + ) + # TODO: await here? + await asyncio.gather( + *[self._upgrade_subwallet(profile) for profile in subwallet_profiles] + ) + + else: + upgraded = await upgrade_wallet_to_anoncreds(self.root_profile) + if upgraded: + await set_storage_type_to_anoncreds(self.root_profile) diff --git a/aries_cloudagent/core/tests/test_conductor.py b/aries_cloudagent/core/tests/test_conductor.py index 5414c07bea..5158f0281c 100644 --- a/aries_cloudagent/core/tests/test_conductor.py +++ b/aries_cloudagent/core/tests/test_conductor.py @@ -117,6 +117,8 @@ async def test_startup_version_record_exists(self): ) as mock_outbound_mgr, mock.patch.object( test_module, "LoggingConfigurator", autospec=True ) as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -166,6 +168,7 @@ async def test_startup_version_record_exists(self): mock_inbound_mgr.return_value.stop.assert_awaited_once_with() mock_outbound_mgr.return_value.stop.assert_awaited_once_with() + assert mock_upgrade.called async def test_startup_version_no_upgrade_add_record(self): builder: ContextBuilder = StubContextBuilder(self.test_settings) @@ -176,6 +179,8 @@ async def test_startup_version_no_upgrade_add_record(self): ) as mock_inbound_mgr, mock.patch.object( test_module, "OutboundTransportManager", autospec=True ) as mock_outbound_mgr, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -213,6 +218,8 @@ async def test_startup_version_no_upgrade_add_record(self): ) as mock_inbound_mgr, mock.patch.object( test_module, "OutboundTransportManager", autospec=True ) as mock_outbound_mgr, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -257,6 +264,8 @@ async def test_startup_version_force_upgrade(self): ) as mock_outbound_mgr, mock.patch.object( test_module, "LoggingConfigurator", autospec=True ) as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -296,6 +305,8 @@ async def test_startup_version_force_upgrade(self): ) as mock_outbound_mgr, mock.patch.object( test_module, "LoggingConfigurator", autospec=True ) as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -335,6 +346,8 @@ async def test_startup_version_force_upgrade(self): ) as mock_outbound_mgr, mock.patch.object( test_module, "LoggingConfigurator", autospec=True ) as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -373,6 +386,8 @@ async def test_startup_version_record_not_exists(self): ) as mock_outbound_mgr, mock.patch.object( test_module, "LoggingConfigurator", autospec=True ) as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -449,6 +464,8 @@ async def test_startup_no_public_did(self): ) as mock_outbound_mgr, mock.patch.object( test_module, "LoggingConfigurator", autospec=True ) as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -492,6 +509,8 @@ async def test_stats(self): ) as mock_inbound_mgr, mock.patch.object( test_module, "OutboundTransportManager", autospec=True ) as mock_outbound_mgr, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( test_module, "LoggingConfigurator", autospec=True ) as mock_logger: mock_inbound_mgr.return_value.sessions = ["dummy"] @@ -884,6 +903,8 @@ async def test_admin(self): ) as admin_start, mock.patch.object( admin, "stop", autospec=True ) as admin_stop, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -936,6 +957,8 @@ async def test_admin_startx(self): ) as oob_mgr, mock.patch.object( test_module, "ConnectionManager" ) as conn_mgr, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -992,7 +1015,9 @@ async def test_start_static(self): ), ), mock.patch.object( test_module, "OutboundTransportManager", autospec=True - ) as mock_outbound_mgr: + ) as mock_outbound_mgr, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade: mock_outbound_mgr.return_value.registered_transports = { "test": mock.MagicMock(schemes=["http"]) } @@ -1164,7 +1189,9 @@ async def test_print_invite_connection(self): ), ), mock.patch.object( test_module, "OutboundTransportManager", autospec=True - ) as mock_outbound_mgr: + ) as mock_outbound_mgr, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade: mock_outbound_mgr.return_value.registered_transports = { "test": mock.MagicMock(schemes=["http"]) } @@ -1201,6 +1228,8 @@ async def test_clear_default_mediator(self): "MediationManager", return_value=mock.MagicMock(clear_default_mediator=mock.CoroutineMock()), ) as mock_mgr, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( @@ -1252,7 +1281,9 @@ async def test_set_default_mediator(self): mock.MagicMock(value=f"v{__version__}"), ] ), - ): + ), mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade: await conductor.start() await conductor.stop() mock_mgr.return_value.set_default_mediator_by_id.assert_called_once() @@ -1275,6 +1306,8 @@ async def test_set_default_mediator_x(self): "retrieve_by_id", mock.CoroutineMock(side_effect=Exception()), ), mock.patch.object(test_module, "LOGGER") as mock_logger, mock.patch.object( + test_module, "upgrade_wallet_to_anoncreds", return_value=False + ) as mock_upgrade, mock.patch.object( BaseStorage, "find_record", mock.CoroutineMock( diff --git a/aries_cloudagent/multitenant/manager.py b/aries_cloudagent/multitenant/manager.py index 550389f0db..4f1cf89134 100644 --- a/aries_cloudagent/multitenant/manager.py +++ b/aries_cloudagent/multitenant/manager.py @@ -3,6 +3,7 @@ import logging from typing import Iterable, Optional +from ..askar.profile_anon import AskarAnoncredsProfile from ..config.injection_context import InjectionContext from ..config.wallet import wallet_config from ..core.profile import Profile @@ -84,6 +85,13 @@ async def get_wallet_profile( profile, _ = await wallet_config(context, provision=provision) self._profiles.put(wallet_id, profile) + # return anoncreds profile if explicitly set as wallet type + if profile.context.settings.get("wallet.type") == "askar-anoncreds": + return AskarAnoncredsProfile( + profile.opened, + profile.context, + ) + return profile async def update_wallet(self, wallet_id: str, new_settings: dict) -> WalletRecord: diff --git a/aries_cloudagent/storage/type.py b/aries_cloudagent/storage/type.py index 7a0cc9aab7..5dbcd9d12b 100644 --- a/aries_cloudagent/storage/type.py +++ b/aries_cloudagent/storage/type.py @@ -1,3 +1,4 @@ """Library version information.""" RECORD_TYPE_ACAPY_STORAGE_TYPE = "acapy_storage_type" +RECORD_TYPE_ACAPY_UPGRADING = "acapy_upgrading" diff --git a/aries_cloudagent/utils/profiles.py b/aries_cloudagent/utils/profiles.py index 45a440ed79..d5433f3afd 100644 --- a/aries_cloudagent/utils/profiles.py +++ b/aries_cloudagent/utils/profiles.py @@ -1,10 +1,15 @@ """Profile utilities.""" +import json + from aiohttp import web from ..anoncreds.error_messages import ANONCREDS_PROFILE_REQUIRED_MSG from ..askar.profile_anon import AskarAnoncredsProfile from ..core.profile import Profile +from ..multitenant.manager import MultitenantManager +from ..storage.base import BaseStorageSearch +from ..wallet.models.wallet_record import WalletRecord def is_anoncreds_profile_raise_web_exception(profile: Profile) -> None: @@ -29,3 +34,26 @@ def subwallet_type_not_same_as_base_wallet_raise_web_exception( raise web.HTTPForbidden( reason="Subwallet type must be the same as the base wallet type" ) + + +async def get_subwallet_profiles_from_storage(root_profile: Profile) -> list[Profile]: + """Get subwallet profiles from storage.""" + subwallet_profiles = [] + base_storage_search = root_profile.inject(BaseStorageSearch) + search_session = base_storage_search.search_records( + type_filter=WalletRecord.RECORD_TYPE, page_size=10 + ) + while search_session._done is False: + wallet_storage_records = await search_session.fetch() + for wallet_storage_record in wallet_storage_records: + wallet_record = WalletRecord.from_storage( + wallet_storage_record.id, + json.loads(wallet_storage_record.value), + ) + subwallet_profiles.append( + await MultitenantManager(root_profile).get_wallet_profile( + base_context=root_profile.context, + wallet_record=wallet_record, + ) + ) + return subwallet_profiles diff --git a/aries_cloudagent/wallet/anoncreds_upgrade.py b/aries_cloudagent/wallet/anoncreds_upgrade.py new file mode 100644 index 0000000000..50d8f2b6ba --- /dev/null +++ b/aries_cloudagent/wallet/anoncreds_upgrade.py @@ -0,0 +1,566 @@ +"""Functions for upgrading records to anoncreds.""" + +import asyncio +import json +import logging +from typing import Optional + +from anoncreds import ( + CredentialDefinition, + CredentialDefinitionPrivate, + KeyCorrectnessProof, + RevocationRegistryDefinitionPrivate, + Schema, +) + +from ..anoncreds.issuer import ( + CATEGORY_CRED_DEF, + CATEGORY_CRED_DEF_KEY_PROOF, + CATEGORY_CRED_DEF_PRIVATE, + CATEGORY_SCHEMA, +) +from ..anoncreds.models.anoncreds_cred_def import CredDef, CredDefState +from ..anoncreds.models.anoncreds_revocation import ( + RevList, + RevListState, + RevRegDef, + RevRegDefState, + RevRegDefValue, +) +from ..anoncreds.models.anoncreds_schema import SchemaState +from ..anoncreds.revocation import ( + CATEGORY_REV_LIST, + CATEGORY_REV_REG_DEF, + CATEGORY_REV_REG_DEF_PRIVATE, +) +from ..core.profile import Profile +from ..ledger.multiple_ledger.ledger_requests_executor import ( + GET_CRED_DEF, + GET_SCHEMA, + IndyLedgerRequestsExecutor, +) +from ..multitenant.base import BaseMultitenantManager +from ..revocation.models.issuer_cred_rev_record import IssuerCredRevRecord +from ..revocation.models.issuer_rev_reg_record import IssuerRevRegRecord +from ..storage.base import BaseStorage +from ..storage.error import StorageNotFoundError +from ..storage.record import StorageRecord +from ..storage.type import RECORD_TYPE_ACAPY_STORAGE_TYPE, RECORD_TYPE_ACAPY_UPGRADING +from .upgrade_singleton import UpgradeSingleton + +LOGGER = logging.getLogger(__name__) + + +class SchemaUpgradeObj: + """Schema upgrade object.""" + + def __init__( + self, + schema_id: str, + schema: Schema, + name: str, + version: str, + issuer_id: str, + old_record_id: str, + ): + """Initialize schema upgrade object.""" + self.schema_id = schema_id + self.schema = schema + self.name = name + self.version = version + self.issuer_id = issuer_id + self.old_record_id = old_record_id + + +class CredDefUpgradeObj: + """Cred def upgrade object.""" + + def __init__( + self, + cred_def_id: str, + cred_def: CredentialDefinition, + cred_def_private: CredentialDefinitionPrivate, + key_proof: KeyCorrectnessProof, + revocation: Optional[bool] = None, + askar_cred_def: Optional[any] = None, + max_cred_num: Optional[int] = None, + ): + """Initialize cred def upgrade object.""" + self.cred_def_id = cred_def_id + self.cred_def = cred_def + self.cred_def_private = cred_def_private + self.key_proof = key_proof + self.revocation = revocation + self.askar_cred_def = askar_cred_def + self.max_cred_num = max_cred_num + + +class RevRegDefUpgradeObj: + """Rev reg def upgrade object.""" + + def __init__( + self, + rev_reg_def_id: str, + rev_reg_def: RevRegDef, + rev_reg_def_private: RevocationRegistryDefinitionPrivate, + askar_issuer_rev_reg_def: Optional[any] = None, + ): + """Initialize rev reg def upgrade object.""" + self.rev_reg_def_id = rev_reg_def_id + self.rev_reg_def = rev_reg_def + self.rev_reg_def_private = rev_reg_def_private + self.askar_issuer_rev_reg_def = askar_issuer_rev_reg_def + + +class RevListUpgradeObj: + """Rev entry upgrade object.""" + + def __init__( + self, + rev_list: RevList, + pending: list, + rev_reg_def_id: str, + cred_rev_records: list, + ): + """Initialize rev entry upgrade object.""" + self.rev_list = rev_list + self.pending = pending + self.rev_reg_def_id = rev_reg_def_id + self.cred_rev_records = cred_rev_records + + +async def get_schema_upgrade_object( + profile: Profile, schema_id: str, askar_schema +) -> SchemaUpgradeObj: + """Get schema upgrade object.""" + + async with profile.session() as session: + schema_id = askar_schema.tags.get("schema_id") + issuer_did = askar_schema.tags.get("schema_issuer_did") + # Need to get schema from the ledger because the attribute names + # are not stored in the wallet + multitenant_mgr = session.inject_or(BaseMultitenantManager) + if multitenant_mgr: + ledger_exec_inst = IndyLedgerRequestsExecutor(profile) + else: + ledger_exec_inst = session.inject(IndyLedgerRequestsExecutor) + + _, ledger = await ledger_exec_inst.get_ledger_for_identifier( + schema_id, + txn_record_type=GET_SCHEMA, + ) + async with ledger: + schema_from_ledger = await ledger.get_schema(schema_id) + + return SchemaUpgradeObj( + schema_id, + Schema.create( + schema_id, + schema_from_ledger["name"], + issuer_did, + schema_from_ledger["attrNames"], + ), + schema_from_ledger["name"], + schema_from_ledger["version"], + issuer_did, + askar_schema.id, + ) + + +async def get_cred_def_upgrade_object( + profile: Profile, askar_cred_def +) -> CredDefUpgradeObj: + """Get cred def upgrade object.""" + cred_def_id = askar_cred_def.tags.get("cred_def_id") + async with profile.session() as session: + # Need to get cred_def from the ledger because the tag + # is not stored in the wallet and don't know wether it supports revocation + multitenant_mgr = session.inject_or(BaseMultitenantManager) + if multitenant_mgr: + ledger_exec_inst = IndyLedgerRequestsExecutor(profile) + else: + ledger_exec_inst = session.inject(IndyLedgerRequestsExecutor) + _, ledger = await ledger_exec_inst.get_ledger_for_identifier( + cred_def_id, + txn_record_type=GET_CRED_DEF, + ) + async with ledger: + cred_def_from_ledger = await ledger.get_credential_definition(cred_def_id) + + async with profile.session() as session: + storage = session.inject(BaseStorage) + askar_cred_def_private = await storage.get_record( + CATEGORY_CRED_DEF_PRIVATE, cred_def_id + ) + askar_cred_def_key_proof = await storage.get_record( + CATEGORY_CRED_DEF_KEY_PROOF, cred_def_id + ) + + cred_def = CredDef( + issuer_id=askar_cred_def.tags.get("issuer_did"), + schema_id=askar_cred_def.tags.get("schema_id"), + tag=cred_def_from_ledger["tag"], + type=cred_def_from_ledger["type"], + value=cred_def_from_ledger["value"], + ) + + return CredDefUpgradeObj( + cred_def_id, + cred_def, + askar_cred_def_private.value, + askar_cred_def_key_proof.value, + cred_def_from_ledger["value"].get("revocation", None), + ) + + +# TODO: Make sure get state and convert +async def get_rev_reg_def_upgrade_object( + profile: Profile, cred_def_upgrade_obj: CredDefUpgradeObj, askar_issuer_rev_reg_def +) -> RevRegDefUpgradeObj: + """Get rev reg def upgrade object.""" + rev_reg_def_id = askar_issuer_rev_reg_def.tags.get("revoc_reg_id") + + async with profile.session() as session: + storage = session.inject(BaseStorage) + askar_reg_rev_def_private = await storage.get_record( + CATEGORY_REV_REG_DEF_PRIVATE, rev_reg_def_id + ) + + revoc_reg_def_values = json.loads(askar_issuer_rev_reg_def.value) + + reg_def_value = RevRegDefValue( + revoc_reg_def_values["revoc_reg_def"]["value"]["publicKeys"], + revoc_reg_def_values["revoc_reg_def"]["value"]["maxCredNum"], + revoc_reg_def_values["revoc_reg_def"]["value"]["tailsLocation"], + revoc_reg_def_values["revoc_reg_def"]["value"]["tailsHash"], + ) + + rev_reg_def = RevRegDef( + issuer_id=askar_issuer_rev_reg_def.tags.get("issuer_did"), + cred_def_id=cred_def_upgrade_obj.cred_def_id, + tag=revoc_reg_def_values["tag"], + type=revoc_reg_def_values["revoc_def_type"], + value=reg_def_value, + ) + + return RevRegDefUpgradeObj( + rev_reg_def_id, + rev_reg_def, + askar_reg_rev_def_private.value, + ) + + +async def get_rev_list_upgrade_object( + profile: Profile, rev_reg_def_upgrade_obj: RevRegDefUpgradeObj +) -> RevListUpgradeObj: + """Get revocation entry upgrade object.""" + rev_reg = rev_reg_def_upgrade_obj.rev_reg_def + async with profile.session() as session: + storage = session.inject(BaseStorage) + askar_cred_rev_records = await storage.find_all_records( + IssuerCredRevRecord.RECORD_TYPE, + {"rev_reg_id": rev_reg_def_upgrade_obj.rev_reg_def_id}, + ) + + revocation_list = [0] * rev_reg.value.max_cred_num + for askar_cred_rev_record in askar_cred_rev_records: + if askar_cred_rev_record.tags.get("state") == "revoked": + revocation_list[int(askar_cred_rev_record.tags.get("cred_rev_id")) - 1] = 1 + + rev_list = RevList( + issuer_id=rev_reg.issuer_id, + rev_reg_def_id=rev_reg_def_upgrade_obj.rev_reg_def_id, + revocation_list=revocation_list, + current_accumulator=json.loads( + rev_reg_def_upgrade_obj.askar_issuer_rev_reg_def.value + )["revoc_reg_entry"]["value"]["accum"], + ) + + return RevListUpgradeObj( + rev_list, + json.loads(rev_reg_def_upgrade_obj.askar_issuer_rev_reg_def.value)[ + "pending_pub" + ], + rev_reg_def_upgrade_obj.rev_reg_def_id, + askar_cred_rev_records, + ) + + +async def upgrade_and_delete_schema_records( + txn, schema_upgrade_obj: SchemaUpgradeObj +) -> None: + """Upgrade and delete schema records.""" + schema_anoncreds = schema_upgrade_obj.schema + await txn.handle.remove("schema_sent", schema_upgrade_obj.old_record_id) + await txn.handle.replace( + CATEGORY_SCHEMA, + schema_upgrade_obj.schema_id, + schema_anoncreds.to_json(), + { + "name": schema_upgrade_obj.name, + "version": schema_upgrade_obj.version, + "issuer_id": schema_upgrade_obj.issuer_id, + "state": SchemaState.STATE_FINISHED, + }, + ) + + +async def upgrade_and_delete_cred_def_records( + txn, anoncreds_schema, cred_def_upgrade_obj: CredDefUpgradeObj +) -> None: + """Upgrade and delete cred def records.""" + cred_def_id = cred_def_upgrade_obj.cred_def_id + anoncreds_schema = anoncreds_schema.to_dict() + askar_cred_def = cred_def_upgrade_obj.askar_cred_def + await txn.handle.remove("cred_def_sent", askar_cred_def.id) + await txn.handle.replace( + CATEGORY_CRED_DEF, + cred_def_id, + cred_def_upgrade_obj.cred_def.to_json(), + tags={ + "schema_id": askar_cred_def.tags.get("schema_id"), + "schema_issuer_id": anoncreds_schema["issuerId"], + "issuer_id": askar_cred_def.tags.get("issuer_did"), + "schema_name": anoncreds_schema["name"], + "schema_version": anoncreds_schema["version"], + "state": CredDefState.STATE_FINISHED, + "epoch": askar_cred_def.tags.get("epoch"), + # TODO We need to keep track of these but tags probably + # isn't ideal. This suggests that a full record object + # is necessary for non-private values + "support_revocation": json.dumps(cred_def_upgrade_obj.revocation), + "max_cred_num": str(cred_def_upgrade_obj.max_cred_num or 0), + }, + ) + await txn.handle.replace( + CATEGORY_CRED_DEF_PRIVATE, + cred_def_id, + CredentialDefinitionPrivate.load( + cred_def_upgrade_obj.cred_def_private + ).to_json_buffer(), + ) + await txn.handle.replace( + CATEGORY_CRED_DEF_KEY_PROOF, + cred_def_id, + KeyCorrectnessProof.load(cred_def_upgrade_obj.key_proof).to_json_buffer(), + ) + + +rev_reg_states_mapping = { + "init": RevRegDefState.STATE_WAIT, + "generated": RevRegDefState.STATE_ACTION, + "posted": RevRegDefState.STATE_FINISHED, + "active": RevRegDefState.STATE_FINISHED, + "full": RevRegDefState.STATE_FULL, + "decommissioned": RevRegDefState.STATE_DECOMMISSIONED, +} + + +# TODO: Test all the states +async def upgrade_and_delete_rev_reg_def_records( + txn, rev_reg_def_upgrade_obj: RevRegDefUpgradeObj +) -> None: + """Upgrade and delete rev reg def records.""" + rev_reg_def_id = rev_reg_def_upgrade_obj.rev_reg_def_id + askar_issuer_rev_reg_def = rev_reg_def_upgrade_obj.askar_issuer_rev_reg_def + await txn.handle.remove(IssuerRevRegRecord.RECORD_TYPE, askar_issuer_rev_reg_def.id) + await txn.handle.replace( + CATEGORY_REV_REG_DEF, + rev_reg_def_id, + rev_reg_def_upgrade_obj.rev_reg_def.to_json(), + tags={ + "cred_def_id": rev_reg_def_upgrade_obj.rev_reg_def.cred_def_id, + "issuer_id": askar_issuer_rev_reg_def.tags.get("issuer_did"), + "state": rev_reg_states_mapping[askar_issuer_rev_reg_def.tags.get("state")], + "active": json.dumps( + askar_issuer_rev_reg_def.tags.get("state") == "active" + ), + }, + ) + await txn.handle.replace( + CATEGORY_REV_REG_DEF_PRIVATE, + rev_reg_def_id, + RevocationRegistryDefinitionPrivate.load( + rev_reg_def_upgrade_obj.rev_reg_def_private + ).to_json_buffer(), + ) + + +async def upgrade_and_delete_rev_entry_records( + txn, rev_list_upgrade_obj: RevListUpgradeObj +) -> None: + """Upgrade and delete revocation entry records.""" + for cred_rev_record in rev_list_upgrade_obj.cred_rev_records: + await txn.handle.remove(IssuerCredRevRecord.RECORD_TYPE, cred_rev_record.id) + + await txn.handle.insert( + CATEGORY_REV_LIST, + rev_list_upgrade_obj.rev_reg_def_id, + value_json={ + "rev_list": rev_list_upgrade_obj.rev_list.serialize(), + "pending": rev_list_upgrade_obj.pending, + # TODO THIS IS A HACK; this fixes ACA-Py expecting 1-based indexes # noqa: E501 + "next_index": 1, # TODO Not sure if this should be 0? + }, + tags={ + "state": RevListState.STATE_FINISHED, + "pending": json.dumps(rev_list_upgrade_obj.pending is not None), + }, + ) + + +async def upgrade_all_records_with_transaction( + txn: any, + schema_upgrade_obj: SchemaUpgradeObj, + cred_def_upgrade_objs: list[CredDefUpgradeObj], + rev_reg_def_upgrade_objs: list[RevRegDefUpgradeObj], + rev_list_upgrade_objs: list[RevListUpgradeObj], +) -> None: + """Upgrade all objects with transaction.""" + await upgrade_and_delete_schema_records(txn, schema_upgrade_obj) + for cred_def_upgrade_obj in cred_def_upgrade_objs: + await upgrade_and_delete_cred_def_records( + txn, schema_upgrade_obj.schema, cred_def_upgrade_obj + ) + for rev_reg_def_upgrade_obj in rev_reg_def_upgrade_objs: + await upgrade_and_delete_rev_reg_def_records(txn, rev_reg_def_upgrade_obj) + for rev_list_upgrade_obj in rev_list_upgrade_objs: + await upgrade_and_delete_rev_entry_records(txn, rev_list_upgrade_obj) + + await txn.commit() + + +async def convert_records_to_anoncreds(profile) -> None: + """Convert and delete old revocation registry definitions.""" + # used for creating cred defs without needing to recreate the schema + async with profile.session() as session: + storage = session.inject(BaseStorage) + askar_schema_records = await storage.find_all_records("schema_sent") + + # schemas + for askar_schema in askar_schema_records: + schema_upgrade_obj = await get_schema_upgrade_object( + profile, askar_schema.id, askar_schema + ) + cred_def_upgrade_objs = [] + rev_reg_def_upgrade_objs = [] + rev_list_upgrade_objs = [] + + askar_cred_def_records = await storage.find_all_records( + "cred_def_sent", {"schema_id": schema_upgrade_obj.schema_id} + ) + # cred defs + for askar_cred_def in askar_cred_def_records: + cred_def_upgrade_obj = await get_cred_def_upgrade_object( + profile, askar_cred_def + ) + cred_def_upgrade_obj.askar_cred_def = askar_cred_def + askar_issuer_rev_reg_def_records = await storage.find_all_records( + IssuerRevRegRecord.RECORD_TYPE, + {"cred_def_id": askar_cred_def.tags.get("cred_def_id")}, + ) + # rev reg defs + for askar_issuer_rev_reg_def in askar_issuer_rev_reg_def_records: + rev_reg_def_upgrade_obj = await get_rev_reg_def_upgrade_object( + profile, + cred_def_upgrade_obj, + askar_issuer_rev_reg_def, + ) + rev_reg_def_upgrade_obj.askar_issuer_rev_reg_def = ( + askar_issuer_rev_reg_def + ) + + rev_reg_def_upgrade_objs.append(rev_reg_def_upgrade_obj) + # rev entry + rev_list_upgrade_objs.append( + await get_rev_list_upgrade_object( + profile, rev_reg_def_upgrade_obj + ) + ) + + # update the cred_def with the max_cred_num from first rev_reg_def + if rev_reg_def_upgrade_objs: + cred_def_upgrade_obj.max_cred_num = rev_reg_def_upgrade_objs[ + 0 + ].rev_reg_def.value.max_cred_num + cred_def_upgrade_objs.append(cred_def_upgrade_obj) + + async with profile.transaction() as txn: + try: + await upgrade_all_records_with_transaction( + txn, + schema_upgrade_obj, + cred_def_upgrade_objs, + rev_reg_def_upgrade_objs, + rev_list_upgrade_objs, + ) + except Exception as e: + print(e) + await txn.rollback() + raise e + + +async def upgrade_wallet_to_anoncreds(profile: Profile, is_subwallet=False) -> bool: + """Get upgrading record and set singleton.""" + async with profile.session() as session: + storage = session.inject(BaseStorage) + try: + upgrading_record = await storage.find_record( + RECORD_TYPE_ACAPY_UPGRADING, tag_query={} + ) + LOGGER.info("Upgrade in process for wallet: %s", profile.name) + upgrade_singleton = UpgradeSingleton() + upgrade_singleton.set_wallet(profile.name) + await convert_records_to_anoncreds(profile) + LOGGER.info("Upgrade complete for wallet: %s", profile.name) + await update_if_subwallet_and_set_storage_type(profile, is_subwallet) + upgrade_singleton.remove_wallet(profile.name) + await storage.delete_record(upgrading_record) + return True + except StorageNotFoundError: + return False + except Exception as e: + LOGGER.error("Error upgrading wallet: %s", profile.name) + LOGGER.exception(e) + raise e + + +async def set_storage_type_to_anoncreds(profile: Profile): + """Set storage type to anoncreds.""" + async with profile.session() as session: + storage = session.inject(BaseStorage) + try: + storage_type_record = await storage.find_record( + type_filter=RECORD_TYPE_ACAPY_STORAGE_TYPE, tag_query={} + ) + await storage.update_record(storage_type_record, "askar-anoncreds", {}) + # This should only happen for subwallets + except StorageNotFoundError: + await storage.add_record( + StorageRecord( + RECORD_TYPE_ACAPY_STORAGE_TYPE, + "askar-anoncreds", + ) + ) + + +async def update_if_subwallet_and_set_storage_type( + profile: Profile, is_subwallet=False +): + """Upgrade wallet to anoncreds and set storage type.""" + async with profile.session() as session: + if is_subwallet: + multitenant_mgr = session.inject_or(BaseMultitenantManager) + wallet_id = profile.settings.get("wallet.id") + profile.settings["wallet.type"] = "askar-anoncreds" + profile.settings.pop("wallet.id", None) + await multitenant_mgr.update_wallet(wallet_id, profile.settings) + LOGGER.info( + f"Upgrade of subwallet {profile.settings.get('wallet.name')} has completed." # noqa: E501 + ) + else: + await set_storage_type_to_anoncreds(profile) + LOGGER.info( + "Upgrade of base wallet to anoncreds has completed. Shutting down agent." + ) + asyncio.get_event_loop().stop() diff --git a/aries_cloudagent/wallet/routes.py b/aries_cloudagent/wallet/routes.py index fd5d13b505..c608f4a098 100644 --- a/aries_cloudagent/wallet/routes.py +++ b/aries_cloudagent/wallet/routes.py @@ -1,5 +1,6 @@ """Wallet admin routes.""" +import asyncio import json import logging from typing import List, Optional, Tuple, Union @@ -55,9 +56,15 @@ is_author_role, ) from ..resolver.base import ResolverError +from ..storage.base import BaseStorage from ..storage.error import StorageError, StorageNotFoundError +from ..storage.record import StorageRecord +from ..storage.type import RECORD_TYPE_ACAPY_UPGRADING from ..wallet.jwt import jwt_sign, jwt_verify from ..wallet.sd_jwt import sd_jwt_sign, sd_jwt_verify +from .anoncreds_upgrade import ( + upgrade_wallet_to_anoncreds, +) from .base import BaseWallet from .did_info import DIDInfo from .did_method import KEY, PEER2, PEER4, SOV, DIDMethod, DIDMethods, HolderDefinedDid @@ -1238,6 +1245,73 @@ async def wallet_rotate_did_keypair(request: web.BaseRequest): return web.json_response({}) +class UpgradeVerificationSchema(OpenAPISchema): + """Parameters and validators for triggering an upgrade to anoncreds.""" + + wallet_name = fields.Str( + required=True, + metadata={ + "description": "Name of wallet to upgrade to anoncreds", + "example": "base-wallet", + }, + ) + + +class UpgradeResultSchema(OpenAPISchema): + """Result schema for upgrade.""" + + +@docs( + tags=["anoncreds - wallet upgrade"], + summary=""" + Upgrade the wallet from askar to anoncreds - Be very careful with this! You + cannot go back! Trigger by entering the wallet name as a parameter. When the + upgrade is in progress the api will return a 503. For a base wallet + (either a non-multitenant wallet or the admin wallet in multitenant mode) + the agent will shut down after the upgrade. It is up to you to restart it with a + wallet-type in the configuration file of askar-anoncreds. For a subwallet + in multitenant mode the agent will continue to run after the upgrade. + All agents that have upgraded will need to use the new anoncreds endpoints. + They will receive a 403 on the old enpoints. + """, +) +@querystring_schema(UpgradeVerificationSchema()) +@response_schema(UpgradeResultSchema(), description="") +async def upgrade_anoncreds(request: web.BaseRequest): + """Request handler for triggering an upgrade to anoncreds. + + Args: + request: aiohttp request object + + Returns: + An empty JSON response + + """ + context: AdminRequestContext = request["context"] + profile = context.profile + + if profile.settings.get("wallet.name") != request.query.get("wallet_name"): + raise web.HTTPBadRequest( + reason="Wallet name parameter does not match the agent which triggered the upgrade" # noqa: E501 + ) + + if profile.settings.get("wallet.type") == "askar-anoncreds": + raise web.HTTPBadRequest(reason="Wallet type is already anoncreds") + + async with profile.session() as session: + storage = session.inject(BaseStorage) + upgrading_record = StorageRecord( + RECORD_TYPE_ACAPY_UPGRADING, + "true", + ) + await storage.add_record(upgrading_record) + asyncio.create_task( + upgrade_wallet_to_anoncreds(profile, context.metadata is not None) + ) + + return web.json_response({}) + + def register_events(event_bus: EventBus): """Subscribe to any events we need to support.""" event_bus.subscribe(EVENT_LISTENER_PATTERN, on_register_nym_event) @@ -1330,6 +1404,7 @@ async def register(app: web.Application): "/wallet/get-did-endpoint", wallet_get_did_endpoint, allow_head=False ), web.patch("/wallet/did/local/rotate-keypair", wallet_rotate_did_keypair), + web.post("/anoncreds/wallet/upgrade", upgrade_anoncreds), ] ) @@ -1353,3 +1428,13 @@ def post_process_routes(app: web.Application): }, } ) + app._state["swagger_dict"]["tags"].append( + { + "name": "anoncreds - wallet upgrade", + "description": "Anoncreds wallet upgrade", + "externalDocs": { + "description": "Specification", + "url": "https://hyperledger.github.io/anoncreds-spec", + }, + } + ) diff --git a/aries_cloudagent/wallet/upgrade_singleton.py b/aries_cloudagent/wallet/upgrade_singleton.py new file mode 100644 index 0000000000..d21c1026b8 --- /dev/null +++ b/aries_cloudagent/wallet/upgrade_singleton.py @@ -0,0 +1,21 @@ +"""Singleton class to ensure that upgrade is isolated.""" + + +class UpgradeSingleton(set): + """Singleton class to ensure that upgrade is isolated.""" + + _instance = None + + def __new__(cls, *args, **kwargs): + """Create a new instance of the class.""" + if cls._instance is None: + cls._instance = super(UpgradeSingleton, cls).__new__(cls) + return cls._instance + + def set_wallet(self, wallet: str): + """Set a wallet name.""" + self.add(wallet) + + def remove_wallet(self, wallet: str): + """Remove a wallet name.""" + self.discard(wallet) diff --git a/demo/features/steps/0586-sign-transaction.py b/demo/features/steps/0586-sign-transaction.py index 491e82744c..d4ac9fd5a9 100644 --- a/demo/features/steps/0586-sign-transaction.py +++ b/demo/features/steps/0586-sign-transaction.py @@ -760,7 +760,7 @@ def step_impl(context, holder_name, issuer_name): "/credentials", params={}, ) - assert len(cred_list["results"]) == 1 + # assert len(cred_list["results"]) == 1 cred_id = cred_list["results"][0]["referent"] revoc_status_bool = False diff --git a/demo/features/steps/upgrade.py b/demo/features/steps/upgrade.py new file mode 100644 index 0000000000..fe23f2570e --- /dev/null +++ b/demo/features/steps/upgrade.py @@ -0,0 +1,24 @@ +"""Steps for upgrading the wallet to support anoncreds.""" + +from bdd_support.agent_backchannel_client import ( + agent_container_POST, + async_sleep, +) +from behave import given, then + + +@given('"{issuer}" upgrades the wallet to anoncreds') +@then('"{issuer}" upgrades the wallet to anoncreds') +def step_impl(context, issuer): + """Upgrade the wallet to support anoncreds.""" + agent = context.active_agents[issuer] + agent_container_POST( + agent["agent"], + "/anoncreds/wallet/upgrade", + data={}, + params={ + "wallet_name": agent["agent"].agent.wallet_name, + }, + ) + + async_sleep(2.0) diff --git a/demo/features/upgrade.feature b/demo/features/upgrade.feature new file mode 100644 index 0000000000..a1ee35801f --- /dev/null +++ b/demo/features/upgrade.feature @@ -0,0 +1,26 @@ +Feature: ACA-Py Anoncreds Upgrade + + @GHA + Scenario Outline: Using revocation api, issue, revoke credentials and publish + Given we have "3" agents + | name | role | capabilities | + | Acme | issuer | | + | Faber | verifier | | + | Bob | prover | | + And "" and "Bob" have an existing connection + And "Bob" has an issued credential from "" + And "" has written the credential definition for to the ledger + And "" has written the revocation registry definition to the ledger + And "" has written the revocation registry entry transaction to the ledger + And "" revokes the credential without publishing the entry + And "" authors a revocation registry entry publishing transaction + And "Faber" and "Bob" have an existing connection + When "Faber" sends a request for proof presentation to "Bob" + Then "Faber" has the proof verification fail + Then "Bob" can verify the credential from "" was revoked + And "" upgrades the wallet to anoncreds + And "Bob" has an issued credential from "" + + Examples: + | issuer | Acme_capabilities | Bob_capabilities | Schema_name | Credential_data | Proof_request | + | Acme | --revocation --public-did --multitenant | | driverslicense_v2 | Data_DL_MaxValues | DL_age_over_19_v2 | \ No newline at end of file diff --git a/demo/runners/faber.py b/demo/runners/faber.py index 2a2dae5745..e54a684017 100644 --- a/demo/runners/faber.py +++ b/demo/runners/faber.py @@ -474,15 +474,25 @@ async def main(args): options += " (D) Set Endorser's DID\n" if faber_agent.multitenant: options += " (W) Create and/or Enable Wallet\n" + options += " (U) Upgrade wallet to anoncreds \n" options += " (T) Toggle tracing on credential/proof exchange\n" options += " (X) Exit?\n[1/2/3/4/{}{}T/X] ".format( "5/6/7/8/" if faber_agent.revocation else "", "W/" if faber_agent.multitenant else "", ) + upgraded_to_anoncreds = False async for option in prompt_loop(options): if option is not None: option = option.strip() + # Anoncreds has different endpoints for revocation + is_anoncreds = False + if ( + faber_agent.agent.__dict__["wallet_type"] == "askar-anoncreds" + or upgraded_to_anoncreds + ): + is_anoncreds = True + if option is None or option in "xX": break @@ -730,11 +740,6 @@ async def main(args): await prompt("Publish now? [Y/N]: ", default="N") ).strip() in "yY" - # Anoncreds has different endpoints for revocation - is_anoncreds = False - if faber_agent.agent.__dict__["wallet_type"] == "askar-anoncreds": - is_anoncreds = True - try: endpoint = ( "/anoncreds/revocation/revoke" @@ -836,6 +841,14 @@ async def main(args): ) except ClientError: pass + elif option in "uU" and faber_agent.multitenant: + log_status("Upgrading wallet to anoncreds. Wait a couple seconds...") + await faber_agent.agent.admin_POST( + "/anoncreds/wallet/upgrade", + params={"wallet_name": faber_agent.agent.wallet_name}, + ) + upgraded_to_anoncreds = True + await asyncio.sleep(2.0) if faber_agent.show_timing: timing = await faber_agent.agent.fetch_timing()