Skip to content

Commit

Permalink
Merge pull request #2628 from cta-observatory/stats_calc_tool
Browse files Browse the repository at this point in the history
Stats calc tool
  • Loading branch information
maxnoe authored Jan 29, 2025
2 parents b69540b + e24e523 commit ef73aa8
Show file tree
Hide file tree
Showing 7 changed files with 351 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/changes/2628.feature.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add a generic stats-calculation tool utilizing the PixelStatisticsCalculator.
1 change: 1 addition & 0 deletions docs/user-guide/tools.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Data Processing Tools
* ``ctapipe-quickstart``: create some default analysis configurations and a working directory
* ``ctapipe-process``: Process event data in any supported format from R0/R1/DL0 to DL1 or DL2 HDF5 files.
* ``ctapipe-apply-models``: Tool to apply machine learning models in bulk (as opposed to event by event).
* ``ctapipe-calculate-pixel-statistics``: Tool to aggregate statistics and detect outliers from pixel-wise image data.
* ``ctapipe-train-disp-reconstructor`` : Train the ML models for the `ctapipe.reco.DispReconstructor` (monoscopic reconstruction)
* ``ctapipe-train-energy-regressor``: Train the ML models for the `ctapipe.reco.EnergyRegressor` (energy estimation)
* ``ctapipe-train-particle-classifier``: Train the ML models for the `ctapipe.reco.ParticleClassifier` (gamma-hadron separation)
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ ctapipe-process = "ctapipe.tools.process:main"
ctapipe-merge = "ctapipe.tools.merge:main"
ctapipe-fileinfo = "ctapipe.tools.fileinfo:main"
ctapipe-quickstart = "ctapipe.tools.quickstart:main"
ctapipe-calculate-pixel-statistics = "ctapipe.tools.calculate_pixel_stats:main"
ctapipe-train-energy-regressor = "ctapipe.tools.train_energy_regressor:main"
ctapipe-train-particle-classifier = "ctapipe.tools.train_particle_classifier:main"
ctapipe-train-disp-reconstructor = "ctapipe.tools.train_disp_reconstructor:main"
Expand Down
32 changes: 32 additions & 0 deletions src/ctapipe/resources/calculate_pixel_stats.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
PixelStatisticsCalculatorTool:
allowed_tels: [1,2,3,4]
input_column_name: image
output_table_name: statistics

PixelStatisticsCalculator:
stats_aggregator_type:
- ["type", "LST*", "SigmaClippingAggregator"]
- ["type", "MST*", "PlainAggregator"]

chunk_shift: 1000
faulty_pixels_fraction: 0.1
outlier_detector_list:
- name: MedianOutlierDetector
apply_to: median
config:
median_range_factors: [-15, 15]
- name: RangeOutlierDetector
apply_to: median
config:
validity_range: [-20, 120]
- name: StdOutlierDetector
apply_to: std
config:
std_range_factors: [-15, 15]

SigmaClippingAggregator:
chunk_size: 2500
max_sigma: 4
iterations: 5
PlainAggregator:
chunk_size: 2500
191 changes: 191 additions & 0 deletions src/ctapipe/tools/calculate_pixel_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
"""
Perform statistics calculation from pixel-wise image data
"""

import pathlib

import numpy as np
from astropy.table import vstack

from ctapipe.core import Tool
from ctapipe.core.tool import ToolConfigurationError
from ctapipe.core.traits import (
Bool,
CInt,
Path,
Set,
Unicode,
classes_with_traits,
)
from ctapipe.instrument import SubarrayDescription
from ctapipe.io import write_table
from ctapipe.io.tableloader import TableLoader
from ctapipe.monitoring.calculator import PixelStatisticsCalculator


class PixelStatisticsCalculatorTool(Tool):
"""
Perform statistics calculation for pixel-wise image data
"""

name = "ctapipe-calculate-pixel-statistics"
description = "Perform statistics calculation for pixel-wise image data"

examples = """
To calculate statistics of pixel-wise image data files:
> ctapipe-calculate-pixel-statistics --TableLoader.input_url input.dl1.h5 --output_path /path/monitoring.h5 --overwrite
"""

allowed_tels = Set(
trait=CInt(),
default_value=None,
allow_none=True,
help=(
"List of allowed tel_ids, others will be ignored. "
"If None, all telescopes in the input stream will be included."
),
).tag(config=True)

