Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[22027] Use extra_data to propagate HW Constraints node data #58

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sustainml_modules/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
carbontracker
flask
flask-socketio==5.2.0
kaggle
Expand Down
2 changes: 1 addition & 1 deletion sustainml_modules/sustainml_modules/sustainml-wp2
2 changes: 1 addition & 1 deletion sustainml_modules/sustainml_modules/sustainml-wp3
36 changes: 22 additions & 14 deletions sustainml_modules/sustainml_modules/sustainml-wp5/backend_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import threading
import time
import signal
import sustainml_swig
import sys
from orchestrator_node import orchestrator_node, utils
from werkzeug.serving import make_server
Expand All @@ -30,27 +31,28 @@
# Flask server default route
@server.route('/')
def hello_world():
return jsonify({'mesage': 'Hello world! Use "/terminate" route to stop Back-end node.<br>'}), 200
return jsonify({'mesage': 'Hello world! Use "/terminate" route to stop Back-end node.'}), 200

# Send user input data to orchestrator
@server.route('/user_input', methods=['POST'])
def user_input():
data = request.json
ret = orchestrator.send_user_input(data)
if not ret:
task_id = orchestrator.send_user_input(data)
if task_id is None:
return jsonify({'error': 'Invalid input data'}), 400
return jsonify({'message': 'User input data sent successfully.<br>'}), 200
return jsonify({'message': 'User input data sent successfully.',
'task_id': utils.task_json(task_id)}), 200

# Retrieve Node status methods
@server.route('/status', methods=['GET'])
def status():
return jsonify({'status': f'{orchestrator.get_all_status()}'}), 200
return jsonify({'status': orchestrator.get_all_status()}), 200

@server.route('/status', methods=['POST'])
def status_args():
data = request.json
node_id = data.get('node_id')
return jsonify({'status': f'{orchestrator.get_status(node_id)}'}), 200
return jsonify({'status': orchestrator.get_status(node_id)}), 200

# Retrieve Node results methods
@server.route('/results', methods=['GET'])
Expand All @@ -64,20 +66,26 @@ def results():
model = orchestrator.get_results(utils.node_id.ML_MODEL_PROVIDER.value, last_task_id)
hardware = orchestrator.get_results(utils.node_id.HW_PROVIDER.value, last_task_id)
carbontracker = orchestrator.get_results(utils.node_id.CARBONTRACKER.value, last_task_id)
json = {f'{utils.string_node(utils.node_id.APP_REQUIREMENTS.value)}': f'{app_req}',
f'{utils.string_node(utils.node_id.ML_MODEL_METADATA.value)}': f'{metadata}',
f'{utils.string_node(utils.node_id.HW_CONSTRAINTS.value)}': f'{constraints}',
f'{utils.string_node(utils.node_id.ML_MODEL_PROVIDER.value)}': f'{model}',
f'{utils.string_node(utils.node_id.HW_PROVIDER.value)}': f'{hardware}',
f'{utils.string_node(utils.node_id.CARBONTRACKER.value)}': f'{carbontracker}'}
task_json = {'problem_id': last_task_id.problem_id(), 'iteration_id': last_task_id.iteration_id()}
json = {utils.string_node(utils.node_id.APP_REQUIREMENTS.value): app_req,
utils.string_node(utils.node_id.ML_MODEL_METADATA.value): metadata,
utils.string_node(utils.node_id.HW_CONSTRAINTS.value): constraints,
utils.string_node(utils.node_id.ML_MODEL_PROVIDER.value): model,
utils.string_node(utils.node_id.HW_PROVIDER.value): hardware,
utils.string_node(utils.node_id.CARBONTRACKER.value): carbontracker,
'task_id': task_json}
return jsonify(json), 200

@server.route('/results', methods=['POST'])
def results_args():
data = request.json
node_id = data.get('node_id')
task_id = data.get('task_id')
return jsonify({f'{utils.string_node(node_id)}': f'{orchestrator.get_results(node_id, task_id)}'}), 200
json_task = data.get('task_id')
if json_task is not None:
task_id = sustainml_swig.set_task_id(json_task.get('problem_id', 0), json_task.get('iteration_id', 0))
else:
task_id = None
return jsonify({utils.string_node(node_id): orchestrator.get_results(node_id, task_id)}), 200

