Skip to content

Commit

Permalink
Merge pull request #60 from vmarandon/blocksource
Browse files Browse the repository at this point in the history
Add BlockNectarCAMEventSource class
  • Loading branch information
jlenain authored Jan 20, 2025
2 parents ffbff42 + cb5646d commit 6f6dd62
Show file tree
Hide file tree
Showing 2 changed files with 330 additions and 52 deletions.
221 changes: 220 additions & 1 deletion src/ctapipe_io_nectarcam/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@
"""

import glob
import os
import re
import struct
from collections.abc import Iterable
from enum import IntFlag, auto

import numpy as np
Expand Down Expand Up @@ -38,6 +40,7 @@
)
from ctapipe.io import DataLevel, EventSource
from pkg_resources import resource_filename
from protozfits import File
from traitlets.config import Config

from .anyarray_dtypes import CDTS_AFTER_37201_DTYPE, CDTS_BEFORE_37201_DTYPE, TIB_DTYPE
Expand All @@ -51,7 +54,12 @@
)
from .version import __version__

__all__ = ["LightNectarCAMEventSource", "NectarCAMEventSource", "__version__"]
__all__ = [
"LightNectarCAMEventSource",
"NectarCAMEventSource",
"BlockNectarCAMEventSource",
"__version__",
]

S_TO_NS = np.uint64(1e9)

Expand Down Expand Up @@ -1291,6 +1299,217 @@ def is_compatible(file_path):
return False


class BlockNectarCAMEventSource:
    """
    Event source reading a (long) NectarCAM run block by block.

    The input files are grouped in blocks of ``block_size`` files and each
    block is read by its own ``NectarCAMEventSource``, so that only the files
    of one block are handled at a time instead of the whole run.

    The grouping is only done if the number of files is a multiple of
    ``block_size`` and if the file sequence numbers are consecutive and start
    at 0. Otherwise all files are put in one single block.
    It is also possible to analyse only certain blocks via ``allowed_blocks``.

    At the moment this is a standalone class (it does not derive from
    ``EventSource``) to have better control on what is done.

    Parameters
    ----------
    block_size : int
        Number of files per block. Default: 4.
    allowed_blocks : int, iterable of int or None
        Index, or indices, of the blocks to analyse.
        Default: None (all blocks are analysed).
    **kwargs
        Either ``input_url`` (a glob pattern) or ``input_filelist`` (a
        sequence of file names) must be given. ``max_events`` and
        ``show_empty_stats`` are handled by this class. Everything else is
        forwarded to ``NectarCAMEventSource``.

    Raises
    ------
    ValueError
        If ``block_size`` is smaller than 1, if neither ``input_url`` nor
        ``input_filelist`` is given, or if no input file is found.
    """

    def __init__(self, block_size=4, allowed_blocks=None, **kwargs):
        if block_size < 1:
            raise ValueError(f"block_size must be >= 1, got {block_size}")

        self._arguments = kwargs  # what remains is forwarded to each block source
        self._file_names = None
        self._block_file_names = list()
        self._current_source = None
        self._current_block = None
        self._current_generator = None
        self._total_entries = 0
        self._current_counts = 0
        self.block_size = block_size
        self.allowed_blocks = None
        self.max_events = None
        self.empty_entries = 0
        self.show_empty_stats = False

        # NumPy integers are not instances of int, accept them explicitly
        if isinstance(allowed_blocks, (int, np.integer)):
            self.allowed_blocks = [int(allowed_blocks)]
        elif isinstance(allowed_blocks, Iterable):
            self.allowed_blocks = list({int(e) for e in allowed_blocks})

        if "input_url" in self._arguments:
            # give list to NectarCAMEventSource so remove it from arguments
            pattern = str(self._arguments.pop("input_url"))
            self._file_names = sorted(glob.glob(pattern))
        elif "input_filelist" in self._arguments:
            # give list to NectarCAMEventSource so remove it from arguments
            # sorted() works on a copy: the caller's list is left untouched
            self._file_names = sorted(self._arguments.pop("input_filelist"))
        else:
            raise ValueError("No input_url or input_filelist given !")

        if not self._file_names:
            raise ValueError("No input file found !")

        if "max_events" in self._arguments:
            # treating option here, don't forward it to NectarCAMEventSource
            max_events = self._arguments.pop("max_events")
            self.max_events = None if max_events is None else int(max_events)

        if "show_empty_stats" in self._arguments:
            # treating option here, don't forward it to NectarCAMEventSource
            self.show_empty_stats = bool(self._arguments.pop("show_empty_stats"))

        self._create_blocks()
        self._switch_block()

    @staticmethod
    def is_compatible(file_path):
        """
        This version should only be called directly, so return False
        such that it is not used when using EventSource.
        Nevertheless, in principle it should work as NectarCAMEventSource by default.
        """
        return False

    def __getattr__(self, attr):
        """
        Forward unknown attributes to the current ``NectarCAMEventSource``.

        Only called when the normal attribute lookup fails. Callables are
        wrapped so that the call is resolved on the source that is current
        at call time, not at lookup time.

        Raises
        ------
        AttributeError
            If there is no current source or if it has no such attribute.
        """
        # Go through __dict__: a plain self._current_source would re-enter
        # __getattr__ endlessly on a not yet initialised instance.
        source = self.__dict__.get("_current_source")
        if source is None or not hasattr(source, attr):
            raise AttributeError(
                f"'{type(self).__name__}' object has no attribute '{attr}'"
            )

        attr_val = getattr(source, attr)
        if callable(attr_val):

            def call_wrapper(*args, **kwargs):
                return getattr(self._current_source, attr)(*args, **kwargs)

            return call_wrapper
        return attr_val

    def _rewind(self):
        """Restart the reading from the first block and reset the counters."""
        self._current_block = None
        # Without these resets a second iteration would stop immediately when
        # max_events is set and the empty events would be counted twice.
        self._current_counts = 0
        self.empty_entries = 0
        self._switch_block()

    def get_entries(self):
        """
        Return the number of events in the selected files.

        The value is capped to ``max_events`` when it is set. The files are
        only opened as long as the stored total is 0.
        """
        if self._total_entries == 0:
            total = 0
            for filename in self._file_names:
                # close each file once counted to not pile up open handles
                with File(str(filename)) as zfits_file:
                    total += len(zfits_file.Events)
            self._total_entries = total
        if self.max_events is None:
            return self._total_entries
        return min(self._total_entries, self.max_events)

    def _switch_block(self):
        """
        Move to the next block and create the source reading it.

        Returns
        -------
        bool
            True if a new block is available, False when all were read.
        """
        if self._current_block is None:
            self._current_block = 0
        else:
            self._current_block += 1

        if self._current_block >= len(self._block_file_names):
            return False

        self._current_source = NectarCAMEventSource(
            input_filelist=self._block_file_names[self._current_block],
            **self._arguments,
        )
        self._current_generator = self._current_source._generator()
        return True

    def __len__(self):
        return self.get_entries()

    def _create_blocks(self):
        """
        Group the input files in blocks of ``block_size`` files.

        If the grouping is not possible all files end up in a single block.
        When ``allowed_blocks`` is set, only the requested blocks are kept
        and the list of file names is reduced to the files of these blocks.
        """
        n_files = len(self._file_names)
        if n_files % self.block_size != 0 or not self.consecutive_files(
            self._file_names
        ):
            print("Not possible to block --> Read everything")
            block_list = [list(self._file_names)]
        else:
            block_list = [
                self._file_names[start : start + self.block_size]
                for start in range(0, n_files, self.block_size)
            ]

        if self.allowed_blocks is not None:
            # going to only take the selected blocks
            filtered_blocks = list()
            for block in self.allowed_blocks:
                # Sanity check --> don't store the same block twice
                if block < len(block_list) and block_list[block] not in filtered_blocks:
                    filtered_blocks.append(block_list[block])
            filtered_blocks.sort()  # just in case
            block_list = filtered_blocks
            # Erase the input list to keep only the selected files
            self._file_names = [file for block in filtered_blocks for file in block]

        self._block_file_names = block_list

    def consecutive_files(self, file_list=None):
        """
        Tell if the sequence numbers of the files are 0, 1, 2, ... in order.

        File names are assumed to be of type 'NectarCAM.Run5665.0246.fits.fz'.

        Parameters
        ----------
        file_list : sequence of path-like or None
            Files to check. Default: None (use the files of this source).

        Returns
        -------
        bool
            False for an empty list or if a sequence number can't be read.
        """
        if file_list is None:
            file_list = self._file_names
        try:
            numbers = [
                int(os.path.basename(f).split(".fits.fz")[0].split(".")[-1])
                for f in file_list
            ]
        except ValueError:
            return False
        # starting at 0 with steps of 1 is exactly range(len(numbers))
        return bool(numbers) and numbers == list(range(len(numbers)))

    def __iter__(self):
        self._rewind()
        return self

    def __next__(self):
        if self.max_events is not None and self._current_counts >= self.max_events:
            raise StopIteration
        if self._current_generator is None:
            # no block to read, e.g. allowed_blocks selected nothing
            raise StopIteration

        # loop: a block without any event must not end the iteration
        while True:
            try:
                next_entry = next(self._current_generator)
                break
            except StopIteration:
                # End of current block, try if there is a next one
                self.empty_entries += self._current_source.get_empty_entries()
                if not self._switch_block():
                    if self.show_empty_stats:
                        self.print_empty_stats()
                    raise
        self._current_counts += 1
        return next_entry

    def get_empty_entries(self):
        """Return the number of empty events of the blocks read until the end."""
        return self.empty_entries

    def print_empty_stats(self):
        """Print the fraction of empty events, if there are any."""
        if self.empty_entries > 0:
            n_entries = self.get_entries()
            print(
                f"WARNING> Empty events :"
                f" {self.empty_entries}/{n_entries}"
                f" --> "
                f"{100.*self.empty_entries/n_entries:.2f} %"
            )


class MultiFiles:
"""
This class open all the files in file_list and read the events following
Expand Down
Loading

0 comments on commit 6f6dd62

Please sign in to comment.