@@ -234,10 +234,11 @@ def test_to_s3(
 ):
     dataframe = pd.read_csv("data_samples/micro.csv")
     func = session.pandas.to_csv if file_format == "csv" else session.pandas.to_parquet
+    path = f"s3://{bucket}/test/"
     objects_paths = func(
         dataframe=dataframe,
         database=database,
-        path=f"s3://{bucket}/test/",
+        path=path,
         preserve_index=preserve_index,
         mode=mode,
         partition_cols=partition_cols,
@@ -264,9 +265,10 @@ def test_to_parquet_with_cast_int(
     database,
 ):
     dataframe = pd.read_csv("data_samples/nano.csv", dtype={"id": "Int64"}, parse_dates=["date", "time"])
+    path = f"s3://{bucket}/test/"
     session.pandas.to_parquet(dataframe=dataframe,
                               database=database,
-                              path=f"s3://{bucket}/test/",
+                              path=path,
                               preserve_index=False,
                               mode="overwrite",
                               procs_cpu_bound=1,
@@ -277,6 +279,7 @@ def test_to_parquet_with_cast_int(
         dataframe2 = session.pandas.read_sql_athena(sql="select * from test", database=database)
         if len(dataframe.index) == len(dataframe2.index):
             break
+    session.s3.delete_objects(path=path)
     assert len(dataframe.index) == len(dataframe2.index)
     assert len(list(dataframe.columns)) == len(list(dataframe2.columns))
     assert dataframe[dataframe["id"] == 0].iloc[0]["name"] == dataframe2[dataframe2["id"] == 0].iloc[0]["name"]
@@ -385,9 +388,10 @@ def test_etl_complex(session, bucket, database, max_result_size):
     dataframe = pd.read_csv("data_samples/complex.csv",
                             dtype={"my_int_with_null": "Int64"},
                             parse_dates=["my_timestamp", "my_date"])
+    path = f"s3://{bucket}/test/"
     session.pandas.to_parquet(dataframe=dataframe,
                               database=database,
-                              path=f"s3://{bucket}/test/",
+                              path=path,
                               preserve_index=False,
                               mode="overwrite",
                               procs_cpu_bound=1)
@@ -412,6 +416,7 @@ def test_etl_complex(session, bucket, database, max_result_size):
             assert str(
                 row.my_string
             ) == "foo\nboo\nbar\nFOO\nBOO\nBAR\nxxxxx\nÁÃÀÂÇ\n汉字汉字汉字汉字汉字汉字汉字æøåæøåæøåæøåæøåæøåæøåæøåæøåæøå汉字汉字汉字汉字汉字汉字汉字æøåæøåæøåæøåæøåæøåæøåæøåæøåæøå"
+    session.s3.delete_objects(path=path)
     assert count == len(dataframe.index)


@@ -423,9 +428,10 @@ def test_to_parquet_with_kms(
     extra_args = {"ServerSideEncryption": "aws:kms", "SSEKMSKeyId": kms_key}
     session_inner = Session(s3_additional_kwargs=extra_args)
     dataframe = pd.read_csv("data_samples/nano.csv")
+    path = f"s3://{bucket}/test/"
    session_inner.pandas.to_parquet(dataframe=dataframe,
                                     database=database,
-                                    path=f"s3://{bucket}/test/",
+                                    path=path,
                                     preserve_index=False,
                                     mode="overwrite",
                                     procs_cpu_bound=1)
@@ -435,6 +441,7 @@ def test_to_parquet_with_kms(
         dataframe2 = session_inner.pandas.read_sql_athena(sql="select * from test", database=database)
         if len(dataframe.index) == len(dataframe2.index):
             break
+    session_inner.s3.delete_objects(path=path)
     assert len(dataframe.index) == len(dataframe2.index)
     assert len(list(dataframe.columns)) == len(list(dataframe2.columns))
     assert dataframe[dataframe["id"] == 0].iloc[0]["name"] == dataframe2[dataframe2["id"] == 0].iloc[0]["name"]
@@ -1196,3 +1203,49 @@ def test_nan_cast(session, bucket, database, partition_cols):
     assert df2.dtypes[4] == "Int64"
     assert df2.dtypes[5] == "object"
     session.s3.delete_objects(path=path)
+
+
+def test_to_parquet_date_null(session, bucket, database):
+    df = pd.DataFrame({
+        "col1": ["val1", "val2"],
+        "datecol": [date(2019, 11, 9), None],
+    })
+    path = f"s3://{bucket}/test/"
+    session.pandas.to_parquet(dataframe=df,
+                              database=database,
+                              table="test",
+                              path=path,
+                              mode="overwrite",
+                              preserve_index=False,
+                              procs_cpu_bound=1)
+    df2 = None
+    for counter in range(10):  # Retrying to workaround s3 eventual consistency
+        sleep(1)
+        df2 = session.pandas.read_sql_athena(sql="select * from test", database=database)
+        if len(df.index) == len(df2.index):
+            break
+    path = f"s3://{bucket}/test2/"
+    session.pandas.to_parquet(dataframe=df2,
+                              database=database,
+                              table="test2",
+                              path=path,
+                              mode="overwrite",
+                              preserve_index=False,
+                              procs_cpu_bound=1)
+    df3 = None
+    for counter in range(10):  # Retrying to workaround s3 eventual consistency
+        sleep(1)
+        df3 = session.pandas.read_sql_athena(sql="select * from test2", database=database)
+        if len(df2.index) == len(df3.index):
+            break
+
+    session.s3.delete_objects(path=path)
+
+    assert len(list(df.columns)) == len(list(df2.columns)) == len(list(df3.columns))
+    assert len(df.index) == len(df2.index) == len(df3.index)
+
+    assert df[df.col1 == "val1"].iloc[0].datecol == df2[df2.col1 == "val1"].iloc[0].datecol
+    assert df2[df2.col1 == "val1"].iloc[0].datecol == df3[df3.col1 == "val1"].iloc[0].datecol == date(2019, 11, 9)
+
+    assert df[df.col1 == "val2"].iloc[0].datecol == df2[df2.col1 == "val2"].iloc[0].datecol
+    assert df2[df2.col1 == "val2"].iloc[0].datecol == df3[df3.col1 == "val2"].iloc[0].datecol is None