Skip to content

Commit

Permalink
feat: refactor bps yaml as jinja template
Browse files Browse the repository at this point in the history
  • Loading branch information
tcjennings committed Jan 26, 2025
1 parent 9cd9456 commit 4ff6c31
Show file tree
Hide file tree
Showing 5 changed files with 128 additions and 49 deletions.
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ cython_debug/
.pypirc

# Local Ignores
outputs/
output/
prod_area/
build/

Expand Down
2 changes: 0 additions & 2 deletions examples/example_hsc_micro.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@
split_min_groups: 2
data:
butler_repo: '/repo/main'
prod_area: 'output/archive'
data_query: "instrument = 'HSC' AND exposure in (30504, 30502) AND detector in (45, 46, 47, 48)"
lsst_version: w_2025_01
bps_wms_clustering_file: "${DRP_PIPE_DIR}/bps/clustering/HSC/DRP-RC2-clustering.yaml"
bps_wms_resources_file: "${DRP_PIPE_DIR}/bps/resources/HSC/DRP-RC2.yaml"
- Specification:
Expand Down
121 changes: 75 additions & 46 deletions src/lsst/cmservice/handlers/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import yaml
from anyio import Path
from fastapi.concurrency import run_in_threadpool
from jinja2 import Environment, PackageLoader
from sqlalchemy.ext.asyncio import async_scoped_session

