diff --git a/.github/workflows/docker.yml b/.github/workflows/docker.yml index 6cdc293..8e4903a 100644 --- a/.github/workflows/docker.yml +++ b/.github/workflows/docker.yml @@ -23,7 +23,7 @@ jobs: - monitor include: - name: cdrhook - FOLDER: server + FOLDER: cdrhook PLATFORM: "linux/amd64,linux/arm64" IMAGE: criticalmaas-cdr - name: uploader diff --git a/CHANGELOG.md b/CHANGELOG.md index a5563cd..3ffd4c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,19 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](http://keepachangelog.com/) and this project adheres to [Semantic Versioning](http://semver.org/). +## [0.5.0] - 2024-04-29 + +This is a big change: instead of listening to `map.process` events, we now listen for updates from +uncharted that have map_area, polygon_legend_area and line_point_legend_area data. Based on this, the +system will trigger download messages. + +### Added +- download endpoint to cdr to download cog_area json files from uncharted + +### Changed +- new logic on when to trigger download message and for what model. +- server now uses /cdr/ as prefix to the hook/download code + ## [0.4.0] - 2024-04-29 ### Added @@ -12,6 +25,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/). 
### Changed - renamed server folder to cdrhook - monitor shows number of messages processing, "61 / 1" means 61 messages waiting, 1 being processed +- both uploader and pipeline are now part of profile pipeline ### Fixed - RabbitMQ is now pinned to v3.13 diff --git a/cdrhook/Dockerfile b/cdrhook/Dockerfile index d9e43a8..5e5f73f 100644 --- a/cdrhook/Dockerfile +++ b/cdrhook/Dockerfile @@ -15,6 +15,8 @@ ENV PYTHONUNBUFFERED=1 \ RABBITMQ_URI="amqp://guest:guest@localhost:5672/%2F" \ PREFIX="" +VOLUME /data + COPY requirements.txt ./ RUN pip install -r ./requirements.txt diff --git a/cdrhook/models.json b/cdrhook/models.json new file mode 100644 index 0000000..5915bc9 --- /dev/null +++ b/cdrhook/models.json @@ -0,0 +1,5 @@ +{ + "golden_muscat": ["map_area", "polygon_legend_area"], + "flat_iceberg": ["map_area", "line_point_legend"], + "drab_volcano": ["map_area"] +} diff --git a/cdrhook/server.py b/cdrhook/server.py index 839ddef..204c3fa 100644 --- a/cdrhook/server.py +++ b/cdrhook/server.py @@ -1,4 +1,4 @@ -from flask import Flask, request, abort, current_app +from flask import Flask, request, abort, current_app, send_from_directory from flask_httpauth import HTTPBasicAuth import os import logging @@ -11,10 +11,13 @@ import signal import sys from waitress.server import create_server +import threading +import time auth = HTTPBasicAuth() cdr_url = "https://api.cdr.land" +config = { } # ---------------------------------------------------------------------- # HELPER @@ -43,8 +46,8 @@ def verify_password(username, password): Check username/password. If no username/password is set as environment variables, default to anonymous. 
""" - u = current_app.config["callback_username"] - p = current_app.config["callback_password"] + u = config["callback_username"] + p = config["callback_password"] if not u or not p: return "anonymous" if username == u and password == p: @@ -59,7 +62,7 @@ def send_message(message, queue): """ Send a message to the RabbitMQ queue """ - parameters = pika.URLParameters(current_app.config["rabbitmq_uri"]) + parameters = pika.URLParameters(config["rabbitmq_uri"]) connection = pika.BlockingConnection(parameters) channel = connection.channel() channel.queue_declare(queue=queue, durable=True) @@ -68,31 +71,86 @@ def send_message(message, queue): channel.close() connection.close() - # ---------------------------------------------------------------------- -# Process incoming requests +# Process maps # ---------------------------------------------------------------------- -def process_map(cog_id, cog_url): +def check_uncharted_event(event_id): """ - Process the map + If the event is an uncharted event, we will download the data, and fire + the download event. 
""" - logging.info("Processing COG %s from %s", cog_id, cog_url) + if not event_id.startswith("uncharted_0."): + return + + # get the event information + headers = {'Authorization': f'Bearer {config["cdr_token"]}'} + r = requests.get(f"{cdr_url}/v1/maps/extractions/{event_id}", headers=headers) + r.raise_for_status() + data = r.json() + + # parse the information + cog_id = None + map_area = None + polygon_legend_area = None + line_point_legend_area = None + cog_area = None + for extraction in data: + cog_id = extraction["cog_id"] + for area in extraction["cog_area_extractions"]: + if area["category"] == "map_area": + map_area = area + elif area["category"] == "polygon_legend_area": + polygon_legend_area = area + elif area["category"] == "line_point_legend_area": + line_point_legend_area = area + if map_area: + cog_area = extraction + if not cog_id or not map_area: + logging.error("Could not find cog_id or map_area in uncharted event") + return + + # write the cog_area to disk + folder = os.path.join(cog_id[0:2], cog_id[2:4]) + filepart = os.path.join(folder, cog_id) + filename = os.path.join("/data", f"{filepart}.cog_area.json") + os.makedirs(os.path.dirname(filename) , exist_ok=True) + with open(filename, "w") as outputfile: + json.dump(cog_area, outputfile) + + # get the basic information + r = requests.get(f"{cdr_url}/v1/maps/cog/{cog_id}", headers=headers) + r.raise_for_status() + cog_info = r.json() + + # send the download event + firemodels = [ ] + for k, v in config["models"].items(): + goodmodel = True + if "map_area" in v and not map_area: + logging.debug("Skipping %s because of map_area", k) + goodmodel = False + if "polygon_legend_area" in v and not polygon_legend_area: + logging.debug("Skipping %s because of polygon_legend_area", k) + goodmodel = False + if "line_point_legend_area" in v and not line_point_legend_area: + logging.debug("Skipping %s because of line_point_legend_area", k) + goodmodel = False + if goodmodel: + firemodels.append(k) + message = 
{ "cog_id": cog_id, - "cog_url": cog_url, - "system": current_app.config["name"], - "version": current_app.config["version"] + "cog_url": cog_info["cog_url"], + "map_area": f'{config["callback_url"]}/download/{filepart}.cog_area.json', + "models": firemodels } - headers = {'Authorization': f'Bearer {current_app.config["cdr_token"]}'} - r = requests.get(f"{cdr_url}/v1/maps/cog/{cog_id}", headers=headers) - r.raise_for_status() - message['metadata'] = r.json() - r = requests.get(f"{cdr_url}/v1/maps/cog/{cog_id}/results", headers=headers) - r.raise_for_status() - message['results'] = r.json() - send_message(message, f'{current_app.config["prefix"]}download') + logging.info("Firing download event for %s '%s'", cog_id, json.dumps(message)) + send_message(message, f'{config["prefix"]}download') +# ---------------------------------------------------------------------- +# Process incoming requests +# ---------------------------------------------------------------------- def validate_request(data, signature_header, secret): """ Validate the incoming request. 
This is a simple check to see if the @@ -111,28 +169,75 @@ def hook(): Our main entry point for CDR calls """ # check the signature - validate_request(request.data, request.headers.get("x-cdr-signature-256"), current_app.config["callback_secret"]) + if request.headers.get("x-cdr-signature-256"): + validate_request(request.data, request.headers.get("x-cdr-signature-256"), config["callback_secret"]) + elif not request.headers.get("x-cdr-signature-256") == config["callback_secret"]: + abort(403, "Request signatures didn't match!") + + send_message(request.get_json(), f'{config["prefix"]}cdrhook') + return {"ok": "success"} - data = request.get_json() - logging.info("Received event : %s", data.get("event")) - if data.get("event") == "ping": - pass + +@auth.login_required +def download(filename): + """ + download the file + """ + logging.info(f"Received download request for {filename}") + return send_from_directory("/data", filename) + +# ---------------------------------------------------------------------- +# Start the server and register with the CDR +# ---------------------------------------------------------------------- +def cdrhook_callback(channel, method, properties, body): + """ + Callback to process maps without required metadata. This will check + a map every 30 seconds to see if the metadata is available. If not + it will be added to the back of the queue. 
+ """ + data = json.loads(body) + + if not data.get("event"): + logging.error("No event in message") + elif data.get("event") == "ping": + logging.debug("ping/pong") elif data.get("event") == "map.process": - try: - process_map(data["payload"]["cog_id"], data["payload"]["cog_url"]) - except Exception as e: - logging.exception("Could not process hook") - mesg = { - "x-cdr-signature-256": request.headers.get("x-cdr-signature-256"), - "exception": str(e), - "data": request.data - } - send_message(mesg, f'{current_app.config["prefix"]}cdrhook.error') - elif current_app.config["cdr_keep_event"]: - send_message(data, f'{current_app.config["prefix"]}cdrhook.unknown') + logging.debug("ignoring map.process") + elif data.get("event") == "feature.process": + event_id = data.get("payload", {}).get("id", "") + if event_id.startswith("uncharted_0."): + check_uncharted_event(event_id) + else: + logging.debug(f"Ignoring feature.process with id {event_id}") + else: + logging.debug("Unknown event %s", data.get("event")) + + if config["cdr_keep_event"]: + send_message(data, f'{config["prefix"]}cdrhook.unknown') - return {"ok": "success"} + channel.basic_ack(delivery_tag=method.delivery_tag) + +def cdrhook_listener(config): + """ + Listen to the RabbitMQ queue and process the messages. The messages + are maps that need to be processed, but don't have the required + metadata yet. 
+ """ + logging.info("Starting RabbitMQ listener") + # connect to rabbitmq + parameters = pika.URLParameters(config['rabbitmq_uri']) + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + + # create all queues needed + channel.queue_declare(queue=f'{config["prefix"]}cdrhook', durable=True) + channel.queue_declare(queue=f'{config["prefix"]}cdrhook.error', durable=True) + + # process messages + channel.basic_qos(prefetch_count=1) + channel.basic_consume(queue=f'{config["prefix"]}cdrhook', on_message_callback=cdrhook_callback, auto_ack=False) + channel.start_consuming() # ---------------------------------------------------------------------- # Start the server and register with the CDR @@ -144,7 +249,7 @@ def register_system(config): registration = { "name": config["name"], "version": config["version"], - "callback_url": config["callback_url"], + "callback_url": f'{config["callback_url"]}/hook', "webhook_secret": config["callback_secret"], "auth_header": config["callback_username"], "auth_token": config["callback_password"], @@ -181,25 +286,34 @@ def create_app(): # set up the config variables app = Flask(__name__) - app.config["name"] = os.getenv("SYSTEM_NAME") - app.config["version"] = os.getenv("SYSTEM_VERSION") - app.config["cdr_token"] = os.getenv("CDR_TOKEN") - app.config["callback_url"] = os.getenv("CALLBACK_URL") - app.config["callback_secret"] = os.getenv("CALLBACK_SECRET") - app.config["callback_username"] = os.getenv("CALLBACK_USERNAME") - app.config["callback_password"] = os.getenv("CALLBACK_PASSWORD") - app.config["rabbitmq_uri"] = os.getenv("RABBITMQ_URI") - app.config["prefix"] = os.getenv("PREFIX") - app.config["cdr_keep_event"] = strtobool(os.getenv("CDR_KEEP_EVENT", "no")) + config["name"] = os.getenv("SYSTEM_NAME") + config["version"] = os.getenv("SYSTEM_VERSION") + config["cdr_token"] = os.getenv("CDR_TOKEN") + config["callback_url"] = os.getenv("CALLBACK_URL") + config["callback_secret"] = 
os.getenv("CALLBACK_SECRET") + config["callback_username"] = os.getenv("CALLBACK_USERNAME") + config["callback_password"] = os.getenv("CALLBACK_PASSWORD") + config["rabbitmq_uri"] = os.getenv("RABBITMQ_URI") + config["prefix"] = os.getenv("PREFIX") + config["cdr_keep_event"] = strtobool(os.getenv("CDR_KEEP_EVENT", "no")) + + # load the models + with open("models.json", "r") as f: + config["models"] = json.load(f) # register with the CDR - registration = register_system(app.config) - app.config["registration"] = registration + registration = register_system(config) + config["registration"] = registration # register the hook - path = urllib.parse.urlparse(app.config["callback_url"]).path - logging.info("Registering hook at %s", path) - app.route(path, methods=['POST'])(hook) + path = urllib.parse.urlparse(config["callback_url"]).path + app.route(os.path.join(path, "hook"), methods=['POST'])(hook) + app.route(os.path.join(path, "download", ""), methods=['GET'])(download) + + # start daemon thread for rabbitmq + thread = threading.Thread(target=cdrhook_listener, args=(config,)) + thread.daemon = True + thread.start() # app has been created return app @@ -223,7 +337,7 @@ def create_app(): # this does not work. 
def handle_sig(sig, frame): logging.warning(f"Got signal {sig}, now close worker...") - unregister_system(app.config['cdr_token'], app.config['registration']) + unregister_system(config['cdr_token'], config['registration']) sys.exit(0) for sig in (signal.SIGINT, signal.SIGTERM, signal.SIGQUIT, signal.SIGHUP): diff --git a/docker-compose.yml b/docker-compose.yml index 4c8cdb9..eea1244 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -66,7 +66,7 @@ services: CDR_KEEP_EVENT: "no" SYSTEM_NAME: ${SYSTEM_NAME} SYSTEM_VERSION: ${SYSTEM_VERSION} - CALLBACK_URL: "https://${SERVER_NAME}${CALLBACK_PATH}" + CALLBACK_URL: "https://${SERVER_NAME}/cdr" CALLBACK_SECRET: "${CALLBACK_SECRET}" CALLBACK_USERNAME: "${CALLBACK_USERNAME}" CALLBACK_PASSWORD: "${CALLBACK_PASSWORD}" @@ -74,7 +74,9 @@ services: PREFIX: "" labels: - "traefik.enable=true" - - "traefik.http.routers.cdrhook.rule=Host(`${SERVER_NAME}`) && PathPrefix(`${CALLBACK_PATH}`)" + - "traefik.http.routers.cdrhook.rule=Host(`${SERVER_NAME}`) && PathPrefix(`/cdr`)" + volumes: + - cdrhook:/data # ---------------------------------------------------------------------- # RABBITMQ MONITOR @@ -152,6 +154,7 @@ services: volumes: traefik: rabbitmq: + cdrhook: feedback: data: logs: