Skip to content

Commit

Permalink
feat: added pg_notify
Browse files Browse the repository at this point in the history
Added an action to send notifications via postgres LISTEN/NOTIFY
  • Loading branch information
mkanoor committed Jan 12, 2024
1 parent 099beda commit 8e411d7
Show file tree
Hide file tree
Showing 7 changed files with 407 additions and 1 deletion.
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ RUN pip install -U pip \
aiokafka \
watchdog \
azure-servicebus \
psycopg \
&& ansible-galaxy collection install ansible.eda

RUN bash -c "if [ $DEVEL_COLLECTION_LIBRARY -ne 0 ]; then \
Expand Down
2 changes: 2 additions & 0 deletions ansible_rulebook/action/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

KEY_EDA_VARS = "ansible_eda"
INTERNAL_ACTION_STATUS = "successful"
FAILED_STATUS = "failed"
SUCCESSFUL_STATUS = "successful"


class Helper:
Expand Down
109 changes: 109 additions & 0 deletions ansible_rulebook/action/pg_notify.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# Copyright 2023 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
import uuid

import xxhash
from psycopg import AsyncClientCursor, AsyncConnection, OperationalError

from .control import Control
from .helper import FAILED_STATUS, Helper
from .metadata import Metadata

logger = logging.getLogger(__name__)

MAX_MESSAGE_LENGTH = 7 * 1024
MESSAGE_CHUNKED_UUID = "_message_chunked_uuid"
MESSAGE_CHUNK_COUNT = "_message_chunk_count"
MESSAGE_CHUNK_SEQUENCE = "_message_chunk_sequence"
MESSAGE_CHUNK = "_chunk"
MESSAGE_LENGTH = "_message_length"
MESSAGE_XX_HASH = "_message_xx_hash"


class PGNotify:
"""The PGNotify action sends an event to a PG Pub Sub Channel
Needs
dsn https://www.postgresql.org/docs/current/libpq-connect.html
#LIBPQ-CONNSTRING-KEYWORD-VALUE
channel the channel name to send the notifies
event
"""

def __init__(self, metadata: Metadata, control: Control, **action_args):
self.helper = Helper(metadata, control, "pg_notify")
self.action_args = action_args

async def __call__(self):
if not self.action_args["event"]:
return

try:
async with await AsyncConnection.connect(
conninfo=self.action_args["dsn"],
autocommit=True,
) as conn:
async with AsyncClientCursor(connection=conn) as cursor:
if self.action_args.get("remove_meta", False):
event = self.action_args["event"].copy()
if "meta" in event:
event.pop("meta")
else:
event = self.action_args["event"]

payload = json.dumps(event)
message_length = len(payload)
if message_length >= MAX_MESSAGE_LENGTH:
for chunk in self._to_chunks(payload, message_length):
await cursor.execute(
f"NOTIFY {self.action_args['channel']}, "
f"'{json.dumps(chunk)}';"
)
else:
await cursor.execute(
f"NOTIFY {self.action_args['channel']}, "
f"'{payload}';"
)
except OperationalError as e:
logger.error(f"PG Notify operational error {e}")
data = dict(status=FAILED_STATUS, message=str(e))
await self.helper.send_status(data)
raise e

await self.helper.send_default_status()

def _to_chunks(self, payload: str, message_length: int):
xx_hash = xxhash.xxh32(payload.encode("utf-8")).hexdigest()
logger.debug("Message length exceeds, will chunk")
message_uuid = str(uuid.uuid4())
number_of_chunks = int(message_length / MAX_MESSAGE_LENGTH) + 1
chunked = {
MESSAGE_CHUNKED_UUID: message_uuid,
MESSAGE_CHUNK_COUNT: number_of_chunks,
MESSAGE_LENGTH: message_length,
MESSAGE_XX_HASH: xx_hash,
}
logger.debug(f"Chunk info {message_uuid}")
logger.debug(f"Number of chunks {number_of_chunks}")
logger.debug(f"Total data size {message_length}")
logger.debug(f"XX Hash {xx_hash}")

sequence = 1
for i in range(0, message_length, MAX_MESSAGE_LENGTH):
chunked[MESSAGE_CHUNK] = payload[i : i + MAX_MESSAGE_LENGTH]
chunked[MESSAGE_CHUNK_SEQUENCE] = sequence
sequence += 1
yield chunked
2 changes: 2 additions & 0 deletions ansible_rulebook/rule_set_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from ansible_rulebook.action.debug import Debug
from ansible_rulebook.action.metadata import Metadata
from ansible_rulebook.action.noop import Noop
from ansible_rulebook.action.pg_notify import PGNotify
from ansible_rulebook.action.post_event import PostEvent
from ansible_rulebook.action.print_event import PrintEvent
from ansible_rulebook.action.retract_fact import RetractFact
Expand Down Expand Up @@ -75,6 +76,7 @@
"run_module": RunModule,
"run_job_template": RunJobTemplate,
"run_workflow_template": RunWorkflowTemplate,
"pg_notify": PGNotify,
}


Expand Down
42 changes: 42 additions & 0 deletions ansible_rulebook/schema/ruleset_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@
},
{
"$ref": "#/$defs/shutdown-action"
},
{
"$ref": "#/$defs/pg-notify-action"
}
]
}
Expand Down Expand Up @@ -244,6 +247,9 @@
},
{
"$ref": "#/$defs/shutdown-action"
},
{
"$ref": "#/$defs/pg-notify-action"
}
]
}
Expand Down Expand Up @@ -510,6 +516,42 @@
],
"additionalProperties": false
},
"pg-notify-action": {
"type": "object",
"properties": {
"pg_notify": {
"type": "object",
"properties": {
"dsn": {
"type": "string"
},
"channel": {
"type": "string"
},
"event": {
"type": [
"string",
"object"
]
},
"remove_meta": {
"type": "boolean",
"default": false
}
},
"required": [
"dsn",
"channel",
"event"
],
"additionalProperties": false
}
},
"required": [
"pg_notify"
],
"additionalProperties": false
},
"post-event-action": {
"type": "object",
"properties": {
Expand Down
4 changes: 3 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ install_requires =
ansible-runner
websockets
drools_jpy == 0.3.8
watchdog
watchdog
psycopg
xxhash

[options.packages.find]
include =
Expand Down
Loading

0 comments on commit 8e411d7

Please sign in to comment.