Skip to content

Commit

Permalink
Merge pull request #1 from VIER-CognitiveVoice/feature/dropMessagesIf…
Browse files Browse the repository at this point in the history
…Busy

Option to drop messages if busy and flag to ignore messages.
  • Loading branch information
pschichtel authored Mar 12, 2024
2 parents 9e6e2b8 + d20af30 commit 34cb330
Showing 1 changed file with 42 additions and 6 deletions.
48 changes: 42 additions & 6 deletions rasa_vier_cvg/cvg.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,16 @@ def __init__(self, callback_base_url: Text, on_message: Callable[[UserMessage],
self.base_url = callback_base_url.rstrip('/')
self.proxy = proxy

# This functionality can be used to ignore certain messages received by this channel.
# It can be used as a workaround for dialog setups that produce messages that should not be forwarded to CVG but still be tracked.
# Due to a bug in rasa, this feature is required:
# Normally, when a custom rasa action returns a response, it is sent to the current channel and gets added to the tracker.
# However, returning an Event, it should only get added to the tracker, not sent to the channel. I believe this is a bug.
# If you want to maintain an accurate history in the tracker when using the CVG API directly, we have to send an event.
# Now, to prevent sending the message to CVG twice, this flag can be set to true so that the channel drops the message.
def _is_ignored(self, custom_json) -> bool:
return custom_json is not None and "ignore" in custom_json and custom_json["ignore"] == True

async def _perform_request(self, path: str, method: str, data: Optional[any], dialog_id: Optional[str], retries: int = 0) -> (Optional[int], any):
url = f"{self.base_url}{path}"
try:
Expand Down Expand Up @@ -112,7 +122,10 @@ async def _say(self, dialog_id: str, text: str):
if len(text.strip()) > 0:
await self._perform_request("/call/say", method="POST", data={DIALOG_ID_FIELD: dialog_id, "text": text}, dialog_id=dialog_id)

async def send_text_message(self, recipient_id: Text, text: Text, **kwargs: Any) -> None:
async def send_text_message(self, recipient_id: Text, text: Text, custom, **kwargs: Any) -> None:
if self._is_ignored(custom):
return

reseller_token, project_token, dialog_id = parse_recipient_id(recipient_id)
logger.info(f"{dialog_id} - Sending text to say: {text}")
await self._say(dialog_id, text)
Expand Down Expand Up @@ -208,6 +221,9 @@ async def do_nothing(*args):
logger.info(f"{dialog_id} - Operation {operation_name} complete")

async def send_custom_json(self, recipient_id: Text, json_message: Dict[Text, Any], **kwargs: Any) -> None:
if self._is_ignored(json_message):
return

for operation_name, body in json_message.items():
if operation_name[:len(OPERATION_PREFIX)] == OPERATION_PREFIX:
await asyncio.sleep(0.050)
Expand All @@ -230,7 +246,9 @@ class CVGInput(InputChannel):
proxy: Optional[str]
expected_authorization_header_value: str
blocking_endpoints: bool
ignore_messages_when_busy: bool
task_container: TaskContainer = TaskContainer()
ignore_messages_for: set[Text] = set()

@classmethod
def name(cls) -> Text:
Expand All @@ -253,15 +271,22 @@ def from_credentials(cls, credentials: Optional[Dict[Text, Any]]) -> InputChanne
else:
blocking_endpoints = bool(blocking_endpoints)

logger.info(f"Creating input with: token={'*' * len(token)} proxy={proxy} start_intent={start_intent} blocking_endpoints={blocking_endpoints}")
return cls(token, start_intent, proxy, blocking_endpoints)
ignore_messages_when_busy = credentials.get("ignore_messages_when_busy")
if ignore_messages_when_busy is None:
ignore_messages_when_busy = False
else:
ignore_messages_when_busy = bool(ignore_messages_when_busy)

logger.info(f"Creating input with: token={'*' * len(token)} proxy={proxy} start_intent={start_intent} blocking_endpoints={blocking_endpoints} ignore_messages_when_busy={ignore_messages_when_busy}")
return cls(token, start_intent, proxy, blocking_endpoints, ignore_messages_when_busy)

def __init__(self, token: Text, start_intent: Text, proxy: Optional[Text], blocking_endpoints: bool) -> None:
def __init__(self, token: Text, start_intent: Text, proxy: Optional[Text], blocking_endpoints: bool, ignore_messages_when_busy: bool) -> None:
self.callback = None
self.expected_authorization_header_value = f"Bearer {token}"
self.proxy = proxy
self.start_intent = start_intent
self.blocking_endpoints = blocking_endpoints
self.ignore_messages_when_busy = ignore_messages_when_busy

async def _process_message(self, request: Request, on_new_message: Callable[[UserMessage], Awaitable[Any]], dialog_id: Text, text: Text, sender_id: Text) -> Any:
try:
Expand All @@ -277,9 +302,20 @@ async def _process_message(self, request: Request, on_new_message: Callable[[Use
metadata=metadata,
)

logger.info(f"{dialog_id} - Creating incoming UserMessage: text={text}, output_channel={user_msg.output_channel}, sender_id={sender_id}, metadata={metadata}")
# Ignore Messages when busy uses a local variable to check if there is already a message being processed.
# This means that this feature does NOT work with multiple instances of this channel/rasa handling the same sender_id.
if (self.ignore_messages_when_busy):
if (dialog_id in self.ignore_messages_for):
logger.warning(f"{dialog_id} - A message is already being processed for this dialog and ignore_messages_when_busy is True. Ignoring message from User: '{text}'")
return response.empty(204)
else:
self.ignore_messages_for.add(dialog_id)

await on_new_message(user_msg)
logger.info(f"{dialog_id} - Creating incoming UserMessage: text={text}, output_channel={user_msg.output_channel}, sender_id={sender_id}, metadata={metadata}")
try:
await on_new_message(user_msg)
finally:
self.ignore_messages_for.remove(dialog_id)
except Exception as e:
logger.error(f"{dialog_id} - Exception when trying to handle message: {e}")
logger.error(e, exc_info=True)
Expand Down

0 comments on commit 34cb330

Please sign in to comment.