add prefix, rename server folder, add unack messages to monitor
robkooper committed Apr 26, 2024
1 parent 1317b13 commit 8cdc5d9
Showing 10 changed files with 36 additions and 18 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,15 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## [0.4.0] - 2024-04-26

### Added
- can now set a prefix for all queue names via the PREFIX environment variable (default PREFIX=""); see the sketch below this diff

### Changed
- renamed server folder to cdrhook
- monitor now shows the number of unacknowledged messages: "61 / 1" means 61 messages waiting and 1 being processed (unacknowledged)

## [0.3.0] - 2024-04-23

### Added
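For context on the new setting: PREFIX is prepended verbatim to every queue name used by the services in this repository. A minimal sketch of the resulting names, assuming a hypothetical value of `dev.` (the default empty string leaves the names unchanged):

```python
# Illustrative only: shows how a PREFIX value combines with the queue names
# touched by this commit. The "dev." value is hypothetical.
import os

prefix = os.getenv("PREFIX", "")  # e.g. PREFIX="dev." set in docker-compose.yml

for base in ("download", "upload", "upload.error", "cdrhook.error", "cdrhook.unknown"):
    print(f"{prefix}{base}")  # with PREFIX="dev.": dev.download, dev.upload, ...
```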
File renamed without changes.
3 changes: 2 additions & 1 deletion server/Dockerfile → cdrhook/Dockerfile
@@ -12,7 +12,8 @@ ENV PYTHONUNBUFFERED=1 \
CALLBACK_URL="" \
CALLBACK_USERNAME="" \
CALLBACK_PASSWORD="" \
RABBITMQ_URI="amqp://guest:guest@localhost:5672/%2F"
RABBITMQ_URI="amqp://guest:guest@localhost:5672/%2F" \
PREFIX=""

COPY requirements.txt ./
RUN pip install -r ./requirements.txt
File renamed without changes.
8 changes: 4 additions & 4 deletions server/server.py → cdrhook/server.py
@@ -90,7 +90,7 @@ def process_map(cog_id, cog_url):
r = requests.get(f"{cdr_url}/v1/maps/cog/{cog_id}/results", headers=headers)
r.raise_for_status()
message['results'] = r.json()
send_message(message, current_app.config["queue"])
send_message(message, f'{current_app.config["prefix"]}download')


def validate_request(data, signature_header, secret):
@@ -127,9 +127,9 @@ def hook():
"exception": str(e),
"data": request.data
}
send_message(mesg, "cdrhook.error")
send_message(mesg, f'{current_app.config["prefix"]}cdrhook.error')
elif current_app.config["cdr_keep_event"]:
send_message(data, "cdrhook.unknown")
send_message(data, f'{current_app.config["prefix"]}cdrhook.unknown')

return {"ok": "success"}

@@ -189,7 +189,7 @@ def create_app():
app.config["callback_username"] = os.getenv("CALLBACK_USERNAME")
app.config["callback_password"] = os.getenv("CALLBACK_PASSWORD")
app.config["rabbitmq_uri"] = os.getenv("RABBITMQ_URI")
app.config["queue"] = os.getenv("DOWNLOAD_QUEUE", "download")
app.config["prefix"] = os.getenv("PREFIX")
app.config["cdr_keep_event"] = strtobool(os.getenv("CDR_KEEP_EVENT", "no"))

# register with the CDR
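The hook now reads PREFIX once at startup (app.config["prefix"]) and prepends it when publishing. The send_message helper itself is not part of this diff; a hedged sketch of what such a helper could look like with pika, assuming the caller passes the already-prefixed queue name and the RabbitMQ URI comes from the Flask config shown above:

```python
# Sketch only: the real send_message in cdrhook/server.py is not shown in this
# diff and may differ in signature and behavior.
import json

import pika
from flask import current_app


def send_message(message: dict, queue: str) -> None:
    """Publish a message to the given (already prefixed) queue."""
    parameters = pika.URLParameters(current_app.config["rabbitmq_uri"])
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    channel.queue_declare(queue=queue, durable=True)
    channel.basic_publish(
        exchange="",
        routing_key=queue,
        body=json.dumps(message),
        properties=pika.BasicProperties(delivery_mode=2),  # persistent message
    )
    connection.close()
```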
File renamed without changes.
10 changes: 6 additions & 4 deletions docker-compose.yml
@@ -57,7 +57,7 @@ services:
cdrhook:
image: ncsa/criticalmaas-cdr:latest
hostname: cdrhook
build: server
build: cdrhook
restart: unless-stopped
depends_on:
- rabbitmq
@@ -71,7 +71,7 @@
CALLBACK_USERNAME: "${CALLBACK_USERNAME}"
CALLBACK_PASSWORD: "${CALLBACK_PASSWORD}"
RABBITMQ_URI: "amqp://${RABBITMQ_USERNAME}:${RABBITMQ_PASSWORD}@rabbitmq/%2F"
DOWNLOAD_QUEUE: "download"
PREFIX: ""
labels:
- "traefik.enable=true"
- "traefik.http.routers.cdrhook.rule=Host(`${SERVER_NAME}`) && PathPrefix(`${CALLBACK_PATH}`)"
@@ -107,7 +107,8 @@ services:
depends_on:
- rabbitmq
environment:
- NVIDIA_VISIBLE_DEVICES=all
NVIDIA_VISIBLE_DEVICES: all
PREFIX: ""
command:
- -v
- --data
@@ -142,8 +143,9 @@ services:
depends_on:
- rabbitmq
environment:
RABBITMQ_URI: "amqp://${RABBITMQ_USERNAME}:${RABBITMQ_PASSWORD}@rabbitmq/%2F"
CDR_TOKEN: "${CDR_TOKEN}"
RABBITMQ_URI: "amqp://${RABBITMQ_USERNAME}:${RABBITMQ_PASSWORD}@rabbitmq/%2F"
PREFIX: ""
volumes:
- "output:/output"

8 changes: 4 additions & 4 deletions monitor/monitor.py
@@ -32,26 +32,26 @@ def do_GET(self):
if data['name'].endswith(".error"):
queue = data['name'][:-6]
consumers = data['consumers']
messages = 0
messages = None
unknown = 0
errors = data['messages']
elif data['name'].endswith(".unknown"):
queue = data['name'][:-8]
consumers = data['consumers']
messages = 0
messages = None
unknown = data['messages']
errors = 0
else:
queue = data['name']
consumers = data['consumers']
messages = data['messages']
messages = f'{data["messages"]} / {data["messages_unacknowledged"]}'
unknown = 0
errors = 0

if queue in queues:
if consumers != 0:
queues[queue]['consumers'] = consumers
if messages != 0:
if messages:
queues[queue]['messages'] = messages
if errors != 0:
queues[queue]['errors'] = errors
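The counters used above (consumers, messages, messages_unacknowledged) match the per-queue objects returned by the RabbitMQ management API, which is presumably where the monitor gets its data. A minimal sketch of building the same "messages / unacknowledged" string from that endpoint (host and credentials are placeholders):

```python
# Sketch only: fetches queue statistics from the RabbitMQ management API and
# formats them the way the monitor now does. URL and credentials are examples.
import requests


def queue_status(mgmt_url: str, user: str, password: str) -> list[str]:
    """Return one 'name: messages / unacknowledged' line per queue."""
    r = requests.get(f"{mgmt_url}/api/queues", auth=(user, password))
    r.raise_for_status()
    return [
        f'{q["name"]}: {q["messages"]} / {q["messages_unacknowledged"]}'
        for q in r.json()
    ]


# Example: queue_status("http://localhost:15672", "guest", "guest")
```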
3 changes: 2 additions & 1 deletion uploader/Dockerfile
@@ -2,7 +2,8 @@ FROM python:3.11

# environment variables
ENV RABBITMQ_URI="amqp://guest:guest@localhost:5672/%2F" \
MODELS="golden_muscat flat_iceberg" \
PREFIX="" \
CDR_TOKEN="" \
MAX_SIZE=300

WORKDIR /src
13 changes: 9 additions & 4 deletions uploader/uploader.py
@@ -8,6 +8,11 @@

# rabbitmq uri
rabbitmq_uri = os.getenv("RABBITMQ_URI", "amqp://guest:guest@localhost:5672/%2F")

# prefix for the queue names
prefix = os.getenv("PREFIX", "")

# CDR url, token and max size for upload (in MB)
cdr_url = "https://api.cdr.land"
cdr_token = os.getenv("CDR_TOKEN", "")
max_size = int(os.getenv("MAX_SIZE", "300"))
@@ -56,14 +61,14 @@ def main():
channel = connection.channel()

# create queues
channel.queue_declare(queue="upload", durable=True)
channel.queue_declare(queue="upload.error", durable=True)
channel.queue_declare(queue=f"{prefix}upload", durable=True)
channel.queue_declare(queue=f"{prefix}upload.error", durable=True)

# listen for messages and stop if nothing found after 5 minutes
channel.basic_qos(prefetch_count=1)

# create generator to fetch messages
consumer = channel.consume(queue="upload", inactivity_timeout=1)
consumer = channel.consume(queue=f"{prefix}upload", inactivity_timeout=1)

# loop getting new messages
worker = None
@@ -78,7 +83,7 @@
data = json.loads(worker.body)
if worker.exception:
data['exception'] = repr(worker.exception)
channel.basic_publish(exchange='', routing_key=f"upload.error", body=json.dumps(data), properties=worker.properties)
channel.basic_publish(exchange='', routing_key=f"{prefix}upload.error", body=json.dumps(data), properties=worker.properties)
else:
logging.info(f"Finished all processing steps for map {data['cog_id']}")
channel.basic_ack(delivery_tag=worker.method.delivery_tag)
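Putting the uploader changes together: the queues are declared and consumed under the prefix, and failed messages are republished to {prefix}upload.error with their original properties. A condensed sketch of that flow (worker details elided, idle shutdown approximated):

```python
# Condensed sketch of the uploader's consume/ack loop with prefixed queues.
# The actual worker logic and 5-minute shutdown handling are simplified.
import os
import time

import pika

prefix = os.getenv("PREFIX", "")
rabbitmq_uri = os.getenv("RABBITMQ_URI", "amqp://guest:guest@localhost:5672/%2F")

connection = pika.BlockingConnection(pika.URLParameters(rabbitmq_uri))
channel = connection.channel()
channel.queue_declare(queue=f"{prefix}upload", durable=True)
channel.queue_declare(queue=f"{prefix}upload.error", durable=True)
channel.basic_qos(prefetch_count=1)

last_message = time.time()
for method, properties, body in channel.consume(queue=f"{prefix}upload", inactivity_timeout=1):
    if method is None:  # nothing received in the last second
        if time.time() - last_message > 300:
            break  # idle for 5 minutes, stop consuming
        continue
    last_message = time.time()
    # ... hand `body` to the upload worker here; on failure republish to
    # f"{prefix}upload.error" with the original `properties` ...
    channel.basic_ack(delivery_tag=method.delivery_tag)

channel.cancel()
connection.close()
```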
