diff --git a/.gitignore b/.gitignore index bcd37d5d..6795af6c 100644 --- a/.gitignore +++ b/.gitignore @@ -213,7 +213,7 @@ cython_debug/ .pypirc # Local Ignores -outputs/ +output/ prod_area/ build/ diff --git a/examples/example_hsc_micro.yaml b/examples/example_hsc_micro.yaml index 1419e6cb..889bb09d 100644 --- a/examples/example_hsc_micro.yaml +++ b/examples/example_hsc_micro.yaml @@ -24,9 +24,7 @@ split_min_groups: 2 data: butler_repo: '/repo/main' - prod_area: 'output/archive' data_query: "instrument = 'HSC' AND exposure in (30504, 30502) AND detector in (45, 46, 47, 48)" - lsst_version: w_2025_01 bps_wms_clustering_file: "${DRP_PIPE_DIR}/bps/clustering/HSC/DRP-RC2-clustering.yaml" bps_wms_resources_file: "${DRP_PIPE_DIR}/bps/resources/HSC/DRP-RC2.yaml" - Specification: diff --git a/src/lsst/cmservice/handlers/jobs.py b/src/lsst/cmservice/handlers/jobs.py index 92385e85..e9527b06 100644 --- a/src/lsst/cmservice/handlers/jobs.py +++ b/src/lsst/cmservice/handlers/jobs.py @@ -8,6 +8,7 @@ import yaml from anyio import Path from fastapi.concurrency import run_in_threadpool +from jinja2 import Environment, PackageLoader from sqlalchemy.ext.asyncio import async_scoped_session from lsst.ctrl.bps import BaseWmsService, WmsStates @@ -72,7 +73,7 @@ async def _write_script( pipeline_yaml = os.path.expandvars(data_dict["pipeline_yaml"]) run_coll = resolved_cols["run"] input_colls = resolved_cols["inputs"] - bps_core_yaml_template = data_dict["bps_core_yaml_template"] + # bps_core_yaml_template = data_dict["bps_core_yaml_template"] bps_core_script_template = data_dict["bps_core_script_template"] bps_wms_script_template = data_dict["bps_wms_script_template"] except KeyError as msg: @@ -85,10 +86,11 @@ async def _write_script( bps_wms_yaml_file = data_dict.get("bps_wms_yaml_file", None) bps_wms_clustering_file = data_dict.get("bps_wms_clustering_file", None) bps_wms_resources_file = data_dict.get("bps_wms_resources_file", None) - bps_wms_extra_files = data_dict.get("bps_wms_extra_files", []) - bps_extra_config = data_dict.get("bps_extra_config", None) data_query = data_dict.get("data_query", None) extra_qgraph_options = data_dict.get("extra_qgraph_options", None) + # FIXME refactor these "extras" or remove support + # bps_wms_extra_files = data_dict.get("bps_wms_extra_files", []) + # bps_extra_config = data_dict.get("bps_extra_config", None) # Get the output file paths script_url = await self._set_script_files(session, script, prod_area) @@ -103,17 +105,17 @@ async def _write_script( session, bps_core_script_template, ) - bps_core_yaml_template_ = await specification.get_script_template( - session, - bps_core_yaml_template, - ) bps_wms_script_template_ = await specification.get_script_template( session, bps_wms_script_template, ) + # Template rendering + # - config_url <- output of rendered bps_submit_yaml.j2 + config_path = await Path(config_url).resolve() submit_path = await Path(f"{prod_area}/{parent.fullname}/submit").resolve() + # Clean up any existing artifacts in the target submit_path try: await run_in_threadpool(shutil.rmtree, submit_path) except FileNotFoundError: @@ -129,66 +131,93 @@ async def _write_script( if custom_lsst_setup: # pragma: no cover prepend += f"\n{custom_lsst_setup}\n" prepend += bps_wms_script_template_.data["text"] # type: ignore - await write_bash_script(script_url, command, prepend=prepend) - workflow_config = bps_core_yaml_template_.data.copy() # type: ignore + # Collect values for and render bps submit yaml from template + await session.refresh(parent, attribute_names=["c_", "p_"]) + # FIXME at this point, how could the following path *not* exist? + # is this meant to be `config_url` instead? + await Path(script_url).parent.mkdir(parents=True, exist_ok=True) + # workflow_config becomes values dictionary to use while rendering a + # yaml template, NOT the yaml template itself! + workflow_config: dict[str, Any] = {} + workflow_config["project"] = parent.p_.name # type: ignore + workflow_config["campaign"] = parent.c_.name # type: ignore + workflow_config["submit_path"] = str(submit_path) + workflow_config["lsst_version"] = os.path.expandvars(lsst_version) + workflow_config["pipeline_yaml"] = pipeline_yaml + workflow_config["custom_lsst_setup"] = custom_lsst_setup + workflow_config["extra_qgraph_options"] = extra_qgraph_options + workflow_config["extra_yaml_literals"] = [] + + # //////// INCLUDE_CONFIGS REGION + # include_configs to be a iterator used in rendering the yaml template include_configs = [] for to_include_ in [bps_wms_yaml_file, bps_wms_clustering_file, bps_wms_resources_file]: - if to_include_: - # We want abspaths, but we need to be careful about - # envvars that are not yet expanded + if to_include_: # if it is what? to_include_ = os.path.expandvars(to_include_) - if "$" not in to_include_: - to_include_ = await Path(to_include_).resolve() - include_configs.append(str(to_include_)) - include_configs += bps_wms_extra_files - - workflow_config["includeConfigs"] = include_configs # type: ignore - - await session.refresh(parent, attribute_names=["c_", "p_"]) - workflow_config["project"] = parent.p_.name # type: ignore - workflow_config["campaign"] = parent.c_.name # type: ignore - - workflow_config["submitPath"] = str(submit_path) # type: ignore - - workflow_config["LSST_VERSION"] = os.path.expandvars(lsst_version) # type: ignore - if custom_lsst_setup: # pragma: no cover - workflow_config["custom_lsst_setup"] = custom_lsst_setup # type: ignore - workflow_config["pipelineYaml"] = pipeline_yaml # type: ignore - - if extra_qgraph_options: # pragma: no cover - workflow_config["extraQgraphOptions"] = extra_qgraph_options.replace("\n", " ").strip() # type: ignore - + # If the potential include file has an unexpanded env var, we + # delegate that expansion to the bps runtime, since it may + # refer to a stack env var we do not understand. + if "$" in to_include_: + include_configs.append(str(to_include_)) + continue + + # Otherwise, instead of including it we should render it out + # because it's a path we understand but the bps runtime won't + to_include_ = await Path(to_include_).resolve() + # async load the text of the file and if it is valid yaml + # append it to the extra_yaml_literals + try: + include_yaml_ = yaml.dump(yaml.safe_load(await to_include_.read_text())) + workflow_config["extra_yaml_literals"].append(include_yaml_) + except yaml.YAMLError: + # TODO log the error condition + raise + + # FIXME include this in the list of potential include files above + # include_configs += bps_wms_extra_files + workflow_config["include_configs"] = include_configs + # \\\\\\\\ INCLUDE_CONFIGS REGION + + # //////// CLUSTERING_CONFIG REGION + # \\\\\\\\ CLUSTERING_CONFIG REGION + + # //////// PAYLOAD REGION if isinstance(input_colls, list): # pragma: no cover in_collection = ",".join(input_colls) else: in_collection = input_colls payload = { - "payloadName": parent.c_.name, # type: ignore - "butlerConfig": butler_repo, - "outputRun": run_coll, - "inCollection": in_collection, + "name": parent.c_.name, # type: ignore + "butler_config": butler_repo, + "output_run_collection": run_coll, + "input_collection": in_collection, } if data_query: - payload["dataQuery"] = data_query.replace("\n", " ").strip() + payload["data_query"] = data_query if rescue: # pragma: no cover payload["extra_args"] = f"--skip-existing-in {skip_colls}" - workflow_config["payload"] = payload # type: ignore + workflow_config["payload"] = payload + # \\\\\\\\\\ PAYLOAD REGION - if bps_extra_config: # pragma: no cover - workflow_config.update(**bps_extra_config) # type: ignore - - await Path(script_url).parent.mkdir(parents=True, exist_ok=True) + # FIXME what would this contain? Is it supposed to be dumpable as raw + # yaml and included in the file? + # if bps_extra_config: # pragma: no cover + # workflow_config.update(**bps_extra_config) + # Get the yaml template using package lookup + config_template_environment = Environment(loader=PackageLoader("lsst.cmservice")) + config_template = config_template_environment.get_template("bps_submit_yaml.j2") try: - yaml_output = yaml.dump(workflow_config) + # Render bps_submit_yaml template to `config_url` + yaml_output = config_template.render(workflow_config) await Path(config_url).write_text(yaml_output) - except yaml.YAMLError as yaml_error: - raise yaml.YAMLError(f"Error writing a script to run BPS job {script}; threw {yaml_error}") + except Exception as e: + raise yaml.YAMLError(f"Error writing a script to run BPS job {script}; threw {e}") return StatusEnum.prepared async def _check_slurm_job( diff --git a/src/lsst/cmservice/templates/__init__.py b/src/lsst/cmservice/templates/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/lsst/cmservice/templates/bps_submit_yaml.j2 b/src/lsst/cmservice/templates/bps_submit_yaml.j2 new file mode 100644 index 00000000..ca9bfd48 --- /dev/null +++ b/src/lsst/cmservice/templates/bps_submit_yaml.j2 @@ -0,0 +1,52 @@ +project: {{ project }} +campaign: {{ campaign }} +LSST_VERSION: {{ lsst_version }} +submitPath: {{ submit_path }} +{%- if custom_lsst_setup %} +{{ custom_lsst_setup }} +{%- endif %} +pipelineYaml: {{ pipeline_yaml }} +executionButler: + queue: SLAC_Rubin_Merge + requestMemory: 64000 + command2: '' + command3: '' +{%- if include_configs | length > 0 %} +includeConfigs: +{%- for config in include_configs %} +- {{ config }} +{%- endfor %} +{%- endif %} +payload: + payloadName: {{ payload.name }} + butlerConfig: {{ payload.butler_config }} + inCollection: {{ payload.input_collection }} + outputRun: {{ payload.output_run_collection }} + {%- if payload.data_query %} + dataQuery: {{ payload.data_query | replace("\n", " ") | trim }} + {%- endif %} + {%- if payload.extra_args %} + extra_args: {{ payload.extra_args }} + {%- endif %} +pipetaskOutput: '' +{%- if extra_qgraph_options %} +extraQgraphOptions: {{ extra_qgraph_options | replace("\n", " ") | trim }} +{%- endif %} +{%- if extra_yaml_literals|length > 0 %} +{%- for yaml_literal in extra_yaml_literals %} +{{ yaml_literal }} +{%- endfor %} +{%- endif %} +{%- if compute_site == "usdf" %} +{%- if wms == "htcondor" %} +site: + s3df: + profile: + condor: + +Walltime: 7200 +memoryMultiplier: 4. +numberOfRetries: 3 +memoryLimit: 400000 +wmsServiceClass: lsst.ctrl.bps.htcondor.HTCondorService +{%- endif %} +{%- endif %}