Commit
fix: wip htcondor
tcjennings committed Jan 26, 2025
1 parent 55e462e commit 9cd9456
Showing 5 changed files with 126 additions and 34 deletions.
9 changes: 1 addition & 8 deletions examples/templates/example_bps_core_script_template.yaml
@@ -1,11 +1,4 @@
text: "#!/usr/bin/env -S -i CM_CONFIGS=\"${CM_CONFIGS}\" HOME=\"${HOME}\" bash\n
# The shebang lines above are needed b/c setup lsst_distrib is putting\n
# the lsst python _after_ the virtual env python in the PATH, which\n
# is causing errors\n
# This is needed to define butler aliases\n
export DAF_BUTLER_REPOSITORY_INDEX=/sdf/group/rubin/shared/data-repos.yaml\n
text: "#!/usr/bin/env bash\n
# setup LSST env.\n
export LSST_VERSION='{lsst_version}'\n
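With the `env -S -i` shebang workaround and the hard-coded DAF_BUTLER_REPOSITORY_INDEX export gone, the template reduces to a plain bash header that only exports the LSST version. A minimal rendering sketch, assuming the YAML `text` value is filled in via Python `str.format` with an `lsst_version` keyword (the actual rendering call is not shown in this diff):

# Illustration only: the template text is from the diff above; the format call
# and the lsst_version value are assumptions.
template_text = (
    "#!/usr/bin/env bash\n"
    "# setup LSST env.\n"
    "export LSST_VERSION='{lsst_version}'\n"
)
print(template_text.format(lsst_version="w_latest"))
# #!/usr/bin/env bash
# # setup LSST env.
# export LSST_VERSION='w_latest'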
53 changes: 45 additions & 8 deletions src/lsst/cmservice/common/htcondor.py
@@ -1,5 +1,6 @@
"""Utility functions for working with htcondor jobs"""

from collections.abc import Mapping
from typing import Any

from anyio import Path, open_process
@@ -49,6 +50,8 @@ async def write_htcondor_script(
Path to the log wrapper log
"""
options = dict(
initialdir=config.htcondor.working_directory,
batch_name=config.htcondor.batch_name,
should_transfer_files="Yes",
when_to_transfer_output="ON_EXIT",
get_env=True,
@@ -58,13 +61,6 @@
)
options.update(**kwargs)

if config.htcondor.alias_path is not None:
_alias = Path(config.htcondor.alias_path)
# FIXME can we use the actual campaign prod_area here
script_url = _alias / script_url.relative_to("/output")
htcondor_log = _alias / htcondor_log.relative_to("/output")
log_url = _alias / log_url.relative_to("/output")

htcondor_script_contents = [
f"executable = {script_url}",
f"log = {htcondor_log}",
@@ -100,7 +96,8 @@ async def submit_htcondor_job(

try:
async with await open_process(
[config.htcondor.condor_submit_bin, "-disable", "-file", htcondor_script_path]
[config.htcondor.condor_submit_bin, "-file", htcondor_script_path],
env=build_htcondor_submit_environment(),
) as condor_submit:
if await condor_submit.wait() != 0: # pragma: no cover
assert condor_submit.stderr
@@ -140,6 +137,7 @@ async def check_htcondor_job(
raise CMHTCondorCheckError("No htcondor_id")
async with await open_process(
[config.htcondor.condor_q_bin, "-userlog", htcondor_id, "-af", "JobStatus", "ExitCode"],
env=build_htcondor_submit_environment(),
) as condor_q: # pragma: no cover
if await condor_q.wait() != 0:
assert condor_q.stderr
@@ -169,3 +167,42 @@
else:
status = StatusEnum.failed
return status # pragma: no cover
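The status check shells out to `condor_q -userlog <log> -af JobStatus ExitCode` using the same restricted environment built by `build_htcondor_submit_environment` (defined just below). A hedged sketch of how one line of that output could be interpreted; the numeric JobStatus codes are standard HTCondor values, but the real mapping in this module is only partially visible in the diff, so the labels returned here are illustrative rather than actual StatusEnum members:

# Hypothetical parser for one "JobStatus ExitCode" line from condor_q -af.
def interpret_condor_q_line(line: str) -> str:
    job_status, _, exit_code = line.strip().partition(" ")
    if job_status == "4":              # Completed
        return "succeeded" if exit_code == "0" else "failed"
    if job_status in {"1", "2"}:       # Idle or Running
        return "running"
    return "failed"                    # Removed, Held, etc. treated as failure here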


def build_htcondor_submit_environment() -> Mapping[str, str]:
"""Construct an environment to apply to the subprocess shell when
submitting an htcondor job.
The condor job will inherit this specific environment via the submit file
command `get_env = True`, so it must satisfy the requirements of any work
being performed in the submitted job.
This primarily means that if the job is to run a butler command, the
necessary environment variables to support butler must be present; if the
job is to run a bps command, the environment variables must support it.
This also means that the environment constructed here is fundamentally
different from the environment in which the service or daemon operates and
should more closely match the environment of an interactive sdfianaXXX user
at SLAC.
"""
return dict(
CONDOR_CONFIG="ONLY_ENV",
_CONDOR_CONDOR_HOST=config.htcondor.collector_host,
_CONDOR_COLLECTOR_HOST=config.htcondor.collector_host,
_CONDOR_SCHEDD_HOST=config.htcondor.schedd_host,
_CONDOR_SEC_CLIENT_AUTHENTICATION_METHODS=config.htcondor.authn_methods,
DAF_BUTLER_REPOSITORY_INDEX="/sdf/group/rubin/shared/data-repos.yaml",
FS_REMOTE_DIR=config.htcondor.fs_remote_dir,
HOME="/sdf/home/l/lsstsvc1",
LSST_VERSION="w_latest",
LSST_DISTRIB_DIR="/sdf/group/rubin/sw",
# LSST_DISTRIB_DIR="/cvmfs/sw.lsst.eu/linux-x86_64/lsst_distrib",
PGPASSFILE="/sdf/home/l/lsstsvc1/.lsst/postgres-credentials.txt",
PGUSER="rubin",
PATH=(
"/sdf/home/l/lsstsvc1/.local/bin:/sdf/home/l/lsstsvc1/bin:"
"/opt/slurm/slurm-curr/bin:/usr/local/bin:/usr/bin:"
"/usr/local/sbin:/usr/sbin"
),
)
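The new `initialdir` and `batch_name` entries travel with the rest of the `options` mapping in `write_htcondor_script`, and `get_env = True` is what carries the environment above into the submitted job. A rough sketch of how such a mapping becomes submit-file commands; the exact rendering inside `write_htcondor_script` is not shown in full here, and the values below are the defaults discussed in this commit, not literal output:

# Illustration only.
options = dict(
    initialdir="/path/to/working_dir",   # config.htcondor.working_directory
    batch_name="usdf-cm-dev",            # config.htcondor.batch_name
    should_transfer_files="Yes",
    when_to_transfer_output="ON_EXIT",
    get_env=True,
)
submit_lines = [f"{key} = {value}" for key, value in options.items()]
# -> ["initialdir = /path/to/working_dir", "batch_name = usdf-cm-dev",
#     "should_transfer_files = Yes", "when_to_transfer_output = ON_EXIT",
#     "get_env = True"]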
57 changes: 48 additions & 9 deletions src/lsst/cmservice/config.py
@@ -89,6 +89,13 @@ class HTCondorConfiguration(BaseModel):
their serialization alias.
"""

condor_home: str = Field(
description=("Path to the Condor home directory. Equivalent to the condor ``RELEASE_DIR`` macro."),
default="/opt/htcondor",
serialization_alias="_CONDOR_RELEASE_DIR",
)

# TODO retire these in favor of a path relative to condor_home
condor_submit_bin: str = Field(
description="Name of condor_submit client binary",
default="condor_submit",
@@ -101,6 +108,33 @@
exclude=True,
)

universe: str = Field(
description="HTCondor Universe into which a job will be submitted.",
default="vanilla",
serialization_alias="_CONDOR_DEFAULT_UNIVERSE",
)

working_directory: str = Field(
description=(
"Path to a working directory to use when submitting condor jobs. "
"This path must be available to both the submitting service and "
"the access point receiving the job. Corresponds to the "
"`initialdir` submit file command."
),
default=".",
exclude=True,
)

batch_name: str = Field(
description=(
"Name to use in identifying condor jobs. Corresponds to "
"the `condor_submit` `-batch-name` parameter or submit file "
"`batch_name` command."
),
default="usdf-cm-dev",
exclude=True,
)

request_cpus: int = Field(
description="Number of cores to request when submitting an htcondor job.",
default=1,
@@ -122,29 +156,29 @@
collector_host: str = Field(
description="Name of an htcondor collector host.",
default="localhost",
serialization_alias="_condor_COLLECTOR_HOST",
serialization_alias="_CONDOR_COLLECTOR_HOST",
)

schedd_host: str = Field(
description="Name of an htcondor schedd host.",
default="localhost",
serialization_alias="_condor_SCHEDD_HOST",
serialization_alias="_CONDOR_SCHEDD_HOST",
)

authn_methods: str = Field(
description="Secure client authentication methods, as comma-delimited strings",
default="FS,FS_REMOTE",
serialization_alias="_condor_SEC_CLIENT_AUTHENTICATION_METHODS",
serialization_alias="_CONDOR_SEC_CLIENT_AUTHENTICATION_METHODS",
)

dagman_job_append_get_env: bool = Field(
description="...", default=True, serialization_alias="_condor_DAGMAN_MANAGER_JOB_APPEND_GETENV"
fs_remote_dir: str = Field(
description="...",
default=".",
serialization_alias="FS_REMOTE_DIR",
)

alias_path: str | None = Field(
description="The alias path to use in htcondor submission files instead of a campaign's prod_area",
default=None,
exclude=True,
dagman_job_append_get_env: bool = Field(
description="...", default=True, serialization_alias="_CONDOR_DAGMAN_MANAGER_JOB_APPEND_GETENV"
)
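Normalizing the serialization aliases to the upper-case `_CONDOR_*` form means the configuration block can be dumped straight into an environment mapping. A minimal sketch, assuming pydantic v2 `model_dump(by_alias=True)` semantics; the helper name is hypothetical, and fields marked `exclude=True` (such as `working_directory` and `batch_name`) drop out of the dump automatically:

# Hypothetical helper: turn the aliased fields into environment variables.
def htcondor_config_as_env(cfg: HTCondorConfiguration) -> dict[str, str]:
    return {key: str(value) for key, value in cfg.model_dump(by_alias=True).items()}

# HTCondorConfiguration(collector_host="cm-collector") would yield entries like
# {"_CONDOR_COLLECTOR_HOST": "cm-collector", "_CONDOR_SCHEDD_HOST": "localhost", ...}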


@@ -159,6 +193,11 @@ class SlurmConfiguration(BaseModel):
have this as a document of what settings are actually used.
"""

home: str = Field(
description="Location of the installed slurm client binaries",
default="",
)

sacct_bin: str = Field(
description="Name of sacct slurm client binary",
default="sacct",
9 changes: 8 additions & 1 deletion src/lsst/cmservice/handlers/script_handler.py
@@ -420,6 +420,13 @@ async def _check_htcondor_job(
await script.update_values(session, status=status)
return status

@staticmethod
def _prepend_htcondor_job(*, setup_stack: bool = False) -> str:
prepend = "#!/usr/bin/env bash\n"
if setup_stack:
prepend += "source ${LSST_DISTRIB_DIR}/${LSST_VERSION}/loadLSST.bash\nsetup lsst_distrib\n"
return prepend

async def prepare(
self,
session: async_scoped_session,
@@ -437,7 +444,7 @@ async def prepare(
elif script_method == ScriptMethodEnum.slurm:
status = await self._write_script(session, script, parent, **kwargs)
elif script_method == ScriptMethodEnum.htcondor:
status = await self._write_script(session, script, parent, **kwargs)
status = await self._write_script(session, script, parent, setup_stack=True, **kwargs)
else: # pragma: no cover
raise CMBadExecutionMethodError(f"Bad script method {script_method}")
await script.update_values(session, status=status)
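With `setup_stack=True`, the header prepended by `_prepend_htcondor_job` resolves LSST_DISTRIB_DIR and LSST_VERSION from the submit environment built in common/htcondor.py. The enclosing handler class name is not visible in this hunk, so the call below is illustrative:

# Illustrative check of the generated header; `handler` stands in for an
# instance of the (unnamed here) script handler class.
header = handler._prepend_htcondor_job(setup_stack=True)
assert header == (
    "#!/usr/bin/env bash\n"
    "source ${LSST_DISTRIB_DIR}/${LSST_VERSION}/loadLSST.bash\n"
    "setup lsst_distrib\n"
)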
32 changes: 24 additions & 8 deletions src/lsst/cmservice/handlers/scripts.py
@@ -80,6 +80,7 @@ async def _write_script(
parent: ElementMixin,
**kwargs: Any,
) -> StatusEnum:
setup_stack = kwargs.get("setup_stack", False)
resolved_cols = await script.resolve_collections(session)
data_dict = await script.data_dict(session)
try:
@@ -89,6 +90,7 @@
butler_repo = data_dict["butler_repo"]
except KeyError as msg:
raise CMMissingScriptInputError(f"{script.fullname} missing an input: {msg}") from msg
prepend = self._prepend_htcondor_job(setup_stack=setup_stack)
command = f"{config.butler.butler_bin} collection-chain {butler_repo} {output_coll}"
# This is here out of paranoia.
# script.resolved_collections should convert the list to a string
@@ -97,7 +99,7 @@
command += f" {input_coll}"
else:
command += f" {input_colls}"
await write_bash_script(script_url, command, prepend="#!/usr/bin/env bash\n", **data_dict)
await write_bash_script(script_url, command, prepend=prepend, **data_dict)
await script.update_values(session, script_url=script_url, status=StatusEnum.prepared)
return StatusEnum.prepared

@@ -139,6 +141,7 @@ async def _write_script(
parent: ElementMixin,
**kwargs: Any,
) -> StatusEnum:
setup_stack = kwargs.get("setup_stack", False)
resolved_cols = await script.resolve_collections(session)
data_dict = await script.data_dict(session)
try:
@@ -148,11 +151,12 @@
butler_repo = data_dict["butler_repo"]
except KeyError as msg:
raise CMMissingScriptInputError(f"{script.fullname} missing an input: {msg}") from msg
prepend = self._prepend_htcondor_job(setup_stack=setup_stack)
command = (
f"{config.butler.butler_bin} collection-chain "
f"{butler_repo} {output_coll} --mode prepend {input_coll}"
)
await write_bash_script(script_url, command, prepend="#!/usr/bin/env bash\n", **data_dict)
await write_bash_script(script_url, command, prepend=prepend, **data_dict)
await script.update_values(session, script_url=script_url, status=StatusEnum.prepared)
return StatusEnum.prepared

@@ -195,6 +199,7 @@ async def _write_script(
parent: ElementMixin,
**kwargs: Any,
) -> StatusEnum:
setup_stack = kwargs.get("setup_stack", False)
resolved_cols = await script.resolve_collections(session)
data_dict = await script.data_dict(session)
try:
@@ -220,12 +225,13 @@
)
script_url = await self._set_script_files(session, script, data_dict["prod_area"])
butler_repo = data_dict["butler_repo"]
prepend = self._prepend_htcondor_job(setup_stack=setup_stack)
command = f"{config.butler.butler_bin} collection-chain {butler_repo} {output_coll}"
for collect_coll_ in collect_colls:
command += f" {collect_coll_}"
for input_coll_ in input_colls:
command += f" {input_coll_}"
await write_bash_script(script_url, command, prepend="#!/usr/bin/env bash\n", **data_dict)
await write_bash_script(script_url, command, prepend=prepend, **data_dict)
await script.update_values(session, script_url=script_url, status=StatusEnum.prepared)
return StatusEnum.prepared

@@ -267,6 +273,7 @@ async def _write_script(
parent: ElementMixin,
**kwargs: Any,
) -> StatusEnum:
setup_stack = kwargs.get("setup_stack", False)
resolved_cols = await script.resolve_collections(session)
data_dict = await script.data_dict(session)
try:
@@ -277,10 +284,11 @@
data_query = data_dict.get("data_query")
except KeyError as msg:
raise CMMissingScriptInputError(f"{script.fullname} missing an input: {msg}") from msg
prepend = self._prepend_htcondor_job(setup_stack=setup_stack)
command = f"{config.butler.butler_bin} associate {butler_repo} {output_coll}"
command += f" --collections {input_coll}"
command += f' --where "{data_query}"' if data_query else ""
await write_bash_script(script_url, command, prepend="#!/usr/bin/env bash\n", **data_dict)
await write_bash_script(script_url, command, prepend=prepend, **data_dict)
await script.update_values(session, script_url=script_url, status=StatusEnum.prepared)
return StatusEnum.prepared

@@ -318,6 +326,7 @@ async def _write_script(
parent: ElementMixin,
**kwargs: Any,
) -> StatusEnum:
setup_stack = kwargs.get("setup_stack", False)
resolved_cols = await script.resolve_collections(session)
data_dict = await script.data_dict(session)
try:
@@ -326,8 +335,9 @@
butler_repo = data_dict["butler_repo"]
except KeyError as msg:
raise CMMissingScriptInputError(f"{script.fullname} missing an input: {msg}") from msg
prepend = self._prepend_htcondor_job(setup_stack=setup_stack)
command = f"{config.butler.butler_bin} associate {butler_repo} {output_coll}"
await write_bash_script(script_url, command, prepend="#!/usr/bin/env bash\n", **data_dict)
await write_bash_script(script_url, command, prepend=prepend, **data_dict)
await script.update_values(session, status=StatusEnum.prepared)
return StatusEnum.prepared

@@ -367,6 +377,7 @@ async def _write_script(
parent: ElementMixin,
**kwargs: Any,
) -> StatusEnum:
setup_stack = kwargs.get("setup_stack", False)
resolved_cols = await script.resolve_collections(session)
data_dict = await script.data_dict(session)
try:
@@ -376,9 +387,10 @@
butler_repo = data_dict["butler_repo"]
except KeyError as msg:
raise CMMissingScriptInputError(f"{script.fullname} missing an input: {msg}") from msg
prepend = self._prepend_htcondor_job(setup_stack=setup_stack)
command = f"{config.butler.butler_bin} associate {butler_repo} {output_coll}"
command += f" --collections {input_coll}"
await write_bash_script(script_url, command, prepend="#!/usr/bin/env bash\n", **data_dict)
await write_bash_script(script_url, command, prepend=prepend, **data_dict)
await script.update_values(session, script_url=script_url, status=StatusEnum.prepared)
return StatusEnum.prepared

@@ -428,6 +440,7 @@ async def _write_script(
) -> StatusEnum:
test_type_and_raise(parent, Step, "PrepareStepScriptHandler._write_script parent")

setup_stack = kwargs.get("setup_stack", False)
resolved_cols = await script.resolve_collections(session)
data_dict = await script.data_dict(session)
try:
@@ -447,10 +460,11 @@
if not prereq_colls:
prereq_colls.append(resolved_cols["global_inputs"])

prepend = self._prepend_htcondor_job(setup_stack=setup_stack)
command = f"{config.butler.butler_bin} collection-chain {butler_repo} {output_coll}"
for prereq_coll_ in prereq_colls:
command += f" {prereq_coll_}"
await write_bash_script(script_url, command, prepend="#!/usr/bin/env bash\n", **data_dict)
await write_bash_script(script_url, command, prepend=prepend, **data_dict)
await script.update_values(session, script_url=script_url, status=StatusEnum.prepared)
return StatusEnum.prepared

@@ -667,6 +681,7 @@ async def _write_script(
parent: ElementMixin,
**kwargs: Any,
) -> StatusEnum:
setup_stack = kwargs.get("setup_stack", False)
resolved_cols = await script.resolve_collections(session)
data_dict = await script.data_dict(session)
try:
@@ -676,8 +691,9 @@
butler_repo = data_dict["butler_repo"]
except KeyError as msg:
raise CMMissingScriptInputError(f"{script.fullname} missing an input: {msg}") from msg
prepend = self._prepend_htcondor_job(setup_stack=setup_stack)
command = f"{config.bps.pipetask_bin} validate {butler_repo} {input_coll} {output_coll}"
await write_bash_script(script_url, command, prepend="#!/usr/bin/env bash\n", **data_dict)
await write_bash_script(script_url, command, prepend=prepend, **data_dict)
await script.update_values(session, script_url=script_url, status=StatusEnum.prepared)
return StatusEnum.prepared

