diff --git a/src/uproot/models/RNTuple.py b/src/uproot/models/RNTuple.py index a8faf9c6e..aa1746819 100644 --- a/src/uproot/models/RNTuple.py +++ b/src/uproot/models/RNTuple.py @@ -8,7 +8,6 @@ import struct import sys from collections import defaultdict -from itertools import accumulate import numpy import xxhash @@ -743,10 +742,8 @@ def read_pagedesc(self, destination, desc, dtype_str, dtype, nbits, split): content = res.view(dtype) if isbit: - content = ( - numpy.unpackbits(content.view(dtype=numpy.uint8)) - .reshape(-1, 8)[:, ::-1] - .reshape(-1) + content = numpy.unpackbits( + content.view(dtype=numpy.uint8), bitorder="little" ) elif dtype_str in ("real32trunc", "real32quant"): if nbits == 32: @@ -778,18 +775,21 @@ def read_col_pages( if dtype_byte in uproot.const.rntuple_delta_types: # Extract the last offset values: last_elements = [ - arr[-1] for arr in arrays[:-1] + (arr[-1] if len(arr) > 0 else numpy.zeros((), dtype=arr.dtype)) + for arr in arrays[:-1] ] # First value always zero, therefore skip first arr. # Compute cumulative sum using itertools.accumulate: - last_offsets = list(accumulate(last_elements)) + last_offsets = numpy.cumsum(last_elements) # Add the offsets to each array for i in range(1, len(arrays)): arrays[i] += last_offsets[i - 1] - # Remove the first element from every sub-array except for the first one: - arrays = [arrays[0]] + [arr[1:] for arr in arrays[1:]] res = numpy.concatenate(arrays, axis=0) + dtype_byte = self.column_records[ncol].type + if dtype_byte in uproot.const.rntuple_index_types: + res = numpy.insert(res, 0, 0) # for offsets + if pad_missing_element: first_element_index = self.column_records[ncol].first_element_index res = numpy.pad(res, (first_element_index, 0)) @@ -817,7 +817,6 @@ def read_col_page(self, ncol, cluster_i): split = dtype_byte in uproot.const.rntuple_split_types zigzag = dtype_byte in uproot.const.rntuple_zigzag_types delta = dtype_byte in uproot.const.rntuple_delta_types - index = dtype_byte in uproot.const.rntuple_index_types nbits = ( self.column_records[ncol].nbits if ncol < len(self.column_records) @@ -836,8 +835,6 @@ def read_col_page(self, ncol, cluster_i): cumsum += numpy.sum(res[tracker:tracker_end]) tracker = tracker_end - if index: - res = numpy.insert(res, 0, 0) # for offsets if zigzag: res = _from_zigzag(res) elif delta: diff --git a/src/uproot/writing/_cascadentuple.py b/src/uproot/writing/_cascadentuple.py index f2089ab0b..6bffd885d 100644 --- a/src/uproot/writing/_cascadentuple.py +++ b/src/uproot/writing/_cascadentuple.py @@ -14,6 +14,7 @@ import struct import awkward +import numpy import xxhash import uproot @@ -39,36 +40,31 @@ _rntuple_locator_offset_format, _rntuple_locator_size_format, _rntuple_page_num_elements_format, + _rntuple_repetition_format, ) from uproot.writing._cascade import CascadeLeaf, CascadeNode, Key, String _rntuple_string_length_format = struct.Struct("" + elif isinstance(akform, awkward.forms.ListOffsetForm): + content_typename = _cpp_typename(akform.content, subcall=True) + typename = f"std::vector<{content_typename}>" + override_typename = akform.parameters.get("__array__", "") + if override_typename != "": + typename = ( + f"std::{override_typename}" # TODO: check if this could cause issues + ) + elif isinstance(akform, awkward.forms.RecordForm): + typename = "UntypedRecord" + elif isinstance(akform, awkward.forms.RegularForm): + content_typename = _cpp_typename(akform.content, subcall=True) + typename = f"std::array<{content_typename},{akform.size}>" + else: + raise NotImplementedError(f"Form type {type(akform)} cannot be written yet") + if not subcall and "UntypedRecord" in typename: + typename = "" # empty types for anything that contains UntypedRecord + return typename + + class RBlob_Key(Key): def __init__( self, @@ -181,6 +206,7 @@ def __init__( type_name, type_alias, field_description, + repetition=None, ): self.field_version = field_version self.type_version = type_version @@ -191,6 +217,7 @@ def __init__( self.type_name = type_name self.type_alias = type_alias self.field_description = field_description + self.repetition = repetition def __repr__(self): return f"{type(self).__name__}({self.field_version!r}, {self.type_version!r}, {self.parent_field_id!r}, {self.struct_role!r}, {self.flags!r}, {self.field_name!r}, {self.type_name!r}, {self.type_alias!r}, {self.field_description!r})" @@ -214,7 +241,10 @@ def serialize(self): ) ] ) - return b"".join([header_bytes, string_bytes]) + additional_bytes = b"" + if self.flags & uproot.const.RNTupleFieldFlag.REPETITIVE: + additional_bytes += _rntuple_repetition_format.pack(self.repetition) + return b"".join([header_bytes, string_bytes, additional_bytes]) # https://github.com/root-project/root/blob/master/tree/ntuple/v7/doc/specifications.md#column-description @@ -246,6 +276,10 @@ def __init__(self, location, name, ntuple_description, akform): self._serialize = None self._checksum = None + self._field_records = [] + self._column_records = [] + self._column_keys = [] + self._ak_node_count = 0 aloc = len(self.serialize()) super().__init__(location, aloc) @@ -256,30 +290,143 @@ def __repr__(self): ", ".join([repr(x) for x in self._akform]), ) - def generate_field_col_records(self): - akform = self._akform - field_names = akform.fields - contents = akform.contents - field_records = [] - column_records = [] - - for field_id, (field_name, ak_col) in enumerate(zip(field_names, contents)): - if not isinstance(ak_col, awkward.forms.NumpyForm): - raise NotImplementedError("only flat column is supported") - ak_primitive = ak_col.primitive - type_name = _ak_primitive_to_typename_dict[ak_primitive] - parent_field_id = field_id + def _build_field_col_records( + self, akform, field_name=None, parent_fid=None, add_field=True + ): + field_id = len(self._field_records) + if parent_fid is None: + parent_fid = field_id + if field_name is None: + field_name = f"_{field_id}" + self._ak_node_count += 1 + if isinstance(akform, awkward.forms.NumpyForm) and akform.inner_shape == (): + type_name = _cpp_typename(akform) field = NTuple_Field_Description( - 0, 0, parent_field_id, 0, 0, field_name, type_name, "", "" + 0, + 0, + parent_fid, + uproot.const.RNTupleFieldRole.LEAF, + 0, + field_name, + type_name, + "", + "", ) + if add_field: + self._field_records.append(field) + else: + field_id = parent_fid + ak_primitive = akform.parameters.get("__array__", akform.primitive) type_num = _ak_primitive_to_num_dict[ak_primitive] type_size = uproot.const.rntuple_col_num_to_size_dict[type_num] col = NTuple_Column_Description(type_num, type_size, field_id, 0, 0) + self._column_records.append(col) + self._column_keys.append(f"node{self._ak_node_count}-data") + elif isinstance(akform, awkward.forms.NumpyForm): + inner_shape = akform.inner_shape + for i, arr_size in enumerate(inner_shape): + if i > 0: + field_id = len(self._field_records) + parent_fid = field_id + type_name = _cpp_typename(akform) + field = NTuple_Field_Description( + 0, + 0, + parent_fid, + uproot.const.RNTupleFieldRole.LEAF, + uproot.const.RNTupleFieldFlag.REPETITIVE, + field_name, + type_name, + "", + "", + repetition=arr_size, + ) + self._field_records.append(field) + ak_primitive = akform.primitive + type_num = _ak_primitive_to_num_dict[ak_primitive] + type_size = uproot.const.rntuple_col_num_to_size_dict[type_num] + col = NTuple_Column_Description(type_num, type_size, field_id, 0, 0) + self._column_records.append(col) + self._column_keys.append(f"node{self._ak_node_count}-data") + elif isinstance(akform, awkward.forms.ListOffsetForm): + type_name = _cpp_typename(akform) + field_role = uproot.const.RNTupleFieldRole.COLLECTION + if akform.parameters.get("__array__", "") == "string": + type_name = "std::string" + field_role = uproot.const.RNTupleFieldRole.LEAF + field = NTuple_Field_Description( + 0, + 0, + parent_fid, + field_role, + 0, + field_name, + type_name, + "", + "", + ) + self._field_records.append(field) + ak_offset = akform.offsets + type_num = _ak_primitive_to_num_dict[ak_offset] + type_size = uproot.const.rntuple_col_num_to_size_dict[type_num] + col = NTuple_Column_Description(type_num, type_size, field_id, 0, 0) + self._column_records.append(col) + self._column_keys.append(f"node{self._ak_node_count}-offsets") + # content data + self._build_field_col_records( + akform.content, + parent_fid=field_id, + add_field=field_role == uproot.const.RNTupleFieldRole.COLLECTION, + ) + elif isinstance(akform, awkward.forms.RecordForm): + field = NTuple_Field_Description( + 0, + 0, + parent_fid, + uproot.const.RNTupleFieldRole.RECORD, + 0, + field_name, + "", + "", + "", + ) + self._field_records.append(field) + for subfield_name, subakform in zip(akform.fields, akform.contents): + self._build_field_col_records( + subakform, + field_name=subfield_name, + parent_fid=field_id, + ) + elif isinstance(akform, awkward.forms.RegularForm): + type_name = _cpp_typename(akform) + field_role = uproot.const.RNTupleFieldRole.LEAF + field = NTuple_Field_Description( + 0, + 0, + parent_fid, + field_role, + uproot.const.RNTupleFieldFlag.REPETITIVE, + field_name, + type_name, + "", + "", + repetition=akform.size, + ) + self._field_records.append(field) + self._build_field_col_records( + akform.content, + parent_fid=field_id, + ) + else: + raise NotImplementedError(f"Form type {type(akform)} cannot be written yet") - field_records.append(field) - column_records.append(col) - - return field_records, column_records + def generate_field_col_records(self): + akform = self._akform + for field_name, topakform in zip(akform.fields, akform.contents): + self._build_field_col_records( + topakform, + field_name=field_name, + ) def serialize(self): if self._serialize: @@ -293,12 +440,12 @@ def serialize(self): out = [] out.extend([feature_flag, name, description, writer]) - field_records, column_records = self.generate_field_col_records() + self.generate_field_col_records() alias_records = [] extra_type_info = [] - out.append(_serialize_rntuple_list_frame(field_records)) - out.append(_serialize_rntuple_list_frame(column_records)) + out.append(_serialize_rntuple_list_frame(self._field_records)) + out.append(_serialize_rntuple_list_frame(self._column_records)) out.append(_serialize_rntuple_list_frame(alias_records)) out.append(_serialize_rntuple_list_frame(extra_type_info)) payload = b"".join(out) @@ -375,8 +522,7 @@ def serialize(self): class NTuple_Locator: def __init__(self, num_bytes, offset): - # approximate 2^16 - size of locator itself - assert num_bytes < 32768 + assert num_bytes < (1 << 32) self.num_bytes = num_bytes self.offset = offset @@ -524,7 +670,7 @@ def __repr__(self): class NTuple_PageDescription: def __init__(self, num_entries, locator): - assert num_entries <= 65536 + assert num_entries < (1 << 32) self.num_entries = num_entries self.locator = locator @@ -658,6 +804,9 @@ def __init__( self._header_key = None self._num_entries = 0 + self._column_counts = numpy.zeros(len(self._header._column_keys), dtype=int) + self._column_offsets = numpy.zeros(len(self._header._column_keys), dtype=int) + def __repr__(self): return f"{type(self).__name__}({self._directory}, {self._header}, {self._footer}, {self._cluster_metadata}, {self._anchor}, {self._freesegments})" @@ -712,13 +861,29 @@ def extend(self, file, sink, data): # We write a single page for each column for now cluster_page_data = [] # list of list of (locator, len, offset) - for field in data.fields: - raw_data = data[field].to_numpy().view("uint8") + data_buffers = awkward.to_buffers(data)[2] + for idx, key in enumerate(self._header._column_keys): + col_data = data_buffers[key] + if "offsets" in key: + col_data = ( + col_data[1:] + self._column_offsets[idx] + ) # TODO: check if there is a better way to do this + if len(col_data) > 0: + self._column_offsets[idx] = col_data[-1] + col_len = len(col_data.reshape(-1)) + # TODO: when col_length is zero we can skip writing the page + # but other things need to be adjusted + raw_data = col_data.reshape(-1).view("uint8") + if col_data.dtype == numpy.dtype("bool"): + raw_data = numpy.packbits(raw_data, bitorder="little") page_key = self.add_rblob(sink, raw_data, len(raw_data), big=False) page_locator = NTuple_Locator( - len(raw_data), page_key.location + page_key.allocation # probably wrong + len(raw_data), page_key.location + page_key.allocation + ) + cluster_page_data.append( + [(page_locator, col_len, self._column_counts[idx])] ) - cluster_page_data.append([(page_locator, len(data), self._num_entries)]) + self._column_counts[idx] += col_len page_data = [ cluster_page_data ] # list of list of list of (locator, len, offset)