Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring and fixing issues with job requests types #300

Merged
2 changes: 1 addition & 1 deletion python/lib/client/dmod/client/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.1.1'
__version__ = '0.3.0'
2 changes: 1 addition & 1 deletion python/lib/client/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,6 @@
license='',
include_package_data=True,
#install_requires=['websockets', 'jsonschema'],vi
install_requires=['dmod-core>=0.1.0', 'websockets>=8.1', 'pyyaml', 'dmod-communication>=0.7.0', 'dmod-externalrequests>=0.3.0'],
install_requires=['dmod-core>=0.1.0', 'websockets>=8.1', 'pyyaml', 'dmod-communication>=0.11.0', 'dmod-externalrequests>=0.3.0'],
packages=find_namespace_packages(include=['dmod.*'], exclude=['dmod.test'])
)
24 changes: 20 additions & 4 deletions python/lib/communication/dmod/communication/scheduler_request.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from dmod.core.execution import AllocationParadigm
from .maas_request import ModelExecRequest, ModelExecRequestResponse
from .message import AbstractInitRequest, MessageEventType, Response
from typing import Optional, Union
from .maas_request.dmod_job_request import DmodJobRequest
from .message import MessageEventType, Response
from typing import Optional, Union, List

from dmod.core.meta_data import DataRequirement, DataFormat

class SchedulerRequestMessage(AbstractInitRequest):

class SchedulerRequestMessage(DmodJobRequest):

event_type: MessageEventType = MessageEventType.SCHEDULER_REQUEST
""" :class:`MessageEventType`: the event type for this message implementation """
Expand Down Expand Up @@ -60,7 +63,12 @@ def factory_init_from_deserialized_json(cls, json_obj: dict):

# TODO: may need to generalize the underlying request to support, say, scheduling evaluation jobs
def __init__(self, model_request: ModelExecRequest, user_id: str, cpus: Optional[int] = None, mem: Optional[int] = None,
allocation_paradigm: Optional[Union[str, AllocationParadigm]] = None):
allocation_paradigm: Optional[Union[str, AllocationParadigm]] = None, *args, **kwargs):
if 'config_data_id' not in kwargs and len(args) == 0:
kwargs['config_data_id'] = model_request.config_data_id
elif (args[0] if len(args) > 0 else kwargs['config_data_id']) != model_request.config_data_id:
raise ValueError('Bad init value for "config_data_id" that does not match model_request')
super(SchedulerRequestMessage, self).__init__(*args, **kwargs)
self._model_request = model_request
self._user_id = user_id
self._cpus = cpus
Expand Down Expand Up @@ -113,6 +121,10 @@ def cpus(self) -> int:
"""
return self.model_request.cpu_count if self._cpus is None else self._cpus

@property
def data_requirements(self) -> List[DataRequirement]:
return self.model_request.data_requirements

@property
def memory(self) -> int:
"""
Expand Down Expand Up @@ -161,6 +173,10 @@ def nested_event(self) -> MessageEventType:
"""
return self.model_request.get_message_event_type()

@property
def output_formats(self) -> List[DataFormat]:
return self.model_request.output_formats

@property
def user_id(self) -> str:
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,29 @@ def test_factory_init_from_deserialized_json_1_a(self):
obj = SchedulerRequestMessage.factory_init_from_deserialized_json(self.request_jsons[example_index])
self.assertEqual(obj, self.request_objs[example_index])

def test_config_data_id_1_a(self):
"""
Test that the ``config_data_id`` of the object matches the same value within the nested model request.
"""
example_index = 1
ex_obj = self.request_objs[example_index]
self.assertEqual(ex_obj.model_request.config_data_id, ex_obj.config_data_id)

def test_config_data_id_1_b(self):
"""
Test that an instance will not create if we try to init with a non-matching ``config_data_id``.
"""
example_index = 1
ex_obj = self.request_objs[example_index]
bogus_config_data_id = ex_obj.model_request.config_data_id + "_bogus_text_non_matching"

self.assertRaises(ValueError, SchedulerRequestMessage,
model_request=ex_obj.model_request,
user_id=ex_obj.user_id,
config_data_id=bogus_config_data_id,
cpus=ex_obj.cpus, mem=ex_obj.memory,
allocation_paradigm=ex_obj.allocation_paradigm)

def test_to_dict_0_a(self):
"""
Assert that the example object at the 0th index serializes to a dict as expected by comparing to the pre-set
Expand Down
57 changes: 46 additions & 11 deletions python/lib/scheduler/dmod/scheduler/job/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -946,7 +946,7 @@ def parse_serialized_job_id(cls, serialized_value: Optional[str], **kwargs):
raise RuntimeError(msg)

def __init__(self, cpu_count: int, memory_size: int, model_request: ExternalRequest,
allocation_paradigm: Union[str, AllocationParadigm], alloc_priority: int = 0):
allocation_paradigm: Union[str, AllocationParadigm], alloc_priority: int = 0, *args, **kwargs):
self._cpu_count = cpu_count
self._memory_size = memory_size
self._model_request = model_request
Expand Down Expand Up @@ -1329,17 +1329,14 @@ def factory_init_from_deserialized_json(cls, json_obj: dict):
return None

# Create the object initially from the request
new_obj = cls(job_request=request)
new_obj = cls(job_request=request, cpu_count=cpus, memory_size=memory, allocation_paradigm=paradigm,
alloc_priority=priority)

# Then update its properties based on the deserialized values, as those are considered most correct

# Use property setter for job id to handle string or UUID
new_obj.job_id = job_id

new_obj._cpu_count = cpus
new_obj._memory_size = memory
new_obj._allocation_paradigm = paradigm
new_obj._allocation_priority = priority
new_obj._rsa_key_pair = rsa_key_pair
new_obj._status = status
new_obj._allocations = allocations
Expand All @@ -1351,12 +1348,50 @@ def factory_init_from_deserialized_json(cls, json_obj: dict):