# Flask server shutdown route
@server.route('/shutdown', methods=['GET'])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,22 @@
"""SustainML Orchestrator Node API specification."""

from . import utils
import numpy as np

from sustainml_swig import OrchestratorNodeHandle as cpp_OrchestratorNodeHandle
from sustainml_swig import OrchestratorNode as cpp_OrchestratorNode
from sustainml_swig import NodeStatus
import sustainml_swig
import threading

class OrchestratorNodeHandle(cpp_OrchestratorNodeHandle):

def __init__(self):

self.node_status_ = {}
self.condition = threading.Condition()
self.last_task_id = None
self.node_status_ = {}
self.result_status = {}
# Parent class constructor
super().__init__()

Expand All @@ -46,14 +50,37 @@ def on_new_node_output(
self,
id : int,
data):
task = sustainml_swig.get_task_id(id, data)
if (self.last_task_id is None and task is not None) or (
self.last_task_id is not None and task is not None and task > self.last_task_id):
self.last_task_id = task
if task is None:
task_id = sustainml_swig.get_task_id(id, data)
if task_id is None:
print(utils.string_node(id), "node output received.")
else:
print(utils.string_node(id), "node output received from task", utils.string_task(task))
print(utils.string_node(id), "node output received from task", utils.string_task(task_id))
self.register_result(task_id, id)

def register_task(self, task_id):
with self.condition:
if (self.last_task_id is None and task_id is not None) or (
self.last_task_id is not None and task_id is not None and task_id > self.last_task_id):
self.last_task_id = task_id
self.result_status[utils.string_task(task_id)] = {
utils.node_id.APP_REQUIREMENTS.value: False,
utils.node_id.CARBONTRACKER.value: False,
utils.node_id.HW_CONSTRAINTS.value: False,
utils.node_id.HW_PROVIDER.value: False,
utils.node_id.ML_MODEL_METADATA.value: False,
utils.node_id.ML_MODEL_PROVIDER.value: False
}

def register_result(self, task_id, node_id):
with self.condition:
if utils.string_task(task_id) not in self.result_status:
self.register_task(task_id)
self.result_status[utils.string_task(task_id)][node_id] = True
self.condition.notify_all()

def results_available(self, task_id, node_id):
with self.condition:
return self.result_status[utils.string_task(task_id)].get(node_id, False)

class Orchestrator:

Expand All @@ -76,110 +103,150 @@ def get_last_task_id(self):
return self.handler_.last_task_id

def get_all_status(self):
output = ""
json_output = {}
for key, value in self.handler_.node_status_.items():
output += utils.string_node(key) + " node status " + utils.string_status(value) + "<br>"
if output == "":
output = "No nodes have reported their status yet.\n"
return output
json_output[utils.string_node(key)] = utils.string_status(value)
return json_output

def get_status(self, node_id):
if node_id in self.handler_.node_status_:
return utils.string_status(self.handler_.node_status_[node_id])
if node_id is None:
return self.get_all_status()
else:
return utils.string_status(utils.node_status.INACTIVE.value)
if node_id in self.handler_.node_status_:
return utils.string_status(self.handler_.node_status_[node_id])
else:
return utils.string_status(utils.node_status.INACTIVE.value)

def get_app_requirements(self, task_id):
with self.handler_.condition:
while not self.handler_.results_available(task_id, utils.node_id.APP_REQUIREMENTS.value):
self.handler_.condition.wait()

# retrieve node data
node_data = sustainml_swig.get_app_requirements(self.node_, task_id)
if node_data is None:
return {'Error': f"Failed to get {utils.string_node(utils.node_id.APP_REQUIREMENTS.value)} data for task {utils.string_task(task_id)}<br>"}
return {'Error': f"Failed to get {utils.string_node(utils.node_id.APP_REQUIREMENTS.value)} data for task {utils.string_task(task_id)}"}

# Parse data into json
task_json = {'problem_id': task_id.problem_id(), 'iteration_id': task_id.iteration_id()}
app_requirements_str_list = node_data.app_requirements()
json_output = {'app_requirements': f'{utils.string_std_vector(app_requirements_str_list)}<br>'}
json_output = {'task_id': task_json,
'app_requirements': utils.string_std_vector(app_requirements_str_list)}
return json_output

