Skip to content

Commit

Permalink
release 0.7.0
Browse files Browse the repository at this point in the history
  • Loading branch information
robkooper committed May 3, 2024
1 parent dda537b commit 81d2a01
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 37 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## [0.7.0] - 2024-05-03

### Fixed
- cdrhook: restart rabbitmq listener in case of error
- cdrhook: in case of exception processing event move to error queue
- cdrhook: strip whitespaces from event id

## [0.6.0] - 2024-05-01

### Added
Expand Down
81 changes: 44 additions & 37 deletions cdrhook/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,9 +79,6 @@ def check_uncharted_event(event_id):
If the event is an unchared event, we will download the data, and fire
the download event.
"""
if not event_id.startswith("uncharted_0."):
return

# get the event information
headers = {'Authorization': f'Bearer {config["cdr_token"]}'}
r = requests.get(f"{cdr_url}/v1/maps/extractions/{event_id}", headers=headers)
Expand Down Expand Up @@ -171,8 +168,8 @@ def hook():
# check the signature
if request.headers.get("x-cdr-signature-256"):
validate_request(request.data, request.headers.get("x-cdr-signature-256"), config["callback_secret"])
elif not request.headers.get("x-ncsa-secret") == config["callback_secret"]:
abort(403, "Request signatures didn't match!")
elif not request.headers.get("x-ncsa-secret", "") == config["callback_secret"]:
abort(403, "Callback secret didn't match!")

send_message(request.get_json(), f'{config["prefix"]}cdrhook')
return {"ok": "success"}
Expand All @@ -195,26 +192,31 @@ def cdrhook_callback(channel, method, properties, body):
a map every 30 seconds to see if the metadata is available. If not
it will be added to the back of the queue.
"""
data = json.loads(body)

if not data.get("event"):
logging.error("No event in message")
elif data.get("event") == "ping":
logging.debug("ping/pong")
elif data.get("event") == "map.process":
logging.debug("ignoring map.process")
elif data.get("event") == "feature.process":
event_id = data.get("payload", {}).get("id", "")
if event_id.startswith("uncharted_0."):
check_uncharted_event(event_id)
try:
data = json.loads(body)

if not data.get("event"):
logging.error("No event in message")
elif data.get("event") == "ping":
logging.debug("ping/pong")
elif data.get("event") == "map.process":
logging.debug("ignoring map.process")
elif data.get("event") == "feature.process":
event_id = data.get("payload", {}).get("id", "").strip()
if event_id.startswith("uncharted_0."):
check_uncharted_event(event_id)
else:
logging.debug(f"Ignoring feature.process with id {event_id}")
logging.debug(data)
else:
logging.debug(f"Ignoring feature.process with id {event_id}")
else:
logging.debug("Unknown event %s", data.get("event"))

if config["cdr_keep_event"]:
send_message(data, f'{config["prefix"]}cdrhook.unknown')

logging.debug("Unknown event %s", data.get("event"))

if config["cdr_keep_event"]:
send_message(data, f'{config["prefix"]}cdrhook.unknown')
except Exception as e:
logging.exception("Error processing cdrhook message.")
data["exception"] = repr(e)
send_message(data, f'{config["prefix"]}cdrhook.error')
channel.basic_ack(delivery_tag=method.delivery_tag)

def cdrhook_listener(config):
Expand All @@ -225,19 +227,24 @@ def cdrhook_listener(config):
"""
logging.info("Starting RabbitMQ listener")

# connect to rabbitmq
parameters = pika.URLParameters(config['rabbitmq_uri'])
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

# create all queues needed
channel.queue_declare(queue=f'{config["prefix"]}cdrhook', durable=True)
channel.queue_declare(queue=f'{config["prefix"]}cdrhook.error', durable=True)

# process messages
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=f'{config["prefix"]}cdrhook', on_message_callback=cdrhook_callback, auto_ack=False)
channel.start_consuming()
while True:
try:
# connect to rabbitmq
parameters = pika.URLParameters(config['rabbitmq_uri'])
connection = pika.BlockingConnection(parameters)
channel = connection.channel()

# create all queues needed
channel.queue_declare(queue=f'{config["prefix"]}cdrhook', durable=True)
channel.queue_declare(queue=f'{config["prefix"]}cdrhook.error', durable=True)

# process messages
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=f'{config["prefix"]}cdrhook', on_message_callback=cdrhook_callback, auto_ack=False)
channel.start_consuming()
except Exception as e:
logging.exception("Error running cdrhook.")
time.sleep(5)

# ----------------------------------------------------------------------
# Start the server and register with the CDR
Expand Down

0 comments on commit 81d2a01

Please sign in to comment.