diff --git a/setup.py b/setup.py index ff99b8ef..00907cd1 100644 --- a/setup.py +++ b/setup.py @@ -215,7 +215,7 @@ def pkgconfig(*packages, **kw): ext.sources += zstd_sources ext.include_dirs += zstd_lib ext.depends += zstd_headers - ext.define_macros += [("ZSTD_SUPPORT", 1)] + ext.define_macros += [("ZSTD_SUPPORT", 1),("ZSTD_DISABLE_ASM",1)] # Check for plugin hdf5 plugin support (hdf5 >= 1.8.11) HDF5_PLUGIN_SUPPORT = False @@ -274,7 +274,7 @@ def finalize_options(self): ext.sources += zstd_sources ext.include_dirs += zstd_lib ext.depends += zstd_headers - ext.define_macros += [("ZSTD_SUPPORT", 1)] + ext.define_macros += [("ZSTD_SUPPORT", 1),("ZSTD_DISABLE_ASM",1)] def run(self): install_.run(self) diff --git a/src/bitshuffle.c b/src/bitshuffle.c index a8ef0b5c..a4452090 100644 --- a/src/bitshuffle.c +++ b/src/bitshuffle.c @@ -99,7 +99,9 @@ int64_t bshuf_decompress_lz4_block(ioc_chain *C_ptr, nbytes = LZ4_decompress_safe((const char*) in + 4, (char *) tmp_buf, nbytes_from_header, size * elem_size); CHECK_ERR_FREE_LZ(nbytes, tmp_buf); - if (nbytes != size * elem_size) { + // cast to unsigned safe: as CHECK_ERR_FREE would have handled nbytes < 0 + // by returning from function immediately + if ((size_t)nbytes != size * elem_size) { free(tmp_buf); return -91; } @@ -231,14 +233,20 @@ size_t bshuf_compress_lz4_bound(const size_t size, int64_t bshuf_compress_lz4(const void* in, void* out, const size_t size, const size_t elem_size, size_t block_size) { return bshuf_blocked_wrap_fun(&bshuf_compress_lz4_block, in, out, size, - elem_size, block_size, 0/*option*/); + elem_size, block_size, 0/*option*/,0 /*no output limit*/); +} + +int64_t bshuf_compress_lz4_out_size_limited(const void* in, void* out, const size_t size, + const size_t elem_size, size_t block_size) { + return bshuf_blocked_wrap_fun(&bshuf_compress_lz4_block, in, out, size, + elem_size, block_size,0 /*option*/,size*elem_size); } int64_t bshuf_decompress_lz4(const void* in, void* out, const size_t size, const 
size_t elem_size, size_t block_size) { return bshuf_blocked_wrap_fun(&bshuf_decompress_lz4_block, in, out, size, - elem_size, block_size, 0/*option*/); + elem_size, block_size, 0/*option*/,0/*no output limit in decompression*/); } #ifdef ZSTD_SUPPORT @@ -267,13 +275,13 @@ size_t bshuf_compress_zstd_bound(const size_t size, int64_t bshuf_compress_zstd(const void* in, void* out, const size_t size, const size_t elem_size, size_t block_size, const int comp_lvl) { return bshuf_blocked_wrap_fun(&bshuf_compress_zstd_block, in, out, size, - elem_size, block_size, comp_lvl); + elem_size, block_size, comp_lvl,0/*no output limit yet*/); } int64_t bshuf_decompress_zstd(const void* in, void* out, const size_t size, const size_t elem_size, size_t block_size) { return bshuf_blocked_wrap_fun(&bshuf_decompress_zstd_block, in, out, size, - elem_size, block_size, 0/*option*/); + elem_size, block_size, 0/*option*/,0/*no output limit in decompression*/); } #endif // ZSTD_SUPPORT diff --git a/src/bitshuffle.h b/src/bitshuffle.h index 1a13dd17..7de95dba 100644 --- a/src/bitshuffle.h +++ b/src/bitshuffle.h @@ -87,6 +87,8 @@ size_t bshuf_compress_lz4_bound(const size_t size, */ int64_t bshuf_compress_lz4(const void* in, void* out, const size_t size, const size_t elem_size, size_t block_size); +int64_t bshuf_compress_lz4_out_size_limited(const void* in, void* out, const size_t size, const size_t + elem_size, size_t block_size); /* ---- bshuf_decompress_lz4 ---- diff --git a/src/bitshuffle_core.c b/src/bitshuffle_core.c index ef33bf55..4a098a92 100644 --- a/src/bitshuffle_core.c +++ b/src/bitshuffle_core.c @@ -49,6 +49,29 @@ typedef size_t omp_size_t; #define CHECK_MULT_EIGHT(n) if (n % 8) return -80; #define MAX(X,Y) ((X) > (Y) ? 
(X) : (Y)) +#define CHECK_ABORT(condition,error,C) if ( condition ) { \ + ioc_destroy(&C); \ + return error; \ +} + +#if defined(_OPENMP) + +#define PARALLEL_MAY_FAIL(condition,guard) if (condition) + +#define _PRAGMA(x) _Pragma(#x) + +#define PARALLEL_CHECK_ABORT(condition,guard,error,C) if ( condition ) {\ + guard = error; \ + _PRAGMA(omp flush( guard )) \ +} + +#else + +#define PARALLEL_MAY_FAIL(condition,guard) + +#define PARALLEL_CHECK_ABORT(condition,guard,error,C) CHECK_ABORT(condition,error,C) + +#endif /* ---- Functions indicating compile time instruction set. ---- */ @@ -1667,10 +1690,13 @@ int64_t bshuf_untrans_bit_elem(const void* in, void* out, const size_t size, /* Wrap a function for processing a single block to process an entire buffer in * parallel. */ int64_t bshuf_blocked_wrap_fun(bshufBlockFunDef fun, const void* in, void* out, \ - const size_t size, const size_t elem_size, size_t block_size, const int option) { + const size_t size, const size_t elem_size, size_t block_size, const int option, \ + size_t out_size_limit) { omp_size_t ii = 0; +#if defined(_OPENMP) int64_t err = 0; +#endif int64_t count, cum_count=0; size_t last_block_size; size_t leftover_bytes; @@ -1686,29 +1712,63 @@ int64_t bshuf_blocked_wrap_fun(bshufBlockFunDef fun, const void* in, void* out, if (block_size == 0) { block_size = bshuf_default_block_size(elem_size); } - if (block_size % BSHUF_BLOCKED_MULT) return -81; + CHECK_ABORT(block_size % BSHUF_BLOCKED_MULT, -81, C); + if ( out_size_limit ) { #if defined(_OPENMP) + #pragma omp flush (err) #pragma omp parallel for schedule(dynamic, 1) \ private(count) reduction(+ : cum_count) #endif - for (ii = 0; ii < (omp_size_t)( size / block_size ); ii ++) { - count = fun(&C, block_size, elem_size, option); - if (count < 0) err = count; - cum_count += count; + for (ii = 0; ii < (omp_size_t)( size / block_size ); ii ++) { + PARALLEL_MAY_FAIL( !err, err ) { + count = fun(&C, block_size, elem_size,option); + PARALLEL_CHECK_ABORT( count < 0 
, err, count, C ); + cum_count += count; + // cast to unsigned safe: cum_count < 0 only possible when count < 0 and + // cum_count < -count which causes PARALLEL_CHECK_ABORT to set err = count + PARALLEL_CHECK_ABORT( (size_t)cum_count > out_size_limit, err, -82, C); + } + } + } + else { +#if defined(_OPENMP) + #pragma omp flush (err) + #pragma omp parallel for schedule(dynamic, 1) \ + private(count) reduction(+ : cum_count) +#endif + for (ii = 0; ii < (omp_size_t)( size / block_size ); ii ++) { + PARALLEL_MAY_FAIL( !err , err ) { + count = fun(&C, block_size, elem_size,option); + PARALLEL_CHECK_ABORT( count < 0 , err, count, C); + cum_count += count; + } + } + } +#if defined(_OPENMP) + CHECK_ABORT( err < 0, err, C); +#endif last_block_size = size % block_size; last_block_size = last_block_size - last_block_size % BSHUF_BLOCKED_MULT; if (last_block_size) { count = fun(&C, last_block_size, elem_size, option); - if (count < 0) err = count; + CHECK_ABORT(count < 0, count, C); cum_count += count; + // cast to unsigned safe: cum_count < 0 not possible, as this would, apart + // from out_size_limit > 0, require that count < 0 and cum_count < -count, + // which any preceding call to CHECK_ABORT would have handled by aborting + // bshuf_blocked_wrap_fun immediately. + CHECK_ABORT( out_size_limit && ( (size_t)cum_count > out_size_limit ), -82, C ); } - if (err < 0) return err; - leftover_bytes = size % BSHUF_BLOCKED_MULT * elem_size; + cum_count += leftover_bytes; + // cast to unsigned safe: cum_count < 0 not possible, as this would, apart + // from out_size_limit > 0, require that count < 0 and cum_count < -count, + // which any preceding call to CHECK_ABORT would have handled by aborting + // bshuf_blocked_wrap_fun immediately. 
+ CHECK_ABORT ( out_size_limit && ( (size_t)cum_count > out_size_limit ), -82, C); //this_iter; last_in = (char *) ioc_get_in(&C, &this_iter); ioc_set_next_in(&C, &this_iter, (void *) (last_in + leftover_bytes)); @@ -1719,7 +1779,7 @@ int64_t bshuf_blocked_wrap_fun(bshufBlockFunDef fun, const void* in, void* out, ioc_destroy(&C); - return cum_count + leftover_bytes; + return cum_count;// + leftover_bytes; } @@ -1842,7 +1902,7 @@ int64_t bshuf_bitshuffle(const void* in, void* out, const size_t size, const size_t elem_size, size_t block_size) { return bshuf_blocked_wrap_fun(&bshuf_bitshuffle_block, in, out, size, - elem_size, block_size, 0/*option*/); + elem_size, block_size, 0/*option*/,0/* no output limit */); } @@ -1850,7 +1910,7 @@ int64_t bshuf_bitunshuffle(const void* in, void* out, const size_t size, const size_t elem_size, size_t block_size) { return bshuf_blocked_wrap_fun(&bshuf_bitunshuffle_block, in, out, size, - elem_size, block_size, 0/*option*/); + elem_size, block_size, 0/*option*/,0 /*no output limit in unshuffle */); } diff --git a/src/bitshuffle_internals.h b/src/bitshuffle_internals.h index 59356f10..6bda2d7d 100644 --- a/src/bitshuffle_internals.h +++ b/src/bitshuffle_internals.h @@ -66,7 +66,7 @@ typedef int64_t (*bshufBlockFunDef)(ioc_chain* C_ptr, /* Wrap a function for processing a single block to process an entire buffer in * parallel. 
*/ int64_t bshuf_blocked_wrap_fun(bshufBlockFunDef fun, const void* in, void* out, - const size_t size, const size_t elem_size, size_t block_size, const int option); + const size_t size, const size_t elem_size, size_t block_size, const int option,size_t out_size_limit); #ifdef __cplusplus } // extern "C" diff --git a/src/bshuf_h5filter.c b/src/bshuf_h5filter.c index 114b91ff..78558dad 100644 --- a/src/bshuf_h5filter.c +++ b/src/bshuf_h5filter.c @@ -195,30 +195,55 @@ size_t bshuf_h5_filter(unsigned int flags, size_t cd_nelmts, // Techincally we should be using signed integers instead of // unsigned ones, however for valid inputs (positive numbers) these // have the same representation. + bshuf_write_uint64_BE(out_buf, nbytes_uncomp); bshuf_write_uint32_BE((char*) out_buf + 8, block_size * elem_size); - if(cd_values[4] == BSHUF_H5_COMPRESS_LZ4) { - err = bshuf_compress_lz4(in_buf, (char*) out_buf + 12, size, - elem_size, block_size); - } + if ( flags & H5Z_FLAG_OPTIONAL ) { + if ( nbytes_uncomp < 18 ) { // TODO feel free to properly adjust + // this is 12 header + 4 lz4 block header + 2 bytes lz4 block payload + // feel free to use more realistic values or make lz4 local or use + // larger common value for lz4 and std + err = -82; + } + else { + if(cd_values[4] == BSHUF_H5_COMPRESS_LZ4) { + err = bshuf_compress_lz4_out_size_limited(in_buf, (char*) out_buf + 12, size, elem_size, block_size); + } + // TODO add ZSTD support here + if ( err >= 0 ) { + nbytes_out = err + 12; + if ( nbytes_out > nbytes_uncomp ) err = -82; + } + } + } + else { + if(cd_values[4] == BSHUF_H5_COMPRESS_LZ4) { + err = bshuf_compress_lz4(in_buf, (char*) out_buf + 12, size, + elem_size, block_size); + } #ifdef ZSTD_SUPPORT - else if (cd_values[4] == BSHUF_H5_COMPRESS_ZSTD) { - err = bshuf_compress_zstd(in_buf, (char*) out_buf + 12, size, - elem_size, block_size, comp_lvl); - } + else if (cd_values[4] == BSHUF_H5_COMPRESS_ZSTD) { + err = bshuf_compress_zstd(in_buf, (char*) out_buf + 12, size, + 
elem_size, block_size, comp_lvl); + } #endif - nbytes_out = err + 12; + nbytes_out = err + 12; + } } } else { - if (flags & H5Z_FLAG_REVERSE) { + if (flags & H5Z_FLAG_REVERSE) { // Bit unshuffle. - err = bshuf_bitunshuffle(in_buf, out_buf, size, elem_size, - block_size); } else { + err = bshuf_bitunshuffle(in_buf, out_buf, size, elem_size, block_size); + } else { // Bit shuffle. - err = bshuf_bitshuffle(in_buf, out_buf, size, elem_size, - block_size); } nbytes_out = nbytes; } - //printf("nb_in %d, nb_uncomp %d, nb_out %d, buf_out %d, block %d\n", - //nbytes, nbytes_uncomp, nbytes_out, buf_size_out, block_size); + err = bshuf_bitshuffle(in_buf, out_buf, size, elem_size, block_size); + } + nbytes_out = nbytes; + } +#if 0 + printf("nb_in %d, nb_uncomp %d, nb_out %d, buf_out %d, block %d\n", + nbytes, nbytes_uncomp, nbytes_out, buf_size_out, block_size); +#endif if (err < 0) { sprintf(msg, "Error in bitshuffle with error code %d.", err); diff --git a/tests/test_h5filter.py b/tests/test_h5filter.py index 2dbb2c3f..0c94ef82 100644 --- a/tests/test_h5filter.py +++ b/tests/test_h5filter.py @@ -128,6 +128,118 @@ def test_with_zstd_compression(self): self.assertTrue(np.all(d == data)) f.close() + def test_with_compression_optional(self): + shape = (128 * 1024 + 783,) + chunks = (4 * 1024 + 23,) + dtype = np.int64 + data = np.arange(shape[0]) + fname = "tmp_test_filters.h5" + f = h5py.File(fname, "w") + h5.create_dataset( + f, + b"range", + shape, + dtype, + chunks, + filter_pipeline=(32008,), + filter_flags=(h5z.FLAG_OPTIONAL,), + filter_opts=((0, h5.H5_COMPRESS_LZ4),), + ) + f["range"][:] = data + + f.close() + # os.system('h5dump -H -p tmp_test_filters.h5') + + f = h5py.File(fname, "r") + self.assertTrue(f["range"].id.get_storage_size() < data.nbytes ) + d = f["range"][:] + self.assertTrue(np.all(d == data)) + f.close() + + def test_skip_compression_on_very_short(self): + shape = (2,)#(128 * 1024 + 783,) + chunks = shape #(4 * 1024 + 23,) + dtype = np.int64 + data = 
np.arange(shape[0]) + fname = "tmp_test_filters.h5" + f = h5py.File(fname, "w") + h5.create_dataset( + f, + b"range", + shape, + dtype, + chunks, + filter_pipeline=(32008,), + filter_flags=(h5z.FLAG_OPTIONAL,), + filter_opts=((0, h5.H5_COMPRESS_LZ4),), + ) + f["range"][:] = data + + f.close() + # os.system('h5dump -H -p tmp_test_filters.h5') + + f = h5py.File(fname, "r") + self.assertTrue(f["range"].id.get_storage_size() == data.nbytes) + d = f["range"][:] + self.assertTrue(np.all(d == data)) + f.close() + + def test_skip_compression_on_short_random(self): + shape = (4,)#(128 * 1024 + 783,) + chunks = shape #(4 * 1024 + 23,) + dtype = np.float64 + data = np.random.default_rng().random(shape[0]) + fname = "tmp_test_filters.h5" + f = h5py.File(fname, "w") + h5.create_dataset( + f, + b"range", + shape, + dtype, + chunks, + filter_pipeline=(32008,), + filter_flags=(h5z.FLAG_OPTIONAL,), + filter_opts=((0, h5.H5_COMPRESS_LZ4),), + ) + f["range"][:] = data + + f.close() + # os.system('h5dump -H -p tmp_test_filters.h5') + + f = h5py.File(fname, "r") + self.assertTrue(f["range"].id.get_storage_size() == data.nbytes) + d = f["range"][:] + self.assertTrue(np.all(d == data)) + f.close() + + def test_force_compression_on_short_random(self): + shape = (4,)#(128 * 1024 + 783,) + chunks = shape #(4 * 1024 + 23,) + dtype = np.float64 + data = np.random.default_rng().random(shape[0]) + fname = "tmp_test_filters.h5" + f = h5py.File(fname, "w") + h5.create_dataset( + f, + b"range", + shape, + dtype, + chunks, + filter_pipeline=(32008,), + filter_flags=(h5z.FLAG_MANDATORY,), + filter_opts=((0, h5.H5_COMPRESS_LZ4),), + ) + f["range"][:] = data + + f.close() + # os.system('h5dump -H -p tmp_test_filters.h5') + + f = h5py.File(fname, "r") + self.assertTrue(f["range"].id.get_storage_size() >= data.nbytes) + d = f["range"][:] + self.assertTrue(np.all(d == data)) + f.close() + def tearDown(self): files = glob.glob("tmp_test_*") for f in files: