Skip to content

Commit

Permalink
Merge pull request #1 from DARPA-CRITICALMAAS/uncharted-hook
Browse files Browse the repository at this point in the history
new logic for cdrhook
  • Loading branch information
robkooper authored May 1, 2024
2 parents b79f730 + 1209a39 commit 500051e
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 58 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ jobs:
- monitor
include:
- name: cdrhook
FOLDER: server
FOLDER: cdrhook
PLATFORM: "linux/amd64,linux/arm64"
IMAGE: criticalmaas-cdr
- name: uploader
Expand Down
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,19 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## [0.5.0] - 2024-04-29

This is a big change, instead of listening to `map.process` events we now listen for updates from
uncharted that has map_area, polygon_legend_area and line_point_legend_area data. Based on this the
system will trigger download messages.

### Added
- download endpoint to cdr to download cog_area json files from uncharted

### Changed
- new logic on when to trigger download message and for what model.
- server now uses /cdr/ as prefix to the hook/download code

## [0.4.0] - 2024-04-29

### Added
Expand All @@ -12,6 +25,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
### Changed
- renamed server folder to cdrhook
- monitor shows number of messages processing, "61 / 1" means 61 messages waiting, 1 being processed
- both uploader and pipeline are now part of profile pipeline

### Fixed
- RabbitMQ is now pinned to v3.13
Expand Down
2 changes: 2 additions & 0 deletions cdrhook/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ ENV PYTHONUNBUFFERED=1 \
RABBITMQ_URI="amqp://guest:guest@localhost:5672/%2F" \
PREFIX=""

VOLUME /data

COPY requirements.txt ./
RUN pip install -r ./requirements.txt

Expand Down
5 changes: 5 additions & 0 deletions cdrhook/models.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"golden_muscat": ["map_area", "polygon_legend_area"],
"flat_iceberg": ["map_area", "line_point_legend"],
"drab_volcano": ["map_area"]
}
224 changes: 169 additions & 55 deletions cdrhook/server.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from flask import Flask, request, abort, current_app
from flask import Flask, request, abort, current_app, send_from_directory
from flask_httpauth import HTTPBasicAuth
import os
import logging
Expand All @@ -11,10 +11,13 @@
import signal
import sys
from waitress.server import create_server
import threading
import time

auth = HTTPBasicAuth()
cdr_url = "https://api.cdr.land"

config = { }

