Skip to content

Commit

Permalink
Catch errors from event driver publish retries (#1264)
Browse files Browse the repository at this point in the history
  • Loading branch information
vachillo authored Oct 17, 2024
1 parent f6b15ab commit 829456b
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 13 deletions.
1 change: 1 addition & 0 deletions docs/griptape-framework/misc/events.md
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ Assistant:
## `EventListenerDriver.handler` Return Value Behavior

The value that gets returned from the [`EventListener.handler`](../../reference/griptape/events/event_listener.md#griptape.events.event_listener.EventListener.handler) will determine what gets sent to the `event_listener_driver`.

### `EventListener.handler` is None

By default, the `EventListener.handler` function is `None`. Any events that the `EventListener` is listening for will get sent to the `event_listener_driver` as-is.
Expand Down
22 changes: 12 additions & 10 deletions griptape/drivers/event_listener/base_event_listener_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,17 @@ def try_publish_event_payload(self, event_payload: dict) -> None: ...
def try_publish_event_payload_batch(self, event_payload_batch: list[dict]) -> None: ...

def _safe_publish_event_payload(self, event_payload: dict) -> None:
for attempt in self.retrying():
with attempt:
self.try_publish_event_payload(event_payload)
else:
logger.error("event listener driver failed after all retry attempts")
try:
for attempt in self.retrying():
with attempt:
self.try_publish_event_payload(event_payload)
except Exception:
logger.warning("Failed to publish event after %s attempts", self.max_attempts, exc_info=True)

def _safe_publish_event_payload_batch(self, event_payload_batch: list[dict]) -> None:
for attempt in self.retrying():
with attempt:
self.try_publish_event_payload_batch(event_payload_batch)
else:
logger.error("event listener driver failed after all retry attempts")
try:
for attempt in self.retrying():
with attempt:
self.try_publish_event_payload_batch(event_payload_batch)
except Exception:
logger.warning("Failed to publish event batch after %s attempts", self.max_attempts, exc_info=True)
13 changes: 10 additions & 3 deletions tests/mocks/mock_event_listener_driver.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,21 @@
from __future__ import annotations

from attrs import define
from typing import Callable, Optional

from attrs import define, field

from griptape.drivers import BaseEventListenerDriver


@define
class MockEventListenerDriver(BaseEventListenerDriver):
try_publish_event_payload_fn: Optional[Callable[[dict], None]] = field(default=None, kw_only=True)
try_publish_event_payload_batch_fn: Optional[Callable[[list[dict]], None]] = field(default=None, kw_only=True)

def try_publish_event_payload(self, event_payload: dict) -> None:
pass
if self.try_publish_event_payload_fn is not None:
self.try_publish_event_payload_fn(event_payload)

def try_publish_event_payload_batch(self, event_payload_batch: list[dict]) -> None:
pass
if self.try_publish_event_payload_batch_fn is not None:
self.try_publish_event_payload_batch_fn(event_payload_batch)
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,61 @@ def test_flush_events(self):
driver.flush_events()
executor.submit.assert_called_once_with(driver._safe_publish_event_payload_batch, mock_event_payloads)
assert len(driver.batch) == 0

def test__safe_publish_event_payload(self):
mock_fn = MagicMock()
driver = MockEventListenerDriver(
batched=False,
try_publish_event_payload_fn=mock_fn,
)
mock_event_payload = MockEvent().to_dict()

driver._safe_publish_event_payload(mock_event_payload)

mock_fn.assert_called_once_with(mock_event_payload)

def test__safe_publish_event_payload_batch(self):
mock_fn = MagicMock()
driver = MockEventListenerDriver(
batched=True,
try_publish_event_payload_batch_fn=mock_fn,
)
mock_event_payloads = [MockEvent().to_dict() for _ in range(0, 3)]

driver._safe_publish_event_payload_batch(mock_event_payloads)

mock_fn.assert_called_once_with(mock_event_payloads)

def test__safe_publish_event_payload_error(self):
mock_fn = MagicMock()
driver = MockEventListenerDriver(
batched=False,
try_publish_event_payload_fn=mock_fn,
max_attempts=2,
max_retry_delay=0.1,
min_retry_delay=0.1,
)
mock_fn.side_effect = Exception("Test Exception")
mock_event_payload = MockEvent().to_dict()

driver._safe_publish_event_payload(mock_event_payload)

assert mock_fn.call_count == driver.max_attempts
mock_fn.assert_called_with(mock_event_payload)

def test__safe_publish_event_payload_batch_error(self):
mock_fn = MagicMock()
driver = MockEventListenerDriver(
batched=True,
try_publish_event_payload_batch_fn=mock_fn,
max_attempts=2,
max_retry_delay=0.1,
min_retry_delay=0.1,
)
mock_fn.side_effect = Exception("Test Exception")
mock_event_payloads = [MockEvent().to_dict() for _ in range(0, 3)]

driver._safe_publish_event_payload_batch(mock_event_payloads)

assert mock_fn.call_count == driver.max_attempts
mock_fn.assert_called_with(mock_event_payloads)

0 comments on commit 829456b

Please sign in to comment.