Skip to content

Commit 1895907

Browse files
committed
Add support for FLAC compression of 64bit integers.
- Add native support in the compiled extension for compressing 64bit integers with FLAC. The high and low 32bits are compressed in separate channels. On compression (for little-endian systems), the input 64bit integers are simply interpreted as interleaved stereo channels. On decompression, the separate channels in each frame are extracted to the high / low 32bit portions of the 64bit integers. Expose C functions in public header. - Add 32bit / 64bit wrappers to the bindings, and support for 64bit integers to the mid-level encode / decode functions. - Changes to the treatment of floating point data. float32 data is converted to int32 as before, but float64 data is now converted to int64 to gain additional precision. - Changes to int64 encoding- previously this was restricted to either 32bit range around an offset or forced conversion as floating point data. Now this data is supported natively. Remove the int32/int64 conversion routines which are no longer needed. - Change the HDF5 and Zarr on-disk format (bumping version to "1"). This change was necessary to support storing int64 data without an associated offset. - Modify and expand unit tests to cover new int64 cases. For MPI tests, ensure random data is created on one process and then distributed. - Cleanup benchmarking script. - Changes to MPI I/O helper functions: refactor the extract, send and receive steps into subroutines for more clarity. Use Send instead of Isend for better simplicity and more robustness across MPI implementations.
1 parent b67e16d commit 1895907

28 files changed

+3217
-1331
lines changed

src/flacarray/array.py

Lines changed: 43 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from .hdf5 import write_compressed as hdf5_write_compressed
1212
from .hdf5 import read_compressed as hdf5_read_compressed
1313
from .mpi import global_bytes, global_array_properties
14-
from .utils import log
14+
from .utils import log, compressed_dtype
1515
from .zarr import write_compressed as zarr_write_compressed
1616
from .zarr import read_compressed as zarr_read_compressed
1717

@@ -29,7 +29,7 @@ class FlacArray:
2929
stream in the overall bytes array. The shape of the starting array corresponds
3030
to the shape of the leading, un-compressed dimensions of the original array.
3131
32-
The input data is converted to 32bit integers. The "quanta" value is used
32+
The input data is converted to 32bit or 64bit integers. The "quanta" value is used
3333
for floating point data conversion and represents the floating point increment
3434
for a single integer value. If quanta is None, each stream is scaled independently
3535
based on its data range. If quanta is a scalar, all streams are scaled with the
@@ -42,20 +42,19 @@ class FlacArray:
4242
The following rules specify the data conversion that is performed depending on
4343
the input type:
4444
45-
* int32: No conversion.
45+
* int32: No conversion. Compressed to single channel FLAC bytestream.
4646
47-
* int64: Subtract the integer closest to the mean, then truncate to lower
48-
32 bits, and check that the higher bits were zero.
47+
* int64: No conversion. Compressed to 2-channel (stereo) FLAC bytestream.
4948
5049
* float32: Subtract the mean and scale data based on the quanta value (see
5150
above). Then round to nearest 32bit integer.
5251
5352
* float64: Subtract the mean and scale data based on the quanta value (see
54-
above). Then round to nearest 32bit integer.
53+
above). Then round to nearest 64bit integer.
5554
56-
After conversion to 32bit integers, each stream's data is separately compressed
57-
into a sequence of FLAC bytes, which is appended to the bytestream. The offset in
58-
bytes for each stream is recorded.
55+
After conversion to integers, each stream's data is separately compressed into a
56+
sequence of FLAC bytes, which is appended to the bytestream. The offset in bytes
57+
for each stream is recorded.
5958
6059
A FlacArray is only constructed directly when making a copy. Use the class methods
6160
to create FlacArrays from numpy arrays or on-disk representations.
@@ -125,6 +124,11 @@ def _init_params(self):
125124
self._global_nstreams = np.prod(self._global_leading_shape)
126125
# For reference, record the type string of the original data.
127126
self._typestr = self._dtype_str(self._dtype)
127+
# Track whether we have 32bit or 64bit data
128+
self._is_int64 = (
129+
self._dtype == np.dtype(np.int64) or
130+
self._dtype == np.dtype(np.float64)
131+
)
128132

