diff --git a/cram/cram_index.c b/cram/cram_index.c index 45d420df2..c8a5054d1 100644 --- a/cram/cram_index.c +++ b/cram/cram_index.c @@ -738,9 +738,10 @@ int cram_index_container(cram_fd *fd, * Returns 0 on success, * negative on failure (-1 for read failure, -4 for write failure) */ -int cram_index_build(cram_fd *fd, const char *fn_base, const char *fn_idx) { +int cram_index_build(cram_fd *fd, const char *fn_base, const char *fn_idx, const hts_progress_callback progress_fn, void *progress_data) { cram_container *c; off_t cpos, hpos; + off_t ppos = 0; BGZF *fp; kstring_t fn_idx_str = {0}; int64_t last_ref = -9, last_start = -9; @@ -771,6 +772,13 @@ int cram_index_build(cram_fd *fd, const char *fn_base, const char *fn_idx) { hpos = htell(fd->fp); + if (progress_fn && hpos != ppos) { + if (progress_fn(hpos, progress_data) != 0) { + return -5; + } + ppos = hpos; + } + if (!(c->comp_hdr_block = cram_read_block(fd))) return -1; assert(c->comp_hdr_block->content_type == COMPRESSION_HEADER); diff --git a/cram/cram_index.h b/cram/cram_index.h index 5fa1154f5..d54cf78c0 100644 --- a/cram/cram_index.h +++ b/cram/cram_index.h @@ -87,12 +87,14 @@ int cram_seek_to_refpos(cram_fd *fd, cram_range *r); * fd is a newly opened cram file that we wish to index. * fn_base is the filename of the associated CRAM file. * fn_idx is the filename of the index file to be written; + * progress_fn is an optional callback which will be called periodically with the current working offset into the file + * progress_data is the optional data passed to the progress_fn * if NULL, we add ".crai" to fn_base to get the index filename. * * Returns 0 on success, * negative on failure (-1 for read failure, -4 for write failure) */ -int cram_index_build(cram_fd *fd, const char *fn_base, const char *fn_idx); +int cram_index_build(cram_fd *fd, const char *fn_base, const char *fn_idx, const hts_progress_callback progress_fn, void *progress_data); /* * Adds a single slice to the index. diff --git a/htslib/hts.h b/htslib/hts.h index 48c263cab..9f74cb614 100644 --- a/htslib/hts.h +++ b/htslib/hts.h @@ -59,6 +59,9 @@ struct hFILE; struct hts_tpool; struct sam_hdr_t; + +typedef int (*hts_progress_callback)(int64_t current_pos, void *data); + /** * @hideinitializer * Deprecated macro to expand a dynamic array of a given type diff --git a/htslib/sam.h b/htslib/sam.h index 79ac26e4b..397e9e021 100644 --- a/htslib/sam.h +++ b/htslib/sam.h @@ -1237,6 +1237,20 @@ int sam_index_build2(const char *fn, const char *fnidx, int min_shift) HTS_RESUL HTSLIB_EXPORT int sam_index_build3(const char *fn, const char *fnidx, int min_shift, int nthreads) HTS_RESULT_USED; +/// Generate and save an index to a specific file +/** @param fn Input BAM/CRAM/etc filename + @param fnidx Output filename, or NULL to add .bai/.csi/etc to @a fn + @param min_shift Positive to generate CSI, or 0 to generate BAI + @param nthreads Number of threads to use when building the index + @param progress_fn Funciton to call with progress updates + @param progress_data Data to pass to the progress function + @return 0 if successful, or negative if an error occurred (see + sam_index_build for error codes) +*/ +HTSLIB_EXPORT +int sam_index_build4(const char *fn, const char *fnidx, int min_shift, int nthreads, const hts_progress_callback progress_fn, void* progress_data) HTS_RESULT_USED; + + /// Free a SAM iterator /// @param iter Iterator to free #define sam_itr_destroy(iter) hts_itr_destroy(iter) diff --git a/htslib/tbx.h b/htslib/tbx.h index 9b9e111b9..0502481b1 100644 --- a/htslib/tbx.h +++ b/htslib/tbx.h @@ -52,6 +52,7 @@ typedef struct tbx_t { void *dict; } tbx_t; + extern const tbx_conf_t tbx_conf_gff, tbx_conf_bed, tbx_conf_psltbl, tbx_conf_sam, tbx_conf_vcf; #define tbx_itr_destroy(iter) hts_itr_destroy(iter) @@ -76,6 +77,10 @@ extern const tbx_conf_t tbx_conf_gff, tbx_conf_bed, tbx_conf_psltbl, tbx_conf_sa */ HTSLIB_EXPORT tbx_t *tbx_index(BGZF *fp, int min_shift, const tbx_conf_t *conf); + + HTSLIB_EXPORT + tbx_t *tbx_index2(BGZF *fp, int min_shift, const tbx_conf_t *conf, const hts_progress_callback progress_fn, void *progress_data); + /* * All tbx_index_build* methods return: 0 (success), -1 (general failure) or -2 (compression not BGZF) */ @@ -88,6 +93,10 @@ extern const tbx_conf_t tbx_conf_gff, tbx_conf_bed, tbx_conf_psltbl, tbx_conf_sa HTSLIB_EXPORT int tbx_index_build3(const char *fn, const char *fnidx, int min_shift, int n_threads, const tbx_conf_t *conf); + HTSLIB_EXPORT + int tbx_index_build4(const char *fn, const char *fnidx, int min_shift, int n_threads, const tbx_conf_t *conf, + const hts_progress_callback progress_fn, void *progress_data); + /// Load or stream a .tbi or .csi index /** @param fn Name of the data file corresponding to the index diff --git a/sam.c b/sam.c index 393a3b22e..cd03e421f 100644 --- a/sam.c +++ b/sam.c @@ -923,9 +923,10 @@ int bam_set_qname(bam1_t *rec, const char *qname) *** BAM indexing *** ********************/ -static hts_idx_t *sam_index(htsFile *fp, int min_shift) +static hts_idx_t *sam_index(htsFile *fp, int min_shift, const hts_progress_callback progress_fn, void* progress_data) { int n_lvls, i, fmt, ret; + int64_t last_block_address = 0; bam1_t *b; hts_idx_t *idx; sam_hdr_t *h; @@ -944,6 +945,12 @@ static hts_idx_t *sam_index(htsFile *fp, int min_shift) idx = hts_idx_init(h->n_targets, fmt, bgzf_tell(fp->fp.bgzf), min_shift, n_lvls); b = bam_init1(); while ((ret = sam_read1(fp, h, b)) >= 0) { + if (progress_fn && fp->fp.bgzf->block_address != last_block_address) { + if (progress_fn(fp->fp.bgzf->block_address, progress_data) != 0) { + goto err; + } + last_block_address = fp->fp.bgzf->block_address; + } ret = hts_idx_push(idx, b->core.tid, b->core.pos, bam_endpos(b), bgzf_tell(fp->fp.bgzf), !(b->core.flag&BAM_FUNMAP)); if (ret < 0) { // unsorted or doesn't fit hts_log_error("Read '%s' with ref_name='%s', ref_length=%"PRIhts_pos", flags=%d, pos=%"PRIhts_pos" cannot be indexed", bam_get_qname(b), sam_hdr_tid2name(h, b->core.tid), sam_hdr_tid2len(h, b->core.tid), b->core.flag, b->core.pos+1); @@ -963,7 +970,7 @@ static hts_idx_t *sam_index(htsFile *fp, int min_shift) return NULL; } -int sam_index_build3(const char *fn, const char *fnidx, int min_shift, int nthreads) +int sam_index_build4(const char *fn, const char *fnidx, int min_shift, int nthreads, const hts_progress_callback progress_fn, void* progress_data) { hts_idx_t *idx; htsFile *fp; @@ -976,7 +983,7 @@ int sam_index_build3(const char *fn, const char *fnidx, int min_shift, int nthre switch (fp->format.format) { case cram: - ret = cram_index_build(fp->fp.cram, fn, fnidx); + ret = cram_index_build(fp->fp.cram, fn, fnidx, progress_fn, progress_data); break; case bam: @@ -987,7 +994,7 @@ int sam_index_build3(const char *fn, const char *fnidx, int min_shift, int nthre ret = -1; break; } - idx = sam_index(fp, min_shift); + idx = sam_index(fp, min_shift, progress_fn, progress_data); if (idx) { ret = hts_idx_save_as(idx, fn, fnidx, (min_shift > 0)? HTS_FMT_CSI : HTS_FMT_BAI); if (ret < 0) ret = -4; @@ -1005,14 +1012,19 @@ int sam_index_build3(const char *fn, const char *fnidx, int min_shift, int nthre return ret; } +int sam_index_build3(const char *fn, const char *fnidx, int min_shift, int nthreads) +{ + return sam_index_build4(fn, fnidx, min_shift, nthreads, NULL, NULL); +} + int sam_index_build2(const char *fn, const char *fnidx, int min_shift) { - return sam_index_build3(fn, fnidx, min_shift, 0); + return sam_index_build4(fn, fnidx, min_shift, 0, NULL, NULL); } int sam_index_build(const char *fn, int min_shift) { - return sam_index_build3(fn, NULL, min_shift, 0); + return sam_index_build4(fn, NULL, min_shift, 0, NULL, NULL); } // Provide bam_index_build() symbol for binary compatibility with earlier HTSlib diff --git a/tbx.c b/tbx.c index 0d486ad9c..d97ff59cd 100644 --- a/tbx.c +++ b/tbx.c @@ -286,6 +286,12 @@ static int adjust_n_lvls(int min_shift, int n_lvls, int64_t max_len) } tbx_t *tbx_index(BGZF *fp, int min_shift, const tbx_conf_t *conf) +{ + return tbx_index2(fp, min_shift, conf, NULL, NULL); +} + +tbx_t *tbx_index2(BGZF *fp, int min_shift, const tbx_conf_t *conf, + const hts_progress_callback progress_fn, void *progress_data) { tbx_t *tbx; kstring_t str; @@ -294,6 +300,7 @@ tbx_t *tbx_index(BGZF *fp, int min_shift, const tbx_conf_t *conf) uint64_t last_off = 0; tbx_intv_t intv; int64_t max_ref_len = 0; + int64_t last_block_address = 0; str.s = 0; str.l = str.m = 0; tbx = (tbx_t*)calloc(1, sizeof(tbx_t)); @@ -303,6 +310,14 @@ tbx_t *tbx_index(BGZF *fp, int min_shift, const tbx_conf_t *conf) else min_shift = 14, n_lvls = 5, fmt = HTS_FMT_TBI; while ((ret = bgzf_getline(fp, '\n', &str)) >= 0) { ++lineno; + + if (last_block_address != fp->block_address) { + if (progress_fn && progress_fn(fp->block_address, progress_data) != 0) { + goto fail; + } + last_block_address = fp->block_address; + } + if (str.s[0] == tbx->conf.meta_char && fmt == HTS_FMT_CSI) { switch (tbx->conf.preset) { case TBX_SAM: @@ -363,6 +378,13 @@ void tbx_destroy(tbx_t *tbx) } int tbx_index_build3(const char *fn, const char *fnidx, int min_shift, int n_threads, const tbx_conf_t *conf) +{ + return tbx_index_build4(fn, fnidx, min_shift, n_threads, conf, NULL, NULL); +} + +int tbx_index_build4(const char *fn, const char *fnidx, int min_shift, int n_threads, + const tbx_conf_t *conf, const hts_progress_callback progress_fn, + void *progress_data) { tbx_t *tbx; BGZF *fp; @@ -370,7 +392,7 @@ int tbx_index_build3(const char *fn, const char *fnidx, int min_shift, int n_thr if ((fp = bgzf_open(fn, "r")) == 0) return -1; if ( n_threads ) bgzf_mt(fp, n_threads, 256); if ( bgzf_compression(fp) != bgzf ) { bgzf_close(fp); return -2; } - tbx = tbx_index(fp, min_shift, conf); + tbx = tbx_index2(fp, min_shift, conf, progress_fn, progress_data); bgzf_close(fp); if ( !tbx ) return -1; ret = hts_idx_save_as(tbx->idx, fn, fnidx, min_shift > 0? HTS_FMT_CSI : HTS_FMT_TBI); @@ -378,14 +400,15 @@ int tbx_index_build3(const char *fn, const char *fnidx, int min_shift, int n_thr return ret; } + int tbx_index_build2(const char *fn, const char *fnidx, int min_shift, const tbx_conf_t *conf) { - return tbx_index_build3(fn, fnidx, min_shift, 0, conf); + return tbx_index_build4(fn, fnidx, min_shift, 0, conf, NULL, NULL); } int tbx_index_build(const char *fn, int min_shift, const tbx_conf_t *conf) { - return tbx_index_build3(fn, NULL, min_shift, 0, conf); + return tbx_index_build4(fn, NULL, min_shift, 0, conf, NULL, NULL); } static tbx_t *index_load(const char *fn, const char *fnidx, int flags)