diff --git a/docs/user-guide/loki/streaming.ipynb b/docs/user-guide/loki/streaming.ipynb
new file mode 100644
index 00000000..4b740cce
--- /dev/null
+++ b/docs/user-guide/loki/streaming.ipynb
@@ -0,0 +1,547 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "id": "50711a44",
+   "metadata": {},
+   "source": [
+    "# Streamed processing with Loki workflow"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "8c7f7cf7-0582-4953-a772-a0f87d1cf0e2",
+   "metadata": {
+    "tags": []
+   },
+   "outputs": [],
+   "source": [
+    "import scipp as sc\n",
+    "from ess import sans\n",
+    "from ess import loki\n",
+    "import ess.loki.data  # noqa: F401\n",
+    "from ess.sans.types import *"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "446d14b9",
+   "metadata": {},
+   "source": [
+    "## Create and configure the workflow\n",
+    "\n",
+    "We begin by creating the Loki workflow object (this is a [sciline.Pipeline](https://scipp.github.io/sciline/generated/classes/sciline.Pipeline.html) which can be consulted for advanced usage).\n",
+    "The files we use here come from a Loki detector test at Larmor, so we use the corresponding workflow:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "63ceda7f",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "workflow = loki.LokiAtLarmorWorkflow()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "d8fe14da",
+   "metadata": {},
+   "source": [
+    "We configure the workflow by defining the list of mask filenames and the name of the detector bank to reduce.\n",
+    "In this case there is just a single bank:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "b8f42605",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "workflow = sans.with_pixel_mask_filenames(\n",
+    "    workflow, masks=loki.data.loki_tutorial_mask_filenames()\n",
+    ")\n",
+    "workflow[NeXusDetectorName] = 'larmor_detector'"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "eb90e677",
+   "metadata": {},
+   "source": [
+    "The workflow can be visualized as a graph.\n",
+    "For readability we show only the sub-workflow for computing `IofQ[SampleRun]`.\n",
+    "The workflow can actually compute the full `BackgroundSubtractedIofQ`, which applies an equivalent workflow to the background run before a subtraction step:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "dc988a34",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "workflow.visualize(IofQ[SampleRun], compact=True, graph_attr={'rankdir': 'LR'})\n",
+    "wf = workflow.copy()\n",
+    "wf[TransmissionFraction[SampleRun]] = None\n",
+    "wf.visualize(IofQ[SampleRun], compact=True, graph_attr={'rankdir': 'LR'})"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "abde8696",
+   "metadata": {},
+   "source": [
+    "Note the red boxes, which indicate missing input parameters.\n",
+    "We can set these missing parameters, as well as parameters where we do not want to use the defaults:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "ce2841f0-bd9e-43c3-8cc5-52bb45f392ad",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Wavelength binning parameters\n",
+    "wavelength_min = sc.scalar(1.0, unit='angstrom')\n",
+    "wavelength_max = sc.scalar(13.0, unit='angstrom')\n",
+    "n_wavelength_bins = 50\n",
+    "n_wavelength_bands = 50\n",
+    "\n",
+    "workflow[WavelengthBins] = sc.linspace(\n",
+    "    'wavelength', wavelength_min, wavelength_max, n_wavelength_bins + 1\n",
+    ")\n",
+    "workflow[WavelengthBands] = None\n",
+    "\n",
+    "\n",
+    "workflow[CorrectForGravity] = True\n",
+    "workflow[UncertaintyBroadcastMode] = UncertaintyBroadcastMode.upper_bound\n",
+    "workflow[ReturnEvents] = False\n",
+    "\n",
+    "workflow[QBins] = sc.linspace(dim='Q', start=0.01, stop=0.3, num=101, unit='1/angstrom')\n",
+    "workflow[DirectBeam] = None"
+   ]
+  },
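+  {
+   "cell_type": "markdown",
+   "id": "added-parameter-check-md",
+   "metadata": {},
+   "source": [
+    "Parameters are stored on the pipeline like any other result, so we can read them back to verify the configuration.\n",
+    "This is just a quick sanity check relying on `sciline.Pipeline.compute`; nothing about it is specific to the Loki workflow:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "added-parameter-check",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Read back two of the parameters set above: 51 wavelength edges and 101 Q edges.\n",
+    "workflow.compute(WavelengthBins).sizes, workflow.compute(QBins).sizes"
+   ]
+  },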
+  {
+   "cell_type": "markdown",
+   "id": "11e6d962",
+   "metadata": {},
+   "source": [
+    "## Configuring data to load\n",
+    "\n",
+    "We have not yet configured which files we want to load.\n",
+    "In this tutorial, we use helpers to fetch the tutorial data, which return the filenames of the cached files.\n",
+    "In a real use case, you would set these parameters manually:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "153ae0ca",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Alternative sample run; the assignment below overrides it.\n",
+    "# workflow[Filename[SampleRun]] = loki.data.loki_tutorial_sample_run_60339()\n",
+    "workflow[Filename[SampleRun]] = loki.data.loki_tutorial_sample_run_60250()\n",
+    "workflow[Filename[BackgroundRun]] = loki.data.loki_tutorial_background_run_60393()\n",
+    "workflow[Filename[TransmissionRun[SampleRun]]] = (\n",
+    "    loki.data.loki_tutorial_sample_transmission_run()\n",
+    ")\n",
+    "workflow[Filename[TransmissionRun[BackgroundRun]]] = loki.data.loki_tutorial_run_60392()\n",
+    "workflow[Filename[EmptyBeamRun]] = loki.data.loki_tutorial_run_60392()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "632e3c82-89d6-4899-b1c2-f9a318899c33",
+   "metadata": {},
+   "source": [
+    "## Finding the beam center\n",
+    "\n",
+    "Looking carefully at the workflow above,\n",
+    "one will notice that a parameter is missing from the workflow: the red box that contains the `BeamCenter` type.\n",
+    "Before we can proceed with the data reduction,\n",
+    "we therefore first have to determine the center of the beam.\n",
+    "\n",
+    "There are more details on how this is done in the [beam center finder notebook](../common/beam-center-finder.ipynb),\n",
+    "but for now we simply reuse the workflow (by making a copy)\n",
+    "and insert the provider that computes the beam center.\n",
+    "\n",
+    "For now, we compute the beam center only for the rear detector (named 'larmor_detector') but apply it to all banks (currently there is only one bank).\n",
+    "The beam center may need to be computed or applied differently to each bank, see [scipp/esssans#28](https://github.com/scipp/esssans/issues/28).\n",
+    "We use a center-of-mass approach to find the beam center:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "6a17ac7c-de17-49c9-a02c-ca078eb7f023",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "center = sans.beam_center_from_center_of_mass(workflow)\n",
+    "center"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "0814805a-9611-4951-a015-c12ae254b099",
+   "metadata": {},
+   "source": [
+    "and set that value in our workflow:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "e79181de-1de8-43a0-a934-e9407ebcd740",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "workflow[BeamCenter] = center"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "07d98acb",
+   "metadata": {},
+   "source": [
+    "## Fake streaming workflow\n",
+    "\n",
+    "To emulate a live data stream, we load the full detector and monitor event data up front and will later feed them to a stream processor in small chunks:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "91d60046",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from ess.reduce.nexus.generic_types import NeXusDetectorEventData, NeXusMonitorEventData\n",
+    "\n",
+    "det_events = workflow.compute(NeXusDetectorEventData[SampleRun])\n",
+    "mon_events = workflow.compute(NeXusMonitorEventData[SampleRun, Incident])\n",
+    "det_events"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "10f77397",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "mon_events"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "ca3ce55f",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%matplotlib widget\n",
+    "# Placeholder I(Q) figure that will be updated live in the streaming loop below.\n",
+    "dummy = sc.DataArray(\n",
+    "    sc.zeros(dims=('Q',), shape=(100,), with_variances=True),\n",
+    "    coords={'Q': sc.linspace('Q', 0.0, 1.0, 101, unit='1/angstrom')},\n",
+    ")\n",
+    "fig = dummy.plot(norm='log', scale={'Q': 'log'}, vmin=0.1, vmax=5)\n",
+    "fig.canvas.ylabel = '$I(Q)$ ' + fig.canvas.ylabel\n",
+    "artist = next(iter(fig.artists))\n",
+    "display(fig)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "ec9581bd",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from ess.reduce import streaming\n",
+    "\n",
+    "streaming_wf = streaming.StreamProcessor(\n",
+    "    base_workflow=workflow,\n",
+    "    dynamic_keys=(\n",
+    "        NeXusMonitorEventData[SampleRun, Incident],\n",
+    "        NeXusDetectorEventData[SampleRun],\n",
+    "    ),\n",
+    "    accumulators={\n",
+    "        ReducedQ[SampleRun, Numerator]: streaming.RollingAccumulator(window=5),\n",
+    "        ReducedQ[SampleRun, Denominator]: streaming.RollingAccumulator(window=5),\n",
+    "    },\n",
+    "    target_keys=(IofQ[SampleRun],),\n",
+    ")\n",
+    "\n",
+    "\n",
+    "det_stride = 60  # the file has 60x fewer monitor pulses than detector pulses\n",
+    "mon_stride = 1\n",
+    "for i in range(100):\n",
+    "    det_chunk = det_events[det_stride * i : det_stride * (i + 1)].copy()\n",
+    "    mon_chunk = mon_events[mon_stride * i : mon_stride * (i + 1)].copy()\n",
+    "    if 20 < i < 30:\n",
+    "        # Zero some chunks to mimic a temporary drop in detector counts,\n",
+    "        # so the effect of the rolling window is visible in the plot.\n",
+    "        det_chunk *= 0.0\n",
+    "    results = streaming_wf.add_chunk(\n",
+    "        {\n",
+    "            NeXusDetectorEventData[SampleRun]: det_chunk,\n",
+    "            NeXusMonitorEventData[SampleRun, Incident]: mon_chunk,\n",
+    "        }\n",
+    "    )\n",
+    "    fig.update({artist: results[IofQ[SampleRun]]})\n",
+    "    fig.fig.canvas.draw()\n",
+    "    fig.fig.canvas.flush_events()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "cf7742c1",
+   "metadata": {},
+   "source": [
+    "## Select range with slider\n",
+    "\n",
+    "Instead of accumulating a continuous stream, we can also select a range of pulses interactively and recompute the result for that selection only."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "af6b20b1",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from ess.reduce import streaming\n",
+    "from ess.reduce.nexus.generic_types import PulseSelection\n",
+    "from ess.reduce.nexus.types import PreopenNeXusFile\n",
+    "from ess.reduce.nexus import generic_types as gt\n",
+    "\n",
+    "\n",
+    "def hack_monitor_events_by_name(\n",
+    "    filename: gt.NeXusFileSpec[gt.RunType],\n",
+    "    name: gt.NeXusMonitorName[gt.MonitorType],\n",
+    "    selection: PulseSelection[gt.RunType],\n",
+    ") -> gt.NeXusMonitorEventLocationSpec[gt.RunType, gt.MonitorType]:\n",
+    "    # Rescale the requested pulse range for the monitor: the file contains far\n",
+    "    # fewer monitor pulses than detector pulses, so the detector pulse indices\n",
+    "    # cannot be used directly.\n",
+    "    scaled = selection.value\n",
+    "    if isinstance(scaled, slice) and scaled.start is not None:\n",
+    "        # fudge = 60  # file has 60x fewer monitor pulses than det pulses\n",
+    "        fudge = 5\n",
+    "        start = int(scaled.start // fudge) % 100\n",
+    "        stop = int(scaled.stop // fudge) % 100\n",
+    "        scaled = slice(start, stop)\n",
+    "    return gt.NeXusMonitorEventLocationSpec[gt.RunType, gt.MonitorType](\n",
+    "        filename=filename.value,\n",
+    "        component_name=name,\n",
+    "        selection={'event_time_zero': scaled},\n",
+    "    )\n",
+    "\n",
+    "\n",
+    "base_workflow = workflow.copy()\n",
+    "base_workflow[PreopenNeXusFile] = True\n",
+    "base_workflow.insert(hack_monitor_events_by_name)\n",
+    "\n",
+    "\n",
+    "streaming_wf = streaming.StreamProcessor(\n",
+    "    base_workflow=base_workflow,\n",
+    "    dynamic_keys=(PulseSelection[SampleRun],),\n",
+    "    accumulators=(),\n",
+    "    # target_keys=(WavelengthMonitor[SampleRun, Incident],),\n",
+    "    target_keys=(IofQ[SampleRun],),\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "6900288e",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%matplotlib widget\n",
+    "import plopp\n",
+    "import ipywidgets as widgets\n",
+    "from time import time\n",
+    "\n",
+    "pulse_sel_slider = widgets.IntSlider(\n",
+    "    value=3,\n",
+    "    min=0,\n",
+    "    max=6000,\n",
+    "    step=100,\n",
+    "    description='First pulse:',\n",
+    "    continuous_update=True,\n",
+    ")\n",
+    "pulse_width_slider = widgets.IntSlider(\n",
+    "    value=60,\n",
+    "    # min=60, # cannot go lower since this is monitor granularity\n",
+    "    min=5,\n",
+    "    max=200,\n",
+    "    step=1,\n",
+    "    description='Pulse count:',\n",
+    "    continuous_update=True,\n",
+    ")\n",
+    "start_node = plopp.widget_node(pulse_sel_slider)\n",
+    "width_node = plopp.widget_node(pulse_width_slider)\n",
+    "\n",
+    "wav = sc.linspace('wavelength', 1.0, 13.0, 100, unit='angstrom')\n",
+    "\n",
+    "\n",
+    "@plopp.node\n",
+    "def update(start, width):\n",
+    "    end = start + width\n",
+    "    sel = PulseSelection(slice(start, end))\n",
+    "    # start = time()\n",
+    "    results = streaming_wf.add_chunk({PulseSelection[SampleRun]: sel})\n",
+    "    # print(f'Processed in {time() - start:.2f} s')\n",
+    "    return next(iter(results.values()))\n",
+    "    # Alternative when targeting WavelengthMonitor[SampleRun, Incident] above:\n",
+    "    # mon = results[WavelengthMonitor[SampleRun, Incident]]\n",
+    "    # return mon.hist(wavelength=wav)\n",
+    "\n",
+    "\n",
+    "update_node = update(start=start_node, width=width_node)\n",
+    "\n",
+    "f = plopp.plot(update_node)\n",
+    "f.bottom_bar.add(pulse_sel_slider)\n",
+    "f.bottom_bar.add(pulse_width_slider)\n",
+    "f"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "id": "62bdffdd",
+   "metadata": {},
+   "source": [
+    "### Debugging files\n",
+    "\n",
+    "The cells below were used to inspect the raw monitor and detector event data in the sample-run file."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "f3ea86fd",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%matplotlib widget\n",
+    "wav = sc.linspace('wavelength', 1.0, 13.0, 100, unit='angstrom')\n",
+    "dummy = sc.DataArray(\n",
+    "    sc.zeros(dims=('wavelength',), shape=(100,), with_variances=True, unit='counts'),\n",
+    "    coords={'wavelength': wav},\n",
+    ")\n",
+    "fig = dummy.plot()\n",
+    "fig.canvas.ylabel = 'Incident monitor ' + fig.canvas.ylabel\n",
+    "artist = next(iter(fig.artists))\n",
+    "display(fig)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "b10ab819",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import ipywidgets as widgets\n",
+    "\n",
+    "pulse_sel_slider = widgets.IntSlider(\n",
+    "    value=0, min=0, max=100, step=100, description='Pulse:', continuous_update=False\n",
+    ")\n",
+    "\n",
+    "\n",
+    "# Note updates glitch.\n",
+    "# Requires the StreamProcessor above to have WavelengthMonitor[SampleRun, Incident]\n",
+    "# among its target_keys (see the commented-out line there).\n",
+    "def update(change):\n",
+    "    start = change.new\n",
+    "    end = start + 10\n",
+    "    sel = PulseSelection(slice(start, end))\n",
+    "    results = streaming_wf.add_chunk({PulseSelection[SampleRun]: sel})\n",
+    "    mon = results[WavelengthMonitor[SampleRun, Incident]]\n",
+    "    # print(f'Loaded {mon.bins.size().sum().value} events')\n",
+    "    fig.update({artist: mon.hist(wavelength=wav)})\n",
+    "    fig.fig.canvas.draw()\n",
+    "    fig.fig.canvas.flush_events()\n",
+    "\n",
+    "\n",
+    "pulse_sel_slider.observe(update, names='value')\n",
+    "display(pulse_sel_slider)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "9dd7250e",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Alternative sample run; the assignment below overrides it.\n",
+    "# workflow[Filename[SampleRun]] = loki.data.loki_tutorial_sample_run_60339()\n",
+    "workflow[Filename[SampleRun]] = loki.data.loki_tutorial_sample_run_60250()\n",
+    "mon = workflow.compute(MonitorData[SampleRun, Incident]).hist()\n",
+    "mon.plot()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "0606a550",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "%matplotlib widget\n",
+    "from ess.reduce.nexus.generic_types import NeXusDetectorEventData\n",
+    "\n",
+    "det = workflow.compute(NeXusDetectorEventData[SampleRun]).hist()\n",
+    "det.plot()\n",
+    "det.bin(event_time_zero=mon.coords['event_time_zero'][55:]).hist().plot()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "3beabae4",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "t_mon = workflow.compute(MonitorData[SampleRun, Incident]).coords['event_time_zero']\n",
+    "t_det = workflow.compute(NeXusDetectorEventData[SampleRun]).coords['event_time_zero']"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "244e0c18",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "t_mon.plot()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "75872142",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "t_det.plot()"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3 (ipykernel)",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.10.14"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 5
+}