Skip to content

Commit 74d866b

Browse files
authored
minor streamwriter changes to allow compression (#55)
1 parent aa4eff5 commit 74d866b

File tree

3 files changed: 24 additions (+24) and 29 deletions (-29).

src/genomicsdb.pyx

Lines changed: 13 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -199,14 +199,15 @@ cdef class _GenomicsDB:
199199
flatten_intervals=False,
200200
json_output=None,
201201
arrow_output=None,
202-
# batching only used with arrow_output
203-
batching=False):
202+
# batching/compress only used with arrow_output
203+
batching=False,
204+
compress=None):
204205
""" Query for variant calls from the GenomicsDB workspace using array, column_ranges and row_ranges for subsetting """
205206

206207
if json_output is not None:
207208
return self.query_variant_calls_json(array, column_ranges, row_ranges, query_protobuf, json_output);
208209
elif arrow_output is not None:
209-
return self.query_variant_calls_arrow(array, column_ranges, row_ranges, query_protobuf, batching);
210+
return self.query_variant_calls_arrow(array, column_ranges, row_ranges, query_protobuf, batching, compress);
210211
elif flatten_intervals is True:
211212
return self.query_variant_calls_columnar(array, column_ranges, row_ranges, query_protobuf)
212213
else:
@@ -318,7 +319,8 @@ cdef class _GenomicsDB:
318319
column_ranges=None,
319320
row_ranges=None,
320321
query_protobuf: query_pb.QueryConfiguration=None,
321-
batching=False):
322+
batching=False,
323+
compress=None):
322324
""" Query for variant calls from the GenomicsDB workspace using array, column_ranges and row_ranges for subsetting """
323325

324326
cdef ArrowVariantCallProcessor processor
@@ -366,19 +368,24 @@ cdef class _GenomicsDB:
366368
schema_capsule = pycapsule_get_arrow_schema(arrow_schema)
367369
schema_obj = _ArrowSchemaWrapper._import_from_c_capsule(schema_capsule)
368370
schema = pa.schema(schema_obj.children_schema)
369-
yield schema.serialize().to_pybytes()
370371
else:
371372
raise GenomicsDBException("Failed to retrieve arrow schema for query_variant_calls()")
372373

373374
cdef void* arrow_array = NULL
375+
w_opts = pa.ipc.IpcWriteOptions(allow_64bit=True, compression=compress)
374376
while True:
375377
try:
376378
arrow_array = processor.arrow_array()
377379
if arrow_array:
378380
array_capsule = pycapsule_get_arrow_array(arrow_array)
379381
array_obj = _ArrowArrayWrapper._import_from_c_capsule(schema_capsule, array_capsule)
380382
arrays = [pa.array(array_obj.child(i)) for i in range(array_obj.n_children)]
381-
yield pa.RecordBatch.from_arrays(arrays, schema=schema).serialize().to_pybytes()
383+
batch = pa.RecordBatch.from_arrays(arrays, schema=schema)
384+
sink = pa.BufferOutputStream()
385+
writer = pa.RecordBatchStreamWriter(sink, schema, options=w_opts)
386+
writer.write_batch(batch)
387+
writer.close()
388+
yield sink.getvalue().to_pybytes()
382389
else:
383390
break
384391
except Exception as e:

test/test_genomicsdb.py

Lines changed: 8 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -78,29 +78,21 @@ def test_connect_and_query_with_protobuf(setup):
7878
json_output=9999)
7979

8080
# test with query protobuf and arrow output
81-
first = True
8281
schema = pa.schema([("null_field", pa.string())])
8382
for output in gdb.query_variant_calls(row_ranges=[(0, 3)], array="t0_1_2", arrow_output=True):
84-
if first:
85-
schema = pa.ipc.read_schema(pa.py_buffer(output))
86-
first = False
87-
else:
88-
batch = pa.ipc.read_record_batch(pa.py_buffer(output), schema)
89-
assert batch.num_columns == 6
90-
assert batch.num_rows == 5
83+
reader = pa.ipc.open_stream(output)
84+
batch = reader.read_next_batch()
85+
assert batch.num_columns == 6
86+
assert batch.num_rows == 5
9187

92-
first = True
9388
batch = None
9489
for output in gdb.query_variant_calls(
9590
row_ranges=[(0, 3)], array="t0_1_2", arrow_output=True, batching=True
9691
):
97-
if first:
98-
schema = pa.ipc.read_schema(pa.py_buffer(output))
99-
first = False
100-
else:
101-
batch = pa.ipc.read_record_batch(pa.py_buffer(output), schema)
102-
assert batch.num_columns == 6
103-
assert batch.num_rows == 1 or batch.num_rows == 3
92+
reader = pa.ipc.open_stream(output)
93+
batch = reader.read_next_batch()
94+
assert batch.num_columns == 6
95+
assert batch.num_rows == 1 or batch.num_rows == 3
10496

10597
# test with query contig interval and no results
10698
interval = query_coords.ContigInterval()

test/test_genomicsdb_demo.py

Lines changed: 3 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -111,14 +111,10 @@ def test_genomicsdb_demo_with_arrow_output(self):
111111
start = time.time()
112112
gdb = genomicsdb.connect_with_protobuf(self.query_config)
113113
print("\nSummary for batching mode=" + str(batching_mode) + ":")
114-
first = True
115114
for output in gdb.query_variant_calls(arrow_output=True, batching=batching_mode):
116-
if first:
117-
schema = pa.ipc.read_schema(pa.py_buffer(output))
118-
first = False
119-
else:
120-
batch = pa.ipc.read_record_batch(pa.py_buffer(output), schema)
121-
print("batch num_rows="+str(batch.num_rows)+" num_columns="+str(batch.num_columns))
115+
reader = pa.ipc.open_stream(output)
116+
batch = reader.read_next_batch()
117+
print("batch num_rows="+str(batch.num_rows)+" num_columns="+str(batch.num_columns))
122118
print("\tElapsed time: " + str(time.time() - start))
123119

124120
if __name__ == '__main__':

0 commit comments

Comments
 (0)