Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

new logic for cdrhook #1

Merged
merged 5 commits into from
May 1, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
- monitor
include:
- name: cdrhook
FOLDER: server
FOLDER: cdrhook
PLATFORM: "linux/amd64,linux/arm64"
IMAGE: criticalmaas-cdr
- name: uploader
Expand Down
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,19 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## [0.5.0] - 2024-04-29

This is a big change, instead of listening to `map.process` events we now listen for updates from
uncharted that has map_area, polygon_legend_area and line_point_legend_area data. Based on this the
system will trigger download messages.

### Added
- download endpoint to cdr to download cog_area json files from uncharted

### Changed
- new logic on when to trigger download message and for what model.
- server now uses /cdr/ as prefix to the hook/download code

## [0.4.0] - 2024-04-29

### Added
Expand All @@ -12,6 +25,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
### Changed
- renamed server folder to cdrhook
- monitor shows number of messages processing, "61 / 1" means 61 messages waiting, 1 being processed
- both uploader and pipeline are now part of profile pipeline

### Fixed
- RabbitMQ is now pinned to v3.13
Expand Down
2 changes: 2 additions & 0 deletions cdrhook/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ ENV PYTHONUNBUFFERED=1 \
RABBITMQ_URI="amqp://guest:guest@localhost:5672/%2F" \
PREFIX=""

VOLUME /data

COPY requirements.txt ./
RUN pip install -r ./requirements.txt

Expand Down
5 changes: 5 additions & 0 deletions cdrhook/models.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"golden_muscat": ["map_area", "polygon_legend_area"],
"flat_iceberg": ["map_area", "line_point_legend"],
"drab_volcano": ["map_area"]
}
222 changes: 167 additions & 55 deletions cdrhook/server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from flask import Flask, request, abort, current_app
from flask import Flask, request, abort, current_app, send_from_directory
from flask_httpauth import HTTPBasicAuth
import os
import logging
Expand All @@ -11,10 +11,13 @@
import signal
import sys
from waitress.server import create_server
import threading
import time

auth = HTTPBasicAuth()
cdr_url = "https://api.cdr.land"

config = { }

