From 4ff6c317beba7e760508b2346450818ffb9a62a8 Mon Sep 17 00:00:00 2001
From: Toby Jennings <tobyj@slac.stanford.edu>
Date: Sat, 25 Jan 2025 17:00:51 -0600
Subject: [PATCH] feat: refactor bps yaml as jinja template

---
 .gitignore                                    |   2 +-
 examples/example_hsc_micro.yaml               |   2 -
 src/lsst/cmservice/handlers/jobs.py           | 121 +++++++++++-------
 src/lsst/cmservice/templates/__init__.py      |   0
 .../cmservice/templates/bps_submit_yaml.j2    |  52 ++++++++
 5 files changed, 128 insertions(+), 49 deletions(-)
 create mode 100644 src/lsst/cmservice/templates/__init__.py
 create mode 100644 src/lsst/cmservice/templates/bps_submit_yaml.j2

diff --git a/.gitignore b/.gitignore
index bcd37d5d6..6795af6cc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -213,7 +213,7 @@ cython_debug/
 .pypirc
 
 # Local Ignores
-outputs/
+output/
 prod_area/
 build/
 
diff --git a/examples/example_hsc_micro.yaml b/examples/example_hsc_micro.yaml
index 1419e6cbd..889bb09de 100644
--- a/examples/example_hsc_micro.yaml
+++ b/examples/example_hsc_micro.yaml
@@ -24,9 +24,7 @@
                     split_min_groups: 2
       data:
           butler_repo: '/repo/main'
-          prod_area: 'output/archive'
           data_query: "instrument = 'HSC' AND exposure in (30504, 30502) AND detector in (45, 46, 47, 48)"
-          lsst_version: w_2025_01
           bps_wms_clustering_file: "${DRP_PIPE_DIR}/bps/clustering/HSC/DRP-RC2-clustering.yaml"
           bps_wms_resources_file: "${DRP_PIPE_DIR}/bps/resources/HSC/DRP-RC2.yaml"
 - Specification:
diff --git a/src/lsst/cmservice/handlers/jobs.py b/src/lsst/cmservice/handlers/jobs.py
index 92385e85b..e9527b06c 100644
--- a/src/lsst/cmservice/handlers/jobs.py
+++ b/src/lsst/cmservice/handlers/jobs.py
@@ -8,6 +8,7 @@
 import yaml
 from anyio import Path
 from fastapi.concurrency import run_in_threadpool
+from jinja2 import Environment, PackageLoader
 from sqlalchemy.ext.asyncio import async_scoped_session
 
 from lsst.ctrl.bps import BaseWmsService, WmsStates