# ----------------------------------------------------------------------
# HELPER
Expand Down Expand Up @@ -43,8 +46,8 @@ def verify_password(username, password):
Check username/password. If no username/password is set as
environment variables, default to anonymous.
"""
u = current_app.config["callback_username"]
p = current_app.config["callback_password"]
u = config["callback_username"]
p = config["callback_password"]
if not u or not p:
return "anonymous"
if username == u and password == p:
Expand All @@ -59,7 +62,7 @@ def send_message(message, queue):
"""
Send a message to the RabbitMQ queue
"""
parameters = pika.URLParameters(current_app.config["rabbitmq_uri"])
parameters = pika.URLParameters(config["rabbitmq_uri"])
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
channel.queue_declare(queue=queue, durable=True)
Expand All @@ -68,31 +71,86 @@ def send_message(message, queue):
channel.close()
connection.close()


# ----------------------------------------------------------------------
# Process incoming requests
# Process maps
# ----------------------------------------------------------------------
def process_map(cog_id, cog_url):
def check_uncharted_event(event_id):
"""
Process the map
If the event is an unchared event, we will download the data, and fire
the download event.
"""
logging.info("Processing COG %s from %s", cog_id, cog_url)
if not event_id.startswith("uncharted_0."):
return

# get the event information
headers = {'Authorization': f'Bearer {config["cdr_token"]}'}
r = requests.get(f"{cdr_url}/v1/maps/extractions/{event_id}", headers=headers)
r.raise_for_status()
data = r.json()

# parse the infomation
cog_id = None
map_area = None
polygon_legend_area = None
line_point_legend_area = None
cog_area = None
for extraction in data:
cog_id = extraction["cog_id"]
for area in extraction["cog_area_extractions"]:
if area["category"] == "map_area":
map_area = area
elif area["category"] == "polygon_legend_area":
polygon_legend_area = area
elif area["category"] == "line_point_legend_area":
line_point_legend_area = area
if map_area:
cog_area = extraction
if not cog_id or not map_area:
logging.error("Could not find cog_id or map_area in uncharted event")
return

# write the cog_area to disk
folder = os.path.join(cog_id[0:2], cog_id[2:4])
filepart = os.path.join(folder, cog_id)
filename = os.path.join("/data", f"{filepart}.cog_area.json")
os.makedirs(os.path.dirname(filename) , exist_ok=True)
with open(filename, "w") as outputfile:
json.dump(cog_area, outputfile)

# get the basic information
r = requests.get(f"{cdr_url}/v1/maps/cog/{cog_id}", headers=headers)
r.raise_for_status()
cog_info = r.json()

# send the download event
firemodels = [ ]
for k, v in config["models"].items():
goodmodel = True
if "map_area" in v and not map_area:
logging.debug("Skipping %s because of map_area", k)
goodmodel = False
if "polygon_legend_area" in v and not polygon_legend_area:
logging.debug("Skipping %s because of polygon_legend_area", k)
goodmodel = False
if "line_point_legend_area" in v and not line_point_legend_area:
logging.debug("Skipping %s because of line_point_legend_area", k)
goodmodel = False
if goodmodel:
firemodels.append(k)

message = {
"cog_id": cog_id,
"cog_url": cog_url,
"system": current_app.config["name"],
"version": current_app.config["version"]
"cog_url": cog_info["cog_url"],
"map_area": f'{config["callback_url"]}/download/{filepart}.cog_area.json',
"models": firemodels
}
headers = {'Authorization': f'Bearer {current_app.config["cdr_token"]}'}
r = requests.get(f"{cdr_url}/v1/maps/cog/{cog_id}", headers=headers)
r.raise_for_status()
message['metadata'] = r.json()
r = requests.get(f"{cdr_url}/v1/maps/cog/{cog_id}/results", headers=headers)
r.raise_for_status()
message['results'] = r.json()
send_message(message, f'{current_app.config["prefix"]}download')
logging.info("Firing download event for %s '%s'", cog_id, json.dumps(message))
send_message(message, f'{config["prefix"]}download')


# ----------------------------------------------------------------------
# Process incoming requests
# ----------------------------------------------------------------------
def validate_request(data, signature_header, secret):
"""
Validate the incoming request. This is a simple check to see if the
Expand All @@ -111,28 +169,75 @@ def hook():
Our main entry point for CDR calls
"""
# check the signature
validate_request(request.data, request.headers.get("x-cdr-signature-256"), current_app.config["callback_secret"])
if request.headers.get("x-cdr-signature-256"):
validate_request(request.data, request.headers.get("x-cdr-signature-256"), config["callback_secret"])
elif not request.headers.get("x-cdr-signature-256") == config["callback_secret"]:
abort(403, "Request signatures didn't match!")

send_message(request.get_json(), f'{config["prefix"]}cdrhook')
return {"ok": "success"}

data = request.get_json()
logging.info("Received event : %s", data.get("event"))
if data.get("event") == "ping":
pass

@auth.login_required
def download(filename):
"""
download the file
"""
logging.info(f"Received download request for {filename}")
return send_from_directory("/data", filename)

# ----------------------------------------------------------------------
# Start the server and register with the CDR
# ----------------------------------------------------------------------
def cdrhook_callback(channel, method, properties, body):
"""
Callback to process maps without required metadata. This will check
a map every 30 seconds to see if the metadata is available. If not
it will be added to the back of the queue.
"""
data = json.loads(body)

if not data.get("event"):
logging.error("No event in message")
elif data.get("event") == "ping":
logging.debug("ping/pong")
elif data.get("event") == "map.process":
try:
process_map(data["payload"]["cog_id"], data["payload"]["cog_url"])
except Exception as e:
logging.exception("Could not process hook")
mesg = {
"x-cdr-signature-256": request.headers.get("x-cdr-signature-256"),
"exception": str(e),
"data": request.data
}
send_message(mesg, f'{current_app.config["prefix"]}cdrhook.error')
elif current_app.config["cdr_keep_event"]:
send_message(data, f'{current_app.config["prefix"]}cdrhook.unknown')
logging.debug("ignoring map.process")
elif data.get("event") == "feature.process":
event_id = data.get("payload", {}).get("id", "")
if event_id.startswith("uncharted_0."):
check_uncharted_event(event_id)
else:
logging.debug(f"Ignoring feature.process with id {event_id}")
else:
logging.debug("Unknown event %s", data.get("event"))

if config["cdr_keep_event"]:
send_message(data, f'{config["prefix"]}cdrhook.unknown')

return {"ok": "success"}
channel.basic_ack(delivery_tag=method.delivery_tag)

def cdrhook_listener(config):
"""
Listen to the RabbitMQ queue and process the messages. The messages
are maps that need to be processed, but don't have the required
metadata yet.
"""
logging.info("Starting RabbitMQ listener")

# connect to rabbitmq
parameters = pika.URLParameters(config['rabbitmq_uri'])
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

# create all queues needed
channel.queue_declare(queue=f'{config["prefix"]}cdrhook', durable=True)
channel.queue_declare(queue=f'{config["prefix"]}cdrhook.error', durable=True)

# process messages
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=f'{config["prefix"]}cdrhook', on_message_callback=cdrhook_callback, auto_ack=False)
channel.start_consuming()

# ----------------------------------------------------------------------
# Start the server and register with the CDR
Expand All @@ -144,7 +249,7 @@ def register_system(config):
registration = {
"name": config["name"],
"version": config["version"],
"callback_url": config["callback_url"],
"callback_url": f'{config["callback_url"]}/hook',
"webhook_secret": config["callback_secret"],
"auth_header": config["callback_username"],
"auth_token": config["callback_password"],
Expand Down Expand Up @@ -181,25 +286,34 @@ def create_app():

# set up the config variables
app = Flask(__name__)
app.config["name"] = os.getenv("SYSTEM_NAME")
app.config["version"] = os.getenv("SYSTEM_VERSION")
app.config["cdr_token"] = os.getenv("CDR_TOKEN")
app.config["callback_url"] = os.getenv("CALLBACK_URL")
app.config["callback_secret"] = os.getenv("CALLBACK_SECRET")
app.config["callback_username"] = os.getenv("CALLBACK_USERNAME")
app.config["callback_password"] = os.getenv("CALLBACK_PASSWORD")
app.config["rabbitmq_uri"] = os.getenv("RABBITMQ_URI")
app.config["prefix"] = os.getenv("PREFIX")
app.config["cdr_keep_event"] = strtobool(os.getenv("CDR_KEEP_EVENT", "no"))
config["name"] = os.getenv("SYSTEM_NAME")
config["version"] = os.getenv("SYSTEM_VERSION")
config["cdr_token"] = os.getenv("CDR_TOKEN")
config["callback_url"] = os.getenv("CALLBACK_URL")
config["callback_secret"] = os.getenv("CALLBACK_SECRET")
config["callback_username"] = os.getenv("CALLBACK_USERNAME")
config["callback_password"] = os.getenv("CALLBACK_PASSWORD")
config["rabbitmq_uri"] = os.getenv("RABBITMQ_URI")
config["prefix"] = os.getenv("PREFIX")
config["cdr_keep_event"] = strtobool(os.getenv("CDR_KEEP_EVENT", "no"))

# load the models
with open("models.json", "r") as f:
config["models"] = json.load(f)

# register with the CDR
registration = register_system(app.config)
app.config["registration"] = registration
registration = register_system(config)
config["registration"] = registration

# register the hook
path = urllib.parse.urlparse(app.config["callback_url"]).path
logging.info("Registering hook at %s", path)
app.route(path, methods=['POST'])(hook)
path = urllib.parse.urlparse(config["callback_url"]).path
app.route(os.path.join(path, "hook"), methods=['POST'])(hook)
app.route(os.path.join(path, "download", "<path:filename>"), methods=['GET'])(download)

# start daemon thread for rabbitmq
thread = threading.Thread(target=cdrhook_listener, args=(config,))
thread.daemon = True
thread.start()

# app has been created
return app
Expand All @@ -223,7 +337,7 @@ def create_app():
# this does not work.
def handle_sig(sig, frame):
logging.warning(f"Got signal {sig}, now close worker...")
unregister_system(app.config['cdr_token'], app.config['registration'])
unregister_system(config['cdr_token'], config['registration'])
sys.exit(0)

for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGQUIT, signal.SIGHUP):
Expand Down
7 changes: 5 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,17 @@ services:
CDR_KEEP_EVENT: "no"
SYSTEM_NAME: ${SYSTEM_NAME}
SYSTEM_VERSION: ${SYSTEM_VERSION}
CALLBACK_URL: "https://${SERVER_NAME}${CALLBACK_PATH}"
CALLBACK_URL: "https://${SERVER_NAME}/cdr"
CALLBACK_SECRET: "${CALLBACK_SECRET}"
CALLBACK_USERNAME: "${CALLBACK_USERNAME}"
CALLBACK_PASSWORD: "${CALLBACK_PASSWORD}"
RABBITMQ_URI: "amqp://${RABBITMQ_USERNAME}:${RABBITMQ_PASSWORD}@rabbitmq/%2F"
PREFIX: ""
labels:
- "traefik.enable=true"
- "traefik.http.routers.cdrhook.rule=Host(`${SERVER_NAME}`) && PathPrefix(`${CALLBACK_PATH}`)"
- "traefik.http.routers.cdrhook.rule=Host(`${SERVER_NAME}`) && PathPrefix(`/cdr`)"
volumes:
- cdrhook:/data

# ----------------------------------------------------------------------
# RABBITMQ MONITOR
Expand Down Expand Up @@ -152,6 +154,7 @@ services:
volumes:
traefik:
rabbitmq:
cdrhook:
feedback:
data:
logs:
Expand Down

0 comments on commit 500051e

Please sign in to comment.