# ----------------------------------------------------------------------
# HELPER
Expand Down Expand Up @@ -43,8 +46,8 @@ def verify_password(username, password):
Check username/password. If no username/password is set as
environment variables, default to anonymous.
"""
u = current_app.config["callback_username"]
p = current_app.config["callback_password"]
u = config["callback_username"]
p = config["callback_password"]
if not u or not p:
return "anonymous"
if username == u and password == p:
Expand All @@ -59,7 +62,7 @@ def send_message(message, queue):
"""
Send a message to the RabbitMQ queue
"""
parameters = pika.URLParameters(current_app.config["rabbitmq_uri"])
parameters = pika.URLParameters(config["rabbitmq_uri"])
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue=queue, durable=True)
Expand All @@ -68,31 +71,86 @@ def send_message(message, queue):
channel.close()
connection.close()


# ----------------------------------------------------------------------
# Process incoming requests
# Process maps
# ----------------------------------------------------------------------
def process_map(cog_id, cog_url):
def check_uncharted_event(event_id):
"""
Process the map
If the event is an unchared event, we will download the data, and fire
the download event.
"""
logging.info("Processing COG %s from %s", cog_id, cog_url)
if not event_id.startswith("uncharted_0."):
return

# get the event information
headers = {'Authorization': f'Bearer {config["cdr_token"]}'}
r = requests.get(f"{cdr_url}/v1/maps/extractions/{event_id}", headers=headers)
r.raise_for_status()
data = r.json()

# parse the infomation
cog_id = None
map_area = None
polygon_legend_area = None
line_point_legend_area = None
cog_area = None
for extraction in data:
cog_id = extraction["cog_id"]
for area in extraction["cog_area_extractions"]:
if area["category"] == "map_area":
map_area = area
elif area["category"] == "polygon_legend_area":
polygon_legend_area = area
elif area["category"] == "line_point_legend_area":
line_point_legend_area = area
if map_area:
cog_area = extraction
if not cog_id or not map_area:
logging.error("Could not find cog_id or map_area in uncharted event")
return

# write the cog_area to disk
folder = os.path.join(cog_id[0:2], cog_id[2:4])
filepart = os.path.join(folder, cog_id)
filename = os.path.join("/data", f"{filepart}.cog_area.json")
os.makedirs(os.path.dirname(filename) , exist_ok=True)
with open(filename, "w") as outputfile:
json.dump(cog_area, outputfile)

# get the basic information
r = requests.get(f"{cdr_url}/v1/maps/cog/{cog_id}", headers=headers)
r.raise_for_status()
cog_info = r.json()

# send the download event
firemodels = [ ]
for k, v in config["models"].items():
goodmodel = True
if "map_area" in v and not map_area:
logging.debug("Skipping %s because of map_area", k)
goodmodel = False
if "polygon_legend_area" in v and not polygon_legend_area:
logging.debug("Skipping %s because of polygon_legend_area", k)
goodmodel = False
if "line_point_legend_area" in v and not line_point_legend_area:
logging.debug("Skipping %s because of line_point_legend_area", k)
goodmodel = False
if goodmodel:
firemodels.append(k)

message = {
"cog_id": cog_id,
"cog_url": cog_url,
"system": current_app.config["name"],
"version": current_app.config["version"]
"cog_url": cog_info["cog_url"],
"map_area": f'{config["callback_url"]}/download/{filepart}.cog_area.json',
"models": firemodels
}
headers = {'Authorization': f'Bearer {current_app.config["cdr_token"]}'}
r = requests.get(f"{cdr_url}/v1/maps/cog/{cog_id}", headers=headers)
r.raise_for_status()
message['metadata'] = r.json()
r = requests.get(f"{cdr_url}/v1/maps/cog/{cog_id}/results", headers=headers)
r.raise_for_status()
message['results'] = r.json()
send_message(message, f'{current_app.config["prefix"]}download')
logging.info("Firing download event for %s '%s'", cog_id, json.dumps(message))
send_message(message, f'{config["prefix"]}download')


# ----------------------------------------------------------------------
# Process incoming requests
# ----------------------------------------------------------------------
def validate_request(data, signature_header, secret):
"""
Validate the incoming request. This is a simple check to see if the
Expand All @@ -111,28 +169,73 @@ def hook():
Our main entry point for CDR calls
"""
# check the signature
validate_request(request.data, request.headers.get("x-cdr-signature-256"), current_app.config["callback_secret"])
if request.headers.get("x-cdr-signature-256"):
validate_request(request.data, request.headers.get("x-cdr-signature-256"), config["callback_secret"])

send_message(request.get_json(), f'{config["prefix"]}cdrhook')
return {"ok": "success"}


data = request.get_json()
logging.info("Received event : %s", data.get("event"))
if data.get("event") == "ping":
pass
@auth.login_required
def donwload(filename):
"""
download the file
"""
logging.info(f"Received download request for {filename}")
return send_from_directory("/data", filename)

# ----------------------------------------------------------------------
# Start the server and register with the CDR
# ----------------------------------------------------------------------
def cdrhook_callback(channel, method, properties, body):
"""
Callback to process maps without required metadata. This will check
a map every 30 seconds to see if the metadata is available. If not
it will be added to the back of the queue.
"""
data = json.loads(body)

if not data.get("event"):
logging.error("No event in message")
elif data.get("event") == "ping":
logging.debug("ping/pong")
elif data.get("event") == "map.process":
try:
process_map(data["payload"]["cog_id"], data["payload"]["cog_url"])
except Exception as e:
logging.exception("Could not process hook")
mesg = {
"x-cdr-signature-256": request.headers.get("x-cdr-signature-256"),
"exception": str(e),
"data": request.data
}
send_message(mesg, f'{current_app.config["prefix"]}cdrhook.error')
elif current_app.config["cdr_keep_event"]:
send_message(data, f'{current_app.config["prefix"]}cdrhook.unknown')
logging.debug("ignoring map.process")
elif data.get("event") == "feature.process":
event_id = data.get("payload", {}).get("id", "")
if event_id.startswith("uncharted_0."):
check_uncharted_event(event_id)
else:
logging.debug(f"Ignoring feature.process with id {event_id}")
else:
logging.debug("Unknown event %s", data.get("event"))

if config["cdr_keep_event"]:
send_message(data, f'{config["prefix"]}cdrhook.unknown')

return {"ok": "success"}
channel.basic_ack(delivery_tag=method.delivery_tag)

def cdrhook_listener(config):
"""
Listen to the RabbitMQ queue and process the messages. The messages
are maps that need to be processed, but don't have the required
metadata yet.
"""
logging.info("Starting RabbitMQ listener")

# connect to rabbitmq
parameters = pika.URLParameters(config['rabbitmq_uri'])
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

# create all queues needed
channel.queue_declare(queue=f'{config["prefix"]}cdrhook', durable=True)
channel.queue_declare(queue=f'{config["prefix"]}cdrhook.error', durable=True)

# process messages
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=f'{config["prefix"]}cdrhook', on_message_callback=cdrhook_callback, auto_ack=False)
channel.start_consuming()

# ----------------------------------------------------------------------
# Start the server and register with the CDR
Expand All @@ -144,7 +247,7 @@ def register_system(config):
registration = {
"name": config["name"],
"version": config["version"],
"callback_url": config["callback_url"],
"callback_url": f'{config["callback_url"]}/hook',
"webhook_secret": config["callback_secret"],
"auth_header": config["callback_username"],
"auth_token": config["callback_password"],
Expand Down Expand Up @@ -181,25 +284,34 @@ def create_app():

# set up the config variables
app = Flask(__name__)
app.config["name"] = os.getenv("SYSTEM_NAME")
app.config["version"] = os.getenv("SYSTEM_VERSION")
app.config["cdr_token"] = os.getenv("CDR_TOKEN")
app.config["callback_url"] = os.getenv("CALLBACK_URL")
app.config["callback_secret"] = os.getenv("CALLBACK_SECRET")
app.config["callback_username"] = os.getenv("CALLBACK_USERNAME")
app.config["callback_password"] = os.getenv("CALLBACK_PASSWORD")
app.config["rabbitmq_uri"] = os.getenv("RABBITMQ_URI")
app.config["prefix"] = os.getenv("PREFIX")
app.config["cdr_keep_event"] = strtobool(os.getenv("CDR_KEEP_EVENT", "no"))
config["name"] = os.getenv("SYSTEM_NAME")
config["version"] = os.getenv("SYSTEM_VERSION")
config["cdr_token"] = os.getenv("CDR_TOKEN")
config["callback_url"] = os.getenv("CALLBACK_URL")
config["callback_secret"] = os.getenv("CALLBACK_SECRET")
config["callback_username"] = os.getenv("CALLBACK_USERNAME")
config["callback_password"] = os.getenv("CALLBACK_PASSWORD")
config["rabbitmq_uri"] = os.getenv("RABBITMQ_URI")
config["prefix"] = os.getenv("PREFIX")
config["cdr_keep_event"] = strtobool(os.getenv("CDR_KEEP_EVENT", "no"))

# load the models
with open("models.json", "r") as f:
config["models"] = json.load(f)

# register with the CDR
registration = register_system(app.config)
app.config["registration"] = registration
registration = register_system(config)
config["registration"] = registration

# register the hook
path = urllib.parse.urlparse(app.config["callback_url"]).path
logging.info("Registering hook at %s", path)
app.route(path, methods=['POST'])(hook)
path = urllib.parse.urlparse(config["callback_url"]).path
app.route(os.path.join(path, "hook"), methods=['POST'])(hook)
app.route(os.path.join(path, "download", "<path:filename>"), methods=['GET'])(donwload)

# start daemon thread for rabbitmq
thread = threading.Thread(target=cdrhook_listener, args=(config,))
thread.daemon = True
thread.start()

# app has been created
return app
Expand All @@ -223,7 +335,7 @@ def create_app():
# this does not work.
def handle_sig(sig, frame):
logging.warning(f"Got signal {sig}, now close worker...")
unregister_system(app.config['cdr_token'], app.config['registration'])
unregister_system(config['cdr_token'], config['registration'])
sys.exit(0)

for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGQUIT, signal.SIGHUP):
Expand Down
7 changes: 5 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,17 @@ services:
CDR_KEEP_EVENT: "no"
SYSTEM_NAME: ${SYSTEM_NAME}
SYSTEM_VERSION: ${SYSTEM_VERSION}
CALLBACK_URL: "https://${SERVER_NAME}${CALLBACK_PATH}"
CALLBACK_URL: "https://${SERVER_NAME}/cdr"
CALLBACK_SECRET: "${CALLBACK_SECRET}"
CALLBACK_USERNAME: "${CALLBACK_USERNAME}"
CALLBACK_PASSWORD: "${CALLBACK_PASSWORD}"
RABBITMQ_URI: "amqp://${RABBITMQ_USERNAME}:${RABBITMQ_PASSWORD}@rabbitmq/%2F"
PREFIX: ""
labels:
- "traefik.enable=true"
- "traefik.http.routers.cdrhook.rule=Host(`${SERVER_NAME}`) && PathPrefix(`${CALLBACK_PATH}`)"
- "traefik.http.routers.cdrhook.rule=Host(`${SERVER_NAME}`) && PathPrefix(`/cdr`)"
volumes:
- cdrhook:/data

# ----------------------------------------------------------------------
# RABBITMQ MONITOR
Expand Down Expand Up @@ -152,6 +154,7 @@ services:
volumes:
traefik:
rabbitmq:
cdrhook:
feedback:
data:
logs:
Expand Down