
Commit 2b676f9

Add read_csv with iterator for files that do not fit in memory.
1 parent 2584dae commit 2b676f9

4 files changed: +234 −18 lines
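Based on the new tests in testing/test_awswrangler/test_pandas.py, usage of the iterator mode looks roughly like the sketch below; the bucket, key, and awswrangler.Session() setup are illustrative assumptions, not part of this diff.

import awswrangler

session = awswrangler.Session()  # assumed setup; the tests receive it as a fixture
path = "s3://my-bucket/data_samples/small.csv"  # placeholder object

# Default behaviour: the whole object is downloaded and parsed at once
dataframe = session.pandas.read_csv(path=path)

# With max_result_size set, read_csv yields one DataFrame per S3 byte-range request
total_rows = 0
for chunk in session.pandas.read_csv(path=path, max_result_size=200_000_000):
    total_rows += len(chunk.index)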

awswrangler/exceptions.py

Lines changed: 4 additions & 0 deletions
@@ -26,5 +26,9 @@ class AthenaQueryError(Exception):
     pass


+class EmptyS3Object(Exception):
+    pass
+
+
 class MissingBatchDetected(Exception):
     pass
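The new exception is raised by Pandas._read_csv_iterator when the object's ContentLength is zero; a hypothetical guard on the caller side (session and the path are placeholders, not part of this diff) could look like:

from awswrangler.exceptions import EmptyS3Object

try:
    for chunk in session.pandas.read_csv(path="s3://my-bucket/maybe_empty.csv",
                                         max_result_size=200_000_000):
        print(len(chunk.index))  # placeholder processing
except EmptyS3Object:
    print("Object exists but has zero bytes; nothing to read.")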

awswrangler/pandas.py

Lines changed: 205 additions & 7 deletions
@@ -1,4 +1,4 @@
-from io import BytesIO
+from io import BytesIO, StringIO
 import multiprocessing as mp
 import logging
 from math import floor
@@ -7,7 +7,7 @@
 import pyarrow
 from pyarrow import parquet

-from awswrangler.exceptions import UnsupportedWriteMode, UnsupportedFileFormat, AthenaQueryError
+from awswrangler.exceptions import UnsupportedWriteMode, UnsupportedFileFormat, AthenaQueryError, EmptyS3Object
 from awswrangler.utils import calculate_bounders
 from awswrangler import s3

@@ -34,6 +34,7 @@ def _parse_path(path):
     def read_csv(
             self,
             path,
+            max_result_size=None,
             header="infer",
             names=None,
             dtype=None,
@@ -44,15 +45,211 @@ def read_csv(
             escapechar=None,
             parse_dates=False,
             infer_datetime_format=False,
-            encoding=None,
+            encoding="utf-8",
     ):
+        """
+        Read a CSV file from AWS S3 using optimized strategies.
+        Tries to mimic pandas.read_csv() as closely as possible:
+        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
+        P.S. max_result_size != None tries to mimic the chunksize behaviour of pandas.read_sql()
+        :param path: AWS S3 path (e.g. s3://BUCKET_NAME/KEY_NAME)
+        :param max_result_size: Max number of bytes on each request to S3
+        :param header: Same as pandas.read_csv()
+        :param names: Same as pandas.read_csv()
+        :param dtype: Same as pandas.read_csv()
+        :param sep: Same as pandas.read_csv()
+        :param lineterminator: Same as pandas.read_csv()
+        :param quotechar: Same as pandas.read_csv()
+        :param quoting: Same as pandas.read_csv()
+        :param escapechar: Same as pandas.read_csv()
+        :param parse_dates: Same as pandas.read_csv()
+        :param infer_datetime_format: Same as pandas.read_csv()
+        :param encoding: Same as pandas.read_csv()
+        :return: Pandas DataFrame, or an iterator of Pandas DataFrames if max_result_size != None
+        """
         bucket_name, key_path = self._parse_path(path)
-        s3_client = self._session.boto3_session.client(
+        client_s3 = self._session.boto3_session.client(
             service_name="s3",
             use_ssl=True,
             config=self._session.botocore_config)
+        if max_result_size:
+            ret = Pandas._read_csv_iterator(
+                client_s3=client_s3,
+                bucket_name=bucket_name,
+                key_path=key_path,
+                max_result_size=max_result_size,
+                header=header,
+                names=names,
+                dtype=dtype,
+                sep=sep,
+                lineterminator=lineterminator,
+                quotechar=quotechar,
+                quoting=quoting,
+                escapechar=escapechar,
+                parse_dates=parse_dates,
+                infer_datetime_format=infer_datetime_format,
+                encoding=encoding)
+        else:
+            ret = Pandas._read_csv_once(
+                client_s3=client_s3,
+                bucket_name=bucket_name,
+                key_path=key_path,
+                header=header,
+                names=names,
+                dtype=dtype,
+                sep=sep,
+                lineterminator=lineterminator,
+                quotechar=quotechar,
+                quoting=quoting,
+                escapechar=escapechar,
+                parse_dates=parse_dates,
+                infer_datetime_format=infer_datetime_format,
+                encoding=encoding)
+        return ret
+
+    @staticmethod
+    def _read_csv_iterator(
+            client_s3,
+            bucket_name,
+            key_path,
+            max_result_size=200_000_000,  # 200 MB
+            header="infer",
+            names=None,
+            dtype=None,
+            sep=",",
+            lineterminator="\n",
+            quotechar='"',
+            quoting=0,
+            escapechar=None,
+            parse_dates=False,
+            infer_datetime_format=False,
+            encoding="utf-8",
+    ):
+        """
+        Read a CSV file from AWS S3 using optimized strategies.
+        Tries to mimic pandas.read_csv() as closely as possible:
+        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
+        :param client_s3: Boto3 S3 client object
+        :param bucket_name: S3 bucket name
+        :param key_path: S3 key path (W/o bucket)
+        :param max_result_size: Max number of bytes on each request to S3
+        :param header: Same as pandas.read_csv()
+        :param names: Same as pandas.read_csv()
+        :param dtype: Same as pandas.read_csv()
+        :param sep: Same as pandas.read_csv()
+        :param lineterminator: Same as pandas.read_csv()
+        :param quotechar: Same as pandas.read_csv()
+        :param quoting: Same as pandas.read_csv()
+        :param escapechar: Same as pandas.read_csv()
+        :param parse_dates: Same as pandas.read_csv()
+        :param infer_datetime_format: Same as pandas.read_csv()
+        :param encoding: Same as pandas.read_csv()
+        :return: Iterator of Pandas DataFrames
+        """
+        metadata = s3.S3.head_object_with_retry(client=client_s3,
+                                                bucket=bucket_name,
+                                                key=key_path)
+        logger.debug(f"metadata: {metadata}")
+        total_size = metadata["ContentLength"]
+        logger.debug(f"total_size: {total_size}")
+        if total_size <= 0:
+            raise EmptyS3Object(metadata)
+        else:
+            bounders = calculate_bounders(num_items=total_size,
+                                          max_size=max_result_size)
+            logger.debug(f"bounders: {bounders}")
+            bounders_len = len(bounders)
+            count = 0
+            forgotten_bytes = 0
+            cols_names = None
+            for ini, end in bounders:
+                count += 1
+                ini -= forgotten_bytes
+                end -= 1  # Range is inclusive, contrary to Python's List
+                bytes_range = "bytes={}-{}".format(ini, end)
+                logger.debug(f"bytes_range: {bytes_range}")
+                body = client_s3.get_object(Bucket=bucket_name, Key=key_path, Range=bytes_range)["Body"]\
+                    .read()\
+                    .decode(encoding, errors="ignore")
+                chunk_size = len(body)
+                logger.debug(f"chunk_size: {chunk_size}")
+                if body[0] == lineterminator:
+                    first_char = 1
+                else:
+                    first_char = 0
+                if (count == 1) and (count == bounders_len):  # Only one chunk
+                    last_break_line_idx = chunk_size
+                elif count == 1:  # First chunk
+                    last_break_line_idx = body.rindex(lineterminator)
+                    forgotten_bytes = chunk_size - last_break_line_idx
+                elif count == bounders_len:  # Last chunk
+                    header = None
+                    names = cols_names
+                    last_break_line_idx = chunk_size
+                else:  # Middle chunks
+                    header = None
+                    names = cols_names
+                    last_break_line_idx = body.rindex(lineterminator)
+                    forgotten_bytes = chunk_size - last_break_line_idx
+                df = pandas.read_csv(
+                    StringIO(body[first_char:last_break_line_idx]),
+                    header=header,
+                    names=names,
+                    sep=sep,
+                    quotechar=quotechar,
+                    quoting=quoting,
+                    escapechar=escapechar,
+                    parse_dates=parse_dates,
+                    infer_datetime_format=infer_datetime_format,
+                    lineterminator=lineterminator,
+                    dtype=dtype,
+                    encoding=encoding,
+                )
+                yield df
+                if count == 1:  # First chunk: remember the column names for later chunks
+                    cols_names = df.columns
+
+    @staticmethod
+    def _read_csv_once(
+            client_s3,
+            bucket_name,
+            key_path,
+            header="infer",
+            names=None,
+            dtype=None,
+            sep=",",
+            lineterminator="\n",
+            quotechar='"',
+            quoting=0,
+            escapechar=None,
+            parse_dates=False,
+            infer_datetime_format=False,
+            encoding=None,
+    ):
+        """
+        Read a CSV file from AWS S3 using optimized strategies.
+        Tries to mimic pandas.read_csv() as closely as possible:
+        https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_csv.html
+        :param client_s3: Boto3 S3 client object
+        :param bucket_name: S3 bucket name
+        :param key_path: S3 key path (W/o bucket)
+        :param header: Same as pandas.read_csv()
+        :param names: Same as pandas.read_csv()
+        :param dtype: Same as pandas.read_csv()
+        :param sep: Same as pandas.read_csv()
+        :param lineterminator: Same as pandas.read_csv()
+        :param quotechar: Same as pandas.read_csv()
+        :param quoting: Same as pandas.read_csv()
+        :param escapechar: Same as pandas.read_csv()
+        :param parse_dates: Same as pandas.read_csv()
+        :param infer_datetime_format: Same as pandas.read_csv()
+        :param encoding: Same as pandas.read_csv()
+        :return: Pandas DataFrame
+        """
         buff = BytesIO()
-        s3_client.download_fileobj(bucket_name, key_path, buff)
+        client_s3.download_fileobj(Bucket=bucket_name,
+                                   Key=key_path,
+                                   Fileobj=buff)
         buff.seek(0),
         dataframe = pandas.read_csv(
             buff,
@@ -84,8 +281,9 @@ def read_sql_athena(self, sql, database, s3_output=None):
             query=sql, database=database, s3_output=s3_output)
         query_response = self._session.athena.wait_query(
             query_execution_id=query_execution_id)
-        if query_response.get("QueryExecution").get("Status").get(
-                "State") in ["FAILED", "CANCELLED"]:
+        if query_response.get("QueryExecution").get("Status").get("State") in [
+                "FAILED", "CANCELLED"
+        ]:
             reason = (query_response.get("QueryExecution").get("Status").get(
                 "StateChangeReason"))
             message_error = f"Query error: {reason}"
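The core of _read_csv_iterator above is its chunk-boundary handling: each S3 byte range is cut at its last line break, and the leftover bytes are re-fetched with the next range so no row is ever split across two DataFrames. The minimal, self-contained sketch below illustrates that idea on an in-memory string; it carries the leftover tail forward in memory rather than re-requesting it from S3, and names such as fake_csv and MAX_CHUNK are illustrative only.

from io import StringIO

import pandas

fake_csv = "id,name\n" + "".join(f"{i},row_{i}\n" for i in range(100))
MAX_CHUNK = 64  # stand-in for max_result_size (bytes per window)

total_rows = 0
forgotten = ""          # tail of the previous window with no line break yet
position = 0
header = "infer"        # only the first chunk carries the header row
names = None
while position < len(fake_csv):
    window = forgotten + fake_csv[position:position + MAX_CHUNK]
    position += MAX_CHUNK
    if position >= len(fake_csv):          # last window: keep everything
        body, forgotten = window, ""
    else:                                  # cut at the last complete line
        cut = window.rindex("\n") + 1
        body, forgotten = window[:cut], window[cut:]
    df = pandas.read_csv(StringIO(body), header=header, names=names)
    if header == "infer":                  # remember column names for later chunks
        names = list(df.columns)
        header = None
    total_rows += len(df.index)

assert total_rows == 100                   # no row was lost or split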

awswrangler/s3.py

Lines changed: 5 additions & 5 deletions
@@ -225,7 +225,7 @@ def list_objects(self, path):
         stop=tenacity.stop_after_attempt(max_attempt_number=15),
         reraise=True,
     )
-    def _head_object_with_retry(client, bucket, key):
+    def head_object_with_retry(client, bucket, key):
         return client.head_object(Bucket=bucket, Key=key)

     @staticmethod
@@ -237,10 +237,10 @@ def _get_objects_head_remote(send_pipe, session_primitives, objects_paths):
         logger.debug(f"len(objects_paths): {len(objects_paths)}")
         for object_path in objects_paths:
             bucket, key = object_path.replace("s3://", "").split("/", 1)
-            res = S3._head_object_with_retry(client=client,
-                                             bucket=bucket,
-                                             key=key)
-            size = res.get("ContentLength")
+            res = S3.head_object_with_retry(client=client,
+                                            bucket=bucket,
+                                            key=key)
+            size = res["ContentLength"]
             objects_sizes[object_path] = size
         logger.debug(f"len(objects_sizes): {len(objects_sizes)}")
         send_pipe.send(objects_sizes)
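The first hunk shows only part of the retry decorator (its stop and reraise arguments); the rest sits above the hunk and is not visible here. A minimal sketch of such a tenacity-wrapped helper, with a placeholder bucket and key, might look like:

import boto3
import tenacity

@tenacity.retry(
    stop=tenacity.stop_after_attempt(max_attempt_number=15),
    reraise=True,
)
def head_object_with_retry(client, bucket, key):
    return client.head_object(Bucket=bucket, Key=key)

client = boto3.client("s3")
metadata = head_object_with_retry(client=client,
                                  bucket="my-bucket",            # placeholder
                                  key="data_samples/small.csv")  # placeholder
print(metadata["ContentLength"])  # the size _read_csv_iterator uses to plan byte ranges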

testing/test_awswrangler/test_pandas.py

Lines changed: 20 additions & 6 deletions
@@ -48,13 +48,27 @@ def database(cloudformation_outputs):
     yield database


-def test_read_csv(session, bucket):
-    boto3.client("s3").upload_file("data_samples/small.csv", bucket,
-                                   "data_samples/small.csv")
-    path = f"s3://{bucket}/data_samples/small.csv"
+@pytest.mark.parametrize("sample, row_num", [("data_samples/micro.csv", 30),
+                                             ("data_samples/small.csv", 100)])
+def test_read_csv(session, bucket, sample, row_num):
+    boto3.client("s3").upload_file(sample, bucket, sample)
+    path = f"s3://{bucket}/{sample}"
     dataframe = session.pandas.read_csv(path=path)
-    session.s3.delete_objects(path=f"s3://{bucket}/data_samples/")
-    assert len(dataframe.index) == 100
+    session.s3.delete_objects(path=path)
+    assert len(dataframe.index) == row_num
+
+
+@pytest.mark.parametrize("sample, row_num", [("data_samples/micro.csv", 30),
+                                             ("data_samples/small.csv", 100)])
+def test_read_csv_iterator(session, bucket, sample, row_num):
+    boto3.client("s3").upload_file(sample, bucket, sample)
+    path = f"s3://{bucket}/{sample}"
+    dataframe_iter = session.pandas.read_csv(path=path, max_result_size=200)
+    total_count = 0
+    for dataframe in dataframe_iter:
+        total_count += len(dataframe.index)
+    session.s3.delete_objects(path=path)
+    assert total_count == row_num


 @pytest.mark.parametrize(