129133
@staticmethod
130134
def _dtype_str(dt):
@@ -245,6 +249,11 @@ def dtype(self):
245249
"""The dtype of the uncompressed array."""
246250
return self._dtype
247251

252+
@property
253+
def typestr(self):
254+
"""A string representation of the original data type."""
255+
return self._typestr
256+
248257
def _keep_view(self, key):
249258
if len(key) != len(self._leading_shape):
250259
raise ValueError("view size does not match leading dimensions")
@@ -341,6 +350,7 @@ def __getitem__(self, raw_key):
341350
keep=keep,
342351
first_stream_sample=first,
343352
last_stream_sample=last,
353+
is_int64=self._is_int64,
344354
)
345355
return arr.reshape(full_shape)
346356

@@ -366,6 +376,9 @@ def __eq__(self, other):
366376
if self._shape != other._shape:
367377
log.debug(f"other shape {other._shape} != {self._shape}")
368378
return False
379+
if self._dtype != other._dtype:
380+
log.debug(f"other dtype {other._dtype} != {self._dtype}")
381+
return False
369382
if self._global_shape != other._global_shape:
370383
msg = f"other global_shape {other._global_shape} != {self._global_shape}"
371384
log.debug(msg)
@@ -463,6 +476,7 @@ def to_array(
463476
keep=keep,
464477
first_stream_sample=first_samp,
465478
last_stream_sample=last_samp,
479+
is_int64=self._is_int64,
466480
use_threads=use_threads,
467481
)
468482
if keep is not None and keep_indices:
@@ -514,6 +528,7 @@ def from_array(
514528
shape=arr.shape,
515529
global_shape=global_shape,
516530
compressed=compressed,
531+
dtype=arr.dtype,
517532
stream_starts=starts,
518533
stream_nbytes=nbytes,
519534
stream_offsets=offsets,
@@ -542,6 +557,11 @@ def write_hdf5(self, hgrp):
542557
None
543558
544559
"""
560+
if self._is_int64:
561+
n_channels = 2
562+
else:
563+
n_channels = 1
564+
545565
hdf5_write_compressed(
546566
hgrp,
547567
self._leading_shape,
@@ -553,6 +573,7 @@ def write_hdf5(self, hgrp):
553573
self._stream_offsets,
554574
self._stream_gains,
555575
self._compressed,
576+
n_channels,
556577
self._compressed.nbytes,
557578
self._global_nbytes,
558579
self._global_proc_nbytes,
@@ -604,6 +625,7 @@ def read_hdf5(
604625
local_shape,
605626
global_shape,
606627
compressed,
628+
n_channels,
607629
stream_starts,
608630
stream_nbytes,
609631
stream_offsets,
@@ -617,11 +639,14 @@ def read_hdf5(
617639
mpi_dist=mpi_dist,
618640
)
619641

642+
dt = compressed_dtype(n_channels, stream_offsets, stream_gains)
643+
620644
return FlacArray(
621645
None,
622646
shape=local_shape,
623647
global_shape=global_shape,
624648
compressed=compressed,
649+
dtype=dt,
625650
stream_starts=stream_starts,
626651
stream_nbytes=stream_nbytes,
627652
stream_offsets=stream_offsets,
@@ -646,6 +671,10 @@ def write_zarr(self, zgrp):
646671
None
647672
648673
"""
674+
if self._is_int64:
675+
n_channels = 2
676+
else:
677+
n_channels = 1
649678
zarr_write_compressed(
650679
zgrp,
651680
self._leading_shape,
@@ -657,6 +686,7 @@ def write_zarr(self, zgrp):
657686
self._stream_offsets,
658687
self._stream_gains,
659688
self._compressed,
689+
n_channels,
660690
self._compressed.nbytes,
661691
self._global_nbytes,
662692
self._global_proc_nbytes,
@@ -706,6 +736,7 @@ def read_zarr(
706736
local_shape,
707737
global_shape,
708738
compressed,
739+
n_channels,
709740
stream_starts,
710741
stream_nbytes,
711742
stream_offsets,
@@ -719,11 +750,14 @@ def read_zarr(
719750
mpi_dist=mpi_dist,
720751
)
721752

753+
dt = compressed_dtype(n_channels, stream_offsets, stream_gains)
754+
722755
return FlacArray(
723756
None,
724757
shape=local_shape,
725758
global_shape=global_shape,
726759
compressed=compressed,
760+
dtype=dt,
727761
stream_starts=stream_starts,
728762
stream_nbytes=stream_nbytes,
729763
stream_offsets=stream_offsets,

src/flacarray/compress.py

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,18 @@
55
import numpy as np
66

77
from .libflacarray import encode_flac
8-
from .utils import int64_to_int32, float_to_int32, function_timer
8+
from .utils import float_to_int, function_timer
99

1010

1111
@function_timer
1212
def array_compress(arr, level=5, quanta=None, precision=None, use_threads=False):
1313
"""Compress a numpy array with optional floating point conversion.
1414
1515
If `arr` is an int32 array, the returned stream offsets and gains will be None.
16-
if `arr` is an int64 array, the stream offsets will be the integer value subtracted
17-
when converting to int32. Both float32 and float64 data will have floating point
18-
offset and gain arrays returned.
16+
if `arr` is an int64 array, the returned stream offsets and gains will be None and
17+
the calling code is responsible for tracking that the compressed bytes are
18+
associated with a 64bit stream. Both float32 and float64 data will have floating
19+
point offset and gain arrays returned.
1920
2021
Args:
2122
arr (numpy.ndarray): The input array data.
@@ -55,17 +56,11 @@ def array_compress(arr, level=5, quanta=None, precision=None, use_threads=False)
5556
else:
5657
dquanta = None
5758

58-
if arr.dtype == np.dtype(np.int32):
59+
if arr.dtype == np.dtype(np.int32) or arr.dtype == np.dtype(np.int64):
5960
(compressed, starts, nbytes) = encode_flac(arr, level, use_threads=use_threads)
6061
return (compressed, starts, nbytes, None, None)
61-
elif arr.dtype == np.dtype(np.int64):
62-
idata, ioff = int64_to_int32(arr)
63-
(compressed, starts, nbytes) = encode_flac(
64-
idata, level, use_threads=use_threads
65-
)
66-
return (compressed, starts, nbytes, ioff, None)
67-
elif arr.dtype == np.dtype(np.float64) or arr.dtype == np.dtype(np.float32):
68-
idata, foff, gains = float_to_int32(arr, quanta=dquanta, precision=precision)
62+
elif arr.dtype == np.dtype(np.float32) or arr.dtype == np.dtype(np.float64):
63+
idata, foff, gains = float_to_int(arr, quanta=dquanta, precision=precision)
6964
(compressed, starts, nbytes) = encode_flac(
7065
idata, level, use_threads=use_threads
7166
)

src/flacarray/decompress.py

Lines changed: 26 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import numpy as np
66

77
from .libflacarray import decode_flac
8-
from .utils import int32_to_float, keep_select, function_timer, select_keep_indices
8+
from .utils import int_to_float, keep_select, function_timer, select_keep_indices
99

1010

1111
@function_timer
@@ -19,14 +19,18 @@ def array_decompress_slice(
1919
keep=None,
2020
first_stream_sample=None,
2121
last_stream_sample=None,
22+
is_int64=False,
2223
use_threads=False,
2324
):
2425
"""Decompress a slice of a FLAC encoded array and restore original data type.
2526
26-
If `stream_gains` is specified, the output data will be float32 and `stream_offsets`
27-
must also be provided. If `stream_gains` is not specified, but `stream_offsets` is,
28-
then the returned data will be int64. If neither offsets or gains are specified,
29-
the decompressed int32 array is returned.
27+
If both `stream_gains` and `stream_offsets` are specified, the output will be
28+
floating point data. If neither is specified, the output will be integer data.
29+
It is an error to specify only one of those options.
30+
31+
The compressed byte stream might contain either int32 or int64 data, and the calling
32+
code is responsible for tracking this. The `is_int64` parameter should be set to
33+
True if the byte stream contains 64bit integers.
3034
3135
To decompress a subset of samples in all streams, specify the `first_stream_sample`
3236
and `last_stream_sample` values. None values or negative values disable this
@@ -52,6 +56,7 @@ def array_decompress_slice(
5256
keep (array): Bool array of streams to keep in the decompression.
5357
first_stream_sample (int): The first sample of every stream to decompress.
5458
last_stream_sample (int): The last sample of every stream to decompress.
59+
is_int64 (bool): If True, the compressed stream contains 64bit integers.
5560
use_threads (bool): If True, use OpenMP threads to parallelize decoding.
5661
This is only beneficial for large arrays.
5762
@@ -79,27 +84,19 @@ def array_decompress_slice(
7984
first_sample=first_stream_sample,
8085
last_sample=last_stream_sample,
8186
use_threads=use_threads,
87+
is_int64=is_int64,
8288
)
83-
arr = int32_to_float(idata, offsets, gains)
89+
arr = int_to_float(idata, offsets, gains)
8490
else:
85-
# This is int64 data
86-
idata = decode_flac(
87-
compressed,
88-
starts,
89-
nbytes,
90-
stream_size,
91-
first_sample=first_stream_sample,
92-
last_sample=last_stream_sample,
93-
use_threads=use_threads,
91+
raise RuntimeError(
92+
"When specifying offsets, you must also provide the gains"
9493
)
95-
ext_shape = offsets.shape + (1,)
96-
arr = idata.astype(np.int64) + offsets.reshape(ext_shape)
9794
else:
9895
if stream_gains is not None:
9996
raise RuntimeError(
10097
"When specifying gains, you must also provide the offsets"
10198
)
102-
# This is int32 data
99+
# This is integer data
103100
arr = decode_flac(
104101
compressed,
105102
starts,
@@ -108,6 +105,7 @@ def array_decompress_slice(
108105
first_sample=first_stream_sample,
109106
last_sample=last_stream_sample,
110107
use_threads=use_threads,
108+
is_int64=is_int64,
111109
)
112110
return (arr, indices)
113111

@@ -122,14 +120,18 @@ def array_decompress(
122120
stream_gains=None,
123121
first_stream_sample=None,
124122
last_stream_sample=None,
123+
is_int64=False,
125124
use_threads=False,
126125
):
127126
"""Decompress a FLAC encoded array and restore original data type.
128127
129-
If `stream_gains` is specified, the output data will be float32 and `stream_offsets`
130-
must also be provided. If `stream_gains` is not specified, but `stream_offsets` is,
131-
then the returned data will be int64. If neither offsets or gains are specified,
132-
the decompressed int32 array is returned.
128+
If both `stream_gains` and `stream_offsets` are specified, the output will be
129+
floating point data. If neither is specified, the output will be integer data.
130+
It is an error to specify only one of those options.
131+
132+
The compressed byte stream might contain either int32 or int64 data, and the calling
133+
code is responsible for tracking this. The `is_int64` parameter should be set to
134+
True if the byte stream contains 64bit integers.
133135
134136
To decompress a subset of samples in all streams, specify the `first_stream_sample`
135137
and `last_stream_sample` values. None values or negative values disable this
@@ -144,6 +146,7 @@ def array_decompress(
144146
stream_gains (array): The array of gains, one per stream.
145147
first_stream_sample (int): The first sample of every stream to decompress.
146148
last_stream_sample (int): The last sample of every stream to decompress.
149+
is_int64 (bool): If True, the compressed stream contains 64bit integers.
147150
use_threads (bool): If True, use OpenMP threads to parallelize decoding.
148151
This is only beneficial for large arrays.
149152
@@ -161,6 +164,7 @@ def array_decompress(
161164
keep=None,
162165
first_stream_sample=first_stream_sample,
163166
last_stream_sample=last_stream_sample,
167+
is_int64=is_int64,
164168
use_threads=use_threads,
165169
)
166170
return arr

0 commit comments

Comments
 (0)