
Commit af6786d

isichei, jaidisido, and kukushking authored
Issue 592 backward compat (#949)
* Fixes #592
  - Allows the user to pass `coerce_int96_timestamp_unit` in a pyarrow_additional_kwargs dict. This avoids an integer overflow error when reading Parquet INT96 types into Arrow.
  - The Parquet reader will return a timestamp as a column of datetime objects rather than pd.Timestamps if any column in the table has a timestamp unit that is not nanosecond.
* black
* Unblocking black
* Athena now defaults to reading parquet files with INT96 as milliseconds
* black
* reblack
* pydocstyle
* Backwards compatible solution to issue #592
* Type fix
* Adding test

Co-authored-by: jaidisido <[email protected]>
Co-authored-by: kukushking <[email protected]>
1 parent 066b81a commit af6786d
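
As context for reading the diff below, here is a minimal usage sketch of the new parameter (the query, database name, and unit are illustrative placeholders, not taken from the commit):

import awswrangler as wr

# Ask the Parquet reader to decode Athena's INT96 timestamps as milliseconds
# instead of the default nanoseconds, avoiding the overflow described in #592.
df = wr.athena.read_sql_query(
    sql="SELECT * FROM my_table",  # placeholder query
    database="my_database",        # placeholder database
    pyarrow_additional_kwargs={"coerce_int96_timestamp_unit": "ms"},
)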

File tree

4 files changed: +117 -5 lines changed


awswrangler/_data_types.py

Lines changed: 7 additions & 0 deletions
@@ -701,3 +701,10 @@ def timestream_type_from_pandas(df: pd.DataFrame) -> str:
     pyarrow_type: pa.DataType = list(pyarrow_types.values())[0]
     _logger.debug("pyarrow_type: %s", pyarrow_type)
     return pyarrow2timestream(dtype=pyarrow_type)
+
+
+def get_arrow_timestamp_unit(data_type: pa.lib.DataType) -> Any:
+    """Return unit of pyarrow timestamp. If the pyarrow type is not timestamp then None is returned."""
+    if isinstance(data_type, pa.lib.TimestampType):
+        return data_type.unit
+    return None
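
A quick, illustrative sketch of how the new helper behaves (note that awswrangler._data_types is a private module, so importing it directly is for demonstration only):

import pyarrow as pa
from awswrangler._data_types import get_arrow_timestamp_unit

# Timestamp types report their unit...
print(get_arrow_timestamp_unit(pa.timestamp("ms")))  # "ms"
print(get_arrow_timestamp_unit(pa.timestamp("ns")))  # "ns"
# ...while any non-timestamp type yields None.
print(get_arrow_timestamp_unit(pa.int64()))  # None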

awswrangler/athena/_read.py

Lines changed: 30 additions & 0 deletions
@@ -222,6 +222,7 @@ def _fetch_parquet_result(
     boto3_session: boto3.Session,
     s3_additional_kwargs: Optional[Dict[str, Any]],
     temp_table_fqn: Optional[str] = None,
+    pyarrow_additional_kwargs: Optional[Dict[str, Any]] = None,
 ) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
     ret: Union[pd.DataFrame, Iterator[pd.DataFrame]]
     chunked: Union[bool, int] = False if chunksize is None else chunksize
@@ -249,6 +250,7 @@ def _fetch_parquet_result(
         chunked=chunked,
         categories=categories,
         ignore_index=True,
+        pyarrow_additional_kwargs=pyarrow_additional_kwargs,
     )
     if chunked is False:
         ret = _apply_query_metadata(df=ret, query_metadata=query_metadata)
@@ -337,6 +339,7 @@ def _resolve_query_with_cache(
     use_threads: bool,
     session: Optional[boto3.Session],
     s3_additional_kwargs: Optional[Dict[str, Any]],
+    pyarrow_additional_kwargs: Optional[Dict[str, Any]] = None,
 ) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
     """Fetch cached data and return it as a pandas DataFrame (or list of DataFrames)."""
     _logger.debug("cache_info:\n%s", cache_info)
@@ -358,6 +361,7 @@ def _resolve_query_with_cache(
            use_threads=use_threads,
            boto3_session=session,
            s3_additional_kwargs=s3_additional_kwargs,
+           pyarrow_additional_kwargs=pyarrow_additional_kwargs,
        )
     if cache_info.file_format == "csv":
        return _fetch_csv_result(
@@ -389,6 +393,7 @@ def _resolve_query_without_cache_ctas(
     use_threads: bool,
     s3_additional_kwargs: Optional[Dict[str, Any]],
     boto3_session: boto3.Session,
+    pyarrow_additional_kwargs: Optional[Dict[str, Any]] = None,
 ) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
     path: str = f"{s3_output}/{name}"
     ext_location: str = "\n" if wg_config.enforced is True else f",\n external_location = '{path}'\n"
@@ -465,6 +470,7 @@ def _resolve_query_without_cache_ctas(
        s3_additional_kwargs=s3_additional_kwargs,
        boto3_session=boto3_session,
        temp_table_fqn=fully_qualified_name,
+       pyarrow_additional_kwargs=pyarrow_additional_kwargs,
    )


@@ -532,6 +538,7 @@ def _resolve_query_without_cache(
     use_threads: bool,
     s3_additional_kwargs: Optional[Dict[str, Any]],
     boto3_session: boto3.Session,
+    pyarrow_additional_kwargs: Optional[Dict[str, Any]] = None,
 ) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
     """
     Execute a query in Athena and returns results as DataFrame, back to `read_sql_query`.
@@ -565,6 +572,7 @@ def _resolve_query_without_cache(
            use_threads=use_threads,
            s3_additional_kwargs=s3_additional_kwargs,
            boto3_session=boto3_session,
+           pyarrow_additional_kwargs=pyarrow_additional_kwargs,
        )
     finally:
        catalog.delete_table_if_exists(
@@ -612,6 +620,7 @@ def read_sql_query(
     data_source: Optional[str] = None,
     params: Optional[Dict[str, Any]] = None,
     s3_additional_kwargs: Optional[Dict[str, Any]] = None,
+    pyarrow_additional_kwargs: Optional[Dict[str, Any]] = None,
 ) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
     """Execute any SQL query on AWS Athena and return the results as a Pandas DataFrame.

@@ -781,6 +790,14 @@ def read_sql_query(
     s3_additional_kwargs : Optional[Dict[str, Any]]
         Forwarded to botocore requests.
         e.g. s3_additional_kwargs={'RequestPayer': 'requester'}
+    pyarrow_additional_kwargs : Optional[Dict[str, Any]]
+        Forwarded to the ParquetFile class or when converting an Arrow table to Pandas; currently only the
+        "coerce_int96_timestamp_unit" and "timestamp_as_object" arguments are considered. If reading parquet
+        files where a timestamp cannot be converted to a pandas Timestamp[ns], consider setting
+        timestamp_as_object=True to allow timestamp units larger than "ns". If reading parquet data that
+        still uses INT96 (as Athena outputs do), use coerce_int96_timestamp_unit to specify the timestamp
+        unit to decode INT96 into (by default "ns"); if you know the parquet came from a system that encodes
+        timestamps in a particular unit, set this to that same unit, e.g. coerce_int96_timestamp_unit="ms".

     Returns
     -------
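
A hedged sketch of the timestamp_as_object option described in the docstring above (query and database names are placeholders): when result timestamps fall outside the pandas Timestamp[ns] range, asking pyarrow to return plain datetime objects avoids the out-of-bounds conversion error.

import awswrangler as wr

# Columns come back as Python datetime objects instead of datetime64[ns],
# so dates beyond the pandas nanosecond range survive the conversion.
df = wr.athena.read_sql_query(
    sql="SELECT * FROM events_with_far_future_dates",  # placeholder query
    database="my_database",                             # placeholder database
    pyarrow_additional_kwargs={"timestamp_as_object": True},
)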
@@ -837,6 +854,7 @@ def read_sql_query(
                use_threads=use_threads,
                session=session,
                s3_additional_kwargs=s3_additional_kwargs,
+               pyarrow_additional_kwargs=pyarrow_additional_kwargs,
            )
        except Exception as e:  # pylint: disable=broad-except
            _logger.error(e)  # if there is anything wrong with the cache, just fallback to the usual path
@@ -859,6 +877,7 @@ def read_sql_query(
        use_threads=use_threads,
        s3_additional_kwargs=s3_additional_kwargs,
        boto3_session=session,
+       pyarrow_additional_kwargs=pyarrow_additional_kwargs,
    )


@@ -885,6 +904,7 @@ def read_sql_table(
     max_local_cache_entries: int = 100,
     data_source: Optional[str] = None,
     s3_additional_kwargs: Optional[Dict[str, Any]] = None,
+    pyarrow_additional_kwargs: Optional[Dict[str, Any]] = None,
 ) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
     """Extract the full table AWS Athena and return the results as a Pandas DataFrame.

@@ -1045,6 +1065,15 @@ def read_sql_table(
     s3_additional_kwargs : Optional[Dict[str, Any]]
         Forwarded to botocore requests.
         e.g. s3_additional_kwargs={'RequestPayer': 'requester'}
+    pyarrow_additional_kwargs : Optional[Dict[str, Any]]
+        Forwarded to the ParquetFile class or when converting an Arrow table to Pandas; currently only the
+        "coerce_int96_timestamp_unit" and "timestamp_as_object" arguments are considered. If reading
+        parquet files where a timestamp cannot be converted to a pandas Timestamp[ns], consider
+        setting timestamp_as_object=True to allow timestamp units larger than "ns". If reading parquet
+        data that still uses INT96 (as Athena outputs do), use coerce_int96_timestamp_unit to specify
+        the timestamp unit to decode INT96 into (by default "ns"); if you know the parquet came from a
+        system that encodes timestamps in a particular unit, set this to that same unit, e.g.
+        coerce_int96_timestamp_unit="ms".

     Returns
     -------
@@ -1081,6 +1110,7 @@ def read_sql_table(
        max_remote_cache_entries=max_remote_cache_entries,
        max_local_cache_entries=max_local_cache_entries,
        s3_additional_kwargs=s3_additional_kwargs,
+       pyarrow_additional_kwargs=pyarrow_additional_kwargs,
    )