@@ -540,7 +540,7 @@ def __str__(self):
         else:
             filter_str = ""
         bypass_str = f" bypass_intersecting_intervals_phase={self.bypass_intersecting_intervals_phase}"
-        return f"workspace={self.workspace} attributes={self.attributes}{filter_str}{bypass_str}"
+        return f"workspace={self.workspace} vidmap={self.vidmap_file} callset={self.callset_file} attributes={self.attributes}{filter_str}{bypass_str}"  # noqa


 class GenomicsDBQueryConfig(NamedTuple):
@@ -607,6 +607,13 @@ def configure_query(config: GenomicsDBQueryConfig):
     return query_config


+def instantiate_genomicsdb(pb_config, msg):
+    logging.info("Instantiating genomicsdb to process " + msg + "...")
+    gdb = genomicsdb.connect_with_protobuf(pb_config)
+    logging.info("Instantiating genomicsdb to process " + msg + " DONE")
+    return gdb
+
+
 def process(config):
     export_config = config.export_config
     query_config = config.query_config
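The next hunk rewires `process()` around a single-retry loop so that an expired Azure access token triggers one reconnect instead of a hard failure. A minimal sketch of that pattern in isolation — `run_query`, `is_token_error`, and `reconnect` are illustrative stand-ins, not names from this PR:

def query_with_single_retry(run_query, is_token_error, reconnect, allow_retry):
    # allow_retry is True only for stores whose access tokens can expire (az:// here)
    while True:
        try:
            return run_query()
        except Exception as e:
            if allow_retry and is_token_error(e):
                allow_retry = False  # at most one retry
                reconnect()  # rebuild the connection, picking up a fresh token
                continue
            raise  # anything else stays fatal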
@@ -618,52 +625,90 @@ def process(config):
         logging.error(msg + f" not imported into workspace({export_config.workspace})")
         return -1
     global gdb
-    try:
-        if gdb:
-            logging.info("Found gdb to process " + msg)
+    # Allow one retry to account for expired access tokens with Azure URLs
+    if export_config.workspace.startswith("az://"):
+        allow_retry = True
+    else:
+        allow_retry = False
+    while True:
+        try:
+            if gdb:
+                logging.info("Found gdb to process " + msg)
+            else:
+                logging.info("Starting new gdb to process " + msg)
+                gdb = instantiate_genomicsdb(configure_export(export_config), msg)
+        except NameError:
+            gdb = instantiate_genomicsdb(configure_export(export_config), msg)
+
+        query_protobuf = configure_query(query_config)
+
+        try:
+            if output_config.type == "csv":
+                df = gdb.query_variant_calls(query_protobuf=query_protobuf, flatten_intervals=True)
+                df.to_csv(output_config.filename, index=False)
+            elif output_config.type == "json":
+                json_output = gdb.query_variant_calls(
+                    query_protobuf=query_protobuf, json_output=output_config.json_type
+                )
+                with open(output_config.filename, "wb") as f:
+                    f.write(json_output)
+            elif output_config.type == "arrow":
+                nbytes = 0
+                writer = None
+                i = 0
+                for out in gdb.query_variant_calls(query_protobuf=query_protobuf, arrow_output=True, batching=True):
+                    reader = pa.ipc.open_stream(out)
+                    for batch in reader:
+                        if nbytes > output_config.max_arrow_bytes:
+                            i += 1
+                            nbytes = 0
+                            if writer:
+                                writer.close()
+                                writer = None
+                        if not writer:
+                            writer = pq.ParquetWriter(f"{output_config.filename}__{i}.parquet", batch.schema)
+                        nbytes += batch.nbytes
+                        writer.write_batch(batch)
+                if writer:
+                    writer.close()
+                    writer = None
+            logging.info(f"Processed {msg}")
+            # the query completed; returning also exits the retry loop
+            return 0
+        except Exception as e:
+            # Try to handle read errors from TileDB storage for Azure URLs,
+            # e.g. GenomicsDBIteratorException: Error while reading from TileDB array
+            if allow_retry and "GenomicsDB" in str(type(e)) and "TileDB" in str(e):
+                # Check for the possibility of an expired access token
+                allow_retry = False
+                try:
+                    # If this check succeeds and the workspace exists as it should, the genomicsdb
+                    # instance is functional. Probably not an expired token, so re-raise the outer exception.
+                    if not gdb.workspace_exists(export_config.workspace):
+                        logging.info(f"Retrying query with a new instance of gdb for {msg}...")
+                        gdb = None
+                        continue
+                except Exception as ex:
+                    logging.info(f"Exception({ex}) encountered. Retrying query with a new instance of gdb for {msg}...")
+                    gdb = None
+                    continue
+            logging.critical(f"Unexpected exception while processing {msg}: {e}")
+            raise e
+
+
+def check_output(output):
+    parent_dir = os.path.dirname(output)
+    if parent_dir and not os.path.isdir(parent_dir):
+        if os.path.isfile(parent_dir):
+            raise RuntimeError(f"Cannot proceed as output's parent directory({parent_dir}) is a file")
         else:
-            logging.error("Something wrong, gdb seems to be None")
-            return -1
-    except NameError:
-        logging.info("Instantiating genomicsdb to process " + msg + "...")
-        gdb = genomicsdb.connect_with_protobuf(configure_export(export_config))
-        logging.info("Instantiating genomicsdb to process " + msg + " DONE")
-    query_protobuf = configure_query(query_config)
-    try:
-        if output_config.type == "csv":
-            df = gdb.query_variant_calls(query_protobuf=query_protobuf, flatten_intervals=True)
-            df.to_csv(output_config.filename, index=False)
-        elif output_config.type == "json":
-            json_output = gdb.query_variant_calls(query_protobuf=query_protobuf, json_output=output_config.json_type)
-            with open(output_config.filename, "wb") as f:
-                f.write(json_output)
-        elif output_config.type == "arrow":
-            nbytes = 0
-            writer = None
-            i = 0
-            for out in gdb.query_variant_calls(query_protobuf=query_protobuf, arrow_output=True, batching=True):
-                reader = pa.ipc.open_stream(out)
-                for batch in reader:
-                    if nbytes > output_config.max_arrow_bytes:
-                        i += 1
-                        nbytes = 0
-                        if writer:
-                            writer.close()
-                            writer = None
-                    if not writer:
-                        writer = pq.ParquetWriter(f"{output_config.filename}__{i}.parquet", batch.schema)
-                    nbytes += batch.nbytes
-                    writer.write_batch(batch)
-            if writer:
-                writer.close()
-                writer = None
-    except Exception as e:
-        logging.critical(f"Unexpected exception : {e}")
-        gdb = None
-        return -1
-
-    logging.info(f"Processed {msg}")
-    return 0
+            raise RuntimeError(
+                f"Cannot proceed as output's parent directory({parent_dir}) does not exist. Create dir({parent_dir}) before restarting query"  # noqa
+            )
+    if output.endswith("/") or os.path.isdir(output):
+        return genomicsdb_common.join_paths(output, "query_output")
+    else:
+        return output


 def main():
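The arrow branch above streams Arrow IPC batches and rolls the output across numbered parquet files once a byte budget is crossed. A self-contained sketch of the same rollover pattern — the toy table, file names, and 1 KiB budget are illustrative, not from the PR:

import pyarrow as pa
import pyarrow.parquet as pq

max_bytes = 1024  # stand-in for output_config.max_arrow_bytes
table = pa.table({"sample": ["s1", "s2", "s3"] * 1000, "gt": ["0/1", "1/1", "0/0"] * 1000})

nbytes, writer, i = 0, None, 0
for batch in table.to_batches(max_chunksize=500):  # stand-in for the GenomicsDB arrow stream
    if nbytes > max_bytes:
        # budget exceeded: close the current file and start example__<i+1>.parquet
        i += 1
        nbytes = 0
        if writer:
            writer.close()
            writer = None
    if not writer:
        writer = pq.ParquetWriter(f"example__{i}.parquet", batch.schema)
    nbytes += batch.nbytes
    writer.write_batch(batch)
if writer:
    writer.close()

As in the PR, the budget check runs before the write, so each file may overshoot the budget by up to one batch.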
@@ -672,10 +717,8 @@ def main():
     if row_tuples is not None and len(row_tuples) == 0:
         return

-    print(f"Starting genomicsdb_query for workspace({workspace}) and intervals({intervals})")
-
     output_type = args.output_type
-    output = args.output
+    output = check_output(args.output)
     json_type = None
     if output_type == "json":
         json_type = parse_args_for_json_type(args.json_output_type)
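For reference, the new check_output() resolves representative --output values like this (paths are hypothetical; assume results/ exists as a directory and missing/ does not):

check_output("results/calls")   # -> "results/calls": parent exists, treated as a file path
check_output("results/")        # -> "results/query_output": directories get a default basename
check_output("missing/calls")   # raises RuntimeError: create dir(missing) before restarting query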
@@ -686,6 +729,8 @@ def main():
     max_arrow_bytes = parse_args_for_max_bytes(args.max_arrow_byte_size)
     print(f"Using {args.max_arrow_byte_size} number of bytes as hint for writing out parquet files")

+    print(f"Starting genomicsdb_query for workspace({workspace}) and intervals({intervals})")
+
     export_config = GenomicsDBExportConfig(
         workspace, vidmap_file, callset_file, attributes, args.filter, args.bypass_intersecting_intervals_phase
     )
@@ -749,10 +794,18 @@ def main():
         sys.exit(0)

     if min(len(configs), args.nproc) == 1:
-        results = list(map(process, configs))
+        try:
+            results = list(map(process, configs))
+        except Exception as e:
+            raise RuntimeError(f"genomicsdb_query returned unexpectedly: {e}")
     else:
         with multiprocessing.Pool(processes=min(len(configs), args.nproc)) as pool:
-            results = list(pool.map(process, configs))
+            try:
+                results = list(pool.map(process, configs))
+            except Exception as e:
+                pool.terminate()
+                pool.join()
+                raise RuntimeError(f"Terminating as a query in the multiprocessing pool returned unexpectedly: {e}")

     msg = "successfully"
     for result in results:
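A note on the pool branch: Pool's context manager already calls terminate() on exit, so the explicit terminate()/join() mainly make the fail-fast intent visible and ensure workers are reaped before the re-raise. A minimal sketch of the same pattern, with worker() as a stand-in for process():

import multiprocessing

def worker(x):
    if x == 2:
        raise ValueError("boom")
    return x

if __name__ == "__main__":
    with multiprocessing.Pool(processes=2) as pool:
        try:
            results = pool.map(worker, [1, 2, 3])
        except Exception as e:
            pool.terminate()  # stop outstanding work instead of draining it
            pool.join()
            raise RuntimeError(f"pool query returned unexpectedly: {e}")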