@@ -72,7 +73,7 @@ async def _write_script(
             pipeline_yaml = os.path.expandvars(data_dict["pipeline_yaml"])
             run_coll = resolved_cols["run"]
             input_colls = resolved_cols["inputs"]
-            bps_core_yaml_template = data_dict["bps_core_yaml_template"]
+            # bps_core_yaml_template = data_dict["bps_core_yaml_template"]
             bps_core_script_template = data_dict["bps_core_script_template"]
             bps_wms_script_template = data_dict["bps_wms_script_template"]
         except KeyError as msg:
@@ -85,10 +86,11 @@ async def _write_script(
         bps_wms_yaml_file = data_dict.get("bps_wms_yaml_file", None)
         bps_wms_clustering_file = data_dict.get("bps_wms_clustering_file", None)
         bps_wms_resources_file = data_dict.get("bps_wms_resources_file", None)
-        bps_wms_extra_files = data_dict.get("bps_wms_extra_files", [])
-        bps_extra_config = data_dict.get("bps_extra_config", None)
         data_query = data_dict.get("data_query", None)
         extra_qgraph_options = data_dict.get("extra_qgraph_options", None)
+        # FIXME refactor these "extras" or remove support
+        # bps_wms_extra_files = data_dict.get("bps_wms_extra_files", [])
+        # bps_extra_config = data_dict.get("bps_extra_config", None)
 
         # Get the output file paths
         script_url = await self._set_script_files(session, script, prod_area)
@@ -103,17 +105,17 @@ async def _write_script(
             session,
             bps_core_script_template,
         )
-        bps_core_yaml_template_ = await specification.get_script_template(
-            session,
-            bps_core_yaml_template,
-        )
         bps_wms_script_template_ = await specification.get_script_template(
             session,
             bps_wms_script_template,
         )
 
+        # Template rendering
+        # - config_url <- output of rendered bps_submit_yaml.j2
+
         config_path = await Path(config_url).resolve()
         submit_path = await Path(f"{prod_area}/{parent.fullname}/submit").resolve()
+        # Clean up any existing artifacts in the target submit_path
         try:
             await run_in_threadpool(shutil.rmtree, submit_path)
         except FileNotFoundError:
@@ -129,66 +131,93 @@ async def _write_script(
         if custom_lsst_setup:  # pragma: no cover
             prepend += f"\n{custom_lsst_setup}\n"
         prepend += bps_wms_script_template_.data["text"]  # type: ignore
-
         await write_bash_script(script_url, command, prepend=prepend)
 
-        workflow_config = bps_core_yaml_template_.data.copy()  # type: ignore
+        # Collect values for and render bps submit yaml from template
+        await session.refresh(parent, attribute_names=["c_", "p_"])
+        # FIXME at this point, how could the following path *not* exist?
+        #       is this meant to be `config_url` instead?
+        await Path(script_url).parent.mkdir(parents=True, exist_ok=True)
 
+        # workflow_config becomes values dictionary to use while rendering a
+        # yaml template, NOT the yaml template itself!
+        workflow_config: dict[str, Any] = {}
+        workflow_config["project"] = parent.p_.name  # type: ignore
+        workflow_config["campaign"] = parent.c_.name  # type: ignore
+        workflow_config["submit_path"] = str(submit_path)
+        workflow_config["lsst_version"] = os.path.expandvars(lsst_version)
+        workflow_config["pipeline_yaml"] = pipeline_yaml
+        workflow_config["custom_lsst_setup"] = custom_lsst_setup
+        workflow_config["extra_qgraph_options"] = extra_qgraph_options
+        workflow_config["extra_yaml_literals"] = []
+
+        # //////// INCLUDE_CONFIGS REGION
+        # include_configs to be a iterator used in rendering the yaml template
         include_configs = []
         for to_include_ in [bps_wms_yaml_file, bps_wms_clustering_file, bps_wms_resources_file]:
-            if to_include_:
-                # We want abspaths, but we need to be careful about
-                # envvars that are not yet expanded
+            if to_include_:  # if it is what?
                 to_include_ = os.path.expandvars(to_include_)
-                if "$" not in to_include_:
-                    to_include_ = await Path(to_include_).resolve()
-                include_configs.append(str(to_include_))
-        include_configs += bps_wms_extra_files
-
-        workflow_config["includeConfigs"] = include_configs  # type: ignore
-
-        await session.refresh(parent, attribute_names=["c_", "p_"])
-        workflow_config["project"] = parent.p_.name  # type: ignore
-        workflow_config["campaign"] = parent.c_.name  # type: ignore
-
-        workflow_config["submitPath"] = str(submit_path)  # type: ignore
-
-        workflow_config["LSST_VERSION"] = os.path.expandvars(lsst_version)  # type: ignore
-        if custom_lsst_setup:  # pragma: no cover
-            workflow_config["custom_lsst_setup"] = custom_lsst_setup  # type: ignore
-        workflow_config["pipelineYaml"] = pipeline_yaml  # type: ignore
-
-        if extra_qgraph_options:  # pragma: no cover
-            workflow_config["extraQgraphOptions"] = extra_qgraph_options.replace("\n", " ").strip()  # type: ignore
-
+                # If the potential include file has an unexpanded env var, we
+                # delegate that expansion to the bps runtime, since it may
+                # refer to a stack env var we do not understand.
+                if "$" in to_include_:
+                    include_configs.append(str(to_include_))
+                    continue
+
+                # Otherwise, instead of including it we should render it out
+                # because it's a path we understand but the bps runtime won't
+                to_include_ = await Path(to_include_).resolve()
+                # async load the text of the file and if it is valid yaml
+                # append it to the extra_yaml_literals
+                try:
+                    include_yaml_ = yaml.dump(yaml.safe_load(await to_include_.read_text()))
+                    workflow_config["extra_yaml_literals"].append(include_yaml_)
+                except yaml.YAMLError:
+                    # TODO log the error condition
+                    raise
+
+        # FIXME include this in the list of potential include files above
+        # include_configs += bps_wms_extra_files
+        workflow_config["include_configs"] = include_configs
+        # \\\\\\\\ INCLUDE_CONFIGS REGION
+
+        # //////// CLUSTERING_CONFIG REGION
+        # \\\\\\\\ CLUSTERING_CONFIG REGION
+
+        # //////// PAYLOAD REGION
         if isinstance(input_colls, list):  # pragma: no cover
             in_collection = ",".join(input_colls)
         else:
             in_collection = input_colls
 
         payload = {
-            "payloadName": parent.c_.name,  # type: ignore
-            "butlerConfig": butler_repo,
-            "outputRun": run_coll,
-            "inCollection": in_collection,
+            "name": parent.c_.name,  # type: ignore
+            "butler_config": butler_repo,
+            "output_run_collection": run_coll,
+            "input_collection": in_collection,
         }
         if data_query:
-            payload["dataQuery"] = data_query.replace("\n", " ").strip()
+            payload["data_query"] = data_query
         if rescue:  # pragma: no cover
             payload["extra_args"] = f"--skip-existing-in {skip_colls}"
 
-        workflow_config["payload"] = payload  # type: ignore
+        workflow_config["payload"] = payload
+        # \\\\\\\\\\ PAYLOAD REGION
 
-        if bps_extra_config:  # pragma: no cover
-            workflow_config.update(**bps_extra_config)  # type: ignore
-
-        await Path(script_url).parent.mkdir(parents=True, exist_ok=True)
+        # FIXME what would this contain? Is it supposed to be dumpable as raw
+        #       yaml and included in the file?
+        # if bps_extra_config:  # pragma: no cover
+        #     workflow_config.update(**bps_extra_config)
 
+        # Get the yaml template using package lookup
+        config_template_environment = Environment(loader=PackageLoader("lsst.cmservice"))
+        config_template = config_template_environment.get_template("bps_submit_yaml.j2")
         try:
-            yaml_output = yaml.dump(workflow_config)
+            # Render bps_submit_yaml template to `config_url`
+            yaml_output = config_template.render(workflow_config)
             await Path(config_url).write_text(yaml_output)
-        except yaml.YAMLError as yaml_error:
-            raise yaml.YAMLError(f"Error writing a script to run BPS job {script}; threw {yaml_error}")
+        except Exception as e:
+            raise yaml.YAMLError(f"Error writing a script to run BPS job {script}; threw {e}")
         return StatusEnum.prepared
 
     async def _check_slurm_job(
diff --git a/src/lsst/cmservice/templates/__init__.py b/src/lsst/cmservice/templates/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/src/lsst/cmservice/templates/bps_submit_yaml.j2 b/src/lsst/cmservice/templates/bps_submit_yaml.j2
new file mode 100644
index 000000000..ca9bfd483
--- /dev/null
+++ b/src/lsst/cmservice/templates/bps_submit_yaml.j2
@@ -0,0 +1,52 @@
+project: {{ project }}
+campaign: {{ campaign }}
+LSST_VERSION: {{ lsst_version }}
+submitPath: {{ submit_path }}
+{%- if custom_lsst_setup %}
+{{ custom_lsst_setup }}
+{%- endif %}
+pipelineYaml: {{ pipeline_yaml }}
+executionButler:
+  queue: SLAC_Rubin_Merge
+  requestMemory: 64000
+  command2: ''
+  command3: ''
+{%- if include_configs | length > 0 %}
+includeConfigs:
+{%- for config in include_configs %}
+- {{ config }}
+{%- endfor %}
+{%- endif %}
+payload:
+  payloadName: {{ payload.name }}
+  butlerConfig: {{ payload.butler_config }}
+  inCollection: {{ payload.input_collection }}
+  outputRun: {{ payload.output_run_collection }}
+  {%- if payload.data_query %}
+  dataQuery: {{ payload.data_query | replace("\n", " ") | trim }}
+  {%- endif %}
+  {%- if payload.extra_args %}
+  extra_args: {{ payload.extra_args }}
+  {%- endif %}
+pipetaskOutput: ''
+{%- if extra_qgraph_options %}
+extraQgraphOptions: {{ extra_qgraph_options | replace("\n", " ") | trim }}
+{%- endif %}
+{%- if extra_yaml_literals|length > 0 %}
+{%- for yaml_literal in extra_yaml_literals %}
+{{ yaml_literal }}
+{%- endfor %}
+{%- endif %}
+{%- if compute_site == "usdf" %}
+{%- if wms == "htcondor" %}
+site:
+  s3df:
+    profile:
+      condor:
+        +Walltime: 7200
+memoryMultiplier: 4.
+numberOfRetries: 3
+memoryLimit: 400000
+wmsServiceClass: lsst.ctrl.bps.htcondor.HTCondorService
+{%- endif %}
+{%- endif %}