From 8cdc5d94afb063fe10451b4ab7e456aadeb28858 Mon Sep 17 00:00:00 2001 From: Rob Kooper Date: Fri, 26 Apr 2024 08:51:54 -0500 Subject: [PATCH] add prefix, rename server folder, add unack messages to monitor --- CHANGELOG.md | 9 +++++++++ {server => cdrhook}/.dockerignore | 0 {server => cdrhook}/Dockerfile | 3 ++- {server => cdrhook}/requirements.txt | 0 {server => cdrhook}/server.py | 8 ++++---- {server => cdrhook}/unregister.sh | 0 docker-compose.yml | 10 ++++++---- monitor/monitor.py | 8 ++++---- uploader/Dockerfile | 3 ++- uploader/uploader.py | 13 +++++++++---- 10 files changed, 36 insertions(+), 18 deletions(-) rename {server => cdrhook}/.dockerignore (100%) rename {server => cdrhook}/Dockerfile (84%) rename {server => cdrhook}/requirements.txt (100%) rename {server => cdrhook}/server.py (96%) rename {server => cdrhook}/unregister.sh (100%) diff --git a/CHANGELOG.md b/CHANGELOG.md index cbabb8f..0207025 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,15 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). +## [0.4.0] - 2024-04-26 + +### Added +- can now set a prefix for all queue names (PREFIX="") + +### Changed +- renamed server folder to cdrhook +- monitor shows number of messages processing, "61 / 1" means 61 messages waiting, 1 being processed + ## [0.3.0] - 2024-04-23 ### Added diff --git a/server/.dockerignore b/cdrhook/.dockerignore similarity index 100% rename from server/.dockerignore rename to cdrhook/.dockerignore diff --git a/server/Dockerfile b/cdrhook/Dockerfile similarity index 84% rename from server/Dockerfile rename to cdrhook/Dockerfile index f3ada25..d9e43a8 100644 --- a/server/Dockerfile +++ b/cdrhook/Dockerfile @@ -12,7 +12,8 @@ ENV PYTHONUNBUFFERED=1 \ CALLBACK_URL="" \ CALLBACK_USERNAME="" \ CALLBACK_PASSWORD="" \ - RABBITMQ_URI="amqp://guest:guest@localhost:5672/%2F" + RABBITMQ_URI="amqp://guest:guest@localhost:5672/%2F" \ + PREFIX="" COPY requirements.txt ./ RUN pip install -r ./requirements.txt diff --git a/server/requirements.txt b/cdrhook/requirements.txt similarity index 100% rename from server/requirements.txt rename to cdrhook/requirements.txt diff --git a/server/server.py b/cdrhook/server.py similarity index 96% rename from server/server.py rename to cdrhook/server.py index 1358cc4..839ddef 100644 --- a/server/server.py +++ b/cdrhook/server.py @@ -90,7 +90,7 @@ def process_map(cog_id, cog_url): r = requests.get(f"{cdr_url}/v1/maps/cog/{cog_id}/results", headers=headers) r.raise_for_status() message['results'] = r.json() - send_message(message, current_app.config["queue"]) + send_message(message, f'{current_app.config["prefix"]}download') def validate_request(data, signature_header, secret): @@ -127,9 +127,9 @@ def hook(): "exception": str(e), "data": request.data } - send_message(mesg, "cdrhook.error") + send_message(mesg, f'{current_app.config["prefix"]}cdrhook.error') elif current_app.config["cdr_keep_event"]: - send_message(data, "cdrhook.unknown") + send_message(data, f'{current_app.config["prefix"]}cdrhook.unknown') return {"ok": "success"} @@ -189,7 +189,7 @@ def create_app(): app.config["callback_username"] = os.getenv("CALLBACK_USERNAME") app.config["callback_password"] = os.getenv("CALLBACK_PASSWORD") app.config["rabbitmq_uri"] = os.getenv("RABBITMQ_URI") - app.config["queue"] = os.getenv("DOWNLOAD_QUEUE", "download") + app.config["prefix"] = os.getenv("PREFIX") app.config["cdr_keep_event"] = strtobool(os.getenv("CDR_KEEP_EVENT", "no")) # register with the CDR diff --git a/server/unregister.sh b/cdrhook/unregister.sh similarity index 100% rename from server/unregister.sh rename to cdrhook/unregister.sh diff --git a/docker-compose.yml b/docker-compose.yml index 95190ab..1808a0f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -57,7 +57,7 @@ services: cdrhook: image: ncsa/criticalmaas-cdr:latest hostname: cdrhook - build: server + build: cdrhook restart: unless-stopped depends_on: - rabbitmq @@ -71,7 +71,7 @@ services: CALLBACK_USERNAME: "${CALLBACK_USERNAME}" CALLBACK_PASSWORD: "${CALLBACK_PASSWORD}" RABBITMQ_URI: "amqp://${RABBITMQ_USERNAME}:${RABBITMQ_PASSWORD}@rabbitmq/%2F" - DOWNLOAD_QUEUE: "download" + PREFIX: "" labels: - "traefik.enable=true" - "traefik.http.routers.cdrhook.rule=Host(`${SERVER_NAME}`) && PathPrefix(`${CALLBACK_PATH}`)" @@ -107,7 +107,8 @@ services: depends_on: - rabbitmq environment: - - NVIDIA_VISIBLE_DEVICES=all + NVIDIA_VISIBLE_DEVICES: all + PREFIX: "" command: - -v - --data @@ -142,8 +143,9 @@ services: depends_on: - rabbitmq environment: - RABBITMQ_URI: "amqp://${RABBITMQ_USERNAME}:${RABBITMQ_PASSWORD}@rabbitmq/%2F" CDR_TOKEN: "${CDR_TOKEN}" + RABBITMQ_URI: "amqp://${RABBITMQ_USERNAME}:${RABBITMQ_PASSWORD}@rabbitmq/%2F" + PREFIX: "" volumes: - "output:/output" diff --git a/monitor/monitor.py b/monitor/monitor.py index cefd191..8802fac 100644 --- a/monitor/monitor.py +++ b/monitor/monitor.py @@ -32,26 +32,26 @@ def do_GET(self): if data['name'].endswith(".error"): queue = data['name'][:-6] consumers = data['consumers'] - messages = 0 + messages = None unknown = 0 errors = data['messages'] elif data['name'].endswith(".unknown"): queue = data['name'][:-8] consumers = data['consumers'] - messages = 0 + messages = None unknown = data['messages'] errors = 0 else: queue = data['name'] consumers = data['consumers'] - messages = data['messages'] + messages = f'{data["messages"]} / {data["messages_unacknowledged"]}' unknown = 0 errors = 0 if queue in queues: if consumers != 0: queues[queue]['consumers'] = consumers - if messages != 0: + if messages: queues[queue]['messages'] = messages if errors != 0: queues[queue]['errors'] = errors diff --git a/uploader/Dockerfile b/uploader/Dockerfile index e7e5515..66dedf6 100644 --- a/uploader/Dockerfile +++ b/uploader/Dockerfile @@ -2,7 +2,8 @@ FROM python:3.11 # environemnt variables ENV RABBITMQ_URI="amqp://guest:guest@localhost:5672/%2F" \ - MODELS="golden_muscat flat_iceberg" \ + PREFIX="" \ + CDR_TOKEN="" \ MAX_SIZE=300 WORKDIR /src diff --git a/uploader/uploader.py b/uploader/uploader.py index ee11f1b..71512ee 100644 --- a/uploader/uploader.py +++ b/uploader/uploader.py @@ -8,6 +8,11 @@ # rabbitmq uri rabbitmq_uri = os.getenv("RABBITMQ_URI", "amqp://guest:guest@localhost:5672/%2F") + +# prefix for the queue names +prefix = os.getenv("PREFIX", "") + +# CDR url, token and max size for upload (in MB) cdr_url = "https://api.cdr.land" cdr_token = os.getenv("CDR_TOKEN", "") max_size = int(os.getenv("MAX_SIZE", "300")) @@ -56,14 +61,14 @@ def main(): channel = connection.channel() # create queues - channel.queue_declare(queue="upload", durable=True) - channel.queue_declare(queue="upload.error", durable=True) + channel.queue_declare(queue=f"{prefix}upload", durable=True) + channel.queue_declare(queue=f"{prefix}upload.error", durable=True) # listen for messages and stop if nothing found after 5 minutes channel.basic_qos(prefetch_count=1) # create generator to fetch messages - consumer = channel.consume(queue="upload", inactivity_timeout=1) + consumer = channel.consume(queue=f"{prefix}upload", inactivity_timeout=1) # loop getting new messages worker = None @@ -78,7 +83,7 @@ def main(): data = json.loads(worker.body) if worker.exception: data['exception'] = repr(worker.exception) - channel.basic_publish(exchange='', routing_key=f"upload.error", body=json.dumps(data), properties=worker.properties) + channel.basic_publish(exchange='', routing_key=f"{prefix}upload.error", body=json.dumps(data), properties=worker.properties) else: logging.info(f"Finished all processing steps for map {data['cog_id']}") channel.basic_ack(delivery_tag=worker.method.delivery_tag)