Skip to content

Commit

Permalink
Remove in_memory upgrade set for multi instance upgrading
Browse files Browse the repository at this point in the history
Signed-off-by: jamshale <[email protected]>
  • Loading branch information
jamshale committed Apr 11, 2024
1 parent 6d92831 commit 51cbe68
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 65 deletions.
19 changes: 12 additions & 7 deletions aries_cloudagent/admin/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@
from ..messaging.responder import BaseResponder
from ..messaging.valid import UUIDFour
from ..multitenant.base import BaseMultitenantManager, MultitenantManagerError
from ..storage.base import BaseStorage
from ..storage.error import StorageNotFoundError
from ..storage.type import RECORD_TYPE_ACAPY_UPGRADING
from ..transport.outbound.message import OutboundMessage
from ..transport.outbound.status import OutboundSendStatus
from ..transport.queue.basic import BasicMessageQueue
from ..utils.stats import Collector
from ..utils.task_queue import TaskQueue
from ..version import __version__
from ..wallet.upgrade_singleton import UpgradeSingleton
from .base_server import BaseAdminServer
from .error import AdminSetupError
from .request_context import AdminRequestContext
Expand All @@ -58,9 +59,6 @@
"acapy::keylist::updated": "keylist",
}

upgrade_singleton = UpgradeSingleton()


class AdminModulesSchema(OpenAPISchema):
"""Schema for the modules endpoint."""

Expand Down Expand Up @@ -212,10 +210,17 @@ async def upgrade_middleware(request: web.BaseRequest, handler: Coroutine):
"""Blocking middleware for upgrades."""
context: AdminRequestContext = request["context"]

if context._profile.name in upgrade_singleton.current_upgrades:
raise web.HTTPServiceUnavailable(reason="Upgrade in progress")
async with context.profile.session() as session:
storage = session.inject(BaseStorage)
try:
await storage.find_record(
RECORD_TYPE_ACAPY_UPGRADING, tag_query={}
)
except StorageNotFoundError:
return await handler(request)

return await handler(request)

raise web.HTTPServiceUnavailable(reason="Upgrade in progress")


@web.middleware
Expand Down
4 changes: 0 additions & 4 deletions aries_cloudagent/admin/tests/test_admin_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from ...core.protocol_registry import ProtocolRegistry
from ...utils.stats import Collector
from ...utils.task_queue import TaskQueue
from ...wallet.upgrade_singleton import UpgradeSingleton
from .. import server as test_module
from ..request_context import AdminRequestContext
from ..server import AdminServer, AdminSetupError
Expand Down Expand Up @@ -480,7 +479,6 @@ async def test_server_health_state(self):
await server.stop()

async def test_upgrade_middleware(self):
upgrade_singleton = UpgradeSingleton()
self.context = AdminRequestContext.test_context(
{}, InMemoryProfile.test_profile()
)
Expand All @@ -497,11 +495,9 @@ async def test_upgrade_middleware(self):

await test_module.upgrade_middleware(request, handler)

upgrade_singleton.set_wallet("test-profile")
with self.assertRaises(test_module.web.HTTPServiceUnavailable):
await test_module.upgrade_middleware(request, handler)

upgrade_singleton.remove_wallet("test-profile")
await test_module.upgrade_middleware(request, handler)


Expand Down
6 changes: 0 additions & 6 deletions aries_cloudagent/wallet/anoncreds_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,12 @@
from ..storage.error import StorageNotFoundError
from ..storage.record import StorageRecord
from ..storage.type import RECORD_TYPE_ACAPY_STORAGE_TYPE, RECORD_TYPE_ACAPY_UPGRADING
from .upgrade_singleton import UpgradeSingleton

LOGGER = logging.getLogger(__name__)

# Number of times to retry upgrading records
max_retries = 5

upgrade_singleton = UpgradeSingleton()


