Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ def pkgconfig(*packages, **kw):
ext.sources += zstd_sources
ext.include_dirs += zstd_lib
ext.depends += zstd_headers
ext.define_macros += [("ZSTD_SUPPORT", 1)]
ext.define_macros += [("ZSTD_SUPPORT", 1),("ZSTD_DISABLE_ASM",1)]

# Check for plugin hdf5 plugin support (hdf5 >= 1.8.11)
HDF5_PLUGIN_SUPPORT = False
Expand Down Expand Up @@ -274,7 +274,7 @@ def finalize_options(self):
ext.sources += zstd_sources
ext.include_dirs += zstd_lib
ext.depends += zstd_headers
ext.define_macros += [("ZSTD_SUPPORT", 1)]
ext.define_macros += [("ZSTD_SUPPORT", 1),("ZSTD_DISABLE_ASM",1)]

def run(self):
install_.run(self)
Expand Down
18 changes: 13 additions & 5 deletions src/bitshuffle.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,9 @@ int64_t bshuf_decompress_lz4_block(ioc_chain *C_ptr,
nbytes = LZ4_decompress_safe((const char*) in + 4, (char *) tmp_buf, nbytes_from_header,
size * elem_size);
CHECK_ERR_FREE_LZ(nbytes, tmp_buf);
if (nbytes != size * elem_size) {
// cast to unsigned safe: as CHECK_ERR_FREE would have handled nbytes < 0
// by returning from function immediately
if ((size_t)nbytes != size * elem_size) {
free(tmp_buf);
return -91;
}
Expand Down Expand Up @@ -231,14 +233,20 @@ size_t bshuf_compress_lz4_bound(const size_t size,
int64_t bshuf_compress_lz4(const void* in, void* out, const size_t size,
const size_t elem_size, size_t block_size) {
return bshuf_blocked_wrap_fun(&bshuf_compress_lz4_block, in, out, size,
elem_size, block_size, 0/*option*/);
elem_size, block_size, 0/*option*/,0 /*no output limit*/);
}

int64_t bshuf_compress_lz4_out_size_limited(const void* in, void* out, const size_t size,
const size_t elem_size, size_t block_size) {
return bshuf_blocked_wrap_fun(&bshuf_compress_lz4_block, in, out, size,
elem_size, block_size,0 /*option*/,size*elem_size);
}


int64_t bshuf_decompress_lz4(const void* in, void* out, const size_t size,
const size_t elem_size, size_t block_size) {
return bshuf_blocked_wrap_fun(&bshuf_decompress_lz4_block, in, out, size,
elem_size, block_size, 0/*option*/);
elem_size, block_size, 0/*option*/,0/*no output limit in decompression*/);
}

#ifdef ZSTD_SUPPORT
Expand Down Expand Up @@ -267,13 +275,13 @@ size_t bshuf_compress_zstd_bound(const size_t size,
int64_t bshuf_compress_zstd(const void* in, void* out, const size_t size,
const size_t elem_size, size_t block_size, const int comp_lvl) {
return bshuf_blocked_wrap_fun(&bshuf_compress_zstd_block, in, out, size,
elem_size, block_size, comp_lvl);
elem_size, block_size, comp_lvl,0/*no output limit yet*/);
}


int64_t bshuf_decompress_zstd(const void* in, void* out, const size_t size,
const size_t elem_size, size_t block_size) {
return bshuf_blocked_wrap_fun(&bshuf_decompress_zstd_block, in, out, size,
elem_size, block_size, 0/*option*/);
elem_size, block_size, 0/*option*/,0/*no output limit in decompression*/);
}
#endif // ZSTD_SUPPORT
2 changes: 2 additions & 0 deletions src/bitshuffle.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ size_t bshuf_compress_lz4_bound(const size_t size,
*/
int64_t bshuf_compress_lz4(const void* in, void* out, const size_t size, const size_t
elem_size, size_t block_size);
int64_t bshuf_compress_lz4_out_size_limited(const void* in, void* out, const size_t size, const size_t
elem_size, size_t block_size);


/* ---- bshuf_decompress_lz4 ----
Expand Down
84 changes: 72 additions & 12 deletions src/bitshuffle_core.c
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,29 @@ typedef size_t omp_size_t;
#define CHECK_MULT_EIGHT(n) if (n % 8) return -80;
#define MAX(X,Y) ((X) > (Y) ? (X) : (Y))

#define CHECK_ABORT(condition,error,C) if ( condition ) { \
ioc_destroy(&C); \
return error; \
}

#if defined(_OPENMP)

#define PARALLEL_MAY_FAIL(condition,guard) if (condition)

#define _PRAGMA(x) _Pragma(#x)

#define PARALLEL_CHECK_ABORT(condition,guard,error,C) if ( condition ) {\
guard = error; \
_PRAGMA(omp flush( guard )) \
}

#else

#define PARALLEL_MAY_FAIL(condition,guard)

#define PARALLEL_CHECK_ABORT(condition,guard,error,C) CHECK_ABORT(condition,error,C)

#endif

/* ---- Functions indicating compile time instruction set. ---- */

Expand Down Expand Up @@ -1667,10 +1690,13 @@ int64_t bshuf_untrans_bit_elem(const void* in, void* out, const size_t size,
/* Wrap a function for processing a single block to process an entire buffer in
* parallel. */
int64_t bshuf_blocked_wrap_fun(bshufBlockFunDef fun, const void* in, void* out, \
const size_t size, const size_t elem_size, size_t block_size, const int option) {
const size_t size, const size_t elem_size, size_t block_size, const int option, \
size_t out_size_limit) {

omp_size_t ii = 0;
#if defined(_OPENMP)
int64_t err = 0;
#endif
int64_t count, cum_count=0;
size_t last_block_size;
size_t leftover_bytes;
Expand All @@ -1686,29 +1712,63 @@ int64_t bshuf_blocked_wrap_fun(bshufBlockFunDef fun, const void* in, void* out,
if (block_size == 0) {
block_size = bshuf_default_block_size(elem_size);
}
if (block_size % BSHUF_BLOCKED_MULT) return -81;
CHECK_ABORT(block_size % BSHUF_BLOCKED_MULT, -81, C);

if ( out_size_limit ) {
#if defined(_OPENMP)
#pragma omp flush (err)
#pragma omp parallel for schedule(dynamic, 1) \
private(count) reduction(+ : cum_count)
#endif
for (ii = 0; ii < (omp_size_t)( size / block_size ); ii ++) {
count = fun(&C, block_size, elem_size, option);
if (count < 0) err = count;
cum_count += count;
for (ii = 0; ii < (omp_size_t)( size / block_size ); ii ++) {
PARALLEL_MAY_FAIL( !err, err ) {
count = fun(&C, block_size, elem_size,option);
PARALLEL_CHECK_ABORT( count < 0 , err, count, C );
cum_count += count;
// cast to unsigned safe: cum_count < 0 only possible when count < 0 and
// cum_count < -count which cause PARALLEL_CHECK_ABORT to set err = count
PARALLEL_CHECK_ABORT( (size_t)cum_count > out_size_limit, err, -82, C);
}
}
}
else {
#if defined(_OPENMP)
#pragma omp flush (err)
#pragma omp parallel for schedule(dynamic, 1) \
private(count) reduction(+ : cum_count)
#endif
for (ii = 0; ii < (omp_size_t)( size / block_size ); ii ++) {
PARALLEL_MAY_FAIL( !err , err ) {
count = fun(&C, block_size, elem_size,option);
PARALLEL_CHECK_ABORT( count < 0 , err, count, C);
cum_count += count;
}
}
}
#if defined(_OPENMP)
CHECK_ABORT( err < 0, err, C);
#endif

last_block_size = size % block_size;
last_block_size = last_block_size - last_block_size % BSHUF_BLOCKED_MULT;
if (last_block_size) {
count = fun(&C, last_block_size, elem_size, option);
if (count < 0) err = count;
CHECK_ABORT(count < 0, count, C);
cum_count += count;
// cast to unsigend safe: cum_count < 0 not possible as this would appart
// from out_size_limit > 0 required that count < 0 and cum_count < -count
// which any preceding call to CHECK_ABORT would have handled by aborting
// bshuf_blocked_wrap_fun immediately.
CHECK_ABORT( out_size_limit && ( (size_t)cum_count > out_size_limit ), -82, C );
}

if (err < 0) return err;

leftover_bytes = size % BSHUF_BLOCKED_MULT * elem_size;
cum_count += leftover_bytes;
// cast to unsigend safe: cum_count < 0 not possible as this would appart
// from out_size_limit > 0 required that count < 0 and cum_count < -count
// which any precedding call to CHECK_ABORT would have handled by aborting
// bshuf_blocked_wrap_fun immediately.
CHECK_ABORT ( out_size_limit && ( (size_t)cum_count > out_size_limit ), -82, C);
//this_iter;
last_in = (char *) ioc_get_in(&C, &this_iter);
ioc_set_next_in(&C, &this_iter, (void *) (last_in + leftover_bytes));
Expand All @@ -1719,7 +1779,7 @@ int64_t bshuf_blocked_wrap_fun(bshufBlockFunDef fun, const void* in, void* out,

ioc_destroy(&C);

return cum_count + leftover_bytes;
return cum_count;// + leftover_bytes;
}


Expand Down Expand Up @@ -1842,15 +1902,15 @@ int64_t bshuf_bitshuffle(const void* in, void* out, const size_t size,
const size_t elem_size, size_t block_size) {

return bshuf_blocked_wrap_fun(&bshuf_bitshuffle_block, in, out, size,
elem_size, block_size, 0/*option*/);
elem_size, block_size, 0/*option*/,0/* no output limit */);
}


int64_t bshuf_bitunshuffle(const void* in, void* out, const size_t size,
const size_t elem_size, size_t block_size) {

return bshuf_blocked_wrap_fun(&bshuf_bitunshuffle_block, in, out, size,
elem_size, block_size, 0/*option*/);
elem_size, block_size, 0/*option*/,0 /*no output limit in unshuffle */);
}


Expand Down
2 changes: 1 addition & 1 deletion src/bitshuffle_internals.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ typedef int64_t (*bshufBlockFunDef)(ioc_chain* C_ptr,
/* Wrap a function for processing a single block to process an entire buffer in
* parallel. */
int64_t bshuf_blocked_wrap_fun(bshufBlockFunDef fun, const void* in, void* out,
const size_t size, const size_t elem_size, size_t block_size, const int option);
const size_t size, const size_t elem_size, size_t block_size, const int option,size_t out_size_limit);

#ifdef __cplusplus
} // extern "C"
Expand Down
57 changes: 41 additions & 16 deletions src/bshuf_h5filter.c
Original file line number Diff line number Diff line change
Expand Up @@ -195,30 +195,55 @@ size_t bshuf_h5_filter(unsigned int flags, size_t cd_nelmts,
// Techincally we should be using signed integers instead of
// unsigned ones, however for valid inputs (positive numbers) these
// have the same representation.

bshuf_write_uint64_BE(out_buf, nbytes_uncomp);
bshuf_write_uint32_BE((char*) out_buf + 8, block_size * elem_size);
if(cd_values[4] == BSHUF_H5_COMPRESS_LZ4) {
err = bshuf_compress_lz4(in_buf, (char*) out_buf + 12, size,
elem_size, block_size);
}
if ( flags & H5Z_FLAG_OPTIONAL ) {
if ( nbytes_uncomp < 18 ) { // TODO feel free to properly adjust
// this is 12 header + 4 lz4 block header + 2 bytes lz4 block payload
// feel free to use more realistic values or make lz4 local or use
// larger common value for lz4 and std
err = -82;
}
else {
if(cd_values[4] == BSHUF_H5_COMPRESS_LZ4) {
err = bshuf_compress_lz4_out_size_limited(in_buf, (char*) out_buf + 12, size, elem_size, block_size);
}
// TODO add ZSTD support here
if ( err >= 0 ) {
nbytes_out = err + 12;
if ( nbytes_out > nbytes_uncomp ) err = -82;
}
}
}
else {
if(cd_values[4] == BSHUF_H5_COMPRESS_LZ4) {
err = bshuf_compress_lz4(in_buf, (char*) out_buf + 12, size,
elem_size, block_size);
}
#ifdef ZSTD_SUPPORT
else if (cd_values[4] == BSHUF_H5_COMPRESS_ZSTD) {
err = bshuf_compress_zstd(in_buf, (char*) out_buf + 12, size,
elem_size, block_size, comp_lvl);
}
else if (cd_values[4] == BSHUF_H5_COMPRESS_ZSTD) {
err = bshuf_compress_zstd(in_buf, (char*) out_buf + 12, size,
elem_size, block_size, comp_lvl);
}
#endif
nbytes_out = err + 12;
nbytes_out = err + 12;
}
}
} else {
if (flags & H5Z_FLAG_REVERSE) {
if (flags & H5Z_FLAG_REVERSE) {
// Bit unshuffle.
err = bshuf_bitunshuffle(in_buf, out_buf, size, elem_size,
block_size); } else {
err = bshuf_bitunshuffle(in_buf, out_buf, size, elem_size, block_size);
} else {
// Bit shuffle.
err = bshuf_bitshuffle(in_buf, out_buf, size, elem_size,
block_size); } nbytes_out = nbytes; }
//printf("nb_in %d, nb_uncomp %d, nb_out %d, buf_out %d, block %d\n",
//nbytes, nbytes_uncomp, nbytes_out, buf_size_out, block_size);
err = bshuf_bitshuffle(in_buf, out_buf, size, elem_size, block_size);
}
nbytes_out = nbytes;
}
#if 0
printf("nb_in %d, nb_uncomp %d, nb_out %d, buf_out %d, block %d\n",
nbytes, nbytes_uncomp, nbytes_out, buf_size_out, block_size);
#endif

if (err < 0) {
sprintf(msg, "Error in bitshuffle with error code %d.", err);
Expand Down
Loading