From 7398ccf917ae12a3d9211d7d743173d26cf9f607 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Tue, 31 Jan 2023 10:17:09 -0600 Subject: [PATCH 01/11] Have SchedulerRequestMessage extnd DmodJobRequest. Modifying class supertype, then updating __init__ and adding necessary properties. --- .../dmod/communication/scheduler_request.py | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/python/lib/communication/dmod/communication/scheduler_request.py b/python/lib/communication/dmod/communication/scheduler_request.py index 790534475..9901c2015 100644 --- a/python/lib/communication/dmod/communication/scheduler_request.py +++ b/python/lib/communication/dmod/communication/scheduler_request.py @@ -1,10 +1,13 @@ from dmod.core.execution import AllocationParadigm from .maas_request import ModelExecRequest, ModelExecRequestResponse -from .message import AbstractInitRequest, MessageEventType, Response -from typing import Optional, Union +from .maas_request.dmod_job_request import DmodJobRequest +from .message import MessageEventType, Response +from typing import Optional, Union, List +from dmod.core.meta_data import DataRequirement, DataFormat -class SchedulerRequestMessage(AbstractInitRequest): + +class SchedulerRequestMessage(DmodJobRequest): event_type: MessageEventType = MessageEventType.SCHEDULER_REQUEST """ :class:`MessageEventType`: the event type for this message implementation """ @@ -60,7 +63,8 @@ def factory_init_from_deserialized_json(cls, json_obj: dict): # TODO: may need to generalize the underlying request to support, say, scheduling evaluation jobs def __init__(self, model_request: ModelExecRequest, user_id: str, cpus: Optional[int] = None, mem: Optional[int] = None, - allocation_paradigm: Optional[Union[str, AllocationParadigm]] = None): + allocation_paradigm: Optional[Union[str, AllocationParadigm]] = None, *args, **kwargs): + super(SchedulerRequestMessage, self).__init__(*args, *kwargs) self._model_request = model_request self._user_id = user_id self._cpus = cpus @@ -113,6 +117,10 @@ def cpus(self) -> int: """ return self.model_request.cpu_count if self._cpus is None else self._cpus + @property + def data_requirements(self) -> List[DataRequirement]: + return self.model_request.data_requirements + @property def memory(self) -> int: """ @@ -161,6 +169,10 @@ def nested_event(self) -> MessageEventType: """ return self.model_request.get_message_event_type() + @property + def output_formats(self) -> List[DataFormat]: + return self.model_request.output_formats + @property def user_id(self) -> str: """ From 6a5d5cff2fa145cbbe7115ac3b7f45b774d70276 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Tue, 31 Jan 2023 10:36:25 -0600 Subject: [PATCH 02/11] Refactor init of RequestedJob class. - Updating class __init__ to require params previously taken from (already present) job_request param to conform to standard pattern used in class hierarchies - Adding convenience class method to factory create an instance with just the originating message, which __init__ previously did - Updating deserialization to account for above changes --- .../lib/scheduler/dmod/scheduler/job/job.py | 50 +++++++++++++++---- 1 file changed, 39 insertions(+), 11 deletions(-) diff --git a/python/lib/scheduler/dmod/scheduler/job/job.py b/python/lib/scheduler/dmod/scheduler/job/job.py index e7cf9b912..a781ed383 100644 --- a/python/lib/scheduler/dmod/scheduler/job/job.py +++ b/python/lib/scheduler/dmod/scheduler/job/job.py @@ -946,7 +946,7 @@ def parse_serialized_job_id(cls, serialized_value: Optional[str], **kwargs): raise RuntimeError(msg) def __init__(self, cpu_count: int, memory_size: int, model_request: ExternalRequest, - allocation_paradigm: Union[str, AllocationParadigm], alloc_priority: int = 0): + allocation_paradigm: Union[str, AllocationParadigm], alloc_priority: int = 0, *args, **kwargs): self._cpu_count = cpu_count self._memory_size = memory_size self._model_request = model_request @@ -1329,17 +1329,14 @@ def factory_init_from_deserialized_json(cls, json_obj: dict): return None # Create the object initially from the request - new_obj = cls(job_request=request) + new_obj = cls(job_request=request, cpu_count=cpus, memory_size=memory, allocation_paradigm=paradigm, + alloc_priority=priority) # Then update its properties based on the deserialized values, as those are considered most correct # Use property setter for job id to handle string or UUID new_obj.job_id = job_id - new_obj._cpu_count = cpus - new_obj._memory_size = memory - new_obj._allocation_paradigm = paradigm - new_obj._allocation_priority = priority new_obj._rsa_key_pair = rsa_key_pair new_obj._status = status new_obj._allocations = allocations @@ -1351,12 +1348,43 @@ def factory_init_from_deserialized_json(cls, json_obj: dict): return new_obj - def __init__(self, job_request: SchedulerRequestMessage): + @classmethod + def factory_init_from_request(cls, job_request: SchedulerRequestMessage) -> 'RequestedJob': + """ + Factory init function to create an object from the parameters implied by the job request. + + Parameters + ---------- + job_request + + Returns + ------- + + """ + return cls(job_request=job_request, cpu_count=job_request.cpus, memory_size=job_request.memory, + allocation_paradigm=job_request.allocation_paradigm) + + def __init__(self, job_request: SchedulerRequestMessage, *args, **kwargs): + """ + Initialize this instance. + + Parameters + ---------- + job_request + args + kwargs + + Other Parameters + ---------- + cpu_count + memory_size + model_request + allocation_paradigm + alloc_priority + """ + super(RequestedJob, self).__init__(model_request=job_request.model_request, *args, **kwargs) self._originating_request = job_request - super().__init__(cpu_count=job_request.cpus, memory_size=job_request.memory, - model_request=job_request.model_request, - allocation_paradigm=job_request.allocation_paradigm) - self.data_requirements = self.model_request.data_requirements + self.data_requirements = job_request.model_request.data_requirements @property def model_request(self) -> ExternalRequest: From 8da21eead65ae7f7aee7273a453efb7a2fe97475 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Tue, 31 Jan 2023 10:38:36 -0600 Subject: [PATCH 03/11] Update manager create_job to expect request param. Updating interface definition and existing implementation of job manager create_job function to have a fixed, required 'request' parameter of a type extending DmodJobRequest. --- .../dmod/scheduler/job/job_manager.py | 42 ++++++++++++------- 1 file changed, 26 insertions(+), 16 deletions(-) diff --git a/python/lib/scheduler/dmod/scheduler/job/job_manager.py b/python/lib/scheduler/dmod/scheduler/job/job_manager.py index 34a76835f..91552885c 100644 --- a/python/lib/scheduler/dmod/scheduler/job/job_manager.py +++ b/python/lib/scheduler/dmod/scheduler/job/job_manager.py @@ -4,6 +4,7 @@ from typing import Dict, List, Optional, Tuple, Union from uuid import UUID, uuid4 as random_uuid from dmod.core.execution import AllocationParadigm +from dmod.communication.maas_request.dmod_job_request import DmodJobRequest from .job import Job, JobExecPhase, JobExecStep, JobStatus, RequestedJob from .job_util import JobUtil, RedisBackedJobUtil from ..resources.resource_allocation import ResourceAllocation @@ -110,7 +111,7 @@ def build_prioritized_pending_allocation_queues(cls, jobs_eligible_for_allocate: pass @abstractmethod - def create_job(self, **kwargs) -> Job: + def create_job(self, request: DmodJobRequest, *args, **kwargs) -> Job: """ Create and return a new job object. @@ -123,8 +124,12 @@ def create_job(self, **kwargs) -> Job: Parameters ---------- + request : DmodJobRequest + The originating request for the job. + args + Other optional positional arguments kwargs - Other appropriate, implementation-specific keyed parameters supported for creating the job object. + Other optional keyword arguments. Returns ------- @@ -412,7 +417,7 @@ def _request_allocations_for_queue(self, jobs_priority_queue: List[Tuple[int, Re self.save_job(j) return allocated_successfully - def create_job(self, **kwargs) -> RequestedJob: + def create_job(self, request: SchedulerRequestMessage, *args, **kwargs) -> RequestedJob: """ Create and return a new job object that has been saved to the backend store. @@ -420,15 +425,17 @@ def create_job(self, **kwargs) -> RequestedJob: ::class:`SchedulerRequestMessage` as a parameter. This is in the ``request`` keyword arg. Parameters - ---------- - kwargs - Implementation-specific keyed parameters for creating appropriate job objects (see *Keyword Args* section). - - Keyword Args ------------ request : SchedulerRequestMessage The originating request for the job. - job_id : str, UUID, None + args + Other optional positional arguments + kwargs + Other optional keyword arguments. + + Other Parameters + ------------ + job_id : Union[str, UUID] Optional value to try use for the job's id, falling back to random if not present, invalid, or already used. Returns @@ -436,16 +443,19 @@ def create_job(self, **kwargs) -> RequestedJob: RequestedJob The newly created job object. """ - job_obj = RequestedJob(job_request=kwargs['request']) + job_obj = RequestedJob.factory_init_from_request(job_request=request) + # TODO: do some processing here or in the object init to build restrictions and constraints that make sense globally; + # i.e., make sure the requirements for forcings satisfy the necessary hydrofabric if the config doesn't include a specific subset explicitly + uuid_param = kwargs.get('job_id', random_uuid()) try: - job_uuid = kwargs['job_id'] if isinstance(kwargs['job_id'], UUID) else UUID(str(kwargs['job_id'])) - if not self._does_redis_key_exist(self._get_job_key_for_id(job_uuid)): - job_obj.job_id = job_uuid - else: - job_obj.job_id = random_uuid() + job_uuid = uuid_param if isinstance(uuid_param, UUID) else UUID(str(uuid_param)) except: - job_obj.job_id = random_uuid() + job_uuid = random_uuid() + + while self._does_redis_key_exist(self._get_job_key_for_id(job_uuid)): + job_uuid = random_uuid() + job_obj.job_id = job_uuid self.save_job(job_obj) return job_obj From c0b57a421927bd97e8b4114cbf04b4630f321e5d Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Tue, 31 Jan 2023 10:40:56 -0600 Subject: [PATCH 04/11] Update client dependency to latest comms. Updating dmod.client to depend on dmod.communication>=0.11.0. --- python/lib/client/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lib/client/setup.py b/python/lib/client/setup.py index 779e156ba..3052639b1 100644 --- a/python/lib/client/setup.py +++ b/python/lib/client/setup.py @@ -22,6 +22,6 @@ license='', include_package_data=True, #install_requires=['websockets', 'jsonschema'],vi - install_requires=['dmod-core>=0.1.0', 'websockets>=8.1', 'pyyaml', 'dmod-communication>=0.7.0', 'dmod-externalrequests>=0.3.0'], + install_requires=['dmod-core>=0.1.0', 'websockets>=8.1', 'pyyaml', 'dmod-communication>=0.11.0', 'dmod-externalrequests>=0.3.0'], packages=find_namespace_packages(include=['dmod.*'], exclude=['dmod.test']) ) From a7cae8e78adcca8dac3e1eb2a5cfb0987e15a9a9 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Tue, 31 Jan 2023 10:41:09 -0600 Subject: [PATCH 05/11] Bump dmod.client version to 0.3.0. --- python/lib/client/dmod/client/_version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lib/client/dmod/client/_version.py b/python/lib/client/dmod/client/_version.py index df9144c54..0404d8103 100644 --- a/python/lib/client/dmod/client/_version.py +++ b/python/lib/client/dmod/client/_version.py @@ -1 +1 @@ -__version__ = '0.1.1' +__version__ = '0.3.0' From 1dd251e57239a92e0263d55f2fea7d804bb5bec5 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Tue, 31 Jan 2023 10:49:27 -0600 Subject: [PATCH 06/11] Update dataservice to new dependencies. --- python/services/dataservice/setup.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/services/dataservice/setup.py b/python/services/dataservice/setup.py index 5e83ded8b..d1d6b6462 100644 --- a/python/services/dataservice/setup.py +++ b/python/services/dataservice/setup.py @@ -17,7 +17,7 @@ author_email='', url='', license='', - install_requires=['dmod-core>=0.3.0', 'dmod-communication>=0.11.0', 'dmod-scheduler>=0.10.0', 'dmod-modeldata>=0.9.0', - 'redis'], + install_requires=['dmod-core>=0.5.0', 'dmod-communication>=0.11.0', 'dmod-scheduler>=0.10.0', + 'dmod-modeldata>=0.9.0', 'redis'], packages=find_namespace_packages(exclude=['dmod.test', 'deprecated', 'conf', 'schemas', 'ssl', 'src']) ) From 538588ca3f68107475b5f15edb9429f5d668306b Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Tue, 31 Jan 2023 10:50:51 -0600 Subject: [PATCH 07/11] Bump requestservice version to 0.7.0. --- python/services/requestservice/dmod/requestservice/_version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/services/requestservice/dmod/requestservice/_version.py b/python/services/requestservice/dmod/requestservice/_version.py index 3966a5f15..19442947c 100644 --- a/python/services/requestservice/dmod/requestservice/_version.py +++ b/python/services/requestservice/dmod/requestservice/_version.py @@ -1 +1 @@ -__version__ = '0.6.1' \ No newline at end of file +__version__ = '0.7.0' \ No newline at end of file From 94c14f6bbf91554bea3e7eb2683620a980dbae24 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Wed, 22 Mar 2023 10:20:55 -0500 Subject: [PATCH 08/11] Fix init of RequestedJob. Fixing init to properly handle args passed to super init. --- python/lib/scheduler/dmod/scheduler/job/job.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/python/lib/scheduler/dmod/scheduler/job/job.py b/python/lib/scheduler/dmod/scheduler/job/job.py index a781ed383..78d5fbd68 100644 --- a/python/lib/scheduler/dmod/scheduler/job/job.py +++ b/python/lib/scheduler/dmod/scheduler/job/job.py @@ -1364,7 +1364,7 @@ def factory_init_from_request(cls, job_request: SchedulerRequestMessage) -> 'Req return cls(job_request=job_request, cpu_count=job_request.cpus, memory_size=job_request.memory, allocation_paradigm=job_request.allocation_paradigm) - def __init__(self, job_request: SchedulerRequestMessage, *args, **kwargs): + def __init__(self, job_request: SchedulerRequestMessage, **kwargs): """ Initialize this instance. @@ -1382,7 +1382,14 @@ def __init__(self, job_request: SchedulerRequestMessage, *args, **kwargs): allocation_paradigm alloc_priority """ - super(RequestedJob, self).__init__(model_request=job_request.model_request, *args, **kwargs) + if 'cpu_count' not in kwargs: + kwargs['cpu_count'] = job_request.cpus + if 'memory_size' not in kwargs: + kwargs['memory_size'] = job_request.memory + if 'allocation_paradigm' not in kwargs: + kwargs['allocation_paradigm'] = job_request.allocation_paradigm + + super().__init__(model_request=job_request.model_request, **kwargs) self._originating_request = job_request self.data_requirements = job_request.model_request.data_requirements From 2e75850d52382c3086e4a1f0cfbb1451f777b660 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 23 Mar 2023 11:56:41 -0500 Subject: [PATCH 09/11] Fix kwargs bug in init of SchedulerRequestMessage. --- .../lib/communication/dmod/communication/scheduler_request.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lib/communication/dmod/communication/scheduler_request.py b/python/lib/communication/dmod/communication/scheduler_request.py index 9901c2015..a154a0dc4 100644 --- a/python/lib/communication/dmod/communication/scheduler_request.py +++ b/python/lib/communication/dmod/communication/scheduler_request.py @@ -64,7 +64,7 @@ def factory_init_from_deserialized_json(cls, json_obj: dict): # TODO: may need to generalize the underlying request to support, say, scheduling evaluation jobs def __init__(self, model_request: ModelExecRequest, user_id: str, cpus: Optional[int] = None, mem: Optional[int] = None, allocation_paradigm: Optional[Union[str, AllocationParadigm]] = None, *args, **kwargs): - super(SchedulerRequestMessage, self).__init__(*args, *kwargs) + super(SchedulerRequestMessage, self).__init__(*args, **kwargs) self._model_request = model_request self._user_id = user_id self._cpus = cpus From 4068ff8c1cebdab5169fe779c0b7f6791c325837 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 23 Mar 2023 11:58:36 -0500 Subject: [PATCH 10/11] Ensure config_data_id passed in scheduler request. Make sure value from passed model_request is used if no explicit 'config_data_id' is passed, while also ensuring an explicit value matches if it is provided. --- .../lib/communication/dmod/communication/scheduler_request.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/python/lib/communication/dmod/communication/scheduler_request.py b/python/lib/communication/dmod/communication/scheduler_request.py index a154a0dc4..daffe79c0 100644 --- a/python/lib/communication/dmod/communication/scheduler_request.py +++ b/python/lib/communication/dmod/communication/scheduler_request.py @@ -64,6 +64,10 @@ def factory_init_from_deserialized_json(cls, json_obj: dict): # TODO: may need to generalize the underlying request to support, say, scheduling evaluation jobs def __init__(self, model_request: ModelExecRequest, user_id: str, cpus: Optional[int] = None, mem: Optional[int] = None, allocation_paradigm: Optional[Union[str, AllocationParadigm]] = None, *args, **kwargs): + if 'config_data_id' not in kwargs and len(args) == 0: + kwargs['config_data_id'] = model_request.config_data_id + elif (args[0] if len(args) > 0 else kwargs['config_data_id']) != model_request.config_data_id: + raise ValueError('Bad init value for "config_data_id" that does not match model_request') super(SchedulerRequestMessage, self).__init__(*args, **kwargs) self._model_request = model_request self._user_id = user_id From a4a7b50dee1dffb2359e12d4ddba479706e54fe9 Mon Sep 17 00:00:00 2001 From: Robert Bartel Date: Thu, 23 Mar 2023 11:58:59 -0500 Subject: [PATCH 11/11] Add scheduler request tests for config_data_id. --- .../test/test_scheduler_request_message.py | 23 +++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/python/lib/communication/dmod/test/test_scheduler_request_message.py b/python/lib/communication/dmod/test/test_scheduler_request_message.py index 2af37f478..c4d9b0d3a 100644 --- a/python/lib/communication/dmod/test/test_scheduler_request_message.py +++ b/python/lib/communication/dmod/test/test_scheduler_request_message.py @@ -91,6 +91,29 @@ def test_factory_init_from_deserialized_json_1_a(self): obj = SchedulerRequestMessage.factory_init_from_deserialized_json(self.request_jsons[example_index]) self.assertEqual(obj, self.request_objs[example_index]) + def test_config_data_id_1_a(self): + """ + Test that the ``config_data_id`` of the object matches the same value within the nested model request. + """ + example_index = 1 + ex_obj = self.request_objs[example_index] + self.assertEqual(ex_obj.model_request.config_data_id, ex_obj.config_data_id) + + def test_config_data_id_1_b(self): + """ + Test that an instance will not create if we try to init with a non-matching ``config_data_id``. + """ + example_index = 1 + ex_obj = self.request_objs[example_index] + bogus_config_data_id = ex_obj.model_request.config_data_id + "_bogus_text_non_matching" + + self.assertRaises(ValueError, SchedulerRequestMessage, + model_request=ex_obj.model_request, + user_id=ex_obj.user_id, + config_data_id=bogus_config_data_id, + cpus=ex_obj.cpus, mem=ex_obj.memory, + allocation_paradigm=ex_obj.allocation_paradigm) + def test_to_dict_0_a(self): """ Assert that the example object at the 0th index serializes to a dict as expected by comparing to the pre-set