return new_obj

def __init__(self, job_request: SchedulerRequestMessage):
@classmethod
def factory_init_from_request(cls, job_request: SchedulerRequestMessage) -> 'RequestedJob':
"""
Factory init function to create an object from the parameters implied by the job request.

Parameters
----------
job_request

Returns
-------

"""
return cls(job_request=job_request, cpu_count=job_request.cpus, memory_size=job_request.memory,
allocation_paradigm=job_request.allocation_paradigm)

def __init__(self, job_request: SchedulerRequestMessage, **kwargs):
"""
Initialize this instance.

Parameters
----------
job_request
args
kwargs

Other Parameters
----------
cpu_count
memory_size
model_request
allocation_paradigm
alloc_priority
"""
if 'cpu_count' not in kwargs:
kwargs['cpu_count'] = job_request.cpus
if 'memory_size' not in kwargs:
kwargs['memory_size'] = job_request.memory
if 'allocation_paradigm' not in kwargs:
kwargs['allocation_paradigm'] = job_request.allocation_paradigm

super().__init__(model_request=job_request.model_request, **kwargs)
self._originating_request = job_request
super().__init__(cpu_count=job_request.cpus, memory_size=job_request.memory,
model_request=job_request.model_request,
allocation_paradigm=job_request.allocation_paradigm)
self.data_requirements = self.model_request.data_requirements
self.data_requirements = job_request.model_request.data_requirements

@property
def model_request(self) -> ExternalRequest:
Expand Down
42 changes: 26 additions & 16 deletions python/lib/scheduler/dmod/scheduler/job/job_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from typing import Dict, List, Optional, Tuple, Union
from uuid import UUID, uuid4 as random_uuid
from dmod.core.execution import AllocationParadigm
from dmod.communication.maas_request.dmod_job_request import DmodJobRequest
from .job import Job, JobExecPhase, JobExecStep, JobStatus, RequestedJob
from .job_util import JobUtil, RedisBackedJobUtil
from ..resources.resource_allocation import ResourceAllocation
Expand Down Expand Up @@ -110,7 +111,7 @@ def build_prioritized_pending_allocation_queues(cls, jobs_eligible_for_allocate:
pass

@abstractmethod
def create_job(self, **kwargs) -> Job:
def create_job(self, request: DmodJobRequest, *args, **kwargs) -> Job:
"""
Create and return a new job object.

Expand All @@ -123,8 +124,12 @@ def create_job(self, **kwargs) -> Job:

Parameters
----------
request : DmodJobRequest
The originating request for the job.
args
Other optional positional arguments
kwargs
Other appropriate, implementation-specific keyed parameters supported for creating the job object.
Other optional keyword arguments.

Returns
-------
Expand Down Expand Up @@ -412,40 +417,45 @@ def _request_allocations_for_queue(self, jobs_priority_queue: List[Tuple[int, Re
self.save_job(j)
return allocated_successfully

def create_job(self, **kwargs) -> RequestedJob:
def create_job(self, request: SchedulerRequestMessage, *args, **kwargs) -> RequestedJob:
"""
Create and return a new job object that has been saved to the backend store.

Since this class works with ::class:`RequestedJob` objects, a new object must receive a
::class:`SchedulerRequestMessage` as a parameter. This is in the ``request`` keyword arg.

Parameters
----------
kwargs
Implementation-specific keyed parameters for creating appropriate job objects (see *Keyword Args* section).

Keyword Args
------------
request : SchedulerRequestMessage
The originating request for the job.
job_id : str, UUID, None
args
Other optional positional arguments
kwargs
Other optional keyword arguments.

Other Parameters
------------
job_id : Union[str, UUID]
Optional value to try use for the job's id, falling back to random if not present, invalid, or already used.

Returns
-------
RequestedJob
The newly created job object.
"""
job_obj = RequestedJob(job_request=kwargs['request'])
job_obj = RequestedJob.factory_init_from_request(job_request=request)
# TODO: do some processing here or in the object init to build restrictions and constraints that make sense globally;
# i.e., make sure the requirements for forcings satisfy the necessary hydrofabric if the config doesn't include a specific subset explicitly
uuid_param = kwargs.get('job_id', random_uuid())
try:
job_uuid = kwargs['job_id'] if isinstance(kwargs['job_id'], UUID) else UUID(str(kwargs['job_id']))
if not self._does_redis_key_exist(self._get_job_key_for_id(job_uuid)):
job_obj.job_id = job_uuid
else:
job_obj.job_id = random_uuid()
job_uuid = uuid_param if isinstance(uuid_param, UUID) else UUID(str(uuid_param))
except:
job_obj.job_id = random_uuid()
job_uuid = random_uuid()

while self._does_redis_key_exist(self._get_job_key_for_id(job_uuid)):
job_uuid = random_uuid()

job_obj.job_id = job_uuid
self.save_job(job_obj)
return job_obj

Expand Down
4 changes: 2 additions & 2 deletions python/services/dataservice/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
author_email='',
url='',
license='',
install_requires=['dmod-core>=0.3.0', 'dmod-communication>=0.11.0', 'dmod-scheduler>=0.10.0', 'dmod-modeldata>=0.9.0',
'redis'],
install_requires=['dmod-core>=0.5.0', 'dmod-communication>=0.11.0', 'dmod-scheduler>=0.10.0',
'dmod-modeldata>=0.9.0', 'redis'],
packages=find_namespace_packages(exclude=['dmod.test', 'deprecated', 'conf', 'schemas', 'ssl', 'src'])
)
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.6.1'
__version__ = '0.7.0'