Skip to content

Commit 5426fe5

Browse files
committed
Add support for FLAC compression of 64bit integers.
- Add native support in the compiled extension for compressing 64bit integers with FLAC. The high and low 32bits are compressed in separate channels. On compression (for little-endian systems), the input 64bit integers are simply interpreted as interleaved stereo channels. On decompression, the separate channels in each frame are extracted to the high / low 32bit portions of the 64bit integers. - Add 32bit / 64bit wrappers to the bindings, and support for 64bit integers to the mid-level encode / decode functions. - Changes to the treatment of floating point data. float32 data is converted to int32 as before, but float64 data is now converted to int64 to gain additional precision. - Changes to int64 encoding- previously this was restricted to either 32bit range around an offset or forced conversion as floating point data. Now this data is supported natively. Remove the int32/int64 conversion routines which are no longer needed. - Change the HDF5 and Zarr on-disk format (bumping version to "1"). This change was necessary to support storing int64 data without an associated offset. - Modify and expand unit tests to cover new int64 cases.
1 parent 9d098a5 commit 5426fe5

20 files changed

+1962
-523
lines changed

src/flacarray/array.py

Lines changed: 46 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ class FlacArray:
2929
stream in the overall bytes array. The shape of the starting array corresponds
3030
to the shape of the leading, un-compressed dimensions of the original array.
3131
32-
The input data is converted to 32bit integers. The "quanta" value is used
32+
The input data is converted to 32bit or 64bit integers. The "quanta" value is used
3333
for floating point data conversion and represents the floating point increment
3434
for a single integer value. If quanta is None, each stream is scaled independently
3535
based on its data range. If quanta is a scalar, all streams are scaled with the
@@ -42,20 +42,19 @@ class FlacArray:
4242
The following rules specify the data conversion that is performed depending on
4343
the input type:
4444
45-
* int32: No conversion.
45+
* int32: No conversion. Compressed to single channel FLAC bytestream.
4646
47-
* int64: Subtract the integer closest to the mean, then truncate to lower
48-
32 bits, and check that the higher bits were zero.
47+
* int64: No conversion. Compressed to 2-channel (stereo) FLAC bytestream.
4948
5049
* float32: Subtract the mean and scale data based on the quanta value (see
5150
above). Then round to nearest 32bit integer.
5251
5352
* float64: Subtract the mean and scale data based on the quanta value (see
54-
above). Then round to nearest 32bit integer.
53+
above). Then round to nearest 64bit integer.
5554
56-
After conversion to 32bit integers, each stream's data is separately compressed
57-
into a sequence of FLAC bytes, which is appended to the bytestream. The offset in
58-
bytes for each stream is recorded.
55+
After conversion to integers, each stream's data is separately compressed into a
56+
sequence of FLAC bytes, which is appended to the bytestream. The offset in bytes
57+
for each stream is recorded.
5958
6059
A FlacArray is only constructed directly when making a copy. Use the class methods
6160
to create FlacArrays from numpy arrays or on-disk representations.
@@ -71,6 +70,7 @@ def __init__(
7170
shape=None,
7271
global_shape=None,
7372
compressed=None,
73+
is_int64=None,
7474
stream_starts=None,
7575
stream_nbytes=None,
7676
stream_offsets=None,
@@ -84,6 +84,7 @@ def __init__(
8484
self._shape = copy.deepcopy(other._shape)
8585
self._global_shape = copy.deepcopy(other._global_shape)
8686
self._compressed = copy.deepcopy(other._compressed)
87+
self._is_int64 = other._is_int64
8788
self._stream_starts = copy.deepcopy(other._stream_starts)
8889
self._stream_nbytes = copy.deepcopy(other._stream_nbytes)
8990
self._stream_offsets = copy.deepcopy(other._stream_offsets)
@@ -97,6 +98,7 @@ def __init__(
9798
self._shape = shape
9899
self._global_shape = global_shape
99100
self._compressed = compressed
101+
self._is_int64 = is_int64
100102
self._stream_starts = stream_starts
101103
self._stream_nbytes = stream_nbytes
102104
self._stream_offsets = stream_offsets
@@ -124,18 +126,25 @@ def _init_params(self):
124126
if self._stream_offsets is not None:
125127
if self._stream_gains is not None:
126128
# This is floating point data
127-
if self._stream_gains.dtype == np.dtype(np.float64):
129+
if self._is_int64:
128130
self._typestr = "float64"
129131
else:
130132
self._typestr = "float32"
131133
else:
132-
# This is int64 data
133-
self._typestr = "int64"
134+
raise RuntimeError("Offsets and gains must both be None or not None")
134135
else:
135-
self._typestr = "int32"
136+
if self._is_int64:
137+
self._typestr = "int64"
138+
else:
139+
self._typestr = "int32"
136140

137141
# Shapes of decompressed array
138142

143+
@property
144+
def typestr(self):
145+
"""A string representation of the original data type."""
146+
return self._typestr
147+
139148
@property
140149
def shape(self):
141150
"""The shape of the local, uncompressed array."""
@@ -288,6 +297,7 @@ def __getitem__(self, key):
288297
keep=keep,
289298
first_stream_sample=first,
290299
last_stream_sample=last,
300+
is_int64=self._is_int64,
291301
)
292302
return arr
293303

@@ -313,6 +323,9 @@ def __eq__(self, other):
313323
if self._shape != other._shape:
314324
log.debug(f"other shape {other._shape} != {self._shape}")
315325
return False
326+
if self._typestr != other._typestr:
327+
log.debug(f"other typestr {other._typestr} != {self._typestr}")
328+
return False
316329
if self._global_shape != other._global_shape:
317330
msg = f"other global_shape {other._global_shape} != {self._global_shape}"
318331
log.debug(msg)
@@ -411,6 +424,7 @@ def to_array(
411424
first_stream_sample=first_samp,
412425
last_stream_sample=last_samp,
413426
use_threads=use_threads,
427+
is_int64=self._is_int64,
414428
)
415429
if keep is not None and keep_indices:
416430
return (arr, indices)
@@ -447,6 +461,11 @@ def from_array(
447461
global_shape = global_props["shape"]
448462
mpi_dist = global_props["dist"]
449463

464+
if arr.dtype == np.dtype(np.int64) or arr.dtype == np.dtype(np.float64):
465+
is_int64 = True
466+
else:
467+
is_int64 = False
468+
450469
# Compress our local piece of the array
451470
compressed, starts, nbytes, offsets, gains = array_compress(
452471
arr,
@@ -461,6 +480,7 @@ def from_array(
461480
shape=arr.shape,
462481
global_shape=global_shape,
463482
compressed=compressed,
483+
is_int64=is_int64,
464484
stream_starts=starts,
465485
stream_nbytes=nbytes,
466486
stream_offsets=offsets,
@@ -489,6 +509,10 @@ def write_hdf5(self, hgrp):
489509
None
490510
491511
"""
512+
if self._is_int64:
513+
n_channels = 2
514+
else:
515+
n_channels = 1
492516
hdf5_write_compressed(
493517
hgrp,
494518
self._leading_shape,
@@ -500,6 +524,7 @@ def write_hdf5(self, hgrp):
500524
self._stream_offsets,
501525
self._stream_gains,
502526
self._compressed,
527+
n_channels,
503528
self._compressed.nbytes,
504529
self._global_nbytes,
505530
self._global_proc_nbytes,
@@ -551,6 +576,7 @@ def read_hdf5(
551576
local_shape,
552577
global_shape,
553578
compressed,
579+
n_channels,
554580
stream_starts,
555581
stream_nbytes,
556582
stream_offsets,
@@ -569,6 +595,7 @@ def read_hdf5(
569595
shape=local_shape,
570596
global_shape=global_shape,
571597
compressed=compressed,
598+
is_int64=(n_channels == 2),
572599
stream_starts=stream_starts,
573600
stream_nbytes=stream_nbytes,
574601
stream_offsets=stream_offsets,
@@ -593,6 +620,10 @@ def write_zarr(self, zgrp):
593620
None
594621
595622
"""
623+
if self._is_int64:
624+
n_channels = 2
625+
else:
626+
n_channels = 1
596627
zarr_write_compressed(
597628
zgrp,
598629
self._leading_shape,
@@ -604,6 +635,7 @@ def write_zarr(self, zgrp):
604635
self._stream_offsets,
605636
self._stream_gains,
606637
self._compressed,
638+
n_channels,
607639
self._compressed.nbytes,
608640
self._global_nbytes,
609641
self._global_proc_nbytes,
@@ -653,6 +685,7 @@ def read_zarr(
653685
local_shape,
654686
global_shape,
655687
compressed,
688+
n_channels,
656689
stream_starts,
657690
stream_nbytes,
658691
stream_offsets,
@@ -671,6 +704,7 @@ def read_zarr(
671704
shape=local_shape,
672705
global_shape=global_shape,
673706
compressed=compressed,
707+
is_int64=(n_channels == 2),
674708
stream_starts=stream_starts,
675709
stream_nbytes=stream_nbytes,
676710
stream_offsets=stream_offsets,

src/flacarray/compress.py

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,18 @@
55
import numpy as np
66

77
from .libflacarray import encode_flac
8-
from .utils import int64_to_int32, float_to_int32, function_timer
8+
from .utils import float_to_int, function_timer
99

1010

1111
@function_timer
1212
def array_compress(arr, level=5, quanta=None, precision=None, use_threads=False):
1313
"""Compress a numpy array with optional floating point conversion.
1414
1515
If `arr` is an int32 array, the returned stream offsets and gains will be None.
16-
if `arr` is an int64 array, the stream offsets will be the integer value subtracted
17-
when converting to int32. Both float32 and float64 data will have floating point
18-
offset and gain arrays returned.
16+
if `arr` is an int64 array, the returned stream offsets and gains will be None and
17+
the calling code is responsible for tracking that the compressed bytes are
18+
associated with a 64bit stream. Both float32 and float64 data will have floating
19+
point offset and gain arrays returned.
1920
2021
Args:
2122
arr (numpy.ndarray): The input array data.
@@ -55,17 +56,11 @@ def array_compress(arr, level=5, quanta=None, precision=None, use_threads=False)
5556
else:
5657
dquanta = None
5758

58-
if arr.dtype == np.dtype(np.int32):
59+
if arr.dtype == np.dtype(np.int32) or arr.dtype == np.dtype(np.int64):
5960
(compressed, starts, nbytes) = encode_flac(arr, level, use_threads=use_threads)
6061
return (compressed, starts, nbytes, None, None)
61-
elif arr.dtype == np.dtype(np.int64):
62-
idata, ioff = int64_to_int32(arr)
63-
(compressed, starts, nbytes) = encode_flac(
64-
idata, level, use_threads=use_threads
65-
)
66-
return (compressed, starts, nbytes, ioff, None)
67-
elif arr.dtype == np.dtype(np.float64) or arr.dtype == np.dtype(np.float32):
68-
idata, foff, gains = float_to_int32(arr, quanta=dquanta, precision=precision)
62+
elif arr.dtype == np.dtype(np.float32) or arr.dtype == np.dtype(np.float64):
63+
idata, foff, gains = float_to_int(arr, quanta=dquanta, precision=precision)
6964
(compressed, starts, nbytes) = encode_flac(
7065
idata, level, use_threads=use_threads
7166
)

src/flacarray/decompress.py

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import numpy as np
66

77
from .libflacarray import decode_flac
8-
from .utils import int32_to_float, keep_select, function_timer, select_keep_indices
8+
from .utils import int_to_float, keep_select, function_timer, select_keep_indices
99

1010

1111
@function_timer
@@ -19,6 +19,7 @@ def array_decompress_slice(
1919
keep=None,
2020
first_stream_sample=None,
2121
last_stream_sample=None,
22+
is_int64=False,
2223
use_threads=False,
2324
):
2425
"""Decompress a slice of a FLAC encoded array and restore original data type.
@@ -52,6 +53,7 @@ def array_decompress_slice(
5253
keep (array): Bool array of streams to keep in the decompression.
5354
first_stream_sample (int): The first sample of every stream to decompress.
5455
last_stream_sample (int): The last sample of every stream to decompress.
56+
is_int64 (bool): If True, the compressed stream contains 64bit integers.
5557
use_threads (bool): If True, use OpenMP threads to parallelize decoding.
5658
This is only beneficial for large arrays.
5759
@@ -79,27 +81,19 @@ def array_decompress_slice(
7981
first_sample=first_stream_sample,
8082
last_sample=last_stream_sample,
8183
use_threads=use_threads,
84+
is_int64=is_int64,
8285
)
83-
arr = int32_to_float(idata, offsets, gains)
86+
arr = int_to_float(idata, offsets, gains)
8487
else:
85-
# This is int64 data
86-
idata = decode_flac(
87-
compressed,
88-
starts,
89-
nbytes,
90-
stream_size,
91-
first_sample=first_stream_sample,
92-
last_sample=last_stream_sample,
93-
use_threads=use_threads,
88+
raise RuntimeError(
89+
"When specifying offsets, you must also provide the gains"
9490
)
95-
ext_shape = offsets.shape + (1,)
96-
arr = idata.astype(np.int64) + offsets.reshape(ext_shape)
9791
else:
9892
if stream_gains is not None:
9993
raise RuntimeError(
10094
"When specifying gains, you must also provide the offsets"
10195
)
102-
# This is int32 data
96+
# This is integer data
10397
arr = decode_flac(
10498
compressed,
10599
starts,
@@ -108,6 +102,7 @@ def array_decompress_slice(
108102
first_sample=first_stream_sample,
109103
last_sample=last_stream_sample,
110104
use_threads=use_threads,
105+
is_int64=is_int64,
111106
)
112107
return (arr, indices)
113108

@@ -122,6 +117,7 @@ def array_decompress(
122117
stream_gains=None,
123118
first_stream_sample=None,
124119
last_stream_sample=None,
120+
is_int64=False,
125121
use_threads=False,
126122
):
127123
"""Decompress a FLAC encoded array and restore original data type.
@@ -144,6 +140,7 @@ def array_decompress(
144140
stream_gains (array): The array of gains, one per stream.
145141
first_stream_sample (int): The first sample of every stream to decompress.
146142
last_stream_sample (int): The last sample of every stream to decompress.
143+
is_int64 (bool): If True, the compressed stream contains 64bit integers.
147144
use_threads (bool): If True, use OpenMP threads to parallelize decoding.
148145
This is only beneficial for large arrays.
149146
@@ -161,6 +158,7 @@ def array_decompress(
161158
keep=None,
162159
first_stream_sample=first_stream_sample,
163160
last_stream_sample=last_stream_sample,
161+
is_int64=is_int64,
164162
use_threads=use_threads,
165163
)
166164
return arr

0 commit comments

Comments
 (0)