diff --git a/src/uproot/behaviors/RNTuple.py b/src/uproot/behaviors/RNTuple.py index 705212755..fa0272d2a 100644 --- a/src/uproot/behaviors/RNTuple.py +++ b/src/uproot/behaviors/RNTuple.py @@ -207,6 +207,8 @@ def concatenate( entry_stop=None, decompression_executor=None, # TODO: Not implemented yet library="ak", # TODO: Not implemented yet + backend="cpu", + interpreter="cpu", ak_add_doc=False, how=None, allow_missing=False, @@ -382,6 +384,8 @@ def concatenate( decompression_executor=decompression_executor, array_cache=None, library=library, + backend=backend, + interpreter=interpreter, ak_add_doc=ak_add_doc, how=how, filter_branch=filter_branch, @@ -574,8 +578,8 @@ def arrays( decompression_executor=None, # TODO: Not implemented yet array_cache="inherit", # TODO: Not implemented yet library="ak", # TODO: Not implemented yet - backend="cpu", # TODO: Not Implemented yet - use_GDS=False, + backend="cpu", + interpreter="cpu", ak_add_doc=False, how=None, # For compatibility reasons we also accepts kwargs meant for TTrees @@ -621,10 +625,9 @@ def arrays( that is used to represent arrays. Options are ``"np"`` for NumPy, ``"ak"`` for Awkward Array, and ``"pd"`` for Pandas. (Not implemented yet.) backend (str): The backend Awkward Array will use. - use_GDS (bool): If True and ``backend="cuda"`` will use kvikIO bindings - to CuFile to provide direct memory access (DMA) transfers between GPU - memory and storage. KvikIO bindings to nvcomp decompress data - buffers. + interpreter (str): If "cpu" will use cpu to interpret raw data. If "gpu" and + ``backend="cuda"`` will use KvikIO bindings to CuFile and nvCOMP to + interpret raw data on gpu if available. ak_add_doc (bool | dict ): If True and ``library="ak"``, add the RField ``description`` to the Awkward ``__doc__`` parameter of the array. 
if dict = {key:value} and ``library="ak"``, add the RField ``value`` to the @@ -656,93 +659,7 @@ def arrays( See also :ref:`uproot.behaviors.RNTuple.HasFields.iterate` to iterate over the array in contiguous ranges of entries. """ - if not use_GDS: - return self._arrays( - expressions, - cut, - filter_name=filter_name, - filter_typename=filter_typename, - filter_field=filter_field, - aliases=aliases, # TODO: Not implemented yet - language=language, # TODO: Not implemented yet - entry_start=entry_start, - entry_stop=entry_stop, - decompression_executor=decompression_executor, # TODO: Not implemented yet - array_cache=array_cache, # TODO: Not implemented yet - library=library, # TODO: Not implemented yet - backend=backend, # TODO: Not Implemented yet - ak_add_doc=ak_add_doc, - how=how, - # For compatibility reasons we also accepts kwargs meant for TTrees - interpretation_executor=interpretation_executor, - filter_branch=filter_branch, - ) - - elif use_GDS and backend == "cuda": - return self._arrays_GDS( - expressions, - cut, - filter_name=filter_name, - filter_typename=filter_typename, - filter_field=filter_field, - aliases=aliases, # TODO: Not implemented yet - language=language, # TODO: Not implemented yet - entry_start=entry_start, - entry_stop=entry_stop, - decompression_executor=decompression_executor, # TODO: Not implemented yet - array_cache=array_cache, # TODO: Not implemented yet - library=library, # TODO: Not implemented yet - backend=backend, # TODO: Not Implemented yet - ak_add_doc=ak_add_doc, - how=how, - # For compatibility reasons we also accepts kwargs meant for TTrees - interpretation_executor=interpretation_executor, - filter_branch=filter_branch, - ) - - elif use_GDS and backend != "cuda": - raise NotImplementedError( - f"Backend {backend} GDS support not implemented.".format(backend) - ) - - def _arrays( - self, - expressions=None, # TODO: Not implemented yet - cut=None, # TODO: Not implemented yet - *, - filter_name=no_filter, - 
filter_typename=no_filter, - filter_field=no_filter, - aliases=None, # TODO: Not implemented yet - language=uproot.language.python.python_language, # TODO: Not implemented yet - entry_start=None, - entry_stop=None, - decompression_executor=None, # TODO: Not implemented yet - array_cache="inherit", # TODO: Not implemented yet - library="ak", # TODO: Not implemented yet - backend="cpu", # TODO: Not Implemented yet - ak_add_doc=False, - how=None, - # For compatibility reasons we also accepts kwargs meant for TTrees - interpretation_executor=None, - filter_branch=unset, - ): - """ - Returns a group of arrays from the ``RNTuple``. - For example: - - .. code-block:: python - - >>> my_ntuple.arrays() - - - See also :ref:`uproot.behaviors.RNTuple.HasFields.array` to read a single - ``RField`` as an array. - - See also :ref:`uproot.behaviors.RNTuple.HasFields.iterate` to iterate over - the array in contiguous ranges of entries. - """ # This temporarily provides basic functionality while expressions are properly implemented if expressions is not None: if filter_name == no_filter: @@ -782,18 +699,34 @@ def _arrays( container_dict = {} _recursive_find(form, target_cols) + if interpreter == "gpu" and backend == "cuda": + clusters_datas = self.ntuple.gpu_read_clusters( + target_cols, start_cluster_idx, stop_cluster_idx + ) + clusters_datas._decompress() + content_dict = self.ntuple.gpu_deserialize_decompressed_content( + clusters_datas, + start_cluster_idx, + stop_cluster_idx, + pad_missing_element=True, + ) + for key in target_cols: if "column" in key and "union" not in key: key_nr = int(key.split("-")[1]) - + if interpreter == "cpu": + content = self.ntuple.read_col_pages( + key_nr, + range(start_cluster_idx, stop_cluster_idx), + pad_missing_element=True, + ) + elif interpreter == "gpu" and backend == "cuda": + content = content_dict[key_nr] + elif interpreter == "gpu": + raise NotImplementedError( + f"Backend {backend} GDS support not implemented." 
+ ) dtype_byte = self.ntuple.column_records[key_nr].type - - content = self.ntuple.read_col_pages( - key_nr, - range(start_cluster_idx, stop_cluster_idx), - dtype_byte=dtype_byte, - pad_missing_element=True, - ) _fill_container_dict(container_dict, content, key, dtype_byte) cluster_offset = cluster_starts[start_cluster_idx] @@ -804,6 +737,7 @@ def _arrays( cluster_num_entries, container_dict, allow_noncanonical_form=True, + backend="cuda" if interpreter == "gpu" and backend == "cuda" else "cpu", )[entry_start:entry_stop] arrays = uproot.extras.awkward().to_backend(arrays, backend=backend) @@ -825,126 +759,6 @@ def _arrays( raise ValueError( f"unrecognized 'how' parameter: {how}. Options are None, tuple, list and dict." ) - - return arrays - - def _arrays_GDS( - self, - expressions=None, # TODO: Not implemented yet - cut=None, # TODO: Not implemented yet - *, - filter_name=no_filter, - filter_typename=no_filter, - filter_field=no_filter, - aliases=None, # TODO: Not implemented yet - language=uproot.language.python.python_language, # TODO: Not implemented yet - entry_start=None, - entry_stop=None, - decompression_executor=None, # TODO: Not implemented yet - array_cache="inherit", # TODO: Not implemented yet - library="ak", # TODO: Not implemented yet - backend="cuda", # TODO: Not Implemented yet - ak_add_doc=False, - how=None, - # For compatibility reasons we also accepts kwargs meant for TTrees - interpretation_executor=None, - filter_branch=unset, - ): - """ - Current GDS support is limited to nvidia GPUs. The python library kvikIO is - a required dependency for Uproot GDS reading which can be installed by - calling pip install uproot[GDS_cuX] where X corresponds to the major cuda - version available on the user's system. - - Returns a group of arrays from the ``RNTuple``. - - For example: - - .. 
code-block:: python - - >>> my_ntuple.arrays(useGDS = True, backend = "cuda") - - - - """ - # This temporarily provides basic functionality while expressions are properly implemented - if expressions is not None: - if filter_name == no_filter: - filter_name = expressions - else: - raise ValueError( - "Expressions are not supported yet. They are currently equivalent to filter_name." - ) - - ##### - # Find clusters to read that contain data from entry_start to entry_stop - entry_start, entry_stop = ( - uproot.behaviors.TBranch._regularize_entries_start_stop( - self.num_entries, entry_start, entry_stop - ) - ) - clusters = self.ntuple.cluster_summaries - cluster_starts = numpy.array([c.num_first_entry for c in clusters]) - start_cluster_idx = ( - numpy.searchsorted(cluster_starts, entry_start, side="right") - 1 - ) - stop_cluster_idx = numpy.searchsorted(cluster_starts, entry_stop, side="right") - cluster_num_entries = numpy.sum( - [c.num_entries for c in clusters[start_cluster_idx:stop_cluster_idx]] - ) - - # Get form for requested columns - form = self.to_akform( - filter_name=filter_name, - filter_typename=filter_typename, - filter_field=filter_field, - filter_branch=filter_branch, - ) - - # Only read columns mentioned in the awkward form - target_cols = [] - container_dict = {} - - _recursive_find(form, target_cols) - - ##### - # Read and decompress all columns' data - clusters_datas = self.ntuple.gpu_read_clusters( - target_cols, start_cluster_idx, stop_cluster_idx - ) - clusters_datas._decompress() - ##### - # Deserialize decompressed datas - content_dict = self.ntuple.gpu_deserialize_decompressed_content( - clusters_datas, start_cluster_idx, stop_cluster_idx - ) - ##### - # Reconstitute arrays to an awkward array - container_dict = {} - # Debugging - for key in target_cols: - if "column" in key and "union" not in key: - key_nr = int(key.split("-")[1]) - - dtype_byte = self.ntuple.column_records[key_nr].type - content = content_dict[key_nr] - 
_fill_container_dict(container_dict, content, key, dtype_byte) - - cluster_offset = cluster_starts[start_cluster_idx] - entry_start -= cluster_offset - entry_stop -= cluster_offset - - arrays = uproot.extras.awkward().from_buffers( - form, - cluster_num_entries, - container_dict, - allow_noncanonical_form=True, - backend="cuda", - )[entry_start:entry_stop] - - # Free memory - del content_dict, container_dict, clusters_datas - return arrays def __array__(self, *args, **kwargs): @@ -972,6 +786,8 @@ def iterate( step_size="100 MB", decompression_executor=None, # TODO: Not implemented yet library="ak", # TODO: Not implemented yet + backend="cpu", + interpreter="cpu", ak_add_doc=False, how=None, report=False, # TODO: Not implemented yet @@ -1082,13 +898,15 @@ def iterate( ) # TODO: This can be done more efficiently for start in range(0, self.num_entries, step_size): - yield self._arrays( + yield self.arrays( filter_name=filter_name, filter_typename=filter_typename, filter_field=filter_field, entry_start=start, entry_stop=start + step_size, library=library, + backend=backend, + interpreter=interpreter, how=how, filter_branch=filter_branch, ) diff --git a/src/uproot/models/RNTuple.py b/src/uproot/models/RNTuple.py index 322d5d3c3..15f2a73e7 100644 --- a/src/uproot/models/RNTuple.py +++ b/src/uproot/models/RNTuple.py @@ -585,84 +585,66 @@ def field_form(self, this_id, keys, ak_add_doc=False): # everything should recurse above this branch raise AssertionError("this should be unreachable") - def read_pagedesc(self, destination, desc, dtype_str, dtype, nbits, split): + def read_pagedesc(self, destination, desc, field_metadata): """ Args: destination (numpy.ndarray): The array to fill. desc (:doc:`uproot.models.RNTuple.MetaData`): The page description. - dtype_str (str): The data type as a string. - dtype (numpy.dtype): The data type. - nbits (int): The number of bits. - split (bool): Whether the data is split. 
+ field_metadata (:doc:`uproot.models.RNTuple.FieldClusterMetadata`): + The metadata needed to deserialize destination. Fills the destination array with the data from the page. """ loc = desc.locator context = {} - # bool in RNTuple is always stored as bits - isbit = dtype_str == "bit" num_elements = len(destination) - if isbit: + # Pages storing bits, real32trunc, and real32quant need num_elements + # corrected + if field_metadata.isbit: num_elements_toread = int(numpy.ceil(num_elements / 8)) - elif dtype_str in ("real32trunc", "real32quant"): - num_elements_toread = int(numpy.ceil((num_elements * 4 * nbits) / 32)) - dtype = numpy.dtype("uint8") + elif field_metadata.dtype_str in ("real32trunc", "real32quant"): + num_elements_toread = int( + numpy.ceil((num_elements * 4 * field_metadata.nbits) / 32) + ) else: num_elements_toread = num_elements - uncomp_size = num_elements_toread * dtype.itemsize + + uncomp_size = num_elements_toread * field_metadata.dtype_toread.itemsize decomp_chunk, cursor = self.read_locator(loc, uncomp_size, context) content = cursor.array( - decomp_chunk, num_elements_toread, dtype, context, move=False - ) - destination.view(dtype)[:num_elements_toread] = content[:num_elements_toread] - self.deserialize_page_decompressed_buffer( - destination, desc, dtype_str, dtype, nbits, split + decomp_chunk, + num_elements_toread, + field_metadata.dtype_toread, + context, + move=False, ) + destination.view(field_metadata.dtype_toread)[:num_elements_toread] = content[ + :num_elements_toread + ] + self.deserialize_page_decompressed_buffer(destination, field_metadata) - def read_col_pages( - self, ncol, cluster_range, dtype_byte, pad_missing_element=False - ): + def read_col_pages(self, ncol, cluster_range, pad_missing_element=False): """ Args: ncol (int): The column id. cluster_range (range): The range of cluster indices. - dtype_byte (int): The data type. pad_missing_element (bool): Whether to pad the missing elements. 
Returns a numpy array with the data from the column. """ - arrays = [self.read_col_page(ncol, i) for i in cluster_range] - - # Check if column stores offset values - if dtype_byte in uproot.const.rntuple_index_types: - # Extract the last offset values: - last_elements = [ - (arr[-1] if len(arr) > 0 else numpy.zeros((), dtype=arr.dtype)) - for arr in arrays[:-1] - ] # First value always zero, therefore skip first arr. - last_offsets = numpy.cumsum(last_elements) - for i in range(1, len(arrays)): - arrays[i] += last_offsets[i - 1] + field_metadata = self.get_field_metadata(ncol) + arrays = [self.read_col_page(ncol, i, field_metadata) for i in cluster_range] + res = self.combine_cluster_arrays(arrays, field_metadata, pad_missing_element) - res = numpy.concatenate(arrays, axis=0) - - # No longer needed; free memory - del arrays - - dtype_byte = self.column_records[ncol].type - if dtype_byte in uproot.const.rntuple_index_types: - res = numpy.insert(res, 0, 0) # for offsets - - if pad_missing_element: - first_element_index = self.column_records[ncol].first_element_index - res = numpy.pad(res, (first_element_index, 0)) return res - def read_col_page(self, ncol, cluster_i): + def read_col_page(self, ncol, cluster_i, field_metadata): """ Args: ncol (int): The column id. cluster_i (int): The cluster index. + field_metadata (:doc:`uproot.models.RNTuple.FieldClusterMetadata`): + The metadata needed to read the field's pages. Returns a numpy array with the data from the column. 
""" @@ -671,102 +653,81 @@ def read_col_page(self, ncol, cluster_i): if ncol < len(linklist) and linklist[ncol].suppressed: rel_crs = self._column_records_dict[self.column_records[ncol].field_id] ncol = next(cr.idx for cr in rel_crs if not linklist[cr.idx].suppressed) - pagelist = linklist[ncol].pages if ncol < len(linklist) else [] - dtype_byte = self.column_records[ncol].type - dtype_str = uproot.const.rntuple_col_num_to_dtype_dict[dtype_byte] - total_len = numpy.sum([desc.num_elements for desc in pagelist], dtype=int) - if dtype_str == "switch": - dtype = numpy.dtype([("index", "int64"), ("tag", "int32")]) - elif dtype_str == "bit": - dtype = numpy.dtype("bool") - elif dtype_byte in uproot.const.rntuple_custom_float_types: - dtype = numpy.dtype("uint32") # for easier bit manipulation - else: - dtype = numpy.dtype(dtype_str) - res = numpy.empty(total_len, dtype) - split = dtype_byte in uproot.const.rntuple_split_types - zigzag = dtype_byte in uproot.const.rntuple_zigzag_types - delta = dtype_byte in uproot.const.rntuple_delta_types - nbits = ( - self.column_records[ncol].nbits - if ncol < len(self.column_records) - else uproot.const.rntuple_col_num_to_size_dict[dtype_byte] + field_metadata = self.get_field_metadata( + ncol + ) # Update metadata if suppressed + pagelist = ( + linklist[field_metadata.ncol].pages + if field_metadata.ncol < len(linklist) + else [] ) + total_len = numpy.sum([desc.num_elements for desc in pagelist], dtype=int) + res = numpy.empty(total_len, field_metadata.dtype) + tracker = 0 cumsum = 0 for page_desc in pagelist: n_elements = page_desc.num_elements tracker_end = tracker + n_elements self.read_pagedesc( - res[tracker:tracker_end], page_desc, dtype_str, dtype, nbits, split + res[tracker:tracker_end], + page_desc, + field_metadata, ) - if delta: + if field_metadata.delta: res[tracker] -= cumsum cumsum += numpy.sum(res[tracker:tracker_end]) tracker = tracker_end - if zigzag: - res = _from_zigzag(res) - elif delta: - res = numpy.cumsum(res) 
- elif dtype_str == "real32trunc": - res = res.view(numpy.float32) - elif dtype_str == "real32quant" and ncol < len(self.column_records): - min_value = self.column_records[ncol].min_value - max_value = self.column_records[ncol].max_value - res = min_value + res.astype(numpy.float32) * (max_value - min_value) / ( - (1 << nbits) - 1 - ) - res = res.astype(numpy.float32) + res = self.post_process(res, field_metadata) return res - def gpu_read_clusters(self, columns, start_cluster_idx, stop_cluster_idx): + def gpu_read_clusters(self, fields, start_cluster_idx, stop_cluster_idx): """ Args: - columns (list): The target columns to read. + fields (list: str): The target fields to read. start_cluster_idx (int): The first cluster index containing entries in the range requested. stop_cluster_idx (int): The last cluster index containing entries in the range requested. - Returns a ClusterRefs containing ColRefs_Cluster for each cluster. Each - ColRefs_Cluster contains all ColBuffersCluster for each column in - columns. Each ColBuffersCluster contains the page buffers, decompression - target buffers, and compression metadata for a column in a given cluster. - - The ClusterRefs object contains all information needed for and performs - decompression in parallel over all compressed buffers. + Returns a ClusterRefs containing FieldRefsCluster for each cluster. Each + FieldRefsCluster contains all FieldPayload objects for each field in + fields. Each FieldPayload contains the page buffers, decompression + target buffers, and compression metadata for a field in a given cluster. 
""" cluster_range = range(start_cluster_idx, stop_cluster_idx) clusters_datas = ClusterRefs() - # Open filehandle and read columns for clusters filehandle = CuFileSource(self.file.source.file_path, "rb") # Iterate through each cluster for cluster_i in cluster_range: - colrefs_cluster = ColRefsCluster(cluster_i) - for key in columns: + colrefs_cluster = FieldRefsCluster(cluster_i) + for key in fields: if "column" in key and "union" not in key: - key_nr = int(key.split("-")[1]) - if key_nr not in colrefs_cluster.colbuffersclusters.keys(): + ncol = int(key.split("-")[1]) + field_metadata = self.get_field_metadata(ncol) + if ncol not in colrefs_cluster.fieldpayloads.keys(): Col_ClusterBuffers = self.gpu_read_col_cluster_pages( - key_nr, cluster_i, filehandle + ncol, cluster_i, filehandle, field_metadata ) - colrefs_cluster._add_Col(Col_ClusterBuffers) + colrefs_cluster._add_field(Col_ClusterBuffers) clusters_datas._add_cluster(colrefs_cluster) filehandle.get_all() return clusters_datas - def gpu_read_col_cluster_pages(self, ncol, cluster_i, filehandle): + def gpu_read_col_cluster_pages(self, ncol, cluster_i, filehandle, field_metadata): """ Args: ncol (int): The target column's key number. cluster_i (int): The cluster to read column data from. filehandle (uproot.source.cufile_interface.CuFileSource): CuFile filehandle interface which performs CuFile API calls. + field_metadata (:doc:`uproot.models.RNTuple.FieldClusterMetadata`): + The metadata needed to read the fields pages. - Returns a ColBuffersCluster containing raw page buffers, decompression + Returns a FieldPayload containing raw page buffers, decompression target buffers, and compression metadata. 
""" # Get cluster and pages metadatas @@ -777,6 +738,7 @@ def gpu_read_col_cluster_pages(self, ncol, cluster_i, filehandle): if linklist[ncol].suppressed: rel_crs = self._column_records_dict[self.column_records[ncol].field_id] ncol = next(cr.idx for cr in rel_crs if not linklist[cr.idx].suppressed) + field_metadata = self.get_field_metadata(ncol) linklist_col = linklist[ncol] pagelist = linklist_col.pages compression = linklist_col.compression_settings @@ -788,43 +750,24 @@ def gpu_read_col_cluster_pages(self, ncol, cluster_i, filehandle): algorithm_str = None compression_level = None - dtype_byte = self.column_records[ncol].type - - dtype_str = uproot.const.rntuple_col_num_to_dtype_dict[dtype_byte] - isbit = dtype_str == "bit" # Prepare full output buffer total_len = numpy.sum([desc.num_elements for desc in pagelist], dtype=int) - if dtype_str == "switch": - dtype = numpy.dtype([("index", "int64"), ("tag", "int32")]) - elif dtype_str == "bit": - dtype = numpy.dtype("bool") - elif dtype_byte in uproot.const.rntuple_custom_float_types: - dtype = numpy.dtype("uint32") # for easier bit manipulation - else: - dtype = numpy.dtype(dtype_str) - - full_output_buffer = cupy.empty(total_len, dtype=dtype) + full_output_buffer = cupy.empty(total_len, dtype=field_metadata.dtype) - nbits = ( - self.column_records[ncol].nbits - if ncol < len(self.column_records) - else uproot.const.rntuple_col_num_to_size_dict[dtype_byte] - ) - # Check if col compressed/decompressed - if isbit: # Need to correct length when dtype = bit + # Check if field compressed/decompressed + if field_metadata.isbit: # Need to correct length when dtype = bit total_len = int(numpy.ceil(total_len / 8)) - elif dtype_str in ("real32trunc", "real32quant"): - total_len = int(numpy.ceil((total_len * 4 * nbits) / 32)) - dtype = numpy.dtype("uint8") - - total_bytes = numpy.sum([desc.locator.num_bytes for desc in pagelist]) - - isCompressed = total_bytes != total_len * dtype.itemsize + elif field_metadata.dtype_str in 
("real32trunc", "real32quant"): + total_len = int(numpy.ceil((total_len * 4 * field_metadata.nbits) / 32)) + total_raw_bytes = numpy.sum([desc.locator.num_bytes for desc in pagelist]) + page_is_compressed = ( + total_raw_bytes != total_len * field_metadata.dtype_toread.itemsize + ) - Cluster_Contents = ColBuffersCluster( + cluster_contents = FieldPayload( ncol_orig, full_output_buffer, - isCompressed, + page_is_compressed, algorithm_str, compression_level, ) @@ -833,14 +776,14 @@ def gpu_read_col_cluster_pages(self, ncol, cluster_i, filehandle): num_elements = page_desc.num_elements loc = page_desc.locator n_bytes = loc.num_bytes - if isbit: # Need to correct length when dtype = bit + if field_metadata.isbit: # Need to correct length when dtype = bit num_elements = int(numpy.ceil(num_elements / 8)) tracker_end = tracker + num_elements out_buff = full_output_buffer[tracker:tracker_end] # If compressed, skip 9 byte header - if isCompressed: + if page_is_compressed: # If LZ4, page contains additional 8-byte checksum offset = ( int(loc.offset + 9) @@ -857,15 +800,19 @@ def gpu_read_col_cluster_pages(self, ncol, cluster_i, filehandle): out_buff, size=int(n_bytes), file_offset=int(loc.offset) ) - Cluster_Contents._add_page(comp_buff) - Cluster_Contents._add_output(out_buff) + cluster_contents._add_page(comp_buff) + cluster_contents._add_output(out_buff) tracker = tracker_end - return Cluster_Contents + return cluster_contents def gpu_deserialize_decompressed_content( - self, clusters_datas, start_cluster_idx, stop_cluster_idx + self, + clusters_datas, + start_cluster_idx, + stop_cluster_idx, + pad_missing_element=False, ): """ Args: @@ -878,55 +825,36 @@ def gpu_deserialize_decompressed_content( Returns a dictionary containing contiguous buffers of deserialized data across requested clusters organized by column key. 
""" - cupy = uproot.extras.cupy() cluster_range = range(start_cluster_idx, stop_cluster_idx) col_arrays = {} # collect content for each col for key_nr in clusters_datas.columns: ncol = int(key_nr) + field_metadata = self.get_field_metadata(ncol) # Get uncompressed array for key for all clusters - col_decompressed_buffers = clusters_datas._grab_ColOutput(ncol) - dtype_byte = self.ntuple.column_records[ncol].type + col_decompressed_buffers = clusters_datas._grab_field_output(ncol) arrays = [] - for cluster_i in cluster_range: - # Get decompressed buffer corresponding to cluster i cluster_buffer = col_decompressed_buffers[cluster_i] - - self.gpu_deserialize_pages(cluster_buffer, ncol, cluster_i, arrays) - - if dtype_byte in uproot.const.rntuple_delta_types: - # Extract the last offset values: - last_elements = [ - arr[-1].get() for arr in arrays[:-1] - ] # First value always zero, therefore skip first arr. - # Compute cumulative sum using itertools.accumulate: - last_offsets = numpy.cumsum(last_elements) - - # Add the offsets to each array - for i in range(1, len(arrays)): - arrays[i] += last_offsets[i - 1] - # Remove the first element from every sub-array except for the first one: - arrays = [arrays[0]] + [arr[1:] for arr in arrays[1:]] - - res = cupy.concatenate(arrays, axis=0) - del arrays - if True: - first_element_index = self.column_records[ncol].first_element_index - res = cupy.pad(res, (first_element_index, 0)) - + cluster_buffer = self.gpu_deserialize_pages( + cluster_buffer, ncol, cluster_i, field_metadata + ) + arrays.append(cluster_buffer) + res = self.combine_cluster_arrays( + arrays, field_metadata, pad_missing_element + ) col_arrays[key_nr] = res return col_arrays - def gpu_deserialize_pages(self, cluster_buffer, ncol, cluster_i, arrays): + def gpu_deserialize_pages(self, cluster_buffer, ncol, cluster_i, field_metadata): """ Args: cluster_buffer (cupy.ndarray): Buffer to deserialize. ncol (int): The column's key number cluster_buffer originates from. 
cluster_i (int): The cluster cluster_buffer originates from. - arrays (list): Container for storing results of deserialization - across clusters. + field_metadata (:doc:`uproot.models.RNTuple.FieldClusterMetadata`): + The metadata needed to deserialize the field's pages. Returns nothing. Appends deserialized data buffer for ncol from cluster_i to arrays. @@ -938,100 +866,84 @@ def gpu_deserialize_pages(self, cluster_buffer, ncol, cluster_i, arrays): if linklist[ncol].suppressed: rel_crs = self._column_records_dict[self.column_records[ncol].field_id] ncol = next(cr.idx for cr in rel_crs if not linklist[cr.idx].suppressed) + field_metadata = self.get_field_metadata(ncol) linklist_col = linklist[ncol] pagelist = linklist_col.pages else: pagelist = [] - dtype_byte = self.column_records[ncol].type - dtype_str = uproot.const.rntuple_col_num_to_dtype_dict[dtype_byte] - - if dtype_str == "switch": - dtype = numpy.dtype([("index", "int64"), ("tag", "int32")]) - elif dtype_str == "bit": - dtype = numpy.dtype("bool") - elif dtype_byte in uproot.const.rntuple_custom_float_types: - dtype = numpy.dtype("uint32") # for easier bit manipulation - else: - dtype = numpy.dtype(dtype_str) - split = dtype_byte in uproot.const.rntuple_split_types - zigzag = dtype_byte in uproot.const.rntuple_zigzag_types - delta = dtype_byte in uproot.const.rntuple_delta_types - index = dtype_byte in uproot.const.rntuple_index_types - nbits = ( - self.column_records[ncol].nbits - if ncol < len(self.column_records) - else uproot.const.rntuple_col_num_to_size_dict[dtype_byte] - ) - # Begin looping through pages tracker = 0 cumsum = 0 for page_desc in pagelist: num_elements = page_desc.num_elements tracker_end = tracker + num_elements - # Get content associated with page page_buffer = cluster_buffer[tracker:tracker_end] - self.deserialize_page_decompressed_buffer( - page_buffer, page_desc, dtype_str, dtype, nbits, split - ) - if delta: + self.deserialize_page_decompressed_buffer(page_buffer, field_metadata) 
+ if field_metadata.delta: cluster_buffer[tracker] -= cumsum cumsum += cupy.sum(cluster_buffer[tracker:tracker_end]) tracker = tracker_end - if index: - cluster_buffer = _cupy_insert0(cluster_buffer) # for offsets - if zigzag: - cluster_buffer = _from_zigzag(cluster_buffer) - elif delta: - cluster_buffer = cupy.cumsum(cluster_buffer) - elif dtype_str == "real32trunc": - cluster_buffer = cluster_buffer.view(cupy.float32) - elif dtype_str == "real32quant" and ncol < len(self.column_records): - min_value = self.column_records[ncol].min_value - max_value = self.column_records[ncol].max_value - cluster_buffer = min_value + cluster_buffer.astype(cupy.float32) * ( - max_value - min_value - ) / ((1 << nbits) - 1) - cluster_buffer = cluster_buffer.astype(cupy.float32) + cluster_buffer = self.post_process(cluster_buffer, field_metadata) + return cluster_buffer - arrays.append(cluster_buffer) + def post_process(self, buffer, field_metadata): + """ + Args: + buffer (library.ndarray): The buffer to post-process. + field_metadata (:doc:`uproot.models.RNTuple.FieldClusterMetadata`): + The metadata needed to post_process buffer. - def deserialize_page_decompressed_buffer( - self, destination, desc, dtype_str, dtype, nbits, split - ): + Returns post-processed buffer. 
+ """ + array_library_string = uproot._util.get_array_library(buffer) + library = numpy if array_library_string == "numpy" else uproot.extras.cupy() + if field_metadata.zigzag: + buffer = _from_zigzag(buffer) + elif field_metadata.delta: + buffer = library.cumsum(buffer) + elif field_metadata.dtype_str == "real32trunc": + buffer = buffer.view(library.float32) + elif field_metadata.dtype_str == "real32quant" and field_metadata.ncol < len( + self.column_records + ): + min_value = self.column_records[field_metadata.ncol].min_value + max_value = self.column_records[field_metadata.ncol].max_value + buffer = min_value + buffer.astype(library.float32) * ( + max_value - min_value + ) / ((1 << field_metadata.nbits) - 1) + buffer = buffer.astype(library.float32) + return buffer + + def deserialize_page_decompressed_buffer(self, destination, field_metadata): """ Args: destination (cupy.ndarray): The array to fill. - desc (:doc:`uproot.models.RNTuple.MetaData`): The page description. - dtype_str (str): The data type as a string. - dtype (cupy.dtype): The data type. - nbits (int): The number of bits. - split (bool): Whether the data is split. + field_metadata (:doc:`uproot.models.RNTuple.FieldClusterMetadata`): + The metadata needed to deserialize destination. + Returns nothing. Edits destination buffer in-place with deserialized data. 
""" array_library_string = uproot._util.get_array_library(destination) library = numpy if array_library_string == "numpy" else uproot.extras.cupy() + num_elements = len(destination) - # bool in RNTuple is always stored as bits - isbit = dtype_str == "bit" - num_elements = desc.num_elements content = library.copy(destination) - if split: + if field_metadata.split: content = content.view(library.uint8) length = content.shape[0] - if nbits == 16: + if field_metadata.nbits == 16: # AAAAABBBBB needs to become # ABABABABAB res = library.empty(length, library.uint8) res[0::2] = content[length * 0 // 2 : length * 1 // 2] res[1::2] = content[length * 1 // 2 : length * 2 // 2] - elif nbits == 32: + elif field_metadata.nbits == 32: # AAAAABBBBBCCCCCDDDDD needs to become # ABCDABCDABCDABCDABCD res = library.empty(length, library.uint8) @@ -1040,7 +952,7 @@ def deserialize_page_decompressed_buffer( res[2::4] = content[length * 2 // 4 : length * 3 // 4] res[3::4] = content[length * 3 // 4 : length * 4 // 4] - elif nbits == 64: + elif field_metadata.nbits == 64: # AAAAABBBBBCCCCCDDDDDEEEEEFFFFFGGGGGHHHHH needs to become # ABCDEFGHABCDEFGHABCDEFGHABCDEFGHABCDEFGH res = library.empty(length, library.uint8) @@ -1052,24 +964,110 @@ def deserialize_page_decompressed_buffer( res[5::8] = content[length * 5 // 8 : length * 6 // 8] res[6::8] = content[length * 6 // 8 : length * 7 // 8] res[7::8] = content[length * 7 // 8 : length * 8 // 8] - content = res.view(dtype) + content = res.view(field_metadata.dtype) - if isbit: + if field_metadata.isbit: content = library.unpackbits( destination.view(dtype=library.uint8), bitorder="little" ) - elif dtype_str in ("real32trunc", "real32quant"): - if nbits == 32: + elif field_metadata.dtype_str in ("real32trunc", "real32quant"): + if field_metadata.nbits == 32: content = library.copy(destination).view(library.uint32) else: content = library.copy(destination) - content = _extract_bits(content, nbits) - if dtype_str == "real32trunc": - content <<= 32 
- nbits + content = _extract_bits(content, field_metadata.nbits) + if field_metadata.dtype_str == "real32trunc": + content <<= 32 - field_metadata.nbits # needed to chop off extra bits incase we used `unpackbits` destination[:] = content[:num_elements] + def get_field_metadata(self, ncol): + """ + Args: + ncol (int): The column id. + + Returns a uproot.models.RNTuple.FieldClusterMetadata which provides + metadata needed for processing payload data associated with column ncol. + """ + dtype_byte = self.column_records[ncol].type + dtype_str = uproot.const.rntuple_col_num_to_dtype_dict[dtype_byte] + isbit = dtype_str == "bit" + if dtype_str == "switch": + dtype = numpy.dtype([("index", "int64"), ("tag", "int32")]) + elif dtype_str == "bit": + dtype = numpy.dtype("bool") + elif dtype_byte in uproot.const.rntuple_custom_float_types: + dtype = numpy.dtype("uint32") # for easier bit manipulation + else: + dtype = numpy.dtype(dtype_str) + split = dtype_byte in uproot.const.rntuple_split_types + zigzag = dtype_byte in uproot.const.rntuple_zigzag_types + delta = dtype_byte in uproot.const.rntuple_delta_types + nbits = ( + self.column_records[ncol].nbits + if ncol < len(self.column_records) + else uproot.const.rntuple_col_num_to_size_dict[dtype_byte] + ) + if dtype_str in ("real32trunc", "real32quant"): + dtype_toread = numpy.dtype("uint8") + else: + dtype_toread = dtype + field_metadata = FieldClusterMetadata( + ncol, + dtype_byte, + dtype_str, + dtype, + dtype_toread, + split, + zigzag, + delta, + isbit, + nbits, + ) + return field_metadata + + def combine_cluster_arrays(self, arrays, field_metadata, pad_missing_element): + """ + Args: + arrays (list): A list of arrays to combine. + field_metadata (:doc:`uproot.models.RNTuple.FieldClusterMetadata`): + The metadata needed to combine arrays. + pad_missing_element (bool): Whether to pad for missing elements + + Returns a field's page arrays concatenated together. 
+ """ + array_library_string = uproot._util.get_array_library(arrays[0]) + library = numpy if array_library_string == "numpy" else uproot.extras.cupy() + + # Check if column stores offset values + if field_metadata.dtype_byte in uproot.const.rntuple_index_types: + # Extract the last offset values: + last_elements = [ + (arr[-1] if len(arr) > 0 else library.zeros((), dtype=arr.dtype)) + for arr in arrays[:-1] + ] # First value always zero, therefore skip first arr. + last_offsets = library.cumsum(library.array(last_elements)) + for i in range(1, len(arrays)): + arrays[i] += last_offsets[i - 1] + + res = library.concatenate(arrays, axis=0) + + # No longer needed; free memory + del arrays + + if field_metadata.dtype_byte in uproot.const.rntuple_index_types: + # for offsets + res = numpy.insert(res, 0, 0) if library == numpy else _cupy_insert0(res) + + if pad_missing_element: + first_element_index = self.column_records[ + field_metadata.ncol + ].first_element_index + res = numpy.pad(res, (first_element_index, 0)) + + return res + def _extract_bits(packed, nbits): """ @@ -1541,12 +1539,12 @@ def array( decompression_executor=None, # TODO: Not implemented yet array_cache="inherit", # TODO: Not implemented yet library="ak", + interpreter="cpu", + backend="cpu", ak_add_doc=False, # For compatibility reasons we also accepts kwargs meant for TTrees interpretation=None, interpretation_executor=None, - use_GDS=False, - backend="cpu", ): """ Args: @@ -1595,9 +1593,9 @@ def array( entry_start=entry_start, entry_stop=entry_stop, library=library, - ak_add_doc=ak_add_doc, - use_GDS=use_GDS, + interpreter=interpreter, backend=backend, + ak_add_doc=ak_add_doc, )[self.name] @@ -1617,9 +1615,28 @@ def _cupy_insert0(arr): @dataclasses.dataclass -class ColBuffersCluster: +class FieldClusterMetadata: + """ + A FieldClusterMetadata stores metadata for a given field within a cluster + needed to read, decompress, and deserialize the data. 
+ """ + + ncol: int + dtype_byte: type + dtype_str: str + dtype: numpy.dtype + dtype_toread: numpy.dtype + split: bool + zigzag: bool + delta: bool + isbit: bool + nbits: int + + +@dataclasses.dataclass +class FieldPayload: """ - A ColBuffersCluster contains the compressed and decompression target output + A FieldPayload contains the compressed and decompression target output buffers for a particular column in a particular cluster of all pages. It contains pointers to portions of the cluster data which correspond to the different pages of that cluster. @@ -1627,7 +1644,7 @@ class ColBuffersCluster: key: str data: CupyArray # Type: ignore - isCompressed: bool + page_is_compressed: bool algorithm: str compression_level: int pages: list[CupyArray] = dataclasses.field(default_factory=list) @@ -1640,49 +1657,48 @@ def _add_output(self, buffer: CupyArray): self.output.append(buffer) def _decompress(self): - if self.isCompressed and self.algorithm is not None: + if self.page_is_compressed and self.algorithm is not None: kvikio_nvcomp_codec = uproot.extras.kvikio_nvcomp_codec() codec = kvikio_nvcomp_codec.NvCompBatchCodec(self.algorithm) codec.decode_batch(self.pages, self.output) @dataclasses.dataclass -class ColRefsCluster: +class FieldRefsCluster: """ - A ColRefsCluster contains the ColBuffersCluster for all requested columns - in a given cluster. Columns are separated by whether they are compressed or - uncompressed. Compressed columns can be decompressed. + A FieldRefsCluster contains the FieldPayload for all requested fields + in a given cluster. 
 """ cluster_i: int - colbuffersclusters: dict[str, ColBuffersCluster] = dataclasses.field( - default_factory=dict - ) + fieldpayloads: dict[str, FieldPayload] = dataclasses.field(default_factory=dict) - def _add_Col(self, ColBuffersCluster): - self.colbuffersclusters[ColBuffersCluster.key] = ColBuffersCluster + def _add_field(self, FieldPayload): + self.fieldpayloads[FieldPayload.key] = FieldPayload @dataclasses.dataclass class ClusterRefs: - """ " - A ClusterRefs contains the ColRefs_Cluster for multiple clusters. + """ + A ClusterRefs contains the FieldRefsCluster for multiple clusters. It also + contains routines for steering and executing parallel decompression of + payload data and for accessing field payload data across multiple clusters. """ clusters: [int] = dataclasses.field(default_factory=list) columns: list[str] = dataclasses.field(default_factory=list) - refs: dict[int:ColRefsCluster] = dataclasses.field(default_factory=dict) + refs: dict[int:FieldRefsCluster] = dataclasses.field(default_factory=dict) def _add_cluster(self, Cluster): - for nCol in Cluster.colbuffersclusters.keys(): + for nCol in Cluster.fieldpayloads.keys(): if nCol not in self.columns: self.columns.append(nCol) self.refs[Cluster.cluster_i] = Cluster - def _grab_ColOutput(self, nCol): + def _grab_field_output(self, nCol): output_list = [] for cluster in self.refs.values(): - colbuffer = cluster.colbuffersclusters[nCol].data + colbuffer = cluster.fieldpayloads[nCol].data output_list.append(colbuffer) return output_list @@ -1692,27 +1708,27 @@ def _decompress(self): target = {} # organize data by compression algorithm for cluster in self.refs.values(): - for colbuffers in cluster.colbuffersclusters.values(): - if colbuffers.algorithm is not None: - if colbuffers.algorithm not in to_decompress.keys(): - to_decompress[colbuffers.algorithm] = [] - target[colbuffers.algorithm] = [] - if colbuffers.isCompressed: - to_decompress[colbuffers.algorithm].extend(colbuffers.pages) - 
target[colbuffers.algorithm].extend(colbuffers.output) + for fieldpayload in cluster.fieldpayloads.values(): + if fieldpayload.algorithm is not None: + if fieldpayload.algorithm not in to_decompress.keys(): + to_decompress[fieldpayload.algorithm] = [] + target[fieldpayload.algorithm] = [] + if fieldpayload.page_is_compressed: + to_decompress[fieldpayload.algorithm].extend(fieldpayload.pages) + target[fieldpayload.algorithm].extend(fieldpayload.output) # Batch decompress + kvikio_nvcomp_codec = uproot.extras.kvikio_nvcomp_codec() for algorithm, batch in to_decompress.items(): - kvikio_nvcomp_codec = uproot.extras.kvikio_nvcomp_codec() codec = kvikio_nvcomp_codec.NvCompBatchCodec(algorithm) codec.decode_batch(batch, target[algorithm]) # Clean up compressed buffers from memory after decompression for cluster in self.refs.values(): - for colbuffers in cluster.colbuffersclusters.values(): + for fieldpayload in cluster.fieldpayloads.values(): # Clear python references to GPU memory - del colbuffers.pages - colbuffers.pages = [] + del fieldpayload.pages + fieldpayload.pages = [] # Tell GPU to free unused memory blocks cupy = uproot.extras.cupy() mempool = cupy.get_default_memory_pool() diff --git a/tests-cuda/test_0630_rntuple_basics.py b/tests-cuda/test_0630_rntuple_basics.py index beb2d6c23..afe41131f 100644 --- a/tests-cuda/test_0630_rntuple_basics.py +++ b/tests-cuda/test_0630_rntuple_basics.py @@ -14,10 +14,10 @@ @pytest.mark.parametrize( - ("backend", "GDS", "library"), - [("cuda", False, cupy), ("cuda", True, cupy)], + ("backend", "interpreter", "library"), + [("cuda", "cpu", cupy), ("cuda", "gpu", cupy)], ) -def test_flat(backend, GDS, library): +def test_flat(backend, interpreter, library): filename = skhep_testdata.data_path("test_int_float_rntuple_v1-0-0-0.root") with uproot.open(filename) as f: R = f["ntuple"] @@ -28,19 +28,21 @@ def test_flat(backend, GDS, library): ] assert R.header.checksum == R.footer.header_checksum assert all( - R.arrays(entry_stop=3, 
use_GDS=GDS, backend=backend)["one_integers"] + R.arrays(entry_stop=3, interpreter=interpreter, backend=backend)[ + "one_integers" + ] == library.array([9, 8, 7]) ) assert all( - R.arrays("one_integers", entry_stop=3, use_GDS=GDS, backend=backend)[ - "one_integers" - ] + R.arrays( + "one_integers", entry_stop=3, interpreter=interpreter, backend=backend + )["one_integers"] == library.array([9, 8, 7]) ) assert all( - R.arrays(entry_start=1, entry_stop=3, use_GDS=GDS, backend=backend)[ - "one_integers" - ] + R.arrays( + entry_start=1, entry_stop=3, interpreter=interpreter, backend=backend + )["one_integers"] == library.array([8, 7]) ) @@ -48,10 +50,14 @@ def test_flat(backend, GDS, library): with uproot.open(filename) as f: R = f["ntuple"] assert all( - R.arrays(entry_stop=3, use_GDS=GDS, backend=backend)["one_integers"] + R.arrays(entry_stop=3, interpreter=interpreter, backend=backend)[ + "one_integers" + ] == library.array([50000, 49999, 49998]) ) assert all( - R.arrays(entry_start=-3, use_GDS=GDS, backend=backend)["one_integers"] + R.arrays(entry_start=-3, interpreter=interpreter, backend=backend)[ + "one_integers" + ] == library.array([3, 2, 1]) ) diff --git a/tests-cuda/test_0662_rntuple_stl_containers.py b/tests-cuda/test_0662_rntuple_stl_containers.py index d0a058247..c704bcdb3 100644 --- a/tests-cuda/test_0662_rntuple_stl_containers.py +++ b/tests-cuda/test_0662_rntuple_stl_containers.py @@ -13,8 +13,9 @@ ) -@pytest.mark.parametrize(("backend", "GDS", "library"), [("cuda", False, cupy)]) -def test_rntuple_stl_containers(backend, GDS, library): +# GPU Interpretation not yet supported +@pytest.mark.parametrize(("backend", "interpreter", "library"), [("cuda", "cpu", cupy)]) +def test_rntuple_stl_containers(backend, interpreter, library): filename = skhep_testdata.data_path("test_stl_containers_rntuple_v1-0-0-0.root") with uproot.open(filename) as f: R = f["ntuple"] @@ -33,7 +34,7 @@ def test_rntuple_stl_containers(backend, GDS, library): "lorentz_vector", 
"array_lv", ] - r = R.arrays(backend=backend, use_GDS=GDS) + r = R.arrays(backend=backend, interpreter=interpreter) assert ak.all(r["string"] == ["one", "two", "three", "four", "five"]) assert r["vector_int32"][0] == [1] diff --git a/tests-cuda/test_0962_rntuple_update.py b/tests-cuda/test_0962_rntuple_update.py index b236559f0..f4ad24a0d 100644 --- a/tests-cuda/test_0962_rntuple_update.py +++ b/tests-cuda/test_0962_rntuple_update.py @@ -14,41 +14,41 @@ @pytest.mark.parametrize( - ("backend", "GDS", "library"), - [("cuda", False, cupy), ("cuda", True, cupy)], + ("backend", "interpreter", "library"), + [("cuda", "cpu", cupy), ("cuda", "gpu", cupy)], ) -def test_new_support_RNTuple_split_int32_reading(backend, GDS, library): +def test_new_support_RNTuple_split_int32_reading(backend, interpreter, library): with uproot.open( skhep_testdata.data_path("test_int_5e4_rntuple_v1-0-0-0.root") ) as f: obj = f["ntuple"] - df = obj.arrays(backend=backend, use_GDS=GDS) + df = obj.arrays(backend=backend, interpreter=interpreter) assert len(df) == 5e4 assert len(df.one_integers) == 5e4 assert ak.all(df.one_integers == library.arange(5e4 + 1)[::-1][:-1]) @pytest.mark.parametrize( - ("backend", "GDS", "library"), - [("cuda", False, cupy), ("cuda", True, cupy)], + ("backend", "interpreter", "library"), + [("cuda", "cpu", cupy), ("cuda", "gpu", cupy)], ) -def test_new_support_RNTuple_bit_bool_reading(backend, GDS, library): +def test_new_support_RNTuple_bit_bool_reading(backend, interpreter, library): with uproot.open(skhep_testdata.data_path("test_bit_rntuple_v1-0-0-0.root")) as f: obj = f["ntuple"] - df = obj.arrays(backend=backend, use_GDS=GDS) + df = obj.arrays(backend=backend, interpreter=interpreter) assert ak.all(df.one_bit == library.asarray([1, 0, 0, 1, 0, 0, 1, 0, 0, 1])) @pytest.mark.parametrize( - ("backend", "GDS", "library"), - [("cuda", False, cupy), ("cuda", True, cupy)], + ("backend", "interpreter", "library"), + [("cuda", "cpu", cupy), ("cuda", "gpu", cupy)], ) -def 
test_new_support_RNTuple_split_int16_reading(backend, GDS, library): +def test_new_support_RNTuple_split_int16_reading(backend, interpreter, library): with uproot.open( skhep_testdata.data_path("test_int_multicluster_rntuple_v1-0-0-0.root") ) as f: obj = f["ntuple"] - df = obj.arrays(backend=backend, use_GDS=GDS) + df = obj.arrays(backend=backend, interpreter=interpreter) assert len(df.one_integers) == 1e8 assert df.one_integers[0] == 2 assert df.one_integers[-1] == 1 diff --git a/tests-cuda/test_1159_rntuple_cluster_groups.py b/tests-cuda/test_1159_rntuple_cluster_groups.py index 353516503..13fbd51f8 100644 --- a/tests-cuda/test_1159_rntuple_cluster_groups.py +++ b/tests-cuda/test_1159_rntuple_cluster_groups.py @@ -14,10 +14,10 @@ @pytest.mark.parametrize( - ("backend", "GDS", "library"), - [("cuda", False, cupy), ("cuda", True, cupy)], + ("backend", "interpreter", "library"), + [("cuda", "cpu", cupy), ("cuda", "gpu", cupy)], ) -def test_multiple_cluster_groups(backend, GDS, library): +def test_multiple_cluster_groups(backend, interpreter, library): filename = skhep_testdata.data_path( "test_multiple_cluster_groups_rntuple_v1-0-0-0.root" ) @@ -32,7 +32,7 @@ def test_multiple_cluster_groups(backend, GDS, library): assert obj.num_entries == 1000 - arrays = obj.arrays(backend=backend, use_GDS=GDS) + arrays = obj.arrays(backend=backend, interpreter=interpreter) assert ak.all(arrays.one == library.array(list(range(1000)))) assert ak.all( diff --git a/tests-cuda/test_1191_rntuple_fixes.py b/tests-cuda/test_1191_rntuple_fixes.py index cfc100cd1..22510ab97 100644 --- a/tests-cuda/test_1191_rntuple_fixes.py +++ b/tests-cuda/test_1191_rntuple_fixes.py @@ -14,10 +14,10 @@ @pytest.mark.parametrize( - ("backend", "GDS", "library"), - [("cuda", False, cupy), ("cuda", True, cupy)], + ("backend", "interpreter", "library"), + [("cuda", "cpu", cupy), ("cuda", "gpu", cupy)], ) -def test_schema_extension(backend, GDS, library): +def test_schema_extension(backend, interpreter, 
library): filename = skhep_testdata.data_path("test_extension_columns_rntuple_v1-0-0-0.root") with uproot.open(filename) as f: obj = f["ntuple"] @@ -29,7 +29,7 @@ def test_schema_extension(backend, GDS, library): assert obj.column_records[1].first_element_index == 200 assert obj.column_records[2].first_element_index == 400 - arrays = obj.arrays(backend=backend, use_GDS=GDS) + arrays = obj.arrays(backend=backend, interpreter=interpreter) assert len(arrays.float_field) == 600 assert len(arrays.intvec_field) == 600 @@ -42,48 +42,51 @@ def test_schema_extension(backend, GDS, library): @pytest.mark.parametrize( - ("backend", "GDS", "library"), - [("cuda", False, cupy), ("cuda", True, cupy)], + ("backend", "interpreter", "library"), + [("cuda", "cpu", cupy), ("cuda", "gpu", cupy)], ) -def test_rntuple_cardinality(backend, GDS, library): +def test_rntuple_cardinality(backend, interpreter, library): filename = skhep_testdata.data_path( "Run2012BC_DoubleMuParked_Muons_1000evts_rntuple_v1-0-0-0.root" ) with uproot.open(filename) as f: obj = f["Events"] - arrays = obj.arrays(backend=backend, use_GDS=GDS) + arrays = obj.arrays(backend=backend, interpreter=interpreter) assert ak.all( arrays["nMuon"] == library.array([len(l) for l in arrays["Muon_pt"]]) ) @pytest.mark.parametrize( - ("backend", "GDS", "library"), - [("cuda", True, cupy)], + ("backend", "interpreter", "library"), + [("cuda", "gpu", cupy)], ) -def test_multiple_page_delta_encoding_GDS(backend, GDS, library): +def test_multiple_page_delta_encoding_GDS(backend, interpreter, library): filename = skhep_testdata.data_path("test_index_multicluster_rntuple_v1-0-0-0.root") with uproot.open(filename) as f: obj = f["ntuple"] filehandle = uproot.source.cufile_interface.CuFileSource(filename, "rb") - col_clusterbuffers = obj.gpu_read_col_cluster_pages(0, 0, filehandle) + field_metadata = obj.get_field_metadata(0) + col_clusterbuffers = obj.gpu_read_col_cluster_pages( + 0, 0, filehandle, field_metadata + ) filehandle.get_all() 
col_clusterbuffers._decompress() - data = [] - obj.gpu_deserialize_pages(col_clusterbuffers.data, 0, 0, data) - assert data[0][64] - data[0][63] == 2 + data = obj.gpu_deserialize_pages(col_clusterbuffers.data, 0, 0, field_metadata) + assert data[64] - data[63] == 2 @pytest.mark.parametrize( - ("backend", "GDS", "library"), [("cuda", False, cupy), ("cuda", True, cupy)] + ("backend", "interpreter", "library"), + [("cuda", "cpu", cupy), ("cuda", "gpu", cupy)], ) -def test_split_encoding(backend, GDS, library): +def test_split_encoding(backend, interpreter, library): filename = skhep_testdata.data_path( "Run2012BC_DoubleMuParked_Muons_1000evts_rntuple_v1-0-0-0.root" ) with uproot.open(filename) as f: obj = f["Events"] - arrays = obj.arrays(backend=backend, use_GDS=GDS) + arrays = obj.arrays(backend=backend, interpreter=interpreter) expected_pt = library.array([10.763696670532227, 15.736522674560547]) expected_charge = library.array([-1, -1]) diff --git a/tests-cuda/test_1223_more_rntuple_types.py b/tests-cuda/test_1223_more_rntuple_types.py index 1b3895fed..7d10a3f63 100644 --- a/tests-cuda/test_1223_more_rntuple_types.py +++ b/tests-cuda/test_1223_more_rntuple_types.py @@ -14,29 +14,29 @@ @pytest.mark.parametrize( - ("backend", "GDS", "library"), - [("cuda", False, cupy), ("cuda", True, cupy)], + ("backend", "interpreter", "library"), + [("cuda", "cpu", cupy), ("cuda", "gpu", cupy)], ) -def test_atomic(backend, GDS, library): +def test_atomic(backend, interpreter, library): filename = skhep_testdata.data_path("test_atomic_bitset_rntuple_v1-0-0-0.root") with uproot.open(filename) as f: obj = f["ntuple"] - a = obj.arrays("atomic_int", backend=backend, use_GDS=GDS) + a = obj.arrays("atomic_int", backend=backend, interpreter=interpreter) assert ak.all(a.atomic_int == library.array([1, 2, 3])) @pytest.mark.parametrize( - ("backend", "GDS", "library"), - [("cuda", False, cupy), ("cuda", True, cupy)], + ("backend", "interpreter", "library"), + [("cuda", "cpu", cupy), 
("cuda", "gpu", cupy)], ) -def test_bitset(backend, GDS, library): +def test_bitset(backend, interpreter, library): filename = skhep_testdata.data_path("test_atomic_bitset_rntuple_v1-0-0-0.root") with uproot.open(filename) as f: obj = f["ntuple"] - a = obj.arrays("bitset", backend=backend, use_GDS=GDS) + a = obj.arrays("bitset", backend=backend, interpreter=interpreter) assert len(a.bitset) == 3 assert len(a.bitset[0]) == 42 @@ -93,17 +93,17 @@ def test_bitset(backend, GDS, library): @pytest.mark.parametrize( - ("backend", "GDS", "library"), - [("cuda", False, cupy), ("cuda", True, cupy)], + ("backend", "interpreter", "library"), + [("cuda", "cpu", cupy), ("cuda", "gpu", cupy)], ) -def test_empty_struct(backend, GDS, library): +def test_empty_struct(backend, interpreter, library): filename = skhep_testdata.data_path( "test_emptystruct_invalidvar_rntuple_v1-0-0-0.root" ) with uproot.open(filename) as f: obj = f["ntuple"] - a = obj.arrays("empty_struct", backend=backend, use_GDS=GDS) + a = obj.arrays("empty_struct", backend=backend, interpreter=interpreter) assert a.empty_struct.tolist() == [(), (), ()] @@ -117,6 +117,6 @@ def test_invalid_variant(): with uproot.open(filename) as f: obj = f["ntuple"] - a = obj.arrays("variant.*", backend="cuda", use_GDS=False) + a = obj.arrays("variant.*", backend="cuda", interpreter="cpu") assert a.variant.tolist() == [1, None, {"i": 2}] diff --git a/tests-cuda/test_1250_rntuple_improvements.py b/tests-cuda/test_1250_rntuple_improvements.py index dd958bff3..ce3269b94 100644 --- a/tests-cuda/test_1250_rntuple_improvements.py +++ b/tests-cuda/test_1250_rntuple_improvements.py @@ -14,42 +14,38 @@ @pytest.mark.parametrize( - ("backend", "GDS", "library"), - [ - ("cuda", False, cupy), - ("cuda", True, cupy), - ], + ("backend", "interpreter", "library"), + [("cuda", "cpu", cupy), ("cuda", "gpu", cupy)], ) -def test_array_methods(backend, GDS, library): +def test_array_methods(backend, interpreter, library): filename = 
skhep_testdata.data_path( "Run2012BC_DoubleMuParked_Muons_1000evts_rntuple_v1-0-0-0.root" ) with uproot.open(filename) as f: obj = f["Events"] - nMuon_array = obj["nMuon"].array(backend=backend, use_GDS=GDS) - Muon_pt_array = obj["Muon_pt"].array(backend=backend, use_GDS=GDS) + nMuon_array = obj["nMuon"].array(backend=backend, interpreter=interpreter) + Muon_pt_array = obj["Muon_pt"].array(backend=backend, interpreter=interpreter) assert ak.all(nMuon_array == library.array([len(l) for l in Muon_pt_array])) - nMuon_arrays = obj["nMuon"].arrays(backend=backend, use_GDS=GDS) + nMuon_arrays = obj["nMuon"].arrays(backend=backend, interpreter=interpreter) assert len(nMuon_arrays.fields) == 1 assert len(nMuon_arrays) == 1000 assert ak.all(nMuon_arrays["nMuon"] == nMuon_array) @pytest.mark.parametrize( - ("backend", "GDS", "library"), - [ - ("cuda", False, cupy), - ("cuda", True, cupy), - ], + ("backend", "interpreter", "library"), + [("cuda", "cpu", cupy), ("cuda", "gpu", cupy)], ) -def test_iterate(backend, GDS, library): +def test_iterate(backend, interpreter, library): filename = skhep_testdata.data_path( "Run2012BC_DoubleMuParked_Muons_1000evts_rntuple_v1-0-0-0.root" ) with uproot.open(filename) as f: obj = f["Events"] - for i, arrays in enumerate(obj.iterate(step_size=100)): + for i, arrays in enumerate( + obj.iterate(step_size=100, backend=backend, interpreter=interpreter) + ): assert len(arrays) == 100 if i == 0: expected_pt = [10.763696670532227, 15.736522674560547] @@ -57,7 +53,9 @@ def test_iterate(backend, GDS, library): assert arrays["Muon_pt"][0].tolist() == expected_pt assert arrays["Muon_charge"][0].tolist() == expected_charge - for i, arrays in enumerate(obj.iterate(step_size="10 kB")): + for i, arrays in enumerate( + obj.iterate(step_size="10 kB", backend=backend, interpreter=interpreter) + ): if i == 0: assert len(arrays) == 384 expected_pt = [10.763696670532227, 15.736522674560547] @@ -72,13 +70,17 @@ def test_iterate(backend, GDS, library): assert 
False Muon_pt = obj["Muon_pt"] - for i, arrays in enumerate(Muon_pt.iterate(step_size=100)): + for i, arrays in enumerate( + Muon_pt.iterate(step_size=100, backend=backend, interpreter=interpreter) + ): assert len(arrays) == 100 if i == 0: expected_pt = [10.763696670532227, 15.736522674560547] assert arrays["Muon_pt"][0].tolist() == expected_pt - for i, arrays in enumerate(Muon_pt.iterate(step_size="5 kB")): + for i, arrays in enumerate( + Muon_pt.iterate(step_size="5 kB", backend=backend, interpreter=interpreter) + ): if i == 0: assert len(arrays) == 611 expected_pt = [10.763696670532227, 15.736522674560547] diff --git a/tests-cuda/test_1285_rntuple_multicluster_concatenation.py b/tests-cuda/test_1285_rntuple_multicluster_concatenation.py index 7d50d781a..e19ec9046 100644 --- a/tests-cuda/test_1285_rntuple_multicluster_concatenation.py +++ b/tests-cuda/test_1285_rntuple_multicluster_concatenation.py @@ -14,15 +14,15 @@ @pytest.mark.parametrize( - ("backend", "GDS", "library"), - [("cuda", False, cupy), ("cuda", True, cupy)], + ("backend", "interpreter", "library"), + [("cuda", "cpu", cupy), ("cuda", "gpu", cupy)], ) -def test_schema_extension(backend, GDS, library): +def test_schema_extension(backend, interpreter, library): filename = skhep_testdata.data_path("test_index_multicluster_rntuple_v1-0-0-0.root") with uproot.open(filename) as f: obj = f["ntuple"] - arrays = obj.arrays(backend=backend, use_GDS=GDS) + arrays = obj.arrays(backend=backend, interpreter=interpreter) int_vec_array = arrays["int_vector"] for j in range(2): diff --git a/tests-cuda/test_1347_rntuple_floats_suppressed_cols.py b/tests-cuda/test_1347_rntuple_floats_suppressed_cols.py index 77ced1043..bfbf96c1d 100644 --- a/tests-cuda/test_1347_rntuple_floats_suppressed_cols.py +++ b/tests-cuda/test_1347_rntuple_floats_suppressed_cols.py @@ -32,15 +32,15 @@ def quantize_float(value, bits, min, max): @pytest.mark.parametrize( - ("backend", "GDS", "library"), - [("cuda", False, cupy), ("cuda", True, 
cupy)], + ("backend", "interpreter", "library"), + [("cuda", "cpu", cupy), ("cuda", "gpu", cupy)], ) -def test_custom_floats(backend, GDS, library): +def test_custom_floats(backend, interpreter, library): filename = skhep_testdata.data_path("test_float_types_rntuple_v1-0-0-0.root") with uproot.open(filename) as f: obj = f["ntuple"] - arrays = obj.arrays(backend=backend, use_GDS=GDS) + arrays = obj.arrays(backend=backend, interpreter=interpreter) min_value = -2.0 max_value = 3.0 @@ -162,10 +162,10 @@ def test_custom_floats(backend, GDS, library): @pytest.mark.parametrize( - ("backend", "GDS", "library"), - [("cuda", False, cupy), ("cuda", True, cupy)], + ("backend", "interpreter", "library"), + [("cuda", "cpu", cupy), ("cuda", "gpu", cupy)], ) -def test_multiple_representations(backend, GDS, library): +def test_multiple_representations(backend, interpreter, library): filename = skhep_testdata.data_path( "test_multiple_representations_rntuple_v1-0-0-0.root" ) @@ -178,6 +178,6 @@ def test_multiple_representations(backend, GDS, library): assert obj.page_link_list[1][0].suppressed assert not obj.page_link_list[2][0].suppressed - arrays = obj.arrays(backend=backend, use_GDS=GDS) + arrays = obj.arrays(backend=backend, interpreter=interpreter) assert library.allclose(arrays.real, library.array([1, 2, 3])) diff --git a/tests-cuda/test_1411_rntuple_physlite_ATLAS.py b/tests-cuda/test_1411_rntuple_physlite_ATLAS.py index bc255ec25..2066dbb70 100644 --- a/tests-cuda/test_1411_rntuple_physlite_ATLAS.py +++ b/tests-cuda/test_1411_rntuple_physlite_ATLAS.py @@ -26,10 +26,10 @@ def physlite_file(): @pytest.mark.parametrize( - ("backend", "GDS", "library"), - [("cuda", False, cupy), ("cuda", True, cupy)], + ("backend", "interpreter", "library"), + [("cuda", "cpu", cupy), ("cuda", "gpu", cupy)], ) -def test_analysis_muons_kinematics(physlite_file, backend, GDS, library): +def test_analysis_muons_kinematics(physlite_file, backend, interpreter, library): """Test that kinematic 
variables of AnalysisMuons can be read and match expected length.""" cols = [ "AnalysisMuonsAuxDyn:pt", @@ -41,7 +41,7 @@ def test_analysis_muons_kinematics(physlite_file, backend, GDS, library): arrays = {} for col in cols: assert col in physlite_file.keys(), f"Column '{col}' not found" - arrays[col] = physlite_file[col].array(backend=backend, use_GDS=GDS) + arrays[col] = physlite_file[col].array(backend=backend, interpreter=interpreter) # Check same structure, number of total muons, and values n_expected_muons = 88 @@ -59,10 +59,10 @@ def test_analysis_muons_kinematics(physlite_file, backend, GDS, library): @pytest.mark.parametrize( - ("backend", "GDS", "library"), - [("cuda", False, cupy), ("cuda", True, cupy)], + ("backend", "interpreter", "library"), + [("cuda", "cpu", cupy), ("cuda", "gpu", cupy)], ) -def test_event_info(physlite_file, backend, GDS, library): +def test_event_info(physlite_file, backend, interpreter, library): """Test that eventInfo variables can be read and match expected first event.""" cols = [ "EventInfoAuxDyn:eventNumber", @@ -73,7 +73,9 @@ def test_event_info(physlite_file, backend, GDS, library): first_event = {} for col in cols: assert col in physlite_file.keys(), f"Column '{col}' not found" - first_event[col] = physlite_file[col].array(backend=backend, use_GDS=GDS)[0] + first_event[col] = physlite_file[col].array( + backend=backend, interpreter=interpreter + )[0] # Check first event values # expected event info values: event number, pile-up, lumiBlock @@ -86,10 +88,10 @@ def test_event_info(physlite_file, backend, GDS, library): @pytest.mark.parametrize( - ("backend", "GDS", "library"), - [("cuda", False, cupy), ("cuda", True, cupy)], + ("backend", "interpreter", "library"), + [("cuda", "cpu", cupy), ("cuda", "gpu", cupy)], ) -def test_truth_muon_containers(physlite_file, backend, GDS, library): +def test_truth_muon_containers(physlite_file, backend, interpreter, library): """Test that truth muon variables can be read and match 
expected values.""" cols = [ "TruthMuons", # AOD Container @@ -101,7 +103,7 @@ def test_truth_muon_containers(physlite_file, backend, GDS, library): arrays = {} for col in cols: assert col in physlite_file.keys(), f"Column '{col}' not found" - temp = physlite_file[col].array(backend=backend, use_GDS=GDS) + temp = physlite_file[col].array(backend=backend, interpreter=interpreter) arrays[col] = temp # Check values diff --git a/tests/test_1191_rntuple_fixes.py b/tests/test_1191_rntuple_fixes.py index ee6c19d29..4f45a8272 100644 --- a/tests/test_1191_rntuple_fixes.py +++ b/tests/test_1191_rntuple_fixes.py @@ -44,7 +44,8 @@ def test_multiple_page_delta_encoding(): filename = skhep_testdata.data_path("test_index_multicluster_rntuple_v1-0-0-0.root") with uproot.open(filename) as f: obj = f["ntuple"] - data = obj.read_col_page(0, 0) + field_metadata = obj.get_field_metadata(0) + data = obj.read_col_page(0, 0, field_metadata) # first page has 64 elements, so this checks that data was stitched together correctly assert data[64] - data[63] == 2