diff --git a/components/alibi-detect-server/adserver/server.py b/components/alibi-detect-server/adserver/server.py index 58f4b66195..95351003de 100644 --- a/components/alibi-detect-server/adserver/server.py +++ b/components/alibi-detect-server/adserver/server.py @@ -146,29 +146,21 @@ def get_request_handler(protocol, request: Dict) -> RequestHandler: raise Exception(f"Unknown protocol {protocol}") -def sendCloudEvent(event: v1.Event, url: str): +def forward_request(headers, data, url): """ - Send CloudEvent + Forward request Parameters ---------- - event - CloudEvent to send + headers + Headers to forward + data + Data to forward url - Url to send event + Url to forward to """ - http_marshaller = marshaller.NewDefaultHTTPMarshaller() - binary_headers, binary_data = http_marshaller.ToRequest( - event, converters.TypeBinary, json.dumps - ) - - logging.info("binary CloudEvent") - for k, v in binary_headers.items(): - logging.info("{0}: {1}\r\n".format(k, v)) - logging.info(binary_data) - - response = requests.post(url, headers=binary_headers, data=binary_data) + response = requests.post(url, headers=headers, data=data) response.raise_for_status() @@ -252,27 +244,73 @@ def post(self): else: logging.error("Metrics returned are invalid: " + str(runtime_metrics)) - if response.data is not None: + revent = create_cloud_event( + response.data, + self.event_type, + self.event_source, + event_id=event.EventID(), + extensions=event.Extensions(), + ) + if response.data is not None: # Create event from response if reply_url is active + revent_headers, revent_data = http_marshaller.ToRequest( + revent, converters.TypeBinary, json.dumps + ) + if not self.reply_url == "": - if event.EventID() is None or event.EventID() == "": - resp_event_id = uuid.uuid1().hex - else: - resp_event_id = event.EventID() - revent = ( - v1.Event() - .SetContentType("application/json") - .SetData(response.data) - .SetEventID(resp_event_id) - .SetSource(self.event_source) - .SetEventType(self.event_type) - .SetExtensions(event.Extensions()) - ) logging.debug(json.dumps(revent.Properties())) - sendCloudEvent(revent, self.reply_url) - self.write(json.dumps(response.data)) + logging.info("binary CloudEvent") + for k, v in revent_headers.items(): + logging.info("{0}: {1}\r\n".format(k, v)) + logging.info(revent_data) + forward_request(revent_headers, revent_data, self.reply_url) + + self.set_header("Content-Type", "application/json") + for headers in revent_headers: + self.set_header(headers, revent_headers[headers]) + self.write(revent_data) + + +def create_cloud_event( + data: dict, + event_type: str, + event_source: str, + extensions: dict, + event_id: str = None, +) -> v1.Event: + """ + Create a CloudEvent + + Parameters + ---------- + data + The data to send + event_type + The CE event type + event_source + The CE event source + extensions + Any extensions to add + event_id + The event id + Returns + ------- + A CloudEvent + """ + if event_id is None or event_id == "": + event_id = uuid.uuid1().hex + + event = ( + v1.Event() + .SetData(data) + .SetEventID(event_id if event_id else str(uuid.uuid1().hex)) + .SetSource(event_source) + .SetEventType(event_type) + .SetExtensions(extensions) + ) + return event class LivenessHandler(tornado.web.RequestHandler): def get(self):