Skip to content

Commit d7ba8b0

Browse files
authored
Add timestamp as object to database methods (#1132)
* Add timestamp as object to database methods * Adding test
1 parent 02a2d46 commit d7ba8b0

File tree

6 files changed

+120
-9
lines changed

6 files changed

+120
-9
lines changed

awswrangler/_databases.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ def _records2df(
128128
index: Optional[Union[str, List[str]]],
129129
safe: bool,
130130
dtype: Optional[Dict[str, pa.DataType]],
131+
timestamp_as_object: bool,
131132
) -> pd.DataFrame:
132133
arrays: List[pa.Array] = []
133134
for col_values, col_name in zip(tuple(zip(*records)), cols_names): # Transposing
@@ -155,6 +156,7 @@ def _records2df(
155156
date_as_object=True,
156157
types_mapper=_data_types.pyarrow2pandas_extension,
157158
safe=safe,
159+
timestamp_as_object=timestamp_as_object,
158160
)
159161
if index is not None:
160162
df.set_index(index, inplace=True)
@@ -175,6 +177,7 @@ def _iterate_results(
175177
index_col: Optional[Union[str, List[str]]],
176178
safe: bool,
177179
dtype: Optional[Dict[str, pa.DataType]],
180+
timestamp_as_object: bool,
178181
) -> Iterator[pd.DataFrame]:
179182
with con.cursor() as cursor:
180183
cursor.execute(*cursor_args)
@@ -183,7 +186,14 @@ def _iterate_results(
183186
records = cursor.fetchmany(chunksize)
184187
if not records:
185188
break
186-
yield _records2df(records=records, cols_names=cols_names, index=index_col, safe=safe, dtype=dtype)
189+
yield _records2df(
190+
records=records,
191+
cols_names=cols_names,
192+
index=index_col,
193+
safe=safe,
194+
dtype=dtype,
195+
timestamp_as_object=timestamp_as_object,
196+
)
187197

188198

189199
def _fetch_all_results(
@@ -192,6 +202,7 @@ def _fetch_all_results(
192202
index_col: Optional[Union[str, List[str]]] = None,
193203
dtype: Optional[Dict[str, pa.DataType]] = None,
194204
safe: bool = True,
205+
timestamp_as_object: bool = False,
195206
) -> pd.DataFrame:
196207
with con.cursor() as cursor:
197208
cursor.execute(*cursor_args)
@@ -202,6 +213,7 @@ def _fetch_all_results(
202213
index=index_col,
203214
dtype=dtype,
204215
safe=safe,
216+
timestamp_as_object=timestamp_as_object,
205217
)
206218

207219

@@ -213,6 +225,7 @@ def read_sql_query(
213225
chunksize: Optional[int] = None,
214226
dtype: Optional[Dict[str, pa.DataType]] = None,
215227
safe: bool = True,
228+
timestamp_as_object: bool = False,
216229
) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
217230
"""Read SQL Query (generic)."""
218231
args = _convert_params(sql, params)
@@ -224,6 +237,7 @@ def read_sql_query(
224237
index_col=index_col,
225238
dtype=dtype,
226239
safe=safe,
240+
timestamp_as_object=timestamp_as_object,
227241
)
228242

229243
return _iterate_results(
@@ -233,6 +247,7 @@ def read_sql_query(
233247
index_col=index_col,
234248
dtype=dtype,
235249
safe=safe,
250+
timestamp_as_object=timestamp_as_object,
236251
)
237252
except Exception as ex:
238253
con.rollback()

awswrangler/mysql.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ def read_sql_query(
174174
chunksize: Optional[int] = None,
175175
dtype: Optional[Dict[str, pa.DataType]] = None,
176176
safe: bool = True,
177+
timestamp_as_object: bool = False,
177178
) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
178179
"""Return a DataFrame corresponding to the result set of the query string.
179180
@@ -197,6 +198,8 @@ def read_sql_query(
197198
The keys should be the column names and the values should be the PyArrow types.
198199
safe : bool
199200
Check for overflows or other unsafe data type conversions.
201+
timestamp_as_object : bool
202+
Cast non-nanosecond timestamps (np.datetime64) to objects.
200203
201204
Returns
202205
-------
@@ -218,7 +221,14 @@ def read_sql_query(
218221
"""
219222
_validate_connection(con=con)
220223
return _db_utils.read_sql_query(
221-
sql=sql, con=con, index_col=index_col, params=params, chunksize=chunksize, dtype=dtype, safe=safe
224+
sql=sql,
225+
con=con,
226+
index_col=index_col,
227+
params=params,
228+
chunksize=chunksize,
229+
dtype=dtype,
230+
safe=safe,
231+
timestamp_as_object=timestamp_as_object,
222232
)
223233

224234

@@ -231,6 +241,7 @@ def read_sql_table(
231241
chunksize: Optional[int] = None,
232242
dtype: Optional[Dict[str, pa.DataType]] = None,
233243
safe: bool = True,
244+
timestamp_as_object: bool = False,
234245
) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
235246
"""Return a DataFrame corresponding the table.
236247
@@ -257,6 +268,8 @@ def read_sql_table(
257268
The keys should be the column names and the values should be the PyArrow types.
258269
safe : bool
259270
Check for overflows or other unsafe data type conversions.
271+
timestamp_as_object : bool
272+
Cast non-nanosecond timestamps (np.datetime64) to objects.
260273
261274
Returns
262275
-------
@@ -279,7 +292,14 @@ def read_sql_table(
279292
"""
280293
sql: str = f"SELECT * FROM `{table}`" if schema is None else f"SELECT * FROM `{schema}`.`{table}`"
281294
return read_sql_query(
282-
sql=sql, con=con, index_col=index_col, params=params, chunksize=chunksize, dtype=dtype, safe=safe
295+
sql=sql,
296+
con=con,
297+
index_col=index_col,
298+
params=params,
299+
chunksize=chunksize,
300+
dtype=dtype,
301+
safe=safe,
302+
timestamp_as_object=timestamp_as_object,
283303
)
284304

285305

awswrangler/postgresql.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ def read_sql_query(
169169
chunksize: Optional[int] = None,
170170
dtype: Optional[Dict[str, pa.DataType]] = None,
171171
safe: bool = True,
172+
timestamp_as_object: bool = False,
172173
) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
173174
"""Return a DataFrame corresponding to the result set of the query string.
174175
@@ -192,6 +193,8 @@ def read_sql_query(
192193
The keys should be the column names and the values should be the PyArrow types.
193194
safe : bool
194195
Check for overflows or other unsafe data type conversions.
196+
timestamp_as_object : bool
197+
Cast non-nanosecond timestamps (np.datetime64) to objects.
195198
196199
Returns
197200
-------
@@ -213,7 +216,14 @@ def read_sql_query(
213216
"""
214217
_validate_connection(con=con)
215218
return _db_utils.read_sql_query(
216-
sql=sql, con=con, index_col=index_col, params=params, chunksize=chunksize, dtype=dtype, safe=safe
219+
sql=sql,
220+
con=con,
221+
index_col=index_col,
222+
params=params,
223+
chunksize=chunksize,
224+
dtype=dtype,
225+
safe=safe,
226+
timestamp_as_object=timestamp_as_object,
217227
)
218228

219229

@@ -226,6 +236,7 @@ def read_sql_table(
226236
chunksize: Optional[int] = None,
227237
dtype: Optional[Dict[str, pa.DataType]] = None,
228238
safe: bool = True,
239+
timestamp_as_object: bool = False,
229240
) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
230241
"""Return a DataFrame corresponding the table.
231242
@@ -252,6 +263,8 @@ def read_sql_table(
252263
The keys should be the column names and the values should be the PyArrow types.
253264
safe : bool
254265
Check for overflows or other unsafe data type conversions.
266+
timestamp_as_object : bool
267+
Cast non-nanosecond timestamps (np.datetime64) to objects.
255268
256269
Returns
257270
-------
@@ -274,7 +287,14 @@ def read_sql_table(
274287
"""
275288
sql: str = f'SELECT * FROM "{table}"' if schema is None else f'SELECT * FROM "{schema}"."{table}"'
276289
return read_sql_query(
277-
sql=sql, con=con, index_col=index_col, params=params, chunksize=chunksize, dtype=dtype, safe=safe
290+
sql=sql,
291+
con=con,
292+
index_col=index_col,
293+
params=params,
294+
chunksize=chunksize,
295+
dtype=dtype,
296+
safe=safe,
297+
timestamp_as_object=timestamp_as_object,
278298
)
279299

280300

awswrangler/redshift.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,7 @@ def read_sql_query(
574574
chunksize: Optional[int] = None,
575575
dtype: Optional[Dict[str, pa.DataType]] = None,
576576
safe: bool = True,
577+
timestamp_as_object: bool = False,
577578
) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
578579
"""Return a DataFrame corresponding to the result set of the query string.
579580
@@ -602,6 +603,8 @@ def read_sql_query(
602603
The keys should be the column names and the values should be the PyArrow types.
603604
safe : bool
604605
Check for overflows or other unsafe data type conversions.
606+
timestamp_as_object : bool
607+
Cast non-nanosecond timestamps (np.datetime64) to objects.
605608
606609
Returns
607610
-------
@@ -623,7 +626,14 @@ def read_sql_query(
623626
"""
624627
_validate_connection(con=con)
625628
return _db_utils.read_sql_query(
626-
sql=sql, con=con, index_col=index_col, params=params, chunksize=chunksize, dtype=dtype, safe=safe
629+
sql=sql,
630+
con=con,
631+
index_col=index_col,
632+
params=params,
633+
chunksize=chunksize,
634+
dtype=dtype,
635+
safe=safe,
636+
timestamp_as_object=timestamp_as_object,
627637
)
628638

629639

@@ -636,6 +646,7 @@ def read_sql_table(
636646
chunksize: Optional[int] = None,
637647
dtype: Optional[Dict[str, pa.DataType]] = None,
638648
safe: bool = True,
649+
timestamp_as_object: bool = False,
639650
) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
640651
"""Return a DataFrame corresponding the table.
641652
@@ -667,6 +678,8 @@ def read_sql_table(
667678
The keys should be the column names and the values should be the PyArrow types.
668679
safe : bool
669680
Check for overflows or other unsafe data type conversions.
681+
timestamp_as_object : bool
682+
Cast non-nanosecond timestamps (np.datetime64) to objects.
670683
671684
Returns
672685
-------
@@ -689,7 +702,14 @@ def read_sql_table(
689702
"""
690703
sql: str = f'SELECT * FROM "{table}"' if schema is None else f'SELECT * FROM "{schema}"."{table}"'
691704
return read_sql_query(
692-
sql=sql, con=con, index_col=index_col, params=params, chunksize=chunksize, dtype=dtype, safe=safe
705+
sql=sql,
706+
con=con,
707+
index_col=index_col,
708+
params=params,
709+
chunksize=chunksize,
710+
dtype=dtype,
711+
safe=safe,
712+
timestamp_as_object=timestamp_as_object,
693713
)
694714

695715

awswrangler/sqlserver.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ def read_sql_query(
190190
chunksize: Optional[int] = None,
191191
dtype: Optional[Dict[str, pa.DataType]] = None,
192192
safe: bool = True,
193+
timestamp_as_object: bool = False,
193194
) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
194195
"""Return a DataFrame corresponding to the result set of the query string.
195196
@@ -213,6 +214,8 @@ def read_sql_query(
213214
The keys should be the column names and the values should be the PyArrow types.
214215
safe : bool
215216
Check for overflows or other unsafe data type conversions.
217+
timestamp_as_object : bool
218+
Cast non-nanosecond timestamps (np.datetime64) to objects.
216219
217220
Returns
218221
-------
@@ -233,7 +236,14 @@ def read_sql_query(
233236
"""
234237
_validate_connection(con=con)
235238
return _db_utils.read_sql_query(
236-
sql=sql, con=con, index_col=index_col, params=params, chunksize=chunksize, dtype=dtype, safe=safe
239+
sql=sql,
240+
con=con,
241+
index_col=index_col,
242+
params=params,
243+
chunksize=chunksize,
244+
dtype=dtype,
245+
safe=safe,
246+
timestamp_as_object=timestamp_as_object,
237247
)
238248

239249

@@ -247,6 +257,7 @@ def read_sql_table(
247257
chunksize: Optional[int] = None,
248258
dtype: Optional[Dict[str, pa.DataType]] = None,
249259
safe: bool = True,
260+
timestamp_as_object: bool = False,
250261
) -> Union[pd.DataFrame, Iterator[pd.DataFrame]]:
251262
"""Return a DataFrame corresponding the table.
252263
@@ -273,6 +284,8 @@ def read_sql_table(
273284
The keys should be the column names and the values should be the PyArrow types.
274285
safe : bool
275286
Check for overflows or other unsafe data type conversions.
287+
timestamp_as_object : bool
288+
Cast non-nanosecond timestamps (np.datetime64) to objects.
276289
277290
Returns
278291
-------
@@ -295,7 +308,14 @@ def read_sql_table(
295308
table_identifier = _get_table_identifier(schema, table)
296309
sql: str = f"SELECT * FROM {table_identifier}"
297310
return read_sql_query(
298-
sql=sql, con=con, index_col=index_col, params=params, chunksize=chunksize, dtype=dtype, safe=safe
311+
sql=sql,
312+
con=con,
313+
index_col=index_col,
314+
params=params,
315+
chunksize=chunksize,
316+
dtype=dtype,
317+
safe=safe,
318+
timestamp_as_object=timestamp_as_object,
299319
)
300320

301321

tests/test_postgresql.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
from datetime import datetime
23
from decimal import Decimal
34

45
import pandas as pd
@@ -371,3 +372,18 @@ def test_upsert_multiple_conflict_columns(postgresql_table, postgresql_con):
371372
df7["c1"] = df7["c1"].astype("Int64")
372373
df7["c2"] = df7["c2"].astype("Int64")
373374
assert df6.equals(df7)
375+
376+
377+
def test_timestamp_overflow(postgresql_table, postgresql_con):
378+
df = pd.DataFrame({"c0": [datetime.strptime("1677-01-01 00:00:00.0", "%Y-%m-%d %H:%M:%S.%f")]})
379+
wr.postgresql.to_sql(df=df, con=postgresql_con, schema="public", table=postgresql_table)
380+
381+
with pytest.raises(pa._lib.ArrowInvalid):
382+
wr.postgresql.read_sql_table(
383+
con=postgresql_con, schema="public", table=postgresql_table, timestamp_as_object=False
384+
)
385+
386+
df2 = wr.postgresql.read_sql_table(
387+
con=postgresql_con, schema="public", table=postgresql_table, timestamp_as_object=True
388+
)
389+
assert df.c0.values[0] == df2.c0.values[0]

0 commit comments

Comments
 (0)