from lsst.ctrl.bps import BaseWmsService, WmsStates
Expand Down Expand Up @@ -72,7 +73,7 @@ async def _write_script(
pipeline_yaml = os.path.expandvars(data_dict["pipeline_yaml"])
run_coll = resolved_cols["run"]
input_colls = resolved_cols["inputs"]
bps_core_yaml_template = data_dict["bps_core_yaml_template"]
# bps_core_yaml_template = data_dict["bps_core_yaml_template"]
bps_core_script_template = data_dict["bps_core_script_template"]
bps_wms_script_template = data_dict["bps_wms_script_template"]
except KeyError as msg:
Expand All @@ -85,10 +86,11 @@ async def _write_script(
bps_wms_yaml_file = data_dict.get("bps_wms_yaml_file", None)
bps_wms_clustering_file = data_dict.get("bps_wms_clustering_file", None)
bps_wms_resources_file = data_dict.get("bps_wms_resources_file", None)
bps_wms_extra_files = data_dict.get("bps_wms_extra_files", [])
bps_extra_config = data_dict.get("bps_extra_config", None)
data_query = data_dict.get("data_query", None)
extra_qgraph_options = data_dict.get("extra_qgraph_options", None)
# FIXME refactor these "extras" or remove support
# bps_wms_extra_files = data_dict.get("bps_wms_extra_files", [])
# bps_extra_config = data_dict.get("bps_extra_config", None)

# Get the output file paths
script_url = await self._set_script_files(session, script, prod_area)
Expand All @@ -103,17 +105,17 @@ async def _write_script(
session,
bps_core_script_template,
)
bps_core_yaml_template_ = await specification.get_script_template(
session,
bps_core_yaml_template,
)
bps_wms_script_template_ = await specification.get_script_template(
session,
bps_wms_script_template,
)

# Template rendering
# - config_url <- output of rendered bps_submit_yaml.j2

config_path = await Path(config_url).resolve()
submit_path = await Path(f"{prod_area}/{parent.fullname}/submit").resolve()
# Clean up any existing artifacts in the target submit_path
try:
await run_in_threadpool(shutil.rmtree, submit_path)
except FileNotFoundError:
Expand All @@ -129,66 +131,93 @@ async def _write_script(
if custom_lsst_setup: # pragma: no cover
prepend += f"\n{custom_lsst_setup}\n"
prepend += bps_wms_script_template_.data["text"] # type: ignore

await write_bash_script(script_url, command, prepend=prepend)

workflow_config = bps_core_yaml_template_.data.copy() # type: ignore
# Collect values for and render bps submit yaml from template
await session.refresh(parent, attribute_names=["c_", "p_"])
# FIXME at this point, how could the following path *not* exist?
# is this meant to be `config_url` instead?
await Path(script_url).parent.mkdir(parents=True, exist_ok=True)

# workflow_config becomes values dictionary to use while rendering a
# yaml template, NOT the yaml template itself!
workflow_config: dict[str, Any] = {}
workflow_config["project"] = parent.p_.name # type: ignore
workflow_config["campaign"] = parent.c_.name # type: ignore
workflow_config["submit_path"] = str(submit_path)
workflow_config["lsst_version"] = os.path.expandvars(lsst_version)
workflow_config["pipeline_yaml"] = pipeline_yaml
workflow_config["custom_lsst_setup"] = custom_lsst_setup
workflow_config["extra_qgraph_options"] = extra_qgraph_options
workflow_config["extra_yaml_literals"] = []

# //////// INCLUDE_CONFIGS REGION
# include_configs is a list that is iterated over when rendering the yaml template
include_configs = []
for to_include_ in [bps_wms_yaml_file, bps_wms_clustering_file, bps_wms_resources_file]:
if to_include_:
# We want abspaths, but we need to be careful about
# envvars that are not yet expanded
if to_include_: # if it is what?
to_include_ = os.path.expandvars(to_include_)
if "$" not in to_include_:
to_include_ = await Path(to_include_).resolve()
include_configs.append(str(to_include_))
include_configs += bps_wms_extra_files

workflow_config["includeConfigs"] = include_configs # type: ignore

await session.refresh(parent, attribute_names=["c_", "p_"])
workflow_config["project"] = parent.p_.name # type: ignore
workflow_config["campaign"] = parent.c_.name # type: ignore

workflow_config["submitPath"] = str(submit_path) # type: ignore

workflow_config["LSST_VERSION"] = os.path.expandvars(lsst_version) # type: ignore
if custom_lsst_setup: # pragma: no cover
workflow_config["custom_lsst_setup"] = custom_lsst_setup # type: ignore
workflow_config["pipelineYaml"] = pipeline_yaml # type: ignore

if extra_qgraph_options: # pragma: no cover
workflow_config["extraQgraphOptions"] = extra_qgraph_options.replace("\n", " ").strip() # type: ignore

# If the potential include file has an unexpanded env var, we
# delegate that expansion to the bps runtime, since it may
# refer to a stack env var we do not understand.
if "$" in to_include_:
include_configs.append(str(to_include_))
continue

# Otherwise, instead of including it we should render it out
# because it's a path we understand but the bps runtime won't
to_include_ = await Path(to_include_).resolve()
# async load the text of the file and if it is valid yaml
# append it to the extra_yaml_literals
try:
include_yaml_ = yaml.dump(yaml.safe_load(await to_include_.read_text()))
workflow_config["extra_yaml_literals"].append(include_yaml_)
except yaml.YAMLError:
# TODO log the error condition
raise

# FIXME include this in the list of potential include files above
# include_configs += bps_wms_extra_files
workflow_config["include_configs"] = include_configs
# \\\\\\\\ INCLUDE_CONFIGS REGION

# //////// CLUSTERING_CONFIG REGION
# \\\\\\\\ CLUSTERING_CONFIG REGION

# //////// PAYLOAD REGION
if isinstance(input_colls, list): # pragma: no cover
in_collection = ",".join(input_colls)
else:
in_collection = input_colls

payload = {
"payloadName": parent.c_.name, # type: ignore
"butlerConfig": butler_repo,
"outputRun": run_coll,
"inCollection": in_collection,
"name": parent.c_.name, # type: ignore
"butler_config": butler_repo,
"output_run_collection": run_coll,
"input_collection": in_collection,
}
if data_query:
payload["dataQuery"] = data_query.replace("\n", " ").strip()
payload["data_query"] = data_query
if rescue: # pragma: no cover
payload["extra_args"] = f"--skip-existing-in {skip_colls}"

workflow_config["payload"] = payload # type: ignore
workflow_config["payload"] = payload
# \\\\\\\\\\ PAYLOAD REGION

if bps_extra_config: # pragma: no cover
workflow_config.update(**bps_extra_config) # type: ignore

await Path(script_url).parent.mkdir(parents=True, exist_ok=True)
# FIXME what would this contain? Is it supposed to be dumpable as raw
# yaml and included in the file?
# if bps_extra_config: # pragma: no cover
# workflow_config.update(**bps_extra_config)

# Get the yaml template using package lookup
config_template_environment = Environment(loader=PackageLoader("lsst.cmservice"))
config_template = config_template_environment.get_template("bps_submit_yaml.j2")
try:
yaml_output = yaml.dump(workflow_config)
# Render bps_submit_yaml template to `config_url`
yaml_output = config_template.render(workflow_config)
await Path(config_url).write_text(yaml_output)
except yaml.YAMLError as yaml_error:
raise yaml.YAMLError(f"Error writing a script to run BPS job {script}; threw {yaml_error}")
except Exception as e:
raise yaml.YAMLError(f"Error writing a script to run BPS job {script}; threw {e}")
return StatusEnum.prepared

async def _check_slurm_job(
Expand Down
Empty file.
52 changes: 52 additions & 0 deletions src/lsst/cmservice/templates/bps_submit_yaml.j2
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
{# Jinja template for a BPS submit YAML file.
   Values read from the render context: project, campaign, lsst_version,
   submit_path, pipeline_yaml, payload (name, butler_config,
   input_collection, output_run_collection, optional data_query and
   extra_args), and the optional custom_lsst_setup, include_configs,
   extra_qgraph_options, extra_yaml_literals, compute_site and wms.
   Block tags use the leading-minus form so that an optional section which
   is skipped leaves no blank line behind; the comments in this file use the
   same whitespace control so they do not alter the rendered output. -#}
project: {{ project }}
campaign: {{ campaign }}
LSST_VERSION: {{ lsst_version }}
submitPath: {{ submit_path }}
{#- NOTE(review): custom_lsst_setup is emitted as bare text with no YAML key
    in front of it; confirm the value is itself valid YAML at this position. #}
{%- if custom_lsst_setup %}
{{ custom_lsst_setup }}
{%- endif %}
pipelineYaml: {{ pipeline_yaml }}
executionButler:
queue: SLAC_Rubin_Merge
requestMemory: 64000
command2: ''
command3: ''
{#- includeConfigs is only written when include_configs is non-empty; each
    entry becomes one list item. #}
{%- if include_configs | length > 0 %}
includeConfigs:
{%- for config in include_configs %}
- {{ config }}
{%- endfor %}
{%- endif %}
payload:
payloadName: {{ payload.name }}
butlerConfig: {{ payload.butler_config }}
inCollection: {{ payload.input_collection }}
outputRun: {{ payload.output_run_collection }}
{#- dataQuery is flattened to one line: newlines become spaces and the result
    is trimmed. #}
{%- if payload.data_query %}
dataQuery: {{ payload.data_query | replace("\n", " ") | trim }}
{%- endif %}
{%- if payload.extra_args %}
extra_args: {{ payload.extra_args }}
{%- endif %}
pipetaskOutput: ''
{#- extraQgraphOptions gets the same single-line flattening as dataQuery. #}
{%- if extra_qgraph_options %}
extraQgraphOptions: {{ extra_qgraph_options | replace("\n", " ") | trim }}
{%- endif %}
{#- Each extra_yaml_literals entry is written verbatim, with no filter or
    re-indentation applied; presumably each entry is already-serialized YAML
    text (TODO confirm against the caller). #}
{%- if extra_yaml_literals|length > 0 %}
{%- for yaml_literal in extra_yaml_literals %}
{{ yaml_literal }}
{%- endfor %}
{%- endif %}
{#- The site/WMS settings below are only written when compute_site equals
    "usdf" and wms equals "htcondor".
    NOTE(review): confirm the render context actually supplies compute_site
    and wms; if it does not, this section is presumably skipped silently. #}
{%- if compute_site == "usdf" %}
{%- if wms == "htcondor" %}
site:
s3df:
profile:
condor:
+Walltime: 7200
memoryMultiplier: 4.
numberOfRetries: 3
memoryLimit: 400000
wmsServiceClass: lsst.ctrl.bps.htcondor.HTCondorService
{%- endif %}
{%- endif %}

0 comments on commit 4ff6c31

Please sign in to comment.