Skip to content

Commit

Permalink
add tree to virtual array conversion
Browse files Browse the repository at this point in the history
  • Loading branch information
pfackeldey committed Feb 26, 2025
1 parent 1254938 commit ca5d453
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 8 deletions.
94 changes: 86 additions & 8 deletions src/uproot/_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -950,11 +950,93 @@ def load_buffers(

return container

def load_virtual_arrays(
self,
tree: HasBranches,
keys: frozenset[str],
start: int,
stop: int,
decompression_executor,
interpretation_executor,
options: Any,
) -> Mapping[str, AwkArray]:
awkward = uproot.extras.awkward()

class Generator:
def __init__(self, branch, attribute):
self.branch = branch
self.attribute = attribute

def __repr__(self):
return f"Generator({self.branch}, {self.attribute})"

def __call__(self):
layout = self.branch.array(
entry_start=start,
entry_stop=stop,
interpretation_executor=interpretation_executor,
decompression_executor=decompression_executor,
library="ak",
ak_add_doc=options.get("ak_add_doc"),
).layout
# this is a bit of a hack, but it works for now
if isinstance(layout, awkward.contents.NumpyArray):
return layout.data
elif isinstance(layout, awkward.contents.ListOffsetArray):
if self.attribute == "data":
return layout.content.data
elif self.attribute == "offsets":
return layout.offsets.data
else:
raise NotImplementedError()
else:
raise NotImplementedError()

container = {}
for buffer_key, _ in self._form.expected_from_buffers().items():
form_key, attribute = self.parse_buffer_key(buffer_key)
branch_name = self._form_key_to_key[form_key]
branch = tree[branch_name]
container[buffer_key] = Generator(branch, attribute)

return container


def form_with_unique_keys(form: Form, key: str) -> Form:
awkward = uproot.extras.awkward()

def impl(form: Form, key: str) -> None:
# Set form key
form.form_key = key

# If the form is a record we need to loop over all fields in the
# record and set form that include the field name; this will keep
# recursing as well.
if form.is_record:
for field in form.fields:
impl(form.content(field), f"{key}.{field}")

elif form.is_union:
for i, entry in enumerate(form.contents):
impl(entry, f"{key}#{i}")

# NumPy like array is easy
elif form.is_numpy or form.is_unknown:
pass

# Anything else grab the content and keep recursing
else:
impl(form.content, f"{key}.content")

# Perform a "deep" copy without preserving references
form = awkward.forms.from_dict(form.to_dict())
impl(form, key)
return form


class TrivialFormMapping(ImplementsFormMapping):
def __call__(self, form: Form) -> tuple[Form, TrivialFormMappingInfo]:
dask_awkward = uproot.extras.dask_awkward()
new_form = dask_awkward.lib.utils.form_with_unique_keys(form, "<root>")
new_form = form_with_unique_keys(form, "<root>")
return new_form, TrivialFormMappingInfo(new_form)


Expand Down Expand Up @@ -1548,9 +1630,7 @@ def real_filter_branch(branch):
partition_args.append((0, 0, 0))

if form_mapping is None:
expected_form = dask_awkward.lib.utils.form_with_unique_keys(
base_form, "<root>"
)
expected_form = form_with_unique_keys(base_form, "<root>")
form_mapping_info = TrivialFormMappingInfo(expected_form)
else:
expected_form, form_mapping_info = form_mapping(base_form)
Expand Down Expand Up @@ -1651,9 +1731,7 @@ def _get_dak_array_delay_open(
)

if form_mapping is None:
expected_form = dask_awkward.lib.utils.form_with_unique_keys(
base_form, "<root>"
)
expected_form = form_with_unique_keys(base_form, "<root>")
form_mapping_info = TrivialFormMappingInfo(expected_form)
else:
expected_form, form_mapping_info = form_mapping(base_form)
Expand Down
74 changes: 74 additions & 0 deletions src/uproot/behaviors/TBranch.py
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,80 @@ def show(

stream.write(formatter.format(name, typename, interp).rstrip(" ") + "\n")

def virtual_arrays(
self,
*,
filter_name=no_filter,
filter_typename=no_filter,
filter_branch=no_filter,
aliases=None,
recursive=True,
full_paths=True,
ignore_duplicates=False,
language=uproot.language.python.python_language,
form_mapping=None,
entry_start=None,
entry_stop=None,
decompression_executor=None,
interpretation_executor=None,
array_cache="inherit",
ak_add_doc=False,
):
from uproot._dask import (
TrivialFormMappingInfo,
_get_ttree_form,
form_with_unique_keys,
)

awkward = uproot.extras.awkward()

entry_start, entry_stop = _regularize_entries_start_stop(
self.num_entries, entry_start, entry_stop
)
decompression_executor, interpretation_executor = _regularize_executors(
decompression_executor, interpretation_executor, self._file
)
array_cache = _regularize_array_cache(array_cache, self._file)

keys = self.keys(
filter_name=filter_name,
filter_typename=filter_typename,
filter_branch=filter_branch,
recursive=recursive,
full_paths=full_paths,
ignore_duplicates=ignore_duplicates,
)

base_form = _get_ttree_form(
awkward,
self,
keys,
ak_add_doc,
)

if form_mapping is None:
expected_form = form_with_unique_keys(base_form, "<root>")
form_mapping_info = TrivialFormMappingInfo(expected_form)
else:
expected_form, form_mapping_info = form_mapping(base_form)

container = form_mapping_info.load_virtual_arrays(
self,
keys,
entry_start,
entry_stop,
decompression_executor,
interpretation_executor,
{"ak_add_doc": ak_add_doc},
)
return awkward.from_buffers(
expected_form,
entry_stop - entry_start,
container,
behavior=form_mapping_info.behavior,
buffer_key=form_mapping_info.buffer_key,
)

def arrays(
self,
expressions=None,
Expand Down

0 comments on commit ca5d453

Please sign in to comment.