From 8aabf5931245b5a2dbf3960fe6a37edefb9ef9f1 Mon Sep 17 00:00:00 2001 From: Vince Date: Fri, 21 Jun 2019 22:41:12 -0400 Subject: [PATCH 01/34] first pass at infrastructure for pipeline requirements declaration; #195; https://github.com/databio/pypiper/issues/94 --- docs/changelog.md | 4 +++ looper/_version.py | 2 +- looper/exceptions.py | 23 +++++++++----- looper/looper.py | 3 +- looper/pipeline_interface.py | 50 +++++++++++++++++++++++++++---- looper/pipereqs.py | 58 ++++++++++++++++++++++++++++++++++++ looper/project.py | 21 ++++++++++--- 7 files changed, 141 insertions(+), 20 deletions(-) create mode 100644 looper/pipereqs.py diff --git a/docs/changelog.md b/docs/changelog.md index b3af00f2b..b8e44a7bb 100644 --- a/docs/changelog.md +++ b/docs/changelog.md @@ -2,6 +2,10 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html) and [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) format. +## [Unreleased] +### Added +- Ability to declare `required_executables` in a `PipelineInterface`, to trigger a naive "runnability" check for a sample submission + ## [0.12.3] -- 2019-06-20 ### Fixed - Bug in `Sample` YAML naming, whereby a base `Sample` was being suffixed as a subtype would be, leading to a pipeline argument based on `yaml_file` that did not exist on disk. diff --git a/looper/_version.py b/looper/_version.py index 8e1395bd3..d2ab16709 100644 --- a/looper/_version.py +++ b/looper/_version.py @@ -1 +1 @@ -__version__ = "0.12.3" +__version__ = "0.12.4-dev" diff --git a/looper/exceptions.py b/looper/exceptions.py index bb3fd0c4e..995b72c7b 100644 --- a/looper/exceptions.py +++ b/looper/exceptions.py @@ -1,20 +1,18 @@ """ Exceptions for specific looper issues. """ from abc import ABCMeta -import sys -if sys.version_info < (3, 3): - from collections import Iterable -else: - from collections.abc import Iterable +from collections import Iterable __author__ = "Vince Reuter" __email__ = "vreuter@virginia.edu" _all__ = ["DuplicatePipelineKeyException", "InvalidResourceSpecificationException", - "JobSubmissionException", "LooperError", + "JobSubmissionException", + "LooperError", "MissingPipelineConfigurationException", - "PipelineInterfaceConfigError"] + "PipelineInterfaceConfigError", + "PipelineInterfaceRequirementsError"] class LooperError(Exception): @@ -40,7 +38,7 @@ class JobSubmissionException(LooperError): def __init__(self, sub_cmd, script): self.script = script reason = "Error for command {} and script '{}'".\ - format(sub_cmd, self.script) + format(sub_cmd, self.script) super(JobSubmissionException, self).__init__(reason) @@ -61,3 +59,12 @@ def __init__(self, context): if not isinstance(context, str) and isinstance(context, Iterable): context = "Missing section(s): {}".format(", ".join(context)) super(PipelineInterfaceConfigError, self).__init__(context) + + +class PipelineInterfaceRequirementsError(LooperError): + """ Invalid specification of pipeline requirements in interface config. 
""" + def __init__(self, typename_by_requirement): + super(PipelineInterfaceRequirementsError, self).__init__( + "{} invalid requirements: {}".format( + len(typename_by_requirement), typename_by_requirement)) + self.error_specs = typename_by_requirement diff --git a/looper/looper.py b/looper/looper.py index 0c8c6a49b..75b8f433c 100755 --- a/looper/looper.py +++ b/looper/looper.py @@ -261,7 +261,8 @@ def process_protocols(prj, protocols, resource_setting_kwargs=None, **kwargs): submission_bundles = prj.build_submission_bundles(proto) if not submission_bundles: if proto != GENERIC_PROTOCOL_KEY: - _LOGGER.warning("No mapping for protocol: '%s'", proto) + _LOGGER.warning( + "No valid pipelines for protocol '{}'".format(proto)) continue for pl_iface, sample_subtype, pl_key, script_with_flags in \ submission_bundles: diff --git a/looper/pipeline_interface.py b/looper/pipeline_interface.py index 28cbfa958..f0dede1d8 100644 --- a/looper/pipeline_interface.py +++ b/looper/pipeline_interface.py @@ -1,20 +1,18 @@ """ Model the connection between a pipeline and a project or executor. """ +from collections import Iterable, Mapping import inspect import logging import os -import sys -if sys.version_info < (3, 3): - from collections import Mapping -else: - from collections.abc import Mapping import warnings import yaml from yaml import SafeLoader from .exceptions import InvalidResourceSpecificationException, \ - MissingPipelineConfigurationException, PipelineInterfaceConfigError + MissingPipelineConfigurationException, PipelineInterfaceConfigError, \ + PipelineInterfaceRequirementsError +from .pipereqs import * from .sample import Sample from .utils import get_logger from attmap import PathExAttMap @@ -30,6 +28,7 @@ PL_KEY = "pipelines" PROTOMAP_KEY = "protocol_mapping" +PIPELINE_REQUIREMENTS_KEY = "required_executables" RESOURCES_KEY = "resources" SUBTYPE_MAPPING_SECTION = "sample_subtypes" @@ -79,6 +78,29 @@ def __init__(self, config): config = standardize_protocols(config) self.add_entries(config) + # Establish requirements for across this interface's pipelines, and + # propagate those to individual pipelines. + universal_requirements = self.setdefault(PIPELINE_REQUIREMENTS_KEY, {}) + if isinstance(universal_requirements, str): + universal_requirements = [universal_requirements] + if isinstance(universal_requirements, Mapping): + reqdat, errors = {}, {} + for req, reqtype in universal_requirements.items(): + try: + reqdat[req] = create_pipeline_requirement(req, typename=reqtype) + except ValueError: + errors[req] = reqtype + if errors: + raise PipelineInterfaceRequirementsError(errors) + elif isinstance(universal_requirements, Iterable): + reqdat = {r: RequiredExecutable(r) for r in universal_requirements} + else: + raise TypeError( + "Interface-wide pipeline requirements are non-iterable ({})". 
+ format(type(universal_requirements).__name__)) + for p, pipe_data in self[PL_KEY].items(): + pipe_data.setdefault(PIPELINE_REQUIREMENTS_KEY, {}).update(reqdat) + def __repr__(self): """ String representation """ source = self.pipe_iface_file or "Mapping" @@ -474,6 +496,11 @@ def iterpipes(self): """ return iter(self.pipelines.items()) + def missing_requirements(self, pipeline): + pipe_data = self.select_pipeline(pipeline) + return {k: v.req for k, v in pipe_data[PIPELINE_REQUIREMENTS_KEY].items() + if not v.satisfied} + @property def pipeline_names(self): """ @@ -552,6 +579,17 @@ def uses_looper_args(self, pipeline_name): config = self.select_pipeline(pipeline_name) return "looper_args" in config and config["looper_args"] + def validate(self, pipeline): + """ + + + :param str pipeline: key for the pipeline to validate + :return Mapping[str, str]: binding between + :raise MissingPipelineConfigurationException: if the requested pipeline + is not defined in this interface + """ + return not self.missing_requirements(pipeline) + def expand_pl_paths(piface): """ diff --git a/looper/pipereqs.py b/looper/pipereqs.py new file mode 100644 index 000000000..858936609 --- /dev/null +++ b/looper/pipereqs.py @@ -0,0 +1,58 @@ +""" Pipeline requirements declaration """ + +import os + +__author__ = "Vince Reuter" +__email__ = "vreuter@virginia.edu" + +__all__ = ["create_pipeline_requirement", "PipelineRequirement", + "RequiredExecutable", "RequiredPath"] + + +class PipelineRequirement(object): + + def __init__(self, req, check): + self.req = req + if not hasattr(check, "__call__"): + raise TypeError("Validator isn't callable ({})". + format(type(check).__name__)) + self.check = check + + @property + def satisfied(self): + return self.check(self.req) + + +class RequiredPath(PipelineRequirement): + + def __init__(self, p, check=None, folder=None): + if check is None: + if folder in [False, True]: + check = os.path.isdir if folder else os.path.isfile + else: + raise ValueError( + "If no validation function is provided, folder argument " + "must be boolean; got {} ({})".format( + folder, type(folder).__name__)) + super(RequiredPath, self).__init__(p, check) + + +class RequiredExecutable(PipelineRequirement): + + def __init__(self, cmd, check=None, locs=None): + if check is None: + locs = locs or [os.getenv("PATH")] + check = lambda c: any(c in l for l in locs) + super(RequiredExecutable, self).__init__(cmd, check) + + + +def create_pipeline_requirement(req, typename, **kwargs): + if typename == "executable": + return RequiredExecutable(req, **kwargs) + if typename == "file": + return RequiredPath(req, folder=False) + elif typename == "folder": + return RequiredPath(req, folder=True) + else: + raise ValueError("Invalid requirement typename: '{}'".format(typename)) diff --git a/looper/project.py b/looper/project.py index ffaacc036..3b4eba379 100644 --- a/looper/project.py +++ b/looper/project.py @@ -9,7 +9,8 @@ from peppy import METADATA_KEY, OUTDIR_KEY from peppy.utils import is_command_callable from .const import * -from .exceptions import DuplicatePipelineKeyException +from .exceptions import DuplicatePipelineKeyException, \ + PipelineInterfaceRequirementsError from .pipeline_interface import PROTOMAP_KEY from .project_piface_group import ProjectPifaceGroup from .utils import get_logger, partition @@ -160,8 +161,7 @@ def build_submission_bundles(self, protocol, priority=True): for pipeline_key in new_scripts: # Determine how to reference the pipeline and where it is. 
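A rough sketch of how the new `pipereqs` primitives above are meant to be exercised (illustrative only; the requirement names below are assumptions, not values from this patch):

```python
from looper.pipereqs import create_pipeline_requirement

# Each requirement pairs a name/path with a validation callable; the check
# runs lazily via the `satisfied` property, not at construction time.
req = create_pipeline_requirement("samtools", typename="executable")
if not req.satisfied:
    print("unmet requirement: {}".format(req.req))

# The "file" and "folder" typenames dispatch to os.path.isfile / os.path.isdir.
ref_dir = create_pipeline_requirement("/data/refs", typename="folder")
```

Note that this first pass checks an executable by substring membership in the `PATH` string; a later commit in this series swaps that heuristic for `ubiquerg.is_command_callable`.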
strict_pipe_key, full_pipe_path, full_pipe_path_with_flags = \ - pipe_iface.finalize_pipeline_key_and_paths( - pipeline_key) + pipe_iface.finalize_pipeline_key_and_paths(pipeline_key) # Skip and warn about nonexistent alleged pipeline path. if not (os.path.exists(full_pipe_path) or @@ -170,6 +170,15 @@ def build_submission_bundles(self, protocol, priority=True): "Missing pipeline script: '%s'", full_pipe_path) continue + if not pipe_iface.validate(pipeline_key): + unmet = pipe_iface.missing_requirements(pipeline_key) + _LOGGER.warning( + "{n} requirements unsatisfied for pipeline '{p}' " + "(interface from {s}): {data}".format( + n=len(unmet), p=pipeline_key, s=pipe_iface.source, + data=unmet)) + continue + # Determine which interface and Sample subtype to use. sample_subtype = \ pipe_iface.fetch_sample_subtype( @@ -335,7 +344,11 @@ def process_pipeline_interfaces(pipeline_interface_locations): if os.path.splitext(f)[1] in [".yaml", ".yml"]] for f in fs: _LOGGER.debug("Processing interface definition: {}".format(f)) - iface_group.update(f) + try: + iface_group.update(f) + except PipelineInterfaceRequirementsError as e: + _LOGGER.warning("Cannot build pipeline interface from {} ({})". + format(f, str(e))) return iface_group From 3732fadc1bf58fd37600deaff89980cbb02a0be0 Mon Sep 17 00:00:00 2001 From: Vince Date: Fri, 21 Jun 2019 23:00:33 -0400 Subject: [PATCH 02/34] organize tests --- tests/models/pipeline_interface/__init__.py | 0 tests/models/{ => pipeline_interface}/conftest.py | 0 tests/models/{ => pipeline_interface}/test_PipelineInterface.py | 0 .../test_PipelineInterface_sample_subtypes.py | 0 4 files changed, 0 insertions(+), 0 deletions(-) create mode 100644 tests/models/pipeline_interface/__init__.py rename tests/models/{ => pipeline_interface}/conftest.py (100%) rename tests/models/{ => pipeline_interface}/test_PipelineInterface.py (100%) rename tests/models/{ => pipeline_interface}/test_PipelineInterface_sample_subtypes.py (100%) diff --git a/tests/models/pipeline_interface/__init__.py b/tests/models/pipeline_interface/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/models/conftest.py b/tests/models/pipeline_interface/conftest.py similarity index 100% rename from tests/models/conftest.py rename to tests/models/pipeline_interface/conftest.py diff --git a/tests/models/test_PipelineInterface.py b/tests/models/pipeline_interface/test_PipelineInterface.py similarity index 100% rename from tests/models/test_PipelineInterface.py rename to tests/models/pipeline_interface/test_PipelineInterface.py diff --git a/tests/models/test_PipelineInterface_sample_subtypes.py b/tests/models/pipeline_interface/test_PipelineInterface_sample_subtypes.py similarity index 100% rename from tests/models/test_PipelineInterface_sample_subtypes.py rename to tests/models/pipeline_interface/test_PipelineInterface_sample_subtypes.py From 794f02c738add7405c93cb873a76914ff3e32d7b Mon Sep 17 00:00:00 2001 From: Vince Date: Fri, 21 Jun 2019 23:44:42 -0400 Subject: [PATCH 03/34] fix main PipelineInterface tests, remove reqs declaration for comparison --- tests/helpers.py | 18 ++++++++++++++++++ .../test_PipelineInterface.py | 6 ++++-- 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/tests/helpers.py b/tests/helpers.py index 2bb49a110..d9b3adccb 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -63,3 +63,21 @@ def randconf(ext=".yaml"): :return str: randomly generated string to function as filename """ return randstr(LETTERS_AND_DIGITS, 15) + ext + + +def 
remove_piface_requirements(data): + """ + Remove the requirements declaration section from all mappings. + + :param Mapping data: (likely nested) mappings + :return Mapping: same as input, but with requirements keys removed + """ + from collections import Mapping + from looper.pipeline_interface import PIPELINE_REQUIREMENTS_KEY as REQS_KEY + def go(m, acc): + for k, v in m.items(): + if k == REQS_KEY: + continue + acc[k] = go(v, {}) if isinstance(v, Mapping) else v + return acc + return go(data, {}) diff --git a/tests/models/pipeline_interface/test_PipelineInterface.py b/tests/models/pipeline_interface/test_PipelineInterface.py index 71a0eba71..6e764defc 100644 --- a/tests/models/pipeline_interface/test_PipelineInterface.py +++ b/tests/models/pipeline_interface/test_PipelineInterface.py @@ -22,9 +22,11 @@ MissingPipelineConfigurationException, PipelineInterfaceConfigError from peppy import Project, Sample from peppy.const import * -from .conftest import ATAC_PROTOCOL_NAME, write_config_data from ubiquerg import powerset +from .conftest import ATAC_PROTOCOL_NAME, write_config_data +from tests.helpers import remove_piface_requirements + __author__ = "Vince Reuter" __email__ = "vreuter@virginia.edu" @@ -109,7 +111,7 @@ def test_basic_construction(tmpdir, from_file, bundled_piface): assert pi.pipelines_path is None # Validate protocol mapping and interfaces contents. - assert PathExAttMap(bundled_piface[PL_KEY]) == pi[PL_KEY] + assert bundled_piface[PL_KEY] == remove_piface_requirements(pi[PL_KEY]) assert PathExAttMap(bundled_piface[PROTOMAP_KEY]) == pi[PROTOMAP_KEY] # Certain access modes should agree with one another. From 2b58a93e74e0cd9ca277dbbe09294601175c6969 Mon Sep 17 00:00:00 2001 From: Vince Date: Fri, 21 Jun 2019 23:48:54 -0400 Subject: [PATCH 04/34] initial test and stubs for reqs; docstrings --- looper/pipeline_interface.py | 14 ++++- tests/conftest.py | 6 +- tests/integration/test_project_get_outputs.py | 4 +- .../test_submission_bundles_with_pipe_reqs.py | 0 tests/models/pipeline_interface/conftest.py | 4 +- .../test_PipelineInterface_requirements.py | 58 +++++++++++++++++++ 6 files changed, 76 insertions(+), 10 deletions(-) create mode 100644 tests/integration/test_submission_bundles_with_pipe_reqs.py create mode 100644 tests/models/pipeline_interface/test_PipelineInterface_requirements.py diff --git a/looper/pipeline_interface.py b/looper/pipeline_interface.py index f0dede1d8..6132fa84f 100644 --- a/looper/pipeline_interface.py +++ b/looper/pipeline_interface.py @@ -87,7 +87,8 @@ def __init__(self, config): reqdat, errors = {}, {} for req, reqtype in universal_requirements.items(): try: - reqdat[req] = create_pipeline_requirement(req, typename=reqtype) + reqdat[req] = create_pipeline_requirement( + req, typename=reqtype) except ValueError: errors[req] = reqtype if errors: @@ -497,6 +498,13 @@ def iterpipes(self): return iter(self.pipelines.items()) def missing_requirements(self, pipeline): + """ + Determine which requirements--if any--declared by a pipeline are unmet. + + :param str pipeline: key for pipeline for which to determine unmet reqs + :return Mapping[str, str]: binding between requirement path/name and + requirement instance + """ pipe_data = self.select_pipeline(pipeline) return {k: v.req for k, v in pipe_data[PIPELINE_REQUIREMENTS_KEY].items() if not v.satisfied} @@ -581,10 +589,10 @@ def uses_looper_args(self, pipeline_name): def validate(self, pipeline): """ - + Determine whether any declared requirements are unmet. 
:param str pipeline: key for the pipeline to validate - :return Mapping[str, str]: binding between + :return bool: whether any declared requirements are unmet :raise MissingPipelineConfigurationException: if the requested pipeline is not defined in this interface """ diff --git a/tests/conftest.py b/tests/conftest.py index 4a5bc45ad..6f46c5046 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -19,7 +19,7 @@ import pytest import yaml -from looper.pipeline_interface import PipelineInterface +from looper.pipeline_interface import PipelineInterface, PROTOMAP_KEY from looper.project import Project from logmuse import init_logger from peppy import SAMPLE_NAME_COLNAME, \ @@ -67,7 +67,7 @@ SRC3_TEMPLATE = "data/{sample_name}.txt" -PIPELINE_INTERFACE_CONFIG_LINES = """protocol_mapping: +PIPELINE_INTERFACE_CONFIG_LINES = """{pm_key}: standard: testpipeline.sh ngs: testngs.sh pipelines: @@ -107,7 +107,7 @@ mem: "32000" time: "2-00:00:00" partition: "longq" -""".splitlines(True) +""".format(pm_key=PROTOMAP_KEY).splitlines(True) # Determined by "looper_args" in pipeline interface lines. LOOPER_ARGS_BY_PIPELINE = { diff --git a/tests/integration/test_project_get_outputs.py b/tests/integration/test_project_get_outputs.py index 439c70c00..d9b38ab16 100644 --- a/tests/integration/test_project_get_outputs.py +++ b/tests/integration/test_project_get_outputs.py @@ -657,7 +657,7 @@ def __init__(self, key, name=None): @pytest.fixture(scope="function") def rna_pi_lines(): - return """protocol_mapping: + return """{pm_key}: {rnaseq_proto_name}: [{bs_name}, {kall_name}, {th_name}] SMART: [{bs_name}, {th_name}] @@ -728,7 +728,7 @@ def rna_pi_lines(): mem: "8000" time: "0-12:00:00" """.format( - res=RESOURCES_KEY, dr=DEF_RES, rnaseq_proto_name=RNASEQ, + pm_key=PROTOMAP_KEY, res=RESOURCES_KEY, dr=DEF_RES, rnaseq_proto_name=RNASEQ, bs_key=RNA_PIPES["bitseq"].key, bs_name=RNA_PIPES["bitseq"].name, th_key=RNA_PIPES["tophat"].key, th_name=RNA_PIPES["tophat"].name, kall_key=RNA_PIPES["kallisto"].key, kall_name=RNA_PIPES["kallisto"].name, diff --git a/tests/integration/test_submission_bundles_with_pipe_reqs.py b/tests/integration/test_submission_bundles_with_pipe_reqs.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/models/pipeline_interface/conftest.py b/tests/models/pipeline_interface/conftest.py index e5ff9f229..9db6d7ba4 100644 --- a/tests/models/pipeline_interface/conftest.py +++ b/tests/models/pipeline_interface/conftest.py @@ -21,7 +21,7 @@ ATAC_PROTOCOL_NAME = "ATAC" - +ATAC_PIPE_NAME = "ATACseq" CONFIG_FILENAME = "test-proj-conf.yaml" ANNOTATIONS_FILENAME = "anns.csv" SAMPLE_NAME_1 = "test-sample-1" @@ -71,7 +71,7 @@ def pytest_generate_tests(metafunc): ATACSEQ_IFACE_WITHOUT_RESOURCES = { - "name": "ATACseq", + "name": ATAC_PIPE_NAME, "looper_args": True, "required_input_files": ["read1", "read2"], "all_input_files": ["read1", "read2"], diff --git a/tests/models/pipeline_interface/test_PipelineInterface_requirements.py b/tests/models/pipeline_interface/test_PipelineInterface_requirements.py new file mode 100644 index 000000000..84c388d2f --- /dev/null +++ b/tests/models/pipeline_interface/test_PipelineInterface_requirements.py @@ -0,0 +1,58 @@ +""" Tests for declaration of requirements in pipeline interface """ + +from attmap import PathExAttMap +from looper.pipeline_interface import \ + PL_KEY, PROTOMAP_KEY, PIPELINE_REQUIREMENTS_KEY +from looper import PipelineInterface +import pytest +from tests.models.pipeline_interface.conftest import \ + ATAC_PIPE_NAME, ATAC_PROTOCOL_NAME + 
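For orientation, the interface mappings exercised by these new tests have roughly this minimal shape (a sketch; the protocol and pipeline names here are illustrative, not the fixture values):

```python
from looper import PipelineInterface

# PROTOMAP_KEY ("protocol_mapping") routes a protocol to a pipeline key;
# PL_KEY ("pipelines") holds each pipeline's own configuration section.
iface = PipelineInterface({
    "protocol_mapping": {"ATAC": "pepatac.py"},
    "pipelines": {"pepatac.py": {"name": "ATACseq"}},
})

# With no requirements declared anywhere, validation is vacuously successful.
assert iface.validate("pepatac.py")
assert iface.missing_requirements("pepatac.py") == {}
```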
+__author__ = "Vince Reuter" +__email__ = "vreuter@virginia.edu" + + +@pytest.mark.parametrize(["observe", "expected"], [ + (lambda pi, pk: pi.validate(pk), True), + (lambda pi, pk: pi.missing_requirements(pk), {}) +]) +def test_no_requirements_successfully_validates( + atac_pipe_name, atacseq_piface_data, observe, expected): + """ PipelineInterface declaring no requirements successfully validates. """ + + # Pretest--check that we're keying the data as expected. + assert [atac_pipe_name] == list(atacseq_piface_data.keys()) + assert ATAC_PIPE_NAME == atacseq_piface_data[atac_pipe_name]["name"] + + pi = PipelineInterface({ + PROTOMAP_KEY: {ATAC_PROTOCOL_NAME: atac_pipe_name}, + PL_KEY: {atac_pipe_name: atacseq_piface_data} + }) + + assert PathExAttMap({}) == pi[PIPELINE_REQUIREMENTS_KEY] + assert expected == observe(pi, atac_pipe_name) + + +@pytest.mark.skip("not implemented") +def test_illegal_requirements_specification(): + pass + + +@pytest.mark.skip("not implemented") +def test_validity_iff_missing_reqs_return_is_false(): + pass + + +@pytest.mark.skip("not implemented") +def test_all_requirements_satisfied(): + pass + + +@pytest.mark.skip("not implemented") +def test_mixed_requirement_satisfaction(): + pass + + +@pytest.mark.skip("not implemented") +def test_no_requirements_satisfied(): + pass From 91f1056b244df9ad1fabbb9d2a68ca4c4de136e7 Mon Sep 17 00:00:00 2001 From: Vince Date: Mon, 24 Jun 2019 18:47:49 -0400 Subject: [PATCH 05/34] more required executables testing; leverage is_command_callable --- looper/const.py | 5 +- looper/pipeline_interface.py | 120 ++++++++++++------ looper/pipereqs.py | 29 +++-- looper/project.py | 2 +- requirements/requirements-all.txt | 4 +- requirements/requirements-dev.txt | 1 + .../test_PipelineInterface_requirements.py | 65 +++++++++- 7 files changed, 166 insertions(+), 60 deletions(-) diff --git a/looper/const.py b/looper/const.py index 9cbb846b9..bfd1bbcf2 100644 --- a/looper/const.py +++ b/looper/const.py @@ -5,8 +5,8 @@ __all__ = ["APPEARANCE_BY_FLAG", "NO_DATA_PLACEHOLDER", "OUTKEY", - "PIPELINE_INTERFACES_KEY", "RESULTS_SUBDIR_KEY", - "SUBMISSION_SUBDIR_KEY", "TEMPLATES_DIRNAME"] + "PIPELINE_INTERFACES_KEY", "PIPELINE_REQUIREMENTS_KEY", + "RESULTS_SUBDIR_KEY", "SUBMISSION_SUBDIR_KEY", "TEMPLATES_DIRNAME"] APPEARANCE_BY_FLAG = { "completed": { @@ -32,6 +32,7 @@ } NO_DATA_PLACEHOLDER = "NA" PIPELINE_INTERFACES_KEY = "pipeline_interfaces" +PIPELINE_REQUIREMENTS_KEY = "required_executables" OUTKEY = "outputs" RESULTS_SUBDIR_KEY = "results_subdir" SUBMISSION_SUBDIR_KEY = "submission_subdir" diff --git a/looper/pipeline_interface.py b/looper/pipeline_interface.py index 6132fa84f..e469c97a3 100644 --- a/looper/pipeline_interface.py +++ b/looper/pipeline_interface.py @@ -1,6 +1,6 @@ """ Model the connection between a pipeline and a project or executor. 
""" -from collections import Iterable, Mapping +from collections import Iterable, Mapping, OrderedDict import inspect import logging import os @@ -9,18 +9,19 @@ import yaml from yaml import SafeLoader +from .const import PIPELINE_REQUIREMENTS_KEY from .exceptions import InvalidResourceSpecificationException, \ MissingPipelineConfigurationException, PipelineInterfaceConfigError, \ PipelineInterfaceRequirementsError -from .pipereqs import * +from .pipereqs import create_pipeline_requirement, RequiredExecutable from .sample import Sample from .utils import get_logger -from attmap import PathExAttMap +from attmap import PathExAttMap as PXAM from divvy import DEFAULT_COMPUTE_RESOURCES_NAME, NEW_COMPUTE_KEY as COMPUTE_KEY from divvy.const import OLD_COMPUTE_KEY from peppy import utils as peputil from peppy.sample import SAMPLE_YAML_FILE_KEY -from ubiquerg import expandpath +from ubiquerg import expandpath, is_command_callable _LOGGER = get_logger(__name__) @@ -28,13 +29,12 @@ PL_KEY = "pipelines" PROTOMAP_KEY = "protocol_mapping" -PIPELINE_REQUIREMENTS_KEY = "required_executables" RESOURCES_KEY = "resources" SUBTYPE_MAPPING_SECTION = "sample_subtypes" @peputil.copy -class PipelineInterface(PathExAttMap): +class PipelineInterface(PXAM): """ This class parses, holds, and returns information for a yaml file that specifies how to interact with each individual pipeline. This @@ -57,6 +57,7 @@ def __init__(self, config): _LOGGER.debug("Parsing '%s' for %s config data", config, self.__class__.__name__) self.pipe_iface_file = config + self.source = config try: with open(config, 'r') as f: config = yaml.load(f, SafeLoader) @@ -66,7 +67,6 @@ def __init__(self, config): "Failed to parse YAML from {}:\n{}". format(config, "".join(f.readlines()))) raise - self.source = config # Check presence of 2 main sections (protocol mapping and pipelines). missing = [s for s in self.REQUIRED_SECTIONS if s not in config] @@ -75,32 +75,19 @@ def __init__(self, config): # Format and add the protocol mappings and individual interfaces. config = expand_pl_paths(config) - config = standardize_protocols(config) - self.add_entries(config) + assert PROTOMAP_KEY in config, \ + "For protocol mapping standardization, pipeline interface data " \ + "must contain key '{}'".format(PROTOMAP_KEY) + + for k, v in config.items(): + assert k not in self, "Interface key already mapped: {}".format(k) + self[k] = v # Establish requirements for across this interface's pipelines, and # propagate those to individual pipelines. - universal_requirements = self.setdefault(PIPELINE_REQUIREMENTS_KEY, {}) - if isinstance(universal_requirements, str): - universal_requirements = [universal_requirements] - if isinstance(universal_requirements, Mapping): - reqdat, errors = {}, {} - for req, reqtype in universal_requirements.items(): - try: - reqdat[req] = create_pipeline_requirement( - req, typename=reqtype) - except ValueError: - errors[req] = reqtype - if errors: - raise PipelineInterfaceRequirementsError(errors) - elif isinstance(universal_requirements, Iterable): - reqdat = {r: RequiredExecutable(r) for r in universal_requirements} - else: - raise TypeError( - "Interface-wide pipeline requirements are non-iterable ({})". 
- format(type(universal_requirements).__name__)) + reqdat = self.setdefault(PIPELINE_REQUIREMENTS_KEY, PXAM()) for p, pipe_data in self[PL_KEY].items(): - pipe_data.setdefault(PIPELINE_REQUIREMENTS_KEY, {}).update(reqdat) + pipe_data.setdefault(PIPELINE_REQUIREMENTS_KEY, PXAM()).update(reqdat) def __repr__(self): """ String representation """ @@ -111,6 +98,36 @@ def __repr__(self): return "{} from {}, with {} pipeline(s): {}".format( self.__class__.__name__, source, num_pipelines, pipelines) + def __setitem__(self, key, value): + if key == PIPELINE_REQUIREMENTS_KEY: + super(PipelineInterface, self).__setitem__( + key, read_pipe_reqs(value), finalize=False) + elif key == PL_KEY: + # DEBUG + print("PIPELINES!") + print("VALUE: {}".format(value)) + assert isinstance(value, Mapping) or not value, \ + "If non-null, value for key '{}' in interface specification " \ + "must be a mapping; got {}".format(key, type(value).__name__) + m = PXAM() + for k, v in value.items(): + print("SUB KEY: {}".format(k)) + print("SUBVAL: {}".format(v)) + assert isinstance(v, Mapping), \ + "Value for pipeline {} is {}, not mapping".\ + format(k, type(v).__name__) + m_sub = PXAM() + for k_sub, v_sub in v.items(): + if k_sub == PIPELINE_REQUIREMENTS_KEY: + m_sub.__setitem__(k_sub, read_pipe_reqs(v_sub), finalize=False) + else: + m_sub.__setitem__(k_sub, v_sub, finalize=True) + m.__setitem__(k, m_sub, finalize=False) + print("NEW M: {}".format(m)) + super(PipelineInterface, self).__setitem__(key, m) + else: + super(PipelineInterface, self).__setitem__(key, value) + def choose_resource_package(self, pipeline_name, file_size): """ Select resource bundle for given input file size to given pipeline. @@ -149,7 +166,7 @@ def notify(msg): try: universal_compute = pl[OLD_COMPUTE_KEY] except KeyError: - universal_compute = PathExAttMap() + universal_compute = PXAM() else: warnings.warn( "To declare pipeline compute section, use {} rather " @@ -266,7 +283,7 @@ def finalize_pipeline_key_and_paths(self, pipeline_key): # TODO: determine how to deal with pipelines_path (i.e., could be null) if not os.path.isabs(script_path_only) and not \ - peputil.is_command_callable(script_path_only): + is_command_callable(script_path_only): _LOGGER.whisper("Expanding non-absolute script path: '%s'", script_path_only) script_path_only = os.path.join( @@ -620,19 +637,38 @@ def expand_pl_paths(piface): return piface -def standardize_protocols(piface): +def read_pipe_reqs(reqs_data): """ - Handle casing and punctuation of protocol keys in pipeline interface. - - :param MutableMapping piface: Pipeline interface data to standardize. - :return MutableMapping: Same as the input, but with protocol keys case and - punctuation handled in a more uniform way for matching later. + Read/parse a requirements section or subsection of a pipeline interface config. + + :param Mapping reqs_data: the data to parse; this should be a collection + of strings (names/paths of executables), or a mapping of requirements + declarations, keyed on name/path with each key mapping to a string + that indicates the kind of requirement (file, folder, executable). + If nothing's specified (list rather than dict) of requirements, or if + the value for a requirement is empty/null, the requirement is assumed + to be the declaration of an executable. 
+ :return attmap.PathExAttMap[str, looper.pipereqs.PipelineRequirement]: a + binding between requirement name/path and validation instance """ - from copy import copy as cp - assert PROTOMAP_KEY in piface, "For protocol mapping standardization, " \ - "pipeline interface data must contain key '{}'".format(PROTOMAP_KEY) - piface[PROTOMAP_KEY] = cp(piface[PROTOMAP_KEY]) - return piface + if isinstance(reqs_data, str): + reqs_data = [reqs_data] + if isinstance(reqs_data, Mapping): + newval, errors = OrderedDict(), {} + for r, t in reqs_data.items(): + try: + newval[r] = create_pipeline_requirement(r, typename=r) + except ValueError: + errors[r] = t + if errors: + raise PipelineInterfaceRequirementsError(errors) + elif isinstance(reqs_data, Iterable): + newval = OrderedDict([(r, RequiredExecutable(r)) for r in reqs_data]) + else: + raise TypeError( + "Non-iterable pipeline requirements (key '{}'): {}". + format(PIPELINE_REQUIREMENTS_KEY, type(reqs_data).__name__)) + return PXAM(newval) def _import_sample_subtype(pipeline_filepath, subtype_name=None): diff --git a/looper/pipereqs.py b/looper/pipereqs.py index 858936609..da3664210 100644 --- a/looper/pipereqs.py +++ b/looper/pipereqs.py @@ -1,6 +1,7 @@ """ Pipeline requirements declaration """ import os +from ubiquerg import is_command_callable __author__ = "Vince Reuter" __email__ = "vreuter@virginia.edu" @@ -12,12 +13,25 @@ class PipelineRequirement(object): def __init__(self, req, check): + def _checkattr(trait_attr, trait_name): + if not hasattr(check, trait_attr): + raise TypeError("Validator isn't {} ({})". + format(trait_name, type(check).__name__)) self.req = req - if not hasattr(check, "__call__"): - raise TypeError("Validator isn't callable ({})". - format(type(check).__name__)) + _checkattr("__call__", "callable") + _checkattr("__hash__", "hashable") self.check = check + def __eq__(self, other): + return type(self) is type(other) and \ + self.req == other.req and self.check == other.check + + def __hash__(self): + return hash((self.req, self.check)) + + def __repr__(self): + return "{}: {}".format(type(self).__name__, self.req) + @property def satisfied(self): return self.check(self.req) @@ -39,15 +53,12 @@ def __init__(self, p, check=None, folder=None): class RequiredExecutable(PipelineRequirement): - def __init__(self, cmd, check=None, locs=None): - if check is None: - locs = locs or [os.getenv("PATH")] - check = lambda c: any(c in l for l in locs) - super(RequiredExecutable, self).__init__(cmd, check) - + def __init__(self, cmd, check=None): + super(RequiredExecutable, self).__init__(cmd, check or is_command_callable) def create_pipeline_requirement(req, typename, **kwargs): + typename = typename or "executable" if typename == "executable": return RequiredExecutable(req, **kwargs) if typename == "file": diff --git a/looper/project.py b/looper/project.py index 3b4eba379..3d1926232 100644 --- a/looper/project.py +++ b/looper/project.py @@ -7,7 +7,7 @@ import peppy from peppy import METADATA_KEY, OUTDIR_KEY -from peppy.utils import is_command_callable +from ubiquerg import is_command_callable from .const import * from .exceptions import DuplicatePipelineKeyException, \ PipelineInterfaceRequirementsError diff --git a/requirements/requirements-all.txt b/requirements/requirements-all.txt index 92fb862c4..011eb59f8 100644 --- a/requirements/requirements-all.txt +++ b/requirements/requirements-all.txt @@ -1,10 +1,10 @@ -attmap>=0.12.5 +attmap>=0.12.7 colorama>=0.3.9 logmuse>=0.2.0 pandas>=0.20.2 pyyaml>=3.12 divvy>=0.3.1 peppy>=0.22.2 
-ubiquerg>=0.4.3 +ubiquerg>=0.4.4 ngstk>=0.0.1rc1 jinja2 diff --git a/requirements/requirements-dev.txt b/requirements/requirements-dev.txt index 72dcff1ac..eda47a59a 100644 --- a/requirements/requirements-dev.txt +++ b/requirements/requirements-dev.txt @@ -2,3 +2,4 @@ jinja2 mock>=2.0.0 pytest>=3.0.7 ubiquerg>=0.0.3 +veracitools diff --git a/tests/models/pipeline_interface/test_PipelineInterface_requirements.py b/tests/models/pipeline_interface/test_PipelineInterface_requirements.py index 84c388d2f..9abd11de5 100644 --- a/tests/models/pipeline_interface/test_PipelineInterface_requirements.py +++ b/tests/models/pipeline_interface/test_PipelineInterface_requirements.py @@ -1,17 +1,25 @@ """ Tests for declaration of requirements in pipeline interface """ from attmap import PathExAttMap +from looper import PipelineInterface +from looper.exceptions import PipelineInterfaceRequirementsError from looper.pipeline_interface import \ PL_KEY, PROTOMAP_KEY, PIPELINE_REQUIREMENTS_KEY -from looper import PipelineInterface import pytest +import yaml from tests.models.pipeline_interface.conftest import \ ATAC_PIPE_NAME, ATAC_PROTOCOL_NAME +from veracitools import ExpectContext __author__ = "Vince Reuter" __email__ = "vreuter@virginia.edu" +def randn(): + import random, sys + return random.randint(-sys.maxsize, sys.maxsize) + + @pytest.mark.parametrize(["observe", "expected"], [ (lambda pi, pk: pi.validate(pk), True), (lambda pi, pk: pi.missing_requirements(pk), {}) @@ -33,9 +41,58 @@ def test_no_requirements_successfully_validates( assert expected == observe(pi, atac_pipe_name) -@pytest.mark.skip("not implemented") -def test_illegal_requirements_specification(): - pass +class IllegalPipelineRequirementsSpecificationTests: + + @pytest.mark.parametrize(["reqs_data", "expected"], [ + (randn(), TypeError), + ({"ls": "not-a-valid-check-type"}, PipelineInterfaceRequirementsError)]) + @pytest.mark.parametrize("from_file", [False, True]) + def test_bad_reqs_top_level(self, reqs_data, expected, atac_pipe_name, + atacseq_piface_data, from_file, tmpdir): + assert PIPELINE_REQUIREMENTS_KEY not in atacseq_piface_data[atac_pipe_name] + pi_data = {PROTOMAP_KEY: {ATAC_PROTOCOL_NAME: atac_pipe_name}, + PL_KEY: atacseq_piface_data, + PIPELINE_REQUIREMENTS_KEY: reqs_data} + if from_file: + src = tmpdir.join("pi.yaml").strpath + with open(src, 'w') as f: + yaml.dump(pi_data, f) + else: + src = pi_data + with ExpectContext(expected, PipelineInterface) as build_iface: + build_iface(src) + + @pytest.mark.parametrize(["reqs_data", "expected"], [ + (randn(), TypeError), + ({"ls": "not-a-valid-check-type"}, PipelineInterfaceRequirementsError)]) + @pytest.mark.parametrize("from_file", [False, True]) + def test_bad_reqs_specific_pipeline(self, reqs_data, expected, atac_pipe_name, + atacseq_piface_data, from_file, tmpdir): + assert PIPELINE_REQUIREMENTS_KEY not in atacseq_piface_data[atac_pipe_name] + atacseq_piface_data[atac_pipe_name][PIPELINE_REQUIREMENTS_KEY] = reqs_data + assert atacseq_piface_data[atac_pipe_name][PIPELINE_REQUIREMENTS_KEY] == reqs_data + pi_data = {PROTOMAP_KEY: {ATAC_PROTOCOL_NAME: atac_pipe_name}, + PL_KEY: atacseq_piface_data} + print("DATA: {}".format(pi_data)) + if from_file: + src = tmpdir.join("pi.yaml").strpath + with open(src, 'w') as f: + yaml.dump(pi_data, f) + else: + src = pi_data + # DEBUG + #pi = PipelineInterface(src) + #print("PI: {}".format(pi[PL_KEY])) + with ExpectContext(expected, PipelineInterface) as build_iface: + print(build_iface(src)) + + @pytest.mark.skip("not implemented") + def 
test_bad_reqs_post_construction(self): + pass + + @pytest.mark.skip("not implemented") + def test_bad_reqs_specific_pipeline_post_construction(self): + pass @pytest.mark.skip("not implemented") From 27f91257da0be5314b8aa9ba4f6e9a36150aa7df Mon Sep 17 00:00:00 2001 From: Vince Date: Mon, 24 Jun 2019 18:49:47 -0400 Subject: [PATCH 06/34] remove some debug stuff --- looper/pipeline_interface.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/looper/pipeline_interface.py b/looper/pipeline_interface.py index e469c97a3..64388c201 100644 --- a/looper/pipeline_interface.py +++ b/looper/pipeline_interface.py @@ -103,16 +103,11 @@ def __setitem__(self, key, value): super(PipelineInterface, self).__setitem__( key, read_pipe_reqs(value), finalize=False) elif key == PL_KEY: - # DEBUG - print("PIPELINES!") - print("VALUE: {}".format(value)) assert isinstance(value, Mapping) or not value, \ "If non-null, value for key '{}' in interface specification " \ "must be a mapping; got {}".format(key, type(value).__name__) m = PXAM() for k, v in value.items(): - print("SUB KEY: {}".format(k)) - print("SUBVAL: {}".format(v)) assert isinstance(v, Mapping), \ "Value for pipeline {} is {}, not mapping".\ format(k, type(v).__name__) @@ -123,7 +118,6 @@ def __setitem__(self, key, value): else: m_sub.__setitem__(k_sub, v_sub, finalize=True) m.__setitem__(k, m_sub, finalize=False) - print("NEW M: {}".format(m)) super(PipelineInterface, self).__setitem__(key, m) else: super(PipelineInterface, self).__setitem__(key, value) From 4e760be6b88dee955b13be84fbca3c62f65c4fca Mon Sep 17 00:00:00 2001 From: Vince Date: Wed, 26 Jun 2019 04:30:42 -0400 Subject: [PATCH 07/34] fix missing reqs format and check; more testing --- looper/pipeline_interface.py | 21 +-- .../test_PipelineInterface_requirements.py | 167 +++++++++++++++--- 2 files changed, 152 insertions(+), 36 deletions(-) diff --git a/looper/pipeline_interface.py b/looper/pipeline_interface.py index 64388c201..aee804bdb 100644 --- a/looper/pipeline_interface.py +++ b/looper/pipeline_interface.py @@ -83,12 +83,6 @@ def __init__(self, config): assert k not in self, "Interface key already mapped: {}".format(k) self[k] = v - # Establish requirements for across this interface's pipelines, and - # propagate those to individual pipelines. - reqdat = self.setdefault(PIPELINE_REQUIREMENTS_KEY, PXAM()) - for p, pipe_data in self[PL_KEY].items(): - pipe_data.setdefault(PIPELINE_REQUIREMENTS_KEY, PXAM()).update(reqdat) - def __repr__(self): """ String representation """ source = self.pipe_iface_file or "Mapping" @@ -513,12 +507,12 @@ def missing_requirements(self, pipeline): Determine which requirements--if any--declared by a pipeline are unmet. 
:param str pipeline: key for pipeline for which to determine unmet reqs - :return Mapping[str, str]: binding between requirement path/name and - requirement instance + :return Iterable[looper.PipelineRequirement]: unmet requirements """ - pipe_data = self.select_pipeline(pipeline) - return {k: v.req for k, v in pipe_data[PIPELINE_REQUIREMENTS_KEY].items() - if not v.satisfied} + reqs_data = {name: req for name, req in + self.get(PIPELINE_REQUIREMENTS_KEY, {}).items()} + reqs_data.update(self.select_pipeline(pipeline).get(PIPELINE_REQUIREMENTS_KEY, {})) + return [v.req for v in reqs_data.values() if not v.satisfied] @property def pipeline_names(self): @@ -645,13 +639,16 @@ def read_pipe_reqs(reqs_data): :return attmap.PathExAttMap[str, looper.pipereqs.PipelineRequirement]: a binding between requirement name/path and validation instance """ + reqs_data = reqs_data or {} if isinstance(reqs_data, str): reqs_data = [reqs_data] if isinstance(reqs_data, Mapping): newval, errors = OrderedDict(), {} for r, t in reqs_data.items(): + # DEBUG + print("TYPE: {}".format(t)) try: - newval[r] = create_pipeline_requirement(r, typename=r) + newval[r] = create_pipeline_requirement(r, typename=t) except ValueError: errors[r] = t if errors: diff --git a/tests/models/pipeline_interface/test_PipelineInterface_requirements.py b/tests/models/pipeline_interface/test_PipelineInterface_requirements.py index 9abd11de5..3c0adfbe0 100644 --- a/tests/models/pipeline_interface/test_PipelineInterface_requirements.py +++ b/tests/models/pipeline_interface/test_PipelineInterface_requirements.py @@ -1,5 +1,6 @@ """ Tests for declaration of requirements in pipeline interface """ +import os from attmap import PathExAttMap from looper import PipelineInterface from looper.exceptions import PipelineInterfaceRequirementsError @@ -15,38 +16,99 @@ __email__ = "vreuter@virginia.edu" +TOP_LEVEL_KEY = "top-level" +PIPE_LEVEL_KEY = "pipeline" + + +def pytest_generate_tests(metafunc): + """ Dynamic test case generation/parameterization local to this module. """ + if "from_file" in metafunc.fixturenames: + metafunc.parametrize("from_file", [False, True]) + + def randn(): + """ Singly random integer from a huge interval """ import random, sys return random.randint(-sys.maxsize, sys.maxsize) +def _make_from_data(from_file, folder, data): + """ + Homogenize PipelineInterface build over both in-memory and on-disk data. + + :param bool from_file: whether to route the construction through disk + :param str folder: folder in which to create config file if via disk + :param Mapping data: raw PI config data + :return looper.PipelineInterface: the new PipelineInterface instance + """ + assert type(from_file) is bool + if from_file: + fp = os.path.join(folder, "pipeline_interface.yaml") + with open(fp, 'w') as f: + yaml.dump(data, f) + data = fp + return PipelineInterface(data) + + @pytest.mark.parametrize(["observe", "expected"], [ (lambda pi, pk: pi.validate(pk), True), (lambda pi, pk: pi.missing_requirements(pk), {}) ]) def test_no_requirements_successfully_validates( - atac_pipe_name, atacseq_piface_data, observe, expected): + observe, expected, from_file, atac_pipe_name, atacseq_piface_data, tmpdir): """ PipelineInterface declaring no requirements successfully validates. """ # Pretest--check that we're keying the data as expected. 
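The reworked `missing_requirements` above folds interface-wide requirements together with a pipeline's own section before checking satisfaction; roughly, under assumed example values:

```python
from looper.pipereqs import RequiredExecutable, RequiredPath

universal = {"samtools": RequiredExecutable("samtools")}         # interface-wide
local = {"/data/refs": RequiredPath("/data/refs", folder=True)}  # one pipeline's own
merged = dict(universal)
merged.update(local)  # pipeline-level entries win on shared names
unmet = [v.req for v in merged.values() if not v.satisfied]      # a list, not a dict
```

A later commit in the series accordingly adjusts the tests' expected "nothing missing" value from `{}` to `[]`.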
assert [atac_pipe_name] == list(atacseq_piface_data.keys()) assert ATAC_PIPE_NAME == atacseq_piface_data[atac_pipe_name]["name"] - pi = PipelineInterface({ + pi = _make_from_data(from_file, tmpdir.strpath, { PROTOMAP_KEY: {ATAC_PROTOCOL_NAME: atac_pipe_name}, PL_KEY: {atac_pipe_name: atacseq_piface_data} }) - assert PathExAttMap({}) == pi[PIPELINE_REQUIREMENTS_KEY] + with pytest.raises(KeyError): + pi[PIPELINE_REQUIREMENTS_KEY] + assert expected == observe(pi, atac_pipe_name) + + +@pytest.mark.parametrize(["observe", "expected"], [ + (lambda pi, pk: pi.validate(pk), True), + (lambda pi, pk: pi.missing_requirements(pk), {}) +]) +@pytest.mark.parametrize("reqs_data", [None, {}]) +@pytest.mark.parametrize("placement", ["top-level", "pipeline"]) +def test_empty_requirements_successfully_validates( + observe, expected, from_file, tmpdir, reqs_data, + atac_pipe_name, atacseq_piface_data, placement): + """ Null value or empty mapping for requirements still validates. """ + + # Pretest--check that we're keying the data as expected. + assert [atac_pipe_name] == list(atacseq_piface_data.keys()) + assert ATAC_PIPE_NAME == atacseq_piface_data[atac_pipe_name]["name"] + + data = { + PROTOMAP_KEY: {ATAC_PROTOCOL_NAME: atac_pipe_name}, + PL_KEY: {atac_pipe_name: atacseq_piface_data} + } + + if placement == "top-level": + data[PIPELINE_REQUIREMENTS_KEY] = reqs_data + elif placement == "pipeline": + data[PL_KEY][atac_pipe_name][PIPELINE_REQUIREMENTS_KEY] = reqs_data + else: + raise ValueError("Unexpected reqs placement spec: {}".format(placement)) + + pi = _make_from_data(from_file, tmpdir.strpath, data) assert expected == observe(pi, atac_pipe_name) class IllegalPipelineRequirementsSpecificationTests: + """ Test expected behavior of various invalid reqs specs. """ @pytest.mark.parametrize(["reqs_data", "expected"], [ (randn(), TypeError), ({"ls": "not-a-valid-check-type"}, PipelineInterfaceRequirementsError)]) - @pytest.mark.parametrize("from_file", [False, True]) def test_bad_reqs_top_level(self, reqs_data, expected, atac_pipe_name, atacseq_piface_data, from_file, tmpdir): assert PIPELINE_REQUIREMENTS_KEY not in atacseq_piface_data[atac_pipe_name] @@ -65,9 +127,9 @@ def test_bad_reqs_top_level(self, reqs_data, expected, atac_pipe_name, @pytest.mark.parametrize(["reqs_data", "expected"], [ (randn(), TypeError), ({"ls": "not-a-valid-check-type"}, PipelineInterfaceRequirementsError)]) - @pytest.mark.parametrize("from_file", [False, True]) def test_bad_reqs_specific_pipeline(self, reqs_data, expected, atac_pipe_name, atacseq_piface_data, from_file, tmpdir): + """ Invalid requirements within a specific pipeline section is exceptional. 
""" assert PIPELINE_REQUIREMENTS_KEY not in atacseq_piface_data[atac_pipe_name] atacseq_piface_data[atac_pipe_name][PIPELINE_REQUIREMENTS_KEY] = reqs_data assert atacseq_piface_data[atac_pipe_name][PIPELINE_REQUIREMENTS_KEY] == reqs_data @@ -86,30 +148,87 @@ def test_bad_reqs_specific_pipeline(self, reqs_data, expected, atac_pipe_name, with ExpectContext(expected, PipelineInterface) as build_iface: print(build_iface(src)) - @pytest.mark.skip("not implemented") - def test_bad_reqs_post_construction(self): - pass - - @pytest.mark.skip("not implemented") - def test_bad_reqs_specific_pipeline_post_construction(self): - pass - - -@pytest.mark.skip("not implemented") -def test_validity_iff_missing_reqs_return_is_false(): - pass + @pytest.mark.parametrize("init_reqs_data", [None, {}]) + @pytest.mark.parametrize(["new_bad_reqs_data", "expected"], [ + (randn(), TypeError), + ({"ls": "not-a-valid-check-type"}, PipelineInterfaceRequirementsError) + ]) + @pytest.mark.parametrize(["init_place", "pretest"], [ + ("top-level", lambda pi, _, init: PIPELINE_REQUIREMENTS_KEY not in pi + if init is None else pi[PIPELINE_REQUIREMENTS_KEY] == PathExAttMap()), + ("pipeline", lambda pi, pipe_name, init: PIPELINE_REQUIREMENTS_KEY + not in pi[PL_KEY][pipe_name] + if init is None else pi[PL_KEY][pipe_name][PIPELINE_REQUIREMENTS_KEY] == PathExAttMap())]) + @pytest.mark.parametrize("post_place_loc", ["top-level", "pipeline"]) + @pytest.mark.parametrize("post_place_fun", [ + lambda obj, data: setattr(obj, PIPELINE_REQUIREMENTS_KEY, data), + lambda obj, data: obj.__setitem__(PIPELINE_REQUIREMENTS_KEY, data) + ]) + def test_bad_reqs_post_construction( + self, init_place, pretest, from_file, tmpdir, atac_pipe_name, + atacseq_piface_data, init_reqs_data, new_bad_reqs_data, + post_place_loc, post_place_fun, expected): + """ Modification of requirements in invalid way is exceptional. """ + + data = { + PROTOMAP_KEY: {ATAC_PROTOCOL_NAME: atac_pipe_name}, + PL_KEY: {atac_pipe_name: atacseq_piface_data} + } + + if init_place == "top-level": + data[PIPELINE_REQUIREMENTS_KEY] = init_reqs_data + elif init_place == "pipeline": + data[PL_KEY][atac_pipe_name][PIPELINE_REQUIREMENTS_KEY] = init_reqs_data + else: + raise ValueError("Unexpected reqs placement spec: {}".format(init_place)) + pi = _make_from_data(from_file, tmpdir.strpath, data) + pretest(pi, atac_pipe_name, init_reqs_data) -@pytest.mark.skip("not implemented") -def test_all_requirements_satisfied(): - pass + if post_place_loc == "top-level": + with ExpectContext(expected, post_place_fun) as try_bad_reqs_placement: + try_bad_reqs_placement(pi, new_bad_reqs_data) + elif post_place_loc == "pipeline": + with ExpectContext(expected, post_place_fun) as try_bad_reqs_placement: + try_bad_reqs_placement(pi, new_bad_reqs_data) + else: + raise ValueError("Unexpected reqs placement spec: {}".format(post_place_loc)) -@pytest.mark.skip("not implemented") -def test_mixed_requirement_satisfaction(): - pass +@pytest.mark.parametrize( + "reqs", [{}, ["ls", "date"], {"ls": "executable", "date": "executable"}]) +def test_top_level_requirements_do_not_literally_propagate( + reqs, from_file, tmpdir, atac_pipe_name, atacseq_piface_data): + """ Don't literally store universal requirements in each pipeline. 
""" + data = { + PROTOMAP_KEY: {ATAC_PROTOCOL_NAME: atac_pipe_name}, + PL_KEY: {atac_pipe_name: atacseq_piface_data}, + PIPELINE_REQUIREMENTS_KEY: reqs + } + pi = _make_from_data(from_file, tmpdir.strpath, data) + assert reqs == pi[PIPELINE_REQUIREMENTS_KEY] + assert all(map(lambda d: PIPELINE_REQUIREMENTS_KEY not in d, pi[PL_KEY].values())) + + +@pytest.mark.parametrize(["reqs", "expected"], [ + ("nonexec", ["nonexec"]), (["not-on-path", "ls"], ["not-on-path"]), + ({"nonexec": "executable", "$HOME": "folder"}, ["nonexec"])]) +def test_top_level_requirements_functionally_propagate( + reqs, from_file, tmpdir, atac_pipe_name, atacseq_piface_data, expected): + """ The universal requirements do functionally apply to each pipeline. """ + data = { + PROTOMAP_KEY: {ATAC_PROTOCOL_NAME: atac_pipe_name}, + PL_KEY: {atac_pipe_name: atacseq_piface_data}, + PIPELINE_REQUIREMENTS_KEY: reqs + } + pi = _make_from_data(from_file, tmpdir.strpath, data) + assert set(reqs.keys()) == set(pi[PIPELINE_REQUIREMENTS_KEY].keys()) + assert PIPELINE_REQUIREMENTS_KEY not in pi[PL_KEY][atac_pipe_name] + assert expected == pi.missing_requirements(atac_pipe_name) + assert not pi.validate(atac_pipe_name) @pytest.mark.skip("not implemented") -def test_no_requirements_satisfied(): +def test_pipeline_specific_requirements_remain_local(): + """ A single pipeline's requirements don't pollute others'. """ pass From 1ad94444b2cfa451110dd583bb2633a51bba64d6 Mon Sep 17 00:00:00 2001 From: Vince Date: Wed, 26 Jun 2019 15:53:54 -0400 Subject: [PATCH 08/34] handle path expansion; remove debug --- looper/pipeline_interface.py | 2 -- looper/pipereqs.py | 7 +++++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/looper/pipeline_interface.py b/looper/pipeline_interface.py index aee804bdb..415053710 100644 --- a/looper/pipeline_interface.py +++ b/looper/pipeline_interface.py @@ -645,8 +645,6 @@ def read_pipe_reqs(reqs_data): if isinstance(reqs_data, Mapping): newval, errors = OrderedDict(), {} for r, t in reqs_data.items(): - # DEBUG - print("TYPE: {}".format(t)) try: newval[r] = create_pipeline_requirement(r, typename=t) except ValueError: diff --git a/looper/pipereqs.py b/looper/pipereqs.py index da3664210..29badd4cb 100644 --- a/looper/pipereqs.py +++ b/looper/pipereqs.py @@ -1,7 +1,7 @@ """ Pipeline requirements declaration """ import os -from ubiquerg import is_command_callable +from ubiquerg import expandpath, is_command_callable __author__ = "Vince Reuter" __email__ = "vreuter@virginia.edu" @@ -32,9 +32,12 @@ def __hash__(self): def __repr__(self): return "{}: {}".format(type(self).__name__, self.req) + def _finalize_for_check(self): + return expandpath(self.req) + @property def satisfied(self): - return self.check(self.req) + return self.check(self._finalize_for_check()) class RequiredPath(PipelineRequirement): From 09a9e0780fdb9432873bb5a935af72cdc49d1c63 Mon Sep 17 00:00:00 2001 From: Vince Date: Wed, 26 Jun 2019 15:54:16 -0400 Subject: [PATCH 09/34] finish reqs tests --- .../test_PipelineInterface_requirements.py | 69 +++++++++++++++++-- 1 file changed, 62 insertions(+), 7 deletions(-) diff --git a/tests/models/pipeline_interface/test_PipelineInterface_requirements.py b/tests/models/pipeline_interface/test_PipelineInterface_requirements.py index 3c0adfbe0..82afcf10b 100644 --- a/tests/models/pipeline_interface/test_PipelineInterface_requirements.py +++ b/tests/models/pipeline_interface/test_PipelineInterface_requirements.py @@ -52,7 +52,7 @@ def _make_from_data(from_file, folder, data): 
@pytest.mark.parametrize(["observe", "expected"], [ (lambda pi, pk: pi.validate(pk), True), - (lambda pi, pk: pi.missing_requirements(pk), {}) + (lambda pi, pk: pi.missing_requirements(pk), []) ]) def test_no_requirements_successfully_validates( observe, expected, from_file, atac_pipe_name, atacseq_piface_data, tmpdir): @@ -74,7 +74,7 @@ def test_no_requirements_successfully_validates( @pytest.mark.parametrize(["observe", "expected"], [ (lambda pi, pk: pi.validate(pk), True), - (lambda pi, pk: pi.missing_requirements(pk), {}) + (lambda pi, pk: pi.missing_requirements(pk), []) ]) @pytest.mark.parametrize("reqs_data", [None, {}]) @pytest.mark.parametrize("placement", ["top-level", "pipeline"]) @@ -206,7 +206,7 @@ def test_top_level_requirements_do_not_literally_propagate( PIPELINE_REQUIREMENTS_KEY: reqs } pi = _make_from_data(from_file, tmpdir.strpath, data) - assert reqs == pi[PIPELINE_REQUIREMENTS_KEY] + assert_reqs_eq(reqs, pi[PIPELINE_REQUIREMENTS_KEY]) assert all(map(lambda d: PIPELINE_REQUIREMENTS_KEY not in d, pi[PL_KEY].values())) @@ -222,13 +222,68 @@ def test_top_level_requirements_functionally_propagate( PIPELINE_REQUIREMENTS_KEY: reqs } pi = _make_from_data(from_file, tmpdir.strpath, data) - assert set(reqs.keys()) == set(pi[PIPELINE_REQUIREMENTS_KEY].keys()) + print(pi[PIPELINE_REQUIREMENTS_KEY]) + assert_reqs_eq(reqs, pi[PIPELINE_REQUIREMENTS_KEY]) assert PIPELINE_REQUIREMENTS_KEY not in pi[PL_KEY][atac_pipe_name] assert expected == pi.missing_requirements(atac_pipe_name) assert not pi.validate(atac_pipe_name) -@pytest.mark.skip("not implemented") -def test_pipeline_specific_requirements_remain_local(): +@pytest.mark.parametrize( + ["atac_reqs", "other_reqs", "check_atac", "check_other"], + [(["ls"], ["badexec"], + lambda pi, pk: pi.validate(pk), lambda pi, pk: not pi.validate(pk)), + (["ls"], ["badexec"], + lambda pi, pk: [] == pi.missing_requirements(pk), + lambda pi, pk: ["badexec"] == pi.missing_requirements(pk)), + ({"ls": "folder"}, {"ls": "executable"}, + lambda pi, pk: not pi.validate(pk), lambda pi, pk: pi.validate(pk)), + ({"ls": "folder"}, {"ls": "executable"}, + lambda pi, pk: ["ls"] == pi.missing_requirements(pk), + lambda pi, pk: [] == pi.missing_requirements(pk)), + (None, {"ls": "file"}, + lambda pi, pk: pi.validate(pk), + lambda pi, pk: ["ls"] == pi.missing_requirements(pk))] +) +def test_pipeline_specific_requirements_remain_local( + atac_pipe_name, atacseq_piface_data, tmpdir, from_file, + atac_reqs, other_reqs, check_atac, check_other): """ A single pipeline's requirements don't pollute others'. 
""" - pass + other_name = "testpipe.sh" + data = { + PROTOMAP_KEY: {ATAC_PROTOCOL_NAME: atac_pipe_name}, + PL_KEY: { + atac_pipe_name: atacseq_piface_data, + other_name: { + k: "testpipe" if k == "name" else v + for k, v in atacseq_piface_data.items() + } + } + } + if atac_reqs is not None: + data[PL_KEY][atac_pipe_name][PIPELINE_REQUIREMENTS_KEY] = atac_reqs + if other_reqs is not None: + data[PL_KEY][other_name][PIPELINE_REQUIREMENTS_KEY] = other_reqs + def assert_reqs_other(iface): + assert PIPELINE_REQUIREMENTS_KEY in iface[PL_KEY][other_name] + else: + def assert_reqs_other(iface): + assert PIPELINE_REQUIREMENTS_KEY not in iface[PL_KEY][other_name] + pi = _make_from_data(from_file, tmpdir.strpath, data) + assert PIPELINE_REQUIREMENTS_KEY not in pi + assert_reqs_other(pi) + check_atac(pi, atac_pipe_name) + check_other(pi, other_name) + + +def assert_reqs_eq(exp, obs): + from collections import Iterable, Sized + if isinstance(obs, PathExAttMap): + obs = set(obs) + if isinstance(exp, str): + exp = {exp} + if isinstance(exp, Iterable) and isinstance(exp, Sized): + assert len(exp) == len(obs) and set(exp) == set(obs) + else: + raise TypeError("Need sized iterables to compare; got {} and {}". + format(type(exp).__name__, type(obs).__name__)) From bd76609b9d1323cde0d1f7f3a0428f9dc645e5a3 Mon Sep 17 00:00:00 2001 From: Vince Date: Thu, 27 Jun 2019 00:27:12 -0400 Subject: [PATCH 10/34] integration testing project and pipe interface --- looper/pipeline_interface.py | 6 +- looper/pipereqs.py | 82 ++++++++++-- looper/project_piface_group.py | 4 +- tests/data/methyl_piface.yaml | 82 ++++++++++++ tests/data/src/rrbs.py | 0 tests/data/src/wgbs.py | 0 tests/helpers.py | 46 +++++++ ..._project_pipeline_interface_interaction.py | 119 ++++++++++++++++++ .../test_PipelineInterface_requirements.py | 43 ++----- 9 files changed, 338 insertions(+), 44 deletions(-) create mode 100644 tests/data/methyl_piface.yaml create mode 100644 tests/data/src/rrbs.py create mode 100644 tests/data/src/wgbs.py create mode 100644 tests/integration/test_project_pipeline_interface_interaction.py diff --git a/looper/pipeline_interface.py b/looper/pipeline_interface.py index 415053710..26b7d8d72 100644 --- a/looper/pipeline_interface.py +++ b/looper/pipeline_interface.py @@ -80,7 +80,10 @@ def __init__(self, config): "must contain key '{}'".format(PROTOMAP_KEY) for k, v in config.items(): - assert k not in self, "Interface key already mapped: {}".format(k) + if k in ["pipe_iface_file", "source"]: + continue + assert k not in self, \ + "Interface key already mapped: {} ({})".format(k, self[k]) self[k] = v def __repr__(self): @@ -254,6 +257,7 @@ def finalize_pipeline_key_and_paths(self, pipeline_key): full_pipe_path = \ self.get_attribute(strict_pipeline_key, "path") + if full_pipe_path: script_path_only = os.path.expanduser( os.path.expandvars(full_pipe_path[0].strip())) diff --git a/looper/pipereqs.py b/looper/pipereqs.py index 29badd4cb..8fc1310ef 100644 --- a/looper/pipereqs.py +++ b/looper/pipereqs.py @@ -10,9 +10,21 @@ "RequiredExecutable", "RequiredPath"] +KEY_EXEC_REQ = "executable" +KEY_FILE_REQ = "file" +KEY_FOLDER_REQ = "folder" + + class PipelineRequirement(object): + """ Requirement that must be satisfied for a pipeline to run. """ def __init__(self, req, check): + """ + Create the requirement by specifying name/path and validation function. 
+
+        :param str req: the requirement to eventually verify
+        :param function(str) check: how to perform the verification
+        """
         def _checkattr(trait_attr, trait_name):
             if not hasattr(check, trait_attr):
                 raise TypeError("Validator isn't {} ({})".
@@ -23,50 +35,96 @@ def _checkattr(trait_attr, trait_name):
         self.check = check
 
     def __eq__(self, other):
+        """ Equality treats each instance as a product type. """
         return type(self) is type(other) and \
-            self.req == other.req and self.check == other.check
+               self.req == other.req and self.check == other.check
 
     def __hash__(self):
+        """ Hash as for product type. """
         return hash((self.req, self.check))
 
     def __repr__(self):
+        """ Render type and requirement value. """
         return "{}: {}".format(type(self).__name__, self.req)
 
     def _finalize_for_check(self):
+        """ Expand any user or env vars in requirement. """
         return expandpath(self.req)
 
     @property
     def satisfied(self):
+        """
+        Determine whether the requirement is satisfied according to
+        the validation function.
+
+        :return bool: whether the requirement is satisfied according
+            to the validation function
+        """
         return self.check(self._finalize_for_check())
 
 
 class RequiredPath(PipelineRequirement):
+    """ A single file or folder requirement """
 
     def __init__(self, p, check=None, folder=None):
+        """
+        Create the path requirement by specifying the path and how to verify.
+
+        :param str p: the path on which to base the requirement
+        :param function(str) -> bool check: how to verify the requirement;
+            required if and only if no folder flag is given
+        :param bool folder: whether the path is a folder (not file);
+            required if and only if no validation function is provided
+        :raise ValueError: if both or neither of the validation function
+            and the folder flag are provided
+        :raise TypeError: if no validation function is provided, and the
+            argument to the folder parameter is not a Boolean
+        """
+        if (check is not None and folder is not None) or \
+                (check is None and folder is None):
+            raise ValueError(
+                "Either validation function or folder flag--but not both--must "
+                "be provided")
         if check is None:
-            if folder in [False, True]:
-                check = os.path.isdir if folder else os.path.isfile
-            else:
-                raise ValueError(
-                    "If no validation function is provided, folder argument "
-                    "must be boolean; got {} ({})".format(
-                        folder, type(folder).__name__))
+            if type(folder) is not bool:
+                raise TypeError("Folder flag must be boolean; got {}".
+                                format(type(folder).__name__))
+            check = os.path.isdir if folder else os.path.isfile
         super(RequiredPath, self).__init__(p, check)
 
 
 class RequiredExecutable(PipelineRequirement):
+    """ A requirement that should be executable as a command """
 
     def __init__(self, cmd, check=None):
+        """
+        Create the requirement by specifying the command and validation.
+
+        :param str cmd: the command requirement to validate as executable
+        :param function(str) -> bool check: how to verify that the command
+            requirement is in fact satisfied by executability; defaults to
+            the callability function in ubiquerg
+        """
         super(RequiredExecutable, self).__init__(cmd, check or is_command_callable)
 
 
 def create_pipeline_requirement(req, typename, **kwargs):
-    typename = typename or "executable"
-    if typename == "executable":
+    """
+    Create a single requirement instance for a pipeline.
+
+    :param str req: name/path that specifies the requirement, e.g. 
samtools + :param str typename: keyword indicating the kind of requirement to be + created + :param dict kwargs: variable keyword arguments to the RequiredExecutable + constructor + :return looper.pipereqs.PipelineRequirement: requirement as named, and + typed according to the keyword provided + :raise ValueError: if the given typename is unrecognized, raise ValueError. + """ + typename = typename or KEY_EXEC_REQ + if typename == KEY_EXEC_REQ: return RequiredExecutable(req, **kwargs) - if typename == "file": + if typename == KEY_FILE_REQ: return RequiredPath(req, folder=False) - elif typename == "folder": + elif typename == KEY_FOLDER_REQ: return RequiredPath(req, folder=True) else: raise ValueError("Invalid requirement typename: '{}'".format(typename)) diff --git a/looper/project_piface_group.py b/looper/project_piface_group.py index cbd06c95b..f2792c059 100644 --- a/looper/project_piface_group.py +++ b/looper/project_piface_group.py @@ -94,7 +94,9 @@ def update(self, piface): PipelineInterface from the argument if the argument itself is not already a PipelineInterface. """ - if isinstance(piface, (str, Mapping)): + if isinstance(piface, PipelineInterface): + _LOGGER.debug("Interface group argument is already an interface.") + elif isinstance(piface, (str, Mapping)): piface = PipelineInterface(piface) elif not isinstance(piface, PipelineInterface): raise TypeError( diff --git a/tests/data/methyl_piface.yaml b/tests/data/methyl_piface.yaml new file mode 100644 index 000000000..af8623e7f --- /dev/null +++ b/tests/data/methyl_piface.yaml @@ -0,0 +1,82 @@ +protocol_mapping: + RRBS: rrbs + WGBS: wgbs + BS: rrbs;wgbs + EG: wgbs + +pipelines: + wgbs: + name: WGBS # Name used by pypiper so looper can find the logs + path: src/wgbs.py + looper_args: True + required_input_files: [data_source] + ngs_input_files: [data_source] + arguments: + "--sample-name": sample_name + "--genome": genome + "--input": data_source + "--single-or-paired": read_type + resources: + default: + file_size: "0" + cores: "4" + mem: "4000" + time: "0-02:00:00" + small: + file_size: "0.3" + cores: "8" + mem: "32000" + time: "2-00:00:00" + medium: + file_size: "3" + cores: "16" + mem: "64000" + time: "4-00:00:00" + high: + file_size: "10" + cores: "24" + mem: "64000" + time: "7-00:00:00" + huge: + file_size: "30" + cores: "16" + mem: "128000" + time: "4-00:00:00" + partition: "largemem" + + rrbs: + name: RRBS + path: src/rrbs.py + looper_args: True + required_input_files: [data_source] + all_input_files: [data_source, read1, read2] + ngs_input_files: [data_source, read1, read2] + arguments: + "--sample-name": sample_name + "--genome": genome + "--input": data_source + "--single-or-paired": read_type + optional_arguments: + "--input2": read2 + "--dark-bases": dark_bases + resources: + default: + file_size: "0" + cores: "4" + mem: "4000" + time: "0-02:00:00" + small: + file_size: "0.3" + cores: "4" + mem: "16000" + time: "1-00:00:00" + medium: + file_size: "2" + cores: "4" + mem: "24000" + time: "1-12:00:00" + high: + file_size: "4" + cores: "6" + mem: "24000" + time: "2-00:00:00" diff --git a/tests/data/src/rrbs.py b/tests/data/src/rrbs.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/data/src/wgbs.py b/tests/data/src/wgbs.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/helpers.py b/tests/helpers.py index d9b3adccb..a6fb031c9 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -24,6 +24,26 @@ def assert_entirely_equal(observed, expected): assert (observed == expected).all() +def 
build_pipeline_iface(from_file, folder, data):
+    """
+    Homogenize PipelineInterface build over both in-memory and on-disk data.
+
+    :param bool from_file: whether to route the construction through disk
+    :param str folder: folder in which to create config file if via disk
+    :param Mapping data: raw PI config data
+    :return looper.PipelineInterface: the new PipelineInterface instance
+    """
+    import os, yaml
+    from looper import PipelineInterface
+    assert type(from_file) is bool
+    if from_file:
+        fp = os.path.join(folder, "pipeline_interface.yaml")
+        with open(fp, 'w') as f:
+            yaml.dump(data, f)
+        data = fp
+    return PipelineInterface(data)
+
+
 def named_param(argnames, argvalues):
     """ Parameterize a test case and automatically name/label by value
@@ -81,3 +101,29 @@ def go(m, acc):
             acc[k] = go(v, {}) if isinstance(v, Mapping) else v
         return acc
     return go(data, {})
+
+
+class ReqsSpec(object):
+    """ Basically a namedtuple but with type validation. """
+
+    def __init__(self, reqs, exp_valid, exp_unmet):
+        """
+        This is used for PipelineInterface requirements specification testing.
+
+        :param str | Iterable[str] | Mapping[str, str] reqs: pipeline
+            requirements specification, either for entire interface or for
+            a specific pipeline
+        :param Iterable[str] exp_valid: expected satisfied requirements
+        :param Iterable[str] exp_unmet: expected unmet requirements
+        """
+        def proc_exp(exp_val):
+            types = (tuple, list, set)
+            if not isinstance(exp_val, types):
+                raise TypeError(
+                    "Illegal type of expected value ({}); must be one of: {}".
+                    format(type(exp_val).__name__,
+                           ", ".join(map(lambda t: t.__name__, types))))
+            return set(exp_val)
+        self.exp_valid = proc_exp(exp_valid or [])
+        self.exp_unmet = proc_exp(exp_unmet or [])
+        self.reqs = reqs
diff --git a/tests/integration/test_project_pipeline_interface_interaction.py b/tests/integration/test_project_pipeline_interface_interaction.py
new file mode 100644
index 000000000..e2d2b6947
--- /dev/null
+++ b/tests/integration/test_project_pipeline_interface_interaction.py
@@ -0,0 +1,119 @@
+""" Tests regarding interaction between Project and PipelineInterface """
+
+from copy import deepcopy
+import itertools
+import mock
+import os
+import pytest
+import yaml
+from peppy import CONSTANTS_DECLARATION, DATA_SOURCES_SECTION, \
+    DERIVATIONS_DECLARATION, OUTDIR_KEY, SAMPLE_ANNOTATIONS_KEY
+from looper import Project
+from looper.const import PIPELINE_REQUIREMENTS_KEY
+from looper.pipeline_interface import PL_KEY, PROTOMAP_KEY
+from looper.pipereqs import KEY_EXEC_REQ, KEY_FILE_REQ, KEY_FOLDER_REQ
+from looper.project_piface_group import ProjectPifaceGroup
+import tests
+from tests.helpers import build_pipeline_iface
+from ubiquerg import powerset
+
+__author__ = "Vince Reuter"
+__email__ = "vreuter@virginia.edu"
+
+
+GOOD_EXEC_REQS_DATA = [(r, KEY_EXEC_REQ) for r in ["ls", "date"]]
+GOOD_PATH_REQS_DATA = [("$HOME", KEY_FOLDER_REQ), (__file__, KEY_FILE_REQ)]
+GOOD_REQS_MAPS = [dict(c) for c in powerset(GOOD_PATH_REQS_DATA + GOOD_EXEC_REQS_DATA, nonempty=True)]
+GOOD_REQS_LISTS = [list(c) for c in powerset([r for r, _ in GOOD_EXEC_REQS_DATA], nonempty=True)]
+
+BAD_EXEC_REQS_DATA = [(r, KEY_EXEC_REQ) for r in [__file__, "$HOME"]]
+BAD_PATH_REQS_DATA = [("not-a-file", KEY_FILE_REQ), ("notdir", KEY_FOLDER_REQ)]
+BAD_REQS_MAPS = list(map(dict, powerset(BAD_EXEC_REQS_DATA + BAD_PATH_REQS_DATA, nonempty=True)))
+BAD_REQS_LISTS = list(map(list, powerset([r for r, _ in BAD_PATH_REQS_DATA], nonempty=True)))
+
+ANNS_FILE_NAME = "anns.csv"
+DATA_FOLDER_PATH = 
os.path.join(os.path.dirname(tests.__file__), "data") +INTERFACE_FILEPATH = os.path.join(DATA_FOLDER_PATH, "methyl_piface.yaml") + +""" +{source_key}: + src1: "{{basedir}}/data/{{sample_name}}.txt" + src2: "{{basedir}}/data/{{sample_name}}-bamfile.bam" +""" + +PROJECT_CONFIG_LINES = """metadata: + {tab_key}: {anns_file} + {outkey}: test + +{const_key}: + genome: mm10 +""".format(outkey=OUTDIR_KEY, tab_key=SAMPLE_ANNOTATIONS_KEY, + anns_file=ANNS_FILE_NAME, derivations_key=DERIVATIONS_DECLARATION, + source_key=DATA_SOURCES_SECTION, const_key=CONSTANTS_DECLARATION).splitlines(True) + +BSSEQ_PROTO = "BS" + +SAMPLE_ANNOTATION_LINES = """sample_name,protocol,file,file2 +a,{p},src1,src2 +b,{p},src1,src2 +c,{p},src1,src2 +d,{p},src1,src2 +""".format(p=BSSEQ_PROTO).splitlines(True) + + +@pytest.fixture +def methyl_config(): + """ Return parse of on-disk PipelineInterface file. """ + with open(INTERFACE_FILEPATH, 'r') as f: + return yaml.load(f, yaml.SafeLoader) + + +@pytest.fixture +def project(tmpdir): + srcdir = os.path.join(tmpdir.strpath, "src") + os.makedirs(srcdir) + with open(os.path.join(srcdir, "wgbs.py"), 'w'), \ + open(os.path.join(srcdir, "rrbs.py"), 'w'): + pass + conf_file = tmpdir.join("prjcfg.yaml").strpath + with open(conf_file, 'w') as f: + for l in PROJECT_CONFIG_LINES: + f.write(l) + with open(tmpdir.join(ANNS_FILE_NAME).strpath, 'w') as f: + for l in SAMPLE_ANNOTATION_LINES: + f.write(l) + return Project(conf_file) + + +@pytest.mark.parametrize(["good_reqs", "bad_reqs"], itertools.product( + GOOD_REQS_LISTS + GOOD_REQS_MAPS, BAD_REQS_LISTS + BAD_REQS_MAPS)) +@pytest.mark.parametrize( + ["good_proto", "bad_proto"], [("WGBS", "RRBS"), ("RRBS", "WGBS")]) +def test_submission_bundle_construction( + tmpdir, project, methyl_config, + good_reqs, bad_reqs, good_proto, bad_proto): + print("TMPDIR CONTENTS: {}".format(os.listdir(tmpdir.strpath))) + good_pipe = methyl_config[PROTOMAP_KEY][good_proto] + bad_pipe = methyl_config[PROTOMAP_KEY][bad_proto] + data = deepcopy(methyl_config) + print("DATA: {}".format(data)) + data[PL_KEY][good_pipe][PIPELINE_REQUIREMENTS_KEY] = good_reqs + data[PL_KEY][bad_pipe][PIPELINE_REQUIREMENTS_KEY] = bad_reqs + iface_group = ProjectPifaceGroup() + pi = build_pipeline_iface(from_file=True, folder=tmpdir.strpath, data=data) + iface_group.update(pi) + project.interfaces = iface_group + #with mock.patch.object(project, "interfaces", return_value=iface_group): + # obs_good = project.build_submission_bundles(good_proto) + # obs_bad = project.build_submission_bundles(bad_proto) + obs_good = project.build_submission_bundles(good_proto) + obs_bad = project.build_submission_bundles(bad_proto) + assert 1 == len(obs_good) + assert pi == obs_good[0][0] + assert [] == obs_bad + + +@pytest.mark.skip("not implemented") +@pytest.mark.parametrize(["top_reqs", "check"], []) +def test_submission_bundle_construction_top_level_reqs(top_reqs, check): + pass diff --git a/tests/models/pipeline_interface/test_PipelineInterface_requirements.py b/tests/models/pipeline_interface/test_PipelineInterface_requirements.py index 82afcf10b..d0585175e 100644 --- a/tests/models/pipeline_interface/test_PipelineInterface_requirements.py +++ b/tests/models/pipeline_interface/test_PipelineInterface_requirements.py @@ -1,13 +1,14 @@ """ Tests for declaration of requirements in pipeline interface """ -import os from attmap import PathExAttMap from looper import PipelineInterface from looper.exceptions import PipelineInterfaceRequirementsError from looper.pipeline_interface import \ PL_KEY, 
PROTOMAP_KEY, PIPELINE_REQUIREMENTS_KEY +from looper.pipereqs import KEY_EXEC_REQ, KEY_FILE_REQ, KEY_FOLDER_REQ import pytest import yaml +from tests.helpers import build_pipeline_iface from tests.models.pipeline_interface.conftest import \ ATAC_PIPE_NAME, ATAC_PROTOCOL_NAME from veracitools import ExpectContext @@ -32,24 +33,6 @@ def randn(): return random.randint(-sys.maxsize, sys.maxsize) -def _make_from_data(from_file, folder, data): - """ - Homogenize PipelineInterface build over both in-memory and on-disk data. - - :param bool from_file: whether to route the construction through disk - :param str folder: folder in which to create config file if via disk - :param Mapping data: raw PI config data - :return looper.PipelineInterface: the new PipelineInterface instance - """ - assert type(from_file) is bool - if from_file: - fp = os.path.join(folder, "pipeline_interface.yaml") - with open(fp, 'w') as f: - yaml.dump(data, f) - data = fp - return PipelineInterface(data) - - @pytest.mark.parametrize(["observe", "expected"], [ (lambda pi, pk: pi.validate(pk), True), (lambda pi, pk: pi.missing_requirements(pk), []) @@ -62,7 +45,7 @@ def test_no_requirements_successfully_validates( assert [atac_pipe_name] == list(atacseq_piface_data.keys()) assert ATAC_PIPE_NAME == atacseq_piface_data[atac_pipe_name]["name"] - pi = _make_from_data(from_file, tmpdir.strpath, { + pi = build_pipeline_iface(from_file, tmpdir.strpath, { PROTOMAP_KEY: {ATAC_PROTOCOL_NAME: atac_pipe_name}, PL_KEY: {atac_pipe_name: atacseq_piface_data} }) @@ -99,7 +82,7 @@ def test_empty_requirements_successfully_validates( else: raise ValueError("Unexpected reqs placement spec: {}".format(placement)) - pi = _make_from_data(from_file, tmpdir.strpath, data) + pi = build_pipeline_iface(from_file, tmpdir.strpath, data) assert expected == observe(pi, atac_pipe_name) @@ -182,7 +165,7 @@ def test_bad_reqs_post_construction( else: raise ValueError("Unexpected reqs placement spec: {}".format(init_place)) - pi = _make_from_data(from_file, tmpdir.strpath, data) + pi = build_pipeline_iface(from_file, tmpdir.strpath, data) pretest(pi, atac_pipe_name, init_reqs_data) if post_place_loc == "top-level": @@ -196,7 +179,7 @@ def test_bad_reqs_post_construction( @pytest.mark.parametrize( - "reqs", [{}, ["ls", "date"], {"ls": "executable", "date": "executable"}]) + "reqs", [{}, ["ls", "date"], {"ls": KEY_EXEC_REQ, "date": KEY_EXEC_REQ}]) def test_top_level_requirements_do_not_literally_propagate( reqs, from_file, tmpdir, atac_pipe_name, atacseq_piface_data): """ Don't literally store universal requirements in each pipeline. """ @@ -205,14 +188,14 @@ def test_top_level_requirements_do_not_literally_propagate( PL_KEY: {atac_pipe_name: atacseq_piface_data}, PIPELINE_REQUIREMENTS_KEY: reqs } - pi = _make_from_data(from_file, tmpdir.strpath, data) + pi = build_pipeline_iface(from_file, tmpdir.strpath, data) assert_reqs_eq(reqs, pi[PIPELINE_REQUIREMENTS_KEY]) assert all(map(lambda d: PIPELINE_REQUIREMENTS_KEY not in d, pi[PL_KEY].values())) @pytest.mark.parametrize(["reqs", "expected"], [ ("nonexec", ["nonexec"]), (["not-on-path", "ls"], ["not-on-path"]), - ({"nonexec": "executable", "$HOME": "folder"}, ["nonexec"])]) + ({"nonexec": KEY_EXEC_REQ, "$HOME": KEY_FOLDER_REQ}, ["nonexec"])]) def test_top_level_requirements_functionally_propagate( reqs, from_file, tmpdir, atac_pipe_name, atacseq_piface_data, expected): """ The universal requirements do functionally apply to each pipeline. 
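    Concretely, the contract is roughly (a sketch, not additional test
    code; "nonexec" is assumed absent from PATH, per the parametrization):

        pi = build_pipeline_iface(from_file, tmpdir.strpath, data)
        # Top-level reqs are not literally copied into the pipeline...
        assert PIPELINE_REQUIREMENTS_KEY not in pi[PL_KEY][atac_pipe_name]
        # ...but they are still enforced when that pipeline is checked.
        assert "nonexec" in pi.missing_requirements(atac_pipe_name)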
""" @@ -221,7 +204,7 @@ def test_top_level_requirements_functionally_propagate( PL_KEY: {atac_pipe_name: atacseq_piface_data}, PIPELINE_REQUIREMENTS_KEY: reqs } - pi = _make_from_data(from_file, tmpdir.strpath, data) + pi = build_pipeline_iface(from_file, tmpdir.strpath, data) print(pi[PIPELINE_REQUIREMENTS_KEY]) assert_reqs_eq(reqs, pi[PIPELINE_REQUIREMENTS_KEY]) assert PIPELINE_REQUIREMENTS_KEY not in pi[PL_KEY][atac_pipe_name] @@ -236,12 +219,12 @@ def test_top_level_requirements_functionally_propagate( (["ls"], ["badexec"], lambda pi, pk: [] == pi.missing_requirements(pk), lambda pi, pk: ["badexec"] == pi.missing_requirements(pk)), - ({"ls": "folder"}, {"ls": "executable"}, + ({"ls": KEY_FOLDER_REQ}, {"ls": KEY_EXEC_REQ}, lambda pi, pk: not pi.validate(pk), lambda pi, pk: pi.validate(pk)), - ({"ls": "folder"}, {"ls": "executable"}, + ({"ls": KEY_FOLDER_REQ}, {"ls": KEY_EXEC_REQ}, lambda pi, pk: ["ls"] == pi.missing_requirements(pk), lambda pi, pk: [] == pi.missing_requirements(pk)), - (None, {"ls": "file"}, + (None, {"ls": KEY_FILE_REQ}, lambda pi, pk: pi.validate(pk), lambda pi, pk: ["ls"] == pi.missing_requirements(pk))] ) @@ -269,7 +252,7 @@ def assert_reqs_other(iface): else: def assert_reqs_other(iface): assert PIPELINE_REQUIREMENTS_KEY not in iface[PL_KEY][other_name] - pi = _make_from_data(from_file, tmpdir.strpath, data) + pi = build_pipeline_iface(from_file, tmpdir.strpath, data) assert PIPELINE_REQUIREMENTS_KEY not in pi assert_reqs_other(pi) check_atac(pi, atac_pipe_name) From fa1f3ede5eafa75c653881a5881ce6616ac8438c Mon Sep 17 00:00:00 2001 From: Michal Stolarczyk Date: Fri, 28 Jun 2019 10:10:15 -0400 Subject: [PATCH 11/34] simplify navbar paths contextualization --- looper/html_reports.py | 78 +++++++++++++++--------------------------- 1 file changed, 27 insertions(+), 51 deletions(-) diff --git a/looper/html_reports.py b/looper/html_reports.py index 42e5a0f73..89713df3f 100644 --- a/looper/html_reports.py +++ b/looper/html_reports.py @@ -33,12 +33,12 @@ def __init__(self, prj): def __call__(self, objs, stats, columns): """ Do the work of the subcommand/program. """ - # Generate HTML report - index_html_path = self.create_index_html(objs, stats, columns, - navbar=self.create_navbar(self.create_navbar_links( - prj=self.prj, objs=objs, stats=stats, - wd=self.prj.metadata.output_dir)), + navbar = self.create_navbar( + self.create_navbar_links(objs=objs, stats=stats, wd=self.prj.metadata.output_dir, context=["reports"])) + navbar_reports = self.create_navbar( + self.create_navbar_links(objs=objs, stats=stats, wd=self.reports_dir, context=["reports"])) + index_html_path = self.create_index_html(objs, stats, columns, navbar=navbar, navbar_reports=navbar_reports, footer=self.create_footer()) return index_html_path @@ -117,28 +117,28 @@ def create_footer(self): """ return render_jinja_template("footer.html", self.j_env, dict(version=v)) - def create_navbar_links(self, prj, objs, stats, wd=None, context=None): + def create_navbar_links(self, objs, stats, wd=None, context=None): """ Return a string containing the navbar prebuilt html. 
Generates links to each page relative to the directory of interest (wd arg)
        or uses the provided context to create the paths (context arg).

-        :param looper.Project prj: a project the navbar links should be created for
         :param pandas.DataFrame objs: project results dataframe containing
             object data
         :param list[dict] stats: a summary file of pipeline statistics for each
             analyzed sample
-        :param path wd: the working directory of the current HTML page
-            being generated, enables navbar links relative to page
-        :param list[str] context: the context the links will be used in
+        :param path wd: the working directory of the current HTML page being generated, enables navbar links
+            relative to page
+        :param list[str] context: the context the links will be used in.
+            The sequence of directories to be prepended to the HTML file in the resulting navbar
         """
         if wd is None and context is None:
             raise ValueError("Either 'wd' (path the links should be relative to) or 'context'"
                              " (the context for the links) has to be provided.")
-        status_relpath = _make_relpath(prj=prj, file_name="status.html", dir=wd, context=context)
-        objects_relpath = _make_relpath(prj=prj, file_name="objects.html", dir=wd, context=context)
-        samples_relpath = _make_relpath(prj=prj, file_name="samples.html", dir=wd, context=context)
+        status_relpath = _make_relpath(file_name="status.html", wd=wd, context=context)
+        objects_relpath = _make_relpath(file_name="objects.html", wd=wd, context=context)
+        samples_relpath = _make_relpath(file_name="samples.html", wd=wd, context=context)
         dropdown_keys_objects = None
         dropdown_relpaths_objects = None
         dropdown_relpaths_samples = None
@@ -147,13 +147,13 @@ def create_navbar_links(self, prj, objs, stats, wd=None, context=None):
             # If the number of objects is 20 or less, use a drop-down menu
             if len(objs['key'].drop_duplicates()) <= 20:
                 dropdown_relpaths_objects, dropdown_keys_objects = \
-                    _get_navbar_dropdown_data_objects(prj=prj, objs=objs, wd=wd, context=context)
+                    _get_navbar_dropdown_data_objects(objs=objs, wd=wd, context=context)
             else:
                 dropdown_relpaths_objects = objects_relpath
         if stats:
             if len(stats) <= 20:
                 dropdown_relpaths_samples, sample_names = \
-                    _get_navbar_dropdown_data_samples(prj=prj, stats=stats, wd=wd, context=context)
+                    _get_navbar_dropdown_data_samples(stats=stats, wd=wd, context=context)
             else:
                 # Create a menu link to the samples parent page
                 dropdown_relpaths_samples = samples_relpath
@@ -372,7 +372,6 @@ def create_sample_html(self, objs, sample_name, sample_stats, navbar, footer):
         save_html(html_page, render_jinja_template("sample.html", self.j_env, template_vars))
         return sample_page_relpath
 
-
     def create_status_html(self, status_table, navbar, footer):
         """
         Generates a page listing all the samples, their run status, their
@@ -645,53 +644,38 @@ def _get_relpath_to_file(file_name, sample_name, location, relative_to):
     return rel_file_path
 
 
-def _make_relpath(prj, file_name, dir, context):
+def _make_relpath(file_name, wd, context=None):
     """
     Create a path relative to the context. This function adds flexibility
     to navbar link creation, so the links can be used outside of the native
     looper summary pages.
 
-    :param str path: the path to make relative
-    :param str dir: the dir the path should be relative to
-    :param list[str] context: names of the directories that create the context for the path
+    :param str file_name: the path to make relative
+    :param str wd: the dir the path should be relative to
+    :param list[str] context: the context the links will be used in. 
+ The sequence of directories to be prepended to the HTML file in the resulting navbar :return str: relative path """ - rep_dir = os.path.basename(get_reports_dir(prj)) if context is not None: - full_context = ["summary", rep_dir] - caravel_mount_point = [item for item in full_context if item not in context] - caravel_mount_point.append(file_name) - relpath = os.path.join(*caravel_mount_point) - else: - relpath = os.path.relpath(file_name, dir) + mount_point = context + [file_name] + file_name = os.path.join(*mount_point) + relpath = os.path.relpath(file_name, wd) return relpath -def _get_navbar_dropdown_data_objects(prj, objs, wd, context): +def _get_navbar_dropdown_data_objects(objs, wd, context): if objs is None: return None, None - rep_dir_path = get_reports_dir(prj) - rep_dir = os.path.basename(rep_dir_path) relpaths = [] df_keys = objs['key'].drop_duplicates().sort_values() for key in df_keys: page_name = (key + ".html").replace(' ', '_').lower() - page_path = os.path.join(rep_dir_path, page_name) - if context is not None: - full_context = ["summary", rep_dir] - caravel_mount_point = [item for item in full_context if item not in context] - caravel_mount_point.append(page_name) - relpath = os.path.join(*caravel_mount_point) - else: - relpath = os.path.relpath(page_path, wd) - relpaths.append(relpath) + relpaths.append(_make_relpath(page_name, wd, context)) return relpaths, df_keys -def _get_navbar_dropdown_data_samples(prj, stats, wd, context): +def _get_navbar_dropdown_data_samples(stats, wd, context): if stats is None: return None, None - rep_dir_path = get_reports_dir(prj) - rep_dir = os.path.basename(rep_dir_path) relpaths = [] sample_names = [] for sample in stats: @@ -699,15 +683,7 @@ def _get_navbar_dropdown_data_samples(prj, stats, wd, context): if entry == "sample_name": sample_name = str(val) page_name = (sample_name + ".html").replace(' ', '_').lower() - page_path = os.path.join(rep_dir_path, page_name) - if context is not None: - full_context = ["summary", rep_dir] - caravel_mount_point = [item for item in full_context if item not in context] - caravel_mount_point.append(page_name) - relpath = os.path.join(*caravel_mount_point) - else: - relpath = os.path.relpath(page_path, wd) - relpaths.append(relpath) + relpaths.append(_make_relpath(page_name, wd, context)) sample_names.append(sample_name) break else: From 10d5a45101290d7e06849dbc6f0e95b7c06aeb78 Mon Sep 17 00:00:00 2001 From: Michal Stolarczyk Date: Mon, 1 Jul 2019 12:01:47 -0400 Subject: [PATCH 12/34] use DataTables for status table display, closes #203 --- looper/jinja_templates/head.html | 4 ++++ looper/jinja_templates/status_table.html | 9 +++++++-- looper/jinja_templates/status_table_no_links.html | 9 +++++++-- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/looper/jinja_templates/head.html b/looper/jinja_templates/head.html index a74c034ef..0ac8a0f05 100644 --- a/looper/jinja_templates/head.html +++ b/looper/jinja_templates/head.html @@ -20,6 +20,10 @@ + + + +