diff --git a/python/lib/client/dmod/client/_version.py b/python/lib/client/dmod/client/_version.py index df9144c54..0404d8103 100644 --- a/python/lib/client/dmod/client/_version.py +++ b/python/lib/client/dmod/client/_version.py @@ -1 +1 @@ -__version__ = '0.1.1' +__version__ = '0.3.0' diff --git a/python/lib/client/setup.py b/python/lib/client/setup.py index 779e156ba..3052639b1 100644 --- a/python/lib/client/setup.py +++ b/python/lib/client/setup.py @@ -22,6 +22,6 @@ license='', include_package_data=True, #install_requires=['websockets', 'jsonschema'],vi - install_requires=['dmod-core>=0.1.0', 'websockets>=8.1', 'pyyaml', 'dmod-communication>=0.7.0', 'dmod-externalrequests>=0.3.0'], + install_requires=['dmod-core>=0.1.0', 'websockets>=8.1', 'pyyaml', 'dmod-communication>=0.11.0', 'dmod-externalrequests>=0.3.0'], packages=find_namespace_packages(include=['dmod.*'], exclude=['dmod.test']) ) diff --git a/python/lib/communication/dmod/communication/scheduler_request.py b/python/lib/communication/dmod/communication/scheduler_request.py index 790534475..daffe79c0 100644 --- a/python/lib/communication/dmod/communication/scheduler_request.py +++ b/python/lib/communication/dmod/communication/scheduler_request.py @@ -1,10 +1,13 @@ from dmod.core.execution import AllocationParadigm from .maas_request import ModelExecRequest, ModelExecRequestResponse -from .message import AbstractInitRequest, MessageEventType, Response -from typing import Optional, Union +from .maas_request.dmod_job_request import DmodJobRequest +from .message import MessageEventType, Response +from typing import Optional, Union, List +from dmod.core.meta_data import DataRequirement, DataFormat -class SchedulerRequestMessage(AbstractInitRequest): + +class SchedulerRequestMessage(DmodJobRequest): event_type: MessageEventType = MessageEventType.SCHEDULER_REQUEST """ :class:`MessageEventType`: the event type for this message implementation """ @@ -60,7 +63,12 @@ def factory_init_from_deserialized_json(cls, json_obj: dict): # TODO: may need to generalize the underlying request to support, say, scheduling evaluation jobs def __init__(self, model_request: ModelExecRequest, user_id: str, cpus: Optional[int] = None, mem: Optional[int] = None, - allocation_paradigm: Optional[Union[str, AllocationParadigm]] = None): + allocation_paradigm: Optional[Union[str, AllocationParadigm]] = None, *args, **kwargs): + if 'config_data_id' not in kwargs and len(args) == 0: + kwargs['config_data_id'] = model_request.config_data_id + elif (args[0] if len(args) > 0 else kwargs['config_data_id']) != model_request.config_data_id: + raise ValueError('Bad init value for "config_data_id" that does not match model_request') + super(SchedulerRequestMessage, self).__init__(*args, **kwargs) self._model_request = model_request self._user_id = user_id self._cpus = cpus @@ -113,6 +121,10 @@ def cpus(self) -> int: """ return self.model_request.cpu_count if self._cpus is None else self._cpus + @property + def data_requirements(self) -> List[DataRequirement]: + return self.model_request.data_requirements + @property def memory(self) -> int: """ @@ -161,6 +173,10 @@ def nested_event(self) -> MessageEventType: """ return self.model_request.get_message_event_type() + @property + def output_formats(self) -> List[DataFormat]: + return self.model_request.output_formats + @property def user_id(self) -> str: """ diff --git a/python/lib/communication/dmod/test/test_scheduler_request_message.py b/python/lib/communication/dmod/test/test_scheduler_request_message.py index 2af37f478..c4d9b0d3a 100644 --- a/python/lib/communication/dmod/test/test_scheduler_request_message.py +++ b/python/lib/communication/dmod/test/test_scheduler_request_message.py @@ -91,6 +91,29 @@ def test_factory_init_from_deserialized_json_1_a(self): obj = SchedulerRequestMessage.factory_init_from_deserialized_json(self.request_jsons[example_index]) self.assertEqual(obj, self.request_objs[example_index]) + def test_config_data_id_1_a(self): + """ + Test that the ``config_data_id`` of the object matches the same value within the nested model request. + """ + example_index = 1 + ex_obj = self.request_objs[example_index] + self.assertEqual(ex_obj.model_request.config_data_id, ex_obj.config_data_id) + + def test_config_data_id_1_b(self): + """ + Test that an instance will not create if we try to init with a non-matching ``config_data_id``. + """ + example_index = 1 + ex_obj = self.request_objs[example_index] + bogus_config_data_id = ex_obj.model_request.config_data_id + "_bogus_text_non_matching" + + self.assertRaises(ValueError, SchedulerRequestMessage, + model_request=ex_obj.model_request, + user_id=ex_obj.user_id, + config_data_id=bogus_config_data_id, + cpus=ex_obj.cpus, mem=ex_obj.memory, + allocation_paradigm=ex_obj.allocation_paradigm) + def test_to_dict_0_a(self): """ Assert that the example object at the 0th index serializes to a dict as expected by comparing to the pre-set diff --git a/python/lib/scheduler/dmod/scheduler/job/job.py b/python/lib/scheduler/dmod/scheduler/job/job.py index e7cf9b912..78d5fbd68 100644 --- a/python/lib/scheduler/dmod/scheduler/job/job.py +++ b/python/lib/scheduler/dmod/scheduler/job/job.py @@ -946,7 +946,7 @@ def parse_serialized_job_id(cls, serialized_value: Optional[str], **kwargs): raise RuntimeError(msg) def __init__(self, cpu_count: int, memory_size: int, model_request: ExternalRequest, - allocation_paradigm: Union[str, AllocationParadigm], alloc_priority: int = 0): + allocation_paradigm: Union[str, AllocationParadigm], alloc_priority: int = 0, *args, **kwargs): self._cpu_count = cpu_count self._memory_size = memory_size self._model_request = model_request @@ -1329,17 +1329,14 @@ def factory_init_from_deserialized_json(cls, json_obj: dict): return None # Create the object initially from the request - new_obj = cls(job_request=request) + new_obj = cls(job_request=request, cpu_count=cpus, memory_size=memory, allocation_paradigm=paradigm, + alloc_priority=priority) # Then update its properties based on the deserialized values, as those are considered most correct # Use property setter for job id to handle string or UUID new_obj.job_id = job_id - new_obj._cpu_count = cpus - new_obj._memory_size = memory - new_obj._allocation_paradigm = paradigm - new_obj._allocation_priority = priority new_obj._rsa_key_pair = rsa_key_pair new_obj._status = status new_obj._allocations = allocations @@ -1351,12 +1348,50 @@ def factory_init_from_deserialized_json(cls, json_obj: dict): return new_obj - def __init__(self, job_request: SchedulerRequestMessage): + @classmethod + def factory_init_from_request(cls, job_request: SchedulerRequestMessage) -> 'RequestedJob': + """ + Factory init function to create an object from the parameters implied by the job request. + + Parameters + ---------- + job_request + + Returns + ------- + + """ + return cls(job_request=job_request, cpu_count=job_request.cpus, memory_size=job_request.memory, + allocation_paradigm=job_request.allocation_paradigm) + + def __init__(self, job_request: SchedulerRequestMessage, **kwargs): + """ + Initialize this instance. + + Parameters + ---------- + job_request + args + kwargs + + Other Parameters + ---------- + cpu_count + memory_size + model_request + allocation_paradigm + alloc_priority + """ + if 'cpu_count' not in kwargs: + kwargs['cpu_count'] = job_request.cpus + if 'memory_size' not in kwargs: + kwargs['memory_size'] = job_request.memory + if 'allocation_paradigm' not in kwargs: + kwargs['allocation_paradigm'] = job_request.allocation_paradigm + + super().__init__(model_request=job_request.model_request, **kwargs) self._originating_request = job_request - super().__init__(cpu_count=job_request.cpus, memory_size=job_request.memory, - model_request=job_request.model_request, - allocation_paradigm=job_request.allocation_paradigm) - self.data_requirements = self.model_request.data_requirements + self.data_requirements = job_request.model_request.data_requirements @property def model_request(self) -> ExternalRequest: diff --git a/python/lib/scheduler/dmod/scheduler/job/job_manager.py b/python/lib/scheduler/dmod/scheduler/job/job_manager.py index 34a76835f..91552885c 100644 --- a/python/lib/scheduler/dmod/scheduler/job/job_manager.py +++ b/python/lib/scheduler/dmod/scheduler/job/job_manager.py @@ -4,6 +4,7 @@ from typing import Dict, List, Optional, Tuple, Union from uuid import UUID, uuid4 as random_uuid from dmod.core.execution import AllocationParadigm +from dmod.communication.maas_request.dmod_job_request import DmodJobRequest from .job import Job, JobExecPhase, JobExecStep, JobStatus, RequestedJob from .job_util import JobUtil, RedisBackedJobUtil from ..resources.resource_allocation import ResourceAllocation @@ -110,7 +111,7 @@ def build_prioritized_pending_allocation_queues(cls, jobs_eligible_for_allocate: pass @abstractmethod - def create_job(self, **kwargs) -> Job: + def create_job(self, request: DmodJobRequest, *args, **kwargs) -> Job: """ Create and return a new job object. @@ -123,8 +124,12 @@ def create_job(self, **kwargs) -> Job: Parameters ---------- + request : DmodJobRequest + The originating request for the job. + args + Other optional positional arguments kwargs - Other appropriate, implementation-specific keyed parameters supported for creating the job object. + Other optional keyword arguments. Returns ------- @@ -412,7 +417,7 @@ def _request_allocations_for_queue(self, jobs_priority_queue: List[Tuple[int, Re self.save_job(j) return allocated_successfully - def create_job(self, **kwargs) -> RequestedJob: + def create_job(self, request: SchedulerRequestMessage, *args, **kwargs) -> RequestedJob: """ Create and return a new job object that has been saved to the backend store. @@ -420,15 +425,17 @@ def create_job(self, **kwargs) -> RequestedJob: ::class:`SchedulerRequestMessage` as a parameter. This is in the ``request`` keyword arg. Parameters - ---------- - kwargs - Implementation-specific keyed parameters for creating appropriate job objects (see *Keyword Args* section). - - Keyword Args ------------ request : SchedulerRequestMessage The originating request for the job. - job_id : str, UUID, None + args + Other optional positional arguments + kwargs + Other optional keyword arguments. + + Other Parameters + ------------ + job_id : Union[str, UUID] Optional value to try use for the job's id, falling back to random if not present, invalid, or already used. Returns @@ -436,16 +443,19 @@ def create_job(self, **kwargs) -> RequestedJob: RequestedJob The newly created job object. """ - job_obj = RequestedJob(job_request=kwargs['request']) + job_obj = RequestedJob.factory_init_from_request(job_request=request) + # TODO: do some processing here or in the object init to build restrictions and constraints that make sense globally; + # i.e., make sure the requirements for forcings satisfy the necessary hydrofabric if the config doesn't include a specific subset explicitly + uuid_param = kwargs.get('job_id', random_uuid()) try: - job_uuid = kwargs['job_id'] if isinstance(kwargs['job_id'], UUID) else UUID(str(kwargs['job_id'])) - if not self._does_redis_key_exist(self._get_job_key_for_id(job_uuid)): - job_obj.job_id = job_uuid - else: - job_obj.job_id = random_uuid() + job_uuid = uuid_param if isinstance(uuid_param, UUID) else UUID(str(uuid_param)) except: - job_obj.job_id = random_uuid() + job_uuid = random_uuid() + + while self._does_redis_key_exist(self._get_job_key_for_id(job_uuid)): + job_uuid = random_uuid() + job_obj.job_id = job_uuid self.save_job(job_obj) return job_obj diff --git a/python/services/dataservice/setup.py b/python/services/dataservice/setup.py index 5e83ded8b..d1d6b6462 100644 --- a/python/services/dataservice/setup.py +++ b/python/services/dataservice/setup.py @@ -17,7 +17,7 @@ author_email='', url='', license='', - install_requires=['dmod-core>=0.3.0', 'dmod-communication>=0.11.0', 'dmod-scheduler>=0.10.0', 'dmod-modeldata>=0.9.0', - 'redis'], + install_requires=['dmod-core>=0.5.0', 'dmod-communication>=0.11.0', 'dmod-scheduler>=0.10.0', + 'dmod-modeldata>=0.9.0', 'redis'], packages=find_namespace_packages(exclude=['dmod.test', 'deprecated', 'conf', 'schemas', 'ssl', 'src']) ) diff --git a/python/services/requestservice/dmod/requestservice/_version.py b/python/services/requestservice/dmod/requestservice/_version.py index 3966a5f15..19442947c 100644 --- a/python/services/requestservice/dmod/requestservice/_version.py +++ b/python/services/requestservice/dmod/requestservice/_version.py @@ -1 +1 @@ -__version__ = '0.6.1' \ No newline at end of file +__version__ = '0.7.0' \ No newline at end of file