input_column_name = Unicode(
default_value="image",
allow_none=False,
help="Column name of the pixel-wise image data to calculate statistics",
).tag(config=True)

output_table_name = Unicode(
default_value="statistics",
allow_none=False,
help="Table name of the output statistics",
).tag(config=True)

output_path = Path(
help="Output filename", default_value=pathlib.Path("monitoring.h5")
).tag(config=True)

overwrite = Bool(help="Overwrite output file if it exists").tag(config=True)

aliases = {
("i", "input_url"): "TableLoader.input_url",
("o", "output_path"): "PixelStatisticsCalculatorTool.output_path",
}

flags = {
"overwrite": (
{"PixelStatisticsCalculatorTool": {"overwrite": True}},
"Overwrite existing files",
),
}

classes = [
TableLoader,
] + classes_with_traits(PixelStatisticsCalculator)

def setup(self):
# Read the input data with the 'TableLoader'
self.input_data = TableLoader(
parent=self,
)
# Check that the input and output files are not the same
if self.input_data.input_url == self.output_path:
raise ToolConfigurationError(
"Input and output files are same. Fix your configuration / cli arguments."
)
if "dl1_images" in self.input_data.config.TableLoader:
if not self.input_data.dl1_images:
raise ToolConfigurationError(
"The TableLoader must read dl1 images. Set 'dl1_images' to True."
)
self.input_data.dl1_images = True
# Load the subarray description from the input file
subarray = SubarrayDescription.from_hdf(self.input_data.input_url)
# Get the telescope ids from the input data or use the allowed_tels configuration
self.tel_ids = (
subarray.tel_ids if self.allowed_tels is None else self.allowed_tels
)
# Initialization of the statistics calculator
self.stats_calculator = PixelStatisticsCalculator(
parent=self, subarray=subarray
)

def start(self):
# Iterate over the telescope ids and calculate the statistics
for tel_id in self.tel_ids:
# Read the whole dl1 images for one particular telescope
dl1_table = self.input_data.read_telescope_events_by_id(
telescopes=[
tel_id,
],
)[tel_id]
# Check if the chunk size does not exceed the table length of the input data
if self.stats_calculator.stats_aggregators[
self.stats_calculator.stats_aggregator_type.tel[tel_id]
].chunk_size > len(dl1_table):
raise ToolConfigurationError(
f"Change --StatisticsAggregator.chunk_size to decrease the chunk size "
f"of the aggregation to a maximum of '{len(dl1_table)}' (table length of the "
f"input data for telescope 'tel_id={tel_id}')."
)
# Check if the input column name is in the table
if self.input_column_name not in dl1_table.colnames:
raise ToolConfigurationError(
f"Column '{self.input_column_name}' not found "
f"in the input data for telescope 'tel_id={tel_id}'."
)
# Perform the first pass of the statistics calculation
aggregated_stats = self.stats_calculator.first_pass(
table=dl1_table,
tel_id=tel_id,
col_name=self.input_column_name,
)
# Check if 'chunk_shift' is selected
if self.stats_calculator.chunk_shift is not None:
# Check if there are any faulty chunks to perform a second pass over the data
if np.any(~aggregated_stats["is_valid"].data):
# Perform the second pass of the statistics calculation
aggregated_stats_secondpass = self.stats_calculator.second_pass(
table=dl1_table,
valid_chunks=aggregated_stats["is_valid"].data,
tel_id=tel_id,
col_name=self.input_column_name,
)
# Stack the statistic values from the first and second pass
aggregated_stats = vstack(
[aggregated_stats, aggregated_stats_secondpass]
)
# Sort the stacked aggregated statistic values by starting time
aggregated_stats.sort(["time_start"])
else:
self.log.info(
"No faulty chunks found for telescope 'tel_id=%d'. Skipping second pass.",
tel_id,
)
# Add metadata to the aggregated statistics
aggregated_stats.meta["event_type"] = dl1_table["event_type"][0]
aggregated_stats.meta["input_column_name"] = self.input_column_name
# Write the aggregated statistics and their outlier mask to the output file
write_table(
aggregated_stats,
self.output_path,
f"/dl1/monitoring/telescope/{self.output_table_name}/tel_{tel_id:03d}",
overwrite=self.overwrite,
)

