Serverless application infrastructure #225

Open · wants to merge 27 commits into base: master · Changes from 22 commits
74 changes: 58 additions & 16 deletions benchmarks/wrappers/aws/python/handler.py
@@ -1,18 +1,40 @@

import datetime, io, json, os, sys, uuid

# Add the packaged dependencies to the module search path.
sys.path.append(os.path.join(os.path.dirname(__file__), '.python_packages/lib/site-packages'))

def handler(event, context):

    income_timestamp = datetime.datetime.now().timestamp()

    # Flag to indicate whether the measurements should be returned as an HTTP
    # response or via a result queue.
    return_http = True

    # Queue trigger
    if "Records" in event and event["Records"][0]["eventSource"] == 'aws:sqs':
        event = json.loads(event["Records"][0]["body"])

        return_http = False

    # Storage trigger
    if "Records" in event and "s3" in event["Records"][0]:
        bucket_name = event["Records"][0]["s3"]["bucket"]["name"]
        file_name = event["Records"][0]["s3"]["object"]["key"]

        from function import storage
        storage_inst = storage.storage.get_instance()

        obj = storage_inst.get_object(bucket_name, file_name)
        event = json.loads(obj)

        return_http = False

    # HTTP trigger with API Gateway
    if 'body' in event:
        event = json.loads(event['body'])

    # Run function and measure.
    req_id = context.aws_request_id
    event['request-id'] = req_id
    event['income-timestamp'] = income_timestamp
@@ -24,6 +46,10 @@ def handler(event, context):
    log_data = {
        'output': ret['result']
    }
    if 'fns_triggered' in ret and ret['fns_triggered'] > 0:
        log_data['fns_triggered'] = ret['fns_triggered']
    if 'parent_execution_id' in ret:
        log_data['parent_execution_id'] = ret['parent_execution_id']
    if 'measurement' in ret:
        log_data['measurement'] = ret['measurement']
    if 'logs' in event:
@@ -55,17 +81,33 @@ def handler(event, context):
if "cold_start" in os.environ:
cold_start_var = os.environ["cold_start"]

return {
'statusCode': 200,
'body': json.dumps({
'begin': begin.strftime('%s.%f'),
'end': end.strftime('%s.%f'),
'results_time': results_time,
'is_cold': is_cold,
'result': log_data,
'request_id': context.aws_request_id,
'cold_start_var': cold_start_var,
'container_id': container_id,
})
}
stats = json.dumps({
'begin': begin.strftime('%s.%f'),
'end': end.strftime('%s.%f'),
'results_time': results_time,
'is_cold': is_cold,
'result': log_data,
'request_id': context.aws_request_id,
'cold_start_var': cold_start_var,
'container_id': container_id,
})

# Send the results onwards.
result_queue = os.getenv('RESULT_QUEUE')

if (return_http or result_queue is None):
# HTTP / library trigger, standalone function: return an HTTP response.
return {
'statusCode': 200,
'body': stats
}
else:
# Queue trigger, storage trigger, or application: write to a queue.
arn = context.invoked_function_arn.split(":")
region = arn[3]
account_id = arn[4]
queue_name = result_queue

from function import queue
queue_client = queue.queue(queue_name, account_id, region)
queue_client.send_message(stats)
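
Reviewer note: a minimal sketch of the three event shapes the trigger detection above distinguishes; all field values are illustrative placeholders, not taken from the benchmark suite.

import json

# Hypothetical benchmark input.
benchmark_input = {'size': 'test', 'logs': {'bucket': 'experiment-logs'}}

# SQS trigger: the input arrives JSON-encoded in the first record's body.
sqs_event = {'Records': [{'eventSource': 'aws:sqs',
                          'body': json.dumps(benchmark_input)}]}

# S3 trigger: only bucket and key arrive; the handler fetches the payload itself.
s3_event = {'Records': [{'s3': {'bucket': {'name': 'input-bucket'},
                                'object': {'key': 'payload.json'}}}]}

# API Gateway trigger: the input is the JSON-encoded request body.
http_event = {'body': json.dumps(benchmark_input)}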
14 changes: 14 additions & 0 deletions benchmarks/wrappers/aws/python/queue.py
@@ -0,0 +1,14 @@
import boto3

class queue:
    client = None

    def __init__(self, queue_name: str, account_id: str, region: str):
        self.client = boto3.client('sqs', region_name=region)
        self.queue_url = f"https://sqs.{region}.amazonaws.com/{account_id}/{queue_name}"

    def send_message(self, message: str):
        self.client.send_message(
            QueueUrl=self.queue_url,
            MessageBody=message,
        )
19 changes: 17 additions & 2 deletions benchmarks/wrappers/aws/python/storage.py
@@ -21,8 +21,10 @@ def unique_name(name):
            random=str(uuid.uuid4()).split('-')[0]
        )

    def upload(self, bucket, file, filepath, overwrite=False):
        key_name = storage.unique_name(file)
        if overwrite:
            key_name = file
        self.client.upload_file(filepath, bucket, key_name)
        return key_name

@@ -46,8 +48,21 @@ def download_stream(self, bucket, file):
        data = io.BytesIO()
        self.client.download_fileobj(bucket, file, data)
        return data.getbuffer()

    def get_object(self, bucket, file):
        obj = self.client.get_object(Bucket=bucket, Key=file)
        return obj['Body'].read().decode('utf-8')

    def get_instance():
        if storage.instance is None:
            storage.instance = storage()
        return storage.instance

    def list_blobs(self, bucket):
        res = self.client.list_objects(Bucket=bucket)

        # 'Contents' is absent when the bucket is empty.
        objs = []
        for obj in res.get('Contents', []):
            objs.append(obj['Key'])

        return objs
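
Reviewer note: a short sketch of how the new helpers compose; bucket and file names are hypothetical.

from function import storage  # assumes the handler's package layout

client = storage.storage.get_instance()
# overwrite=True keeps the exact key instead of a uniquified one.
key = client.upload('sebs-data', 'payload.json', '/tmp/payload.json', overwrite=True)
print(client.get_object('sebs-data', key))   # object body decoded as UTF-8
print(client.list_blobs('sebs-data'))        # up to 1000 keys per list_objects call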
77 changes: 65 additions & 12 deletions benchmarks/wrappers/azure/python/handler.py
@@ -1,18 +1,72 @@

import base64
import datetime, io, json, logging, os, uuid

from azure.identity import ManagedIdentityCredential
from azure.storage.queue import QueueClient

import azure.functions as func


def handler_http(req: func.HttpRequest, context: func.Context) -> func.HttpResponse:
    income_timestamp = datetime.datetime.now().timestamp()

    req_json = req.get_json()

    if 'connection_string' in req_json:
        os.environ['STORAGE_CONNECTION_STRING'] = req_json['connection_string']

    req_json['request-id'] = context.invocation_id
    req_json['income-timestamp'] = income_timestamp

    return func.HttpResponse(measure(req_json), mimetype="application/json")
def handler_queue(msg: func.QueueMessage, context: func.Context):
    income_timestamp = datetime.datetime.now().timestamp()

    payload = msg.get_json()

    payload['request-id'] = context.invocation_id
    payload['income-timestamp'] = income_timestamp

    stats = measure(payload)

    # Send the results onwards.
    result_queue = os.getenv('RESULT_QUEUE')
    storage_account = os.getenv('DATA_STORAGE_ACCOUNT')

    if result_queue and storage_account:
        from . import queue
        queue_client = queue.queue(result_queue, storage_account)
        queue_client.send_message(stats)

def handler_storage(blob: func.InputStream, context: func.Context):
    income_timestamp = datetime.datetime.now().timestamp()

    payload = json.loads(blob.readline().decode('utf-8'))

    payload['request-id'] = context.invocation_id
    payload['income-timestamp'] = income_timestamp

    stats = measure(payload)

    # Send the results onwards.
    result_queue = os.getenv('RESULT_QUEUE')
    storage_account = os.getenv('DATA_STORAGE_ACCOUNT')

    if result_queue and storage_account:
        from . import queue
        queue_client = queue.queue(result_queue, storage_account)
        queue_client.send_message(stats)

# Contains generic logic for gathering measurements for the function at hand,
# given a request JSON. Used by all handlers, regardless of the trigger.
def measure(req_json) -> str:
    req_id = req_json['request-id']

    begin = datetime.datetime.now()
    # We are deployed in the same directory
    from . import function
@@ -22,6 +76,10 @@ def main(req: func.HttpRequest, context: func.Context) -> func.HttpResponse:
    log_data = {
        'output': ret['result']
    }
    if 'fns_triggered' in ret and ret['fns_triggered'] > 0:
        log_data['fns_triggered'] = ret['fns_triggered']
    if 'parent_execution_id' in ret:
        log_data['parent_execution_id'] = ret['parent_execution_id']
    if 'measurement' in ret:
        log_data['measurement'] = ret['measurement']
    if 'logs' in req_json:
@@ -30,7 +88,6 @@ def main(req: func.HttpRequest, context: func.Context) -> func.HttpResponse:
        from . import storage
        storage_inst = storage.storage.get_instance()
        b = req_json.get('logs').get('bucket')
        storage_inst.upload_stream(b, '{}.json'.format(req_id),
                                   io.BytesIO(json.dumps(log_data).encode('utf-8')))
    results_end = datetime.datetime.now()
@@ -58,8 +115,7 @@ def main(req: func.HttpRequest, context: func.Context) -> func.HttpResponse:
        cold_marker = True
        is_cold_worker = True

    return json.dumps({
        'begin': begin.strftime('%s.%f'),
        'end': end.strftime('%s.%f'),
        'results_time': results_time,
@@ -68,8 +124,5 @@ def main(req: func.HttpRequest, context: func.Context) -> func.HttpResponse:
        'is_cold_worker': is_cold_worker,
        'container_id': container_id,
        'environ_container_id': os.environ['CONTAINER_NAME'],
        'request_id': req_id
    })
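
Reviewer note: a hedged sketch for exercising handler_queue by hand. Azure Functions queue triggers commonly expect base64-encoded message bodies (presumably why base64 is imported above); every name below is a placeholder.

import base64, json
from azure.identity import ManagedIdentityCredential
from azure.storage.queue import QueueClient

# Hypothetical input payload and endpoints.
payload = {'size': 'test'}
client = QueueClient("https://sebsstorage.queue.core.windows.net",  # placeholder account
                     queue_name='sebs-trigger',                     # placeholder queue
                     credential=ManagedIdentityCredential())

# The Functions runtime base64-decodes the body before msg.get_json() sees it.
client.send_message(base64.b64encode(json.dumps(payload).encode('utf-8')).decode('ascii'))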
15 changes: 15 additions & 0 deletions benchmarks/wrappers/azure/python/queue.py
@@ -0,0 +1,15 @@
from azure.identity import ManagedIdentityCredential
from azure.storage.queue import QueueClient

class queue:
    client = None

    def __init__(self, queue_name: str, storage_account: str):
        account_url = f"https://{storage_account}.queue.core.windows.net"
        managed_credential = ManagedIdentityCredential()
        self.client = QueueClient(account_url,
                                  queue_name=queue_name,
                                  credential=managed_credential)

    def send_message(self, message: str):
        self.client.send_message(message)
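
Reviewer note: because this client authenticates with ManagedIdentityCredential rather than a connection string, the function app's identity presumably needs a data-plane RBAC role on the target storage account (e.g. Storage Queue Data Message Sender) before send_message succeeds; worth mentioning in the deployment docs.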
26 changes: 22 additions & 4 deletions benchmarks/wrappers/azure/python/storage.py
@@ -22,9 +22,9 @@ def unique_name(name):
            random=str(uuid.uuid4()).split('-')[0]
        )

    def upload(self, container, file, filepath, overwrite=False):
        with open(filepath, 'rb') as data:
            return self.upload_stream(container, file, data, overwrite)

    def download(self, container, file, filepath):
        with open(filepath, 'wb') as download_file:
@@ -39,13 +39,15 @@ def download_directory(self, container, prefix, path):
            os.makedirs(os.path.join(path, path_to_file), exist_ok=True)
            self.download(container, file_name, os.path.join(path, file_name))

    def upload_stream(self, container, file, data, overwrite=False):
        key_name = storage.unique_name(file)
        if overwrite:
            key_name = file
        client = self.client.get_blob_client(
            container=container,
            blob=key_name
        )
        client.upload_blob(data, overwrite=overwrite)
        return key_name

    def download_stream(self, container, file):
@@ -56,3 +58,19 @@ def get_instance():
        if storage.instance is None:
            storage.instance = storage()
        return storage.instance

    def get_object(self, container, key):
        blob_client = self.client.get_blob_client(container=container, blob=key)
        downloader = blob_client.download_blob(max_concurrency=1, encoding='UTF-8')
        return downloader.readall()

    def list_blobs(self, container):
        client = self.client.get_container_client(container=container)

        # Azure returns an iterator. Turn it into a list.
        objs = []
        res = client.list_blob_names()
        for obj in res:
            objs.append(obj)

        return objs
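
Reviewer note: a quick sketch of the new read helpers; container and blob names are hypothetical.

from . import storage  # assumes the wrapper package layout

client = storage.storage.get_instance()
text = client.get_object('experiments', 'input.json')   # whole blob, decoded as UTF-8
names = client.list_blobs('experiments')                 # list of blob name strings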