Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added pg_notify #616

Merged
merged 1 commit into from
Jan 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ RUN useradd -u $USER_ID -d $APP_DIR appuser
WORKDIR $APP_DIR
COPY . $WORKDIR
RUN chown -R $USER_ID $APP_DIR
RUN dnf install -y java-17-openjdk-devel python3-pip
RUN dnf install -y java-17-openjdk-devel python3-pip postgresql-devel gcc python3-devel

RUN bash -c "if [ $DEVEL_COLLECTION_LIBRARY -ne 0 ]; then \
dnf install -y git; fi"
Expand Down
2 changes: 2 additions & 0 deletions ansible_rulebook/action/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@

KEY_EDA_VARS = "ansible_eda"
INTERNAL_ACTION_STATUS = "successful"
FAILED_STATUS = "failed"
SUCCESSFUL_STATUS = "successful"


class Helper:
Expand Down
111 changes: 111 additions & 0 deletions ansible_rulebook/action/pg_notify.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
# Copyright 2023 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import logging
import uuid

import xxhash
from psycopg import AsyncClientCursor, AsyncConnection, OperationalError

from .control import Control
from .helper import FAILED_STATUS, Helper
from .metadata import Metadata

logger = logging.getLogger(__name__)

MAX_MESSAGE_LENGTH = 7 * 1024
MESSAGE_CHUNKED_UUID = "_message_chunked_uuid"
MESSAGE_CHUNK_COUNT = "_message_chunk_count"
MESSAGE_CHUNK_SEQUENCE = "_message_chunk_sequence"
MESSAGE_CHUNK = "_chunk"
MESSAGE_LENGTH = "_message_length"
MESSAGE_XX_HASH = "_message_xx_hash"


class PGNotify:
"""The PGNotify action sends an event to a PG Pub Sub Channel
Needs
dsn https://www.postgresql.org/docs/current/libpq-connect.html
#LIBPQ-CONNSTRING-KEYWORD-VALUE
channel the channel name to send the notifies
event
"""

def __init__(self, metadata: Metadata, control: Control, **action_args):
self.helper = Helper(metadata, control, "pg_notify")
self.action_args = action_args

async def __call__(self):
if not self.action_args["event"]:
return

try:
async with await AsyncConnection.connect(
mkanoor marked this conversation as resolved.
Show resolved Hide resolved
conninfo=self.action_args["dsn"],
autocommit=True,
) as conn:
async with AsyncClientCursor(connection=conn) as cursor:
if self.action_args.get("remove_meta", False):
event = self.action_args["event"].copy()
if "meta" in event:
event.pop("meta")
else:
event = self.action_args["event"]

payload = json.dumps(event)
Alex-Izquierdo marked this conversation as resolved.
Show resolved Hide resolved
message_length = len(payload)
if message_length >= MAX_MESSAGE_LENGTH:
for chunk in self._to_chunks(payload, message_length):
await cursor.execute(
f"NOTIFY {self.action_args['channel']}, "
f"'{json.dumps(chunk)}';"
)
else:
await cursor.execute(
f"NOTIFY {self.action_args['channel']}, "
f"'{payload}';"
)
except OperationalError as e:
logger.error("PG Notify operational error %s", str(e))
data = dict(status=FAILED_STATUS, message=str(e))
await self.helper.send_status(data)
raise e

await self.helper.send_default_status()

def _to_chunks(self, payload: str, message_length: int):
xx_hash = xxhash.xxh32(payload.encode("utf-8")).hexdigest()
logger.debug(
"Message length exceeds %d bytes, will chunk", MAX_MESSAGE_LENGTH
)
message_uuid = str(uuid.uuid4())
number_of_chunks = int(message_length / MAX_MESSAGE_LENGTH) + 1
chunked = {
MESSAGE_CHUNKED_UUID: message_uuid,
MESSAGE_CHUNK_COUNT: number_of_chunks,
MESSAGE_LENGTH: message_length,
MESSAGE_XX_HASH: xx_hash,
}
logger.debug("Chunk info %s", message_uuid)
logger.debug("Number of chunks %d", number_of_chunks)
logger.debug("Total data size %d", message_length)
logger.debug("XX Hash %s", xx_hash)

sequence = 1
for i in range(0, message_length, MAX_MESSAGE_LENGTH):
chunked[MESSAGE_CHUNK] = payload[i : i + MAX_MESSAGE_LENGTH]
chunked[MESSAGE_CHUNK_SEQUENCE] = sequence
sequence += 1
yield chunked
2 changes: 2 additions & 0 deletions ansible_rulebook/rule_set_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from ansible_rulebook.action.debug import Debug
from ansible_rulebook.action.metadata import Metadata
from ansible_rulebook.action.noop import Noop
from ansible_rulebook.action.pg_notify import PGNotify
from ansible_rulebook.action.post_event import PostEvent
from ansible_rulebook.action.print_event import PrintEvent
from ansible_rulebook.action.retract_fact import RetractFact
Expand Down Expand Up @@ -75,6 +76,7 @@
"run_module": RunModule,
"run_job_template": RunJobTemplate,
"run_workflow_template": RunWorkflowTemplate,
"pg_notify": PGNotify,
}


Expand Down
42 changes: 42 additions & 0 deletions ansible_rulebook/schema/ruleset_schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@
},
{
"$ref": "#/$defs/shutdown-action"
},
{
"$ref": "#/$defs/pg-notify-action"
}
]
}
Expand Down Expand Up @@ -244,6 +247,9 @@
},
{
"$ref": "#/$defs/shutdown-action"
},
{
"$ref": "#/$defs/pg-notify-action"
}
]
}
Expand Down Expand Up @@ -510,6 +516,42 @@
],
"additionalProperties": false
},
"pg-notify-action": {
"type": "object",
"properties": {
"pg_notify": {
"type": "object",
"properties": {
"dsn": {
"type": "string"
},
"channel": {
"type": "string"
},
"event": {
"type": [
"string",
"object"
]
},
"remove_meta": {
"type": "boolean",
"default": false
}
},
"required": [
"dsn",
"channel",
"event"
],
"additionalProperties": false
}
},
"required": [
"pg_notify"
],
"additionalProperties": false
},
"post-event-action": {
"type": "object",
"properties": {
Expand Down
4 changes: 3 additions & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ install_requires =
ansible-runner
websockets
drools_jpy == 0.3.8
watchdog
watchdog
psycopg[c]
mkanoor marked this conversation as resolved.
Show resolved Hide resolved
xxhash

[options.packages.find]
include =
Expand Down
Loading