
Commit b786900

Authored by ffelixg, with Copilot, gargsaumya, and bewithgaurav

FEAT: Add arrow fetch support (#354)
### Work Item / Issue Reference

> GitHub Issue: #130

### Summary

Hey, you mentioned in issue #130 that you were willing to consider community contributions for adding Apache Arrow support, so here you go. I have focused only on fetching data from the database into Arrow structures. The function signatures I chose are:

* **`arrow_batch(chunk_size=10000)`**: Fetches a single `pyarrow.RecordBatch`; the basis for the other two methods.
* **`arrow(chunk_size=10000)`**: Fetches the entire result set as a single `pyarrow.Table`.
* **`arrow_reader(chunk_size=10000)`**: Returns a `pyarrow.RecordBatchReader` for streaming results without loading the entire dataset into RAM.

Using `fetch_arrow...` instead of just `arrow...` could also be a good option, but I think the terse version is not too ambiguous.

### Technical details

I am not very familiar with C++, but I did have some prior practice for this task from implementing my [own ODBC driver in Zig](https://github.com/ffelixg/zodbc_py) (a very good language for projects like this!). The implementation is written almost entirely in C++ in the `FetchArrowBatch_wrap` function, which produces PyCapsules that are then consumed by `arrow_batch` and turned into actual Arrow objects. The function itself is very large. I'm sure it could be factored in a better way, even sharing some code with the other fetch methods, but my goal was to keep the whole thing as straightforward as possible. I have also implemented my own SQLGetData loop for LOB columns.
Unlike with the Python fetch methods, I don't use the result directly, but instead copy it into the same buffer I would use for the bound-columns case. Maybe that's an abstraction that would make sense for that case as well.

### Notes on data types

I noticed that you use SQL_C_TYPE_TIME for time(x) columns. The Arrow fetch does the same, but I think it would be better to use SQL_C_SS_TIME2, since that supports fractional seconds.

Datetimeoffset is a bit tricky, since SQL Server stores timezone information alongside each cell, while Arrow tables expect a fixed timezone for the entire column. I don't really see any solution other than converting everything to UTC and returning a UTC column, so that's what I did.

SQL_C_CHAR columns get copied directly into Arrow utf8 arrays. Maybe some encoding options would be useful.

### Performance

I think the main performance win to be gained is not interacting with any Python data structures in the hot path. That is satisfied. Further optimizations, which I did not make, are:

* Releasing the GIL for the entire fetch loop
* Sharing the bound fetch buffer across repeated fetch calls
* Improving the hot-loop switching. Instead of looping over rows and columns and then switching on the data type for each cell, you could:
  * Put the row loop inside each switch case (fastest, I think, but it would bloat the code a lot more)
  * Use function pointers like you recently did for Python fetching (has overhead because of the indirect function call, I think; the code is also more scattered)
  * Replace both loops and the switch with computed gotos. That's what I opted for in my ODBC driver (the Zig equivalent is a labeled switch) and I am quite happy with how it came out. Performance seems very good and it allows you to abstract the fetching process on a row-by-row basis. I don't know how well that would translate to C++.

Overall the Arrow performance seems not too far off from what I achieved with zodbc.
---------

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Co-authored-by: gargsaumya <saumyagarg.100@gmail.com>
Co-authored-by: Gaurav Sharma <sharmag@microsoft.com>
1 parent 87f653b commit b786900

File tree

6 files changed: +1736 −0 lines changed

mssql_python/cursor.py

Lines changed: 104 additions & 0 deletions
```diff
@@ -36,7 +36,10 @@
 )

 if TYPE_CHECKING:
+    import pyarrow  # type: ignore
     from mssql_python.connection import Connection
+else:
+    pyarrow = None

 # Constants for string handling
 MAX_INLINE_CHAR: int = (
```
```diff
@@ -775,6 +778,19 @@ def _check_closed(self) -> None:
             ddbc_error="",
         )

+    def _ensure_pyarrow(self) -> Any:
+        """
+        Import and return pyarrow or raise ImportError accordingly.
+        """
+        try:
+            import pyarrow
+
+            return pyarrow
+        except ImportError as e:
+            raise ImportError(
+                "pyarrow is required for Arrow fetch methods. Please install pyarrow."
+            ) from e
+
     def setinputsizes(self, sizes: List[Union[int, tuple]]) -> None:
         """
         Sets the type information to be used for parameters in execute and executemany.
```
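The `_ensure_pyarrow` helper in the hunk above follows a common optional-dependency pattern: import lazily at call time so the package works without the extra installed, and fail with an actionable message only when the feature is used. A generic sketch of the same pattern (the function name and message are illustrative, not part of the PR):

```python
# Generic form of the lazy optional-dependency import used by _ensure_pyarrow:
# resolve the module only when the dependent feature is called, and re-raise
# ImportError with an installation hint.
import importlib
from typing import Any


def ensure_optional(module_name: str, hint: str) -> Any:
    """Return the imported module, or raise ImportError with `hint`."""
    try:
        return importlib.import_module(module_name)
    except ImportError as e:
        raise ImportError(hint) from e
```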
```diff
@@ -2516,6 +2532,94 @@ def fetchall(self) -> List[Row]:
             # On error, don't increment rownumber - rethrow the error
             raise e

+    def arrow_batch(self, batch_size: int = 8192) -> "pyarrow.RecordBatch":
+        """
+        Fetch a single pyarrow Record Batch of the specified size from the
+        query result set.
+
+        Args:
+            batch_size: Maximum number of rows to fetch in the Record Batch.
+
+        Returns:
+            A pyarrow RecordBatch object containing up to batch_size rows.
+        """
+        self._check_closed()  # Check if the cursor is closed
+        pyarrow = self._ensure_pyarrow()
+
+        if not self._has_result_set and self.description:
+            self._reset_rownumber()
+
+        capsules = []
+        ret = ddbc_bindings.DDBCSQLFetchArrowBatch(self.hstmt, capsules, max(batch_size, 0))
+        check_error(ddbc_sql_const.SQL_HANDLE_STMT.value, self.hstmt, ret)
+
+        batch = pyarrow.RecordBatch._import_from_c_capsule(*capsules)
+
+        if self.hstmt:
+            self.messages.extend(ddbc_bindings.DDBCSQLGetAllDiagRecords(self.hstmt))
+
+        # Update rownumber for the number of rows actually fetched
+        num_fetched = batch.num_rows
+        if num_fetched > 0 and self._has_result_set:
+            self._next_row_index += num_fetched
+            self._rownumber = self._next_row_index - 1
+
+        # Centralize rowcount assignment after fetch
+        if num_fetched == 0 and self._next_row_index == 0:
+            self.rowcount = 0
+        else:
+            self.rowcount = self._next_row_index
+
+        return batch
+
+    def arrow(self, batch_size: int = 8192) -> "pyarrow.Table":
+        """
+        Fetch the entire result as a pyarrow Table.
+
+        Args:
+            batch_size: Size of the Record Batches which make up the Table.
+
+        Returns:
+            A pyarrow Table containing all remaining rows from the result set.
+        """
+        self._check_closed()  # Check if the cursor is closed
+        pyarrow = self._ensure_pyarrow()
+
+        batches: list["pyarrow.RecordBatch"] = []
+        while True:
+            batch = self.arrow_batch(batch_size)
+            if batch.num_rows < batch_size or batch_size <= 0:
+                if not batches or batch.num_rows > 0:
+                    batches.append(batch)
+                break
+            batches.append(batch)
+        return pyarrow.Table.from_batches(batches, schema=batches[0].schema)
+
+    def arrow_reader(self, batch_size: int = 8192) -> "pyarrow.RecordBatchReader":
+        """
+        Fetch the result as a pyarrow RecordBatchReader, which yields Record
+        Batches of the specified size until the current result set is
+        exhausted.
+
+        Args:
+            batch_size: Size of the Record Batches produced by the reader.
+
+        Returns:
+            A pyarrow RecordBatchReader for the result set.
+        """
+        self._check_closed()  # Check if the cursor is closed
+        pyarrow = self._ensure_pyarrow()
+
+        # Fetch schema without advancing cursor
+        schema_batch = self.arrow_batch(0)
+        schema = schema_batch.schema
+
+        def batch_generator():
+            while (batch := self.arrow_batch(batch_size)).num_rows > 0:
+                yield batch
+
+        return pyarrow.RecordBatchReader.from_batches(schema, batch_generator())
+
     def nextset(self) -> Union[bool, None]:
         """
         Skip to the next available result set.
```
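The loop in `arrow()` above terminates on the first batch shorter than `batch_size`, keeping an empty trailing batch only when it is the sole batch (so an empty result still carries a schema). A pure-Python rehearsal of that termination logic, with a hypothetical `FakeBatch` stand-in for `pyarrow.RecordBatch`:

```python
# Rehearsal of the termination logic in arrow(): keep calling the batch
# fetcher until it returns a batch shorter than batch_size, and keep the
# final short batch only if it is non-empty or the only batch.
from dataclasses import dataclass
from typing import Callable, List


@dataclass
class FakeBatch:  # hypothetical stand-in for pyarrow.RecordBatch
    num_rows: int


def collect_batches(fetch: Callable[[int], FakeBatch], batch_size: int) -> List[FakeBatch]:
    batches: List[FakeBatch] = []
    while True:
        batch = fetch(batch_size)
        if batch.num_rows < batch_size or batch_size <= 0:
            # An empty final batch is dropped unless it is the only one
            # (needed so an empty result set still has a schema source).
            if not batches or batch.num_rows > 0:
                batches.append(batch)
            break
        batches.append(batch)
    return batches


def make_fetch(total_rows: int) -> Callable[[int], FakeBatch]:
    """Simulate a result set of `total_rows` rows served in batches."""
    remaining = [total_rows]

    def fetch(batch_size: int) -> FakeBatch:
        n = min(remaining[0], batch_size)
        remaining[0] -= n
        return FakeBatch(num_rows=n)

    return fetch
```

For example, 10 rows at `batch_size=4` yield batches of 4, 4, and 2; 8 rows yield 4 and 4 (the trailing empty batch is dropped); 0 rows yield a single empty batch.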

mssql_python/mssql_python.pyi

Lines changed: 6 additions & 0 deletions
```diff
@@ -7,6 +7,7 @@ Type stubs for mssql_python package - based on actual public API
 from typing import Any, Dict, List, Optional, Union, Tuple, Sequence, Callable, Iterator
 import datetime
 import logging
+import pyarrow

 # GLOBALS - DB-API 2.0 Required Module Globals
 # https://www.python.org/dev/peps/pep-0249/#module-interface
@@ -199,6 +200,11 @@ class Cursor:
     def setinputsizes(self, sizes: List[Union[int, Tuple[Any, ...]]]) -> None: ...
     def setoutputsize(self, size: int, column: Optional[int] = None) -> None: ...

+    # Arrow Extension Methods (requires pyarrow)
+    def arrow_batch(self, batch_size: int = 8192) -> pyarrow.RecordBatch: ...
+    def arrow(self, batch_size: int = 8192) -> pyarrow.Table: ...
+    def arrow_reader(self, batch_size: int = 8192) -> pyarrow.RecordBatchReader: ...
+
 # DB-API 2.0 Connection Object
 # https://www.python.org/dev/peps/pep-0249/#connection-objects
 class Connection:
```
