Skip to content

Commit

Permalink
Fix confirmation timeouts (#260)
Browse files Browse the repository at this point in the history
* Fix confirmation timeouts

Reduced max timeout from 1200 to 300s
Set maximum retry interval to 30s
handles merkleLog confirmation correctly

AB#9103
  • Loading branch information
eccles authored Apr 8, 2024
1 parent 4cf032c commit bf471e6
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 19 deletions.
33 changes: 20 additions & 13 deletions archivist/confirmer.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from .errors import ArchivistUnconfirmedError
from .utils import backoff_handler

MAX_TIME = 1200
MAX_TIME = 300
LOGGER = getLogger(__name__)

# pylint: disable=protected-access
Expand Down Expand Up @@ -78,6 +78,7 @@ def _wait_for_confirmation(

@backoff.on_predicate(
backoff.expo,
max_value=30.0,
logger=None, # pyright: ignore
max_time=__lookup_max_time,
on_backoff=backoff_handler,
Expand All @@ -94,19 +95,17 @@ def _wait_for_confirmation(self: Managers, identity: str) -> ReturnTypes:
f"cannot confirm {identity} as confirmation_status is not present"
)

if entity[CONFIRMATION_STATUS] == ConfirmationStatus.FAILED.name:
status = entity[CONFIRMATION_STATUS]
if status == ConfirmationStatus.FAILED.name:
raise ArchivistUnconfirmedError(
f"confirmation for {identity} FAILED - this is unusable"
)

# Simple hash
if entity[CONFIRMATION_STATUS] == ConfirmationStatus.CONFIRMED.name:
return entity

# merkle_log
if (
ConfirmationStatus[entity[CONFIRMATION_STATUS]].value
>= ConfirmationStatus.COMMITTED.value
# Simple hash and merkleLog
if status in (
ConfirmationStatus.CONFIRMED.name,
ConfirmationStatus.COMMITTED.name,
ConfirmationStatus.UNEQUIVOCAL.name,
):
return entity

Expand All @@ -124,6 +123,7 @@ def __on_giveup_confirmed(details: "Details"):

@backoff.on_predicate(
backoff.expo,
max_value=30.0,
logger=None, # pyright: ignore
max_time=__lookup_max_time,
on_backoff=backoff_handler,
Expand All @@ -137,12 +137,19 @@ def _wait_for_confirmed(
) -> bool:
"""Return False until all entities are confirmed"""

# look for unconfirmed entities
# look for pending entities
newprops = deepcopy(props) if props else {}
newprops[CONFIRMATION_STATUS] = ConfirmationStatus.PENDING.name
LOGGER.debug("Count pending entities %s", newprops)
pending_count = self.count(props=newprops, **kwargs)

# look for stored entities
newprops = deepcopy(props) if props else {}
newprops[CONFIRMATION_STATUS] = ConfirmationStatus.STORED.name
LOGGER.debug("Count stored entities %s", newprops)
stored_count = self.count(props=newprops, **kwargs)

LOGGER.debug("Count unconfirmed entities %s", newprops)
count = self.count(props=newprops, **kwargs)
count = pending_count + stored_count

if count == 0:
# did any fail
Expand Down
3 changes: 2 additions & 1 deletion archivist/subjects_confirmer.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
# pylint:disable=cyclic-import # but pylint doesn't understand this feature
from .utils import backoff_handler

MAX_TIME = 1200
MAX_TIME = 300

LOGGER = getLogger(__name__)

Expand All @@ -37,6 +37,7 @@ def __on_giveup_confirmation(details):

@backoff.on_predicate(
backoff.expo,
max_value=30.0,
logger=None, # pyright: ignore
max_time=__lookup_max_time,
on_backoff=backoff_handler,
Expand Down
2 changes: 0 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@
# for the published wheel - the file docs/requirements.txt
# must be kept in sync with this file.
#
# upgrading to 2.2 of backoff raises all kinds of type hint errors so
# this will require more work - stick to 1.11 for now.
backoff~=2.2.1
certifi
flatten-dict~=0.4
Expand Down
8 changes: 7 additions & 1 deletion unittests/testassetsconstants.py
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,12 @@
"attributes": ATTRS,
"confirmation_status": "PENDING",
}
RESPONSE_STORED = {
"identity": IDENTITY,
"behaviours": ASSET_BEHAVIOURS,
"attributes": ATTRS,
"confirmation_status": "STORED",
}
RESPONSE_FAILED = {
"identity": IDENTITY,
"behaviours": ASSET_BEHAVIOURS,
Expand All @@ -553,7 +559,7 @@ class TestAssetsBase(TestCase):
maxDiff = None

def setUp(self):
self.arch = Archivist("url", "authauthauth", max_time=1)
self.arch = Archivist("url", "authauthauth", max_time=0.5)

def tearDown(self):
self.arch.close()
Expand Down
50 changes: 49 additions & 1 deletion unittests/testassetswait.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from .testassetsconstants import (
RESPONSE_FAILED,
RESPONSE_PENDING,
RESPONSE_STORED,
SUBPATH,
TestAssetsBase,
)
Expand Down Expand Up @@ -45,9 +46,11 @@ def test_assets_wait_for_confirmed(self):
status = (
{"page_size": 1},
{"page_size": 1, "confirmation_status": "PENDING"},
{"page_size": 1, "confirmation_status": "STORED"},
{"page_size": 1, "confirmation_status": "FAILED"},
)
with mock.patch.object(self.arch.session, "get") as mock_get:
# there are 2 gets for each retry - one for PENDING and one for STORED
mock_get.side_effect = [
MockResponse(
200,
Expand All @@ -66,6 +69,16 @@ def test_assets_wait_for_confirmed(self):
headers={HEADERS_TOTAL_COUNT: 0},
assets=[],
),
MockResponse(
200,
headers={HEADERS_TOTAL_COUNT: 0},
assets=[],
),
MockResponse(
200,
headers={HEADERS_TOTAL_COUNT: 0},
assets=[],
),
]

self.arch.assets.wait_for_confirmed()
Expand Down Expand Up @@ -110,6 +123,8 @@ def test_assets_wait_for_confirmed_timeout(self):
"""
## last call to get looks for FAILED assets
with mock.patch.object(self.arch.session, "get") as mock_get:
# there are 2 gets for each retry - one for PENDING and one for STORED
# enough entries to be supplied so that timeout occurs
mock_get.side_effect = [
MockResponse(
200,
Expand All @@ -122,7 +137,7 @@ def test_assets_wait_for_confirmed_timeout(self):
200,
headers={HEADERS_TOTAL_COUNT: 2},
assets=[
RESPONSE_PENDING,
RESPONSE_STORED,
],
),
MockResponse(
Expand All @@ -132,27 +147,55 @@ def test_assets_wait_for_confirmed_timeout(self):
RESPONSE_PENDING,
],
),
MockResponse(
200,
headers={HEADERS_TOTAL_COUNT: 2},
assets=[
RESPONSE_STORED,
],
),
MockResponse(
200,
headers={HEADERS_TOTAL_COUNT: 2},
assets=[
RESPONSE_PENDING,
],
),
MockResponse(
200,
headers={HEADERS_TOTAL_COUNT: 2},
assets=[
RESPONSE_STORED,
],
),
MockResponse(
200,
headers={HEADERS_TOTAL_COUNT: 2},
assets=[
RESPONSE_PENDING,
],
),
MockResponse(
200,
headers={HEADERS_TOTAL_COUNT: 2},
assets=[
RESPONSE_STORED,
],
),
MockResponse(
200,
headers={HEADERS_TOTAL_COUNT: 2},
assets=[
RESPONSE_PENDING,
],
),
MockResponse(
200,
headers={HEADERS_TOTAL_COUNT: 2},
assets=[
RESPONSE_STORED,
],
),
]

with self.assertRaises(ArchivistUnconfirmedError):
Expand All @@ -177,6 +220,11 @@ def test_assets_wait_for_confirmed_failed(self):
headers={HEADERS_TOTAL_COUNT: 0},
assets=[],
),
MockResponse(
200,
headers={HEADERS_TOTAL_COUNT: 0},
assets=[],
),
MockResponse(
200,
headers={HEADERS_TOTAL_COUNT: 1},
Expand Down
8 changes: 7 additions & 1 deletion unittests/testeventswait.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,13 @@ class TestEventsWait(TestEventsBase):

def test_events_wait_for_confirmed(self):
"""
Test event counting
Test event confirmation
"""
## last call to get looks for FAILED assets
status = (
{"page_size": 1},
{"page_size": 1, "confirmation_status": "PENDING"},
{"page_size": 1, "confirmation_status": "STORED"},
{"page_size": 1, "confirmation_status": "FAILED"},
)
with mock.patch.object(self.arch.session, "get") as mock_get:
Expand All @@ -62,6 +63,11 @@ def test_events_wait_for_confirmed(self):
headers={HEADERS_TOTAL_COUNT: 0},
assets=[],
),
MockResponse(
200,
headers={HEADERS_TOTAL_COUNT: 0},
assets=[],
),
]

self.arch.events.wait_for_confirmed()
Expand Down

0 comments on commit bf471e6

Please sign in to comment.