def get_model_metadata(self, task_id):
with self.handler_.condition:
while not self.handler_.results_available(task_id, utils.node_id.ML_MODEL_METADATA.value):
self.handler_.condition.wait()

# retrieve node data
node_data = sustainml_swig.get_model_metadata(self.node_, task_id)
if node_data is None:
return {'Error': f"Failed to get {utils.string_node(utils.node_id.ML_MODEL_METADATA.value)} data for task {utils.string_task(task_id)}<br>"}
return {'Error': f"Failed to get {utils.string_node(utils.node_id.ML_MODEL_METADATA.value)} data for task {utils.string_task(task_id)}"}

# Parse data into json
task_json = {'problem_id': task_id.problem_id(), 'iteration_id': task_id.iteration_id()}
keywords_str_list = node_data.keywords()
metadata_str_list = node_data.ml_model_metadata()
json_output = {'keywords': f'{utils.string_std_vector(keywords_str_list)}<br>',
'metadata': f'{utils.string_std_vector(metadata_str_list)}<br>'}
json_output = {'task_id': task_json,
'keywords': utils.string_std_vector(keywords_str_list),
'metadata': utils.string_std_vector(metadata_str_list)}
return json_output

def get_hw_constraints(self, task_id):
with self.handler_.condition:
while not self.handler_.results_available(task_id, utils.node_id.HW_CONSTRAINTS.value):
self.handler_.condition.wait()

# retrieve node data
node_data = sustainml_swig.get_hw_constraints(self.node_, task_id)
if node_data is None:
return {'Error': f"Failed to get {utils.string_node(utils.node_id.HW_CONSTRAINTS.value)} data for task {utils.string_task(task_id)}<br>"}
return {'Error': f"Failed to get {utils.string_node(utils.node_id.HW_CONSTRAINTS.value)} data for task {utils.string_task(task_id)}"}

# Parse data into json
task_json = {'problem_id': task_id.problem_id(), 'iteration_id': task_id.iteration_id()}
max_value = node_data.max_memory_footprint()
required_hardware = node_data.hardware_required()
json_output = {'max_memory_footprint': f'{max_value}<br>',
'hardware_required': f'{utils.string_std_vector(required_hardware)}<br>'}
json_output = {'task_id': task_json,
'max_memory_footprint': max_value,
'hardware_required': utils.string_std_vector(required_hardware)}
return json_output

def get_ml_model_provider(self, task_id):
with self.handler_.condition:
while not self.handler_.results_available(task_id, utils.node_id.ML_MODEL_PROVIDER.value):
self.handler_.condition.wait()

# retrieve node data
node_data = sustainml_swig.get_model_provider(self.node_, task_id)
if node_data is None:
return {'Error': f"Failed to get {utils.string_node(utils.node_id.ML_MODEL_PROVIDER.value)} data for task {utils.string_task(task_id)}<br>"}
return {'Error': f"Failed to get {utils.string_node(utils.node_id.ML_MODEL_PROVIDER.value)} data for task {utils.string_task(task_id)}"}

# Parse data into json
task_json = {'problem_id': task_id.problem_id(), 'iteration_id': task_id.iteration_id()}
model = node_data.model()
model_path = node_data.model_path()
model_properties = node_data.model_properties()
model_properties_path = node_data.model_properties_path()
input_batch = node_data.input_batch()
target_latency = node_data.target_latency()
json_output = {'model': f'{model}<br>',
'model_path': f'{model_path}<br>',
'model_properties': f'{model_properties}<br>',
'model_properties_path': f'{model_properties_path}<br>',
'input_batch': f'{utils.string_std_vector(input_batch)}<br>',
'target_latency': f'{target_latency}<br>'}
json_output = {'task_id': task_json,
'model': model,
'model_path': model_path,
'model_properties': model_properties,
'model_properties_path': model_properties_path,
'input_batch': utils.string_std_vector(input_batch),
'target_latency': target_latency}
return json_output

def get_hw_provider(self, task_id):
with self.handler_.condition:
while not self.handler_.results_available(task_id, utils.node_id.HW_PROVIDER.value):
self.handler_.condition.wait()

# retrieve node data
node_data = sustainml_swig.get_hw_provider(self.node_, task_id)
if node_data is None:
return {'Error': f"Failed to get {utils.string_node(utils.node_id.HW_PROVIDER.value)} data for task {utils.string_task(task_id)}<br>"}
return {'Error': f"Failed to get {utils.string_node(utils.node_id.HW_PROVIDER.value)} data for task {utils.string_task(task_id)}"}

