diff --git a/sustainml_modules/requirements.txt b/sustainml_modules/requirements.txt index 1e425ba..783d416 100644 --- a/sustainml_modules/requirements.txt +++ b/sustainml_modules/requirements.txt @@ -1,3 +1,4 @@ +carbontracker flask flask-socketio==5.2.0 kaggle diff --git a/sustainml_modules/sustainml_modules/sustainml-wp2 b/sustainml_modules/sustainml_modules/sustainml-wp2 index 618391c..5d0582c 160000 --- a/sustainml_modules/sustainml_modules/sustainml-wp2 +++ b/sustainml_modules/sustainml_modules/sustainml-wp2 @@ -1 +1 @@ -Subproject commit 618391c6982957f67a3240c0f1d647adaa18a866 +Subproject commit 5d0582c2a32ba923d951a18f3016c759af49e96b diff --git a/sustainml_modules/sustainml_modules/sustainml-wp3 b/sustainml_modules/sustainml_modules/sustainml-wp3 index 8f60542..b896e6e 160000 --- a/sustainml_modules/sustainml_modules/sustainml-wp3 +++ b/sustainml_modules/sustainml_modules/sustainml-wp3 @@ -1 +1 @@ -Subproject commit 8f605427cfe1af0215ac6f0029d5f199274f3bc8 +Subproject commit b896e6ebe356d902d4aec342ed209fb13f785ef9 diff --git a/sustainml_modules/sustainml_modules/sustainml-wp5/backend_node.py b/sustainml_modules/sustainml_modules/sustainml-wp5/backend_node.py index b30412a..589e5d8 100644 --- a/sustainml_modules/sustainml_modules/sustainml-wp5/backend_node.py +++ b/sustainml_modules/sustainml_modules/sustainml-wp5/backend_node.py @@ -17,6 +17,7 @@ import threading import time import signal +import sustainml_swig import sys from orchestrator_node import orchestrator_node, utils from werkzeug.serving import make_server @@ -30,27 +31,28 @@ # Flask server default route @server.route('/') def hello_world(): - return jsonify({'mesage': 'Hello world! Use "/terminate" route to stop Back-end node.
'}), 200 + return jsonify({'mesage': 'Hello world! Use "/terminate" route to stop Back-end node.'}), 200 # Send user input data to orchestrator @server.route('/user_input', methods=['POST']) def user_input(): data = request.json - ret = orchestrator.send_user_input(data) - if not ret: + task_id = orchestrator.send_user_input(data) + if task_id is None: return jsonify({'error': 'Invalid input data'}), 400 - return jsonify({'message': 'User input data sent successfully.
'}), 200 + return jsonify({'message': 'User input data sent successfully.', + 'task_id': utils.task_json(task_id)}), 200 # Retrieve Node status methods @server.route('/status', methods=['GET']) def status(): - return jsonify({'status': f'{orchestrator.get_all_status()}'}), 200 + return jsonify({'status': orchestrator.get_all_status()}), 200 @server.route('/status', methods=['POST']) def status_args(): data = request.json node_id = data.get('node_id') - return jsonify({'status': f'{orchestrator.get_status(node_id)}'}), 200 + return jsonify({'status': orchestrator.get_status(node_id)}), 200 # Retrieve Node results methods @server.route('/results', methods=['GET']) @@ -64,20 +66,26 @@ def results(): model = orchestrator.get_results(utils.node_id.ML_MODEL_PROVIDER.value, last_task_id) hardware = orchestrator.get_results(utils.node_id.HW_PROVIDER.value, last_task_id) carbontracker = orchestrator.get_results(utils.node_id.CARBONTRACKER.value, last_task_id) - json = {f'{utils.string_node(utils.node_id.APP_REQUIREMENTS.value)}': f'{app_req}', - f'{utils.string_node(utils.node_id.ML_MODEL_METADATA.value)}': f'{metadata}', - f'{utils.string_node(utils.node_id.HW_CONSTRAINTS.value)}': f'{constraints}', - f'{utils.string_node(utils.node_id.ML_MODEL_PROVIDER.value)}': f'{model}', - f'{utils.string_node(utils.node_id.HW_PROVIDER.value)}': f'{hardware}', - f'{utils.string_node(utils.node_id.CARBONTRACKER.value)}': f'{carbontracker}'} + task_json = {'problem_id': last_task_id.problem_id(), 'iteration_id': last_task_id.iteration_id()} + json = {utils.string_node(utils.node_id.APP_REQUIREMENTS.value): app_req, + utils.string_node(utils.node_id.ML_MODEL_METADATA.value): metadata, + utils.string_node(utils.node_id.HW_CONSTRAINTS.value): constraints, + utils.string_node(utils.node_id.ML_MODEL_PROVIDER.value): model, + utils.string_node(utils.node_id.HW_PROVIDER.value): hardware, + utils.string_node(utils.node_id.CARBONTRACKER.value): carbontracker, + 'task_id': task_json} return jsonify(json), 200 @server.route('/results', methods=['POST']) def results_args(): data = request.json node_id = data.get('node_id') - task_id = data.get('task_id') - return jsonify({f'{utils.string_node(node_id)}': f'{orchestrator.get_results(node_id, task_id)}'}), 200 + json_task = data.get('task_id') + if json_task is not None: + task_id = sustainml_swig.set_task_id(json_task.get('problem_id', 0), json_task.get('iteration_id', 0)) + else: + task_id = None + return jsonify({utils.string_node(node_id): orchestrator.get_results(node_id, task_id)}), 200 # Flask server shutdown route @server.route('/shutdown', methods=['GET']) diff --git a/sustainml_modules/sustainml_modules/sustainml-wp5/orchestrator_node/orchestrator_node.py b/sustainml_modules/sustainml_modules/sustainml-wp5/orchestrator_node/orchestrator_node.py index e455d4c..2b60850 100644 --- a/sustainml_modules/sustainml_modules/sustainml-wp5/orchestrator_node/orchestrator_node.py +++ b/sustainml_modules/sustainml_modules/sustainml-wp5/orchestrator_node/orchestrator_node.py @@ -14,18 +14,22 @@ """SustainML Orchestrator Node API specification.""" from . import utils +import numpy as np from sustainml_swig import OrchestratorNodeHandle as cpp_OrchestratorNodeHandle from sustainml_swig import OrchestratorNode as cpp_OrchestratorNode from sustainml_swig import NodeStatus import sustainml_swig +import threading class OrchestratorNodeHandle(cpp_OrchestratorNodeHandle): def __init__(self): - self.node_status_ = {} + self.condition = threading.Condition() self.last_task_id = None + self.node_status_ = {} + self.result_status = {} # Parent class constructor super().__init__() @@ -46,14 +50,37 @@ def on_new_node_output( self, id : int, data): - task = sustainml_swig.get_task_id(id, data) - if (self.last_task_id is None and task is not None) or ( - self.last_task_id is not None and task is not None and task > self.last_task_id): - self.last_task_id = task - if task is None: + task_id = sustainml_swig.get_task_id(id, data) + if task_id is None: print(utils.string_node(id), "node output received.") else: - print(utils.string_node(id), "node output received from task", utils.string_task(task)) + print(utils.string_node(id), "node output received from task", utils.string_task(task_id)) + self.register_result(task_id, id) + + def register_task(self, task_id): + with self.condition: + if (self.last_task_id is None and task_id is not None) or ( + self.last_task_id is not None and task_id is not None and task_id > self.last_task_id): + self.last_task_id = task_id + self.result_status[utils.string_task(task_id)] = { + utils.node_id.APP_REQUIREMENTS.value: False, + utils.node_id.CARBONTRACKER.value: False, + utils.node_id.HW_CONSTRAINTS.value: False, + utils.node_id.HW_PROVIDER.value: False, + utils.node_id.ML_MODEL_METADATA.value: False, + utils.node_id.ML_MODEL_PROVIDER.value: False + } + + def register_result(self, task_id, node_id): + with self.condition: + if utils.string_task(task_id) not in self.result_status: + self.register_task(task_id) + self.result_status[utils.string_task(task_id)][node_id] = True + self.condition.notify_all() + + def results_available(self, task_id, node_id): + with self.condition: + return self.result_status[utils.string_task(task_id)].get(node_id, False) class Orchestrator: @@ -76,110 +103,150 @@ def get_last_task_id(self): return self.handler_.last_task_id def get_all_status(self): - output = "" + json_output = {} for key, value in self.handler_.node_status_.items(): - output += utils.string_node(key) + " node status " + utils.string_status(value) + "
" - if output == "": - output = "No nodes have reported their status yet.\n" - return output + json_output[utils.string_node(key)] = utils.string_status(value) + return json_output def get_status(self, node_id): - if node_id in self.handler_.node_status_: - return utils.string_status(self.handler_.node_status_[node_id]) + if node_id is None: + return self.get_all_status() else: - return utils.string_status(utils.node_status.INACTIVE.value) + if node_id in self.handler_.node_status_: + return utils.string_status(self.handler_.node_status_[node_id]) + else: + return utils.string_status(utils.node_status.INACTIVE.value) def get_app_requirements(self, task_id): + with self.handler_.condition: + while not self.handler_.results_available(task_id, utils.node_id.APP_REQUIREMENTS.value): + self.handler_.condition.wait() + # retrieve node data node_data = sustainml_swig.get_app_requirements(self.node_, task_id) if node_data is None: - return {'Error': f"Failed to get {utils.string_node(utils.node_id.APP_REQUIREMENTS.value)} data for task {utils.string_task(task_id)}
"} + return {'Error': f"Failed to get {utils.string_node(utils.node_id.APP_REQUIREMENTS.value)} data for task {utils.string_task(task_id)}"} # Parse data into json + task_json = {'problem_id': task_id.problem_id(), 'iteration_id': task_id.iteration_id()} app_requirements_str_list = node_data.app_requirements() - json_output = {'app_requirements': f'{utils.string_std_vector(app_requirements_str_list)}
'} + json_output = {'task_id': task_json, + 'app_requirements': utils.string_std_vector(app_requirements_str_list)} return json_output def get_model_metadata(self, task_id): + with self.handler_.condition: + while not self.handler_.results_available(task_id, utils.node_id.ML_MODEL_METADATA.value): + self.handler_.condition.wait() + # retrieve node data node_data = sustainml_swig.get_model_metadata(self.node_, task_id) if node_data is None: - return {'Error': f"Failed to get {utils.string_node(utils.node_id.ML_MODEL_METADATA.value)} data for task {utils.string_task(task_id)}
"} + return {'Error': f"Failed to get {utils.string_node(utils.node_id.ML_MODEL_METADATA.value)} data for task {utils.string_task(task_id)}"} # Parse data into json + task_json = {'problem_id': task_id.problem_id(), 'iteration_id': task_id.iteration_id()} keywords_str_list = node_data.keywords() metadata_str_list = node_data.ml_model_metadata() - json_output = {'keywords': f'{utils.string_std_vector(keywords_str_list)}
', - 'metadata': f'{utils.string_std_vector(metadata_str_list)}
'} + json_output = {'task_id': task_json, + 'keywords': utils.string_std_vector(keywords_str_list), + 'metadata': utils.string_std_vector(metadata_str_list)} return json_output def get_hw_constraints(self, task_id): + with self.handler_.condition: + while not self.handler_.results_available(task_id, utils.node_id.HW_CONSTRAINTS.value): + self.handler_.condition.wait() + # retrieve node data node_data = sustainml_swig.get_hw_constraints(self.node_, task_id) if node_data is None: - return {'Error': f"Failed to get {utils.string_node(utils.node_id.HW_CONSTRAINTS.value)} data for task {utils.string_task(task_id)}
"} + return {'Error': f"Failed to get {utils.string_node(utils.node_id.HW_CONSTRAINTS.value)} data for task {utils.string_task(task_id)}"} # Parse data into json + task_json = {'problem_id': task_id.problem_id(), 'iteration_id': task_id.iteration_id()} max_value = node_data.max_memory_footprint() required_hardware = node_data.hardware_required() - json_output = {'max_memory_footprint': f'{max_value}
', - 'hardware_required': f'{utils.string_std_vector(required_hardware)}
'} + json_output = {'task_id': task_json, + 'max_memory_footprint': max_value, + 'hardware_required': utils.string_std_vector(required_hardware)} return json_output def get_ml_model_provider(self, task_id): + with self.handler_.condition: + while not self.handler_.results_available(task_id, utils.node_id.ML_MODEL_PROVIDER.value): + self.handler_.condition.wait() + # retrieve node data node_data = sustainml_swig.get_model_provider(self.node_, task_id) if node_data is None: - return {'Error': f"Failed to get {utils.string_node(utils.node_id.ML_MODEL_PROVIDER.value)} data for task {utils.string_task(task_id)}
"} + return {'Error': f"Failed to get {utils.string_node(utils.node_id.ML_MODEL_PROVIDER.value)} data for task {utils.string_task(task_id)}"} # Parse data into json + task_json = {'problem_id': task_id.problem_id(), 'iteration_id': task_id.iteration_id()} model = node_data.model() model_path = node_data.model_path() model_properties = node_data.model_properties() model_properties_path = node_data.model_properties_path() input_batch = node_data.input_batch() target_latency = node_data.target_latency() - json_output = {'model': f'{model}
', - 'model_path': f'{model_path}
', - 'model_properties': f'{model_properties}
', - 'model_properties_path': f'{model_properties_path}
', - 'input_batch': f'{utils.string_std_vector(input_batch)}
', - 'target_latency': f'{target_latency}
'} + json_output = {'task_id': task_json, + 'model': model, + 'model_path': model_path, + 'model_properties': model_properties, + 'model_properties_path': model_properties_path, + 'input_batch': utils.string_std_vector(input_batch), + 'target_latency': target_latency} return json_output def get_hw_provider(self, task_id): + with self.handler_.condition: + while not self.handler_.results_available(task_id, utils.node_id.HW_PROVIDER.value): + self.handler_.condition.wait() + # retrieve node data node_data = sustainml_swig.get_hw_provider(self.node_, task_id) if node_data is None: - return {'Error': f"Failed to get {utils.string_node(utils.node_id.HW_PROVIDER.value)} data for task {utils.string_task(task_id)}
"} + return {'Error': f"Failed to get {utils.string_node(utils.node_id.HW_PROVIDER.value)} data for task {utils.string_task(task_id)}"} # Parse data into json + task_json = {'problem_id': task_id.problem_id(), 'iteration_id': task_id.iteration_id()} hw_description = node_data.hw_description() power_consumption = node_data.power_consumption() latency = node_data.latency() memory_footprint_of_ml_model = node_data.memory_footprint_of_ml_model() - json_output = {'hw_description': f'{hw_description}
', - 'power_consumption': f'{power_consumption}
', - 'latency': f'{latency}
', - 'memory_footprint_of_ml_model': f'{memory_footprint_of_ml_model}
'} + json_output = {'task_id': task_json, + 'hw_description': hw_description, + 'power_consumption': power_consumption, + 'latency': latency, + 'memory_footprint_of_ml_model': memory_footprint_of_ml_model} return json_output def get_carbontracker(self, task_id): + with self.handler_.condition: + while not self.handler_.results_available(task_id, utils.node_id.CARBONTRACKER.value): + self.handler_.condition.wait() + # retrieve node data node_data = sustainml_swig.get_carbontracker(self.node_, task_id) if node_data is None: - return {'Error': f"Failed to get {utils.string_node(utils.node_id.CARBONTRACKER.value)} data for task {utils.string_task(task_id)}
"} + return {'Error': f"Failed to get {utils.string_node(utils.node_id.CARBONTRACKER.value)} data for task {utils.string_task(task_id)}"} # Parse data into json + task_json = {'problem_id': task_id.problem_id(), 'iteration_id': task_id.iteration_id()} carbon_footprint = node_data.carbon_footprint() energy_consumption = node_data.energy_consumption() carbon_intensity = node_data.carbon_intensity() - json_output = {'carbon_footprint': f'{carbon_footprint}
', - 'energy_consumption': f'{energy_consumption}
', - 'carbon_intensity': f'{carbon_intensity}
'} + json_output = {'task_id': task_json, + 'carbon_footprint': carbon_footprint, + 'energy_consumption': energy_consumption, + 'carbon_intensity': carbon_intensity} return json_output def get_results(self, node_id, task_id): + if task_id is None: + task_id = self.get_last_task_id() + if node_id == utils.node_id.APP_REQUIREMENTS.value: return self.get_app_requirements(task_id) elif node_id == utils.node_id.ML_MODEL_METADATA.value: @@ -193,19 +260,40 @@ def get_results(self, node_id, task_id): elif node_id == utils.node_id.CARBONTRACKER.value: return self.get_carbontracker(task_id) else: - return utils.string_node(node_id) + " node does not have any results to show.
" + message = utils.string_node(node_id) + " node does not have any results to show." + return {'message': message, 'task_id': utils.task_json(task_id)} def send_user_input(self, json_data): pair = self.node_.prepare_new_task() task_id = pair[0] user_input = pair[1] + self.handler_.register_task(task_id) user_input.task_id(task_id) if (json_data.get('modality') is not None): user_input.modality(json_data.get('modality')) - if (json_data.get('problem_type') is not None): - user_input.problem_definition(json_data.get('problem_type')) + if (json_data.get('problem_short_description') is not None): + user_input.problem_short_description(json_data.get('problem_short_description')) #user_input.evaluation_metrics(evaluation_metrics) #user_input.model(model) - - return self.node_.start_task(task_id, user_input) + # TODO add missing fields + + # Prepare extra data + hw_req = utils.default_hw_requirement + mem_footprint = utils.default_mem_footprint + if (json_data.get('hardware_required') is not None): + hw_req = json_data.get('hardware_required') + if (json_data.get('max_memory_footprint') is not None): + mem_footprint = json_data.get('max_memory_footprint') + + # Add extra data to user user_input + extra_data = {'hardware_required': hw_req, + 'max_memory_footprint': mem_footprint} + json_obj = utils.json_dict(extra_data) + data_array = np.frombuffer(json_obj.encode(), dtype=np.uint8) + user_input.extra_data(sustainml_swig.uint8_t_vector(data_array.tolist())) + + if self.node_.start_task(task_id, user_input): + return task_id + else: + return None diff --git a/sustainml_modules/sustainml_modules/sustainml-wp5/orchestrator_node/utils.py b/sustainml_modules/sustainml_modules/sustainml-wp5/orchestrator_node/utils.py index 45b2ce9..a3e6ed2 100644 --- a/sustainml_modules/sustainml_modules/sustainml-wp5/orchestrator_node/utils.py +++ b/sustainml_modules/sustainml_modules/sustainml-wp5/orchestrator_node/utils.py @@ -14,8 +14,10 @@ """SustainML Orchestrator Node utility methods.""" from enum import Enum -from sustainml_swig import AppRequirements, CO2Footprint, HWConstraints, HWResource, MLModelMetadata, MLModel -import sustainml_swig +import json + +default_hw_requirement = "PIM_AI_1chip" +default_mem_footprint = 100 class node_id(Enum): APP_REQUIREMENTS = 0 @@ -57,21 +59,23 @@ def string_status(status): def string_node(node): if node == node_id.APP_REQUIREMENTS.value: # ID_APP_REQUIREMENTS - return "Application-level requirements" + return "APP_REQUIREMENTS" elif node == node_id.CARBONTRACKER.value: # ID_CARBON_FOOTPRINT - return "Carbontracker" + return "CARBON_FOOTPRINT" elif node == node_id.HW_CONSTRAINTS.value: # ID_HW_CONSTRAINTS - return "HW Constraints for inference" + return "HW_CONSTRAINTS" elif node == node_id.HW_PROVIDER.value: # ID_HW_RESOURCES - return "HW Provider" + return "HW_RESOURCES" elif node == node_id.ML_MODEL_METADATA.value: # ID_ML_MODEL_METADATA - return "ML Model Metadata" + return "ML_MODEL_METADATA" elif node == node_id.ML_MODEL_PROVIDER.value: # ID_ML_MODEL - return "ML Model Provider" - elif node == node_id.ORCHESTRATOR.value: # ID_ORCHESTRATOR (MAX is ID 6) - return "Orchestrator" + return "ML_MODEL" + elif node == node_id.MAX.value: # MAX + return "MAX" + elif node == node_id.ORCHESTRATOR.value: # ID_ORCHESTRATOR + return "ORCHESTRATOR" else: - return "Unknown node" + return "UNKNOWN" def string_std_vector(vector): output = "" @@ -84,6 +88,14 @@ def string_std_vector(vector): output += str(vector[i]) return output +def task_json(task_id): + return {"problem_id": task_id.problem_id(), "iteration_id": task_id.iteration_id()} + def string_task(task): return "{" + str(task.problem_id()) + ", " + str(task.iteration_id()) + "}" +def json_dict(dict): + return json.dumps(dict, indent=4) + +def dict_from_json(json_obj): + return json.loads(json_obj) diff --git a/sustainml_swig/src/swig/sustainml_swig/nodes/OrchestratorNode.i b/sustainml_swig/src/swig/sustainml_swig/nodes/OrchestratorNode.i index 985235c..3608ae4 100644 --- a/sustainml_swig/src/swig/sustainml_swig/nodes/OrchestratorNode.i +++ b/sustainml_swig/src/swig/sustainml_swig/nodes/OrchestratorNode.i @@ -170,6 +170,13 @@ } return task_id; } + + types::TaskId* set_task_id( + const uint32_t& problem_id, + const uint32_t& iteration_id) + { + return new types::TaskId(problem_id, iteration_id); + } %} // Include the class interfaces diff --git a/sustainml_swig/src/swig/sustainml_swig/types/types.i b/sustainml_swig/src/swig/sustainml_swig/types/types.i index 7567c10..0a86605 100644 --- a/sustainml_swig/src/swig/sustainml_swig/types/types.i +++ b/sustainml_swig/src/swig/sustainml_swig/types/types.i @@ -23,7 +23,7 @@ %include std_string.i // Ignore overloaded methods that have no application on Python -// Warnings regarding equiality operators and stuff +// Warnings regarding equality operators and stuff %ignore *::operator=; %ignore operator<<; @@ -31,5 +31,15 @@ #include %} +%extend std::vector +{ + const uint8_t* get_buffer() const + { + return self->data(); + } +} + +%template(uint8_t_vector) std::vector; + // Include the class interfaces %include