Merge pull request #20 from BCDA-APS/19-hoist-DM-from-XPCS
hoist DM plan stubs from XPCS and make generic
prjemian authored Nov 20, 2024
2 parents 4b8e9d2 + 319c494 commit be01d54
Showing 7 changed files with 221 additions and 2 deletions.
4 changes: 4 additions & 0 deletions README.md
@@ -173,3 +173,7 @@ see the [Sphinx](https://www.sphinx-doc.org/) documentation.
The QS host process writes files into the `qserver/` directory. This directory can be
relocated. However, it should not be moved into the instrument package since
that might be installed into a read-only directory.
## How-To Guides
- [APS Data Management Plans](./docs/source/guides/dm.md)
12 changes: 10 additions & 2 deletions docs/source/api/plans.rst
@@ -3,5 +3,13 @@
``instrument.plans``
====================

Add Python modules here that describe your instrument's custom measurement
procedures
Python modules that describe your instrument's custom measurement procedures.

.. autosummary::
:nosignatures:

~instrument.plans.dm_plans
~instrument.plans.sim_plans

.. automodule:: instrument.plans.dm_plans
.. automodule:: instrument.plans.sim_plans
85 changes: 85 additions & 0 deletions docs/source/guides/dm.md
@@ -0,0 +1,85 @@
# Guide: APS Data Management Plans

This guide provides a few examples of plans that interact with APS Data
Management (DM) tools.

## Requirements

The DM tools rely on a set of environment variables that define various aspects of the DM system.
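
As a quick check, you can list which DM variables are already set. This is a
minimal sketch; it assumes the variables share a `DM_` name prefix, which may
differ at your installation:

```py
import os

# List any DM-related environment variables already set in this session.
# The "DM_" prefix is an assumption; the exact names are installation-specific.
dm_vars = sorted(k for k in os.environ if k.startswith("DM_"))
print(dm_vars or "No DM environment variables found; run the DM setup first.")
```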

## Show any DM jobs still processing

Use the `dm_list_processing_jobs()` plan stub to show any DM workflow jobs that
are still running or pending. The DM environment variables are set by calling
`aps_dm_setup(DM_SETUP_SCRIPT)` in each session, before you call any other DM
code.

Here, `DM_SETUP_SCRIPT` is the full path to the bash setup script provided by
DM for this account. The exact path may differ between installations; if
unsure, contact the APS DM team for advice.

Note: `aps_dm_setup` is not a bluesky plan stub. Call it as a standard Python
function.

Here's an example:

```py
from instrument.utils.aps_functions import aps_dm_setup

aps_dm_setup("/home/dm/etc/dm.setup.sh")
```
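
Once the environment is configured, list the unfinished jobs from the bluesky
(IPython) session:

```py
from instrument.plans.dm_plans import dm_list_processing_jobs

# Show DM workflow jobs that are still running or pending.
RE(dm_list_processing_jobs())
```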

## Start a new workflow job

Use the `dm_kickoff_workflow()` plan to start a DM workflow job. See the
source code for additional options, such as how often to report progress and
whether to wait for the workflow to finish before the bluesky plan proceeds.

```py
from instrument.plans.dm_plans import dm_kickoff_workflow

# Use the run with `uid` from the catalog `cat`.
run = cat[uid]

# Create the dictionary of arguments for the chosen workflow.
argsDict = {
"filePath": "path/to/data/file.mda", # example
"experimentName": "testing-2024-11", # example
"workflowName": "processing", # existing workflow name
# ... any other items required by the workflow
}

# Start the workflow job from the command line:
RE(dm_kickoff_workflow(run, argsDict))
```

In a plan, replace the call to `RE(...)` with `yield from ...`, such as:

```py
def a_plan():
# earlier steps
yield from dm_kickoff_workflow(run, argsDict)
# later steps
```
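
To block until the workflow ends, use the `wait` and `timeout` keyword
arguments of `dm_kickoff_workflow()` (the 600 s value here is only an
example):

```py
# Wait for the DM workflow to finish, reporting for up to 10 minutes.
RE(dm_kickoff_workflow(run, argsDict, wait=True, timeout=600))
```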

## Start a new workflow job (Low-level)

If the `dm_kickoff_workflow()` plan does more than you want, consider the
`dm_submit_workflow_job()` plan stub, a thin wrapper around DM's
`startProcessingJob()` function. It converts this DM function into a bluesky
plan stub and reports the DM workflow job `id` once the job has been submitted.

As above, you'll need the `workflowName` and the `argsDict`.

From the command line: `RE(dm_submit_workflow_job(workflowName, argsDict))`

In a plan: `yield from dm_submit_workflow_job(workflowName, argsDict)`
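
Putting it together (the `workflowName` and `argsDict` values below are
placeholders, as in the earlier example):

```py
from instrument.plans.dm_plans import dm_submit_workflow_job

workflowName = "processing"  # existing workflow name
argsDict = {
    "filePath": "path/to/data/file.mda",  # example
    "experimentName": "testing-2024-11",  # example
}
RE(dm_submit_workflow_job(workflowName, argsDict))
```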

## References

The `apstools`
[package](https://bcda-aps.github.io/apstools/latest/api/_utils.html#aps-data-management)
provides additional support for integrating the capabilities of the DM tools.

For more information about DM, see its [API
Reference](https://git.aps.anl.gov/DM/dm-docs/-/wikis/DM/Beamline-Services/API-Reference).
8 changes: 8 additions & 0 deletions docs/source/guides/index.rst
@@ -0,0 +1,8 @@
Guides, how-tos, ...
====================

Guides show how to use certain features of this instrument.

.. toctree::
:maxdepth: 2

dm
1 change: 1 addition & 0 deletions docs/source/index.rst
@@ -22,6 +22,7 @@ console, a Jupyter notebook, the queueserver, or even a Python script:

demo
sessions
guides/index
install
logging_config
api/index
3 changes: 3 additions & 0 deletions src/instrument/plans/__init__.py
@@ -1,5 +1,8 @@
"""Bluesky plans."""

from .dm_plans import dm_kickoff_workflow # noqa: F401
from .dm_plans import dm_list_processing_jobs # noqa: F401
from .dm_plans import dm_submit_workflow_job # noqa: F401
from .sim_plans import sim_count_plan # noqa: F401
from .sim_plans import sim_print_plan # noqa: F401
from .sim_plans import sim_rel_scan_plan # noqa: F401
110 changes: 110 additions & 0 deletions src/instrument/plans/dm_plans.py
@@ -0,0 +1,110 @@
"""
Plans in support of APS Data Management.

.. autosummary::
~dm_kickoff_workflow
~dm_list_processing_jobs
~dm_submit_workflow_job
"""

import logging

from apstools.devices import DM_WorkflowConnector
from apstools.utils import dm_api_proc
from apstools.utils import share_bluesky_metadata_with_dm
from bluesky import plan_stubs as bps

logger = logging.getLogger(__name__)
logger.bsdev(__file__)


def dm_kickoff_workflow(run, argsDict, timeout=None, wait=False):
"""
Start a DM workflow for this bluesky run and share the run's metadata with DM.

PARAMETERS:
run (*obj*): Bluesky run object (such as 'run = cat[uid]').
argsDict (*dict*): Dictionary of parameters needed by 'workflowName'.
At minimum, most workflows expect these keys: 'filePath' and
'experimentName'. Consult the workflow for the expected
content of 'argsDict'.
timeout (*number*): When should bluesky stop reporting on this
DM workflow job (if it has not ended). Units are seconds.
Default is forever.
wait (*bool*): Should this plan stub wait for the job to end?
Default is 'False'.
"""
dm_workflow = DM_WorkflowConnector(name="dm_workflow")

if timeout is None:
# Disable periodic reports, use a long time (s).
timeout = 999_999_999_999

yield from bps.mv(dm_workflow.concise_reporting, True)
yield from bps.mv(dm_workflow.reporting_period, timeout)

workflow_name = argsDict.pop("workflowName")
yield from dm_workflow.run_as_plan(
workflow=workflow_name,
wait=wait,
timeout=timeout,
**argsDict,
)

# Upload bluesky run metadata to APS DM.
share_bluesky_metadata_with_dm(argsDict["experimentName"], workflow_name, run)

# Users requested the DM workflow job ID be printed to the console.
dm_workflow._update_processing_data()
job_id = dm_workflow.job_id.get()
job_stage = dm_workflow.stage_id.get()
job_status = dm_workflow.status.get()
print(f"DM workflow id: {job_id!r} status: {job_status} stage: {job_stage}")


def dm_list_processing_jobs(exclude=None):
"""
Show all the DM jobs whose status is not excluded.

By default, jobs with status 'done' or 'failed' are excluded.
"""
yield from bps.null() # make this a plan stub
api = dm_api_proc()
if exclude is None:
exclude = ("done", "failed")

for j in api.listProcessingJobs():
if j["status"] not in exclude:
print(
f"id={j['id']!r}"
f" submitted={j.get('submissionTimestamp')}"
f" status={j['status']!r}"
)


def dm_submit_workflow_job(workflowName, argsDict):
"""
Low-level plan stub to submit a job to a DM workflow.

It is recommended to use dm_kickoff_workflow() instead.
This plan does not share run metadata with DM.
PARAMETERS:
workflowName (*str*): Name of the DM workflow to be run.
argsDict (*dict*): Dictionary of parameters needed by 'workflowName'.
At minimum, most workflows expect these keys: 'filePath' and
'experimentName'. Consult the workflow for the expected
content of 'argsDict'.
"""
yield from bps.null() # make this a plan stub
api = dm_api_proc()

job = api.startProcessingJob(api.username, workflowName, argsDict)
print(f"workflow={workflowName!r} id={job['id']!r}")