# Parse data into json
task_json = {'problem_id': task_id.problem_id(), 'iteration_id': task_id.iteration_id()}
hw_description = node_data.hw_description()
power_consumption = node_data.power_consumption()
latency = node_data.latency()
memory_footprint_of_ml_model = node_data.memory_footprint_of_ml_model()
json_output = {'hw_description': f'{hw_description}<br>',
'power_consumption': f'{power_consumption}<br>',
'latency': f'{latency}<br>',
'memory_footprint_of_ml_model': f'{memory_footprint_of_ml_model}<br>'}
json_output = {'task_id': task_json,
'hw_description': hw_description,
'power_consumption': power_consumption,
'latency': latency,
'memory_footprint_of_ml_model': memory_footprint_of_ml_model}
return json_output

def get_carbontracker(self, task_id):
with self.handler_.condition:
while not self.handler_.results_available(task_id, utils.node_id.CARBONTRACKER.value):
self.handler_.condition.wait()

# retrieve node data
node_data = sustainml_swig.get_carbontracker(self.node_, task_id)
if node_data is None:
return {'Error': f"Failed to get {utils.string_node(utils.node_id.CARBONTRACKER.value)} data for task {utils.string_task(task_id)}<br>"}
return {'Error': f"Failed to get {utils.string_node(utils.node_id.CARBONTRACKER.value)} data for task {utils.string_task(task_id)}"}

# Parse data into json
task_json = {'problem_id': task_id.problem_id(), 'iteration_id': task_id.iteration_id()}
carbon_footprint = node_data.carbon_footprint()
energy_consumption = node_data.energy_consumption()
carbon_intensity = node_data.carbon_intensity()
json_output = {'carbon_footprint': f'{carbon_footprint}<br>',
'energy_consumption': f'{energy_consumption}<br>',
'carbon_intensity': f'{carbon_intensity}<br>'}
json_output = {'task_id': task_json,
'carbon_footprint': carbon_footprint,
'energy_consumption': energy_consumption,
'carbon_intensity': carbon_intensity}
return json_output

def get_results(self, node_id, task_id):
if task_id is None:
task_id = self.get_last_task_id()

if node_id == utils.node_id.APP_REQUIREMENTS.value:
return self.get_app_requirements(task_id)
elif node_id == utils.node_id.ML_MODEL_METADATA.value:
Expand All @@ -193,19 +260,40 @@ def get_results(self, node_id, task_id):
elif node_id == utils.node_id.CARBONTRACKER.value:
return self.get_carbontracker(task_id)
else:
return utils.string_node(node_id) + " node does not have any results to show.<br>"
message = utils.string_node(node_id) + " node does not have any results to show."
return {'message': message, 'task_id': utils.task_json(task_id)}

def send_user_input(self, json_data):
pair = self.node_.prepare_new_task()
task_id = pair[0]
user_input = pair[1]
self.handler_.register_task(task_id)

user_input.task_id(task_id)
if (json_data.get('modality') is not None):
user_input.modality(json_data.get('modality'))
if (json_data.get('problem_type') is not None):
user_input.problem_definition(json_data.get('problem_type'))
if (json_data.get('problem_short_description') is not None):
user_input.problem_short_description(json_data.get('problem_short_description'))
#user_input.evaluation_metrics(evaluation_metrics)
#user_input.model(model)

return self.node_.start_task(task_id, user_input)
# TODO add missing fields

# Prepare extra data
hw_req = utils.default_hw_requirement
mem_footprint = utils.default_mem_footprint
if (json_data.get('hardware_required') is not None):
hw_req = json_data.get('hardware_required')
if (json_data.get('max_memory_footprint') is not None):
mem_footprint = json_data.get('max_memory_footprint')

# Add extra data to user user_input
extra_data = {'hardware_required': hw_req,
'max_memory_footprint': mem_footprint}
json_obj = utils.json_dict(extra_data)
data_array = np.frombuffer(json_obj.encode(), dtype=np.uint8)
user_input.extra_data(sustainml_swig.uint8_t_vector(data_array.tolist()))

if self.node_.start_task(task_id, user_input):
return task_id
else:
return None
Loading
Loading