Skip to content

Commit

Permalink
refactor server post handler to return CE-compatible responses
Browse files Browse the repository at this point in the history
  • Loading branch information
michaelcheah committed Feb 19, 2024
1 parent 4c57cc7 commit ee0c6d0
Showing 1 changed file with 70 additions and 32 deletions.
102 changes: 70 additions & 32 deletions components/alibi-detect-server/adserver/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,29 +146,21 @@ def get_request_handler(protocol, request: Dict) -> RequestHandler:
raise Exception(f"Unknown protocol {protocol}")


def sendCloudEvent(event: v1.Event, url: str):
def forward_request(headers, data, url):
"""
Send CloudEvent
Forward request
Parameters
----------
event
CloudEvent to send
headers
Headers to forward
data
Data to forward
url
Url to send event
Url to forward to
"""
http_marshaller = marshaller.NewDefaultHTTPMarshaller()
binary_headers, binary_data = http_marshaller.ToRequest(
event, converters.TypeBinary, json.dumps
)

logging.info("binary CloudEvent")
for k, v in binary_headers.items():
logging.info("{0}: {1}\r\n".format(k, v))
logging.info(binary_data)

response = requests.post(url, headers=binary_headers, data=binary_data)
response = requests.post(url, headers=headers, data=data)
response.raise_for_status()


Expand Down Expand Up @@ -252,27 +244,73 @@ def post(self):
else:
logging.error("Metrics returned are invalid: " + str(runtime_metrics))

if response.data is not None:
revent = create_cloud_event(
response.data,
self.event_type,
self.event_source,
event_id=event.EventID(),
extensions=event.Extensions(),
)

if response.data is not None:
# Create event from response if reply_url is active
revent_headers, revent_data = http_marshaller.ToRequest(
revent, converters.TypeBinary, json.dumps
)

if not self.reply_url == "":
if event.EventID() is None or event.EventID() == "":
resp_event_id = uuid.uuid1().hex
else:
resp_event_id = event.EventID()
revent = (
v1.Event()
.SetContentType("application/json")
.SetData(response.data)
.SetEventID(resp_event_id)
.SetSource(self.event_source)
.SetEventType(self.event_type)
.SetExtensions(event.Extensions())
)
logging.debug(json.dumps(revent.Properties()))
sendCloudEvent(revent, self.reply_url)
self.write(json.dumps(response.data))
logging.info("binary CloudEvent")
for k, v in revent_headers.items():
logging.info("{0}: {1}\r\n".format(k, v))
logging.info(revent_data)
forward_request(revent_headers, revent_data, self.reply_url)

self.set_header("Content-Type", "application/json")
for headers in revent_headers:
self.set_header(headers, revent_headers[headers])
self.write(revent_data)


def create_cloud_event(
data: dict,
event_type: str,
event_source: str,
extensions: dict,
event_id: str = None,
) -> v1.Event:
"""
Create a CloudEvent
Parameters
----------
data
The data to send
event_type
The CE event type
event_source
The CE event source
extensions
Any extensions to add
event_id
The event id
Returns
-------
A CloudEvent
"""
if event_id is None or event_id == "":
event_id = uuid.uuid1().hex

event = (
v1.Event()
.SetData(data)
.SetEventID(event_id if event_id else str(uuid.uuid1().hex))
.SetSource(event_source)
.SetEventType(event_type)
.SetExtensions(extensions)
)
return event

class LivenessHandler(tornado.web.RequestHandler):
def get(self):
Expand Down

0 comments on commit ee0c6d0

Please sign in to comment.