def finish(self):
self.log.info(
"DL1 monitoring data was stored in '%s' under '%s'",
self.output_path,
f"/dl1/monitoring/telescope/{self.output_table_name}",
)
self.log.info("Tool is shutting down")


def main():
# Run the tool
tool = PixelStatisticsCalculatorTool()
tool.run()


if __name__ == "main":
main()
1 change: 1 addition & 0 deletions src/ctapipe/tools/quickstart.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

CONFIGS_TO_WRITE = [
"base_config.yaml",
"calculate_pixel_stats.yaml",
"stage1_config.yaml",
"stage2_config.yaml",
"ml_preprocessing_config.yaml",
Expand Down
124 changes: 124 additions & 0 deletions src/ctapipe/tools/tests/test_calculate_pixel_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
#!/usr/bin/env python3
"""
Test ctapipe-calculate-pixel-statistics tool
"""

import pytest
from traitlets.config.loader import Config

from ctapipe.core import run_tool
from ctapipe.core.tool import ToolConfigurationError
from ctapipe.io import read_table
from ctapipe.tools.calculate_pixel_stats import PixelStatisticsCalculatorTool


def test_calculate_pixel_stats_tool(tmp_path, dl1_image_file):
"""check statistics calculation from pixel-wise image data files"""

# Create a configuration suitable for the test
tel_id = 3
config = Config(
{
"PixelStatisticsCalculatorTool": {
"allowed_tels": [tel_id],
"input_column_name": "image",
"output_table_name": "statistics",
},
"PixelStatisticsCalculator": {
"stats_aggregator_type": [
("id", tel_id, "PlainAggregator"),
],
},
"PlainAggregator": {
"chunk_size": 1,
},
}
)
# Set the output file path
monitoring_file = tmp_path / "monitoring.dl1.h5"
# Run the tool with the configuration and the input file
run_tool(
PixelStatisticsCalculatorTool(config=config),
argv=[
f"--input_url={dl1_image_file}",
f"--output_path={monitoring_file}",
"--overwrite",
],
cwd=tmp_path,
raises=True,
)
# Check that the output file has been created
assert monitoring_file.exists()
# Check that the output file is not empty
assert (
read_table(
monitoring_file,
path=f"/dl1/monitoring/telescope/statistics/tel_{tel_id:03d}",
)["mean"]
is not None
)


def test_tool_config_error(tmp_path, dl1_image_file):
"""check tool configuration error"""

# Run the tool with the configuration and the input file
config = Config(
{
"PixelStatisticsCalculatorTool": {
"allowed_tels": [3],
"input_column_name": "image_charges",
"output_table_name": "statistics",
}
}
)
# Set the output file path
monitoring_file = tmp_path / "monitoring.dl1.h5"
# Check if ToolConfigurationError is raised
# when the column name of the pixel-wise image data is not correct
with pytest.raises(
ToolConfigurationError, match="Column 'image_charges' not found"
):
run_tool(
PixelStatisticsCalculatorTool(config=config),
argv=[
f"--input_url={dl1_image_file}",
f"--output_path={monitoring_file}",
"--StatisticsAggregator.chunk_size=1",
"--overwrite",
],
cwd=tmp_path,
raises=True,
)
# Check if ToolConfigurationError is raised
# when the input and output files are the same
with pytest.raises(
ToolConfigurationError, match="Input and output files are same."
):
run_tool(
PixelStatisticsCalculatorTool(),
argv=[
f"--input_url={dl1_image_file}",
f"--output_path={dl1_image_file}",
"--overwrite",
],
cwd=tmp_path,
raises=True,
)
# Check if ToolConfigurationError is raised
# when the chunk size is larger than the number of events in the input file
with pytest.raises(
ToolConfigurationError, match="Change --StatisticsAggregator.chunk_size"
):
run_tool(
PixelStatisticsCalculatorTool(),
argv=[
f"--input_url={dl1_image_file}",
f"--output_path={monitoring_file}",
"--PixelStatisticsCalculatorTool.allowed_tels=3",
"--StatisticsAggregator.chunk_size=2500",
"--overwrite",
],
cwd=tmp_path,
raises=True,
)

0 comments on commit ef73aa8

Please sign in to comment.