diff --git a/.travis.yml b/.travis.yml index ebc2ea8..f0850da 100644 --- a/.travis.yml +++ b/.travis.yml @@ -14,7 +14,7 @@ matrix: fast_finish: true install: - - docker build -t jobs-test --file docker/Dockerfile --build-arg HTCONDOR_VERSION --build-arg PYTHON_VERSION=$TRAVIS_PYTHON_VERSION . + - docker build -t jobs-test --file tests/_inf/Dockerfile --build-arg HTCONDOR_VERSION --build-arg PYTHON_VERSION=$TRAVIS_PYTHON_VERSION . script: - docker run jobs-test tests/travis.sh diff --git a/docs/Makefile b/docs/Makefile new file mode 100644 index 0000000..69fe55e --- /dev/null +++ b/docs/Makefile @@ -0,0 +1,19 @@ +# Minimal makefile for Sphinx documentation +# + +# You can set these variables from the command line. +SPHINXOPTS = +SPHINXBUILD = sphinx-build +SOURCEDIR = source +BUILDDIR = build + +# Put it first so that "make" without argument is like "make help". +help: + @$(SPHINXBUILD) -M help "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) + +.PHONY: help Makefile + +# Catch-all target: route all unknown targets to Sphinx using the new +# "make mode" option. $(O) is meant as a shortcut for $(SPHINXOPTS). +%: Makefile + @$(SPHINXBUILD) -M $@ "$(SOURCEDIR)" "$(BUILDDIR)" $(SPHINXOPTS) $(O) \ No newline at end of file diff --git a/docs/make.bat b/docs/make.bat new file mode 100644 index 0000000..4d9eb83 --- /dev/null +++ b/docs/make.bat @@ -0,0 +1,35 @@ +@ECHO OFF + +pushd %~dp0 + +REM Command file for Sphinx documentation + +if "%SPHINXBUILD%" == "" ( + set SPHINXBUILD=sphinx-build +) +set SOURCEDIR=source +set BUILDDIR=build + +if "%1" == "" goto help + +%SPHINXBUILD% >NUL 2>NUL +if errorlevel 9009 ( + echo. + echo.The 'sphinx-build' command was not found. Make sure you have Sphinx + echo.installed, then set the SPHINXBUILD environment variable to point + echo.to the full path of the 'sphinx-build' executable. Alternatively you + echo.may add the Sphinx directory to PATH. + echo. + echo.If you don't have Sphinx installed, grab it from + echo.http://sphinx-doc.org/ + exit /b 1 +) + +%SPHINXBUILD% -M %1 %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% +goto end + +:help +%SPHINXBUILD% -M help %SOURCEDIR% %BUILDDIR% %SPHINXOPTS% + +:end +popd diff --git a/docs/source/api.rst b/docs/source/api.rst new file mode 100644 index 0000000..3493351 --- /dev/null +++ b/docs/source/api.rst @@ -0,0 +1,93 @@ +API Reference +============= + +.. py:currentmodule:: htcondor_jobs + +Describing Jobs +--------------- + +.. autoclass:: SubmitDescription + + .. automethod:: as_submit + + +Submitting Jobs +--------------- + +.. autofunction:: submit + +.. autoclass:: Transaction + + .. automethod:: submit + + +Querying, Acting on, and Editing Jobs +------------------------------------- + +.. autoclass:: ConstraintHandle + + .. automethod:: query + + .. automethod:: remove + .. automethod:: hold + .. automethod:: release + .. automethod:: pause + .. automethod:: resume + .. automethod:: vacate + + .. automethod:: edit + + .. autoattribute:: constraint + .. automethod:: reduce + +Cluster Handles +--------------- + +.. autoclass:: ClusterHandle + + .. autoattribute:: state + + .. automethod:: save + .. automethod:: load + .. automethod:: to_json + .. automethod:: from_json + +.. autoclass:: ClusterState + + .. automethod:: is_complete + .. automethod:: any_running + .. automethod:: any_in_queue + .. automethod:: any_held + +Constraints +----------- + +.. autoclass:: Constraint + + .. automethod:: reduce + + +Combinators ++++++++++++ + +.. autoclass:: And + +.. autoclass:: Or + +.. 
autoclass:: Not + + + +Comparisons ++++++++++++ + +.. autoclass:: Comparison + +.. autoclass:: Operator + +.. autoclass:: ComparisonConstraint + +Shortcuts ++++++++++ + +.. autoclass:: InCluster diff --git a/docs/source/conf.py b/docs/source/conf.py new file mode 100644 index 0000000..08af161 --- /dev/null +++ b/docs/source/conf.py @@ -0,0 +1,74 @@ +# Configuration file for the Sphinx documentation builder. +# +# This file only contains a selection of the most common options. For a full +# list see the documentation: +# http://www.sphinx-doc.org/en/master/config + +# -- Path setup -------------------------------------------------------------- + +# If extensions (or modules to document with autodoc) are in another directory, +# add these directories to sys.path here. If the directory is relative to the +# documentation root, use os.path.abspath to make it absolute, like shown here. +# +# import os +# import sys +# sys.path.insert(0, os.path.abspath('.')) + + +# -- Project information ----------------------------------------------------- + +project = "htcondor-jobs" +copyright = "2019, CHTC" +author = "CHTC" + + +# -- General configuration --------------------------------------------------- + +# Add any Sphinx extension module names here, as strings. They can be +# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom +# ones. +extensions = [ + "sphinx.ext.autodoc", + "sphinx.ext.napoleon", + "sphinx_autodoc_typehints", + "sphinx.ext.intersphinx", + "sphinx.ext.viewcode", +] + +# Add any paths that contain templates here, relative to this directory. +templates_path = ["_templates"] + +# List of patterns, relative to source directory, that match files and +# directories to ignore when looking for source files. +# This pattern also affects html_static_path and html_extra_path. +exclude_patterns = [] + + +# -- Options for HTML output ------------------------------------------------- + +# The theme to use for HTML and HTML Help pages. See the documentation for +# a list of builtin themes. +# +html_theme = "sphinx_rtd_theme" + +# Add any paths that contain custom static files (such as style sheets) here, +# relative to this directory. They are copied after the builtin static files, +# so a file named "default.css" will overwrite the builtin "default.css". +html_static_path = ["_static"] + + +# -- Extension configuration ------------------------------------------------- + +autodoc_member_order = "bysource" +autoclass_content = "both" +autodoc_default_flags = ["undoc-members"] + +napoleon_use_rtype = True + +# -- Options for intersphinx extension --------------------------------------- + +# Example configuration for intersphinx: refer to the Python standard library. +intersphinx_mapping = { + "https://docs.python.org/": None, + "https://htcondor.readthedocs.io/en/latest/": None, +} diff --git a/docs/source/faq.rst b/docs/source/faq.rst new file mode 100644 index 0000000..8126fc0 --- /dev/null +++ b/docs/source/faq.rst @@ -0,0 +1,15 @@ +FAQ +=== + +.. py:currentmodule:: htmap + +.. _install: + +How do I install ``htcondor-jobs``? +----------------------------------- + +On Unix/Linux systems, running ``pip install htcondor-jobs`` from the command line should suffice. + +* To get the latest development version of ``htcondor-jobs``, run ``pip install git+https://github.com/htcondor/htcondor-jobs.git`` instead. +* Run ``pip install git+https://github.com/htcondor/htcondor-jobs.git@`` to install a specific branch. 
+* You may need to append ``--user`` to the ``pip`` command if you do not have permission to install packages directly into the Python you are using. diff --git a/docs/source/index.rst b/docs/source/index.rst new file mode 100644 index 0000000..2b9329c --- /dev/null +++ b/docs/source/index.rst @@ -0,0 +1,25 @@ +htcondor-jobs +============= + +.. py:currentmodule:: htcondor_jobs + + +:doc:`api` + Public API documentation. + +:doc:`faq` + These questions are asked, sometimes frequently. + +:doc:`version-history` + New features, bug fixes, and known issues by version. + + +.. toctree:: + :maxdepth: 2 + :hidden: + + self + api + faq + version-history + diff --git a/docs/source/version-history.rst b/docs/source/version-history.rst new file mode 100644 index 0000000..ae0f02c --- /dev/null +++ b/docs/source/version-history.rst @@ -0,0 +1,9 @@ +Version History +=============== + +.. toctree:: + :maxdepth: 1 + :glob: + :reversed: + + versions/* diff --git a/docs/source/versions/v0_1_0.rst b/docs/source/versions/v0_1_0.rst new file mode 100644 index 0000000..472b5fd --- /dev/null +++ b/docs/source/versions/v0_1_0.rst @@ -0,0 +1,16 @@ +v0.1.0 +====== + +New Features +------------ + +* Clusters of jobs can be described and submitted, returning a handle. +* Handles can be used to query, act on, or edit the jobs they are connected to. + + +Bug Fixes +--------- + + +Known Issues +------------ diff --git a/dr b/dr index a5c4df6..d9097fd 100644 --- a/dr +++ b/dr @@ -4,5 +4,5 @@ CONTAINER_TAG=jobs-tests set -e echo "Building htcondor-jobs testing container..." -docker build --quiet -t ${CONTAINER_TAG} --file docker/Dockerfile . +docker build --quiet -t ${CONTAINER_TAG} --file tests/_inf/Dockerfile . docker run -it --rm ${CONTAINER_TAG} $@ diff --git a/htcondor_jobs/__init__.py b/htcondor_jobs/__init__.py index 1652451..17b3abd 100644 --- a/htcondor_jobs/__init__.py +++ b/htcondor_jobs/__init__.py @@ -41,18 +41,19 @@ def version_info() -> _Tuple[int, int, int, str]: from .constraints import ( Constraint, - ComparisonConstraint, - Operator, - Comparison, BooleanConstraint, true, false, And, Or, Not, + Operator, + Comparison, + ComparisonConstraint, + InCluster, ) from .handles import Handle, ConstraintHandle, ClusterHandle from .descriptions import SubmitDescription from .submit import submit, Transaction -from .status import JobStatus +from .status import JobStatus, ClusterState from . import exceptions diff --git a/htcondor_jobs/constraints.py b/htcondor_jobs/constraints.py index 3ea1269..fb120c7 100644 --- a/htcondor_jobs/constraints.py +++ b/htcondor_jobs/constraints.py @@ -17,25 +17,22 @@ import logging import abc -import enum import dataclasses import itertools import classad +from . import utils, exceptions + logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) -logger.addHandler(logging.NullHandler()) - - -# https://en.wikipedia.org/wiki/Quine%E2%80%93McCluskey_algorithm -class StrEnum(str, enum.Enum): - pass +class Operator(utils.StrEnum): + """ + An enumeration of the possible ClassAd comparison operators. 
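+
+    Each member's value is the operator exactly as it is written in a
+    ClassAd expression; ``Operator.Is``, for example, stands for ``=?=``.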
+ """ - -class Operator(StrEnum): Equals = "==" NotEquals = "!=" GreaterEquals = ">=" @@ -44,10 +41,13 @@ class Operator(StrEnum): Less = "<" Is = "=?=" Isnt = "=!=" + # todo: add meta comparisons @dataclasses.dataclass(frozen=True) -class Comparison: +class Comparison(utils.SlotPickleMixin): + __slots__ = ("left", "operator", "right") + left: str operator: Operator right: Union[str, int, float, classad.ExprTree] @@ -57,12 +57,18 @@ def flatten(iterables): return itertools.chain.from_iterable(iterables) -class Constraint(abc.ABC): +class Constraint(abc.ABC, utils.SlotPickleMixin): + """An object that represents an HTCondor constraint expression.""" + + __slots__ = () + @abc.abstractmethod def __str__(self) -> str: raise NotImplementedError def reduce(self) -> "Constraint": + """Produce a possibly-simpler version of this constraint.""" + # todo: https://en.wikipedia.org/wiki/Quine%E2%80%93McCluskey_algorithm return self @abc.abstractmethod @@ -90,6 +96,8 @@ def __hash__(self) -> int: class BooleanConstraint(Constraint): + __slots__ = () + def __iter__(self) -> Iterator["BooleanConstraint"]: yield self @@ -102,6 +110,8 @@ def __bool__(self) -> bool: class _true(BooleanConstraint): + __slots__ = () + def __str__(self) -> str: return "true" @@ -113,6 +123,8 @@ def __invert__(self) -> BooleanConstraint: class _false(BooleanConstraint): + __slots__ = () + def __str__(self) -> str: return "false" @@ -128,7 +140,9 @@ def __invert__(self) -> BooleanConstraint: false = _false() -class MultiConstraint(Constraint): +class MultiConstraint(Constraint, abc.ABC): + __slots__ = ("_constraints",) + def __init__(self, *constraints: Union[Constraint, Iterable[Constraint]]): self._constraints = list(flatten(constraints)) @@ -160,6 +174,13 @@ def reduce(self) -> "Constraint": class And(MultiConstraint): + """ + A constraint that evaluates to ``true`` only if all of the given + ``constraints`` evaluate to ``true``. + """ + + __slots__ = () + def __str__(self) -> str: return " && ".join(f"({c})" for c in self) @@ -172,6 +193,13 @@ def reduce(self) -> Constraint: class Or(MultiConstraint): + """ + A constraint that evaluates to ``true`` if any of the given + ``constraints`` evaluate to ``true``. + """ + + __slots__ = () + def __str__(self) -> str: return " || ".join(f"({c})" for c in self) @@ -184,8 +212,15 @@ def reduce(self) -> Constraint: class Not(Constraint): + """ + A constraint which evaluates to ``true`` if the given + ``constraint`` evaluates to ``false``. 
+ """ + + __slots__ = ("_constraint",) + def __init__(self, constraint: Constraint): - self.constraint = constraint + self._constraint = constraint def __iter__(self) -> Iterator[Constraint]: yield self @@ -194,18 +229,20 @@ def __len__(self) -> int: return 1 def __str__(self) -> str: - return f"!({self.constraint})" + return f"!({self._constraint})" def reduce(self) -> "Constraint": - if self.constraint is true: + if self._constraint is true: return false - elif self.constraint is false: + elif self._constraint is false: return true return super().reduce() class ComparisonConstraint(Constraint): + __slots__ = ("expr",) + def __init__( self, key: str, @@ -214,6 +251,24 @@ def __init__( ): self.expr = Comparison(key, operator, value) + @classmethod + def from_expr(cls, expr): + try: + key, operator, value = expr.split(" ") + except ValueError: + raise exceptions.ExpressionParseFailed( + f"Comparison expression {expr} was not in the form 'key operator value'" + ) + + try: + operator = Operator(operator) + except ValueError as e: + raise exceptions.ExpressionParseFailed( + f"'{operator}' is not a valid Operator" + ) from e + + return cls(key, operator, value) + def __iter__(self) -> Iterator[Constraint]: yield self @@ -228,5 +283,9 @@ def __repr__(self) -> str: class InCluster(ComparisonConstraint): + """A :class:`ComparisonConstraint` that targets a single ``ClusterID``.""" + + __slots__ = () + def __init__(self, clusterid: int): super().__init__(key="ClusterId", operator=Operator.Equals, value=clusterid) diff --git a/htcondor_jobs/descriptions.py b/htcondor_jobs/descriptions.py index 5fcf98c..0f430e2 100644 --- a/htcondor_jobs/descriptions.py +++ b/htcondor_jobs/descriptions.py @@ -21,15 +21,35 @@ logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) -logger.addHandler(logging.NullHandler()) -T_SUBMIT_VALUE = Union[str, int, float, classad.ExprTree] +T_SUBMIT_VALUE = Union[str, int, float, bool, classad.ExprTree] class SubmitDescription(MutableMapping[str, T_SUBMIT_VALUE]): + """ + Describes a single cluster of jobs. + The description behaves like a dictionary of key-values pairs, + where each pair is a submit descriptor + (see `the manual `_). + :class:`SubmitDescription` supports the standard dictionary methods such as + ``get``, ``setdefault``, ``keys``, ``items``, etc., + as well as the ``[]`` operator for both getting and setting. + """ + + __slots__ = ("_descriptors",) + def __init__( self, mapping: Optional[Mapping] = None, **descriptors: T_SUBMIT_VALUE ): + """ + Parameters + ---------- + mapping + An optional mapping which provides initial key-value pairs for the + description. + descriptors + Additional submit descriptors, provided as keyword arguments. + """ if mapping is None: mapping = {} self._descriptors = dict(mapping, **descriptors) @@ -54,4 +74,15 @@ def __str__(self) -> str: return "\n".join(f"{k} = {v}" for k, v in self.items()) def as_submit(self) -> htcondor.Submit: + """ + Generate a :class:`htcondor.Submit` + from this :class:`SubmitDescription`. + """ return htcondor.Submit(str(self)) + + def copy(self, **descriptors): + """ + Produce a copy of this :class:`SubmitDescription`, + with the given ``descriptors`` changed. 
+ """ + return self.__class__(self._descriptors, **descriptors) diff --git a/htcondor_jobs/exceptions.py b/htcondor_jobs/exceptions.py index c625062..f594784 100644 --- a/htcondor_jobs/exceptions.py +++ b/htcondor_jobs/exceptions.py @@ -28,3 +28,15 @@ class InvalidHandle(JobsException): class UninitializedTransaction(JobsException): pass + + +class NoJobEventLog(JobsException): + pass + + +class Timeout(JobsException): + pass + + +class ExpressionParseFailed(JobsException): + pass diff --git a/htcondor_jobs/handles.py b/htcondor_jobs/handles.py index cc8aeae..30e3d28 100644 --- a/htcondor_jobs/handles.py +++ b/htcondor_jobs/handles.py @@ -13,28 +13,32 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import List, Optional, Union, Iterator, Any +from typing import List, Optional, Union, Iterator, Any, Callable import logging import abc +import time +from pathlib import Path +import pickle +import operator import htcondor import classad -from . import constraints, locate, exceptions - +from . import constraints, locate, status, utils, exceptions logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) -logger.addHandler(logging.NullHandler()) -class Handle(abc.ABC): +class Handle(abc.ABC, utils.SlotPickleMixin): """ - A handle for a group of jobs defined by a constraint, given as a string. + A connection to a set of jobs defined by a constraint. The handle can be used to query, act on, or edit those jobs. """ + __slots__ = ("collector", "scheduler", "__weakref__") + def __init__( self, collector: Optional[str] = None, scheduler: Optional[str] = None ): @@ -46,7 +50,22 @@ def constraint_string(self) -> str: raise NotImplementedError def __repr__(self): - return f"{self.__class__.__name__}({self.constraint_string})" + return f"{self.__class__.__name__}(constraint = {self.constraint_string})" + + def __eq__(self, other): + return all( + ( + isinstance(other, self.__class__), + self.constraint_string == other.constraint_string, + self.collector == other.collector, + self.scheduler == other.scheduler, + ) + ) + + def __hash__(self): + return hash( + (self.__class__, self.constraint_string, self.collector, self.scheduler) + ) @property def schedd(self): @@ -55,7 +74,7 @@ def schedd(self): def query( self, projection: List[str] = None, - opts: Optional[htcondor.QueryOpts] = None, + options: Optional[htcondor.QueryOpts] = None, limit: Optional[int] = None, ) -> Iterator[classad.ClassAd]: """ @@ -66,21 +85,21 @@ def query( projection The :class:`classad.ClassAd` attributes to retrieve, as a list of case-insensitive strings. If ``None`` (the default), all attributes will be returned. - opts + options limit The total number of matches to return from the query. If ``None`` (the default), return all matches. Returns ------- - ads : + ads : Iterator[:class:`classad.ClassAd`] An iterator over the :class:`classad.ClassAd` that match the constraint. 
""" if projection is None: projection = [] - if opts is None: - opts = htcondor.QueryOpts.Default + if options is None: + options = htcondor.QueryOpts.Default if limit is None: limit = -1 @@ -89,7 +108,7 @@ def query( logger.info( f"Executing query against schedd {self.schedd} with constraint {cs}, projection {projection}, and limit {limit}" ) - return self.schedd.xquery(cs, projection=projection, opts=opts, limit=limit) + return self.schedd.xquery(cs, projection=projection, opts=options, limit=limit) def _act(self, action: htcondor.JobAction) -> classad.ClassAd: cs = self.constraint_string @@ -104,7 +123,7 @@ def remove(self) -> classad.ClassAd: Returns ------- - ad + ad : :class:`classad.ClassAd` An ad describing the results of the action. """ return self._act(htcondor.JobAction.Remove) @@ -115,7 +134,7 @@ def hold(self) -> classad.ClassAd: Returns ------- - ad + ad : :class:`classad.ClassAd` An ad describing the results of the action. """ return self._act(htcondor.JobAction.Hold) @@ -127,7 +146,7 @@ def release(self) -> classad.ClassAd: Returns ------- - ad + ad : :class:`classad.ClassAd` An ad describing the results of the action. """ return self._act(htcondor.JobAction.Release) @@ -139,7 +158,7 @@ def pause(self) -> classad.ClassAd: Returns ------- - ad + ad : :class:`classad.ClassAd` An ad describing the results of the action. """ return self._act(htcondor.JobAction.Suspend) @@ -150,7 +169,7 @@ def resume(self) -> classad.ClassAd: Returns ------- - ad + ad : :class:`classad.ClassAd` An ad describing the results of the action. """ return self._act(htcondor.JobAction.Continue) @@ -162,7 +181,7 @@ def vacate(self) -> classad.ClassAd: Returns ------- - ad + ad : :class:`classad.ClassAd` An ad describing the results of the action. """ return self._act(htcondor.JobAction.Vacate) @@ -187,7 +206,7 @@ def edit(self, attr: str, value: Union[str, int, float]) -> classad.ClassAd: Returns ------- - ad + ad : :class:`classad.ClassAd` An ad describing the results of the edit. """ cs = self.constraint_string @@ -199,9 +218,12 @@ def edit(self, attr: str, value: Union[str, int, float]) -> classad.ClassAd: class ConstraintHandle(Handle): """ - A handle defined by a :class:`constraints.Constraint`. + A connection to a set of jobs defined by a :class:`Constraint`. + The handle can be used to query, act on, or edit those jobs. 
""" + __slots__ = ("_constraint",) + def __init__( self, constraint: constraints.Constraint, @@ -220,82 +242,236 @@ def constraint(self) -> constraints.Constraint: def constraint_string(self) -> str: return str(self.constraint) + def __repr__(self): + return f"{self.__class__.__name__}(constraint = {self.constraint})" + def reduce(self) -> "ConstraintHandle": return ConstraintHandle( self.constraint.reduce(), collector=self.collector, scheduler=self.scheduler ) - def __and__(self, other: "ConstraintHandle") -> "ConstraintHandle": - if not all( - (self.collector == other.collector, self.scheduler == other.scheduler) + def __and__( + self, other: Union["ConstraintHandle", constraints.Constraint, str] + ) -> "ConstraintHandle": + return self._combine(other, operator.and_) + + def __or__( + self, other: Union["ConstraintHandle", constraints.Constraint, str] + ) -> "ConstraintHandle": + return self._combine(other, operator.or_) + + def _combine( + self, other: Union["ConstraintHandle", constraints.Constraint, str], combinator + ): + if isinstance(other, ConstraintHandle) and ( + self.collector != other.collector or self.scheduler != other.scheduler ): raise exceptions.InvalidHandle( "Cannot construct a handle for separate schedds" ) - return ConstraintHandle( - self.constraint & other.constraint, - collector=self.collector, - scheduler=self.scheduler, - ) - - def __or__(self, other: "ConstraintHandle") -> "ConstraintHandle": - if not all( - (self.collector == other.collector, self.scheduler == other.scheduler) - ): + if isinstance(other, ConstraintHandle): + c = other.constraint + elif isinstance(other, constraints.Constraint): + c = other + elif isinstance(other, str): + c = constraints.ComparisonConstraint.from_expr(other) + else: raise exceptions.InvalidHandle( - "Cannot construct a handle for separate schedds" + f"Cannot construct a combined handle from {self} and {other} because {other} is not a ConstraintHandle, Constraint, or comparison constraint expression" ) return ConstraintHandle( - self.constraint | other.constraint, + combinator(self.constraint, c), collector=self.collector, scheduler=self.scheduler, ) - def __eq__(self, other: Any) -> bool: - return all( - ( - isinstance(other, ConstraintHandle), - self.constraint == other.constraint, - self.collector == other.collector, - self.scheduler == other.scheduler, - ) - ) + def save(self, path: Path) -> None: + """Save this :class:`ConstraintHandle` to a file at ``path`` for later use (see :meth:`ConstraintHandle.load`).""" + with path.open(mode="wb") as f: + pickle.dump(self, f, protocol=-1) + + @classmethod + def load(cls, path: Path) -> "ConstraintHandle": + """Load a :class:`ConstraintHandle` from a file at ``path`` that was created by :meth:`ConstraintHandle.save`.""" + with path.open(mode="rb") as f: + return pickle.load(f) + + +COMPACT_STATE_SWITCHOVER_SIZE = 100_000 class ClusterHandle(ConstraintHandle): + """ + A subclass of :class:`ConstraintHandle` that targets a single cluster of jobs, + as produced by :func:`submit`. + + Because this handle targets a single cluster of jobs, it has superpowers. + If the cluster has an event log + (``log = `` in the :class:`SubmitDescription`, + see `the docs `_), + this handle's ``state`` attribute will be a :class:`ClusterState` that provides + information about the current state of the jobs in the cluster. 
+ """ + + __slots__ = ("clusterid", "clusterad", "_first_proc", "_num_procs", "_state") + def __init__( self, - clusterid: int, - clusterad: Optional[classad.ClassAd] = None, + submit_result: htcondor.SubmitResult, collector: Optional[str] = None, scheduler: Optional[str] = None, ): + self.clusterid = submit_result.cluster() + self.clusterad = submit_result.clusterad() + self._first_proc = submit_result.first_proc() + self._num_procs = submit_result.num_procs() + super().__init__( - constraint=constraints.InCluster(clusterid), + constraint=constraints.InCluster(self.clusterid), collector=collector, scheduler=scheduler, ) - self.clusterid = clusterid - - if clusterad is None: - # try to get the clusterad from the schedd - try: - clusterad = next(self.query(opts=htcondor.QueryOpts(0x10), limit=1)) - except IndexError: - # no clusterad in the schedd - # try to get a jobad from the schedd's history - try: - clusterad = next(self.schedd.history(self.constraint_string, [], 1)) - except StopIteration: - clusterad = None - self.clusterad = clusterad - - @classmethod - def from_submit_result(cls, result: htcondor.SubmitResult) -> "ClusterHandle": - return cls(clusterid=result.cluster(), clusterad=result.clusterad()) + # must delay this until after init, because at this point the submit + # transaction may not be done yet + self._state = None def __int__(self): return self.clusterid + + def __repr__(self): + batch_name = self.clusterad.get("JobBatchName", None) + batch = f", JobBatchName = {batch_name}" if batch_name is not None else "" + return f"{self.__class__.__name__}(ClusterID = {self.clusterid}{batch})" + + def __len__(self): + return self._num_procs + + @property + def first_proc(self): + return self._first_proc + + @property + def state(self) -> status.ClusterState: + """A :class:`ClusterState` that provides information about job state for this cluster.""" + if self._state is None: + if len(self) > COMPACT_STATE_SWITCHOVER_SIZE: + state_type = status.CompactClusterState + else: + state_type = status.ClusterState + + self._state = state_type(self) + + return self._state + + def wait( + self, + condition: Callable[["ClusterHandle"], bool] = None, + timeout: Optional[Union[int, float]] = None, + test_delay: Union[int, float] = 0.25, + ) -> float: + """ + Wait for some condition to be satisfied. + By default, this waits until all of the jobs in the cluster are complete, + equivalent to + + .. code:: python + + handle.wait( + condition = lambda hnd: hnd.state.is_complete() + ) + + Where possible, for increased efficiency, use :class:`ClusterState` methods or + status counts instead of raw job statuses to determine the state of the + cluster. + + Parameters + ---------- + condition + A callable that defines the state to wait for. + It will be called with the :class:`ClusterHandle` as its only argument, + and when it returns ``True``, ``wait_for_condition`` will complete. + **The default condition is to wait for all jobs to be :class:`JobStatus.COMPLETED`.** + timeout + The maximum amount of time to wait before raising a + :class:`exceptions.WaitedTooLong` exception. + **The ``condition`` will always be checked at least once, even if ``timeout <= 0``**. + test_delay + The amount of time to wait between test loops. + + Returns + ------- + elapsed_time : + The amount of time spent waiting. 
+ """ + if condition is None: + condition = lambda hnd: hnd.state.is_complete() + + start_time = time.time() + while not condition(self): + if timeout is not None and time.time() > start_time + timeout: + raise exceptions.Timeout( + f"waited too long for handle {self} to satisfy {condition}" + ) + time.sleep(test_delay) + return time.time() - start_time + + def __getstate__(self): + state = super().__getstate__() + + state["_state"] = None # remove state tracker + + return state + + def to_json(self) -> dict: + """Return a JSON-formatted dictionary that describes the :class:`ClusterHandle`.""" + return dict( + clusterid=self.clusterid, + clusterad=str(self.clusterad), + first_proc=self.first_proc, + num_procs=len(self), + collector=self.collector, + scheduler=self.scheduler, + ) + + @classmethod + def from_json(cls, json: dict): + """Return a :class:`ClusterHandle` from the dictionary produced by :meth:`ClusterHandle.to_json`.""" + submit_result = _MockSubmitResult( + clusterid=json["clusterid"], + clusterad=classad.parseOne(json["clusterad"]), + first_proc=json["first_proc"], + num_procs=json["num_procs"], + ) + + return cls( + submit_result, collector=json["collector"], scheduler=json["scheduler"] + ) + + +class _MockSubmitResult: + """ + This class is used purely to transform unpacked submit results back into + "submit results" to accommodate the :class:`ClusterHandle` constructor. + **Should not be used in user code.** + """ + + def __init__(self, clusterid, clusterad, first_proc, num_procs): + self._clusterid = clusterid + self._clusterad = clusterad + self._first_proc = first_proc + self._num_procs = num_procs + + def cluster(self): + return self._clusterid + + def clusterad(self): + return self._clusterad + + def first_proc(self): + return self._first_proc + + def num_procs(self): + return self._num_procs diff --git a/htcondor_jobs/locate.py b/htcondor_jobs/locate.py index b4598a2..901ae1f 100644 --- a/htcondor_jobs/locate.py +++ b/htcondor_jobs/locate.py @@ -23,7 +23,6 @@ logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) -logger.addHandler(logging.NullHandler()) K = TypeVar("K") V = TypeVar("V") @@ -42,6 +41,8 @@ class TimedCache(collections.abc.MutableMapping, Generic[K, V]): As a dictionary, except that the entries expire after a specified amount of time. """ + __slots__ = ("cache_time", "cache") + def __init__(self, *, cache_time: Union[int, float]): """ Parameters diff --git a/htcondor_jobs/status.py b/htcondor_jobs/status.py index 24553f9..4b62c4f 100644 --- a/htcondor_jobs/status.py +++ b/htcondor_jobs/status.py @@ -16,17 +16,200 @@ import logging import enum +import array +import collections +from pathlib import Path +import functools +import weakref +from typing import MutableSequence +import htcondor + +from . import handles, utils, exceptions logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) -logger.addHandler(logging.NullHandler()) class JobStatus(enum.IntEnum): - Idle = 1 - Running = 2 - Removed = 3 - Completed = 4 - Held = 5 - TransferringOutput = 6 + IDLE = 1 + RUNNING = 2 + REMOVED = 3 + COMPLETED = 4 + HELD = 5 + TRANSFERRING_OUTPUT = 6 + SUSPENDED = 7 # todo: ? 
+ UNMATERIALIZED = 100 + + +JOB_EVENT_STATUS_TRANSITIONS = { + htcondor.JobEventType.SUBMIT: JobStatus.IDLE, + htcondor.JobEventType.JOB_EVICTED: JobStatus.IDLE, + htcondor.JobEventType.JOB_UNSUSPENDED: JobStatus.IDLE, + htcondor.JobEventType.JOB_RELEASED: JobStatus.IDLE, + htcondor.JobEventType.SHADOW_EXCEPTION: JobStatus.IDLE, + htcondor.JobEventType.JOB_RECONNECT_FAILED: JobStatus.IDLE, + htcondor.JobEventType.JOB_TERMINATED: JobStatus.COMPLETED, + htcondor.JobEventType.EXECUTE: JobStatus.RUNNING, + htcondor.JobEventType.JOB_HELD: JobStatus.HELD, + htcondor.JobEventType.JOB_SUSPENDED: JobStatus.SUSPENDED, + htcondor.JobEventType.JOB_ABORTED: JobStatus.REMOVED, +} + +NO_EVENT_LOG = object() + + +def update_before(func): + @functools.wraps(func) + def wrapper(self, *args, **kwargs): + self._update() + return func(self, *args, **kwargs) + + return wrapper + + +class ClusterState: + """ + A class that manages the state of the cluster tracked by a :class:`ClusterHandle`. + It reads from the cluster's event log internally and provides a variety of views + of the individual job states. + + .. warning:: + + :class:`ClusterState` objects should not be instantiated manually. + :class:`ClusterHandle` will create them automatically when needed. + """ + + __slots__ = ( + "_handle", + "_clusterid", + "_offset", + "_event_log_path", + "_events", + "_data", + "_counts", + ) + + def __init__(self, handle: "handles.ClusterHandle"): + self._handle = weakref.proxy(handle) + self._clusterid = handle.clusterid + self._offset = handle.first_proc + + raw_event_log_path = utils.chain_get( + handle.clusterad, ("UserLog", "DAGManNodesLog"), default=NO_EVENT_LOG + ) + if raw_event_log_path is NO_EVENT_LOG: + raise exceptions.NoJobEventLog( + "this cluster does not have a job event log, so it cannot track job state" + ) + self._event_log_path = Path(raw_event_log_path).absolute() + + self._events = None + + self._data = self._make_initial_data(handle) + self._counts = collections.Counter(JobStatus(js) for js in self._data) + + def _make_initial_data(self, handle: "handles.ClusterHandle") -> MutableSequence: + return [JobStatus.UNMATERIALIZED for _ in range(len(handle))] + + def _update(self): + logger.debug(f"triggered status update for handle {self._handle}") + + if self._events is None: + logger.debug( + f"looking for event log for handle {self._handle} at {self._event_log_path}" + ) + self._events = htcondor.JobEventLog(self._event_log_path.as_posix()).events( + 0 + ) + logger.debug( + f"initialized event log reader for handle {self._handle}, targeting {self._event_log_path}" + ) + + for event in self._events: + if event.cluster != self._clusterid: + continue + + new_status = JOB_EVENT_STATUS_TRANSITIONS.get(event.type, None) + if new_status is not None: + key = event.proc - self._offset + + # update counts + old_status = self._data[key] + self._counts[old_status] -= 1 + self._counts[new_status] += 1 + + # set new status on individual job + self._data[key] = new_status + + logger.debug(f"new status counts for {self._handle}: {self._counts}") + + @update_before + def __getitem__(self, proc: int) -> JobStatus: + return self._data[proc - self._offset] + + @update_before + def counts(self) -> collections.Counter: + """ + Return the number of jobs in each :class:`JobStatus`, as a :class:`collections.Counter`. 
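+
+        A sketch, assuming ``handle`` is a :class:`ClusterHandle`:
+
+        .. code:: python
+
+            if handle.state.counts()[JobStatus.HELD] > 0:
+                handle.release()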
+ """ + return self._counts.copy() + + @update_before + def __iter__(self): + yield from self._data + + @update_before + def __str__(self): + return str(self._data) + + @update_before + def __repr__(self): + return repr(self._data) + + def __len__(self): + return len(self._data) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self._handle == other._handle + + def is_complete(self) -> bool: + """Return ``True`` if **all** of the jobs in the cluster are complete.""" + return self.counts()[JobStatus.COMPLETED] == len(self) + + def any_running(self) -> bool: + """Return ``True`` if **any** of the jobs in the cluster are running.""" + return self.counts()[JobStatus.RUNNING] > 0 + + def any_in_queue(self) -> bool: + """Return ``True`` if **any** of the jobs in the cluster are still in the queue (idle, running, or held).""" + c = self.counts() + jobs_in_queue = sum( + (c[JobStatus.IDLE], c[JobStatus.RUNNING], c[JobStatus.HELD]) + ) + return jobs_in_queue > 0 + + def any_held(self) -> bool: + """Return ``True`` if **any** of the jobs in the cluster are held.""" + return self.counts()[JobStatus.HELD] > 0 + + +class CompactClusterState(ClusterState): + """ + A specialized :class:`ClusterState` that uses a more compact + internal data structure for storing job state. + """ + + # The internal storage is an unsigned byte array. + # Because JobStatus is an IntEnum, we can insert JobStatus values directly + # as long as they're small. + # However, when they come back out, they'll just be integers, and we need + # to turn them back into JobStatus. + + __slots__ = () + + def _make_initial_data(self, handle: "handles.ClusterHandle") -> MutableSequence: + return array.array("B", [JobStatus.UNMATERIALIZED for _ in range(len(handle))]) + + def __getitem__(self, proc: int): + return JobStatus(super().__getitem__(proc)) diff --git a/htcondor_jobs/submit.py b/htcondor_jobs/submit.py index 4f3a9f2..fe88aac 100644 --- a/htcondor_jobs/submit.py +++ b/htcondor_jobs/submit.py @@ -24,7 +24,6 @@ logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) -logger.addHandler(logging.NullHandler()) T_ITEMDATA = Union[str, int, float] T_ITEMDATA_MAPPING = Mapping[str, T_ITEMDATA] @@ -40,7 +39,28 @@ def submit( itemdata: Optional[Iterable[T_ITEMDATA_ELEMENT]] = None, collector: Optional[str] = None, scheduler: Optional[str] = None, -) -> handles.ConstraintHandle: +) -> handles.ClusterHandle: + """ + Submit a single cluster of jobs based on a submit description. + If you are submitting many clusters at once, + you should do so on a single :class:`Transaction`. + + Parameters + ---------- + description + A submit description. + count + The number of jobs to submit **for each element of the itemdata**. + If ``itemdata`` is ``None``, this is the total number of jobs to submit. + itemdata + collector + scheduler + + Returns + ------- + handle : :class:`ClusterHandle` + A handle connected to the jobs that were submitted. + """ with Transaction(collector=collector, scheduler=scheduler) as txn: handle = txn.submit(description, count, itemdata) @@ -48,9 +68,21 @@ def submit( class Transaction: + __slots__ = ("collector", "scheduler", "_schedd", "_txn") + def __init__( self, collector: Optional[str] = None, scheduler: Optional[str] = None ): + """ + Open a transaction with a schedd. + If you are submitting many clusters at once, + you should do so on a single transaction. 
+ + Parameters + ---------- + collector + scheduler + """ self.collector = collector self.scheduler = scheduler @@ -62,10 +94,15 @@ def submit( description: descriptions.SubmitDescription, count: Optional[int] = 1, itemdata: Optional[Iterable[T_ITEMDATA_ELEMENT]] = None, - ) -> handles.ConstraintHandle: + ) -> handles.ClusterHandle: + """ + Identical to :func:`submit`, + except without the ``collector`` and ``scheduler`` arguments, + which are instead given to the :class:`Transaction`. + """ if any((self._schedd is None, self._txn is None)): raise exceptions.UninitializedTransaction( - "the transaction has not been initialized (use it as a context manager)" + "the Transaction has not been initialized (use it as a context manager)" ) sub = description.as_submit() @@ -78,7 +115,9 @@ def submit( itemdata_msg = "" result = sub.queue_with_itemdata(self._txn, count, itemdata) - handle = handles.ClusterHandle.from_submit_result(result) + handle = handles.ClusterHandle( + result, collector=self.collector, scheduler=self.scheduler + ) logger.info( f"Submitted {repr(sub)} to {self._schedd} on transaction {self._txn} with count {count}{itemdata_msg}" @@ -99,14 +138,14 @@ def __exit__(self, exc_type, exc_val, exc_tb): def check_itemdata(itemdata: List[T_ITEMDATA_ELEMENT]) -> None: if len(itemdata) < 1: - raise exceptions.InvalidItemdata("empty itemdata") + raise exceptions.InvalidItemdata("empty itemdata, pass itemdata = None instead") - if isinstance(itemdata[0], collections.abc.Mapping): + if all(isinstance(item, collections.abc.Mapping) for item in itemdata): return _check_itemdata_as_mappings(itemdata) - elif isinstance(itemdata[0], collections.abc.Sequence): + elif all(isinstance(item, collections.abc.Sequence) for item in itemdata): return _check_itemdata_as_sequences(itemdata) - raise exceptions.InvalidItemdata("illegal itemdata type") + raise exceptions.InvalidItemdata(f"mixed or illegal itemdata types") def _check_itemdata_as_mappings(itemdata: List[T_ITEMDATA_MAPPING]) -> None: diff --git a/htcondor_jobs/utils.py b/htcondor_jobs/utils.py new file mode 100644 index 0000000..d112eaa --- /dev/null +++ b/htcondor_jobs/utils.py @@ -0,0 +1,75 @@ +# Copyright 2019 HTCondor Team, Computer Sciences Department, +# University of Wisconsin-Madison, WI. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Optional, Any, Mapping, Iterable + +import enum + + +class StrEnum(str, enum.Enum): + pass + + +class SlotPickleMixin: + """A mixin class which lets classes with __slots__ be pickled.""" + + __slots__ = () + + def __getstate__(self): + # get all the __slots__ in the inheritance tree + # if any class has a __dict__, it will be included! 
no special case needed + slots = sum((getattr(c, "__slots__", ()) for c in self.__class__.__mro__), ()) + + state = dict( + (slot, getattr(self, slot)) for slot in slots if hasattr(self, slot) + ) + + # __weakref__ should always be removed from the state dict + state.pop("__weakref__", None) + + return state + + def __setstate__(self, state: Mapping): + for slot, value in state.items(): + object.__setattr__(self, slot, value) + + +def chain_get(mapping: Mapping, keys: Iterable[str], default: Optional[Any] = None): + """ + As Mapping.get(key, default), except that it will try multiple keys before returning the default. + + Parameters + ---------- + mapping + The :class:`collections.abc.Mapping` to get from. + keys + The keys to try, in order. + default + What to return if none of the keys are in the mapping. + Defaults to ``None``. + + Returns + ------- + val : + The value of the first key that was in the mapping, + or the ``default`` if none of the keys were in the mapping. + """ + for k in keys: + try: + return mapping[k] + except KeyError: + pass + + return default diff --git a/docker/Dockerfile b/tests/_inf/Dockerfile similarity index 92% rename from docker/Dockerfile rename to tests/_inf/Dockerfile index e5996c9..1aa1bc1 100644 --- a/docker/Dockerfile +++ b/tests/_inf/Dockerfile @@ -83,15 +83,15 @@ RUN pip install --no-cache -r /home/${SUBMIT_USER}/requirements_dev.txt \ # set default entrypoint and command # the entrypoint is critical: it starts HTCondor in the container -ENTRYPOINT ["docker/entrypoint.sh"] +ENTRYPOINT ["tests/_inf/entrypoint.sh"] CMD ["pytest"] # copy HTCondor testing config into place -COPY docker/condor_config.local /etc/condor/condor_config.local +COPY tests/_inf/condor_config.local /etc/condor/condor_config.local # copy package into container and install it # this is the only part that can't be cached against editing the package COPY --chown=jobber:jobber . /home/${SUBMIT_USER}/htcondor-jobs WORKDIR /home/${SUBMIT_USER}/htcondor-jobs -RUN chmod +x /home/${SUBMIT_USER}/htcondor-jobs/docker/entrypoint.sh /home/${SUBMIT_USER}/htcondor-jobs/tests/travis.sh \ - && pip install --no-cache --no-deps -e . +RUN chmod +x /home/${SUBMIT_USER}/htcondor-jobs/tests/_inf/entrypoint.sh /home/${SUBMIT_USER}/htcondor-jobs/tests/travis.sh \ + && pip install --no-cache-dir --no-deps -e . diff --git a/docker/condor_config.local b/tests/_inf/condor_config.local similarity index 87% rename from docker/condor_config.local rename to tests/_inf/condor_config.local index 58b8d6d..22e0456 100644 --- a/docker/condor_config.local +++ b/tests/_inf/condor_config.local @@ -14,12 +14,12 @@ EXECUTE=$(LOCAL_DIR)/execute CRED_STORE_DIR=$(LOCAL_DIR)/cred_dir # Tuning so jobs start quickly -SCHEDD_INTERVAL=5 -NEGOTIATOR_INTERVAL=2 -NEGOTIATOR_CYCLE_DELAY=5 -STARTER_UPDATE_INTERVAL=5 -SHADOW_QUEUE_UPDATE_INTERVAL=10 -UPDATE_INTERVAL=5 +SCHEDD_INTERVAL=1 +NEGOTIATOR_INTERVAL=1 +NEGOTIATOR_CYCLE_DELAY=1 +STARTER_UPDATE_INTERVAL=1 +SHADOW_QUEUE_UPDATE_INTERVAL=1 +UPDATE_INTERVAL=1 RUNBENCHMARKS=0 # Don't use all the machine resources diff --git a/docker/entrypoint.sh b/tests/_inf/entrypoint.sh similarity index 100% rename from docker/entrypoint.sh rename to tests/_inf/entrypoint.sh diff --git a/tests/conftest.py b/tests/conftest.py index 08bb40b..47a9362 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -14,6 +14,7 @@ # limitations under the License. 
import pytest +import os import htcondor_jobs as jobs from htcondor_jobs.locate import SCHEDD_CACHE @@ -24,6 +25,25 @@ def clear_schedd_cache(): SCHEDD_CACHE.clear() +@pytest.fixture(scope="function", autouse=True) +def clear_queue(): + yield + os.system("condor_rm --all") + + +@pytest.fixture(scope="function") +def long_sleep(tmp_path): + return jobs.SubmitDescription( + executable="/bin/sleep", + arguments="5m", + log=(tmp_path / "events.log").as_posix(), + ) + + @pytest.fixture(scope="function") -def long_sleep(): - return jobs.SubmitDescription(executable="/bin/sleep", args="5m") +def short_sleep(tmp_path): + return jobs.SubmitDescription( + executable="/bin/sleep", + arguments="1s", + log=(tmp_path / "events.log").as_posix(), + ) diff --git a/tests/integration/test_cluster_handles/__init__.py b/tests/integration/test_cluster_handles/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/integration/test_cluster_handles/test_json.py b/tests/integration/test_cluster_handles/test_json.py new file mode 100644 index 0000000..da250c7 --- /dev/null +++ b/tests/integration/test_cluster_handles/test_json.py @@ -0,0 +1,63 @@ +# Copyright 2019 HTCondor Team, Computer Sciences Department, +# University of Wisconsin-Madison, WI. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +import time + +import classad + +import htcondor_jobs as jobs + + +@pytest.fixture(scope="function") +def roundtripped_handle(short_sleep): + a = jobs.submit(short_sleep) + + j = a.to_json() + b = jobs.ClusterHandle.from_json(j) + + return a, b + + +def test_save_then_load_is_equal_and_same_hash(roundtripped_handle): + a, b = roundtripped_handle + + assert a == b + assert hash(a) == hash(b) + + +def test_clusterad_is_reconstructed_correctly(roundtripped_handle): + a, b = roundtripped_handle + + # we have to do this awful manual nonsense because if the values are + # expressions, doing a == will just return a new expression. + # ... so instead, we manually check that the lengths and contents are the same + # (plain dict equality will internally use ==, so no help there) + da = dict(a.clusterad) + db = dict(b.clusterad) + assert len(da) == len(db) + for (ka, va) in da.items(): + if isinstance(va, classad.ExprTree): + assert str(va) == str(db[ka]) + else: + assert va == db[ka] + + +def test_states_are_same(roundtripped_handle): + a, b = roundtripped_handle + + a.wait(timeout=180) + + assert list(a.state) == list(b.state) diff --git a/tests/integration/test_cluster_handles/test_pickling.py b/tests/integration/test_cluster_handles/test_pickling.py new file mode 100644 index 0000000..d1cc5f5 --- /dev/null +++ b/tests/integration/test_cluster_handles/test_pickling.py @@ -0,0 +1,45 @@ +# Copyright 2019 HTCondor Team, Computer Sciences Department, +# University of Wisconsin-Madison, WI. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest +import time + +import htcondor_jobs as jobs + + +@pytest.fixture(scope="function") +def roundtripped_handle(short_sleep, tmp_path): + path = tmp_path / "handle.pkl" + a = jobs.submit(short_sleep) + + a.save(path) + b = jobs.ClusterHandle.load(path) + + return a, b + + +def test_save_then_load_is_equal_and_same_hash(roundtripped_handle): + a, b = roundtripped_handle + + assert a == b + assert hash(a) == hash(b) + + +def test_states_are_same(roundtripped_handle): + a, b = roundtripped_handle + + a.wait(timeout=180) + + assert list(a.state) == list(b.state) diff --git a/tests/integration/test_cluster_handles/test_state.py b/tests/integration/test_cluster_handles/test_state.py new file mode 100644 index 0000000..074265e --- /dev/null +++ b/tests/integration/test_cluster_handles/test_state.py @@ -0,0 +1,93 @@ +# Copyright 2019 HTCondor Team, Computer Sciences Department, +# University of Wisconsin-Madison, WI. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +import htcondor_jobs as jobs + + +def test_no_job_event_log(): + desc = jobs.SubmitDescription(executable="/bin/sleep", arguments="5m") + + handle = jobs.submit(desc, count=1) + + with pytest.raises(jobs.exceptions.NoJobEventLog): + handle.state + + +def test_hold(long_sleep): + handle = jobs.submit(long_sleep, count=1) + + handle.hold() + + assert handle.state[0] is jobs.JobStatus.HELD + + +def test_hold_count(long_sleep, tmp_path): + handle = jobs.submit(long_sleep, count=1) + + handle.hold() + + assert handle.state.counts()[jobs.JobStatus.HELD] == 1 + + +def test_is_complete(short_sleep): + handle = jobs.submit(short_sleep, count=1) + + handle.wait(timeout=180) + + assert handle.state.is_complete() + + +def test_any_in_queue_when_idle(long_sleep): + handle = jobs.submit(long_sleep, count=1) + + handle.wait(condition=lambda h: h.state[0] is jobs.JobStatus.IDLE, timeout=180) + # yes, it could start running now... 
oh well + assert handle.state.any_in_queue() + + +def test_any_in_queue_when_held(long_sleep): + handle = jobs.submit(long_sleep, count=1) + + handle.hold() + handle.wait(condition=lambda h: h.state[0] is jobs.JobStatus.HELD, timeout=180) + + assert handle.state.any_in_queue() + + +def test_any_in_queue_when_running(long_sleep): + handle = jobs.submit(long_sleep, count=1) + + handle.wait(condition=lambda h: h.state[0] is jobs.JobStatus.RUNNING, timeout=180) + + assert handle.state.any_in_queue() + + +def test_any_running(long_sleep): + handle = jobs.submit(long_sleep, count=1) + + handle.wait(condition=lambda h: h.state[0] is jobs.JobStatus.RUNNING, timeout=180) + + assert handle.state.any_running() + + +def test_any_held(long_sleep): + handle = jobs.submit(long_sleep, count=1) + + handle.hold() + handle.wait(condition=lambda h: h.state[0] is jobs.JobStatus.HELD, timeout=180) + + assert handle.state.any_held() diff --git a/tests/integration/test_cluster_handles/test_wait.py b/tests/integration/test_cluster_handles/test_wait.py new file mode 100644 index 0000000..99244ad --- /dev/null +++ b/tests/integration/test_cluster_handles/test_wait.py @@ -0,0 +1,34 @@ +# Copyright 2019 HTCondor Team, Computer Sciences Department, +# University of Wisconsin-Madison, WI. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import pytest +import time + +import htcondor_jobs as jobs + + +def test_wait(short_sleep): + handle = jobs.submit(short_sleep, count=1) + + handle.wait(timeout=180) + + assert handle.state.is_complete() + + +def test_timeout(long_sleep): + handle = jobs.submit(long_sleep, count=1) + + with pytest.raises(jobs.exceptions.Timeout): + handle.wait(timeout=0) diff --git a/tests/integration/test_handles/test_actions.py b/tests/integration/test_handles/test_actions.py index 953e550..55058db 100644 --- a/tests/integration/test_handles/test_actions.py +++ b/tests/integration/test_handles/test_actions.py @@ -15,6 +15,8 @@ import pytest +import time + import htcondor_jobs as jobs @@ -28,8 +30,10 @@ def test_hold(long_sleep): handle.hold() + time.sleep(5) + status = get_status(handle) - assert status == jobs.JobStatus.Held + assert status == jobs.JobStatus.HELD @pytest.mark.parametrize( diff --git a/tests/travis.sh b/tests/travis.sh index 83e3a7a..5ff6354 100644 --- a/tests/travis.sh +++ b/tests/travis.sh @@ -2,6 +2,6 @@ set -e -pytest -n 10 --cov +pytest --cov codecov -t f53345d1-71af-4dfa-ade6-16ce5bb3cba6 diff --git a/tests/unit/constraints/test_comparison_constraint.py b/tests/unit/constraints/test_comparison_constraint.py index f6e432f..00fcf04 100644 --- a/tests/unit/constraints/test_comparison_constraint.py +++ b/tests/unit/constraints/test_comparison_constraint.py @@ -39,3 +39,20 @@ def test_len(): c = jobs.ComparisonConstraint("foo", jobs.Operator.Equals, 0) assert len(c) == 1 + + +@pytest.mark.parametrize( + "c, t", + [ + (jobs.ComparisonConstraint("foo", jobs.Operator.Equals, 0), "foo == 0"), + (jobs.ComparisonConstraint("foo", jobs.Operator.NotEquals, 0), "foo != 0"), + (jobs.ComparisonConstraint("foo", jobs.Operator.Greater, 0), "foo > 0"), + (jobs.ComparisonConstraint("foo", jobs.Operator.GreaterEquals, 0), "foo >= 0"), + (jobs.ComparisonConstraint("foo", jobs.Operator.Less, 0), "foo < 0"), + (jobs.ComparisonConstraint("foo", jobs.Operator.LessEquals, 0), "foo <= 0"), + (jobs.ComparisonConstraint("foo", jobs.Operator.Is, 0), "foo =?= 0"), + (jobs.ComparisonConstraint("foo", jobs.Operator.Isnt, 0), "foo =!= 0"), + ], +) +def test_from_expr(c, t): + assert jobs.ComparisonConstraint.from_expr(t) == c diff --git a/tests/unit/handles/test_constraint_handle.py b/tests/unit/handles/test_constraint_handle.py index 1f65217..a10f421 100644 --- a/tests/unit/handles/test_constraint_handle.py +++ b/tests/unit/handles/test_constraint_handle.py @@ -44,3 +44,61 @@ def test_cannot_combine_handles_with_different_schedulers(combinator): with pytest.raises(jobs.exceptions.InvalidHandle): combinator(h1, h2) + + +@pytest.mark.parametrize("combinator", [operator.and_, operator.or_]) +def test_can_combine_handle_with_constraint(combinator): + h = jobs.ConstraintHandle( + jobs.ComparisonConstraint("foo", jobs.Operator.Equals, "bar") + ) + c = jobs.ComparisonConstraint("fizz", jobs.Operator.Equals, "buzz") + + combined = combinator(h, c) + + assert isinstance(combined, jobs.ConstraintHandle) + + +@pytest.mark.parametrize("combinator", [operator.and_, operator.or_]) +def test_can_combine_handle_with_comparison_constraint_string(combinator): + h = jobs.ConstraintHandle( + jobs.ComparisonConstraint("foo", jobs.Operator.Equals, "bar") + ) + c = "fizz == buzz" + + combined = combinator(h, c) + + assert isinstance(combined, jobs.ConstraintHandle) + + +@pytest.mark.parametrize("combinator", [operator.and_, operator.or_]) +def test_cannot_combine_handle_with_arbitrary_string(combinator): + h = jobs.ConstraintHandle( + 
jobs.ComparisonConstraint("foo", jobs.Operator.Equals, "bar") + ) + c = "dsifjaodgj" + + with pytest.raises(jobs.exceptions.ExpressionParseFailed): + combined = combinator(h, c) + + +@pytest.mark.parametrize("combinator", [operator.and_, operator.or_]) +@pytest.mark.parametrize("bad_value", [None, True, 1, 5.5, {}, [], set()]) +def test_cannot_combine_handle_with_other_types(combinator, bad_value): + h = jobs.ConstraintHandle( + jobs.ComparisonConstraint("foo", jobs.Operator.Equals, "bar") + ) + c = bad_value + + with pytest.raises(jobs.exceptions.InvalidHandle): + combined = combinator(h, c) + + +@pytest.mark.parametrize("combinator", [operator.and_, operator.or_]) +def test_cannot_combine_handle_with_bad_operator(combinator): + h = jobs.ConstraintHandle( + jobs.ComparisonConstraint("foo", jobs.Operator.Equals, "bar") + ) + c = "foo !?= bar" + + with pytest.raises(jobs.exceptions.ExpressionParseFailed): + combined = combinator(h, c) diff --git a/tests/unit/utils/__init__.py b/tests/unit/utils/__init__.py new file mode 100644 index 0000000..e69de29