Commit dc8fd04

fstrugariostas authored and committed
Code refactoring. Increased function sharing for CPU and GPU interpretation of RNTuple data. Changed arrays() argument 'use_GDS' to 'interpreter'.
1 parent a757541 commit dc8fd04

13 files changed: +444 −593 lines

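The user-facing change is that the arrays() argument 'use_GDS' is replaced by 'interpreter', which concatenate() and iterate() now also accept and pass through. A minimal usage sketch, with hypothetical file and object names; the "gpu" interpreter additionally requires backend="cuda" and the kvikIO/nvCOMP bindings:

    import uproot

    # "events.root" and "Events" are hypothetical names, used only for illustration.
    with uproot.open("events.root") as f:
        ntuple = f["Events"]

        # Before this commit, GPU (GDS) reading was requested with a boolean flag:
        #     ntuple.arrays(backend="cuda", use_GDS=True)

        # After this commit, the raw-data interpreter is chosen explicitly:
        arrays_cpu = ntuple.arrays(interpreter="cpu")                  # default CPU path
        arrays_gpu = ntuple.arrays(backend="cuda", interpreter="gpu")  # kvikIO/nvCOMP path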
src/uproot/behaviors/RNTuple.py

Lines changed: 42 additions & 222 deletions
@@ -207,6 +207,8 @@ def concatenate(
     entry_stop=None,
     decompression_executor=None, # TODO: Not implemented yet
     library="ak", # TODO: Not implemented yet
+    backend="cpu",
+    interpreter="cpu",
     ak_add_doc=False,
     how=None,
     allow_missing=False,
@@ -382,6 +384,8 @@ def concatenate(
         decompression_executor=decompression_executor,
         array_cache=None,
         library=library,
+        backend=backend,
+        interpreter=interpreter,
         ak_add_doc=ak_add_doc,
         how=how,
         filter_branch=filter_branch,
@@ -571,8 +575,8 @@ def arrays(
         decompression_executor=None, # TODO: Not implemented yet
         array_cache="inherit", # TODO: Not implemented yet
         library="ak", # TODO: Not implemented yet
-        backend="cpu", # TODO: Not Implemented yet
-        use_GDS=False,
+        backend="cpu",
+        interpreter="cpu",
         ak_add_doc=False,
         how=None,
         # For compatibility reasons we also accepts kwargs meant for TTrees
@@ -618,11 +622,10 @@ def arrays(
             that is used to represent arrays. Options are ``"np"`` for NumPy,
             ``"ak"`` for Awkward Array, and ``"pd"`` for Pandas. (Not implemented yet.)
         backend (str): The backend Awkward Array will use.
-        use_GDS (bool): If True and ``backend="cuda"`` will use kvikIO bindings
-            to CuFile to provide direct memory access (DMA) transfers between GPU
-            memory and storage. KvikIO bindings to nvcomp decompress data
-            buffers.
-        ak_add_doc (bool | dict ): If True and ``library="ak"``, add the RField ``description``
+        interpreter (str): If "cpu" will use cpu to interpret raw data. If "gpu" and
+            ``backend="cuda"`` will use KvikIO bindings to CuFile and nvCOMP to
+            interpret raw data on gpu if available.
+        ak_add_doc (bool | dict ): If True and ``library="ak"``, add the RField ``name``
             to the Awkward ``__doc__`` parameter of the array.
             if dict = {key:value} and ``library="ak"``, add the RField ``value`` to the
             Awkward ``key`` parameter of the array.
@@ -653,93 +656,7 @@ def arrays(
         See also :ref:`uproot.behaviors.RNTuple.HasFields.iterate` to iterate over
         the array in contiguous ranges of entries.
         """
-        if not use_GDS:
-            return self._arrays(
-                expressions,
-                cut,
-                filter_name=filter_name,
-                filter_typename=filter_typename,
-                filter_field=filter_field,
-                aliases=aliases, # TODO: Not implemented yet
-                language=language, # TODO: Not implemented yet
-                entry_start=entry_start,
-                entry_stop=entry_stop,
-                decompression_executor=decompression_executor, # TODO: Not implemented yet
-                array_cache=array_cache, # TODO: Not implemented yet
-                library=library, # TODO: Not implemented yet
-                backend=backend, # TODO: Not Implemented yet
-                ak_add_doc=ak_add_doc,
-                how=how,
-                # For compatibility reasons we also accepts kwargs meant for TTrees
-                interpretation_executor=interpretation_executor,
-                filter_branch=filter_branch,
-            )
-
-        elif use_GDS and backend == "cuda":
-            return self._arrays_GDS(
-                expressions,
-                cut,
-                filter_name=filter_name,
-                filter_typename=filter_typename,
-                filter_field=filter_field,
-                aliases=aliases, # TODO: Not implemented yet
-                language=language, # TODO: Not implemented yet
-                entry_start=entry_start,
-                entry_stop=entry_stop,
-                decompression_executor=decompression_executor, # TODO: Not implemented yet
-                array_cache=array_cache, # TODO: Not implemented yet
-                library=library, # TODO: Not implemented yet
-                backend=backend, # TODO: Not Implemented yet
-                ak_add_doc=ak_add_doc,
-                how=how,
-                # For compatibility reasons we also accepts kwargs meant for TTrees
-                interpretation_executor=interpretation_executor,
-                filter_branch=filter_branch,
-            )
-
-        elif use_GDS and backend != "cuda":
-            raise NotImplementedError(
-                f"Backend {backend} GDS support not implemented.".format(backend)
-            )
-
-    def _arrays(
-        self,
-        expressions=None, # TODO: Not implemented yet
-        cut=None, # TODO: Not implemented yet
-        *,
-        filter_name=no_filter,
-        filter_typename=no_filter,
-        filter_field=no_filter,
-        aliases=None, # TODO: Not implemented yet
-        language=uproot.language.python.python_language, # TODO: Not implemented yet
-        entry_start=None,
-        entry_stop=None,
-        decompression_executor=None, # TODO: Not implemented yet
-        array_cache="inherit", # TODO: Not implemented yet
-        library="ak", # TODO: Not implemented yet
-        backend="cpu", # TODO: Not Implemented yet
-        ak_add_doc=False,
-        how=None,
-        # For compatibility reasons we also accepts kwargs meant for TTrees
-        interpretation_executor=None,
-        filter_branch=unset,
-    ):
-        """
-        Returns a group of arrays from the ``RNTuple``.

-        For example:
-
-        .. code-block:: python
-
-            >>> my_ntuple.arrays()
-            <Array [{my_vector: [1, 2]}, {...}] type='2 * {my_vector: var * int64}'>
-
-        See also :ref:`uproot.behaviors.RNTuple.HasFields.array` to read a single
-        ``RField`` as an array.
-
-        See also :ref:`uproot.behaviors.RNTuple.HasFields.iterate` to iterate over
-        the array in contiguous ranges of entries.
-        """
         # This temporarily provides basic functionality while expressions are properly implemented
         if expressions is not None:
             if filter_name == no_filter:
@@ -779,18 +696,36 @@ def _arrays(
         container_dict = {}
         _recursive_find(form, target_cols)

+        if interpreter == "gpu" and backend == "cuda":
+            clusters_datas = self.ntuple.gpu_read_clusters(
+                target_cols, start_cluster_idx, stop_cluster_idx
+            )
+            clusters_datas._decompress()
+            content_dict = self.ntuple.gpu_deserialize_decompressed_content(
+                clusters_datas,
+                start_cluster_idx,
+                stop_cluster_idx,
+                pad_missing_element=True,
+            )
+
         for key in target_cols:
             if "column" in key and "union" not in key:
                 key_nr = int(key.split("-")[1])
-
+                if interpreter == "cpu":
+                    content = self.ntuple.read_col_pages(
+                        key_nr,
+                        range(start_cluster_idx, stop_cluster_idx),
+                        pad_missing_element=True,
+                    )
+                elif interpreter == "gpu" and backend == "cuda":
+                    content = content_dict[key_nr]
+                elif interpreter == "gpu" and backend != "cuda":
+                    raise NotImplementedError(
+                        f"Backend {backend} GDS support not implemented.".format(
+                            backend
+                        )
+                    )
                 dtype_byte = self.ntuple.column_records[key_nr].type
-
-                content = self.ntuple.read_col_pages(
-                    key_nr,
-                    range(start_cluster_idx, stop_cluster_idx),
-                    dtype_byte=dtype_byte,
-                    pad_missing_element=True,
-                )
                 _fill_container_dict(container_dict, content, key, dtype_byte)

         cluster_offset = cluster_starts[start_cluster_idx]
@@ -801,6 +736,7 @@ def _arrays(
             cluster_num_entries,
             container_dict,
             allow_noncanonical_form=True,
+            backend="cuda" if interpreter == "gpu" and backend == "cuda" else "cpu",
         )[entry_start:entry_stop]

         arrays = uproot.extras.awkward().to_backend(arrays, backend=backend)
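Taken together, the two hunks above fold the former CPU-only and GDS-only code paths into a single column loop that differs only in how raw pages are interpreted, and from_buffers then selects the "cuda" backend only when interpreter="gpu" and backend="cuda" are both set. The sketch below is a simplified paraphrase of that dispatch, not the actual method body: interpret_columns is a hypothetical helper, and the ntuple methods it calls (gpu_read_clusters, gpu_deserialize_decompressed_content, read_col_pages) are shown with the abridged arguments that appear in the diff:

    def interpret_columns(ntuple, target_cols, start_cluster_idx, stop_cluster_idx,
                          interpreter="cpu", backend="cpu"):
        """Sketch of the shared dispatch: one column loop, two interpreters."""
        if interpreter == "gpu" and backend == "cuda":
            # GPU path: read and decompress all requested columns up front.
            clusters_datas = ntuple.gpu_read_clusters(
                target_cols, start_cluster_idx, stop_cluster_idx
            )
            clusters_datas._decompress()
            content_dict = ntuple.gpu_deserialize_decompressed_content(
                clusters_datas, start_cluster_idx, stop_cluster_idx, pad_missing_element=True
            )
        elif interpreter == "gpu":
            raise NotImplementedError(f"Backend {backend} GDS support not implemented.")

        contents = {}
        for key in target_cols:
            if "column" in key and "union" not in key:
                key_nr = int(key.split("-")[1])
                if interpreter == "cpu":
                    # CPU path: interpret each column's pages on demand.
                    contents[key_nr] = ntuple.read_col_pages(
                        key_nr,
                        range(start_cluster_idx, stop_cluster_idx),
                        pad_missing_element=True,
                    )
                else:
                    # GPU path: content was already deserialized above.
                    contents[key_nr] = content_dict[key_nr]
        return contents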
@@ -822,126 +758,6 @@ def _arrays(
             raise ValueError(
                 f"unrecognized 'how' parameter: {how}. Options are None, tuple, list and dict."
             )
-
-        return arrays
-
-    def _arrays_GDS(
-        self,
-        expressions=None, # TODO: Not implemented yet
-        cut=None, # TODO: Not implemented yet
-        *,
-        filter_name=no_filter,
-        filter_typename=no_filter,
-        filter_field=no_filter,
-        aliases=None, # TODO: Not implemented yet
-        language=uproot.language.python.python_language, # TODO: Not implemented yet
-        entry_start=None,
-        entry_stop=None,
-        decompression_executor=None, # TODO: Not implemented yet
-        array_cache="inherit", # TODO: Not implemented yet
-        library="ak", # TODO: Not implemented yet
-        backend="cuda", # TODO: Not Implemented yet
-        ak_add_doc=False,
-        how=None,
-        # For compatibility reasons we also accepts kwargs meant for TTrees
-        interpretation_executor=None,
-        filter_branch=unset,
-    ):
-        """
-        Current GDS support is limited to nvidia GPUs. The python library kvikIO is
-        a required dependency for Uproot GDS reading which can be installed by
-        calling pip install uproot[GDS_cuX] where X corresponds to the major cuda
-        version available on the user's system.
-
-        Returns a group of arrays from the ``RNTuple``.
-
-        For example:
-
-        .. code-block:: python
-
-            >>> my_ntuple.arrays(useGDS = True, backend = "cuda")
-            <Array [{my_vector: [1, 2]}, {...}] type='2 * {my_vector: var * int64}'>
-
-
-        """
-        # This temporarily provides basic functionality while expressions are properly implemented
-        if expressions is not None:
-            if filter_name == no_filter:
-                filter_name = expressions
-            else:
-                raise ValueError(
-                    "Expressions are not supported yet. They are currently equivalent to filter_name."
-                )
-
-        #####
-        # Find clusters to read that contain data from entry_start to entry_stop
-        entry_start, entry_stop = (
-            uproot.behaviors.TBranch._regularize_entries_start_stop(
-                self.num_entries, entry_start, entry_stop
-            )
-        )
-        clusters = self.ntuple.cluster_summaries
-        cluster_starts = numpy.array([c.num_first_entry for c in clusters])
-        start_cluster_idx = (
-            numpy.searchsorted(cluster_starts, entry_start, side="right") - 1
-        )
-        stop_cluster_idx = numpy.searchsorted(cluster_starts, entry_stop, side="right")
-        cluster_num_entries = numpy.sum(
-            [c.num_entries for c in clusters[start_cluster_idx:stop_cluster_idx]]
-        )
-
-        # Get form for requested columns
-        form = self.to_akform(
-            filter_name=filter_name,
-            filter_typename=filter_typename,
-            filter_field=filter_field,
-            filter_branch=filter_branch,
-        )
-
-        # Only read columns mentioned in the awkward form
-        target_cols = []
-        container_dict = {}
-
-        _recursive_find(form, target_cols)
-
-        #####
-        # Read and decompress all columns' data
-        clusters_datas = self.ntuple.gpu_read_clusters(
-            target_cols, start_cluster_idx, stop_cluster_idx
-        )
-        clusters_datas._decompress()
-        #####
-        # Deserialize decompressed datas
-        content_dict = self.ntuple.gpu_deserialize_decompressed_content(
-            clusters_datas, start_cluster_idx, stop_cluster_idx
-        )
-        #####
-        # Reconstitute arrays to an awkward array
-        container_dict = {}
-        # Debugging
-        for key in target_cols:
-            if "column" in key and "union" not in key:
-                key_nr = int(key.split("-")[1])
-
-                dtype_byte = self.ntuple.column_records[key_nr].type
-                content = content_dict[key_nr]
-                _fill_container_dict(container_dict, content, key, dtype_byte)
-
-        cluster_offset = cluster_starts[start_cluster_idx]
-        entry_start -= cluster_offset
-        entry_stop -= cluster_offset
-
-        arrays = uproot.extras.awkward().from_buffers(
-            form,
-            cluster_num_entries,
-            container_dict,
-            allow_noncanonical_form=True,
-            backend="cuda",
-        )[entry_start:entry_stop]
-
-        # Free memory
-        del content_dict, container_dict, clusters_datas
-
         return arrays

     def __array__(self, *args, **kwargs):
@@ -969,6 +785,8 @@ def iterate(
         step_size="100 MB",
         decompression_executor=None, # TODO: Not implemented yet
         library="ak", # TODO: Not implemented yet
+        backend="cpu",
+        interpreter="cpu",
         ak_add_doc=False,
         how=None,
         report=False, # TODO: Not implemented yet
@@ -1079,13 +897,15 @@ def iterate(
         )
         # TODO: This can be done more efficiently
         for start in range(0, self.num_entries, step_size):
-            yield self._arrays(
+            yield self.arrays(
                 filter_name=filter_name,
                 filter_typename=filter_typename,
                 filter_field=filter_field,
                 entry_start=start,
                 entry_stop=start + step_size,
                 library=library,
+                backend=backend,
+                interpreter=interpreter,
                 how=how,
                 filter_branch=filter_branch,
             )

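iterate() now forwards backend and interpreter to arrays() for each chunk, so GPU interpretation can also be used while streaming. Another hedged sketch with hypothetical names:

    import uproot

    # Hypothetical file; the "gpu" interpreter needs backend="cuda" plus kvikIO/nvCOMP.
    with uproot.open("events.root") as f:
        ntuple = f["Events"]
        for chunk in ntuple.iterate(step_size="100 MB", backend="cuda", interpreter="gpu"):
            # Each chunk is an Awkward Array on the CUDA backend.
            do_something(chunk)  # do_something() is a placeholder for user code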