class SchemaUpgradeObj:
"""Schema upgrade object."""
Expand Down Expand Up @@ -540,7 +537,6 @@ async def retry_converting_records(
async def clear_upgrade():
async with profile.session() as session:
storage = session.inject(BaseStorage)
upgrade_singleton.remove_wallet(profile.name)
await storage.delete_record(upgrading_record)

try:
Expand Down Expand Up @@ -579,12 +575,10 @@ async def upgrade_wallet_to_anoncreds(profile: Profile, is_subwallet=False) -> N

try:
LOGGER.info("Upgrade in process for wallet: %s", profile.name)
upgrade_singleton.set_wallet(profile.name)
await convert_records_to_anoncreds(profile)
await set_storage_type_and_update_profile_if_subwallet(
profile, is_subwallet
)
upgrade_singleton.remove_wallet(profile.name)
await storage.delete_record(upgrading_record)
except Exception as e:
LOGGER.error(f"Error when upgrading wallet {profile.name} : {e} ")
Expand Down
12 changes: 0 additions & 12 deletions aries_cloudagent/wallet/tests/test_anoncreds_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
from ...storage.record import StorageRecord
from ...storage.type import RECORD_TYPE_ACAPY_STORAGE_TYPE, RECORD_TYPE_ACAPY_UPGRADING
from .. import anoncreds_upgrade
from ..upgrade_singleton import UpgradeSingleton


class TestAnoncredsUpgrade(IsolatedAsyncioTestCase):
Expand Down Expand Up @@ -128,12 +127,10 @@ async def test_retry_converting_records(self):
await anoncreds_upgrade.retry_converting_records(
self.profile, upgrading_record, 0
)
upgrade_singleton = UpgradeSingleton()
assert mock_convert_records_to_anoncreds.called
assert mock_convert_records_to_anoncreds.call_count == 3
_, storage_type_record = next(iter(self.profile.records.items()))
assert storage_type_record.value == "askar-anoncreds"
assert not upgrade_singleton.current_upgrades

async def test_upgrade_wallet_to_anoncreds(self):
# upgrading record not present
Expand All @@ -154,9 +151,6 @@ async def test_upgrade_wallet_to_anoncreds(self):
_, storage_type_record = next(iter(self.profile.records.items()))
assert storage_type_record.value == "askar-anoncreds"

upgrade_singleton = UpgradeSingleton()
assert not upgrade_singleton.current_upgrades

# retry called on exception
with mock.patch.object(
anoncreds_upgrade,
Expand Down Expand Up @@ -339,9 +333,6 @@ async def test_failed_upgrade(self):
)
# Storage type should not be updated
assert storage_type_record.value == "askar"
# Upgrade singleton should be empty
upgrade_singleton = UpgradeSingleton()
assert upgrade_singleton.current_upgrades.__len__() == 0

# Cred_defs fails to upgrade
anoncreds_upgrade.upgrade_and_delete_cred_def_records = (
Expand Down Expand Up @@ -369,6 +360,3 @@ async def test_failed_upgrade(self):
)
# Storage type should not be updated
assert storage_type_record.value == "askar"
# Upgrade singleton should be empty
upgrade_singleton = UpgradeSingleton()
assert upgrade_singleton.current_upgrades.__len__() == 0
22 changes: 0 additions & 22 deletions aries_cloudagent/wallet/upgrade_singleton.py

This file was deleted.

24 changes: 10 additions & 14 deletions docs/design/UpgradeViaApi.md
Original file line number Diff line number Diff line change
@@ -1,50 +1,46 @@
# Upgrade via API Design

To isolate an upgrade process and trigger it via API the following pattern was designed to handle multitenant scenarios. It includes a per instance memory singleton and an is_upgrading record in the wallet(DB) and a middleware to prevent requests during the upgrade process.
#### To isolate an upgrade process and trigger it via API the following pattern was designed to handle multitenant scenarios. It includes an is_upgrading record in the wallet(DB) and a middleware to prevent requests during the upgrade process.

```mermaid
sequenceDiagram
participant A as Agent
participant M as Middleware
participant S as Singleton
participant W as Wallet (DB)
Note over A: Start upgrade
A->>M: POST /any-upgrade-path
M-->>S: check wallet name
S-->>M:
M-->>W: check is_upgrading
W-->>M:
M->>A: OK
A-->>S: add wallet name to set
A-->>W: update is_upgrading = true for wallet or subwallet
Note over A: Attempted Request
A->>M: GET /any-endpoint
M-->>S: check wallet name
S-->>M:
M-->>W: check is_upgrading
W-->>M:
M->>A: 503 Service Unavailable
Note over A: Agent Restart
A-->>W: Get is_upgrading record for wallet or all subwallets
W-->>A:
A-->>S: Populate set with wallet names
Note over A: Attempted Request
A->>M: GET /any-endpoint
M-->>S: check wallet name
S-->>M:
M-->>W: check is_upgrading
W-->>M:
M->>A: 503 Service Unavailable
Note over A: End upgrade
A-->>S: Remove wallet name from set
A-->>W: delete is_upgrading record for wallet
Note over A: Attempted Request
A->>M: GET /any-endpoint
M-->>S: check wallet name
S-->>M:
M-->>W: check is_upgrading
W-->>M:
M->>A: OK
```

#### To use this mehanism you simply need to set the upgrading record in the wallet (DB) and add the wallet name to the singleton set. The middleware will prevent requests from being processed until the upgrade process is finished. After the upgrade process is finished you must remove the wallet name from the set and delete the upgrading record in the wallet (DB).
#### To use this mehanism you simply need to set the upgrading record in the wallet (DB). The middleware will prevent requests from being processed until the upgrade process is finished. After the upgrade process is finished you must remove the upgrading record in the wallet (DB).

##### An example can be found via the anoncreds upgrade `aries_cloudagent/wallet/routes.py` in the `upgrade_anoncreds` controller.

0 comments on commit 51cbe68

Please sign in to comment.